1use std::net::SocketAddr;
2
3use futures::{Future, SinkExt, StreamExt};
4use warp::{http::StatusCode, path::FullPath, Filter, Rejection};
5
6use crate::{
7 api::json::message,
8 global::{Global, Paths},
9 models::WebConfig,
10};
11
12mod session;
13use session::*;
14
15pub async fn bind(
16 global: Global,
17 config: &WebConfig,
18 paths: &Paths,
19) -> Result<impl Future<Output = ()>, std::io::Error> {
20 let session_store = SessionStore::new(config.max_sessions as _);
21
22 let ws = warp::ws()
23 .and(session_store.request())
24 .and(warp::filters::addr::remote())
25 .and({
26 let global = global.clone();
27 warp::any().map(move || global.clone())
28 })
29 .map(
30 |ws: warp::ws::Ws,
31 session: SessionInstance,
32 _remote: Option<SocketAddr>,
33 global: Global| {
34 (
35 ws.on_upgrade({
36 let session = session.session().clone();
37
38 move |websocket| {
39 let (mut tx, mut rx) = websocket.split();
41
42 async move {
43 while let Some(result) = rx.next().await {
44 if let Some(message) =
45 session.write().await.handle_result(&global, result).await
46 {
47 if let Err(error) = tx.send(message).await {
48 warn!(error = %error, "websocket error");
49 }
50 } else {
51 break;
52 }
53 }
54 }
55 }
56 }),
57 session,
58 )
59 },
60 )
61 .untuple_one()
62 .and_then(reply_session);
63
64 let cgi = warp::path("cgi").and(
65 warp::path("cfg_jsonserver")
66 .and_then({
67 let global = global.clone();
68 move || {
69 let global = global.clone();
70
71 async move {
72 Ok::<_, Rejection>(format!(
73 ":{}",
74 global
75 .read_config(|config| config.global.json_server.port)
76 .await
77 ))
78 }
79 }
80 })
81 .or(warp::path("run")
82 .and(warp::path::full())
83 .map(|full_path: FullPath| {
84 warp::reply::with_status(
86 format!("script failed ({})", full_path.as_str()),
87 StatusCode::INTERNAL_SERVER_ERROR,
88 )
89 })),
90 );
91
92 let json_rpc = warp::path("json-rpc")
93 .and(warp::body::json())
94 .and(warp::filters::header::optional("Authorization"))
95 .and(session_store.request())
96 .and(warp::filters::addr::remote())
97 .and(warp::any().map(move || global.clone()))
98 .and_then(
99 |request: message::HyperionMessage,
100 _authorization: Option<String>,
101 session: SessionInstance,
102 _remote: Option<SocketAddr>,
103 global: Global| {
104 async move {
105 let reply = warp::reply::json(
106 &session
107 .session()
108 .write()
109 .await
110 .handle_request(&global, request)
111 .await,
112 );
113
114 Ok::<_, Rejection>((reply, session))
115 }
116 },
117 )
118 .untuple_one()
119 .and_then(reply_session);
120
121 let files = warp::fs::dir(paths.resolve_path(if config.document_root.is_empty() {
122 WebConfig::SYSTEM_DOCUMENT_ROOT
123 } else {
124 config.document_root.as_str()
125 }));
126
127 let address = SocketAddr::from(([0, 0, 0, 0], config.port));
130 let listener = tokio::net::TcpListener::bind(address).await;
131
132 match listener {
133 Ok(listener) => {
134 info!(address = %address, "Webconfig server listening");
135 Ok(warp::serve(
136 ws.or(cgi)
137 .or(json_rpc)
138 .or(files)
139 .with(warp::filters::log::log("hyperion::web")),
140 )
141 .run_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener)))
142 }
143 Err(error) => Err(error),
144 }
145}