/home/runner/work/lyquor/lyquor/node/src/http/mod.rs
Line | Count | Source |
1 | | use actix::prelude::*; |
2 | | use axum::Router; |
3 | | use jsonrpsee::server::serve_with_graceful_shutdown; |
4 | | use lyquor_api::anyhow; |
5 | | use lyquor_image_store::OciDistributionStore; |
6 | | use lyquor_jsonrpc::client::ClientHandle; |
7 | | use std::sync::Arc; |
8 | | use tokio::net::TcpListener; |
9 | | use tokio_util::sync::CancellationToken; |
10 | | |
11 | | pub mod httptls; |
12 | | mod jsonrpc; |
13 | | mod lyquid; |
14 | | mod node; |
15 | | mod oci; |
16 | | |
17 | | pub use self::lyquid::LyquidServiceContext; |
18 | | pub use self::node::NodeServiceContext; |
19 | | |
20 | | pub struct HttpServer { |
21 | | bind_addr: String, |
22 | | jsonrpc: jsonrpc::JsonRpcContext, |
23 | | lyquid_service_context: Arc<lyquid::LyquidServiceContext>, |
24 | | node_service_context: Arc<node::NodeServiceContext>, |
25 | | oci: oci::OciRouteState, |
26 | | http_tls: Option<httptls::HttpTls>, |
27 | | shutdown: CancellationToken, |
28 | | task: Option<tokio::task::JoinHandle<()>>, |
29 | | } |
30 | | |
31 | | impl std::fmt::Debug for HttpServer { |
32 | 0 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
33 | 0 | f.debug_struct("HttpServer") |
34 | 0 | .field("bind_addr", &self.bind_addr) |
35 | 0 | .finish() |
36 | 0 | } |
37 | | } |
38 | | |
39 | | impl HttpServer { |
40 | 0 | pub async fn new( |
41 | 0 | bind_addr: &str, client: ClientHandle, fco: Addr<lyquor_seq::fco::FCO>, pool: Addr<crate::lyquid::LyquidPool>, |
42 | 0 | node_service_context: NodeServiceContext, oci_store: Arc<dyn OciDistributionStore>, |
43 | 0 | shutdown: CancellationToken, http_tls: Option<httptls::HttpTls>, |
44 | 0 | ) -> Result<Self, anyhow::Error> { |
45 | 0 | let bind_addr = bind_addr.into(); |
46 | 0 | Ok(Self { |
47 | 0 | bind_addr, |
48 | 0 | jsonrpc: jsonrpc::JsonRpcContext::new(client, pool.clone(), fco), |
49 | 0 | lyquid_service_context: Arc::new(LyquidServiceContext::new(pool)), |
50 | 0 | node_service_context: Arc::new(node_service_context), |
51 | 0 | oci: oci::OciRouteState::new(oci_store), |
52 | 0 | http_tls, |
53 | 0 | shutdown, |
54 | 0 | task: None, |
55 | 0 | }) |
56 | 0 | } |
57 | | |
58 | 0 | pub fn start(&mut self) { |
59 | 0 | if self.task.is_some() { |
60 | 0 | tracing::warn!("HttpServer already started."); |
61 | 0 | return; |
62 | 0 | } |
63 | | |
64 | 0 | let bind_addr = self.bind_addr.clone(); |
65 | 0 | let jsonrpc_state = jsonrpc::JsonRpcRouteState::new(self.jsonrpc.clone(), self.shutdown.child_token()); |
66 | 0 | let lyquid_service_context = self.lyquid_service_context.clone(); |
67 | 0 | let node_service_context = self.node_service_context.clone(); |
68 | 0 | let oci_state = self.oci.clone(); |
69 | 0 | let shutdown = self.shutdown.child_token(); |
70 | 0 | let http_tls = self.http_tls.clone(); |
71 | | |
72 | 0 | self.task = Some(tokio::spawn(async move { |
73 | 0 | run_http_server( |
74 | 0 | bind_addr, |
75 | 0 | jsonrpc_state, |
76 | 0 | lyquid_service_context, |
77 | 0 | node_service_context, |
78 | 0 | oci_state, |
79 | 0 | http_tls, |
80 | 0 | shutdown, |
81 | 0 | ) |
82 | 0 | .await; |
83 | 0 | })); |
84 | 0 | } |
85 | | |
86 | 0 | pub fn stop(&self) { |
87 | | // Single shutdown signal for all HTTP/JSON-RPC tasks; cleanup is handled |
88 | | // by the token-driven tasks spawned in `start` and `JsonRpcRouteState::new`. |
89 | 0 | self.shutdown.cancel(); |
90 | 0 | } |
91 | | } |
92 | | |
93 | 0 | fn build_router( |
94 | 0 | jsonrpc_state: jsonrpc::JsonRpcRouteState, lyquid_service_context: Arc<lyquid::LyquidServiceContext>, |
95 | 0 | node_service_context: Arc<node::NodeServiceContext>, oci_state: oci::OciRouteState, |
96 | 0 | ) -> Router { |
97 | | // Keep top-level router composition in mod.rs so adding REST APIs later only |
98 | | // extends this function. |
99 | 0 | node::build_node_router(node_service_context) |
100 | 0 | .merge(lyquid::build_lyquid_router(lyquid_service_context)) |
101 | 0 | .merge(oci::build_oci_router(oci_state)) |
102 | 0 | .merge(jsonrpc::build_jsonrpc_router(jsonrpc_state)) |
103 | 0 | } |
104 | | |
105 | 0 | async fn run_http_server( |
106 | 0 | bind_addr: String, jsonrpc_state: jsonrpc::JsonRpcRouteState, |
107 | 0 | lyquid_service_context: Arc<lyquid::LyquidServiceContext>, node_service_context: Arc<node::NodeServiceContext>, |
108 | 0 | oci_state: oci::OciRouteState, http_tls: Option<httptls::HttpTls>, shutdown: CancellationToken, |
109 | 0 | ) { |
110 | 0 | let listener = match TcpListener::bind(&bind_addr).await { |
111 | 0 | Ok(listener) => listener, |
112 | 0 | Err(e) => { |
113 | 0 | tracing::error!("Failed to start HTTP server: {e:?}."); |
114 | 0 | return; |
115 | | } |
116 | | }; |
117 | | |
118 | 0 | if let Some(tls) = &http_tls { |
119 | 0 | tls.spawn_renewal_worker(shutdown.child_token()); |
120 | 0 | } |
121 | | |
122 | | // Don't change this line as it is parsed by tests. |
123 | 0 | tracing::info!("API listening at {}", bind_addr); |
124 | | |
125 | | loop { |
126 | 0 | let (socket, _remote) = tokio::select! { |
127 | 0 | res = listener.accept() => match res { |
128 | 0 | Ok(pair) => pair, |
129 | 0 | Err(e) => { |
130 | 0 | tracing::error!("Failed to accept HTTP API connection: {e:?}."); |
131 | 0 | continue; |
132 | | } |
133 | | }, |
134 | 0 | _ = shutdown.cancelled() => break, |
135 | | }; |
136 | | |
137 | 0 | let app = build_router( |
138 | 0 | jsonrpc_state.clone(), |
139 | 0 | lyquid_service_context.clone(), |
140 | 0 | node_service_context.clone(), |
141 | 0 | oci_state.clone(), |
142 | | ); |
143 | 0 | let connection_shutdown = shutdown.child_token(); |
144 | 0 | let tls = { |
145 | 0 | if let Some(ht) = &http_tls { |
146 | 0 | ht.current_acceptor() |
147 | | } else { |
148 | 0 | None |
149 | | } |
150 | | }; |
151 | | |
152 | | // TODO: do we need to split TLS and non-TLS ports? |
153 | 0 | tokio::spawn(async move { |
154 | 0 | if let Some(tls) = tls { |
155 | 0 | match tls.accept(socket).await { |
156 | 0 | Ok(tls_stream) => { |
157 | 0 | if let Err(e) = |
158 | 0 | serve_with_graceful_shutdown(tls_stream, app, connection_shutdown.cancelled()).await |
159 | | { |
160 | 0 | tracing::debug!("HTTPS API connection error: {e:?}."); |
161 | 0 | } |
162 | | } |
163 | 0 | Err(e) => { |
164 | 0 | tracing::debug!("HTTPS handshake error: {e:?}."); |
165 | | } |
166 | | } |
167 | 0 | } else if let Err(e) = serve_with_graceful_shutdown(socket, app, connection_shutdown.cancelled()).await { |
168 | 0 | tracing::debug!("HTTP API connection error: {e:?}."); |
169 | 0 | } |
170 | 0 | }); |
171 | | } |
172 | 0 | } |