Coverage Report

Created: 2026-04-05 04:21

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/home/runner/work/lyquor/lyquor/node/src/http/mod.rs
Line
Count
Source
1
use actix::prelude::*;
2
use axum::Router;
3
use jsonrpsee::server::serve_with_graceful_shutdown;
4
use lyquor_api::anyhow;
5
use lyquor_image_store::OciDistributionStore;
6
use lyquor_jsonrpc::client::ClientHandle;
7
use std::sync::Arc;
8
use tokio::net::TcpListener;
9
use tokio_util::sync::CancellationToken;
10
11
pub mod httptls;
12
mod jsonrpc;
13
mod lyquid;
14
mod node;
15
mod oci;
16
17
pub use self::lyquid::LyquidServiceContext;
18
pub use self::node::NodeServiceContext;
19
20
pub struct HttpServer {
21
    bind_addr: String,
22
    jsonrpc: jsonrpc::JsonRpcContext,
23
    lyquid_service_context: Arc<lyquid::LyquidServiceContext>,
24
    node_service_context: Arc<node::NodeServiceContext>,
25
    oci: oci::OciRouteState,
26
    http_tls: Option<httptls::HttpTls>,
27
    shutdown: CancellationToken,
28
    task: Option<tokio::task::JoinHandle<()>>,
29
}
30
31
impl std::fmt::Debug for HttpServer {
32
0
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
33
0
        f.debug_struct("HttpServer")
34
0
            .field("bind_addr", &self.bind_addr)
35
0
            .finish()
36
0
    }
37
}
38
39
impl HttpServer {
40
0
    pub async fn new(
41
0
        bind_addr: &str, client: ClientHandle, fco: Addr<lyquor_seq::fco::FCO>, pool: Addr<crate::lyquid::LyquidPool>,
42
0
        node_service_context: NodeServiceContext, oci_store: Arc<dyn OciDistributionStore>,
43
0
        shutdown: CancellationToken, http_tls: Option<httptls::HttpTls>,
44
0
    ) -> Result<Self, anyhow::Error> {
45
0
        let bind_addr = bind_addr.into();
46
0
        Ok(Self {
47
0
            bind_addr,
48
0
            jsonrpc: jsonrpc::JsonRpcContext::new(client, pool.clone(), fco),
49
0
            lyquid_service_context: Arc::new(LyquidServiceContext::new(pool)),
50
0
            node_service_context: Arc::new(node_service_context),
51
0
            oci: oci::OciRouteState::new(oci_store),
52
0
            http_tls,
53
0
            shutdown,
54
0
            task: None,
55
0
        })
56
0
    }
57
58
0
    pub fn start(&mut self) {
59
0
        if self.task.is_some() {
60
0
            tracing::warn!("HttpServer already started.");
61
0
            return;
62
0
        }
63
64
0
        let bind_addr = self.bind_addr.clone();
65
0
        let jsonrpc_state = jsonrpc::JsonRpcRouteState::new(self.jsonrpc.clone(), self.shutdown.child_token());
66
0
        let lyquid_service_context = self.lyquid_service_context.clone();
67
0
        let node_service_context = self.node_service_context.clone();
68
0
        let oci_state = self.oci.clone();
69
0
        let shutdown = self.shutdown.child_token();
70
0
        let http_tls = self.http_tls.clone();
71
72
0
        self.task = Some(tokio::spawn(async move {
73
0
            run_http_server(
74
0
                bind_addr,
75
0
                jsonrpc_state,
76
0
                lyquid_service_context,
77
0
                node_service_context,
78
0
                oci_state,
79
0
                http_tls,
80
0
                shutdown,
81
0
            )
82
0
            .await;
83
0
        }));
84
0
    }
85
86
0
    pub fn stop(&self) {
87
        // Single shutdown signal for all HTTP/JSON-RPC tasks; cleanup is handled
88
        // by the token-driven tasks spawned in `start` and `JsonRpcRouteState::new`.
89
0
        self.shutdown.cancel();
90
0
    }
91
}
92
93
0
fn build_router(
94
0
    jsonrpc_state: jsonrpc::JsonRpcRouteState, lyquid_service_context: Arc<lyquid::LyquidServiceContext>,
95
0
    node_service_context: Arc<node::NodeServiceContext>, oci_state: oci::OciRouteState,
96
0
) -> Router {
97
    // Keep top-level router composition in mod.rs so adding REST APIs later only
98
    // extends this function.
99
0
    node::build_node_router(node_service_context)
100
0
        .merge(lyquid::build_lyquid_router(lyquid_service_context))
101
0
        .merge(oci::build_oci_router(oci_state))
102
0
        .merge(jsonrpc::build_jsonrpc_router(jsonrpc_state))
103
0
}
104
105
0
async fn run_http_server(
106
0
    bind_addr: String, jsonrpc_state: jsonrpc::JsonRpcRouteState,
107
0
    lyquid_service_context: Arc<lyquid::LyquidServiceContext>, node_service_context: Arc<node::NodeServiceContext>,
108
0
    oci_state: oci::OciRouteState, http_tls: Option<httptls::HttpTls>, shutdown: CancellationToken,
109
0
) {
110
0
    let listener = match TcpListener::bind(&bind_addr).await {
111
0
        Ok(listener) => listener,
112
0
        Err(e) => {
113
0
            tracing::error!("Failed to start HTTP server: {e:?}.");
114
0
            return;
115
        }
116
    };
117
118
0
    if let Some(tls) = &http_tls {
119
0
        tls.spawn_renewal_worker(shutdown.child_token());
120
0
    }
121
122
    // Don't change this line as it is parsed by tests.
123
0
    tracing::info!("API listening at {}", bind_addr);
124
125
    loop {
126
0
        let (socket, _remote) = tokio::select! {
127
0
            res = listener.accept() => match res {
128
0
                Ok(pair) => pair,
129
0
                Err(e) => {
130
0
                    tracing::error!("Failed to accept HTTP API connection: {e:?}.");
131
0
                    continue;
132
                }
133
            },
134
0
            _ = shutdown.cancelled() => break,
135
        };
136
137
0
        let app = build_router(
138
0
            jsonrpc_state.clone(),
139
0
            lyquid_service_context.clone(),
140
0
            node_service_context.clone(),
141
0
            oci_state.clone(),
142
        );
143
0
        let connection_shutdown = shutdown.child_token();
144
0
        let tls = {
145
0
            if let Some(ht) = &http_tls {
146
0
                ht.current_acceptor()
147
            } else {
148
0
                None
149
            }
150
        };
151
152
        // TODO: do we need to split TLS and non-TLS ports?
153
0
        tokio::spawn(async move {
154
0
            if let Some(tls) = tls {
155
0
                match tls.accept(socket).await {
156
0
                    Ok(tls_stream) => {
157
0
                        if let Err(e) =
158
0
                            serve_with_graceful_shutdown(tls_stream, app, connection_shutdown.cancelled()).await
159
                        {
160
0
                            tracing::debug!("HTTPS API connection error: {e:?}.");
161
0
                        }
162
                    }
163
0
                    Err(e) => {
164
0
                        tracing::debug!("HTTPS handshake error: {e:?}.");
165
                    }
166
                }
167
0
            } else if let Err(e) = serve_with_graceful_shutdown(socket, app, connection_shutdown.cancelled()).await {
168
0
                tracing::debug!("HTTP API connection error: {e:?}.");
169
0
            }
170
0
        });
171
    }
172
0
}