Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ Increment the:

## [Unreleased]

* [METRICS SDK] Implement configurable cardinality limit at three priority levels
[#3292](https://github.com/open-telemetry/opentelemetry-cpp/issues/3292)

* [SDK] Add `TracerProvider::UpdateTracerConfigurator()` and example
[#4065](https://github.com/open-telemetry/opentelemetry-cpp/issues/4065)

Expand Down
10 changes: 10 additions & 0 deletions sdk/include/opentelemetry/sdk/metrics/meter_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include <atomic>
#include <chrono>
#include <cstddef>
#include <memory>
#include <vector>

Expand Down Expand Up @@ -108,6 +109,15 @@ class MeterContext : public std::enable_shared_from_this<MeterContext>
*/
nostd::span<std::shared_ptr<CollectorHandle>> GetCollectors() noexcept;

/**
* Get the cardinality limit for a given instrument type from configured readers.
* Returns the maximum limit across all readers, or 0 if no reader specifies a limit.
*
* @param instrument_type The instrument type to get the limit for
* @return The maximum cardinality limit, or 0 if not configured
*/
size_t GetReaderCardinalityLimit(InstrumentType instrument_type) noexcept;

/**
* GET SDK Start time
*
Expand Down
34 changes: 34 additions & 0 deletions sdk/include/opentelemetry/sdk/metrics/metric_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include <atomic>
#include <chrono>
#include <cstddef>

#include "opentelemetry/nostd/function_ref.h"
#include "opentelemetry/sdk/metrics/export/metric_producer.h"
Expand All @@ -16,6 +17,22 @@ namespace sdk
{
namespace metrics
{
/**
* Configuration options for cardinality limits per instrument type.
* A value of 0 means use the SDK default (2000).
*/
struct CardinalityLimitOptions
{
size_t default_limit = 0; // 0 means use SDK default (2000)
size_t counter = 0;
size_t gauge = 0;
size_t histogram = 0;
size_t observable_counter = 0;
size_t observable_gauge = 0;
size_t observable_up_down_counter = 0;
size_t up_down_counter = 0;
};

/**
* MetricReader defines the interface to collect metrics from SDK
*/
Expand Down Expand Up @@ -45,6 +62,22 @@ class MetricReader
virtual AggregationTemporality GetAggregationTemporality(
InstrumentType instrument_type) const noexcept = 0;

/**
* Get the cardinality limit for a given instrument type.
* Returns 0 if no limit is configured for this instrument type.
*
* @param instrument_type The instrument type to get the limit for
* @return The cardinality limit, or 0 if not configured
*/
size_t GetCardinalityLimit(InstrumentType instrument_type) const noexcept;

/**
* Set cardinality limit options for this reader.
*
* @param options The cardinality limit options
*/
void SetCardinalityLimitOptions(const CardinalityLimitOptions &options) noexcept;

/**
* Shutdown the metric reader.
*/
Expand Down Expand Up @@ -73,6 +106,7 @@ class MetricReader
private:
MetricProducer *metric_producer_{nullptr};
std::atomic<bool> shutdown_{false};
CardinalityLimitOptions cardinality_limit_options_;
};
} // namespace metrics
} // namespace sdk
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora
ExemplarFilterType exempler_filter_type,
nostd::shared_ptr<ExemplarReservoir> &&exemplar_reservoir,
#endif
const AggregationConfig *aggregation_config)
: instrument_descriptor_(instrument_descriptor),
const AggregationConfig *aggregation_config,
std::shared_ptr<AggregationConfig> owned_aggregation_config = nullptr)
: owned_aggregation_config_(std::move(owned_aggregation_config)),
instrument_descriptor_(instrument_descriptor),
aggregation_type_{aggregation_type},
aggregation_config_{AggregationConfig::GetOrDefault(aggregation_config)},
cumulative_hash_map_(
Expand Down Expand Up @@ -139,6 +141,8 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora
}

private:
// Keep owned config alive if created from reader-level defaults
std::shared_ptr<AggregationConfig> owned_aggregation_config_;
InstrumentDescriptor instrument_descriptor_;
AggregationType aggregation_type_;
const AggregationConfig *aggregation_config_;
Expand Down
16 changes: 16 additions & 0 deletions sdk/include/opentelemetry/sdk/metrics/state/metric_collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#pragma once

#include <chrono>
#include <cstddef>
#include <memory>

#include "opentelemetry/nostd/function_ref.h"
Expand Down Expand Up @@ -36,6 +37,19 @@ class CollectorHandle

virtual AggregationTemporality GetAggregationTemporality(
InstrumentType instrument_type) noexcept = 0;

/**
* Get the cardinality limit for a given instrument type.
* Returns 0 if no limit is configured.
*
* @param instrument_type The instrument type to get the limit for
* @return The cardinality limit, or 0 if not configured
*/
virtual size_t GetCardinalityLimit(InstrumentType instrument_type) noexcept
{
(void)instrument_type;
return 0;
}
};

/**
Expand All @@ -61,6 +75,8 @@ class MetricCollector : public MetricProducer, public CollectorHandle
AggregationTemporality GetAggregationTemporality(
InstrumentType instrument_type) noexcept override;

size_t GetCardinalityLimit(InstrumentType instrument_type) noexcept override;

/**
* The callback to be called for each metric exporter. This will only be those
* metrics that have been produced since the last time this method was called.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,10 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage
ExemplarFilterType exempler_filter_type,
nostd::shared_ptr<ExemplarReservoir> &&exemplar_reservoir,
#endif
const AggregationConfig *aggregation_config)
: instrument_descriptor_(instrument_descriptor),
const AggregationConfig *aggregation_config,
std::shared_ptr<AggregationConfig> owned_aggregation_config = nullptr)
: owned_aggregation_config_(std::move(owned_aggregation_config)),
instrument_descriptor_(instrument_descriptor),
aggregation_config_(AggregationConfig::GetOrDefault(aggregation_config)),
attributes_hashmap_(
std::make_unique<AttributesHashMap>(aggregation_config_->cardinality_limit_)),
Expand Down Expand Up @@ -291,6 +293,8 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage
}
#endif

// Keep owned config alive if created from reader-level defaults
std::shared_ptr<AggregationConfig> owned_aggregation_config_;
InstrumentDescriptor instrument_descriptor_;
// hashmap to maintain the metrics for delta collection (i.e, collection since last Collect call)
const AggregationConfig *aggregation_config_;
Expand Down
44 changes: 39 additions & 5 deletions sdk/src/configuration/sdk_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1471,14 +1471,24 @@ std::unique_ptr<opentelemetry::sdk::metrics::MetricReader> SdkBuilder::CreatePer
OTEL_INTERNAL_LOG_WARN("metric producer not supported, ignoring");
}

sdk = opentelemetry::sdk::metrics::PeriodicExportingMetricReaderFactory::Create(
std::move(exporter_sdk), options);

if (model->cardinality_limits != nullptr)
{
OTEL_INTERNAL_LOG_WARN("cardinality limits not supported, ignoring");
opentelemetry::sdk::metrics::CardinalityLimitOptions cardinality_options;
cardinality_options.default_limit = model->cardinality_limits->default_limit;
cardinality_options.counter = model->cardinality_limits->counter;
cardinality_options.gauge = model->cardinality_limits->gauge;
cardinality_options.histogram = model->cardinality_limits->histogram;
cardinality_options.observable_counter = model->cardinality_limits->observable_counter;
cardinality_options.observable_gauge = model->cardinality_limits->observable_gauge;
cardinality_options.observable_up_down_counter =
model->cardinality_limits->observable_up_down_counter;
cardinality_options.up_down_counter = model->cardinality_limits->up_down_counter;
sdk->SetCardinalityLimitOptions(cardinality_options);
}

sdk = opentelemetry::sdk::metrics::PeriodicExportingMetricReaderFactory::Create(
std::move(exporter_sdk), options);

return sdk;
}

Expand All @@ -1496,7 +1506,17 @@ std::unique_ptr<opentelemetry::sdk::metrics::MetricReader> SdkBuilder::CreatePul

if (model->cardinality_limits != nullptr)
{
OTEL_INTERNAL_LOG_WARN("cardinality limits not supported, ignoring");
opentelemetry::sdk::metrics::CardinalityLimitOptions cardinality_options;
cardinality_options.default_limit = model->cardinality_limits->default_limit;
cardinality_options.counter = model->cardinality_limits->counter;
cardinality_options.gauge = model->cardinality_limits->gauge;
cardinality_options.histogram = model->cardinality_limits->histogram;
cardinality_options.observable_counter = model->cardinality_limits->observable_counter;
cardinality_options.observable_gauge = model->cardinality_limits->observable_gauge;
cardinality_options.observable_up_down_counter =
model->cardinality_limits->observable_up_down_counter;
cardinality_options.up_down_counter = model->cardinality_limits->up_down_counter;
sdk->SetCardinalityLimitOptions(cardinality_options);
}

return sdk;
Expand Down Expand Up @@ -1600,6 +1620,20 @@ void SdkBuilder::AddView(

sdk_aggregation_config = CreateAggregationConfig(stream->aggregation, sdk_aggregation_type);

// Apply aggregation_cardinality_limit from the view stream configuration
if (stream->aggregation_cardinality_limit != 0)
{
if (sdk_aggregation_config)
{
sdk_aggregation_config->cardinality_limit_ = stream->aggregation_cardinality_limit;
}
else
{
sdk_aggregation_config = std::make_shared<opentelemetry::sdk::metrics::AggregationConfig>(
stream->aggregation_cardinality_limit);
}
}

std::unique_ptr<opentelemetry::sdk::metrics::AttributesProcessor> sdk_attribute_processor;

if (stream->attribute_keys != nullptr)
Expand Down
46 changes: 44 additions & 2 deletions sdk/src/metrics/meter.cc
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#include <cstddef>
#include <cstdint>
#include <functional>
#include <mutex>
Expand All @@ -22,6 +23,7 @@
#include "opentelemetry/sdk/common/global_log_handler.h"
#include "opentelemetry/sdk/instrumentationscope/instrumentation_scope.h"
#include "opentelemetry/sdk/instrumentationscope/scope_configurator.h"
#include "opentelemetry/sdk/metrics/aggregation/aggregation_config.h"
#include "opentelemetry/sdk/metrics/async_instruments.h"
#include "opentelemetry/sdk/metrics/data/metric_data.h"
#include "opentelemetry/sdk/metrics/instruments.h"
Expand Down Expand Up @@ -542,14 +544,34 @@ std::unique_ptr<SyncWritableMetricStorage> Meter::RegisterSyncMetricStorage(
else
{
WarnOnDuplicateInstrument(GetInstrumentationScope(), storage_registry_, view_instr_desc);

// Determine effective aggregation config with reader-level fallback
const AggregationConfig *effective_config = view.GetAggregationConfig();
std::shared_ptr<AggregationConfig> reader_config;

// If the view does not specify a cardinality limit, use the MetricReader-level default
if (!effective_config)
{
auto ctx_ptr = meter_context_.lock();
if (ctx_ptr)
{
size_t reader_limit = ctx_ptr->GetReaderCardinalityLimit(instrument_descriptor.type_);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this breaks per-reader semantics. Example: reader A has limit 10, reader B has limit 1000. The storage uses 1000, so reader A can export far more series than configured. This is also a heap/memory regression for low-limit readers.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One possible fix is to avoid resolving reader-level limits into a single storage-level config. The view-level limit can stay on the shared stream/storage, but the MetricReader fallback should be applied per collector/reader, since each reader may have a different configured limit. So instead of taking the max across readers, the collection path likely needs to use the current CollectorHandle's GetCardinalityLimit(instrument_type) when no view-level limit exists.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch @lalitb, thanks! You're right, resolving the reader-level fallback into shared storage breaks the per-reader semantics you described.

I've removed that logic here and reverted the storage constructors. For now, this PR only enforces view-level aggregation_cardinality_limit (plus the existing SDK default of 2000). Reader-level limits are still parsed, but not enforced.

I agree the right place to apply the MetricReader fallback is in the collection path on a per-CollectorHandle basis when no view-level limit exists. Since that's a more involved change, I'd rather tackle it in a follow-up PR.

Also fixed the IWYU warning in the latest commit.

if (reader_limit != 0)
{
reader_config = std::make_shared<AggregationConfig>(reader_limit);
effective_config = reader_config.get();
}
}
}

sync_storage = std::shared_ptr<SyncMetricStorage>(new SyncMetricStorage(
view_instr_desc, view.GetAggregationType(), view.GetAttributesProcessor(),
#ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW
exemplar_filter_type,
GetExemplarReservoir(view.GetAggregationType(), view.GetAggregationConfig(),
view_instr_desc),
#endif
view.GetAggregationConfig()));
effective_config, std::move(reader_config)));
storage_registry_.insert({view_instr_desc, sync_storage});
}
auto sync_multi_storage = static_cast<SyncMultiMetricStorage *>(storages.get());
Expand Down Expand Up @@ -615,14 +637,34 @@ std::unique_ptr<AsyncWritableMetricStorage> Meter::RegisterAsyncMetricStorage(
else
{
WarnOnDuplicateInstrument(GetInstrumentationScope(), storage_registry_, view_instr_desc);

// Determine effective aggregation config with reader-level fallback
const AggregationConfig *effective_config = view.GetAggregationConfig();
std::shared_ptr<AggregationConfig> reader_config;

// If the view does not specify a cardinality limit, use the MetricReader-level default
if (!effective_config)
{
auto ctx_ptr = meter_context_.lock();
if (ctx_ptr)
{
size_t reader_limit = ctx_ptr->GetReaderCardinalityLimit(instrument_descriptor.type_);
if (reader_limit != 0)
{
reader_config = std::make_shared<AggregationConfig>(reader_limit);
effective_config = reader_config.get();
}
}
}

async_storage = std::shared_ptr<AsyncMetricStorage>(new AsyncMetricStorage(
view_instr_desc, view.GetAggregationType(),
#ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW
exemplar_filter_type,
GetExemplarReservoir(view.GetAggregationType(), view.GetAggregationConfig(),
view_instr_desc),
#endif
view.GetAggregationConfig()));
effective_config, std::move(reader_config)));
storage_registry_.insert({view_instr_desc, async_storage});
}
auto async_multi_storage = static_cast<AsyncMultiMetricStorage *>(storages.get());
Expand Down
16 changes: 16 additions & 0 deletions sdk/src/metrics/meter_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include <atomic>
#include <chrono>
#include <cstddef>
#include <memory>
#include <mutex>
#include <ostream>
Expand All @@ -18,6 +19,7 @@
#include "opentelemetry/sdk/instrumentationscope/instrumentation_scope.h"
#include "opentelemetry/sdk/instrumentationscope/scope_configurator.h"
#include "opentelemetry/sdk/metrics/export/metric_filter.h"
#include "opentelemetry/sdk/metrics/instruments.h"
#include "opentelemetry/sdk/metrics/meter.h"
#include "opentelemetry/sdk/metrics/meter_config.h"
#include "opentelemetry/sdk/metrics/meter_context.h"
Expand Down Expand Up @@ -91,6 +93,20 @@ nostd::span<std::shared_ptr<CollectorHandle>> MeterContext::GetCollectors() noex
return nostd::span<std::shared_ptr<CollectorHandle>>(collectors_.data(), collectors_.size());
}

size_t MeterContext::GetReaderCardinalityLimit(InstrumentType instrument_type) noexcept
{
size_t max_limit = 0;
for (const auto &collector : collectors_)
{
size_t limit = collector->GetCardinalityLimit(instrument_type);
if (limit > max_limit)
{
max_limit = limit;
}
}
return max_limit;
}

opentelemetry::common::SystemTimestamp MeterContext::GetSDKStartTime() noexcept
{
return sdk_start_ts_;
Expand Down
Loading
Loading