1use std::sync::Arc;
2
3use thiserror::Error;
4use tokio::{
5 select,
6 sync::{broadcast, mpsc, oneshot},
7};
8
9use crate::{
10 api::types::PriorityInfo,
11 global::{Event, Global, InputMessage, InstanceEventKind},
12 models::{Color, InstanceConfig},
13 servers::{self, ServerHandle},
14};
15
16mod black_border_detector;
17use black_border_detector::*;
18
19mod core;
20use self::core::*;
21
22mod device;
23use device::*;
24
25mod muxer;
26pub use muxer::StartEffectError;
27use muxer::*;
28
29mod smoothing;
30use smoothing::*;
31
32#[derive(Debug, Error)]
33pub enum InstanceError {
34 #[error("i/o error: {0}")]
35 Io(#[from] std::io::Error),
36 #[error("device error: {0}")]
37 Device(#[from] DeviceError),
38 #[error("recv error: {0}")]
39 Recv(#[from] broadcast::error::RecvError),
40}
41
/// Activation state of an instance, used to decide when `Activate` and
/// `Deactivate` instance events are emitted.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
enum ActiveState {
    /// Initial state; also the state reached once deactivation has completed.
    #[default]
    Inactive,
    /// The instance is currently considered active.
    Active,
    /// Deactivation was requested but has not completed yet.
    Deactivating,
}
49
50pub struct Instance {
51 config: Arc<InstanceConfig>,
52 device: InstanceDevice,
53 handle_rx: mpsc::Receiver<InstanceMessage>,
54 receiver: broadcast::Receiver<InputMessage>,
55 local_receiver: mpsc::Receiver<InputMessage>,
56 event_tx: broadcast::Sender<Event>,
57 muxer: PriorityMuxer,
58 core: Core,
59 _boblight_server: Option<Result<ServerHandle, std::io::Error>>,
60 active_state: ActiveState,
61}
62
63impl Instance {
64 pub async fn new(global: Global, config: InstanceConfig) -> (Self, InstanceHandle) {
65 let device: InstanceDevice =
66 Device::new(&config.instance.friendly_name, config.device.clone())
67 .await
68 .into();
69
70 let led_count = config.leds.leds.len();
71
72 if let Err(error) = &device.inner {
73 error!(
74 instance = %config.instance.id,
75 name = %config.instance.friendly_name,
76 error = %error,
77 "initializing instance failed"
78 );
79 }
80
81 let receiver = global.subscribe_input().await;
82 let (local_tx, local_receiver) = mpsc::channel(4);
83
84 let muxer = PriorityMuxer::new(global.clone(), MuxerConfig { led_count }).await;
85 let core = Core::new(&config).await;
86
87 let (tx, handle_rx) = mpsc::channel(1);
88 let id = config.instance.id;
89 let handle = InstanceHandle { id, tx, local_tx };
90
91 let config = Arc::new(config);
92 let _boblight_server = if config.boblight_server.enable {
93 let server_handle = servers::bind(
94 "Boblight",
95 config.boblight_server.clone(),
96 global.clone(),
97 {
98 let handle = handle.clone();
99
100 move |tcp, global| {
101 servers::boblight::handle_client(tcp, led_count, handle.clone(), global)
102 }
103 },
104 )
105 .await;
106
107 if let Err(error) = &server_handle {
108 error!(
109 instance = %config.instance.id,
110 name = %config.instance.friendly_name,
111 error = %error,
112 "cannot start Boblight server"
113 );
114 }
115
116 Some(server_handle)
117 } else {
118 None
119 };
120
121 let event_tx = global.get_event_tx().await;
122
123 (
124 Self {
125 config,
126 device,
127 handle_rx,
128 receiver,
129 local_receiver,
130 event_tx,
131 muxer,
132 core,
133 _boblight_server,
134 active_state: ActiveState::default(),
135 },
136 handle,
137 )
138 }
139
140 async fn on_input_message(&mut self, message: InputMessage) {
141 if let Some(message) = self.muxer.handle_message(message).await {
142 self.on_muxed_message(message);
144 }
145 }
146
147 fn on_muxed_message(&mut self, message: MuxedMessage) {
148 if self.active_state == ActiveState::Active {
149 if message.priority() == muxer::MAX_PRIORITY
150 && message.color() == Some(Color::new(0, 0, 0))
151 {
152 self.active_state = ActiveState::Deactivating;
153 }
154 } else if (message.priority() != muxer::MAX_PRIORITY
155 || message.color() != Some(Color::new(0, 0, 0)))
156 && std::mem::replace(&mut self.active_state, ActiveState::Active)
157 == ActiveState::Inactive
158 {
159 self.event_tx
160 .send(Event::instance(self.id(), InstanceEventKind::Activate))
161 .unwrap();
162 }
163
164 self.core.handle_message(message);
165 }
166
167 pub fn id(&self) -> i32 {
168 self.config.instance.id
169 }
170
171 async fn handle_instance_message(&mut self, message: InstanceMessage) -> InstanceControl {
172 match message {
175 InstanceMessage::PriorityInfo(tx) => {
176 tx.send(self.muxer.current_priorities().await).ok();
177 }
178 InstanceMessage::Config(tx) => {
179 tx.send(self.config.clone()).ok();
180 }
181 InstanceMessage::Stop(tx) => {
182 tx.send(()).ok();
183 return InstanceControl::Break;
184 }
185 }
186
187 InstanceControl::Continue
188 }
189
190 #[instrument]
191 pub async fn run(mut self) -> Result<(), InstanceError> {
192 loop {
193 select! {
194 update = self.device.update() => {
195 trace!("device update");
196
197 if let Err(error) = update {
198 error!(error = %error, "device update failed, disabling device");
200 self.device.inner = Err(error);
201 }
202 },
203 message = self.receiver.recv() => {
204 trace!(message = ?message, "global msg");
205
206 match message {
207 Ok(message) => {
208 self.on_input_message(message).await;
209 },
210 Err(tokio::sync::broadcast::error::RecvError::Closed) => {
211 break Ok(());
213 },
214 Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
215 warn!(skipped = %skipped, "skipped input messages");
216 },
217 }
218 },
219 message = self.local_receiver.recv() => {
220 trace!(message = ?message, "local msg");
221
222 if let Some(message) = message {
223 self.on_input_message(message).await;
224 } else {
225 break Ok(());
226 }
227 },
228 message = self.muxer.update() => {
229 trace!(message = ?message, "muxer msg");
230
231 if let Some(message) = message {
233 self.on_muxed_message(message);
234 }
235 },
236 (led_data, update) = self.core.update() => {
237 trace!("core update");
238
239 self.device.set_led_data(led_data).await?;
241
242 if update == SmoothingUpdate::Settled &&
243 self.active_state == ActiveState::Deactivating {
244 self.active_state = ActiveState::Inactive;
245 self.event_tx
246 .send(Event::instance(self.id(), InstanceEventKind::Deactivate))
247 .unwrap();
248 }
249 },
250 message = self.handle_rx.recv() => {
251 trace!(message = ?message, "handle_rx msg");
252
253 if let Some(message) = message {
254 if InstanceControl::Break == self.handle_instance_message(message).await {
255 break Ok(());
256 }
257 } else {
258 break Ok(());
260 }
261 }
262 }
263 }
264 }
265}
266
267impl std::fmt::Debug for Instance {
268 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
269 f.debug_struct("Instance").field("id", &self.id()).finish()
270 }
271}
272
273struct InstanceDevice {
275 inner: Result<Device, DeviceError>,
276}
277
278impl InstanceDevice {
279 async fn update(&mut self) -> Result<(), DeviceError> {
280 if let Ok(device) = &mut self.inner {
281 device.update().await
282 } else {
283 futures::future::pending::<()>().await;
284 Ok(())
285 }
286 }
287
288 async fn set_led_data(&mut self, led_data: &[Color]) -> Result<(), DeviceError> {
289 if let Ok(device) = &mut self.inner {
290 device.set_led_data(led_data).await
291 } else {
292 Ok(())
293 }
294 }
295}
296
297impl From<Result<Device, DeviceError>> for InstanceDevice {
298 fn from(inner: Result<Device, DeviceError>) -> Self {
299 Self { inner }
300 }
301}
302
/// Tells the instance run loop what to do after handling a control request.
///
/// Equality on this fieldless enum is total, so `Eq` is derived alongside
/// `PartialEq`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum InstanceControl {
    /// Keep processing messages.
    Continue,
    /// Leave the run loop.
    Break,
}
308
309#[derive(Debug)]
310enum InstanceMessage {
311 PriorityInfo(oneshot::Sender<Vec<PriorityInfo>>),
312 Config(oneshot::Sender<Arc<InstanceConfig>>),
313 Stop(oneshot::Sender<()>),
314}
315
316#[derive(Clone)]
317pub struct InstanceHandle {
318 id: i32,
319 tx: mpsc::Sender<InstanceMessage>,
320 local_tx: mpsc::Sender<InputMessage>,
321}
322
323#[derive(Debug, Error)]
324pub enum InstanceHandleError {
325 #[error("the corresponding instance is no longer running")]
326 Dropped,
327}
328
329impl<T> From<tokio::sync::mpsc::error::SendError<T>> for InstanceHandleError {
330 fn from(_: tokio::sync::mpsc::error::SendError<T>) -> Self {
331 Self::Dropped
332 }
333}
334
335impl From<tokio::sync::oneshot::error::RecvError> for InstanceHandleError {
336 fn from(_: tokio::sync::oneshot::error::RecvError) -> Self {
337 Self::Dropped
338 }
339}
340
341impl InstanceHandle {
342 pub fn id(&self) -> i32 {
343 self.id
344 }
345
346 pub fn input_channel(&self) -> &mpsc::Sender<InputMessage> {
347 &self.local_tx
348 }
349
350 pub async fn send(&self, input: InputMessage) -> Result<(), InstanceHandleError> {
351 Ok(self.local_tx.send(input).await?)
352 }
353
354 pub async fn current_priorities(&self) -> Result<Vec<PriorityInfo>, InstanceHandleError> {
355 let (tx, rx) = oneshot::channel();
356 self.tx.send(InstanceMessage::PriorityInfo(tx)).await?;
357 Ok(rx.await?)
358 }
359
360 pub async fn config(&self) -> Result<Arc<InstanceConfig>, InstanceHandleError> {
361 let (tx, rx) = oneshot::channel();
362 self.tx.send(InstanceMessage::Config(tx)).await?;
363 Ok(rx.await?)
364 }
365
366 pub async fn stop(&self) -> Result<(), InstanceHandleError> {
367 let (tx, rx) = oneshot::channel();
368 self.tx.send(InstanceMessage::Stop(tx)).await?;
369 Ok(rx.await?)
370 }
371}