hyperion/instance/muxer/
effect_runner.rs

1use slotmap::SlotMap;
2use thiserror::Error;
3use tokio::sync::mpsc;
4
5use crate::{
6    api::json::message::EffectRequest,
7    effects::{self, EffectDefinitionError, EffectRunHandle, RunEffectError},
8    global::Global,
9    instance::muxer::MuxedMessageData,
10};
11
12use super::MuxedMessage;
13
14#[derive(Debug, Error)]
15pub enum StartEffectError {
16    #[error(transparent)]
17    Definition(#[from] EffectDefinitionError),
18    #[error(transparent)]
19    Run(#[from] RunEffectError),
20    #[error("effect '{name}' not found")]
21    NotFound { name: String },
22}
23
24slotmap::new_key_type! { pub struct RunningEffectKey; }
25
26pub type EffectMessage = effects::EffectMessage<RunningEffectKey>;
27
28#[derive(Debug, Clone)]
29pub enum EffectRunnerUpdate {
30    Message(MuxedMessage),
31    Completed {
32        key: RunningEffectKey,
33        priority: i32,
34    },
35}
36
/// Settings for an [`EffectRunner`].
#[derive(Clone, Copy, Debug)]
pub struct EffectRunnerConfig {
    /// LED count passed to every effect when it is started.
    pub led_count: usize,
}
41
42pub struct EffectRunner {
43    global: Global,
44    effect_tx: mpsc::Sender<EffectMessage>,
45    effect_rx: mpsc::Receiver<EffectMessage>,
46    running_effects: SlotMap<RunningEffectKey, Option<EffectRunHandle>>,
47    config: EffectRunnerConfig,
48}
49
50impl EffectRunner {
51    pub fn new(global: Global, config: EffectRunnerConfig) -> Self {
52        let (effect_tx, effect_rx) = mpsc::channel(4);
53
54        Self {
55            global,
56            effect_tx,
57            effect_rx,
58            running_effects: Default::default(),
59            config,
60        }
61    }
62
63    pub async fn abort(&mut self, key: RunningEffectKey) {
64        if let Some(Some(handle)) = self.running_effects.get_mut(key) {
65            handle.abort().await;
66        }
67    }
68
69    pub async fn clear_all(&mut self) -> bool {
70        let mut cleared_effects = false;
71
72        for effect in self.running_effects.values_mut() {
73            if let Some(handle) = effect.as_mut() {
74                cleared_effects = true;
75                handle.abort().await;
76            }
77        }
78
79        if cleared_effects {
80            debug!("cleared all running effects");
81        }
82
83        cleared_effects
84    }
85
86    pub async fn clear(&mut self, priority: i32) -> bool {
87        let mut cleared_effects = false;
88
89        for effect in self.running_effects.values_mut() {
90            if let Some(handle) = effect.as_mut() {
91                if handle.priority == priority {
92                    cleared_effects = true;
93                    handle.abort().await;
94                }
95            }
96        }
97
98        if cleared_effects {
99            debug!(priority, "cleared running effects");
100        }
101
102        cleared_effects
103    }
104
105    pub async fn start(
106        &mut self,
107        priority: i32,
108        duration: Option<chrono::Duration>,
109        effect: &EffectRequest,
110    ) -> Result<RunningEffectKey, StartEffectError> {
111        // TODO: Read per-instance effects
112        self.global
113            .clone()
114            .read_effects(|effects| {
115                // Find the effect definition
116                let result = if let Some(handle) = effects.find_effect(&effect.name) {
117                    let key = self.running_effects.insert(None);
118
119                    match handle.run(
120                        effect.args.clone().into(),
121                        self.config.led_count,
122                        duration,
123                        priority,
124                        self.effect_tx.clone(),
125                        key,
126                    ) {
127                        Ok(handle) => {
128                            *self.running_effects.get_mut(key).unwrap() = Some(handle);
129                            info!(name = %effect.name, "started effect");
130                            Ok(key)
131                        }
132                        Err(err) => {
133                            self.running_effects.remove(key);
134                            warn!(name = %effect.name, error = %err, "could not start effect");
135                            Err(err.into())
136                        }
137                    }
138                } else {
139                    warn!(name = %effect.name, "effect not found");
140                    Err(StartEffectError::NotFound {
141                        name: effect.name.clone(),
142                    })
143                };
144
145                async move {
146                    if let Ok(key) = result {
147                        // Clear existing effects with the same priority as the newly-started one
148                        for (existing_key, handle) in self.running_effects.iter_mut() {
149                            if existing_key == key {
150                                continue;
151                            }
152                            if let Some(handle) = handle {
153                                if priority == handle.priority {
154                                    handle.abort().await;
155                                }
156                            }
157                        }
158                    }
159
160                    result
161                }
162            })
163            .await
164            .await
165    }
166
167    pub async fn update(&mut self) -> Option<EffectRunnerUpdate> {
168        let msg = self.effect_rx.recv().await?;
169
170        // Log received message
171        trace!(message = ?msg, "got effect message");
172
173        let key = msg.extra;
174        let running_effect = || {
175            // expect: we only clear slots when an effect completes, so this one can't be None
176            // expect: Self::update can only run when start has completed, thus the handle slot
177            // can't be None either
178            self.running_effects
179                .get(key)
180                .expect("invalid effect handle")
181                .as_ref()
182                .expect("handle shouldn't be null")
183        };
184
185        // Turn this into a MuxedMessage
186        match msg.kind {
187            effects::EffectMessageKind::SetColor { color } => Some(EffectRunnerUpdate::Message(
188                MuxedMessage::new(MuxedMessageData::SolidColor {
189                    priority: running_effect().priority,
190                    duration: None,
191                    color,
192                }),
193            )),
194
195            effects::EffectMessageKind::SetImage { image } => Some(EffectRunnerUpdate::Message(
196                MuxedMessage::new(MuxedMessageData::Image {
197                    priority: running_effect().priority,
198                    duration: None,
199                    image: image.clone(),
200                }),
201            )),
202
203            effects::EffectMessageKind::SetLedColors { colors } => Some(
204                EffectRunnerUpdate::Message(MuxedMessage::new(MuxedMessageData::LedColors {
205                    priority: running_effect().priority,
206                    duration: None,
207                    led_colors: colors.clone(),
208                })),
209            ),
210
211            effects::EffectMessageKind::Completed { result } => {
212                // The effect has completed, remove it from the running_effects list
213                let priority = if let Some(mut effect) = self.running_effects.remove(key).flatten()
214                {
215                    effect.finish().await;
216                    effect.priority
217                } else {
218                    panic!("unexpected null handle for completed effect");
219                };
220
221                // Log result
222                match result {
223                    Ok(_) => {
224                        info!("effect completed");
225                    }
226                    Err(err) => {
227                        error!(error = %err, "effect completed with errors");
228                    }
229                }
230
231                Some(EffectRunnerUpdate::Completed { key, priority })
232            }
233        }
234    }
235}