From 358e57a09b88d7184fb80279919f8d1cadd3d455 Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Mon, 16 Mar 2026 18:36:20 +0800 Subject: [PATCH] Deepen effect-log Framework Integrations --- README.md | 6 +- .../python/effect_log/middleware/__init__.py | 7 + .../python/effect_log/middleware/anthropic.py | 144 ++++++++++++++ .../effect_log/middleware/openai_agents.py | 14 +- bindings/python/tests/test_middleware.py | 170 ++++++++++++++++- examples/anthropic_integration.py | 106 +++++++++++ examples/e2e_anthropic.py | 178 ++++++++++++++++++ 7 files changed, 611 insertions(+), 14 deletions(-) create mode 100644 bindings/python/python/effect_log/middleware/anthropic.py create mode 100644 examples/anthropic_integration.py create mode 100644 examples/e2e_anthropic.py diff --git a/README.md b/README.md index 6c5dfbe..509cca7 100644 --- a/README.md +++ b/README.md @@ -54,6 +54,8 @@ Built-in middleware for major agent frameworks: | **LangGraph** | `effect_log.middleware.langgraph` | `EffectLogToolNode`, `effect_logged_tools` | | **OpenAI Agents SDK** | `effect_log.middleware.openai_agents` | `effect_logged_agent`, `wrap_function_tool` | | **CrewAI** | `effect_log.middleware.crewai` | `effect_logged_crew`, `effect_logged_tool` | +| **Pydantic AI** | `effect_log.middleware.pydantic_ai` | `effect_logged_agent`, `EffectLogToolset` | +| **Anthropic Claude API** | `effect_log.middleware.anthropic` | `effect_logged_tool_executor`, `process_tool_calls` | See [`examples/`](examples/) for runnable demos: @@ -61,6 +63,8 @@ See [`examples/`](examples/) for runnable demos: - [`langgraph_integration.py`](examples/langgraph_integration.py) — LangGraph ToolNode + tool wrapping - [`openai_agents_integration.py`](examples/openai_agents_integration.py) — OpenAI Agents SDK wrapping - [`crewai_integration.py`](examples/crewai_integration.py) — CrewAI tool + crew wrapping +- [`pydantic_ai_integration.py`](examples/pydantic_ai_integration.py) — Pydantic AI toolset wrapping +- [`anthropic_integration.py`](examples/anthropic_integration.py) — Anthropic Claude API tool_use ## How It Works @@ -89,7 +93,7 @@ pytest tests/ -v - [x] Core library — WAL engine, recovery engine, SQLite + in-memory backends - [x] Python bindings — PyO3 + maturin -- [x] Framework middleware — LangGraph, OpenAI Agents SDK, CrewAI +- [x] Framework middleware — LangGraph, OpenAI Agents SDK, CrewAI, Pydantic AI, Anthropic Claude API - [ ] TypeScript bindings — napi-rs, Vercel AI SDK - [ ] Additional backends — RocksDB, S3, Restate journal - [ ] Auto-classification — infer effect kind from HTTP methods / API metadata diff --git a/bindings/python/python/effect_log/middleware/__init__.py b/bindings/python/python/effect_log/middleware/__init__.py index fd2eaea..3362042 100644 --- a/bindings/python/python/effect_log/middleware/__init__.py +++ b/bindings/python/python/effect_log/middleware/__init__.py @@ -3,4 +3,11 @@ Each middleware module provides integration with a specific agent framework. Framework dependencies are optional — import errors are raised only when you actually try to use the middleware. + +Available middleware: + - effect_log.middleware.langgraph — LangGraph / LangChain + - effect_log.middleware.openai_agents — OpenAI Agents SDK + - effect_log.middleware.crewai — CrewAI + - effect_log.middleware.pydantic_ai — Pydantic AI + - effect_log.middleware.anthropic — Anthropic Claude API """ diff --git a/bindings/python/python/effect_log/middleware/anthropic.py b/bindings/python/python/effect_log/middleware/anthropic.py new file mode 100644 index 0000000..0b377b5 --- /dev/null +++ b/bindings/python/python/effect_log/middleware/anthropic.py @@ -0,0 +1,144 @@ +"""Anthropic Claude API middleware for effect-log. + +Wraps the tool execution loop for Claude's tool_use pattern so every tool +invocation goes through the effect-log WAL, gaining crash recovery and +duplicate prevention. + +Usage: + import anthropic + from effect_log import EffectLog, EffectKind, ToolDef + from effect_log.middleware.anthropic import effect_logged_tool_executor, make_tooldefs + + tool_specs = [ + {"func": search_db, "effect": EffectKind.ReadOnly}, + {"func": send_email, "effect": EffectKind.IrreversibleWrite}, + ] + tools = make_tooldefs(tool_specs) + log = EffectLog(execution_id="task-001", tools=tools, storage="sqlite:///effects.db") + + executor = effect_logged_tool_executor(log, { + "search_db": search_db, + "send_email": send_email, + }) + + # In the Claude API loop: + for block in response.content: + if block.type == "tool_use": + result = executor(block.name, block.input, block.id) +""" + +from __future__ import annotations + +import json +from typing import Any, Callable + + +def _ensure_anthropic(): + try: + import anthropic # noqa: F401 + except ImportError: + raise ImportError( + "Anthropic middleware requires the anthropic SDK. " + "Install it with: pip install anthropic" + ) + + +def make_tooldefs(tool_specs): + """Create ToolDef entries from raw functions for Anthropic tool_use. + + Anthropic's tool_use pattern uses raw functions, so this helper takes + the same functions and produces EffectLog ToolDefs. + + Args: + tool_specs: List of dicts with keys: + - "func": A raw callable + - "effect": The EffectKind for this tool + + Returns: + List of ToolDef instances ready for EffectLog construction. + """ + from effect_log import ToolDef + + defs = [] + for spec in tool_specs: + fn, effect = spec["func"], spec["effect"] + + def adapted(args, _fn=fn): + return _fn(**args) + + defs.append(ToolDef(fn.__name__, effect, adapted)) + return defs + + +def effect_logged_tool_executor( + log: Any, + tool_map: dict[str, Callable], + tool_effects: dict[str, Any] | None = None, +) -> Callable: + """Create a tool executor that routes Claude tool_use calls through effect-log. + + Returns a callable that takes (tool_name, tool_input, tool_use_id) and + returns a tool_result content block dict suitable for passing back to + the Claude API. + + Args: + log: An initialized EffectLog instance (tools must already be registered). + tool_map: Dict mapping tool name -> callable. Used for documentation; + actual execution goes through the EffectLog. + tool_effects: Optional dict mapping tool name -> EffectKind. + For documentation only; tools must already be registered. + + Returns: + A callable: (tool_name, tool_input, tool_use_id) -> tool_result dict. + """ + + def execute(tool_name: str, tool_input: dict, tool_use_id: str) -> dict: + """Execute a tool through effect-log and return a tool_result block.""" + if tool_name not in tool_map: + return { + "type": "tool_result", + "tool_use_id": tool_use_id, + "content": json.dumps({"error": f"Unknown tool: {tool_name}"}), + "is_error": True, + } + + args = tool_input if isinstance(tool_input, dict) else {"input": tool_input} + result = log.execute(tool_name, args) + content = json.dumps(result) if not isinstance(result, str) else result + + return { + "type": "tool_result", + "tool_use_id": tool_use_id, + "content": content, + } + + return execute + + +def process_tool_calls( + log: Any, + tool_map: dict[str, Callable], + response: Any, +) -> list[dict]: + """Process all tool_use blocks from a Claude API response. + + Iterates over the response content, executes each tool_use block through + effect-log, and returns the list of tool_result blocks to send back. + + Args: + log: An initialized EffectLog instance. + tool_map: Dict mapping tool name -> callable. + response: A Claude API response (Message object with .content). + + Returns: + List of tool_result dicts, one per tool_use block in the response. + """ + executor = effect_logged_tool_executor(log, tool_map) + results = [] + + for block in response.content: + if getattr(block, "type", None) == "tool_use": + result = executor(block.name, block.input, block.id) + results.append(result) + + return results diff --git a/bindings/python/python/effect_log/middleware/openai_agents.py b/bindings/python/python/effect_log/middleware/openai_agents.py index 911afd8..7428acd 100644 --- a/bindings/python/python/effect_log/middleware/openai_agents.py +++ b/bindings/python/python/effect_log/middleware/openai_agents.py @@ -86,8 +86,6 @@ def wrap_function_tool(log: Any, tool: Any, effect_kind: Any = None) -> Any: _ensure_openai_agents() from agents import FunctionTool - original_invoke = tool.on_invoke_tool - async def effect_logged_invoke(ctx, args_json: str) -> Any: tool_name = tool.name try: @@ -95,14 +93,10 @@ async def effect_logged_invoke(ctx, args_json: str) -> Any: except (json.JSONDecodeError, TypeError): args = {"raw_input": str(args_json)} - try: - result = log.execute( - tool_name, args if isinstance(args, dict) else {"input": args} - ) - return json.dumps(result) if not isinstance(result, str) else result - except Exception: - # Fall back to original if effect-log fails (e.g., tool not registered) - return await original_invoke(ctx, args_json) + result = log.execute( + tool_name, args if isinstance(args, dict) else {"input": args} + ) + return json.dumps(result) if not isinstance(result, str) else result return FunctionTool( name=tool.name, diff --git a/bindings/python/tests/test_middleware.py b/bindings/python/tests/test_middleware.py index 28137bc..a7d4f40 100644 --- a/bindings/python/tests/test_middleware.py +++ b/bindings/python/tests/test_middleware.py @@ -306,8 +306,8 @@ def test_wrap_function_tool(self): assert wrapped.name == "search" assert wrapped.description == "Search the web" - @pytest.mark.asyncio - async def test_wrapped_tool_invocation(self): + def test_wrapped_tool_invocation(self): + import asyncio from effect_log.middleware.openai_agents import wrap_function_tool tools, counts = make_tools() @@ -321,7 +321,9 @@ async def test_wrapped_tool_invocation(self): ) wrapped = wrap_function_tool(log, original) - result = await wrapped.on_invoke_tool(None, '{"query": "test"}') + result = asyncio.get_event_loop().run_until_complete( + wrapped.on_invoke_tool(None, '{"query": "test"}') + ) parsed = json.loads(result) assert "results" in parsed @@ -731,3 +733,165 @@ def test_recovery_through_pydantic_ai(self): assert counts2["send_email"] == 0 # Same result assert result1 == result2 + + +# ── Anthropic Claude API Middleware Tests ───────────────────────────────────── + + +def _mock_anthropic(): + """Install mock anthropic module.""" + anthropic_mod = types.ModuleType("anthropic") + sys.modules["anthropic"] = anthropic_mod + return anthropic_mod + + +class TestAnthropicMiddleware: + def setup_method(self): + self.anthropic_mod = _mock_anthropic() + + def teardown_method(self): + sys.modules.pop("anthropic", None) + sys.modules.pop("effect_log.middleware.anthropic", None) + + def test_make_tooldefs(self): + """make_tooldefs creates ToolDefs from raw functions.""" + from effect_log.middleware.anthropic import make_tooldefs + + call_log = [] + + def search_db(query: str = "") -> str: + call_log.append(("search", query)) + return f"results for {query}" + + def send_email(to: str = "", subject: str = "") -> str: + call_log.append(("email", to)) + return f"sent to {to}" + + defs = make_tooldefs( + [ + {"func": search_db, "effect": EffectKind.ReadOnly}, + {"func": send_email, "effect": EffectKind.IrreversibleWrite}, + ] + ) + + assert len(defs) == 2 + log = make_log(defs) + log.execute("search_db", {"query": "revenue"}) + assert call_log == [("search", "revenue")] + + log.execute("send_email", {"to": "ceo@co.com", "subject": "Report"}) + assert call_log == [("search", "revenue"), ("email", "ceo@co.com")] + + def test_executor_normal_execution(self): + """Executor routes tool calls through effect-log.""" + from effect_log.middleware.anthropic import effect_logged_tool_executor + + tools, counts = make_tools() + log = make_log(tools) + + tool_map = {"search": lambda **kw: None, "send_email": lambda **kw: None} + executor = effect_logged_tool_executor(log, tool_map) + + result = executor("search", {"query": "test"}, "call-001") + assert result["type"] == "tool_result" + assert result["tool_use_id"] == "call-001" + assert "is_error" not in result + parsed = json.loads(result["content"]) + assert "results" in parsed + assert counts["search"] == 1 + + def test_executor_unknown_tool(self): + """Executor returns error for unknown tools.""" + from effect_log.middleware.anthropic import effect_logged_tool_executor + + tools, counts = make_tools() + log = make_log(tools) + + executor = effect_logged_tool_executor(log, {"search": lambda **kw: None}) + + result = executor("nonexistent", {"query": "test"}, "call-002") + assert result["is_error"] is True + assert "Unknown tool" in result["content"] + + def test_executor_irreversible_write(self): + """Executor handles IrreversibleWrite tools.""" + from effect_log.middleware.anthropic import effect_logged_tool_executor + + tools, counts = make_tools() + log = make_log(tools) + + tool_map = {"send_email": lambda **kw: None} + executor = effect_logged_tool_executor(log, tool_map) + + result = executor("send_email", {"to": "ceo@co.com"}, "call-003") + assert "is_error" not in result + parsed = json.loads(result["content"]) + assert parsed["sent"] is True + assert counts["send_email"] == 1 + + def test_process_tool_calls(self): + """process_tool_calls handles a full response.""" + from effect_log.middleware.anthropic import process_tool_calls + + tools, counts = make_tools() + log = make_log(tools) + + tool_map = {"search": lambda **kw: None, "send_email": lambda **kw: None} + + # Mock a Claude response with tool_use blocks + block1 = MagicMock() + block1.type = "tool_use" + block1.name = "search" + block1.input = {"query": "revenue"} + block1.id = "tu_001" + + block2 = MagicMock() + block2.type = "tool_use" + block2.name = "send_email" + block2.input = {"to": "ceo@co.com"} + block2.id = "tu_002" + + text_block = MagicMock() + text_block.type = "text" + + response = MagicMock() + response.content = [block1, text_block, block2] + + results = process_tool_calls(log, tool_map, response) + assert len(results) == 2 + assert results[0]["tool_use_id"] == "tu_001" + assert results[1]["tool_use_id"] == "tu_002" + assert counts["search"] == 1 + assert counts["send_email"] == 1 + + def test_recovery_through_anthropic(self): + """Verify sealed results work through the Anthropic middleware.""" + import tempfile, os + from effect_log.middleware.anthropic import effect_logged_tool_executor + + tmpdir = tempfile.mkdtemp() + db = f"sqlite:///{os.path.join(tmpdir, 'test.db')}" + + tools, counts1 = make_tools() + log = make_log(tools, storage=db) + + tool_map = {"send_email": lambda **kw: None} + executor = effect_logged_tool_executor(log, tool_map) + + # First execution + result1 = executor("send_email", {"to": "ceo@co.com"}, "call-001") + assert counts1["send_email"] == 1 + + # Recovery + tools2, counts2 = make_tools() + log2 = EffectLog( + execution_id="test-mw-001", tools=tools2, storage=db, recover=True + ) + + executor2 = effect_logged_tool_executor(log2, tool_map) + result2 = executor2("send_email", {"to": "ceo@co.com"}, "call-001") + + # Sealed — not re-executed + assert counts2["send_email"] == 0 + # Same content + assert result1["content"] == result2["content"] diff --git a/examples/anthropic_integration.py b/examples/anthropic_integration.py new file mode 100644 index 0000000..c666538 --- /dev/null +++ b/examples/anthropic_integration.py @@ -0,0 +1,106 @@ +#!/usr/bin/env python3 +""" +Anthropic Claude API integration example for effect-log. + +This example runs standalone (no API key required). With the anthropic SDK +installed, you can also use the middleware wrappers shown in comments. + + pip install ai-effectlog anthropic +""" + +import os +import tempfile + +from effect_log import EffectKind, EffectLog, ToolDef + +# ── Define tools with semantic classification ──────────────────────────────── + +call_counts = {} + + +def counting(name): + call_counts[name] = 0 + + def fn(args): + call_counts[name] += 1 + return {"tool": name, "args": args, "call": call_counts[name]} + + return fn + + +tools = [ + ToolDef("search_db", EffectKind.ReadOnly, counting("search_db")), + ToolDef("send_email", EffectKind.IrreversibleWrite, counting("send_email")), + ToolDef("upsert_record", EffectKind.IdempotentWrite, counting("upsert_record")), +] + +# ── With Anthropic middleware (requires anthropic SDK): ────────────────────── +# +# from effect_log.middleware.anthropic import ( +# effect_logged_tool_executor, +# make_tooldefs, +# process_tool_calls, +# ) +# +# tool_specs = [ +# {"func": search_db, "effect": EffectKind.ReadOnly}, +# {"func": send_email, "effect": EffectKind.IrreversibleWrite}, +# ] +# tools = make_tooldefs(tool_specs) +# log = EffectLog(execution_id="task-001", tools=tools, storage=db) +# executor = effect_logged_tool_executor(log, {"search_db": search_db, ...}) +# +# # In the Claude API tool_use loop: +# for block in response.content: +# if block.type == "tool_use": +# result = executor(block.name, block.input, block.id) +# +# ───────────────────────────────────────────────────────────────────────────── + +tmpdir = tempfile.mkdtemp() +db = f"sqlite:///{os.path.join(tmpdir, 'effects.db')}" + +print("=== First execution (search + email, then crash before upsert) ===\n") + +log = EffectLog(execution_id="anthropic-demo", tools=tools, storage=db) + +search_result = log.execute("search_db", {"query": "Q4 revenue"}) +print(f" 1. search_db: {search_result}") + +email_result = log.execute( + "send_email", {"to": "ceo@co.com", "subject": "Report", "body": "Q4 looks great"} +) +print(f" 2. send_email: {email_result}") + +print(f"\n *** CRASH *** send_email called {call_counts['send_email']} time(s)\n") + +# Reset counters (simulating new process) +for k in call_counts: + call_counts[k] = 0 + +tools2 = [ + ToolDef("search_db", EffectKind.ReadOnly, counting("search_db")), + ToolDef("send_email", EffectKind.IrreversibleWrite, counting("send_email")), + ToolDef("upsert_record", EffectKind.IdempotentWrite, counting("upsert_record")), +] + +print("=== Recovery ===\n") + +log2 = EffectLog(execution_id="anthropic-demo", tools=tools2, storage=db, recover=True) +log2.execute("search_db", {"query": "Q4 revenue"}) +log2.execute( + "send_email", {"to": "ceo@co.com", "subject": "Report", "body": "Q4 looks great"} +) +log2.execute( + "upsert_record", + {"table": "reports", "data": {"id": "Q4-2024", "status": "sent"}}, +) + +print(f" search_db re-executed: {call_counts['search_db']} (ReadOnly -> replayed)") +print( + f" send_email re-executed: {call_counts['send_email']} (IrreversibleWrite -> SEALED)" +) +print(f" upsert_record re-executed: {call_counts['upsert_record']} (new step)") + +assert call_counts["send_email"] == 0, "send_email should NOT be re-executed!" +print("\n PASS: email not re-sent on recovery") diff --git a/examples/e2e_anthropic.py b/examples/e2e_anthropic.py new file mode 100644 index 0000000..16d4ae4 --- /dev/null +++ b/examples/e2e_anthropic.py @@ -0,0 +1,178 @@ +#!/usr/bin/env python3 +""" +End-to-end Anthropic Claude API example with effect-log. + +Demonstrates the full Claude tool_use loop with crash recovery. +Requires: pip install ai-effectlog anthropic +Requires: ANTHROPIC_API_KEY environment variable + +Usage: + ANTHROPIC_API_KEY=sk-... python examples/e2e_anthropic.py +""" + +import os +import sys +import tempfile + +try: + import anthropic +except ImportError: + print("This example requires the anthropic SDK: pip install anthropic") + sys.exit(1) + +from effect_log import EffectKind, EffectLog +from effect_log.middleware.anthropic import ( + effect_logged_tool_executor, + make_tooldefs, + process_tool_calls, +) + +# ── Define tools ───────────────────────────────────────────────────────────── + +call_counts = {"search_db": 0, "send_email": 0} + + +def search_db(query: str = "", limit: int = 5) -> dict: + """Search the database for records matching the query.""" + call_counts["search_db"] += 1 + return { + "results": [f"Record matching '{query}' #{i}" for i in range(1, limit + 1)], + "total": limit, + } + + +def send_email(to: str = "", subject: str = "", body: str = "") -> dict: + """Send an email to the specified recipient.""" + call_counts["send_email"] += 1 + return {"sent": True, "to": to, "subject": subject, "message_id": "msg-001"} + + +# ── Claude API tool definitions ────────────────────────────────────────────── + +claude_tools = [ + { + "name": "search_db", + "description": "Search the database for records matching a query.", + "input_schema": { + "type": "object", + "properties": { + "query": {"type": "string", "description": "Search query"}, + "limit": { + "type": "integer", + "description": "Max results", + "default": 5, + }, + }, + "required": ["query"], + }, + }, + { + "name": "send_email", + "description": "Send an email to a recipient.", + "input_schema": { + "type": "object", + "properties": { + "to": {"type": "string", "description": "Recipient email"}, + "subject": {"type": "string", "description": "Email subject"}, + "body": {"type": "string", "description": "Email body"}, + }, + "required": ["to", "subject", "body"], + }, + }, +] + +# ── Setup ───────────────────────────────────────────────────────────────────── + +tmpdir = tempfile.mkdtemp() +db = f"sqlite:///{os.path.join(tmpdir, 'effects.db')}" + +tool_map = {"search_db": search_db, "send_email": send_email} +tool_specs = [ + {"func": search_db, "effect": EffectKind.ReadOnly}, + {"func": send_email, "effect": EffectKind.IrreversibleWrite}, +] +tooldefs = make_tooldefs(tool_specs) + +# ── First execution ────────────────────────────────────────────────────────── + +print("=== First execution ===\n") + +log = EffectLog(execution_id="e2e-anthropic", tools=tooldefs, storage=db) +executor = effect_logged_tool_executor(log, tool_map) + +client = anthropic.Anthropic() +messages = [ + { + "role": "user", + "content": "Search for Q4 revenue data, then email the results to ceo@co.com", + } +] + +# Claude API conversation loop +while True: + response = client.messages.create( + model="claude-sonnet-4-20250514", + max_tokens=1024, + tools=claude_tools, + messages=messages, + ) + + print(f" Claude stop_reason: {response.stop_reason}") + + if response.stop_reason == "end_turn": + # Extract final text + for block in response.content: + if hasattr(block, "text"): + print(f" Claude: {block.text}") + break + + if response.stop_reason == "tool_use": + # Process tool calls through effect-log + messages.append({"role": "assistant", "content": response.content}) + + tool_results = process_tool_calls(log, tool_map, response) + for tr in tool_results: + print(f" Tool result ({tr['tool_use_id'][:8]}...): {tr['content'][:80]}") + + messages.append({"role": "user", "content": tool_results}) + else: + print(f" Unexpected stop_reason: {response.stop_reason}") + break + +print(f"\n send_email called: {call_counts['send_email']} time(s)") + +# ── Simulate crash and recovery ────────────────────────────────────────────── + +print("\n=== Simulating crash and recovery ===\n") + +# Reset call counts (new process) +call_counts["search_db"] = 0 +call_counts["send_email"] = 0 + +# Create new EffectLog with recovery +tooldefs2 = make_tooldefs(tool_specs) +log2 = EffectLog( + execution_id="e2e-anthropic", tools=tooldefs2, storage=db, recover=True +) +executor2 = effect_logged_tool_executor(log2, tool_map) + +# Re-execute the same tool calls that Claude made +# In a real app, Claude would re-issue these; here we replay them directly +history = log2.history() +print(f" WAL has {len(history)} entries from first execution") + +for entry in history: + tool_name = entry["tool_name"] + # Re-execute through effect-log (will be sealed for IrreversibleWrite) + result = executor2(tool_name, entry.get("input", {}), f"recovery-{tool_name}") + print(f" Recovered {tool_name}: is_error={result.get('is_error', False)}") + +print(f"\n search_db re-executed: {call_counts['search_db']} (ReadOnly -> replayed)") +print( + f" send_email re-executed: {call_counts['send_email']} (IrreversibleWrite -> SEALED)" +) + +assert call_counts["send_email"] == 0, ( + "FAIL: send_email should NOT be re-executed on recovery!" +) +print("\n PASS: IrreversibleWrite was sealed on recovery")