Skip to content

Wait for quiescence in MergeManyChangeSets stress tests#1100

Merged
dwcullop merged 2 commits into
reactivemarbles:mainfrom
dwcullop:fix/mergemany-stress-test-flake
May 28, 2026
Merged

Wait for quiescence in MergeManyChangeSets stress tests#1100
dwcullop merged 2 commits into
reactivemarbles:mainfrom
dwcullop:fix/mergemany-stress-test-flake

Conversation

@dwcullop
Copy link
Copy Markdown
Member

@dwcullop dwcullop commented May 27, 2026

Problem

MergeManyChangeSetsCacheSourceCompareFixture.MultiThreadedStressTest(10, 50) fails intermittently in CI with prices present in market.PricesCache.Items but missing from the live priceResults aggregator. The mechanism is the post-#1079 cache delivery model:

  • Old (Synchronize(lock) shape): AddOrUpdate did not return until every subscriber had finished processing the notification. Cache state and observed notifications were always in lockstep.
  • New (SharedDeliveryQueue shape): AddOrUpdate enqueues a notification and returns; the actual delivery runs later on whichever thread wins the drain. This is intentional - it removes the cross-cache deadlock the old shape produced - but it opens a small window between mutation and observed delivery.

A test that compares a live aggregator's view against cache.Items at assert time can see disagreement during that window. The stress fixture's add/remove streams are still in flight at the moment of _marketCache.Dispose; any per-market AddOrUpdate that lands after MergeManyChangeSets unsubscribes from that market's LatestPrices stream mutates PricesCache (so the post-hoc .Items snapshot sees it) without delivering a notification to priceResults (so the live aggregator never sees it).

The source-compare fixture had already adopted the right shape to defend against this:

var merged = source.MergeManyChangeSets(...).Publish();
var cacheCompleted = merged.LastOrDefaultAsync().ToTask();
using var local = merged.AsAggregator();
using var connect = merged.Connect();
// ... stress ...
await cacheCompleted;
CheckResultContents(..., local);

The two sibling stress fixtures (MergeManyChangeSetsCacheFixture and MergeManyChangeSetsListFixture) did not. They subscribed two independent chains (one for the aggregator, one for the sub/unsub loop) and asserted without waiting for either to complete.

Fix

Port the source-compare pattern to both sibling fixtures:

  • Publish the merged stream so every consumer in the test shares one upstream chain.
  • Capture merged.LastOrDefaultAsync().ToTask() before connecting.
  • Place the verification aggregator on the Publish chain so it shares the completion semantics.
  • await the completion task between the stress loop and CheckResultContents.

No production code is changed. No operator semantics change.

Verification

20 consecutive runs of MergeManyChangeSetsCacheSourceCompareFixture.MultiThreadedStressTest at xUnit.MaxParallelThreads=16, zero failures. The combined run of all three stress fixtures passes 163/163 tests per iteration.

Note on SynchronizedRandomizer

The previous commit on this branch added SynchronizedRandomizer to wrap Bogus.Randomizer with explicit synchronization. That was based on a misreading: Bogus.Randomizer takes a process-wide lock on Locker.Value for every generator call (verified via IL inspection of Number(int, int)), regardless of whether localSeed is set, so the wrapper was solving a non-problem. That commit is reverted in this push.

Darrin Cullop added 2 commits May 26, 2026 22:57
MultiThreadedStressTest(10, 50) fails intermittently in CI with two prices
present in market.PricesCache.Items but missing from the live aggregator.
The two affected prices have the latest timestamps in the batch, which is
the signature of a race during high-contention production.

Bogus.Randomizer wraps System.Random. When constructed with a seed, the
randomizer stores the random in a protected localSeed field and bypasses
its internal Locker on every generator call. The test shares one seeded
Randomizer across many parallel producer threads:

- Directly via _randomizer.Number / .Bool / .TimeSpan / .Interval
- Indirectly via _marketFaker.WithSeed(_randomizer), since every
  Faker<T>.Generate call routes through the same randomizer

Concurrent calls into the underlying System.Random corrupt its internal
state, producing values inconsistent with what a serialized run would
produce. That is sufficient to explain the observed asymmetry between
the post-hoc PricesCache snapshot and the live aggregator stream.

Introduce SynchronizedRandomizer, a Randomizer subclass that replaces
the protected localSeed field with a LockedRandom (a Random subclass that
serializes every virtual method on an internal lock). The seed and method
contracts are unchanged; the wrapper only adds synchronization.

Apply it to the failing fixture. Other Randomizer uses across the test
project remain unchanged for now; they are either single-threaded or
have not exhibited flake symptoms.

Verified: 20 consecutive runs of the fixture pass at MaxParallelThreads=16,
zero failures.
The post-reactivemarbles#1079 cache delivery model decouples mutation from notification:
AddOrUpdate enqueues a notification and returns; the actual delivery to
subscribers runs later on whichever thread wins the drain. That removed
the cross-cache deadlock the old Synchronize(lock) shape produced, but it
opened a small window between mutation and observed delivery. Tests that
compare a live aggregator's view against the cache's current Items at
assert time can see disagreement during that window.

The source-compare fixture already adopted the right shape:

    var merged = source.MergeManyChangeSets(...).Publish();
    var cacheCompleted = merged.LastOrDefaultAsync().ToTask();
    using var local = merged.AsAggregator();
    using var connect = merged.Connect();
    ...
    await cacheCompleted;
    CheckResultContents(..., local);

Port the same pattern to the cache and list MergeManyChangeSets stress
fixtures. The local aggregator now sits on the Publish chain so it shares
the completion task; the await before CheckResultContents pins the
quiescence point.

Also delete the SynchronizedRandomizer change made earlier on this branch.
Bogus.Randomizer takes a process-wide lock on Locker.Value for every
generator call regardless of whether localSeed is set, so the wrapper was
addressing a non-problem.
@dwcullop dwcullop changed the title Make MergeMany source-compare stress test deterministic Wait for quiescence in MergeManyChangeSets stress tests May 28, 2026
@dwcullop dwcullop marked this pull request as ready for review May 28, 2026 16:03
@dwcullop dwcullop merged commit 8033135 into reactivemarbles:main May 28, 2026
3 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants