hyperion/servers/
proto.rs1use std::net::SocketAddr;
4
5use futures::prelude::*;
6use thiserror::Error;
7use tokio::net::TcpStream;
8use tokio_util::codec::Framed;
9
10use crate::{
11 api::proto::{self, message, ProtoApiError},
12 global::{Global, InputSourceName, PriorityGuard},
13};
14
15mod codec;
16use codec::*;
17
18#[derive(Debug, Error)]
19pub enum ProtoServerError {
20 #[error("i/o error: {0}")]
21 Io(#[from] futures_io::Error),
22 #[error("decode error: {0}")]
23 Codec(#[from] ProtoCodecError),
24 #[error(transparent)]
25 Api(#[from] ProtoApiError),
26}
27
28fn success_response(peer_addr: SocketAddr) -> message::HyperionReply {
29 let reply = message::HyperionReply {
30 r#type: message::hyperion_reply::Type::Reply.into(),
31 success: Some(true),
32 ..Default::default()
33 };
34
35 trace!("({}) sending success: {:?}", peer_addr, reply);
36 reply
37}
38
39fn error_response(peer_addr: SocketAddr, error: impl std::fmt::Display) -> message::HyperionReply {
40 let reply = message::HyperionReply {
41 r#type: message::hyperion_reply::Type::Reply.into(),
42 success: Some(false),
43 error: Some(error.to_string()),
44 ..Default::default()
45 };
46
47 trace!("({}) sending error: {:?}", peer_addr, reply);
48 reply
49}
50
51pub async fn handle_client(
52 (socket, peer_addr): (TcpStream, SocketAddr),
53 global: Global,
54) -> Result<(), ProtoServerError> {
55 debug!("accepted new connection from {}", peer_addr);
56
57 let (mut writer, mut reader) = Framed::new(socket, ProtoCodec::new()).split();
58
59 let source = global
61 .register_input_source(InputSourceName::Protobuf { peer_addr }, None)
62 .await
63 .unwrap();
64
65 let mut priority_guard = PriorityGuard::new_broadcast(&source);
66
67 while let Some(request) = reader.next().await {
68 let request = match request {
69 Ok(rb) => rb,
70 Err(error) => {
71 error!("({}) error reading frame: {}", peer_addr, error);
72 continue;
73 }
74 };
75
76 trace!("({}) got request: {:?}", peer_addr, request);
77
78 let reply = match proto::handle_request(peer_addr, request, &source, &mut priority_guard) {
79 Ok(()) => success_response(peer_addr),
80 Err(error) => {
81 error!("({}) error processing request: {}", peer_addr, error);
82
83 error_response(peer_addr, error)
84 }
85 };
86
87 writer.send(reply).await?;
88 writer.flush().await?;
89 }
90
91 Ok(())
92}