diff --git a/lib/core/services/websocket_cache_integration.dart b/lib/core/services/websocket_cache_integration.dart index 087f0bc0..43c429b5 100644 --- a/lib/core/services/websocket_cache_integration.dart +++ b/lib/core/services/websocket_cache_integration.dart @@ -349,7 +349,20 @@ class WebSocketCacheIntegration { if (state == SocketConnectionState.connected) { _logger.i( 'WebSocketCacheIntegration: Connected! Subscribing to resources...'); + // A (re)connect always yields a FRESH socket whose AnyCable subscription + // state is empty server-side. Normal reconnects go connected→reconnecting + // →connected and NEVER emit `disconnected` (see WebSocketService: a drop + // with autoReconnect calls _scheduleReconnect → `reconnecting`, not + // `disconnected`), so the disconnected-branch reset is skipped and + // `_channelConfirmed` stays stale-true. That makes `_subscribeToChannel` + // skip re-sending the channel `subscribe`, so the gateway has no + // subscription for the new connection and drops every perform (write) as + // "unknown subscription" → updates time out forever until app restart. + // Reset the handshake here so each new socket re-subscribes cleanly. + _channelConfirmed = false; _channelSubscribeSent = false; + _resourcesSubscribed = false; + _confirmedResources.clear(); _subscribeToChannel(); // Load full inventory over REST on connect. The FIRST connect (app launch // / persisted-session reopen, `droppedAt == null`) must seed — there is @@ -452,6 +465,30 @@ class WebSocketCacheIntegration { return false; } + /// Whether the RxgChannel subscription is confirmed — i.e. AnyCable will + /// accept performs (writes) on it. Writes sent while this is false are + /// silently dropped server-side as "unknown subscription". + bool get isChannelConfirmed => _channelConfirmed; + + /// Waits until the RxgChannel subscription is confirmed (kicking the + /// handshake if it hasn't been sent yet), up to [timeout]. Call this before + /// issuing a WS write so a write fired in the window between `connected` and + /// channel confirmation isn't silently dropped and left to time out. Returns + /// false if the socket is down or confirmation doesn't arrive in time. + Future ensureChannelReady({ + Duration timeout = const Duration(seconds: 8), + }) async { + if (_channelConfirmed) return true; + if (!_webSocketService.isConnected) return false; + _ensureChannelSubscription(); + final deadline = DateTime.now().add(timeout); + while (!_channelConfirmed && DateTime.now().isBefore(deadline)) { + await Future.delayed(const Duration(milliseconds: 150)); + if (!_webSocketService.isConnected) return false; + } + return _channelConfirmed; + } + bool _sendActionCableMessage(Map data) { if (!_webSocketService.isConnected) { _logger.w( diff --git a/lib/features/speed_test/data/datasources/speed_test_websocket_data_source.dart b/lib/features/speed_test/data/datasources/speed_test_websocket_data_source.dart index dce4db7f..334e7011 100644 --- a/lib/features/speed_test/data/datasources/speed_test_websocket_data_source.dart +++ b/lib/features/speed_test/data/datasources/speed_test_websocket_data_source.dart @@ -1,3 +1,5 @@ +import 'dart:async'; + import 'package:logger/logger.dart'; import 'package:rgnets_fdk/core/services/logger_service.dart'; import 'package:rgnets_fdk/core/services/websocket_cache_integration.dart'; @@ -321,26 +323,54 @@ class SpeedTestWebSocketDataSource implements SpeedTestDataSource { throw ArgumentError('Cannot update speed test result without id'); } - final response = await _webSocketService.requestActionCable( - action: 'update_resource', - resourceType: _speedTestResultResourceType, - additionalData: { - 'id': result.id, - 'params': result.toJson(), - }, - timeout: const Duration(seconds: 15), - ); + Future send() async { + final response = await _webSocketService.requestActionCable( + action: 'update_resource', + resourceType: _speedTestResultResourceType, + additionalData: { + 'id': result.id, + 'params': result.toJson(), + }, + timeout: const Duration(seconds: 15), + ); - final data = response.payload['data']; - if (data != null) { - return SpeedTestResult.fromJsonWithValidation( - Map.from(data as Map), + final data = response.payload['data']; + if (data != null) { + return SpeedTestResult.fromJsonWithValidation( + Map.from(data as Map), + ); + } + + throw Exception( + response.payload['error']?.toString() ?? + 'Failed to update speed test result', ); } - throw Exception( - response.payload['error']?.toString() ?? - 'Failed to update speed test result', - ); + // The update is an AnyCable perform on the RxgChannel subscription, which + // the gateway drops as "unknown subscription" until that subscription is + // confirmed. Wait for write-readiness first so an update issued in the + // window right after (re)connect isn't silently dropped and left to time + // out — previously the user had to manually re-run the test to recover. + await _cacheIntegration.ensureChannelReady(); + + try { + return await send(); + } on TimeoutException { + // The channel may have only just (re)subscribed. Re-ensure readiness and + // retry once — update_resource is idempotent, so a retry can't duplicate + // data. If the channel still isn't ready, surface a clear reason. + _logger.w( + 'SpeedTestWebSocketDataSource: update(${result.id}) timed out; ' + 're-ensuring channel readiness and retrying once', + ); + final ready = await _cacheIntegration.ensureChannelReady(); + if (!ready) { + throw TimeoutException( + 'Speed test update could not be sent: realtime channel not ready', + ); + } + return await send(); + } } } diff --git a/lib/features/speed_test/data/repositories/speed_test_repository_impl.dart b/lib/features/speed_test/data/repositories/speed_test_repository_impl.dart index 699e32d2..436b6156 100644 --- a/lib/features/speed_test/data/repositories/speed_test_repository_impl.dart +++ b/lib/features/speed_test/data/repositories/speed_test_repository_impl.dart @@ -137,15 +137,16 @@ class SpeedTestRepositoryImpl implements SpeedTestRepository { Failure _mapExceptionToFailure(Exception exception) { final message = exception.toString(); + final lower = message.toLowerCase(); - if (message.contains('not found') || message.contains('404')) { + if (lower.contains('not found') || lower.contains('404')) { return NotFoundFailure(message: message); - } else if (message.contains('not connected') || - message.contains('network')) { + } else if (lower.contains('not connected') || + lower.contains('network')) { return NetworkFailure(message: message); - } else if (message.contains('timeout')) { + } else if (lower.contains('timeout') || lower.contains('timed out')) { return TimeoutFailure(message: message); - } else if (message.contains('server') || message.contains('500')) { + } else if (lower.contains('server') || lower.contains('500')) { return ServerFailure(message: message); }