1use std::net::SocketAddr;
4
5use futures::prelude::*;
6use thiserror::Error;
7use tokio::net::TcpStream;
8use tokio_util::codec::Framed;
9
10use crate::{
11 api::json::{self, JsonApiError},
12 global::{Global, InputSourceName},
13};
14
15mod codec;
17use codec::*;
18
19#[derive(Debug, Error)]
20pub enum JsonServerError {
21 #[error("i/o error: {0}")]
22 Io(#[from] futures_io::Error),
23 #[error("codec error: {0}")]
24 Codec(#[from] JsonCodecError),
25 #[error(transparent)]
26 Api(#[from] JsonApiError),
27}
28
29#[instrument(skip(socket, global))]
30pub async fn handle_client(
31 (socket, peer_addr): (TcpStream, SocketAddr),
32 global: Global,
33) -> Result<(), JsonServerError> {
34 debug!("accepted new connection");
35
36 let framed = Framed::new(socket, JsonCodec::new());
37 let (mut writer, mut reader) = framed.split();
38
39 let mut client_connection = json::ClientConnection::new(
41 global
42 .register_input_source(InputSourceName::Json { peer_addr }, None)
43 .await
44 .unwrap(),
45 );
46
47 while let Some(request) = reader.next().await {
48 trace!(request = ?request, "processing request");
49
50 let mut tan = None;
51 let reply = match {
52 match request {
53 Ok(rq) => {
54 tan = rq.tan;
55 Ok(client_connection.handle_request(rq, &global).await?)
56 }
57 Err(error) => Err(JsonServerError::from(error)),
58 }
59 } {
60 Ok(response) => response,
61 Err(error) => {
62 error!(error = %error, "error processing request");
63
64 json::message::HyperionResponse::error(&error)
65 }
66 }
67 .with_tan(tan);
68
69 trace!(response = ?reply, "sending response");
70
71 writer.send(reply).await?;
72 writer.flush().await?;
73 }
74
75 Ok(())
76}