Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 87 additions & 62 deletions agentscope-core/src/main/java/io/agentscope/core/ReActAgent.java
Original file line number Diff line number Diff line change
Expand Up @@ -2050,29 +2050,36 @@ private Flux<AgentEvent> modelCallStream(
.concatMap(
chunk -> {
List<Msg> chunkMsgs = context.processChunk(chunk);
for (Msg msg : chunkMsgs) {
hookDispatcher
.fireReasoningChunk(
msg,
context,
mci.model().getModelName())
.subscribe();
}
return Flux.deferContextual(

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit] Consider adding a brief inline comment explaining why Flux.deferContextual is needed here (e.g. // Propagate parent Reactor context so chunk hooks can access SubagentEventBus), similar to the existing comment at line 2402–2406 for executeToolCalls. This helps future readers understand the non-obvious context-propagation requirement without having to trace the full call chain.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit] Consider adding a brief inline comment explaining why Flux.deferContextual is needed here (e.g. // Propagate parent Reactor context so chunk hooks can access SubagentEventBus), similar to the existing comment at line 2402–2406 for executeToolCalls. This helps future readers understand the non-obvious context-propagation requirement without having to trace the full call chain.

parentCtx -> {
for (Msg msg : chunkMsgs) {
hookDispatcher
.fireReasoningChunk(
msg,
context,
mci.model().getModelName())
.contextWrite(
ctx ->
ctx.putAll(
parentCtx))
.subscribe();

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[minor] The .subscribe() call remains bare — no error consumer is provided. If a hook's onEvent throws (or the Mono<Void> signals an error), Reactor will invoke onErrorDropped, silently swallowing the exception. This is a pre-existing issue, not introduced by this PR, but since you're already touching this line, consider adding a minimal error handler (e.g. .subscribe(v -> {}, e -> log.warn("Reasoning chunk hook failed", e))) to surface failures. Same applies to lines 2463 and 2975.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[minor] The .subscribe() call remains bare — no error consumer is provided. If a hook's onEvent throws (or the Mono<Void> signals an error), Reactor will invoke onErrorDropped, silently swallowing the exception. This is a pre-existing issue, not introduced by this PR, but since you're already touching this line, consider adding a minimal error handler (e.g. .subscribe(v -> {}, e -> log.warn("Reasoning chunk hook failed", e))) to surface failures. Same applies to lines 2463 and 2975.

}

List<AgentEvent> events = new ArrayList<>();
for (ContentBlock block : chunk.getContent()) {
emitBlockEvents(
block,
replyId,
context,
textStarted,
thinkingStarted,
withToolEvents
? startedToolCalls
: new ConcurrentHashMap<>(),
events);
}
return Flux.fromIterable(events);
List<AgentEvent> events = new ArrayList<>();
for (ContentBlock block : chunk.getContent()) {
emitBlockEvents(
block,
replyId,
context,
textStarted,
thinkingStarted,
withToolEvents
? startedToolCalls
: new ConcurrentHashMap<>(),
events);
}
return Flux.fromIterable(events);
});
});

Flux<AgentEvent> endEvents =
Expand Down Expand Up @@ -2449,6 +2456,10 @@ private Flux<AgentEvent> runToolBatch(
hookDispatcher
.fireActingChunk(
toolUse, chunk, toolkit)
.contextWrite(
ctx ->
ctx.putAll(
parentCtx))
.subscribe();
});

Expand Down Expand Up @@ -2947,47 +2958,61 @@ private Flux<AgentEvent> summaryModelCallStream(
.concatMap(
chunk -> {
List<Msg> chunkMsgs = context.processChunk(chunk);
for (Msg msg : chunkMsgs) {
hookDispatcher
.fireSummaryChunk(
msg,
context,
hookOptions,
model.getModelName())
.subscribe();
}
return Flux.deferContextual(
parentCtx -> {
for (Msg msg : chunkMsgs) {
hookDispatcher
.fireSummaryChunk(
msg,
context,
hookOptions,
model.getModelName())
.contextWrite(
ctx ->
ctx.putAll(
parentCtx))
.subscribe();
}

List<AgentEvent> events = new ArrayList<>();
for (ContentBlock block : chunk.getContent()) {
if (block instanceof TextBlock tb) {
if (textStarted.compareAndSet(false, true)) {
events.add(
new TextBlockStartEvent(
replyId, "text"));
}
if (tb.getText() != null
&& !tb.getText().isEmpty()) {
events.add(
new TextBlockDeltaEvent(
replyId, "text", tb.getText()));
}
} else if (block instanceof ThinkingBlock tb) {
if (thinkingStarted.compareAndSet(false, true)) {
events.add(
new ThinkingBlockStartEvent(
replyId, "thinking"));
}
if (tb.getThinking() != null
&& !tb.getThinking().isEmpty()) {
events.add(
new ThinkingBlockDeltaEvent(
replyId,
"thinking",
tb.getThinking()));
}
}
}
return Flux.fromIterable(events);
List<AgentEvent> events = new ArrayList<>();
for (ContentBlock block : chunk.getContent()) {
if (block instanceof TextBlock tb) {
if (textStarted.compareAndSet(
false, true)) {
events.add(
new TextBlockStartEvent(
replyId, "text"));
}
if (tb.getText() != null
&& !tb.getText().isEmpty()) {
events.add(
new TextBlockDeltaEvent(
replyId,
"text",
tb.getText()));
}
} else if (block
instanceof ThinkingBlock tb) {
if (thinkingStarted.compareAndSet(
false, true)) {
events.add(
new ThinkingBlockStartEvent(
replyId,
"thinking"));
}
if (tb.getThinking() != null
&& !tb.getThinking()
.isEmpty()) {
events.add(
new ThinkingBlockDeltaEvent(
replyId,
"thinking",
tb.getThinking()));
}
}
}
return Flux.fromIterable(events);
});
});

Flux<AgentEvent> endEvents =
Expand Down
Loading