Coverage Report

Created: 2026-02-20 00:51

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/home/runner/work/lyquor/lyquor/node/src/http/mod.rs
Line
Count
Source
1
use actix::prelude::*;
2
use axum::Router;
3
use jsonrpsee::server::serve_with_graceful_shutdown;
4
use lyquor_api::anyhow;
5
use lyquor_jsonrpc::client::ClientHandle;
6
use tokio::net::TcpListener;
7
use tokio_util::sync::CancellationToken;
8
9
mod jsonrpc;
10
11
pub use jsonrpc::{NewConsoleLines, Subscriptions};
12
13
pub struct HttpServer {
14
    bind_addr: String,
15
    jsonrpc: jsonrpc::JsonRpcContext,
16
    shutdown: CancellationToken,
17
    task: Option<tokio::task::JoinHandle<()>>,
18
}
19
20
impl std::fmt::Debug for HttpServer {
21
0
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
22
0
        f.debug_struct("HttpServer")
23
0
            .field("bind_addr", &self.bind_addr)
24
0
            .finish()
25
0
    }
26
}
27
28
impl HttpServer {
29
0
    pub async fn new(
30
0
        bind_addr: &str, client: ClientHandle, console_sub: Addr<Subscriptions>, fco: Addr<lyquor_seq::fco::FCO>,
31
0
        pool: Addr<crate::lyquid::LyquidPool>,
32
0
    ) -> Result<Self, anyhow::Error> {
33
0
        let bind_addr = bind_addr.into();
34
0
        Ok(Self {
35
0
            bind_addr,
36
0
            jsonrpc: jsonrpc::JsonRpcContext::new(client, console_sub, pool, fco),
37
0
            shutdown: CancellationToken::new(),
38
0
            task: None,
39
0
        })
40
0
    }
41
42
0
    pub fn start(&mut self) {
43
0
        if self.task.is_some() {
44
0
            tracing::warn!("HttpServer already started.");
45
0
            return;
46
0
        }
47
48
0
        let bind_addr = self.bind_addr.clone();
49
0
        let route_state = jsonrpc::JsonRpcRouteState::new(self.jsonrpc.clone(), self.shutdown.clone());
50
0
        let shutdown = self.shutdown.clone();
51
52
0
        self.task = Some(tokio::spawn(async move {
53
0
            run_http_server(bind_addr, route_state, shutdown).await;
54
0
        }));
55
0
    }
56
57
0
    pub fn stop(&self) {
58
        // Single shutdown signal for all HTTP/JSON-RPC tasks; cleanup is handled
59
        // by the token-driven tasks spawned in `start` and `JsonRpcRouteState::new`.
60
0
        self.shutdown.cancel();
61
0
    }
62
}
63
64
0
fn build_router(route_state: jsonrpc::JsonRpcRouteState) -> Router {
65
    // Keep top-level router composition in mod.rs so adding REST APIs later only
66
    // extends this function.
67
0
    jsonrpc::build_jsonrpc_router(route_state)
68
0
}
69
70
0
async fn run_http_server(bind_addr: String, route_state: jsonrpc::JsonRpcRouteState, shutdown: CancellationToken) {
71
0
    let listener = match TcpListener::bind(&bind_addr).await {
72
0
        Ok(listener) => listener,
73
0
        Err(e) => {
74
0
            tracing::error!("Failed to start HTTP server: {e:?}.");
75
0
            return;
76
        }
77
    };
78
79
    loop {
80
0
        let (socket, _remote) = tokio::select! {
81
0
            res = listener.accept() => match res {
82
0
                Ok(pair) => pair,
83
0
                Err(e) => {
84
0
                    tracing::error!("Failed to accept HTTP API connection: {e:?}.");
85
0
                    continue;
86
                }
87
            },
88
0
            _ = shutdown.cancelled() => break,
89
        };
90
91
0
        let app = build_router(route_state.clone());
92
0
        let connection_shutdown = shutdown.clone();
93
94
0
        tokio::spawn(async move {
95
0
            if let Err(e) = serve_with_graceful_shutdown(socket, app, connection_shutdown.cancelled()).await {
96
0
                tracing::debug!("HTTP API connection error: {e:?}.");
97
0
            }
98
0
        });
99
    }
100
0
}