Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 140 additions & 8 deletions src/Transport/HttpTransport.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ class HttpTransport implements Transport
{
protected int $requestId = 0;

protected bool $started = false;

protected ?string $sessionId = null;

/**
* @param array<string, mixed> $config
*/
Expand All @@ -22,7 +26,49 @@ public function __construct(
) {}

#[\Override]
public function start(): void {}
public function start(): void
{
if ($this->started) {
return;
}

$this->requestId++;

$initializePayload = [
'jsonrpc' => '2.0',
'id' => (string) $this->requestId,
'method' => 'initialize',
'params' => [
'protocolVersion' => '2025-03-26',
'capabilities' => new \stdClass,
'clientInfo' => [
'name' => 'prism-relay',
'version' => '1.0.0',
],
],
];

$initializeResponse = $this->sendHttpRequest($initializePayload);
$this->validateHttpResponse($initializeResponse);
$this->sessionId = $initializeResponse->header('Mcp-Session-Id');

$initializeJson = $this->parseJsonRpcResponse($initializeResponse);
$this->validateJsonRpcResponse($initializeJson);

if (isset($initializeJson['error'])) {
$this->handleJsonRpcError($initializeJson['error']);
}

$initializedNotification = [
'jsonrpc' => '2.0',
'method' => 'notifications/initialized',
];

$notificationResponse = $this->sendHttpRequest($initializedNotification);
$this->validateHttpResponse($notificationResponse);

$this->started = true;
}

/**
* @param array<string, mixed> $params
Expand All @@ -33,6 +79,8 @@ public function start(): void {}
#[\Override]
public function sendRequest(string $method, array $params = []): array
{
$this->start();

$this->requestId++;
$requestPayload = $this->createRequestPayload($method, $params);

Expand Down Expand Up @@ -65,7 +113,8 @@ protected function createRequestPayload(string $method, array $params = []): arr
'jsonrpc' => '2.0',
'id' => (string) $this->requestId,
'method' => $method,
'params' => $params,
// Some MCP HTTP servers require params to be an object, not an array.
'params' => $params === [] ? new \stdClass : $params,
];
}

Expand All @@ -74,18 +123,23 @@ protected function createRequestPayload(string $method, array $params = []): arr
*/
protected function sendHttpRequest(array $payload): Response
{
$headers = array_merge([
// MCP Streamable HTTP requires both content types to be accepted.
'Accept' => 'application/json, text/event-stream',
], $this->getHeaders());

if ($this->sessionId) {
$headers['Mcp-Session-Id'] = $this->sessionId;
}

$token = $this->resolveAuthToken();

return Http::timeout($this->getTimeout())
->acceptJson()
->withHeaders($headers)
->when(
$token !== null,
fn ($http) => $http->withToken((string) $token)
)
->when(
$this->hasHeaders(),
fn ($http) => $http->withHeaders($this->getHeaders())
)
->post($this->getServerUrl(), $payload);
}

Expand Down Expand Up @@ -158,7 +212,7 @@ protected function getServerUrl(): string
protected function processResponse(Response $response): array
{
$this->validateHttpResponse($response);
$jsonResponse = $response->json();
$jsonResponse = $this->parseJsonRpcResponse($response);
$this->validateJsonRpcResponse($jsonResponse);

if (isset($jsonResponse['error'])) {
Expand All @@ -168,6 +222,84 @@ protected function processResponse(Response $response): array
return $jsonResponse['result'] ?? [];
}

/**
* @return array<string, mixed>
*
* @throws TransportException
*/
protected function parseJsonRpcResponse(Response $response): array
{
$contentType = strtolower($response->header('Content-Type'));

if (str_contains($contentType, 'text/event-stream')) {
return $this->parseSseJsonRpcResponse($response->body());
}

$json = $response->json();

if (! is_array($json)) {
throw new TransportException('Invalid JSON response received from MCP server');
}

return $json;
}

/**
* Parse JSON-RPC payload from an SSE response body.
*
* @return array<string, mixed>
*
* @throws TransportException
*/
protected function parseSseJsonRpcResponse(string $body): array
{
$lines = preg_split("/\r\n|\n|\r/", $body) ?: [];
$dataLines = [];
$messages = [];

foreach ($lines as $line) {
$line = trim($line);

if ($line === '') {
if ($dataLines !== []) {
$decoded = json_decode(implode("\n", $dataLines), true);

if (is_array($decoded) && isset($decoded['jsonrpc'])) {
$messages[] = $decoded;
}

$dataLines = [];
}

continue;
}

if (str_starts_with($line, 'data:')) {
$dataLines[] = ltrim(substr($line, 5));
}
}

if ($dataLines !== []) {
$decoded = json_decode(implode("\n", $dataLines), true);

if (is_array($decoded) && isset($decoded['jsonrpc'])) {
$messages[] = $decoded;
}
}

if ($messages === []) {
throw new TransportException('No JSON-RPC message found in SSE response');
}

foreach ($messages as $message) {
if (isset($message['id']) && (string) $message['id'] === (string) $this->requestId) {
return $message;
}
}

return end($messages) ?: [];
}

/**
* @throws AuthorizationException
* @throws TransportException
Expand Down
25 changes: 14 additions & 11 deletions tests/Unit/RelayFactoryWithTokenTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,22 @@

it('RelayBuilder::tools calls Relay::tools with the token injected', function (): void {
Http::fake([
'http://example.com/api' => Http::response([
'jsonrpc' => '2.0',
'id' => '1',
'result' => [
'tools' => [
[
'name' => 'test_tool',
'description' => 'A test tool',
'inputSchema' => ['type' => 'object', 'properties' => []],
'http://example.com/api' => Http::sequence()
->push(['jsonrpc' => '2.0', 'id' => '1', 'result' => ['protocolVersion' => '2025-03-26']])
->push('', 202)
->push([
'jsonrpc' => '2.0',
'id' => '2',
'result' => [
'tools' => [
[
'name' => 'test_tool',
'description' => 'A test tool',
'inputSchema' => ['type' => 'object', 'properties' => []],
],
],
],
],
]),
]),
]);

$factory = new RelayFactory;
Expand Down
25 changes: 14 additions & 11 deletions tests/Unit/RelayOAuthTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,22 @@

it('withToken injects token into HTTP requests when tools() is called', function (): void {
Http::fake([
'http://example.com/api' => Http::response([
'jsonrpc' => '2.0',
'id' => '1',
'result' => [
'tools' => [
[
'name' => 'test_tool',
'description' => 'A test tool',
'inputSchema' => ['type' => 'object', 'properties' => []],
'http://example.com/api' => Http::sequence()
->push(['jsonrpc' => '2.0', 'id' => '1', 'result' => ['protocolVersion' => '2025-03-26']])
->push('', 202)
->push([
'jsonrpc' => '2.0',
'id' => '2',
'result' => [
'tools' => [
[
'name' => 'test_tool',
'description' => 'A test tool',
'inputSchema' => ['type' => 'object', 'properties' => []],
],
],
],
],
]),
]),
]);

$relay = new Relay($this->serverName);
Expand Down
27 changes: 12 additions & 15 deletions tests/Unit/Transport/HttpTransportOAuthTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@

it('sends access_token as Bearer Authorization header', function (): void {
Http::fake([
'http://example.com/api' => Http::response([
'jsonrpc' => '2.0',
'id' => '1',
'result' => ['status' => 'success'],
]),
'http://example.com/api' => Http::sequence()
->push(['jsonrpc' => '2.0', 'id' => '1', 'result' => ['protocolVersion' => '2025-03-26']])
->push('', 202)
->push(['jsonrpc' => '2.0', 'id' => '2', 'result' => ['status' => 'success']]),
]);

$transport = new HttpTransport([
Expand All @@ -30,11 +29,10 @@

it('uses access_token over api_key when both are present', function (): void {
Http::fake([
'http://example.com/api' => Http::response([
'jsonrpc' => '2.0',
'id' => '1',
'result' => ['status' => 'success'],
]),
'http://example.com/api' => Http::sequence()
->push(['jsonrpc' => '2.0', 'id' => '1', 'result' => ['protocolVersion' => '2025-03-26']])
->push('', 202)
->push(['jsonrpc' => '2.0', 'id' => '2', 'result' => ['status' => 'success']]),
]);

$transport = new HttpTransport([
Expand All @@ -51,11 +49,10 @@

it('falls back to api_key when no access_token is set', function (): void {
Http::fake([
'http://example.com/api' => Http::response([
'jsonrpc' => '2.0',
'id' => '1',
'result' => ['status' => 'success'],
]),
'http://example.com/api' => Http::sequence()
->push(['jsonrpc' => '2.0', 'id' => '1', 'result' => ['protocolVersion' => '2025-03-26']])
->push('', 202)
->push(['jsonrpc' => '2.0', 'id' => '2', 'result' => ['status' => 'success']]),
]);

$transport = new HttpTransport([
Expand Down
Loading
Loading