i3status_rs/blocks/privacy/
pipewire.rs

1use std::cell::Cell;
2use std::collections::HashMap;
3use std::rc::Rc;
4use std::sync::{Arc, Mutex, Weak};
5use std::thread;
6
7use ::pipewire::{
8    context::Context, keys, main_loop::MainLoop, properties::properties, spa::utils::dict::DictRef,
9    types::ObjectType,
10};
11use itertools::Itertools as _;
12use tokio::sync::Notify;
13
14use super::*;
15
16static CLIENT: LazyLock<Result<Client>> = LazyLock::new(Client::new);
17
18#[derive(Debug)]
19struct Node {
20    name: String,
21    nick: Option<String>,
22    media_class: Option<String>,
23    media_role: Option<String>,
24    description: Option<String>,
25}
26
27impl Node {
28    fn new(global_id: u32, global_props: &DictRef) -> Self {
29        Self {
30            name: global_props
31                .get(&keys::NODE_NAME)
32                .map_or_else(|| format!("node_{global_id}"), |s| s.to_string()),
33            nick: global_props.get(&keys::NODE_NICK).map(|s| s.to_string()),
34            media_class: global_props.get(&keys::MEDIA_CLASS).map(|s| s.to_string()),
35            media_role: global_props.get(&keys::MEDIA_ROLE).map(|s| s.to_string()),
36            description: global_props
37                .get(&keys::NODE_DESCRIPTION)
38                .map(|s| s.to_string()),
39        }
40    }
41}
42
43#[derive(Debug, PartialEq, PartialOrd, Eq, Ord)]
44struct Link {
45    link_output_node: u32,
46    link_input_node: u32,
47}
48
49impl Link {
50    fn new(global_props: &DictRef) -> Option<Self> {
51        if let Some(link_output_node) = global_props
52            .get(&keys::LINK_OUTPUT_NODE)
53            .and_then(|s| s.parse().ok())
54            && let Some(link_input_node) = global_props
55                .get(&keys::LINK_INPUT_NODE)
56                .and_then(|s| s.parse().ok())
57        {
58            Some(Self {
59                link_output_node,
60                link_input_node,
61            })
62        } else {
63            None
64        }
65    }
66}
67
68#[derive(Default)]
69struct Data {
70    nodes: HashMap<u32, Node>,
71    links: HashMap<u32, Link>,
72}
73
74#[derive(Default)]
75struct Client {
76    event_listeners: Mutex<Vec<Weak<Notify>>>,
77    data: Mutex<Data>,
78}
79
80impl Client {
81    fn new() -> Result<Client> {
82        thread::Builder::new()
83            .name("privacy_pipewire".to_string())
84            .spawn(Client::main_loop_thread)
85            .error("failed to spawn a thread")?;
86
87        Ok(Client::default())
88    }
89
90    fn main_loop_thread() {
91        let client = CLIENT.as_ref().error("Could not get client").unwrap();
92
93        let proplist = properties! {*keys::APP_NAME => env!("CARGO_PKG_NAME")};
94
95        let main_loop = MainLoop::new(None).expect("Failed to create main loop");
96
97        let context =
98            Context::with_properties(&main_loop, proplist).expect("Failed to create context");
99        let core = context.connect(None).expect("Failed to connect");
100        let registry = core.get_registry().expect("Failed to get registry");
101
102        let updated = Rc::new(Cell::new(false));
103        let updated_copy = updated.clone();
104        let updated_copy2 = updated.clone();
105
106        // Register a callback to the `global` event on the registry, which notifies of any new global objects
107        // appearing on the remote.
108        // The callback will only get called as long as we keep the returned listener alive.
109        let _registry_listener = registry
110            .add_listener_local()
111            .global(move |global| {
112                let Some(global_props) = global.props else {
113                    return;
114                };
115                match &global.type_ {
116                    ObjectType::Node => {
117                        client
118                            .data
119                            .lock()
120                            .unwrap()
121                            .nodes
122                            .insert(global.id, Node::new(global.id, global_props));
123                        updated_copy.set(true);
124                    }
125                    ObjectType::Link => {
126                        let Some(link) = Link::new(global_props) else {
127                            return;
128                        };
129                        client.data.lock().unwrap().links.insert(global.id, link);
130                        updated_copy.set(true);
131                    }
132                    _ => (),
133                }
134            })
135            .global_remove(move |uid| {
136                let mut data = client.data.lock().unwrap();
137                if data.nodes.remove(&uid).is_some() || data.links.remove(&uid).is_some() {
138                    updated_copy2.set(true);
139                }
140            })
141            .register();
142
143        loop {
144            main_loop.loop_().iterate(Duration::from_secs(60 * 60 * 24));
145            if updated.get() {
146                updated.set(false);
147                client
148                    .event_listeners
149                    .lock()
150                    .unwrap()
151                    .retain(|notify| notify.upgrade().inspect(|x| x.notify_one()).is_some());
152            }
153        }
154    }
155
156    fn add_event_listener(&self, notify: &Arc<Notify>) {
157        self.event_listeners
158            .lock()
159            .unwrap()
160            .push(Arc::downgrade(notify));
161    }
162}
163
164#[derive(Deserialize, Debug, SmartDefault)]
165#[serde(rename_all = "lowercase", deny_unknown_fields, default)]
166pub struct Config {
167    exclude_output: Vec<String>,
168    exclude_input: Vec<String>,
169    display: NodeDisplay,
170}
171
172#[derive(Deserialize, Debug, SmartDefault)]
173#[serde(rename_all = "snake_case")]
174enum NodeDisplay {
175    #[default]
176    Name,
177    Description,
178    Nickname,
179}
180
181impl NodeDisplay {
182    fn map_node(&self, node: &Node) -> String {
183        match self {
184            NodeDisplay::Name => node.name.clone(),
185            NodeDisplay::Description => node.description.clone().unwrap_or(node.name.clone()),
186            NodeDisplay::Nickname => node.nick.clone().unwrap_or(node.name.clone()),
187        }
188    }
189}
190
191pub(super) struct Monitor<'a> {
192    config: &'a Config,
193    notify: Arc<Notify>,
194}
195
196impl<'a> Monitor<'a> {
197    pub(super) async fn new(config: &'a Config) -> Result<Self> {
198        let client = CLIENT.as_ref().error("Could not get client")?;
199        let notify = Arc::new(Notify::new());
200        client.add_event_listener(&notify);
201        Ok(Self { config, notify })
202    }
203}
204
205#[async_trait]
206impl PrivacyMonitor for Monitor<'_> {
207    async fn get_info(&mut self) -> Result<PrivacyInfo> {
208        let client = CLIENT.as_ref().error("Could not get client")?;
209        let data = client.data.lock().unwrap();
210        let mut mapping: PrivacyInfo = PrivacyInfo::new();
211
212        for node in data.nodes.values() {
213            debug! {"{:?}", node};
214        }
215
216        // The links must be sorted and then dedup'ed since you can multiple links between any given pair of nodes
217        for Link {
218            link_output_node,
219            link_input_node,
220            ..
221        } in data.links.values().sorted().dedup()
222        {
223            if let Some(output_node) = data.nodes.get(link_output_node)
224                && let Some(input_node) = data.nodes.get(link_input_node)
225                && input_node.media_class != Some("Audio/Sink".into())
226                && !self.config.exclude_output.contains(&output_node.name)
227                && !self.config.exclude_input.contains(&input_node.name)
228            {
229                let type_ = if input_node.media_class == Some("Stream/Input/Video".into()) {
230                    if output_node.media_role == Some("Camera".into()) {
231                        Type::Webcam
232                    } else {
233                        Type::Video
234                    }
235                } else if input_node.media_class == Some("Stream/Input/Audio".into()) {
236                    if output_node.media_class == Some("Audio/Sink".into()) {
237                        Type::AudioSink
238                    } else {
239                        Type::Audio
240                    }
241                } else {
242                    Type::Unknown
243                };
244                *mapping
245                    .entry(type_)
246                    .or_default()
247                    .entry(self.config.display.map_node(output_node))
248                    .or_default()
249                    .entry(self.config.display.map_node(input_node))
250                    .or_default() += 1;
251            }
252        }
253
254        Ok(mapping)
255    }
256
257    async fn wait_for_change(&mut self) -> Result<()> {
258        self.notify.notified().await;
259        Ok(())
260    }
261}