i3status_rs/formatting/
scheduling.rs1use crate::BoxedStream;
2use futures::stream::StreamExt as _;
3use std::time::{Duration, Instant};
4use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
5
6pub fn manage_widgets_updates() -> (UnboundedSender<(usize, Vec<u64>)>, BoxedStream<Vec<usize>>) {
7 let (intervals_tx, intervals_rx) = unbounded_channel::<(usize, Vec<u64>)>();
8 struct State {
9 time_anchor: Instant,
10 last_update: u64,
11 intervals_rx: UnboundedReceiver<(usize, Vec<u64>)>,
12 intervals: Vec<(usize, Vec<u64>)>,
13 }
14 let stream = futures::stream::unfold(
15 State {
16 time_anchor: Instant::now(),
17 last_update: 0,
18 intervals_rx,
19 intervals: Vec::new(),
20 },
21 |mut state| async move {
22 loop {
23 if state.intervals.is_empty() {
24 let (id, new_intervals) = state.intervals_rx.recv().await?;
25 state.intervals.retain(|(i, _)| *i != id);
26 if !new_intervals.is_empty() {
27 state.intervals.push((id, new_intervals));
28 }
29 continue;
30 }
31
32 let time = state.time_anchor.elapsed().as_millis() as u64;
33
34 let mut blocks = Vec::new();
35 let mut delay = 100000;
36 for (id, intervals) in &state.intervals {
37 let block_delay = single_block_next_update(intervals, time, state.last_update);
38 if block_delay < delay {
39 delay = block_delay;
40 blocks.clear();
41 }
42 if block_delay == delay {
43 blocks.push(*id);
44 }
45 }
46
47 if delay == 0 {
48 state.last_update = time;
49 return Some((blocks, state));
50 }
51
52 if let Ok(Some((id, new_intervals))) =
53 tokio::time::timeout(Duration::from_millis(delay), state.intervals_rx.recv())
54 .await
55 {
56 state.intervals.retain(|(i, _)| *i != id);
57 if !new_intervals.is_empty() {
58 state.intervals.push((id, new_intervals));
59 }
60 }
61 }
62 },
63 )
64 .boxed();
65 (intervals_tx, stream)
66}
67
/// Computes how long to wait until the next update of a block with the given `intervals`.
///
/// Each interval `i` defines update deadlines at every positive multiple of `i`. All three
/// arguments and the result share one time unit (the caller in this file passes
/// milliseconds).
///
/// * `intervals` - update periods of the block; zero entries are ignored because they define
///   no schedule.
/// * `time` - the current time.
/// * `last_update` - the time at which updates were last emitted.
///
/// Returns `0` when some interval has a deadline in `(last_update, time]`, i.e. an update is
/// already due. Otherwise returns the distance from `time` to the nearest upcoming deadline,
/// or `u64::MAX` when there is no usable interval.
fn single_block_next_update(intervals: &[u64], time: u64, last_update: u64) -> u64 {
    // Distance from `time` to the first multiple of `interval` strictly after it.
    // Always in `1..=interval`; computed without `time + interval` so it cannot overflow.
    fn until_next_tick(time: u64, interval: u64) -> u64 {
        interval - time % interval
    }

    let mut time_to_next = u64::MAX;
    for &interval in intervals {
        if interval == 0 {
            // `% 0` would panic; a zero period has no meaningful deadline.
            continue;
        }
        // Due if the first deadline after `last_update` is not later than `time`.
        if time.saturating_sub(last_update) >= until_next_tick(last_update, interval) {
            return 0;
        }
        time_to_next = time_to_next.min(until_next_tick(time, interval));
    }
    time_to_next
}
81
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks `single_block_next_update` for one fixed set of intervals against a table of
    /// `(time, last_update, expected_delay)` cases.
    #[test]
    fn single_block() {
        let intervals: &[u64] = &[200, 300, 500];
        let cases: [(u64, u64, u64); 7] = [
            (0, 0, 200),
            (50, 0, 150),
            (210, 50, 0),
            (290, 210, 10),
            (300, 290, 0),
            (300, 300, 100),
            (800, 300, 0),
        ];
        for &(time, last_update, expected) in &cases {
            let actual = single_block_next_update(intervals, time, last_update);
            assert_eq!(
                actual, expected,
                "time = {time}, last_update = {last_update}"
            );
        }
    }
}