hyperion/
instance.rs

1use std::sync::Arc;
2
3use thiserror::Error;
4use tokio::{
5    select,
6    sync::{broadcast, mpsc, oneshot},
7};
8
9use crate::{
10    api::types::PriorityInfo,
11    global::{Event, Global, InputMessage, InstanceEventKind},
12    models::{Color, InstanceConfig},
13    servers::{self, ServerHandle},
14};
15
16mod black_border_detector;
17use black_border_detector::*;
18
19mod core;
20use self::core::*;
21
22mod device;
23use device::*;
24
25mod muxer;
26pub use muxer::StartEffectError;
27use muxer::*;
28
29mod smoothing;
30use smoothing::*;
31
/// Errors that can be reported by an [`Instance`].
#[derive(Debug, Error)]
pub enum InstanceError {
    /// An I/O operation failed.
    #[error("i/o error: {0}")]
    Io(#[from] std::io::Error),
    /// The output device reported an error.
    #[error("device error: {0}")]
    Device(#[from] DeviceError),
    /// Receiving from a broadcast channel failed.
    #[error("recv error: {0}")]
    Recv(#[from] broadcast::error::RecvError),
}
41
/// Activation state of an instance, used to decide when `Activate` and
/// `Deactivate` instance events are emitted.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
enum ActiveState {
    /// Initial state; also reached again once smoothing has settled while
    /// `Deactivating`.
    #[default]
    Inactive,
    /// Entered when a muxed message other than black at the maximum priority
    /// is received.
    Active,
    /// Entered from `Active` when black at the maximum priority is received;
    /// the instance waits for smoothing to settle before becoming `Inactive`.
    Deactivating,
}
49
/// A running LED instance: owns the output device, the priority muxer and the
/// processing core, and reacts to messages in [`Instance::run`].
pub struct Instance {
    /// Shared configuration; handed out to handle callers on request.
    config: Arc<InstanceConfig>,
    /// Output device, which may have failed to initialize.
    device: InstanceDevice,
    /// Control messages sent through an `InstanceHandle`.
    handle_rx: mpsc::Receiver<InstanceMessage>,
    /// Input messages obtained from the global input subscription.
    receiver: broadcast::Receiver<InputMessage>,
    /// Input messages sent directly to this instance through its handle.
    local_receiver: mpsc::Receiver<InputMessage>,
    /// Sender used to publish instance activate/deactivate events.
    event_tx: broadcast::Sender<Event>,
    /// Turns input messages into muxed messages.
    muxer: PriorityMuxer,
    /// Consumes muxed messages and produces LED data updates.
    core: Core,
    /// Result of binding the Boblight server; `None` when it is disabled in
    /// the configuration. Stored but never read by this type.
    _boblight_server: Option<Result<ServerHandle, std::io::Error>>,
    /// Current activation state, see `ActiveState`.
    active_state: ActiveState,
}
62
63impl Instance {
64    pub async fn new(global: Global, config: InstanceConfig) -> (Self, InstanceHandle) {
65        let device: InstanceDevice =
66            Device::new(&config.instance.friendly_name, config.device.clone())
67                .await
68                .into();
69
70        let led_count = config.leds.leds.len();
71
72        if let Err(error) = &device.inner {
73            error!(
74                instance = %config.instance.id,
75                name = %config.instance.friendly_name,
76                error = %error,
77                "initializing instance failed"
78            );
79        }
80
81        let receiver = global.subscribe_input().await;
82        let (local_tx, local_receiver) = mpsc::channel(4);
83
84        let muxer = PriorityMuxer::new(global.clone(), MuxerConfig { led_count }).await;
85        let core = Core::new(&config).await;
86
87        let (tx, handle_rx) = mpsc::channel(1);
88        let id = config.instance.id;
89        let handle = InstanceHandle { id, tx, local_tx };
90
91        let config = Arc::new(config);
92        let _boblight_server = if config.boblight_server.enable {
93            let server_handle = servers::bind(
94                "Boblight",
95                config.boblight_server.clone(),
96                global.clone(),
97                {
98                    let handle = handle.clone();
99
100                    move |tcp, global| {
101                        servers::boblight::handle_client(tcp, led_count, handle.clone(), global)
102                    }
103                },
104            )
105            .await;
106
107            if let Err(error) = &server_handle {
108                error!(
109                    instance = %config.instance.id,
110                    name = %config.instance.friendly_name,
111                    error = %error,
112                    "cannot start Boblight server"
113                );
114            }
115
116            Some(server_handle)
117        } else {
118            None
119        };
120
121        let event_tx = global.get_event_tx().await;
122
123        (
124            Self {
125                config,
126                device,
127                handle_rx,
128                receiver,
129                local_receiver,
130                event_tx,
131                muxer,
132                core,
133                _boblight_server,
134                active_state: ActiveState::default(),
135            },
136            handle,
137        )
138    }
139
140    async fn on_input_message(&mut self, message: InputMessage) {
141        if let Some(message) = self.muxer.handle_message(message).await {
142            // The message triggered a muxing update
143            self.on_muxed_message(message);
144        }
145    }
146
147    fn on_muxed_message(&mut self, message: MuxedMessage) {
148        if self.active_state == ActiveState::Active {
149            if message.priority() == muxer::MAX_PRIORITY
150                && message.color() == Some(Color::new(0, 0, 0))
151            {
152                self.active_state = ActiveState::Deactivating;
153            }
154        } else if (message.priority() != muxer::MAX_PRIORITY
155            || message.color() != Some(Color::new(0, 0, 0)))
156            && std::mem::replace(&mut self.active_state, ActiveState::Active)
157                == ActiveState::Inactive
158        {
159            self.event_tx
160                .send(Event::instance(self.id(), InstanceEventKind::Activate))
161                .unwrap();
162        }
163
164        self.core.handle_message(message);
165    }
166
167    pub fn id(&self) -> i32 {
168        self.config.instance.id
169    }
170
171    async fn handle_instance_message(&mut self, message: InstanceMessage) -> InstanceControl {
172        // ok: the instance shouldn't care if the receiver dropped
173
174        match message {
175            InstanceMessage::PriorityInfo(tx) => {
176                tx.send(self.muxer.current_priorities().await).ok();
177            }
178            InstanceMessage::Config(tx) => {
179                tx.send(self.config.clone()).ok();
180            }
181            InstanceMessage::Stop(tx) => {
182                tx.send(()).ok();
183                return InstanceControl::Break;
184            }
185        }
186
187        InstanceControl::Continue
188    }
189
190    #[instrument]
191    pub async fn run(mut self) -> Result<(), InstanceError> {
192        loop {
193            select! {
194                update = self.device.update() => {
195                    trace!("device update");
196
197                    if let Err(error) = update {
198                        // A device update shouldn't error, disable it
199                        error!(error = %error, "device update failed, disabling device");
200                        self.device.inner = Err(error);
201                    }
202                },
203                message = self.receiver.recv() => {
204                    trace!(message = ?message, "global msg");
205
206                    match message {
207                        Ok(message) => {
208                            self.on_input_message(message).await;
209                        },
210                        Err(tokio::sync::broadcast::error::RecvError::Closed) => {
211                            // No more input messages
212                            break Ok(());
213                        },
214                        Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
215                            warn!(skipped = %skipped, "skipped input messages");
216                        },
217                    }
218                },
219                message = self.local_receiver.recv() => {
220                    trace!(message = ?message, "local msg");
221
222                    if let Some(message) = message {
223                        self.on_input_message(message).await;
224                    } else {
225                        break Ok(());
226                    }
227                },
228                message = self.muxer.update() => {
229                    trace!(message = ?message, "muxer msg");
230
231                    // Muxer update completed
232                    if let Some(message) = message {
233                        self.on_muxed_message(message);
234                    }
235                },
236                (led_data, update) = self.core.update() => {
237                    trace!("core update");
238
239                    // LED data changed
240                    self.device.set_led_data(led_data).await?;
241
242                    if update == SmoothingUpdate::Settled &&
243                        self.active_state == ActiveState::Deactivating {
244                        self.active_state = ActiveState::Inactive;
245                        self.event_tx
246                            .send(Event::instance(self.id(), InstanceEventKind::Deactivate))
247                            .unwrap();
248                    }
249                },
250                message = self.handle_rx.recv() => {
251                    trace!(message = ?message, "handle_rx msg");
252
253                    if let Some(message) = message {
254                        if InstanceControl::Break == self.handle_instance_message(message).await {
255                            break Ok(());
256                        }
257                    } else {
258                        // If the handle is dropped, it means the instance was unregistered
259                        break Ok(());
260                    }
261                }
262            }
263        }
264    }
265}
266
267impl std::fmt::Debug for Instance {
268    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
269        f.debug_struct("Instance").field("id", &self.id()).finish()
270    }
271}
272
/// A wrapper for a device that may have failed initializing
struct InstanceDevice {
    /// The device, or the error that disabled it (either at initialization or
    /// after a failed update).
    inner: Result<Device, DeviceError>,
}
277
278impl InstanceDevice {
279    async fn update(&mut self) -> Result<(), DeviceError> {
280        if let Ok(device) = &mut self.inner {
281            device.update().await
282        } else {
283            futures::future::pending::<()>().await;
284            Ok(())
285        }
286    }
287
288    async fn set_led_data(&mut self, led_data: &[Color]) -> Result<(), DeviceError> {
289        if let Ok(device) = &mut self.inner {
290            device.set_led_data(led_data).await
291        } else {
292            Ok(())
293        }
294    }
295}
296
297impl From<Result<Device, DeviceError>> for InstanceDevice {
298    fn from(inner: Result<Device, DeviceError>) -> Self {
299        Self { inner }
300    }
301}
302
/// Tells the instance event loop what to do after handling a control message.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum InstanceControl {
    /// Keep running the event loop.
    Continue,
    /// Leave the event loop.
    Break,
}
308
/// Control messages sent from an `InstanceHandle` to its instance. Each
/// variant carries the oneshot sender on which the reply is delivered.
#[derive(Debug)]
enum InstanceMessage {
    /// Request the muxer's current priorities.
    PriorityInfo(oneshot::Sender<Vec<PriorityInfo>>),
    /// Request the instance configuration.
    Config(oneshot::Sender<Arc<InstanceConfig>>),
    /// Ask the instance to stop; acknowledged with `()`.
    Stop(oneshot::Sender<()>),
}
315
/// Cloneable handle used to communicate with an `Instance`.
#[derive(Clone)]
pub struct InstanceHandle {
    /// Identifier of the instance this handle refers to.
    id: i32,
    /// Channel for control messages (priorities, config, stop).
    tx: mpsc::Sender<InstanceMessage>,
    /// Channel for input messages addressed to this instance only.
    local_tx: mpsc::Sender<InputMessage>,
}
322
/// Errors returned by `InstanceHandle` operations.
#[derive(Debug, Error)]
pub enum InstanceHandleError {
    /// A message could not be sent to the instance, or the reply to a request
    /// could not be received.
    #[error("the corresponding instance is no longer running")]
    Dropped,
}
328
329impl<T> From<tokio::sync::mpsc::error::SendError<T>> for InstanceHandleError {
330    fn from(_: tokio::sync::mpsc::error::SendError<T>) -> Self {
331        Self::Dropped
332    }
333}
334
335impl From<tokio::sync::oneshot::error::RecvError> for InstanceHandleError {
336    fn from(_: tokio::sync::oneshot::error::RecvError) -> Self {
337        Self::Dropped
338    }
339}
340
341impl InstanceHandle {
342    pub fn id(&self) -> i32 {
343        self.id
344    }
345
346    pub fn input_channel(&self) -> &mpsc::Sender<InputMessage> {
347        &self.local_tx
348    }
349
350    pub async fn send(&self, input: InputMessage) -> Result<(), InstanceHandleError> {
351        Ok(self.local_tx.send(input).await?)
352    }
353
354    pub async fn current_priorities(&self) -> Result<Vec<PriorityInfo>, InstanceHandleError> {
355        let (tx, rx) = oneshot::channel();
356        self.tx.send(InstanceMessage::PriorityInfo(tx)).await?;
357        Ok(rx.await?)
358    }
359
360    pub async fn config(&self) -> Result<Arc<InstanceConfig>, InstanceHandleError> {
361        let (tx, rx) = oneshot::channel();
362        self.tx.send(InstanceMessage::Config(tx)).await?;
363        Ok(rx.await?)
364    }
365
366    pub async fn stop(&self) -> Result<(), InstanceHandleError> {
367        let (tx, rx) = oneshot::channel();
368        self.tx.send(InstanceMessage::Stop(tx)).await?;
369        Ok(rx.await?)
370    }
371}