Coverage Report

Created: 2026-05-06 07:24

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/home/runner/work/lyquor/lyquor/net/src/hub.rs
Line
Count
Source
1
use crate::inbound::{RPCReceiver, RequestHandler, RequestReceiver};
2
use crate::listener::Listener;
3
use crate::outbound::{ConnectionError, RPCSender, RequestSender};
4
use crate::peermanager::PeerManager;
5
use lyquor_primitives::NodeID;
6
use lyquor_tls::TlsConfig;
7
use std::sync::Arc;
8
use thiserror::Error;
9
use tokio::sync::mpsc;
10
use tokio_util::sync::CancellationToken;
11
12
#[derive(Debug, Error)]
13
pub enum HubError {
14
    #[error("failed to bind listener on {addr}: {source}")]
15
    BindAddr {
16
        addr: String,
17
        #[source]
18
        source: Box<dyn std::error::Error + Send + Sync>,
19
    },
20
    #[error("peer not exist")]
21
    PeerNotExist,
22
    #[error("mismatched node id from peer: got {got:?}, expected {expected:?}")]
23
    PeerIdMismatch { got: NodeID, expected: NodeID },
24
    #[error("invalid peer TLS certificate")]
25
    PeerTlsCertificateError,
26
    #[error("address {addr} is already mapped to {existing:?}, cannot map to {requested:?}")]
27
    AddressConflict {
28
        addr: String,
29
        existing: NodeID,
30
        requested: NodeID,
31
    },
32
    #[error("self connection is not allowed")]
33
    SelfConnection,
34
    #[error("connection error: {0}")]
35
    ConnectionError(#[source] ConnectionError),
36
    #[error("TLS error: {0}")]
37
    TlsError(#[from] lyquor_tls::TlsError),
38
    #[error("Inbound is not ready")]
39
    InboundError,
40
    #[error("Outbound is not ready")]
41
    OutboundError,
42
    #[error("unknown error")]
43
    Unknown,
44
}
45
46
type Result<T> = std::result::Result<T, HubError>;
47
48
pub struct Hub {
49
    id: NodeID,
50
    peer_manager: Arc<PeerManager>,
51
    control: Control,
52
    listener: crate::listener::Listener,
53
    shutdown: CancellationToken,
54
    tls_config: TlsConfig,
55
}
56
57
impl std::fmt::Debug for Hub {
58
0
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59
0
        f.debug_struct("Hub").field("id", &self.id).finish_non_exhaustive()
60
0
    }
61
}
62
63
impl Hub {
64
    #[tracing::instrument(level = "trace", skip(tls_config))]
65
46
    pub async fn new(tls_config: TlsConfig, listen_addr: String, shutdown: CancellationToken) -> anyhow::Result<Self> {
66
        let id = tls_config.node_id();
67
        let server_config = tls_config.server_config().map_err(HubError::TlsError)?;
68
        let client_config = tls_config.client_config().map_err(HubError::TlsError)?;
69
        let control = Control::new();
70
        let peer_manager = Arc::new(PeerManager::new(
71
            id.clone(),
72
            client_config,
73
            shutdown.child_token(),
74
            control.rpc_sender(),
75
        ));
76
        let listener = Listener::new(server_config, listen_addr, shutdown.child_token(), peer_manager.clone()).await?;
77
        Ok(Hub {
78
            id,
79
            peer_manager,
80
            control,
81
            listener,
82
            shutdown,
83
            tls_config,
84
        })
85
46
    }
86
87
37
    pub fn get_id(&self) -> NodeID {
88
37
        self.id
89
37
    }
90
91
38
    pub fn signing_key(&self) -> Arc<lyquor_tls::NodeSigningKey> {
92
38
        self.tls_config.signing_key()
93
38
    }
94
95
0
    pub async fn wait_for_shutdown(&self) {
96
0
        self.listener.wait().await;
97
0
        let _ = self.peer_manager.remove_all_peers().await;
98
0
    }
99
100
    #[tracing::instrument(level = "trace", ret, err)]
101
74
    pub async fn add_peer(&self, id: &NodeID, addr: &str) -> Result<()> {
102
        tracing::info!("add peer for {:?}: {:?} -> {:?}", self.id, id, addr);
103
        self.peer_manager.add_peer(id, addr).await
104
74
    }
105
106
    #[tracing::instrument(level = "trace", err(level = "debug"))]
107
0
    pub async fn wait_inbound_ready(&self, id: &NodeID, timeout: std::time::Duration) -> Result<RequestReceiver> {
108
        let deadline = tokio::time::Instant::now() + timeout;
109
        while tokio::time::Instant::now() < deadline {
110
            if let Ok(receiver) = self.peer_manager.inbound(id) {
111
                return Ok(receiver);
112
            }
113
            if self.shutdown.is_cancelled() {
114
                return Err(HubError::Unknown);
115
            }
116
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
117
        }
118
        Err(HubError::InboundError)
119
0
    }
120
121
    #[tracing::instrument(level = "trace", err(level = "debug"))]
122
0
    pub async fn wait_outbound_ready(&self, id: &NodeID, timeout: std::time::Duration) -> Result<RequestSender> {
123
        let deadline = tokio::time::Instant::now() + timeout;
124
        while tokio::time::Instant::now() < deadline {
125
            if let Ok(sender) = self.peer_manager.outbound(id) {
126
                return Ok(sender);
127
            }
128
            if self.shutdown.is_cancelled() {
129
                return Err(HubError::Unknown);
130
            }
131
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
132
        }
133
        Err(HubError::OutboundError)
134
0
    }
135
136
    #[tracing::instrument(level = "trace", ret, err(level = "debug"))]
137
0
    pub async fn remove_peer(&self, id: &NodeID) -> Result<()> {
138
        self.peer_manager.remove_peer(id).await
139
0
    }
140
141
    #[tracing::instrument(level = "trace", err(level = "debug"))]
142
111
    pub fn outbound(&self, id: &NodeID) -> Result<RequestSender> {
143
        self.peer_manager.outbound(id)
144
111
    }
145
146
    #[tracing::instrument(level = "trace", err(level = "debug"))]
147
62
    pub fn inbound(&self, id: &NodeID) -> Result<RequestReceiver> {
148
        self.peer_manager.inbound(id)
149
62
    }
150
151
    #[tracing::instrument(level = "trace", err(level = "debug"))]
152
4
    pub fn rpc_outbound(&self, id: &NodeID) -> Result<RPCSender> {
153
        self.peer_manager.rpc_outbound(id)
154
4
    }
155
156
    #[tracing::instrument(level = "trace", err(level = "debug"))]
157
2
    pub fn rpc_inbound(&self) -> Result<RPCReceiver> {
158
        Ok(self.control.rpc_inbound())
159
2
    }
160
161
0
    pub async fn get_peers(&self) -> Vec<NodeID> {
162
0
        self.peer_manager.get_peers().await
163
0
    }
164
}
165
166
#[derive(Clone)]
167
pub struct Control {
168
    rpc_sender: mpsc::Sender<RequestHandler>,
169
    rpc_receiver: RPCReceiver,
170
}
171
172
impl Control {
173
48
    pub fn new() -> Self {
174
48
        let (tx, rx) = mpsc::channel(100);
175
48
        Control {
176
48
            rpc_sender: tx,
177
48
            rpc_receiver: RPCReceiver::new(rx),
178
48
        }
179
48
    }
180
181
2
    pub fn rpc_inbound(&self) -> RPCReceiver {
182
2
        self.rpc_receiver.clone()
183
2
    }
184
185
48
    pub fn rpc_sender(&self) -> mpsc::Sender<RequestHandler> {
186
48
        self.rpc_sender.clone()
187
48
    }
188
}