block_compressor.cc
// Copyright 2018, Beeri 15. All rights reserved.
// Author: Roman Gershman (romange@gmail.com)
//
#include "util/coding/block_compressor.h"

#define ZSTD_STATIC_LINKING_ONLY

#include <zstd.h>

#include "base/logging.h"

namespace util {

using strings::ByteRange;
using strings::MutableByteRange;
using std::ostream;

ostream& operator<<(ostream& os, const ZSTD_parameters& p) {
  os << "wlog: " << p.cParams.windowLog << ", clog: " << p.cParams.chainLog
     << ", strategy: " << p.cParams.strategy << ", slog: " << p.cParams.searchLog
     << ", cntflag: " << p.fParams.contentSizeFlag << ", hashlog: " << p.cParams.hashLog;
  return os;
}

#define HANDLE ((ZSTD_CCtx*)zstd_cntx_)

#define CHECK_ZSTDERR(res) do { auto foo = (res); \
    CHECK(!ZSTD_isError(foo)) << ZSTD_getErrorName(foo); } while(false)

BlockCompressor::BlockCompressor() {
  zstd_cntx_ = ZSTD_createCCtx();
}

BlockCompressor::~BlockCompressor() {
  ZSTD_freeCCtx(HANDLE);
}

void BlockCompressor::Start() {
  CHECK(compressed_bufs_.empty() && compressed_blocks_.empty());

  // Compression level 6, unknown source size, no dictionary.
  ZSTD_parameters params{ZSTD_getCParams(6, 0, 0), ZSTD_frameParameters()};

  // To get a 128KB effective window we must set windowLog one notch higher,
  // because the effective window size is 2^windowLog - block_size.
  params.cParams.windowLog = BLOCK_SIZE_LOG + 1;
  params.cParams.hashLog = BLOCK_SIZE_LOG - 2;
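  // For example, assuming BLOCK_SIZE_LOG == 17 (128KB blocks), this gives windowLog = 18,
  // i.e. an effective window of 2^18 - 2^17 = 128KB: exactly one previous block of history.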

  VLOG(1) << "Starting with " << params;

  size_t res = ZSTD_compressBegin_advanced(HANDLE, nullptr, 0, params, ZSTD_CONTENTSIZE_UNKNOWN);
  CHECK_ZSTDERR(res);

  if (!double_buf_) {
    double_buf_.reset(new uint8_t[BLOCK_SIZE * 2 + 1]);
  }
  pos_ = 0;

  // Worst-case compressed size of a single block. Also doubles as the "frame started" flag:
  // Finalize() resets it to 0 so that the next Add()/BlockBuffer() calls Start() again.
  compress_block_size_ = ZSTD_compressBound(BLOCK_SIZE);
}

void BlockCompressor::Add(strings::ByteRange br) {
  if (br.empty())
    return;

  if (compress_block_size_ == 0) {
    Start();
  }

  // TODO: There is room for memory optimization here. For example, we could compact
  // consecutive compressed blocks together when they each end up smaller than
  // compress_block_size_.
  while (pos_ + br.size() >= BLOCK_SIZE) {
    size_t to_copy = BLOCK_SIZE - pos_;
    memcpy(buf_start() + pos_, br.data(), to_copy);
    pos_ = BLOCK_SIZE;

    br.advance(to_copy);
    Compress();
  }

  if (br.empty())
    return;

  // Copy the data into input buffer.
  memcpy(buf_start() + pos_, br.data(), br.size());
  pos_ += br.size();
}

void BlockCompressor::Finalize() {
  if (compress_block_size_ == 0)
    return;

  // At this point we may have pending input and/or already compressed blocks.
  CompressInternal(true);

  // Signal that next time we should Start() again.
  compress_block_size_ = 0;
}
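
// Illustrative push-style usage (a sketch only, not part of this file; the accessor
// compressed_blocks() is assumed to be exposed by block_compressor.h):
//
//   BlockCompressor bc;
//   bc.Add(strings::ByteRange(ptr, len));      // can be called any number of times
//   bc.Finalize();                             // compresses the tail and closes the frame
//   for (const strings::ByteRange& blk : bc.compressed_blocks()) {
//     sink->Append(blk);                       // hypothetical sink
//   }
//   bc.ClearCompressedData();                  // release buffers; next Add() restarts a frame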

void BlockCompressor::CompressInternal(bool finalize_frame) {
  if (!finalize_frame && pos_ == 0)
    return;

  std::unique_ptr<uint8_t[]> cbuf(new uint8_t[compress_block_size_]);

  auto func = finalize_frame ? ZSTD_compressEnd : ZSTD_compressContinue;
  size_t res = func(HANDLE, cbuf.get(), compress_block_size_, buf_start(), pos_);
  CHECK_ZSTDERR(res);

  // TODO: We could optimize memory by compacting compressed buffers together.
  compressed_blocks_.emplace_back(cbuf.get(), res);
  compressed_bufs_.push_back(std::move(cbuf));
  compressed_size_ += res;
  VLOG(1) << "Compressed from " << pos_ << " to " << res;

  // Switch to the other half of double_buf_. The block we just compressed must remain
  // accessible, since zstd's bufferless streaming API still uses it as the match window
  // for the next block.
  cur_buf_index_ ^= 1;
  pos_ = 0;
}

strings::MutableByteRange BlockCompressor::BlockBuffer() {
  if (compress_block_size_ == 0) {
    Start();
  }
  return MutableByteRange(buf_start() + pos_, BLOCK_SIZE - pos_);
}

bool BlockCompressor::Commit(size_t sz) {
  DCHECK_LE(sz + pos_, BLOCK_SIZE);
  pos_ += sz;
  if (pos_ == BLOCK_SIZE) {
    CompressInternal(false);
    return true;
  }
  return false;
}
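
// Illustrative zero-copy usage of BlockBuffer()/Commit() (a sketch; ReadSome() is a
// hypothetical producer, not part of this library):
//
//   BlockCompressor bc;
//   strings::MutableByteRange dest = bc.BlockBuffer();   // free space in the current block
//   size_t n = ReadSome(dest.data(), dest.size());
//   bool compressed = bc.Commit(n);                      // true if a full block was compressed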

void BlockCompressor::ClearCompressedData() {
  compressed_blocks_.clear();
  compressed_bufs_.clear();
  compressed_size_ = 0;
}

#undef HANDLE
#define HANDLE ((ZSTD_DCtx*)zstd_dcntx_)

BlockDecompressor::BlockDecompressor() {
  zstd_dcntx_ = ZSTD_createDCtx();
}

BlockDecompressor::~BlockDecompressor() {
  ZSTD_freeDCtx(HANDLE);
}

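// frame_state_ packs two flags: bit 1 (mask 2) means "a new frame header is expected on the
// next call", while bit 0 selects which half of buf_ receives the block decompressed by the
// current call. GetDecompressedBlock() reads the same bit to locate that block.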
int32_t BlockDecompressor::Decompress(strings::ByteRange br, uint32_t* consumed) {
  VLOG(1) << "Decompress " << br.size() << " bytes, frame_state_: " << frame_state_;

  if (frame_state_ & 2) {
    ZSTD_frameHeader params;
    // ZSTD_getFrameHeader returns 0 only when the header was fully parsed.
    size_t res = ZSTD_getFrameHeader(&params, br.data(), br.size());
    CHECK_EQ(0, res);
    CHECK_LE(params.windowSize, BLOCK_SIZE * 2);
    frame_state_ = 1;

    CHECK_EQ(0, ZSTD_decompressBegin(HANDLE));
    if (!buf_) {
      buf_.reset(new uint8_t[BLOCK_SIZE * 2]);
    }
  }

  *consumed = 0;
  decompress_size_ = 0;

  uint32_t sz = ZSTD_nextSrcSizeToDecompress(HANDLE);

  frame_state_ ^= 1;  // flip buffer index.

  while (sz > 0) {
    if (sz > br.size()) {
      frame_state_ ^= 1;  // revert buffer index.
      // Not enough input: report how many bytes are needed as a negative value.
      return -static_cast<int32_t>(sz);
    }

    size_t res = ZSTD_decompressContinue(HANDLE, buf_.get() + BLOCK_SIZE * frame_state_,
                                         BLOCK_SIZE, br.data(), sz);
    CHECK_ZSTDERR(res);
    *consumed += sz;
    br.advance(sz);
    sz = ZSTD_nextSrcSizeToDecompress(HANDLE);

    if (res > 0) {
      decompress_size_ = res;
      break;
    }
  }

  if (sz == 0) {
    frame_state_ |= 2;  // We must preserve LSB for subsequent GetDecompressedBlock.
    return 0;
  }

  return 1;
}

strings::ByteRange BlockDecompressor::GetDecompressedBlock() const {
  unsigned bindex = frame_state_ & 1;
  return ByteRange(buf_.get() + BLOCK_SIZE * bindex, decompress_size_);
}
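
// Illustrative pull loop for a caller (a sketch only; `input` and Refill() are hypothetical).
// Decompress() returns -N when at least N more source bytes are required, 1 when a block was
// produced but the frame is not finished yet, and 0 once the frame is complete.
//
//   BlockDecompressor bd;
//   uint32_t consumed = 0;
//   int32_t rc = bd.Decompress(input, &consumed);
//   input.advance(consumed);                    // bytes already digested by the decompressor
//   if (rc < 0) {
//     Refill(&input, -rc);                      // gather at least -rc more bytes, then retry
//   } else {
//     strings::ByteRange block = bd.GetDecompressedBlock();   // data produced by this call
//     // rc == 1: call Decompress() again for the next block; rc == 0: the frame is done.
//   }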

}  // namespace util