i3status_rs/formatting/
scheduling.rs

1use crate::BoxedStream;
2use futures::stream::StreamExt as _;
3use std::time::{Duration, Instant};
4use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
5
6pub fn manage_widgets_updates() -> (UnboundedSender<(usize, Vec<u64>)>, BoxedStream<Vec<usize>>) {
7    let (intervals_tx, intervals_rx) = unbounded_channel::<(usize, Vec<u64>)>();
8    struct State {
9        time_anchor: Instant,
10        last_update: u64,
11        intervals_rx: UnboundedReceiver<(usize, Vec<u64>)>,
12        intervals: Vec<(usize, Vec<u64>)>,
13    }
14    let stream = futures::stream::unfold(
15        State {
16            time_anchor: Instant::now(),
17            last_update: 0,
18            intervals_rx,
19            intervals: Vec::new(),
20        },
21        |mut state| async move {
22            loop {
23                if state.intervals.is_empty() {
24                    let (id, new_intervals) = state.intervals_rx.recv().await?;
25                    state.intervals.retain(|(i, _)| *i != id);
26                    if !new_intervals.is_empty() {
27                        state.intervals.push((id, new_intervals));
28                    }
29                    continue;
30                }
31
32                let time = state.time_anchor.elapsed().as_millis() as u64;
33
34                let mut blocks = Vec::new();
35                let mut delay = 100000;
36                for (id, intervals) in &state.intervals {
37                    let block_delay = single_block_next_update(intervals, time, state.last_update);
38                    if block_delay < delay {
39                        delay = block_delay;
40                        blocks.clear();
41                    }
42                    if block_delay == delay {
43                        blocks.push(*id);
44                    }
45                }
46
47                if delay == 0 {
48                    state.last_update = time;
49                    return Some((blocks, state));
50                }
51
52                if let Ok(Some((id, new_intervals))) =
53                    tokio::time::timeout(Duration::from_millis(delay), state.intervals_rx.recv())
54                        .await
55                {
56                    state.intervals.retain(|(i, _)| *i != id);
57                    if !new_intervals.is_empty() {
58                        state.intervals.push((id, new_intervals));
59                    }
60                }
61            }
62        },
63    )
64    .boxed();
65    (intervals_tx, stream)
66}
67
68fn single_block_next_update(intervals: &[u64], time: u64, last_update: u64) -> u64 {
69    fn next_update(time: u64, interval: u64) -> u64 {
70        time + interval - time % interval
71    }
72    let mut time_to_next = u64::MAX;
73    for &interval in intervals {
74        if next_update(last_update, interval) <= time {
75            return 0;
76        }
77        time_to_next = time_to_next.min(next_update(time, interval) - time);
78    }
79    time_to_next
80}
81
82#[cfg(test)]
83mod tests {
84    use super::*;
85
86    #[test]
87    fn single_block() {
88        //     0   100  200  300  400  500  600  700  800  900  1000
89        //     |    |    |    |    |    |    |    |    |    |    |
90        // 200 x         x         x         x         x         x
91        // 300 x              x              x              x
92        // 500 x                        x                        x
93        let intervals = &[200, 300, 500];
94        assert_eq!(single_block_next_update(intervals, 0, 0), 200);
95        assert_eq!(single_block_next_update(intervals, 50, 0), 150);
96        assert_eq!(single_block_next_update(intervals, 210, 50), 0);
97        assert_eq!(single_block_next_update(intervals, 290, 210), 10);
98        assert_eq!(single_block_next_update(intervals, 300, 290), 0);
99        assert_eq!(single_block_next_update(intervals, 300, 300), 100);
100        assert_eq!(single_block_next_update(intervals, 800, 300), 0);
101    }
102}