Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"aspire.enableSettingsFileCreationPromptOnStartup": false
}
3 changes: 2 additions & 1 deletion Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
<ManagePackageVersionsCentrally>true</ManagePackageVersionsCentrally>
</PropertyGroup>
<ItemGroup>
<PackageVersion Include="A2A" Version="0.3.3-preview" />
<PackageVersion Include="A2A" Version="1.0.0-preview2" />
<PackageVersion Include="Microsoft.Agents.AI.A2A" Version="1.3.0-preview.260423.1" />
<PackageVersion Include="CsvHelper" Version="33.1.0" />
<PackageVersion Include="FuzzySharp" Version="2.0.2" />
<PackageVersion Include="Google_GenerativeAI" Version="3.6.3" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

<ItemGroup>
<PackageReference Include="A2A" />
<PackageReference Include="Microsoft.Agents.AI.A2A" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public async Task<bool> Execute(RoleDialogModel message)
);

message.Content = responseText;
message.StopCompletion = true;
return true;
}
catch (Exception ex)
Expand Down
59 changes: 36 additions & 23 deletions src/Infrastructure/BotSharp.Core.A2A/Hooks/A2AAgentHook.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
using A2A;
using BotSharp.Abstraction.Agents;
using BotSharp.Abstraction.Agents.Enums;
using BotSharp.Abstraction.Agents.Models;
using BotSharp.Abstraction.Agents.Settings;
using BotSharp.Abstraction.Functions.Models;
using BotSharp.Core.A2A.Services;
using BotSharp.Core.A2A.Settings;
using Microsoft.Extensions.Logging;
using System.Text.Json;

namespace BotSharp.Core.A2A.Hooks;
Expand All @@ -15,12 +17,14 @@ public class A2AAgentHook : AgentHookBase

private readonly A2ASettings _a2aSettings;
private readonly IA2AService _a2aService;
private readonly ILogger<A2AAgentHook> _logger;

public A2AAgentHook(IServiceProvider services, IA2AService a2aService, A2ASettings a2aSettings, AgentSettings agentSettings)
public A2AAgentHook(IServiceProvider services, IA2AService a2aService, A2ASettings a2aSettings, AgentSettings agentSettings, ILogger<A2AAgentHook> logger)
: base(services, agentSettings)
{
_a2aService = a2aService;
_a2aSettings = a2aSettings;
_logger = logger;
}

public override async Task<string?> OnAgentLoading(string id)
Expand All @@ -45,7 +49,16 @@ public override async Task OnAgentLoaded(Agent agent)
var remoteConfig = _a2aSettings.Agents?.FirstOrDefault(x => x.Id == agent.Id);
if (remoteConfig != null)
{
var agentCard = await _a2aService.GetCapabilitiesAsync(remoteConfig.Endpoint);
AgentCard? agentCard = null;
try
{
agentCard = await _a2aService.GetCapabilitiesAsync(remoteConfig.Endpoint);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Failed to resolve A2A agent card for endpoint {AgentEndpoint}. Using configured metadata.", remoteConfig.Endpoint);
}

if (agentCard != null)
{
agent.Name = agentCard.Name;
Expand All @@ -54,34 +67,34 @@ public override async Task OnAgentLoaded(Agent agent)
$"Your ONLY goal is to forward the user's request verbatim to the external service. " +
$"You must use the function 'delegate_to_a2a' to communicate with it. " +
$"Do not attempt to answer the question yourself.";
}

var properties = new Dictionary<string, object>
var properties = new Dictionary<string, object>
{
{
"user_query",
new
{
"user_query",
new
{
type = "string",
description = "The exact user request or task description to be forwarded."
}
type = "string",
description = "The exact user request or task description to be forwarded."
}
};
}
};

var propertiesJson = JsonSerializer.Serialize(properties);
var propertiesDocument = JsonDocument.Parse(propertiesJson);
var propertiesJson = JsonSerializer.Serialize(properties);
var propertiesDocument = JsonDocument.Parse(propertiesJson);

agent.Functions.Add(new FunctionDef
agent.Functions.Add(new FunctionDef
{
Name = "delegate_to_a2a",
Description = $"Delegates the task to the external {remoteConfig.Name} via A2A protocol.",
Parameters = new FunctionParametersDef()
{
Name = "delegate_to_a2a",
Description = $"Delegates the task to the external {remoteConfig.Name} via A2A protocol.",
Parameters = new FunctionParametersDef()
{
Type = "object",
Properties = propertiesDocument,
Required = new List<string> { "user_query" }
}
});
}
Type = "object",
Properties = propertiesDocument,
Required = new List<string> { "user_query" }
}
});
}
await base.OnAgentLoaded(agent);
}
Expand Down
187 changes: 88 additions & 99 deletions src/Infrastructure/BotSharp.Core.A2A/Services/A2AService.cs
Original file line number Diff line number Diff line change
@@ -1,80 +1,112 @@
using A2A;
using BotSharp.Core.A2A.Settings;
using Microsoft.Agents.AI;
using Microsoft.Extensions.AI;
using Microsoft.Extensions.Logging;
using System.Net.ServerSentEvents;
using System.Text.Json;

namespace BotSharp.Core.A2A.Services;


public class A2AService : IA2AService
{
// Protocol binding name constants from the A2A v1 specification.
private const string BindingHttpJson = "http+json";
private const string BindingJsonRpc = "json-rpc";

private readonly IHttpClientFactory _httpClientFactory;
private readonly ILogger<A2AService> _logger;
private readonly IServiceProvider _services;
private readonly A2ASettings _settings;

// High-level A2A v1 agent cache
private readonly Dictionary<string, AIAgent> _aiAgentCache = new();
#pragma warning disable MEAI001
private readonly Dictionary<string, ResponseContinuationToken> _continuationTokenCache = new();
#pragma warning restore MEAI001

private readonly Dictionary<string, A2AClient> _clientCache = new Dictionary<string, A2AClient>();
// LEGACY: Used for task APIs and the StreamResponse compatibility overload.
private readonly Dictionary<string, A2AClient> _clientCache = new();

public A2AService(IHttpClientFactory httpClientFactory, IServiceProvider services, ILogger<A2AService> logger)
public A2AService(IHttpClientFactory httpClientFactory, IServiceProvider services, ILogger<A2AService> logger, A2ASettings settings)
{
_httpClientFactory = httpClientFactory;
_services = services;
_logger = logger;
}
_settings = settings;
}

public async Task<AgentCard> GetCapabilitiesAsync(string agentEndpoint, CancellationToken cancellationToken = default)
{
var resolver = new A2ACardResolver(new Uri(agentEndpoint));
return await resolver.GetAgentCardAsync();
return await resolver.GetAgentCardAsync(cancellationToken);
}

public async Task<string> SendMessageAsync(string agentEndpoint, string text, string contextId, CancellationToken cancellationToken)
private async Task<AIAgent> CreateAIAgentAsync(string agentEndpoint, CancellationToken cancellationToken = default)
{
if (_aiAgentCache.TryGetValue(agentEndpoint, out var cachedAgent))
{
return cachedAgent;
}

var resolver = new A2ACardResolver(new Uri(agentEndpoint));
var aiAgent = await resolver.GetAIAgentAsync();
Comment thread
qodo-code-review[bot] marked this conversation as resolved.
_aiAgentCache[agentEndpoint] = aiAgent;
return aiAgent;
}

private static string BuildSessionCacheKey(string agentEndpoint, string contextId)
=> $"{agentEndpoint}::{contextId}";

if (!_clientCache.TryGetValue(agentEndpoint, out var client))
#pragma warning disable MEAI001
private AgentRunOptions? GetRunOptions(string agentEndpoint, string contextId)
{
if (string.IsNullOrWhiteSpace(contextId))
{
HttpClient httpclient = _httpClientFactory.CreateClient();
return null;
}

client = new A2AClient(new Uri(agentEndpoint), httpclient);
_clientCache[agentEndpoint] = client;
var cacheKey = BuildSessionCacheKey(agentEndpoint, contextId);
if (!_continuationTokenCache.TryGetValue(cacheKey, out var continuationToken))
{
return null;
}

var messagePayload = new AgentMessage
return new AgentRunOptions
{
Role = MessageRole.User,
ContextId = contextId,
Parts = new List<Part>
{
new TextPart { Text = text }
}
ContinuationToken = continuationToken
};
}

var sendParams = new MessageSendParams
private void UpdateContinuationToken(string agentEndpoint, string contextId, ResponseContinuationToken? continuationToken)
{
if (string.IsNullOrWhiteSpace(contextId) || continuationToken == null)
{
Message = messagePayload
};
return;
}

var cacheKey = BuildSessionCacheKey(agentEndpoint, contextId);
_continuationTokenCache[cacheKey] = continuationToken;
}
#pragma warning restore MEAI001

// HIGH-LEVEL: Preferred A2A v1 API for message sending
public async Task<string> SendMessageAsync(string agentEndpoint, string text, string contextId, CancellationToken cancellationToken)
{
try
{
_logger.LogInformation($"Sending A2A message to {agentEndpoint}. ContextId: {contextId}");
var responseBase = await client.SendMessageAsync(sendParams, cancellationToken);

if (responseBase is AgentMessage responseMsg)
{
if (responseMsg.Parts != null && responseMsg.Parts.Any())
{
var textPart = responseMsg.Parts.First() as TextPart;
return textPart?.Text ?? string.Empty;
}
}
else if( responseBase is AgentTask atask)
{
return $"Task created with ID: {atask.Id}, Status: {atask.Status}";
}
else
{
return "Unexpected task type.";
}

return string.Empty;
var agent = await CreateAIAgentAsync(agentEndpoint, cancellationToken);
var runOptions = GetRunOptions(agentEndpoint, contextId);
_logger.LogInformation("Sending A2A message via AIAgent to {AgentEndpoint}. ContextId: {ContextId}", agentEndpoint, contextId);
var response = await agent.RunAsync(
message: text ?? string.Empty,
options: runOptions,
cancellationToken: cancellationToken);

#pragma warning disable MEAI001
UpdateContinuationToken(agentEndpoint, contextId, response.ContinuationToken);
#pragma warning restore MEAI001
return response.Text ?? string.Empty;
}
Comment thread
qodo-code-review[bot] marked this conversation as resolved.
catch (HttpRequestException ex)
{
Expand All @@ -88,71 +120,28 @@ public async Task<string> SendMessageAsync(string agentEndpoint, string text, st
}
}

public async Task SendMessageStreamingAsync(string endPoint, List<Part> parts, Func<SseItem<A2AEvent>, Task>? onStreamingEventReceived, CancellationToken cancellationToken = default)
// HIGH-LEVEL: Streaming uses AIAgent.RunStreamingAsync in A2A v1.
public async Task SendMessageStreamingAsync(string endPoint, List<Part> parts, Func<AgentResponseUpdate, Task>? onStreamingEventReceived, CancellationToken cancellationToken = default)
{
A2ACardResolver cardResolver = new(new Uri(endPoint));
AgentCard agentCard = await cardResolver.GetAgentCardAsync();
A2AClient client = new A2AClient(new Uri(agentCard.Url));

AgentMessage userMessage = new()
var userMessage = new Message
{
Role = MessageRole.User,
MessageId = Guid.NewGuid().ToString("N"),
Role = Role.User,
Parts = parts
};
Comment thread
qodo-code-review[bot] marked this conversation as resolved.

await foreach (SseItem<A2AEvent> sseItem in client.SendMessageStreamingAsync(new MessageSendParams { Message = userMessage }))
{
await onStreamingEventReceived?.Invoke(sseItem);
}

Console.WriteLine(" Streaming completed.");
}

public async Task ListenForTaskEventAsync(string endPoint, string taskId, Func<SseItem<A2AEvent>, ValueTask>? onTaskEventReceived = null, CancellationToken cancellationToken = default)
{
var agent = await CreateAIAgentAsync(endPoint, cancellationToken);
var chatMessage = userMessage.ToChatMessage();

if (onTaskEventReceived == null)
await foreach (var streamResponse in agent.RunStreamingAsync(
messages: new[] { chatMessage },
options: null,
cancellationToken: cancellationToken))
{
return;
}

A2ACardResolver cardResolver = new(new Uri(endPoint));
AgentCard agentCard = await cardResolver.GetAgentCardAsync();
A2AClient client = new A2AClient(new Uri(agentCard.Url));

await foreach (SseItem<A2AEvent> sseItem in client.SubscribeToTaskAsync(taskId))
{
await onTaskEventReceived.Invoke(sseItem);
Console.WriteLine(" Task event received: " + JsonSerializer.Serialize(sseItem.Data));
if (onStreamingEventReceived != null)
await onStreamingEventReceived(streamResponse);
}

}

public async Task SetPushNotifications(string endPoint, PushNotificationConfig config, CancellationToken cancellationToken = default)
{
A2ACardResolver cardResolver = new(new Uri(endPoint));
AgentCard agentCard = await cardResolver.GetAgentCardAsync();
A2AClient client = new A2AClient(new Uri(agentCard.Url));
await client.SetPushNotificationAsync(new TaskPushNotificationConfig()
{
PushNotificationConfig = config
});
}

public async Task<AgentTask> CancelTaskAsync(string endPoint, string taskId, CancellationToken cancellationToken = default)
{
A2ACardResolver cardResolver = new(new Uri(endPoint));
AgentCard agentCard = await cardResolver.GetAgentCardAsync();
A2AClient client = new A2AClient(new Uri(agentCard.Url));
return await client.CancelTaskAsync(taskId);
}

public async Task<AgentTask> GetTaskAsync(string endPoint, string taskId, CancellationToken cancellationToken = default)
{
A2ACardResolver cardResolver = new(new Uri(endPoint));
AgentCard agentCard = await cardResolver.GetAgentCardAsync();
A2AClient client = new A2AClient(new Uri(agentCard.Url));
return await client.GetTaskAsync(taskId);
}

_logger.LogInformation("Streaming completed.");
}
}
Loading
Loading