Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/openhuman/agent/harness/session/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -768,6 +768,11 @@ impl Agent {
&config.autonomy,
&config.workspace_dir,
));
// Phase 1 of #1401: see comment in channels/runtime/startup.rs.
let audit = crate::openhuman::security::get_or_create_workspace_audit_logger(
crate::openhuman::config::AuditConfig::default(),
config.workspace_dir.clone(),
)?;

let local_embedding = config.workload_local_model("embeddings");
let memory: Arc<dyn Memory> = Arc::from(memory::create_memory_with_local_ai(
Expand All @@ -782,6 +787,7 @@ impl Agent {
Arc::new(config.clone()),
&security,
runtime,
audit,
memory.clone(),
&config.browser,
&config.http_request,
Expand Down
11 changes: 11 additions & 0 deletions src/openhuman/channels/runtime/startup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,16 @@ pub async fn start_channels(config: Config) -> Result<()> {
&config.autonomy,
&config.workspace_dir,
));
// Phase 1 of #1401: audit logger is wired with defaults so emission paths
// are exercised at runtime. A follow-up promotes `SecurityConfig` (and
// therefore the `audit` knob) onto the runtime `Config` schema so users
// can override `enabled`, `log_path`, and `max_size_mb` via TOML. The
// logger is workspace-scoped and shared, so concurrent sessions append to
// one `audit.log` without racing on rotation.
let audit = crate::openhuman::security::get_or_create_workspace_audit_logger(
crate::openhuman::config::AuditConfig::default(),
config.workspace_dir.clone(),
)?;
let model = config
.default_model
.clone()
Expand All @@ -199,6 +209,7 @@ pub async fn start_channels(config: Config) -> Result<()> {
Arc::new(config.clone()),
&security,
runtime,
audit,
Arc::clone(&mem),
&config.browser,
&config.http_request,
Expand Down
7 changes: 7 additions & 0 deletions src/openhuman/runtime_node/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ fn build_runtime_tools(config: &Config) -> Result<Vec<Box<dyn Tool>>, String> {
&config.autonomy,
&config.workspace_dir,
));
// Phase 1 of #1401: see comment in channels/runtime/startup.rs.
let audit = crate::openhuman::security::get_or_create_workspace_audit_logger(
crate::openhuman::config::AuditConfig::default(),
config.workspace_dir.clone(),
)
.map_err(|e| e.to_string())?;
let runtime: Arc<dyn RuntimeAdapter> = Arc::new(NativeRuntime::new());
let local_embedding = config.workload_local_model("embeddings");
trace!("[runtime_node::ops] build_runtime_tools: create_memory_with_local_ai");
Expand All @@ -63,6 +69,7 @@ fn build_runtime_tools(config: &Config) -> Result<Vec<Box<dyn Tool>>, String> {
Arc::new(config.clone()),
&security,
runtime,
audit,
memory,
&config.browser,
&config.http_request,
Expand Down
84 changes: 80 additions & 4 deletions src/openhuman/security/audit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ use anyhow::Result;
use chrono::{DateTime, Utc};
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fs::OpenOptions;
use std::io::Write;
use std::path::PathBuf;
use std::sync::{Arc, OnceLock};
use uuid::Uuid;

/// Audit event types
Expand Down Expand Up @@ -148,6 +150,37 @@ pub struct AuditLogger {
log_path: PathBuf,
config: AuditConfig,
buffer: Mutex<Vec<AuditEvent>>,
/// Serializes the rotate + append + fsync critical section so concurrent
/// callers sharing one logger cannot interleave partial JSON lines.
write_lock: Mutex<()>,
}

/// Process-global cache of one [`AuditLogger`] per workspace directory.
///
/// Multiple agent sessions in the same process share a workspace and therefore
/// the same `<workspace>/audit.log`. Handing each session its own logger would
/// let concurrent appends race on rotation and interleave partial lines; this
/// registry guarantees a single `Arc<AuditLogger>` per workspace so all writes
/// serialize through one instance's `write_lock`.
static WORKSPACE_AUDIT_LOGGERS: OnceLock<Mutex<HashMap<PathBuf, Arc<AuditLogger>>>> =
OnceLock::new();

/// Return the shared [`AuditLogger`] for `openhuman_dir`, creating it on first
/// use. All callers targeting the same workspace receive the same `Arc`, so log
/// rotation and appends coordinate through one logger. The `config` of the first
/// caller for a given workspace wins; later callers reuse the cached logger.
pub fn get_or_create_workspace_audit_logger(
config: AuditConfig,
openhuman_dir: PathBuf,
) -> Result<Arc<AuditLogger>> {
let registry = WORKSPACE_AUDIT_LOGGERS.get_or_init(|| Mutex::new(HashMap::new()));
let mut map = registry.lock();
if let Some(existing) = map.get(&openhuman_dir) {
return Ok(Arc::clone(existing));
}
let logger = Arc::new(AuditLogger::new(config, openhuman_dir.clone())?);
map.insert(openhuman_dir, Arc::clone(&logger));
Ok(logger)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

/// Structured command execution details for audit logging.
Expand All @@ -163,6 +196,23 @@ pub struct CommandExecutionLog<'a> {
}

impl AuditLogger {
/// Build a disabled `Arc<AuditLogger>` for tests and contexts that need a
/// handle but should not write to disk. The `enabled = false` flag
/// short-circuits `log()` before any filesystem I/O, so the sentinel
/// `log_path` is never touched.
pub fn disabled() -> Arc<Self> {
Arc::new(Self {
log_path: PathBuf::new(),
config: AuditConfig {
enabled: false,
log_path: String::new(),
max_size_mb: 0,
},
buffer: Mutex::new(Vec::new()),
write_lock: Mutex::new(()),
})
}

/// Create a new audit logger
pub fn new(config: AuditConfig, openhuman_dir: PathBuf) -> Result<Self> {
let log_path = openhuman_dir.join(&config.log_path);
Expand All @@ -175,6 +225,7 @@ impl AuditLogger {
log_path,
config,
buffer: Mutex::new(Vec::new()),
write_lock: Mutex::new(()),
})
}

Expand All @@ -190,11 +241,14 @@ impl AuditLogger {
event.event_id
);

// Check log size and rotate if needed
self.rotate_if_needed()?;

// Serialize and write
// Serialize the event outside the write lock — formatting is pure.
let line = serde_json::to_string(event)?;

// Hold `write_lock` across rotate + append + fsync so concurrent
// callers sharing this logger cannot interleave partial lines or
// race on rotation renames.
let _guard = self.write_lock.lock();
self.rotate_if_needed()?;
let mut file = OpenOptions::new()
.create(true)
.append(true)
Expand Down Expand Up @@ -331,6 +385,28 @@ mod tests {
assert!(parsed.result.is_some());
}

#[test]
fn audit_logger_disabled_helper_is_noop() -> Result<()> {
let logger = AuditLogger::disabled();
let event = AuditEvent::new(AuditEventType::CommandExecution);
logger.log(&event)?;
assert!(!logger.config.enabled);
Ok(())
}

#[test]
fn workspace_audit_logger_is_shared_per_workspace() -> Result<()> {
let tmp = TempDir::new()?;
let cfg = AuditConfig::default();
let first = get_or_create_workspace_audit_logger(cfg.clone(), tmp.path().to_path_buf())?;
let second = get_or_create_workspace_audit_logger(cfg, tmp.path().to_path_buf())?;
assert!(
Arc::ptr_eq(&first, &second),
"same workspace must yield the same shared logger instance"
);
Ok(())
}

#[test]
fn audit_logger_disabled_does_not_create_file() -> Result<()> {
let tmp = TempDir::new()?;
Expand Down
5 changes: 4 additions & 1 deletion src/openhuman/security/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ pub mod secrets;
pub mod traits;

#[allow(unused_imports)]
pub use audit::{AuditEvent, AuditEventType, AuditLogger};
pub use audit::{
get_or_create_workspace_audit_logger, AuditEvent, AuditEventType, AuditLogger,
CommandExecutionLog,
};
pub use core::*;
#[allow(unused_imports)]
pub use detect::create_sandbox;
Expand Down
Loading
Loading