1use std::collections::{BTreeMap, HashMap};
2use std::convert::TryInto;
3use std::pin::Pin;
4use std::time::Instant;
5
6use futures::Future;
7use tokio::select;
8
9use crate::{
10 api::types::PriorityInfo,
11 component::ComponentName,
12 global::{Global, InputMessage, InputMessageData, Message},
13 models::Color,
14};
15
16mod effect_runner;
17pub use effect_runner::StartEffectError;
18use effect_runner::*;
19
20mod muxed_message;
21pub use muxed_message::*;
22
23#[derive(Debug, Clone, Copy)]
24pub struct MuxerConfig {
25 pub led_count: usize,
26}
27
28impl From<MuxerConfig> for EffectRunnerConfig {
29 fn from(MuxerConfig { led_count }: MuxerConfig) -> Self {
30 Self { led_count }
31 }
32}
33
34#[derive(Debug)]
35struct InputEntry {
36 input_id: usize,
37 message: InputMessage,
38 expires: Option<Instant>,
39 effect_key: Option<RunningEffectKey>,
40}
41
42type BoxedTimeoutCallback =
43 Box<dyn Fn() -> Pin<Box<dyn Future<Output = (usize, i32)> + Send + Sync>> + Send + Sync>;
44
45pub struct PriorityMuxer {
46 global: Global,
47 inputs: BTreeMap<i32, InputEntry>,
48 input_id: usize,
49 timeouts: HashMap<usize, BoxedTimeoutCallback>,
50 effect_runner: EffectRunner,
51}
52
53pub const MAX_PRIORITY: i32 = 256;
54const MUXER_ID: usize = 0;
55
56impl PriorityMuxer {
57 pub async fn new(global: Global, config: MuxerConfig) -> Self {
58 let mut this = Self {
59 global: global.clone(),
60 inputs: Default::default(),
61 timeouts: Default::default(),
62 input_id: 0,
63 effect_runner: EffectRunner::new(global, config.into()),
64 };
65
66 this.clear_all().await;
68
69 this
70 }
71
72 fn current_priority(&self) -> i32 {
73 *self.inputs.keys().next().unwrap()
74 }
75
76 fn notify_output_change(&mut self) -> Option<MuxedMessage> {
77 let target = self.inputs.values().next()?;
78 Some(MuxedMessage::new(
79 target.message.data().clone().try_into().ok()?,
80 ))
81 }
82
83 fn insert_input(
84 &mut self,
85 priority: i32,
86 input: InputMessage,
87 effect_key: Option<RunningEffectKey>,
88 ) -> Option<InputEntry> {
89 let expires = input
91 .data()
92 .duration()
93 .map(|duration| Instant::now() + duration.to_std().unwrap());
94
95 let before = self.inputs.insert(
97 priority,
98 InputEntry {
99 input_id: self.input_id,
100 message: input,
101 expires,
102 effect_key,
103 },
104 );
105
106 if let Some(InputEntry { input_id, .. }) = before {
108 self.timeouts.remove(&input_id);
109 }
110
111 if let Some(expires) = expires {
113 let id = self.input_id;
114
115 self.timeouts.insert(
116 self.input_id,
117 Box::new(move || {
118 Box::pin(async move {
119 tokio::time::sleep_until(expires.into()).await;
120 (id, priority)
121 })
122 }),
123 );
124 }
125
126 self.input_id += 1;
128
129 before
130 }
131
132 fn clear_inputs(&mut self) {
133 self.inputs.clear();
134 self.timeouts.clear();
135 }
136
137 fn clear_input(&mut self, priority: i32) -> bool {
138 if let Some(InputEntry { input_id, .. }) = self.inputs.remove(&priority) {
139 self.timeouts.remove(&input_id);
140 true
141 } else {
142 false
143 }
144 }
145
146 async fn clear_all(&mut self) -> Option<MuxedMessage> {
147 self.clear_inputs();
148 debug!("cleared all inputs");
149
150 self.effect_runner.clear_all().await;
152
153 self.insert_input(
154 MAX_PRIORITY,
155 InputMessage::new(
156 MUXER_ID,
157 ComponentName::All,
158 InputMessageData::SolidColor {
159 priority: MAX_PRIORITY,
160 duration: None,
161 color: Color::from_components((0, 0, 0)),
162 },
163 ),
164 None,
165 );
166
167 debug!(priority = %self.current_priority(), "current priority changed");
168 self.notify_output_change()
169 }
170
171 async fn clear(&mut self, priority: i32) -> Option<MuxedMessage> {
172 assert!(priority < MAX_PRIORITY);
173 let mut notify = self.current_priority() == priority;
175
176 notify = self.effect_runner.clear(priority).await || notify;
179
180 notify = self.clear_input(priority) && notify;
181 debug!(priority = %priority, "cleared priority");
182
183 if notify {
184 debug!(priority = %self.current_priority(), "current priority changed");
185 self.notify_output_change()
186 } else {
187 None
188 }
189 }
190
191 async fn handle_input(&mut self, input: InputMessage) -> Option<MuxedMessage> {
192 let priority = input.data().priority().unwrap();
193 let is_new = priority < self.current_priority();
194 let notify = priority <= self.current_priority();
195
196 let before = self.insert_input(priority, input.clone(), None);
197 trace!(
198 priority = %priority,
199 after = ?input,
200 before = ?before,
201 "new command for priority level",
202 );
203
204 if let Some(key) = before.and_then(|entry| entry.effect_key) {
205 self.effect_runner.abort(key).await;
206 }
207
208 if is_new {
209 debug!(priority = %priority, "current priority changed");
210 }
211
212 if notify {
213 self.notify_output_change()
214 } else {
215 None
216 }
217 }
218
219 async fn handle_timeout(&mut self, (id, priority): (usize, i32)) -> Option<MuxedMessage> {
220 let current_priority = self.current_priority();
221
222 if let Some(input) = self.inputs.get(&priority) {
224 if input.input_id == id {
225 if let Some(removed) = self.inputs.remove(&priority) {
226 debug!(input = ?removed, "input timeout");
227 }
228 } else {
229 warn!(id = %id, "unexpected timeout for input");
230 }
231 }
232
233 self.timeouts.remove(&id);
235
236 if current_priority >= priority {
238 debug!(priority = %current_priority, "current priority changed");
239 self.notify_output_change()
240 } else {
241 None
242 }
243 }
244
245 pub async fn handle_message(&mut self, input: InputMessage) -> Option<MuxedMessage> {
246 trace!(input = ?input, "got input");
247
248 match input.data() {
250 InputMessageData::ClearAll => self.clear_all().await,
251 InputMessageData::Clear { priority } => self.clear(*priority).await,
252 InputMessageData::Effect {
253 priority,
254 duration,
255 effect,
256 response,
257 } => {
258 let result = self.effect_runner.start(*priority, *duration, effect).await;
259 let response = response.clone();
260
261 if let Ok(ref key) = result {
262 self.insert_input(*priority, input, Some(*key));
264 }
265
266 if let Some(tx) = (*response.lock().await).take() {
267 tx.send(result.map(|_| ())).ok();
269 } else {
270 warn!("effect request already answered");
273 }
274
275 None
277 }
278 _ => self.handle_input(input).await,
279 }
280 }
281
282 pub async fn current_priorities(&self) -> Vec<PriorityInfo> {
283 self.global
284 .read_input_sources(|sources| {
285 self.inputs
288 .values()
289 .enumerate()
290 .map(|(i, entry)| {
291 PriorityInfo::new(
292 &entry.message,
293 sources
294 .get(&entry.message.source_id())
295 .map(|source| source.name().to_string())
296 .unwrap_or_default(),
297 entry.expires,
298 i == 0,
299 )
300 })
301 .collect()
302 })
303 .await
304 }
305
306 async fn handle_effect_message(
307 &mut self,
308 msg: Option<EffectRunnerUpdate>,
309 ) -> Option<MuxedMessage> {
310 match msg {
311 Some(msg) => {
312 match msg {
313 EffectRunnerUpdate::Message(msg) => {
314 (msg.priority() <= self.current_priority()).then_some(msg)
315 }
316 EffectRunnerUpdate::Completed { key, priority } => {
317 let notify = self.current_priority() == priority;
318
319 let entry = self.inputs.entry(priority);
321 match entry {
322 std::collections::btree_map::Entry::Vacant(_) => {
323 }
325 std::collections::btree_map::Entry::Occupied(entry) => {
326 if entry.get().effect_key == Some(key) {
328 entry.remove();
329 }
330 }
331 }
332
333 if notify {
335 self.notify_output_change()
336 } else {
337 None
338 }
339 }
340 }
341 }
342 None => {
343 None
345 }
346 }
347 }
348
349 pub async fn update(&mut self) -> Option<MuxedMessage> {
350 if !self.timeouts.is_empty() {
352 select! {
353 id = futures::future::select_all(self.timeouts.values().map(|f| f())) => {
354 self.handle_timeout(id.0).await
355 },
356 msg = self.effect_runner.update() => {
357 self.handle_effect_message(msg).await
358 }
359 }
360 } else {
361 let msg = self.effect_runner.update().await;
362 self.handle_effect_message(msg).await
363 }
364 }
365}