hyperion/
web.rs

1use std::net::SocketAddr;
2
3use futures::{Future, SinkExt, StreamExt};
4use warp::{http::StatusCode, path::FullPath, Filter, Rejection};
5
6use crate::{
7    api::json::message,
8    global::{Global, Paths},
9    models::WebConfig,
10};
11
12mod session;
13use session::*;
14
15pub async fn bind(
16    global: Global,
17    config: &WebConfig,
18    paths: &Paths,
19) -> Result<impl Future<Output = ()>, std::io::Error> {
20    let session_store = SessionStore::new(config.max_sessions as _);
21
22    let ws = warp::ws()
23        .and(session_store.request())
24        .and(warp::filters::addr::remote())
25        .and({
26            let global = global.clone();
27            warp::any().map(move || global.clone())
28        })
29        .map(
30            |ws: warp::ws::Ws,
31             session: SessionInstance,
32             _remote: Option<SocketAddr>,
33             global: Global| {
34                (
35                    ws.on_upgrade({
36                        let session = session.session().clone();
37
38                        move |websocket| {
39                            // Just echo all messages back...
40                            let (mut tx, mut rx) = websocket.split();
41
42                            async move {
43                                while let Some(result) = rx.next().await {
44                                    if let Some(message) =
45                                        session.write().await.handle_result(&global, result).await
46                                    {
47                                        if let Err(error) = tx.send(message).await {
48                                            warn!(error = %error, "websocket error");
49                                        }
50                                    } else {
51                                        break;
52                                    }
53                                }
54                            }
55                        }
56                    }),
57                    session,
58                )
59            },
60        )
61        .untuple_one()
62        .and_then(reply_session);
63
64    let cgi = warp::path("cgi").and(
65        warp::path("cfg_jsonserver")
66            .and_then({
67                let global = global.clone();
68                move || {
69                    let global = global.clone();
70
71                    async move {
72                        Ok::<_, Rejection>(format!(
73                            ":{}",
74                            global
75                                .read_config(|config| config.global.json_server.port)
76                                .await
77                        ))
78                    }
79                }
80            })
81            .or(warp::path("run")
82                .and(warp::path::full())
83                .map(|full_path: FullPath| {
84                    // TODO: Implement run?
85                    warp::reply::with_status(
86                        format!("script failed ({})", full_path.as_str()),
87                        StatusCode::INTERNAL_SERVER_ERROR,
88                    )
89                })),
90    );
91
92    let json_rpc = warp::path("json-rpc")
93        .and(warp::body::json())
94        .and(warp::filters::header::optional("Authorization"))
95        .and(session_store.request())
96        .and(warp::filters::addr::remote())
97        .and(warp::any().map(move || global.clone()))
98        .and_then(
99            |request: message::HyperionMessage,
100             _authorization: Option<String>,
101             session: SessionInstance,
102             _remote: Option<SocketAddr>,
103             global: Global| {
104                async move {
105                    let reply = warp::reply::json(
106                        &session
107                            .session()
108                            .write()
109                            .await
110                            .handle_request(&global, request)
111                            .await,
112                    );
113
114                    Ok::<_, Rejection>((reply, session))
115                }
116            },
117        )
118        .untuple_one()
119        .and_then(reply_session);
120
121    let files = warp::fs::dir(paths.resolve_path(if config.document_root.is_empty() {
122        WebConfig::SYSTEM_DOCUMENT_ROOT
123    } else {
124        config.document_root.as_str()
125    }));
126
127    // TODO: Serve error pages from /errorpages/*
128
129    let address = SocketAddr::from(([0, 0, 0, 0], config.port));
130    let listener = tokio::net::TcpListener::bind(address).await;
131
132    match listener {
133        Ok(listener) => {
134            info!(address = %address, "Webconfig server listening");
135            Ok(warp::serve(
136                ws.or(cgi)
137                    .or(json_rpc)
138                    .or(files)
139                    .with(warp::filters::log::log("hyperion::web")),
140            )
141            .run_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener)))
142        }
143        Err(error) => Err(error),
144    }
145}