1use std::collections::{BTreeMap, HashMap};
2use std::net::SocketAddr;
3use std::sync::Arc;
4
5use parse_display::Display;
6use tokio::sync::broadcast;
7use tokio::sync::RwLock;
8
9mod event;
10pub use event::*;
11
12mod hook_runner;
13pub use hook_runner::*;
14
15mod input_message;
16pub use input_message::*;
17
18mod input_source;
19pub use input_source::*;
20
21mod paths;
22pub use paths::*;
23
24mod priority_guard;
25pub use priority_guard::*;
26
27use crate::{
28 component::ComponentName, effects::EffectRegistry, instance::InstanceHandle, models::Config,
29};
30
31pub trait Message: Sized {
32 type Data;
33
34 fn new(source_id: usize, component: ComponentName, data: Self::Data) -> Self;
35
36 fn source_id(&self) -> usize;
37
38 fn component(&self) -> ComponentName;
39
40 fn data(&self) -> &Self::Data;
41
42 fn unregister_source(global: &mut GlobalData, input_source: &InputSource<Self>);
43}
44
45#[derive(Clone)]
46pub struct Global(Arc<RwLock<GlobalData>>);
47
48#[derive(Display, Debug)]
49pub enum InputSourceName {
50 #[display("Boblight({peer_addr})")]
51 Boblight { peer_addr: SocketAddr },
52 #[display("FlatBuffers({peer_addr}): {origin}")]
53 FlatBuffers {
54 peer_addr: SocketAddr,
55 origin: String,
56 },
57 #[display("JSON({peer_addr})")]
58 Json { peer_addr: SocketAddr },
59 #[display("Protobuf({peer_addr})")]
60 Protobuf { peer_addr: SocketAddr },
61 #[display("Web({session_id})")]
62 Web { session_id: uuid::Uuid },
63 #[display("PriorityMuxer")]
64 PriorityMuxer,
65 #[display("Effect({name})")]
66 Effect { name: String },
67}
68
69impl InputSourceName {
70 pub fn component(&self) -> ComponentName {
71 match self {
72 InputSourceName::Boblight { .. } => ComponentName::BoblightServer,
73 InputSourceName::FlatBuffers { .. } => ComponentName::FlatbufServer,
74 InputSourceName::Protobuf { .. } => ComponentName::ProtoServer,
75 InputSourceName::Effect { .. } => ComponentName::Effect,
76 _ => ComponentName::All,
77 }
78 }
79}
80
81impl Global {
82 pub async fn register_input_source(
83 &self,
84 name: InputSourceName,
85 priority: Option<i32>,
86 ) -> Result<InputSourceHandle<InputMessage>, InputSourceError> {
87 let priority = if let Some(priority) = priority {
88 if !(0..=255).contains(&priority) {
89 return Err(InputSourceError::InvalidPriority(priority));
90 }
91
92 Some(priority)
93 } else {
94 None
96 };
97
98 Ok(InputSourceHandle::new(
99 self.0.write().await.register_input_source(name, priority),
100 self.clone(),
101 ))
102 }
103
104 pub async fn subscribe_input(&self) -> broadcast::Receiver<InputMessage> {
105 self.0.read().await.input_tx.subscribe()
106 }
107
108 pub async fn register_instance(&self, handle: InstanceHandle) {
109 self.0.write().await.register_instance(handle);
110 }
111
112 pub async fn unregister_instance(&self, id: i32) {
113 self.0.write().await.unregister_instance(id);
114 }
115
116 pub async fn get_instance(&self, id: i32) -> Option<InstanceHandle> {
117 self.0.read().await.instances.get(&id).cloned()
118 }
119
120 pub async fn default_instance(&self) -> Option<(i32, InstanceHandle)> {
121 self.0
122 .read()
123 .await
124 .instances
125 .iter()
126 .next()
127 .map(|(k, v)| (*k, v.clone()))
128 }
129
130 pub async fn read_config<T>(&self, f: impl FnOnce(&Config) -> T) -> T {
131 let data = self.0.read().await;
132 f(&data.config)
133 }
134
135 pub async fn read_effects<T>(&self, f: impl FnOnce(&EffectRegistry) -> T) -> T {
136 let data = self.0.read().await;
137 f(&data.effects)
138 }
139
140 pub async fn write_effects<T>(&self, f: impl FnOnce(&mut EffectRegistry) -> T) -> T {
141 let mut data = self.0.write().await;
142 f(&mut data.effects)
143 }
144
145 pub async fn read_input_sources<T>(
146 &self,
147 f: impl FnOnce(&HashMap<usize, Arc<InputSource<InputMessage>>>) -> T,
148 ) -> T {
149 let data = self.0.read().await;
150 f(&data.input_sources)
151 }
152
153 pub async fn get_event_tx(&self) -> broadcast::Sender<Event> {
154 self.0.read().await.event_tx.clone()
155 }
156
157 pub async fn subscribe_events(&self) -> broadcast::Receiver<Event> {
158 self.0.read().await.event_tx.subscribe()
159 }
160}
161
162pub struct GlobalData {
163 input_tx: broadcast::Sender<InputMessage>,
164 input_sources: HashMap<usize, Arc<InputSource<InputMessage>>>,
165 next_input_source_id: usize,
166 config: Config,
167 instances: BTreeMap<i32, InstanceHandle>,
168 event_tx: broadcast::Sender<Event>,
169 effects: EffectRegistry,
170}
171
172impl GlobalData {
173 pub fn new(config: &Config) -> Self {
174 let (input_tx, _) = broadcast::channel(4);
175 let (event_tx, _) = broadcast::channel(4);
176
177 Self {
178 input_tx,
179 input_sources: Default::default(),
180 next_input_source_id: 1,
181 config: config.clone(),
182 instances: Default::default(),
183 event_tx,
184 effects: Default::default(),
185 }
186 }
187
188 pub fn wrap(self) -> Global {
189 Global(Arc::new(RwLock::new(self)))
190 }
191
192 fn register_input_source(
193 &mut self,
194 name: InputSourceName,
195 priority: Option<i32>,
196 ) -> Arc<InputSource<InputMessage>> {
197 let id = self.next_input_source_id;
198 self.next_input_source_id += 1;
199
200 let input_source = Arc::new(InputSource::new(id, name, priority, self.input_tx.clone()));
201
202 info!(source = %input_source, "registered new input source");
203
204 self.input_sources.insert(id, input_source.clone());
205
206 input_source
207 }
208
209 fn unregister_input_source(&mut self, source: &InputSource<InputMessage>) {
210 if let Some(is) = self.input_sources.remove(&source.id()) {
211 info!(source = %*is, "unregistered input source");
212 }
213 }
214
215 fn register_instance(&mut self, handle: InstanceHandle) {
216 let id = handle.id();
217 self.instances.insert(id, handle);
218 info!(id = %id, "registered instance");
219 }
220
221 fn unregister_instance(&mut self, id: i32) {
222 if self.instances.remove(&id).is_some() {
223 info!(id = %id, "unregistered instance");
224 }
225 }
226}