1use std::cell::{Cell, RefCell};
2use std::collections::{HashMap, HashSet};
3use std::io::Cursor;
4use std::mem::MaybeUninit;
5use std::rc::Rc;
6use std::str::FromStr;
7use std::sync::{LazyLock, Mutex};
8use std::thread;
9use std::time::Duration;
10
11pub(crate) use ::pipewire::channel::Sender as PwSender;
12use ::pipewire::{
13 channel::{Receiver as PwReceiver, channel as pw_channel},
14 context::ContextRc,
15 device::Device as DeviceProxy,
16 keys,
17 main_loop::MainLoopRc,
18 metadata::Metadata as MetadataProxy,
19 node::{Node as NodeProxy, NodeState},
20 properties::properties,
21 proxy::{Listener, ProxyListener, ProxyT},
22 spa::{
23 param::{ParamType, audio::AudioInfoRaw},
24 pod::{
25 Object, Pod, Value, ValueArray, builder::Builder as PodBuilder,
26 deserialize::PodDeserializer, serialize::PodSerializer,
27 },
28 sys::{
29 SPA_DIRECTION_INPUT, SPA_DIRECTION_OUTPUT, SPA_PARAM_EnumFormat,
30 SPA_PARAM_ROUTE_device, SPA_PARAM_ROUTE_direction, SPA_PARAM_ROUTE_index,
31 SPA_PARAM_ROUTE_name, SPA_PARAM_ROUTE_props, SPA_PARAM_ROUTE_save,
32 SPA_PROP_channelVolumes, SPA_PROP_mute, SPA_TYPE_OBJECT_Format,
33 },
34 utils::{SpaTypes, dict::DictRef},
35 },
36 stream::{StreamBox, StreamFlags},
37 types::ObjectType,
38};
39use bitflags::bitflags;
40use tokio::sync::mpsc::UnboundedSender;
41
42use crate::{Error, ErrorContext as _, Result};
43
// Generates the `debug!` macro used throughout this module. The "pipewire" argument is
// presumably the tag attached to each message — confirm against `make_log_macro!`.
make_log_macro!(debug, "pipewire");

/// Process-wide PipeWire client, created by [`Client::new`] the first time it is accessed.
///
/// Holds the `Err` returned by `Client::new` if construction failed.
pub(crate) static CLIENT: LazyLock<Result<Client>> = LazyLock::new(Client::new);

/// Scale between exposed volume values and PipeWire's channel volumes: an exposed value is
/// divided by `NORMAL` and cubed before being sent, and received channel volumes are
/// converted back with `cbrt() * NORMAL`.
const NORMAL: f32 = 100.0;
/// Metadata key whose JSON value names the default audio sink.
const DEFAULT_SINK_KEY: &str = "default.audio.sink";
/// Metadata key whose JSON value names the default audio source.
const DEFAULT_SOURCE_KEY: &str = "default.audio.source";
51
/// Direction of a node or device route.
///
/// Can be parsed from a media class string (see the `FromStr` impl) or converted from a raw
/// SPA direction value (see the `TryFrom<u32>` impl).
#[derive(Debug, Clone, Copy)]
pub(crate) enum Direction {
    /// Media classes ending in `/Source`, or `SPA_DIRECTION_INPUT`.
    Input,
    /// Media classes ending in `/Sink`, or `SPA_DIRECTION_OUTPUT`.
    Output,
}
57
58impl FromStr for Direction {
59 type Err = Error;
60
61 fn from_str(s: &str) -> Result<Self, Self::Err> {
62 if s.ends_with("/Source") {
63 Ok(Self::Input)
64 } else if s.ends_with("/Sink") {
65 Ok(Self::Output)
66 } else {
67 Err(Error::new("Invalid media class to determine direction"))
68 }
69 }
70}
71
72impl TryFrom<u32> for Direction {
73 type Error = Error;
74
75 fn try_from(value: u32) -> Result<Self, Self::Error> {
76 match value {
77 SPA_DIRECTION_INPUT => Ok(Self::Input),
78 SPA_DIRECTION_OUTPUT => Ok(Self::Output),
79 _ => Err(Error::new("Invalid direction value, must be 0 or 1")),
80 }
81 }
82}
83
/// Snapshot of a PipeWire node as seen through the registry.
///
/// The string fields are filled once from the global's properties in `Node::new`;
/// `running`, `muted` and `volume` start unset and are updated by the node listener.
#[derive(Debug)]
pub(crate) struct Node {
    /// Id of the bound node proxy, used to look the proxy up when executing commands.
    proxy_id: u32,
    /// Value of the device id property, if present and parseable as `u32`.
    pub device_id: Option<u32>,
    /// Node name property, or `node_<global id>` when the property is missing.
    pub name: String,
    /// Node nick property.
    pub nick: Option<String>,
    /// Media class property.
    pub media_class: Option<String>,
    /// Direction parsed from the media class; `None` if the class is missing or unrecognized.
    pub direction: Option<Direction>,
    /// Media role property.
    pub media_role: Option<String>,
    /// Whether the last reported node state was `Running`.
    pub running: bool,
    /// Last mute state received through a `Props` param, if any.
    pub muted: Option<bool>,
    /// Last per-channel volumes received through a `Props` param, stored as
    /// `cbrt(channel volume) * NORMAL`.
    pub volume: Option<Vec<f32>>,
    /// Node description property.
    pub description: Option<String>,
    /// Device form factor property.
    pub form_factor: Option<String>,
}
101
102impl Node {
103 fn new(global_id: u32, global_props: &DictRef, proxy_id: u32) -> Self {
104 Self {
105 proxy_id,
106 device_id: global_props
107 .get(&keys::DEVICE_ID)
108 .and_then(|s| s.parse().ok()),
109 name: global_props
110 .get(&keys::NODE_NAME)
111 .map_or_else(|| format!("node_{global_id}"), |s| s.to_string()),
112 nick: global_props.get(&keys::NODE_NICK).map(|s| s.to_string()),
113 media_class: global_props.get(&keys::MEDIA_CLASS).map(|s| s.to_string()),
114 direction: global_props
115 .get(&keys::MEDIA_CLASS)
116 .and_then(|s| s.parse().ok()),
117 media_role: global_props.get(&keys::MEDIA_ROLE).map(|s| s.to_string()),
118 description: global_props
119 .get(&keys::NODE_DESCRIPTION)
120 .map(|s| s.to_string()),
121 form_factor: global_props
122 .get(&keys::DEVICE_FORM_FACTOR)
123 .map(|s| s.to_string()),
124 muted: None,
125 volume: None,
126 running: false,
127 }
128 }
129}
130
/// A link between two nodes, identified by the ids found in the link's global properties.
#[derive(Debug, PartialEq, PartialOrd, Eq, Ord)]
pub(crate) struct Link {
    /// Value of the link's output-node property.
    pub link_output_node: u32,
    /// Value of the link's input-node property.
    pub link_input_node: u32,
}
136
137impl Link {
138 fn new(global_props: &DictRef) -> Option<Self> {
139 if let Some(link_output_node) = global_props
140 .get(&keys::LINK_OUTPUT_NODE)
141 .and_then(|s| s.parse().ok())
142 && let Some(link_input_node) = global_props
143 .get(&keys::LINK_INPUT_NODE)
144 .and_then(|s| s.parse().ok())
145 {
146 Some(Self {
147 link_output_node,
148 link_input_node,
149 })
150 } else {
151 None
152 }
153 }
154}
155
/// Active route of a device, as reported by a `Route` param.
#[derive(Debug)]
pub(crate) struct Route {
    /// Route index, echoed back when setting the route's props.
    index: i32,
    /// Route device, echoed back when setting the route's props.
    device: i32,
    /// Route name as reported by the device.
    pub name: String,
}
162
/// The routes known for one device, at most one per [`Direction`].
#[derive(Debug, Default)]
pub(crate) struct DirectedRoutes {
    /// Id of the bound device proxy, used to look the proxy up when executing commands.
    proxy_id: u32,
    /// Route reported with the input direction, if any.
    input: Option<Route>,
    /// Route reported with the output direction, if any.
    output: Option<Route>,
}
170
171impl DirectedRoutes {
172 fn new(proxy_id: u32) -> Self {
173 Self {
174 proxy_id,
175 input: None,
176 output: None,
177 }
178 }
179
180 pub fn get_route(&self, direction: Direction) -> Option<&Route> {
181 match direction {
182 Direction::Input => self.input.as_ref(),
183 Direction::Output => self.output.as_ref(),
184 }
185 }
186
187 fn get_mut_route(&mut self, direction: Direction) -> &mut Option<Route> {
188 match direction {
189 Direction::Input => &mut self.input,
190 Direction::Output => &mut self.output,
191 }
192 }
193}
194
/// Names of the default sink and source, taken from the `name` field of the JSON values
/// published under the default-sink / default-source metadata keys.
#[derive(Default)]
pub(crate) struct DefaultMetadata {
    /// Name of the default audio sink, if one has been published.
    pub sink: Option<String>,
    /// Name of the default audio source, if one has been published.
    pub source: Option<String>,
}
200
/// State mirrored from the PipeWire registry. All maps are keyed by registry global id.
#[derive(Default)]
pub(crate) struct Data {
    /// Known nodes.
    pub nodes: HashMap<u32, Node>,
    /// Known links.
    pub links: HashMap<u32, Link>,
    /// Current default sink/source names.
    pub default_metadata: DefaultMetadata,
    /// Routes per device, keyed by the device's global id.
    pub directed_routes: HashMap<u32, DirectedRoutes>,
    /// Global ids of known ports; tracked only to report port added/removed events.
    ports: HashSet<u32>,
}
209
/// Keeps bound proxies of type `T` and their listeners alive, keyed by proxy id.
struct Proxies<T: ProxyT + 'static> {
    /// Bound proxies by proxy id.
    proxies_t: HashMap<u32, T>,
    /// Listeners registered for each proxy; removed together with the proxy.
    listeners: HashMap<u32, Vec<Box<dyn Listener>>>,
}
214
215impl<T: ProxyT + 'static> Proxies<T> {
216 fn new() -> Self {
217 Self {
218 proxies_t: HashMap::new(),
219 listeners: HashMap::new(),
220 }
221 }
222
223 fn add_proxy(
224 &mut self,
225 proxy: T,
226 listener: impl Listener + 'static,
227 proxies: &Rc<RefCell<Self>>,
228 ) -> u32 {
229 let listener_spe = Box::new(listener);
230
231 let proxy_upcast = proxy.upcast_ref();
232 let proxy_id = proxy_upcast.id();
233
234 let proxies_weak = Rc::downgrade(proxies);
235
236 let listener = proxy_upcast
237 .add_listener_local()
238 .removed(move || {
239 if let Some(proxies) = proxies_weak.upgrade() {
240 proxies.borrow_mut().remove(proxy_id);
241 }
242 })
243 .register();
244
245 self.add_proxy_t(proxy_id, proxy, listener_spe);
246 self.add_proxy_listener(proxy_id, listener);
247
248 proxy_id
249 }
250
251 fn add_proxy_t(&mut self, proxy_id: u32, device_proxy: T, listener: Box<dyn Listener>) {
252 self.proxies_t.insert(proxy_id, device_proxy);
253 self.listeners.entry(proxy_id).or_default().push(listener);
254 }
255
256 fn add_proxy_listener(&mut self, proxy_id: u32, listener: ProxyListener) {
257 self.listeners
258 .entry(proxy_id)
259 .or_default()
260 .push(Box::new(listener));
261 }
262
263 fn remove(&mut self, proxy_id: u32) {
264 self.proxies_t.remove(&proxy_id);
265 self.listeners.remove(&proxy_id);
266 }
267}
268
bitflags! {
    /// Set of changes observed since the last notification.
    ///
    /// Flags are accumulated by the registry, node, device and metadata listeners and the
    /// combined set is sent to every registered event listener by the main loop.
    #[derive(Debug, Clone, Copy, Default)]
    pub(crate) struct EventKind: u16 {
        const DEFAULT_META_DATA_UPDATED = 1 << 0;
        const DEVICE_ADDED              = 1 << 1;
        const DEVICE_PARAM_UPDATE       = 1 << 2;
        const DEVICE_REMOVED            = 1 << 3;
        const LINK_ADDED                = 1 << 4;
        const LINK_REMOVED              = 1 << 5;
        const NODE_ADDED                = 1 << 6;
        const NODE_PARAM_UPDATE         = 1 << 7;
        const NODE_REMOVED              = 1 << 8;
        const NODE_STATE_UPDATE         = 1 << 9;
        const PORT_ADDED                = 1 << 10;
        const PORT_REMOVED              = 1 << 11;
    }
}
286
/// Commands sent to the PipeWire thread. The first field is the global id of the target node.
#[derive(Clone, Debug)]
pub(crate) enum CommandKind {
    /// Set the mute state of the node.
    Mute(u32, bool),
    /// Set the per-channel volumes of the node, on the scale where `NORMAL` maps to a
    /// channel volume of 1.0.
    SetVolume(u32, Vec<f32>),
}
292
293impl CommandKind {
294 fn execute(
295 self,
296 client: &Client,
297 node_proxies: Rc<RefCell<Proxies<NodeProxy>>>,
298 device_proxies: Rc<RefCell<Proxies<DeviceProxy>>>,
299 ) {
300 debug!("Executing command: {:?}", self);
301 use CommandKind::*;
302 let id = match self {
303 SetVolume(id, _) | Mute(id, _) => id,
304 };
305 let client_data = client.data.lock().unwrap();
306 if let Some(node) = client_data.nodes.get(&id) {
307 if let Some(node_proxy) = node_proxies.borrow_mut().proxies_t.get(&node.proxy_id) {
308 let mut pod_data = Vec::new();
309 let mut pod_builder = PodBuilder::new(&mut pod_data);
310
311 let mut object_param_props_frame = MaybeUninit::uninit();
312
313 unsafe {
317 pod_builder.push_object(
318 &mut object_param_props_frame,
319 SpaTypes::ObjectParamProps.as_raw(),
320 ParamType::Props.as_raw(),
321 )
322 }
323 .expect("Could not push object");
324
325 match &self {
326 SetVolume(_, volume) => {
327 pod_builder
328 .add_prop(SPA_PROP_channelVolumes, 0)
329 .expect("Could not add prop");
330
331 let mut array_frame = MaybeUninit::uninit();
332
333 unsafe { pod_builder.push_array(&mut array_frame) }
336 .expect("Could not push object");
337
338 for vol in volume {
339 let vol = vol / NORMAL;
340 pod_builder
341 .add_float(vol * vol * vol)
342 .expect("Could not add bool");
343 }
344
345 unsafe {
347 PodBuilder::pop(&mut pod_builder, array_frame.assume_init_mut());
348 }
349 }
350 Mute(_, mute) => {
351 pod_builder
352 .add_prop(SPA_PROP_mute, 0)
353 .expect("Could not add prop");
354 pod_builder.add_bool(*mute).expect("Could not add bool");
355 }
356 }
357
358 unsafe {
360 PodBuilder::pop(&mut pod_builder, object_param_props_frame.assume_init_mut());
361 }
362
363 debug!(
364 "Setting Node Props param: {:?}",
365 PodDeserializer::deserialize_from::<Value>(&pod_data)
366 );
367 let pod = Pod::from_bytes(&pod_data).expect("Unable to construct pod");
368 node_proxy.set_param(ParamType::Props, 0, pod);
369 }
370
371 if let Some(device_id) = node.device_id
372 && let Some(direction) = node.direction
373 && let Some(directed_routes) = client_data.directed_routes.get(&device_id)
374 && let Some(route) = directed_routes.get_route(direction)
375 && let Some(device_proxy) = device_proxies
376 .borrow_mut()
377 .proxies_t
378 .get(&directed_routes.proxy_id)
379 {
380 let mut pod_data = Vec::new();
381 let mut pod_builder = PodBuilder::new(&mut pod_data);
382
383 let mut object_param_route_frame = MaybeUninit::uninit();
384
385 unsafe {
389 pod_builder.push_object(
390 &mut object_param_route_frame,
391 SpaTypes::ObjectParamRoute.as_raw(),
392 ParamType::Route.as_raw(),
393 )
394 }
395 .expect("Could not push object");
396
397 pod_builder
398 .add_prop(SPA_PARAM_ROUTE_index, 0)
399 .expect("Could not add prop");
400
401 pod_builder.add_int(route.index).expect("Could not add int");
402
403 pod_builder
404 .add_prop(SPA_PARAM_ROUTE_device, 0)
405 .expect("Could not add prop");
406
407 pod_builder
408 .add_int(route.device)
409 .expect("Could not add int");
410
411 pod_builder
412 .add_prop(SPA_PARAM_ROUTE_props, 0)
413 .expect("Could not add prop");
414
415 let mut object_param_props_frame = MaybeUninit::uninit();
416
417 unsafe {
419 pod_builder.push_object(
420 &mut object_param_props_frame,
421 SpaTypes::ObjectParamProps.as_raw(),
422 ParamType::Route.as_raw(),
423 )
424 }
425 .expect("Could not push object");
426
427 match &self {
428 SetVolume(_, volume) => {
429 pod_builder
430 .add_prop(SPA_PROP_channelVolumes, 0)
431 .expect("Could not add prop");
432
433 let mut array_frame = MaybeUninit::uninit();
434
435 unsafe { pod_builder.push_array(&mut array_frame) }
438 .expect("Could not push object");
439
440 for vol in volume {
441 let vol = vol / NORMAL;
442 pod_builder
443 .add_float(vol * vol * vol)
444 .expect("Could not add bool");
445 }
446
447 unsafe {
449 PodBuilder::pop(&mut pod_builder, array_frame.assume_init_mut());
450 }
451 }
452 Mute(_, mute) => {
453 pod_builder
454 .add_prop(SPA_PROP_mute, 0)
455 .expect("Could not add prop");
456 pod_builder.add_bool(*mute).expect("Could not add bool");
457 }
458 }
459
460 unsafe {
462 PodBuilder::pop(&mut pod_builder, object_param_props_frame.assume_init_mut());
463 }
464
465 pod_builder
466 .add_prop(SPA_PARAM_ROUTE_save, 0)
467 .expect("Could not add prop");
468 pod_builder.add_bool(true).expect("Could not add bool");
469
470 unsafe {
472 PodBuilder::pop(&mut pod_builder, object_param_route_frame.assume_init_mut());
473 }
474
475 debug!(
476 "Setting Device Route param: {:?}",
477 PodDeserializer::deserialize_from::<Value>(&pod_data)
478 );
479 let pod = Pod::from_bytes(&pod_data).expect("Unable to construct pod");
480 device_proxy.set_param(ParamType::Route, 0, pod);
481 }
482 }
483 }
484}
485
/// Handle to the PipeWire thread: shared state plus the channels used to talk to it.
pub(crate) struct Client {
    /// Senders notified with the accumulated [`EventKind`] flags; senders whose receiver is
    /// gone are dropped on the next notification.
    event_senders: Mutex<Vec<UnboundedSender<EventKind>>>,
    /// Sender half of the command channel whose receiver is attached to the PipeWire loop.
    command_sender: PwSender<CommandKind>,
    /// State mirrored from the PipeWire registry, updated by the PipeWire thread.
    pub data: Mutex<Data>,
}
491
492impl Client {
493 fn new() -> Result<Client> {
494 let (tx, rx) = pw_channel();
495
496 thread::Builder::new()
497 .name("i3status_pipewire".to_string())
498 .spawn(|| Client::main_loop_thread(rx))
499 .error("failed to spawn a thread")?;
500
501 Ok(Self {
502 event_senders: Mutex::new(Vec::new()),
503 command_sender: tx,
504 data: Mutex::new(Data::default()),
505 })
506 }
507
508 fn main_loop_thread(command_receiver: PwReceiver<CommandKind>) {
509 let client = CLIENT.as_ref().error("Could not get client").unwrap();
510
511 let proplist = properties! {*keys::APP_NAME => env!("CARGO_PKG_NAME")};
512
513 let main_loop = MainLoopRc::new(None).expect("Failed to create main loop");
514
515 let context = ContextRc::new(&main_loop, Some(proplist)).expect("Failed to create context");
516 let core = context.connect_rc(None).expect("Failed to connect");
517 let registry = core.get_registry_rc().expect("Failed to get registry");
518 let registry_weak = registry.downgrade();
519
520 let update = Rc::new(RefCell::new(EventKind::empty()));
521 let update_copy = update.clone();
522 let update_copy2 = update.clone();
523
524 let node_proxies = Rc::new(RefCell::new(Proxies::<NodeProxy>::new()));
526 let node_proxies_weak = Rc::downgrade(&node_proxies);
527 let device_proxies = Rc::new(RefCell::new(Proxies::<DeviceProxy>::new()));
528 let device_proxies_weak = Rc::downgrade(&device_proxies);
529 let metadata_proxies = Rc::new(RefCell::new(Proxies::<MetadataProxy>::new()));
530
531 let _receiver = command_receiver.attach(main_loop.loop_(), move |command: CommandKind| {
532 if let Some(node_proxies) = node_proxies_weak.upgrade()
533 && let Some(device_proxies) = device_proxies_weak.upgrade()
534 {
535 command.execute(client, node_proxies.clone(), device_proxies.clone());
536 }
537 });
538
539 let _registry_listener = registry
543 .add_listener_local()
544 .global(move |global| {
545 let Some(registry) = registry_weak.upgrade() else {
546 return;
547 };
548 let global_id = global.id;
549 let Some(global_props) = global.props else {
550 return;
551 };
552 match &global.type_ {
553 ObjectType::Node => {
554 let node_proxy: NodeProxy =
555 registry.bind(global).expect("Could not bind node");
556 node_proxy.subscribe_params(&[ParamType::Props]);
557 let update_copy2 = update_copy.clone();
558 let update_copy3 = update_copy.clone();
559 let node_listener = node_proxy
560 .add_listener_local()
561 .info(move |info| {
562 let running = matches!(info.state(), NodeState::Running);
563 client
564 .data
565 .lock()
566 .unwrap()
567 .nodes
568 .entry(global_id)
569 .and_modify(|node| {
570 if node.running != running {
571 node.running = running;
572 update_copy2.replace_with(|v| {
573 *v | EventKind::NODE_STATE_UPDATE
574 });
575 }
576 });
577 })
578 .param(move |_seq, _id, _index, _next, param| {
579 let Some(param) = param else {
580 return;
581 };
582 let Ok((_, Value::Object(object))) =
583 PodDeserializer::deserialize_from::<Value>(param.as_bytes())
584 else {
585 return;
586 };
587 client
588 .data
589 .lock()
590 .unwrap()
591 .nodes
592 .entry(global_id)
593 .and_modify(|node| {
594 for property in object.properties {
595 if property.key == SPA_PROP_mute {
596 let Value::Bool(muted) = property.value else {
597 return;
598 };
599 let muted = Some(muted);
600 if node.muted != muted {
601 node.muted = muted;
602 update_copy3.replace_with(|v| {
603 *v | EventKind::NODE_PARAM_UPDATE
604 });
605 }
606 } else if property.key == SPA_PROP_channelVolumes {
607 let Value::ValueArray(ValueArray::Float(volumes)) =
608 property.value
609 else {
610 return;
611 };
612
613 let volume = Some(
614 volumes
615 .iter()
616 .map(|vol| vol.cbrt() * NORMAL)
617 .collect(),
618 );
619 if node.volume != volume {
620 node.volume = volume;
621 update_copy3.replace_with(|v| {
622 *v | EventKind::NODE_PARAM_UPDATE
623 });
624 }
625 }
626 }
627 });
628 })
629 .register();
630
631 let proxy_id = node_proxies.borrow_mut().add_proxy(
632 node_proxy,
633 node_listener,
634 &node_proxies,
635 );
636
637 client
638 .data
639 .lock()
640 .unwrap()
641 .nodes
642 .insert(global_id, Node::new(global_id, global_props, proxy_id));
643 update_copy.replace_with(|v| *v | EventKind::NODE_ADDED);
644 }
645 ObjectType::Link => {
646 let mut client_data = client.data.lock().unwrap();
647 let Some(link) = Link::new(global_props) else {
648 return;
649 };
650 if let Some(node) = client_data.nodes.get(&link.link_input_node)
651 && node.name == env!("CARGO_PKG_NAME")
652 {
653 return;
654 }
655
656 client_data.links.insert(global_id, link);
657 update_copy.replace_with(|v| *v | EventKind::LINK_ADDED);
658 }
659 ObjectType::Port => {
660 client.data.lock().unwrap().ports.insert(global_id);
661 update_copy.replace_with(|v| *v | EventKind::PORT_ADDED);
662 }
663 ObjectType::Device => {
664 let device_proxy: DeviceProxy =
665 registry.bind(global).expect("Could not bind device");
666 device_proxy.subscribe_params(&[ParamType::Route]);
667 let update_copy2 = update_copy.clone();
668 let device_listener = device_proxy
669 .add_listener_local()
670 .param(move |_seq, _id, _index, _next, param| {
671 let Some(param) = param else {
672 return;
673 };
674 let Ok((_, Value::Object(object))) =
675 PodDeserializer::deserialize_from::<Value>(param.as_bytes())
676 else {
677 return;
678 };
679 let mut route_index = None;
680 let mut route_direction = None;
681 let mut route_device = None;
682 let mut route_name = None;
683 for property in &object.properties {
684 if property.key == SPA_PARAM_ROUTE_index {
685 let Value::Int(route_index_v) = property.value else {
686 return;
687 };
688 route_index = Some(route_index_v);
689 } else if property.key == SPA_PARAM_ROUTE_direction {
690 let Value::Id(route_direction_v) = property.value else {
691 return;
692 };
693 route_direction = route_direction_v.0.try_into().ok();
694 } else if property.key == SPA_PARAM_ROUTE_device {
695 let Value::Int(route_device_v) = property.value else {
696 return;
697 };
698 route_device = Some(route_device_v);
699 } else if property.key == SPA_PARAM_ROUTE_name {
700 let Value::String(route_name_v) = property.value.to_owned()
701 else {
702 return;
703 };
704 route_name = Some(route_name_v);
705 }
706 }
707
708 if let Some(route_index) = route_index
709 && let Some(route_direction) = route_direction
710 && let Some(route_device) = route_device
711 && let Some(route_name) = route_name
712 {
713 client
714 .data
715 .lock()
716 .unwrap()
717 .directed_routes
718 .entry(global_id)
719 .and_modify(|directed_routes| {
720 let route =
721 directed_routes.get_mut_route(route_direction);
722 if let Some(route) = route {
723 if route.index != route_index
724 || route.device != route_device
725 || route.name != route_name
726 {
727 route.index = route_index;
728 route.device = route_device;
729 route.name = route_name;
730 update_copy2.replace_with(|v| {
731 *v | EventKind::DEVICE_PARAM_UPDATE
732 });
733 }
734 } else {
735 route.replace(Route {
736 index: route_index,
737 device: route_device,
738 name: route_name,
739 });
740 update_copy2.replace_with(|v| {
741 *v | EventKind::DEVICE_PARAM_UPDATE
742 });
743 }
744 });
745 }
746 })
747 .register();
748
749 let proxy_id = device_proxies.borrow_mut().add_proxy(
750 device_proxy,
751 device_listener,
752 &device_proxies,
753 );
754
755 client
756 .data
757 .lock()
758 .unwrap()
759 .directed_routes
760 .insert(global_id, DirectedRoutes::new(proxy_id));
761
762 update_copy.replace_with(|v| *v | EventKind::DEVICE_ADDED);
763 }
764 ObjectType::Metadata => {
765 if global_props.get("metadata.name") != Some("default") {
767 return;
768 }
769 let metadata_proxy: MetadataProxy =
770 registry.bind(global).expect("Could not bind device");
771 let update_copy2 = update_copy.clone();
772 let metadata_listener = metadata_proxy
773 .add_listener_local()
774 .property(move |_subject, key, type_, value| {
775 if type_ != Some("Spa:String:JSON")
776 || (key != Some(DEFAULT_SINK_KEY)
777 && key != Some(DEFAULT_SOURCE_KEY))
778 {
779 return -1;
780 }
781
782 let Some(value) = value else {
783 return -1;
784 };
785
786 let value: serde_json::Value =
787 serde_json::from_str(value).unwrap_or_default();
788 let name = value
789 .get("name")
790 .and_then(|v| v.as_str())
791 .map(|s| s.to_string());
792 if key == Some(DEFAULT_SINK_KEY) {
793 let default_medata_sink =
794 &mut client.data.lock().unwrap().default_metadata.sink;
795 if *default_medata_sink != name {
796 *default_medata_sink = name;
797 update_copy2.replace_with(|v| {
798 *v | EventKind::DEFAULT_META_DATA_UPDATED
799 });
800 }
801 } else {
802 let default_medata_source =
803 &mut client.data.lock().unwrap().default_metadata.source;
804 if *default_medata_source != name {
805 *default_medata_source = name;
806 update_copy2.replace_with(|v| {
807 *v | EventKind::DEFAULT_META_DATA_UPDATED
808 });
809 }
810 }
811
812 0
813 })
814 .register();
815
816 metadata_proxies.borrow_mut().add_proxy(
817 metadata_proxy,
818 metadata_listener,
819 &metadata_proxies,
820 );
821 }
822 _ => (),
823 }
824 })
825 .global_remove(move |uid| {
826 let mut client_data = client.data.lock().unwrap();
827 if client_data.nodes.remove(&uid).is_some() {
828 update_copy2.replace_with(|v| *v | EventKind::NODE_REMOVED);
829 } else if client_data.links.remove(&uid).is_some() {
830 update_copy2.replace_with(|v| *v | EventKind::LINK_REMOVED);
831 } else if client_data.ports.remove(&uid) {
832 update_copy2.replace_with(|v| *v | EventKind::PORT_REMOVED);
833 } else if client_data.directed_routes.remove(&uid).is_some() {
834 update_copy2.replace_with(|v| *v | EventKind::DEVICE_REMOVED);
835 }
836 })
837 .register();
838
839 let output_done = Rc::new(Cell::new(false));
856 let output_done_clone = output_done.clone();
857 let input_done = Rc::new(Cell::new(false));
858 let input_done_clone = input_done.clone();
859
860 let values: Vec<u8> = PodSerializer::serialize(
861 Cursor::new(Vec::new()),
862 &Value::Object(Object {
863 type_: SPA_TYPE_OBJECT_Format,
864 id: SPA_PARAM_EnumFormat,
865 properties: AudioInfoRaw::new().into(),
866 }),
867 )
868 .expect("Failed to serialize pod")
869 .0
870 .into_inner();
871
872 let mut params = [Pod::from_bytes(&values).expect("Failed to create pod")];
873
874 let output_stream = StreamBox::new(
875 &core,
876 "i3status_pipewire_workaround_output_stream",
877 properties! {
878 *keys::MEDIA_TYPE => "Audio",
879 *keys::MEDIA_ROLE => "Music",
880 *keys::MEDIA_CATEGORY => "Playback",
881 *keys::AUDIO_CHANNELS => "2",
882 },
883 )
884 .expect("Could not create output_stream");
885
886 let output_stream_listener = output_stream
887 .add_local_listener()
888 .process(move |_stream, _acc: &mut f64| {
889 output_done_clone.set(true);
890 })
891 .register()
892 .expect("Could not add output_stream listener");
893
894 output_stream
895 .connect(
896 ::pipewire::spa::utils::Direction::Output,
897 None,
898 StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS | StreamFlags::RT_PROCESS,
899 &mut params,
900 )
901 .expect("Could not connect output_stream");
902
903 let input_stream = StreamBox::new(
904 &core,
905 "i3status_pipewire_workaround_input_stream",
906 properties! {
907 *keys::MEDIA_TYPE => "Audio",
908 *keys::MEDIA_ROLE => "Music",
909 *keys::MEDIA_CATEGORY => "Playback",
910 *keys::AUDIO_CHANNELS => "2",
911 },
912 )
913 .expect("Could not create input_stream");
914 let input_stream_listener = input_stream
915 .add_local_listener()
916 .process(move |_stream, _acc: &mut f64| {
917 input_done_clone.set(true);
918 })
919 .register()
920 .expect("Could not add input_stream listener");
921
922 input_stream
923 .connect(
924 ::pipewire::spa::utils::Direction::Input,
925 None,
926 StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS | StreamFlags::RT_PROCESS,
927 &mut params,
928 )
929 .expect("Could not connect input_stream");
930
931 while !output_done.get() {
932 main_loop.loop_().iterate(Duration::from_secs(1));
933 let event = update.take();
934 if !event.is_empty() {
935 client.send_update_event(event);
936 }
937 }
938 output_stream
939 .disconnect()
940 .expect("Unable to disconnect output_stream");
941 output_stream_listener.unregister();
942
943 while !input_done.get() {
944 main_loop.loop_().iterate(Duration::from_secs(1));
945 let event = update.take();
946 if !event.is_empty() {
947 client.send_update_event(event);
948 }
949 }
950 input_stream
951 .disconnect()
952 .expect("Unable to disconnect input_stream");
953 input_stream_listener.unregister();
954
955 loop {
958 main_loop.loop_().iterate(Duration::from_hours(24));
959 let event = update.take();
960 if !event.is_empty() {
961 client.send_update_event(event);
962 }
963 }
964 }
965
966 pub fn add_event_listener(&self, tx: UnboundedSender<EventKind>) {
967 self.event_senders.lock().unwrap().push(tx);
968 }
969
970 pub fn add_command_listener(&self) -> PwSender<CommandKind> {
971 self.command_sender.clone()
972 }
973
974 pub fn send_update_event(&self, event: EventKind) {
975 debug!("send_update_event {:?}", event);
976 self.event_senders
977 .lock()
978 .unwrap()
979 .retain(|tx| tx.send(event).is_ok());
980 }
981}