/home/runner/work/lyquor/lyquor/seq/src/eth/crawler.rs
Line | Count | Source |
1 | | use super::{ChainPos, Contract, Slot}; |
2 | | use alloy_sol_types::SolEvent; |
3 | | use futures::FutureExt as _; |
4 | | use lyquor_api::anyhow::{self, Context as _Context}; |
5 | | use lyquor_eth::eth; |
6 | | use lyquor_jsonrpc::client::ClientHandle; |
7 | | use lyquor_jsonrpc::types::{ |
8 | | Block, BlockNumber, EthGetBlockByNumber, EthGetBlockByNumberResp, EthGetLogs, EthGetLogsResp, EthNewHeadUpdate, |
9 | | EthSubscribe, |
10 | | }; |
11 | | use lyquor_primitives::{Address, LyquidID, U256, alloy_primitives, debug_struct_name}; |
12 | | use std::collections::HashMap; |
13 | | use std::time::Duration; |
14 | | use tokio::sync::mpsc; |
15 | | use tokio::sync::mpsc::error::TrySendError; |
16 | | use tokio::time::MissedTickBehavior; |
17 | | use tokio_util::sync::CancellationToken; |
18 | | use tokio_util::task::TaskTracker; |
19 | | use tokio_util::time::FutureExt as _; |
20 | | use tracing::Instrument; |
21 | | |
22 | | enum WatchedLogKind { |
23 | | Slot, |
24 | | } |
25 | | |
26 | | impl WatchedLogKind { |
27 | 0 | fn from_topic0(topic0: Option<alloy_primitives::B256>) -> Option<Self> { |
28 | 0 | match topic0 { |
29 | 0 | Some(t) if t == eth::Slot::SIGNATURE_HASH => Some(Self::Slot), |
30 | 0 | _ => None, |
31 | | } |
32 | 0 | } |
33 | | |
34 | 0 | fn topic0_filter() -> Vec<alloy_primitives::B256> { |
35 | 0 | vec![eth::Slot::SIGNATURE_HASH] |
36 | 0 | } |
37 | | } |
38 | | |
/// Slots decoded from a single `eth_getLogs` response, grouped by the address of the contract
/// that emitted them.
#[derive(Default)]
struct WatchedLogsBatch {
    /// Decoded slots per emitting contract address, in the order the logs appeared in the
    /// response.
    slots: HashMap<Address, Vec<Slot>>,
}
43 | | |
// Capacity of the channel used to wake the crawler worker.
// Wakeups are edge-triggered: one queued wake is enough to force another sync pass, so any
// further wake requested while one is already queued carries no extra information.
const SLOT_CRAWLER_WAKE_CAPACITY: usize = 1;
46 | | |
/// State owned by the background task that keeps the watched contracts in sync with the chain.
struct SlotCrawlerWorker {
    /// JSON-RPC client used for the block and log queries.
    client: ClientHandle,
    /// Watch list: the contract currently tracked for each Lyquid.
    contracts: HashMap<LyquidID, Contract>,
    /// Maximum number of blocks covered by a single `eth_getLogs` request.
    blocks_per_request: usize,
    /// Block identifier passed to `eth_getBlockByNumber` to learn how far a sync pass may go.
    finality_tag: BlockNumber,
}
53 | | |
/// Watch-list changes sent from `SlotCrawlerHandle` to the worker task.
enum ContractCommand {
    /// Insert the contract into the watch list, replacing any entry with the same Lyquid ID.
    Upsert(Contract),
    /// Remove the watch-list entry of the given Lyquid, if there is one.
    RemoveLyquid(LyquidID),
}
58 | | |
59 | | impl SlotCrawlerWorker { |
60 | 0 | async fn run(mut self, mut wake_rx: mpsc::Receiver<()>, mut contract_rx: mpsc::UnboundedReceiver<ContractCommand>) { |
61 | | loop { |
62 | 0 | tokio::select! { |
63 | 0 | Some(cmd) = contract_rx.recv() => { |
64 | 0 | self.handle_contract_command(cmd); |
65 | 0 | } |
66 | 0 | Some(()) = wake_rx.recv() => {} |
67 | 0 | else => break, |
68 | | } |
69 | | |
70 | 0 | while let Ok(cmd) = contract_rx.try_recv() { |
71 | 0 | self.handle_contract_command(cmd); |
72 | 0 | } |
73 | 0 | while let Ok(()) = wake_rx.try_recv() {} |
74 | | |
75 | 0 | if let Err(e) = Self::sync( |
76 | 0 | &mut self.contracts, |
77 | 0 | self.finality_tag, |
78 | 0 | self.blocks_per_request, |
79 | 0 | self.client.clone(), |
80 | | ) |
81 | 0 | .await |
82 | | { |
83 | 0 | tracing::error!("SlotCrawler: Error during sync: {e:?}"); |
84 | 0 | } |
85 | | } |
86 | 0 | } |
87 | | |
88 | 0 | fn handle_contract_command(&mut self, cmd: ContractCommand) { |
89 | 0 | match cmd { |
90 | 0 | ContractCommand::Upsert(contract) => { |
91 | 0 | tracing::debug!( |
92 | | "SlotCrawler: Added contract {} for lyquid {}.", |
93 | | contract.addr, |
94 | | contract.id |
95 | | ); |
96 | 0 | self.contracts.insert(contract.id, contract); |
97 | | } |
98 | 0 | ContractCommand::RemoveLyquid(id) => { |
99 | 0 | if self.contracts.remove(&id).is_some() { |
100 | 0 | tracing::debug!("SlotCrawler: Removed contract watch for lyquid {id}."); |
101 | 0 | } |
102 | | } |
103 | | } |
104 | 0 | } |
105 | | |
106 | 0 | fn extract_watched_logs(resp: EthGetLogsResp) -> WatchedLogsBatch { |
107 | 0 | let mut out = WatchedLogsBatch::default(); |
108 | 0 | for e in resp.0.into_iter() { |
109 | 0 | match WatchedLogKind::from_topic0(e.topics.first().copied()) { |
110 | | Some(WatchedLogKind::Slot) => { |
111 | | // From https://docs.rs/alloy/latest/alloy/dyn_abi/index.html: |
112 | | // We strongly recommend using the static encoder/decoder when possible. The |
113 | | // dynamic encoder/decoder is significantly more expensive, especially for complex |
114 | | // types. It is also significantly more error prone, as the mapping between |
115 | | // solidity types and rust types is not enforced by the compiler. |
116 | 0 | let rlp_decoded = alloy_primitives::LogData::new_unchecked(e.topics, e.data); |
117 | 0 | let le = match eth::Slot::decode_log_data(&rlp_decoded).ok() { |
118 | 0 | Some(le) => le, |
119 | 0 | None => continue, |
120 | | }; |
121 | 0 | let mut slot = Slot::from(le); |
122 | 0 | let block_number = match e.block_number { |
123 | 0 | BlockNumber::Quantity(q) => q, |
124 | 0 | _ => unreachable!(), |
125 | | }; |
126 | 0 | let bn: u64 = block_number.to(); |
127 | 0 | let ln: u64 = e.log_index.to(); |
128 | 0 | if ln >> 32 != 0 { |
129 | 0 | tracing::warn!("Skipped a block that's too big."); |
130 | 0 | continue; |
131 | 0 | } // block number uses 44 bits |
132 | 0 | slot.pos = ChainPos::new(bn, ln as u32); |
133 | 0 | tracing::debug!("SlotCrawler: extracted slot={slot:?}."); |
134 | 0 | out.slots.entry(e.address).or_default().push(slot); |
135 | | } |
136 | 0 | None => {} |
137 | | } |
138 | | } |
139 | 0 | out |
140 | 0 | } |
141 | | |
142 | 0 | async fn fetch_watched_logs_for_batch( |
143 | 0 | client: &ClientHandle, batch: impl IntoIterator<Item = &Address>, from_block: BlockNumber, |
144 | 0 | to_block: BlockNumber, |
145 | 0 | ) -> anyhow::Result<WatchedLogsBatch> { |
146 | 0 | let resp: EthGetLogsResp = client |
147 | 0 | .request::<EthGetLogs, EthGetLogsResp>(EthGetLogs { |
148 | 0 | from_block, |
149 | 0 | to_block, |
150 | 0 | address: batch.into_iter().copied().collect(), |
151 | 0 | topics: vec![WatchedLogKind::topic0_filter()], |
152 | 0 | }) |
153 | 0 | .await |
154 | 0 | .with_context(|| format!("Failed to retrieve {}..{}.", from_block, to_block))?; |
155 | 0 | Ok(Self::extract_watched_logs(resp)) |
156 | 0 | } |
157 | | |
158 | 0 | async fn update_contracts( |
159 | 0 | contracts: &mut HashMap<LyquidID, Contract>, batch_ids_by_addr: &HashMap<Address, LyquidID>, resync: &mut bool, |
160 | 0 | updates: impl Iterator<Item = (Address, Vec<Slot>)>, |
161 | 0 | ) { |
162 | 0 | for (addr, slots) in updates { |
163 | 0 | let id = match batch_ids_by_addr.get(&addr).copied() { |
164 | 0 | Some(id) => id, |
165 | | None => { |
166 | 0 | tracing::debug!("Skipped stale logs for unknown contract address {addr}."); |
167 | 0 | continue; |
168 | | } |
169 | | }; |
170 | 0 | let mut remove_self = false; |
171 | | { |
172 | 0 | let c = match contracts.get_mut(&id) { |
173 | 0 | Some(c) => c, |
174 | | None => { |
175 | 0 | tracing::debug!("Lyquid {id} was removed."); |
176 | 0 | continue; |
177 | | } |
178 | | }; |
179 | 0 | if c.addr != addr { |
180 | 0 | tracing::debug!( |
181 | | "Skipped stale logs for Lyquid {id} because current address {} does not match {}.", |
182 | | c.addr, |
183 | | addr |
184 | | ); |
185 | 0 | continue; |
186 | 0 | } |
187 | 0 | let mut to_be_filled = Vec::new(); |
188 | 0 | for s in slots.into_iter() { |
189 | 0 | if let crate::NextContractID::Eth(new_addr) = s.switch_contract { |
190 | 0 | if let Err(e) = c.lyquid.switch_contract(s.pos).await { |
191 | 0 | tracing::warn!("Sync: LyquidBackend of {id} is unavailable: {e:?}"); |
192 | | // We need to remove it from the watch list because the LyquidBackend |
193 | | // worker is already unreachable. |
194 | 0 | remove_self = true; |
195 | | } else { |
196 | 0 | tracing::debug!("SlotCrawler: Switching contract for {id}: {addr} => {new_addr}."); |
197 | 0 | c.addr = new_addr; |
198 | 0 | *resync = true; |
199 | | } |
200 | | // Ignore all subsequent slots for this old address. |
201 | 0 | break; |
202 | 0 | } |
203 | 0 | to_be_filled.push(s); |
204 | | } |
205 | 0 | if !remove_self { |
206 | 0 | if let Err(e) = c.seq.fill_slots(to_be_filled).await { |
207 | 0 | tracing::error!("Sync: Failed to fill slots from {id} ({addr}): {e:?}"); |
208 | | // We need to remove it from the watch list so the lyquid's next block won't be |
209 | | // advanced because the slots weren't filled properly. |
210 | 0 | remove_self = true; |
211 | 0 | } |
212 | 0 | } |
213 | | } |
214 | 0 | if remove_self { |
215 | 0 | contracts.remove(&id); |
216 | 0 | continue; |
217 | 0 | } |
218 | | } |
219 | 0 | } |
220 | | |
221 | | // This function is the core logic of SlotCrawler. |
222 | | // |
223 | | // It progressively tries to bring all contracts ("watch list") to be the same, latest block |
224 | | // known on the chain. While each query to the chain may bundle multiple contracts, the |
225 | | // philosophy is to still treat each contract's sync progress separately. This is good for |
226 | | // robust handling of LyquidBackend, because upon any errors during syncing, there is no |
227 | | // incorrectly advanced progress for any Lyquids. |
228 | | #[tracing::instrument(level = "trace", skip_all)] |
229 | 0 | async fn sync( |
230 | 0 | contracts: &mut HashMap<LyquidID, Contract>, finality_tag: BlockNumber, blocks_per_request: usize, |
231 | 0 | client: ClientHandle, |
232 | 0 | ) -> anyhow::Result<()> { |
233 | | // Choose the contracts that should be considered during this sync. |
234 | | let mut chosen = { |
235 | 0 | let next_blocks = contracts.iter().filter_map(|(id, c)| { |
236 | 0 | if c.seq.flow_control().should_stop_filling() { |
237 | 0 | None |
238 | | } else { |
239 | 0 | Some(c.seq.next_block().map(|bn| (bn, *id, c.addr))) |
240 | | } |
241 | 0 | }); |
242 | | futures::future::join_all(next_blocks) |
243 | | .await |
244 | | .into_iter() |
245 | 0 | .filter_map(|(res, id, addr)| { |
246 | 0 | res.map_err(|e| tracing::warn!("Sync: Failed to get the next block number for {addr}: {e:?}")) |
247 | | // We don't have to remove this contract from the watch list because it |
248 | | // is not harmful to try again upon next sync(). |
249 | 0 | .ok() |
250 | 0 | .map(|bn| (bn, id, addr)) |
251 | 0 | }) |
252 | | .collect::<Vec<_>>() |
253 | | }; |
254 | | |
255 | | if chosen.is_empty() { |
256 | | tracing::debug!("No contract need to fetch."); |
257 | | return Ok(()); |
258 | | } |
259 | | |
260 | | // Let set a fix goal for this sync: up to the `finalized` block for all chosen contracts. |
261 | | let finalized: Block = client |
262 | | .request::<EthGetBlockByNumber, EthGetBlockByNumberResp>(EthGetBlockByNumber { |
263 | | block: finality_tag, |
264 | | full_tx: false, |
265 | | }) |
266 | | .in_current_span() |
267 | | .await |
268 | 0 | .map_err(|e| e.into()) |
269 | 0 | .and_then(|f: EthGetBlockByNumberResp| f.0.ok_or(anyhow::anyhow!("result is null"))) |
270 | | .context("Failed obtain the last finalized block.")?; |
271 | | |
272 | | let finalized = match finalized.number { |
273 | | BlockNumber::Quantity(b) => b, |
274 | | _ => { |
275 | | return Err(anyhow::anyhow!("Invalid finalized block number.")); |
276 | | } |
277 | | } |
278 | | .to::<u64>(); |
279 | | |
280 | | // Because the next chain position for each chosen contract (lyquid) could be different |
281 | | // (some may just be added to the watch list of SlotCrawler), we need to bundle the queries |
282 | | // in a way that is: |
283 | | // |
284 | | // - Not always just for a single address (to reduce the number of queries). |
285 | | // - Not always ask for a lot of addreses whose corresponding blocks are already |
286 | | // queried (to reduce the redundant data returned by each query). |
287 | | // |
288 | | // A clean and simple approach is to sort them according to their respective next block to |
289 | | // crawl from, and start from the lowest to catch up with the second lowest, then we bundle |
290 | | // up the contracts in a rolling style so there is *no* redundant data received. |
291 | | // |
292 | | // At the end of the rolling, if there is no error or contract switching (which requires a |
293 | | // resync), all current contracts are brought to the same next block value. |
294 | | |
295 | | chosen.sort_by_key(|k| k.0); |
296 | | let sentinel_id = LyquidID::from(Address::ZERO); |
297 | | chosen.push((finalized + 1, sentinel_id, Address::ZERO)); // sentinel pos, to flush all batches |
298 | | |
299 | | let mut next_block_number = None; |
300 | | let mut batch = HashMap::new(); |
301 | | let mut resync = false; |
302 | | |
303 | | for (block, id, addr) in chosen { |
304 | | if let Some(mut n) = next_block_number { |
305 | | if block == n { |
306 | | // in the same batch (same next block), let's don't query yet |
307 | | batch.insert(addr, id); |
308 | | continue; |
309 | | } else { |
310 | | // Now we need to query with the existing items in `batch` because this |
311 | | // candidate's next block is larger. |
312 | | while n < block { |
313 | | // `end` is inclusive |
314 | | let mut end: u64 = n + blocks_per_request as u64 - 1; |
315 | | end = end.min(block - 1); |
316 | | tracing::info!("Fetching [{}..{}] ({} addrs).", n, end, batch.len()); |
317 | | let from_block = BlockNumber::Quantity(U256::from(n)); |
318 | | let to_block = BlockNumber::Quantity(U256::from(end)); |
319 | | let watched = |
320 | | Self::fetch_watched_logs_for_batch(&client, batch.keys(), from_block, to_block).await?; |
321 | | Self::update_contracts(contracts, &batch, &mut resync, watched.slots.into_iter()).await; |
322 | | n = end + 1; |
323 | | } |
324 | | // Now n == block, we can change the next value and continue bundling more addresses. |
325 | | } |
326 | | } |
327 | | next_block_number = Some(block); |
328 | | batch.insert(addr, id); |
329 | | } |
330 | | |
331 | | // Finalizing the sync by setting the next block (telling FCO about the progress) for each |
332 | | // Lyquid whose contract has successfully synced to n. |
333 | | if let Some(n) = next_block_number { |
334 | | assert_eq!(n, finalized + 1); |
335 | | for (addr, id) in batch { |
336 | | if let Some(c) = contracts.get_mut(&id) { |
337 | | if c.addr != addr { |
338 | | tracing::debug!( |
339 | | "SlotCrawler: Skipping stale batch entry for {id}; current address is {}, expected {}.", |
340 | | c.addr, |
341 | | addr |
342 | | ); |
343 | | continue; |
344 | | } |
345 | | if let Err(e) = c.seq.set_next_block(n).await { |
346 | | // Otherwise, we can safetly tell FCO to advance the progress because we |
347 | | // have successfully filled all slots up to n. |
348 | | tracing::error!("SlotCrawler: Failed to advance fetching progress for {id} ({addr}): {e:?}"); |
349 | | // We don't have to remove this contract from the watch list because it |
350 | | // is not harmful to try again upon next sync(). |
351 | | } |
352 | | } |
353 | | } |
354 | | } |
355 | | if resync { |
356 | | Box::pin(Self::sync(contracts, finality_tag, blocks_per_request, client)).await?; |
357 | | } |
358 | | tracing::info!("SlotCrawler: Sync complete."); |
359 | | Ok(()) |
360 | 0 | } |
361 | | } |
362 | | |
/// Owner-side view of the slot crawler: a handle for talking to the worker task plus the
/// tracker used to wait for that task to finish.
// TODO: Drop Clone once the outer backend ownership is explicit enough that only
// SlotCrawlerHandle needs to be cloned.
#[derive(Clone)]
pub(super) struct SlotCrawler {
    /// Channels to the worker task; cloned out by `handle()`.
    handle: SlotCrawlerHandle,
    /// Tracks the spawned worker task so `wait_for_shutdown()` can await its completion.
    task_tracker: TaskTracker,
}
debug_struct_name!(SlotCrawler);
371 | | |
/// Cloneable sender side of the worker's control channels.
#[derive(Clone)]
pub(super) struct SlotCrawlerHandle {
    /// Bounded channel used to request another sync pass.
    wake_tx: mpsc::Sender<()>,
    /// Unbounded channel carrying watch-list changes.
    contract_tx: mpsc::UnboundedSender<ContractCommand>,
}
debug_struct_name!(SlotCrawlerHandle);
378 | | |
379 | | impl SlotCrawler { |
380 | 0 | pub(super) fn new( |
381 | 0 | client: ClientHandle, blocks_per_request: usize, finality_tag: BlockNumber, shutdown: CancellationToken, |
382 | 0 | ) -> Self { |
383 | 0 | let (wake_tx, wake_rx) = mpsc::channel(SLOT_CRAWLER_WAKE_CAPACITY); |
384 | 0 | let (contract_tx, contract_rx) = mpsc::unbounded_channel(); |
385 | 0 | let task_tracker = TaskTracker::new(); |
386 | 0 | task_tracker.spawn( |
387 | 0 | SlotCrawlerWorker { |
388 | 0 | contracts: HashMap::new(), |
389 | 0 | blocks_per_request, |
390 | 0 | finality_tag, |
391 | 0 | client: client.clone(), |
392 | 0 | } |
393 | 0 | .run(wake_rx, contract_rx) |
394 | 0 | .with_cancellation_token_owned(shutdown), |
395 | | ); |
396 | 0 | task_tracker.close(); |
397 | 0 | Self { |
398 | 0 | handle: SlotCrawlerHandle { wake_tx, contract_tx }, |
399 | 0 | task_tracker, |
400 | 0 | } |
401 | 0 | } |
402 | | |
403 | 0 | pub(super) fn handle(&self) -> SlotCrawlerHandle { |
404 | 0 | self.handle.clone() |
405 | 0 | } |
406 | | |
407 | 0 | pub(super) async fn wait_for_shutdown(&self) { |
408 | 0 | self.task_tracker.wait().await; |
409 | 0 | } |
410 | | } |
411 | | |
412 | | impl SlotCrawlerHandle { |
413 | 0 | pub(super) fn wake(&self) -> bool { |
414 | 0 | match self.wake_tx.try_send(()) { |
415 | 0 | Ok(()) | Err(TrySendError::Full(_)) => true, |
416 | 0 | Err(TrySendError::Closed(_)) => false, |
417 | | } |
418 | 0 | } |
419 | | |
420 | | // TODO: consider return a token that automatically removes the contract from the watch list when dropped, |
421 | | // to avoid the need of explicit removal and the potential risk of forgetting to remove. |
422 | 0 | pub(super) async fn add_contract(&self, contract: Contract) -> anyhow::Result<()> { |
423 | 0 | self.contract_tx |
424 | 0 | .send(ContractCommand::Upsert(contract)) |
425 | 0 | .map_err(|_| anyhow::anyhow!("SlotCrawler control loop already stopped")) |
426 | 0 | } |
427 | | |
428 | 0 | pub(super) fn remove_lyquid(&self, lyquid: LyquidID) -> anyhow::Result<()> { |
429 | 0 | self.contract_tx |
430 | 0 | .send(ContractCommand::RemoveLyquid(lyquid)) |
431 | 0 | .map_err(|_| anyhow::anyhow!("SlotCrawler control loop already stopped")) |
432 | 0 | } |
433 | | } |
434 | | |
/// Background task that wakes the slot crawler on new chain heads and on a fixed timer.
#[derive(Clone)]
pub(super) struct HeadWatcher {
    /// Tracks the spawned watcher task so `wait_for_shutdown()` can await its completion.
    task_tracker: TaskTracker,
}
439 | | |
440 | | impl HeadWatcher { |
441 | 0 | pub(super) fn new(client: ClientHandle, slot_crawler: SlotCrawlerHandle, shutdown: CancellationToken) -> Self { |
442 | 0 | let task_tracker = TaskTracker::new(); |
443 | 0 | task_tracker.spawn(Self::head_watcher_loop(client, slot_crawler).with_cancellation_token_owned(shutdown)); |
444 | 0 | task_tracker.close(); |
445 | 0 | Self { task_tracker } |
446 | 0 | } |
447 | | |
448 | 0 | pub(super) async fn wait_for_shutdown(&self) { |
449 | 0 | self.task_tracker.wait().await; |
450 | 0 | } |
451 | | |
452 | 0 | async fn subscribe_to_new_heads( |
453 | 0 | client: &ClientHandle, |
454 | 0 | ) -> Option<lyquor_jsonrpc::client::Subscription<EthNewHeadUpdate>> { |
455 | 0 | match client.subscribe(EthSubscribe("newHeads".to_string())).await { |
456 | 0 | Ok(subscription) => { |
457 | 0 | tracing::debug!("Subscribed to new head"); |
458 | 0 | Some(subscription) |
459 | | } |
460 | 0 | Err(e) => { |
461 | 0 | tracing::error!("Failed to subscribe to new head: {e:?}"); |
462 | 0 | None |
463 | | } |
464 | | } |
465 | 0 | } |
466 | | |
467 | 0 | async fn head_watcher_loop(client: ClientHandle, slot_crawler: SlotCrawlerHandle) { |
468 | 0 | let mut subscription = Self::subscribe_to_new_heads(&client).await; |
469 | | |
470 | | // NOTE: Hardhat Network may silently lose eth_subscribe state after a period of |
471 | | // inactivity (e.g., no block mining). Although the WebSocket connection remains |
472 | | // open, subscriptions can become stale — resulting in: |
473 | | // - No new calls being delivered, even when blocks are mined later |
474 | | // - eth_unsubscribe failing with "Subscription not found" |
475 | | // This is likely due to internal state cleanup or subscription manager desync in Hardhat's |
476 | | // in-memory event system. To work around this, we re-subscribe when activity resumes. |
477 | | // |
478 | | // This is not an issue for production chains like Ethereum. |
479 | 0 | let mut periodic_poll = tokio::time::interval(Duration::from_secs(10)); |
480 | 0 | periodic_poll.set_missed_tick_behavior(MissedTickBehavior::Delay); |
481 | 0 | periodic_poll.tick().await; |
482 | | |
483 | 0 | let mut reset = tokio::time::interval(Duration::from_secs(60)); |
484 | 0 | reset.set_missed_tick_behavior(MissedTickBehavior::Delay); |
485 | 0 | reset.tick().await; |
486 | | |
487 | | loop { |
488 | 0 | tokio::select! { |
489 | 0 | _ = periodic_poll.tick() => { |
490 | 0 | if !slot_crawler.wake() { |
491 | 0 | break; |
492 | 0 | } |
493 | | } |
494 | 0 | _ = reset.tick() => { |
495 | 0 | tracing::debug!("Resetting new head subscription"); |
496 | 0 | subscription = Self::subscribe_to_new_heads(&client).await; |
497 | | } |
498 | 0 | update = async { |
499 | 0 | match &mut subscription { |
500 | 0 | Some(subscription) => subscription.next().await, |
501 | 0 | None => std::future::pending().await, |
502 | | } |
503 | 0 | } => match update { |
504 | 0 | Some(msg) => { |
505 | 0 | tracing::debug!("New head: {:?}.", msg); |
506 | 0 | if !slot_crawler.wake() { |
507 | 0 | break; |
508 | 0 | } |
509 | | } |
510 | | None => { |
511 | 0 | tracing::warn!("New head subscription ended; waiting for reset to retry"); |
512 | 0 | subscription = None; |
513 | | } |
514 | | } |
515 | | } |
516 | | } |
517 | 0 | } |
518 | | } |