4 #include "util/coding/block_compressor.h" 6 #define ZSTD_STATIC_LINKING_ONLY 10 #include "base/logging.h" 14 using strings::ByteRange;
15 using strings::MutableByteRange;
18 ostream& operator<<(ostream& os,
const ZSTD_parameters& p) {
19 os <<
"wlog: " << p.cParams.windowLog <<
", clog: " << p.cParams.chainLog <<
", strategy: " 20 << p.cParams.strategy <<
", slog: " << p.cParams.searchLog <<
", cntflag: " 21 << p.fParams.contentSizeFlag <<
", hashlog: " << p.cParams.hashLog;
26 #define HANDLE ((ZSTD_CCtx*)zstd_cntx_) 28 #define CHECK_ZSTDERR(res) do { auto foo = (res); \ 29 CHECK(!ZSTD_isError(foo)) << ZSTD_getErrorName(foo); } while(false) 31 BlockCompressor::BlockCompressor() {
32 zstd_cntx_ = ZSTD_createCCtx();
35 BlockCompressor::~BlockCompressor() {
36 ZSTD_freeCCtx(HANDLE);
39 void BlockCompressor::Start() {
40 CHECK(compressed_bufs_.empty() && compressed_blocks_.empty());
42 ZSTD_parameters params{ZSTD_getCParams(6, 0, 0), ZSTD_frameParameters()};
46 params.cParams.windowLog = BLOCK_SIZE_LOG + 1;
47 params.cParams.hashLog = BLOCK_SIZE_LOG - 2;
49 VLOG(1) <<
"Starting with " << params;
51 size_t res = ZSTD_compressBegin_advanced(HANDLE,
nullptr, 0, params, ZSTD_CONTENTSIZE_UNKNOWN);
55 double_buf_.reset(
new uint8_t[BLOCK_SIZE * 2 + 1]);
58 compress_block_size_ = ZSTD_compressBound(BLOCK_SIZE);
61 void BlockCompressor::Add(strings::ByteRange br) {
65 if (compress_block_size_ == 0) {
72 while (pos_ + br.size() >= BLOCK_SIZE) {
73 size_t to_copy = BLOCK_SIZE - pos_;
74 memcpy(buf_start() + pos_, br.data(), to_copy);
85 memcpy(buf_start() + pos_, br.data(), br.size());
89 void BlockCompressor::Finalize() {
90 if (compress_block_size_ == 0)
94 CompressInternal(
true);
97 compress_block_size_ = 0;
100 void BlockCompressor::CompressInternal(
bool finalize_frame) {
101 if (!finalize_frame && pos_ == 0)
104 std::unique_ptr<uint8_t[]> cbuf(
new uint8_t[compress_block_size_]);
106 auto func = finalize_frame ? ZSTD_compressEnd : ZSTD_compressContinue;
107 size_t res = func(HANDLE, cbuf.get(), compress_block_size_, buf_start(), pos_);
111 compressed_blocks_.emplace_back(cbuf.get(), res);
112 compressed_bufs_.push_back(std::move(cbuf));
113 compressed_size_ += res;
114 VLOG(1) <<
"Compressed from " << pos_ <<
" to " << res;
121 strings::MutableByteRange BlockCompressor::BlockBuffer() {
122 if (compress_block_size_ == 0) {
125 return MutableByteRange(buf_start() + pos_, BLOCK_SIZE - pos_);
129 bool BlockCompressor::Commit(
size_t sz) {
130 DCHECK_LE(sz + pos_, BLOCK_SIZE);
132 if (pos_ == BLOCK_SIZE) {
133 CompressInternal(
false);
139 void BlockCompressor::ClearCompressedData() {
140 compressed_blocks_.clear();
141 compressed_bufs_.clear();
142 compressed_size_ = 0;
147 #define HANDLE ((ZSTD_DCtx*)zstd_dcntx_) 149 BlockDecompressor::BlockDecompressor() {
150 zstd_dcntx_ = ZSTD_createDCtx();
153 BlockDecompressor::~BlockDecompressor() {
154 ZSTD_freeDCtx(HANDLE);
157 int32_t BlockDecompressor::Decompress(strings::ByteRange br, uint32_t* consumed) {
158 VLOG(1) <<
"Decompress " << br.size() <<
" bytes, frame_state_: " << frame_state_;
160 if (frame_state_ & 2) {
161 ZSTD_frameHeader params;
162 size_t res = ZSTD_getFrameHeader(¶ms, br.data(), br.size());
164 CHECK_LE(params.windowSize, BLOCK_SIZE * 2);
167 CHECK_EQ(0, ZSTD_decompressBegin(HANDLE));
169 buf_.reset(
new uint8_t[BLOCK_SIZE*2]);
174 decompress_size_ = 0;
176 uint32_t sz = ZSTD_nextSrcSizeToDecompress(HANDLE);
181 if (sz > br.size()) {
186 size_t res = ZSTD_decompressContinue(HANDLE, buf_.get() + BLOCK_SIZE * frame_state_, BLOCK_SIZE,
191 sz = ZSTD_nextSrcSizeToDecompress(HANDLE);
194 decompress_size_ = res;
207 strings::ByteRange BlockDecompressor::GetDecompressedBlock()
const {
208 unsigned bindex = frame_state_ & 1;
209 return ByteRange(buf_.get() + BLOCK_SIZE * bindex, decompress_size_);