Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,9 @@ jobs:

- name: Run redis message stream tests
run: pytest tests/unit/agents/test_redis_message_streams.py $GS_FLAG

- name: Run AEGIS unit and integration tests
run: pytest tests/unit/aegis tests/integration/aegis -v --tb=short

- name: Run AEGIS detector F1 benchmarks
run: pytest tests/plugins/pytest_aegis -m aegis -v --tb=short
24 changes: 24 additions & 0 deletions finbot/aegis/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# ============================================================
# File: finbot/aegis/__init__.py
# Purpose: Public exports for FinBot-AEGIS runtime security layer
# Author: Jean Francois Regis MUKIZA
# GSoC Week: 1
# OWASP Category: ASI01–ASI10 (platform-wide)
# ============================================================
"""FinBot-AEGIS: runtime security layer for OWASP FinBot CTF."""

from finbot.aegis.intent_gate import IntentGate
from finbot.aegis.schemas import PolicyVerdict
from finbot.aegis.sentinel import AuditEvent, SentinelStream
from finbot.aegis.service import AegisEnforcementService
from finbot.aegis.trust_mesh import AttestationResult, TrustMesh

__all__ = [
"AegisEnforcementService",
"AttestationResult",
"AuditEvent",
"IntentGate",
"PolicyVerdict",
"SentinelStream",
"TrustMesh",
]
115 changes: 115 additions & 0 deletions finbot/aegis/intent_gate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# ============================================================
# File: finbot/aegis/intent_gate.py
# Purpose: Policy-as-code PEP/PDP for pre-execution tool validation
# Author: Jean Francois Regis MUKIZA
# GSoC Week: 3
# OWASP Category: ASI01 Goal Hijack, ASI02 Tool Misuse, ASI05 Unexpected RCE
# ============================================================
"""IntentGate: policy-as-code PEP/PDP for tool hooks."""

import json
import logging
import re
from pathlib import Path

import yaml
from pydantic import ValidationError

from finbot.aegis.schemas import (
PolicyAction,
PolicyDocument,
PolicyVerdict,
ToolInvocationContext,
)
from finbot.config import settings

logger = logging.getLogger(__name__)

_RCE_PATTERNS = (
re.compile(r"\b(curl|wget|nc|bash|sh)\b", re.I),
re.compile(r"/etc/(passwd|shadow)", re.I),
re.compile(r"rm\s+-rf", re.I),
)


class IntentGate:
"""Loads YAML policies and evaluates tool invocations before execution."""

def __init__(self, policy_dir: Path | None = None) -> None:
self._policy_dir = policy_dir or Path(settings.AEGIS_POLICY_DIR)
self._policies: list[PolicyDocument] = []
self.reload()

def reload(self) -> None:
"""Reload all YAML policies from the configured directory."""
self._policies = []
if not self._policy_dir.exists():
logger.warning("AEGIS policy dir missing: %s", self._policy_dir)
return
for path in sorted(self._policy_dir.glob("*.yaml")):
try:
raw = yaml.safe_load(path.read_text(encoding="utf-8")) or {}
doc = PolicyDocument.model_validate(raw.get("policy", raw))
self._policies.append(doc)
logger.info("Loaded AEGIS policy %s v%s", doc.name, doc.version)
except (ValidationError, yaml.YAMLError) as exc:
logger.error("Invalid policy %s: %s", path, exc)

def evaluate_tool(self, ctx: ToolInvocationContext) -> PolicyVerdict:
"""Return allow/deny/quarantine verdict for a tool invocation."""
for policy in self._policies:
if policy.allowed_tools and ctx.tool_name not in policy.allowed_tools:
if not any(ctx.tool_name.endswith(t) for t in policy.allowed_tools):
return PolicyVerdict(
action=PolicyAction.deny,
reason="tool_not_in_allowlist",
rule_id=policy.name,
asi_tags=["ASI02"],
)

args_blob = json.dumps(ctx.arguments, default=str)
for pat in _RCE_PATTERNS:
if pat.search(args_blob) or (
ctx.tool_description and pat.search(ctx.tool_description)
):
return PolicyVerdict(
action=PolicyAction.deny,
reason="rce_pattern_blocked",
rule_id="builtin_rce",
asi_tags=["ASI05"],
)

for policy in self._policies:
for rule in policy.rules:
if rule.action != PolicyAction.deny:
continue
if rule.condition.startswith("deny_tool:"):
denied = rule.condition.split(":", 1)[1]
if ctx.tool_name == denied or ctx.tool_name.endswith(denied):
return PolicyVerdict(
action=PolicyAction.deny,
reason=rule.reason,
rule_id=rule.id,
asi_tags=["ASI02"],
)
if rule.condition == "cross_namespace_tool":
ns_arg = str(ctx.arguments.get("namespace", ""))
if ns_arg and ns_arg != ctx.namespace:
return PolicyVerdict(
action=PolicyAction.deny,
reason=rule.reason,
rule_id=rule.id,
asi_tags=["ASI03"],
)

for policy in self._policies:
for pattern in policy.denied_patterns:
if re.search(pattern, args_blob, re.I):
return PolicyVerdict(
action=PolicyAction.deny,
reason="denied_pattern_match",
rule_id=policy.name,
asi_tags=["ASI05"],
)

return PolicyVerdict(action=PolicyAction.allow, reason="default_allow")
89 changes: 89 additions & 0 deletions finbot/aegis/sentinel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# ============================================================
# File: finbot/aegis/sentinel.py
# Purpose: Hash-chained HMAC audit trail on Redis via EventBus
# Author: Jean Francois Regis MUKIZA
# GSoC Week: 2
# OWASP Category: ASI06 Memory Poisoning, ASI08 Cascading Failures
# ============================================================
"""SentinelStream: hash-chained forensic audit events on Redis."""

import hashlib
import hmac
import json
import logging
from datetime import UTC, datetime
from typing import Any

from finbot.aegis.schemas import AuditEvent
from finbot.config import settings
from finbot.core.auth.session import SessionContext
from finbot.core.messaging import event_bus

logger = logging.getLogger(__name__)


class SentinelStream:
"""Records tamper-evident audit events with per-namespace hash chains."""

def __init__(self) -> None:
self._chain_key = "aegis:audit:chain_head"
signing_key = settings.SESSION_SIGNING_KEY or settings.SECRET_KEY
self._signing_key = signing_key.encode()

async def record(
self,
*,
event_type: str,
namespace: str,
workflow_id: str,
agent_name: str,
payload: dict[str, Any],
session_context: SessionContext,
) -> AuditEvent:
prev_hash = await self._get_chain_head(namespace)
timestamp = datetime.now(UTC).isoformat()
body = {
"event_type": event_type,
"namespace": namespace,
"workflow_id": workflow_id,
"agent_name": agent_name,
"payload": payload,
"timestamp": timestamp,
"prev_hash": prev_hash,
}
canonical = json.dumps(body, sort_keys=True, separators=(",", ":"))
event_hash = hmac.new(
self._signing_key,
canonical.encode(),
hashlib.sha256,
).hexdigest()
audit = AuditEvent(**body, event_hash=event_hash)
await self._set_chain_head(namespace, event_hash)
await event_bus.emit_agent_event(
agent_name="aegis",
event_type=f"audit.{event_type}",
event_subtype="security",
event_data={**body, "event_hash": event_hash},
session_context=session_context,
workflow_id=workflow_id,
summary=f"AEGIS audit: {event_type}",
)
return audit

async def _get_chain_head(self, namespace: str) -> str | None:
key = f"{self._chain_key}:{namespace}"
try:
val = await event_bus.redis.get(key)
if val is None:
return None
return val.decode() if isinstance(val, bytes) else str(val)
except Exception: # pylint: disable=broad-exception-caught
logger.debug("Could not read AEGIS chain head for %s", namespace, exc_info=True)
return None

async def _set_chain_head(self, namespace: str, digest: str) -> None:
key = f"{self._chain_key}:{namespace}"
try:
await event_bus.redis.set(key, digest, ex=settings.AEGIS_AUDIT_CHAIN_TTL)
except Exception: # pylint: disable=broad-exception-caught
logger.debug("Could not write AEGIS chain head for %s", namespace, exc_info=True)
91 changes: 91 additions & 0 deletions finbot/aegis/service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# ============================================================
# File: finbot/aegis/service.py
# Purpose: Orchestrates IntentGate, TrustMesh, and SentinelStream at tool hooks
# Author: Jean Francois Regis MUKIZA
# GSoC Week: 3–4
# OWASP Category: ASI01–ASI02 (enforcement facade)
# ============================================================
"""AegisEnforcementService: orchestrates IntentGate, TrustMesh, SentinelStream."""

import logging
from typing import Any

from finbot.aegis.anomaly import CascadeCircuitBreaker
from finbot.aegis.intent_gate import IntentGate
from finbot.aegis.schemas import (
EnforcementMode,
PolicyAction,
PolicyVerdict,
ToolInvocationContext,
)
from finbot.aegis.sentinel import SentinelStream
from finbot.config import settings
from finbot.core.auth.session import SessionContext

logger = logging.getLogger(__name__)


class AegisEnforcementService:
"""Pre-execution policy enforcement for agent tool invocations."""

def __init__(self, session_context: SessionContext, workflow_id: str) -> None:
self._session = session_context
self._workflow_id = workflow_id
self._intent = IntentGate()
self._sentinel = SentinelStream()
self._circuit = CascadeCircuitBreaker()
self._mode = EnforcementMode(settings.AEGIS_ENFORCEMENT_MODE)

async def before_tool(
self,
*,
agent_name: str,
tool_name: str,
tool_source: str,
arguments: dict[str, Any] | None,
tool_description: str | None = None,
) -> PolicyVerdict:
if await self._circuit.is_tripped(self._session.namespace, self._workflow_id):
verdict = PolicyVerdict(
action=PolicyAction.deny,
reason="cascade_circuit_breaker_tripped",
rule_id="circuit_breaker",
asi_tags=["ASI08"],
)
else:
ctx = ToolInvocationContext(
agent_name=agent_name,
tool_name=tool_name,
tool_source=tool_source,
namespace=self._session.namespace,
user_id=self._session.user_id,
workflow_id=self._workflow_id,
arguments=arguments or {},
tool_description=tool_description,
)
verdict = self._intent.evaluate_tool(ctx)
await self._circuit.record_tool_call(self._session.namespace, self._workflow_id)

await self._sentinel.record(
event_type="policy.before_tool",
namespace=self._session.namespace,
workflow_id=self._workflow_id,
agent_name=agent_name,
payload={"tool": tool_name, "verdict": verdict.model_dump()},
session_context=self._session,
)

if self._mode == EnforcementMode.enforce and verdict.action == PolicyAction.deny:
logger.warning(
"AEGIS denied tool=%s user=%s reason=%s",
tool_name,
self._session.user_id[:8],
verdict.reason,
)
return verdict

def should_block(self, verdict: PolicyVerdict) -> bool:
return (
self._mode == EnforcementMode.enforce
and verdict.action == PolicyAction.deny
)
28 changes: 28 additions & 0 deletions finbot/aegis/simulator/mcp_mocks/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# ============================================================
# File: finbot/aegis/simulator/mcp_mocks/__init__.py
# Purpose: Adversarial MCP server mocks for attack simulation
# Author: Jean Francois Regis MUKIZA
# GSoC Week: 4
# OWASP Category: -
# ============================================================
"""Adversarial MCP server mocks for simulating malicious MCP servers.

Provides mock implementations of MCP servers that exhibit various
adversarial behaviors for testing FinBot's defenses.
"""

from finbot.aegis.simulator.mcp_mocks.adversarial import (
AdmServer,
AdviceServer,
DataExfiltrationServer,
FileSystemServer,
ToolPoisoningServer,
)

__all__ = [
"AdmServer",
"AdviceServer",
"DataExfiltrationServer",
"FileSystemServer",
"ToolPoisoningServer",
]
Loading
Loading