hyperiond/
main.rs

1#[macro_use]
2extern crate tracing;
3
4use std::path::PathBuf;
5
6use hyperion::effects::EffectRegistry;
7use structopt::StructOpt;
8use tokio::runtime::Builder;
9use tokio::signal;
10
11use hyperion::models::backend::ConfigExt;
12
13#[derive(Debug, StructOpt)]
14struct Opts {
15    /// Log verbosity. Overrides logger level in config, but is overridden by HYPERION_LOG
16    #[structopt(short, long, parse(from_occurrences))]
17    verbose: u32,
18    /// Path to the configuration database
19    #[structopt(
20        short,
21        long = "db-path",
22        default_value = "$ROOT/hyperion.db",
23        env = "DATABASE_URL"
24    )]
25    database_path: PathBuf,
26    /// Path to a TOML config file. Overrides the configuration database
27    #[structopt(short, long = "config")]
28    config_path: Option<PathBuf>,
29    /// Dump the loaded configuration
30    #[structopt(long)]
31    dump_config: bool,
32    /// Path to the user root folder. Defaults to .config/hyperion.rs (Linux) or
33    /// %APPDATA%\hyperion.rs (Windows)
34    #[structopt(long)]
35    user_root: Option<PathBuf>,
36    /// Number of threads to use for the async runtime
37    #[structopt(long)]
38    core_threads: Option<usize>,
39}
40
41async fn run(opts: Opts) -> color_eyre::eyre::Result<()> {
42    // Path resolver
43    let paths = hyperion::global::Paths::new(opts.user_root.clone())?;
44
45    // Load configuration
46    let mut backend: Box<dyn hyperion::models::backend::ConfigBackend> =
47        if let Some(config_path) = opts.config_path.as_deref() {
48            Box::new(hyperion::models::backend::FileBackend::new(config_path))
49        } else {
50            // Connect to database
51            let db = hyperion::db::Db::open(&paths.resolve_path(opts.database_path)).await?;
52            Box::new(hyperion::models::backend::DbBackend::new(db))
53        };
54
55    let config = backend.load().await?;
56
57    // Dump configuration if this was asked
58    if opts.dump_config {
59        print!("{}", config.to_string()?);
60        return Ok(());
61    }
62
63    // Create the global state object
64    let global = hyperion::global::GlobalData::new(&config).wrap();
65
66    // Discover effects
67    let mut effects = EffectRegistry::new();
68    let providers = hyperion::effects::Providers::new();
69
70    // TODO: Per-instance effect discovery
71    for path in ["$SYSTEM/effects"] {
72        // Resolve path variables
73        let path = paths.resolve_path(path);
74
75        // Discover effect files
76        let mut discovered = hyperion::effects::EffectDefinition::read_dir(&path).await?;
77        discovered.sort_by(|a, b| a.file.cmp(&b.file));
78
79        // Register them
80        effects.add_definitions(&providers, discovered);
81    }
82
83    info!("discovered {} effects", effects.len());
84
85    global
86        .write_effects(|e| {
87            // Replace effect registry with our discovered one
88            *e = effects;
89        })
90        .await;
91
92    // Spawn the hook runner
93    tokio::spawn(
94        hyperion::global::HookRunner::new(
95            config.global.hooks.clone(),
96            global.subscribe_events().await,
97        )
98        .run(),
99    );
100
101    // Keep a list of all instances
102    let mut instances = Vec::with_capacity(config.instances.len());
103
104    // Initialize and spawn the devices
105    for (&id, inst) in &config.instances {
106        // Create the instance
107        let (inst, handle) = hyperion::instance::Instance::new(global.clone(), inst.clone()).await;
108        // Register the instance globally using its handle
109        global.register_instance(handle.clone()).await;
110        // Keep it around
111        instances.push(handle);
112        // Run the instance futures
113        tokio::spawn({
114            let global = global.clone();
115            let event_tx = global.get_event_tx().await;
116
117            async move {
118                event_tx
119                    .send(hyperion::global::Event::instance(
120                        id,
121                        hyperion::global::InstanceEventKind::Start,
122                    ))
123                    .map(|_| ())
124                    .unwrap_or_else(|err| {
125                        error!(error = %err, "event error");
126                    });
127
128                let result = inst.run().await;
129
130                if let Err(error) = result {
131                    error!(error = %error, "instance error");
132                }
133
134                global.unregister_instance(id).await;
135
136                event_tx
137                    .send(hyperion::global::Event::instance(
138                        id,
139                        hyperion::global::InstanceEventKind::Stop,
140                    ))
141                    .map(|_| ())
142                    .unwrap_or_else(|err| {
143                        error!(error = %err, "event error");
144                    });
145            }
146        });
147    }
148
149    // Start the Flatbuffers servers
150    let _flatbuffers_server = if config.global.flatbuffers_server.enable {
151        Some(
152            hyperion::servers::bind(
153                "Flatbuffers",
154                config.global.flatbuffers_server.clone(),
155                global.clone(),
156                hyperion::servers::flat::handle_client,
157            )
158            .await?,
159        )
160    } else {
161        None
162    };
163
164    // Start the JSON server
165    let _json_server = hyperion::servers::bind(
166        "JSON",
167        config.global.json_server,
168        global.clone(),
169        hyperion::servers::json::handle_client,
170    )
171    .await?;
172
173    // Start the Protobuf server
174    let _proto_server = if config.global.proto_server.enable {
175        Some(
176            hyperion::servers::bind(
177                "Protobuf",
178                config.global.proto_server.clone(),
179                global.clone(),
180                hyperion::servers::proto::handle_client,
181            )
182            .await?,
183        )
184    } else {
185        None
186    };
187
188    // Start the webconfig server
189    let _webconfig_server = tokio::task::spawn(
190        hyperion::web::bind(global.clone(), &config.global.web_config, &paths).await?,
191    );
192
193    // Global event handle
194    let event_tx = global.get_event_tx().await;
195
196    // We have started
197    event_tx.send(hyperion::global::Event::Start)?;
198
199    // Should we continue running?
200    let mut abort = false;
201
202    while !abort {
203        tokio::select! {
204            _ = signal::ctrl_c() => {
205                abort = true;
206            }
207        }
208    }
209
210    // Stop all instances
211    for instance in instances.into_iter() {
212        instance.stop().await.ok();
213    }
214
215    // We have finished running properly
216    event_tx.send(hyperion::global::Event::Stop)?;
217
218    Ok(())
219}
220
221fn install_tracing(opts: &Opts) -> Result<(), tracing_subscriber::util::TryInitError> {
222    use tracing_error::ErrorLayer;
223    use tracing_subscriber::{fmt, prelude::*, EnvFilter};
224
225    let fmt_layer = fmt::layer();
226
227    let filter_layer = EnvFilter::try_from_env("HYPERION_LOG").unwrap_or_else(|_| {
228        EnvFilter::new(match opts.verbose {
229            0 => "hyperion=warn,hyperiond=warn",
230            1 => "hyperion=info,hyperiond=info",
231            2 => "hyperion=debug,hyperiond=debug",
232            _ => "hyperion=trace,hyperiond=trace",
233        })
234    });
235
236    tracing_subscriber::registry()
237        .with(filter_layer)
238        .with(fmt_layer)
239        .with(ErrorLayer::default())
240        .try_init()
241}
242
243#[paw::main]
244fn main(opts: Opts) -> color_eyre::eyre::Result<()> {
245    color_eyre::install()?;
246    install_tracing(&opts)?;
247
248    // Create tokio runtime
249    let thd_count = opts
250        .core_threads
251        .and_then(|n| if n > 0 { Some(n) } else { None })
252        .unwrap_or_else(|| num_cpus::get().clamp(2, 4));
253    let rt = Builder::new_multi_thread()
254        .worker_threads(thd_count)
255        .enable_all()
256        .build()?;
257    rt.block_on(run(opts))
258}