-
Notifications
You must be signed in to change notification settings - Fork 2.4k
Fix expired pending approvals lingering after restart #2357
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,18 +2,22 @@ | |
| //! | ||
| //! Pending rows survive core restart so a queued approval is not lost | ||
| //! when the user quits before deciding. Each row carries the | ||
| //! `session_id` of the launch that queued it (informational — | ||
| //! `list_pending` returns every undecided row regardless of session | ||
| //! so the UI can audit / dismiss orphans after restart, per the | ||
| //! issue #1339 acceptance criterion). | ||
| //! `session_id` of the launch that queued it (informational only). | ||
| //! `list_pending` returns every undecided row regardless of session so | ||
| //! the UI can audit or dismiss orphans after restart, per the issue | ||
| //! #1339 acceptance criterion. | ||
| //! | ||
| //! Replay safety: a `decide` on an orphan row (process that queued it | ||
| //! is gone) updates the DB but cannot resume the parked future — no | ||
| //! side effect can fire across processes. `purge_session` is a | ||
| //! best-effort cleanup helper kept for an explicit RPC in a follow-up. | ||
| //! is gone) updates the DB but cannot resume the parked future, so no | ||
| //! side effect can fire across processes. | ||
| //! | ||
| //! Durability safety: `expires_at` is enforced in the store. When a | ||
| //! pending row has already expired by the time the store is read again | ||
| //! after a restart, it is lazily transitioned into a terminal state so | ||
| //! stale rows stop showing up as actionable approvals forever. | ||
| //! | ||
| //! Follows the same `with_connection` shape as `notifications/store.rs` | ||
| //! and `cron/store.rs` — synchronous `rusqlite::Connection` opened per | ||
| //! and `cron/store.rs`: synchronous `rusqlite::Connection` opened per | ||
| //! call, schema applied idempotently. | ||
|
|
||
| use anyhow::{Context, Result}; | ||
|
|
@@ -24,7 +28,6 @@ use crate::openhuman::config::Config; | |
|
|
||
| use super::types::{ApprovalAuditEntry, ApprovalDecision, PendingApproval}; | ||
|
|
||
| /// SQL schema applied on every `with_connection` call. | ||
| const SCHEMA: &str = " | ||
| PRAGMA foreign_keys = ON; | ||
|
|
||
|
|
@@ -45,8 +48,6 @@ CREATE INDEX IF NOT EXISTS idx_pending_approvals_session | |
| ON pending_approvals(session_id); | ||
| "; | ||
|
|
||
| /// Open (and migrate) the approval DB, then call `f` with a live | ||
| /// connection. Mirrors `notifications/store.rs::with_connection`. | ||
| fn with_connection<T>(config: &Config, f: impl FnOnce(&Connection) -> Result<T>) -> Result<T> { | ||
| let db_path = config.workspace_dir.join("approval").join("approval.db"); | ||
|
|
||
|
|
@@ -77,8 +78,6 @@ fn with_connection<T>(config: &Config, f: impl FnOnce(&Connection) -> Result<T>) | |
| f(&conn) | ||
| } | ||
|
|
||
| /// Insert a pending row. Caller supplies the `request_id` and | ||
| /// `session_id` so the gate can correlate the parked future. | ||
| pub fn insert_pending(config: &Config, pending: &PendingApproval) -> Result<()> { | ||
| with_connection(config, |conn| { | ||
| let args = serde_json::to_string(&pending.args_redacted) | ||
|
|
@@ -105,18 +104,25 @@ pub fn insert_pending(config: &Config, pending: &PendingApproval) -> Result<()> | |
| }) | ||
| } | ||
|
|
||
| /// List all rows with no `decided_at` (still awaiting user input) | ||
| /// regardless of which launch queued them. Orphan rows (the gate's | ||
| /// in-memory waiter has been dropped — process died between | ||
| /// `intercept` and the user's decision) stay visible so the UI can | ||
| /// audit / dismiss them after restart, satisfying the issue #1339 | ||
| /// acceptance criterion "pending rows survive app restart". | ||
| /// Transition any stale rows into a terminal state so they no longer | ||
| /// appear as actionable pending approvals after restart. | ||
| /// | ||
| /// `decide` on an orphan row updates the DB and returns the row but | ||
| /// the parked tool call is gone — no side effect ever fires, which | ||
| /// matches the security invariant. | ||
| /// We currently reuse `deny` as the persisted terminal value to avoid | ||
| /// widening the externally visible approval decision enum before the | ||
| /// broader durable-audit work lands. This preserves the audit trail | ||
| /// (`decided_at` + `decision`) without leaving expired rows pending | ||
| /// forever. | ||
| pub fn expire_stale(config: &Config) -> Result<usize> { | ||
| with_connection(config, |conn| expire_stale_with_now(conn, Utc::now())) | ||
| } | ||
|
|
||
| /// List all rows that are still awaiting user input, regardless of | ||
| /// which launch queued them. Orphan rows from prior sessions remain | ||
| /// visible until they are explicitly decided or expire. | ||
| pub fn list_pending(config: &Config) -> Result<Vec<PendingApproval>> { | ||
| with_connection(config, |conn| { | ||
| expire_stale_with_now(conn, Utc::now())?; | ||
|
|
||
| let mut stmt = conn | ||
| .prepare( | ||
| "SELECT request_id, tool_name, action_summary, args_redacted, | ||
|
|
@@ -138,14 +144,16 @@ pub fn list_pending(config: &Config) -> Result<Vec<PendingApproval>> { | |
| } | ||
|
|
||
| /// Mark a pending row as decided and return the now-decided row. | ||
| /// Returns `Ok(None)` if no row matched (already decided, expired, | ||
| /// or unknown id). | ||
| /// Returns `Ok(None)` if no row matched (already decided, expired, or | ||
| /// unknown id). | ||
| pub fn decide( | ||
| config: &Config, | ||
| request_id: &str, | ||
| decision: ApprovalDecision, | ||
| ) -> Result<Option<PendingApproval>> { | ||
| with_connection(config, |conn| { | ||
| expire_stale_with_now(conn, Utc::now())?; | ||
|
|
||
| let decision_str = decision.as_str(); | ||
| let now = Utc::now().to_rfc3339(); | ||
| let updated = conn | ||
|
|
@@ -217,6 +225,22 @@ pub fn purge_session(config: &Config, session_id: &str) -> Result<usize> { | |
| }) | ||
| } | ||
|
|
||
| fn expire_stale_with_now(conn: &Connection, now: DateTime<Utc>) -> Result<usize> { | ||
| let now_rfc3339 = now.to_rfc3339(); | ||
| let deny = ApprovalDecision::Deny.as_str(); | ||
| let updated = conn | ||
| .execute( | ||
| "UPDATE pending_approvals | ||
| SET decided_at = ?1, decision = ?2 | ||
| WHERE decided_at IS NULL | ||
| AND expires_at IS NOT NULL | ||
| AND strftime('%s', expires_at) <= strftime('%s', ?3)", | ||
| params![now_rfc3339, deny, now_rfc3339], | ||
| ) | ||
| .context("[approval::store] expire stale rows")?; | ||
| Ok(updated) | ||
| } | ||
|
|
||
| fn row_to_audit_entry(row: &rusqlite::Row<'_>) -> rusqlite::Result<ApprovalAuditEntry> { | ||
| let args_str: String = row.get(3)?; | ||
| let args_redacted: serde_json::Value = serde_json::from_str(&args_str) | ||
|
|
@@ -263,10 +287,10 @@ fn invalid_text_column(column: usize, message: String) -> rusqlite::Error { | |
|
|
||
| fn row_to_pending(row: &rusqlite::Row<'_>) -> rusqlite::Result<PendingApproval> { | ||
| let args_str: String = row.get(3)?; | ||
| let args_redacted: serde_json::Value = serde_json::from_str(&args_str) | ||
| .unwrap_or_else(|_| serde_json::json!({ "_error": "args_redacted not valid JSON" })); | ||
| let args_redacted = serde_json::from_str(&args_str).unwrap_or(serde_json::Value::Null); | ||
| let created_str: String = row.get(5)?; | ||
| let expires_opt: Option<String> = row.get(6)?; | ||
|
|
||
| Ok(PendingApproval { | ||
| request_id: row.get(0)?, | ||
| tool_name: row.get(1)?, | ||
|
|
@@ -302,17 +326,51 @@ mod tests { | |
| } | ||
|
|
||
| fn sample(request_id: &str, session_id: &str) -> PendingApproval { | ||
| sample_with_expiry( | ||
| request_id, | ||
| session_id, | ||
| Some(Utc::now() + Duration::minutes(10)), | ||
| ) | ||
| } | ||
|
|
||
| fn sample_with_expiry( | ||
| request_id: &str, | ||
| session_id: &str, | ||
| expires_at: Option<DateTime<Utc>>, | ||
| ) -> PendingApproval { | ||
| PendingApproval { | ||
| request_id: request_id.to_string(), | ||
| tool_name: "composio".to_string(), | ||
| action_summary: "send slack message (12 chars)".to_string(), | ||
| args_redacted: json!({ "action": "execute", "tool_slug": "SLACK_SEND" }), | ||
| session_id: session_id.to_string(), | ||
| created_at: Utc::now(), | ||
| expires_at: Some(Utc::now() + Duration::minutes(10)), | ||
| expires_at, | ||
| } | ||
| } | ||
|
|
||
| fn fetch_decision_state( | ||
| config: &Config, | ||
| request_id: &str, | ||
| ) -> Option<(Option<String>, Option<String>)> { | ||
| with_connection(config, |conn| { | ||
| let mut stmt = conn | ||
| .prepare("SELECT decided_at, decision FROM pending_approvals WHERE request_id = ?1") | ||
| .context("prepare raw decision lookup")?; | ||
| let mut rows = stmt | ||
| .query(params![request_id]) | ||
| .context("query raw decision lookup")?; | ||
| if let Some(row) = rows.next().context("decision row next")? { | ||
| let decided_at: Option<String> = row.get(0)?; | ||
| let decision: Option<String> = row.get(1)?; | ||
| Ok(Some((decided_at, decision))) | ||
| } else { | ||
| Ok(None) | ||
| } | ||
| }) | ||
| .unwrap() | ||
| } | ||
|
|
||
| #[test] | ||
| fn insert_then_list_returns_pending_row() { | ||
| let (config, _dir) = test_config(); | ||
|
|
@@ -373,7 +431,6 @@ mod tests { | |
| decide(&config, "p2", ApprovalDecision::ApproveOnce).unwrap(); | ||
| let removed = purge_session(&config, "sess-A").unwrap(); | ||
| assert_eq!(removed, 1, "only undecided sess-A row should be purged"); | ||
| // p2 stays because it is decided; sess-B untouched. | ||
| let remaining = list_pending(&config).unwrap(); | ||
| assert_eq!(remaining.len(), 1); | ||
| assert_eq!(remaining[0].request_id, "p3"); | ||
|
|
@@ -383,14 +440,116 @@ mod tests { | |
| fn pending_row_survives_connection_close() { | ||
| let (config, _dir) = test_config(); | ||
| insert_pending(&config, &sample("survives", "sess-A")).unwrap(); | ||
| // Each `with_connection` opens a fresh handle — re-reading | ||
| // proves the row persisted to disk (acceptance criterion: | ||
| // pending rows survive app restart). | ||
| let rows = list_pending(&config).unwrap(); | ||
| assert_eq!(rows.len(), 1); | ||
| assert_eq!(rows[0].request_id, "survives"); | ||
| } | ||
|
|
||
| #[test] | ||
| fn list_pending_expires_stale_rows_before_returning() { | ||
| let (config, _dir) = test_config(); | ||
| insert_pending( | ||
| &config, | ||
| &sample_with_expiry("expired", "sess-A", Some(Utc::now() - Duration::minutes(5))), | ||
| ) | ||
| .unwrap(); | ||
| insert_pending( | ||
| &config, | ||
| &sample_with_expiry("active", "sess-A", Some(Utc::now() + Duration::minutes(5))), | ||
| ) | ||
| .unwrap(); | ||
|
|
||
| let rows = list_pending(&config).unwrap(); | ||
| let ids: Vec<_> = rows.into_iter().map(|row| row.request_id).collect(); | ||
| assert_eq!(ids, vec!["active"]); | ||
|
|
||
| let state = fetch_decision_state(&config, "expired").expect("expired row should persist"); | ||
| assert!( | ||
| state.0.is_some(), | ||
| "expired row should have decided_at recorded" | ||
| ); | ||
| assert_eq!(state.1.as_deref(), Some("deny")); | ||
| } | ||
|
|
||
| #[test] | ||
| fn decide_on_expired_row_returns_none_and_keeps_terminal_audit_state() { | ||
| let (config, _dir) = test_config(); | ||
| insert_pending( | ||
| &config, | ||
| &sample_with_expiry("late", "sess-A", Some(Utc::now() - Duration::minutes(1))), | ||
| ) | ||
| .unwrap(); | ||
|
|
||
| let decided = decide(&config, "late", ApprovalDecision::ApproveOnce).unwrap(); | ||
| assert!( | ||
| decided.is_none(), | ||
| "late approvals should no longer be actionable" | ||
| ); | ||
|
|
||
| let state = fetch_decision_state(&config, "late").expect("row should remain for audit"); | ||
| assert!(state.0.is_some()); | ||
| assert_eq!(state.1.as_deref(), Some("deny")); | ||
| } | ||
|
|
||
| #[test] | ||
| fn expire_stale_returns_number_of_rows_transitioned() { | ||
| let (config, _dir) = test_config(); | ||
| insert_pending( | ||
| &config, | ||
| &sample_with_expiry("old-1", "sess-A", Some(Utc::now() - Duration::minutes(2))), | ||
| ) | ||
| .unwrap(); | ||
| insert_pending( | ||
| &config, | ||
| &sample_with_expiry("old-2", "sess-B", Some(Utc::now() - Duration::minutes(1))), | ||
| ) | ||
| .unwrap(); | ||
| insert_pending( | ||
| &config, | ||
| &sample_with_expiry("fresh", "sess-B", Some(Utc::now() + Duration::minutes(30))), | ||
| ) | ||
| .unwrap(); | ||
|
|
||
| let expired = expire_stale(&config).unwrap(); | ||
| assert_eq!(expired, 2); | ||
|
|
||
| let rows = list_pending(&config).unwrap(); | ||
| assert_eq!(rows.len(), 1); | ||
| assert_eq!(rows[0].request_id, "fresh"); | ||
| } | ||
|
|
||
| #[test] | ||
| fn expire_stale_is_idempotent() { | ||
| let (config, _dir) = test_config(); | ||
| insert_pending( | ||
| &config, | ||
| &sample_with_expiry("once", "sess-A", Some(Utc::now() - Duration::minutes(3))), | ||
| ) | ||
| .unwrap(); | ||
|
|
||
| assert_eq!(expire_stale(&config).unwrap(), 1); | ||
| assert_eq!(expire_stale(&config).unwrap(), 0); | ||
|
|
||
| let state = fetch_decision_state(&config, "once").expect("row should remain recorded"); | ||
| assert!(state.0.is_some()); | ||
| assert_eq!(state.1.as_deref(), Some("deny")); | ||
| } | ||
|
|
||
| #[test] | ||
| fn expire_stale_leaves_non_expiring_rows_pending() { | ||
| let (config, _dir) = test_config(); | ||
| insert_pending(&config, &sample_with_expiry("no-ttl", "sess-A", None)).unwrap(); | ||
|
|
||
| assert_eq!(expire_stale(&config).unwrap(), 0); | ||
| let rows = list_pending(&config).unwrap(); | ||
| assert_eq!(rows.len(), 1); | ||
| assert_eq!(rows[0].request_id, "no-ttl"); | ||
|
|
||
| let state = fetch_decision_state(&config, "no-ttl").expect("row should still exist"); | ||
| assert!(state.0.is_none()); | ||
| assert!(state.1.is_none()); | ||
| } | ||
|
Comment on lines
+448
to
+551
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fix formatting in assert! macros to pass CI. The pipeline reports 🧰 Tools🪛 GitHub Actions: Type Check / 0_Rust Quality (fmt + clippy).txt[error] 387-387: cargo fmt --all -- --check failed due to formatting differences in assert! with a multi-line condition/message. Rustfmt expects an assert!( ... ) block layout. [error] 401-401: cargo fmt --all -- --check failed due to formatting differences in assert! for decided.is_none(). Rustfmt expects the assert to be formatted across multiple lines. 🪛 GitHub Actions: Type Check / 1_Type Check TypeScript.txt[error] 387-389: Prettier --check failed for openhuman-app format:check. Diff indicates formatting mismatch for multi-line assert!(...) in store.rs. [error] 401-404: Prettier --check failed for openhuman-app format:check. Diff indicates formatting mismatch for multi-line assert!(...) in store.rs. 🪛 GitHub Actions: Type Check / Rust Quality (fmt + clippy)[error] 387-387: cargo fmt --all -- --check failed due to formatting differences in assert!(): assert!(..., "...") should be reformatted to multi-line macro invocation. [error] 401-401: cargo fmt --all -- --check failed due to formatting differences in assert!(): assert! for decided.is_none() should be reformatted to multi-line macro invocation. 🪛 GitHub Actions: Type Check / Type Check TypeScript[error] 387-387: Prettier --check failed: formatting differences detected in store.rs (multi-line assert! call formatting). Run 'prettier --write .' to fix. [error] 401-401: Prettier --check failed: formatting differences detected in store.rs (multi-line assert! for decided.is_none formatting). Run 'prettier --write .' to fix. 🤖 Prompt for AI Agents |
||
|
|
||
| #[test] | ||
| fn list_recent_decisions_returns_durable_audit_rows() { | ||
| let (config, _dir) = test_config(); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix formatting to pass CI.
The pipeline reports
cargo fmtfailures in the test helpers. Runcargo fmtto fix the formatting at lines 258 and 285 (chained method calls).🧰 Tools
🪛 GitHub Actions: Type Check / 0_Rust Quality (fmt + clippy).txt
[error] 282-282: cargo fmt --all -- --check failed due to formatting differences in query statement chaining. Rustfmt expects stmt.query(...) to be broken across multiple lines before .context(...).
🪛 GitHub Actions: Type Check / 1_Type Check TypeScript.txt
[error] 282-282: Prettier --check failed for openhuman-app format:check. Diff indicates formatting mismatch near query() call chain in store.rs.
🪛 GitHub Actions: Type Check / Rust Quality (fmt + clippy)
[error] 282-282: cargo fmt --all -- --check failed due to formatting differences in raw SQL query: stmt.query(...) must be reformatted across multiple lines.
🪛 GitHub Actions: Type Check / Type Check TypeScript
[error] 282-282: Prettier --check failed: formatting differences detected in store.rs (stmt.query(...).context(...) chained formatting). Run 'prettier --write .' to fix.
🤖 Prompt for AI Agents