i3status_rs/
pipewire.rs

1use std::cell::{Cell, RefCell};
2use std::collections::{HashMap, HashSet};
3use std::io::Cursor;
4use std::rc::Rc;
5use std::str::FromStr;
6use std::sync::{LazyLock, Mutex};
7use std::thread::JoinHandle;
8use std::time::Duration;
9use std::{panic, thread};
10
11pub(crate) use ::pipewire::channel::Sender as PwSender;
12use ::pipewire::{
13    channel::{Receiver as PwReceiver, channel as pw_channel},
14    context::ContextRc,
15    device::Device as DeviceProxy,
16    keys,
17    main_loop::MainLoopRc,
18    metadata::Metadata as MetadataProxy,
19    node::{Node as NodeProxy, NodeState},
20    properties::properties,
21    proxy::{Listener, ProxyListener, ProxyT},
22    spa::{
23        param::{ParamType, audio::AudioInfoRaw},
24        pod::{
25            Object, Pod, Property, Value, ValueArray, deserialize::PodDeserializer,
26            serialize::PodSerializer,
27        },
28        sys::{
29            SPA_DIRECTION_INPUT, SPA_DIRECTION_OUTPUT, SPA_PARAM_EnumFormat,
30            SPA_PARAM_ROUTE_device, SPA_PARAM_ROUTE_direction, SPA_PARAM_ROUTE_index,
31            SPA_PARAM_ROUTE_name, SPA_PARAM_ROUTE_props, SPA_PARAM_ROUTE_save,
32            SPA_PROP_channelVolumes, SPA_PROP_mute, SPA_TYPE_OBJECT_Format,
33        },
34        utils::{SpaTypes, dict::DictRef},
35    },
36    stream::{StreamBox, StreamFlags},
37    types::ObjectType,
38};
39use bitflags::bitflags;
40use tokio::sync::mpsc::UnboundedSender;
41
42use crate::{Error, ErrorContext as _, Result};
43
// NOTE(review): presumably this defines the `debug!` macro used in this module, tagged
// with "pipewire" — confirm against the definition of `make_log_macro!`.
make_log_macro!(debug, "pipewire");

/// Process-wide PipeWire client, built by [`Client::new`] on first access.
///
/// Holds the `Err` returned by [`Client::new`] if construction failed.
pub(crate) static CLIENT: LazyLock<Result<Client>> = LazyLock::new(Client::new);
/// Message text describing a failed PipeWire connection (its users are outside this section).
pub(crate) const PIPEWIRE_CONNECTION_ERROR_MSG: &str = "Could not connect to pipewire";

/// Scale between raw channel volumes and the values stored in this module.
///
/// A raw volume `v` is stored as `cbrt(v) * NORMAL`, and a stored value `x` is sent back as
/// `(x / NORMAL)^3`, so `NORMAL` corresponds to a raw volume of `1.0`.
const NORMAL: f32 = 100.0;
/// Key of the default-metadata property that names the default sink.
const DEFAULT_SINK_KEY: &str = "default.audio.sink";
/// Key of the default-metadata property that names the default source.
const DEFAULT_SOURCE_KEY: &str = "default.audio.source";
52
/// Direction of a node or device route.
///
/// Can be derived from a media class string (see the `FromStr` impl) or from a raw SPA
/// direction value (see the `TryFrom<u32>` impl).
#[derive(Debug, Clone, Copy)]
pub(crate) enum Direction {
    /// Media classes ending in `/Source`, or the raw value `SPA_DIRECTION_INPUT`.
    Input,
    /// Media classes ending in `/Sink`, or the raw value `SPA_DIRECTION_OUTPUT`.
    Output,
}
58
59impl FromStr for Direction {
60    type Err = Error;
61
62    fn from_str(s: &str) -> Result<Self, Self::Err> {
63        if s.ends_with("/Source") {
64            Ok(Self::Input)
65        } else if s.ends_with("/Sink") {
66            Ok(Self::Output)
67        } else {
68            Err(Error::new("Invalid media class to determine direction"))
69        }
70    }
71}
72
73impl TryFrom<u32> for Direction {
74    type Error = Error;
75
76    fn try_from(value: u32) -> Result<Self, Self::Error> {
77        match value {
78            SPA_DIRECTION_INPUT => Ok(Self::Input),
79            SPA_DIRECTION_OUTPUT => Ok(Self::Output),
80            _ => Err(Error::new("Invalid direction value, must be 0 or 1")),
81        }
82    }
83}
84
/// State tracked for one PipeWire node global.
///
/// Most fields are filled once from the global's properties in `Node::new`; `running`,
/// `muted` and `volume` are updated afterwards by the node proxy listeners.
#[derive(Debug)]
pub(crate) struct Node {
    /// Id of the bound node proxy, used to look the proxy up when executing commands.
    proxy_id: u32,
    /// Value of the device id property, if present and parseable as `u32`.
    pub device_id: Option<u32>,
    /// Node name property, or `node_{global_id}` when the property is missing.
    pub name: String,
    /// Node nick property, if present.
    pub nick: Option<String>,
    /// Media class property, if present.
    pub media_class: Option<String>,
    // direction is derived from media_class (`None` if the class is missing or unrecognised)
    pub direction: Option<Direction>,
    /// Media role property, if present.
    pub media_role: Option<String>,
    // These come from the proxy listeners: `running`, `muted` and `volume` start out as
    // `false` / `None` / `None` and are filled in by the node's info and param events.
    /// Whether the last reported node state was `NodeState::Running`.
    pub running: bool,
    /// Last reported mute flag; `None` until a mute property has been received.
    pub muted: Option<bool>,
    /// Last reported channel volumes, one entry per channel, each stored as
    /// `cbrt(raw) * NORMAL`; `None` until a volume property has been received.
    pub volume: Option<Vec<f32>>,
    // Unlike the three fields above, the remaining two are read from the global's
    // properties when the node is created.
    /// Node description property, if present.
    pub description: Option<String>,
    /// Device form factor property, if present.
    pub form_factor: Option<String>,
}
102
103impl Node {
104    fn new(global_id: u32, global_props: &DictRef, proxy_id: u32) -> Self {
105        Self {
106            proxy_id,
107            device_id: global_props
108                .get(&keys::DEVICE_ID)
109                .and_then(|s| s.parse().ok()),
110            name: global_props
111                .get(&keys::NODE_NAME)
112                .map_or_else(|| format!("node_{global_id}"), |s| s.to_string()),
113            nick: global_props.get(&keys::NODE_NICK).map(|s| s.to_string()),
114            media_class: global_props.get(&keys::MEDIA_CLASS).map(|s| s.to_string()),
115            direction: global_props
116                .get(&keys::MEDIA_CLASS)
117                .and_then(|s| s.parse().ok()),
118            media_role: global_props.get(&keys::MEDIA_ROLE).map(|s| s.to_string()),
119            description: global_props
120                .get(&keys::NODE_DESCRIPTION)
121                .map(|s| s.to_string()),
122            form_factor: global_props
123                .get(&keys::DEVICE_FORM_FACTOR)
124                .map(|s| s.to_string()),
125            muted: None,
126            volume: None,
127            running: false,
128        }
129    }
130}
131
/// A link between two nodes, as announced by a link global.
#[derive(Debug, PartialEq, PartialOrd, Eq, Ord)]
pub(crate) struct Link {
    /// Id parsed from the link's output-node property.
    pub link_output_node: u32,
    /// Id parsed from the link's input-node property.
    pub link_input_node: u32,
}
137
138impl Link {
139    fn new(global_props: &DictRef) -> Option<Self> {
140        if let Some(link_output_node) = global_props
141            .get(&keys::LINK_OUTPUT_NODE)
142            .and_then(|s| s.parse().ok())
143            && let Some(link_input_node) = global_props
144                .get(&keys::LINK_INPUT_NODE)
145                .and_then(|s| s.parse().ok())
146        {
147            Some(Self {
148                link_output_node,
149                link_input_node,
150            })
151        } else {
152            None
153        }
154    }
155}
156
/// A device route as reported by a device's `Route` param.
#[derive(Debug)]
pub(crate) struct Route {
    /// Value of the route's index property; echoed back when setting the route.
    index: i32,
    /// Value of the route's device property; echoed back when setting the route.
    device: i32,
    /// Value of the route's name property.
    pub name: String,
}
163
/// The routes known for one device, at most one per [`Direction`].
#[derive(Debug, Default)]
pub(crate) struct DirectedRoutes {
    /// Id of the bound device proxy, used to look the proxy up when executing commands.
    proxy_id: u32,
    // These come from the proxy: both start as `None` and are filled in by the device's
    // param listener.
    /// Route last reported for the input direction.
    input: Option<Route>,
    /// Route last reported for the output direction.
    output: Option<Route>,
}
171
172impl DirectedRoutes {
173    fn new(proxy_id: u32) -> Self {
174        Self {
175            proxy_id,
176            input: None,
177            output: None,
178        }
179    }
180
181    pub fn get_route(&self, direction: Direction) -> Option<&Route> {
182        match direction {
183            Direction::Input => self.input.as_ref(),
184            Direction::Output => self.output.as_ref(),
185        }
186    }
187
188    fn get_mut_route(&mut self, direction: Direction) -> &mut Option<Route> {
189        match direction {
190            Direction::Input => &mut self.input,
191            Direction::Output => &mut self.output,
192        }
193    }
194}
195
/// Names of the default sink and source, as read from the "default" metadata object.
#[derive(Default)]
pub(crate) struct DefaultMetadata {
    /// `name` field of the JSON value of the `default.audio.sink` property, if any.
    pub sink: Option<String>,
    /// `name` field of the JSON value of the `default.audio.source` property, if any.
    pub source: Option<String>,
}
201
/// Snapshot of the PipeWire objects tracked by the client.
///
/// Entries are inserted by the registry `global` listener and removed by `global_remove`.
#[derive(Default)]
pub(crate) struct Data {
    /// Known nodes, keyed by the global id of the node.
    pub nodes: HashMap<u32, Node>,
    /// Known links, keyed by the global id of the link.
    pub links: HashMap<u32, Link>,
    /// Current default sink / source names.
    pub default_metadata: DefaultMetadata,
    /// Routes per device, keyed by the global id of the device.
    pub directed_routes: HashMap<u32, DirectedRoutes>,
    /// Global ids of the known ports.
    ports: HashSet<u32>,
}
210
/// Owns the bound proxies of one type together with their listeners.
///
/// Both are stored so that they stay alive for as long as the proxy exists.
struct Proxies<T: ProxyT + 'static> {
    /// Bound proxies, keyed by proxy id.
    proxies_t: HashMap<u32, T>,
    /// Listeners registered on each proxy, keyed by the same proxy id.
    listeners: HashMap<u32, Vec<Box<dyn Listener>>>,
}
215
216impl<T: ProxyT + 'static> Proxies<T> {
217    fn new() -> Self {
218        Self {
219            proxies_t: HashMap::new(),
220            listeners: HashMap::new(),
221        }
222    }
223
224    fn add_proxy(
225        &mut self,
226        proxy: T,
227        listener: impl Listener + 'static,
228        proxies: &Rc<RefCell<Self>>,
229    ) -> u32 {
230        let listener_spe = Box::new(listener);
231
232        let proxy_upcast = proxy.upcast_ref();
233        let proxy_id = proxy_upcast.id();
234
235        let proxies_weak = Rc::downgrade(proxies);
236
237        let listener = proxy_upcast
238            .add_listener_local()
239            .removed(move || {
240                if let Some(proxies) = proxies_weak.upgrade() {
241                    proxies.borrow_mut().remove(proxy_id);
242                }
243            })
244            .register();
245
246        self.add_proxy_t(proxy_id, proxy, listener_spe);
247        self.add_proxy_listener(proxy_id, listener);
248
249        proxy_id
250    }
251
252    fn add_proxy_t(&mut self, proxy_id: u32, device_proxy: T, listener: Box<dyn Listener>) {
253        self.proxies_t.insert(proxy_id, device_proxy);
254        self.listeners.entry(proxy_id).or_default().push(listener);
255    }
256
257    fn add_proxy_listener(&mut self, proxy_id: u32, listener: ProxyListener) {
258        self.listeners
259            .entry(proxy_id)
260            .or_default()
261            .push(Box::new(listener));
262    }
263
264    fn remove(&mut self, proxy_id: u32) {
265        self.proxies_t.remove(&proxy_id);
266        self.listeners.remove(&proxy_id);
267    }
268}
269
bitflags! {
    // Set of changes observed on the PipeWire side. The registry and proxy listeners OR
    // individual flags into a shared accumulator as they update the client data.
    #[derive(Debug, Clone, Copy, Default)]
    pub(crate) struct EventKind: u16 {
        // Sent from the panic hook installed by the PipeWire thread.
        const PIPEWIRE_CONNECTION_ERROR = 1 <<  0;
        // The default sink or source name changed.
        const DEFAULT_META_DATA_UPDATED = 1 <<  1;
        // A device global was bound and registered.
        const DEVICE_ADDED              = 1 <<  2;
        // A device route was set for the first time or changed.
        const DEVICE_PARAM_UPDATE       = 1 <<  3;
        // A device global was removed.
        const DEVICE_REMOVED            = 1 <<  4;
        // A link global was recorded.
        const LINK_ADDED                = 1 <<  5;
        // A link global was removed.
        const LINK_REMOVED              = 1 <<  6;
        // A node global was bound and registered.
        const NODE_ADDED                = 1 <<  7;
        // A node's mute flag or channel volumes changed.
        const NODE_PARAM_UPDATE         = 1 <<  8;
        // A node global was removed.
        const NODE_REMOVED              = 1 <<  9;
        // A node's running flag changed.
        const NODE_STATE_UPDATE         = 1 << 10;
        // A port global was recorded.
        const PORT_ADDED                = 1 << 11;
        // A port global was removed.
        const PORT_REMOVED              = 1 << 12;
    }
}
288
/// A command to be executed on the PipeWire thread.
///
/// The first field of every variant is the global id of the target node.
#[derive(Clone, Debug)]
pub(crate) enum CommandKind {
    /// Set the mute flag of the node.
    Mute(u32, bool),
    /// Set the channel volumes of the node; each value is on the `NORMAL` scale and is
    /// converted to a raw volume as `(value / NORMAL)^3` before being sent.
    SetVolume(u32, Vec<f32>),
}
294
295impl CommandKind {
296    fn create_property_value(&self, param_type: ParamType) -> Value {
297        match self {
298            CommandKind::SetVolume(_, volume) => Value::Object(Object {
299                type_: SpaTypes::ObjectParamProps.as_raw(),
300                id: param_type.as_raw(),
301                properties: vec![Property::new(
302                    SPA_PROP_channelVolumes,
303                    Value::ValueArray(ValueArray::Float(
304                        volume
305                            .iter()
306                            .map(|vol| {
307                                let vol = vol / NORMAL;
308                                vol * vol * vol
309                            })
310                            .collect(),
311                    )),
312                )],
313            }),
314            CommandKind::Mute(_, mute) => Value::Object(Object {
315                type_: SpaTypes::ObjectParamProps.as_raw(),
316                id: param_type.as_raw(),
317                properties: vec![Property::new(SPA_PROP_mute, Value::Bool(*mute))],
318            }),
319        }
320    }
321
322    fn execute(
323        self,
324        client: &Client,
325        node_proxies: Rc<RefCell<Proxies<NodeProxy>>>,
326        device_proxies: Rc<RefCell<Proxies<DeviceProxy>>>,
327    ) {
328        debug!("Executing command: {:?}", self);
329        use CommandKind::*;
330        let id = match self {
331            SetVolume(id, _) | Mute(id, _) => id,
332        };
333        let client_data = client.data.lock().unwrap();
334        if let Some(node) = client_data.nodes.get(&id) {
335            if let Some(node_proxy) = node_proxies.borrow_mut().proxies_t.get(&node.proxy_id) {
336                let node_param = self.create_property_value(ParamType::Props);
337                debug!("Setting Node Props param: {:?}", node_param);
338
339                let pod_data = PodSerializer::serialize(Cursor::new(Vec::new()), &node_param)
340                    .expect("Failed to serialize node props pod")
341                    .0
342                    .into_inner();
343                let pod = Pod::from_bytes(&pod_data).expect("Unable to construct pod");
344                node_proxy.set_param(ParamType::Props, 0, pod);
345            }
346
347            if let Some(device_id) = node.device_id
348                && let Some(direction) = node.direction
349                && let Some(directed_routes) = client_data.directed_routes.get(&device_id)
350                && let Some(route) = directed_routes.get_route(direction)
351                && let Some(device_proxy) = device_proxies
352                    .borrow_mut()
353                    .proxies_t
354                    .get(&directed_routes.proxy_id)
355            {
356                let route_param = Value::Object(Object {
357                    type_: SpaTypes::ObjectParamRoute.as_raw(),
358                    id: ParamType::Route.as_raw(),
359                    properties: vec![
360                        Property::new(SPA_PARAM_ROUTE_index, Value::Int(route.index)),
361                        Property::new(SPA_PARAM_ROUTE_device, Value::Int(route.device)),
362                        Property::new(
363                            SPA_PARAM_ROUTE_props,
364                            self.create_property_value(ParamType::Route),
365                        ),
366                        Property::new(SPA_PARAM_ROUTE_save, Value::Bool(true)),
367                    ],
368                });
369                debug!("Setting Device Route param: {:?}", route_param);
370
371                let pod_data = PodSerializer::serialize(Cursor::new(Vec::new()), &route_param)
372                    .expect("Failed to serialize route pod")
373                    .0
374                    .into_inner();
375                let pod = Pod::from_bytes(&pod_data).expect("Unable to construct pod");
376                device_proxy.set_param(ParamType::Route, 0, pod);
377            }
378        }
379    }
380}
381
/// Handle to the background PipeWire thread and the state it maintains.
pub(crate) struct Client {
    // NOTE(review): presumably the subscribers notified by `send_update_event`, which is
    // defined outside this section — confirm.
    event_senders: Mutex<Vec<UnboundedSender<EventKind>>>,
    /// Sending half of the command channel; the receiving half is attached to the
    /// PipeWire main loop, where each received command is executed.
    command_sender: PwSender<CommandKind>,
    /// Join handle of the thread running the PipeWire main loop.
    handle: JoinHandle<()>,
    /// State mirrored from PipeWire, updated by the listeners on the PipeWire thread.
    pub data: Mutex<Data>,
}
388
389impl Client {
390    fn new() -> Result<Client> {
391        let (command_sender, command_receiver) = pw_channel();
392
393        let handle = thread::Builder::new()
394            .name("i3status_pipewire".to_string())
395            .spawn(|| Client::main_loop_thread(command_receiver))
396            .error("failed to spawn a thread")?;
397
398        Ok(Self {
399            event_senders: Mutex::new(Vec::new()),
400            command_sender,
401            handle,
402            data: Mutex::new(Data::default()),
403        })
404    }
405
406    pub fn is_terminated(&self) -> bool {
407        self.handle.is_finished()
408    }
409
410    fn main_loop_thread(command_receiver: PwReceiver<CommandKind>) -> ! {
411        let client = CLIENT.as_ref().expect("Could not get client");
412
413        panic::set_hook(Box::new(|_| {
414            client.send_update_event(EventKind::PIPEWIRE_CONNECTION_ERROR);
415        }));
416
417        let proplist = properties! {*keys::APP_NAME => env!("CARGO_PKG_NAME")};
418
419        let main_loop = MainLoopRc::new(None).expect("Failed to create main loop");
420
421        let context = ContextRc::new(&main_loop, Some(proplist)).expect("Failed to create context");
422        let core = context.connect_rc(None).expect("Failed to connect");
423        let registry = core.get_registry_rc().expect("Failed to get registry");
424        let registry_weak = registry.downgrade();
425
426        let update = Rc::new(RefCell::new(EventKind::empty()));
427        let update_copy = update.clone();
428        let update_copy2 = update.clone();
429
430        // Proxies and their listeners need to stay alive so store them here
431        let node_proxies = Rc::new(RefCell::new(Proxies::<NodeProxy>::new()));
432        let node_proxies_weak = Rc::downgrade(&node_proxies);
433        let device_proxies = Rc::new(RefCell::new(Proxies::<DeviceProxy>::new()));
434        let device_proxies_weak = Rc::downgrade(&device_proxies);
435        let metadata_proxies = Rc::new(RefCell::new(Proxies::<MetadataProxy>::new()));
436
437        let _receiver = command_receiver.attach(main_loop.loop_(), move |command: CommandKind| {
438            if let Some(node_proxies) = node_proxies_weak.upgrade()
439                && let Some(device_proxies) = device_proxies_weak.upgrade()
440            {
441                command.execute(client, node_proxies.clone(), device_proxies.clone());
442            }
443        });
444
445        // Register a callback to the `global` event on the registry, which notifies of any new global objects
446        // appearing on the remote.
447        // The callback will only get called as long as we keep the returned listener alive.
448        let _registry_listener = registry
449            .add_listener_local()
450            .global(move |global| {
451                let Some(registry) = registry_weak.upgrade() else {
452                    return;
453                };
454                let global_id = global.id;
455                let Some(global_props) = global.props else {
456                    return;
457                };
458                match &global.type_ {
459                    ObjectType::Node => {
460                        let node_proxy: NodeProxy =
461                            registry.bind(global).expect("Could not bind node");
462                        node_proxy.subscribe_params(&[ParamType::Props]);
463                        let update_copy2 = update_copy.clone();
464                        let update_copy3 = update_copy.clone();
465                        let node_listener = node_proxy
466                            .add_listener_local()
467                            .info(move |info| {
468                                let running = matches!(info.state(), NodeState::Running);
469                                client
470                                    .data
471                                    .lock()
472                                    .unwrap()
473                                    .nodes
474                                    .entry(global_id)
475                                    .and_modify(|node| {
476                                        if node.running != running {
477                                            node.running = running;
478                                            update_copy2.replace_with(|v| {
479                                                *v | EventKind::NODE_STATE_UPDATE
480                                            });
481                                        }
482                                    });
483                            })
484                            .param(move |_seq, _id, _index, _next, param| {
485                                let Some(param) = param else {
486                                    return;
487                                };
488                                let Ok((_, Value::Object(object))) =
489                                    PodDeserializer::deserialize_from::<Value>(param.as_bytes())
490                                else {
491                                    return;
492                                };
493                                client
494                                    .data
495                                    .lock()
496                                    .unwrap()
497                                    .nodes
498                                    .entry(global_id)
499                                    .and_modify(|node| {
500                                        for property in object.properties {
501                                            if property.key == SPA_PROP_mute {
502                                                let Value::Bool(muted) = property.value else {
503                                                    return;
504                                                };
505                                                let muted = Some(muted);
506                                                if node.muted != muted {
507                                                    node.muted = muted;
508                                                    update_copy3.replace_with(|v| {
509                                                        *v | EventKind::NODE_PARAM_UPDATE
510                                                    });
511                                                }
512                                            } else if property.key == SPA_PROP_channelVolumes {
513                                                let Value::ValueArray(ValueArray::Float(volumes)) =
514                                                    property.value
515                                                else {
516                                                    return;
517                                                };
518
519                                                let volume = Some(
520                                                    volumes
521                                                        .iter()
522                                                        .map(|vol| vol.cbrt() * NORMAL)
523                                                        .collect(),
524                                                );
525                                                if node.volume != volume {
526                                                    node.volume = volume;
527                                                    update_copy3.replace_with(|v| {
528                                                        *v | EventKind::NODE_PARAM_UPDATE
529                                                    });
530                                                }
531                                            }
532                                        }
533                                    });
534                            })
535                            .register();
536
537                        let proxy_id = node_proxies.borrow_mut().add_proxy(
538                            node_proxy,
539                            node_listener,
540                            &node_proxies,
541                        );
542
543                        client
544                            .data
545                            .lock()
546                            .unwrap()
547                            .nodes
548                            .insert(global_id, Node::new(global_id, global_props, proxy_id));
549                        update_copy.replace_with(|v| *v | EventKind::NODE_ADDED);
550                    }
551                    ObjectType::Link => {
552                        let mut client_data = client.data.lock().unwrap();
553                        let Some(link) = Link::new(global_props) else {
554                            return;
555                        };
556                        if let Some(node) = client_data.nodes.get(&link.link_input_node)
557                            && node.name == env!("CARGO_PKG_NAME")
558                        {
559                            return;
560                        }
561
562                        client_data.links.insert(global_id, link);
563                        update_copy.replace_with(|v| *v | EventKind::LINK_ADDED);
564                    }
565                    ObjectType::Port => {
566                        client.data.lock().unwrap().ports.insert(global_id);
567                        update_copy.replace_with(|v| *v | EventKind::PORT_ADDED);
568                    }
569                    ObjectType::Device => {
570                        let device_proxy: DeviceProxy =
571                            registry.bind(global).expect("Could not bind device");
572                        device_proxy.subscribe_params(&[ParamType::Route]);
573                        let update_copy2 = update_copy.clone();
574                        let device_listener = device_proxy
575                            .add_listener_local()
576                            .param(move |_seq, _id, _index, _next, param| {
577                                let Some(param) = param else {
578                                    return;
579                                };
580                                let Ok((_, Value::Object(object))) =
581                                    PodDeserializer::deserialize_from::<Value>(param.as_bytes())
582                                else {
583                                    return;
584                                };
585                                let mut route_index = None;
586                                let mut route_direction = None;
587                                let mut route_device = None;
588                                let mut route_name = None;
589                                for property in &object.properties {
590                                    if property.key == SPA_PARAM_ROUTE_index {
591                                        let Value::Int(route_index_v) = property.value else {
592                                            return;
593                                        };
594                                        route_index = Some(route_index_v);
595                                    } else if property.key == SPA_PARAM_ROUTE_direction {
596                                        let Value::Id(route_direction_v) = property.value else {
597                                            return;
598                                        };
599                                        route_direction = route_direction_v.0.try_into().ok();
600                                    } else if property.key == SPA_PARAM_ROUTE_device {
601                                        let Value::Int(route_device_v) = property.value else {
602                                            return;
603                                        };
604                                        route_device = Some(route_device_v);
605                                    } else if property.key == SPA_PARAM_ROUTE_name {
606                                        let Value::String(route_name_v) = property.value.to_owned()
607                                        else {
608                                            return;
609                                        };
610                                        route_name = Some(route_name_v);
611                                    }
612                                }
613
614                                if let Some(route_index) = route_index
615                                    && let Some(route_direction) = route_direction
616                                    && let Some(route_device) = route_device
617                                    && let Some(route_name) = route_name
618                                {
619                                    client
620                                        .data
621                                        .lock()
622                                        .unwrap()
623                                        .directed_routes
624                                        .entry(global_id)
625                                        .and_modify(|directed_routes| {
626                                            let route =
627                                                directed_routes.get_mut_route(route_direction);
628                                            if let Some(route) = route {
629                                                if route.index != route_index
630                                                    || route.device != route_device
631                                                    || route.name != route_name
632                                                {
633                                                    route.index = route_index;
634                                                    route.device = route_device;
635                                                    route.name = route_name;
636                                                    update_copy2.replace_with(|v| {
637                                                        *v | EventKind::DEVICE_PARAM_UPDATE
638                                                    });
639                                                }
640                                            } else {
641                                                route.replace(Route {
642                                                    index: route_index,
643                                                    device: route_device,
644                                                    name: route_name,
645                                                });
646                                                update_copy2.replace_with(|v| {
647                                                    *v | EventKind::DEVICE_PARAM_UPDATE
648                                                });
649                                            }
650                                        });
651                                }
652                            })
653                            .register();
654
655                        let proxy_id = device_proxies.borrow_mut().add_proxy(
656                            device_proxy,
657                            device_listener,
658                            &device_proxies,
659                        );
660
661                        client
662                            .data
663                            .lock()
664                            .unwrap()
665                            .directed_routes
666                            .insert(global_id, DirectedRoutes::new(proxy_id));
667
668                        update_copy.replace_with(|v| *v | EventKind::DEVICE_ADDED);
669                    }
670                    ObjectType::Metadata => {
671                        // There are many kinds of metadata, but we are only interested in the default metadata
672                        if global_props.get("metadata.name") != Some("default") {
673                            return;
674                        }
675                        let metadata_proxy: MetadataProxy =
676                            registry.bind(global).expect("Could not bind device");
677                        let update_copy2 = update_copy.clone();
678                        let metadata_listener = metadata_proxy
679                            .add_listener_local()
680                            .property(move |_subject, key, type_, value| {
681                                if type_ != Some("Spa:String:JSON")
682                                    || (key != Some(DEFAULT_SINK_KEY)
683                                        && key != Some(DEFAULT_SOURCE_KEY))
684                                {
685                                    return -1;
686                                }
687
688                                let Some(value) = value else {
689                                    return -1;
690                                };
691
692                                let value: serde_json::Value =
693                                    serde_json::from_str(value).unwrap_or_default();
694                                let name = value
695                                    .get("name")
696                                    .and_then(|v| v.as_str())
697                                    .map(|s| s.to_string());
698                                if key == Some(DEFAULT_SINK_KEY) {
699                                    let default_medata_sink =
700                                        &mut client.data.lock().unwrap().default_metadata.sink;
701                                    if *default_medata_sink != name {
702                                        *default_medata_sink = name;
703                                        update_copy2.replace_with(|v| {
704                                            *v | EventKind::DEFAULT_META_DATA_UPDATED
705                                        });
706                                    }
707                                } else {
708                                    let default_medata_source =
709                                        &mut client.data.lock().unwrap().default_metadata.source;
710                                    if *default_medata_source != name {
711                                        *default_medata_source = name;
712                                        update_copy2.replace_with(|v| {
713                                            *v | EventKind::DEFAULT_META_DATA_UPDATED
714                                        });
715                                    }
716                                }
717
718                                0
719                            })
720                            .register();
721
722                        metadata_proxies.borrow_mut().add_proxy(
723                            metadata_proxy,
724                            metadata_listener,
725                            &metadata_proxies,
726                        );
727                    }
728                    _ => (),
729                }
730            })
731            .global_remove(move |uid| {
732                let mut client_data = client.data.lock().unwrap();
733                if client_data.nodes.remove(&uid).is_some() {
734                    update_copy2.replace_with(|v| *v | EventKind::NODE_REMOVED);
735                } else if client_data.links.remove(&uid).is_some() {
736                    update_copy2.replace_with(|v| *v | EventKind::LINK_REMOVED);
737                } else if client_data.ports.remove(&uid) {
738                    update_copy2.replace_with(|v| *v | EventKind::PORT_REMOVED);
739                } else if client_data.directed_routes.remove(&uid).is_some() {
740                    update_copy2.replace_with(|v| *v | EventKind::DEVICE_REMOVED);
741                }
742            })
743            .register();
744
745        /* START WORKAROUND FOR PIPEWIRE LOOPBACK BUG
746
747        This workaround fixes an issue caused by the loopback device.
748        Without this workaround, if nothing has interacted with the sink/source then setting
749        the volume/mute will fail.
750
751        This workaround creates input and output streams which forces pipewire
752        to use the real sink/source instead of the loopback device.
753
754        For context see:
755
756            * https://gitlab.freedesktop.org/pipewire/pipewire/-/issues/4949
757            * https://gitlab.freedesktop.org/pipewire/pipewire/-/issues/4610
758            * https://gitlab.freedesktop.org/pipewire/wireplumber/-/issues/822
759        */
760
761        let output_done = Rc::new(Cell::new(false));
762        let output_done_clone = output_done.clone();
763        let input_done = Rc::new(Cell::new(false));
764        let input_done_clone = input_done.clone();
765
766        let values: Vec<u8> = PodSerializer::serialize(
767            Cursor::new(Vec::new()),
768            &Value::Object(Object {
769                type_: SPA_TYPE_OBJECT_Format,
770                id: SPA_PARAM_EnumFormat,
771                properties: AudioInfoRaw::new().into(),
772            }),
773        )
774        .expect("Failed to serialize pod")
775        .0
776        .into_inner();
777
778        let mut params = [Pod::from_bytes(&values).expect("Failed to create pod")];
779
780        let output_stream = StreamBox::new(
781            &core,
782            "i3status_pipewire_workaround_output_stream",
783            properties! {
784                *keys::MEDIA_TYPE => "Audio",
785                *keys::MEDIA_ROLE => "Music",
786                *keys::MEDIA_CATEGORY => "Playback",
787                *keys::AUDIO_CHANNELS => "2",
788            },
789        )
790        .expect("Could not create output_stream");
791
792        let output_stream_listener = output_stream
793            .add_local_listener()
794            .process(move |_stream, _acc: &mut f64| {
795                output_done_clone.set(true);
796            })
797            .register()
798            .expect("Could not add output_stream listener");
799
800        output_stream
801            .connect(
802                ::pipewire::spa::utils::Direction::Output,
803                None,
804                StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS | StreamFlags::RT_PROCESS,
805                &mut params,
806            )
807            .expect("Could not connect output_stream");
808
809        let input_stream = StreamBox::new(
810            &core,
811            "i3status_pipewire_workaround_input_stream",
812            properties! {
813                *keys::MEDIA_TYPE => "Audio",
814                *keys::MEDIA_ROLE => "Music",
815                *keys::MEDIA_CATEGORY => "Playback",
816                *keys::AUDIO_CHANNELS => "2",
817            },
818        )
819        .expect("Could not create input_stream");
820        let input_stream_listener = input_stream
821            .add_local_listener()
822            .process(move |_stream, _acc: &mut f64| {
823                input_done_clone.set(true);
824            })
825            .register()
826            .expect("Could not add input_stream listener");
827
828        input_stream
829            .connect(
830                ::pipewire::spa::utils::Direction::Input,
831                None,
832                StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS | StreamFlags::RT_PROCESS,
833                &mut params,
834            )
835            .expect("Could not connect input_stream");
836
837        while !output_done.get() {
838            main_loop.loop_().iterate(Duration::from_secs(1));
839            let event = update.take();
840            if !event.is_empty() {
841                client.send_update_event(event);
842            }
843        }
844        output_stream
845            .disconnect()
846            .expect("Unable to disconnect output_stream");
847        output_stream_listener.unregister();
848
849        while !input_done.get() {
850            main_loop.loop_().iterate(Duration::from_secs(1));
851            let event = update.take();
852            if !event.is_empty() {
853                client.send_update_event(event);
854            }
855        }
856        input_stream
857            .disconnect()
858            .expect("Unable to disconnect input_stream");
859        input_stream_listener.unregister();
860
861        // END WORKAROUND FOR PIPEWIRE LOOPBACK BUG
862
863        loop {
864            main_loop.loop_().iterate(Duration::from_hours(24));
865            let event = update.take();
866            if !event.is_empty() {
867                client.send_update_event(event);
868            }
869        }
870    }
871
872    pub fn add_event_listener(&self, tx: UnboundedSender<EventKind>) {
873        self.event_senders.lock().unwrap().push(tx);
874    }
875
876    pub fn add_command_listener(&self) -> PwSender<CommandKind> {
877        self.command_sender.clone()
878    }
879
880    pub fn send_update_event(&self, event: EventKind) {
881        debug!("send_update_event {:?}", event);
882        self.event_senders
883            .lock()
884            .unwrap()
885            .retain(|tx| tx.send(event).is_ok());
886    }
887}