4 #define ZSTD_STATIC_LINKING_ONLY 8 #include "util/zstd_sinksource.h" 10 #include "base/logging.h" 14 #define HANDLE reinterpret_cast<ZSTD_CStream*>(zstd_handle_) 16 inline Status ZstdStatus(
size_t res) {
17 return Status(StatusCode::IO_ERROR, ZSTD_getErrorName(res));
21 size_t ZStdSink::CompressBound(
size_t src_size) {
22 return ZSTD_compressBound(src_size);
25 ZStdSink::ZStdSink(Sink* upstream) : upstream_(upstream) {
26 buf_sz_ = ZSTD_CStreamOutSize();
27 buf_.reset(
new uint8_t[buf_sz_]);
31 zstd_handle_ = ZSTD_createCStream();
32 VLOG(1) <<
"Allocated " << buf_sz_ <<
" bytes";
35 ZStdSink::~ZStdSink() {
36 ZSTD_freeCStream(HANDLE);
// Initializes the compression stream for the given compression `level`.
//
// Calls ZSTD_initCStream_srcSize() with `level` and 0 as its last argument
// (zstd's pledged-source-size parameter). If zstd reports an error, it is
// converted to a Status via ZstdStatus() and returned. Otherwise the size
// reported by ZSTD_sizeof_CStream() is logged at VLOG(1).
//
// NOTE(review): the success-path return statement and the closing braces are
// not visible in this excerpt — confirm against the full file.
40 Status ZStdSink::Init(
int level) {
41 size_t const res = ZSTD_initCStream_srcSize(HANDLE, level, 0);
// zstd signals failure through the returned size_t code.
42 if (ZSTD_isError(res)) {
43 return ZstdStatus(res);
45 VLOG(1) <<
"allocated " << ZSTD_sizeof_CStream(HANDLE);
// Compresses `slice` and forwards the compressed bytes to the upstream sink.
//
// Loops until zstd has consumed the whole input (input.pos reaches
// input.size). Each pass compresses into the buf_ staging buffer and appends
// the out_buf.pos bytes produced in that pass to upstream_. A zstd error is
// returned via ZstdStatus(); an upstream error is propagated by
// RETURN_IF_ERROR.
//
// NOTE(review): the append is unconditional, so upstream presumably receives
// an empty range when zstd only buffers the input — confirm that is
// acceptable. The end of the loop and the final return are not visible in
// this excerpt.
49 Status ZStdSink::Append(
const strings::ByteRange& slice) {
50 ZSTD_inBuffer input = { slice.data(), slice.size(), 0 };
51 while (input.pos < input.size) {
// Fresh output descriptor each pass: the staging buffer is reused from 0.
52 ZSTD_outBuffer out_buf{ buf_.get(), buf_sz_, 0};
53 size_t res = ZSTD_compressStream(HANDLE, &out_buf , &input);
54 if (ZSTD_isError(res)) {
55 return ZstdStatus(res);
57 RETURN_IF_ERROR(upstream_->Append(strings::ByteRange(buf_.get(), out_buf.pos)));
62 Status ZStdSink::Flush() {
63 ZSTD_outBuffer out_buf{buf_.get(), buf_sz_, 0};
65 size_t res = ZSTD_endStream(HANDLE, &out_buf);
66 if (ZSTD_isError(res)) {
67 return ZstdStatus(res);
71 RETURN_IF_ERROR(upstream_->Append(strings::ByteRange(buf_.get(), out_buf.pos)));
73 return upstream_->Flush();
// DC_HANDLE views the type-erased zstd_handle_ member as a decompression
// stream.
//
// HasValidHeader peeks at the beginning of `upstream` to decide whether it
// starts with a zstd frame: it reads arraysize(buf) bytes, pushes the same
// bytes back with Prepend() so that the source is left unconsumed, and
// returns the verdict of ZSTD_isFrame() on them.
//
// NOTE(review): the declaration of `buf` and the statement guarded by the
// failed/short-read check are not visible in this excerpt — presumably the
// function bails out with `false` there; confirm against the full file.
77 #define DC_HANDLE reinterpret_cast<ZSTD_DStream*>(zstd_handle_) 79 bool ZStdSource::HasValidHeader(Source* upstream) {
81 auto res = upstream->Read(strings::MutableByteRange(buf, arraysize(buf)));
// Guard: the read failed or delivered fewer bytes than requested.
82 if (!res.ok() || res.obj != arraysize(buf))
// Give the peeked bytes back to the source before inspecting them.
84 upstream->Prepend(strings::ByteRange(buf, arraysize(buf)));
86 return ZSTD_isFrame(buf, arraysize(buf));
// Size in bytes (4096) of the buffer ZStdSource uses for reads from upstream.
constexpr unsigned kReadBuf = 1u << 12;
92 ZStdSource::ZStdSource(Source* upstream)
93 : sub_stream_(upstream) {
95 zstd_handle_ = ZSTD_createDStream();
96 size_t const res = ZSTD_initDStream(DC_HANDLE);
97 CHECK(!ZSTD_isError(res)) << ZSTD_getErrorName(res);
98 buf_.reset(
new uint8_t[kReadBuf]);
101 ZStdSource::~ZStdSource() {
102 ZSTD_freeDStream(DC_HANDLE);
106 StatusObject<size_t> ZStdSource::ReadInternal(
const strings::MutableByteRange& range) {
107 ZSTD_outBuffer output = { range.begin(), range.size(), 0 };
109 if (buf_range_.empty()) {
110 auto res = sub_stream_->Read(strings::MutableByteRange(buf_.get(), kReadBuf));
115 buf_range_.reset(buf_.get(), res.obj);
118 ZSTD_inBuffer input{buf_range_.begin(), buf_range_.size(), 0 };
120 size_t to_read = ZSTD_decompressStream(DC_HANDLE, &output , &input);
121 if (ZSTD_isError(to_read)) {
122 return ZstdStatus(to_read);
125 buf_range_.advance(input.pos);
126 if (input.pos < input.size) {
127 CHECK_EQ(output.pos, output.size);
130 }
while (output.pos < output.size);