1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
use std::net::SocketAddr;

use futures::{Future, SinkExt, StreamExt};
use warp::{http::StatusCode, path::FullPath, Filter, Rejection};

use crate::{
    api::json::message,
    global::{Global, Paths},
    models::WebConfig,
};

mod session;
use session::*;

pub async fn bind(
    global: Global,
    config: &WebConfig,
    paths: &Paths,
) -> Result<impl Future<Output = ()>, std::io::Error> {
    let session_store = SessionStore::new(config.max_sessions as _);

    let ws = warp::ws()
        .and(session_store.request())
        .and(warp::filters::addr::remote())
        .and({
            let global = global.clone();
            warp::any().map(move || global.clone())
        })
        .map(
            |ws: warp::ws::Ws,
             session: SessionInstance,
             _remote: Option<SocketAddr>,
             global: Global| {
                (
                    ws.on_upgrade({
                        let session = session.session().clone();

                        move |websocket| {
                            // Just echo all messages back...
                            let (mut tx, mut rx) = websocket.split();

                            async move {
                                while let Some(result) = rx.next().await {
                                    if let Some(message) =
                                        session.write().await.handle_result(&global, result).await
                                    {
                                        if let Err(error) = tx.send(message).await {
                                            warn!(error = %error, "websocket error");
                                        }
                                    } else {
                                        break;
                                    }
                                }
                            }
                        }
                    }),
                    session,
                )
            },
        )
        .untuple_one()
        .and_then(reply_session);

    let cgi = warp::path("cgi").and(
        warp::path("cfg_jsonserver")
            .and_then({
                let global = global.clone();
                move || {
                    let global = global.clone();

                    async move {
                        Ok::<_, Rejection>(format!(
                            ":{}",
                            global
                                .read_config(|config| config.global.json_server.port)
                                .await
                        ))
                    }
                }
            })
            .or(warp::path("run")
                .and(warp::path::full())
                .map(|full_path: FullPath| {
                    // TODO: Implement run?
                    warp::reply::with_status(
                        format!("script failed ({})", full_path.as_str()),
                        StatusCode::INTERNAL_SERVER_ERROR,
                    )
                })),
    );

    let json_rpc = warp::path("json-rpc")
        .and(warp::body::json())
        .and(warp::filters::header::optional("Authorization"))
        .and(session_store.request())
        .and(warp::filters::addr::remote())
        .and(warp::any().map(move || global.clone()))
        .and_then(
            |request: message::HyperionMessage,
             _authorization: Option<String>,
             session: SessionInstance,
             _remote: Option<SocketAddr>,
             global: Global| {
                async move {
                    let reply = warp::reply::json(
                        &session
                            .session()
                            .write()
                            .await
                            .handle_request(&global, request)
                            .await,
                    );

                    Ok::<_, Rejection>((reply, session))
                }
            },
        )
        .untuple_one()
        .and_then(reply_session);

    let files = warp::fs::dir(paths.resolve_path(if config.document_root.is_empty() {
        WebConfig::SYSTEM_DOCUMENT_ROOT
    } else {
        config.document_root.as_str()
    }));

    // TODO: Serve error pages from /errorpages/*

    let address = SocketAddr::from(([0, 0, 0, 0], config.port));
    let listener = tokio::net::TcpListener::bind(address).await;

    match listener {
        Ok(listener) => {
            info!(address = %address, "Webconfig server listening");
            Ok(warp::serve(
                ws.or(cgi)
                    .or(json_rpc)
                    .or(files)
                    .with(warp::filters::log::log("hyperion::web")),
            )
            .run_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener)))
        }
        Err(error) => Err(error),
    }
}