Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 47 additions & 15 deletions src/host/pulseaudio/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{
};

use futures::executor::block_on;
use futures::FutureExt as _;
use pulseaudio::{protocol, AsPlaybackSource};

use crate::{
Expand Down Expand Up @@ -53,21 +54,40 @@ enum StreamInner {
Record(pulseaudio::RecordStream, Instant, LatencyHandle),
}

pub struct Stream(StreamInner);
pub struct Stream {
inner: StreamInner,
workers: Vec<std::thread::JoinHandle<()>>,
}

impl Drop for Stream {
fn drop(&mut self) {
match &mut self.0 {
StreamInner::Playback(_, _, handle) | StreamInner::Record(_, _, handle) => {
handle.cancel()
match &mut self.inner {
StreamInner::Playback(stream, _, handle) => {
handle.cancel();
// Help the play_all driver thread terminate by
// queueing a delete, which causes the reactor to drop
// the source's eof_tx. We need to do this because
// poll_read always reports a non-empty buffer.
let _ = stream.clone().delete().now_or_never();
}
StreamInner::Record(_, _, handle) => {
handle.cancel();
}
}
for handle in self.workers.drain(..) {
// Prevent self-join: a worker thread may surface an error
// through the user's error_callback, and that callback may
// drop the Stream — in which case we'd be joining ourselves.
if handle.thread().id() != std::thread::current().id() {
let _ = handle.join();
}
}
}
}

impl StreamTrait for Stream {
fn play(&self) -> Result<(), Error> {
match &self.0 {
match &self.inner {
StreamInner::Playback(stream, _, handle) => {
block_on(stream.uncork()).map_err(Error::from)?;
handle.notify();
Expand All @@ -82,12 +102,12 @@ impl StreamTrait for Stream {
}

fn pause(&self) -> Result<(), Error> {
let res = match &self.0 {
let res = match &self.inner {
StreamInner::Playback(stream, _, _) => block_on(stream.cork()),
StreamInner::Record(stream, _, _) => block_on(stream.cork()),
};
res.map_err(Error::from)?;
match &self.0 {
match &self.inner {
StreamInner::Playback(_, _, handle) | StreamInner::Record(_, _, handle) => {
handle.notify()
}
Expand All @@ -96,15 +116,15 @@ impl StreamTrait for Stream {
}

fn now(&self) -> StreamInstant {
let start = match &self.0 {
let start = match &self.inner {
StreamInner::Playback(_, start, _) | StreamInner::Record(_, start, _) => *start,
};
let elapsed = start.elapsed();
StreamInstant::new(elapsed.as_secs(), elapsed.subsec_nanos())
}

fn buffer_size(&self) -> Result<FrameCount, Error> {
let (spec, bytes) = match &self.0 {
let (spec, bytes) = match &self.inner {
StreamInner::Playback(s, _, _) => (
s.sample_spec(),
s.buffer_attr().minimum_request_length as usize,
Expand Down Expand Up @@ -221,16 +241,22 @@ impl Stream {
// when the stream is stopped by the user.
let stream_clone = stream.clone();
let error_callback_clone = error_callback.clone();
let cancel_driver = handle.cancel.clone();

// The barrier prevents the worker and latency threads from firing callbacks before the
// caller has received the Stream handle.
let ready = std::sync::Arc::new(std::sync::Barrier::new(3));

let ready_worker = ready.clone();
std::thread::spawn(move || {
let driver_handle = std::thread::spawn(move || {
ready_worker.wait();
if let Err(e) = block_on(stream_clone.play_all()) {
emit_error(&error_callback_clone, Error::from(e));
// A server playback error is expected when the client
// closes their stream. No need to report it back to
// the client.
if !cancel_driver.load(atomic::Ordering::Relaxed) {
emit_error(&error_callback_clone, Error::from(e));
}
}
});

Expand All @@ -241,7 +267,7 @@ impl Stream {
let poll_clone = last_poll_micros.clone();

let ready_latency = ready.clone();
std::thread::spawn(move || {
let latency_handle = std::thread::spawn(move || {
ready_latency.wait();
loop {
if cancel_thread.load(atomic::Ordering::Relaxed) {
Expand Down Expand Up @@ -279,7 +305,10 @@ impl Stream {
});

ready.wait();
Ok(Self(StreamInner::Playback(stream, start, handle)))
Ok(Self {
inner: StreamInner::Playback(stream, start, handle),
workers: vec![driver_handle, latency_handle],
})
}

pub fn new_record<D, E>(
Expand Down Expand Up @@ -366,7 +395,7 @@ impl Stream {
let stream_clone = stream.clone();
let latency_clone = current_latency_micros.clone();
let poll_clone = last_poll_micros.clone();
std::thread::spawn(move || loop {
let latency_handle = std::thread::spawn(move || loop {
if cancel_thread.load(atomic::Ordering::Relaxed) {
break;
}
Expand Down Expand Up @@ -400,7 +429,10 @@ impl Stream {
*guard = false;
});

Ok(Self(StreamInner::Record(stream, start, handle)))
Ok(Self {
inner: StreamInner::Record(stream, start, handle),
workers: vec![latency_handle],
})
}
}

Expand Down
Loading