Coverage Report

Created: 2026-05-21 08:33

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/home/runner/work/lyquor/lyquor/node/src/http/mod.rs
Line
Count
Source
1
use axum::Router;
2
use hyper_util::rt::{TokioExecutor, TokioIo};
3
use hyper_util::server::conn::auto;
4
use hyper_util::service::TowerToHyperService;
5
use lyquor_api::anyhow;
6
use lyquor_image_store::OciDistributionStore;
7
use lyquor_jsonrpc::client::ClientHandle;
8
use std::sync::Arc;
9
use tokio::net::TcpListener;
10
use tokio_util::sync::CancellationToken;
11
use tokio_util::task::TaskTracker;
12
13
pub mod httptls;
14
mod jsonrpc;
15
mod lyquid;
16
mod node;
17
mod oci;
18
19
pub use self::lyquid::LyquidServiceContext;
20
pub use self::node::NodeServiceContext;
21
22
pub struct HttpServer {
23
    local_addr: String,
24
    shutdown: CancellationToken,
25
    task_tracker: TaskTracker,
26
}
27
28
impl std::fmt::Debug for HttpServer {
29
0
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30
0
        f.debug_struct("HttpServer")
31
0
            .field("local_addr", &self.local_addr)
32
0
            .finish()
33
0
    }
34
}
35
36
impl HttpServer {
37
0
    pub async fn new(
38
0
        bind_addr: &str, client: ClientHandle, fco: lyquor_seq::fco::FCOHandle,
39
0
        process_manager: lyquor_hosting::ProcessManagerHandle, registry: lyquor_hosting::LyquidRegistry,
40
0
        node_service_context: NodeServiceContext, oci_store: Arc<dyn OciDistributionStore>,
41
0
        shutdown: CancellationToken, http_tls: Option<httptls::HttpsTlsRuntime>,
42
0
    ) -> anyhow::Result<Self> {
43
0
        let listener = TcpListener::bind(bind_addr).await?;
44
0
        let local_addr = listener.local_addr()?.to_string();
45
46
0
        let router = build_router(
47
0
            jsonrpc::JsonRpcRouteState::new(
48
0
                jsonrpc::JsonRpcContext::new(client.clone(), process_manager.clone(), registry.clone(), fco),
49
0
                shutdown.child_token(),
50
            ),
51
0
            Arc::new(LyquidServiceContext::new(process_manager.clone(), registry)),
52
0
            Arc::new(node_service_context.clone()),
53
0
            oci::OciRouteState::new(oci_store.clone()),
54
        );
55
56
0
        let cert_state_rx = http_tls.as_ref().map(httptls::HttpsTlsRuntime::cert_state_receiver);
57
58
0
        let task_tracker = TaskTracker::new();
59
0
        task_tracker.spawn(run_http_server(listener, router, cert_state_rx, shutdown.clone()));
60
61
        // Don't change this line as it is parsed by tests.
62
0
        tracing::info!("API listening at {local_addr}");
63
64
0
        Ok(Self {
65
0
            local_addr,
66
0
            shutdown,
67
0
            task_tracker,
68
0
        })
69
0
    }
70
71
0
    pub fn stop(&self) {
72
        // Single shutdown signal for all HTTP/JSON-RPC tasks; cleanup is handled
73
        // by the token-driven tasks spawned in `start` and `JsonRpcRouteState::new`.
74
0
        self.shutdown.cancel();
75
0
        self.task_tracker.close();
76
0
    }
77
78
0
    pub async fn wait(&self) {
79
0
        self.task_tracker.wait().await;
80
0
    }
81
}
82
83
0
fn build_router(
84
0
    jsonrpc_state: jsonrpc::JsonRpcRouteState, lyquid_service_context: Arc<lyquid::LyquidServiceContext>,
85
0
    node_service_context: Arc<node::NodeServiceContext>, oci_state: oci::OciRouteState,
86
0
) -> Router {
87
    // Keep top-level router composition in mod.rs so adding REST APIs later only
88
    // extends this function.
89
0
    jsonrpc::build_jsonrpc_router(jsonrpc_state)
90
0
        .merge(oci::build_oci_router(oci_state))
91
0
        .merge(node::build_node_router(node_service_context))
92
0
        .merge(lyquid::build_lyquid_router(lyquid_service_context))
93
0
}
94
95
0
async fn serve_connection<I>(i: I, app: Router, shutdown: CancellationToken)
96
0
where
97
0
    I: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin + 'static,
98
0
{
99
0
    let service = TowerToHyperService::new(app);
100
0
    let builder = auto::Builder::new(TokioExecutor::new());
101
0
    let mut connection = std::pin::pin!(builder.serve_connection_with_upgrades(TokioIo::new(i), service));
102
0
    tokio::select! {
103
        biased;
104
0
        _ = shutdown.cancelled() => {
105
            // Shutdown signal received, stop accepting new requests but allow in-flight requests to complete.
106
0
            tracing::info!("Shutting down HTTP connection.");
107
0
            connection.as_mut().graceful_shutdown();
108
        }
109
0
        res = connection.as_mut() => {
110
0
            if let Err(e) = res {
111
0
                tracing::trace!("HTTP API connection error: {e:?}.");
112
0
            }
113
        }
114
    }
115
0
}
116
117
0
async fn run_http_server(
118
0
    listener: TcpListener, router: Router, cert_state_rx: Option<httptls::HttpsCertReceiver>,
119
0
    shutdown: CancellationToken,
120
0
) {
121
    loop {
122
0
        let (socket, _remote) = tokio::select! {
123
            biased;
124
0
            _ = shutdown.cancelled() => break,
125
0
            res = listener.accept() => match res {
126
0
                Ok(pair) => pair,
127
0
                Err(e) => {
128
0
                    tracing::error!("Failed to accept HTTP API connection: {e:?}.");
129
0
                    continue;
130
                }
131
            },
132
        };
133
134
0
        let tls = cert_state_rx.as_ref().map(|rx| rx.acceptor());
135
0
        let app = router.clone();
136
0
        let connection_shutdown = shutdown.child_token();
137
138
0
        tokio::spawn(async move {
139
0
            if let Some(tls) = tls {
140
0
                match tls.accept(socket).await {
141
0
                    Ok(tls_stream) => serve_connection(tls_stream, app, connection_shutdown).await,
142
0
                    Err(e) => {
143
0
                        tracing::trace!("HTTPS handshake error: {e:?}.");
144
0
                        return;
145
                    }
146
                }
147
            } else {
148
0
                serve_connection(socket, app, connection_shutdown).await;
149
            }
150
0
        });
151
    }
152
0
}