hyperion/
effects.rs

1use std::sync::Arc;
2
3use thiserror::Error;
4use tokio::{
5    sync::mpsc::{channel, Sender},
6    task::JoinHandle,
7};
8
9use crate::{global::InputSourceError, image::RawImage, models::Color};
10
11mod definition;
12pub use definition::*;
13
14mod providers;
15pub use providers::Providers;
16
17mod instance;
18use instance::*;
19
20use self::providers::{Provider, ProviderError};
21
/// Handle to an effect started through [`EffectHandle::run`].
///
/// Dropping the handle without calling [`EffectRunHandle::finish`] triggers a
/// best-effort abort of the effect (see the `Drop` implementation).
pub struct EffectRunHandle {
    /// Sending side of the control channel; used to deliver `ControlMessage::Abort`.
    ctx: Sender<ControlMessage>,
    /// Join handle of the task spawned by `run`; `None` once `finish` has taken it.
    join_handle: Option<JoinHandle<()>>,

    /// Priority value that was passed to [`EffectHandle::run`].
    pub priority: i32,
}
28
29impl EffectRunHandle {
30    pub async fn abort(&mut self) {
31        self.ctx
32            .send(ControlMessage::Abort)
33            .await
34            .expect("failed to send message");
35    }
36
37    pub async fn finish(&mut self) {
38        if let Some(jh) = self.join_handle.take() {
39            jh.await.expect("failed to join task");
40        }
41    }
42}
43
44impl Drop for EffectRunHandle {
45    fn drop(&mut self) {
46        if self.join_handle.is_some() {
47            let ctx = self.ctx.clone();
48            tokio::task::spawn(async move {
49                // This handle has been discarded, try to abort the running script as best effort
50                ctx.send(ControlMessage::Abort).await.ok();
51            });
52        }
53    }
54}
55
/// Error returned by [`EffectHandle::run`] when an effect cannot be started.
///
/// Both variants are transparent wrappers: their display output and source
/// are those of the wrapped error.
#[derive(Debug, Error)]
pub enum RunEffectError {
    /// Wraps an [`InputSourceError`].
    #[error(transparent)]
    InputSource(#[from] InputSourceError),
    /// Wraps an [`EffectDefinitionError`].
    #[error(transparent)]
    EffectDefinition(#[from] EffectDefinitionError),
}
63
/// Message forwarded from a running effect to the channel given to
/// [`EffectHandle::run`].
#[derive(Debug)]
pub struct EffectMessage<X> {
    /// What the effect reported.
    pub kind: EffectMessageKind,
    /// Clone of the `extra` value supplied by the caller of `run`, attached to
    /// every message of that run.
    pub extra: X,
}
69
/// Payload of an [`EffectMessage`].
///
/// NOTE(review): the `Set*` variants are produced outside this file
/// (presumably by the instance methods exposed to the effect script) — confirm
/// their exact semantics there.
#[derive(Debug)]
pub enum EffectMessageKind {
    /// Carries a single color.
    SetColor { color: Color },
    /// Carries a shared raw image.
    SetImage { image: Arc<RawImage> },
    /// Carries a shared list of colors (presumably one per LED — verify).
    SetLedColors { colors: Arc<Vec<Color>> },
    /// Sent after the provider's `run` call returned, carrying its result.
    Completed { result: Result<(), ProviderError> },
}
77
/// Registry of effects that have been matched with a provider.
#[derive(Default, Debug, Clone)]
pub struct EffectRegistry {
    /// Registered effects, in the order they were added.
    effects: Vec<EffectHandle>,
}
82
83impl EffectRegistry {
84    pub fn new() -> Self {
85        Self::default()
86    }
87
88    pub fn iter(&self) -> impl Iterator<Item = &EffectDefinition> {
89        self.effects.iter().map(|handle| &handle.definition)
90    }
91
92    pub fn find_effect(&self, name: &str) -> Option<&EffectHandle> {
93        self.effects.iter().find(|e| e.definition.name == name)
94    }
95
96    pub fn is_empty(&self) -> bool {
97        self.effects.is_empty()
98    }
99
100    pub fn len(&self) -> usize {
101        self.effects.len()
102    }
103
104    /// Add definitions to this registry
105    ///
106    /// # Parameters
107    ///
108    /// * `providers`: effect providers
109    /// * `definitions`: effect definitions to register
110    ///
111    /// # Returns
112    ///
113    /// Effect definitions that are not supported by any provider.
114    pub fn add_definitions(
115        &mut self,
116        providers: &Providers,
117        definitions: Vec<EffectDefinition>,
118    ) -> Vec<EffectDefinition> {
119        let mut remaining = vec![];
120
121        for definition in definitions {
122            if let Some(provider) = providers.get(&definition.script) {
123                debug!(provider=?provider, effect=%definition.name, "assigned provider to effect");
124
125                self.effects.push(EffectHandle {
126                    definition,
127                    provider,
128                });
129            } else {
130                debug!(effect=%definition.name, "no provider for effect");
131
132                remaining.push(definition);
133            }
134        }
135
136        remaining
137    }
138}
139
/// An effect definition together with the provider assigned to it by
/// [`EffectRegistry::add_definitions`].
#[derive(Debug, Clone)]
pub struct EffectHandle {
    /// Definition of the effect.
    pub definition: EffectDefinition,
    /// Provider used by `run` to execute the effect's script.
    provider: Arc<dyn Provider>,
}
145
146impl EffectHandle {
147    pub fn run<X: std::fmt::Debug + Clone + Send + 'static>(
148        &self,
149        args: serde_json::Value,
150        led_count: usize,
151        duration: Option<chrono::Duration>,
152        priority: i32,
153        tx: Sender<EffectMessage<X>>,
154        extra: X,
155    ) -> Result<EffectRunHandle, RunEffectError> {
156        // Resolve path
157        let full_path = self.definition.script_path()?;
158
159        // Clone provider arc
160        let provider = self.provider.clone();
161
162        // Create control channel
163        let (ctx, crx) = channel(1);
164
165        // Create channel to wrap data
166        let (etx, mut erx) = channel(1);
167
168        // Create instance methods
169        let methods = Arc::new(InstanceMethods::new(
170            etx,
171            crx,
172            led_count,
173            duration.and_then(|d| d.to_std().ok()),
174        ));
175
176        // Run effect
177        let join_handle = tokio::task::spawn(async move {
178            let methods = methods.clone();
179
180            // Create the blocking task
181            let mut run_effect =
182                tokio::task::spawn_blocking(move || provider.run(&full_path, args, methods));
183
184            // Join the blocking task while forwarding the effect messages
185            let result = loop {
186                tokio::select! {
187                    kind = erx.recv() => {
188                        if let Some(kind) = kind {
189                            // Add the extra marker to the message and forward it to the instance
190                            let msg = EffectMessage { kind, extra: extra.clone() };
191
192                            if let Err(err) = tx.send(msg).await {
193                                // This would happen if the effect is running and the instance has
194                                // already shutdown.
195                                error!(err=%err, "failed to forward effect message");
196                                return;
197                            }
198                        }
199                    }
200                    result = &mut run_effect => {
201                        // Unwrap blocking result
202                        break result.expect("failed to await blocking task");
203                    }
204                }
205            };
206
207            // Send the completion, ignoring failures in case we're shutting down
208            tx.send(EffectMessage {
209                kind: EffectMessageKind::Completed { result },
210                extra,
211            })
212            .await
213            .ok();
214        });
215
216        Ok(EffectRunHandle {
217            ctx,
218            join_handle: join_handle.into(),
219            priority,
220        })
221    }
222}