use std::collections::{BTreeMap, HashMap};
use std::convert::TryInto;
use std::pin::Pin;
use std::time::Instant;
use futures::Future;
use tokio::select;
use crate::{
api::types::PriorityInfo,
component::ComponentName,
global::{Global, InputMessage, InputMessageData, Message},
models::Color,
};
mod effect_runner;
pub use effect_runner::StartEffectError;
use effect_runner::*;
mod muxed_message;
pub use muxed_message::*;
/// Configuration used to build a [`PriorityMuxer`].
///
/// It converts into an `EffectRunnerConfig` via `From`, carrying `led_count`
/// over unchanged.
#[derive(Debug, Clone, Copy)]
pub struct MuxerConfig {
/// LED count handed to the effect runner configuration.
pub led_count: usize,
}
impl From<MuxerConfig> for EffectRunnerConfig {
fn from(MuxerConfig { led_count }: MuxerConfig) -> Self {
Self { led_count }
}
}
/// Bookkeeping record for the input currently registered at one priority.
#[derive(Debug)]
struct InputEntry {
// Id taken from the muxer's `input_id` counter at insertion time; it is also
// the key of the matching timeout callback, if the input has one.
input_id: usize,
// The message that was registered for this priority.
message: InputMessage,
// Instant at which the input expires; `None` when the message carries no
// duration.
expires: Option<Instant>,
// Key of the effect started for this input; `None` for inputs that were not
// registered through an effect request.
effect_key: Option<RunningEffectKey>,
}
/// Factory for timeout futures.
///
/// Every call produces a fresh boxed future; the ones built by
/// `PriorityMuxer::insert_input` sleep until the input's expiry instant and
/// then resolve to `(input id, priority)`.
type BoxedTimeoutCallback =
Box<dyn Fn() -> Pin<Box<dyn Future<Output = (usize, i32)> + Send + Sync>> + Send + Sync>;
/// Multiplexes inputs by priority and reports the message of the winning one.
///
/// The entry with the numerically smallest priority key is treated as the
/// current input.
pub struct PriorityMuxer {
// Handle to the shared global state; used here to look up input source names.
global: Global,
// Registered inputs keyed by priority. The map is ordered, so the first
// entry is the one with the smallest priority value.
inputs: BTreeMap<i32, InputEntry>,
// Id that will be assigned to the next inserted input.
input_id: usize,
// Timeout future factories keyed by input id; only inputs with an expiry
// have an entry here.
timeouts: HashMap<usize, BoxedTimeoutCallback>,
// Runs effects and reports their updates and completion.
effect_runner: EffectRunner,
}
/// Priority of the black fallback input that `clear_all` registers.
///
/// `clear` asserts that the priority it is asked to clear is strictly below
/// this value.
pub const MAX_PRIORITY: i32 = 256;
// First argument of `InputMessage::new` for the fallback input created by the
// muxer itself (presumably the source id -- confirm against `InputMessage`).
const MUXER_ID: usize = 0;
impl PriorityMuxer {
pub async fn new(global: Global, config: MuxerConfig) -> Self {
let mut this = Self {
global: global.clone(),
inputs: Default::default(),
timeouts: Default::default(),
input_id: 0,
effect_runner: EffectRunner::new(global, config.into()),
};
this.clear_all().await;
this
}
/// Returns the smallest priority key currently registered.
///
/// # Panics
///
/// Panics when no input is registered at all.
fn current_priority(&self) -> i32 {
    self.inputs.keys().next().copied().unwrap()
}
/// Builds the output message for the input with the smallest priority key.
///
/// Returns `None` when there is no input, or when the data of that input
/// cannot be converted into the payload expected by `MuxedMessage::new`.
fn notify_output_change(&mut self) -> Option<MuxedMessage> {
    let winner = self.inputs.values().next()?;
    let payload = winner.message.data().clone().try_into().ok()?;
    Some(MuxedMessage::new(payload))
}
fn insert_input(
&mut self,
priority: i32,
input: InputMessage,
effect_key: Option<RunningEffectKey>,
) -> Option<InputEntry> {
let expires = input
.data()
.duration()
.map(|duration| Instant::now() + duration.to_std().unwrap());
let before = self.inputs.insert(
priority,
InputEntry {
input_id: self.input_id,
message: input,
expires,
effect_key,
},
);
if let Some(InputEntry { input_id, .. }) = before {
self.timeouts.remove(&input_id);
}
if let Some(expires) = expires {
let id = self.input_id;
self.timeouts.insert(
self.input_id,
Box::new(move || {
Box::pin(async move {
tokio::time::sleep_until(expires.into()).await;
(id, priority)
})
}),
);
}
self.input_id += 1;
before
}
/// Forgets every registered input together with every pending timeout.
fn clear_inputs(&mut self) {
    self.timeouts.clear();
    self.inputs.clear();
}
fn clear_input(&mut self, priority: i32) -> bool {
if let Some(InputEntry { input_id, .. }) = self.inputs.remove(&priority) {
self.timeouts.remove(&input_id);
true
} else {
false
}
}
/// Resets the muxer: drops all inputs and timeouts, clears the effect
/// runner, and registers a solid black input without expiry at
/// `MAX_PRIORITY`.
///
/// Returns the output message for the resulting state.
async fn clear_all(&mut self) -> Option<MuxedMessage> {
    self.clear_inputs();
    debug!("cleared all inputs");

    self.effect_runner.clear_all().await;

    // Fallback input registered by the muxer itself.
    let black = InputMessageData::SolidColor {
        priority: MAX_PRIORITY,
        duration: None,
        color: Color::from_components((0, 0, 0)),
    };
    let fallback = InputMessage::new(MUXER_ID, ComponentName::All, black);
    self.insert_input(MAX_PRIORITY, fallback, None);

    debug!(priority = %self.current_priority(), "current priority changed");
    self.notify_output_change()
}
/// Clears the effect and the input registered at `priority`.
///
/// An output message is produced only when an input was actually removed
/// and, in addition, either the effect runner reported that it cleared
/// something or `priority` was the current priority on entry.
///
/// # Panics
///
/// Panics when `priority` is not strictly below `MAX_PRIORITY`, or when no
/// input is registered.
async fn clear(&mut self, priority: i32) -> Option<MuxedMessage> {
    assert!(priority < MAX_PRIORITY);

    let was_current = self.current_priority() == priority;
    // Both calls always run; neither is skipped based on the other's result.
    let effect_cleared = self.effect_runner.clear(priority).await;
    let input_cleared = self.clear_input(priority);
    debug!(priority = %priority, "cleared priority");

    if !(input_cleared && (effect_cleared || was_current)) {
        return None;
    }

    debug!(priority = %self.current_priority(), "current priority changed");
    self.notify_output_change()
}
/// Registers a non-effect input and aborts the effect it replaces, if any.
///
/// The comparison against the current priority is made before the input is
/// inserted. An output message is returned when the input's priority is less
/// than or equal to that value.
///
/// # Panics
///
/// Panics when the message data has no priority, or when no input is
/// registered.
async fn handle_input(&mut self, input: InputMessage) -> Option<MuxedMessage> {
    let priority = input.data().priority().unwrap();

    // Snapshot taken before the insertion changes the map.
    let active = self.current_priority();
    let takes_over = priority < active;
    let visible = priority <= active;

    let before = self.insert_input(priority, input.clone(), None);
    trace!(
        priority = %priority,
        after = ?input,
        before = ?before,
        "new command for priority level",
    );

    // An effect that was running at this priority has just been replaced.
    let replaced_effect = before.and_then(|previous| previous.effect_key);
    if let Some(key) = replaced_effect {
        self.effect_runner.abort(key).await;
    }

    if takes_over {
        debug!(priority = %priority, "current priority changed");
    }

    if !visible {
        return None;
    }
    self.notify_output_change()
}
/// Handles an elapsed timeout for the input `id` registered at `priority`.
///
/// The input is removed only when the entry stored at `priority` still has
/// the same id; if a different input occupies the priority a warning is
/// logged and the entry is left alone. The timeout callback for `id` is
/// dropped in every case.
///
/// Returns the output message for the new state when the priority that was
/// current on entry is greater than or equal to `priority`, `None` otherwise.
///
/// # Panics
///
/// Panics when no input is registered on entry.
async fn handle_timeout(&mut self, (id, priority): (usize, i32)) -> Option<MuxedMessage> {
    // Needed to decide about the notification after the entry is gone.
    let previous_priority = self.current_priority();

    let still_registered = self
        .inputs
        .get(&priority)
        .map(|entry| entry.input_id == id);
    match still_registered {
        Some(true) => {
            if let Some(removed) = self.inputs.remove(&priority) {
                debug!(input = ?removed, "input timeout");
            }
        }
        Some(false) => {
            warn!(id = %id, "unexpected timeout for input");
        }
        None => {}
    }

    self.timeouts.remove(&id);

    if previous_priority < priority {
        return None;
    }

    // Report the priority that is current *after* the removal, matching what
    // `clear` logs. The map is read directly so that an empty map skips the
    // log line instead of panicking.
    if let Some(current) = self.inputs.keys().next() {
        debug!(priority = %current, "current priority changed");
    }
    self.notify_output_change()
}
/// Dispatches an incoming message.
///
/// * `ClearAll` and `Clear` are forwarded to `clear_all` / `clear`.
/// * `Effect` asks the effect runner to start the effect. On success the
///   message is registered as an input tagged with the effect key. The
///   outcome is then sent through the request's response channel, or a
///   warning is logged if the channel was already taken. This branch never
///   returns an output message.
/// * Everything else goes through `handle_input`.
pub async fn handle_message(&mut self, input: InputMessage) -> Option<MuxedMessage> {
    trace!(input = ?input, "got input");

    match input.data() {
        InputMessageData::ClearAll => self.clear_all().await,
        InputMessageData::Clear { priority } => self.clear(*priority).await,
        InputMessageData::Effect {
            priority,
            duration,
            effect,
            response,
        } => {
            let started = self.effect_runner.start(*priority, *duration, effect).await;
            // Own the channel handle so `input` can be moved below.
            let reply_slot = response.clone();

            if let Ok(key) = &started {
                self.insert_input(*priority, input, Some(*key));
            }

            // The requester only learns success or failure, not the key.
            let outcome = started.map(|_| ());
            match (*reply_slot.lock().await).take() {
                Some(tx) => {
                    tx.send(outcome).ok();
                }
                None => {
                    warn!("effect request already answered");
                }
            }

            None
        }
        _ => self.handle_input(input).await,
    }
}
/// Describes every registered input, ordered by ascending priority key.
///
/// Each entry carries the name of its input source (an empty string when the
/// source id is unknown), its expiry instant, and a flag that is `true` only
/// for the first entry.
pub async fn current_priorities(&self) -> Vec<PriorityInfo> {
    let inputs = &self.inputs;

    self.global
        .read_input_sources(|sources| {
            let mut infos = Vec::with_capacity(inputs.len());

            for (rank, entry) in inputs.values().enumerate() {
                let source_name = match sources.get(&entry.message.source_id()) {
                    Some(source) => source.name().to_string(),
                    None => String::new(),
                };

                infos.push(PriorityInfo::new(
                    &entry.message,
                    source_name,
                    entry.expires,
                    rank == 0,
                ));
            }

            infos
        })
        .await
}
/// Processes one update coming from the effect runner.
///
/// * `None` yields `None`.
/// * `Message` is passed through when its priority is less than or equal to
///   the current priority, and dropped otherwise.
/// * `Completed` removes the input at `priority` if it is still tagged with
///   the completed effect's key. An output message is returned when
///   `priority` was the current priority on entry.
///
/// # Panics
///
/// Panics when an update is received while no input is registered.
async fn handle_effect_message(
    &mut self,
    msg: Option<EffectRunnerUpdate>,
) -> Option<MuxedMessage> {
    match msg? {
        EffectRunnerUpdate::Message(msg) => {
            (msg.priority() <= self.current_priority()).then_some(msg)
        }
        EffectRunnerUpdate::Completed { key, priority } => {
            let notify = self.current_priority() == priority;

            // Only drop the entry when it still belongs to the completed
            // effect; the priority may have been taken over by another input.
            let owned_by_effect = self
                .inputs
                .get(&priority)
                .map_or(false, |entry| entry.effect_key == Some(key));
            if owned_by_effect {
                // `clear_input` also drops the entry's timeout callback, so
                // no stale timeout fires later for an input that is gone.
                self.clear_input(priority);
            }

            if notify {
                self.notify_output_change()
            } else {
                None
            }
        }
    }
}
/// Waits for the next event and returns the output message it produces.
///
/// With no pending timeouts, only the effect runner is awaited. Otherwise a
/// fresh future is created from every timeout callback and raced against the
/// effect runner; whichever finishes first is handled.
pub async fn update(&mut self) -> Option<MuxedMessage> {
    if self.timeouts.is_empty() {
        let msg = self.effect_runner.update().await;
        return self.handle_effect_message(msg).await;
    }

    select! {
        // `select_all` resolves to a tuple whose first element is the
        // `(id, priority)` output of the finished timeout future.
        expired = futures::future::select_all(self.timeouts.values().map(|make| make())) => {
            self.handle_timeout(expired.0).await
        },
        msg = self.effect_runner.update() => {
            self.handle_effect_message(msg).await
        }
    }
}
}