hyperion/instance/muxer/
effect_runner.rs1use slotmap::SlotMap;
2use thiserror::Error;
3use tokio::sync::mpsc;
4
5use crate::{
6 api::json::message::EffectRequest,
7 effects::{self, EffectDefinitionError, EffectRunHandle, RunEffectError},
8 global::Global,
9 instance::muxer::MuxedMessageData,
10};
11
12use super::MuxedMessage;
13
14#[derive(Debug, Error)]
15pub enum StartEffectError {
16 #[error(transparent)]
17 Definition(#[from] EffectDefinitionError),
18 #[error(transparent)]
19 Run(#[from] RunEffectError),
20 #[error("effect '{name}' not found")]
21 NotFound { name: String },
22}
23
24slotmap::new_key_type! { pub struct RunningEffectKey; }
25
26pub type EffectMessage = effects::EffectMessage<RunningEffectKey>;
27
28#[derive(Debug, Clone)]
29pub enum EffectRunnerUpdate {
30 Message(MuxedMessage),
31 Completed {
32 key: RunningEffectKey,
33 priority: i32,
34 },
35}
36
37#[derive(Debug, Clone, Copy)]
38pub struct EffectRunnerConfig {
39 pub led_count: usize,
40}
41
42pub struct EffectRunner {
43 global: Global,
44 effect_tx: mpsc::Sender<EffectMessage>,
45 effect_rx: mpsc::Receiver<EffectMessage>,
46 running_effects: SlotMap<RunningEffectKey, Option<EffectRunHandle>>,
47 config: EffectRunnerConfig,
48}
49
50impl EffectRunner {
51 pub fn new(global: Global, config: EffectRunnerConfig) -> Self {
52 let (effect_tx, effect_rx) = mpsc::channel(4);
53
54 Self {
55 global,
56 effect_tx,
57 effect_rx,
58 running_effects: Default::default(),
59 config,
60 }
61 }
62
63 pub async fn abort(&mut self, key: RunningEffectKey) {
64 if let Some(Some(handle)) = self.running_effects.get_mut(key) {
65 handle.abort().await;
66 }
67 }
68
69 pub async fn clear_all(&mut self) -> bool {
70 let mut cleared_effects = false;
71
72 for effect in self.running_effects.values_mut() {
73 if let Some(handle) = effect.as_mut() {
74 cleared_effects = true;
75 handle.abort().await;
76 }
77 }
78
79 if cleared_effects {
80 debug!("cleared all running effects");
81 }
82
83 cleared_effects
84 }
85
86 pub async fn clear(&mut self, priority: i32) -> bool {
87 let mut cleared_effects = false;
88
89 for effect in self.running_effects.values_mut() {
90 if let Some(handle) = effect.as_mut() {
91 if handle.priority == priority {
92 cleared_effects = true;
93 handle.abort().await;
94 }
95 }
96 }
97
98 if cleared_effects {
99 debug!(priority, "cleared running effects");
100 }
101
102 cleared_effects
103 }
104
105 pub async fn start(
106 &mut self,
107 priority: i32,
108 duration: Option<chrono::Duration>,
109 effect: &EffectRequest,
110 ) -> Result<RunningEffectKey, StartEffectError> {
111 self.global
113 .clone()
114 .read_effects(|effects| {
115 let result = if let Some(handle) = effects.find_effect(&effect.name) {
117 let key = self.running_effects.insert(None);
118
119 match handle.run(
120 effect.args.clone().into(),
121 self.config.led_count,
122 duration,
123 priority,
124 self.effect_tx.clone(),
125 key,
126 ) {
127 Ok(handle) => {
128 *self.running_effects.get_mut(key).unwrap() = Some(handle);
129 info!(name = %effect.name, "started effect");
130 Ok(key)
131 }
132 Err(err) => {
133 self.running_effects.remove(key);
134 warn!(name = %effect.name, error = %err, "could not start effect");
135 Err(err.into())
136 }
137 }
138 } else {
139 warn!(name = %effect.name, "effect not found");
140 Err(StartEffectError::NotFound {
141 name: effect.name.clone(),
142 })
143 };
144
145 async move {
146 if let Ok(key) = result {
147 for (existing_key, handle) in self.running_effects.iter_mut() {
149 if existing_key == key {
150 continue;
151 }
152 if let Some(handle) = handle {
153 if priority == handle.priority {
154 handle.abort().await;
155 }
156 }
157 }
158 }
159
160 result
161 }
162 })
163 .await
164 .await
165 }
166
167 pub async fn update(&mut self) -> Option<EffectRunnerUpdate> {
168 let msg = self.effect_rx.recv().await?;
169
170 trace!(message = ?msg, "got effect message");
172
173 let key = msg.extra;
174 let running_effect = || {
175 self.running_effects
179 .get(key)
180 .expect("invalid effect handle")
181 .as_ref()
182 .expect("handle shouldn't be null")
183 };
184
185 match msg.kind {
187 effects::EffectMessageKind::SetColor { color } => Some(EffectRunnerUpdate::Message(
188 MuxedMessage::new(MuxedMessageData::SolidColor {
189 priority: running_effect().priority,
190 duration: None,
191 color,
192 }),
193 )),
194
195 effects::EffectMessageKind::SetImage { image } => Some(EffectRunnerUpdate::Message(
196 MuxedMessage::new(MuxedMessageData::Image {
197 priority: running_effect().priority,
198 duration: None,
199 image: image.clone(),
200 }),
201 )),
202
203 effects::EffectMessageKind::SetLedColors { colors } => Some(
204 EffectRunnerUpdate::Message(MuxedMessage::new(MuxedMessageData::LedColors {
205 priority: running_effect().priority,
206 duration: None,
207 led_colors: colors.clone(),
208 })),
209 ),
210
211 effects::EffectMessageKind::Completed { result } => {
212 let priority = if let Some(mut effect) = self.running_effects.remove(key).flatten()
214 {
215 effect.finish().await;
216 effect.priority
217 } else {
218 panic!("unexpected null handle for completed effect");
219 };
220
221 match result {
223 Ok(_) => {
224 info!("effect completed");
225 }
226 Err(err) => {
227 error!(error = %err, "effect completed with errors");
228 }
229 }
230
231 Some(EffectRunnerUpdate::Completed { key, priority })
232 }
233 }
234 }
235}