1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
use std::sync::Arc;

use parse_display::Display;
use thiserror::Error;
use tokio::sync::broadcast;

use super::{Global, InputSourceName, Message};
use crate::component::ComponentName;

/// A named input source that publishes messages of type `T` on a broadcast channel.
///
/// The derived `Display` output has the form `` `name` (id = .., priority = ..) ``; the
/// priority is rendered with its `Debug` representation, so it appears as `Some(n)` or `None`.
#[derive(Display)]
#[display("`{name}` (id = {id}, priority = {priority:?})")]
pub struct InputSource<T: Message> {
    /// Identifier of this source; `send` passes it to `T::new` for every outgoing message.
    id: usize,
    /// Name of this source, used in the `Display` output.
    name: InputSourceName,
    /// Optional priority of this source (`None` when no priority was given).
    priority: Option<i32>,
    /// Sending half of the broadcast channel that messages are published on.
    tx: broadcast::Sender<T>,
}

impl<T: Message> InputSource<T> {
    pub fn new(
        id: usize,
        name: InputSourceName,
        priority: Option<i32>,
        tx: broadcast::Sender<T>,
    ) -> Self {
        Self {
            id,
            name,
            priority,
            tx,
        }
    }

    pub fn id(&self) -> usize {
        self.id
    }

    pub fn name(&self) -> &InputSourceName {
        &self.name
    }

    pub fn priority(&self) -> Option<i32> {
        self.priority
    }

    pub fn send(
        &self,
        component: ComponentName,
        message: T::Data,
    ) -> Result<usize, broadcast::error::SendError<T>> {
        self.tx.send(T::new(self.id, component, message))
    }

    pub fn channel(&self) -> &broadcast::Sender<T> {
        &self.tx
    }
}

/// Handle to a shared [`InputSource`], paired with the [`Global`] state that the source is
/// unregistered from when the handle is dropped.
pub struct InputSourceHandle<T: Message> {
    /// The shared input source this handle dereferences to.
    input_source: Arc<InputSource<T>>,
    /// Global state passed to `T::unregister_source` on drop.
    global: Global,
}

impl<T: Message> InputSourceHandle<T> {
    pub fn new(input_source: Arc<InputSource<T>>, global: Global) -> Self {
        Self {
            input_source,
            global,
        }
    }
}

impl<T> std::ops::Deref for InputSourceHandle<T>
where
    T: Message,
{
    type Target = InputSource<T>;

    /// Gives access to the wrapped [`InputSource`], so its methods can be called directly
    /// on the handle.
    fn deref(&self) -> &Self::Target {
        // Explicit reborrow through the `Arc` to reach the inner source.
        &*self.input_source
    }
}

impl<T> Drop for InputSourceHandle<T>
where
    T: Message,
{
    /// Unregisters the wrapped input source from the global state via `T::unregister_source`.
    fn drop(&mut self) {
        // TODO: Can this block?
        // NOTE(review): `block_on` runs the future to completion on the current thread, so
        // this drop waits for as long as acquiring the write access below takes. Confirm
        // that is acceptable when a handle is dropped from inside an async runtime.
        let unregister = async {
            // Exclusive access to the global state, held only for the unregister call.
            let mut guard = self.global.0.write().await;
            T::unregister_source(&mut *guard, &*self.input_source);
        };
        futures::executor::block_on(unregister);
    }
}

/// Errors related to input sources.
#[derive(Debug, Error)]
pub enum InputSourceError {
    /// The contained priority value is not valid for an input source.
    #[error("invalid priority: {0}")]
    InvalidPriority(i32),
}