diff --git a/pp/bare_bones/tests/vector_tests.cpp b/pp/bare_bones/tests/vector_tests.cpp index 4caac15ad..f7a546444 100644 --- a/pp/bare_bones/tests/vector_tests.cpp +++ b/pp/bare_bones/tests/vector_tests.cpp @@ -108,7 +108,7 @@ class BareBonesVectorEraseFixture : public testing::Test { } }; -TEST_F(BareBonesVectorEraseFixture, EraseLastItem) { +TEST_F(BareBonesVectorEraseFixture, EraseLastItemByRange) { // Arrange // Act @@ -120,7 +120,19 @@ TEST_F(BareBonesVectorEraseFixture, EraseLastItem) { EXPECT_EQ("2", *vector_[1]); } -TEST_F(BareBonesVectorEraseFixture, EraseFirstItem) { +TEST_F(BareBonesVectorEraseFixture, EraseLastItem) { + // Arrange + + // Act + vector_.erase(vector_.end() - 1); + + // Assert + EXPECT_EQ(2U, vector_.size()); + EXPECT_EQ("1", *vector_[0]); + EXPECT_EQ("2", *vector_[1]); +} + +TEST_F(BareBonesVectorEraseFixture, EraseFirstItemByRange) { // Arrange // Act @@ -132,7 +144,19 @@ TEST_F(BareBonesVectorEraseFixture, EraseFirstItem) { EXPECT_EQ("3", *vector_[1]); } -TEST_F(BareBonesVectorEraseFixture, EraseSecondItem) { +TEST_F(BareBonesVectorEraseFixture, EraseFirstItem) { + // Arrange + + // Act + vector_.erase(vector_.begin()); + + // Assert + EXPECT_EQ(2U, vector_.size()); + EXPECT_EQ("2", *vector_[0]); + EXPECT_EQ("3", *vector_[1]); +} + +TEST_F(BareBonesVectorEraseFixture, EraseSecondItemByRange) { // Arrange // Act @@ -144,6 +168,18 @@ TEST_F(BareBonesVectorEraseFixture, EraseSecondItem) { EXPECT_EQ("3", *vector_[1]); } +TEST_F(BareBonesVectorEraseFixture, EraseSecondItem) { + // Arrange + + // Act + vector_.erase(vector_.begin() + 1); + + // Assert + EXPECT_EQ(2U, vector_.size()); + EXPECT_EQ("1", *vector_[0]); + EXPECT_EQ("3", *vector_[1]); +} + TEST_F(BareBonesVectorEraseFixture, EraseAllItems) { // Arrange diff --git a/pp/bare_bones/vector.h b/pp/bare_bones/vector.h index 3af44068e..6de26cb0c 100644 --- a/pp/bare_bones/vector.h +++ b/pp/bare_bones/vector.h @@ -109,6 +109,14 @@ class GenericVector { derived()->set_size(0); } + 
PROMPP_ALWAYS_INLINE iterator erase(iterator it) noexcept { + if (it == end()) [[unlikely]] { + return it; + } + + return erase(it, it + 1); + } + PROMPP_ALWAYS_INLINE iterator erase(iterator first, iterator last) noexcept { assert(first >= begin()); assert(last <= end()); diff --git a/pp/entrypoint/go_constants.cpp b/pp/entrypoint/go_constants.cpp index b435d7fd7..ef896f163 100644 --- a/pp/entrypoint/go_constants.cpp +++ b/pp/entrypoint/go_constants.cpp @@ -16,6 +16,7 @@ static_assert(sizeof(PromPP::Prometheus::Relabel::InnerSeries) == Sizeof_InnerSe static_assert(sizeof(entrypoint::series_data::SamplesIterator) == Sizeof_SerializedDataSamplesIterator); static_assert(sizeof(entrypoint::series_data::AggregationIterator) == Sizeof_SerializedDataAggregationIterator); +static_assert(sizeof(entrypoint::series_data::MultiSeriesDecodeIterator) == Sizeof_MultiSeriesDecodeIterator); static_assert(sizeof(metrics::Storage::Iterator) == Sizeof_MetricsIterator); diff --git a/pp/entrypoint/go_constants.h b/pp/entrypoint/go_constants.h index 0b248ef77..a2f54f7e0 100644 --- a/pp/entrypoint/go_constants.h +++ b/pp/entrypoint/go_constants.h @@ -7,6 +7,7 @@ #define Sizeof_SerializedDataSamplesIterator 152 #define Sizeof_SerializedDataAggregationIterator 208 +#define Sizeof_MultiSeriesDecodeIterator 48 #define Sizeof_MetricsIterator 24 diff --git a/pp/entrypoint/multiseries_iterator.cpp b/pp/entrypoint/multiseries_iterator.cpp new file mode 100644 index 000000000..608ca6f89 --- /dev/null +++ b/pp/entrypoint/multiseries_iterator.cpp @@ -0,0 +1,33 @@ +#include "multiseries_iterator.h" + +#include "series_data/serialization.h" + +extern "C" void prompp_series_data_serialization_serialized_data_multi_series_iterator_ctor(void* args) { + struct Arguments { + entrypoint::series_data::MultiSeriesDecodeIterator* iterator; + entrypoint::series_data::SerializedDataPtr serialized_data; + PromPP::Primitives::Go::SliceView series_ids; + }; + + const auto in = static_cast(args); + 
in->serialized_data->construct_multi_series_iterator(in->iterator, in->series_ids.span()); +} + +extern "C" void prompp_series_data_serialization_serialized_data_multi_series_iterator_reset(void* args) { + struct Arguments { + entrypoint::series_data::MultiSeriesDecodeIterator* iterator; + entrypoint::series_data::SerializedDataPtr serialized_data; + PromPP::Primitives::Go::SliceView series_ids; + }; + + const auto in = static_cast(args); + in->serialized_data->reset_multi_series_iterator(*in->iterator, in->series_ids.span()); +} + +extern "C" void prompp_series_data_serialization_serialized_data_multi_series_iterator_next(void* iterator) { + ++(*static_cast(iterator)); +} + +extern "C" void prompp_series_data_serialization_serialized_data_multi_series_iterator_dtor(void* iterator) { + std::destroy_at(static_cast(iterator)); +} diff --git a/pp/entrypoint/multiseries_iterator.h b/pp/entrypoint/multiseries_iterator.h new file mode 100644 index 000000000..a8800cc92 --- /dev/null +++ b/pp/entrypoint/multiseries_iterator.h @@ -0,0 +1,43 @@ +#ifdef __cplusplus +extern "C" { +#endif + +/** + * @brief Construct a multi-series decode iterator over the given series ids. + * + * @param args { + * iterator uintptr // pointer to storage of size Sizeof_MultiSeriesDecodeIterator (placement new). + * serializedData uintptr // pointer to serialized data. + * seriesIDs []uint32 // slice view of series ids to use in iterator. + * } + */ +void prompp_series_data_serialization_serialized_data_multi_series_iterator_ctor(void* args); + +/** + * @brief Reset a multi-series decode iterator into the given series ids. + * + * @param args { + * iterator uintptr // pointer to a constructed MultiSeriesDecodeIterator. + * serializedData uintptr // pointer to serialized data. + * seriesIDs []uint32 // slice view of series ids to use in iterator. 
+ * } + */ +void prompp_series_data_serialization_serialized_data_multi_series_iterator_reset(void* args); + +/** + * @brief Advance multi-series decode iterator. + * + * @param iterator uintptr // pointer to multi-series decode iterator + */ +void prompp_series_data_serialization_serialized_data_multi_series_iterator_next(void* iterator); + +/** + * @brief Destroy multi-series decode iterator (call before reusing). + * + * @param iterator uintptr // pointer to multi-series decode iterator + */ +void prompp_series_data_serialization_serialized_data_multi_series_iterator_dtor(void* iterator); + +#ifdef __cplusplus +} // extern "C" +#endif diff --git a/pp/entrypoint/series_data/aggregation_iterator.h b/pp/entrypoint/series_data/aggregation_iterator.h index 76a6b377f..4841ca4ce 100644 --- a/pp/entrypoint/series_data/aggregation_iterator.h +++ b/pp/entrypoint/series_data/aggregation_iterator.h @@ -1,6 +1,7 @@ #pragma once #include "prometheus/promql/window_function.h" +#include "select_hints.h" #include "series_data/decoder/decorator/changes_iterator.h" #include "series_data/decoder/decorator/delta_iterator.h" #include "series_data/decoder/decorator/downsampling_decode_iterator.h" @@ -193,11 +194,6 @@ class AggregationIterator { Type type_; }; -struct SelectHints { - ::series_data::decoder::decorator::WindowFunctionParameters function_parameters; - PromPP::Prometheus::promql::WindowFunction window_function{PromPP::Prometheus::promql::WindowFunction::kNone}; -}; - PROMPP_ALWAYS_INLINE AggregationIterator create_aggregation_iterator(::series_data::serialization::SerializedDataView::SeriesIterator&& iterator, const SelectHints& select_hints, PromPP::Primitives::Timestamp downsampling_ms) { diff --git a/pp/entrypoint/series_data/multiseries_decode_iterator.h b/pp/entrypoint/series_data/multiseries_decode_iterator.h new file mode 100644 index 000000000..12c8daf32 --- /dev/null +++ b/pp/entrypoint/series_data/multiseries_decode_iterator.h @@ -0,0 +1,167 @@ +#pragma once + 
+#include "select_hints.h" +#include "series_data/decoder/decorator/last_over_step.h" +#include "series_data/decoder/decorator/lookback_delta_iterator.h" +#include "series_data/decoder/decorator/max_over_time.h" +#include "series_data/decoder/decorator/min_over_time.h" +#include "series_data/decoder/decorator/multiseries_iterator.h" +#include "series_data/decoder/decorator/sum_over_time.h" +#include "series_data/decoder/universal_decode_iterator.h" +#include "series_data/serialization/serialized_data.h" + +namespace entrypoint::series_data { + +class MultiSeriesDecodeIterator { + public: + using DecodeIteratorSentinel = ::series_data::decoder::DecodeIteratorSentinel; + using UniversalDecodeIterator = ::series_data::decoder::UniversalDecodeIterator; + using WindowBoundaryCalculator = ::series_data::decoder::decorator::StepLookbackDeltaWindowCalculator; + + template + using MultiSeriesIterator = ::series_data::decoder::decorator::MultiSeriesIterator; + using SeriesIterator = ::series_data::serialization::SerializedDataView::SeriesIterator; + + using LastOverTimeWithStaleNansIterator = ::series_data::decoder::decorator::LastOverTimeWithStaleNansIterator; + + using Iterator = ::series_data::decoder::decorator::LookbackDeltaIterator; + + using FindMinElement = ::series_data::decoder::decorator::FindMinElement; + using FindMaxElement = ::series_data::decoder::decorator::FindMaxElement; + using SumOfElements = ::series_data::decoder::decorator::SumOfElements; + + using MinMultiSeriesIterator = MultiSeriesIterator; + using MaxMultiSeriesIterator = MultiSeriesIterator; + using SumMultiSeriesIterator = MultiSeriesIterator; + + enum class Type : uint8_t { + kMin = 0, + kMax, + kSum, + }; + + DECODE_ITERATOR_TYPE_TRAITS(); + +#define DEFINE_CONSTRUCTOR(MultiSeriesIteratorType, field, type) \ + template \ + explicit MultiSeriesDecodeIterator(std::in_place_type_t, Args&&... 
args) \ + : iterator_{.field{std::forward(args)...}}, type_{Type::type} {} + + DEFINE_CONSTRUCTOR(MinMultiSeriesIterator, min, kMin) + DEFINE_CONSTRUCTOR(MaxMultiSeriesIterator, max, kMax) + DEFINE_CONSTRUCTOR(SumMultiSeriesIterator, sum, kSum) + +#undef DEFINE_CONSTRUCTOR + + template + PROMPP_ALWAYS_INLINE decltype(auto) visit(Visitor&& visitor) const { + switch (type_) { + case Type::kMin: { + return std::forward(visitor)(iterator_.min); + } + + case Type::kMax: { + return std::forward(visitor)(iterator_.max); + } + + default: { + return std::forward(visitor)(iterator_.sum); + } + } + } + + template + PROMPP_ALWAYS_INLINE decltype(auto) visit(Visitor&& visitor) { + return const_cast(this)->visit( + [&](const Iterator& iterator) PROMPP_LAMBDA_INLINE { return std::forward(visitor)(const_cast(iterator)); }); + } + + ~MultiSeriesDecodeIterator() { + visit([](const auto& iterator) PROMPP_LAMBDA_INLINE { std::destroy_at(&iterator); }); + } + + PROMPP_ALWAYS_INLINE const ::series_data::encoder::Sample& operator*() const { + return visit([](const auto& iterator) PROMPP_LAMBDA_INLINE -> const auto& { return *iterator; }); + } + PROMPP_ALWAYS_INLINE const ::series_data::encoder::Sample* operator->() const { + return visit([](const auto& iterator) PROMPP_LAMBDA_INLINE -> const auto* { return iterator.operator->(); }); + } + + PROMPP_ALWAYS_INLINE bool operator==(const DecodeIteratorSentinel& sentinel) const { + return visit([&sentinel](const auto& iterator) PROMPP_LAMBDA_INLINE { return iterator == sentinel; }); + } + + PROMPP_ALWAYS_INLINE MultiSeriesDecodeIterator& operator++() { + visit([](Iterator& iterator) PROMPP_LAMBDA_INLINE { ++iterator; }); + return *this; + } + + template + PROMPP_ALWAYS_INLINE void reset(const ::series_data::decoder::decorator::WindowFunctionParameters& parameters, IteratorsGenerator&& iterators_generator) { + visit([&](auto& iterator) PROMPP_LAMBDA_INLINE { iterator.reset(parameters, std::forward(iterators_generator)); }); + } + + 
PROMPP_ALWAYS_INLINE static void create_series_iterators(const SelectHints& select_hints, + std::span series_ids, + ::series_data::serialization::SerializedDataView data_view, + BareBones::Vector& iterators) { + const auto initial_interval = WindowBoundaryCalculator::initial_window(select_hints.function_parameters); + + iterators.reserve(series_ids.size()); + data_view.enumerate_series([&](const auto& chunk, uint32_t chunk_id) { + if (std::ranges::find(series_ids, chunk.label_set_id) != series_ids.end()) { + iterators.emplace_back(LastOverTimeWithStaleNansIterator(data_view.create_series_iterator(chunk_id), initial_interval), + select_hints.function_parameters.lookback_delta); + } + }); + } + + [[nodiscard]] PROMPP_ALWAYS_INLINE Type type() const noexcept { return type_; } + + private: + union Iterators { + ~Iterators() {} + + MinMultiSeriesIterator min; + MaxMultiSeriesIterator max; + SumMultiSeriesIterator sum; + } iterator_; + + Type type_; +}; + +PROMPP_ALWAYS_INLINE void construct_multi_series_decode_iterator(MultiSeriesDecodeIterator* iterator, + const SelectHints& select_hints, + std::span series_ids, + ::series_data::serialization::SerializedDataView data_view) { + const auto create_series_iterators = [&] { + BareBones::Vector iterators; + MultiSeriesDecodeIterator::create_series_iterators(select_hints, series_ids, data_view, iterators); + return iterators; + }; + + switch (select_hints.window_function) { + using enum PromPP::Prometheus::promql::WindowFunction; + + case kMin: { + std::construct_at(iterator, std::in_place_type, create_series_iterators(), + select_hints.function_parameters); + break; + } + + case kMax: { + std::construct_at(iterator, std::in_place_type, create_series_iterators(), + select_hints.function_parameters); + break; + } + + case kSum: + default: { + std::construct_at(iterator, std::in_place_type, create_series_iterators(), + select_hints.function_parameters); + break; + } + } +} + +} // namespace entrypoint::series_data diff --git 
a/pp/entrypoint/series_data/select_hints.h b/pp/entrypoint/series_data/select_hints.h new file mode 100644 index 000000000..2cdc7447f --- /dev/null +++ b/pp/entrypoint/series_data/select_hints.h @@ -0,0 +1,13 @@ +#pragma once + +#include "prometheus/promql/window_function.h" +#include "series_data/decoder/decorator/window_function_iterator.h" + +namespace entrypoint::series_data { + +struct SelectHints { + ::series_data::decoder::decorator::WindowFunctionParameters function_parameters; + PromPP::Prometheus::promql::WindowFunction window_function{PromPP::Prometheus::promql::WindowFunction::kNone}; +}; + +} // namespace entrypoint::series_data \ No newline at end of file diff --git a/pp/entrypoint/series_data/serialization.h b/pp/entrypoint/series_data/serialization.h index d12c45c13..283fe9440 100644 --- a/pp/entrypoint/series_data/serialization.h +++ b/pp/entrypoint/series_data/serialization.h @@ -1,6 +1,7 @@ #pragma once #include "aggregation_iterator.h" +#include "entrypoint/series_data/multiseries_decode_iterator.h" #include "primitives/go_slice.h" #include "primitives/primitives.h" #include "prometheus/query.h" @@ -28,6 +29,15 @@ class SerializedDataGo { [[nodiscard]] PROMPP_ALWAYS_INLINE AggregationIterator aggregation_iterator(uint32_t chunk_id) const noexcept { return create_aggregation_iterator(data_view_.create_series_iterator(chunk_id), select_hints_, downsampling_ms_); } + PROMPP_ALWAYS_INLINE void construct_multi_series_iterator(MultiSeriesDecodeIterator* iterator, std::span series_ids) const noexcept { + return construct_multi_series_decode_iterator(iterator, select_hints_, series_ids, data_view_); + } + + PROMPP_ALWAYS_INLINE void reset_multi_series_iterator(MultiSeriesDecodeIterator& iterator, std::span series_ids) const noexcept { + iterator.reset(select_hints_.function_parameters, [&](auto& iterators) PROMPP_LAMBDA_INLINE { + MultiSeriesDecodeIterator::create_series_iterators(select_hints_, series_ids, data_view_, iterators); + }); + } private: 
::series_data::serialization::SerializedData data_; diff --git a/pp/go/cppbridge/entrypoint.go b/pp/go/cppbridge/entrypoint.go index ad3a0417f..c1261b3b6 100644 --- a/pp/go/cppbridge/entrypoint.go +++ b/pp/go/cppbridge/entrypoint.go @@ -33,6 +33,7 @@ type ( CppRoaringBitset = [C.Sizeof_RoaringBitset]byte CppSerializedDataSamplesIterator = [C.Sizeof_SerializedDataSamplesIterator]byte CppSerializedDataAggregationIterator = [C.Sizeof_SerializedDataAggregationIterator]byte + CppSerializedDataMultiSeriesIterator = [C.Sizeof_MultiSeriesDecodeIterator]byte CppMetricsIterator = [C.Sizeof_MetricsIterator]byte CppSegmentSamplesStorage = [C.Sizeof_SegmentSamplesStorage]byte CppRemoteWriteMessageEncoder = [C.Sizeof_RemoteWriteMessageEncoder]byte @@ -2348,6 +2349,60 @@ func seriesDataSerializedDataAggregationIteratorReset( ) } +func seriesDataSerializedDataMultiSeriesIteratorCtor( + iterator *DataStorageSerializedDataMultiSeriesIterator, + serializedData uintptr, + seriesIDs []uint32, +) { + args := struct { + iterator uintptr + serializedData uintptr + seriesIDs []uint32 + }{uintptr(unsafe.Pointer(iterator)), serializedData, seriesIDs} + + testGC() + fastcgo.UnsafeCall1( + C.prompp_series_data_serialization_serialized_data_multi_series_iterator_ctor, + uintptr(unsafe.Pointer(&args)), + ) + runtime.KeepAlive(seriesIDs) +} + +func seriesDataSerializedDataMultiSeriesIteratorReset( + iterator *DataStorageSerializedDataMultiSeriesIterator, + serializedData uintptr, + seriesIDs []uint32, +) { + args := struct { + iterator uintptr + serializedData uintptr + seriesIDs []uint32 + }{uintptr(unsafe.Pointer(iterator)), serializedData, seriesIDs} + + testGC() + fastcgo.UnsafeCall1( + C.prompp_series_data_serialization_serialized_data_multi_series_iterator_reset, + uintptr(unsafe.Pointer(&args)), + ) + runtime.KeepAlive(seriesIDs) +} + +func seriesDataSerializedDataMultiSeriesIteratorNext(iterator *DataStorageSerializedDataMultiSeriesIterator) { + testGC() + fastcgo.UnsafeCall1( + 
C.prompp_series_data_serialization_serialized_data_multi_series_iterator_next, + uintptr(unsafe.Pointer(iterator)), + ) +} + +func seriesDataSerializedDataMultiSeriesIteratorDtor(iterator *DataStorageSerializedDataMultiSeriesIterator) { + testGC() + fastcgo.UnsafeCall1( + C.prompp_series_data_serialization_serialized_data_multi_series_iterator_dtor, + uintptr(unsafe.Pointer(iterator)), + ) +} + func seriesDataDataStorageTimeInterval(dataStorage uintptr) TimeInterval { args := struct { dataStorage uintptr diff --git a/pp/go/cppbridge/entrypoint.h b/pp/go/cppbridge/entrypoint.h index f9e551718..43942f4b2 100755 --- a/pp/go/cppbridge/entrypoint.h +++ b/pp/go/cppbridge/entrypoint.h @@ -80,6 +80,7 @@ void prompp_dump_memory_profile(void* args, void* res); #define Sizeof_SerializedDataSamplesIterator 152 #define Sizeof_SerializedDataAggregationIterator 208 +#define Sizeof_MultiSeriesDecodeIterator 48 #define Sizeof_MetricsIterator 24 @@ -606,6 +607,49 @@ void prompp_metrics_page_for_test_detach(void* args); extern "C" { #endif +/** + * @brief Construct a multi-series decode iterator over the given series ids. + * + * @param args { + * iterator uintptr // pointer to storage of size Sizeof_MultiSeriesDecodeIterator (placement new). + * serializedData uintptr // pointer to serialized data. + * seriesIDs []uint32 // slice view of series ids to use in iterator. + * } + */ +void prompp_series_data_serialization_serialized_data_multi_series_iterator_ctor(void* args); + +/** + * @brief Reset a multi-series decode iterator into the given series ids. + * + * @param args { + * iterator uintptr // pointer to a constructed MultiSeriesDecodeIterator. + * serializedData uintptr // pointer to serialized data. + * seriesIDs []uint32 // slice view of series ids to use in iterator. + * } + */ +void prompp_series_data_serialization_serialized_data_multi_series_iterator_reset(void* args); + +/** + * @brief Advance multi-series decode iterator. 
+ * + * @param iterator uintptr // pointer to multi-series decode iterator + */ +void prompp_series_data_serialization_serialized_data_multi_series_iterator_next(void* iterator); + +/** + * @brief Destroy multi-series decode iterator (call before reusing). + * + * @param iterator uintptr // pointer to multi-series decode iterator + */ +void prompp_series_data_serialization_serialized_data_multi_series_iterator_dtor(void* iterator); + +#ifdef __cplusplus +} // extern "C" +#endif +#ifdef __cplusplus +extern "C" { +#endif + #include /** diff --git a/pp/go/cppbridge/head.go b/pp/go/cppbridge/head.go index ab9e35aa6..e90797976 100644 --- a/pp/go/cppbridge/head.go +++ b/pp/go/cppbridge/head.go @@ -305,6 +305,18 @@ type DataStorageSerializedDataAggregationIteratorControlBlock struct { value float64 } +func (it *DataStorageSerializedDataAggregationIteratorControlBlock) HasData() bool { + return it.timestamp != math.MinInt64 +} + +func (it *DataStorageSerializedDataAggregationIteratorControlBlock) Timestamp() int64 { + return it.timestamp +} + +func (it *DataStorageSerializedDataAggregationIteratorControlBlock) Value() float64 { + return it.value +} + type DataStorageSerializedDataAggregationIterator struct { DataStorageSerializedDataAggregationIteratorControlBlock cppInternalData [unsafe.Sizeof(CppSerializedDataAggregationIterator{}) - unsafe.Sizeof(DataStorageSerializedDataAggregationIteratorControlBlock{})]byte @@ -324,16 +336,27 @@ func (it *DataStorageSerializedDataAggregationIterator) Reset(serializedData *Da seriesDataSerializedDataAggregationIteratorReset(it, serializedData.serializedData, chunkRef) } -func (it *DataStorageSerializedDataAggregationIterator) HasData() bool { - return it.timestamp != math.MinInt64 +type DataStorageSerializedDataMultiSeriesIterator struct { + DataStorageSerializedDataAggregationIteratorControlBlock + cppInternalData [unsafe.Sizeof(CppSerializedDataMultiSeriesIterator{}) - 
unsafe.Sizeof(DataStorageSerializedDataAggregationIteratorControlBlock{})]byte } -func (it *DataStorageSerializedDataAggregationIterator) Timestamp() int64 { - return it.timestamp +func NewDataStorageSerializedDataMultiSeriesIterator(serializedData *DataStorageSerializedData, seriesIDs []uint32) DataStorageSerializedDataMultiSeriesIterator { + it := DataStorageSerializedDataMultiSeriesIterator{} + seriesDataSerializedDataMultiSeriesIteratorCtor(&it, serializedData.serializedData, seriesIDs) + return it } -func (it *DataStorageSerializedDataAggregationIterator) Value() float64 { - return it.value +func (it *DataStorageSerializedDataMultiSeriesIterator) Next() { + seriesDataSerializedDataMultiSeriesIteratorNext(it) +} + +func (it *DataStorageSerializedDataMultiSeriesIterator) Reset(serializedData *DataStorageSerializedData, seriesIDs []uint32) { + seriesDataSerializedDataMultiSeriesIteratorReset(it, serializedData.serializedData, seriesIDs) +} + +func (it *DataStorageSerializedDataMultiSeriesIterator) Close() { + seriesDataSerializedDataMultiSeriesIteratorDtor(it) } // UnloadedDataLoader is Go wrapper around series_data::Loader. 
diff --git a/pp/go/cppbridge/head_test.go b/pp/go/cppbridge/head_test.go index f3470745c..02628176b 100644 --- a/pp/go/cppbridge/head_test.go +++ b/pp/go/cppbridge/head_test.go @@ -1,6 +1,7 @@ package cppbridge_test import ( + "runtime" "testing" "unsafe" @@ -263,3 +264,163 @@ func (s *HeadSuite) TestQueryFirstTimestampsInFinalizedChunk() { // Assert s.Equal([]int64{5}, timestamps) } + +type DataStorageSerializedDataMultiSeriesIteratorSuite struct { + suite.Suite + lss *cppbridge.LabelSetStorage + ds *cppbridge.DataStorage + enc *cppbridge.HeadEncoder +} + +func TestDataStorageSerializedDataMultiSeriesIteratorSuite(t *testing.T) { + suite.Run(t, new(DataStorageSerializedDataMultiSeriesIteratorSuite)) +} + +func (s *DataStorageSerializedDataMultiSeriesIteratorSuite) SetupTest() { + s.lss = cppbridge.NewQueryableLssStorage() + s.ds = cppbridge.NewDataStorage() + s.enc = cppbridge.NewHeadEncoderWithDataStorage(s.ds) + + s.lss.FindOrEmplace(model.NewLabelSetBuilder().Set("job", "a").Build()) + s.lss.FindOrEmplace(model.NewLabelSetBuilder().Set("job", "b").Build()) +} + +type createIteratorMethod = func(*cppbridge.DataStorageSerializedData, []uint32) cppbridge.DataStorageSerializedDataMultiSeriesIterator + +var createMultiSeriesIterator = cppbridge.NewDataStorageSerializedDataMultiSeriesIterator +var createAndResetMultiSeriesIterator = func( + serializedData *cppbridge.DataStorageSerializedData, + seriesIDs []uint32, +) cppbridge.DataStorageSerializedDataMultiSeriesIterator { + it := createMultiSeriesIterator(serializedData, seriesIDs) + for it.HasData() { + it.Next() + } + it.Reset(serializedData, seriesIDs) + return it +} + +func (s *DataStorageSerializedDataMultiSeriesIteratorSuite) collectSamples( + hints storage.SelectHints, + seriesToSerialize []uint32, + series []uint32, + createIterator createIteratorMethod, +) []cppbridge.Sample { + result := s.ds.Query(cppbridge.DataStorageQuery{ + StartTimestampMs: hints.Start, + EndTimestampMs: hints.End, + LabelSetIDs: 
seriesToSerialize, + }, cppbridge.NoDownsampling, unsafe.Pointer(&hints)) + + it := createIterator(result.SerializedData, series) + defer it.Close() + + out := make([]cppbridge.Sample, 0) + for it.HasData() { + out = append(out, cppbridge.Sample{Timestamp: it.Timestamp(), Value: it.Value()}) + it.Next() + } + + runtime.KeepAlive(result.SerializedData) + return out +} + +func (s *DataStorageSerializedDataMultiSeriesIteratorSuite) TestSum() { + s.testSum(createMultiSeriesIterator) +} + +func (s *DataStorageSerializedDataMultiSeriesIteratorSuite) TestSumWithIteratorReset() { + s.testSum(createAndResetMultiSeriesIterator) +} + +func (s *DataStorageSerializedDataMultiSeriesIteratorSuite) testSum(method createIteratorMethod) { + // Arrange + s.enc.Encode(0, 50, 10.0) + s.enc.Encode(1, 80, 20.0) + s.enc.Encode(0, 150, 20.0) + s.enc.Encode(1, 180, 30.0) + + // Act + samples := s.collectSamples(storage.SelectHints{ + Start: 1, + End: 200, + Step: 100, + LookbackDelta: 100, + Func: "sum", + }, []uint32{0, 1}, []uint32{0, 1}, method) + + // Assert + s.Equal([]cppbridge.Sample{ + {Timestamp: 100, Value: 30.0}, + {Timestamp: 200, Value: 50.0}, + }, samples) +} + +func (s *DataStorageSerializedDataMultiSeriesIteratorSuite) TestMin() { + // Arrange + s.enc.Encode(0, 50, 10.0) + s.enc.Encode(1, 130, 20.0) + s.enc.Encode(0, 150, 30.0) + s.enc.Encode(1, 180, 20.0) + + // Act + samples := s.collectSamples(storage.SelectHints{ + Start: 1, + End: 200, + Step: 100, + LookbackDelta: 50, + Func: "min", + }, []uint32{0, 1}, []uint32{0, 1}, createMultiSeriesIterator) + + // Assert + s.Equal([]cppbridge.Sample{ + {Timestamp: 50, Value: 10.0}, + {Timestamp: 150, Value: 20.0}, + {Timestamp: 200, Value: 20.0}, + }, samples) +} + +func (s *DataStorageSerializedDataMultiSeriesIteratorSuite) TestMax() { + // Arrange + s.enc.Encode(0, 50, 20.0) + s.enc.Encode(1, 80, 10.0) + s.enc.Encode(0, 150, 20.0) + s.enc.Encode(1, 180, 30.0) + + // Act + samples := s.collectSamples(storage.SelectHints{ + Start: 
1, + End: 200, + Step: 100, + LookbackDelta: 50, + Func: "max", + }, []uint32{0, 1}, []uint32{0, 1}, createMultiSeriesIterator) + + // Assert + s.Equal([]cppbridge.Sample{ + {Timestamp: 50, Value: 20.0}, + {Timestamp: 150, Value: 20.0}, + {Timestamp: 200, Value: 30.0}, + }, samples) +} + +func (s *DataStorageSerializedDataMultiSeriesIteratorSuite) TestNoSeries() { + // Arrange + s.enc.Encode(0, 50, 20.0) + s.enc.Encode(1, 80, 10.0) + s.enc.Encode(0, 150, 20.0) + s.enc.Encode(1, 180, 30.0) + s.enc.Encode(2, 180, 30.0) + + // Act + samples := s.collectSamples(storage.SelectHints{ + Start: 0, + End: 200, + Step: 100, + Range: 100, + Func: "max", + }, []uint32{0, 1}, []uint32{2}, createMultiSeriesIterator) + + // Assert + s.Equal([]cppbridge.Sample{}, samples) +} diff --git a/pp/go/storage/querier/querier_optimize_test.go b/pp/go/storage/querier/querier_optimize_test.go index 19d9e4652..99ff057ac 100644 --- a/pp/go/storage/querier/querier_optimize_test.go +++ b/pp/go/storage/querier/querier_optimize_test.go @@ -184,15 +184,15 @@ func (s *SwitchFuncOptimizeSuite) TestCrossSeries() { }, { hints: &prom_storage.SelectHints{Func: "sum"}, - expected: &prom_storage.SelectHints{}, + expected: &prom_storage.SelectHints{Func: "sum"}, }, { hints: &prom_storage.SelectHints{Func: "sum", By: true}, - expected: &prom_storage.SelectHints{}, + expected: &prom_storage.SelectHints{Func: "sum", By: true}, }, { hints: &prom_storage.SelectHints{Func: "sum", By: true, Grouping: []string{"label"}}, - expected: &prom_storage.SelectHints{}, + expected: &prom_storage.SelectHints{Func: "sum", By: true, Grouping: []string{"label"}}, }, { hints: &prom_storage.SelectHints{Func: "sum", By: false, Grouping: []string{"label"}}, @@ -229,15 +229,15 @@ func (s *SwitchFuncOptimizeSuite) TestAll() { }, { hints: &prom_storage.SelectHints{Func: "sum"}, - expected: &prom_storage.SelectHints{}, + expected: &prom_storage.SelectHints{Func: "sum"}, }, { hints: &prom_storage.SelectHints{Func: "sum", By: true}, - 
expected: &prom_storage.SelectHints{}, + expected: &prom_storage.SelectHints{Func: "sum", By: true}, }, { hints: &prom_storage.SelectHints{Func: "sum", By: true, Grouping: []string{"label"}}, - expected: &prom_storage.SelectHints{}, + expected: &prom_storage.SelectHints{Func: "sum", By: true, Grouping: []string{"label"}}, }, { hints: &prom_storage.SelectHints{Func: "sum", By: false, Grouping: []string{"label"}}, diff --git a/pp/prometheus/promql/function_names.gperf b/pp/prometheus/promql/function_names.gperf index 70200431b..f2896dcf2 100644 --- a/pp/prometheus/promql/function_names.gperf +++ b/pp/prometheus/promql/function_names.gperf @@ -46,7 +46,9 @@ ln log10 log2 mad_over_time +max max_over_time +min min_over_time minute month @@ -73,6 +75,7 @@ sort_desc sqrt stddev_over_time stdvar_over_time +sum sum_over_time tan tanh diff --git a/pp/prometheus/promql/function_names_hash.h b/pp/prometheus/promql/function_names_hash.h index ffdfc04d7..f6842c020 100644 --- a/pp/prometheus/promql/function_names_hash.h +++ b/pp/prometheus/promql/function_names_hash.h @@ -1,6 +1,6 @@ /* C++ code produced by gperf version 3.2.1 */ /* Command-line: gperf -L C++ function_names.gperf */ -/* Computed positions: -k'2-4,$' */ +/* Computed positions: -k'1-4,$' */ #pragma once #include @@ -19,12 +19,12 @@ #error "gperf generated tables don't work with this execution character set. Please report a bug to ." 
#endif -#define TOTAL_KEYWORDS 82 +#define TOTAL_KEYWORDS 85 #define MIN_WORD_LENGTH 2 #define MAX_WORD_LENGTH 18 -#define MIN_HASH_VALUE 2 -#define MAX_HASH_VALUE 198 -/* maximum key range = 197, duplicates = 0 */ +#define MIN_HASH_VALUE 9 +#define MAX_HASH_VALUE 313 +/* maximum key range = 305, duplicates = 0 */ namespace PromPP::Prometheus::promql { class FunctionNamesHash { @@ -33,39 +33,30 @@ class FunctionNamesHash { }; constexpr unsigned int FunctionNamesHash::hash(const char* str, size_t len) { - constexpr static unsigned char asso_values[] = { - 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, - 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 5, 5, 0, 199, 199, 199, 199, 199, 199, 199, - 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, - 199, 199, 199, 199, 199, 199, 199, 199, 0, 199, 25, 45, 60, 20, 15, 75, 90, 5, 5, 199, 5, 45, 45, 0, 0, 60, 20, 0, 0, - 0, 30, 55, 199, 45, 10, 10, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, - 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, - 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, - 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, - 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199, 199}; + constexpr static unsigned short asso_values[] = { + 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 
314, 314, 314, 314, + 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 0, 0, 0, 314, 314, 314, 314, 314, 314, 314, + 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, + 314, 314, 314, 314, 314, 314, 314, 314, 15, 314, 0, 100, 0, 40, 20, 5, 125, 0, 5, 314, 0, 25, 50, 5, 25, 0, 70, 15, 5, + 0, 50, 20, 314, 35, 60, 0, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, + 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, + 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, + 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, + 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314, 314}; unsigned int hval = len; switch (hval) { default: hval += asso_values[static_cast(str[3])]; -#if (defined __cplusplus && (__cplusplus >= 201703L || (__cplusplus >= 201103L && defined __clang__ && __clang_major__ + (__clang_minor__ >= 9) > 3))) || \ - (defined __STDC_VERSION__ && __STDC_VERSION__ >= 202000L && ((defined __GNUC__ && __GNUC__ >= 10) || (defined __clang__ && __clang_major__ >= 9))) [[fallthrough]]; -#elif (defined __GNUC__ && __GNUC__ >= 7) || (defined __clang__ && __clang_major__ >= 10) - __attribute__((__fallthrough__)); -#endif - /*FALLTHROUGH*/ case 3: hval += asso_values[static_cast(str[2])]; -#if (defined __cplusplus && (__cplusplus >= 201703L || (__cplusplus >= 201103L && defined __clang__ && __clang_major__ + (__clang_minor__ >= 9) > 3))) || \ - (defined __STDC_VERSION__ && __STDC_VERSION__ >= 202000L && ((defined __GNUC__ && __GNUC__ 
>= 10) || (defined __clang__ && __clang_major__ >= 9))) [[fallthrough]]; -#elif (defined __GNUC__ && __GNUC__ >= 7) || (defined __clang__ && __clang_major__ >= 10) - __attribute__((__fallthrough__)); -#endif - /*FALLTHROUGH*/ case 2: hval += asso_values[static_cast(str[1])]; + [[fallthrough]]; + case 1: + hval += asso_values[static_cast(str[0])]; break; } return hval + asso_values[static_cast(str[len - 1])]; diff --git a/pp/prometheus/promql/window_function.h b/pp/prometheus/promql/window_function.h index c4bafa342..b7bf3d3e9 100644 --- a/pp/prometheus/promql/window_function.h +++ b/pp/prometheus/promql/window_function.h @@ -55,9 +55,9 @@ constexpr std::array kFunctions = { Function{.name = "delta", .type = FunctionType::kExtrapolatedRate}, Function{.name = "resets", .type = FunctionType::kNone}, Function{.name = "changes", .type = FunctionType::kThinning}, - Function{.name = "sum", .type = FunctionType::kNone}, - Function{.name = "min", .type = FunctionType::kNone}, - Function{.name = "max", .type = FunctionType::kNone}, + Function{.name = "sum", .type = FunctionType::kCrossSeriesSynthesizing}, + Function{.name = "min", .type = FunctionType::kCrossSeriesSynthesizing}, + Function{.name = "max", .type = FunctionType::kCrossSeriesSynthesizing}, }; constexpr uint32_t function_name_hash(std::string_view str) { @@ -92,6 +92,9 @@ constexpr uint32_t function_name_hash(std::string_view str) { PROMQL_WINDOW_FUNC_CASE(kDelta) PROMQL_WINDOW_FUNC_CASE(kResets) PROMQL_WINDOW_FUNC_CASE(kChanges) + PROMQL_WINDOW_FUNC_CASE(kSum) + PROMQL_WINDOW_FUNC_CASE(kMax) + PROMQL_WINDOW_FUNC_CASE(kMin) default: return WindowFunction::kNone; } diff --git a/pp/series_data/decoder/decorator/last_over_step.h b/pp/series_data/decoder/decorator/last_over_step.h index 32bbe27e5..d369c9974 100644 --- a/pp/series_data/decoder/decorator/last_over_step.h +++ b/pp/series_data/decoder/decorator/last_over_step.h @@ -8,10 +8,13 @@ class LastOverStep { public: explicit LastOverStep(encoder::Sample& sample, 
const PromPP::Primitives::TimeInterval& interval) : sample_(sample), interval_(interval) {} - PROMPP_ALWAYS_INLINE void operator()(PromPP::Primitives::Timestamp, double value) const noexcept { sample_.value = value; } + PROMPP_ALWAYS_INLINE void operator()(PromPP::Primitives::Timestamp, double value) noexcept { + sample_.value = value; + has_value_ = true; + } ~LastOverStep() { - if (!BareBones::Encoding::Gorilla::isstalenan(sample_.value)) [[likely]] { + if (has_value_) [[likely]] { sample_.timestamp = interval_.max; } } @@ -19,9 +22,10 @@ class LastOverStep { private: encoder::Sample& sample_; const PromPP::Primitives::TimeInterval& interval_; + bool has_value_{}; }; template -using LastOverStepIterator = OverTimeFuncIterator; +using LastOverStepIterator = OverTimeFuncIterator; } // namespace series_data::decoder::decorator \ No newline at end of file diff --git a/pp/series_data/decoder/decorator/last_over_time.h b/pp/series_data/decoder/decorator/last_over_time.h index b1d591ecc..43d10db68 100644 --- a/pp/series_data/decoder/decorator/last_over_time.h +++ b/pp/series_data/decoder/decorator/last_over_time.h @@ -18,6 +18,9 @@ class LastOverTime { }; template -using LastOverTimeIterator = OverTimeFuncIterator; +using LastOverTimeIterator = OverTimeFuncIterator; + +template +using LastOverTimeWithStaleNansIterator = OverTimeFuncIterator; } // namespace series_data::decoder::decorator \ No newline at end of file diff --git a/pp/series_data/decoder/decorator/lookback_delta_iterator.h b/pp/series_data/decoder/decorator/lookback_delta_iterator.h new file mode 100644 index 000000000..f5ceb664b --- /dev/null +++ b/pp/series_data/decoder/decorator/lookback_delta_iterator.h @@ -0,0 +1,68 @@ +#pragma once + +#include "primitives/primitives.h" +#include "series_data/decoder/universal_decode_iterator.h" + +namespace series_data::decoder::decorator { + +template +class LookbackDeltaIterator { + public: + DECODE_ITERATOR_TYPE_TRAITS(); + + LookbackDeltaIterator(Iterator&& 
iterator, PromPP::Primitives::Timestamp lookback_delta) : iterator_(std::move(iterator)), lookback_delta_(lookback_delta) { + update_sample(); + } + + [[nodiscard]] PROMPP_ALWAYS_INLINE const PromPP::Primitives::TimeInterval& interval() const noexcept { return iterator_.interval(); } + PROMPP_ALWAYS_INLINE void set_interval(const PromPP::Primitives::TimeInterval& interval) { + iterator_.set_interval(interval); + update_sample(); + } + + PROMPP_ALWAYS_INLINE const encoder::Sample& operator*() const { return sample_; } + PROMPP_ALWAYS_INLINE const encoder::Sample* operator->() const { return &sample_; } + + PROMPP_ALWAYS_INLINE bool operator==(const DecodeIteratorSentinel&) const { return sample_.timestamp == kInvalidTimestamp; } + [[nodiscard]] PROMPP_ALWAYS_INLINE bool has_more_samples() const noexcept { return iterator_.has_more_samples(); } + + PROMPP_ALWAYS_INLINE LookbackDeltaIterator& operator++() { + ++iterator_; + update_sample(); + return *this; + } + + PROMPP_ALWAYS_INLINE LookbackDeltaIterator operator++(int) { + const auto result = *this; + ++*this; + return result; + } + + protected: + static constexpr auto kInvalidSample = encoder::Sample{.timestamp = kInvalidTimestamp, .value = BareBones::Encoding::Gorilla::STALE_NAN}; + + encoder::Sample sample_{kInvalidSample}; + Iterator iterator_; + PromPP::Primitives::Timestamp lookback_delta_; + + PROMPP_ALWAYS_INLINE void update_sample() noexcept { + if (iterator_ != DecodeIteratorSentinel{}) [[likely]] { + if (BareBones::Encoding::Gorilla::isstalenan(iterator_->value)) [[unlikely]] { + sample_ = kInvalidSample; + return; + } + + sample_ = *iterator_; + } + + if (!sample_in_lookback_delta_interval()) { + sample_ = kInvalidSample; + } + } + + [[nodiscard]] PROMPP_ALWAYS_INLINE bool sample_in_lookback_delta_interval() const noexcept { + return sample_.timestamp > iterator_.interval().max - lookback_delta_; + } +}; + +} // namespace series_data::decoder::decorator \ No newline at end of file diff --git 
a/pp/series_data/decoder/decorator/max_over_time.h b/pp/series_data/decoder/decorator/max_over_time.h index 9d2b29643..818c8125e 100644 --- a/pp/series_data/decoder/decorator/max_over_time.h +++ b/pp/series_data/decoder/decorator/max_over_time.h @@ -20,6 +20,6 @@ class FindMaxElement { }; template -using MaxOverTimeIterator = OverTimeFuncIterator; +using MaxOverTimeIterator = OverTimeFuncIterator; } // namespace series_data::decoder::decorator \ No newline at end of file diff --git a/pp/series_data/decoder/decorator/min_over_time.h b/pp/series_data/decoder/decorator/min_over_time.h index dae567f73..741dd3ed1 100644 --- a/pp/series_data/decoder/decorator/min_over_time.h +++ b/pp/series_data/decoder/decorator/min_over_time.h @@ -20,6 +20,6 @@ class FindMinElement { }; template -using MinOverTimeIterator = OverTimeFuncIterator; +using MinOverTimeIterator = OverTimeFuncIterator; } // namespace series_data::decoder::decorator \ No newline at end of file diff --git a/pp/series_data/decoder/decorator/multiseries_iterator.h b/pp/series_data/decoder/decorator/multiseries_iterator.h new file mode 100644 index 000000000..e680f1ce7 --- /dev/null +++ b/pp/series_data/decoder/decorator/multiseries_iterator.h @@ -0,0 +1,90 @@ +#pragma once + +#include "bare_bones/gorilla.h" +#include "series_data/decoder/traits.h" +#include "window_boundary_calculator.h" + +namespace series_data::decoder::decorator { + +template +class MultiSeriesIterator { + public: + DECODE_ITERATOR_TYPE_TRAITS(); + + explicit MultiSeriesIterator(BareBones::Vector&& iterators, const WindowFunctionParameters& parameters) + : iterators_(std::move(iterators)), parameters_(&parameters) { + seek_to_first_non_stale_nan_sample(); + } + + PROMPP_ALWAYS_INLINE const encoder::Sample& operator*() const { return sample_; } + PROMPP_ALWAYS_INLINE const encoder::Sample* operator->() const { return &sample_; } + + PROMPP_ALWAYS_INLINE bool operator==(const DecodeIteratorSentinel&) const { return sample_.timestamp ==
kInvalidTimestamp; } + + PROMPP_ALWAYS_INLINE MultiSeriesIterator& operator++() { + update_sample(); + return *this; + } + + PROMPP_ALWAYS_INLINE MultiSeriesIterator operator++(int) { + const auto result = *this; + ++*this; + return result; + } + + template + PROMPP_ALWAYS_INLINE void reset(const WindowFunctionParameters& parameters, IteratorsGenerator&& iterators_generator) { + iterators_.clear(); + std::forward(iterators_generator)(iterators_); + + parameters_ = &parameters; + + seek_to_first_non_stale_nan_sample(); + } + + private: + encoder::Sample sample_; + BareBones::Vector iterators_; + const WindowFunctionParameters* parameters_; + + PROMPP_ALWAYS_INLINE void seek_to_first_non_stale_nan_sample() { + do { + update_sample(); + } while (*this != DecodeIteratorSentinel{} && BareBones::Encoding::Gorilla::isstalenan(sample_.value)); + } + + void update_sample() { + sample_ = encoder::Sample{.timestamp = kInvalidTimestamp, .value = BareBones::Encoding::Gorilla::STALE_NAN}; + + if (iterators_.empty()) [[unlikely]] { + return; + } + + const auto current_window = iterators_[0].interval(); + if (current_window.min > current_window.max) [[unlikely]] { + return; + } + + handle_samples(current_window, WindowBoundaryCalculator::next_window(current_window, *parameters_)); + sample_.timestamp = current_window.max; + } + + PROMPP_ALWAYS_INLINE void handle_samples(const PromPP::Primitives::TimeInterval& current_window, + const PromPP::Primitives::TimeInterval& next_window) noexcept { + SampleHandler handler{sample_, current_window}; + for (auto it = iterators_.begin(); it != iterators_.end();) { + auto& iterator = *it; + if (iterator != DecodeIteratorSentinel{}) [[likely]] { + handler(iterator->timestamp, iterator->value); + } else if (!iterator.has_more_samples()) { + it = iterators_.erase(it); // resume from the iterator erase() returns instead of reusing the erased one + continue; + } + + iterator.set_interval(next_window); + ++it; + } + } +}; + +} // namespace series_data::decoder::decorator diff --git
a/pp/series_data/decoder/decorator/over_time_func_iterator.h b/pp/series_data/decoder/decorator/over_time_func_iterator.h index cfff00044..8ce8bccab 100644 --- a/pp/series_data/decoder/decorator/over_time_func_iterator.h +++ b/pp/series_data/decoder/decorator/over_time_func_iterator.h @@ -5,7 +5,7 @@ namespace series_data::decoder::decorator { -template +template class OverTimeFuncIterator { public: DECODE_ITERATOR_TYPE_TRAITS(); @@ -25,6 +25,7 @@ class OverTimeFuncIterator { PROMPP_ALWAYS_INLINE const encoder::Sample* operator->() const { return &sample_; } PROMPP_ALWAYS_INLINE bool operator==(const DecodeIteratorSentinel&) const { return sample_.timestamp == kInvalidTimestamp; } + [[nodiscard]] PROMPP_ALWAYS_INLINE bool has_more_samples() const noexcept { return iterator_ != DecodeIteratorSentinel{}; } PROMPP_ALWAYS_INLINE OverTimeFuncIterator& operator++() { sample_.timestamp = kInvalidTimestamp; @@ -55,10 +56,11 @@ class OverTimeFuncIterator { return SeekResult::kStop; } - if (!BareBones::Encoding::Gorilla::isstalenan(value)) [[likely]] { - handler(timestamp, value); + if (SkipStaleNans && BareBones::Encoding::Gorilla::isstalenan(value)) [[unlikely]] { + return SeekResult::kNext; } + handler(timestamp, value); return SeekResult::kNext; }); } diff --git a/pp/series_data/decoder/decorator/sum_over_time.h b/pp/series_data/decoder/decorator/sum_over_time.h index f3866ccf3..fd4236714 100644 --- a/pp/series_data/decoder/decorator/sum_over_time.h +++ b/pp/series_data/decoder/decorator/sum_over_time.h @@ -4,19 +4,6 @@ namespace series_data::decoder::decorator { -PROMPP_ALWAYS_INLINE void kahan_sum_inc(double inc, double& sum, double& c) noexcept { - const auto t = sum + inc; - if (std::isinf(t)) { - c = 0; - } else if (std::abs(sum) >= std::abs(inc)) { - c += sum - t + inc; - } else { - c += inc - t + sum; - } - - sum = t; -} - class SumOfElements { public: explicit SumOfElements(encoder::Sample& sum, const PromPP::Primitives::TimeInterval& interval) : sum_(sum), 
interval_(interval) {} @@ -44,9 +31,22 @@ class SumOfElements { encoder::Sample& sum_; const PromPP::Primitives::TimeInterval& interval_; double c_{}; + + PROMPP_ALWAYS_INLINE void kahan_sum_inc(double inc, double& sum, double& c) noexcept { + const auto t = sum + inc; + if (std::isinf(t)) { + c = 0; + } else if (std::abs(sum) >= std::abs(inc)) { + c += sum - t + inc; + } else { + c += inc - t + sum; + } + + sum = t; + } }; template -using SumOverTimeIterator = OverTimeFuncIterator; +using SumOverTimeIterator = OverTimeFuncIterator; } // namespace series_data::decoder::decorator \ No newline at end of file diff --git a/pp/series_data/serialization/serialized_data.h b/pp/series_data/serialization/serialized_data.h index 5b73e315b..ccb1b5792 100644 --- a/pp/series_data/serialization/serialized_data.h +++ b/pp/series_data/serialization/serialized_data.h @@ -420,6 +420,19 @@ class SerializedDataView { return {get_buffer_view(), get_chunks_view(), series_first_chunk_id}; } + template + void enumerate_series(const SeriesChunkHandler& handler) { + const auto& chunks = get_chunks_view(); + for (auto it = chunks.begin(); it != chunks.end();) { + handler(*it, it - chunks.begin()); + + const auto series_id = it->label_set_id; + do { + ++it; + } while (it != chunks.end() && it->label_set_id == series_id); + } + } + private: const SerializedData& data_; uint32_t series_first_chunk_id_{kNoMoreSeries}; diff --git a/pp/series_data/tests/decoder/decorator/last_over_time_tests.cpp b/pp/series_data/tests/decoder/decorator/last_over_time_tests.cpp index 88eaa87a8..38cc62180 100644 --- a/pp/series_data/tests/decoder/decorator/last_over_time_tests.cpp +++ b/pp/series_data/tests/decoder/decorator/last_over_time_tests.cpp @@ -15,6 +15,7 @@ using series_data::chunk::DataChunk; using series_data::decoder::DecodeIteratorSentinel; using series_data::decoder::UniversalDecodeIterator; using series_data::decoder::decorator::LastOverTimeIterator; +using 
series_data::decoder::decorator::LastOverTimeWithStaleNansIterator; using series_data::encoder::Sample; struct LastOverTimeIteratorCase { @@ -23,7 +24,8 @@ struct LastOverTimeIteratorCase { std::vector expected{}; }; -class LastOverTimeFixture : public ::testing::TestWithParam { +template +class GenericLastOverTimeFixture : public ::testing::TestWithParam { protected: DataStorage storage_; @@ -33,20 +35,26 @@ class LastOverTimeFixture : public ::testing::TestWithParam actual_samples; + void test() { + // Arrange + std::vector actual_samples; + + // Act + Decoder::create_decode_iterator(storage_, storage_.open_chunks[0], [&actual_samples](Iterator&& begin, auto&&) { + std::ranges::copy(LastOverTimeIterator(UniversalDecodeIterator{std::in_place_type, std::forward(begin)}, GetParam().interval), + DecodeIteratorSentinel{}, std::back_inserter(actual_samples)); + }); + + // Assert + EXPECT_EQ(GetParam().expected, actual_samples); + } +}; - // Act - Decoder::create_decode_iterator(storage_, storage_.open_chunks[0], [&actual_samples](Iterator&& begin, auto&&) { - std::ranges::copy(LastOverTimeIterator(UniversalDecodeIterator{std::in_place_type, std::forward(begin)}, GetParam().interval), - DecodeIteratorSentinel{}, std::back_inserter(actual_samples)); - }); +using LastOverTimeFixture = GenericLastOverTimeFixture>; - // Assert - EXPECT_EQ(GetParam().expected, actual_samples); +TEST_P(LastOverTimeFixture, Test) { + test(); } INSTANTIATE_TEST_SUITE_P( @@ -97,4 +105,59 @@ INSTANTIATE_TEST_SUITE_P(TimeInterval, .interval{.min = 100, .max = 200}, .expected{Sample{.timestamp = 180, .value = 1.2}}})); +using LastOverTimeWithStaleNansFixture = GenericLastOverTimeFixture>; + +TEST_P(LastOverTimeWithStaleNansFixture, Test) { + test(); +} + +INSTANTIATE_TEST_SUITE_P( + OneSample, + LastOverTimeWithStaleNansFixture, + testing::Values(LastOverTimeIteratorCase{.samples{Sample{.timestamp = 100, .value = 1.0}}, + .interval{.min = 0, .max = 100}, + .expected{Sample{.timestamp = 100, .value 
= 1.0}}}, + LastOverTimeIteratorCase{.samples{Sample{.timestamp = 100, .value = 1.0}}, .interval{.min = 0, .max = 99}, .expected{}}, + LastOverTimeIteratorCase{.samples{Sample{.timestamp = 100, .value = 1.0}}, .interval{.min = 101, .max = 200}, .expected{}})); + +INSTANTIATE_TEST_SUITE_P(StaleNan, + LastOverTimeWithStaleNansFixture, + testing::Values(LastOverTimeIteratorCase{.samples{ + Sample{.timestamp = 5, .value = STALE_NAN}, + Sample{.timestamp = 10, .value = 1.0}, + Sample{.timestamp = 20, .value = STALE_NAN}, + Sample{.timestamp = 30, .value = 1.1}, + }, + .interval{.min = 0, .max = 100}, + .expected{Sample{.timestamp = 30, .value = 1.1}}}, + LastOverTimeIteratorCase{.samples{Sample{.timestamp = 100, .value = STALE_NAN}}, + .interval{.min = 0, .max = 100}, + .expected{Sample{.timestamp = 100, .value = STALE_NAN}}})); + +INSTANTIATE_TEST_SUITE_P(TimeInterval, + LastOverTimeWithStaleNansFixture, + testing::Values(LastOverTimeIteratorCase{.samples{ + Sample{.timestamp = 99, .value = 1.1}, + Sample{.timestamp = 100, .value = 1.0}, + Sample{.timestamp = 200, .value = 1.0}, + Sample{.timestamp = 201, .value = 1.1}, + }, + .interval{.min = 100, .max = 200}, + .expected{Sample{.timestamp = 200, .value = 1.0}}}, + LastOverTimeIteratorCase{.samples{ + Sample{.timestamp = 100, .value = 1.0}, + Sample{.timestamp = 150, .value = 1.1}, + Sample{.timestamp = 200, .value = 1.2}, + }, + .interval{.min = 100, .max = 200}, + .expected{Sample{.timestamp = 200, .value = 1.2}}}, + LastOverTimeIteratorCase{.samples{ + Sample{.timestamp = 100, .value = 1.0}, + Sample{.timestamp = 150, .value = 1.1}, + Sample{.timestamp = 180, .value = 1.2}, + Sample{.timestamp = 200, .value = STALE_NAN}, + }, + .interval{.min = 100, .max = 200}, + .expected{Sample{.timestamp = 200, .value = STALE_NAN}}})); + } // namespace \ No newline at end of file diff --git a/pp/series_data/tests/decoder/decorator/lookback_delta_iterator_tests.cpp 
b/pp/series_data/tests/decoder/decorator/lookback_delta_iterator_tests.cpp new file mode 100644 index 000000000..d835edf4c --- /dev/null +++ b/pp/series_data/tests/decoder/decorator/lookback_delta_iterator_tests.cpp @@ -0,0 +1,169 @@ +#include + +#include "series_data/data_storage.h" +#include "series_data/decoder.h" +#include "series_data/decoder/decorator/last_over_time.h" +#include "series_data/decoder/decorator/lookback_delta_iterator.h" +#include "series_data/encoder.h" + +namespace { + +using BareBones::Encoding::Gorilla::STALE_NAN; +using PromPP::Primitives::TimeInterval; +using PromPP::Primitives::Timestamp; +using series_data::DataStorage; +using series_data::Decoder; +using series_data::Encoder; +using series_data::chunk::DataChunk; +using series_data::decoder::DecodeIteratorSentinel; +using series_data::decoder::kInvalidTimestamp; +using series_data::decoder::UniversalDecodeIterator; +using series_data::decoder::decorator::LastOverTimeWithStaleNansIterator; +using series_data::decoder::decorator::LookbackDeltaIterator; +using series_data::encoder::Sample; + +constexpr Sample kInvalidSample{.timestamp = kInvalidTimestamp, .value = STALE_NAN}; + +struct LookbackDeltaIteratorCase { + std::vector samples; + std::vector intervals; + Timestamp lookback_delta{}; + std::vector expected{}; +}; + +class LookbackDeltaIteratorFixture : public testing::TestWithParam { + protected: + using Iterator = LookbackDeltaIterator>; + + DataStorage storage_; + Encoder<> encoder_{storage_}; + + void SetUp() override { + for (const auto& sample : GetParam().samples) { + encoder_.encode(0, sample.timestamp, sample.value); + } + } + + std::vector get_samples(const std::vector& intervals, Timestamp lookback_delta) { + std::vector samples; + + Decoder::create_decode_iterator( + storage_, storage_.open_chunks[0], [&samples, &intervals, lookback_delta](DecodeIterator&& begin, auto&&) { + Iterator it(LastOverTimeWithStaleNansIterator(UniversalDecodeIterator{std::in_place_type, 
std::forward(begin)}, + intervals.front()), + lookback_delta); + samples.emplace_back(*it); + + std::ranges::for_each(intervals.begin() + 1, intervals.end(), [&samples, &it](const auto& interval) { + it.set_interval(interval); + samples.emplace_back(*it); + }); + }); + + return samples; + } +}; + +TEST_P(LookbackDeltaIteratorFixture, Test) { + // Arrange + + // Act + const auto actual_samples = get_samples(GetParam().intervals, GetParam().lookback_delta); + + // Assert + EXPECT_EQ(GetParam().expected, actual_samples); +} + +INSTANTIATE_TEST_SUITE_P(OneSample, + LookbackDeltaIteratorFixture, + testing::Values(LookbackDeltaIteratorCase{.samples{Sample{.timestamp = 100, .value = 1.0}}, + .intervals{{.min = 0, .max = 100}}, + .lookback_delta = 50, + .expected{Sample{.timestamp = 100, .value = 1.0}}}, + LookbackDeltaIteratorCase{.samples{Sample{.timestamp = 100, .value = 1.0}}, + .intervals{{.min = 0, .max = 99}}, + .lookback_delta = 50, + .expected{kInvalidSample}}, + LookbackDeltaIteratorCase{.samples{Sample{.timestamp = 100, .value = 1.0}}, + .intervals{{.min = 101, .max = 200}}, + .lookback_delta = 50, + .expected{kInvalidSample}})); + +INSTANTIATE_TEST_SUITE_P(StaleNan, + LookbackDeltaIteratorFixture, + testing::Values(LookbackDeltaIteratorCase{.samples{ + Sample{.timestamp = 5, .value = STALE_NAN}, + Sample{.timestamp = 10, .value = 1.0}, + Sample{.timestamp = 20, .value = STALE_NAN}, + Sample{.timestamp = 51, .value = 1.1}, + }, + .intervals{{.min = 0, .max = 100}}, + .lookback_delta = 50, + .expected{Sample{.timestamp = 51, .value = 1.1}}}, + LookbackDeltaIteratorCase{.samples{Sample{.timestamp = 100, .value = STALE_NAN}}, + .intervals{{.min = 0, .max = 100}}, + .lookback_delta = 50, + .expected{kInvalidSample}}, + LookbackDeltaIteratorCase{.samples{ + Sample{.timestamp = 100, .value = 1.0}, + Sample{.timestamp = 101, .value = STALE_NAN}, + Sample{.timestamp = 299, .value = 2.0}, + }, + .intervals{ + {.min = 0, .max = 100}, + {.min = 101, .max = 200}, + {.min = 
201, .max = 300}, + }, + .lookback_delta = 50, + .expected{ + Sample{.timestamp = 100, .value = 1.0}, + kInvalidSample, + Sample{.timestamp = 299, .value = 2.0}, + }})); + +INSTANTIATE_TEST_SUITE_P(TimeInterval, + LookbackDeltaIteratorFixture, + testing::Values(LookbackDeltaIteratorCase{.samples{ + Sample{.timestamp = 99, .value = 1.1}, + Sample{.timestamp = 100, .value = 1.0}, + Sample{.timestamp = 200, .value = 1.0}, + Sample{.timestamp = 201, .value = 1.1}, + }, + .intervals{{.min = 100, .max = 200}}, + .lookback_delta = 50, + .expected{Sample{.timestamp = 200, .value = 1.0}}}, + LookbackDeltaIteratorCase{.samples{ + Sample{.timestamp = 100, .value = 1.0}, + Sample{.timestamp = 150, .value = 1.1}, + Sample{.timestamp = 200, .value = 1.2}, + }, + .intervals{{.min = 100, .max = 200}}, + .lookback_delta = 50, + .expected{Sample{.timestamp = 200, .value = 1.2}}})); + +INSTANTIATE_TEST_SUITE_P(LookbackDeltaBoundary, + LookbackDeltaIteratorFixture, + testing::Values(LookbackDeltaIteratorCase{.samples{Sample{.timestamp = 50, .value = 1.0}}, + .intervals{{.min = 100, .max = 200}}, + .lookback_delta = 50, + .expected{kInvalidSample}})); + +INSTANTIATE_TEST_SUITE_P(KeepSample, + LookbackDeltaIteratorFixture, + testing::Values(LookbackDeltaIteratorCase{.samples{Sample{.timestamp = 150, .value = 2.0}}, + .intervals{{.min = 100, .max = 200}, {.min = 201, .max = 300}}, + .lookback_delta = 151, + .expected{Sample{.timestamp = 150, .value = 2.0}, Sample{.timestamp = 150, .value = 2.0}}}, + LookbackDeltaIteratorCase{.samples{Sample{.timestamp = 150, .value = 2.0}}, + .intervals{{.min = 100, .max = 200}, {.min = 201, .max = 300}}, + .lookback_delta = 150, + .expected{Sample{.timestamp = 150, .value = 2.0}, kInvalidSample}})); + +INSTANTIATE_TEST_SUITE_P(DropSample, + LookbackDeltaIteratorFixture, + testing::Values(LookbackDeltaIteratorCase{.samples{Sample{.timestamp = 50, .value = 2.0}}, + .intervals{{.min = 0, .max = 100}}, + .lookback_delta = 50, + .expected{kInvalidSample}})); 
+ +} // namespace diff --git a/pp/series_data/tests/decoder/decorator/multiseries_iterator_tests.cpp b/pp/series_data/tests/decoder/decorator/multiseries_iterator_tests.cpp new file mode 100644 index 000000000..e6e8630f8 --- /dev/null +++ b/pp/series_data/tests/decoder/decorator/multiseries_iterator_tests.cpp @@ -0,0 +1,313 @@ +#include + +#include "series_data/data_storage.h" +#include "series_data/decoder.h" +#include "series_data/decoder/decorator/last_over_time.h" +#include "series_data/decoder/decorator/lookback_delta_iterator.h" +#include "series_data/decoder/decorator/min_over_time.h" +#include "series_data/decoder/decorator/multiseries_iterator.h" +#include "series_data/decoder/decorator/sum_over_time.h" +#include "series_data/encoder.h" + +namespace { + +using BareBones::Encoding::Gorilla::STALE_NAN; +using PromPP::Primitives::TimeInterval; +using PromPP::Primitives::Timestamp; +using series_data::DataStorage; +using series_data::Decoder; +using series_data::Encoder; +using series_data::chunk::DataChunk; +using series_data::decoder::DecodeIteratorSentinel; +using series_data::decoder::UniversalDecodeIterator; +using series_data::decoder::decorator::FindMinElement; +using series_data::decoder::decorator::LastOverTimeWithStaleNansIterator; +using series_data::decoder::decorator::LookbackDeltaIterator; +using series_data::decoder::decorator::MultiSeriesIterator; +using series_data::decoder::decorator::StepLookbackDeltaWindowCalculator; +using series_data::decoder::decorator::SumOfElements; +using series_data::decoder::decorator::WindowFunctionParameters; +using series_data::encoder::Sample; + +template +class MultiSeriesIteratorFixture : public ::testing::Test { + protected: + using Iterator = MultiSeriesIterator>, SampleHandler, StepLookbackDeltaWindowCalculator>; + + DataStorage storage_; + Encoder<> encoder_{storage_}; + BareBones::Vector>> iterators_; + BareBones::Vector samples_; + + void create_iterators(std::initializer_list series_ids, const 
WindowFunctionParameters& parameters) { + const auto initial_window = StepLookbackDeltaWindowCalculator::initial_window(parameters); + + for (const auto series_id : series_ids) { + Decoder::create_decode_iterator(storage_, storage_.open_chunks[series_id], [&](Iterator&& begin, auto&&) { + iterators_.emplace_back( + LastOverTimeWithStaleNansIterator(UniversalDecodeIterator{std::in_place_type, std::forward(begin)}, initial_window), + parameters.lookback_delta); + }); + } + } + + void get_samples(const WindowFunctionParameters& parameters) { + std::ranges::copy(Iterator{std::move(iterators_), parameters}, DecodeIteratorSentinel{}, std::back_insert_iterator(samples_)); + } +}; + +using MultiSeriesIteratorMinElementFixture = MultiSeriesIteratorFixture; + +TEST_F(MultiSeriesIteratorMinElementFixture, EmptyIteratorListIsImmediatelyExhausted) { + // Arrange + + // Act + get_samples({}); + + // Assert + EXPECT_TRUE(samples_.empty()); +} + +TEST_F(MultiSeriesIteratorMinElementFixture, SingleSeriesOneSampleYieldsThatSample) { + // Arrange + encoder_.encode(0, 100, 3.5); + + constexpr WindowFunctionParameters parameters = { + .interval = TimeInterval{.min = 99, .max = 100}, + .step = 100, + .lookback_delta = 1, + }; + create_iterators({0}, parameters); + + // Act + get_samples(parameters); + + // Assert + EXPECT_EQ((BareBones::Vector{Sample{.timestamp = 100, .value = 3.5}}), samples_); +} + +TEST_F(MultiSeriesIteratorMinElementFixture, MinValueAcrossTwoSeries) { + // Arrange + encoder_.encode(0, 11, 7.0); + encoder_.encode(1, 20, 2.0); + + constexpr WindowFunctionParameters parameters = { + .interval = TimeInterval{.min = 10, .max = 20}, + .step = 10, + .lookback_delta = 10, + }; + create_iterators({0, 1}, parameters); + + // Act + get_samples(parameters); + + // Assert + EXPECT_EQ((BareBones::Vector{Sample{.timestamp = 20, .value = 2.0}}), samples_); +} + +TEST_F(MultiSeriesIteratorMinElementFixture, TwoWindowsAcrossTwoSeries) { + // Arrange + encoder_.encode(0, 150, 5.0); + 
encoder_.encode(0, 201, 6.0); + encoder_.encode(1, 150, 4.0); + encoder_.encode(1, 201, 5.0); + + constexpr WindowFunctionParameters parameters = { + .interval = TimeInterval{.min = 100, .max = 250}, + .step = 50, + .lookback_delta = 50, + }; + create_iterators({0, 1}, parameters); + + // Act + get_samples(parameters); + + // Assert + EXPECT_EQ((BareBones::Vector{ + Sample{.timestamp = 150, .value = 4.0}, + Sample{.timestamp = 200, .value = STALE_NAN}, + Sample{.timestamp = 250, .value = 5.0}, + }), + samples_); +}
+
+TEST_F(MultiSeriesIteratorMinElementFixture, LastStaleNan) {  // Both series end at t=200; the final step (t=250) is expected to be STALE_NAN.
+  // Arrange
+  encoder_.encode(0, 101, 5.0);
+  encoder_.encode(0, 200, 6.0);
+  encoder_.encode(1, 101, 4.0);
+  encoder_.encode(1, 200, 5.0);
+
+  constexpr WindowFunctionParameters parameters = {
+      .interval = TimeInterval{.min = 100, .max = 250},
+      .step = 50,
+      .lookback_delta = 50,
+  };
+  create_iterators({0, 1}, parameters);
+
+  // Act
+  get_samples(parameters);
+
+  // Assert
+  EXPECT_EQ((BareBones::Vector{
+                Sample{.timestamp = 150, .value = 4.0},
+                Sample{.timestamp = 200, .value = 5.0},
+                Sample{.timestamp = 250, .value = STALE_NAN},
+            }),
+            samples_);
+}
+
+TEST_F(MultiSeriesIteratorMinElementFixture, LookbackInterval) {  // lookback_delta == step: each step is expected to yield the smaller of the series' latest samples.
+  // Arrange
+  encoder_.encode(0, 50, 20.0);
+  encoder_.encode(1, 80, 10.0);
+  encoder_.encode(0, 150, 20.0);
+  encoder_.encode(1, 180, 30.0);
+
+  constexpr WindowFunctionParameters parameters = {
+      .interval = TimeInterval{.min = 0, .max = 200},
+      .step = 100,
+      .lookback_delta = 100,
+  };
+  create_iterators({0, 1}, parameters);
+
+  // Act
+  get_samples(parameters);
+
+  // Assert
+  EXPECT_EQ((BareBones::Vector{
+                Sample{.timestamp = 100, .value = 10.0},
+                Sample{.timestamp = 200, .value = 20.0},
+            }),
+            samples_);
+}
+
+TEST_F(MultiSeriesIteratorMinElementFixture, StaleNansInSeries) {  // Both series carry an explicit STALE_NAN before t=200; that step is expected to be STALE_NAN.
+  // Arrange
+  encoder_.encode(0, 50, 20.0);
+  encoder_.encode(1, 80, 10.0);
+  encoder_.encode(0, 150, STALE_NAN);
+  encoder_.encode(1, 180, STALE_NAN);
+  encoder_.encode(0, 250, 20.0);
+  encoder_.encode(1, 280, 30.0);
+
+  constexpr WindowFunctionParameters parameters = {
+      .interval = TimeInterval{.min = 0, .max = 300},
+      .step = 100,
+      .lookback_delta = 100,
+  };
+  create_iterators({0, 1}, parameters);
+
+  // Act
+  get_samples(parameters);
+
+  // Assert
+  EXPECT_EQ((BareBones::Vector{
+                Sample{.timestamp = 100, .value = 10.0},
+                Sample{.timestamp = 200, .value = STALE_NAN},
+                Sample{.timestamp = 300, .value = 20.0},
+            }),
+            samples_);
+}
+
+using MultiSeriesIteratorSumOfElementsFixture = MultiSeriesIteratorFixture;  // Groups the sum-of-elements test cases.
+
+TEST_F(MultiSeriesIteratorSumOfElementsFixture, SumOfNoSamplesInStep) {  // Single series: a value at t=50, then an explicit STALE_NAN at t=150.
+  // Arrange
+  encoder_.encode(0, 50, 1.0);
+  encoder_.encode(0, 150, STALE_NAN);
+
+  constexpr WindowFunctionParameters parameters = {
+      .interval = TimeInterval{.min = 0, .max = 200},
+      .step = 100,
+      .lookback_delta = 100,
+  };
+  create_iterators({0}, parameters);
+
+  // Act
+  get_samples(parameters);
+
+  // Assert
+  EXPECT_EQ((BareBones::Vector{
+                Sample{.timestamp = 100, .value = 1.0},
+                Sample{.timestamp = 200, .value = STALE_NAN},
+            }),
+            samples_);
+}
+
+TEST_F(MultiSeriesIteratorSumOfElementsFixture, SkipLeadingStaleNans) {  // No output is expected for the steps that only see leading STALE_NAN samples.
+  // Arrange
+  encoder_.encode(0, 50, STALE_NAN);
+  encoder_.encode(0, 150, STALE_NAN);
+  encoder_.encode(0, 250, STALE_NAN);
+  encoder_.encode(0, 350, 1.0);
+  encoder_.encode(0, 450, 2.0);
+
+  constexpr WindowFunctionParameters parameters = {
+      .interval = TimeInterval{.min = 0, .max = 500},
+      .step = 100,
+      .lookback_delta = 100,
+  };
+  create_iterators({0}, parameters);
+
+  // Act
+  get_samples(parameters);
+
+  // Assert
+  EXPECT_EQ((BareBones::Vector{
+                Sample{.timestamp = 400, .value = 1.0},
+                Sample{.timestamp = 500, .value = 2.0},
+            }),
+            samples_);
+}
+
+TEST_F(MultiSeriesIteratorSumOfElementsFixture, SkipTrailingStaleNans) {  // After the data ends, one STALE_NAN (t=300) is expected and the later steps produce nothing.
+  // Arrange
+  encoder_.encode(0, 50, 1.0);
+  encoder_.encode(1, 50, 1.0);
+  encoder_.encode(1, 150, 2.0);
+
+  constexpr WindowFunctionParameters parameters = {
+      .interval = TimeInterval{.min = 0, .max = 500},
+      .step = 100,
+      .lookback_delta = 100,
+  };
+  create_iterators({0, 1}, parameters);
+
+  // Act
+  get_samples(parameters);
+
+  // Assert
+  EXPECT_EQ((BareBones::Vector{
+                Sample{.timestamp = 100, .value = 2.0},
+                Sample{.timestamp = 200, .value = 2.0},
+                Sample{.timestamp = 300, .value = STALE_NAN},
+            }),
+            samples_);
+}
+
+TEST_F(MultiSeriesIteratorSumOfElementsFixture, SkipTrailingStaleNansWithLastStaleNans) {  // Series 0 ends with an explicit STALE_NAN; output is expected to stop after the STALE_NAN at t=200.
+  // Arrange
+  encoder_.encode(0, 50, 1.0);
+  encoder_.encode(1, 50, 1.0);
+  encoder_.encode(0, 150, STALE_NAN);
+
+  constexpr WindowFunctionParameters parameters = {
+      .interval = TimeInterval{.min = 0, .max = 500},
+      .step = 100,
+      .lookback_delta = 100,
+  };
+  create_iterators({0, 1}, parameters);
+
+  // Act
+  get_samples(parameters);
+
+  // Assert
+  EXPECT_EQ((BareBones::Vector{
+                Sample{.timestamp = 100, .value = 2.0},
+                Sample{.timestamp = 200, .value = STALE_NAN},
+            }),
+            samples_);
+}
+
+}  // namespace
diff --git a/pp/series_data/tests/serialization/serialized_data_tests.cpp b/pp/series_data/tests/serialization/serialized_data_tests.cpp
new file mode 100644
index 000000000..177af6a5a
--- /dev/null
+++ b/pp/series_data/tests/serialization/serialized_data_tests.cpp
@@ -0,0 +1,110 @@
+#include <gtest/gtest.h>
+
+#include "series_data/chunk_finalizer.h"
+#include "series_data/data_storage.h"
+#include "series_data/encoder.h"
+#include "series_data/serialization/serialized_data.h"
+
+namespace {
+
+using series_data::ChunkFinalizer;
+using series_data::DataStorage;
+using series_data::Encoder;
+using series_data::serialization::DataSerializer;
+using series_data::serialization::SerializedData;
+using series_data::serialization::SerializedDataView;
+
+class SerializedDataViewEnumerateSeriesFixture : public testing::Test {  // Encodes samples, serializes the storage and records what SerializedDataView::enumerate_series reports.
+ protected:
+  DataStorage storage_;
+  Encoder<> encoder_{storage_};
+  DataSerializer serializer_{storage_};
+
+  struct ChunkInfo {  // One callback invocation: the chunk's label_set_id and the chunk id passed alongside it.
+    uint32_t series_id;
+    uint32_t chunk_id;
+
+    bool operator==(const ChunkInfo& other) const noexcept = default;
+  };
+
+  [[nodiscard]] SerializedData serialize() noexcept {  // Serializes storage_, then resets it.
+    auto data = serializer_.serialize();
+    storage_.reset();
+    return data;
+  }
+
+  std::vector<ChunkInfo> enumerate_series() noexcept {  // Collects one ChunkInfo per callback invocation, in call order.
+    std::vector<ChunkInfo> series_ids;
+    SerializedDataView{serialize()}.enumerate_series([&](const auto& chunk, uint32_t chunk_id) { series_ids.emplace_back(chunk.label_set_id, chunk_id); });
+    return series_ids;
+  }
+};
+
+TEST_F(SerializedDataViewEnumerateSeriesFixture, NoSeries) {  // Nothing encoded: no callback invocation is expected.
+  // Arrange
+
+  // Act
+  const auto series_ids = enumerate_series();
+
+  // Assert
+  EXPECT_TRUE(series_ids.empty());
+}
+
+TEST_F(SerializedDataViewEnumerateSeriesFixture, OneSeries) {  // Two samples of one series: a single entry is expected.
+  // Arrange
+  encoder_.encode(0, 1, 1.0);
+  encoder_.encode(0, 2, 1.0);
+
+  // Act
+  const auto series_ids = enumerate_series();
+
+  // Assert
+  EXPECT_EQ((std::vector{ChunkInfo{.series_id = 0U, .chunk_id = 0}}), series_ids);
+}
+
+TEST_F(SerializedDataViewEnumerateSeriesFixture, TwoSeries) {  // One sample per series: one entry per series is expected, with chunk ids 0 and 1.
+  // Arrange
+  encoder_.encode(0, 1, 1.0);
+  encoder_.encode(1, 1, 2.0);
+
+  // Act
+  const auto series_ids = enumerate_series();
+
+  // Assert
+  EXPECT_EQ((std::vector{ChunkInfo{.series_id = 0U, .chunk_id = 0}, ChunkInfo{.series_id = 1U, .chunk_id = 1U}}), series_ids);
+}
+
+TEST_F(SerializedDataViewEnumerateSeriesFixture, OneSeriesWithTwoChunks) {  // Finalizing the open chunk and encoding again is still expected to yield a single entry.
+  // Arrange
+  encoder_.encode(0, 1, 1.0);
+  encoder_.encode(0, 2, 1.0);
+  encoder_.encode(0, 3, 1.0);
+  ChunkFinalizer::finalize(storage_, 0, storage_.open_chunks[0]);
+  encoder_.encode(0, 4, 1.0);
+
+  // Act
+  const auto series_ids = enumerate_series();
+
+  // Assert
+  EXPECT_EQ((std::vector{ChunkInfo{.series_id = 0U, .chunk_id = 0}}), series_ids);
+}
+
+TEST_F(SerializedDataViewEnumerateSeriesFixture, TwoSeriesWithMultipleChunksEach) {  // One entry per series is expected; series 1 is reported with chunk id 2.
+  // Arrange
+  encoder_.encode(0, 1, 1.0);
+  encoder_.encode(1, 1, 2.0);
+  encoder_.encode(0, 2, 1.0);
+  encoder_.encode(1, 2, 2.0);
+  ChunkFinalizer::finalize(storage_, 0, storage_.open_chunks[0]);
+  ChunkFinalizer::finalize(storage_, 1, storage_.open_chunks[1]);
+  encoder_.encode(0, 3, 1.0);
+  encoder_.encode(1, 3, 2.0);
+
+  // Act
+  const auto series_ids = enumerate_series();
+
+  // Assert
+  EXPECT_EQ((std::vector{ChunkInfo{.series_id = 0U, .chunk_id = 0}, ChunkInfo{.series_id = 1U, .chunk_id = 2U}}), series_ids);
+}
+
+}  // namespace