i3status_rs/
pipewire.rs

1use std::cell::{Cell, RefCell};
2use std::collections::{HashMap, HashSet};
3use std::io::Cursor;
4use std::mem::MaybeUninit;
5use std::rc::Rc;
6use std::str::FromStr;
7use std::sync::{LazyLock, Mutex};
8use std::thread;
9use std::time::Duration;
10
11pub(crate) use ::pipewire::channel::Sender as PwSender;
12use ::pipewire::{
13    channel::{Receiver as PwReceiver, channel as pw_channel},
14    context::ContextRc,
15    device::Device as DeviceProxy,
16    keys,
17    main_loop::MainLoopRc,
18    metadata::Metadata as MetadataProxy,
19    node::{Node as NodeProxy, NodeState},
20    properties::properties,
21    proxy::{Listener, ProxyListener, ProxyT},
22    spa::{
23        param::{ParamType, audio::AudioInfoRaw},
24        pod::{
25            Object, Pod, Value, ValueArray, builder::Builder as PodBuilder,
26            deserialize::PodDeserializer, serialize::PodSerializer,
27        },
28        sys::{
29            SPA_DIRECTION_INPUT, SPA_DIRECTION_OUTPUT, SPA_PARAM_EnumFormat,
30            SPA_PARAM_ROUTE_device, SPA_PARAM_ROUTE_direction, SPA_PARAM_ROUTE_index,
31            SPA_PARAM_ROUTE_name, SPA_PARAM_ROUTE_props, SPA_PARAM_ROUTE_save,
32            SPA_PROP_channelVolumes, SPA_PROP_mute, SPA_TYPE_OBJECT_Format,
33        },
34        utils::{SpaTypes, dict::DictRef},
35    },
36    stream::{StreamBox, StreamFlags},
37    types::ObjectType,
38};
39use bitflags::bitflags;
40use tokio::sync::mpsc::UnboundedSender;
41
42use crate::{Error, ErrorContext as _, Result};
43
44make_log_macro!(debug, "pipewire");
45
46pub(crate) static CLIENT: LazyLock<Result<Client>> = LazyLock::new(Client::new);
47
48const NORMAL: f32 = 100.0;
49const DEFAULT_SINK_KEY: &str = "default.audio.sink";
50const DEFAULT_SOURCE_KEY: &str = "default.audio.source";
51
52#[derive(Debug, Clone, Copy)]
53pub(crate) enum Direction {
54    Input,
55    Output,
56}
57
58impl FromStr for Direction {
59    type Err = Error;
60
61    fn from_str(s: &str) -> Result<Self, Self::Err> {
62        if s.ends_with("/Source") {
63            Ok(Self::Input)
64        } else if s.ends_with("/Sink") {
65            Ok(Self::Output)
66        } else {
67            Err(Error::new("Invalid media class to determine direction"))
68        }
69    }
70}
71
72impl TryFrom<u32> for Direction {
73    type Error = Error;
74
75    fn try_from(value: u32) -> Result<Self, Self::Error> {
76        match value {
77            SPA_DIRECTION_INPUT => Ok(Self::Input),
78            SPA_DIRECTION_OUTPUT => Ok(Self::Output),
79            _ => Err(Error::new("Invalid direction value, must be 0 or 1")),
80        }
81    }
82}
83
84#[derive(Debug)]
85pub(crate) struct Node {
86    proxy_id: u32,
87    pub device_id: Option<u32>,
88    pub name: String,
89    pub nick: Option<String>,
90    pub media_class: Option<String>,
91    // direction is derived from media_class
92    pub direction: Option<Direction>,
93    pub media_role: Option<String>,
94    //These come from the proxy
95    pub running: bool,
96    pub muted: Option<bool>,
97    pub volume: Option<Vec<f32>>,
98    pub description: Option<String>,
99    pub form_factor: Option<String>,
100}
101
102impl Node {
103    fn new(global_id: u32, global_props: &DictRef, proxy_id: u32) -> Self {
104        Self {
105            proxy_id,
106            device_id: global_props
107                .get(&keys::DEVICE_ID)
108                .and_then(|s| s.parse().ok()),
109            name: global_props
110                .get(&keys::NODE_NAME)
111                .map_or_else(|| format!("node_{global_id}"), |s| s.to_string()),
112            nick: global_props.get(&keys::NODE_NICK).map(|s| s.to_string()),
113            media_class: global_props.get(&keys::MEDIA_CLASS).map(|s| s.to_string()),
114            direction: global_props
115                .get(&keys::MEDIA_CLASS)
116                .and_then(|s| s.parse().ok()),
117            media_role: global_props.get(&keys::MEDIA_ROLE).map(|s| s.to_string()),
118            description: global_props
119                .get(&keys::NODE_DESCRIPTION)
120                .map(|s| s.to_string()),
121            form_factor: global_props
122                .get(&keys::DEVICE_FORM_FACTOR)
123                .map(|s| s.to_string()),
124            muted: None,
125            volume: None,
126            running: false,
127        }
128    }
129}
130
131#[derive(Debug, PartialEq, PartialOrd, Eq, Ord)]
132pub(crate) struct Link {
133    pub link_output_node: u32,
134    pub link_input_node: u32,
135}
136
137impl Link {
138    fn new(global_props: &DictRef) -> Option<Self> {
139        if let Some(link_output_node) = global_props
140            .get(&keys::LINK_OUTPUT_NODE)
141            .and_then(|s| s.parse().ok())
142            && let Some(link_input_node) = global_props
143                .get(&keys::LINK_INPUT_NODE)
144                .and_then(|s| s.parse().ok())
145        {
146            Some(Self {
147                link_output_node,
148                link_input_node,
149            })
150        } else {
151            None
152        }
153    }
154}
155
156#[derive(Debug)]
157pub(crate) struct Route {
158    index: i32,
159    device: i32,
160    pub name: String,
161}
162
163#[derive(Debug, Default)]
164pub(crate) struct DirectedRoutes {
165    proxy_id: u32,
166    //These come from the proxy
167    input: Option<Route>,
168    output: Option<Route>,
169}
170
171impl DirectedRoutes {
172    fn new(proxy_id: u32) -> Self {
173        Self {
174            proxy_id,
175            input: None,
176            output: None,
177        }
178    }
179
180    pub fn get_route(&self, direction: Direction) -> Option<&Route> {
181        match direction {
182            Direction::Input => self.input.as_ref(),
183            Direction::Output => self.output.as_ref(),
184        }
185    }
186
187    fn get_mut_route(&mut self, direction: Direction) -> &mut Option<Route> {
188        match direction {
189            Direction::Input => &mut self.input,
190            Direction::Output => &mut self.output,
191        }
192    }
193}
194
195#[derive(Default)]
196pub(crate) struct DefaultMetadata {
197    pub sink: Option<String>,
198    pub source: Option<String>,
199}
200
201#[derive(Default)]
202pub(crate) struct Data {
203    pub nodes: HashMap<u32, Node>,
204    pub links: HashMap<u32, Link>,
205    pub default_metadata: DefaultMetadata,
206    pub directed_routes: HashMap<u32, DirectedRoutes>,
207    ports: HashSet<u32>,
208}
209
210struct Proxies<T: ProxyT + 'static> {
211    proxies_t: HashMap<u32, T>,
212    listeners: HashMap<u32, Vec<Box<dyn Listener>>>,
213}
214
215impl<T: ProxyT + 'static> Proxies<T> {
216    fn new() -> Self {
217        Self {
218            proxies_t: HashMap::new(),
219            listeners: HashMap::new(),
220        }
221    }
222
223    fn add_proxy(
224        &mut self,
225        proxy: T,
226        listener: impl Listener + 'static,
227        proxies: &Rc<RefCell<Self>>,
228    ) -> u32 {
229        let listener_spe = Box::new(listener);
230
231        let proxy_upcast = proxy.upcast_ref();
232        let proxy_id = proxy_upcast.id();
233
234        let proxies_weak = Rc::downgrade(proxies);
235
236        let listener = proxy_upcast
237            .add_listener_local()
238            .removed(move || {
239                if let Some(proxies) = proxies_weak.upgrade() {
240                    proxies.borrow_mut().remove(proxy_id);
241                }
242            })
243            .register();
244
245        self.add_proxy_t(proxy_id, proxy, listener_spe);
246        self.add_proxy_listener(proxy_id, listener);
247
248        proxy_id
249    }
250
251    fn add_proxy_t(&mut self, proxy_id: u32, device_proxy: T, listener: Box<dyn Listener>) {
252        self.proxies_t.insert(proxy_id, device_proxy);
253        self.listeners.entry(proxy_id).or_default().push(listener);
254    }
255
256    fn add_proxy_listener(&mut self, proxy_id: u32, listener: ProxyListener) {
257        self.listeners
258            .entry(proxy_id)
259            .or_default()
260            .push(Box::new(listener));
261    }
262
263    fn remove(&mut self, proxy_id: u32) {
264        self.proxies_t.remove(&proxy_id);
265        self.listeners.remove(&proxy_id);
266    }
267}
268
269bitflags! {
270    #[derive(Debug, Clone, Copy, Default)]
271    pub(crate) struct EventKind: u16 {
272        const DEFAULT_META_DATA_UPDATED = 1 <<  0;
273        const DEVICE_ADDED              = 1 <<  1;
274        const DEVICE_PARAM_UPDATE       = 1 <<  2;
275        const DEVICE_REMOVED            = 1 <<  3;
276        const LINK_ADDED                = 1 <<  4;
277        const LINK_REMOVED              = 1 <<  5;
278        const NODE_ADDED                = 1 <<  6;
279        const NODE_PARAM_UPDATE         = 1 <<  7;
280        const NODE_REMOVED              = 1 <<  8;
281        const NODE_STATE_UPDATE         = 1 <<  9;
282        const PORT_ADDED                = 1 << 10;
283        const PORT_REMOVED              = 1 << 11;
284    }
285}
286
287#[derive(Clone, Debug)]
288pub(crate) enum CommandKind {
289    Mute(u32, bool),
290    SetVolume(u32, Vec<f32>),
291}
292
293impl CommandKind {
294    fn execute(
295        self,
296        client: &Client,
297        node_proxies: Rc<RefCell<Proxies<NodeProxy>>>,
298        device_proxies: Rc<RefCell<Proxies<DeviceProxy>>>,
299    ) {
300        debug!("Executing command: {:?}", self);
301        use CommandKind::*;
302        let id = match self {
303            SetVolume(id, _) | Mute(id, _) => id,
304        };
305        let client_data = client.data.lock().unwrap();
306        if let Some(node) = client_data.nodes.get(&id) {
307            if let Some(node_proxy) = node_proxies.borrow_mut().proxies_t.get(&node.proxy_id) {
308                let mut pod_data = Vec::new();
309                let mut pod_builder = PodBuilder::new(&mut pod_data);
310
311                let mut object_param_props_frame = MaybeUninit::uninit();
312
313                // Safety: Frames must be popped in the reverse order they were pushed.
314                // push_object initializes the frame.
315                // object_param_props_frame is frame 1
316                unsafe {
317                    pod_builder.push_object(
318                        &mut object_param_props_frame,
319                        SpaTypes::ObjectParamProps.as_raw(),
320                        ParamType::Props.as_raw(),
321                    )
322                }
323                .expect("Could not push object");
324
325                match &self {
326                    SetVolume(_, volume) => {
327                        pod_builder
328                            .add_prop(SPA_PROP_channelVolumes, 0)
329                            .expect("Could not add prop");
330
331                        let mut array_frame = MaybeUninit::uninit();
332
333                        // Safety: push_array initializes the frame.
334                        // array_frame is frame 2
335                        unsafe { pod_builder.push_array(&mut array_frame) }
336                            .expect("Could not push object");
337
338                        for vol in volume {
339                            let vol = vol / NORMAL;
340                            pod_builder
341                                .add_float(vol * vol * vol)
342                                .expect("Could not add bool");
343                        }
344
345                        // Safety: array_frame is popped here, which is frame 2
346                        unsafe {
347                            PodBuilder::pop(&mut pod_builder, array_frame.assume_init_mut());
348                        }
349                    }
350                    Mute(_, mute) => {
351                        pod_builder
352                            .add_prop(SPA_PROP_mute, 0)
353                            .expect("Could not add prop");
354                        pod_builder.add_bool(*mute).expect("Could not add bool");
355                    }
356                }
357
358                // Safety: object_param_props_frame is popped here, which is frame 1
359                unsafe {
360                    PodBuilder::pop(&mut pod_builder, object_param_props_frame.assume_init_mut());
361                }
362
363                debug!(
364                    "Setting Node Props param: {:?}",
365                    PodDeserializer::deserialize_from::<Value>(&pod_data)
366                );
367                let pod = Pod::from_bytes(&pod_data).expect("Unable to construct pod");
368                node_proxy.set_param(ParamType::Props, 0, pod);
369            }
370
371            if let Some(device_id) = node.device_id
372                && let Some(direction) = node.direction
373                && let Some(directed_routes) = client_data.directed_routes.get(&device_id)
374                && let Some(route) = directed_routes.get_route(direction)
375                && let Some(device_proxy) = device_proxies
376                    .borrow_mut()
377                    .proxies_t
378                    .get(&directed_routes.proxy_id)
379            {
380                let mut pod_data = Vec::new();
381                let mut pod_builder = PodBuilder::new(&mut pod_data);
382
383                let mut object_param_route_frame = MaybeUninit::uninit();
384
385                // Safety: Frames must be popped in the reverse order they were pushed.
386                // push_object initializes the frame.
387                // object_param_route_frame is frame 1
388                unsafe {
389                    pod_builder.push_object(
390                        &mut object_param_route_frame,
391                        SpaTypes::ObjectParamRoute.as_raw(),
392                        ParamType::Route.as_raw(),
393                    )
394                }
395                .expect("Could not push object");
396
397                pod_builder
398                    .add_prop(SPA_PARAM_ROUTE_index, 0)
399                    .expect("Could not add prop");
400
401                pod_builder.add_int(route.index).expect("Could not add int");
402
403                pod_builder
404                    .add_prop(SPA_PARAM_ROUTE_device, 0)
405                    .expect("Could not add prop");
406
407                pod_builder
408                    .add_int(route.device)
409                    .expect("Could not add int");
410
411                pod_builder
412                    .add_prop(SPA_PARAM_ROUTE_props, 0)
413                    .expect("Could not add prop");
414
415                let mut object_param_props_frame = MaybeUninit::uninit();
416
417                // Safety: object_param_props_frame is frame 2
418                unsafe {
419                    pod_builder.push_object(
420                        &mut object_param_props_frame,
421                        SpaTypes::ObjectParamProps.as_raw(),
422                        ParamType::Route.as_raw(),
423                    )
424                }
425                .expect("Could not push object");
426
427                match &self {
428                    SetVolume(_, volume) => {
429                        pod_builder
430                            .add_prop(SPA_PROP_channelVolumes, 0)
431                            .expect("Could not add prop");
432
433                        let mut array_frame = MaybeUninit::uninit();
434
435                        // Safety: push_array initializes the frame.
436                        // array_frame is frame 3
437                        unsafe { pod_builder.push_array(&mut array_frame) }
438                            .expect("Could not push object");
439
440                        for vol in volume {
441                            let vol = vol / NORMAL;
442                            pod_builder
443                                .add_float(vol * vol * vol)
444                                .expect("Could not add bool");
445                        }
446
447                        // Safety: array_frame is popped here, which is frame 3
448                        unsafe {
449                            PodBuilder::pop(&mut pod_builder, array_frame.assume_init_mut());
450                        }
451                    }
452                    Mute(_, mute) => {
453                        pod_builder
454                            .add_prop(SPA_PROP_mute, 0)
455                            .expect("Could not add prop");
456                        pod_builder.add_bool(*mute).expect("Could not add bool");
457                    }
458                }
459
460                // Safety: object_param_props_frame is popped here, which is frame 2
461                unsafe {
462                    PodBuilder::pop(&mut pod_builder, object_param_props_frame.assume_init_mut());
463                }
464
465                pod_builder
466                    .add_prop(SPA_PARAM_ROUTE_save, 0)
467                    .expect("Could not add prop");
468                pod_builder.add_bool(true).expect("Could not add bool");
469
470                // Safety: object_param_route_frame is popped here, which is frame 1
471                unsafe {
472                    PodBuilder::pop(&mut pod_builder, object_param_route_frame.assume_init_mut());
473                }
474
475                debug!(
476                    "Setting Device Route param: {:?}",
477                    PodDeserializer::deserialize_from::<Value>(&pod_data)
478                );
479                let pod = Pod::from_bytes(&pod_data).expect("Unable to construct pod");
480                device_proxy.set_param(ParamType::Route, 0, pod);
481            }
482        }
483    }
484}
485
486pub(crate) struct Client {
487    event_senders: Mutex<Vec<UnboundedSender<EventKind>>>,
488    command_sender: PwSender<CommandKind>,
489    pub data: Mutex<Data>,
490}
491
492impl Client {
493    fn new() -> Result<Client> {
494        let (tx, rx) = pw_channel();
495
496        thread::Builder::new()
497            .name("i3status_pipewire".to_string())
498            .spawn(|| Client::main_loop_thread(rx))
499            .error("failed to spawn a thread")?;
500
501        Ok(Self {
502            event_senders: Mutex::new(Vec::new()),
503            command_sender: tx,
504            data: Mutex::new(Data::default()),
505        })
506    }
507
508    fn main_loop_thread(command_receiver: PwReceiver<CommandKind>) {
509        let client = CLIENT.as_ref().error("Could not get client").unwrap();
510
511        let proplist = properties! {*keys::APP_NAME => env!("CARGO_PKG_NAME")};
512
513        let main_loop = MainLoopRc::new(None).expect("Failed to create main loop");
514
515        let context = ContextRc::new(&main_loop, Some(proplist)).expect("Failed to create context");
516        let core = context.connect_rc(None).expect("Failed to connect");
517        let registry = core.get_registry_rc().expect("Failed to get registry");
518        let registry_weak = registry.downgrade();
519
520        let update = Rc::new(RefCell::new(EventKind::empty()));
521        let update_copy = update.clone();
522        let update_copy2 = update.clone();
523
524        // Proxies and their listeners need to stay alive so store them here
525        let node_proxies = Rc::new(RefCell::new(Proxies::<NodeProxy>::new()));
526        let node_proxies_weak = Rc::downgrade(&node_proxies);
527        let device_proxies = Rc::new(RefCell::new(Proxies::<DeviceProxy>::new()));
528        let device_proxies_weak = Rc::downgrade(&device_proxies);
529        let metadata_proxies = Rc::new(RefCell::new(Proxies::<MetadataProxy>::new()));
530
531        let _receiver = command_receiver.attach(main_loop.loop_(), move |command: CommandKind| {
532            if let Some(node_proxies) = node_proxies_weak.upgrade()
533                && let Some(device_proxies) = device_proxies_weak.upgrade()
534            {
535                command.execute(client, node_proxies.clone(), device_proxies.clone());
536            }
537        });
538
539        // Register a callback to the `global` event on the registry, which notifies of any new global objects
540        // appearing on the remote.
541        // The callback will only get called as long as we keep the returned listener alive.
542        let _registry_listener = registry
543            .add_listener_local()
544            .global(move |global| {
545                let Some(registry) = registry_weak.upgrade() else {
546                    return;
547                };
548                let global_id = global.id;
549                let Some(global_props) = global.props else {
550                    return;
551                };
552                match &global.type_ {
553                    ObjectType::Node => {
554                        let node_proxy: NodeProxy =
555                            registry.bind(global).expect("Could not bind node");
556                        node_proxy.subscribe_params(&[ParamType::Props]);
557                        let update_copy2 = update_copy.clone();
558                        let update_copy3 = update_copy.clone();
559                        let node_listener = node_proxy
560                            .add_listener_local()
561                            .info(move |info| {
562                                let running = matches!(info.state(), NodeState::Running);
563                                client
564                                    .data
565                                    .lock()
566                                    .unwrap()
567                                    .nodes
568                                    .entry(global_id)
569                                    .and_modify(|node| {
570                                        if node.running != running {
571                                            node.running = running;
572                                            update_copy2.replace_with(|v| {
573                                                *v | EventKind::NODE_STATE_UPDATE
574                                            });
575                                        }
576                                    });
577                            })
578                            .param(move |_seq, _id, _index, _next, param| {
579                                let Some(param) = param else {
580                                    return;
581                                };
582                                let Ok((_, Value::Object(object))) =
583                                    PodDeserializer::deserialize_from::<Value>(param.as_bytes())
584                                else {
585                                    return;
586                                };
587                                client
588                                    .data
589                                    .lock()
590                                    .unwrap()
591                                    .nodes
592                                    .entry(global_id)
593                                    .and_modify(|node| {
594                                        for property in object.properties {
595                                            if property.key == SPA_PROP_mute {
596                                                let Value::Bool(muted) = property.value else {
597                                                    return;
598                                                };
599                                                let muted = Some(muted);
600                                                if node.muted != muted {
601                                                    node.muted = muted;
602                                                    update_copy3.replace_with(|v| {
603                                                        *v | EventKind::NODE_PARAM_UPDATE
604                                                    });
605                                                }
606                                            } else if property.key == SPA_PROP_channelVolumes {
607                                                let Value::ValueArray(ValueArray::Float(volumes)) =
608                                                    property.value
609                                                else {
610                                                    return;
611                                                };
612
613                                                let volume = Some(
614                                                    volumes
615                                                        .iter()
616                                                        .map(|vol| vol.cbrt() * NORMAL)
617                                                        .collect(),
618                                                );
619                                                if node.volume != volume {
620                                                    node.volume = volume;
621                                                    update_copy3.replace_with(|v| {
622                                                        *v | EventKind::NODE_PARAM_UPDATE
623                                                    });
624                                                }
625                                            }
626                                        }
627                                    });
628                            })
629                            .register();
630
631                        let proxy_id = node_proxies.borrow_mut().add_proxy(
632                            node_proxy,
633                            node_listener,
634                            &node_proxies,
635                        );
636
637                        client
638                            .data
639                            .lock()
640                            .unwrap()
641                            .nodes
642                            .insert(global_id, Node::new(global_id, global_props, proxy_id));
643                        update_copy.replace_with(|v| *v | EventKind::NODE_ADDED);
644                    }
645                    ObjectType::Link => {
646                        let mut client_data = client.data.lock().unwrap();
647                        let Some(link) = Link::new(global_props) else {
648                            return;
649                        };
650                        if let Some(node) = client_data.nodes.get(&link.link_input_node)
651                            && node.name == env!("CARGO_PKG_NAME")
652                        {
653                            return;
654                        }
655
656                        client_data.links.insert(global_id, link);
657                        update_copy.replace_with(|v| *v | EventKind::LINK_ADDED);
658                    }
659                    ObjectType::Port => {
660                        client.data.lock().unwrap().ports.insert(global_id);
661                        update_copy.replace_with(|v| *v | EventKind::PORT_ADDED);
662                    }
663                    ObjectType::Device => {
664                        let device_proxy: DeviceProxy =
665                            registry.bind(global).expect("Could not bind device");
666                        device_proxy.subscribe_params(&[ParamType::Route]);
667                        let update_copy2 = update_copy.clone();
668                        let device_listener = device_proxy
669                            .add_listener_local()
670                            .param(move |_seq, _id, _index, _next, param| {
671                                let Some(param) = param else {
672                                    return;
673                                };
674                                let Ok((_, Value::Object(object))) =
675                                    PodDeserializer::deserialize_from::<Value>(param.as_bytes())
676                                else {
677                                    return;
678                                };
679                                let mut route_index = None;
680                                let mut route_direction = None;
681                                let mut route_device = None;
682                                let mut route_name = None;
683                                for property in &object.properties {
684                                    if property.key == SPA_PARAM_ROUTE_index {
685                                        let Value::Int(route_index_v) = property.value else {
686                                            return;
687                                        };
688                                        route_index = Some(route_index_v);
689                                    } else if property.key == SPA_PARAM_ROUTE_direction {
690                                        let Value::Id(route_direction_v) = property.value else {
691                                            return;
692                                        };
693                                        route_direction = route_direction_v.0.try_into().ok();
694                                    } else if property.key == SPA_PARAM_ROUTE_device {
695                                        let Value::Int(route_device_v) = property.value else {
696                                            return;
697                                        };
698                                        route_device = Some(route_device_v);
699                                    } else if property.key == SPA_PARAM_ROUTE_name {
700                                        let Value::String(route_name_v) = property.value.to_owned()
701                                        else {
702                                            return;
703                                        };
704                                        route_name = Some(route_name_v);
705                                    }
706                                }
707
708                                if let Some(route_index) = route_index
709                                    && let Some(route_direction) = route_direction
710                                    && let Some(route_device) = route_device
711                                    && let Some(route_name) = route_name
712                                {
713                                    client
714                                        .data
715                                        .lock()
716                                        .unwrap()
717                                        .directed_routes
718                                        .entry(global_id)
719                                        .and_modify(|directed_routes| {
720                                            let route =
721                                                directed_routes.get_mut_route(route_direction);
722                                            if let Some(route) = route {
723                                                if route.index != route_index
724                                                    || route.device != route_device
725                                                    || route.name != route_name
726                                                {
727                                                    route.index = route_index;
728                                                    route.device = route_device;
729                                                    route.name = route_name;
730                                                    update_copy2.replace_with(|v| {
731                                                        *v | EventKind::DEVICE_PARAM_UPDATE
732                                                    });
733                                                }
734                                            } else {
735                                                route.replace(Route {
736                                                    index: route_index,
737                                                    device: route_device,
738                                                    name: route_name,
739                                                });
740                                                update_copy2.replace_with(|v| {
741                                                    *v | EventKind::DEVICE_PARAM_UPDATE
742                                                });
743                                            }
744                                        });
745                                }
746                            })
747                            .register();
748
749                        let proxy_id = device_proxies.borrow_mut().add_proxy(
750                            device_proxy,
751                            device_listener,
752                            &device_proxies,
753                        );
754
755                        client
756                            .data
757                            .lock()
758                            .unwrap()
759                            .directed_routes
760                            .insert(global_id, DirectedRoutes::new(proxy_id));
761
762                        update_copy.replace_with(|v| *v | EventKind::DEVICE_ADDED);
763                    }
764                    ObjectType::Metadata => {
765                        // There are many kinds of metadata, but we are only interested in the default metadata
766                        if global_props.get("metadata.name") != Some("default") {
767                            return;
768                        }
769                        let metadata_proxy: MetadataProxy =
770                            registry.bind(global).expect("Could not bind device");
771                        let update_copy2 = update_copy.clone();
772                        let metadata_listener = metadata_proxy
773                            .add_listener_local()
774                            .property(move |_subject, key, type_, value| {
775                                if type_ != Some("Spa:String:JSON")
776                                    || (key != Some(DEFAULT_SINK_KEY)
777                                        && key != Some(DEFAULT_SOURCE_KEY))
778                                {
779                                    return -1;
780                                }
781
782                                let Some(value) = value else {
783                                    return -1;
784                                };
785
786                                let value: serde_json::Value =
787                                    serde_json::from_str(value).unwrap_or_default();
788                                let name = value
789                                    .get("name")
790                                    .and_then(|v| v.as_str())
791                                    .map(|s| s.to_string());
792                                if key == Some(DEFAULT_SINK_KEY) {
793                                    let default_medata_sink =
794                                        &mut client.data.lock().unwrap().default_metadata.sink;
795                                    if *default_medata_sink != name {
796                                        *default_medata_sink = name;
797                                        update_copy2.replace_with(|v| {
798                                            *v | EventKind::DEFAULT_META_DATA_UPDATED
799                                        });
800                                    }
801                                } else {
802                                    let default_medata_source =
803                                        &mut client.data.lock().unwrap().default_metadata.source;
804                                    if *default_medata_source != name {
805                                        *default_medata_source = name;
806                                        update_copy2.replace_with(|v| {
807                                            *v | EventKind::DEFAULT_META_DATA_UPDATED
808                                        });
809                                    }
810                                }
811
812                                0
813                            })
814                            .register();
815
816                        metadata_proxies.borrow_mut().add_proxy(
817                            metadata_proxy,
818                            metadata_listener,
819                            &metadata_proxies,
820                        );
821                    }
822                    _ => (),
823                }
824            })
825            .global_remove(move |uid| {
826                let mut client_data = client.data.lock().unwrap();
827                if client_data.nodes.remove(&uid).is_some() {
828                    update_copy2.replace_with(|v| *v | EventKind::NODE_REMOVED);
829                } else if client_data.links.remove(&uid).is_some() {
830                    update_copy2.replace_with(|v| *v | EventKind::LINK_REMOVED);
831                } else if client_data.ports.remove(&uid) {
832                    update_copy2.replace_with(|v| *v | EventKind::PORT_REMOVED);
833                } else if client_data.directed_routes.remove(&uid).is_some() {
834                    update_copy2.replace_with(|v| *v | EventKind::DEVICE_REMOVED);
835                }
836            })
837            .register();
838
839        /* START WORKAROUND FOR PIPEWIRE LOOPBACK BUG
840
841        This workaround fixes an issue caused by the loopback device.
842        Without this workaround, if nothing has interacted with the sink/source then setting
843        the volume/mute will fail.
844
845        This workaround creates input and output streams which forces pipewire
846        to use the real sink/source instead of the loopback device.
847
848        For context see:
849
850            * https://gitlab.freedesktop.org/pipewire/pipewire/-/issues/4949
851            * https://gitlab.freedesktop.org/pipewire/pipewire/-/issues/4610
852            * https://gitlab.freedesktop.org/pipewire/wireplumber/-/issues/822
853        */
854
855        let output_done = Rc::new(Cell::new(false));
856        let output_done_clone = output_done.clone();
857        let input_done = Rc::new(Cell::new(false));
858        let input_done_clone = input_done.clone();
859
860        let values: Vec<u8> = PodSerializer::serialize(
861            Cursor::new(Vec::new()),
862            &Value::Object(Object {
863                type_: SPA_TYPE_OBJECT_Format,
864                id: SPA_PARAM_EnumFormat,
865                properties: AudioInfoRaw::new().into(),
866            }),
867        )
868        .expect("Failed to serialize pod")
869        .0
870        .into_inner();
871
872        let mut params = [Pod::from_bytes(&values).expect("Failed to create pod")];
873
874        let output_stream = StreamBox::new(
875            &core,
876            "i3status_pipewire_workaround_output_stream",
877            properties! {
878                *keys::MEDIA_TYPE => "Audio",
879                *keys::MEDIA_ROLE => "Music",
880                *keys::MEDIA_CATEGORY => "Playback",
881                *keys::AUDIO_CHANNELS => "2",
882            },
883        )
884        .expect("Could not create output_stream");
885
886        let output_stream_listener = output_stream
887            .add_local_listener()
888            .process(move |_stream, _acc: &mut f64| {
889                output_done_clone.set(true);
890            })
891            .register()
892            .expect("Could not add output_stream listener");
893
894        output_stream
895            .connect(
896                ::pipewire::spa::utils::Direction::Output,
897                None,
898                StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS | StreamFlags::RT_PROCESS,
899                &mut params,
900            )
901            .expect("Could not connect output_stream");
902
903        let input_stream = StreamBox::new(
904            &core,
905            "i3status_pipewire_workaround_input_stream",
906            properties! {
907                *keys::MEDIA_TYPE => "Audio",
908                *keys::MEDIA_ROLE => "Music",
909                *keys::MEDIA_CATEGORY => "Playback",
910                *keys::AUDIO_CHANNELS => "2",
911            },
912        )
913        .expect("Could not create input_stream");
914        let input_stream_listener = input_stream
915            .add_local_listener()
916            .process(move |_stream, _acc: &mut f64| {
917                input_done_clone.set(true);
918            })
919            .register()
920            .expect("Could not add input_stream listener");
921
922        input_stream
923            .connect(
924                ::pipewire::spa::utils::Direction::Input,
925                None,
926                StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS | StreamFlags::RT_PROCESS,
927                &mut params,
928            )
929            .expect("Could not connect input_stream");
930
931        while !output_done.get() {
932            main_loop.loop_().iterate(Duration::from_secs(1));
933            let event = update.take();
934            if !event.is_empty() {
935                client.send_update_event(event);
936            }
937        }
938        output_stream
939            .disconnect()
940            .expect("Unable to disconnect output_stream");
941        output_stream_listener.unregister();
942
943        while !input_done.get() {
944            main_loop.loop_().iterate(Duration::from_secs(1));
945            let event = update.take();
946            if !event.is_empty() {
947                client.send_update_event(event);
948            }
949        }
950        input_stream
951            .disconnect()
952            .expect("Unable to disconnect input_stream");
953        input_stream_listener.unregister();
954
955        // END WORKAROUND FOR PIPEWIRE LOOPBACK BUG
956
957        loop {
958            main_loop.loop_().iterate(Duration::from_hours(24));
959            let event = update.take();
960            if !event.is_empty() {
961                client.send_update_event(event);
962            }
963        }
964    }
965
966    pub fn add_event_listener(&self, tx: UnboundedSender<EventKind>) {
967        self.event_senders.lock().unwrap().push(tx);
968    }
969
970    pub fn add_command_listener(&self) -> PwSender<CommandKind> {
971        self.command_sender.clone()
972    }
973
974    pub fn send_update_event(&self, event: EventKind) {
975        debug!("send_update_event {:?}", event);
976        self.event_senders
977            .lock()
978            .unwrap()
979            .retain(|tx| tx.send(event).is_ok());
980    }
981}