diff --git a/CHANGELOG.md b/CHANGELOG.md index a52c5f3..15f0051 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## 0.15.2 - 2026-03-18 + +### New features + +- Added `ProcessorAsync`, an async wrapper around `Processor` for use in `async`/`await` contexts (requires the `async` feature flag). It offloads blocking audio processing to a thread pool via `tokio::task::spawn_blocking`, keeping the async runtime responsive. All methods mirror the synchronous `Processor` API. A `ProcessorAsync::with_config` convenience constructor is also provided. + ## 0.15.1 - 2026-03-17 ## Improvements diff --git a/Cargo.lock b/Cargo.lock index 17f2bc6..7e72919 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -30,10 +30,11 @@ dependencies = [ [[package]] name = "aic-sdk" -version = "0.15.1" +version = "0.15.2" dependencies = [ "aic-sdk-sys", "approx", + "futures", "hound", "serde", "serde_json", @@ -113,7 +114,7 @@ dependencies = [ [[package]] name = "build-time-download" -version = "0.15.1" +version = "0.15.2" dependencies = [ "aic-sdk", ] @@ -312,6 +313,94 @@ dependencies = [ "zlib-rs", ] +[[package]] +name = "futures" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b147ee9d1f6d097cef9ce628cd2ee62288d963e16fb287bd9286455b241382d" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07bbe89c50d7a535e539b8c17bc0b49bdb77747034daa8087407d655f3f7cc1d" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d" + +[[package]] +name = "futures-executor" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf29c38818342a3b26b5b923639e7b1f4a61fc5e76102d4b1981c6dc7a7579d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718" + +[[package]] +name = "futures-macro" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c39754e157331b013978ec91992bde1ac089843443c49cbc7f46150b0fad0893" + +[[package]] +name = "futures-task" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "037711b3d59c33004d3856fbdc83b99d4ff37a24768fa1be9ce3538a1cde4393" + +[[package]] +name = "futures-util" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "slab", +] + [[package]] name = "generic-array" version = "0.14.7" @@ -822,6 +911,12 @@ version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e320a6c5ad31d271ad523dcf3ad13e2767ad8b1cb8f047f75a8aeaf8da139da2" +[[package]] +name = "slab" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c790de23124f9ab44544d7ac05d60440adc586479ce501c1d6d7da3cd8c9cf5" + [[package]] name = "subtle" version = "2.6.1" diff --git a/Cargo.toml b/Cargo.toml index 30fef73..05d859b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,7 @@ edition = "2024" homepage = "https://ai-coustics.com/sdk/" license = "Apache-2.0" repository = "https://github.com/ai-coustics/aic-sdk-rs" -version = "0.15.1" +version = "0.15.2" [workspace.dependencies] aic-sdk-sys = { version = "0.15.1", path = "aic-sdk-sys" } @@ -36,22 +36,32 @@ serde = { workspace = true, optional = true, features = ["derive"] } serde_json = { workspace = true, optional = true } sha2 = { workspace = true, optional = true } thiserror = { workspace = true } +tokio = { version = "1.49", optional = true, features = ["rt", "sync"] } ureq = { workspace = true, optional = true, features = ["rustls"] } [dev-dependencies] approx = "0.5" +futures = "0.3" hound = "3.5" serde_json = { workspace = true } tokio = { version = "1.49", features = ["macros", "rt-multi-thread", "sync", "time"] } [features] +async = ["dep:tokio"] download-lib = ["aic-sdk-sys/download-lib"] download-model = ["dep:serde", "dep:serde_json", "dep:sha2", "dep:ureq"] [[example]] name = "basic_usage" path = "examples/basic_usage.rs" +required-features = ["download-model"] [[example]] name = "benchmark" path = "examples/benchmark.rs" +required-features = ["async", "download-model"] + +[[example]] +name = "parallel_async" +path = "examples/parallel_async.rs" +required-features = ["async", "download-model"] diff --git a/README.md b/README.md index 49e7cd2..71ee313 100644 --- a/README.md +++ b/README.md @@ -211,11 +211,13 @@ See the example files for complete working examples: - [`examples/basic_usage.rs`](examples/basic_usage.rs) - Basic usage example - [`examples/build-time-download`](examples/build-time-download) - Download and embed models at compile-time - [`examples/benchmark.rs`](examples/benchmark.rs) - Run multiple processor instances concurrently until the real-time requirements are not met +- [`examples/parallel_async.rs`](examples/parallel_async.rs) - Demonstrates parallel processing with `ProcessorAsync`: runs N processors concurrently via `tokio::join_all` and prints each processor's individual time alongside a sequential baseline to confirm the speedup Run examples with: ```bash export AIC_SDK_LICENSE="your_license_key_here" cargo run --example basic_usage --features download-lib,download-model +cargo run --example parallel_async --features download-lib,download-model,async ``` ## Documentation diff --git a/examples/basic_usage.rs b/examples/basic_usage.rs index 8c7e6ca..3cc2850 100644 --- a/examples/basic_usage.rs +++ b/examples/basic_usage.rs @@ -1,15 +1,6 @@ -#![cfg_attr(not(feature = "download-model"), allow(dead_code, unused_imports))] - -#[cfg(feature = "download-model")] use aic_sdk::{Model, Processor, ProcessorConfig, ProcessorParameter, VadParameter}; use std::env; -#[cfg(not(feature = "download-model"))] -fn main() -> Result<(), Box> { - Err("Enable the `download-model` feature to run this example.".into()) -} - -#[cfg(feature = "download-model")] fn main() -> Result<(), Box> { // Display library version println!("ai-coustics SDK version: {}", aic_sdk::get_sdk_version()); diff --git a/examples/benchmark.rs b/examples/benchmark.rs index c953425..da94ac3 100644 --- a/examples/benchmark.rs +++ b/examples/benchmark.rs @@ -1,17 +1,23 @@ -use aic_sdk::{Model, Processor, ProcessorConfig}; +use aic_sdk::{Model, ProcessorAsync, ProcessorConfig}; use std::{ env, io::Write, - sync::Arc, + sync::{ + Arc, + atomic::{AtomicBool, Ordering}, + }, time::{Duration, Instant}, }; -use tokio::sync::{mpsc, watch}; +use tokio::{ + task::JoinSet, + time::{self, MissedTickBehavior}, +}; // Specify the model to benchmark const MODEL: &str = "quail-vf-2.0-l-16khz"; -// Interval between spawning new processing threads -const THREAD_SPAWN_INTERVAL: Duration = Duration::from_secs(3); +// Interval between spawning new processor sessions +const SESSION_SPAWN_INTERVAL: Duration = Duration::from_secs(1); // Safety margin to account for system variability // e.g. 0.3 means 30% of the period is reserved as a safety margin, @@ -37,8 +43,7 @@ async fn main() -> Result<(), Box> { let config = ProcessorConfig::optimal(&model); - let period = config.num_frames as f64 / config.sample_rate as f64; - let period = Duration::from_secs_f64(period); + let period = Duration::from_secs_f64(config.num_frames as f64 / config.sample_rate as f64); let safety_margin = Duration::from_secs_f64(period.as_secs_f64() * SAFETY_MARGIN); println!("Model: {}", model.id()); @@ -48,108 +53,79 @@ async fn main() -> Result<(), Box> { println!("Safety margin: {} ms\n", safety_margin.as_millis()); println!( - "Starting benchmark: spawning a processing thread every {} seconds until a deadline is missed...\n", - THREAD_SPAWN_INTERVAL.as_secs() + "Starting benchmark: spawning a processor session every {} second(s) until a deadline is missed or a process error occurs...\n", + SESSION_SPAWN_INTERVAL.as_secs() ); - let (stop_tx, stop_rx) = watch::channel(false); - let (report_tx, mut report_rx) = mpsc::unbounded_channel::(); - let mut active_threads = 0usize; - - let mut handles = Vec::new(); - let mut thread_id = 1usize; + let stop = Arc::new(AtomicBool::new(false)); + let mut sessions = JoinSet::new(); + let mut reports = Vec::new(); + let mut spawned_sessions = 0usize; - handles.push(spawn_session( - thread_id, + spawned_sessions += 1; + sessions.spawn(run_session( + spawned_sessions, Arc::clone(&model), license.clone(), config.clone(), period, safety_margin, - stop_rx.clone(), - report_tx.clone(), + Arc::clone(&stop), )); print!("*"); std::io::stdout().flush().unwrap(); - active_threads += 1; + let mut spawn_ticks = time::interval(SESSION_SPAWN_INTERVAL); + spawn_ticks.set_missed_tick_behavior(MissedTickBehavior::Skip); + spawn_ticks.tick().await; - let spawn_interval = THREAD_SPAWN_INTERVAL; - let mut next_spawn = tokio::time::Instant::now() + spawn_interval; - - let mut reports = Vec::new(); - let first_session_report = loop { + let first_failed_report = loop { tokio::select! { - // Spawn a new session at regular intervals - _ = tokio::time::sleep_until(next_spawn) => { - thread_id += 1; - handles.push(spawn_session( - thread_id, + _ = spawn_ticks.tick() => { + spawned_sessions += 1; + sessions.spawn(run_session( + spawned_sessions, Arc::clone(&model), license.clone(), config.clone(), period, safety_margin, - stop_rx.clone(), - report_tx.clone(), + Arc::clone(&stop), )); - active_threads += 1; print!("*"); - if active_threads.is_multiple_of(50) { - print!("\n"); - } - std::io::stdout().flush().unwrap(); - next_spawn += spawn_interval; } - // Check for deadline misses and break the loop if one occurs - Some(report) = report_rx.recv() => { - // Print line breaks for readability - if active_threads.is_multiple_of(50) { - println!(); - } else { - println!("\n"); + Some(result) = sessions.join_next() => { + let report = result?; + if report.error.is_some() { + stop.store(true, Ordering::Relaxed); + reports.push(report.clone()); + break report; } - - let is_miss = report.error.is_some(); reports.push(report); - if is_miss { - break reports.last().cloned(); - } } } }; - println!("Benchmark complete\n"); + println!("\nBenchmark complete\n"); - let _ = stop_tx.send(true); - drop(report_tx); - for handle in handles { - let _ = handle.await; - } - - while let Some(report) = report_rx.recv().await { - reports.push(report); + while let Some(result) = sessions.join_next().await { + reports.push(result?); } reports.sort_by_key(|report| report.session_id); - let mut number_of_missed_deadlines = 0; + let mut number_of_missed_deadlines = 0usize; + let period_ms = period.as_secs_f64() * 1000.0; println!(" ID | Max Exec Time | RTF | Notes"); println!("----+---------------+---------+------"); for report in &reports { let max_ms = report.max_execution_time.as_secs_f64() * 1000.0; - let period_ms = period.as_secs_f64() * 1000.0; - - let rtf = if period_ms > 0.0 { - max_ms / period_ms - } else { - 0.0 - }; + let rtf = max_ms / period_ms; - let miss_note = match report.error.as_deref() { + let note = match report.error.as_deref() { Some(reason) => { number_of_missed_deadlines += 1; format!("deadline missed: {}", reason) @@ -159,115 +135,92 @@ async fn main() -> Result<(), Box> { println!( "{:>3} | {:>9.3} ms | {:>7.3} | {}", - report.session_id, max_ms, rtf, miss_note + report.session_id, max_ms, rtf, note ); } println!(); - let max_ok = active_threads.saturating_sub(1); - + let max_ok = spawned_sessions.saturating_sub(1); println!( "System can run {} instances of this model/config concurrently while meeting real-time requirements", max_ok ); - if let Some(first_session_report) = &first_session_report { + println!( + "After spawning the {}{} session, session #{} missed its deadline ({})", + spawned_sessions, + number_suffix(spawned_sessions), + first_failed_report.session_id, + first_failed_report.error.as_deref().unwrap_or("unknown") + ); + + if number_of_missed_deadlines > 1 { println!( - "After spawning the {}{} thread, thread #{} missed its deadline ({})", - active_threads, - number_suffix(active_threads), - first_session_report.session_id, - first_session_report.error.as_deref().unwrap_or("unknown") + "Other sessions also missed deadlines after session #{}", + first_failed_report.session_id ); - - if number_of_missed_deadlines > 1 { - println!( - "Other threads also missed deadlines after thread #{}", - first_session_report.session_id - ); - } - } else { - println!("Missed deadline in thread unknown (no report)"); } Ok(()) } -fn spawn_session( +async fn run_session( session_id: usize, model: Arc>, license: String, config: ProcessorConfig, period: Duration, safety_margin: Duration, - stop_rx: watch::Receiver, - report_tx: mpsc::UnboundedSender, -) -> tokio::task::JoinHandle<()> { - tokio::task::spawn_blocking(move || { - let mut processor = - match Processor::new(&model, &license).and_then(|p| p.with_config(&config)) { - Ok(processor) => processor, - Err(err) => { - let reason = format!("processor init failed: {}", err); - let _ = report_tx.send(SessionReport { - session_id, - max_execution_time: Duration::from_secs(0), - error: Some(reason), - }); - return; - } + stop: Arc, +) -> SessionReport { + let processor = match ProcessorAsync::with_config(&model, &license, &config).await { + Ok(processor) => processor, + Err(err) => { + return SessionReport { + session_id, + max_execution_time: Duration::ZERO, + error: Some(format!("processor init failed: {}", err)), }; + } + }; - let mut buffer = vec![0.0f32; config.num_channels as usize * config.num_frames]; - - let mut max_execution_time = Duration::from_secs(0); - let mut error = None; - - let deadline = period - safety_margin; - - loop { - // Check if we should stop (another session missed a deadline) - if *stop_rx.borrow() { - break; - } - - // Process the audio buffer - let process_start = Instant::now(); - if let Err(err) = processor.process_interleaved(&mut buffer) { - error = Some(format!("process error: {}", err)); - break; - } - let execution_time = process_start.elapsed(); + let mut buffer = vec![0.0f32; config.num_channels as usize * config.num_frames]; + let mut max_execution_time = Duration::ZERO; + let deadline = period.saturating_sub(safety_margin); + + while !stop.load(Ordering::Relaxed) { + let process_start = Instant::now(); + if let Err(err) = processor.process_interleaved(&mut buffer).await { + return SessionReport { + session_id, + max_execution_time, + error: Some(format!("process error: {}", err)), + }; + } - // Keep track of the maximum execution time - if execution_time > max_execution_time { - max_execution_time = execution_time; - } + let execution_time = process_start.elapsed(); + max_execution_time = max_execution_time.max(execution_time); - // Check if we missed the deadline - if execution_time > deadline { - let late_by = execution_time - deadline; - let reason = format!("late by {:?}", late_by); - error = Some(reason); - break; - } + if execution_time > deadline { + return SessionReport { + session_id, + max_execution_time, + error: Some(format!("late by {:?}", execution_time - deadline)), + }; + } - // Sleep until the next deadline - let next_deadline = process_start + period; - let sleep_for = next_deadline.saturating_duration_since(Instant::now()); - if sleep_for > Duration::from_secs(0) { - std::thread::sleep(sleep_for); - } + let sleep_for = (process_start + period).saturating_duration_since(Instant::now()); + if !sleep_for.is_zero() { + time::sleep(sleep_for).await; } + } - // Send the session report - let _ = report_tx.send(SessionReport { - session_id, - max_execution_time, - error, - }); - }) + SessionReport { + session_id, + max_execution_time, + error: None, + } } fn number_suffix(n: usize) -> &'static str { diff --git a/examples/parallel_async.rs b/examples/parallel_async.rs new file mode 100644 index 0000000..05604e2 --- /dev/null +++ b/examples/parallel_async.rs @@ -0,0 +1,125 @@ +// Demonstrates that multiple `ProcessorAsync` instances genuinely run in +// parallel when awaited concurrently. +// +// Each processor records its own wall-clock processing time. If they ran +// sequentially, the total elapsed time would be roughly `N × per-processor time`. +// When running in parallel the total time is close to the slowest +// single processor, which is what we verify and print. + +use aic_sdk::{Model, ProcessorAsync, ProcessorConfig}; +use std::time::Instant; + +const MODEL: &str = "quail-vf-2.0-l-16khz"; +const NUM_PROCESSORS: usize = 4; +// Number of process calls per processor – enough to make timing visible. +const ITERATIONS: usize = 50; + +#[tokio::main] +async fn main() -> Result<(), Box> { + println!("ai-coustics SDK version: {}", aic_sdk::get_sdk_version()); + + let license = std::env::var("AIC_SDK_LICENSE").expect("AIC_SDK_LICENSE not set"); + + let model_path = Model::download(MODEL, "target")?; + let model = Model::from_file(&model_path)?; + println!("Model loaded from {}", model_path.display()); + + let config = ProcessorConfig::optimal(&model); + println!( + "Config: {} Hz, {} frames/buffer, {} channel(s)\n", + config.sample_rate, config.num_frames, config.num_channels + ); + + // ------------------------------------------------------------------------- + // Build all processors upfront so initialization is not part of the timed + // section. + // ------------------------------------------------------------------------- + let processors = futures::future::try_join_all( + (0..NUM_PROCESSORS).map(|_| ProcessorAsync::with_config(&model, &license, &config)), + ) + .await?; + + println!( + "Running {} processors × {} iterations each", + NUM_PROCESSORS, ITERATIONS + ); + + // ------------------------------------------------------------------------- + // Sequential baseline – process each processor one after the other. + // ------------------------------------------------------------------------- + let buf_len = config.num_channels as usize * config.num_frames; + + let sequential_start = Instant::now(); + for p in &processors { + let mut audio = vec![0.0f32; buf_len]; + for _ in 0..ITERATIONS { + p.process_interleaved(&mut audio).await?; + } + } + let sequential_elapsed = sequential_start.elapsed(); + + println!( + "Sequential total: {:>8.1} ms", + sequential_elapsed.as_secs_f64() * 1000.0 + ); + + // ------------------------------------------------------------------------- + // Parallel run – drive all processors concurrently with futures::future::try_join_all. + // Each task times itself and returns its own elapsed duration. + // ------------------------------------------------------------------------- + let parallel_start = Instant::now(); + + let tasks: Vec<_> = processors + .iter() + .map(|p| { + let config = config.clone(); + async move { + let mut audio = vec![0.0f32; config.num_channels as usize * config.num_frames]; + let t0 = Instant::now(); + for _ in 0..ITERATIONS { + p.process_interleaved(&mut audio).await?; + } + let elapsed = t0.elapsed(); + Ok::<_, aic_sdk::AicError>(elapsed) + } + }) + .collect(); + + let results = futures::future::try_join_all(tasks).await?; + let parallel_elapsed = parallel_start.elapsed(); + + for (id, elapsed) in results.iter().enumerate() { + println!( + " Processor {:>2} finished in {:>8.1} ms", + id + 1, + elapsed.as_secs_f64() * 1000.0, + ); + } + + let max_individual = results.iter().max().copied().unwrap_or_default(); + + println!( + "\nParallel total: {:>8.1} ms", + parallel_elapsed.as_secs_f64() * 1000.0 + ); + println!( + "Slowest processor: {:>8.1} ms", + max_individual.as_secs_f64() * 1000.0, + ); + + let speedup = sequential_elapsed.as_secs_f64() / parallel_elapsed.as_secs_f64(); + println!( + "\nSpeedup vs sequential: {:.2}x (ideal ≈ {}x)", + speedup, NUM_PROCESSORS + ); + println!( + "{}", + if speedup > 1.5 { + "Parallel execution confirmed." + } else { + "Warning: low speedup – your system may not have enough CPU cores for parallel blocking tasks." + } + ); + + Ok(()) +} diff --git a/src/lib.rs b/src/lib.rs index a2d035d..35d72ff 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,11 +7,15 @@ mod download; mod error; mod model; mod processor; +#[cfg(feature = "async")] +mod processor_async; mod vad; pub use error::*; pub use model::*; pub use processor::*; +#[cfg(feature = "async")] +pub use processor_async::*; pub use vad::*; /// Returns the version of the ai-coustics SDK library. diff --git a/src/processor.rs b/src/processor.rs index 87455dd..20d46e3 100644 --- a/src/processor.rs +++ b/src/processor.rs @@ -101,13 +101,17 @@ pub enum ProcessorParameter { /// /// **Default:** 0.0 Bypass, - /// Controls the intensity of speech enhancement processing. + /// A tunable parameter to optimize for specific STT engines, deployment environments, + /// and user experience requirements. + /// + /// The exact behavior depends on the active model: + /// - **Quail Models:** Controls how aggressively the model suppresses noise. When used + /// with Quail Voice Focus, it also suppresses background and competing speech. + /// - **Rook Models:** Controls the mixback and therefore the intensity of the + /// enhancement. /// /// **Range:** 0.0 to 1.0 - /// - **0.0:** Bypass mode - original signal passes through unchanged - /// - **1.0:** Full enhancement - maximum noise reduction but also more audible artifacts /// - /// **Default:** 1.0 EnhancementLevel, } diff --git a/src/processor_async.rs b/src/processor_async.rs new file mode 100644 index 0000000..e88b1b5 --- /dev/null +++ b/src/processor_async.rs @@ -0,0 +1,141 @@ +use crate::{AicError, Model, Processor, ProcessorConfig, ProcessorContext, VadContext}; +use std::sync::Arc; +use tokio::sync::Mutex; + +/// An async wrapper around [`Processor`] for use in async/await contexts. +/// +/// # Example +/// +/// ```rust,no_run +/// use aic_sdk::{Model, ProcessorAsync, ProcessorConfig}; +/// #[tokio::main] +/// async fn main() -> Result<(), aic_sdk::AicError> { +/// let license_key = std::env::var("AIC_SDK_LICENSE").unwrap(); +/// let model = Model::from_file("/path/to/model.aicmodel")?; +/// let config = ProcessorConfig::optimal(&model).with_num_channels(2); +/// +/// let processor = ProcessorAsync::new(&model, &license_key)?; +/// processor.initialize(&config).await?; +/// +/// let mut audio = vec![0.0f32; config.num_channels as usize * config.num_frames]; +/// processor.process_interleaved(&mut audio).await?; +/// Ok(()) +/// } +/// ``` +pub struct ProcessorAsync { + inner: Arc>>, +} + +impl ProcessorAsync { + /// Creates a new async audio enhancement processor instance. + /// + /// See [`Processor::new`] for details. + pub fn new(model: &Model<'static>, license_key: &str) -> Result { + let processor = Processor::new(model, license_key)?; + Ok(Self { + inner: Arc::new(Mutex::new(processor)), + }) + } + + /// Creates a new async processor and initializes it with the given configuration. + /// + /// This is a convenience method combining [`ProcessorAsync::new`] and + /// [`ProcessorAsync::initialize`]. + pub async fn with_config( + model: &Model<'static>, + license_key: &str, + config: &ProcessorConfig, + ) -> Result { + let this = Self::new(model, license_key)?; + this.initialize(config).await?; + Ok(this) + } + + /// Initializes the processor with the given configuration. + /// + /// See [`Processor::initialize`] for details. + /// + /// # Warning + /// This allocates memory internally. Do not call from latency-sensitive paths. + pub async fn initialize(&self, config: &ProcessorConfig) -> Result<(), AicError> { + let inner = Arc::clone(&self.inner); + let config = config.clone(); + tokio::task::spawn_blocking(move || { + let mut processor = inner.blocking_lock(); + processor.initialize(&config) + }) + .await + .expect("spawn_blocking task panicked") + } + + /// Processes audio with interleaved channel data. + /// + /// See [`Processor::process_interleaved`] for details on the memory layout. + pub async fn process_interleaved(&self, audio: &mut [f32]) -> Result<(), AicError> { + let inner = Arc::clone(&self.inner); + let mut buf = audio.to_vec(); + tokio::task::spawn_blocking(move || { + let mut processor = inner.blocking_lock(); + processor.process_interleaved(&mut buf)?; + Ok(buf) + }) + .await + .expect("spawn_blocking task panicked") + .map(|buf| audio.copy_from_slice(&buf)) + } + + /// Processes audio with separate buffers for each channel (planar layout). + /// + /// See [`Processor::process_planar`] for details on the memory layout. + pub async fn process_planar + AsRef<[f32]>>( + &self, + audio: &mut [V], + ) -> Result<(), AicError> { + let inner = Arc::clone(&self.inner); + let mut buf: Vec> = audio.iter().map(|ch| ch.as_ref().to_vec()).collect(); + tokio::task::spawn_blocking(move || { + let mut processor = inner.blocking_lock(); + processor.process_planar(&mut buf)?; + Ok(buf) + }) + .await + .expect("spawn_blocking task panicked") + .map(|buf| { + for (dst, src) in audio.iter_mut().zip(buf.iter()) { + dst.as_mut().copy_from_slice(src); + } + }) + } + + /// Processes audio with sequential channel data. + /// + /// See [`Processor::process_sequential`] for details on the memory layout. + pub async fn process_sequential(&self, audio: &mut [f32]) -> Result<(), AicError> { + let inner = Arc::clone(&self.inner); + let mut buf = audio.to_vec(); + tokio::task::spawn_blocking(move || { + let mut processor = inner.blocking_lock(); + processor.process_sequential(&mut buf)?; + Ok(buf) + }) + .await + .expect("spawn_blocking task panicked") + .map(|buf| audio.copy_from_slice(&buf)) + } + + /// Creates a [`ProcessorContext`] for real-time parameter control. + /// + /// See [`Processor::processor_context`] for details. + pub async fn processor_context(&self) -> ProcessorContext { + let processor = self.inner.lock().await; + processor.processor_context() + } + + /// Creates a [`VadContext`] for voice activity detection. + /// + /// See [`Processor::vad_context`] for details. + pub async fn vad_context(&self) -> VadContext { + let processor = self.inner.lock().await; + processor.vad_context() + } +}