Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 65 additions & 1 deletion src/openhuman/cron/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,55 @@ pub async fn execute_job_now(config: &Config, job: &CronJob) -> (bool, String) {
execute_job_with_retry(config, &security, job).await
}

/// Did this failed agent-job attempt hit the backend session-expired state?
///
/// When the OpenHuman backend returns 401 because the user's app JWT has
/// lapsed, [`inference::provider::ops::api_error`] already publishes
/// [`crate::core::event_bus::DomainEvent::SessionExpired`] (via
/// `publish_backend_session_expired`) and the credentials subscriber clears
/// the stored session + flips the scheduler-gate `signed_out` override. The
/// gate then halts downstream LLM work until the user re-auths.
///
/// The cron retry loop pre-dates that gate handshake: it sleeps with
/// exponential backoff and retries the same job N times, every attempt
/// hitting the same global 401, then calls `report_error` with
/// `failure=retries_exhausted`. That generated TAURI-RUST-N (7,038 events /
/// 5 users): a cron-fired `morning_briefing` agent grinding through retries
/// after a single JWT lapse, every retries-exhausted capture pointing at a
/// problem the user can only fix from the UI.
///
/// The right move is the same halt-on-first-occurrence pattern as
/// `agent::harness::tool_loop::BACKEND_USER_STATE_MARKER` (#3334): the
/// condition is global and retries can't recover it, so we stop after the
/// first attempt. Skipping the `report_error` call too is correct because
/// the existing classifier
/// [`crate::core::observability::is_session_expired_message`] already
/// considers this expected user state (`observability.rs` — anchored on
/// `OpenHuman API error (401` + `"error":"Invalid token"`).
///
/// We match on `last_agent_error` first because cron's `run_agent_job`
/// routes the raw anyhow chain there (containing the provider's wire
/// message), while `last_output` only carries the canned user-facing
/// notification (`AGENT_JOB_USER_FAILURE_MESSAGE` / per-variant copy). When
/// `last_agent_error` is `None` we fall back to `last_output`, so a future
/// code path that surfaces the raw error only there isn't a silent miss.
///
/// Restricted to `JobType::Agent`: shell jobs that happen to echo a
/// 401-shaped string don't go through the inference layer's
/// `SessionExpired` publish, so halting them based on stdout would skip
/// retries the operator may want.
fn is_session_expired_failure(
    job_type: &JobType,
    last_agent_error: Option<&str>,
    last_output: &str,
) -> bool {
    match job_type {
        JobType::Agent => {
            // Classify the raw agent error when one was captured; only when
            // there is none does the job output stand in as the signal.
            let candidate = match last_agent_error {
                Some(raw_error) => raw_error,
                None => last_output,
            };
            crate::core::observability::is_session_expired_message(candidate)
        }
        // Every non-agent job type is out of scope for this predicate,
        // whatever text it produced.
        _ => false,
    }
}

async fn execute_job_with_retry(
config: &Config,
security: &SecurityPolicy,
Expand All @@ -201,6 +250,7 @@ async fn execute_job_with_retry(
let mut last_agent_error: Option<String> = None;
let retries = config.reliability.scheduler_retries;
let mut backoff_ms = config.reliability.provider_backoff_ms.max(200);
let mut session_expired = false;

for attempt in 0..=retries {
let (success, output, agent_error) = match job.job_type {
Expand All @@ -224,14 +274,28 @@ async fn execute_job_with_retry(
return (false, last_output);
}

if is_session_expired_failure(
&job.job_type,
last_agent_error.as_deref(),
last_output.as_str(),
) {
// Halt on the first occurrence — the inference layer already
// published `SessionExpired`, retries cannot recover until the
// user re-auths, and the classifier considers this expected
// user state (TAURI-RUST-N). See `is_session_expired_failure`
// for the full rationale.
session_expired = true;
break;
}

if attempt < retries {
let jitter_ms = u64::from(Utc::now().timestamp_subsec_millis() % 250);
time::sleep(Duration::from_millis(backoff_ms + jitter_ms)).await;
backoff_ms = (backoff_ms.saturating_mul(2)).min(30_000);
}
}

if matches!(job.job_type, JobType::Agent) {
if matches!(job.job_type, JobType::Agent) && !session_expired {
let report_message = last_agent_error
.as_deref()
.unwrap_or_else(|| last_output.as_str());
Expand Down
81 changes: 81 additions & 0 deletions src/openhuman/cron/scheduler_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,87 @@ async fn execute_job_with_retry_exhausts_attempts() {
assert!(output.contains("always_missing_for_retry_test"));
}

// TAURI-RUST-N — backend 401 ("Invalid token") leaks from a cron-fired agent
// job through `last_agent_error` and the existing classifier in
// `core::observability::is_session_expired_message` matches it (the
// `OpenHuman API error (401` + `"error":"Invalid token"` conjunction was added
// for OPENHUMAN-TAURI-4P0). `is_session_expired_failure` MUST consult that
// classifier so the cron retry loop halts on the first occurrence instead of
// retrying N times and reporting `failure=retries_exhausted` to Sentry.
#[test]
fn is_session_expired_failure_matches_openhuman_backend_401_in_agent_error() {
    // Backend 401 wire text arrives via the agent-error slot while the output
    // slot holds the canned user-facing message.
    let backend_401 =
        r#"OpenHuman API error (401 Unauthorized): {"success":false,"error":"Invalid token"}"#;
    let halted = is_session_expired_failure(
        &JobType::Agent,
        Some(backend_401),
        AGENT_JOB_USER_FAILURE_MESSAGE,
    );
    assert!(
        halted,
        "raw agent error carrying the 401 wire shape must trip the halt"
    );
}

// Defense-in-depth: if a future code path ever surfaces the raw error in
// `last_output` instead of `last_agent_error` (currently `run_agent_job`
// keeps the canned user message in `last_output`), the predicate should
// still classify. Falling back to `last_output` when `last_agent_error` is
// `None` is what guards against that silent-miss case.
#[test]
fn is_session_expired_failure_matches_when_only_output_carries_signal() {
    let backend_401 =
        r#"OpenHuman API error (401 Unauthorized): {"success":false,"error":"Invalid token"}"#;
    // No agent error was captured, so the predicate has to classify from the
    // output text alone.
    let classified = is_session_expired_failure(&JobType::Agent, None, backend_401);
    assert!(classified);
}

// Negative guard: the canned user-facing message that `run_agent_job`
// routes into `last_output` today carries no session signal. The predicate
// must NOT trip on it — otherwise every generic agent failure (provider
// keys missing, tool error, network blip) would halt after one attempt and
// stop reporting to Sentry, defeating the retry semantics for non-401
// failures.
#[test]
fn is_session_expired_failure_does_not_match_canned_user_message() {
    // The same canned text is supplied in both slots; neither may be read as
    // a session-expiry signal.
    let canned: &str = AGENT_JOB_USER_FAILURE_MESSAGE;
    let tripped = is_session_expired_failure(&JobType::Agent, Some(canned), canned);
    assert!(!tripped);
}

// Negative guard: ordinary provider-error wire text (e.g. a third-party
// model rejecting a request as 400 / 500 / 429) must not be misclassified
// as session expiry. Those failures are exactly what the retry loop +
// `failure=retries_exhausted` capture exist for.
#[test]
fn is_session_expired_failure_does_not_match_ordinary_provider_error() {
    // Case 1: a backend 500 — same error prefix, but not a 401 / invalid token.
    let backend_500 =
        r#"OpenHuman API error (500 Internal Server Error): {"error":"Internal server error"}"#;
    let backend_500_tripped = is_session_expired_failure(&JobType::Agent, Some(backend_500), "");
    assert!(!backend_500_tripped);

    // Case 2: a 401 from a third-party provider rather than the OpenHuman backend.
    let third_party_401 = r#"OpenAI API error (401 Unauthorized): {"error":{"message":"Invalid API key","type":"invalid_request_error"}}"#;
    let third_party_tripped =
        is_session_expired_failure(&JobType::Agent, Some(third_party_401), "");
    assert!(
        !third_party_tripped,
        "third-party BYO-key 401 is actionable (user misconfigured their key) — must NOT classify as backend session expiry"
    );
}

// Scope guard: the halt is restricted to `JobType::Agent` because the
// `SessionExpired` publish + scheduler-gate handshake only fires from the
// inference layer. A shell job that happens to echo the 401-shaped string
// (e.g. an operator's curl wrapper printing the backend response verbatim)
// MUST keep its existing retry semantics — the operator may want those
// retries, and the gate has no reason to be flipped from a shell exit.
#[test]
fn is_session_expired_failure_does_not_halt_shell_jobs() {
    let backend_401 =
        r#"OpenHuman API error (401 Unauthorized): {"success":false,"error":"Invalid token"}"#;

    // Signal present only in the shell job's output.
    let from_output_only = is_session_expired_failure(&JobType::Shell, None, backend_401);
    assert!(
        !from_output_only,
        "shell jobs must retain retry semantics regardless of stdout content"
    );

    // Signal present in both slots — the job type alone must still rule it out.
    let from_both_slots =
        is_session_expired_failure(&JobType::Shell, Some(backend_401), backend_401);
    assert!(
        !from_both_slots,
        "shell jobs never populate last_agent_error — but even if a future path did, scope stays Agent-only"
    );
}

#[tokio::test]
async fn run_agent_job_returns_error_without_provider_key() {
let tmp = TempDir::new().unwrap();
Expand Down
Loading