/home/runner/work/lyquor/lyquor/node/src/http/mod.rs
Line | Count | Source |
1 | | use actix::prelude::*; |
2 | | use axum::Router; |
3 | | use jsonrpsee::server::serve_with_graceful_shutdown; |
4 | | use lyquor_api::anyhow; |
5 | | use lyquor_jsonrpc::client::ClientHandle; |
6 | | use tokio::net::TcpListener; |
7 | | use tokio_util::sync::CancellationToken; |
8 | | |
9 | | mod jsonrpc; |
10 | | |
11 | | pub use jsonrpc::{NewConsoleLines, Subscriptions}; |
12 | | |
13 | | pub struct HttpServer { |
14 | | bind_addr: String, |
15 | | jsonrpc: jsonrpc::JsonRpcContext, |
16 | | shutdown: CancellationToken, |
17 | | task: Option<tokio::task::JoinHandle<()>>, |
18 | | } |
19 | | |
20 | | impl std::fmt::Debug for HttpServer { |
21 | 0 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
22 | 0 | f.debug_struct("HttpServer") |
23 | 0 | .field("bind_addr", &self.bind_addr) |
24 | 0 | .finish() |
25 | 0 | } |
26 | | } |
27 | | |
28 | | impl HttpServer { |
29 | 0 | pub async fn new( |
30 | 0 | bind_addr: &str, client: ClientHandle, console_sub: Addr<Subscriptions>, fco: Addr<lyquor_seq::fco::FCO>, |
31 | 0 | pool: Addr<crate::lyquid::LyquidPool>, |
32 | 0 | ) -> Result<Self, anyhow::Error> { |
33 | 0 | let bind_addr = bind_addr.into(); |
34 | 0 | Ok(Self { |
35 | 0 | bind_addr, |
36 | 0 | jsonrpc: jsonrpc::JsonRpcContext::new(client, console_sub, pool, fco), |
37 | 0 | shutdown: CancellationToken::new(), |
38 | 0 | task: None, |
39 | 0 | }) |
40 | 0 | } |
41 | | |
42 | 0 | pub fn start(&mut self) { |
43 | 0 | if self.task.is_some() { |
44 | 0 | tracing::warn!("HttpServer already started."); |
45 | 0 | return; |
46 | 0 | } |
47 | | |
48 | 0 | let bind_addr = self.bind_addr.clone(); |
49 | 0 | let route_state = jsonrpc::JsonRpcRouteState::new(self.jsonrpc.clone(), self.shutdown.clone()); |
50 | 0 | let shutdown = self.shutdown.clone(); |
51 | | |
52 | 0 | self.task = Some(tokio::spawn(async move { |
53 | 0 | run_http_server(bind_addr, route_state, shutdown).await; |
54 | 0 | })); |
55 | 0 | } |
56 | | |
57 | 0 | pub fn stop(&self) { |
58 | | // Single shutdown signal for all HTTP/JSON-RPC tasks; cleanup is handled |
59 | | // by the token-driven tasks spawned in `start` and `JsonRpcRouteState::new`. |
60 | 0 | self.shutdown.cancel(); |
61 | 0 | } |
62 | | } |
63 | | |
64 | 0 | fn build_router(route_state: jsonrpc::JsonRpcRouteState) -> Router { |
65 | | // Keep top-level router composition in mod.rs so adding REST APIs later only |
66 | | // extends this function. |
67 | 0 | jsonrpc::build_jsonrpc_router(route_state) |
68 | 0 | } |
69 | | |
70 | 0 | async fn run_http_server(bind_addr: String, route_state: jsonrpc::JsonRpcRouteState, shutdown: CancellationToken) { |
71 | 0 | let listener = match TcpListener::bind(&bind_addr).await { |
72 | 0 | Ok(listener) => listener, |
73 | 0 | Err(e) => { |
74 | 0 | tracing::error!("Failed to start HTTP server: {e:?}."); |
75 | 0 | return; |
76 | | } |
77 | | }; |
78 | | |
79 | | loop { |
80 | 0 | let (socket, _remote) = tokio::select! { |
81 | 0 | res = listener.accept() => match res { |
82 | 0 | Ok(pair) => pair, |
83 | 0 | Err(e) => { |
84 | 0 | tracing::error!("Failed to accept HTTP API connection: {e:?}."); |
85 | 0 | continue; |
86 | | } |
87 | | }, |
88 | 0 | _ = shutdown.cancelled() => break, |
89 | | }; |
90 | | |
91 | 0 | let app = build_router(route_state.clone()); |
92 | 0 | let connection_shutdown = shutdown.clone(); |
93 | | |
94 | 0 | tokio::spawn(async move { |
95 | 0 | if let Err(e) = serve_with_graceful_shutdown(socket, app, connection_shutdown.cancelled()).await { |
96 | 0 | tracing::debug!("HTTP API connection error: {e:?}."); |
97 | 0 | } |
98 | 0 | }); |
99 | | } |
100 | 0 | } |