diff --git a/agentscope-extensions/agentscope-extensions-protocol/agentscope-extensions-a2a/agentscope-extensions-a2a-client/src/main/java/io/agentscope/core/a2a/agent/event/TaskUpdateEventHandler.java b/agentscope-extensions/agentscope-extensions-protocol/agentscope-extensions-a2a/agentscope-extensions-a2a-client/src/main/java/io/agentscope/core/a2a/agent/event/TaskUpdateEventHandler.java
index 5c06623766..a1dbc3d376 100644
--- a/agentscope-extensions/agentscope-extensions-protocol/agentscope-extensions-a2a/agentscope-extensions-a2a-client/src/main/java/io/agentscope/core/a2a/agent/event/TaskUpdateEventHandler.java
+++ b/agentscope-extensions/agentscope-extensions-protocol/agentscope-extensions-a2a/agentscope-extensions-a2a-client/src/main/java/io/agentscope/core/a2a/agent/event/TaskUpdateEventHandler.java
@@ -18,6 +18,7 @@
import io.a2a.client.TaskUpdateEvent;
import io.a2a.spec.TaskArtifactUpdateEvent;
+import io.a2a.spec.TaskState;
import io.a2a.spec.TaskStatus;
import io.a2a.spec.TaskStatusUpdateEvent;
import io.a2a.spec.UpdateEvent;
@@ -89,6 +90,22 @@ private static class TaskStatusUpdateEventHandler
public void handle(TaskStatusUpdateEvent event, ClientEventContext context) {
String currentRequestId = context.getCurrentRequestId();
if (event.isFinal()) {
+ TaskState state = event.getStatus().state();
+ if (!TaskState.COMPLETED.equals(state)) {
+ String errorMsg =
+ "A2A task ended with state: "
+ + state
+ + (event.getStatus().message() != null
+ ? ", message: " + event.getStatus().message()
+ : "");
+ LoggerUtil.warn(
+ log,
+ "[{}] A2aAgent task ended with non-completed state: {}.",
+ currentRequestId,
+ state);
+ context.getSink().success(Msg.builder().textContent(errorMsg).build());
+ return;
+ }
Msg msg =
MessageConvertUtil.convertFromArtifact(
context.getTask().getArtifacts(), context.getAgent().getName());
diff --git a/agentscope-extensions/agentscope-extensions-protocol/agentscope-extensions-a2a/agentscope-extensions-a2a-server/pom.xml b/agentscope-extensions/agentscope-extensions-protocol/agentscope-extensions-a2a/agentscope-extensions-a2a-server/pom.xml
index 210c31781e..f1d8a29c81 100644
--- a/agentscope-extensions/agentscope-extensions-protocol/agentscope-extensions-a2a/agentscope-extensions-a2a-server/pom.xml
+++ b/agentscope-extensions/agentscope-extensions-protocol/agentscope-extensions-a2a/agentscope-extensions-a2a-server/pom.xml
@@ -52,6 +52,11 @@
io.github.a2asdk
a2a-java-sdk-transport-jsonrpc
+
+ io.projectreactor
+ reactor-test
+ test
+
diff --git a/agentscope-extensions/agentscope-extensions-protocol/agentscope-extensions-a2a/agentscope-extensions-a2a-server/src/main/java/io/agentscope/core/a2a/server/executor/AgentScopeAgentExecutor.java b/agentscope-extensions/agentscope-extensions-protocol/agentscope-extensions-a2a/agentscope-extensions-a2a-server/src/main/java/io/agentscope/core/a2a/server/executor/AgentScopeAgentExecutor.java
index 909a19f587..ba82d6d753 100644
--- a/agentscope-extensions/agentscope-extensions-protocol/agentscope-extensions-a2a/agentscope-extensions-a2a-server/src/main/java/io/agentscope/core/a2a/server/executor/AgentScopeAgentExecutor.java
+++ b/agentscope-extensions/agentscope-extensions-protocol/agentscope-extensions-a2a/agentscope-extensions-a2a-server/src/main/java/io/agentscope/core/a2a/server/executor/AgentScopeAgentExecutor.java
@@ -197,13 +197,17 @@ private void processTaskNonBlocking(
processStreamingOutput(resultFlux, taskUpdater, context);
} catch (Exception e) {
log.error("[{}] Error processing streaming output", context.getTaskId(), e);
- taskUpdater.fail(
- taskUpdater.newAgentMessage(
- List.of(
- new TextPart(
- "Error processing streaming output: "
- + e.getMessage())),
- Map.of()));
+ try {
+ taskUpdater.fail(
+ taskUpdater.newAgentMessage(
+ List.of(
+ new TextPart(
+ "Error processing streaming output: "
+ + e.getMessage())),
+ Map.of()));
+ } catch (IllegalStateException ignored) {
+ // doOnError already transitioned the task to a terminal state; nothing to do.
+ }
}
}
diff --git a/agentscope-extensions/agentscope-extensions-protocol/agentscope-extensions-a2a/agentscope-extensions-a2a-server/src/main/java/io/agentscope/core/a2a/server/transport/jsonrpc/JsonRpcTransportWrapper.java b/agentscope-extensions/agentscope-extensions-protocol/agentscope-extensions-a2a/agentscope-extensions-a2a-server/src/main/java/io/agentscope/core/a2a/server/transport/jsonrpc/JsonRpcTransportWrapper.java
index 111fe79aa6..64db6acf2a 100644
--- a/agentscope-extensions/agentscope-extensions-protocol/agentscope-extensions-a2a/agentscope-extensions-a2a-server/src/main/java/io/agentscope/core/a2a/server/transport/jsonrpc/JsonRpcTransportWrapper.java
+++ b/agentscope-extensions/agentscope-extensions-protocol/agentscope-extensions-a2a/agentscope-extensions-a2a-server/src/main/java/io/agentscope/core/a2a/server/transport/jsonrpc/JsonRpcTransportWrapper.java
@@ -58,6 +58,7 @@
import org.reactivestreams.FlowAdapters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import reactor.core.publisher.BufferOverflowStrategy;
import reactor.core.publisher.Flux;
/**
@@ -77,6 +78,16 @@ public class JsonRpcTransportWrapper implements TransportWrapper
private static final Logger log = LoggerFactory.getLogger(JsonRpcTransportWrapper.class);
+ private static final String STREAMING_BACKPRESSURE_BUFFER_SIZE_PROPERTY =
+ "agentscope.a2a.streaming.backpressure-buffer-size";
+
+ private static final int DEFAULT_STREAMING_BACKPRESSURE_BUFFER_SIZE = 8192;
+
+ private static final int STREAMING_BACKPRESSURE_BUFFER_SIZE =
+ Integer.getInteger(
+ STREAMING_BACKPRESSURE_BUFFER_SIZE_PROPERTY,
+ DEFAULT_STREAMING_BACKPRESSURE_BUFFER_SIZE);
+
private final JSONRPCHandler jsonRpcHandler;
public JsonRpcTransportWrapper(JSONRPCHandler jsonrpcHandler) {
@@ -160,10 +171,38 @@ private Flux extends JSONRPCResponse>> handleStreamRequest(
return Flux.just(generateErrorResponse(request, new UnsupportedOperationError()));
}
- return Flux.from(FlowAdapters.toPublisher(publisher))
+ String method = (String) context.getState().get(JSONRPCContextKeys.METHOD_NAME_KEY);
+ Object requestId = request.getId();
+ return applyStreamingBackpressureBuffer(
+ Flux.from(FlowAdapters.toPublisher(publisher)), method, requestId)
.delaySubscription(Duration.ofMillis(10));
}
+ private Flux extends JSONRPCResponse>> applyStreamingBackpressureBuffer(
+ Flux extends JSONRPCResponse>> stream, String method, Object requestId) {
+ if (STREAMING_BACKPRESSURE_BUFFER_SIZE <= 0) {
+ return stream.onBackpressureBuffer();
+ }
+ return stream.onBackpressureBuffer(
+ STREAMING_BACKPRESSURE_BUFFER_SIZE,
+ response ->
+ log.error(
+ "JsonRpcTransportWrapper.stream backpressure buffer overflow:"
+ + " method={}, requestId={}, bufferSize={}, dropped={}",
+ method,
+ requestId,
+ STREAMING_BACKPRESSURE_BUFFER_SIZE,
+ summarizeResponse(response)),
+ BufferOverflowStrategy.ERROR);
+ }
+
+ private String summarizeResponse(JSONRPCResponse> response) {
+ if (response == null) {
+ return "responseType=null";
+ }
+ return "responseType=" + response.getClass().getName();
+ }
+
private JSONRPCResponse> handleNonStreamRequest(String body, ServerCallContext context)
throws JsonProcessingException {
NonStreamingJSONRPCRequest> request =
diff --git a/agentscope-extensions/agentscope-extensions-protocol/agentscope-extensions-a2a/agentscope-extensions-a2a-server/src/test/java/io/agentscope/core/a2a/server/transport/jsonrpc/JsonRpcTransportWrapperTest.java b/agentscope-extensions/agentscope-extensions-protocol/agentscope-extensions-a2a/agentscope-extensions-a2a-server/src/test/java/io/agentscope/core/a2a/server/transport/jsonrpc/JsonRpcTransportWrapperTest.java
index 5b5eeb959f..34409b4d31 100644
--- a/agentscope-extensions/agentscope-extensions-protocol/agentscope-extensions-a2a/agentscope-extensions-a2a-server/src/test/java/io/agentscope/core/a2a/server/transport/jsonrpc/JsonRpcTransportWrapperTest.java
+++ b/agentscope-extensions/agentscope-extensions-protocol/agentscope-extensions-a2a/agentscope-extensions-a2a-server/src/test/java/io/agentscope/core/a2a/server/transport/jsonrpc/JsonRpcTransportWrapperTest.java
@@ -19,6 +19,8 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -51,13 +53,16 @@
import io.a2a.spec.TransportProtocol;
import io.a2a.transport.jsonrpc.handler.JSONRPCHandler;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.Flow;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
+import org.reactivestreams.FlowAdapters;
import reactor.core.publisher.Flux;
+import reactor.test.StepVerifier;
/**
* Unit tests for JsonRpcTransportWrapper.
@@ -279,6 +284,48 @@ void testHandleTaskResubscriptionRequest() throws Exception {
}
}
+ @Nested
+ @DisplayName("Streaming Backpressure Tests")
+ class StreamingBackpressureTests {
+
+ @Test
+ @DisplayName("Should show unbuffered streaming can lose burst responses")
+ void testUnbufferedStreamingBurstDropsResponse() {
+ SendStreamingMessageResponse reasoningResponse =
+ mock(SendStreamingMessageResponse.class);
+ SendStreamingMessageResponse textResponse = mock(SendStreamingMessageResponse.class);
+
+ Flux unbuffered =
+ Flux.from(
+ FlowAdapters.toPublisher(
+ burstPublisher(List.of(reasoningResponse, textResponse))));
+
+ AssertionError overflowError =
+ assertThrows(
+ AssertionError.class,
+ () ->
+ StepVerifier.create(unbuffered, 1)
+ .expectNext(reasoningResponse)
+ .verifyComplete());
+ assertTrue(
+ String.valueOf(overflowError.getMessage()).contains("request overflow"),
+ "Expected request overflow but got: " + overflowError.getMessage());
+
+ Flux buffered =
+ Flux.from(
+ FlowAdapters.toPublisher(
+ burstPublisher(
+ List.of(reasoningResponse, textResponse))))
+ .onBackpressureBuffer();
+
+ StepVerifier.create(buffered, 1)
+ .expectNext(reasoningResponse)
+ .thenRequest(1)
+ .expectNext(textResponse)
+ .verifyComplete();
+ }
+ }
+
@Nested
@DisplayName("Error Handling Tests")
class ErrorHandlingTests {
@@ -370,4 +417,27 @@ void testHandleMethodNull() {
assertInstanceOf(InvalidRequestError.class, error);
}
}
+
+ private static Flow.Publisher burstPublisher(List values) {
+ return subscriber ->
+ subscriber.onSubscribe(
+ new Flow.Subscription() {
+ private boolean emitted;
+
+ @Override
+ public void request(long n) {
+ if (emitted) {
+ return;
+ }
+ emitted = true;
+ for (T value : values) {
+ subscriber.onNext(value);
+ }
+ subscriber.onComplete();
+ }
+
+ @Override
+ public void cancel() {}
+ });
+ }
}