Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import io.a2a.client.TaskUpdateEvent;
import io.a2a.spec.TaskArtifactUpdateEvent;
import io.a2a.spec.TaskState;
import io.a2a.spec.TaskStatus;
import io.a2a.spec.TaskStatusUpdateEvent;
import io.a2a.spec.UpdateEvent;
Expand Down Expand Up @@ -89,6 +90,22 @@ private static class TaskStatusUpdateEventHandler
public void handle(TaskStatusUpdateEvent event, ClientEventContext context) {
String currentRequestId = context.getCurrentRequestId();
if (event.isFinal()) {
TaskState state = event.getStatus().state();
if (!TaskState.COMPLETED.equals(state)) {
String errorMsg =
"A2A task ended with state: "
+ state
+ (event.getStatus().message() != null
? ", message: " + event.getStatus().message()
: "");
LoggerUtil.warn(
log,
"[{}] A2aAgent task ended with non-completed state: {}.",
currentRequestId,
state);
context.getSink().success(Msg.builder().textContent(errorMsg).build());

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[minor] context.getSink().success(...) is used to propagate a non-COMPLETED terminal state (e.g. FAILED, CANCELED) as if it were a normal completion. This is semantically misleading — downstream consumers of the Sink cannot distinguish a genuinely successful task from a failed one without inspecting the Msg.textContent.

If the Sink API supports it, prefer sink.error(new RuntimeException(errorMsg)) or a dedicated failure path. If success() is the only option (e.g. the sink type doesn't support error emission), add a code comment explaining why a failure is propagated via success(), so future maintainers are not confused.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[minor] context.getSink().success(...) is used to propagate a non-COMPLETED terminal state (e.g. FAILED, CANCELED) as if it were a normal completion. This is semantically misleading — downstream consumers of the Sink cannot distinguish a genuinely successful task from a failed one without inspecting the Msg.textContent.

If the Sink API supports it, prefer sink.error(new RuntimeException(errorMsg)) or a dedicated failure path. If success() is the only option (e.g. the sink type doesn't support error emission), add a code comment explaining why a failure is propagated via success(), so future maintainers are not confused.

return;
}
Msg msg =
MessageConvertUtil.convertFromArtifact(
context.getTask().getArtifacts(), context.getAgent().getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@
<groupId>io.github.a2asdk</groupId>
<artifactId>a2a-java-sdk-transport-jsonrpc</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -197,13 +197,17 @@ private void processTaskNonBlocking(
processStreamingOutput(resultFlux, taskUpdater, context);
} catch (Exception e) {
log.error("[{}] Error processing streaming output", context.getTaskId(), e);
taskUpdater.fail(
taskUpdater.newAgentMessage(
List.of(
new TextPart(
"Error processing streaming output: "
+ e.getMessage())),
Map.of()));
try {
taskUpdater.fail(
taskUpdater.newAgentMessage(
List.of(
new TextPart(
"Error processing streaming output: "
+ e.getMessage())),
Map.of()));
} catch (IllegalStateException ignored) {
// doOnError already transitioned the task to a terminal state; nothing to do.
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.reactivestreams.FlowAdapters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.BufferOverflowStrategy;
import reactor.core.publisher.Flux;

/**
Expand All @@ -77,6 +78,16 @@ public class JsonRpcTransportWrapper implements TransportWrapper<String, Object>

private static final Logger log = LoggerFactory.getLogger(JsonRpcTransportWrapper.class);

private static final String STREAMING_BACKPRESSURE_BUFFER_SIZE_PROPERTY =
"agentscope.a2a.streaming.backpressure-buffer-size";

private static final int DEFAULT_STREAMING_BACKPRESSURE_BUFFER_SIZE = 8192;

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[minor] Integer.getInteger(prop, defaultValue) returns null (and triggers an NPE during static init) if the system property is set to a non-numeric string (e.g. -Dagentscope.a2a.streaming.backpressure-buffer-size=abc). This would crash the entire class loading and prevent the server from starting.

Consider adding a defensive parse:

private static final int STREAMING_BACKPRESSURE_BUFFER_SIZE = parseBufferSize();

private static int parseBufferSize() {
    String raw = System.getProperty(STREAMING_BACKPRESSURE_BUFFER_SIZE_PROPERTY);
    if (raw == null) return DEFAULT_STREAMING_BACKPRESSURE_BUFFER_SIZE;
    try {
        return Integer.parseInt(raw);
    } catch (NumberFormatException e) {
        LoggerFactory.getLogger(JsonRpcTransportWrapper.class)
            .warn("Invalid value for {}: '{}', using default {}",
                STREAMING_BACKPRESSURE_BUFFER_SIZE_PROPERTY, raw,
                DEFAULT_STREAMING_BACKPRESSURE_BUFFER_SIZE);
        return DEFAULT_STREAMING_BACKPRESSURE_BUFFER_SIZE;
    }
}

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[minor] Integer.getInteger(prop, defaultValue) returns null (and triggers an NPE during static init) if the system property is set to a non-numeric string (e.g. -Dagentscope.a2a.streaming.backpressure-buffer-size=abc). This would crash the entire class loading and prevent the server from starting.

Consider adding a defensive parse:

private static final int STREAMING_BACKPRESSURE_BUFFER_SIZE = parseBufferSize();

private static int parseBufferSize() {
    String raw = System.getProperty(STREAMING_BACKPRESSURE_BUFFER_SIZE_PROPERTY);
    if (raw == null) return DEFAULT_STREAMING_BACKPRESSURE_BUFFER_SIZE;
    try {
        return Integer.parseInt(raw);
    } catch (NumberFormatException e) {
        LoggerFactory.getLogger(JsonRpcTransportWrapper.class)
            .warn("Invalid value for {}: '{}', using default {}",
                STREAMING_BACKPRESSURE_BUFFER_SIZE_PROPERTY, raw,
                DEFAULT_STREAMING_BACKPRESSURE_BUFFER_SIZE);
        return DEFAULT_STREAMING_BACKPRESSURE_BUFFER_SIZE;
    }
}

private static final int STREAMING_BACKPRESSURE_BUFFER_SIZE =
Integer.getInteger(
STREAMING_BACKPRESSURE_BUFFER_SIZE_PROPERTY,
DEFAULT_STREAMING_BACKPRESSURE_BUFFER_SIZE);

private final JSONRPCHandler jsonRpcHandler;

public JsonRpcTransportWrapper(JSONRPCHandler jsonrpcHandler) {
Expand Down Expand Up @@ -160,10 +171,38 @@ private Flux<? extends JSONRPCResponse<?>> handleStreamRequest(
return Flux.just(generateErrorResponse(request, new UnsupportedOperationError()));
}

return Flux.from(FlowAdapters.toPublisher(publisher))
String method = (String) context.getState().get(JSONRPCContextKeys.METHOD_NAME_KEY);
Object requestId = request.getId();
return applyStreamingBackpressureBuffer(
Flux.from(FlowAdapters.toPublisher(publisher)), method, requestId)
.delaySubscription(Duration.ofMillis(10));
}

private Flux<? extends JSONRPCResponse<?>> applyStreamingBackpressureBuffer(
Flux<? extends JSONRPCResponse<?>> stream, String method, Object requestId) {
if (STREAMING_BACKPRESSURE_BUFFER_SIZE <= 0) {
return stream.onBackpressureBuffer();
}
return stream.onBackpressureBuffer(
STREAMING_BACKPRESSURE_BUFFER_SIZE,
response ->
log.error(
"JsonRpcTransportWrapper.stream backpressure buffer overflow:"
+ " method={}, requestId={}, bufferSize={}, dropped={}",
method,
requestId,
STREAMING_BACKPRESSURE_BUFFER_SIZE,
summarizeResponse(response)),
BufferOverflowStrategy.ERROR);
}

private String summarizeResponse(JSONRPCResponse<?> response) {
if (response == null) {
return "responseType=null";
}
return "responseType=" + response.getClass().getName();
}

private JSONRPCResponse<?> handleNonStreamRequest(String body, ServerCallContext context)
throws JsonProcessingException {
NonStreamingJSONRPCRequest<?> request =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -51,13 +53,16 @@
import io.a2a.spec.TransportProtocol;
import io.a2a.transport.jsonrpc.handler.JSONRPCHandler;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Flow;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.reactivestreams.FlowAdapters;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

/**
* Unit tests for JsonRpcTransportWrapper.
Expand Down Expand Up @@ -279,6 +284,48 @@ void testHandleTaskResubscriptionRequest() throws Exception {
}
}

@Nested
@DisplayName("Streaming Backpressure Tests")
class StreamingBackpressureTests {

@Test
@DisplayName("Should show unbuffered streaming can lose burst responses")
void testUnbufferedStreamingBurstDropsResponse() {
SendStreamingMessageResponse reasoningResponse =
mock(SendStreamingMessageResponse.class);
SendStreamingMessageResponse textResponse = mock(SendStreamingMessageResponse.class);

Flux<SendStreamingMessageResponse> unbuffered =
Flux.from(
FlowAdapters.toPublisher(
burstPublisher(List.of(reasoningResponse, textResponse))));

AssertionError overflowError =
assertThrows(
AssertionError.class,
() ->
StepVerifier.create(unbuffered, 1)
.expectNext(reasoningResponse)
.verifyComplete());
assertTrue(
String.valueOf(overflowError.getMessage()).contains("request overflow"),
"Expected request overflow but got: " + overflowError.getMessage());

Flux<SendStreamingMessageResponse> buffered =
Flux.from(
FlowAdapters.toPublisher(
burstPublisher(
List.of(reasoningResponse, textResponse))))
.onBackpressureBuffer();

StepVerifier.create(buffered, 1)
.expectNext(reasoningResponse)
.thenRequest(1)
.expectNext(textResponse)
.verifyComplete();
}
}

@Nested
@DisplayName("Error Handling Tests")
class ErrorHandlingTests {
Expand Down Expand Up @@ -370,4 +417,27 @@ void testHandleMethodNull() {
assertInstanceOf(InvalidRequestError.class, error);
}
}

private static <T> Flow.Publisher<T> burstPublisher(List<T> values) {
return subscriber ->
subscriber.onSubscribe(
new Flow.Subscription() {
private boolean emitted;

@Override
public void request(long n) {
if (emitted) {
return;
}
emitted = true;
for (T value : values) {
subscriber.onNext(value);
}
subscriber.onComplete();
}

@Override
public void cancel() {}
});
}
}
Loading