i3status_rs/blocks/privacy/
pipewire.rs

1use std::cell::Cell;
2use std::collections::HashMap;
3use std::rc::Rc;
4use std::sync::{Arc, Mutex, Weak};
5use std::thread;
6
7use ::pipewire::{
8    context::Context, keys, main_loop::MainLoop, properties::properties, spa::utils::dict::DictRef,
9    types::ObjectType,
10};
11use itertools::Itertools as _;
12use tokio::sync::Notify;
13
14use super::*;
15
/// Process-wide PipeWire client used by every `Monitor` in this module.
///
/// Built by `Client::new` the first time the static is accessed. If construction fails, the
/// stored `Err` is what every later `CLIENT.as_ref()` call observes.
static CLIENT: LazyLock<Result<Client>> = LazyLock::new(Client::new);
17
18#[derive(Debug)]
19struct Node {
20    name: String,
21    nick: Option<String>,
22    media_class: Option<String>,
23    media_role: Option<String>,
24    description: Option<String>,
25}
26
27impl Node {
28    fn new(global_id: u32, global_props: &DictRef) -> Self {
29        Self {
30            name: global_props
31                .get(&keys::NODE_NAME)
32                .map_or_else(|| format!("node_{}", global_id), |s| s.to_string()),
33            nick: global_props.get(&keys::NODE_NICK).map(|s| s.to_string()),
34            media_class: global_props.get(&keys::MEDIA_CLASS).map(|s| s.to_string()),
35            media_role: global_props.get(&keys::MEDIA_ROLE).map(|s| s.to_string()),
36            description: global_props
37                .get(&keys::NODE_DESCRIPTION)
38                .map(|s| s.to_string()),
39        }
40    }
41}
42
43#[derive(Debug, PartialEq, PartialOrd, Eq, Ord)]
44struct Link {
45    link_output_node: u32,
46    link_input_node: u32,
47}
48
49impl Link {
50    fn new(global_props: &DictRef) -> Option<Self> {
51        if let (Some(link_output_node), Some(link_input_node)) = (
52            global_props
53                .get(&keys::LINK_OUTPUT_NODE)
54                .and_then(|s| s.parse().ok()),
55            global_props
56                .get(&keys::LINK_INPUT_NODE)
57                .and_then(|s| s.parse().ok()),
58        ) {
59            Some(Self {
60                link_output_node,
61                link_input_node,
62            })
63        } else {
64            None
65        }
66    }
67}
68
/// Registry objects mirrored from PipeWire, keyed by the id of the global they came from.
#[derive(Default)]
struct Data {
    /// Nodes announced by the registry and not yet removed.
    nodes: HashMap<u32, Node>,
    /// Links announced by the registry and not yet removed.
    links: HashMap<u32, Link>,
}
74
75#[derive(Default)]
76struct Client {
77    event_listeners: Mutex<Vec<Weak<Notify>>>,
78    data: Mutex<Data>,
79}
80
81impl Client {
82    fn new() -> Result<Client> {
83        thread::Builder::new()
84            .name("privacy_pipewire".to_string())
85            .spawn(Client::main_loop_thread)
86            .error("failed to spawn a thread")?;
87
88        Ok(Client::default())
89    }
90
91    fn main_loop_thread() {
92        let client = CLIENT.as_ref().error("Could not get client").unwrap();
93
94        let proplist = properties! {*keys::APP_NAME => env!("CARGO_PKG_NAME")};
95
96        let main_loop = MainLoop::new(None).expect("Failed to create main loop");
97
98        let context =
99            Context::with_properties(&main_loop, proplist).expect("Failed to create context");
100        let core = context.connect(None).expect("Failed to connect");
101        let registry = core.get_registry().expect("Failed to get registry");
102
103        let updated = Rc::new(Cell::new(false));
104        let updated_copy = updated.clone();
105        let updated_copy2 = updated.clone();
106
107        // Register a callback to the `global` event on the registry, which notifies of any new global objects
108        // appearing on the remote.
109        // The callback will only get called as long as we keep the returned listener alive.
110        let _registry_listener = registry
111            .add_listener_local()
112            .global(move |global| {
113                let Some(global_props) = global.props else {
114                    return;
115                };
116                match &global.type_ {
117                    ObjectType::Node => {
118                        client
119                            .data
120                            .lock()
121                            .unwrap()
122                            .nodes
123                            .insert(global.id, Node::new(global.id, global_props));
124                        updated_copy.set(true);
125                    }
126                    ObjectType::Link => {
127                        let Some(link) = Link::new(global_props) else {
128                            return;
129                        };
130                        client.data.lock().unwrap().links.insert(global.id, link);
131                        updated_copy.set(true);
132                    }
133                    _ => (),
134                }
135            })
136            .global_remove(move |uid| {
137                let mut data = client.data.lock().unwrap();
138                if data.nodes.remove(&uid).is_some() || data.links.remove(&uid).is_some() {
139                    updated_copy2.set(true);
140                }
141            })
142            .register();
143
144        loop {
145            main_loop.loop_().iterate(Duration::from_secs(60 * 60 * 24));
146            if updated.get() {
147                updated.set(false);
148                client
149                    .event_listeners
150                    .lock()
151                    .unwrap()
152                    .retain(|notify| notify.upgrade().inspect(|x| x.notify_one()).is_some());
153            }
154        }
155    }
156
157    fn add_event_listener(&self, notify: &Arc<Notify>) {
158        self.event_listeners
159            .lock()
160            .unwrap()
161            .push(Arc::downgrade(notify));
162    }
163}
164
/// User configuration for the PipeWire privacy monitor.
///
/// Unknown keys are rejected and every field falls back to its default when omitted.
#[derive(Deserialize, Debug, SmartDefault)]
#[serde(rename_all = "lowercase", deny_unknown_fields, default)]
pub struct Config {
    /// Links whose output node has one of these names are not reported.
    exclude_output: Vec<String>,
    /// Links whose input node has one of these names are not reported.
    exclude_input: Vec<String>,
    /// Which node property is used to label nodes in the reported info.
    display: NodeDisplay,
}
172
173#[derive(Deserialize, Debug, SmartDefault)]
174#[serde(rename_all = "snake_case")]
175enum NodeDisplay {
176    #[default]
177    Name,
178    Description,
179    Nickname,
180}
181
182impl NodeDisplay {
183    fn map_node(&self, node: &Node) -> String {
184        match self {
185            NodeDisplay::Name => node.name.clone(),
186            NodeDisplay::Description => node.description.clone().unwrap_or(node.name.clone()),
187            NodeDisplay::Nickname => node.nick.clone().unwrap_or(node.name.clone()),
188        }
189    }
190}
191
192pub(super) struct Monitor<'a> {
193    config: &'a Config,
194    notify: Arc<Notify>,
195}
196
197impl<'a> Monitor<'a> {
198    pub(super) async fn new(config: &'a Config) -> Result<Self> {
199        let client = CLIENT.as_ref().error("Could not get client")?;
200        let notify = Arc::new(Notify::new());
201        client.add_event_listener(&notify);
202        Ok(Self { config, notify })
203    }
204}
205
206#[async_trait]
207impl PrivacyMonitor for Monitor<'_> {
208    async fn get_info(&mut self) -> Result<PrivacyInfo> {
209        let client = CLIENT.as_ref().error("Could not get client")?;
210        let data = client.data.lock().unwrap();
211        let mut mapping: PrivacyInfo = PrivacyInfo::new();
212
213        for node in data.nodes.values() {
214            debug! {"{:?}", node};
215        }
216
217        // The links must be sorted and then dedup'ed since you can multiple links between any given pair of nodes
218        for Link {
219            link_output_node,
220            link_input_node,
221            ..
222        } in data.links.values().sorted().dedup()
223        {
224            let (Some(output_node), Some(input_node)) = (
225                data.nodes.get(link_output_node),
226                data.nodes.get(link_input_node),
227            ) else {
228                continue;
229            };
230            if input_node.media_class != Some("Audio/Sink".into())
231                && !self.config.exclude_output.contains(&output_node.name)
232                && !self.config.exclude_input.contains(&input_node.name)
233            {
234                let type_ = if input_node.media_class == Some("Stream/Input/Video".into()) {
235                    if output_node.media_role == Some("Camera".into()) {
236                        Type::Webcam
237                    } else {
238                        Type::Video
239                    }
240                } else if input_node.media_class == Some("Stream/Input/Audio".into()) {
241                    if output_node.media_class == Some("Audio/Sink".into()) {
242                        Type::AudioSink
243                    } else {
244                        Type::Audio
245                    }
246                } else {
247                    Type::Unknown
248                };
249                *mapping
250                    .entry(type_)
251                    .or_default()
252                    .entry(self.config.display.map_node(output_node))
253                    .or_default()
254                    .entry(self.config.display.map_node(input_node))
255                    .or_default() += 1;
256            }
257        }
258
259        Ok(mapping)
260    }
261
262    async fn wait_for_change(&mut self) -> Result<()> {
263        self.notify.notified().await;
264        Ok(())
265    }
266}