Skip to content

Refactored streams #150

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions clickhouse/base/compressed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,20 +51,20 @@ bool CompressedInput::Decompress() {
uint32_t original = 0;
uint8_t method = 0;

if (!WireFormat::ReadFixed(input_, &hash)) {
if (!WireFormat::ReadFixed(*input_, &hash)) {
return false;
}
if (!WireFormat::ReadFixed(input_, &method)) {
if (!WireFormat::ReadFixed(*input_, &method)) {
return false;
}

if (method != COMPRESSION_METHOD) {
throw std::runtime_error("unsupported compression method " + std::to_string(int(method)));
} else {
if (!WireFormat::ReadFixed(input_, &compressed)) {
if (!WireFormat::ReadFixed(*input_, &compressed)) {
return false;
}
if (!WireFormat::ReadFixed(input_, &original)) {
if (!WireFormat::ReadFixed(*input_, &original)) {
return false;
}

Expand All @@ -80,9 +80,10 @@ bool CompressedInput::Decompress() {
out.Write(&method, sizeof(method));
out.Write(&compressed, sizeof(compressed));
out.Write(&original, sizeof(original));
out.Flush();
}

if (!WireFormat::ReadBytes(input_, tmp.data() + HEADER_SIZE, compressed - HEADER_SIZE)) {
if (!WireFormat::ReadBytes(*input_, tmp.data() + HEADER_SIZE, compressed - HEADER_SIZE)) {
return false;
} else {
if (hash != CityHash128((const char*)tmp.data(), compressed)) {
Expand Down Expand Up @@ -110,9 +111,7 @@ CompressedOutput::CompressedOutput(OutputStream * destination, size_t max_compre
PreallocateCompressBuffer(max_compressed_chunk_size);
}

CompressedOutput::~CompressedOutput() {
Flush();
}
CompressedOutput::~CompressedOutput() { }

size_t CompressedOutput::DoWrite(const void* data, size_t len) {
const size_t original_len = len;
Expand Down Expand Up @@ -156,9 +155,9 @@ void CompressedOutput::Compress(const void * data, size_t len) {
WriteUnaligned(header + 5, static_cast<uint32_t>(len));
}

WireFormat::WriteFixed(destination_, CityHash128(
WireFormat::WriteFixed(*destination_, CityHash128(
(const char*)compressed_buffer_.data(), compressed_size + HEADER_SIZE));
WireFormat::WriteBytes(destination_, compressed_buffer_.data(), compressed_size + HEADER_SIZE);
WireFormat::WriteBytes(*destination_, compressed_buffer_.data(), compressed_size + HEADER_SIZE);

destination_->Flush();
}
Expand Down
10 changes: 5 additions & 5 deletions clickhouse/base/input.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ size_t ArrayInput::DoNext(const void** ptr, size_t len) {
}


BufferedInput::BufferedInput(InputStream* slave, size_t buflen)
: slave_(slave)
BufferedInput::BufferedInput(std::unique_ptr<InputStream> source, size_t buflen)
: source_(std::move(source))
, array_input_(nullptr, 0)
, buffer_(buflen)
{
Expand All @@ -72,7 +72,7 @@ void BufferedInput::Reset() {
size_t BufferedInput::DoNext(const void** ptr, size_t len) {
if (array_input_.Exhausted()) {
array_input_.Reset(
buffer_.data(), slave_->Read(buffer_.data(), buffer_.size())
buffer_.data(), source_->Read(buffer_.data(), buffer_.size())
);
}

Expand All @@ -82,11 +82,11 @@ size_t BufferedInput::DoNext(const void** ptr, size_t len) {
size_t BufferedInput::DoRead(void* buf, size_t len) {
if (array_input_.Exhausted()) {
if (len > buffer_.size() / 2) {
return slave_->Read(buf, len);
return source_->Read(buf, len);
}

array_input_.Reset(
buffer_.data(), slave_->Read(buffer_.data(), buffer_.size())
buffer_.data(), source_->Read(buffer_.data(), buffer_.size())
);
}

Expand Down
5 changes: 3 additions & 2 deletions clickhouse/base/input.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <cstddef>
#include <cstdint>
#include <vector>
#include <memory>

namespace clickhouse {

Expand Down Expand Up @@ -84,7 +85,7 @@ class ArrayInput : public ZeroCopyInput {

class BufferedInput : public ZeroCopyInput {
public:
BufferedInput(InputStream* slave, size_t buflen = 8192);
BufferedInput(std::unique_ptr<InputStream> source, size_t buflen = 8192);
~BufferedInput() override;

void Reset();
Expand All @@ -94,7 +95,7 @@ class BufferedInput : public ZeroCopyInput {
size_t DoNext(const void** ptr, size_t len) override;

private:
InputStream* const slave_;
std::unique_ptr<InputStream> const source_;
ArrayInput array_input_;
std::vector<uint8_t> buffer_;
};
Expand Down
24 changes: 6 additions & 18 deletions clickhouse/base/output.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,35 +66,23 @@ size_t BufferOutput::DoNext(void** data, size_t len) {
}


BufferedOutput::BufferedOutput(OutputStream* slave, size_t buflen)
: slave_(slave)
BufferedOutput::BufferedOutput(std::unique_ptr<OutputStream> destination, size_t buflen)
: destination_(std::move(destination))
, buffer_(buflen)
, array_output_(buffer_.data(), buflen)
{
}

BufferedOutput::~BufferedOutput() {
try
{
Flush();
}
catch (...)
{
// That means we've failed to flush some data e.g. to the socket,
// but there is nothing we can do at this point (can't bring the socket back),
// and throwing in destructor is really a bad idea.
// The best we can do is to log the error and ignore it, but currently there is no logging subsystem.
}
}
BufferedOutput::~BufferedOutput() { }

void BufferedOutput::Reset() {
array_output_.Reset(buffer_.data(), buffer_.size());
}

void BufferedOutput::DoFlush() {
if (array_output_.Data() != buffer_.data()) {
slave_->Write(buffer_.data(), array_output_.Data() - buffer_.data());
slave_->Flush();
destination_->Write(buffer_.data(), array_output_.Data() - buffer_.data());
destination_->Flush();

array_output_.Reset(buffer_.data(), buffer_.size());
}
Expand All @@ -114,7 +102,7 @@ size_t BufferedOutput::DoWrite(const void* data, size_t len) {
Flush();

if (len > buffer_.size() / 2) {
return slave_->Write(data, len);
return destination_->Write(data, len);
}
}

Expand Down
17 changes: 13 additions & 4 deletions clickhouse/base/output.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <cstdint>
#include <vector>
#include <memory.h>
#include <memory>

namespace clickhouse {

Expand Down Expand Up @@ -92,11 +93,13 @@ class ArrayOutput : public ZeroCopyOutput {

/**
* A ZeroCopyOutput stream backed by a vector.
*
* Doesn't Flush() in destructor, client must ensure to do it manually at some point.
*/
class BufferOutput : public ZeroCopyOutput {
public:
BufferOutput(Buffer* buf);
~BufferOutput();
~BufferOutput() override;

protected:
size_t DoNext(void** data, size_t len) override;
Expand All @@ -106,10 +109,16 @@ class BufferOutput : public ZeroCopyOutput {
size_t pos_;
};


/** BufferedOutput writes data to internal buffer first.
*
* Any data goes to underlying stream only if internal buffer is full
* or when client invokes Flush() on this.
*
* Doesn't Flush() in destructor, client must ensure to do it manually at some point.
*/
class BufferedOutput : public ZeroCopyOutput {
public:
BufferedOutput(OutputStream* slave, size_t buflen = 8192);
explicit BufferedOutput(std::unique_ptr<OutputStream> destination, size_t buflen = 8192);
~BufferedOutput() override;

void Reset();
Expand All @@ -120,7 +129,7 @@ class BufferedOutput : public ZeroCopyOutput {
size_t DoWrite(const void* data, size_t len) override;

private:
OutputStream* const slave_;
std::unique_ptr<OutputStream> const destination_;
Buffer buffer_;
ArrayOutput array_output_;
};
Expand Down
59 changes: 0 additions & 59 deletions clickhouse/base/streamstack.h

This file was deleted.

18 changes: 9 additions & 9 deletions clickhouse/base/wire_format.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ constexpr int MAX_VARINT_BYTES = 10;

namespace clickhouse {

bool WireFormat::ReadAll(InputStream * input, void* buf, size_t len) {
bool WireFormat::ReadAll(InputStream& input, void* buf, size_t len) {
uint8_t* p = static_cast<uint8_t*>(buf);

size_t read_previously = 1; // 1 to execute loop at least once
while (len > 0 && read_previously) {
read_previously = input->Read(p, len);
read_previously = input.Read(p, len);

p += read_previously;
len -= read_previously;
Expand All @@ -25,13 +25,13 @@ bool WireFormat::ReadAll(InputStream * input, void* buf, size_t len) {
return !len;
}

void WireFormat::WriteAll(OutputStream* output, const void* buf, size_t len) {
void WireFormat::WriteAll(OutputStream& output, const void* buf, size_t len) {
const size_t original_len = len;
const uint8_t* p = static_cast<const uint8_t*>(buf);

size_t written_previously = 1; // 1 to execute loop at least once
while (len > 0 && written_previously) {
written_previously = output->Write(p, len);
written_previously = output.Write(p, len);

p += written_previously;
len -= written_previously;
Expand All @@ -43,13 +43,13 @@ void WireFormat::WriteAll(OutputStream* output, const void* buf, size_t len) {
}
}

bool WireFormat::ReadVarint64(InputStream* input, uint64_t* value) {
bool WireFormat::ReadVarint64(InputStream& input, uint64_t* value) {
*value = 0;

for (size_t i = 0; i < MAX_VARINT_BYTES; ++i) {
uint8_t byte = 0;

if (!input->ReadByte(&byte)) {
if (!input.ReadByte(&byte)) {
return false;
} else {
*value |= uint64_t(byte & 0x7F) << (7 * i);
Expand All @@ -64,7 +64,7 @@ bool WireFormat::ReadVarint64(InputStream* input, uint64_t* value) {
return false;
}

void WireFormat::WriteVarint64(OutputStream* output, uint64_t value) {
void WireFormat::WriteVarint64(OutputStream& output, uint64_t value) {
uint8_t bytes[MAX_VARINT_BYTES];
int size = 0;

Expand All @@ -84,14 +84,14 @@ void WireFormat::WriteVarint64(OutputStream* output, uint64_t value) {
WriteAll(output, bytes, size);
}

bool WireFormat::SkipString(InputStream* input) {
bool WireFormat::SkipString(InputStream& input) {
uint64_t len = 0;

if (ReadVarint64(input, &len)) {
if (len > 0x00FFFFFFULL)
return false;

return input->Skip((size_t)len);
return input.Skip((size_t)len);
}

return false;
Expand Down
Loading