/home/runner/work/lyquor/lyquor/net/src/hub.rs
Line | Count | Source |
1 | | use crate::inbound::{RPCReceiver, RequestHandler, RequestReceiver}; |
2 | | use crate::listener::Listener; |
3 | | use crate::outbound::{ConnectionError, RPCSender, RequestSender}; |
4 | | use crate::peermanager::PeerManager; |
5 | | use lyquor_primitives::NodeID; |
6 | | use lyquor_tls::TlsConfig; |
7 | | use std::sync::Arc; |
8 | | use thiserror::Error; |
9 | | use tokio::sync::mpsc; |
10 | | use tokio_util::sync::CancellationToken; |
11 | | |
/// Errors surfaced by [`Hub`] and the peer-management layer it delegates to.
#[derive(Debug, Error)]
pub enum HubError {
    /// The listener could not be bound on `addr`; `source` carries the underlying cause.
    #[error("failed to bind listener on {addr}: {source}")]
    BindAddr {
        addr: String,
        #[source]
        source: Box<dyn std::error::Error + Send + Sync>,
    },
    /// The requested peer is not known.
    #[error("peer not exist")]
    PeerNotExist,
    /// The node id reported by a peer (`got`) differs from the one that was expected.
    #[error("mismatched node id from peer: got {got:?}, expected {expected:?}")]
    PeerIdMismatch { got: NodeID, expected: NodeID },
    /// The peer's TLS certificate was rejected as invalid.
    #[error("invalid peer TLS certificate")]
    PeerTlsCertificateError,
    /// `addr` is already associated with node `existing`, so it cannot be mapped to `requested`.
    #[error("address {addr} is already mapped to {existing:?}, cannot map to {requested:?}")]
    AddressConflict {
        addr: String,
        existing: NodeID,
        requested: NodeID,
    },
    /// A connection to the local node itself was attempted.
    #[error("self connection is not allowed")]
    SelfConnection,
    /// Wrapper around an outbound [`ConnectionError`].
    ///
    /// Note: no `#[from]` here, so callers must wrap explicitly.
    #[error("connection error: {0}")]
    ConnectionError(#[source] ConnectionError),
    /// Wrapper around a TLS error; `#[from]` allows `?` to convert automatically.
    #[error("TLS error: {0}")]
    TlsError(#[from] lyquor_tls::TlsError),
    /// The inbound side for a peer is not ready (returned by `Hub::wait_inbound_ready` on timeout).
    #[error("Inbound is not ready")]
    InboundError,
    /// The outbound side for a peer is not ready (returned by `Hub::wait_outbound_ready` on timeout).
    #[error("Outbound is not ready")]
    OutboundError,
    /// Catch-all error; the `Hub::wait_*_ready` methods return it when shutdown was requested.
    #[error("unknown error")]
    Unknown,
}
45 | | |
/// Module-local result alias whose error type is fixed to [`HubError`].
///
/// Other result types used in this file (`anyhow::Result`, `std::fmt::Result`) are always
/// written fully qualified so they do not clash with this alias.
type Result<T> = std::result::Result<T, HubError>;
47 | | |
48 | | pub struct Hub { |
49 | | id: NodeID, |
50 | | peer_manager: Arc<PeerManager>, |
51 | | control: Control, |
52 | | listener: crate::listener::Listener, |
53 | | shutdown: CancellationToken, |
54 | | tls_config: TlsConfig, |
55 | | } |
56 | | |
57 | | impl std::fmt::Debug for Hub { |
58 | 0 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
59 | 0 | f.debug_struct("Hub").field("id", &self.id).finish_non_exhaustive() |
60 | 0 | } |
61 | | } |
62 | | |
63 | | impl Hub { |
64 | | #[tracing::instrument(level = "trace", skip(tls_config))] |
65 | 46 | pub async fn new(tls_config: TlsConfig, listen_addr: String, shutdown: CancellationToken) -> anyhow::Result<Self> { |
66 | | let id = tls_config.node_id(); |
67 | | let server_config = tls_config.server_config().map_err(HubError::TlsError)?; |
68 | | let client_config = tls_config.client_config().map_err(HubError::TlsError)?; |
69 | | let control = Control::new(); |
70 | | let peer_manager = Arc::new(PeerManager::new( |
71 | | id.clone(), |
72 | | client_config, |
73 | | shutdown.child_token(), |
74 | | control.rpc_sender(), |
75 | | )); |
76 | | let listener = Listener::new(server_config, listen_addr, shutdown.child_token(), peer_manager.clone()).await?; |
77 | | Ok(Hub { |
78 | | id, |
79 | | peer_manager, |
80 | | control, |
81 | | listener, |
82 | | shutdown, |
83 | | tls_config, |
84 | | }) |
85 | 46 | } |
86 | | |
87 | 37 | pub fn get_id(&self) -> NodeID { |
88 | 37 | self.id |
89 | 37 | } |
90 | | |
91 | 38 | pub fn signing_key(&self) -> Arc<lyquor_tls::NodeSigningKey> { |
92 | 38 | self.tls_config.signing_key() |
93 | 38 | } |
94 | | |
95 | 0 | pub async fn wait_for_shutdown(&self) { |
96 | 0 | self.listener.wait().await; |
97 | 0 | let _ = self.peer_manager.remove_all_peers().await; |
98 | 0 | } |
99 | | |
100 | | #[tracing::instrument(level = "trace", ret, err)] |
101 | 74 | pub async fn add_peer(&self, id: &NodeID, addr: &str) -> Result<()> { |
102 | | tracing::info!("add peer for {:?}: {:?} -> {:?}", self.id, id, addr); |
103 | | self.peer_manager.add_peer(id, addr).await |
104 | 74 | } |
105 | | |
106 | | #[tracing::instrument(level = "trace", err(level = "debug"))] |
107 | 0 | pub async fn wait_inbound_ready(&self, id: &NodeID, timeout: std::time::Duration) -> Result<RequestReceiver> { |
108 | | let deadline = tokio::time::Instant::now() + timeout; |
109 | | while tokio::time::Instant::now() < deadline { |
110 | | if let Ok(receiver) = self.peer_manager.inbound(id) { |
111 | | return Ok(receiver); |
112 | | } |
113 | | if self.shutdown.is_cancelled() { |
114 | | return Err(HubError::Unknown); |
115 | | } |
116 | | tokio::time::sleep(std::time::Duration::from_millis(100)).await; |
117 | | } |
118 | | Err(HubError::InboundError) |
119 | 0 | } |
120 | | |
121 | | #[tracing::instrument(level = "trace", err(level = "debug"))] |
122 | 0 | pub async fn wait_outbound_ready(&self, id: &NodeID, timeout: std::time::Duration) -> Result<RequestSender> { |
123 | | let deadline = tokio::time::Instant::now() + timeout; |
124 | | while tokio::time::Instant::now() < deadline { |
125 | | if let Ok(sender) = self.peer_manager.outbound(id) { |
126 | | return Ok(sender); |
127 | | } |
128 | | if self.shutdown.is_cancelled() { |
129 | | return Err(HubError::Unknown); |
130 | | } |
131 | | tokio::time::sleep(std::time::Duration::from_millis(100)).await; |
132 | | } |
133 | | Err(HubError::OutboundError) |
134 | 0 | } |
135 | | |
136 | | #[tracing::instrument(level = "trace", ret, err(level = "debug"))] |
137 | 0 | pub async fn remove_peer(&self, id: &NodeID) -> Result<()> { |
138 | | self.peer_manager.remove_peer(id).await |
139 | 0 | } |
140 | | |
141 | | #[tracing::instrument(level = "trace", err(level = "debug"))] |
142 | 111 | pub fn outbound(&self, id: &NodeID) -> Result<RequestSender> { |
143 | | self.peer_manager.outbound(id) |
144 | 111 | } |
145 | | |
146 | | #[tracing::instrument(level = "trace", err(level = "debug"))] |
147 | 62 | pub fn inbound(&self, id: &NodeID) -> Result<RequestReceiver> { |
148 | | self.peer_manager.inbound(id) |
149 | 62 | } |
150 | | |
151 | | #[tracing::instrument(level = "trace", err(level = "debug"))] |
152 | 4 | pub fn rpc_outbound(&self, id: &NodeID) -> Result<RPCSender> { |
153 | | self.peer_manager.rpc_outbound(id) |
154 | 4 | } |
155 | | |
156 | | #[tracing::instrument(level = "trace", err(level = "debug"))] |
157 | 2 | pub fn rpc_inbound(&self) -> Result<RPCReceiver> { |
158 | | Ok(self.control.rpc_inbound()) |
159 | 2 | } |
160 | | |
161 | 0 | pub async fn get_peers(&self) -> Vec<NodeID> { |
162 | 0 | self.peer_manager.get_peers().await |
163 | 0 | } |
164 | | } |
165 | | |
/// Holds both ends of the hub's RPC channel.
///
/// NOTE(review): the struct is `Clone`; whether cloned receivers share one underlying queue
/// depends on `RPCReceiver`, which is defined elsewhere — confirm before relying on it.
#[derive(Clone)]
pub struct Control {
    /// Sending half of the channel created in `Control::new`; clones are handed out by `rpc_sender()`.
    rpc_sender: mpsc::Sender<RequestHandler>,
    /// Receiver wrapping the channel's receiving half; clones are handed out by `rpc_inbound()`.
    rpc_receiver: RPCReceiver,
}
171 | | |
172 | | impl Control { |
173 | 48 | pub fn new() -> Self { |
174 | 48 | let (tx, rx) = mpsc::channel(100); |
175 | 48 | Control { |
176 | 48 | rpc_sender: tx, |
177 | 48 | rpc_receiver: RPCReceiver::new(rx), |
178 | 48 | } |
179 | 48 | } |
180 | | |
181 | 2 | pub fn rpc_inbound(&self) -> RPCReceiver { |
182 | 2 | self.rpc_receiver.clone() |
183 | 2 | } |
184 | | |
185 | 48 | pub fn rpc_sender(&self) -> mpsc::Sender<RequestHandler> { |
186 | 48 | self.rpc_sender.clone() |
187 | 48 | } |
188 | | } |