Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions python-interpreter/packages/afm-core/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ dependencies = [
"platformdirs>=4.0",
"packaging>=24.0",
"rich>=14.3.2",
"pyjwt[crypto]>=2.12.1",
]

[project.scripts]
Expand Down
6 changes: 3 additions & 3 deletions python-interpreter/packages/afm-core/src/afm/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,15 +191,15 @@ async def health_check() -> dict[str, str]:
webchat_router = create_webchat_router(
agent,
webchat_interface.signature,
webchat_path, # type: ignore[arg-type]
webchat_path, # type: ignore
)
app.include_router(webchat_router)

if webhook_interface is not None:
webhook_router = create_webhook_router(
agent,
webhook_interface,
webhook_path, # type: ignore[arg-type]
webhook_path, # type: ignore
)
app.include_router(webhook_router)
# Store subscriber in app state for verification endpoint
Expand All @@ -210,7 +210,7 @@ async def health_check() -> dict[str, str]:
platform_chat_router = create_platform_chat_router(
agent,
platform_chat_interface,
platform_chat_path, # type: ignore[arg-type]
platform_chat_path, # type: ignore
)
app.include_router(platform_chat_router)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@

from __future__ import annotations

import hmac
import logging
from typing import TYPE_CHECKING, Any, ClassVar, Mapping
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, ClassVar, Mapping, Self

import jwt
from fastapi import HTTPException
from fastapi.responses import JSONResponse, Response
from pydantic import BaseModel, ConfigDict
from jwt import PyJWKClient
from pydantic import BaseModel, ConfigDict, model_validator

from ._handler import PlatformHandler

Expand All @@ -34,25 +36,166 @@
# Event types that the agent can act on.
_ACTIONABLE_EVENT_TYPES: frozenset[str] = frozenset({"MESSAGE", "ADDED_TO_SPACE"})

# Google Chat service account that issues bearer tokens.
_CHAT_ISSUER = "chat@system.gserviceaccount.com"

# Accepted issuers for Google-signed OIDC ID tokens.
_GOOGLE_OIDC_ISSUERS: frozenset[str] = frozenset(
{"accounts.google.com", "https://accounts.google.com"}
)

# JWKS endpoints used to verify bearer token signatures.
_GOOGLE_OIDC_JWKS_URL = "https://www.googleapis.com/oauth2/v3/certs"
_GOOGLE_SA_JWKS_URL = (
"https://www.googleapis.com/service_accounts/v1/jwk/chat@system.gserviceaccount.com"
)

# JWKS clients are cached per-URL by PyJWKClient (keys are cached in-process).
_jwks_clients: dict[str, PyJWKClient] = {}


class GChatConfig(BaseModel):
model_config = ConfigDict(extra="forbid")

verification_token: str | None = None
project_number: str | int | None = None
endpoint_url: str | None = None

@model_validator(mode="after")
def _exactly_one_audience(self) -> Self:
has_project = self.project_number is not None and str(self.project_number) != ""
has_url = isinstance(self.endpoint_url, str) and self.endpoint_url != ""
if has_project and has_url:
raise ValueError(
"GChat platform_config accepts only one of "
"'project_number' or 'endpoint_url', not both."
)
return self


@dataclass(frozen=True)
class HttpEndpointUrlConfig:
"""Verify bearer tokens as Google-signed OIDC ID tokens.

Used when the Chat app's Authentication Audience is set to HTTP endpoint URL.
The `aud` claim of the incoming JWT must match `endpoint_url`.
"""

endpoint_url: str


@dataclass(frozen=True)
class ProjectNumberConfig:
"""Verify bearer tokens as self-signed JWTs from the Chat service account.

def verify_gchat_request_token(
payload: object,
verification_token: str,
Used when the Chat app's Authentication Audience is set to Project Number.
The `aud` claim of the incoming JWT must match `project_number`.
"""

project_number: str


HttpConfig = HttpEndpointUrlConfig | ProjectNumberConfig


def get_http_config(config: GChatConfig) -> HttpConfig | None:
if isinstance(config.endpoint_url, str) and config.endpoint_url:
return HttpEndpointUrlConfig(endpoint_url=config.endpoint_url)

project_number = config.project_number
if isinstance(project_number, str) and project_number:
return ProjectNumberConfig(project_number=project_number)
if isinstance(project_number, int):
return ProjectNumberConfig(project_number=str(project_number))

return None


def extract_bearer_token(auth_header: str | None) -> str | None:
if not isinstance(auth_header, str):
return None
scheme, sep, token = auth_header.partition(" ")
if not sep or scheme.lower() != "bearer":
return None
return token.strip() or None


def verify_gchat_bearer_token(
auth_header: str | None,
config: HttpConfig,
) -> bool:
if not isinstance(payload, dict):
"""Return True if the bearer token validates, False if it does not.

Only token-validation failures are reported as False. Operational errors
(e.g., JWKS fetch failures) are allowed to propagate so callers can surface
them as 5xx rather than masking outages as 401s.
"""
token = extract_bearer_token(auth_header)
if token is None:
logger.warning(
"GChat verification failed: missing or malformed Authorization header"
)
return False

token = payload.get("token")
if not isinstance(token, str):
if isinstance(config, HttpEndpointUrlConfig):
return _verify_id_token(token, config.endpoint_url)
return _verify_project_number_jwt(token, config.project_number)


def _get_jwks_client(url: str) -> PyJWKClient:
client = _jwks_clients.get(url)
if client is None:
client = PyJWKClient(url, cache_keys=True)
_jwks_clients[url] = client
return client


def _verify_id_token(token: str, expected_audience: str) -> bool:
try:
signing_key = _get_jwks_client(_GOOGLE_OIDC_JWKS_URL).get_signing_key_from_jwt(
token
)
payload = jwt.decode(
token,
signing_key.key,
algorithms=["RS256"],
audience=expected_audience,
issuer=list(_GOOGLE_OIDC_ISSUERS),
options={"require": ["exp", "iat", "iss", "aud"]},
)
except jwt.InvalidTokenError as exc:
logger.warning("GChat ID token validation failed: %s", exc)
return False

return hmac.compare_digest(token, verification_token)
email = payload.get("email")
if email != _CHAT_ISSUER:
logger.warning("GChat ID token has unexpected email claim: %r", email)
return False

if payload.get("email_verified") is not True:
logger.warning("GChat ID token email_verified claim is not true")
return False

return True


def _verify_project_number_jwt(token: str, expected_audience: str) -> bool:
try:
signing_key = _get_jwks_client(_GOOGLE_SA_JWKS_URL).get_signing_key_from_jwt(
token
)
jwt.decode(
token,
signing_key.key,
algorithms=["RS256"],
audience=expected_audience,
issuer=_CHAT_ISSUER,
options={"require": ["exp", "iat", "iss", "aud"]},
)
except jwt.InvalidTokenError as exc:
logger.warning("GChat project-number JWT validation failed: %s", exc)
return False

return True


def get_gchat_session_id(payload: object) -> str:
Expand All @@ -66,14 +209,6 @@ def get_gchat_session_id(payload: object) -> str:
if space_name is None:
return "gchat:unknown-space:default"

message = payload.get("message")
if isinstance(message, dict):
thread = message.get("thread")
if isinstance(thread, dict):
thread_name = _non_empty_string(thread.get("name"))
if thread_name:
return f"gchat:{space_name}:{thread_name}"

user = payload.get("user")
if isinstance(user, dict):
user_name = _non_empty_string(user.get("name"))
Expand Down Expand Up @@ -116,10 +251,11 @@ def validate_runtime_config(
if not verify_signatures:
return
config = self.parse_config(interface.platform_config)
if config.verification_token is None:
if get_http_config(config) is None:
raise ValueError(
"GChat platform chat requires "
"platform_config.verification_token when "
"platform_config.project_number or "
"platform_config.endpoint_url when "
"signature verification is enabled."
)

Expand All @@ -128,27 +264,28 @@ def verify_raw_request(
body: bytes,
headers: Mapping[str, str],
interface: PlatformChatInterface,
) -> None:
# GChat verification is on the parsed payload's `token` field.
return

def verify_parsed_payload(
self,
payload: Any,
interface: PlatformChatInterface,
) -> None:
config = self.parse_config(interface.platform_config)
if config.verification_token is None:
http_config = get_http_config(config)
if http_config is None:
raise HTTPException(
status_code=500,
detail="GChat verification token is not configured",
detail="GChat verification audience is not configured",
)
if not verify_gchat_request_token(payload, config.verification_token):
auth_header = headers.get("authorization") or headers.get("Authorization")
if not verify_gchat_bearer_token(auth_header, http_config):
raise HTTPException(
status_code=401,
detail="Invalid GChat verification token",
detail="Invalid GChat bearer token",
)

def verify_parsed_payload(
self,
payload: Any,
interface: PlatformChatInterface,
) -> None:
return

def should_ignore(self, payload: Any) -> bool:
return should_ignore_gchat_event(payload)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ interfaces:
mode: notification
prompt: "[${http:payload.type}] Reply to ${http:payload.message.text}"
platform_config:
verification_token: "test-verification-token"
project_number: "test-project-number"
exposure:
http:
path: "/gchat"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ interfaces:
text:
type: string
platform_config:
verification_token: "test-verification-token"
endpoint_url: "http://example.com/gchat"
exposure:
http:
path: "/gchat"
Expand Down
6 changes: 2 additions & 4 deletions python-interpreter/packages/afm-core/tests/test_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,7 @@ def test_parse_gchat_platform_chat_agent(
assert isinstance(interface, PlatformChatInterface)
assert interface.platform == "gchat"
assert interface.mode == PlatformChatMode.NOTIFICATION
assert interface.platform_config == {
"verification_token": "test-verification-token"
}
assert interface.platform_config == {"project_number": "test-project-number"}
assert interface.has_explicit_output_schema is False
assert interface.exposure is not None
assert interface.exposure.http is not None
Expand Down Expand Up @@ -285,7 +283,7 @@ def test_platform_chat_request_allows_explicit_output_schema(self) -> None:
text:
type: string
platform_config:
verification_token: "secret"
project_number: "1234567890"
exposure:
http:
path: "/gchat"
Expand Down
Loading