diff --git a/src/openhuman/approval/store.rs b/src/openhuman/approval/store.rs index da60372a39..0696f8c7bc 100644 --- a/src/openhuman/approval/store.rs +++ b/src/openhuman/approval/store.rs @@ -2,18 +2,22 @@ //! //! Pending rows survive core restart so a queued approval is not lost //! when the user quits before deciding. Each row carries the -//! `session_id` of the launch that queued it (informational — -//! `list_pending` returns every undecided row regardless of session -//! so the UI can audit / dismiss orphans after restart, per the -//! issue #1339 acceptance criterion). +//! `session_id` of the launch that queued it (informational only). +//! `list_pending` returns every undecided row regardless of session so +//! the UI can audit or dismiss orphans after restart, per the issue +//! #1339 acceptance criterion. //! //! Replay safety: a `decide` on an orphan row (process that queued it -//! is gone) updates the DB but cannot resume the parked future — no -//! side effect can fire across processes. `purge_session` is a -//! best-effort cleanup helper kept for an explicit RPC in a follow-up. +//! is gone) updates the DB but cannot resume the parked future, so no +//! side effect can fire across processes. +//! +//! Durability safety: `expires_at` is enforced in the store. When a +//! pending row has already expired by the time the store is read again +//! after a restart, it is lazily transitioned into a terminal state so +//! stale rows stop showing up as actionable approvals forever. //! //! Follows the same `with_connection` shape as `notifications/store.rs` -//! and `cron/store.rs` — synchronous `rusqlite::Connection` opened per +//! and `cron/store.rs`: synchronous `rusqlite::Connection` opened per //! call, schema applied idempotently. use anyhow::{Context, Result}; @@ -24,7 +28,6 @@ use crate::openhuman::config::Config; use super::types::{ApprovalAuditEntry, ApprovalDecision, PendingApproval}; -/// SQL schema applied on every `with_connection` call. const SCHEMA: &str = " PRAGMA foreign_keys = ON; @@ -45,8 +48,6 @@ CREATE INDEX IF NOT EXISTS idx_pending_approvals_session ON pending_approvals(session_id); "; -/// Open (and migrate) the approval DB, then call `f` with a live -/// connection. Mirrors `notifications/store.rs::with_connection`. fn with_connection(config: &Config, f: impl FnOnce(&Connection) -> Result) -> Result { let db_path = config.workspace_dir.join("approval").join("approval.db"); @@ -77,8 +78,6 @@ fn with_connection(config: &Config, f: impl FnOnce(&Connection) -> Result) f(&conn) } -/// Insert a pending row. Caller supplies the `request_id` and -/// `session_id` so the gate can correlate the parked future. pub fn insert_pending(config: &Config, pending: &PendingApproval) -> Result<()> { with_connection(config, |conn| { let args = serde_json::to_string(&pending.args_redacted) @@ -105,18 +104,25 @@ pub fn insert_pending(config: &Config, pending: &PendingApproval) -> Result<()> }) } -/// List all rows with no `decided_at` (still awaiting user input) -/// regardless of which launch queued them. Orphan rows (the gate's -/// in-memory waiter has been dropped — process died between -/// `intercept` and the user's decision) stay visible so the UI can -/// audit / dismiss them after restart, satisfying the issue #1339 -/// acceptance criterion "pending rows survive app restart". +/// Transition any stale rows into a terminal state so they no longer +/// appear as actionable pending approvals after restart. /// -/// `decide` on an orphan row updates the DB and returns the row but -/// the parked tool call is gone — no side effect ever fires, which -/// matches the security invariant. +/// We currently reuse `deny` as the persisted terminal value to avoid +/// widening the externally visible approval decision enum before the +/// broader durable-audit work lands. This preserves the audit trail +/// (`decided_at` + `decision`) without leaving expired rows pending +/// forever. +pub fn expire_stale(config: &Config) -> Result { + with_connection(config, |conn| expire_stale_with_now(conn, Utc::now())) +} + +/// List all rows that are still awaiting user input, regardless of +/// which launch queued them. Orphan rows from prior sessions remain +/// visible until they are explicitly decided or expire. pub fn list_pending(config: &Config) -> Result> { with_connection(config, |conn| { + expire_stale_with_now(conn, Utc::now())?; + let mut stmt = conn .prepare( "SELECT request_id, tool_name, action_summary, args_redacted, @@ -138,14 +144,16 @@ pub fn list_pending(config: &Config) -> Result> { } /// Mark a pending row as decided and return the now-decided row. -/// Returns `Ok(None)` if no row matched (already decided, expired, -/// or unknown id). +/// Returns `Ok(None)` if no row matched (already decided, expired, or +/// unknown id). pub fn decide( config: &Config, request_id: &str, decision: ApprovalDecision, ) -> Result> { with_connection(config, |conn| { + expire_stale_with_now(conn, Utc::now())?; + let decision_str = decision.as_str(); let now = Utc::now().to_rfc3339(); let updated = conn @@ -217,6 +225,22 @@ pub fn purge_session(config: &Config, session_id: &str) -> Result { }) } +fn expire_stale_with_now(conn: &Connection, now: DateTime) -> Result { + let now_rfc3339 = now.to_rfc3339(); + let deny = ApprovalDecision::Deny.as_str(); + let updated = conn + .execute( + "UPDATE pending_approvals + SET decided_at = ?1, decision = ?2 + WHERE decided_at IS NULL + AND expires_at IS NOT NULL + AND strftime('%s', expires_at) <= strftime('%s', ?3)", + params![now_rfc3339, deny, now_rfc3339], + ) + .context("[approval::store] expire stale rows")?; + Ok(updated) +} + fn row_to_audit_entry(row: &rusqlite::Row<'_>) -> rusqlite::Result { let args_str: String = row.get(3)?; let args_redacted: serde_json::Value = serde_json::from_str(&args_str) @@ -263,10 +287,10 @@ fn invalid_text_column(column: usize, message: String) -> rusqlite::Error { fn row_to_pending(row: &rusqlite::Row<'_>) -> rusqlite::Result { let args_str: String = row.get(3)?; - let args_redacted: serde_json::Value = serde_json::from_str(&args_str) - .unwrap_or_else(|_| serde_json::json!({ "_error": "args_redacted not valid JSON" })); + let args_redacted = serde_json::from_str(&args_str).unwrap_or(serde_json::Value::Null); let created_str: String = row.get(5)?; let expires_opt: Option = row.get(6)?; + Ok(PendingApproval { request_id: row.get(0)?, tool_name: row.get(1)?, @@ -302,6 +326,18 @@ mod tests { } fn sample(request_id: &str, session_id: &str) -> PendingApproval { + sample_with_expiry( + request_id, + session_id, + Some(Utc::now() + Duration::minutes(10)), + ) + } + + fn sample_with_expiry( + request_id: &str, + session_id: &str, + expires_at: Option>, + ) -> PendingApproval { PendingApproval { request_id: request_id.to_string(), tool_name: "composio".to_string(), @@ -309,10 +345,32 @@ mod tests { args_redacted: json!({ "action": "execute", "tool_slug": "SLACK_SEND" }), session_id: session_id.to_string(), created_at: Utc::now(), - expires_at: Some(Utc::now() + Duration::minutes(10)), + expires_at, } } + fn fetch_decision_state( + config: &Config, + request_id: &str, + ) -> Option<(Option, Option)> { + with_connection(config, |conn| { + let mut stmt = conn + .prepare("SELECT decided_at, decision FROM pending_approvals WHERE request_id = ?1") + .context("prepare raw decision lookup")?; + let mut rows = stmt + .query(params![request_id]) + .context("query raw decision lookup")?; + if let Some(row) = rows.next().context("decision row next")? { + let decided_at: Option = row.get(0)?; + let decision: Option = row.get(1)?; + Ok(Some((decided_at, decision))) + } else { + Ok(None) + } + }) + .unwrap() + } + #[test] fn insert_then_list_returns_pending_row() { let (config, _dir) = test_config(); @@ -373,7 +431,6 @@ mod tests { decide(&config, "p2", ApprovalDecision::ApproveOnce).unwrap(); let removed = purge_session(&config, "sess-A").unwrap(); assert_eq!(removed, 1, "only undecided sess-A row should be purged"); - // p2 stays because it is decided; sess-B untouched. let remaining = list_pending(&config).unwrap(); assert_eq!(remaining.len(), 1); assert_eq!(remaining[0].request_id, "p3"); @@ -383,14 +440,116 @@ mod tests { fn pending_row_survives_connection_close() { let (config, _dir) = test_config(); insert_pending(&config, &sample("survives", "sess-A")).unwrap(); - // Each `with_connection` opens a fresh handle — re-reading - // proves the row persisted to disk (acceptance criterion: - // pending rows survive app restart). let rows = list_pending(&config).unwrap(); assert_eq!(rows.len(), 1); assert_eq!(rows[0].request_id, "survives"); } + #[test] + fn list_pending_expires_stale_rows_before_returning() { + let (config, _dir) = test_config(); + insert_pending( + &config, + &sample_with_expiry("expired", "sess-A", Some(Utc::now() - Duration::minutes(5))), + ) + .unwrap(); + insert_pending( + &config, + &sample_with_expiry("active", "sess-A", Some(Utc::now() + Duration::minutes(5))), + ) + .unwrap(); + + let rows = list_pending(&config).unwrap(); + let ids: Vec<_> = rows.into_iter().map(|row| row.request_id).collect(); + assert_eq!(ids, vec!["active"]); + + let state = fetch_decision_state(&config, "expired").expect("expired row should persist"); + assert!( + state.0.is_some(), + "expired row should have decided_at recorded" + ); + assert_eq!(state.1.as_deref(), Some("deny")); + } + + #[test] + fn decide_on_expired_row_returns_none_and_keeps_terminal_audit_state() { + let (config, _dir) = test_config(); + insert_pending( + &config, + &sample_with_expiry("late", "sess-A", Some(Utc::now() - Duration::minutes(1))), + ) + .unwrap(); + + let decided = decide(&config, "late", ApprovalDecision::ApproveOnce).unwrap(); + assert!( + decided.is_none(), + "late approvals should no longer be actionable" + ); + + let state = fetch_decision_state(&config, "late").expect("row should remain for audit"); + assert!(state.0.is_some()); + assert_eq!(state.1.as_deref(), Some("deny")); + } + + #[test] + fn expire_stale_returns_number_of_rows_transitioned() { + let (config, _dir) = test_config(); + insert_pending( + &config, + &sample_with_expiry("old-1", "sess-A", Some(Utc::now() - Duration::minutes(2))), + ) + .unwrap(); + insert_pending( + &config, + &sample_with_expiry("old-2", "sess-B", Some(Utc::now() - Duration::minutes(1))), + ) + .unwrap(); + insert_pending( + &config, + &sample_with_expiry("fresh", "sess-B", Some(Utc::now() + Duration::minutes(30))), + ) + .unwrap(); + + let expired = expire_stale(&config).unwrap(); + assert_eq!(expired, 2); + + let rows = list_pending(&config).unwrap(); + assert_eq!(rows.len(), 1); + assert_eq!(rows[0].request_id, "fresh"); + } + + #[test] + fn expire_stale_is_idempotent() { + let (config, _dir) = test_config(); + insert_pending( + &config, + &sample_with_expiry("once", "sess-A", Some(Utc::now() - Duration::minutes(3))), + ) + .unwrap(); + + assert_eq!(expire_stale(&config).unwrap(), 1); + assert_eq!(expire_stale(&config).unwrap(), 0); + + let state = fetch_decision_state(&config, "once").expect("row should remain recorded"); + assert!(state.0.is_some()); + assert_eq!(state.1.as_deref(), Some("deny")); + } + + #[test] + fn expire_stale_leaves_non_expiring_rows_pending() { + let (config, _dir) = test_config(); + insert_pending(&config, &sample_with_expiry("no-ttl", "sess-A", None)).unwrap(); + + assert_eq!(expire_stale(&config).unwrap(), 0); + let rows = list_pending(&config).unwrap(); + assert_eq!(rows.len(), 1); + assert_eq!(rows[0].request_id, "no-ttl"); + + let state = fetch_decision_state(&config, "no-ttl").expect("row should still exist"); + assert!(state.0.is_none()); + assert!(state.1.is_none()); + } + #[test] fn list_recent_decisions_returns_durable_audit_rows() { let (config, _dir) = test_config();