Skip to content
3 changes: 3 additions & 0 deletions sdk/include/opentelemetry/sdk/trace/multi_span_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,9 @@ class MultiSpanProcessor : public SpanProcessor
ProcessorNode *head_{nullptr};
ProcessorNode *tail_{nullptr};
size_t count_{0};

// For testing
friend class MultiSpanProcessorTestPeer;
};
} // namespace trace
} // namespace sdk
Expand Down
5 changes: 5 additions & 0 deletions sdk/include/opentelemetry/sdk/trace/tracer_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ namespace sdk
namespace trace
{

// forward declare to be able to have a raw pointer to it
class MultiSpanProcessor;

/**
* A class which stores the TracerProvider context.
*
Expand Down Expand Up @@ -129,6 +132,8 @@ class TracerContext
std::unique_ptr<IdGenerator> id_generator_;
std::unique_ptr<SpanProcessor> processor_;
std::unique_ptr<instrumentationscope::ScopeConfigurator<TracerConfig>> tracer_configurator_;
// shares the pointer with processor_ if it is a MultiSpanProcessor, null otherwise
MultiSpanProcessor *multi_processor_;
};

} // namespace trace
Expand Down
10 changes: 8 additions & 2 deletions sdk/src/trace/span.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,11 @@ Span::Span(std::shared_ptr<Tracer> &&tracer,
recordable_->SetStartTime(NowOr(options.start_system_time));
start_steady_time = NowOr(options.start_steady_time);
recordable_->SetResource(tracer_->GetResource());
tracer_->GetProcessor().OnStart(*recordable_, parent_span_context);

// store the span processor, this is important to use the same processor even
// in case the tracer changes from single processor to a MultiSpanProcessor
span_processor_ = &tracer_->GetProcessor();
span_processor_->OnStart(*recordable_, parent_span_context);
}

Span::~Span()
Expand Down Expand Up @@ -203,6 +207,8 @@ void Span::End(const opentelemetry::trace::EndSpanOptions &options) noexcept
{
std::lock_guard<std::mutex> lock_guard{mu_};

// has_ended_ could be removed and instead rely on span_processor_,
// resetting it after calling OnEnd. it would save 8 bytes in each Span object.
if (has_ended_ == true)
{
return;
Expand All @@ -218,7 +224,7 @@ void Span::End(const opentelemetry::trace::EndSpanOptions &options) noexcept
recordable_->SetDuration(std::chrono::steady_clock::time_point(end_steady_time) -
std::chrono::steady_clock::time_point(start_steady_time));

tracer_->GetProcessor().OnEnd(std::move(recordable_));
span_processor_->OnEnd(std::move(recordable_));
recordable_.reset();
}

Expand Down
4 changes: 3 additions & 1 deletion sdk/src/trace/span.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "opentelemetry/common/key_value_iterable.h"
#include "opentelemetry/common/timestamp.h"
#include "opentelemetry/nostd/string_view.h"
#include "opentelemetry/sdk/trace/processor.h"
#include "opentelemetry/sdk/trace/recordable.h"
#include "opentelemetry/sdk/trace/tracer.h"
#include "opentelemetry/trace/span.h"
Expand Down Expand Up @@ -79,11 +80,12 @@ class Span final : public opentelemetry::trace::Span
}

private:
std::shared_ptr<Tracer> tracer_;
std::shared_ptr<Tracer> tracer_; // also keeps span_processor_ alive
mutable std::mutex mu_;
std::unique_ptr<Recordable> recordable_;
opentelemetry::common::SteadyTimestamp start_steady_time;
std::unique_ptr<opentelemetry::trace::SpanContext> span_context_;
SpanProcessor *span_processor_{nullptr}; // kept alive by tracer_
bool has_ended_{false};
};
} // namespace trace
Expand Down
45 changes: 40 additions & 5 deletions sdk/src/trace/tracer_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,22 @@ TracerContext::TracerContext(std::vector<std::unique_ptr<SpanProcessor>> &&proce
: resource_(resource),
sampler_(std::move(sampler)),
id_generator_(std::move(id_generator)),
processor_(std::unique_ptr<SpanProcessor>(new MultiSpanProcessor(std::move(processors)))),
tracer_configurator_(std::move(tracer_configurator))
{}
tracer_configurator_(std::move(tracer_configurator)),
multi_processor_(nullptr)
{
if (processors.empty())
{
processor_ = std::unique_ptr<SpanProcessor>(new MultiSpanProcessor(std::move(processors)));
}
else
{
// at least one processor is available here
for (auto &&processor : processors)
{
AddProcessor(std::move(processor));
}
}
}

Sampler &TracerContext::GetSampler() const noexcept
{
Expand All @@ -60,9 +73,31 @@ opentelemetry::sdk::trace::IdGenerator &TracerContext::GetIdGenerator() const no

void TracerContext::AddProcessor(std::unique_ptr<SpanProcessor> processor) noexcept
{
if (!processor)
{
return;
}

if (!processor_)
{
// this is the first processor to be added
processor_ = std::move(processor);
}
else if (multi_processor_ == nullptr)
{
// a processor exists, but it's not a MultiSpanProcessor. make a new MultiSpanProcessor
multi_processor_ = new MultiSpanProcessor({});
std::unique_ptr<MultiSpanProcessor> multi_processor(multi_processor_);
multi_processor->AddProcessor(std::move(processor_));
multi_processor->AddProcessor(std::move(processor));

auto multi_processor = static_cast<MultiSpanProcessor *>(processor_.get());
multi_processor->AddProcessor(std::move(processor));
processor_ = std::move(multi_processor);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this break spans that were already started before AddProcessor() runs? Those spans keep the recordable created by the old single processor, but after this line Span::End() will call MultiSpanProcessor::OnEnd(), which expects a MultiRecordable. Maybe promotion needs to preserve the old processor path for existing spans.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm. yes I agree that it breaks currently live spans in they way you describe. unfortunate. do you have a suggestion on how to get around this?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe the cleanest fix is for Span to remember the processor used when it was started.

That should not require shared ownership or another heap allocation per span. It could just store a raw SpanProcessor* captured at start, then use that same processor for OnEnd(). The span already keeps the tracer/context alive, so the processor lifetime should still be covered.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Hailios - Just checking in on this. I think the PR still needs a code change here before it can be approved.

The main invariant is that a span should use the same SpanProcessor for MakeRecordable(), OnStart(), and OnEnd(). Otherwise spans that were already in flight before AddProcessor() can still end through the new MultiSpanProcessor.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, I'm in a busy period, I'm aware of the wish to have that work.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lalitb - could you have a look at the current patch set, I've added a pointer to the Span class to keep track of its SpanProcessor?

}
else /*if (multi_processor_ != nullptr)*/
{
// already have a MultiSpanProcessor, add the processor to it
multi_processor_->AddProcessor(std::move(processor));
}
}

void TracerContext::SetTracerConfigurator(
Expand Down
112 changes: 112 additions & 0 deletions sdk/test/trace/tracer_provider_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "opentelemetry/sdk/resource/resource.h"
#include "opentelemetry/sdk/trace/exporter.h"
#include "opentelemetry/sdk/trace/id_generator.h"
#include "opentelemetry/sdk/trace/multi_span_processor.h"
#include "opentelemetry/sdk/trace/processor.h"
#include "opentelemetry/sdk/trace/random_id_generator.h"
#include "opentelemetry/sdk/trace/sampler.h"
Expand Down Expand Up @@ -49,6 +50,33 @@ using namespace opentelemetry::sdk::trace;
using namespace opentelemetry::sdk::resource;
using opentelemetry::sdk::instrumentationscope::ScopeConfigurator;

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{
namespace trace
{
class MultiSpanProcessorTestPeer
{
public:
static std::vector<SpanProcessor *> GetProcessors(MultiSpanProcessor *multi_span_processor)
{
std::vector<SpanProcessor *> res;

MultiSpanProcessor::ProcessorNode *node = multi_span_processor->head_;
while (node != nullptr)
{
auto processor = node->value_.get();
res.emplace_back(processor);
node = node->next_;
}

return res;
}
};
} // namespace trace
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE

TEST(TracerProvider, GetTracer)
{
std::unique_ptr<SpanProcessor> processor(new SimpleSpanProcessor(nullptr));
Expand Down Expand Up @@ -329,6 +357,90 @@ TEST(TracerProvider, GetTracerAbiv2)
}
#endif /* OPENTELEMETRY_ABI_VERSION_NO >= 2 */

// get the same processor back, not wrapped in a MultiSpanProcessor
TEST(TracerProvider, GetProcessor)
{
std::unique_ptr<SpanProcessor> processor(new SimpleSpanProcessor(nullptr));
std::vector<std::unique_ptr<SpanProcessor>> processors;
processors.push_back(std::move(processor));

std::unique_ptr<TracerContext> context1(new TracerContext(std::move(processors)));

auto &span_processor = context1->GetProcessor();

// Should be the SimpleSpanProcessor processor that was created above.
#ifdef OPENTELEMETRY_RTTI_ENABLED
auto processor_typeed = dynamic_cast<SimpleSpanProcessor *>(&span_processor);
#else
auto processor_typeed = static_cast<SimpleSpanProcessor *>(&span_processor);
#endif
ASSERT_NE(nullptr, processor_typeed);
}

// get a MultiSpanProcessor back that wraps both processors
TEST(TracerProvider, GetProcessorsTwo)
{
std::vector<SpanProcessor *> processors_raw(2);
processors_raw[0] = new SimpleSpanProcessor(nullptr); // deleted via unique_ptr
processors_raw[1] = new SimpleSpanProcessor(nullptr); // deleted via unique_ptr

std::unique_ptr<SpanProcessor> processor1(processors_raw[0]);
std::unique_ptr<SpanProcessor> processor2(processors_raw[1]);

std::vector<std::unique_ptr<SpanProcessor>> processors;
processors.push_back(std::move(processor1));
processors.push_back(std::move(processor2));

std::unique_ptr<TracerContext> context1(new TracerContext(std::move(processors)));

auto &span_processor = context1->GetProcessor();

// Should be the SimpleSpanProcessor processor that was created above.
#ifdef OPENTELEMETRY_RTTI_ENABLED
auto processor_typeed = dynamic_cast<MultiSpanProcessor *>(&span_processor);
#else
auto processor_typeed = static_cast<MultiSpanProcessor *>(&span_processor);
#endif
ASSERT_NE(nullptr, processor_typeed);

std::vector<SpanProcessor *> contained_processors =
MultiSpanProcessorTestPeer::GetProcessors(processor_typeed);
EXPECT_EQ(processors_raw, contained_processors);
}

// get a MultiSpanProcessor back that wraps all three processors
TEST(TracerProvider, GetProcessorsThree)
{
std::vector<SpanProcessor *> processors_raw(3);
processors_raw[0] = new SimpleSpanProcessor(nullptr); // deleted via unique_ptr
processors_raw[1] = new SimpleSpanProcessor(nullptr); // deleted via unique_ptr
processors_raw[2] = new SimpleSpanProcessor(nullptr); // deleted via unique_ptr

std::unique_ptr<SpanProcessor> processor1(processors_raw[0]);
std::unique_ptr<SpanProcessor> processor2(processors_raw[1]);
std::unique_ptr<SpanProcessor> processor3(processors_raw[2]);
std::vector<std::unique_ptr<SpanProcessor>> processors;
processors.push_back(std::move(processor1));
processors.push_back(std::move(processor2));
processors.push_back(std::move(processor3));

std::unique_ptr<TracerContext> context1(new TracerContext(std::move(processors)));

auto &span_processor = context1->GetProcessor();

// Should be the SimpleSpanProcessor processor that was created above.
#ifdef OPENTELEMETRY_RTTI_ENABLED
auto processor_typeed = dynamic_cast<MultiSpanProcessor *>(&span_processor);
#else
auto processor_typeed = static_cast<MultiSpanProcessor *>(&span_processor);
#endif
ASSERT_NE(nullptr, processor_typeed);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shoule we check processor_typeed contains all of processor1,processor2 and processor3 here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, but how?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can add friend class only for test, like OtlpHttpExporterTestPeer in exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_exporter.h and exporters/otlp/test/otlp_http_exporter_test.cc


std::vector<SpanProcessor *> contained_processors =
MultiSpanProcessorTestPeer::GetProcessors(processor_typeed);
EXPECT_EQ(processors_raw, contained_processors);
}

TEST(TracerProvider, Shutdown)
{
std::unique_ptr<SpanProcessor> processor(new SimpleSpanProcessor(nullptr));
Expand Down
Loading