From 6fe52ffed0b57d037f4a5dbf52baed51c5fc82cc Mon Sep 17 00:00:00 2001 From: Chanhyuck Ko Date: Fri, 20 Dec 2024 16:43:17 +0900 Subject: [PATCH 1/5] temp: introduce NetMQChannel --- src/Libplanet.Net/Transports/NetMQChannel.cs | 147 ++++++++ .../Transports/NetMQTransport.cs | 332 +++--------------- 2 files changed, 195 insertions(+), 284 deletions(-) create mode 100644 src/Libplanet.Net/Transports/NetMQChannel.cs diff --git a/src/Libplanet.Net/Transports/NetMQChannel.cs b/src/Libplanet.Net/Transports/NetMQChannel.cs new file mode 100644 index 00000000000..158f92ea23c --- /dev/null +++ b/src/Libplanet.Net/Transports/NetMQChannel.cs @@ -0,0 +1,147 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; +using NetMQ; +using NetMQ.Sockets; +using Serilog; + +namespace Libplanet.Net.Transports +{ + public class NetMQChannel + { + private readonly BoundPeer _peer; + private readonly Channel _requests; + private readonly CancellationTokenSource _cancellationTokenSource; + private readonly ILogger _logger; + + private DateTimeOffset _lastUpdated; + + public NetMQChannel(BoundPeer peer) + { + _peer = peer; + _requests = Channel.CreateUnbounded(); + _cancellationTokenSource = new CancellationTokenSource(); + _logger = Log.Logger + .ForContext() + .ForContext("Source", nameof(NetMQTransport)); + } + + public event EventHandler? Closed; + +#pragma warning disable SA1005, SA1515, S125 + //public event EventHandler? Faulted; +#pragma warning restore SA1005, SA1515, S125 + + public event EventHandler? Opened; + + public void Abort() + { + _cancellationTokenSource.Cancel(); + } + + public void Close() + { + _cancellationTokenSource.Cancel(); + Closed?.Invoke(this, EventArgs.Empty); + } + + public void Open() + { + TaskCreationOptions taskCreationOptions = + TaskCreationOptions.DenyChildAttach | + TaskCreationOptions.LongRunning | + TaskCreationOptions.HideScheduler; + Task.Factory.StartNew( + () => ProcessRuntime(_cancellationTokenSource.Token), + _cancellationTokenSource.Token, + taskCreationOptions, + TaskScheduler.Default); + Opened?.Invoke(this, EventArgs.Empty); + } + + public async IAsyncEnumerable SendMessageAsync( + NetMQMessage message, + int expectedResponses, + [EnumeratorCancellation] CancellationToken cancellationToken) + { + var channel = Channel.CreateUnbounded(); + await _requests.Writer.WriteAsync( + new MessageRequest( + message, + expectedResponses, + channel, + cancellationToken), + cancellationToken); + + foreach (var unused in Enumerable.Range(0, expectedResponses)) + { + // FIXME: Can be replaced with Channel.Reader.Completion? + yield return await channel.Reader.ReadAsync(cancellationToken); + } + } + + private async Task ProcessRuntime(CancellationToken ct) + { + using var dealer = new DealerSocket(); + dealer.Options.DisableTimeWait = true; + dealer.Connect(await _peer.ResolveNetMQAddressAsync()); + while (!ct.IsCancellationRequested) + { + MessageRequest req = await _requests.Reader.ReadAsync(ct); + _lastUpdated = DateTimeOffset.UtcNow; + CancellationTokenSource linked = + CancellationTokenSource.CreateLinkedTokenSource(ct, req.CancellationToken); + _logger.Debug("[NetMQChannel] Trying to send message {Message}", req.Message); + if (!dealer.TrySendMultipartMessage(req.Message)) + { + _logger.Debug( + "[NetMQChannel] Failed to send {Message} to {Peer}", + req.Message, + _peer); + continue; + } + + _logger.Debug("[NetMQChannel] Message {Message} successfully sent.", req.Message); + + foreach (var i in Enumerable.Range(0, req.ExpectedResponses)) + { + NetMQMessage raw = await dealer.ReceiveMultipartMessageAsync( + cancellationToken: linked.Token + ); + _lastUpdated = DateTimeOffset.UtcNow; + + await req.Channel.Writer.WriteAsync(raw, linked.Token); + } + + req.Channel.Writer.Complete(); + } + } + + private readonly struct MessageRequest + { + public MessageRequest( + NetMQMessage message, + in int expectedResponses, + Channel channel, + CancellationToken cancellationToken) + { + Message = message; + ExpectedResponses = expectedResponses; + Channel = channel; + CancellationToken = cancellationToken; + } + + public NetMQMessage Message { get; } + + public int ExpectedResponses { get; } + + public Channel Channel { get; } + + public CancellationToken CancellationToken { get; } + } + } +} diff --git a/src/Libplanet.Net/Transports/NetMQTransport.cs b/src/Libplanet.Net/Transports/NetMQTransport.cs index 5d31d1839a0..6d0a4fbd8bf 100644 --- a/src/Libplanet.Net/Transports/NetMQTransport.cs +++ b/src/Libplanet.Net/Transports/NetMQTransport.cs @@ -1,5 +1,6 @@ #nullable enable using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Collections.Immutable; using System.Diagnostics; @@ -32,11 +33,11 @@ public class NetMQTransport : ITransport private readonly HostOptions _hostOptions; private readonly MessageValidator _messageValidator; private readonly NetMQMessageCodec _messageCodec; - private readonly Channel _requests; - private readonly Task _runtimeProcessor; private readonly AsyncManualResetEvent _runningEvent; private readonly ActivitySource _activitySource; + private ConcurrentDictionary _channels; + private NetMQQueue<(AsyncManualResetEvent, NetMQMessage)>? _replyQueue; private RouterSocket? _router; @@ -47,9 +48,6 @@ public class NetMQTransport : ITransport private CancellationTokenSource _runtimeCancellationTokenSource; private CancellationTokenSource _turnCancellationTokenSource; - // Used only for logging. - private long _requestCount; - private long _socketCount; private bool _disposed = false; /// @@ -76,7 +74,6 @@ private NetMQTransport( .ForContext() .ForContext("Source", nameof(NetMQTransport)); - _socketCount = 0; _privateKey = privateKey; _hostOptions = hostOptions; _appProtocolVersionOptions = appProtocolVersionOptions; @@ -84,35 +81,10 @@ private NetMQTransport( _appProtocolVersionOptions, messageTimestampBuffer); _messageCodec = new NetMQMessageCodec(); - _requests = Channel.CreateUnbounded(); _runtimeCancellationTokenSource = new CancellationTokenSource(); _turnCancellationTokenSource = new CancellationTokenSource(); _activitySource = new ActivitySource("Libplanet.Net.Transports.NetMQTransport"); - _requestCount = 0; - CancellationToken runtimeCt = _runtimeCancellationTokenSource.Token; - _runtimeProcessor = Task.Factory.StartNew( - () => - { - // Ignore NetMQ related exceptions during NetMQRuntime.Dispose() to stabilize - // tests - try - { - using var runtime = new NetMQRuntime(); - runtime.Run(ProcessRuntime(runtimeCt)); - } - catch (Exception e) - when (e is NetMQException || e is ObjectDisposedException) - { - _logger.Error( - e, - "An exception has occurred while running {TaskName}", - nameof(_runtimeProcessor)); - } - }, - runtimeCt, - TaskCreationOptions.DenyChildAttach | TaskCreationOptions.LongRunning, - TaskScheduler.Default - ); + _channels = new ConcurrentDictionary(); _runningEvent = new AsyncManualResetEvent(); ProcessMessageHandler = new AsyncDelegate(); @@ -191,6 +163,7 @@ public async Task StartAsync(CancellationToken cancellationToken = default) throw new TransportException("Transport is already running."); } + _channels = new ConcurrentDictionary(); _runtimeCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); _turnCancellationTokenSource = @@ -239,6 +212,11 @@ public async Task StopAsync( _routerPoller.Stop(); } + foreach (var channel in _channels.Values) + { + channel.Abort(); + } + _replyQueue.Dispose(); _runtimeCancellationTokenSource.Cancel(); @@ -256,10 +234,8 @@ public void Dispose() if (!_disposed) { - _requests.Writer.TryComplete(); _runtimeCancellationTokenSource.Cancel(); _turnCancellationTokenSource.Cancel(); - _runtimeProcessor.WaitWithoutException(); _runtimeCancellationTokenSource.Dispose(); _turnCancellationTokenSource.Dispose(); @@ -272,6 +248,11 @@ public void Dispose() _turnClient?.Dispose(); } + foreach (var channel in _channels.Values) + { + channel.Abort(); + } + _routerPoller?.Dispose(); _disposed = true; @@ -338,11 +319,8 @@ CancellationToken cancellationToken Guid reqId = Guid.NewGuid(); var replies = new List(); - Channel channel = Channel.CreateUnbounded(); - try { - DateTimeOffset now = DateTimeOffset.UtcNow; NetMQMessage rawMessage = _messageCodec.Encode( content, _privateKey, @@ -350,39 +328,31 @@ CancellationToken cancellationToken AsPeer, DateTimeOffset.UtcNow ); - var req = new MessageRequest( - reqId, - rawMessage, - peer, - now, - expectedResponses, - channel, - linkedCt); - Interlocked.Increment(ref _requestCount); - await _requests.Writer.WriteAsync( - req, - linkedCt).ConfigureAwait(false); - _logger.Verbose( - "Enqueued a request {RequestId} to the peer {Peer}: {@Message}; " + - "{LeftRequests} left", - reqId, - peer, - content, - Interlocked.Read(ref _requestCount) - ); - foreach (var i in Enumerable.Range(0, expectedResponses)) + NetMQChannel channel; + if (_channels.TryGetValue(peer, out var c)) + { + channel = c!; + } + else + { + channel = new NetMQChannel(peer); + channel.Open(); + _channels[peer] = channel; + } + + await foreach (var raw in channel.SendMessageAsync( + rawMessage, + expectedResponses, + linkedCt)) { - NetMQMessage raw = await channel.Reader - .ReadAsync(linkedCt) - .ConfigureAwait(false); Message reply = _messageCodec.Decode(raw, true); _logger.Information( "Received {Reply} as a reply to request {Request} {RequestId} from {Peer}", reply.Content.Type, content.Type, - req.Id, + reqId, reply.Remote); try { @@ -400,8 +370,8 @@ await _requests.Writer.WriteAsync( reply.Content.Type, reply.Remote, content.Type, - req.Id); - channel.Writer.Complete(imte); + reqId); + linkedCts.Cancel(); } catch (DifferentAppProtocolVersionException dapve) { @@ -414,8 +384,8 @@ await _requests.Writer.WriteAsync( reply.Content.Type, reply.Remote, content.Type, - req.Id); - channel.Writer.Complete(dapve); + reqId); + linkedCts.Cancel(); } replies.Add(reply); @@ -457,7 +427,12 @@ await _requests.Writer.WriteAsync( "{MethodName}() was cancelled while waiting for a reply to " + "{Content} {RequestId} from {Peer}"; _logger.Debug( - oce2, dbgMsg, nameof(SendMessageAsync), content, reqId, peer); + oce2, + dbgMsg, + nameof(SendMessageAsync), + content, + reqId, + peer); a?.SetStatus(ActivityStatusCode.Error); a?.AddTag("Exception", nameof(TaskCanceledException)); @@ -477,15 +452,16 @@ await _requests.Writer.WriteAsync( "{MethodName}() encountered an unexpected exception while waiting for " + "a reply to {Content} {RequestId} from {Peer}"; _logger.Error( - e, errMsg, nameof(SendMessageAsync), content, reqId, peer.Address); + e, + errMsg, + nameof(SendMessageAsync), + content, + reqId, + peer.Address); a?.SetStatus(ActivityStatusCode.Error); a?.AddTag("Exception", e.GetType().ToString()); throw; } - finally - { - channel.Writer.TryComplete(); - } } /// @@ -741,183 +717,6 @@ private void DoReply( ev.Set(); } - private async Task ProcessRuntime(CancellationToken cancellationToken) - { - const string waitMsg = "Waiting for a new request..."; - ChannelReader reader = _requests.Reader; -#if NETCOREAPP3_0 || NETCOREAPP3_1 || NET - _logger.Verbose(waitMsg); - await foreach (MessageRequest req in reader.ReadAllAsync(cancellationToken)) - { -#else - while (true) - { - cancellationToken.ThrowIfCancellationRequested(); - _logger.Verbose(waitMsg); - MessageRequest req = await reader.ReadAsync(cancellationToken); -#endif - string messageType = _messageCodec.ParseMessageType(req.Message, true).ToString(); - long left = Interlocked.Decrement(ref _requestCount); - _logger.Debug( - "Request {Message} {RequestId} taken for processing; {Count} requests left", - messageType, - req.Id, - left); - - _ = SynchronizationContext.Current.PostAsync( - () => ProcessRequest(req, req.CancellationToken) - ); - -#if NETCOREAPP3_0 || NETCOREAPP3_1 || NET - _logger.Verbose(waitMsg); -#endif - } - } - - private async Task ProcessRequest(MessageRequest req, CancellationToken cancellationToken) - { - string messageType = _messageCodec.ParseMessageType(req.Message, true).ToString(); - Stopwatch stopwatch = new Stopwatch(); - stopwatch.Start(); - - _logger.Debug( - "Request {Message} {RequestId} is ready to be processed in {TimeSpan}", - messageType, - req.Id, - DateTimeOffset.UtcNow - req.RequestedTime); - - Channel channel = req.Channel; - - _logger.Debug( - "Trying to send request {Message} {RequestId} to {Peer}", - messageType, - req.Id, - req.Peer - ); - int receivedCount = 0; - long? incrementedSocketCount = null; - - // Normal OperationCanceledException initiated from outside should bubble up. - try - { - cancellationToken.ThrowIfCancellationRequested(); - - using var dealer = new DealerSocket(); - dealer.Options.DisableTimeWait = true; - dealer.Options.Identity = req.Id.ToByteArray(); - try - { - _logger.Debug( - "Trying to connect to {Peer} for request {Message} {RequestId}", - req.Peer, - messageType, - req.Id); - dealer.Connect(await req.Peer.ResolveNetMQAddressAsync()); - incrementedSocketCount = Interlocked.Increment(ref _socketCount); - _logger - .ForContext("Tag", "Metric") - .ForContext("Subtag", "SocketCount") - .Debug( - "{SocketCount} sockets open for processing request " + - "{Message} {RequestId}", - incrementedSocketCount, - messageType, - req.Id); - } - catch (NetMQException nme) - { - const string logMsg = - "{SocketCount} sockets open for processing requests; " + - "failed to create an additional socket for request {Message} {RequestId}"; - _logger - .ForContext("Tag", "Metric") - .ForContext("Subtag", "SocketCount") - .Debug( - nme, - logMsg, - Interlocked.Read(ref _socketCount), - messageType, - req.Id); - throw; - } - - if (dealer.TrySendMultipartMessage(req.Message)) - { - _logger.Debug( - "Request {RequestId} {Message} sent to {Peer}", - req.Id, - messageType, - req.Peer); - } - else - { - _logger.Debug( - "Failed to send {RequestId} {Message} to {Peer}", - req.Id, - messageType, - req.Peer); - - throw new SendMessageFailException( - $"Failed to send {messageType} to {req.Peer}.", - req.Peer); - } - - foreach (var i in Enumerable.Range(0, req.ExpectedResponses)) - { - NetMQMessage raw = await dealer.ReceiveMultipartMessageAsync( - cancellationToken: cancellationToken - ); - - _logger.Verbose( - "Received a raw message with {FrameCount} frames as a reply to " + - "request {RequestId} from {Peer}", - raw.FrameCount, - req.Id, - req.Peer - ); - await channel.Writer.WriteAsync(raw, cancellationToken); - receivedCount += 1; - } - - channel.Writer.Complete(); - } - catch (Exception e) - { - _logger.Error( - e, - "Failed to process {Message} {RequestId}; discarding it", - messageType, - req.Id); - channel.Writer.TryComplete(e); - } - finally - { - if (req.ExpectedResponses == 0) - { - // FIXME: Temporary fix to wait for a message to be sent. - await Task.Delay(1000); - } - - if (incrementedSocketCount is { }) - { - Interlocked.Decrement(ref _socketCount); - } - - _logger - .ForContext("Tag", "Metric") - .ForContext("Subtag", "OutboundMessageReport") - .Information( - "Request {RequestId} {Message} " + - "processed in {DurationMs} ms with {ReceivedCount} replies received " + - "out of {ExpectedCount} expected replies", - req.Id, - messageType, - stopwatch.ElapsedMilliseconds, - receivedCount, - req.ExpectedResponses); - } - } - private async Task RunPoller(NetMQPoller poller) { TaskCreationOptions taskCreationOptions = @@ -972,40 +771,5 @@ Guid reqId innerException ); } - - private readonly struct MessageRequest - { - public MessageRequest( - in Guid id, - NetMQMessage message, - BoundPeer peer, - DateTimeOffset requestedTime, - in int expectedResponses, - Channel channel, - CancellationToken cancellationToken) - { - Id = id; - Message = message; - Peer = peer; - RequestedTime = requestedTime; - ExpectedResponses = expectedResponses; - Channel = channel; - CancellationToken = cancellationToken; - } - - public Guid Id { get; } - - public NetMQMessage Message { get; } - - public BoundPeer Peer { get; } - - public DateTimeOffset RequestedTime { get; } - - public int ExpectedResponses { get; } - - public Channel Channel { get; } - - public CancellationToken CancellationToken { get; } - } } } From 5803b75e3b4b4211e4ea6601425bb31336c0001c Mon Sep 17 00:00:00 2001 From: Chanhyuck Ko Date: Fri, 20 Dec 2024 17:48:46 +0900 Subject: [PATCH 2/5] temp: remove all async dealermethods --- src/Libplanet.Net/Transports/NetMQChannel.cs | 30 +++++++++++++++---- .../Transports/NetMQTransport.cs | 1 + 2 files changed, 26 insertions(+), 5 deletions(-) diff --git a/src/Libplanet.Net/Transports/NetMQChannel.cs b/src/Libplanet.Net/Transports/NetMQChannel.cs index 158f92ea23c..c3165a2ad97 100644 --- a/src/Libplanet.Net/Transports/NetMQChannel.cs +++ b/src/Libplanet.Net/Transports/NetMQChannel.cs @@ -65,6 +65,7 @@ public void Open() public async IAsyncEnumerable SendMessageAsync( NetMQMessage message, + TimeSpan? timeout, int expectedResponses, [EnumeratorCancellation] CancellationToken cancellationToken) { @@ -72,6 +73,7 @@ public async IAsyncEnumerable SendMessageAsync( await _requests.Writer.WriteAsync( new MessageRequest( message, + timeout, expectedResponses, channel, cancellationToken), @@ -88,14 +90,19 @@ private async Task ProcessRuntime(CancellationToken ct) { using var dealer = new DealerSocket(); dealer.Options.DisableTimeWait = true; - dealer.Connect(await _peer.ResolveNetMQAddressAsync()); + var address = await _peer.ResolveNetMQAddressAsync(); + _logger.Debug("[NetMQChannel] Connecting {Address}", address); + dealer.Connect(address); while (!ct.IsCancellationRequested) { MessageRequest req = await _requests.Reader.ReadAsync(ct); _lastUpdated = DateTimeOffset.UtcNow; CancellationTokenSource linked = CancellationTokenSource.CreateLinkedTokenSource(ct, req.CancellationToken); - _logger.Debug("[NetMQChannel] Trying to send message {Message}", req.Message); + _logger.Debug( + "[NetMQChannel] Trying to send message {Message} (count: {ExpectedResponses})", + req.Message, + req.ExpectedResponses); if (!dealer.TrySendMultipartMessage(req.Message)) { _logger.Debug( @@ -109,9 +116,18 @@ private async Task ProcessRuntime(CancellationToken ct) foreach (var i in Enumerable.Range(0, req.ExpectedResponses)) { - NetMQMessage raw = await dealer.ReceiveMultipartMessageAsync( - cancellationToken: linked.Token - ); + _logger.Debug( + "[NetMQChannel] Waiting for replies... (#{Index})", i); + var raw = new NetMQMessage(); + if (!dealer.TryReceiveMultipartMessage( + req.Timeout ?? TimeSpan.FromSeconds(1), + ref raw)) + { + break; + } + + _logger.Debug( + "[NetMQChannel] Successfully received replies #{Index}", i); _lastUpdated = DateTimeOffset.UtcNow; await req.Channel.Writer.WriteAsync(raw, linked.Token); @@ -125,11 +141,13 @@ private readonly struct MessageRequest { public MessageRequest( NetMQMessage message, + TimeSpan? timeout, in int expectedResponses, Channel channel, CancellationToken cancellationToken) { Message = message; + Timeout = timeout; ExpectedResponses = expectedResponses; Channel = channel; CancellationToken = cancellationToken; @@ -137,6 +155,8 @@ public MessageRequest( public NetMQMessage Message { get; } + public TimeSpan? Timeout { get; } + public int ExpectedResponses { get; } public Channel Channel { get; } diff --git a/src/Libplanet.Net/Transports/NetMQTransport.cs b/src/Libplanet.Net/Transports/NetMQTransport.cs index 6d0a4fbd8bf..3f6159f3172 100644 --- a/src/Libplanet.Net/Transports/NetMQTransport.cs +++ b/src/Libplanet.Net/Transports/NetMQTransport.cs @@ -343,6 +343,7 @@ CancellationToken cancellationToken await foreach (var raw in channel.SendMessageAsync( rawMessage, + timeout, expectedResponses, linkedCt)) { From eec8d8200ff2f3892b269a8e2dff90830bd3bc44 Mon Sep 17 00:00:00 2001 From: Chanhyuck Ko Date: Mon, 23 Dec 2024 14:38:07 +0900 Subject: [PATCH 3/5] clean up exceptions --- .../Protocols/KademliaProtocol.cs | 36 ++++++++++--------- ...outException.cs => PingFailedException.cs} | 16 +++------ .../Protocols/TestTransport.cs | 2 +- .../SwarmTest.AppProtocolVersion.cs | 33 +++++++++-------- 4 files changed, 45 insertions(+), 42 deletions(-) rename src/Libplanet.Net/Protocols/{PingTimeoutException.cs => PingFailedException.cs} (67%) diff --git a/src/Libplanet.Net/Protocols/KademliaProtocol.cs b/src/Libplanet.Net/Protocols/KademliaProtocol.cs index 2a0e9f035ac..37b117fdb6d 100644 --- a/src/Libplanet.Net/Protocols/KademliaProtocol.cs +++ b/src/Libplanet.Net/Protocols/KademliaProtocol.cs @@ -95,7 +95,7 @@ await PingAsync(peer, dialTimeout, cancellationToken) dialTimeout, cancellationToken)); } - catch (PingTimeoutException) + catch (PingFailedException) { _logger.Warning("A timeout exception occurred connecting to seed peer"); RemovePeer(peer); @@ -140,19 +140,25 @@ public async Task AddPeersAsync( var tasks = new List(); foreach (BoundPeer peer in peers) { - tasks.Add(PingAsync( - peer, - timeout: timeout, - cancellationToken: cancellationToken)); + tasks.Add( + PingAsync( + peer, + timeout: timeout, + cancellationToken: cancellationToken)); } _logger.Verbose("Trying to ping {PeerCount} peers", tasks.Count); await Task.WhenAll(tasks).ConfigureAwait(false); _logger.Verbose("Update complete"); } - catch (PingTimeoutException e) + catch (PingFailedException pfe) { - _logger.Debug(e, "Ping timed out"); + if (pfe.InnerException is { } e) + { + throw e; + } + + throw; } catch (TaskCanceledException e) { @@ -284,7 +290,7 @@ public async Task CheckReplacementCacheAsync(CancellationToken cancellationToken await PingAsync(replacement, _requestTimeout, cancellationToken) .ConfigureAwait(false); } - catch (PingTimeoutException) + catch (PingFailedException) { _logger.Verbose( "Removed stale peer {Peer} from replacement cache", @@ -327,7 +333,7 @@ await PingAsync(replacement, _requestTimeout, cancellationToken) await PingAsync(boundPeer, _requestTimeout, cancellationToken) .ConfigureAwait(false); } - catch (PingTimeoutException) + catch (PingFailedException) { var msg = "{BoundPeer}, a target peer, is in the routing table does not respond"; @@ -394,7 +400,7 @@ await PingAsync(found, _requestTimeout, cancellationToken) throw new TaskCanceledException( $"Task is cancelled during {nameof(FindSpecificPeerAsync)}()"); } - catch (PingTimeoutException) + catch (PingFailedException) { // Ignore peer not responding } @@ -439,11 +445,9 @@ internal async Task PingAsync( AddPeer(peer); } - catch (CommunicationFailException) + catch (Exception e) { - throw new PingTimeoutException( - $"Failed to send Ping to {peer}.", - peer); + throw new PingFailedException(peer, e); } } @@ -497,7 +501,7 @@ private async Task ValidateAsync( await PingAsync(peer, timeout, cancellationToken).ConfigureAwait(false); _table.Check(peer, check, DateTimeOffset.UtcNow); } - catch (PingTimeoutException) + catch (PingFailedException) { _logger.Verbose("Removing invalid peer {Peer}...", peer); RemovePeer(peer); @@ -711,7 +715,7 @@ private async Task ProcessFoundAsync( AggregateException aggregateException = aggregateTask.Exception!; foreach (Exception e in aggregateException.InnerExceptions) { - if (e is PingTimeoutException pte) + if (e is PingFailedException pte) { peers.Remove(pte.Target); } diff --git a/src/Libplanet.Net/Protocols/PingTimeoutException.cs b/src/Libplanet.Net/Protocols/PingFailedException.cs similarity index 67% rename from src/Libplanet.Net/Protocols/PingTimeoutException.cs rename to src/Libplanet.Net/Protocols/PingFailedException.cs index ec40b608c44..576d366de2f 100644 --- a/src/Libplanet.Net/Protocols/PingTimeoutException.cs +++ b/src/Libplanet.Net/Protocols/PingFailedException.cs @@ -4,27 +4,21 @@ namespace Libplanet.Net.Protocols { [Serializable] - public class PingTimeoutException : TimeoutException + public class PingFailedException : Exception { - public PingTimeoutException(BoundPeer target) - : base() + public PingFailedException(BoundPeer target, Exception innerException) + : base($"Failed to send ping to target peer {target}", innerException) { Target = target; } - public PingTimeoutException(string message, BoundPeer target) - : base(message) - { - Target = target; - } - - public PingTimeoutException(string message, BoundPeer target, Exception innerException) + public PingFailedException(string message, BoundPeer target, Exception innerException) : base(message, innerException) { Target = target; } - protected PingTimeoutException(SerializationInfo info, StreamingContext context) + protected PingFailedException(SerializationInfo info, StreamingContext context) : base(info, context) { Target = info.GetValue(nameof(Target), typeof(BoundPeer)) is BoundPeer target diff --git a/test/Libplanet.Net.Tests/Protocols/TestTransport.cs b/test/Libplanet.Net.Tests/Protocols/TestTransport.cs index 730e94d32fb..33a82ab379d 100644 --- a/test/Libplanet.Net.Tests/Protocols/TestTransport.cs +++ b/test/Libplanet.Net.Tests/Protocols/TestTransport.cs @@ -255,7 +255,7 @@ async Task DoAddPeersAsync() "Different version encountered during {MethodName}()", nameof(AddPeersAsync)); } - catch (PingTimeoutException) + catch (PingFailedException) { var msg = $"Timeout occurred during {nameof(AddPeersAsync)}() after {timeout}"; diff --git a/test/Libplanet.Net.Tests/SwarmTest.AppProtocolVersion.cs b/test/Libplanet.Net.Tests/SwarmTest.AppProtocolVersion.cs index 8e39c2f777b..b179bd7ebb0 100644 --- a/test/Libplanet.Net.Tests/SwarmTest.AppProtocolVersion.cs +++ b/test/Libplanet.Net.Tests/SwarmTest.AppProtocolVersion.cs @@ -31,13 +31,12 @@ public async Task DetectAppProtocolVersion() await StartAsync(c); await StartAsync(d); - var peers = new[] { c.AsPeer, d.AsPeer }; - - foreach (var peer in peers) - { - await a.AddPeersAsync(new[] { peer }, null); - await b.AddPeersAsync(new[] { peer }, null); - } + await a.AddPeersAsync(new[] { c.AsPeer }, null); + await Assert.ThrowsAsync( + () => a.AddPeersAsync(new[] { d.AsPeer }, null)); + await Assert.ThrowsAsync( + () => b.AddPeersAsync(new[] { c.AsPeer }, null)); + await b.AddPeersAsync(new[] { d.AsPeer }, null); Assert.Equal(new[] { c.AsPeer }, a.Peers.ToArray()); Assert.Equal(new[] { d.AsPeer }, b.Peers.ToArray()); @@ -163,14 +162,20 @@ AppProtocolVersion localVersion await StartAsync(f); await a.AddPeersAsync(new[] { c.AsPeer }, TimeSpan.FromSeconds(1)); - await a.AddPeersAsync(new[] { d.AsPeer }, TimeSpan.FromSeconds(1)); - await a.AddPeersAsync(new[] { e.AsPeer }, TimeSpan.FromSeconds(1)); - await a.AddPeersAsync(new[] { f.AsPeer }, TimeSpan.FromSeconds(1)); - - await b.AddPeersAsync(new[] { c.AsPeer }, TimeSpan.FromSeconds(1)); + await Assert.ThrowsAsync( + () => a.AddPeersAsync(new[] { d.AsPeer }, TimeSpan.FromSeconds(1))); + await Assert.ThrowsAsync( + () => a.AddPeersAsync(new[] { e.AsPeer }, TimeSpan.FromSeconds(1))); + await Assert.ThrowsAsync( + () => a.AddPeersAsync(new[] { f.AsPeer }, TimeSpan.FromSeconds(1))); + + await Assert.ThrowsAsync( + () => b.AddPeersAsync(new[] { c.AsPeer }, TimeSpan.FromSeconds(1))); await b.AddPeersAsync(new[] { d.AsPeer }, TimeSpan.FromSeconds(1)); - await b.AddPeersAsync(new[] { e.AsPeer }, TimeSpan.FromSeconds(1)); - await b.AddPeersAsync(new[] { f.AsPeer }, TimeSpan.FromSeconds(1)); + await Assert.ThrowsAsync( + () => b.AddPeersAsync(new[] { e.AsPeer }, TimeSpan.FromSeconds(1))); + await Assert.ThrowsAsync( + () => b.AddPeersAsync(new[] { f.AsPeer }, TimeSpan.FromSeconds(1))); Assert.Equal(new[] { c.AsPeer }, a.Peers.ToArray()); Assert.Equal(new[] { d.AsPeer }, b.Peers.ToArray()); From 9f747b2ca24ebd8b4688717d5d9389c4bb0c2b3e Mon Sep 17 00:00:00 2001 From: Chanhyuck Ko Date: Tue, 24 Dec 2024 18:09:53 +0900 Subject: [PATCH 4/5] add test and logging --- src/Libplanet.Net/Transports/NetMQChannel.cs | 129 +++++++++++------- .../Transports/NetMQTransport.cs | 25 +++- .../Transports/NetMQChannelTest.cs | 121 ++++++++++++++++ 3 files changed, 224 insertions(+), 51 deletions(-) create mode 100644 test/Libplanet.Net.Tests/Transports/NetMQChannelTest.cs diff --git a/src/Libplanet.Net/Transports/NetMQChannel.cs b/src/Libplanet.Net/Transports/NetMQChannel.cs index c3165a2ad97..42fd5fbc380 100644 --- a/src/Libplanet.Net/Transports/NetMQChannel.cs +++ b/src/Libplanet.Net/Transports/NetMQChannel.cs @@ -19,6 +19,7 @@ public class NetMQChannel private readonly ILogger _logger; private DateTimeOffset _lastUpdated; + private bool _opened; public NetMQChannel(BoundPeer peer) { @@ -32,35 +33,38 @@ public NetMQChannel(BoundPeer peer) public event EventHandler? Closed; -#pragma warning disable SA1005, SA1515, S125 - //public event EventHandler? Faulted; -#pragma warning restore SA1005, SA1515, S125 + public event EventHandler? Faulted; public event EventHandler? Opened; public void Abort() { + if (!_opened) + { + throw new InvalidOperationException("Cannot abort an unopened channel."); + } + + _opened = false; _cancellationTokenSource.Cancel(); } public void Close() { + if (!_opened) + { + throw new InvalidOperationException("Cannot close an unopened channel."); + } + + _opened = false; _cancellationTokenSource.Cancel(); Closed?.Invoke(this, EventArgs.Empty); } public void Open() { - TaskCreationOptions taskCreationOptions = - TaskCreationOptions.DenyChildAttach | - TaskCreationOptions.LongRunning | - TaskCreationOptions.HideScheduler; - Task.Factory.StartNew( - () => ProcessRuntime(_cancellationTokenSource.Token), - _cancellationTokenSource.Token, - taskCreationOptions, - TaskScheduler.Default); + _opened = true; Opened?.Invoke(this, EventArgs.Empty); + DoOpen(); } public async IAsyncEnumerable SendMessageAsync( @@ -69,6 +73,12 @@ public async IAsyncEnumerable SendMessageAsync( int expectedResponses, [EnumeratorCancellation] CancellationToken cancellationToken) { + if (!_opened) + { + throw new InvalidOperationException( + "Cannot send message with an unopened channel."); + } + var channel = Channel.CreateUnbounded(); await _requests.Writer.WriteAsync( new MessageRequest( @@ -86,57 +96,84 @@ await _requests.Writer.WriteAsync( } } - private async Task ProcessRuntime(CancellationToken ct) + private void DoOpen() { + TaskCreationOptions taskCreationOptions = + TaskCreationOptions.DenyChildAttach | + TaskCreationOptions.LongRunning | + TaskCreationOptions.HideScheduler; + Task.Factory.StartNew( + ProcessRuntime, + _cancellationTokenSource.Token, + taskCreationOptions, + TaskScheduler.Default); + } + + private async Task ProcessRuntime() + { + var ct = _cancellationTokenSource.Token; using var dealer = new DealerSocket(); dealer.Options.DisableTimeWait = true; var address = await _peer.ResolveNetMQAddressAsync(); - _logger.Debug("[NetMQChannel] Connecting {Address}", address); - dealer.Connect(address); + try + { + dealer.Connect(address); + } + catch (Exception e) + { + Faulted?.Invoke(this, e); + Close(); + } + while (!ct.IsCancellationRequested) { MessageRequest req = await _requests.Reader.ReadAsync(ct); - _lastUpdated = DateTimeOffset.UtcNow; - CancellationTokenSource linked = - CancellationTokenSource.CreateLinkedTokenSource(ct, req.CancellationToken); - _logger.Debug( - "[NetMQChannel] Trying to send message {Message} (count: {ExpectedResponses})", - req.Message, - req.ExpectedResponses); - if (!dealer.TrySendMultipartMessage(req.Message)) - { - _logger.Debug( - "[NetMQChannel] Failed to send {Message} to {Peer}", - req.Message, - _peer); - continue; - } - - _logger.Debug("[NetMQChannel] Message {Message} successfully sent.", req.Message); - - foreach (var i in Enumerable.Range(0, req.ExpectedResponses)) + try { - _logger.Debug( - "[NetMQChannel] Waiting for replies... (#{Index})", i); - var raw = new NetMQMessage(); - if (!dealer.TryReceiveMultipartMessage( - req.Timeout ?? TimeSpan.FromSeconds(1), - ref raw)) + _lastUpdated = DateTimeOffset.UtcNow; + CancellationTokenSource linked = + CancellationTokenSource.CreateLinkedTokenSource(ct, req.CancellationToken); + if (!dealer.TrySendMultipartMessage(req.Message)) { + _requests.Writer.Complete(); + dealer.Close(); + DoOpen(); break; } - _logger.Debug( - "[NetMQChannel] Successfully received replies #{Index}", i); - _lastUpdated = DateTimeOffset.UtcNow; + foreach (var i in Enumerable.Range(0, req.ExpectedResponses)) + { + var raw = new NetMQMessage(); + if (!dealer.TryReceiveMultipartMessage( + req.Timeout ?? TimeSpan.FromSeconds(1), + ref raw)) + { + break; + } - await req.Channel.Writer.WriteAsync(raw, linked.Token); - } + _lastUpdated = DateTimeOffset.UtcNow; + + await req.Channel.Writer.WriteAsync(raw, linked.Token); + } - req.Channel.Writer.Complete(); + req.Channel.Writer.Complete(); + } + catch (Exception) + { + req.Channel.Writer.Complete(); + dealer.Close(); + DoOpen(); + break; + } } } + private bool HandShake(DealerSocket dealerSocket) + { + var msg = default(Msg); + return dealerSocket.TrySend(ref msg, TimeSpan.Zero, false); + } + private readonly struct MessageRequest { public MessageRequest( diff --git a/src/Libplanet.Net/Transports/NetMQTransport.cs b/src/Libplanet.Net/Transports/NetMQTransport.cs index 3f6159f3172..28a0e5ec2cf 100644 --- a/src/Libplanet.Net/Transports/NetMQTransport.cs +++ b/src/Libplanet.Net/Transports/NetMQTransport.cs @@ -328,6 +328,8 @@ CancellationToken cancellationToken AsPeer, DateTimeOffset.UtcNow ); + var stopwatch = new Stopwatch(); + stopwatch.Start(); NetMQChannel channel; if (_channels.TryGetValue(peer, out var c)) @@ -341,11 +343,12 @@ CancellationToken cancellationToken _channels[peer] = channel; } - await foreach (var raw in channel.SendMessageAsync( - rawMessage, - timeout, - expectedResponses, - linkedCt)) + await foreach ( + var raw in channel.SendMessageAsync( + rawMessage, + timeout, + expectedResponses, + linkedCt)) { Message reply = _messageCodec.Decode(raw, true); @@ -400,6 +403,18 @@ CancellationToken cancellationToken reqId, peer, replies.Select(reply => reply.Content.Type)); + _logger + .ForContext("Tag", "Metric") + .ForContext("Subtag", "OutboundMessageReport") + .Information( + "Request {RequestId} {Message} " + + "processed in {DurationMs} ms with {ReceivedCount} replies received " + + "out of {ExpectedCount} expected replies", + reqId, + content.Type, + stopwatch.ElapsedMilliseconds, + replies.Count, + expectedResponses); a?.SetStatus(ActivityStatusCode.Ok); return replies; } diff --git a/test/Libplanet.Net.Tests/Transports/NetMQChannelTest.cs b/test/Libplanet.Net.Tests/Transports/NetMQChannelTest.cs new file mode 100644 index 00000000000..0065a0e3759 --- /dev/null +++ b/test/Libplanet.Net.Tests/Transports/NetMQChannelTest.cs @@ -0,0 +1,121 @@ +using System; +using System.Net; +using System.Threading; +using System.Threading.Tasks; +using Libplanet.Crypto; +using Libplanet.Net.Messages; +using Libplanet.Net.Options; +using Libplanet.Net.Transports; +using Xunit; + +namespace Libplanet.Net.Tests.Transports +{ + // Test uses NetMQTransport as BoundPeer, which can be replaced with simple RouterSocket. + public class NetMQChannelTest + { + private const int Timeout = 60 * 1000; + + [Fact] + public void Abort() + { + var channel = new NetMQChannel( + new BoundPeer( + new PrivateKey().PublicKey, + new DnsEndPoint(IPAddress.Loopback.ToString(), 0))); + Assert.Throws(() => channel.Abort()); + channel.Open(); + channel.Abort(); + Assert.True(true); + } + + [Fact] + public void Close() + { + var closed = false; + var channel = new NetMQChannel( + new BoundPeer( + new PrivateKey().PublicKey, + new DnsEndPoint(IPAddress.Loopback.ToString(), 0))); + channel.Closed += (_, _) => closed = true; + Assert.Throws(() => channel.Close()); + channel.Open(); + channel.Close(); + Assert.True(closed); + } + + [Fact] + public void Open() + { + var opened = false; + var channel = new NetMQChannel( + new BoundPeer( + new PrivateKey().PublicKey, + new DnsEndPoint(IPAddress.Loopback.ToString(), 0))); + channel.Opened += (_, _) => opened = true; + channel.Open(); + Assert.True(opened); + } + + [Fact] + public async Task Faulted() + { + var faulted = false; + var channel = new NetMQChannel( + new BoundPeer( + new PrivateKey().PublicKey, + new DnsEndPoint(IPAddress.Loopback.ToString(), 0))); + channel.Faulted += (_, _) => faulted = true; + channel.Open(); + await Task.Delay(100); + Assert.True(faulted); + } + + [Fact(Timeout = Timeout)] + public async Task SendMessageAsync() + { + var received = false; + var receiverKey = new PrivateKey(); + var transport = await NetMQTransport.Create( + receiverKey, + new AppProtocolVersionOptions(), + new HostOptions( + IPAddress.Loopback.ToString(), + new IceServer[] { })); + _ = transport.StartAsync(); + await transport.WaitForRunningAsync(); + transport.ProcessMessageHandler.Register( + async msg => + { + received = msg.Content is PingMsg; + await transport.ReplyMessageAsync( + new PongMsg(), + msg.Identity!, + CancellationToken.None); + }); + var channel = new NetMQChannel(transport.AsPeer); + channel.Open(); + var key = new PrivateKey(); + await foreach ( + var reply in channel.SendMessageAsync( + new NetMQMessageCodec().Encode( + new PingMsg(), + key, + default, + new BoundPeer( + key.PublicKey, + new DnsEndPoint(IPAddress.Loopback.ToString(), 0)), + DateTimeOffset.UtcNow), + null, + 1, + CancellationToken.None)) + { + if (new NetMQMessageCodec().Decode(reply, true).Content is PongMsg) + { + received = true; + } + } + + Assert.True(received); + } + } +} From cb5da2d51552f30f0ac908597d007224f3284a01 Mon Sep 17 00:00:00 2001 From: Chanhyuck Ko Date: Fri, 27 Dec 2024 14:53:16 +0900 Subject: [PATCH 5/5] feat: use random guid as identity --- src/Libplanet.Net/Transports/NetMQChannel.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Libplanet.Net/Transports/NetMQChannel.cs b/src/Libplanet.Net/Transports/NetMQChannel.cs index 42fd5fbc380..b92b9b7657d 100644 --- a/src/Libplanet.Net/Transports/NetMQChannel.cs +++ b/src/Libplanet.Net/Transports/NetMQChannel.cs @@ -114,6 +114,7 @@ private async Task ProcessRuntime() var ct = _cancellationTokenSource.Token; using var dealer = new DealerSocket(); dealer.Options.DisableTimeWait = true; + dealer.Options.Identity = Guid.NewGuid().ToByteArray(); var address = await _peer.ResolveNetMQAddressAsync(); try {