hyperion/global/
priority_guard.rs

1use tokio::sync::{broadcast, mpsc};
2
3use crate::component::ComponentName;
4
5use super::{InputMessage, InputMessageData, InputSourceHandle, Message};
6
7enum SendingChannel {
8    Mpsc(mpsc::Sender<InputMessage>),
9    Broadcast(broadcast::Sender<InputMessage>),
10}
11
12impl SendingChannel {
13    pub async fn send(&self, message: InputMessage) {
14        match self {
15            SendingChannel::Mpsc(tx) => tx.send(message).await.ok(),
16            SendingChannel::Broadcast(tx) => tx.send(message).ok().map(|_| ()),
17        };
18    }
19}
20
21impl From<mpsc::Sender<InputMessage>> for SendingChannel {
22    fn from(tx: mpsc::Sender<InputMessage>) -> Self {
23        Self::Mpsc(tx)
24    }
25}
26
27impl From<broadcast::Sender<InputMessage>> for SendingChannel {
28    fn from(tx: broadcast::Sender<InputMessage>) -> Self {
29        Self::Broadcast(tx)
30    }
31}
32
33pub struct PriorityGuard {
34    channel: SendingChannel,
35    source_id: usize,
36    priority: Option<i32>,
37    component: ComponentName,
38}
39
40impl PriorityGuard {
41    pub fn new_mpsc(
42        tx: mpsc::Sender<InputMessage>,
43        handle: &InputSourceHandle<InputMessage>,
44    ) -> Self {
45        Self {
46            channel: SendingChannel::from(tx),
47            source_id: handle.id(),
48            priority: handle.priority(),
49            component: handle.name().component(),
50        }
51    }
52
53    pub fn new_broadcast(handle: &InputSourceHandle<InputMessage>) -> Self {
54        Self {
55            channel: SendingChannel::from(handle.channel().clone()),
56            source_id: handle.id(),
57            priority: handle.priority(),
58            component: handle.name().component(),
59        }
60    }
61
62    pub fn set_priority(&mut self, priority: Option<i32>) {
63        self.priority = priority;
64    }
65}
66
67impl Drop for PriorityGuard {
68    fn drop(&mut self) {
69        if let Some(priority) = self.priority {
70            futures::executor::block_on(async {
71                self.channel
72                    .send(InputMessage::new(
73                        self.source_id,
74                        self.component,
75                        InputMessageData::Clear { priority },
76                    ))
77                    .await;
78            })
79        }
80    }
81}