Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 100 additions & 7 deletions crates/tower-mcp/src/transport/stdio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@ use crate::transport::service::{CatchError, InjectAnnotations};
// Shared helpers
// ============================================================================

/// Strip an optional UTF-8 BOM, then trim whitespace.
///
/// Windows tools sometimes prefix the first stdout line with a UTF-8 BOM
/// (`\u{feff}`). Without stripping it, the JSON parser sees an unexpected
/// character at offset 0 and rejects the whole message.
fn clean_input_line(line: &str) -> &str {
line.strip_prefix('\u{feff}').unwrap_or(line).trim()
}

/// Process a single line of JSON-RPC input
///
/// Returns `Ok(Some(response))` for requests, `Ok(None)` for notifications.
Expand Down Expand Up @@ -241,7 +250,7 @@ impl StdioTransport {
break;
}

let trimmed = line.trim();
let trimmed = clean_input_line(&line);
if trimmed.is_empty() {
continue;
}
Expand Down Expand Up @@ -433,7 +442,7 @@ where
}

async fn process_input(&mut self, line: &str, stdout: &mut tokio::io::Stdout) -> Result<()> {
let trimmed = line.trim();
let trimmed = clean_input_line(line);
if trimmed.is_empty() {
return Ok(());
}
Expand Down Expand Up @@ -534,7 +543,7 @@ impl SyncStdioTransport {
let line =
line.map_err(|e| Error::Transport(format!("Failed to read from stdin: {}", e)))?;

let trimmed = line.trim();
let trimmed = clean_input_line(&line);
if trimmed.is_empty() {
continue;
}
Expand Down Expand Up @@ -695,7 +704,7 @@ impl BidirectionalStdioTransport {
break;
}

let trimmed = line.trim();
let trimmed = clean_input_line(&line);
if trimmed.is_empty() {
continue;
}
Expand Down Expand Up @@ -729,7 +738,16 @@ impl BidirectionalStdioTransport {
) -> Result<()> {
tracing::debug!(input = %line, "Received message");

let parsed: serde_json::Value = serde_json::from_str(line)?;
// Malformed JSON must produce a JSON-RPC parse error response, not
// tear down the run loop. Per the spec, id is null when the request
// can't be parsed at all.
let parsed: serde_json::Value = match serde_json::from_str(line) {
Ok(v) => v,
Err(e) => {
tracing::warn!(error = %e, "Malformed JSON on stdin");
return self.write_parse_error(&e.to_string(), stdout).await;
}
};

// Check if this is a response to one of our pending requests
if parsed.get("method").is_none()
Expand All @@ -746,8 +764,15 @@ impl BidirectionalStdioTransport {
return Ok(());
}

// Process as a request
let message: JsonRpcMessage = serde_json::from_str(line)?;
// Process as a request. The shape parse can also fail (e.g. id of
// wrong type); treat it the same way so the loop keeps running.
let message: JsonRpcMessage = match serde_json::from_str(line) {
Ok(m) => m,
Err(e) => {
tracing::warn!(error = %e, "JSON did not match JSON-RPC request shape");
return self.write_parse_error(&e.to_string(), stdout).await;
}
};
match self.service.call_message(message).await {
Ok(response) => {
let response_json = serde_json::to_string(&response).map_err(|e| {
Expand All @@ -771,6 +796,18 @@ impl BidirectionalStdioTransport {
Ok(())
}

async fn write_parse_error(
&self,
message: &str,
stdout: Arc<Mutex<tokio::io::Stdout>>,
) -> Result<()> {
let error_response =
JsonRpcResponse::error(None, crate::error::JsonRpcError::parse_error(message));
let response_json = serde_json::to_string(&error_response)
.map_err(|e| Error::Transport(format!("Failed to serialize error: {}", e)))?;
self.write_line(&response_json, stdout).await
}

/// Handle a response to one of our pending requests
async fn handle_response(&self, parsed: &serde_json::Value) -> Result<()> {
let id = match parsed.get("id") {
Expand Down Expand Up @@ -1059,6 +1096,62 @@ mod tests {
assert!(result.is_err());
}

// =========================================================================
// clean_input_line tests
// =========================================================================

#[test]
fn test_clean_input_line_no_bom() {
assert_eq!(
clean_input_line(r#"{"jsonrpc":"2.0"}"#),
r#"{"jsonrpc":"2.0"}"#
);
}

#[test]
fn test_clean_input_line_strips_leading_bom() {
let with_bom = "\u{feff}{\"jsonrpc\":\"2.0\"}";
assert_eq!(clean_input_line(with_bom), r#"{"jsonrpc":"2.0"}"#);
}

#[test]
fn test_clean_input_line_strips_bom_then_trims() {
// BOM, then whitespace, then content, then trailing newline.
let input = "\u{feff} {\"id\":1}\n";
assert_eq!(clean_input_line(input), r#"{"id":1}"#);
}

#[test]
fn test_clean_input_line_does_not_strip_internal_bom() {
// Only a *leading* BOM is stripped; one inside the payload stays.
let input = "{\"text\":\"hi\u{feff}there\"}";
assert_eq!(clean_input_line(input), input);
}

#[test]
fn test_clean_input_line_empty() {
assert_eq!(clean_input_line(""), "");
assert_eq!(clean_input_line("\u{feff}"), "");
assert_eq!(clean_input_line(" \n\t"), "");
}

#[tokio::test]
async fn test_process_line_with_bom_stripped_input_parses() {
// After clean_input_line, a BOM-prefixed request should parse like
// any other request and return a normal response.
let router = make_router();
let mut service = init_service(&router).await;

let raw = "\u{feff}{\"jsonrpc\":\"2.0\",\"id\":7,\"method\":\"tools/list\",\"params\":{}}";
let cleaned = clean_input_line(raw);
let result = process_line(&mut service, &router, cleaned).await;

let response = result.unwrap().unwrap();
let json = serde_json::to_value(&response).unwrap();
assert_eq!(json["id"], 7);
assert!(json["result"]["tools"].is_array());
}

#[tokio::test]
async fn test_process_line_tools_list() {
let router = make_router();
Expand Down
Loading