Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions lib/core/services/websocket_cache_integration.dart
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,20 @@ class WebSocketCacheIntegration {
if (state == SocketConnectionState.connected) {
_logger.i(
'WebSocketCacheIntegration: Connected! Subscribing to resources...');
// A (re)connect always yields a FRESH socket whose AnyCable subscription
// state is empty server-side. Normal reconnects go connected→reconnecting
// →connected and NEVER emit `disconnected` (see WebSocketService: a drop
// with autoReconnect calls _scheduleReconnect → `reconnecting`, not
// `disconnected`), so the disconnected-branch reset is skipped and
// `_channelConfirmed` stays stale-true. That makes `_subscribeToChannel`
// skip re-sending the channel `subscribe`, so the gateway has no
// subscription for the new connection and drops every perform (write) as
// "unknown subscription" → updates time out forever until app restart.
// Reset the handshake here so each new socket re-subscribes cleanly.
_channelConfirmed = false;
_channelSubscribeSent = false;
_resourcesSubscribed = false;
_confirmedResources.clear();
_subscribeToChannel();
// Load full inventory over REST on connect. The FIRST connect (app launch
// / persisted-session reopen, `droppedAt == null`) must seed — there is
Expand Down Expand Up @@ -452,6 +465,30 @@ class WebSocketCacheIntegration {
return false;
}

/// Whether the RxgChannel subscription is confirmed — i.e. AnyCable will
/// accept performs (writes) on it. Writes sent while this is false are
/// silently dropped server-side as "unknown subscription".
bool get isChannelConfirmed => _channelConfirmed;

/// Waits until the RxgChannel subscription is confirmed (kicking the
/// handshake if it hasn't been sent yet), up to [timeout]. Call this before
/// issuing a WS write so a write fired in the window between `connected` and
/// channel confirmation isn't silently dropped and left to time out. Returns
/// false if the socket is down or confirmation doesn't arrive in time.
Future<bool> ensureChannelReady({
Duration timeout = const Duration(seconds: 8),
}) async {
if (_channelConfirmed) return true;
if (!_webSocketService.isConnected) return false;
_ensureChannelSubscription();
final deadline = DateTime.now().add(timeout);
while (!_channelConfirmed && DateTime.now().isBefore(deadline)) {
await Future<void>.delayed(const Duration(milliseconds: 150));
if (!_webSocketService.isConnected) return false;
}
return _channelConfirmed;
}

bool _sendActionCableMessage(Map<String, dynamic> data) {
if (!_webSocketService.isConnected) {
_logger.w(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import 'dart:async';

import 'package:logger/logger.dart';
import 'package:rgnets_fdk/core/services/logger_service.dart';
import 'package:rgnets_fdk/core/services/websocket_cache_integration.dart';
Expand Down Expand Up @@ -321,26 +323,54 @@ class SpeedTestWebSocketDataSource implements SpeedTestDataSource {
throw ArgumentError('Cannot update speed test result without id');
}

final response = await _webSocketService.requestActionCable(
action: 'update_resource',
resourceType: _speedTestResultResourceType,
additionalData: {
'id': result.id,
'params': result.toJson(),
},
timeout: const Duration(seconds: 15),
);
Future<SpeedTestResult> send() async {
final response = await _webSocketService.requestActionCable(
action: 'update_resource',
resourceType: _speedTestResultResourceType,
additionalData: {
'id': result.id,
'params': result.toJson(),
},
timeout: const Duration(seconds: 15),
);

final data = response.payload['data'];
if (data != null) {
return SpeedTestResult.fromJsonWithValidation(
Map<String, dynamic>.from(data as Map),
final data = response.payload['data'];
if (data != null) {
return SpeedTestResult.fromJsonWithValidation(
Map<String, dynamic>.from(data as Map),
);
}

throw Exception(
response.payload['error']?.toString() ??
'Failed to update speed test result',
);
}

throw Exception(
response.payload['error']?.toString() ??
'Failed to update speed test result',
);
// The update is an AnyCable perform on the RxgChannel subscription, which
// the gateway drops as "unknown subscription" until that subscription is
// confirmed. Wait for write-readiness first so an update issued in the
// window right after (re)connect isn't silently dropped and left to time
// out — previously the user had to manually re-run the test to recover.
await _cacheIntegration.ensureChannelReady();

try {
return await send();
} on TimeoutException {
// The channel may have only just (re)subscribed. Re-ensure readiness and
// retry once — update_resource is idempotent, so a retry can't duplicate
// data. If the channel still isn't ready, surface a clear reason.
_logger.w(
'SpeedTestWebSocketDataSource: update(${result.id}) timed out; '
're-ensuring channel readiness and retrying once',
);
final ready = await _cacheIntegration.ensureChannelReady();
if (!ready) {
throw TimeoutException(
'Speed test update could not be sent: realtime channel not ready',
);
}
return await send();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,15 +137,16 @@ class SpeedTestRepositoryImpl implements SpeedTestRepository {

Failure _mapExceptionToFailure(Exception exception) {
final message = exception.toString();
final lower = message.toLowerCase();

if (message.contains('not found') || message.contains('404')) {
if (lower.contains('not found') || lower.contains('404')) {
return NotFoundFailure(message: message);
} else if (message.contains('not connected') ||
message.contains('network')) {
} else if (lower.contains('not connected') ||
lower.contains('network')) {
return NetworkFailure(message: message);
} else if (message.contains('timeout')) {
} else if (lower.contains('timeout') || lower.contains('timed out')) {
return TimeoutFailure(message: message);
} else if (message.contains('server') || message.contains('500')) {
} else if (lower.contains('server') || lower.contains('500')) {
return ServerFailure(message: message);
}

Expand Down