diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt index a97c6e60b3f..c2d0c5aef2f 100644 --- a/cpp/velox/CMakeLists.txt +++ b/cpp/velox/CMakeLists.txt @@ -175,6 +175,7 @@ set(VELOX_SRCS memory/VeloxColumnarBatch.cc memory/VeloxMemoryManager.cc operators/functions/RegistrationAllFunctions.cc + operators/functions/DeltaBitmapAggregator.cc operators/functions/RowConstructorWithNull.cc operators/functions/SparkExprToSubfieldFilterParser.cc operators/plannodes/RowVectorStream.cc diff --git a/cpp/velox/benchmarks/CMakeLists.txt b/cpp/velox/benchmarks/CMakeLists.txt index fdfd7bda25f..43f5763902a 100644 --- a/cpp/velox/benchmarks/CMakeLists.txt +++ b/cpp/velox/benchmarks/CMakeLists.txt @@ -37,3 +37,5 @@ add_velox_benchmark(parquet_write_benchmark ParquetWriteBenchmark.cc) add_velox_benchmark(plan_validator_util PlanValidatorUtil.cc) add_velox_benchmark(velox_batch_resizer_benchmark VeloxBatchResizerBenchmark.cc) + +add_velox_benchmark(delta_bitmap_benchmark DeltaBitmapBenchmark.cc) diff --git a/cpp/velox/benchmarks/DeltaBitmapBenchmark.cc b/cpp/velox/benchmarks/DeltaBitmapBenchmark.cc new file mode 100644 index 00000000000..b84d103ef9e --- /dev/null +++ b/cpp/velox/benchmarks/DeltaBitmapBenchmark.cc @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include + +#include + +#include "compute/delta/RoaringBitmapArray.h" +#include "velox/common/base/Exceptions.h" + +using gluten::delta::RoaringBitmapArray; + +namespace { + +enum class RowIndexPattern { + kContiguous, + kSparse, + kClustered, + kMultiBucket, +}; + +enum class PartialDistribution { + kContiguous, + kRoundRobin, +}; + +struct RowIndexSummary { + uint64_t rowSpan{0}; + size_t bucketCount{0}; + double densityPercent{0}; +}; + +std::vector makeRowIndexes(size_t rowCount, RowIndexPattern pattern) { + std::vector rows; + rows.reserve(rowCount); + for (size_t i = 0; i < rowCount; ++i) { + switch (pattern) { + case RowIndexPattern::kContiguous: + rows.push_back(i); + break; + case RowIndexPattern::kSparse: + rows.push_back(i * 97); + break; + case RowIndexPattern::kClustered: + rows.push_back((i / 64) * 4096 + (i % 64)); + break; + case RowIndexPattern::kMultiBucket: + rows.push_back((static_cast(i % 4) << 32) + (i / 4)); + break; + } + } + return rows; +} + +RowIndexSummary summarizeRowIndexes(const std::vector& rows) { + if (rows.empty()) { + return {}; + } + + const auto [minIt, maxIt] = std::minmax_element(rows.begin(), rows.end()); + std::set buckets; + for (const auto row : rows) { + buckets.insert(static_cast(row >> 32)); + } + + const auto rowSpan = *maxIt - *minIt + 1; + return RowIndexSummary{ + rowSpan, buckets.size(), static_cast(rows.size()) * 100.0 / static_cast(rowSpan)}; +} + +std::string buildPayload(const std::vector& rows, bool optimize) { + RoaringBitmapArray bitmap; + for (const auto row : rows) { + bitmap.addSafe(row); + } + return bitmap.serializeToString(optimize); +} + +std::vector buildPartialPayloads( + const std::vector& rows, + size_t partialCount, + bool optimize, + PartialDistribution distribution) { + std::vector partials(partialCount); + for (size_t i = 0; i < rows.size(); ++i) { + const auto partialIndex = distribution == PartialDistribution::kRoundRobin + ? i % partialCount + : std::min(i * partialCount / rows.size(), partialCount - 1); + partials[partialIndex].addSafe(rows[i]); + } + + std::vector payloads; + payloads.reserve(partialCount); + for (const auto& partial : partials) { + payloads.push_back(partial.serializeToString(optimize)); + } + return payloads; +} + +std::vector makeProbeRows(const std::vector& rows) { + const auto hitProbeCount = std::min(rows.size(), 4096); + std::vector probes; + probes.reserve(hitProbeCount * 2); + if (hitProbeCount == 0) { + return probes; + } + + const auto stride = std::max(rows.size() / hitProbeCount, 1); + for (size_t i = 0; i < rows.size() && probes.size() < hitProbeCount * 2; i += stride) { + probes.push_back(rows[i]); + probes.push_back(rows.back() + 4096 + probes.size()); + } + return probes; +} + +void setCounters( + benchmark::State& state, + size_t rowCount, + size_t payloadBytes, + RowIndexSummary summary, + size_t partialCount = 0) { + state.counters["rows"] = benchmark::Counter(rowCount); + state.counters["payload_bytes"] = benchmark::Counter(payloadBytes); + state.counters["payload_bytes_per_row"] = benchmark::Counter(static_cast(payloadBytes) / rowCount); + state.counters["row_span"] = benchmark::Counter(summary.rowSpan); + state.counters["bucket_count"] = benchmark::Counter(summary.bucketCount); + state.counters["density_pct"] = benchmark::Counter(summary.densityPercent); + if (partialCount > 0) { + state.counters["partials"] = benchmark::Counter(partialCount); + } +} + +void BM_BuildAndSerialize(benchmark::State& state, RowIndexPattern pattern) { + const auto rows = makeRowIndexes(state.range(0), pattern); + const auto summary = summarizeRowIndexes(rows); + size_t payloadBytes = 0; + uint64_t cardinality = 0; + + for (auto _ : state) { + RoaringBitmapArray bitmap; + for (const auto row : rows) { + bitmap.addSafe(row); + } + const auto payload = bitmap.serializeToString(true); + payloadBytes = payload.size(); + cardinality = bitmap.cardinality(); + VELOX_CHECK_EQ(cardinality, rows.size()); + benchmark::DoNotOptimize(payload); + } + + state.SetItemsProcessed(state.iterations() * rows.size()); + state.SetBytesProcessed(state.iterations() * rows.size() * sizeof(uint64_t)); + setCounters(state, rows.size(), payloadBytes, summary); + state.counters["cardinality"] = benchmark::Counter(cardinality); +} + +void BM_DeserializeAndProbe(benchmark::State& state, RowIndexPattern pattern) { + const auto rows = makeRowIndexes(state.range(0), pattern); + const auto summary = summarizeRowIndexes(rows); + const auto payload = buildPayload(rows, true); + const auto probes = makeProbeRows(rows); + uint64_t hits = 0; + + for (auto _ : state) { + RoaringBitmapArray bitmap; + bitmap.deserialize(payload.data(), payload.size()); + VELOX_CHECK_EQ(bitmap.cardinality(), rows.size()); + uint64_t localHits = 0; + for (const auto probe : probes) { + localHits += bitmap.containsSafe(probe) ? 1 : 0; + } + hits = localHits; + benchmark::DoNotOptimize(hits); + } + + state.SetItemsProcessed(state.iterations() * probes.size()); + state.SetBytesProcessed(state.iterations() * payload.size()); + setCounters(state, rows.size(), payload.size(), summary); + state.counters["probes"] = benchmark::Counter(probes.size()); + state.counters["hits"] = benchmark::Counter(hits); +} + +void BM_MergePartials(benchmark::State& state, RowIndexPattern pattern, PartialDistribution distribution) { + const auto rows = makeRowIndexes(state.range(0), pattern); + const auto summary = summarizeRowIndexes(rows); + const auto partialCount = static_cast(state.range(1)); + const auto payloads = buildPartialPayloads(rows, partialCount, false, distribution); + size_t mergedPayloadBytes = 0; + uint64_t cardinality = 0; + + for (auto _ : state) { + RoaringBitmapArray merged; + for (const auto& payload : payloads) { + RoaringBitmapArray partial; + partial.deserialize(payload.data(), payload.size()); + merged.merge(partial); + } + const auto mergedPayload = merged.serializeToString(true); + mergedPayloadBytes = mergedPayload.size(); + cardinality = merged.cardinality(); + VELOX_CHECK_EQ(cardinality, rows.size()); + benchmark::DoNotOptimize(mergedPayload); + } + + state.SetItemsProcessed(state.iterations() * rows.size()); + setCounters(state, rows.size(), mergedPayloadBytes, summary, partialCount); + state.counters["cardinality"] = benchmark::Counter(cardinality); +} + +} // namespace + +BENCHMARK_CAPTURE(BM_BuildAndSerialize, Contiguous_1M, RowIndexPattern::kContiguous) + ->Arg(1 << 20) + ->Unit(benchmark::kMillisecond); +BENCHMARK_CAPTURE(BM_BuildAndSerialize, Sparse_1M, RowIndexPattern::kSparse) + ->Arg(1 << 20) + ->Unit(benchmark::kMillisecond); +BENCHMARK_CAPTURE(BM_BuildAndSerialize, Clustered_1M, RowIndexPattern::kClustered) + ->Arg(1 << 20) + ->Unit(benchmark::kMillisecond); +BENCHMARK_CAPTURE(BM_BuildAndSerialize, MultiBucket_256K, RowIndexPattern::kMultiBucket) + ->Arg(1 << 18) + ->Unit(benchmark::kMillisecond); + +BENCHMARK_CAPTURE(BM_DeserializeAndProbe, Contiguous_1M, RowIndexPattern::kContiguous) + ->Arg(1 << 20) + ->Unit(benchmark::kMicrosecond); +BENCHMARK_CAPTURE(BM_DeserializeAndProbe, Sparse_1M, RowIndexPattern::kSparse) + ->Arg(1 << 20) + ->Unit(benchmark::kMicrosecond); +BENCHMARK_CAPTURE(BM_DeserializeAndProbe, MultiBucket_256K, RowIndexPattern::kMultiBucket) + ->Arg(1 << 18) + ->Unit(benchmark::kMicrosecond); + +BENCHMARK_CAPTURE( + BM_MergePartials, + Contiguous_1M_64Partials, + RowIndexPattern::kContiguous, + PartialDistribution::kContiguous) + ->Args({1 << 20, 64}) + ->Unit(benchmark::kMillisecond); +BENCHMARK_CAPTURE( + BM_MergePartials, + Contiguous_1M_64RoundRobinPartials, + RowIndexPattern::kContiguous, + PartialDistribution::kRoundRobin) + ->Args({1 << 20, 64}) + ->Unit(benchmark::kMillisecond); +BENCHMARK_CAPTURE(BM_MergePartials, Sparse_1M_64Partials, RowIndexPattern::kSparse, PartialDistribution::kContiguous) + ->Args({1 << 20, 64}) + ->Unit(benchmark::kMillisecond); +BENCHMARK_CAPTURE( + BM_MergePartials, + MultiBucket_256K_64Partials, + RowIndexPattern::kMultiBucket, + PartialDistribution::kContiguous) + ->Args({1 << 18, 64}) + ->Unit(benchmark::kMillisecond); + +BENCHMARK_MAIN(); diff --git a/cpp/velox/compute/delta/RoaringBitmapArray.cpp b/cpp/velox/compute/delta/RoaringBitmapArray.cpp index 7da3364d0b7..696a93f5e09 100644 --- a/cpp/velox/compute/delta/RoaringBitmapArray.cpp +++ b/cpp/velox/compute/delta/RoaringBitmapArray.cpp @@ -46,6 +46,14 @@ uint32_t readUint32LittleEndian(const char* data) { (static_cast(bytes[2]) << 16) | (static_cast(bytes[3]) << 24); } +uint64_t readUint64LittleEndian(const char* data) { + const auto* bytes = reinterpret_cast(data); + return static_cast(bytes[0]) | (static_cast(bytes[1]) << 8) | + (static_cast(bytes[2]) << 16) | (static_cast(bytes[3]) << 24) | + (static_cast(bytes[4]) << 32) | (static_cast(bytes[5]) << 40) | + (static_cast(bytes[6]) << 48) | (static_cast(bytes[7]) << 56); +} + void writeUint32LittleEndian(char* data, uint32_t value) { auto* bytes = reinterpret_cast(data); bytes[0] = static_cast(value & 0xFF); @@ -54,33 +62,181 @@ void writeUint32LittleEndian(char* data, uint32_t value) { bytes[3] = static_cast((value >> 24) & 0xFF); } +void writeUint64LittleEndian(char* data, uint64_t value) { + auto* bytes = reinterpret_cast(data); + bytes[0] = static_cast(value & 0xFF); + bytes[1] = static_cast((value >> 8) & 0xFF); + bytes[2] = static_cast((value >> 16) & 0xFF); + bytes[3] = static_cast((value >> 24) & 0xFF); + bytes[4] = static_cast((value >> 32) & 0xFF); + bytes[5] = static_cast((value >> 40) & 0xFF); + bytes[6] = static_cast((value >> 48) & 0xFF); + bytes[7] = static_cast((value >> 56) & 0xFF); +} + +uint32_t highBytes(uint64_t value) { + return static_cast(value >> 32); +} + +uint32_t lowBytes(uint64_t value) { + return static_cast(value); +} + +uint64_t composeFromHighLowBytes(uint32_t high, uint32_t low) { + return (static_cast(high) << 32) | static_cast(low); +} + } // namespace void RoaringBitmapArray::addSafe(uint64_t value) { - bitmap_.add(value); + VELOX_CHECK_LE( + value, + kMaxRepresentableValue, + "Delta RoaringBitmapArray row index {} exceeds max representable value {}", + value, + kMaxRepresentableValue); + bitmaps_[highBytes(value)].add(lowBytes(value)); } bool RoaringBitmapArray::containsSafe(uint64_t value) const { - return bitmap_.contains(value); + VELOX_CHECK_LE( + value, + kMaxRepresentableValue, + "Delta RoaringBitmapArray row index {} exceeds max representable value {}", + value, + kMaxRepresentableValue); + auto it = bitmaps_.find(highBytes(value)); + if (it == bitmaps_.end()) { + return false; + } + return it->second.contains(lowBytes(value)); } -void RoaringBitmapArray::serialize(char* buffer) const { +void RoaringBitmapArray::merge(const RoaringBitmapArray& other) { + for (const auto& [key, bitmap] : other.bitmaps_) { + bitmaps_[key] |= bitmap; + } +} + +uint64_t RoaringBitmapArray::cardinality() const { + uint64_t cardinality = 0; + for (const auto& [_, bitmap] : bitmaps_) { + cardinality += bitmap.cardinality(); + } + return cardinality; +} + +std::optional RoaringBitmapArray::last() const { + for (auto it = bitmaps_.rbegin(); it != bitmaps_.rend(); ++it) { + if (!it->second.isEmpty()) { + return composeFromHighLowBytes(it->first, it->second.maximum()); + } + } + return std::nullopt; +} + +void RoaringBitmapArray::serializeBitmap(const BitmapMap& bitmaps, char* buffer) { VELOX_CHECK_NOT_NULL(buffer, "RoaringBitmapArray serialization buffer is null"); + uint64_t nonEmptyBitmapCount = 0; + for (const auto& [_, bitmap] : bitmaps) { + if (!bitmap.isEmpty()) { + ++nonEmptyBitmapCount; + } + } + writeUint32LittleEndian(buffer, kPortableSerializationFormatMagicNumber); - bitmap_.write(buffer + sizeof(uint32_t), true); + buffer += sizeof(uint32_t); + writeUint64LittleEndian(buffer, nonEmptyBitmapCount); + buffer += sizeof(uint64_t); + + for (const auto& [key, bitmap] : bitmaps) { + if (bitmap.isEmpty()) { + continue; + } + writeUint32LittleEndian(buffer, key); + buffer += sizeof(uint32_t); + buffer += bitmap.write(buffer, true); + } +} + +void RoaringBitmapArray::serialize(char* buffer) const { + serializeBitmap(bitmaps_, buffer); +} + +std::string RoaringBitmapArray::serializeToString(bool optimize) const { + if (!optimize) { + std::string out(serializedSizeInBytes(bitmaps_), '\0'); + serializeBitmap(bitmaps_, out.data()); + return out; + } + + auto bitmaps = bitmaps_; + for (auto& [_, bitmap] : bitmaps) { + bitmap.runOptimize(); + } + + std::string out(serializedSizeInBytes(bitmaps), '\0'); + serializeBitmap(bitmaps, out.data()); + return out; } void RoaringBitmapArray::deserialize(const char* buffer, size_t size) { VELOX_CHECK_NOT_NULL(buffer, "RoaringBitmapArray input buffer is null"); - VELOX_CHECK_GE(size, sizeof(uint32_t), "RoaringBitmapArray payload is too small: {}", size); + VELOX_CHECK_GE(size, sizeof(uint32_t) + sizeof(uint64_t), "RoaringBitmapArray payload is too small: {}", size); const auto magic = readUint32LittleEndian(buffer); VELOX_CHECK_EQ( magic, kPortableSerializationFormatMagicNumber, "Unexpected RoaringBitmapArray magic number {}", magic); - bitmap_ = roaring::Roaring64Map::readSafe(buffer + sizeof(uint32_t), size - sizeof(uint32_t)); + + bitmaps_.clear(); + const char* cursor = buffer + sizeof(uint32_t); + size_t remaining = size - sizeof(uint32_t); + const auto bitmapCount = readUint64LittleEndian(cursor); + cursor += sizeof(uint64_t); + remaining -= sizeof(uint64_t); + + uint32_t previousKey = 0; + bool hasPreviousKey = false; + for (uint64_t i = 0; i < bitmapCount; ++i) { + VELOX_CHECK_GE(remaining, sizeof(uint32_t), "RoaringBitmapArray payload ended before bitmap key"); + const auto key = readUint32LittleEndian(cursor); + VELOX_CHECK_LE(key, kMaxHighKey, "RoaringBitmapArray bitmap key {} exceeds Delta's max high key", key); + cursor += sizeof(uint32_t); + remaining -= sizeof(uint32_t); + + if (hasPreviousKey) { + VELOX_CHECK_GT(key, previousKey, "RoaringBitmapArray bitmap keys are not strictly ascending"); + } + hasPreviousKey = true; + previousKey = key; + + const auto bitmapSize = roaring::api::roaring_bitmap_portable_deserialize_size(cursor, remaining); + VELOX_CHECK_GT(bitmapSize, 0, "Invalid serialized roaring bitmap in RoaringBitmapArray"); + VELOX_CHECK_LE(bitmapSize, remaining, "Serialized roaring bitmap exceeds remaining payload size"); + auto bitmap = roaring::Roaring::readSafe(cursor, remaining); + VELOX_CHECK( + key != kMaxHighKey || bitmap.isEmpty() || bitmap.maximum() <= kMaxLowKeyForMaxHighKey, + "RoaringBitmapArray bitmap for max high key exceeds Delta's max representable value"); + bitmaps_.emplace(key, std::move(bitmap)); + cursor += bitmapSize; + remaining -= bitmapSize; + } + + VELOX_CHECK_EQ(remaining, 0, "RoaringBitmapArray payload has {} trailing bytes", remaining); } size_t RoaringBitmapArray::serializedSizeInBytes() const { - return sizeof(uint32_t) + bitmap_.getSizeInBytes(true); + return serializedSizeInBytes(bitmaps_); +} + +size_t RoaringBitmapArray::serializedSizeInBytes(const BitmapMap& bitmaps) { + size_t size = sizeof(uint32_t) + sizeof(uint64_t); + for (const auto& [_, bitmap] : bitmaps) { + if (bitmap.isEmpty()) { + continue; + } + size += sizeof(uint32_t) + bitmap.getSizeInBytes(true); + } + return size; } } // namespace gluten::delta diff --git a/cpp/velox/compute/delta/RoaringBitmapArray.h b/cpp/velox/compute/delta/RoaringBitmapArray.h index 68eb2c9db37..f456145520b 100644 --- a/cpp/velox/compute/delta/RoaringBitmapArray.h +++ b/cpp/velox/compute/delta/RoaringBitmapArray.h @@ -34,27 +34,46 @@ #include #include -#include +#include +#include +#include +#include namespace gluten::delta { /// Minimal 64-bit roaring bitmap wrapper for Delta deletion-vector payloads. -/// This intentionally models the payload format consumed by DeltaDeletionVectorReader: -/// a 4-byte magic number followed by CRoaring's portable Roaring64Map serialization. +/// Delta's JVM implementation stores row indexes as an array of 32-bit roaring bitmaps keyed by +/// the high 32 bits. Keeping the same shape avoids Roaring64Map overhead for the common case where +/// per-file row indexes fit in a single 32-bit bitmap. class RoaringBitmapArray { public: static constexpr uint32_t kPortableSerializationFormatMagicNumber = 1681511377; + // Matches Delta JVM RoaringBitmapArray.MAX_REPRESENTABLE_VALUE. + static constexpr uint32_t kMaxHighKey = 0x7ffffffe; + static constexpr uint32_t kMaxLowKeyForMaxHighKey = 0x80000000; + static constexpr uint64_t kMaxRepresentableValue = + (static_cast(kMaxHighKey) << 32) | kMaxLowKeyForMaxHighKey; void addSafe(uint64_t value); bool containsSafe(uint64_t value) const; + void merge(const RoaringBitmapArray& other); + + uint64_t cardinality() const; + std::optional last() const; void serialize(char* buffer) const; + std::string serializeToString(bool optimize = false) const; void deserialize(const char* buffer, size_t size); size_t serializedSizeInBytes() const; private: - roaring::Roaring64Map bitmap_; + using BitmapMap = std::map; + + static void serializeBitmap(const BitmapMap& bitmaps, char* buffer); + static size_t serializedSizeInBytes(const BitmapMap& bitmaps); + + BitmapMap bitmaps_; }; } // namespace gluten::delta diff --git a/cpp/velox/compute/delta/tests/CMakeLists.txt b/cpp/velox/compute/delta/tests/CMakeLists.txt index eabe94b722a..3c65d8443f9 100644 --- a/cpp/velox/compute/delta/tests/CMakeLists.txt +++ b/cpp/velox/compute/delta/tests/CMakeLists.txt @@ -25,7 +25,7 @@ add_test( WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}) add_executable( - velox_delta_read_test DeltaConnectorTest.cpp + velox_delta_read_test DeltaConnectorTest.cpp DeltaBitmapAggregatorTest.cpp DeltaDeletionVectorReaderTest.cpp DeltaSplitTest.cpp) target_link_libraries(velox_delta_read_test velox roaring diff --git a/cpp/velox/compute/delta/tests/DeltaBitmapAggregatorTest.cpp b/cpp/velox/compute/delta/tests/DeltaBitmapAggregatorTest.cpp new file mode 100644 index 00000000000..0601aae2a5a --- /dev/null +++ b/cpp/velox/compute/delta/tests/DeltaBitmapAggregatorTest.cpp @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include + +#include "compute/delta/RoaringBitmapArray.h" +#include "operators/functions/DeltaBitmapAggregator.h" +#include "velox/common/base/tests/GTestUtils.h" +#include "velox/exec/tests/utils/AssertQueryBuilder.h" +#include "velox/exec/tests/utils/HiveConnectorTestBase.h" +#include "velox/exec/tests/utils/PlanBuilder.h" + +namespace gluten::delta { +namespace { + +using facebook::velox::StringView; +using facebook::velox::exec::test::AssertQueryBuilder; +using facebook::velox::exec::test::HiveConnectorTestBase; +using facebook::velox::exec::test::PlanBuilder; + +class DeltaBitmapAggregatorTest : public HiveConnectorTestBase { + protected: + void SetUp() override { + HiveConnectorTestBase::SetUp(); + gluten::registerDeltaBitmapAggregator(); + } + + RoaringBitmapArray extractBitmap( + const facebook::velox::RowVectorPtr& results, + facebook::velox::vector_size_t row, + facebook::velox::column_index_t column, + int64_t expectedCardinality, + std::optional expectedLast) { + const auto bitmapResult = results->childAt(column)->as(); + EXPECT_EQ(bitmapResult->childAt(0)->asFlatVector()->valueAt(row), expectedCardinality); + if (expectedLast.has_value()) { + EXPECT_FALSE(bitmapResult->childAt(1)->isNullAt(row)); + EXPECT_EQ(bitmapResult->childAt(1)->asFlatVector()->valueAt(row), expectedLast.value()); + } else { + EXPECT_TRUE(bitmapResult->childAt(1)->isNullAt(row)); + } + + const auto payload = bitmapResult->childAt(2)->asFlatVector()->valueAt(row); + RoaringBitmapArray bitmap; + bitmap.deserialize(payload.data(), payload.size()); + return bitmap; + } +}; + +TEST_F(DeltaBitmapAggregatorTest, SingleAggregationIgnoresNullsAndDuplicates) { + const auto input = makeRowVector( + {"row_index"}, {makeNullableFlatVector({1, 7, 7, std::nullopt, static_cast(1ULL << 33)})}); + + const auto plan = + PlanBuilder(pool()).values({input}).singleAggregation({}, {"bitmapaggregator(row_index) AS dv"}).planNode(); + const auto results = AssertQueryBuilder(plan).copyResults(pool()); + ASSERT_EQ(results->size(), 1); + + const auto bitmap = extractBitmap(results, 0, 0, 3, static_cast(1ULL << 33)); + EXPECT_TRUE(bitmap.containsSafe(1)); + EXPECT_TRUE(bitmap.containsSafe(7)); + EXPECT_TRUE(bitmap.containsSafe(1ULL << 33)); + EXPECT_FALSE(bitmap.containsSafe(2)); +} + +TEST_F(DeltaBitmapAggregatorTest, PartialFinalAggregationMergesPayloadsByGroup) { + const auto input = makeRowVector( + {"file_id", "row_index"}, + {makeFlatVector({10, 10, 20, 10, 20, 20}), + makeFlatVector({1, 1, 3, static_cast(1ULL << 32), 5, 5})}); + + const auto plan = PlanBuilder(pool()) + .values({input}) + .partialAggregation({"file_id"}, {"bitmapaggregator(row_index) AS dv"}) + .finalAggregation() + .planNode(); + const auto results = AssertQueryBuilder(plan).copyResults(pool()); + ASSERT_EQ(results->size(), 2); + + for (facebook::velox::vector_size_t row = 0; row < results->size(); ++row) { + const auto fileId = results->childAt(0)->asFlatVector()->valueAt(row); + if (fileId == 10) { + const auto bitmap = extractBitmap(results, row, 1, 2, static_cast(1ULL << 32)); + EXPECT_TRUE(bitmap.containsSafe(1)); + EXPECT_TRUE(bitmap.containsSafe(1ULL << 32)); + } else { + ASSERT_EQ(fileId, 20); + const auto bitmap = extractBitmap(results, row, 1, 2, 5); + EXPECT_TRUE(bitmap.containsSafe(3)); + EXPECT_TRUE(bitmap.containsSafe(5)); + } + } +} + +TEST_F(DeltaBitmapAggregatorTest, AllNullInputProducesEmptyBitmap) { + const auto input = makeRowVector({"row_index"}, {makeNullableFlatVector({std::nullopt})}); + const auto plan = + PlanBuilder(pool()).values({input}).singleAggregation({}, {"bitmapaggregator(row_index) AS dv"}).planNode(); + const auto results = AssertQueryBuilder(plan).copyResults(pool()); + ASSERT_EQ(results->size(), 1); + + const auto bitmap = extractBitmap(results, 0, 0, 0, std::nullopt); + EXPECT_EQ(bitmap.cardinality(), 0); +} + +TEST_F(DeltaBitmapAggregatorTest, RejectsNegativeRowIndexes) { + const auto input = makeRowVector({"row_index"}, {makeFlatVector({-1})}); + const auto plan = + PlanBuilder(pool()).values({input}).singleAggregation({}, {"bitmapaggregator(row_index) AS dv"}).planNode(); + + VELOX_ASSERT_THROW(AssertQueryBuilder(plan).copyResults(pool()), "Delta bitmap row index cannot be negative"); +} + +TEST_F(DeltaBitmapAggregatorTest, RejectsRowIndexesAboveDeltaMax) { + const auto tooLarge = static_cast(RoaringBitmapArray::kMaxRepresentableValue + 1); + const auto input = makeRowVector({"row_index"}, {makeFlatVector({tooLarge})}); + const auto plan = + PlanBuilder(pool()).values({input}).singleAggregation({}, {"bitmapaggregator(row_index) AS dv"}).planNode(); + + VELOX_ASSERT_THROW(AssertQueryBuilder(plan).copyResults(pool()), "exceeds max representable value"); +} + +} // namespace +} // namespace gluten::delta diff --git a/cpp/velox/compute/delta/tests/RoaringBitmapArrayTest.cpp b/cpp/velox/compute/delta/tests/RoaringBitmapArrayTest.cpp index 0eca672608d..7d248ec00f5 100644 --- a/cpp/velox/compute/delta/tests/RoaringBitmapArrayTest.cpp +++ b/cpp/velox/compute/delta/tests/RoaringBitmapArrayTest.cpp @@ -16,23 +16,48 @@ */ #include "compute/delta/RoaringBitmapArray.h" +#include "velox/common/base/Exceptions.h" +#include "velox/common/base/tests/GTestUtils.h" #include #include #include +#include namespace gluten::delta { namespace { +void writeUint32LittleEndian(char* data, uint32_t value) { + auto* bytes = reinterpret_cast(data); + bytes[0] = static_cast(value & 0xFF); + bytes[1] = static_cast((value >> 8) & 0xFF); + bytes[2] = static_cast((value >> 16) & 0xFF); + bytes[3] = static_cast((value >> 24) & 0xFF); +} + +void writeUint64LittleEndian(char* data, uint64_t value) { + auto* bytes = reinterpret_cast(data); + bytes[0] = static_cast(value & 0xFF); + bytes[1] = static_cast((value >> 8) & 0xFF); + bytes[2] = static_cast((value >> 16) & 0xFF); + bytes[3] = static_cast((value >> 24) & 0xFF); + bytes[4] = static_cast((value >> 32) & 0xFF); + bytes[5] = static_cast((value >> 40) & 0xFF); + bytes[6] = static_cast((value >> 48) & 0xFF); + bytes[7] = static_cast((value >> 56) & 0xFF); +} + TEST(RoaringBitmapArrayTest, SerializeRoundTrip) { RoaringBitmapArray bitmap; bitmap.addSafe(1); bitmap.addSafe(7); + bitmap.addSafe(7); bitmap.addSafe(1ULL << 33); std::vector serialized(bitmap.serializedSizeInBytes()); bitmap.serialize(serialized.data()); + EXPECT_EQ(serialized.size(), bitmap.serializeToString().size()); RoaringBitmapArray restored; restored.deserialize(serialized.data(), serialized.size()); @@ -41,12 +66,132 @@ TEST(RoaringBitmapArrayTest, SerializeRoundTrip) { EXPECT_TRUE(restored.containsSafe(7)); EXPECT_TRUE(restored.containsSafe(1ULL << 33)); EXPECT_FALSE(restored.containsSafe(2)); + EXPECT_EQ(restored.cardinality(), 3); +} + +TEST(RoaringBitmapArrayTest, EmptyPortableSerializationMatchesDeltaJvmHeader) { + RoaringBitmapArray bitmap; + const auto serialized = bitmap.serializeToString(); + + const std::vector expectedBytes{0xd1, 0xd3, 0x39, 0x64, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}; + ASSERT_EQ(serialized.size(), expectedBytes.size()); + for (size_t i = 0; i < expectedBytes.size(); ++i) { + EXPECT_EQ(static_cast(serialized[i]), expectedBytes[i]); + } + + RoaringBitmapArray restored; + restored.deserialize(serialized.data(), serialized.size()); + EXPECT_EQ(restored.cardinality(), 0); + EXPECT_FALSE(restored.last().has_value()); +} + +TEST(RoaringBitmapArrayTest, DeserializesDeltaJvmPortablePayloadWithSparseGap) { + // Delta 3.3.2 portable serialization for values 1, 7 and 1 << 33. + // Delta JVM includes the empty intermediate high-key bucket in this sparse + // case; native accepts it but writes the compact portable equivalent. + const std::vector deltaJvmBytes{ + 0xd1, 0xd3, 0x39, 0x64, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x3a, 0x30, + 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x10, 0x00, 0x00, 0x00, 0x01, 0x00, 0x07, 0x00, + 0x01, 0x00, 0x00, 0x00, 0x3a, 0x30, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x3a, 0x30, + 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00}; + + RoaringBitmapArray bitmap; + bitmap.deserialize(reinterpret_cast(deltaJvmBytes.data()), deltaJvmBytes.size()); + + EXPECT_EQ(bitmap.cardinality(), 3); + EXPECT_TRUE(bitmap.containsSafe(1)); + EXPECT_TRUE(bitmap.containsSafe(7)); + EXPECT_TRUE(bitmap.containsSafe(1ULL << 33)); + ASSERT_TRUE(bitmap.last().has_value()); + EXPECT_EQ(bitmap.last().value(), 1ULL << 33); +} + +TEST(RoaringBitmapArrayTest, SerializesSparseGapAsCompactPortablePayload) { + RoaringBitmapArray bitmap; + bitmap.addSafe(1); + bitmap.addSafe(7); + bitmap.addSafe(1ULL << 33); + + const auto serialized = bitmap.serializeToString(); + + const std::vector expectedBytes{0xd1, 0xd3, 0x39, 0x64, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x3a, 0x30, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x01, 0x00, 0x10, 0x00, 0x00, 0x00, 0x01, 0x00, 0x07, 0x00, + 0x02, 0x00, 0x00, 0x00, 0x3a, 0x30, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00}; + + ASSERT_EQ(serialized.size(), expectedBytes.size()); + for (size_t i = 0; i < expectedBytes.size(); ++i) { + EXPECT_EQ(static_cast(serialized[i]), expectedBytes[i]); + } +} + +TEST(RoaringBitmapArrayTest, MergeCardinalityAndLast) { + RoaringBitmapArray left; + left.addSafe(1); + left.addSafe((1ULL << 32) + 5); + + RoaringBitmapArray right; + right.addSafe(1); + right.addSafe((2ULL << 32) + 3); + + left.merge(right); + + EXPECT_EQ(left.cardinality(), 3); + ASSERT_TRUE(left.last().has_value()); + EXPECT_EQ(left.last().value(), (2ULL << 32) + 3); + EXPECT_TRUE(left.containsSafe(1)); + EXPECT_TRUE(left.containsSafe((1ULL << 32) + 5)); + EXPECT_TRUE(left.containsSafe((2ULL << 32) + 3)); +} + +TEST(RoaringBitmapArrayTest, EnforcesDeltaRowIndexBounds) { + EXPECT_EQ(RoaringBitmapArray::kMaxRepresentableValue, 9223372030412324864ULL); + + RoaringBitmapArray bitmap; + bitmap.addSafe(RoaringBitmapArray::kMaxRepresentableValue); + EXPECT_TRUE(bitmap.containsSafe(RoaringBitmapArray::kMaxRepresentableValue)); + + VELOX_ASSERT_THROW(bitmap.addSafe(RoaringBitmapArray::kMaxRepresentableValue + 1), "exceeds max representable value"); + VELOX_ASSERT_THROW( + bitmap.containsSafe(RoaringBitmapArray::kMaxRepresentableValue + 1), "exceeds max representable value"); +} + +TEST(RoaringBitmapArrayTest, RejectsPayloadAboveDeltaMaxRowIndex) { + roaring::Roaring highBitmap; + highBitmap.add(RoaringBitmapArray::kMaxLowKeyForMaxHighKey + 1); + + std::vector payload(sizeof(uint32_t) + sizeof(uint64_t) + sizeof(uint32_t) + highBitmap.getSizeInBytes(true)); + auto* cursor = payload.data(); + writeUint32LittleEndian(cursor, RoaringBitmapArray::kPortableSerializationFormatMagicNumber); + cursor += sizeof(uint32_t); + writeUint64LittleEndian(cursor, 1); + cursor += sizeof(uint64_t); + writeUint32LittleEndian(cursor, RoaringBitmapArray::kMaxHighKey); + cursor += sizeof(uint32_t); + cursor += highBitmap.write(cursor, true); + ASSERT_EQ(cursor, payload.data() + payload.size()); + + RoaringBitmapArray bitmap; + VELOX_ASSERT_THROW( + bitmap.deserialize(payload.data(), payload.size()), + "bitmap for max high key exceeds Delta's max representable value"); } TEST(RoaringBitmapArrayTest, RejectsBadMagic) { RoaringBitmapArray bitmap; - std::vector invalid(sizeof(uint32_t), '\0'); - EXPECT_ANY_THROW(bitmap.deserialize(invalid.data(), invalid.size())); + std::vector invalid(sizeof(uint32_t) + sizeof(uint64_t), '\0'); + VELOX_ASSERT_THROW(bitmap.deserialize(invalid.data(), invalid.size()), "Unexpected RoaringBitmapArray magic number"); +} + +TEST(RoaringBitmapArrayTest, RejectsTrailingBytes) { + RoaringBitmapArray empty; + auto serialized = empty.serializeToString(); + serialized.push_back('\0'); + + RoaringBitmapArray bitmap; + VELOX_ASSERT_THROW( + bitmap.deserialize(serialized.data(), serialized.size()), "RoaringBitmapArray payload has 1 trailing bytes"); } } // namespace diff --git a/cpp/velox/operators/functions/DeltaBitmapAggregator.cc b/cpp/velox/operators/functions/DeltaBitmapAggregator.cc new file mode 100644 index 00000000000..a2a33ea764a --- /dev/null +++ b/cpp/velox/operators/functions/DeltaBitmapAggregator.cc @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "operators/functions/DeltaBitmapAggregator.h" + +#include +#include +#include +#include + +#include "compute/delta/RoaringBitmapArray.h" +#include "velox/exec/SimpleAggregateAdapter.h" +#include "velox/expression/FunctionSignature.h" +#include "velox/type/SimpleFunctionApi.h" + +using namespace facebook::velox; + +namespace gluten { +namespace { + +class DeltaBitmapAggregatorFunction { + public: + using InputType = Row; + using IntermediateType = Varbinary; + using OutputType = Row; + + static constexpr bool default_null_behavior_ = false; + + static void addRowIndex(delta::RoaringBitmapArray& bitmap, int64_t value) { + VELOX_CHECK_GE(value, 0, "Delta bitmap row index cannot be negative: {}", value); + bitmap.addSafe(static_cast(value)); + } + + static bool toIntermediate(exec::out_type& out, exec::optional_arg_type value) { + delta::RoaringBitmapArray bitmap; + if (value.has_value()) { + addRowIndex(bitmap, value.value()); + } + const auto serialized = bitmap.serializeToString(); + out.copy_from(StringView(serialized.data(), serialized.size())); + return true; + } + + struct AccumulatorType { + static constexpr bool use_external_memory_ = true; + + explicit AccumulatorType(HashStringAllocator* /* allocator */, DeltaBitmapAggregatorFunction* /* fn */) {} + + bool addInput(HashStringAllocator* /* allocator */, exec::optional_arg_type value) { + if (!value.has_value()) { + return false; + } + addRowIndex(bitmap, value.value()); + return true; + } + + bool combine(HashStringAllocator* /* allocator */, exec::optional_arg_type other) { + if (!other.has_value()) { + return false; + } + const auto serialized = other.value(); + delta::RoaringBitmapArray otherBitmap; + otherBitmap.deserialize(serialized.data(), serialized.size()); + bitmap.merge(otherBitmap); + return true; + } + + bool writeIntermediateResult(bool /* nonNullGroup */, exec::out_type& out) { + const auto serialized = bitmap.serializeToString(); + out.copy_from(StringView(serialized.data(), serialized.size())); + return true; + } + + bool writeFinalResult(bool /* nonNullGroup */, exec::out_type& out) { + const auto serialized = bitmap.serializeToString(true); + const auto last = bitmap.last(); + out.copy_from(std::make_tuple( + static_cast(bitmap.cardinality()), + last.has_value() ? std::optional(static_cast(*last)) : std::nullopt, + StringView(serialized.data(), serialized.size()))); + return true; + } + + delta::RoaringBitmapArray bitmap; + }; +}; + +} // namespace + +void registerDeltaBitmapAggregator(const std::string& prefix, bool withCompanionFunctions, bool overwrite) { + std::vector> signatures{ + exec::AggregateFunctionSignatureBuilder() + .argumentType("bigint") + .intermediateType("varbinary") + .returnType("row(bigint,bigint,varbinary)") + .build()}; + + exec::registerAggregateFunction( + prefix + "bitmapaggregator", + std::move(signatures), + [](core::AggregationNode::Step step, + const std::vector& argTypes, + const TypePtr& resultType, + const core::QueryConfig& /* config */) -> std::unique_ptr { + VELOX_CHECK_EQ(argTypes.size(), 1, "bitmapaggregator takes one argument"); + VELOX_CHECK_EQ(argTypes[0]->kind(), exec::isRawInput(step) ? TypeKind::BIGINT : TypeKind::VARBINARY); + return std::make_unique>( + step, argTypes, resultType); + }, + withCompanionFunctions, + overwrite); +} + +} // namespace gluten diff --git a/cpp/velox/operators/functions/DeltaBitmapAggregator.h b/cpp/velox/operators/functions/DeltaBitmapAggregator.h new file mode 100644 index 00000000000..a3dba24809e --- /dev/null +++ b/cpp/velox/operators/functions/DeltaBitmapAggregator.h @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +namespace gluten { + +void registerDeltaBitmapAggregator( + const std::string& prefix = "", + bool withCompanionFunctions = true, + bool overwrite = true); + +} // namespace gluten diff --git a/cpp/velox/operators/functions/RegistrationAllFunctions.cc b/cpp/velox/operators/functions/RegistrationAllFunctions.cc index dd1be7805c7..ccf60963108 100644 --- a/cpp/velox/operators/functions/RegistrationAllFunctions.cc +++ b/cpp/velox/operators/functions/RegistrationAllFunctions.cc @@ -17,6 +17,7 @@ #include "operators/functions/RegistrationAllFunctions.h" #include "operators/functions/Arithmetic.h" +#include "operators/functions/DeltaBitmapAggregator.h" #include "operators/functions/RowConstructorWithNull.h" #include "operators/functions/RowFunctionWithNull.h" #include "velox/expression/SpecialFormRegistry.h" @@ -94,6 +95,7 @@ void registerAllFunctions() { registerFunctionOverwrite(); velox::functions::iceberg::registerFunctions(); + registerDeltaBitmapAggregator(); } } // namespace gluten diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/expression/AggregateFunctionsBuilder.scala b/gluten-substrait/src/main/scala/org/apache/gluten/expression/AggregateFunctionsBuilder.scala index a567903edad..cd5b17a7206 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/expression/AggregateFunctionsBuilder.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/expression/AggregateFunctionsBuilder.scala @@ -24,6 +24,8 @@ import org.apache.gluten.substrait.SubstraitContext import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.types.DataType +import scala.util.Try + object AggregateFunctionsBuilder { def create(context: SubstraitContext, aggregateFunc: AggregateFunction): Long = { // First handle the custom aggregate functions @@ -50,6 +52,8 @@ object AggregateFunctionsBuilder { ExpressionNames.FIRST_IGNORE_NULL case Last(_, ignoreNulls) if ignoreNulls => ExpressionNames.LAST_IGNORE_NULL + case _ if isPortableDeltaBitmapAggregator(aggregateFunc) => + ExpressionNames.BITMAP_AGGREGATOR case _ => val nameOpt = ExpressionMappings.expressionsMap.get(aggregateFunc.getClass) if (nameOpt.isEmpty) { @@ -62,4 +66,14 @@ object AggregateFunctionsBuilder { } } } + + private def isPortableDeltaBitmapAggregator(aggregateFunc: AggregateFunction): Boolean = { + aggregateFunc.prettyName == ExpressionNames.BITMAP_AGGREGATOR && + Try { + aggregateFunc.getClass + .getMethod("serializationFormatString") + .invoke(aggregateFunc) + .asInstanceOf[String] + }.toOption.contains("Portable") + } } diff --git a/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala b/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala index d4afb7ff739..0528945a0e9 100644 --- a/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala +++ b/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala @@ -32,6 +32,7 @@ object ExpressionNames { final val COLLECT_LIST = "collect_list" final val COLLECT_SET = "collect_set" final val BLOOM_FILTER_AGG = "bloom_filter_agg" + final val BITMAP_AGGREGATOR = "bitmapaggregator" final val VAR_SAMP = "var_samp" final val VAR_POP = "var_pop" final val BIT_AND_AGG = "bit_and"