1use std::cell::{Cell, RefCell};
2use std::collections::{HashMap, HashSet};
3use std::io::Cursor;
4use std::rc::Rc;
5use std::str::FromStr;
6use std::sync::{LazyLock, Mutex};
7use std::thread::JoinHandle;
8use std::time::Duration;
9use std::{panic, thread};
10
11pub(crate) use ::pipewire::channel::Sender as PwSender;
12use ::pipewire::{
13 channel::{Receiver as PwReceiver, channel as pw_channel},
14 context::ContextRc,
15 device::Device as DeviceProxy,
16 keys,
17 main_loop::MainLoopRc,
18 metadata::Metadata as MetadataProxy,
19 node::{Node as NodeProxy, NodeState},
20 properties::properties,
21 proxy::{Listener, ProxyListener, ProxyT},
22 spa::{
23 param::{ParamType, audio::AudioInfoRaw},
24 pod::{
25 Object, Pod, Property, Value, ValueArray, deserialize::PodDeserializer,
26 serialize::PodSerializer,
27 },
28 sys::{
29 SPA_DIRECTION_INPUT, SPA_DIRECTION_OUTPUT, SPA_PARAM_EnumFormat,
30 SPA_PARAM_ROUTE_device, SPA_PARAM_ROUTE_direction, SPA_PARAM_ROUTE_index,
31 SPA_PARAM_ROUTE_name, SPA_PARAM_ROUTE_props, SPA_PARAM_ROUTE_save,
32 SPA_PROP_channelVolumes, SPA_PROP_mute, SPA_TYPE_OBJECT_Format,
33 },
34 utils::{SpaTypes, dict::DictRef},
35 },
36 stream::{StreamBox, StreamFlags},
37 types::ObjectType,
38};
39use bitflags::bitflags;
40use tokio::sync::mpsc::UnboundedSender;
41
42use crate::{Error, ErrorContext as _, Result};
43
44make_log_macro!(debug, "pipewire");
45
46pub(crate) static CLIENT: LazyLock<Result<Client>> = LazyLock::new(Client::new);
47pub(crate) const PIPEWIRE_CONNECTION_ERROR_MSG: &str = "Could not connect to pipewire";
48
49const NORMAL: f32 = 100.0;
50const DEFAULT_SINK_KEY: &str = "default.audio.sink";
51const DEFAULT_SOURCE_KEY: &str = "default.audio.source";
52
53#[derive(Debug, Clone, Copy)]
54pub(crate) enum Direction {
55 Input,
56 Output,
57}
58
59impl FromStr for Direction {
60 type Err = Error;
61
62 fn from_str(s: &str) -> Result<Self, Self::Err> {
63 if s.ends_with("/Source") {
64 Ok(Self::Input)
65 } else if s.ends_with("/Sink") {
66 Ok(Self::Output)
67 } else {
68 Err(Error::new("Invalid media class to determine direction"))
69 }
70 }
71}
72
73impl TryFrom<u32> for Direction {
74 type Error = Error;
75
76 fn try_from(value: u32) -> Result<Self, Self::Error> {
77 match value {
78 SPA_DIRECTION_INPUT => Ok(Self::Input),
79 SPA_DIRECTION_OUTPUT => Ok(Self::Output),
80 _ => Err(Error::new("Invalid direction value, must be 0 or 1")),
81 }
82 }
83}
84
85#[derive(Debug)]
86pub(crate) struct Node {
87 proxy_id: u32,
88 pub device_id: Option<u32>,
89 pub name: String,
90 pub nick: Option<String>,
91 pub media_class: Option<String>,
92 pub direction: Option<Direction>,
94 pub media_role: Option<String>,
95 pub running: bool,
97 pub muted: Option<bool>,
98 pub volume: Option<Vec<f32>>,
99 pub description: Option<String>,
100 pub form_factor: Option<String>,
101}
102
103impl Node {
104 fn new(global_id: u32, global_props: &DictRef, proxy_id: u32) -> Self {
105 Self {
106 proxy_id,
107 device_id: global_props
108 .get(&keys::DEVICE_ID)
109 .and_then(|s| s.parse().ok()),
110 name: global_props
111 .get(&keys::NODE_NAME)
112 .map_or_else(|| format!("node_{global_id}"), |s| s.to_string()),
113 nick: global_props.get(&keys::NODE_NICK).map(|s| s.to_string()),
114 media_class: global_props.get(&keys::MEDIA_CLASS).map(|s| s.to_string()),
115 direction: global_props
116 .get(&keys::MEDIA_CLASS)
117 .and_then(|s| s.parse().ok()),
118 media_role: global_props.get(&keys::MEDIA_ROLE).map(|s| s.to_string()),
119 description: global_props
120 .get(&keys::NODE_DESCRIPTION)
121 .map(|s| s.to_string()),
122 form_factor: global_props
123 .get(&keys::DEVICE_FORM_FACTOR)
124 .map(|s| s.to_string()),
125 muted: None,
126 volume: None,
127 running: false,
128 }
129 }
130}
131
132#[derive(Debug, PartialEq, PartialOrd, Eq, Ord)]
133pub(crate) struct Link {
134 pub link_output_node: u32,
135 pub link_input_node: u32,
136}
137
138impl Link {
139 fn new(global_props: &DictRef) -> Option<Self> {
140 if let Some(link_output_node) = global_props
141 .get(&keys::LINK_OUTPUT_NODE)
142 .and_then(|s| s.parse().ok())
143 && let Some(link_input_node) = global_props
144 .get(&keys::LINK_INPUT_NODE)
145 .and_then(|s| s.parse().ok())
146 {
147 Some(Self {
148 link_output_node,
149 link_input_node,
150 })
151 } else {
152 None
153 }
154 }
155}
156
157#[derive(Debug)]
158pub(crate) struct Route {
159 index: i32,
160 device: i32,
161 pub name: String,
162}
163
164#[derive(Debug, Default)]
165pub(crate) struct DirectedRoutes {
166 proxy_id: u32,
167 input: Option<Route>,
169 output: Option<Route>,
170}
171
172impl DirectedRoutes {
173 fn new(proxy_id: u32) -> Self {
174 Self {
175 proxy_id,
176 input: None,
177 output: None,
178 }
179 }
180
181 pub fn get_route(&self, direction: Direction) -> Option<&Route> {
182 match direction {
183 Direction::Input => self.input.as_ref(),
184 Direction::Output => self.output.as_ref(),
185 }
186 }
187
188 fn get_mut_route(&mut self, direction: Direction) -> &mut Option<Route> {
189 match direction {
190 Direction::Input => &mut self.input,
191 Direction::Output => &mut self.output,
192 }
193 }
194}
195
196#[derive(Default)]
197pub(crate) struct DefaultMetadata {
198 pub sink: Option<String>,
199 pub source: Option<String>,
200}
201
202#[derive(Default)]
203pub(crate) struct Data {
204 pub nodes: HashMap<u32, Node>,
205 pub links: HashMap<u32, Link>,
206 pub default_metadata: DefaultMetadata,
207 pub directed_routes: HashMap<u32, DirectedRoutes>,
208 ports: HashSet<u32>,
209}
210
211struct Proxies<T: ProxyT + 'static> {
212 proxies_t: HashMap<u32, T>,
213 listeners: HashMap<u32, Vec<Box<dyn Listener>>>,
214}
215
216impl<T: ProxyT + 'static> Proxies<T> {
217 fn new() -> Self {
218 Self {
219 proxies_t: HashMap::new(),
220 listeners: HashMap::new(),
221 }
222 }
223
224 fn add_proxy(
225 &mut self,
226 proxy: T,
227 listener: impl Listener + 'static,
228 proxies: &Rc<RefCell<Self>>,
229 ) -> u32 {
230 let listener_spe = Box::new(listener);
231
232 let proxy_upcast = proxy.upcast_ref();
233 let proxy_id = proxy_upcast.id();
234
235 let proxies_weak = Rc::downgrade(proxies);
236
237 let listener = proxy_upcast
238 .add_listener_local()
239 .removed(move || {
240 if let Some(proxies) = proxies_weak.upgrade() {
241 proxies.borrow_mut().remove(proxy_id);
242 }
243 })
244 .register();
245
246 self.add_proxy_t(proxy_id, proxy, listener_spe);
247 self.add_proxy_listener(proxy_id, listener);
248
249 proxy_id
250 }
251
252 fn add_proxy_t(&mut self, proxy_id: u32, device_proxy: T, listener: Box<dyn Listener>) {
253 self.proxies_t.insert(proxy_id, device_proxy);
254 self.listeners.entry(proxy_id).or_default().push(listener);
255 }
256
257 fn add_proxy_listener(&mut self, proxy_id: u32, listener: ProxyListener) {
258 self.listeners
259 .entry(proxy_id)
260 .or_default()
261 .push(Box::new(listener));
262 }
263
264 fn remove(&mut self, proxy_id: u32) {
265 self.proxies_t.remove(&proxy_id);
266 self.listeners.remove(&proxy_id);
267 }
268}
269
270bitflags! {
271 #[derive(Debug, Clone, Copy, Default)]
272 pub(crate) struct EventKind: u16 {
273 const PIPEWIRE_CONNECTION_ERROR = 1 << 0;
274 const DEFAULT_META_DATA_UPDATED = 1 << 1;
275 const DEVICE_ADDED = 1 << 2;
276 const DEVICE_PARAM_UPDATE = 1 << 3;
277 const DEVICE_REMOVED = 1 << 4;
278 const LINK_ADDED = 1 << 5;
279 const LINK_REMOVED = 1 << 6;
280 const NODE_ADDED = 1 << 7;
281 const NODE_PARAM_UPDATE = 1 << 8;
282 const NODE_REMOVED = 1 << 9;
283 const NODE_STATE_UPDATE = 1 << 10;
284 const PORT_ADDED = 1 << 11;
285 const PORT_REMOVED = 1 << 12;
286 }
287}
288
289#[derive(Clone, Debug)]
290pub(crate) enum CommandKind {
291 Mute(u32, bool),
292 SetVolume(u32, Vec<f32>),
293}
294
295impl CommandKind {
296 fn create_property_value(&self, param_type: ParamType) -> Value {
297 match self {
298 CommandKind::SetVolume(_, volume) => Value::Object(Object {
299 type_: SpaTypes::ObjectParamProps.as_raw(),
300 id: param_type.as_raw(),
301 properties: vec![Property::new(
302 SPA_PROP_channelVolumes,
303 Value::ValueArray(ValueArray::Float(
304 volume
305 .iter()
306 .map(|vol| {
307 let vol = vol / NORMAL;
308 vol * vol * vol
309 })
310 .collect(),
311 )),
312 )],
313 }),
314 CommandKind::Mute(_, mute) => Value::Object(Object {
315 type_: SpaTypes::ObjectParamProps.as_raw(),
316 id: param_type.as_raw(),
317 properties: vec![Property::new(SPA_PROP_mute, Value::Bool(*mute))],
318 }),
319 }
320 }
321
322 fn execute(
323 self,
324 client: &Client,
325 node_proxies: Rc<RefCell<Proxies<NodeProxy>>>,
326 device_proxies: Rc<RefCell<Proxies<DeviceProxy>>>,
327 ) {
328 debug!("Executing command: {:?}", self);
329 use CommandKind::*;
330 let id = match self {
331 SetVolume(id, _) | Mute(id, _) => id,
332 };
333 let client_data = client.data.lock().unwrap();
334 if let Some(node) = client_data.nodes.get(&id) {
335 if let Some(node_proxy) = node_proxies.borrow_mut().proxies_t.get(&node.proxy_id) {
336 let node_param = self.create_property_value(ParamType::Props);
337 debug!("Setting Node Props param: {:?}", node_param);
338
339 let pod_data = PodSerializer::serialize(Cursor::new(Vec::new()), &node_param)
340 .expect("Failed to serialize node props pod")
341 .0
342 .into_inner();
343 let pod = Pod::from_bytes(&pod_data).expect("Unable to construct pod");
344 node_proxy.set_param(ParamType::Props, 0, pod);
345 }
346
347 if let Some(device_id) = node.device_id
348 && let Some(direction) = node.direction
349 && let Some(directed_routes) = client_data.directed_routes.get(&device_id)
350 && let Some(route) = directed_routes.get_route(direction)
351 && let Some(device_proxy) = device_proxies
352 .borrow_mut()
353 .proxies_t
354 .get(&directed_routes.proxy_id)
355 {
356 let route_param = Value::Object(Object {
357 type_: SpaTypes::ObjectParamRoute.as_raw(),
358 id: ParamType::Route.as_raw(),
359 properties: vec![
360 Property::new(SPA_PARAM_ROUTE_index, Value::Int(route.index)),
361 Property::new(SPA_PARAM_ROUTE_device, Value::Int(route.device)),
362 Property::new(
363 SPA_PARAM_ROUTE_props,
364 self.create_property_value(ParamType::Route),
365 ),
366 Property::new(SPA_PARAM_ROUTE_save, Value::Bool(true)),
367 ],
368 });
369 debug!("Setting Device Route param: {:?}", route_param);
370
371 let pod_data = PodSerializer::serialize(Cursor::new(Vec::new()), &route_param)
372 .expect("Failed to serialize route pod")
373 .0
374 .into_inner();
375 let pod = Pod::from_bytes(&pod_data).expect("Unable to construct pod");
376 device_proxy.set_param(ParamType::Route, 0, pod);
377 }
378 }
379 }
380}
381
382pub(crate) struct Client {
383 event_senders: Mutex<Vec<UnboundedSender<EventKind>>>,
384 command_sender: PwSender<CommandKind>,
385 handle: JoinHandle<()>,
386 pub data: Mutex<Data>,
387}
388
389impl Client {
390 fn new() -> Result<Client> {
391 let (command_sender, command_receiver) = pw_channel();
392
393 let handle = thread::Builder::new()
394 .name("i3status_pipewire".to_string())
395 .spawn(|| Client::main_loop_thread(command_receiver))
396 .error("failed to spawn a thread")?;
397
398 Ok(Self {
399 event_senders: Mutex::new(Vec::new()),
400 command_sender,
401 handle,
402 data: Mutex::new(Data::default()),
403 })
404 }
405
406 pub fn is_terminated(&self) -> bool {
407 self.handle.is_finished()
408 }
409
410 fn main_loop_thread(command_receiver: PwReceiver<CommandKind>) -> ! {
411 let client = CLIENT.as_ref().expect("Could not get client");
412
413 panic::set_hook(Box::new(|_| {
414 client.send_update_event(EventKind::PIPEWIRE_CONNECTION_ERROR);
415 }));
416
417 let proplist = properties! {*keys::APP_NAME => env!("CARGO_PKG_NAME")};
418
419 let main_loop = MainLoopRc::new(None).expect("Failed to create main loop");
420
421 let context = ContextRc::new(&main_loop, Some(proplist)).expect("Failed to create context");
422 let core = context.connect_rc(None).expect("Failed to connect");
423 let registry = core.get_registry_rc().expect("Failed to get registry");
424 let registry_weak = registry.downgrade();
425
426 let update = Rc::new(RefCell::new(EventKind::empty()));
427 let update_copy = update.clone();
428 let update_copy2 = update.clone();
429
430 let node_proxies = Rc::new(RefCell::new(Proxies::<NodeProxy>::new()));
432 let node_proxies_weak = Rc::downgrade(&node_proxies);
433 let device_proxies = Rc::new(RefCell::new(Proxies::<DeviceProxy>::new()));
434 let device_proxies_weak = Rc::downgrade(&device_proxies);
435 let metadata_proxies = Rc::new(RefCell::new(Proxies::<MetadataProxy>::new()));
436
437 let _receiver = command_receiver.attach(main_loop.loop_(), move |command: CommandKind| {
438 if let Some(node_proxies) = node_proxies_weak.upgrade()
439 && let Some(device_proxies) = device_proxies_weak.upgrade()
440 {
441 command.execute(client, node_proxies.clone(), device_proxies.clone());
442 }
443 });
444
445 let _registry_listener = registry
449 .add_listener_local()
450 .global(move |global| {
451 let Some(registry) = registry_weak.upgrade() else {
452 return;
453 };
454 let global_id = global.id;
455 let Some(global_props) = global.props else {
456 return;
457 };
458 match &global.type_ {
459 ObjectType::Node => {
460 let node_proxy: NodeProxy =
461 registry.bind(global).expect("Could not bind node");
462 node_proxy.subscribe_params(&[ParamType::Props]);
463 let update_copy2 = update_copy.clone();
464 let update_copy3 = update_copy.clone();
465 let node_listener = node_proxy
466 .add_listener_local()
467 .info(move |info| {
468 let running = matches!(info.state(), NodeState::Running);
469 client
470 .data
471 .lock()
472 .unwrap()
473 .nodes
474 .entry(global_id)
475 .and_modify(|node| {
476 if node.running != running {
477 node.running = running;
478 update_copy2.replace_with(|v| {
479 *v | EventKind::NODE_STATE_UPDATE
480 });
481 }
482 });
483 })
484 .param(move |_seq, _id, _index, _next, param| {
485 let Some(param) = param else {
486 return;
487 };
488 let Ok((_, Value::Object(object))) =
489 PodDeserializer::deserialize_from::<Value>(param.as_bytes())
490 else {
491 return;
492 };
493 client
494 .data
495 .lock()
496 .unwrap()
497 .nodes
498 .entry(global_id)
499 .and_modify(|node| {
500 for property in object.properties {
501 if property.key == SPA_PROP_mute {
502 let Value::Bool(muted) = property.value else {
503 return;
504 };
505 let muted = Some(muted);
506 if node.muted != muted {
507 node.muted = muted;
508 update_copy3.replace_with(|v| {
509 *v | EventKind::NODE_PARAM_UPDATE
510 });
511 }
512 } else if property.key == SPA_PROP_channelVolumes {
513 let Value::ValueArray(ValueArray::Float(volumes)) =
514 property.value
515 else {
516 return;
517 };
518
519 let volume = Some(
520 volumes
521 .iter()
522 .map(|vol| vol.cbrt() * NORMAL)
523 .collect(),
524 );
525 if node.volume != volume {
526 node.volume = volume;
527 update_copy3.replace_with(|v| {
528 *v | EventKind::NODE_PARAM_UPDATE
529 });
530 }
531 }
532 }
533 });
534 })
535 .register();
536
537 let proxy_id = node_proxies.borrow_mut().add_proxy(
538 node_proxy,
539 node_listener,
540 &node_proxies,
541 );
542
543 client
544 .data
545 .lock()
546 .unwrap()
547 .nodes
548 .insert(global_id, Node::new(global_id, global_props, proxy_id));
549 update_copy.replace_with(|v| *v | EventKind::NODE_ADDED);
550 }
551 ObjectType::Link => {
552 let mut client_data = client.data.lock().unwrap();
553 let Some(link) = Link::new(global_props) else {
554 return;
555 };
556 if let Some(node) = client_data.nodes.get(&link.link_input_node)
557 && node.name == env!("CARGO_PKG_NAME")
558 {
559 return;
560 }
561
562 client_data.links.insert(global_id, link);
563 update_copy.replace_with(|v| *v | EventKind::LINK_ADDED);
564 }
565 ObjectType::Port => {
566 client.data.lock().unwrap().ports.insert(global_id);
567 update_copy.replace_with(|v| *v | EventKind::PORT_ADDED);
568 }
569 ObjectType::Device => {
570 let device_proxy: DeviceProxy =
571 registry.bind(global).expect("Could not bind device");
572 device_proxy.subscribe_params(&[ParamType::Route]);
573 let update_copy2 = update_copy.clone();
574 let device_listener = device_proxy
575 .add_listener_local()
576 .param(move |_seq, _id, _index, _next, param| {
577 let Some(param) = param else {
578 return;
579 };
580 let Ok((_, Value::Object(object))) =
581 PodDeserializer::deserialize_from::<Value>(param.as_bytes())
582 else {
583 return;
584 };
585 let mut route_index = None;
586 let mut route_direction = None;
587 let mut route_device = None;
588 let mut route_name = None;
589 for property in &object.properties {
590 if property.key == SPA_PARAM_ROUTE_index {
591 let Value::Int(route_index_v) = property.value else {
592 return;
593 };
594 route_index = Some(route_index_v);
595 } else if property.key == SPA_PARAM_ROUTE_direction {
596 let Value::Id(route_direction_v) = property.value else {
597 return;
598 };
599 route_direction = route_direction_v.0.try_into().ok();
600 } else if property.key == SPA_PARAM_ROUTE_device {
601 let Value::Int(route_device_v) = property.value else {
602 return;
603 };
604 route_device = Some(route_device_v);
605 } else if property.key == SPA_PARAM_ROUTE_name {
606 let Value::String(route_name_v) = property.value.to_owned()
607 else {
608 return;
609 };
610 route_name = Some(route_name_v);
611 }
612 }
613
614 if let Some(route_index) = route_index
615 && let Some(route_direction) = route_direction
616 && let Some(route_device) = route_device
617 && let Some(route_name) = route_name
618 {
619 client
620 .data
621 .lock()
622 .unwrap()
623 .directed_routes
624 .entry(global_id)
625 .and_modify(|directed_routes| {
626 let route =
627 directed_routes.get_mut_route(route_direction);
628 if let Some(route) = route {
629 if route.index != route_index
630 || route.device != route_device
631 || route.name != route_name
632 {
633 route.index = route_index;
634 route.device = route_device;
635 route.name = route_name;
636 update_copy2.replace_with(|v| {
637 *v | EventKind::DEVICE_PARAM_UPDATE
638 });
639 }
640 } else {
641 route.replace(Route {
642 index: route_index,
643 device: route_device,
644 name: route_name,
645 });
646 update_copy2.replace_with(|v| {
647 *v | EventKind::DEVICE_PARAM_UPDATE
648 });
649 }
650 });
651 }
652 })
653 .register();
654
655 let proxy_id = device_proxies.borrow_mut().add_proxy(
656 device_proxy,
657 device_listener,
658 &device_proxies,
659 );
660
661 client
662 .data
663 .lock()
664 .unwrap()
665 .directed_routes
666 .insert(global_id, DirectedRoutes::new(proxy_id));
667
668 update_copy.replace_with(|v| *v | EventKind::DEVICE_ADDED);
669 }
670 ObjectType::Metadata => {
671 if global_props.get("metadata.name") != Some("default") {
673 return;
674 }
675 let metadata_proxy: MetadataProxy =
676 registry.bind(global).expect("Could not bind device");
677 let update_copy2 = update_copy.clone();
678 let metadata_listener = metadata_proxy
679 .add_listener_local()
680 .property(move |_subject, key, type_, value| {
681 if type_ != Some("Spa:String:JSON")
682 || (key != Some(DEFAULT_SINK_KEY)
683 && key != Some(DEFAULT_SOURCE_KEY))
684 {
685 return -1;
686 }
687
688 let Some(value) = value else {
689 return -1;
690 };
691
692 let value: serde_json::Value =
693 serde_json::from_str(value).unwrap_or_default();
694 let name = value
695 .get("name")
696 .and_then(|v| v.as_str())
697 .map(|s| s.to_string());
698 if key == Some(DEFAULT_SINK_KEY) {
699 let default_medata_sink =
700 &mut client.data.lock().unwrap().default_metadata.sink;
701 if *default_medata_sink != name {
702 *default_medata_sink = name;
703 update_copy2.replace_with(|v| {
704 *v | EventKind::DEFAULT_META_DATA_UPDATED
705 });
706 }
707 } else {
708 let default_medata_source =
709 &mut client.data.lock().unwrap().default_metadata.source;
710 if *default_medata_source != name {
711 *default_medata_source = name;
712 update_copy2.replace_with(|v| {
713 *v | EventKind::DEFAULT_META_DATA_UPDATED
714 });
715 }
716 }
717
718 0
719 })
720 .register();
721
722 metadata_proxies.borrow_mut().add_proxy(
723 metadata_proxy,
724 metadata_listener,
725 &metadata_proxies,
726 );
727 }
728 _ => (),
729 }
730 })
731 .global_remove(move |uid| {
732 let mut client_data = client.data.lock().unwrap();
733 if client_data.nodes.remove(&uid).is_some() {
734 update_copy2.replace_with(|v| *v | EventKind::NODE_REMOVED);
735 } else if client_data.links.remove(&uid).is_some() {
736 update_copy2.replace_with(|v| *v | EventKind::LINK_REMOVED);
737 } else if client_data.ports.remove(&uid) {
738 update_copy2.replace_with(|v| *v | EventKind::PORT_REMOVED);
739 } else if client_data.directed_routes.remove(&uid).is_some() {
740 update_copy2.replace_with(|v| *v | EventKind::DEVICE_REMOVED);
741 }
742 })
743 .register();
744
745 let output_done = Rc::new(Cell::new(false));
762 let output_done_clone = output_done.clone();
763 let input_done = Rc::new(Cell::new(false));
764 let input_done_clone = input_done.clone();
765
766 let values: Vec<u8> = PodSerializer::serialize(
767 Cursor::new(Vec::new()),
768 &Value::Object(Object {
769 type_: SPA_TYPE_OBJECT_Format,
770 id: SPA_PARAM_EnumFormat,
771 properties: AudioInfoRaw::new().into(),
772 }),
773 )
774 .expect("Failed to serialize pod")
775 .0
776 .into_inner();
777
778 let mut params = [Pod::from_bytes(&values).expect("Failed to create pod")];
779
780 let output_stream = StreamBox::new(
781 &core,
782 "i3status_pipewire_workaround_output_stream",
783 properties! {
784 *keys::MEDIA_TYPE => "Audio",
785 *keys::MEDIA_ROLE => "Music",
786 *keys::MEDIA_CATEGORY => "Playback",
787 *keys::AUDIO_CHANNELS => "2",
788 },
789 )
790 .expect("Could not create output_stream");
791
792 let output_stream_listener = output_stream
793 .add_local_listener()
794 .process(move |_stream, _acc: &mut f64| {
795 output_done_clone.set(true);
796 })
797 .register()
798 .expect("Could not add output_stream listener");
799
800 output_stream
801 .connect(
802 ::pipewire::spa::utils::Direction::Output,
803 None,
804 StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS | StreamFlags::RT_PROCESS,
805 &mut params,
806 )
807 .expect("Could not connect output_stream");
808
809 let input_stream = StreamBox::new(
810 &core,
811 "i3status_pipewire_workaround_input_stream",
812 properties! {
813 *keys::MEDIA_TYPE => "Audio",
814 *keys::MEDIA_ROLE => "Music",
815 *keys::MEDIA_CATEGORY => "Playback",
816 *keys::AUDIO_CHANNELS => "2",
817 },
818 )
819 .expect("Could not create input_stream");
820 let input_stream_listener = input_stream
821 .add_local_listener()
822 .process(move |_stream, _acc: &mut f64| {
823 input_done_clone.set(true);
824 })
825 .register()
826 .expect("Could not add input_stream listener");
827
828 input_stream
829 .connect(
830 ::pipewire::spa::utils::Direction::Input,
831 None,
832 StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS | StreamFlags::RT_PROCESS,
833 &mut params,
834 )
835 .expect("Could not connect input_stream");
836
837 while !output_done.get() {
838 main_loop.loop_().iterate(Duration::from_secs(1));
839 let event = update.take();
840 if !event.is_empty() {
841 client.send_update_event(event);
842 }
843 }
844 output_stream
845 .disconnect()
846 .expect("Unable to disconnect output_stream");
847 output_stream_listener.unregister();
848
849 while !input_done.get() {
850 main_loop.loop_().iterate(Duration::from_secs(1));
851 let event = update.take();
852 if !event.is_empty() {
853 client.send_update_event(event);
854 }
855 }
856 input_stream
857 .disconnect()
858 .expect("Unable to disconnect input_stream");
859 input_stream_listener.unregister();
860
861 loop {
864 main_loop.loop_().iterate(Duration::from_hours(24));
865 let event = update.take();
866 if !event.is_empty() {
867 client.send_update_event(event);
868 }
869 }
870 }
871
872 pub fn add_event_listener(&self, tx: UnboundedSender<EventKind>) {
873 self.event_senders.lock().unwrap().push(tx);
874 }
875
876 pub fn add_command_listener(&self) -> PwSender<CommandKind> {
877 self.command_sender.clone()
878 }
879
880 pub fn send_update_event(&self, event: EventKind) {
881 debug!("send_update_event {:?}", event);
882 self.event_senders
883 .lock()
884 .unwrap()
885 .retain(|tx| tx.send(event).is_ok());
886 }
887}