Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/openhuman/agent/harness/session/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -768,6 +768,11 @@ impl Agent {
&config.autonomy,
&config.workspace_dir,
));
// Phase 1 of #1401: see comment in channels/runtime/startup.rs.
let audit = crate::openhuman::security::get_or_create_workspace_audit_logger(
crate::openhuman::config::AuditConfig::default(),
config.workspace_dir.clone(),
)?;

let local_embedding = config.workload_local_model("embeddings");
let memory: Arc<dyn Memory> = Arc::from(memory::create_memory_with_local_ai(
Expand All @@ -782,6 +787,7 @@ impl Agent {
Arc::new(config.clone()),
&security,
runtime,
audit,
memory.clone(),
&config.browser,
&config.http_request,
Expand Down
11 changes: 11 additions & 0 deletions src/openhuman/channels/runtime/startup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,16 @@ pub async fn start_channels(config: Config) -> Result<()> {
&config.autonomy,
&config.workspace_dir,
));
// Phase 1 of #1401: audit logger is wired with defaults so emission paths
// are exercised at runtime. A follow-up promotes `SecurityConfig` (and
// therefore the `audit` knob) onto the runtime `Config` schema so users
// can override `enabled`, `log_path`, and `max_size_mb` via TOML. The
// logger is workspace-scoped and shared, so concurrent sessions append to
// one `audit.log` without racing on rotation.
let audit = crate::openhuman::security::get_or_create_workspace_audit_logger(
crate::openhuman::config::AuditConfig::default(),
config.workspace_dir.clone(),
)?;
let model = config
.default_model
.clone()
Expand All @@ -199,6 +209,7 @@ pub async fn start_channels(config: Config) -> Result<()> {
Arc::new(config.clone()),
&security,
runtime,
audit,
Arc::clone(&mem),
&config.browser,
&config.http_request,
Expand Down
7 changes: 7 additions & 0 deletions src/openhuman/runtime_node/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ fn build_runtime_tools(config: &Config) -> Result<Vec<Box<dyn Tool>>, String> {
&config.autonomy,
&config.workspace_dir,
));
// Phase 1 of #1401: see comment in channels/runtime/startup.rs.
let audit = crate::openhuman::security::get_or_create_workspace_audit_logger(
crate::openhuman::config::AuditConfig::default(),
config.workspace_dir.clone(),
)
.map_err(|e| e.to_string())?;
let runtime: Arc<dyn RuntimeAdapter> = Arc::new(NativeRuntime::new());
let local_embedding = config.workload_local_model("embeddings");
trace!("[runtime_node::ops] build_runtime_tools: create_memory_with_local_ai");
Expand All @@ -63,6 +69,7 @@ fn build_runtime_tools(config: &Config) -> Result<Vec<Box<dyn Tool>>, String> {
Arc::new(config.clone()),
&security,
runtime,
audit,
memory,
&config.browser,
&config.http_request,
Expand Down
110 changes: 106 additions & 4 deletions src/openhuman/security/audit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ use anyhow::Result;
use chrono::{DateTime, Utc};
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fs::OpenOptions;
use std::io::Write;
use std::path::PathBuf;
use std::sync::{Arc, OnceLock};
use uuid::Uuid;

/// Audit event types
Expand Down Expand Up @@ -148,6 +150,63 @@ pub struct AuditLogger {
log_path: PathBuf,
config: AuditConfig,
buffer: Mutex<Vec<AuditEvent>>,
/// Serializes the rotate + append + fsync critical section so concurrent
/// callers sharing one logger cannot interleave partial JSON lines.
write_lock: Mutex<()>,
}

/// Process-global cache of one [`AuditLogger`] per workspace directory.
///
/// Multiple agent sessions in the same process share a workspace and therefore
/// the same `<workspace>/audit.log`. Handing each session its own logger would
/// let concurrent appends race on rotation and interleave partial lines; this
/// registry guarantees a single `Arc<AuditLogger>` per workspace so all writes
/// serialize through one instance's `write_lock`.
static WORKSPACE_AUDIT_LOGGERS: OnceLock<Mutex<HashMap<PathBuf, Arc<AuditLogger>>>> =
OnceLock::new();

/// Return the shared [`AuditLogger`] for `openhuman_dir`, creating it on first
/// use. All callers targeting the same workspace receive the same `Arc`, so log
/// rotation and appends coordinate through one logger. The `config` of the first
/// caller for a given workspace wins; later callers reuse the cached logger.
pub fn get_or_create_workspace_audit_logger(
config: AuditConfig,
openhuman_dir: PathBuf,
) -> Result<Arc<AuditLogger>> {
// Normalize the key: `PathBuf` equality is lexical, so `/ws` vs `/ws/` vs a
// symlinked spelling would otherwise cache distinct loggers for one physical
// workspace and reopen the rotate/append race this registry prevents.
//
// A canonicalize failure falls back to the raw path rather than propagating:
// audit-logger creation must never block agent startup. `NotFound` (the
// workspace dir not created yet) is expected and logged at debug; other
// errors (permission, I/O) are unexpected and logged at warn so real
// filesystem problems stay observable.
let openhuman_dir = match std::fs::canonicalize(&openhuman_dir) {
Ok(path) => path,
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
log::debug!(
"[openhuman:audit] workspace path not yet created; keying registry on raw path: {}",
openhuman_dir.display()
);
openhuman_dir
}
Err(err) => {
log::warn!(
"[openhuman:audit] failed to canonicalize workspace path {} ({err}); keying registry on raw path",
openhuman_dir.display()
);
openhuman_dir
}
};
let registry = WORKSPACE_AUDIT_LOGGERS.get_or_init(|| Mutex::new(HashMap::new()));
let mut map = registry.lock();
if let Some(existing) = map.get(&openhuman_dir) {
return Ok(Arc::clone(existing));
}
let logger = Arc::new(AuditLogger::new(config, openhuman_dir.clone())?);
map.insert(openhuman_dir, Arc::clone(&logger));
Ok(logger)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

/// Structured command execution details for audit logging.
Expand All @@ -163,6 +222,23 @@ pub struct CommandExecutionLog<'a> {
}

impl AuditLogger {
/// Build a disabled `Arc<AuditLogger>` for tests and contexts that need a
/// handle but should not write to disk. The `enabled = false` flag
/// short-circuits `log()` before any filesystem I/O, so the sentinel
/// `log_path` is never touched.
pub fn disabled() -> Arc<Self> {
Arc::new(Self {
log_path: PathBuf::new(),
config: AuditConfig {
enabled: false,
log_path: String::new(),
max_size_mb: 0,
},
buffer: Mutex::new(Vec::new()),
write_lock: Mutex::new(()),
})
}

/// Create a new audit logger
pub fn new(config: AuditConfig, openhuman_dir: PathBuf) -> Result<Self> {
let log_path = openhuman_dir.join(&config.log_path);
Expand All @@ -175,6 +251,7 @@ impl AuditLogger {
log_path,
config,
buffer: Mutex::new(Vec::new()),
write_lock: Mutex::new(()),
})
}

Expand All @@ -190,11 +267,14 @@ impl AuditLogger {
event.event_id
);

// Check log size and rotate if needed
self.rotate_if_needed()?;

// Serialize and write
// Serialize the event outside the write lock — formatting is pure.
let line = serde_json::to_string(event)?;

// Hold `write_lock` across rotate + append + fsync so concurrent
// callers sharing this logger cannot interleave partial lines or
// race on rotation renames.
let _guard = self.write_lock.lock();
self.rotate_if_needed()?;
let mut file = OpenOptions::new()
.create(true)
.append(true)
Expand Down Expand Up @@ -331,6 +411,28 @@ mod tests {
assert!(parsed.result.is_some());
}

#[test]
fn audit_logger_disabled_helper_is_noop() -> Result<()> {
let logger = AuditLogger::disabled();
let event = AuditEvent::new(AuditEventType::CommandExecution);
logger.log(&event)?;
assert!(!logger.config.enabled);
Ok(())
}

#[test]
fn workspace_audit_logger_is_shared_per_workspace() -> Result<()> {
let tmp = TempDir::new()?;
let cfg = AuditConfig::default();
let first = get_or_create_workspace_audit_logger(cfg.clone(), tmp.path().to_path_buf())?;
let second = get_or_create_workspace_audit_logger(cfg, tmp.path().to_path_buf())?;
assert!(
Arc::ptr_eq(&first, &second),
"same workspace must yield the same shared logger instance"
);
Ok(())
}

#[test]
fn audit_logger_disabled_does_not_create_file() -> Result<()> {
let tmp = TempDir::new()?;
Expand Down
5 changes: 4 additions & 1 deletion src/openhuman/security/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ pub mod secrets;
pub mod traits;

#[allow(unused_imports)]
pub use audit::{AuditEvent, AuditEventType, AuditLogger};
pub use audit::{
get_or_create_workspace_audit_logger, AuditEvent, AuditEventType, AuditLogger,
CommandExecutionLog,
};
pub use core::*;
#[allow(unused_imports)]
pub use detect::create_sandbox;
Expand Down
Loading
Loading