diff --git a/clickhouse/columns/array.cpp b/clickhouse/columns/array.cpp index 0a7a543d..9ef160b5 100644 --- a/clickhouse/columns/array.cpp +++ b/clickhouse/columns/array.cpp @@ -1,5 +1,6 @@ #include "array.h" #include "numeric.h" + #include namespace clickhouse { @@ -45,10 +46,9 @@ ColumnRef ColumnArray::Slice(size_t begin, size_t size) const { if (size && begin + size > Size()) throw ValidationError("Slice indexes are out of bounds"); - auto result = std::make_shared(data_->CloneEmpty()); - for (size_t i = 0; i < size; i++) { - result->AppendAsColumn(GetAsColumn(begin + i)); - } + auto result = std::make_shared(data_->Slice(GetOffset(begin), GetOffset(begin + size) - GetOffset(begin))); + for (size_t i = 0; i < size; i++) + result->AddOffset(GetSize(begin + i)); return result; } diff --git a/ut/column_array_ut.cpp b/ut/column_array_ut.cpp index 6177f8aa..001cc62e 100644 --- a/ut/column_array_ut.cpp +++ b/ut/column_array_ut.cpp @@ -134,6 +134,8 @@ TEST(ColumnArray, Slice) { } TEST(ColumnArray, Slice_2D) { + // Verify that ColumnArray::Slice on 2D Array produces a 2D Array of proper type, size and contents. + // Also check that slices can be of any size. const std::vector>> values = { {{1u, 2u}, {3u}}, {{4u}, {5u, 6u, 7u}, {8u, 9u}, {}}, @@ -143,13 +145,20 @@ TEST(ColumnArray, Slice_2D) { }; std::shared_ptr untyped_array = Create2DArray(values); - for (size_t i = 0; i < values.size() - 1; ++i) { - auto slice = untyped_array->Slice(i, 1)->AsStrict(); - EXPECT_EQ(1u, slice->Size()); - - for (size_t j = 0; j < values[i].size(); ++j) { - EXPECT_TRUE(CompareRecursive(values[i][j], *slice->GetAsColumnTyped(0)->GetAsColumnTyped(j))); + for (size_t slice_size = 0; slice_size < values.size() - i; ++slice_size) { + auto slice = untyped_array->Slice(i, slice_size)->AsStrict(); + EXPECT_EQ(slice_size, slice->Size()); + + for (size_t slice_row = 0; slice_row < slice_size; ++slice_row) { + SCOPED_TRACE(::testing::Message() << "i: " << i << " slice_size:" << slice_size << " row:" << slice_row); + auto val = slice->GetAsColumnTyped(slice_row); + ASSERT_EQ(values[i + slice_row].size(), val->Size()); + + for (size_t j = 0; j < values[i + slice_row].size(); ++j) { + ASSERT_TRUE(CompareRecursive(values[i + slice_row][j], *val->GetAsColumnTyped(j))); + } + } } } }