Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions osu.Server.QueueProcessor/QueueConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,14 @@ public class QueueConfiguration
/// Setting above 1 will allow processing in batches (see <see cref="QueueProcessor{T}.ProcessResults"/>).
/// </summary>
public int BatchSize { get; set; } = 1;

/// <summary>
/// When enabled, uses <c>BRPOP</c> to wait for items instead of polling with <c>RPOP</c>.
/// </summary>
/// <remarks>
/// <see cref="BatchSize"/> is ignored when this is enabled.
/// <see cref="TimeBetweenPolls"/> is still used as a delay for when processor is overloaded.
/// </remarks>
public bool UseBlockingPop { get; set; } = false;
}
}
41 changes: 34 additions & 7 deletions osu.Server.QueueProcessor/QueueProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,20 @@

private readonly Lazy<IDatabase> redis = new Lazy<IDatabase>(() => RedisAccess.GetConnection().GetDatabase());

private readonly Lazy<IDatabase> blockingRedis = new Lazy<IDatabase>(() => RedisAccess.GetConnection().GetDatabase());

/// <summary>
/// Access redis instance.
/// </summary>
protected IDatabase Redis => redis.Value;

/// <summary>
/// Allows access to a separate redis instance (connection) dedicated to blocking calls.
/// Must not be accessed from more than one thread. Currently used only in <see cref="Run"/>.
/// This is a workaround for <c>StackExchange.Redis</c> not offering support for operations like <c>BRPOP</c>.
/// </summary>
private IDatabase BlockingRedis => blockingRedis.Value;

Check failure on line 63 in osu.Server.QueueProcessor/QueueProcessor.cs

View workflow job for this annotation

GitHub Actions / Code Quality

Name 'BlockingRedis' does not match rule 'private properties'. Suggested name is 'blockingRedis'. in osu.Server.QueueProcessor\QueueProcessor.cs on line 63
Copy link
Copy Markdown
Member

@peppy peppy Feb 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think of this, to keep things simple?

diff --git a/osu.Server.QueueProcessor/QueueProcessor.cs b/osu.Server.QueueProcessor/QueueProcessor.cs
index 12be26f..aecc445 100644
--- a/osu.Server.QueueProcessor/QueueProcessor.cs
+++ b/osu.Server.QueueProcessor/QueueProcessor.cs
@@ -46,21 +46,12 @@ namespace osu.Server.QueueProcessor
 
         private readonly QueueConfiguration config;
 
-        private readonly Lazy<IDatabase> redis = new Lazy<IDatabase>(() => RedisAccess.GetConnection().GetDatabase());
-
-        private readonly Lazy<IDatabase> blockingRedis = new Lazy<IDatabase>(() => RedisAccess.GetConnection().GetDatabase());
-
         /// <summary>
         /// Access redis instance.
         /// </summary>
         protected IDatabase Redis => redis.Value;
 
-        /// <summary>
-        /// Allows access to a separate redis instance (connection) dedicated to blocking calls.
-        /// Must not be accessed from more than one thread. Currently used only in <see cref="Run"/>.
-        /// This is a workaround for <c>StackExchange.Redis</c> not offering support for operations like <c>BRPOP</c>.
-        /// </summary>
-        private IDatabase BlockingRedis => blockingRedis.Value;
+        private readonly Lazy<IDatabase> redis = new Lazy<IDatabase>(() => RedisAccess.GetConnection().GetDatabase());
 
         private long totalProcessed;
 
@@ -94,6 +85,9 @@ namespace osu.Server.QueueProcessor
         /// <param name="cancellation">An optional cancellation token.</param>
         public void Run(CancellationToken cancellation = default)
         {
+            // Open a separate redis for the queue watcher. We ensure a dedicated connection specifically for the BRPOP path, which blocks indefinitely.
+            IDatabase blockingRedis = RedisAccess.GetConnection().GetDatabase();
+
             using (SentrySdk.Init(setupSentry))
             using (new Timer(_ => outputStats(), null, TimeSpan.Zero, TimeSpan.FromSeconds(5)))
             using (var cts = new GracefulShutdownSource(cancellation))
@@ -123,7 +117,8 @@ namespace osu.Server.QueueProcessor
                                 // timeout in seconds, can't be higher than the Redis library timeout (default is 5 seconds)
                                 const string timeout = "1";
 
-                                RedisResult redisResult = BlockingRedis.Execute("BRPOP", QueueName, timeout);
+                                // workaround for <c>StackExchange.Redis</c> not offering support for operations like <c>BRPOP</c>.
+                                RedisResult redisResult = blockingRedis.Execute("BRPOP", QueueName, timeout);
 
                                 if (redisResult.IsNull)
                                     continue;
@@ -132,7 +127,7 @@ namespace osu.Server.QueueProcessor
                             }
                             else
                             {
-                                redisItems = Redis.ListRightPop(QueueName, config.BatchSize);
+                                redisItems = blockingRedis.ListRightPop(QueueName, config.BatchSize);
 
                                 // ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract (https://github.com/StackExchange/StackExchange.Redis/issues/2697)
                                 // queue doesn't exist.

It will mean we keep two connections open even in the non-blocking path, but I don't think that's an issue.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already made a change addressing the code quality there. I would say there is no reason to keep a separate connection open for queue processors which do not use this feature. Although arguably they are pinned to the old versions.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well it's more for the code simplicity. If you want you can apply my patch and then make the connection only used in the blocking flow, but I'd argue it's not worth it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fine now? Removed the field to simplify but kept it lazy so no unnecessary connections.


private long totalProcessed;

private long totalDequeued;
Expand Down Expand Up @@ -100,20 +109,38 @@

try
{
if (totalInFlight >= config.MaxInFlightItems || consecutiveErrors > config.ErrorThreshold)
// avoid processing too many items at once
if (totalInFlight >= config.MaxInFlightItems)
{
Thread.Sleep(config.TimeBetweenPolls);
continue;
}

var redisItems = Redis.ListRightPop(QueueName, config.BatchSize);
RedisValue[] redisItems;

// ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract (https://github.com/StackExchange/StackExchange.Redis/issues/2697)
// queue doesn't exist.
if (redisItems == null)
if (config.UseBlockingPop)
{
Thread.Sleep(config.TimeBetweenPolls);
continue;
// timeout in seconds, can't be higher than the Redis library timeout (default is 5 seconds)
const string timeout = "1";

RedisResult redisResult = BlockingRedis.Execute("BRPOP", QueueName, timeout);

if (redisResult.IsNull)
continue;

redisItems = [(RedisValue)redisResult[1]];
}
else
{
redisItems = Redis.ListRightPop(QueueName, config.BatchSize);

// ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract (https://github.com/StackExchange/StackExchange.Redis/issues/2697)
// queue doesn't exist.
if (redisItems == null)
{
Thread.Sleep(config.TimeBetweenPolls);
continue;
}
}

List<T> items = new List<T>();
Expand Down Expand Up @@ -260,7 +287,7 @@
public void PublishMessage<TMessage>(string channelName, TMessage message)
{
Redis.Publish(new RedisChannel(channelName, RedisChannel.PatternMode.Auto), JsonConvert.SerializeObject(message));
DogStatsd.Increment("messages_published", tags: new[] { $"channel:{channelName}", $"type:{typeof(TMessage).FullName}" });

Check notice on line 290 in osu.Server.QueueProcessor/QueueProcessor.cs

View workflow job for this annotation

GitHub Actions / Code Quality

Use collection expression in osu.Server.QueueProcessor\QueueProcessor.cs on line 290
}

/// <summary>
Expand Down
Loading