1use std::sync::Arc;
2
3use thiserror::Error;
4use tokio::{
5 sync::mpsc::{channel, Sender},
6 task::JoinHandle,
7};
8
9use crate::{global::InputSourceError, image::RawImage, models::Color};
10
11mod definition;
12pub use definition::*;
13
14mod providers;
15pub use providers::Providers;
16
17mod instance;
18use instance::*;
19
20use self::providers::{Provider, ProviderError};
21
22pub struct EffectRunHandle {
23 ctx: Sender<ControlMessage>,
24 join_handle: Option<JoinHandle<()>>,
25
26 pub priority: i32,
27}
28
29impl EffectRunHandle {
30 pub async fn abort(&mut self) {
31 self.ctx
32 .send(ControlMessage::Abort)
33 .await
34 .expect("failed to send message");
35 }
36
37 pub async fn finish(&mut self) {
38 if let Some(jh) = self.join_handle.take() {
39 jh.await.expect("failed to join task");
40 }
41 }
42}
43
44impl Drop for EffectRunHandle {
45 fn drop(&mut self) {
46 if self.join_handle.is_some() {
47 let ctx = self.ctx.clone();
48 tokio::task::spawn(async move {
49 ctx.send(ControlMessage::Abort).await.ok();
51 });
52 }
53 }
54}
55
56#[derive(Debug, Error)]
57pub enum RunEffectError {
58 #[error(transparent)]
59 InputSource(#[from] InputSourceError),
60 #[error(transparent)]
61 EffectDefinition(#[from] EffectDefinitionError),
62}
63
64#[derive(Debug)]
65pub struct EffectMessage<X> {
66 pub kind: EffectMessageKind,
67 pub extra: X,
68}
69
70#[derive(Debug)]
71pub enum EffectMessageKind {
72 SetColor { color: Color },
73 SetImage { image: Arc<RawImage> },
74 SetLedColors { colors: Arc<Vec<Color>> },
75 Completed { result: Result<(), ProviderError> },
76}
77
78#[derive(Default, Debug, Clone)]
79pub struct EffectRegistry {
80 effects: Vec<EffectHandle>,
81}
82
83impl EffectRegistry {
84 pub fn new() -> Self {
85 Self::default()
86 }
87
88 pub fn iter(&self) -> impl Iterator<Item = &EffectDefinition> {
89 self.effects.iter().map(|handle| &handle.definition)
90 }
91
92 pub fn find_effect(&self, name: &str) -> Option<&EffectHandle> {
93 self.effects.iter().find(|e| e.definition.name == name)
94 }
95
96 pub fn is_empty(&self) -> bool {
97 self.effects.is_empty()
98 }
99
100 pub fn len(&self) -> usize {
101 self.effects.len()
102 }
103
104 pub fn add_definitions(
115 &mut self,
116 providers: &Providers,
117 definitions: Vec<EffectDefinition>,
118 ) -> Vec<EffectDefinition> {
119 let mut remaining = vec![];
120
121 for definition in definitions {
122 if let Some(provider) = providers.get(&definition.script) {
123 debug!(provider=?provider, effect=%definition.name, "assigned provider to effect");
124
125 self.effects.push(EffectHandle {
126 definition,
127 provider,
128 });
129 } else {
130 debug!(effect=%definition.name, "no provider for effect");
131
132 remaining.push(definition);
133 }
134 }
135
136 remaining
137 }
138}
139
140#[derive(Debug, Clone)]
141pub struct EffectHandle {
142 pub definition: EffectDefinition,
143 provider: Arc<dyn Provider>,
144}
145
146impl EffectHandle {
147 pub fn run<X: std::fmt::Debug + Clone + Send + 'static>(
148 &self,
149 args: serde_json::Value,
150 led_count: usize,
151 duration: Option<chrono::Duration>,
152 priority: i32,
153 tx: Sender<EffectMessage<X>>,
154 extra: X,
155 ) -> Result<EffectRunHandle, RunEffectError> {
156 let full_path = self.definition.script_path()?;
158
159 let provider = self.provider.clone();
161
162 let (ctx, crx) = channel(1);
164
165 let (etx, mut erx) = channel(1);
167
168 let methods = Arc::new(InstanceMethods::new(
170 etx,
171 crx,
172 led_count,
173 duration.and_then(|d| d.to_std().ok()),
174 ));
175
176 let join_handle = tokio::task::spawn(async move {
178 let methods = methods.clone();
179
180 let mut run_effect =
182 tokio::task::spawn_blocking(move || provider.run(&full_path, args, methods));
183
184 let result = loop {
186 tokio::select! {
187 kind = erx.recv() => {
188 if let Some(kind) = kind {
189 let msg = EffectMessage { kind, extra: extra.clone() };
191
192 if let Err(err) = tx.send(msg).await {
193 error!(err=%err, "failed to forward effect message");
196 return;
197 }
198 }
199 }
200 result = &mut run_effect => {
201 break result.expect("failed to await blocking task");
203 }
204 }
205 };
206
207 tx.send(EffectMessage {
209 kind: EffectMessageKind::Completed { result },
210 extra,
211 })
212 .await
213 .ok();
214 });
215
216 Ok(EffectRunHandle {
217 ctx,
218 join_handle: join_handle.into(),
219 priority,
220 })
221 }
222}