Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,17 @@ Built-in middleware for major agent frameworks:
| **LangGraph** | `effect_log.middleware.langgraph` | `EffectLogToolNode`, `effect_logged_tools` |
| **OpenAI Agents SDK** | `effect_log.middleware.openai_agents` | `effect_logged_agent`, `wrap_function_tool` |
| **CrewAI** | `effect_log.middleware.crewai` | `effect_logged_crew`, `effect_logged_tool` |
| **Pydantic AI** | `effect_log.middleware.pydantic_ai` | `effect_logged_agent`, `EffectLogToolset` |
| **Anthropic Claude API** | `effect_log.middleware.anthropic` | `effect_logged_tool_executor`, `process_tool_calls` |

See [`examples/`](examples/) for runnable demos:

- [`crash_recovery.py`](examples/crash_recovery.py) — Core crash recovery demo (Phase 1 milestone)
- [`langgraph_integration.py`](examples/langgraph_integration.py) — LangGraph ToolNode + tool wrapping
- [`openai_agents_integration.py`](examples/openai_agents_integration.py) — OpenAI Agents SDK wrapping
- [`crewai_integration.py`](examples/crewai_integration.py) — CrewAI tool + crew wrapping
- [`pydantic_ai_integration.py`](examples/pydantic_ai_integration.py) — Pydantic AI toolset wrapping
- [`anthropic_integration.py`](examples/anthropic_integration.py) — Anthropic Claude API tool_use

## How It Works

Expand Down Expand Up @@ -89,7 +93,7 @@ pytest tests/ -v

- [x] Core library — WAL engine, recovery engine, SQLite + in-memory backends
- [x] Python bindings — PyO3 + maturin
- [x] Framework middleware — LangGraph, OpenAI Agents SDK, CrewAI
- [x] Framework middleware — LangGraph, OpenAI Agents SDK, CrewAI, Pydantic AI, Anthropic Claude API
- [ ] TypeScript bindings — napi-rs, Vercel AI SDK
- [ ] Additional backends — RocksDB, S3, Restate journal
- [ ] Auto-classification — infer effect kind from HTTP methods / API metadata
Expand Down
7 changes: 7 additions & 0 deletions bindings/python/python/effect_log/middleware/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,11 @@
Each middleware module provides integration with a specific agent framework.
Framework dependencies are optional — import errors are raised only when
you actually try to use the middleware.

Available middleware:
- effect_log.middleware.langgraph — LangGraph / LangChain
- effect_log.middleware.openai_agents — OpenAI Agents SDK
- effect_log.middleware.crewai — CrewAI
- effect_log.middleware.pydantic_ai — Pydantic AI
- effect_log.middleware.anthropic — Anthropic Claude API
"""
144 changes: 144 additions & 0 deletions bindings/python/python/effect_log/middleware/anthropic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
"""Anthropic Claude API middleware for effect-log.

Wraps the tool execution loop for Claude's tool_use pattern so every tool
invocation goes through the effect-log WAL, gaining crash recovery and
duplicate prevention.

Usage:
    import anthropic
    from effect_log import EffectLog, EffectKind, ToolDef
    from effect_log.middleware.anthropic import effect_logged_tool_executor, make_tooldefs

    tool_specs = [
        {"func": search_db, "effect": EffectKind.ReadOnly},
        {"func": send_email, "effect": EffectKind.IrreversibleWrite},
    ]
    tools = make_tooldefs(tool_specs)
    log = EffectLog(execution_id="task-001", tools=tools, storage="sqlite:///effects.db")

    executor = effect_logged_tool_executor(log, {
        "search_db": search_db,
        "send_email": send_email,
    })

    # In the Claude API loop:
    for block in response.content:
        if block.type == "tool_use":
            result = executor(block.name, block.input, block.id)
"""

from __future__ import annotations

import json
from typing import Any, Callable


def _ensure_anthropic():
    """Verify that the ``anthropic`` SDK can be imported.

    Returns:
        None when the import succeeds.

    Raises:
        ImportError: If ``import anthropic`` fails. The message carries the
            install command, and the original import failure is chained as
            ``__cause__`` so the underlying reason stays visible.
    """
    try:
        import anthropic  # noqa: F401
    except ImportError as exc:
        # Chain explicitly: a broken (not merely missing) install should still
        # surface its real error in the traceback.
        raise ImportError(
            "Anthropic middleware requires the anthropic SDK. "
            "Install it with: pip install anthropic"
        ) from exc


def make_tooldefs(tool_specs):
    """Build EffectLog ``ToolDef`` objects from plain Python callables.

    Each resulting ToolDef is named after the callable's ``__name__`` and
    wraps it in an adapter that receives a single ``args`` mapping and
    expands it into keyword arguments for the callable.

    Args:
        tool_specs: Iterable of dicts, each with the keys:
            - "func": the callable to expose as a tool
            - "effect": the EffectKind to register it under

    Returns:
        A list of ToolDef instances, in the same order as ``tool_specs``.
    """
    from effect_log import ToolDef

    def _to_tooldef(entry):
        target, kind = entry["func"], entry["effect"]

        # Defined per entry so each adapter closes over its own callable.
        def adapted(args):
            return target(**args)

        return ToolDef(target.__name__, kind, adapted)

    return [_to_tooldef(entry) for entry in tool_specs]


def effect_logged_tool_executor(
    log: Any,
    tool_map: dict[str, Callable],
    tool_effects: dict[str, Any] | None = None,
) -> Callable:
    """Build a callable that runs Claude ``tool_use`` requests via effect-log.

    Args:
        log: Object exposing ``execute(tool_name, args)``; every accepted call
            is forwarded to it.
        tool_map: Mapping keyed by tool name. Only the keys are consulted:
            a name missing from it is rejected with an error block and is
            never forwarded to ``log``. The mapped callables are not invoked
            here.
        tool_effects: Accepted for callers that pass it; not read by this
            function.

    Returns:
        A callable ``(tool_name, tool_input, tool_use_id) -> dict`` producing
        a ``tool_result`` content block.
    """

    def execute(tool_name: str, tool_input: dict, tool_use_id: str) -> dict:
        """Run one tool call and wrap its outcome as a tool_result block."""
        block = {"type": "tool_result", "tool_use_id": tool_use_id}

        if tool_name not in tool_map:
            block["content"] = json.dumps({"error": f"Unknown tool: {tool_name}"})
            block["is_error"] = True
            return block

        # Non-dict input is wrapped under the "input" key before forwarding.
        if isinstance(tool_input, dict):
            payload = tool_input
        else:
            payload = {"input": tool_input}

        outcome = log.execute(tool_name, payload)
        # String results pass through verbatim; anything else is JSON-encoded.
        block["content"] = outcome if isinstance(outcome, str) else json.dumps(outcome)
        return block

    return execute


def process_tool_calls(
    log: Any,
    tool_map: dict[str, Callable],
    response: Any,
) -> list[dict]:
    """Run every ``tool_use`` block of a response and collect the results.

    Walks ``response.content`` in order; items whose ``type`` attribute is
    ``"tool_use"`` are executed through an executor built from ``log`` and
    ``tool_map``. Items with any other type, or with no ``type`` attribute,
    are skipped.

    Args:
        log: Object passed through to ``effect_logged_tool_executor``.
        tool_map: Mapping keyed by tool name, passed through likewise.
        response: Object exposing an iterable ``.content``.

    Returns:
        The tool_result dicts, one per tool_use block, in response order.
    """
    run_tool = effect_logged_tool_executor(log, tool_map)
    return [
        run_tool(item.name, item.input, item.id)
        for item in response.content
        if getattr(item, "type", None) == "tool_use"
    ]
14 changes: 4 additions & 10 deletions bindings/python/python/effect_log/middleware/openai_agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,23 +86,17 @@ def wrap_function_tool(log: Any, tool: Any, effect_kind: Any = None) -> Any:
_ensure_openai_agents()
from agents import FunctionTool

original_invoke = tool.on_invoke_tool

async def effect_logged_invoke(ctx, args_json: str) -> Any:
tool_name = tool.name
try:
args = json.loads(args_json) if isinstance(args_json, str) else args_json
except (json.JSONDecodeError, TypeError):
args = {"raw_input": str(args_json)}

try:
result = log.execute(
tool_name, args if isinstance(args, dict) else {"input": args}
)
return json.dumps(result) if not isinstance(result, str) else result
except Exception:
# Fall back to original if effect-log fails (e.g., tool not registered)
return await original_invoke(ctx, args_json)
result = log.execute(
tool_name, args if isinstance(args, dict) else {"input": args}
)
return json.dumps(result) if not isinstance(result, str) else result

return FunctionTool(
name=tool.name,
Expand Down
170 changes: 167 additions & 3 deletions bindings/python/tests/test_middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,8 @@ def test_wrap_function_tool(self):
assert wrapped.name == "search"
assert wrapped.description == "Search the web"

@pytest.mark.asyncio
async def test_wrapped_tool_invocation(self):
def test_wrapped_tool_invocation(self):
import asyncio
from effect_log.middleware.openai_agents import wrap_function_tool

tools, counts = make_tools()
Expand All @@ -321,7 +321,9 @@ async def test_wrapped_tool_invocation(self):
)

wrapped = wrap_function_tool(log, original)
result = await wrapped.on_invoke_tool(None, '{"query": "test"}')
result = asyncio.get_event_loop().run_until_complete(
wrapped.on_invoke_tool(None, '{"query": "test"}')
)

parsed = json.loads(result)
assert "results" in parsed
Expand Down Expand Up @@ -731,3 +733,165 @@ def test_recovery_through_pydantic_ai(self):
assert counts2["send_email"] == 0
# Same result
assert result1 == result2


# ── Anthropic Claude API Middleware Tests ─────────────────────────────────────


def _mock_anthropic():
    """Register an empty stand-in ``anthropic`` module and return it.

    The stub replaces whatever ``sys.modules["anthropic"]`` held before.
    """
    stub = sys.modules["anthropic"] = types.ModuleType("anthropic")
    return stub


class TestAnthropicMiddleware:
    """Tests for the Anthropic Claude API middleware.

    Each test runs against a stub ``anthropic`` module installed in
    ``sys.modules``; the middleware module is dropped from the import cache
    afterwards so the next test imports it fresh.
    """

    def setup_method(self):
        # Remember any module already loaded under this name so teardown can
        # put it back instead of discarding it.
        self._prior_anthropic = sys.modules.get("anthropic")
        self.anthropic_mod = _mock_anthropic()

    def teardown_method(self):
        if self._prior_anthropic is None:
            sys.modules.pop("anthropic", None)
        else:
            sys.modules["anthropic"] = self._prior_anthropic
        sys.modules.pop("effect_log.middleware.anthropic", None)

    def test_make_tooldefs(self):
        """make_tooldefs creates ToolDefs from raw functions."""
        from effect_log.middleware.anthropic import make_tooldefs

        call_log = []

        def search_db(query: str = "") -> str:
            call_log.append(("search", query))
            return f"results for {query}"

        def send_email(to: str = "", subject: str = "") -> str:
            call_log.append(("email", to))
            return f"sent to {to}"

        defs = make_tooldefs(
            [
                {"func": search_db, "effect": EffectKind.ReadOnly},
                {"func": send_email, "effect": EffectKind.IrreversibleWrite},
            ]
        )

        assert len(defs) == 2
        log = make_log(defs)
        log.execute("search_db", {"query": "revenue"})
        assert call_log == [("search", "revenue")]

        log.execute("send_email", {"to": "ceo@co.com", "subject": "Report"})
        assert call_log == [("search", "revenue"), ("email", "ceo@co.com")]

    def test_executor_normal_execution(self):
        """Executor routes tool calls through effect-log."""
        from effect_log.middleware.anthropic import effect_logged_tool_executor

        tools, counts = make_tools()
        log = make_log(tools)

        tool_map = {"search": lambda **kw: None, "send_email": lambda **kw: None}
        executor = effect_logged_tool_executor(log, tool_map)

        result = executor("search", {"query": "test"}, "call-001")
        assert result["type"] == "tool_result"
        assert result["tool_use_id"] == "call-001"
        assert "is_error" not in result
        parsed = json.loads(result["content"])
        assert "results" in parsed
        assert counts["search"] == 1

    def test_executor_unknown_tool(self):
        """Executor returns error for unknown tools."""
        from effect_log.middleware.anthropic import effect_logged_tool_executor

        tools, counts = make_tools()
        log = make_log(tools)

        executor = effect_logged_tool_executor(log, {"search": lambda **kw: None})

        result = executor("nonexistent", {"query": "test"}, "call-002")
        assert result["is_error"] is True
        assert "Unknown tool" in result["content"]

    def test_executor_irreversible_write(self):
        """Executor handles IrreversibleWrite tools."""
        from effect_log.middleware.anthropic import effect_logged_tool_executor

        tools, counts = make_tools()
        log = make_log(tools)

        tool_map = {"send_email": lambda **kw: None}
        executor = effect_logged_tool_executor(log, tool_map)

        result = executor("send_email", {"to": "ceo@co.com"}, "call-003")
        assert "is_error" not in result
        parsed = json.loads(result["content"])
        assert parsed["sent"] is True
        assert counts["send_email"] == 1

    def test_process_tool_calls(self):
        """process_tool_calls handles a full response."""
        from effect_log.middleware.anthropic import process_tool_calls

        tools, counts = make_tools()
        log = make_log(tools)

        tool_map = {"search": lambda **kw: None, "send_email": lambda **kw: None}

        # Mock a Claude response with tool_use blocks
        block1 = MagicMock()
        block1.type = "tool_use"
        block1.name = "search"
        block1.input = {"query": "revenue"}
        block1.id = "tu_001"

        block2 = MagicMock()
        block2.type = "tool_use"
        block2.name = "send_email"
        block2.input = {"to": "ceo@co.com"}
        block2.id = "tu_002"

        text_block = MagicMock()
        text_block.type = "text"

        response = MagicMock()
        response.content = [block1, text_block, block2]

        results = process_tool_calls(log, tool_map, response)
        assert len(results) == 2
        assert results[0]["tool_use_id"] == "tu_001"
        assert results[1]["tool_use_id"] == "tu_002"
        assert counts["search"] == 1
        assert counts["send_email"] == 1

    def test_recovery_through_anthropic(self, tmp_path):
        """Verify sealed results work through the Anthropic middleware."""
        from effect_log.middleware.anthropic import effect_logged_tool_executor

        # tmp_path is managed by pytest, so the database directory is not
        # left behind the way a bare tempfile.mkdtemp() would be.
        db = f"sqlite:///{tmp_path / 'test.db'}"

        tools, counts1 = make_tools()
        log = make_log(tools, storage=db)

        tool_map = {"send_email": lambda **kw: None}
        executor = effect_logged_tool_executor(log, tool_map)

        # First execution
        result1 = executor("send_email", {"to": "ceo@co.com"}, "call-001")
        assert counts1["send_email"] == 1

        # Recovery
        tools2, counts2 = make_tools()
        log2 = EffectLog(
            execution_id="test-mw-001", tools=tools2, storage=db, recover=True
        )

        executor2 = effect_logged_tool_executor(log2, tool_map)
        result2 = executor2("send_email", {"to": "ceo@co.com"}, "call-001")

        # Sealed — not re-executed
        assert counts2["send_email"] == 0
        # Same content
        assert result1["content"] == result2["content"]
Loading
Loading