1use std::cmp::{max, min};
2use std::convert::{TryFrom, TryInto};
3use std::io;
4use std::os::fd::{IntoRawFd as _, RawFd};
5use std::sync::{Arc, Mutex, Weak};
6use std::thread;
7
8use libc::c_void;
9use libpulse_binding::callbacks::ListResult;
10use libpulse_binding::context::{
11 Context, FlagSet, State as PulseState, introspect::ServerInfo, introspect::SinkInfo,
12 introspect::SourceInfo, subscribe::Facility, subscribe::InterestMaskSet,
13};
14use libpulse_binding::mainloop::api::MainloopApi;
15use libpulse_binding::mainloop::standard::{IterateResult, Mainloop};
16use libpulse_binding::proplist::{Proplist, properties};
17use libpulse_binding::volume::{ChannelVolumes, Volume};
18use tokio::sync::Notify;
19
20use super::super::prelude::*;
21use super::{DeviceKind, SoundDevice};
22
23static CLIENT: LazyLock<Result<Client>> = LazyLock::new(Client::new);
24static EVENT_LISTENER: Mutex<Vec<Weak<Notify>>> = Mutex::new(Vec::new());
25static DEVICES: LazyLock<Mutex<HashMap<(DeviceKind, String), VolInfo>>> = LazyLock::new(default);
26
27pub(super) static DEFAULT_SOURCE: Mutex<Cow<'static, str>> =
29 Mutex::new(Cow::Borrowed("@DEFAULT_SOURCE@"));
30pub(super) static DEFAULT_SINK: Mutex<Cow<'static, str>> =
31 Mutex::new(Cow::Borrowed("@DEFAULT_SINK@"));
32
33impl DeviceKind {
34 pub fn default_name(self) -> Cow<'static, str> {
35 match self {
36 Self::Sink => DEFAULT_SINK.lock().unwrap().clone(),
37 Self::Source => DEFAULT_SOURCE.lock().unwrap().clone(),
38 }
39 }
40}
41
42pub(super) struct Device {
43 name: Option<String>,
44 description: Option<String>,
45 active_port: Option<String>,
46 form_factor: Option<String>,
47 device_kind: DeviceKind,
48 volume: Option<ChannelVolumes>,
49 volume_avg: u32,
50 muted: bool,
51 notify: Arc<Notify>,
52}
53
54struct Connection {
55 mainloop: Mainloop,
56 context: Context,
57}
58
59struct Client {
60 send_req: std::sync::mpsc::Sender<ClientRequest>,
61 ml_waker: MainloopWaker,
62}
63
64#[derive(Debug)]
65struct VolInfo {
66 volume: ChannelVolumes,
67 mute: bool,
68 name: String,
69 description: Option<String>,
70 active_port: Option<String>,
71 form_factor: Option<String>,
72}
73
74impl TryFrom<&SourceInfo<'_>> for VolInfo {
75 type Error = ();
76
77 fn try_from(source_info: &SourceInfo) -> std::result::Result<Self, Self::Error> {
78 match source_info.name.as_ref() {
79 None => Err(()),
80 Some(name) => Ok(VolInfo {
81 volume: source_info.volume,
82 mute: source_info.mute,
83 name: name.to_string(),
84 description: source_info.description.as_ref().map(|d| d.to_string()),
85 active_port: source_info
86 .active_port
87 .as_ref()
88 .and_then(|a| a.name.as_ref().map(|n| n.to_string())),
89 form_factor: source_info.proplist.get_str(properties::DEVICE_FORM_FACTOR),
90 }),
91 }
92 }
93}
94
95impl TryFrom<&SinkInfo<'_>> for VolInfo {
96 type Error = ();
97
98 fn try_from(sink_info: &SinkInfo) -> std::result::Result<Self, Self::Error> {
99 match sink_info.name.as_ref() {
100 None => Err(()),
101 Some(name) => Ok(VolInfo {
102 volume: sink_info.volume,
103 mute: sink_info.mute,
104 name: name.to_string(),
105 description: sink_info.description.as_ref().map(|d| d.to_string()),
106 active_port: sink_info
107 .active_port
108 .as_ref()
109 .and_then(|a| a.name.as_ref().map(|n| n.to_string())),
110 form_factor: sink_info.proplist.get_str(properties::DEVICE_FORM_FACTOR),
111 }),
112 }
113 }
114}
115
116#[derive(Debug)]
117enum ClientRequest {
118 GetDefaultDevice,
119 GetInfoByName(DeviceKind, String),
120 SetVolumeByName(DeviceKind, String, ChannelVolumes),
121 SetMuteByName(DeviceKind, String, bool),
122}
123
124impl Connection {
125 fn new() -> Result<Self> {
126 let mut proplist = Proplist::new().unwrap();
127 proplist
128 .set_str(properties::APPLICATION_NAME, env!("CARGO_PKG_NAME"))
129 .map_err(|_| Error::new("Could not set pulseaudio APPLICATION_NAME property"))?;
130
131 let mainloop = Mainloop::new().error("Failed to create pulseaudio mainloop")?;
132
133 let mut context = Context::new_with_proplist(
134 &mainloop,
135 concat!(env!("CARGO_PKG_NAME"), "_context"),
136 &proplist,
137 )
138 .error("Failed to create new pulseaudio context")?;
139
140 context
141 .connect(None, FlagSet::NOFLAGS, None)
142 .error("Failed to connect to pulseaudio context")?;
143
144 let mut connection = Connection { mainloop, context };
145
146 loop {
148 connection.iterate(false)?;
149 match connection.context.get_state() {
150 PulseState::Ready => {
151 break;
152 }
153 PulseState::Failed | PulseState::Terminated => {
154 return Err(Error::new("pulseaudio context state failed/terminated"));
155 }
156 _ => {}
157 }
158 }
159
160 Ok(connection)
161 }
162
163 fn iterate(&mut self, blocking: bool) -> Result<()> {
164 match self.mainloop.iterate(blocking) {
165 IterateResult::Quit(_) | IterateResult::Err(_) => {
166 Err(Error::new("failed to iterate pulseaudio state"))
167 }
168 IterateResult::Success(_) => Ok(()),
169 }
170 }
171
172 fn spawn(thread_name: &str, f: impl Fn(Self) -> bool + Send + 'static) -> Result<()> {
176 let (tx, rx) = std::sync::mpsc::sync_channel(0);
177 thread::Builder::new()
178 .name(thread_name.to_owned())
179 .spawn(move || match Self::new() {
180 Ok(mut conn) => {
181 tx.send(Ok(())).unwrap();
182 while f(conn) {
183 let mut try_i = 0usize;
184 loop {
185 try_i += 1;
186 let delay =
187 Duration::from_millis(if try_i <= 10 { 100 } else { 5_000 });
188 eprintln!("reconnecting to pulseaudio in {delay:?}... (try {try_i})");
189 thread::sleep(delay);
190 if let Ok(c) = Self::new() {
191 eprintln!("reconnected to pulseaudio");
192 conn = c;
193 break;
194 }
195 }
196 }
197 }
198 Err(err) => {
199 tx.send(Err(err)).unwrap();
200 }
201 })
202 .error("failed to spawn a thread")?;
203 rx.recv().error("channel closed")?
204 }
205}
206
207impl Client {
208 fn new() -> Result<Client> {
209 let (send_req, recv_req) = std::sync::mpsc::channel();
210 let ml_waker = MainloopWaker::new().unwrap();
211
212 Connection::spawn("sound_pulseaudio", move |mut connection| {
213 ml_waker.attach(connection.mainloop.get_api());
214
215 let introspector = connection.context.introspect();
216 connection
217 .context
218 .set_subscribe_callback(Some(Box::new(move |facility, _, index| match facility {
219 Some(Facility::Server) => {
220 introspector.get_server_info(Client::server_info_callback);
221 }
222 Some(Facility::Sink) => {
223 introspector.get_sink_info_by_index(index, Client::sink_info_callback);
224 }
225 Some(Facility::Source) => {
226 introspector.get_source_info_by_index(index, Client::source_info_callback);
227 }
228 _ => (),
229 })));
230
231 connection.context.subscribe(
232 InterestMaskSet::SERVER | InterestMaskSet::SINK | InterestMaskSet::SOURCE,
233 |_| (),
234 );
235
236 let mut introspector = connection.context.introspect();
237
238 loop {
239 loop {
240 connection.iterate(true).unwrap();
241 match connection.context.get_state() {
242 PulseState::Ready => break,
243 PulseState::Failed => return true,
244 _ => (),
245 }
246 }
247
248 loop {
249 use std::sync::mpsc::TryRecvError;
250 let req = match recv_req.try_recv() {
251 Ok(x) => x,
252 Err(TryRecvError::Empty) => break,
253 Err(TryRecvError::Disconnected) => return false,
254 };
255
256 use ClientRequest::*;
257 match req {
258 GetDefaultDevice => {
259 introspector.get_server_info(Client::server_info_callback);
260 }
261 GetInfoByName(DeviceKind::Sink, name) => {
262 introspector.get_sink_info_by_name(&name, Client::sink_info_callback);
263 }
264 GetInfoByName(DeviceKind::Source, name) => {
265 introspector
266 .get_source_info_by_name(&name, Client::source_info_callback);
267 }
268 SetVolumeByName(DeviceKind::Sink, name, volumes) => {
269 introspector.set_sink_volume_by_name(&name, &volumes, None);
270 }
271 SetVolumeByName(DeviceKind::Source, name, volumes) => {
272 introspector.set_source_volume_by_name(&name, &volumes, None);
273 }
274 SetMuteByName(DeviceKind::Sink, name, mute) => {
275 introspector.set_sink_mute_by_name(&name, mute, None);
276 }
277 SetMuteByName(DeviceKind::Source, name, mute) => {
278 introspector.set_source_mute_by_name(&name, mute, None);
279 }
280 };
281 }
282 }
283 })?;
284
285 Ok(Client { send_req, ml_waker })
286 }
287
288 fn send(request: ClientRequest) -> Result<()> {
289 match CLIENT.as_ref() {
290 Ok(client) => {
291 client.send_req.send(request).unwrap();
292 client.ml_waker.wake().unwrap();
293 Ok(())
294 }
295 Err(err) => Err(Error::new(format!(
296 "pulseaudio connection failed with error: {err}",
297 ))),
298 }
299 }
300
301 fn server_info_callback(server_info: &ServerInfo) {
302 if let Some(default_sink) = server_info.default_sink_name.as_ref() {
303 *DEFAULT_SINK.lock().unwrap() = default_sink.to_string().into();
304 }
305
306 if let Some(default_source) = server_info.default_source_name.as_ref() {
307 *DEFAULT_SOURCE.lock().unwrap() = default_source.to_string().into();
308 }
309
310 Client::send_update_event();
311 }
312
313 fn get_info_callback<I: TryInto<VolInfo>>(result: ListResult<I>) -> Option<VolInfo> {
314 match result {
315 ListResult::End | ListResult::Error => None,
316 ListResult::Item(info) => info.try_into().ok(),
317 }
318 }
319
320 fn sink_info_callback(result: ListResult<&SinkInfo>) {
321 if let Some(vol_info) = Self::get_info_callback(result) {
322 DEVICES
323 .lock()
324 .unwrap()
325 .insert((DeviceKind::Sink, vol_info.name.to_string()), vol_info);
326
327 Client::send_update_event();
328 }
329 }
330
331 fn source_info_callback(result: ListResult<&SourceInfo>) {
332 if let Some(vol_info) = Self::get_info_callback(result) {
333 DEVICES
334 .lock()
335 .unwrap()
336 .insert((DeviceKind::Source, vol_info.name.to_string()), vol_info);
337
338 Client::send_update_event();
339 }
340 }
341
342 fn send_update_event() {
343 EVENT_LISTENER
344 .lock()
345 .unwrap()
346 .retain(|notify| notify.upgrade().inspect(|x| x.notify_one()).is_some());
347 }
348}
349
350impl Device {
351 pub(super) fn new(device_kind: DeviceKind, name: Option<String>) -> Result<Self> {
352 let notify = Arc::new(Notify::new());
353 EVENT_LISTENER.lock().unwrap().push(Arc::downgrade(¬ify));
354
355 Client::send(ClientRequest::GetDefaultDevice)?;
356
357 let device = Device {
358 name,
359 description: None,
360 active_port: None,
361 form_factor: None,
362 device_kind,
363 volume: None,
364 volume_avg: 0,
365 muted: false,
366 notify,
367 };
368
369 Client::send(ClientRequest::GetInfoByName(device_kind, device.name()))?;
370
371 Ok(device)
372 }
373
374 fn name(&self) -> String {
375 self.name
376 .clone()
377 .unwrap_or_else(|| self.device_kind.default_name().into())
378 }
379
380 fn volume(&mut self, volume: ChannelVolumes) {
381 self.volume = Some(volume);
382 self.volume_avg = (volume.avg().0 as f32 / Volume::NORMAL.0 as f32 * 100.0).round() as u32;
383 }
384}
385
386#[async_trait::async_trait]
387impl SoundDevice for Device {
388 fn volume(&self) -> u32 {
389 self.volume_avg
390 }
391
392 fn muted(&self) -> bool {
393 self.muted
394 }
395
396 fn output_name(&self) -> String {
397 self.name()
398 }
399
400 fn output_description(&self) -> Option<String> {
401 self.description.clone()
402 }
403
404 fn active_port(&self) -> Option<String> {
405 self.active_port.clone()
406 }
407
408 fn form_factor(&self) -> Option<&str> {
409 self.form_factor.as_deref()
410 }
411
412 async fn get_info(&mut self) -> Result<()> {
413 let devices = DEVICES.lock().unwrap();
414
415 if let Some(info) = devices.get(&(self.device_kind, self.name())) {
416 self.volume(info.volume);
417 self.muted = info.mute;
418 self.description.clone_from(&info.description);
419 self.active_port.clone_from(&info.active_port);
420 self.form_factor.clone_from(&info.form_factor);
421 }
422
423 Ok(())
424 }
425
426 async fn set_volume(&mut self, step: i32, max_vol: Option<u32>) -> Result<()> {
427 let mut volume = self.volume.error("Volume unknown")?;
428
429 let step = (step as f32 * Volume::NORMAL.0 as f32 / 100.0).round() as i32;
431 for vol in volume.get_mut().iter_mut() {
432 let uncapped_vol = max(0, vol.0 as i32 + step) as u32;
433 let capped_vol = if let Some(vol_cap) = max_vol {
434 min(
435 uncapped_vol,
436 (vol_cap as f32 * Volume::NORMAL.0 as f32 / 100.0).round() as u32,
437 )
438 } else {
439 uncapped_vol
440 };
441 vol.0 = min(capped_vol, Volume::MAX.0);
442 }
443
444 self.volume(volume);
446 Client::send(ClientRequest::SetVolumeByName(
447 self.device_kind,
448 self.name(),
449 volume,
450 ))?;
451
452 Ok(())
453 }
454
455 async fn toggle(&mut self) -> Result<()> {
456 self.muted = !self.muted;
457
458 Client::send(ClientRequest::SetMuteByName(
459 self.device_kind,
460 self.name(),
461 self.muted,
462 ))?;
463
464 Ok(())
465 }
466
467 async fn wait_for_update(&mut self) -> Result<()> {
468 self.notify.notified().await;
469 Ok(())
470 }
471}
472
473#[derive(Debug, Clone, Copy)]
477struct MainloopWaker {
478 pipe_tx: RawFd,
480 pipe_rx: RawFd,
481}
482
483impl MainloopWaker {
484 fn new() -> io::Result<Self> {
486 let (pipe_rx, pipe_tx) = nix::unistd::pipe2(nix::fcntl::OFlag::O_CLOEXEC)?;
487 Ok(Self {
488 pipe_tx: pipe_tx.into_raw_fd(),
489 pipe_rx: pipe_rx.into_raw_fd(),
490 })
491 }
492
493 fn attach(self, ml: &MainloopApi) {
497 extern "C" fn wake_cb(
498 _: *const MainloopApi,
499 _: *mut libpulse_binding::mainloop::events::io::IoEventInternal,
500 fd: RawFd,
501 _: libpulse_binding::mainloop::events::io::FlagSet,
502 _: *mut c_void,
503 ) {
504 nix::unistd::read(fd, &mut [0; 32]).unwrap();
505 }
506
507 (ml.io_new.unwrap())(
508 ml as *const _,
509 self.pipe_rx,
510 libpulse_binding::mainloop::events::io::FlagSet::INPUT,
511 Some(wake_cb),
512 std::ptr::null_mut(),
513 );
514 }
515
516 fn wake(self) -> io::Result<()> {
518 let buf = [0u8];
519 let res = unsafe { libc::write(self.pipe_tx, buf.as_ptr().cast(), 1) };
520 if res == -1 {
521 Err(io::Error::last_os_error())
522 } else {
523 Ok(())
524 }
525 }
526}