hyperion/instance/
muxer.rs

1use std::collections::{BTreeMap, HashMap};
2use std::convert::TryInto;
3use std::pin::Pin;
4use std::time::Instant;
5
6use futures::Future;
7use tokio::select;
8
9use crate::{
10    api::types::PriorityInfo,
11    component::ComponentName,
12    global::{Global, InputMessage, InputMessageData, Message},
13    models::Color,
14};
15
16mod effect_runner;
17pub use effect_runner::StartEffectError;
18use effect_runner::*;
19
20mod muxed_message;
21pub use muxed_message::*;
22
/// Configuration of a [`PriorityMuxer`].
#[derive(Clone, Copy, Debug)]
pub struct MuxerConfig {
    /// Number of LEDs handled by the instance this muxer belongs to
    pub led_count: usize,
}
27
28impl From<MuxerConfig> for EffectRunnerConfig {
29    fn from(MuxerConfig { led_count }: MuxerConfig) -> Self {
30        Self { led_count }
31    }
32}
33
34#[derive(Debug)]
35struct InputEntry {
36    input_id: usize,
37    message: InputMessage,
38    expires: Option<Instant>,
39    effect_key: Option<RunningEffectKey>,
40}
41
/// Boxed future resolving to the `(input id, priority)` pair of an input whose timeout
/// elapsed.
type TimeoutFuture = Pin<Box<dyn Future<Output = (usize, i32)> + Send + Sync>>;

/// Factory returning a new [`TimeoutFuture`] on every call, so the same timeout can be
/// awaited again after a previous future was dropped.
type BoxedTimeoutCallback = Box<dyn Fn() -> TimeoutFuture + Send + Sync>;
44
45pub struct PriorityMuxer {
46    global: Global,
47    inputs: BTreeMap<i32, InputEntry>,
48    input_id: usize,
49    timeouts: HashMap<usize, BoxedTimeoutCallback>,
50    effect_runner: EffectRunner,
51}
52
/// Priority at which the muxer registers its own black fallback input. Only priorities
/// strictly below this value can be cleared individually.
pub const MAX_PRIORITY: i32 = 256;

/// Identifier the muxer passes to `InputMessage::new` for the messages it creates itself.
const MUXER_ID: usize = 0;
55
56impl PriorityMuxer {
57    pub async fn new(global: Global, config: MuxerConfig) -> Self {
58        let mut this = Self {
59            global: global.clone(),
60            inputs: Default::default(),
61            timeouts: Default::default(),
62            input_id: 0,
63            effect_runner: EffectRunner::new(global, config.into()),
64        };
65
66        // Start by clearing all outputs
67        this.clear_all().await;
68
69        this
70    }
71
72    fn current_priority(&self) -> i32 {
73        *self.inputs.keys().next().unwrap()
74    }
75
76    fn notify_output_change(&mut self) -> Option<MuxedMessage> {
77        let target = self.inputs.values().next()?;
78        Some(MuxedMessage::new(
79            target.message.data().clone().try_into().ok()?,
80        ))
81    }
82
83    fn insert_input(
84        &mut self,
85        priority: i32,
86        input: InputMessage,
87        effect_key: Option<RunningEffectKey>,
88    ) -> Option<InputEntry> {
89        // Get the duration of this input
90        let expires = input
91            .data()
92            .duration()
93            .map(|duration| Instant::now() + duration.to_std().unwrap());
94
95        // Insert the input, replacing the old one
96        let before = self.inputs.insert(
97            priority,
98            InputEntry {
99                input_id: self.input_id,
100                message: input,
101                expires,
102                effect_key,
103            },
104        );
105
106        // Drop the future for the previous input
107        if let Some(InputEntry { input_id, .. }) = before {
108            self.timeouts.remove(&input_id);
109        }
110
111        // Add the future for the current input
112        if let Some(expires) = expires {
113            let id = self.input_id;
114
115            self.timeouts.insert(
116                self.input_id,
117                Box::new(move || {
118                    Box::pin(async move {
119                        tokio::time::sleep_until(expires.into()).await;
120                        (id, priority)
121                    })
122                }),
123            );
124        }
125
126        // Increment id
127        self.input_id += 1;
128
129        before
130    }
131
132    fn clear_inputs(&mut self) {
133        self.inputs.clear();
134        self.timeouts.clear();
135    }
136
137    fn clear_input(&mut self, priority: i32) -> bool {
138        if let Some(InputEntry { input_id, .. }) = self.inputs.remove(&priority) {
139            self.timeouts.remove(&input_id);
140            true
141        } else {
142            false
143        }
144    }
145
146    async fn clear_all(&mut self) -> Option<MuxedMessage> {
147        self.clear_inputs();
148        debug!("cleared all inputs");
149
150        // Clear all running effects
151        self.effect_runner.clear_all().await;
152
153        self.insert_input(
154            MAX_PRIORITY,
155            InputMessage::new(
156                MUXER_ID,
157                ComponentName::All,
158                InputMessageData::SolidColor {
159                    priority: MAX_PRIORITY,
160                    duration: None,
161                    color: Color::from_components((0, 0, 0)),
162                },
163            ),
164            None,
165        );
166
167        debug!(priority = %self.current_priority(), "current priority changed");
168        self.notify_output_change()
169    }
170
171    async fn clear(&mut self, priority: i32) -> Option<MuxedMessage> {
172        assert!(priority < MAX_PRIORITY);
173        // We should notify if we're clearing the current priority
174        let mut notify = self.current_priority() == priority;
175
176        // Clear running effect on that priority, this notifies if an effect is running in the
177        // clearing priority
178        notify = self.effect_runner.clear(priority).await || notify;
179
180        notify = self.clear_input(priority) && notify;
181        debug!(priority = %priority, "cleared priority");
182
183        if notify {
184            debug!(priority = %self.current_priority(), "current priority changed");
185            self.notify_output_change()
186        } else {
187            None
188        }
189    }
190
191    async fn handle_input(&mut self, input: InputMessage) -> Option<MuxedMessage> {
192        let priority = input.data().priority().unwrap();
193        let is_new = priority < self.current_priority();
194        let notify = priority <= self.current_priority();
195
196        let before = self.insert_input(priority, input.clone(), None);
197        trace!(
198            priority = %priority,
199            after = ?input,
200            before = ?before,
201            "new command for priority level",
202        );
203
204        if let Some(key) = before.and_then(|entry| entry.effect_key) {
205            self.effect_runner.abort(key).await;
206        }
207
208        if is_new {
209            debug!(priority = %priority, "current priority changed");
210        }
211
212        if notify {
213            self.notify_output_change()
214        } else {
215            None
216        }
217    }
218
219    async fn handle_timeout(&mut self, (id, priority): (usize, i32)) -> Option<MuxedMessage> {
220        let current_priority = self.current_priority();
221
222        // Check if the input for the target priority is still the one mentioned in the future
223        if let Some(input) = self.inputs.get(&priority) {
224            if input.input_id == id {
225                if let Some(removed) = self.inputs.remove(&priority) {
226                    debug!(input = ?removed, "input timeout");
227                }
228            } else {
229                warn!(id = %id, "unexpected timeout for input");
230            }
231        }
232
233        // Remove the future
234        self.timeouts.remove(&id);
235
236        // If the timeout priority is <=, then it was the current input
237        if current_priority >= priority {
238            debug!(priority = %current_priority, "current priority changed");
239            self.notify_output_change()
240        } else {
241            None
242        }
243    }
244
245    pub async fn handle_message(&mut self, input: InputMessage) -> Option<MuxedMessage> {
246        trace!(input = ?input, "got input");
247
248        // Check if this will change the output
249        match input.data() {
250            InputMessageData::ClearAll => self.clear_all().await,
251            InputMessageData::Clear { priority } => self.clear(*priority).await,
252            InputMessageData::Effect {
253                priority,
254                duration,
255                effect,
256                response,
257            } => {
258                let result = self.effect_runner.start(*priority, *duration, effect).await;
259                let response = response.clone();
260
261                if let Ok(ref key) = result {
262                    // Register this input to keep track of it
263                    self.insert_input(*priority, input, Some(*key));
264                }
265
266                if let Some(tx) = (*response.lock().await).take() {
267                    // We ignore send errors, this means the caller doesn't care for the response
268                    tx.send(result.map(|_| ())).ok();
269                } else {
270                    // TODO: Remove this when effect requests are properly forwarded to only one
271                    // instance
272                    warn!("effect request already answered");
273                }
274
275                // No MuxedMessage results from this, the effect will publish updates later
276                None
277            }
278            _ => self.handle_input(input).await,
279        }
280    }
281
282    pub async fn current_priorities(&self) -> Vec<PriorityInfo> {
283        self.global
284            .read_input_sources(|sources| {
285                // Inputs are sorted by priority, so i == 0 denotes the
286                // current (active) entry
287                self.inputs
288                    .values()
289                    .enumerate()
290                    .map(|(i, entry)| {
291                        PriorityInfo::new(
292                            &entry.message,
293                            sources
294                                .get(&entry.message.source_id())
295                                .map(|source| source.name().to_string())
296                                .unwrap_or_default(),
297                            entry.expires,
298                            i == 0,
299                        )
300                    })
301                    .collect()
302            })
303            .await
304    }
305
306    async fn handle_effect_message(
307        &mut self,
308        msg: Option<EffectRunnerUpdate>,
309    ) -> Option<MuxedMessage> {
310        match msg {
311            Some(msg) => {
312                match msg {
313                    EffectRunnerUpdate::Message(msg) => {
314                        (msg.priority() <= self.current_priority()).then_some(msg)
315                    }
316                    EffectRunnerUpdate::Completed { key, priority } => {
317                        let notify = self.current_priority() == priority;
318
319                        // Remove corresponding input entry
320                        let entry = self.inputs.entry(priority);
321                        match entry {
322                            std::collections::btree_map::Entry::Vacant(_) => {
323                                // Effect was already removed by a clear call or similar
324                            }
325                            std::collections::btree_map::Entry::Occupied(entry) => {
326                                // Remove the input entry if it's the one that triggered the effect
327                                if entry.get().effect_key == Some(key) {
328                                    entry.remove();
329                                }
330                            }
331                        }
332
333                        // Notify of the priority change, if any
334                        if notify {
335                            self.notify_output_change()
336                        } else {
337                            None
338                        }
339                    }
340                }
341            }
342            None => {
343                // No message
344                None
345            }
346        }
347    }
348
349    pub async fn update(&mut self) -> Option<MuxedMessage> {
350        // Check for input timeouts
351        if !self.timeouts.is_empty() {
352            select! {
353                id = futures::future::select_all(self.timeouts.values().map(|f| f())) => {
354                    self.handle_timeout(id.0).await
355                },
356                msg = self.effect_runner.update() => {
357                    self.handle_effect_message(msg).await
358                }
359            }
360        } else {
361            let msg = self.effect_runner.update().await;
362            self.handle_effect_message(msg).await
363        }
364    }
365}