hyperion/servers/
json.rs

1//! JSON protocol server implementation
2
3use std::net::SocketAddr;
4
5use futures::prelude::*;
6use thiserror::Error;
7use tokio::net::TcpStream;
8use tokio_util::codec::Framed;
9
10use crate::{
11    api::json::{self, JsonApiError},
12    global::{Global, InputSourceName},
13};
14
15/// JSON protocol codec definition
16mod codec;
17use codec::*;
18
19#[derive(Debug, Error)]
20pub enum JsonServerError {
21    #[error("i/o error: {0}")]
22    Io(#[from] futures_io::Error),
23    #[error("codec error: {0}")]
24    Codec(#[from] JsonCodecError),
25    #[error(transparent)]
26    Api(#[from] JsonApiError),
27}
28
29#[instrument(skip(socket, global))]
30pub async fn handle_client(
31    (socket, peer_addr): (TcpStream, SocketAddr),
32    global: Global,
33) -> Result<(), JsonServerError> {
34    debug!("accepted new connection");
35
36    let framed = Framed::new(socket, JsonCodec::new());
37    let (mut writer, mut reader) = framed.split();
38
39    // unwrap: cannot fail because the priority is None
40    let mut client_connection = json::ClientConnection::new(
41        global
42            .register_input_source(InputSourceName::Json { peer_addr }, None)
43            .await
44            .unwrap(),
45    );
46
47    while let Some(request) = reader.next().await {
48        trace!(request = ?request, "processing request");
49
50        let mut tan = None;
51        let reply = match {
52            match request {
53                Ok(rq) => {
54                    tan = rq.tan;
55                    Ok(client_connection.handle_request(rq, &global).await?)
56                }
57                Err(error) => Err(JsonServerError::from(error)),
58            }
59        } {
60            Ok(response) => response,
61            Err(error) => {
62                error!(error = %error, "error processing request");
63
64                json::message::HyperionResponse::error(&error)
65            }
66        }
67        .with_tan(tan);
68
69        trace!(response = ?reply, "sending response");
70
71        writer.send(reply).await?;
72        writer.flush().await?;
73    }
74
75    Ok(())
76}