1#[macro_use]
2extern crate tracing;
3
4use std::path::PathBuf;
5
6use hyperion::effects::EffectRegistry;
7use structopt::StructOpt;
8use tokio::runtime::Builder;
9use tokio::signal;
10
11use hyperion::models::backend::ConfigExt;
12
13#[derive(Debug, StructOpt)]
14struct Opts {
15 #[structopt(short, long, parse(from_occurrences))]
17 verbose: u32,
18 #[structopt(
20 short,
21 long = "db-path",
22 default_value = "$ROOT/hyperion.db",
23 env = "DATABASE_URL"
24 )]
25 database_path: PathBuf,
26 #[structopt(short, long = "config")]
28 config_path: Option<PathBuf>,
29 #[structopt(long)]
31 dump_config: bool,
32 #[structopt(long)]
35 user_root: Option<PathBuf>,
36 #[structopt(long)]
38 core_threads: Option<usize>,
39}
40
41async fn run(opts: Opts) -> color_eyre::eyre::Result<()> {
42 let paths = hyperion::global::Paths::new(opts.user_root.clone())?;
44
45 let mut backend: Box<dyn hyperion::models::backend::ConfigBackend> =
47 if let Some(config_path) = opts.config_path.as_deref() {
48 Box::new(hyperion::models::backend::FileBackend::new(config_path))
49 } else {
50 let db = hyperion::db::Db::open(&paths.resolve_path(opts.database_path)).await?;
52 Box::new(hyperion::models::backend::DbBackend::new(db))
53 };
54
55 let config = backend.load().await?;
56
57 if opts.dump_config {
59 print!("{}", config.to_string()?);
60 return Ok(());
61 }
62
63 let global = hyperion::global::GlobalData::new(&config).wrap();
65
66 let mut effects = EffectRegistry::new();
68 let providers = hyperion::effects::Providers::new();
69
70 for path in ["$SYSTEM/effects"] {
72 let path = paths.resolve_path(path);
74
75 let mut discovered = hyperion::effects::EffectDefinition::read_dir(&path).await?;
77 discovered.sort_by(|a, b| a.file.cmp(&b.file));
78
79 effects.add_definitions(&providers, discovered);
81 }
82
83 info!("discovered {} effects", effects.len());
84
85 global
86 .write_effects(|e| {
87 *e = effects;
89 })
90 .await;
91
92 tokio::spawn(
94 hyperion::global::HookRunner::new(
95 config.global.hooks.clone(),
96 global.subscribe_events().await,
97 )
98 .run(),
99 );
100
101 let mut instances = Vec::with_capacity(config.instances.len());
103
104 for (&id, inst) in &config.instances {
106 let (inst, handle) = hyperion::instance::Instance::new(global.clone(), inst.clone()).await;
108 global.register_instance(handle.clone()).await;
110 instances.push(handle);
112 tokio::spawn({
114 let global = global.clone();
115 let event_tx = global.get_event_tx().await;
116
117 async move {
118 event_tx
119 .send(hyperion::global::Event::instance(
120 id,
121 hyperion::global::InstanceEventKind::Start,
122 ))
123 .map(|_| ())
124 .unwrap_or_else(|err| {
125 error!(error = %err, "event error");
126 });
127
128 let result = inst.run().await;
129
130 if let Err(error) = result {
131 error!(error = %error, "instance error");
132 }
133
134 global.unregister_instance(id).await;
135
136 event_tx
137 .send(hyperion::global::Event::instance(
138 id,
139 hyperion::global::InstanceEventKind::Stop,
140 ))
141 .map(|_| ())
142 .unwrap_or_else(|err| {
143 error!(error = %err, "event error");
144 });
145 }
146 });
147 }
148
149 let _flatbuffers_server = if config.global.flatbuffers_server.enable {
151 Some(
152 hyperion::servers::bind(
153 "Flatbuffers",
154 config.global.flatbuffers_server.clone(),
155 global.clone(),
156 hyperion::servers::flat::handle_client,
157 )
158 .await?,
159 )
160 } else {
161 None
162 };
163
164 let _json_server = hyperion::servers::bind(
166 "JSON",
167 config.global.json_server,
168 global.clone(),
169 hyperion::servers::json::handle_client,
170 )
171 .await?;
172
173 let _proto_server = if config.global.proto_server.enable {
175 Some(
176 hyperion::servers::bind(
177 "Protobuf",
178 config.global.proto_server.clone(),
179 global.clone(),
180 hyperion::servers::proto::handle_client,
181 )
182 .await?,
183 )
184 } else {
185 None
186 };
187
188 let _webconfig_server = tokio::task::spawn(
190 hyperion::web::bind(global.clone(), &config.global.web_config, &paths).await?,
191 );
192
193 let event_tx = global.get_event_tx().await;
195
196 event_tx.send(hyperion::global::Event::Start)?;
198
199 let mut abort = false;
201
202 while !abort {
203 tokio::select! {
204 _ = signal::ctrl_c() => {
205 abort = true;
206 }
207 }
208 }
209
210 for instance in instances.into_iter() {
212 instance.stop().await.ok();
213 }
214
215 event_tx.send(hyperion::global::Event::Stop)?;
217
218 Ok(())
219}
220
221fn install_tracing(opts: &Opts) -> Result<(), tracing_subscriber::util::TryInitError> {
222 use tracing_error::ErrorLayer;
223 use tracing_subscriber::{fmt, prelude::*, EnvFilter};
224
225 let fmt_layer = fmt::layer();
226
227 let filter_layer = EnvFilter::try_from_env("HYPERION_LOG").unwrap_or_else(|_| {
228 EnvFilter::new(match opts.verbose {
229 0 => "hyperion=warn,hyperiond=warn",
230 1 => "hyperion=info,hyperiond=info",
231 2 => "hyperion=debug,hyperiond=debug",
232 _ => "hyperion=trace,hyperiond=trace",
233 })
234 });
235
236 tracing_subscriber::registry()
237 .with(filter_layer)
238 .with(fmt_layer)
239 .with(ErrorLayer::default())
240 .try_init()
241}
242
243#[paw::main]
244fn main(opts: Opts) -> color_eyre::eyre::Result<()> {
245 color_eyre::install()?;
246 install_tracing(&opts)?;
247
248 let thd_count = opts
250 .core_threads
251 .and_then(|n| if n > 0 { Some(n) } else { None })
252 .unwrap_or_else(|| num_cpus::get().clamp(2, 4));
253 let rt = Builder::new_multi_thread()
254 .worker_threads(thd_count)
255 .enable_all()
256 .build()?;
257 rt.block_on(run(opts))
258}