Skip to content

Commit 9d23bf5

Browse files
authored
Merge pull request #367 from byian/add_zstd
Add ZSTD compression
2 parents 0fb4835 + 9da259d commit 9d23bf5

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

98 files changed

+76039
-65
lines changed

CMakeLists.txt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ OPTION (WITH_OPENSSL "Use OpenSSL for TLS connections" OFF)
1414
OPTION (WITH_SYSTEM_ABSEIL "Use system ABSEIL" OFF)
1515
OPTION (WITH_SYSTEM_LZ4 "Use system LZ4" OFF)
1616
OPTION (WITH_SYSTEM_CITYHASH "Use system cityhash" OFF)
17+
OPTION (WITH_SYSTEM_ZSTD "Use system ZSTD" OFF)
1718
OPTION (DEBUG_DEPENDENCIES "Print debug info about dependencies duting build" ON)
1819
OPTION (CHECK_VERSION "Check that version number corresponds to git tag, usefull in CI/CD to validate that new version published on GitHub has same version in sources" OFF)
1920

@@ -93,6 +94,13 @@ ELSE ()
9394
SUBDIRS (contrib/cityhash/cityhash)
9495
ENDIF ()
9596

97+
IF (WITH_SYSTEM_ZSTD)
98+
FIND_PACKAGE(zstd REQUIRED)
99+
ELSE ()
100+
INCLUDE_DIRECTORIES (contrib/zstd/zstd)
101+
SUBDIRS (contrib/zstd/zstd)
102+
ENDIF ()
103+
96104
SUBDIRS (
97105
clickhouse
98106
)
@@ -141,4 +149,5 @@ if(DEBUG_DEPENDENCIES)
141149
print_target_debug_info(absl::int128)
142150
print_target_debug_info(cityhash::cityhash)
143151
print_target_debug_info(lz4::lz4)
152+
print_target_debug_info(zstd::zstd)
144153
endif()

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ Optional dependencies:
3434
- openssl
3535
- liblz4
3636
- libabsl
37+
- libzstd
3738

3839
## Building
3940

clickhouse/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ TARGET_LINK_LIBRARIES (clickhouse-cpp-lib
113113
absl::int128
114114
cityhash::cityhash
115115
lz4::lz4
116+
zstd::zstd
116117
)
117118
TARGET_INCLUDE_DIRECTORIES (clickhouse-cpp-lib
118119
PUBLIC ${PROJECT_SOURCE_DIR}

clickhouse/base/compressed.cpp

Lines changed: 139 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,26 @@
11
#include "compressed.h"
22
#include "wire_format.h"
33
#include "output.h"
4-
#include "../exceptions.h"
4+
#include "clickhouse/exceptions.h"
55

66
#include <city.h>
77
#include <lz4.h>
88
#include <exception>
9+
#include <zstd.h>
910
#include <stdexcept>
1011
#include <system_error>
1112

1213
namespace {
1314
constexpr size_t HEADER_SIZE = 9;
14-
// see DB::CompressionMethodByte::LZ4 from src/Compression/CompressionInfo.h of ClickHouse project
15-
constexpr uint8_t COMPRESSION_METHOD = 0x82;
16-
// Documentation says that compression is faster when output buffer is larger than LZ4_compressBound estimation.
15+
16+
// See DB::CompressionMethodByte in src/Compression/CompressionInfo.h of the ClickHouse project.
17+
enum class CompressionMethodByte : uint8_t {
18+
NONE = 0x02,
19+
LZ4 = 0x82,
20+
ZSTD = 0x90,
21+
};
22+
23+
// Documentation says that compression is faster when the output buffer is larger than the LZ4_compressBound/ZSTD_compressBound estimate.
1724
constexpr size_t EXTRA_COMPRESS_BUFFER_SIZE = 4096;
1825
constexpr size_t DBMS_MAX_COMPRESSED_SIZE = 0x40000000ULL; // 1GB
1926
}
@@ -32,7 +39,7 @@ CompressedInput::~CompressedInput() {
3239
#else
3340
if (!std::uncaught_exceptions()) {
3441
#endif
35-
throw LZ4Error("some data was not read");
42+
throw CompressionError("some data was not read");
3643
}
3744
}
3845
}
@@ -60,55 +67,79 @@ bool CompressedInput::Decompress() {
6067
return false;
6168
}
6269

63-
if (method != COMPRESSION_METHOD) {
64-
throw LZ4Error("unsupported compression method " + std::to_string(int(method)));
65-
} else {
66-
if (!WireFormat::ReadFixed(*input_, &compressed)) {
67-
return false;
68-
}
69-
if (!WireFormat::ReadFixed(*input_, &original)) {
70-
return false;
71-
}
70+
if (method != static_cast<uint8_t>(CompressionMethodByte::LZ4) && method != static_cast<uint8_t>(CompressionMethodByte::ZSTD)) {
71+
throw CompressionError("unsupported compression method " + std::to_string((method)));
72+
}
7273

73-
if (compressed > DBMS_MAX_COMPRESSED_SIZE) {
74-
throw LZ4Error("compressed data too big");
75-
}
74+
if (!WireFormat::ReadFixed(*input_, &compressed)) {
75+
return false;
76+
}
77+
if (!WireFormat::ReadFixed(*input_, &original)) {
78+
return false;
79+
}
80+
81+
if (compressed > DBMS_MAX_COMPRESSED_SIZE) {
82+
throw CompressionError("compressed data too big");
83+
}
7684

77-
Buffer tmp(compressed);
85+
Buffer tmp(compressed);
7886

79-
// Data header
80-
{
81-
BufferOutput out(&tmp);
82-
out.Write(&method, sizeof(method));
83-
out.Write(&compressed, sizeof(compressed));
84-
out.Write(&original, sizeof(original));
85-
out.Flush();
87+
// Data header
88+
{
89+
BufferOutput out(&tmp);
90+
out.Write(&method, sizeof(method));
91+
out.Write(&compressed, sizeof(compressed));
92+
out.Write(&original, sizeof(original));
93+
out.Flush();
94+
}
95+
96+
if (!WireFormat::ReadBytes(*input_, tmp.data() + HEADER_SIZE, compressed - HEADER_SIZE)) {
97+
return false;
98+
} else {
99+
if (hash != CityHash128((const char*)tmp.data(), compressed)) {
100+
throw CompressionError("data was corrupted");
86101
}
102+
}
103+
104+
data_ = Buffer(original);
87105

88-
if (!WireFormat::ReadBytes(*input_, tmp.data() + HEADER_SIZE, compressed - HEADER_SIZE)) {
89-
return false;
106+
switch (method) {
107+
case static_cast<uint8_t>(CompressionMethodByte::LZ4): {
108+
if (LZ4_decompress_safe((const char*)tmp.data() + HEADER_SIZE, (char*)data_.data(), static_cast<int>(compressed - HEADER_SIZE), original) < 0) {
109+
throw CompressionError("can't decompress LZ4-encoded data");
90110
} else {
91-
if (hash != CityHash128((const char*)tmp.data(), compressed)) {
92-
throw LZ4Error("data was corrupted");
93-
}
111+
mem_.Reset(data_.data(), original);
94112
}
113+
return true;
114+
}
95115

96-
data_ = Buffer(original);
116+
case static_cast<uint8_t>(CompressionMethodByte::ZSTD): {
117+
size_t res = ZSTD_decompress((char*)data_.data(), original, (const char*)tmp.data() + HEADER_SIZE, static_cast<int>(compressed - HEADER_SIZE));
97118

98-
if (LZ4_decompress_safe((const char*)tmp.data() + HEADER_SIZE, (char*)data_.data(), static_cast<int>(compressed - HEADER_SIZE), original) < 0) {
99-
throw LZ4Error("can't decompress data");
119+
if (ZSTD_isError(res)) {
120+
throw CompressionError("can't decompress ZSTD-encoded data, ZSTD error: " + std::string(ZSTD_getErrorName(res)));
100121
} else {
101122
mem_.Reset(data_.data(), original);
102123
}
124+
return true;
125+
}
126+
127+
case static_cast<uint8_t>(CompressionMethodByte::NONE): {
128+
throw CompressionError("compression method not defined" + std::to_string((method)));
129+
}
130+
default: {
131+
throw CompressionError("Unknown or unsupported compression method " + std::to_string((method)));
132+
}
103133
}
104134

105135
return true;
106136
}
107137

108138

109-
CompressedOutput::CompressedOutput(OutputStream * destination, size_t max_compressed_chunk_size)
139+
CompressedOutput::CompressedOutput(OutputStream * destination, size_t max_compressed_chunk_size, CompressionMethod method)
110140
: destination_(destination)
111141
, max_compressed_chunk_size_(max_compressed_chunk_size)
142+
, method_(method)
112143
{
113144
PreallocateCompressBuffer(max_compressed_chunk_size);
114145
}
@@ -139,37 +170,89 @@ void CompressedOutput::DoFlush() {
139170
}
140171

141172
void CompressedOutput::Compress(const void * data, size_t len) {
142-
const auto compressed_size = LZ4_compress_default(
143-
(const char*)data,
144-
(char*)compressed_buffer_.data() + HEADER_SIZE,
145-
static_cast<int>(len),
146-
static_cast<int>(compressed_buffer_.size() - HEADER_SIZE));
147-
if (compressed_size <= 0)
148-
throw LZ4Error("Failed to compress chunk of " + std::to_string(len) + " bytes, "
149-
"LZ4 error: " + std::to_string(compressed_size));
173+
switch (method_) {
174+
case clickhouse::CompressionMethod::LZ4: {
175+
const auto compressed_size = LZ4_compress_default(
176+
(const char*)data,
177+
(char*)compressed_buffer_.data() + HEADER_SIZE,
178+
static_cast<int>(len),
179+
static_cast<int>(compressed_buffer_.size() - HEADER_SIZE));
180+
if (compressed_size <= 0)
181+
throw CompressionError("Failed to compress chunk of " + std::to_string(len) + " bytes, "
182+
"LZ4 error: " + std::to_string(compressed_size));
150183

151-
{
152-
auto header = compressed_buffer_.data();
153-
WriteUnaligned(header, COMPRESSION_METHOD);
154-
// Compressed data size with header
155-
WriteUnaligned(header + 1, static_cast<uint32_t>(compressed_size + HEADER_SIZE));
156-
// Original data size
157-
WriteUnaligned(header + 5, static_cast<uint32_t>(len));
184+
{
185+
auto header = compressed_buffer_.data();
186+
WriteUnaligned(header, CompressionMethodByte::LZ4);
187+
// Compressed data size with header
188+
WriteUnaligned(header + 1, static_cast<uint32_t>(compressed_size + HEADER_SIZE));
189+
// Original data size
190+
WriteUnaligned(header + 5, static_cast<uint32_t>(len));
191+
}
192+
193+
WireFormat::WriteFixed(*destination_, CityHash128((const char*)compressed_buffer_.data(), compressed_size + HEADER_SIZE));
194+
WireFormat::WriteBytes(*destination_, compressed_buffer_.data(), compressed_size + HEADER_SIZE);
195+
break;
158196
}
159197

160-
WireFormat::WriteFixed(*destination_, CityHash128(
161-
(const char*)compressed_buffer_.data(), compressed_size + HEADER_SIZE));
162-
WireFormat::WriteBytes(*destination_, compressed_buffer_.data(), compressed_size + HEADER_SIZE);
198+
case clickhouse::CompressionMethod::ZSTD: {
199+
const size_t compressed_size = ZSTD_compress(
200+
(char*)compressed_buffer_.data() + HEADER_SIZE,
201+
static_cast<int>(compressed_buffer_.size() - HEADER_SIZE),
202+
(const char*)data,
203+
static_cast<int>(len),
204+
ZSTD_fast);
205+
if (ZSTD_isError(compressed_size))
206+
throw CompressionError("Failed to compress chunk of " + std::to_string(len) + " bytes, "
207+
"ZSTD error: " + std::string(ZSTD_getErrorName(compressed_size)));
208+
209+
{
210+
auto header = compressed_buffer_.data();
211+
WriteUnaligned(header, CompressionMethodByte::ZSTD);
212+
// Compressed data size with header
213+
WriteUnaligned(header + 1, static_cast<uint32_t>(compressed_size + HEADER_SIZE));
214+
// Original data size
215+
WriteUnaligned(header + 5, static_cast<uint32_t>(len));
216+
}
217+
218+
WireFormat::WriteFixed(*destination_, CityHash128((const char*)compressed_buffer_.data(), compressed_size + HEADER_SIZE));
219+
WireFormat::WriteBytes(*destination_, compressed_buffer_.data(), compressed_size + HEADER_SIZE);
220+
break;
221+
}
222+
223+
case clickhouse::CompressionMethod::None: {
224+
throw CompressionError("no compression defined");
225+
}
226+
}
163227

164228
destination_->Flush();
165229
}
166230

167231
void CompressedOutput::PreallocateCompressBuffer(size_t input_size) {
168-
const auto estimated_compressed_buffer_size = LZ4_compressBound(static_cast<int>(input_size));
169-
if (estimated_compressed_buffer_size <= 0)
170-
throw LZ4Error("Failed to estimate compressed buffer size, LZ4 error: " + std::to_string(estimated_compressed_buffer_size));
232+
switch (method_) {
233+
case clickhouse::CompressionMethod::LZ4: {
234+
const auto estimated_compressed_buffer_size = LZ4_compressBound(static_cast<int>(input_size));
235+
if (estimated_compressed_buffer_size <= 0)
236+
throw CompressionError("Failed to estimate compressed buffer size, LZ4 error: " + std::to_string(estimated_compressed_buffer_size));
237+
238+
compressed_buffer_.resize(estimated_compressed_buffer_size + HEADER_SIZE + EXTRA_COMPRESS_BUFFER_SIZE);
239+
break;
240+
}
171241

172-
compressed_buffer_.resize(estimated_compressed_buffer_size + HEADER_SIZE + EXTRA_COMPRESS_BUFFER_SIZE);
242+
case clickhouse::CompressionMethod::ZSTD: {
243+
const size_t estimated_compressed_buffer_size = ZSTD_compressBound(static_cast<int>(input_size));
244+
if (ZSTD_isError(estimated_compressed_buffer_size))
245+
throw CompressionError("Failed to estimate compressed buffer size, ZSTD error: " + std::string(ZSTD_getErrorName(estimated_compressed_buffer_size)));
246+
247+
compressed_buffer_.resize(estimated_compressed_buffer_size + HEADER_SIZE + EXTRA_COMPRESS_BUFFER_SIZE);
248+
break;
249+
}
250+
251+
case clickhouse::CompressionMethod::None: {
252+
/// Nothing to preallocate: Compress() throws for CompressionMethod::None, so the buffer is never used.
253+
break;
254+
}
255+
}
173256
}
174257

175258
}

clickhouse/base/compressed.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
#include "output.h"
55
#include "buffer.h"
66

7+
#include "clickhouse/client.h"
8+
79
namespace clickhouse {
810

911
class CompressedInput : public ZeroCopyInput {
@@ -25,7 +27,7 @@ class CompressedInput : public ZeroCopyInput {
2527

2628
class CompressedOutput : public OutputStream {
2729
public:
28-
explicit CompressedOutput(OutputStream * destination, size_t max_compressed_chunk_size = 0);
30+
explicit CompressedOutput(OutputStream* destination, size_t max_compressed_chunk_size = 0, CompressionMethod method = CompressionMethod::LZ4);
2931
~CompressedOutput() override;
3032

3133
protected:
@@ -40,6 +42,7 @@ class CompressedOutput : public OutputStream {
4042
OutputStream * destination_;
4143
const size_t max_compressed_chunk_size_;
4244
Buffer compressed_buffer_;
45+
CompressionMethod method_;
4346
};
4447

4548
}

clickhouse/client.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,9 @@ std::ostream& operator<<(std::ostream& os, const ClientOptions& opt) {
8686
<< " send_retries:" << opt.send_retries
8787
<< " retry_timeout:" << opt.retry_timeout.count()
8888
<< " compression_method:"
89-
<< (opt.compression_method == CompressionMethod::LZ4 ? "LZ4" : "None");
89+
<< (opt.compression_method == CompressionMethod::LZ4 ? "LZ4"
90+
: opt.compression_method == CompressionMethod::ZSTD ? "ZSTD"
91+
: "None");
9092
#if defined(WITH_OPENSSL)
9193
if (opt.ssl_options) {
9294
const auto & ssl_options = *opt.ssl_options;
@@ -858,9 +860,8 @@ void Client::Impl::SendData(const Block& block) {
858860
}
859861

860862
if (compression_ == CompressionState::Enable) {
861-
assert(options_.compression_method == CompressionMethod::LZ4);
862863

863-
std::unique_ptr<OutputStream> compressed_output = std::make_unique<CompressedOutput>(output_.get(), options_.max_compression_chunk_size);
864+
std::unique_ptr<OutputStream> compressed_output = std::make_unique<CompressedOutput>(output_.get(), options_.max_compression_chunk_size, options_.compression_method);
864865
BufferedOutput buffered(std::move(compressed_output), options_.max_compression_chunk_size);
865866

866867
WriteBlock(block, buffered);

clickhouse/client.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,10 @@ struct ServerInfo {
4040
};
4141

4242
/// Methods of block compression.
43-
enum class CompressionMethod {
44-
None = -1,
45-
LZ4 = 1,
43+
enum class CompressionMethod : int8_t {
44+
None = -1,
45+
LZ4 = 1,
46+
ZSTD = 2,
4647
};
4748

4849
struct Endpoint {

clickhouse/exceptions.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ class OpenSSLError : public Error {
3333
using Error::Error;
3434
};
3535

36-
class LZ4Error : public Error {
36+
class CompressionError : public Error {
3737
using Error::Error;
3838
};
3939

0 commit comments

Comments
 (0)