hyperion/
global.rs

1use std::collections::{BTreeMap, HashMap};
2use std::net::SocketAddr;
3use std::sync::Arc;
4
5use parse_display::Display;
6use tokio::sync::broadcast;
7use tokio::sync::RwLock;
8
9mod event;
10pub use event::*;
11
12mod hook_runner;
13pub use hook_runner::*;
14
15mod input_message;
16pub use input_message::*;
17
18mod input_source;
19pub use input_source::*;
20
21mod paths;
22pub use paths::*;
23
24mod priority_guard;
25pub use priority_guard::*;
26
27use crate::{
28    component::ComponentName, effects::EffectRegistry, instance::InstanceHandle, models::Config,
29};
30
31pub trait Message: Sized {
32    type Data;
33
34    fn new(source_id: usize, component: ComponentName, data: Self::Data) -> Self;
35
36    fn source_id(&self) -> usize;
37
38    fn component(&self) -> ComponentName;
39
40    fn data(&self) -> &Self::Data;
41
42    fn unregister_source(global: &mut GlobalData, input_source: &InputSource<Self>);
43}
44
45#[derive(Clone)]
46pub struct Global(Arc<RwLock<GlobalData>>);
47
48#[derive(Display, Debug)]
49pub enum InputSourceName {
50    #[display("Boblight({peer_addr})")]
51    Boblight { peer_addr: SocketAddr },
52    #[display("FlatBuffers({peer_addr}): {origin}")]
53    FlatBuffers {
54        peer_addr: SocketAddr,
55        origin: String,
56    },
57    #[display("JSON({peer_addr})")]
58    Json { peer_addr: SocketAddr },
59    #[display("Protobuf({peer_addr})")]
60    Protobuf { peer_addr: SocketAddr },
61    #[display("Web({session_id})")]
62    Web { session_id: uuid::Uuid },
63    #[display("PriorityMuxer")]
64    PriorityMuxer,
65    #[display("Effect({name})")]
66    Effect { name: String },
67}
68
69impl InputSourceName {
70    pub fn component(&self) -> ComponentName {
71        match self {
72            InputSourceName::Boblight { .. } => ComponentName::BoblightServer,
73            InputSourceName::FlatBuffers { .. } => ComponentName::FlatbufServer,
74            InputSourceName::Protobuf { .. } => ComponentName::ProtoServer,
75            InputSourceName::Effect { .. } => ComponentName::Effect,
76            _ => ComponentName::All,
77        }
78    }
79}
80
81impl Global {
82    pub async fn register_input_source(
83        &self,
84        name: InputSourceName,
85        priority: Option<i32>,
86    ) -> Result<InputSourceHandle<InputMessage>, InputSourceError> {
87        let priority = if let Some(priority) = priority {
88            if !(0..=255).contains(&priority) {
89                return Err(InputSourceError::InvalidPriority(priority));
90            }
91
92            Some(priority)
93        } else {
94            // TODO: Default value?
95            None
96        };
97
98        Ok(InputSourceHandle::new(
99            self.0.write().await.register_input_source(name, priority),
100            self.clone(),
101        ))
102    }
103
104    pub async fn subscribe_input(&self) -> broadcast::Receiver<InputMessage> {
105        self.0.read().await.input_tx.subscribe()
106    }
107
108    pub async fn register_instance(&self, handle: InstanceHandle) {
109        self.0.write().await.register_instance(handle);
110    }
111
112    pub async fn unregister_instance(&self, id: i32) {
113        self.0.write().await.unregister_instance(id);
114    }
115
116    pub async fn get_instance(&self, id: i32) -> Option<InstanceHandle> {
117        self.0.read().await.instances.get(&id).cloned()
118    }
119
120    pub async fn default_instance(&self) -> Option<(i32, InstanceHandle)> {
121        self.0
122            .read()
123            .await
124            .instances
125            .iter()
126            .next()
127            .map(|(k, v)| (*k, v.clone()))
128    }
129
130    pub async fn read_config<T>(&self, f: impl FnOnce(&Config) -> T) -> T {
131        let data = self.0.read().await;
132        f(&data.config)
133    }
134
135    pub async fn read_effects<T>(&self, f: impl FnOnce(&EffectRegistry) -> T) -> T {
136        let data = self.0.read().await;
137        f(&data.effects)
138    }
139
140    pub async fn write_effects<T>(&self, f: impl FnOnce(&mut EffectRegistry) -> T) -> T {
141        let mut data = self.0.write().await;
142        f(&mut data.effects)
143    }
144
145    pub async fn read_input_sources<T>(
146        &self,
147        f: impl FnOnce(&HashMap<usize, Arc<InputSource<InputMessage>>>) -> T,
148    ) -> T {
149        let data = self.0.read().await;
150        f(&data.input_sources)
151    }
152
153    pub async fn get_event_tx(&self) -> broadcast::Sender<Event> {
154        self.0.read().await.event_tx.clone()
155    }
156
157    pub async fn subscribe_events(&self) -> broadcast::Receiver<Event> {
158        self.0.read().await.event_tx.subscribe()
159    }
160}
161
162pub struct GlobalData {
163    input_tx: broadcast::Sender<InputMessage>,
164    input_sources: HashMap<usize, Arc<InputSource<InputMessage>>>,
165    next_input_source_id: usize,
166    config: Config,
167    instances: BTreeMap<i32, InstanceHandle>,
168    event_tx: broadcast::Sender<Event>,
169    effects: EffectRegistry,
170}
171
172impl GlobalData {
173    pub fn new(config: &Config) -> Self {
174        let (input_tx, _) = broadcast::channel(4);
175        let (event_tx, _) = broadcast::channel(4);
176
177        Self {
178            input_tx,
179            input_sources: Default::default(),
180            next_input_source_id: 1,
181            config: config.clone(),
182            instances: Default::default(),
183            event_tx,
184            effects: Default::default(),
185        }
186    }
187
188    pub fn wrap(self) -> Global {
189        Global(Arc::new(RwLock::new(self)))
190    }
191
192    fn register_input_source(
193        &mut self,
194        name: InputSourceName,
195        priority: Option<i32>,
196    ) -> Arc<InputSource<InputMessage>> {
197        let id = self.next_input_source_id;
198        self.next_input_source_id += 1;
199
200        let input_source = Arc::new(InputSource::new(id, name, priority, self.input_tx.clone()));
201
202        info!(source = %input_source, "registered new input source");
203
204        self.input_sources.insert(id, input_source.clone());
205
206        input_source
207    }
208
209    fn unregister_input_source(&mut self, source: &InputSource<InputMessage>) {
210        if let Some(is) = self.input_sources.remove(&source.id()) {
211            info!(source = %*is, "unregistered input source");
212        }
213    }
214
215    fn register_instance(&mut self, handle: InstanceHandle) {
216        let id = handle.id();
217        self.instances.insert(id, handle);
218        info!(id = %id, "registered instance");
219    }
220
221    fn unregister_instance(&mut self, id: i32) {
222        if self.instances.remove(&id).is_some() {
223            info!(id = %id, "unregistered instance");
224        }
225    }
226}