hyperion/servers/
proto.rs

1//! protobuf protocol server implementation
2
3use std::net::SocketAddr;
4
5use futures::prelude::*;
6use thiserror::Error;
7use tokio::net::TcpStream;
8use tokio_util::codec::Framed;
9
10use crate::{
11    api::proto::{self, message, ProtoApiError},
12    global::{Global, InputSourceName, PriorityGuard},
13};
14
15mod codec;
16use codec::*;
17
18#[derive(Debug, Error)]
19pub enum ProtoServerError {
20    #[error("i/o error: {0}")]
21    Io(#[from] futures_io::Error),
22    #[error("decode error: {0}")]
23    Codec(#[from] ProtoCodecError),
24    #[error(transparent)]
25    Api(#[from] ProtoApiError),
26}
27
28fn success_response(peer_addr: SocketAddr) -> message::HyperionReply {
29    let reply = message::HyperionReply {
30        r#type: message::hyperion_reply::Type::Reply.into(),
31        success: Some(true),
32        ..Default::default()
33    };
34
35    trace!("({}) sending success: {:?}", peer_addr, reply);
36    reply
37}
38
39fn error_response(peer_addr: SocketAddr, error: impl std::fmt::Display) -> message::HyperionReply {
40    let reply = message::HyperionReply {
41        r#type: message::hyperion_reply::Type::Reply.into(),
42        success: Some(false),
43        error: Some(error.to_string()),
44        ..Default::default()
45    };
46
47    trace!("({}) sending error: {:?}", peer_addr, reply);
48    reply
49}
50
51pub async fn handle_client(
52    (socket, peer_addr): (TcpStream, SocketAddr),
53    global: Global,
54) -> Result<(), ProtoServerError> {
55    debug!("accepted new connection from {}", peer_addr);
56
57    let (mut writer, mut reader) = Framed::new(socket, ProtoCodec::new()).split();
58
59    // unwrap: cannot fail because the priority is None
60    let source = global
61        .register_input_source(InputSourceName::Protobuf { peer_addr }, None)
62        .await
63        .unwrap();
64
65    let mut priority_guard = PriorityGuard::new_broadcast(&source);
66
67    while let Some(request) = reader.next().await {
68        let request = match request {
69            Ok(rb) => rb,
70            Err(error) => {
71                error!("({}) error reading frame: {}", peer_addr, error);
72                continue;
73            }
74        };
75
76        trace!("({}) got request: {:?}", peer_addr, request);
77
78        let reply = match proto::handle_request(peer_addr, request, &source, &mut priority_guard) {
79            Ok(()) => success_response(peer_addr),
80            Err(error) => {
81                error!("({}) error processing request: {}", peer_addr, error);
82
83                error_response(peer_addr, error)
84            }
85        };
86
87        writer.send(reply).await?;
88        writer.flush().await?;
89    }
90
91    Ok(())
92}