/home/runner/work/lyquor/lyquor/node/src/http/mod.rs
Line | Count | Source |
1 | | use axum::Router; |
2 | | use hyper_util::rt::{TokioExecutor, TokioIo}; |
3 | | use hyper_util::server::conn::auto; |
4 | | use hyper_util::service::TowerToHyperService; |
5 | | use lyquor_api::anyhow; |
6 | | use lyquor_image_store::OciDistributionStore; |
7 | | use lyquor_jsonrpc::client::ClientHandle; |
8 | | use std::sync::Arc; |
9 | | use tokio::net::TcpListener; |
10 | | use tokio_util::sync::CancellationToken; |
11 | | use tokio_util::task::TaskTracker; |
12 | | |
13 | | pub mod httptls; |
14 | | mod jsonrpc; |
15 | | mod lyquid; |
16 | | mod node; |
17 | | mod oci; |
18 | | |
19 | | pub use self::lyquid::LyquidServiceContext; |
20 | | pub use self::node::NodeServiceContext; |
21 | | |
22 | | pub struct HttpServer { |
23 | | local_addr: String, |
24 | | shutdown: CancellationToken, |
25 | | task_tracker: TaskTracker, |
26 | | } |
27 | | |
28 | | impl std::fmt::Debug for HttpServer { |
29 | 0 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
30 | 0 | f.debug_struct("HttpServer") |
31 | 0 | .field("local_addr", &self.local_addr) |
32 | 0 | .finish() |
33 | 0 | } |
34 | | } |
35 | | |
36 | | impl HttpServer { |
37 | 0 | pub async fn new( |
38 | 0 | bind_addr: &str, client: ClientHandle, fco: lyquor_seq::fco::FCOHandle, |
39 | 0 | process_manager: lyquor_hosting::ProcessManagerHandle, registry: lyquor_hosting::LyquidRegistry, |
40 | 0 | node_service_context: NodeServiceContext, oci_store: Arc<dyn OciDistributionStore>, |
41 | 0 | shutdown: CancellationToken, http_tls: Option<httptls::HttpsTlsRuntime>, |
42 | 0 | ) -> anyhow::Result<Self> { |
43 | 0 | let listener = TcpListener::bind(bind_addr).await?; |
44 | 0 | let local_addr = listener.local_addr()?.to_string(); |
45 | | |
46 | 0 | let router = build_router( |
47 | 0 | jsonrpc::JsonRpcRouteState::new( |
48 | 0 | jsonrpc::JsonRpcContext::new(client.clone(), process_manager.clone(), registry.clone(), fco), |
49 | 0 | shutdown.child_token(), |
50 | | ), |
51 | 0 | Arc::new(LyquidServiceContext::new(process_manager.clone(), registry)), |
52 | 0 | Arc::new(node_service_context.clone()), |
53 | 0 | oci::OciRouteState::new(oci_store.clone()), |
54 | | ); |
55 | | |
56 | 0 | let cert_state_rx = http_tls.as_ref().map(httptls::HttpsTlsRuntime::cert_state_receiver); |
57 | | |
58 | 0 | let task_tracker = TaskTracker::new(); |
59 | 0 | task_tracker.spawn(run_http_server(listener, router, cert_state_rx, shutdown.clone())); |
60 | | |
61 | | // Don't change this line as it is parsed by tests. |
62 | 0 | tracing::info!("API listening at {local_addr}"); |
63 | | |
64 | 0 | Ok(Self { |
65 | 0 | local_addr, |
66 | 0 | shutdown, |
67 | 0 | task_tracker, |
68 | 0 | }) |
69 | 0 | } |
70 | | |
71 | 0 | pub fn stop(&self) { |
72 | | // Single shutdown signal for all HTTP/JSON-RPC tasks; cleanup is handled |
73 | | // by the token-driven tasks spawned in `start` and `JsonRpcRouteState::new`. |
74 | 0 | self.shutdown.cancel(); |
75 | 0 | self.task_tracker.close(); |
76 | 0 | } |
77 | | |
78 | 0 | pub async fn wait(&self) { |
79 | 0 | self.task_tracker.wait().await; |
80 | 0 | } |
81 | | } |
82 | | |
83 | 0 | fn build_router( |
84 | 0 | jsonrpc_state: jsonrpc::JsonRpcRouteState, lyquid_service_context: Arc<lyquid::LyquidServiceContext>, |
85 | 0 | node_service_context: Arc<node::NodeServiceContext>, oci_state: oci::OciRouteState, |
86 | 0 | ) -> Router { |
87 | | // Keep top-level router composition in mod.rs so adding REST APIs later only |
88 | | // extends this function. |
89 | 0 | jsonrpc::build_jsonrpc_router(jsonrpc_state) |
90 | 0 | .merge(oci::build_oci_router(oci_state)) |
91 | 0 | .merge(node::build_node_router(node_service_context)) |
92 | 0 | .merge(lyquid::build_lyquid_router(lyquid_service_context)) |
93 | 0 | } |
94 | | |
95 | 0 | async fn serve_connection<I>(i: I, app: Router, shutdown: CancellationToken) |
96 | 0 | where |
97 | 0 | I: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin + 'static, |
98 | 0 | { |
99 | 0 | let service = TowerToHyperService::new(app); |
100 | 0 | let builder = auto::Builder::new(TokioExecutor::new()); |
101 | 0 | let mut connection = std::pin::pin!(builder.serve_connection_with_upgrades(TokioIo::new(i), service)); |
102 | 0 | tokio::select! { |
103 | | biased; |
104 | 0 | _ = shutdown.cancelled() => { |
105 | | // Shutdown signal received, stop accepting new requests but allow in-flight requests to complete. |
106 | 0 | tracing::info!("Shutting down HTTP connection."); |
107 | 0 | connection.as_mut().graceful_shutdown(); |
108 | | } |
109 | 0 | res = connection.as_mut() => { |
110 | 0 | if let Err(e) = res { |
111 | 0 | tracing::trace!("HTTP API connection error: {e:?}."); |
112 | 0 | } |
113 | | } |
114 | | } |
115 | 0 | } |
116 | | |
117 | 0 | async fn run_http_server( |
118 | 0 | listener: TcpListener, router: Router, cert_state_rx: Option<httptls::HttpsCertReceiver>, |
119 | 0 | shutdown: CancellationToken, |
120 | 0 | ) { |
121 | | loop { |
122 | 0 | let (socket, _remote) = tokio::select! { |
123 | | biased; |
124 | 0 | _ = shutdown.cancelled() => break, |
125 | 0 | res = listener.accept() => match res { |
126 | 0 | Ok(pair) => pair, |
127 | 0 | Err(e) => { |
128 | 0 | tracing::error!("Failed to accept HTTP API connection: {e:?}."); |
129 | 0 | continue; |
130 | | } |
131 | | }, |
132 | | }; |
133 | | |
134 | 0 | let tls = cert_state_rx.as_ref().map(|rx| rx.acceptor()); |
135 | 0 | let app = router.clone(); |
136 | 0 | let connection_shutdown = shutdown.child_token(); |
137 | | |
138 | 0 | tokio::spawn(async move { |
139 | 0 | if let Some(tls) = tls { |
140 | 0 | match tls.accept(socket).await { |
141 | 0 | Ok(tls_stream) => serve_connection(tls_stream, app, connection_shutdown).await, |
142 | 0 | Err(e) => { |
143 | 0 | tracing::trace!("HTTPS handshake error: {e:?}."); |
144 | 0 | return; |
145 | | } |
146 | | } |
147 | | } else { |
148 | 0 | serve_connection(socket, app, connection_shutdown).await; |
149 | | } |
150 | 0 | }); |
151 | | } |
152 | 0 | } |