From c6c95b61ec17ebbb1eea7832951e35b74d0a6489 Mon Sep 17 00:00:00 2001 From: Rain Date: Fri, 12 Jun 2026 16:31:55 +0800 Subject: [PATCH] fix(a2a): handle streaming backpressure --- .../agent/event/TaskUpdateEventHandler.java | 17 +++++ .../agentscope-extensions-a2a-server/pom.xml | 5 ++ .../executor/AgentScopeAgentExecutor.java | 18 +++-- .../jsonrpc/JsonRpcTransportWrapper.java | 41 ++++++++++- .../jsonrpc/JsonRpcTransportWrapperTest.java | 70 +++++++++++++++++++ 5 files changed, 143 insertions(+), 8 deletions(-) diff --git a/agentscope-extensions/agentscope-extensions-protocol/agentscope-extensions-a2a/agentscope-extensions-a2a-client/src/main/java/io/agentscope/core/a2a/agent/event/TaskUpdateEventHandler.java b/agentscope-extensions/agentscope-extensions-protocol/agentscope-extensions-a2a/agentscope-extensions-a2a-client/src/main/java/io/agentscope/core/a2a/agent/event/TaskUpdateEventHandler.java index 5c06623766..a1dbc3d376 100644 --- a/agentscope-extensions/agentscope-extensions-protocol/agentscope-extensions-a2a/agentscope-extensions-a2a-client/src/main/java/io/agentscope/core/a2a/agent/event/TaskUpdateEventHandler.java +++ b/agentscope-extensions/agentscope-extensions-protocol/agentscope-extensions-a2a/agentscope-extensions-a2a-client/src/main/java/io/agentscope/core/a2a/agent/event/TaskUpdateEventHandler.java @@ -18,6 +18,7 @@ import io.a2a.client.TaskUpdateEvent; import io.a2a.spec.TaskArtifactUpdateEvent; +import io.a2a.spec.TaskState; import io.a2a.spec.TaskStatus; import io.a2a.spec.TaskStatusUpdateEvent; import io.a2a.spec.UpdateEvent; @@ -89,6 +90,22 @@ private static class TaskStatusUpdateEventHandler public void handle(TaskStatusUpdateEvent event, ClientEventContext context) { String currentRequestId = context.getCurrentRequestId(); if (event.isFinal()) { + TaskState state = event.getStatus().state(); + if (!TaskState.COMPLETED.equals(state)) { + String errorMsg = + "A2A task ended with state: " + + state + + (event.getStatus().message() != null + ? ", message: " + event.getStatus().message() + : ""); + LoggerUtil.warn( + log, + "[{}] A2aAgent task ended with non-completed state: {}.", + currentRequestId, + state); + context.getSink().success(Msg.builder().textContent(errorMsg).build()); + return; + } Msg msg = MessageConvertUtil.convertFromArtifact( context.getTask().getArtifacts(), context.getAgent().getName()); diff --git a/agentscope-extensions/agentscope-extensions-protocol/agentscope-extensions-a2a/agentscope-extensions-a2a-server/pom.xml b/agentscope-extensions/agentscope-extensions-protocol/agentscope-extensions-a2a/agentscope-extensions-a2a-server/pom.xml index 210c31781e..f1d8a29c81 100644 --- a/agentscope-extensions/agentscope-extensions-protocol/agentscope-extensions-a2a/agentscope-extensions-a2a-server/pom.xml +++ b/agentscope-extensions/agentscope-extensions-protocol/agentscope-extensions-a2a/agentscope-extensions-a2a-server/pom.xml @@ -52,6 +52,11 @@ io.github.a2asdk a2a-java-sdk-transport-jsonrpc + + io.projectreactor + reactor-test + test + diff --git a/agentscope-extensions/agentscope-extensions-protocol/agentscope-extensions-a2a/agentscope-extensions-a2a-server/src/main/java/io/agentscope/core/a2a/server/executor/AgentScopeAgentExecutor.java b/agentscope-extensions/agentscope-extensions-protocol/agentscope-extensions-a2a/agentscope-extensions-a2a-server/src/main/java/io/agentscope/core/a2a/server/executor/AgentScopeAgentExecutor.java index 909a19f587..ba82d6d753 100644 --- a/agentscope-extensions/agentscope-extensions-protocol/agentscope-extensions-a2a/agentscope-extensions-a2a-server/src/main/java/io/agentscope/core/a2a/server/executor/AgentScopeAgentExecutor.java +++ b/agentscope-extensions/agentscope-extensions-protocol/agentscope-extensions-a2a/agentscope-extensions-a2a-server/src/main/java/io/agentscope/core/a2a/server/executor/AgentScopeAgentExecutor.java @@ -197,13 +197,17 @@ private void processTaskNonBlocking( processStreamingOutput(resultFlux, taskUpdater, context); } catch (Exception e) { log.error("[{}] Error processing streaming output", context.getTaskId(), e); - taskUpdater.fail( - taskUpdater.newAgentMessage( - List.of( - new TextPart( - "Error processing streaming output: " - + e.getMessage())), - Map.of())); + try { + taskUpdater.fail( + taskUpdater.newAgentMessage( + List.of( + new TextPart( + "Error processing streaming output: " + + e.getMessage())), + Map.of())); + } catch (IllegalStateException ignored) { + // doOnError already transitioned the task to a terminal state; nothing to do. + } } } diff --git a/agentscope-extensions/agentscope-extensions-protocol/agentscope-extensions-a2a/agentscope-extensions-a2a-server/src/main/java/io/agentscope/core/a2a/server/transport/jsonrpc/JsonRpcTransportWrapper.java b/agentscope-extensions/agentscope-extensions-protocol/agentscope-extensions-a2a/agentscope-extensions-a2a-server/src/main/java/io/agentscope/core/a2a/server/transport/jsonrpc/JsonRpcTransportWrapper.java index 111fe79aa6..64db6acf2a 100644 --- a/agentscope-extensions/agentscope-extensions-protocol/agentscope-extensions-a2a/agentscope-extensions-a2a-server/src/main/java/io/agentscope/core/a2a/server/transport/jsonrpc/JsonRpcTransportWrapper.java +++ b/agentscope-extensions/agentscope-extensions-protocol/agentscope-extensions-a2a/agentscope-extensions-a2a-server/src/main/java/io/agentscope/core/a2a/server/transport/jsonrpc/JsonRpcTransportWrapper.java @@ -58,6 +58,7 @@ import org.reactivestreams.FlowAdapters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.publisher.BufferOverflowStrategy; import reactor.core.publisher.Flux; /** @@ -77,6 +78,16 @@ public class JsonRpcTransportWrapper implements TransportWrapper private static final Logger log = LoggerFactory.getLogger(JsonRpcTransportWrapper.class); + private static final String STREAMING_BACKPRESSURE_BUFFER_SIZE_PROPERTY = + "agentscope.a2a.streaming.backpressure-buffer-size"; + + private static final int DEFAULT_STREAMING_BACKPRESSURE_BUFFER_SIZE = 8192; + + private static final int STREAMING_BACKPRESSURE_BUFFER_SIZE = + Integer.getInteger( + STREAMING_BACKPRESSURE_BUFFER_SIZE_PROPERTY, + DEFAULT_STREAMING_BACKPRESSURE_BUFFER_SIZE); + private final JSONRPCHandler jsonRpcHandler; public JsonRpcTransportWrapper(JSONRPCHandler jsonrpcHandler) { @@ -160,10 +171,38 @@ private Flux> handleStreamRequest( return Flux.just(generateErrorResponse(request, new UnsupportedOperationError())); } - return Flux.from(FlowAdapters.toPublisher(publisher)) + String method = (String) context.getState().get(JSONRPCContextKeys.METHOD_NAME_KEY); + Object requestId = request.getId(); + return applyStreamingBackpressureBuffer( + Flux.from(FlowAdapters.toPublisher(publisher)), method, requestId) .delaySubscription(Duration.ofMillis(10)); } + private Flux> applyStreamingBackpressureBuffer( + Flux> stream, String method, Object requestId) { + if (STREAMING_BACKPRESSURE_BUFFER_SIZE <= 0) { + return stream.onBackpressureBuffer(); + } + return stream.onBackpressureBuffer( + STREAMING_BACKPRESSURE_BUFFER_SIZE, + response -> + log.error( + "JsonRpcTransportWrapper.stream backpressure buffer overflow:" + + " method={}, requestId={}, bufferSize={}, dropped={}", + method, + requestId, + STREAMING_BACKPRESSURE_BUFFER_SIZE, + summarizeResponse(response)), + BufferOverflowStrategy.ERROR); + } + + private String summarizeResponse(JSONRPCResponse response) { + if (response == null) { + return "responseType=null"; + } + return "responseType=" + response.getClass().getName(); + } + private JSONRPCResponse handleNonStreamRequest(String body, ServerCallContext context) throws JsonProcessingException { NonStreamingJSONRPCRequest request = diff --git a/agentscope-extensions/agentscope-extensions-protocol/agentscope-extensions-a2a/agentscope-extensions-a2a-server/src/test/java/io/agentscope/core/a2a/server/transport/jsonrpc/JsonRpcTransportWrapperTest.java b/agentscope-extensions/agentscope-extensions-protocol/agentscope-extensions-a2a/agentscope-extensions-a2a-server/src/test/java/io/agentscope/core/a2a/server/transport/jsonrpc/JsonRpcTransportWrapperTest.java index 5b5eeb959f..34409b4d31 100644 --- a/agentscope-extensions/agentscope-extensions-protocol/agentscope-extensions-a2a/agentscope-extensions-a2a-server/src/test/java/io/agentscope/core/a2a/server/transport/jsonrpc/JsonRpcTransportWrapperTest.java +++ b/agentscope-extensions/agentscope-extensions-protocol/agentscope-extensions-a2a/agentscope-extensions-a2a-server/src/test/java/io/agentscope/core/a2a/server/transport/jsonrpc/JsonRpcTransportWrapperTest.java @@ -19,6 +19,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -51,13 +53,16 @@ import io.a2a.spec.TransportProtocol; import io.a2a.transport.jsonrpc.handler.JSONRPCHandler; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.Flow; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import org.reactivestreams.FlowAdapters; import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; /** * Unit tests for JsonRpcTransportWrapper. @@ -279,6 +284,48 @@ void testHandleTaskResubscriptionRequest() throws Exception { } } + @Nested + @DisplayName("Streaming Backpressure Tests") + class StreamingBackpressureTests { + + @Test + @DisplayName("Should show unbuffered streaming can lose burst responses") + void testUnbufferedStreamingBurstDropsResponse() { + SendStreamingMessageResponse reasoningResponse = + mock(SendStreamingMessageResponse.class); + SendStreamingMessageResponse textResponse = mock(SendStreamingMessageResponse.class); + + Flux unbuffered = + Flux.from( + FlowAdapters.toPublisher( + burstPublisher(List.of(reasoningResponse, textResponse)))); + + AssertionError overflowError = + assertThrows( + AssertionError.class, + () -> + StepVerifier.create(unbuffered, 1) + .expectNext(reasoningResponse) + .verifyComplete()); + assertTrue( + String.valueOf(overflowError.getMessage()).contains("request overflow"), + "Expected request overflow but got: " + overflowError.getMessage()); + + Flux buffered = + Flux.from( + FlowAdapters.toPublisher( + burstPublisher( + List.of(reasoningResponse, textResponse)))) + .onBackpressureBuffer(); + + StepVerifier.create(buffered, 1) + .expectNext(reasoningResponse) + .thenRequest(1) + .expectNext(textResponse) + .verifyComplete(); + } + } + @Nested @DisplayName("Error Handling Tests") class ErrorHandlingTests { @@ -370,4 +417,27 @@ void testHandleMethodNull() { assertInstanceOf(InvalidRequestError.class, error); } } + + private static Flow.Publisher burstPublisher(List values) { + return subscriber -> + subscriber.onSubscribe( + new Flow.Subscription() { + private boolean emitted; + + @Override + public void request(long n) { + if (emitted) { + return; + } + emitted = true; + for (T value : values) { + subscriber.onNext(value); + } + subscriber.onComplete(); + } + + @Override + public void cancel() {} + }); + } }