i3status_rs/blocks/sound/
pulseaudio.rs

1use std::cmp::{max, min};
2use std::convert::{TryFrom, TryInto};
3use std::io;
4use std::os::fd::{IntoRawFd as _, RawFd};
5use std::sync::{Arc, Mutex, Weak};
6use std::thread;
7
8use libc::c_void;
9use libpulse_binding::callbacks::ListResult;
10use libpulse_binding::context::{
11    Context, FlagSet, State as PulseState, introspect::ServerInfo, introspect::SinkInfo,
12    introspect::SourceInfo, subscribe::Facility, subscribe::InterestMaskSet,
13};
14use libpulse_binding::mainloop::api::MainloopApi;
15use libpulse_binding::mainloop::standard::{IterateResult, Mainloop};
16use libpulse_binding::proplist::{Proplist, properties};
17use libpulse_binding::volume::{ChannelVolumes, Volume};
18use tokio::sync::Notify;
19
20use super::super::prelude::*;
21use super::{DeviceKind, SoundDevice};
22
/// Process-wide pulseaudio client, created lazily on first access. A failed initialization is
/// kept as the `Err` value and reported by `Client::send`.
static CLIENT: LazyLock<Result<Client>> = LazyLock::new(Client::new);
/// Weak handles to the `Notify` of every created `Device`. `Client::send_update_event` notifies
/// the live ones and drops the entries whose `Notify` is gone.
static EVENT_LISTENER: Mutex<Vec<Weak<Notify>>> = Mutex::new(Vec::new());
/// Most recently received info per device, keyed by device kind and device name.
static DEVICES: LazyLock<Mutex<HashMap<(DeviceKind, String), VolInfo>>> = LazyLock::new(default);

// Default device names. They start out as the `@DEFAULT_*@` placeholder strings and are
// overwritten by `Client::server_info_callback` with the names reported by the server.
pub(super) static DEFAULT_SOURCE: Mutex<Cow<'static, str>> =
    Mutex::new(Cow::Borrowed("@DEFAULT_SOURCE@"));
pub(super) static DEFAULT_SINK: Mutex<Cow<'static, str>> =
    Mutex::new(Cow::Borrowed("@DEFAULT_SINK@"));
32
33impl DeviceKind {
34    pub fn default_name(self) -> Cow<'static, str> {
35        match self {
36            Self::Sink => DEFAULT_SINK.lock().unwrap().clone(),
37            Self::Source => DEFAULT_SOURCE.lock().unwrap().clone(),
38        }
39    }
40}
41
/// A pulseaudio sink or source together with the last state read for it.
pub(super) struct Device {
    /// Explicitly requested device name. When `None`, the current default device name of
    /// `device_kind` is used instead.
    name: Option<String>,
    /// Description copied from the cached device info.
    description: Option<String>,
    /// Name of the active port copied from the cached device info.
    active_port: Option<String>,
    /// Form factor copied from the cached device info.
    form_factor: Option<String>,
    /// Whether this device is a sink or a source.
    device_kind: DeviceKind,
    /// Last known per-channel volumes; `None` until first populated.
    volume: Option<ChannelVolumes>,
    /// Average channel volume as a percentage of `Volume::NORMAL`, rounded.
    volume_avg: u32,
    /// Last known mute state.
    muted: bool,
    /// Notified whenever the client receives updated server or device info.
    notify: Arc<Notify>,
}
53
/// A pulseaudio mainloop together with the context that was created on it.
struct Connection {
    /// Mainloop that has to be iterated to drive `context`.
    mainloop: Mainloop,
    /// Context connected to the pulseaudio server.
    context: Context,
}
58
/// Handle used to talk to the `sound_pulseaudio` thread.
struct Client {
    /// Queue of requests that the `sound_pulseaudio` thread drains after each mainloop iteration.
    send_req: std::sync::mpsc::Sender<ClientRequest>,
    /// Interrupts the blocking mainloop iteration so that queued requests get processed.
    ml_waker: MainloopWaker,
}
63
/// Snapshot of a device's state, built from a `SinkInfo` or a `SourceInfo`.
#[derive(Debug)]
struct VolInfo {
    /// Per-channel volumes.
    volume: ChannelVolumes,
    /// Whether the device is muted.
    mute: bool,
    /// Device name; also used as part of the key in `DEVICES`.
    name: String,
    /// Device description, if the server reported one.
    description: Option<String>,
    /// Name of the active port, if there is one and it has a name.
    active_port: Option<String>,
    /// Value of the `DEVICE_FORM_FACTOR` property, if present.
    form_factor: Option<String>,
}
73
74impl TryFrom<&SourceInfo<'_>> for VolInfo {
75    type Error = ();
76
77    fn try_from(source_info: &SourceInfo) -> std::result::Result<Self, Self::Error> {
78        match source_info.name.as_ref() {
79            None => Err(()),
80            Some(name) => Ok(VolInfo {
81                volume: source_info.volume,
82                mute: source_info.mute,
83                name: name.to_string(),
84                description: source_info.description.as_ref().map(|d| d.to_string()),
85                active_port: source_info
86                    .active_port
87                    .as_ref()
88                    .and_then(|a| a.name.as_ref().map(|n| n.to_string())),
89                form_factor: source_info.proplist.get_str(properties::DEVICE_FORM_FACTOR),
90            }),
91        }
92    }
93}
94
95impl TryFrom<&SinkInfo<'_>> for VolInfo {
96    type Error = ();
97
98    fn try_from(sink_info: &SinkInfo) -> std::result::Result<Self, Self::Error> {
99        match sink_info.name.as_ref() {
100            None => Err(()),
101            Some(name) => Ok(VolInfo {
102                volume: sink_info.volume,
103                mute: sink_info.mute,
104                name: name.to_string(),
105                description: sink_info.description.as_ref().map(|d| d.to_string()),
106                active_port: sink_info
107                    .active_port
108                    .as_ref()
109                    .and_then(|a| a.name.as_ref().map(|n| n.to_string())),
110                form_factor: sink_info.proplist.get_str(properties::DEVICE_FORM_FACTOR),
111            }),
112        }
113    }
114}
115
/// Requests handled by the `sound_pulseaudio` thread.
#[derive(Debug)]
enum ClientRequest {
    /// Query server info; the callback stores the default sink and source names.
    GetDefaultDevice,
    /// Query the sink or source with the given name; the callback caches the result in `DEVICES`.
    GetInfoByName(DeviceKind, String),
    /// Set the channel volumes of the sink or source with the given name.
    SetVolumeByName(DeviceKind, String, ChannelVolumes),
    /// Set the mute state of the sink or source with the given name.
    SetMuteByName(DeviceKind, String, bool),
}
123
124impl Connection {
125    fn new() -> Result<Self> {
126        let mut proplist = Proplist::new().unwrap();
127        proplist
128            .set_str(properties::APPLICATION_NAME, env!("CARGO_PKG_NAME"))
129            .map_err(|_| Error::new("Could not set pulseaudio APPLICATION_NAME property"))?;
130
131        let mainloop = Mainloop::new().error("Failed to create pulseaudio mainloop")?;
132
133        let mut context = Context::new_with_proplist(
134            &mainloop,
135            concat!(env!("CARGO_PKG_NAME"), "_context"),
136            &proplist,
137        )
138        .error("Failed to create new pulseaudio context")?;
139
140        context
141            .connect(None, FlagSet::NOFLAGS, None)
142            .error("Failed to connect to pulseaudio context")?;
143
144        let mut connection = Connection { mainloop, context };
145
146        // Wait for context to be ready
147        loop {
148            connection.iterate(false)?;
149            match connection.context.get_state() {
150                PulseState::Ready => {
151                    break;
152                }
153                PulseState::Failed | PulseState::Terminated => {
154                    return Err(Error::new("pulseaudio context state failed/terminated"));
155                }
156                _ => {}
157            }
158        }
159
160        Ok(connection)
161    }
162
163    fn iterate(&mut self, blocking: bool) -> Result<()> {
164        match self.mainloop.iterate(blocking) {
165            IterateResult::Quit(_) | IterateResult::Err(_) => {
166                Err(Error::new("failed to iterate pulseaudio state"))
167            }
168            IterateResult::Success(_) => Ok(()),
169        }
170    }
171
172    /// Create connection in a new thread.
173    ///
174    /// If connection can't be created, Err is returned.
175    fn spawn(thread_name: &str, f: impl Fn(Self) -> bool + Send + 'static) -> Result<()> {
176        let (tx, rx) = std::sync::mpsc::sync_channel(0);
177        thread::Builder::new()
178            .name(thread_name.to_owned())
179            .spawn(move || match Self::new() {
180                Ok(mut conn) => {
181                    tx.send(Ok(())).unwrap();
182                    while f(conn) {
183                        let mut try_i = 0usize;
184                        loop {
185                            try_i += 1;
186                            let delay =
187                                Duration::from_millis(if try_i <= 10 { 100 } else { 5_000 });
188                            eprintln!("reconnecting to pulseaudio in {delay:?}... (try {try_i})");
189                            thread::sleep(delay);
190                            if let Ok(c) = Self::new() {
191                                eprintln!("reconnected to pulseaudio");
192                                conn = c;
193                                break;
194                            }
195                        }
196                    }
197                }
198                Err(err) => {
199                    tx.send(Err(err)).unwrap();
200                }
201            })
202            .error("failed to spawn a thread")?;
203        rx.recv().error("channel closed")?
204    }
205}
206
207impl Client {
208    fn new() -> Result<Client> {
209        let (send_req, recv_req) = std::sync::mpsc::channel();
210        let ml_waker = MainloopWaker::new().unwrap();
211
212        Connection::spawn("sound_pulseaudio", move |mut connection| {
213            ml_waker.attach(connection.mainloop.get_api());
214
215            let introspector = connection.context.introspect();
216            connection
217                .context
218                .set_subscribe_callback(Some(Box::new(move |facility, _, index| match facility {
219                    Some(Facility::Server) => {
220                        introspector.get_server_info(Client::server_info_callback);
221                    }
222                    Some(Facility::Sink) => {
223                        introspector.get_sink_info_by_index(index, Client::sink_info_callback);
224                    }
225                    Some(Facility::Source) => {
226                        introspector.get_source_info_by_index(index, Client::source_info_callback);
227                    }
228                    _ => (),
229                })));
230
231            connection.context.subscribe(
232                InterestMaskSet::SERVER | InterestMaskSet::SINK | InterestMaskSet::SOURCE,
233                |_| (),
234            );
235
236            let mut introspector = connection.context.introspect();
237
238            loop {
239                loop {
240                    connection.iterate(true).unwrap();
241                    match connection.context.get_state() {
242                        PulseState::Ready => break,
243                        PulseState::Failed => return true,
244                        _ => (),
245                    }
246                }
247
248                loop {
249                    use std::sync::mpsc::TryRecvError;
250                    let req = match recv_req.try_recv() {
251                        Ok(x) => x,
252                        Err(TryRecvError::Empty) => break,
253                        Err(TryRecvError::Disconnected) => return false,
254                    };
255
256                    use ClientRequest::*;
257                    match req {
258                        GetDefaultDevice => {
259                            introspector.get_server_info(Client::server_info_callback);
260                        }
261                        GetInfoByName(DeviceKind::Sink, name) => {
262                            introspector.get_sink_info_by_name(&name, Client::sink_info_callback);
263                        }
264                        GetInfoByName(DeviceKind::Source, name) => {
265                            introspector
266                                .get_source_info_by_name(&name, Client::source_info_callback);
267                        }
268                        SetVolumeByName(DeviceKind::Sink, name, volumes) => {
269                            introspector.set_sink_volume_by_name(&name, &volumes, None);
270                        }
271                        SetVolumeByName(DeviceKind::Source, name, volumes) => {
272                            introspector.set_source_volume_by_name(&name, &volumes, None);
273                        }
274                        SetMuteByName(DeviceKind::Sink, name, mute) => {
275                            introspector.set_sink_mute_by_name(&name, mute, None);
276                        }
277                        SetMuteByName(DeviceKind::Source, name, mute) => {
278                            introspector.set_source_mute_by_name(&name, mute, None);
279                        }
280                    };
281                }
282            }
283        })?;
284
285        Ok(Client { send_req, ml_waker })
286    }
287
288    fn send(request: ClientRequest) -> Result<()> {
289        match CLIENT.as_ref() {
290            Ok(client) => {
291                client.send_req.send(request).unwrap();
292                client.ml_waker.wake().unwrap();
293                Ok(())
294            }
295            Err(err) => Err(Error::new(format!(
296                "pulseaudio connection failed with error: {err}",
297            ))),
298        }
299    }
300
301    fn server_info_callback(server_info: &ServerInfo) {
302        if let Some(default_sink) = server_info.default_sink_name.as_ref() {
303            *DEFAULT_SINK.lock().unwrap() = default_sink.to_string().into();
304        }
305
306        if let Some(default_source) = server_info.default_source_name.as_ref() {
307            *DEFAULT_SOURCE.lock().unwrap() = default_source.to_string().into();
308        }
309
310        Client::send_update_event();
311    }
312
313    fn get_info_callback<I: TryInto<VolInfo>>(result: ListResult<I>) -> Option<VolInfo> {
314        match result {
315            ListResult::End | ListResult::Error => None,
316            ListResult::Item(info) => info.try_into().ok(),
317        }
318    }
319
320    fn sink_info_callback(result: ListResult<&SinkInfo>) {
321        if let Some(vol_info) = Self::get_info_callback(result) {
322            DEVICES
323                .lock()
324                .unwrap()
325                .insert((DeviceKind::Sink, vol_info.name.to_string()), vol_info);
326
327            Client::send_update_event();
328        }
329    }
330
331    fn source_info_callback(result: ListResult<&SourceInfo>) {
332        if let Some(vol_info) = Self::get_info_callback(result) {
333            DEVICES
334                .lock()
335                .unwrap()
336                .insert((DeviceKind::Source, vol_info.name.to_string()), vol_info);
337
338            Client::send_update_event();
339        }
340    }
341
342    fn send_update_event() {
343        EVENT_LISTENER
344            .lock()
345            .unwrap()
346            .retain(|notify| notify.upgrade().inspect(|x| x.notify_one()).is_some());
347    }
348}
349
350impl Device {
351    pub(super) fn new(device_kind: DeviceKind, name: Option<String>) -> Result<Self> {
352        let notify = Arc::new(Notify::new());
353        EVENT_LISTENER.lock().unwrap().push(Arc::downgrade(&notify));
354
355        Client::send(ClientRequest::GetDefaultDevice)?;
356
357        let device = Device {
358            name,
359            description: None,
360            active_port: None,
361            form_factor: None,
362            device_kind,
363            volume: None,
364            volume_avg: 0,
365            muted: false,
366            notify,
367        };
368
369        Client::send(ClientRequest::GetInfoByName(device_kind, device.name()))?;
370
371        Ok(device)
372    }
373
374    fn name(&self) -> String {
375        self.name
376            .clone()
377            .unwrap_or_else(|| self.device_kind.default_name().into())
378    }
379
380    fn volume(&mut self, volume: ChannelVolumes) {
381        self.volume = Some(volume);
382        self.volume_avg = (volume.avg().0 as f32 / Volume::NORMAL.0 as f32 * 100.0).round() as u32;
383    }
384}
385
386#[async_trait::async_trait]
387impl SoundDevice for Device {
388    fn volume(&self) -> u32 {
389        self.volume_avg
390    }
391
392    fn muted(&self) -> bool {
393        self.muted
394    }
395
396    fn output_name(&self) -> String {
397        self.name()
398    }
399
400    fn output_description(&self) -> Option<String> {
401        self.description.clone()
402    }
403
404    fn active_port(&self) -> Option<String> {
405        self.active_port.clone()
406    }
407
408    fn form_factor(&self) -> Option<&str> {
409        self.form_factor.as_deref()
410    }
411
412    async fn get_info(&mut self) -> Result<()> {
413        let devices = DEVICES.lock().unwrap();
414
415        if let Some(info) = devices.get(&(self.device_kind, self.name())) {
416            self.volume(info.volume);
417            self.muted = info.mute;
418            self.description.clone_from(&info.description);
419            self.active_port.clone_from(&info.active_port);
420            self.form_factor.clone_from(&info.form_factor);
421        }
422
423        Ok(())
424    }
425
426    async fn set_volume(&mut self, step: i32, max_vol: Option<u32>) -> Result<()> {
427        let mut volume = self.volume.error("Volume unknown")?;
428
429        // apply step to volumes
430        let step = (step as f32 * Volume::NORMAL.0 as f32 / 100.0).round() as i32;
431        for vol in volume.get_mut().iter_mut() {
432            let uncapped_vol = max(0, vol.0 as i32 + step) as u32;
433            let capped_vol = if let Some(vol_cap) = max_vol {
434                min(
435                    uncapped_vol,
436                    (vol_cap as f32 * Volume::NORMAL.0 as f32 / 100.0).round() as u32,
437                )
438            } else {
439                uncapped_vol
440            };
441            vol.0 = min(capped_vol, Volume::MAX.0);
442        }
443
444        // update volumes
445        self.volume(volume);
446        Client::send(ClientRequest::SetVolumeByName(
447            self.device_kind,
448            self.name(),
449            volume,
450        ))?;
451
452        Ok(())
453    }
454
455    async fn toggle(&mut self) -> Result<()> {
456        self.muted = !self.muted;
457
458        Client::send(ClientRequest::SetMuteByName(
459            self.device_kind,
460            self.name(),
461            self.muted,
462        ))?;
463
464        Ok(())
465    }
466
467    async fn wait_for_update(&mut self) -> Result<()> {
468        self.notify.notified().await;
469        Ok(())
470    }
471}
472
/// Thread safe [`Mainloop`] waker.
///
/// Has the same purpose as [`Mainloop::wake`], but can be shared across threads.
///
/// It is built on a pipe: `wake` writes one byte to the write end, and the read end is
/// registered with the mainloop as an input event by `attach`.
#[derive(Debug, Clone, Copy)]
struct MainloopWaker {
    // Note: these fds are never closed, but this is OK because there is only one instance of this struct.
    /// Write end of the pipe.
    pipe_tx: RawFd,
    /// Read end of the pipe.
    pipe_rx: RawFd,
}
482
483impl MainloopWaker {
484    /// Create new waker.
485    fn new() -> io::Result<Self> {
486        let (pipe_rx, pipe_tx) = nix::unistd::pipe2(nix::fcntl::OFlag::O_CLOEXEC)?;
487        Ok(Self {
488            pipe_tx: pipe_tx.into_raw_fd(),
489            pipe_rx: pipe_rx.into_raw_fd(),
490        })
491    }
492
493    /// Attach this waker to a [`Mainloop`].
494    ///
495    /// A waker should be attached to _one_ mainloop.
496    fn attach(self, ml: &MainloopApi) {
497        extern "C" fn wake_cb(
498            _: *const MainloopApi,
499            _: *mut libpulse_binding::mainloop::events::io::IoEventInternal,
500            fd: RawFd,
501            _: libpulse_binding::mainloop::events::io::FlagSet,
502            _: *mut c_void,
503        ) {
504            nix::unistd::read(fd, &mut [0; 32]).unwrap();
505        }
506
507        (ml.io_new.unwrap())(
508            ml as *const _,
509            self.pipe_rx,
510            libpulse_binding::mainloop::events::io::FlagSet::INPUT,
511            Some(wake_cb),
512            std::ptr::null_mut(),
513        );
514    }
515
516    /// Interrupt blocking [`Mainloop::iterate`].
517    fn wake(self) -> io::Result<()> {
518        let buf = [0u8];
519        let res = unsafe { libc::write(self.pipe_tx, buf.as_ptr().cast(), 1) };
520        if res == -1 {
521            Err(io::Error::last_os_error())
522        } else {
523            Ok(())
524        }
525    }
526}