Coverage Report

Created: 2026-04-06 19:02

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/home/runner/work/lyquor/lyquor/seq/src/eth/crawler.rs
Line
Count
Source
1
use super::{ChainPos, Contract, Slot, SwitchContract};
2
use crate::OracleInfoFetcher;
3
use alloy_sol_types::{SolCall, SolEvent, SolValue};
4
use async_trait::async_trait;
5
use lyquor_api::actor::FetchOracleInfo;
6
use lyquor_api::anyhow::{self, Context as _Context};
7
use lyquor_eth::eth;
8
use lyquor_jsonrpc::client::ClientHandle;
9
use lyquor_jsonrpc::types::{
10
    Block, BlockNumber, EthCall, EthCallResp, EthCallTx, EthGetBlockByNumber, EthGetBlockByNumberResp, EthGetLogs,
11
    EthGetLogsResp, EthNewHeadUpdate, EthSubscribe,
12
};
13
use lyquor_primitives::{Address, U256, alloy_primitives, debug_struct_name, oracle::OracleServiceTarget};
14
use std::collections::{HashMap, hash_map};
15
use std::time::Duration;
16
use tokio::sync::mpsc;
17
use tokio::sync::mpsc::error::TrySendError;
18
use tokio::time::MissedTickBehavior;
19
use tokio_util::sync::CancellationToken;
20
use tokio_util::task::TaskTracker;
21
use tokio_util::time::FutureExt;
22
use tracing::Instrument;
23
24
enum WatchedLogKind {
25
    Slot,
26
}
27
28
impl WatchedLogKind {
29
0
    fn from_topic0(topic0: Option<alloy_primitives::B256>) -> Option<Self> {
30
0
        match topic0 {
31
0
            Some(t) if t == eth::Slot::SIGNATURE_HASH => Some(Self::Slot),
32
0
            _ => None,
33
        }
34
0
    }
35
36
0
    fn topic0_filter() -> Vec<alloy_primitives::B256> {
37
0
        vec![eth::Slot::SIGNATURE_HASH]
38
0
    }
39
}
40
41
#[derive(Default)]
42
struct WatchedLogsBatch {
43
    slots: HashMap<Address, Vec<Slot>>,
44
}
45
46
/// Capacity of the channel used to wake the crawler.
///
/// Wakeups are edge-triggered: a wake carries no payload, and one queued wake
/// is already enough to force another sync pass, so a single slot suffices.
const SLOT_CRAWLER_WAKE_CAPACITY: usize = 1;
48
49
pub(super) struct SlotCrawlerRuntime {
50
    client: ClientHandle,
51
    contracts: HashMap<Address, Contract>,
52
    blocks_per_request: usize,
53
    finality_tag: BlockNumber,
54
}
55
56
impl SlotCrawlerRuntime {
57
0
    async fn run(
58
0
        mut self, mut wake_rx: mpsc::Receiver<()>, mut contract_rx: mpsc::UnboundedReceiver<(Address, Contract)>,
59
0
    ) {
60
        loop {
61
0
            tokio::select! {
62
0
                Some((addr, contract)) = contract_rx.recv() => {
63
0
                    self.handle_contract(addr, contract);
64
0
                }
65
0
                Some(()) = wake_rx.recv() => {}
66
0
                else => break,
67
            }
68
69
0
            while let Ok((addr, contract)) = contract_rx.try_recv() {
70
0
                self.handle_contract(addr, contract);
71
0
            }
72
0
            while let Ok(()) = wake_rx.try_recv() {}
73
74
0
            if let Err(e) = Self::sync(
75
0
                &mut self.contracts,
76
0
                self.finality_tag,
77
0
                self.blocks_per_request,
78
0
                self.client.clone(),
79
            )
80
0
            .await
81
            {
82
0
                tracing::error!("SlotCrawler: Error during sync: {e:?}");
83
0
            }
84
        }
85
0
    }
86
87
0
    fn handle_contract(&mut self, addr: Address, contract: Contract) {
88
0
        tracing::debug!("SlotCrawler: Added contract {addr}.");
89
0
        self.contracts.insert(addr, contract);
90
0
    }
91
92
0
    fn extract_watched_logs(resp: EthGetLogsResp) -> WatchedLogsBatch {
93
0
        let mut out = WatchedLogsBatch::default();
94
0
        for e in resp.0.into_iter() {
95
0
            match WatchedLogKind::from_topic0(e.topics.first().copied()) {
96
                Some(WatchedLogKind::Slot) => {
97
                    // From https://docs.rs/alloy/latest/alloy/dyn_abi/index.html:
98
                    // We strongly recommend using the static encoder/decoder when possible. The
99
                    // dynamic encoder/decoder is significantly more expensive, especially for complex
100
                    // types. It is also significantly more error prone, as the mapping between
101
                    // solidity types and rust types is not enforced by the compiler.
102
0
                    let rlp_decoded = alloy_primitives::LogData::new_unchecked(e.topics, e.data);
103
0
                    let le = match eth::Slot::decode_log_data(&rlp_decoded).ok() {
104
0
                        Some(le) => le,
105
0
                        None => continue,
106
                    };
107
0
                    let mut slot = Slot::from(le);
108
0
                    let block_number = match e.block_number {
109
0
                        BlockNumber::Quantity(q) => q,
110
0
                        _ => unreachable!(),
111
                    };
112
0
                    let bn: u64 = block_number.to();
113
0
                    let ln: u64 = e.log_index.to();
114
0
                    if ln >> 32 != 0 {
115
0
                        tracing::warn!("Skipped a block that's too big.");
116
0
                        continue;
117
0
                    } // block number uses 44 bits
118
0
                    slot.pos = ChainPos::new(bn, ln as u32);
119
0
                    tracing::debug!("SlotCrawler: extracted slot={slot:?}.");
120
0
                    out.slots.entry(e.address).or_default().push(slot);
121
                }
122
0
                None => {}
123
            }
124
        }
125
0
        out
126
0
    }
127
128
0
    async fn fetch_watched_logs_for_batch(
129
0
        client: &ClientHandle, batch: &[Address], from_block: BlockNumber, to_block: BlockNumber,
130
0
    ) -> anyhow::Result<WatchedLogsBatch> {
131
0
        let resp: EthGetLogsResp = client
132
0
            .request::<EthGetLogs, EthGetLogsResp>(EthGetLogs {
133
0
                from_block,
134
0
                to_block,
135
0
                address: batch.to_vec(),
136
0
                topics: vec![WatchedLogKind::topic0_filter()],
137
0
            })
138
0
            .await
139
0
            .with_context(|| format!("Failed to retrieve {}..{}.", from_block, to_block))?;
140
0
        Ok(Self::extract_watched_logs(resp))
141
0
    }
142
143
0
    async fn update_contracts(
144
0
        contracts: &mut HashMap<Address, Contract>, resync: &mut bool,
145
0
        updates: impl Iterator<Item = (Address, Vec<Slot>)>,
146
0
    ) {
147
0
        'outer: for (addr, slots) in updates {
148
0
            let c = match contracts.get_mut(&addr) {
149
0
                Some(c) => c,
150
                None => {
151
0
                    tracing::debug!("Contract {addr} was removed.");
152
0
                    continue;
153
                }
154
            };
155
0
            let mut to_be_filled = Vec::new();
156
0
            for s in slots.into_iter() {
157
0
                if let crate::NextContractID::Eth(new_addr) = s.switch_contract {
158
0
                    if c.lyquid.send(SwitchContract { pos: s.pos }).await.is_err() {
159
0
                        tracing::warn!("Sync: LyquidBackend of {addr} already stopped.");
160
                        // We need to remove it from the watch list because the LyquidBackend actor
161
                        // is already unreachable.
162
0
                        contracts.remove(&addr);
163
0
                        continue 'outer;
164
0
                    }
165
0
                    assert!(c.new_addr.is_none());
166
0
                    c.new_addr = Some(new_addr);
167
0
                    *resync = true;
168
                    // Ignore all subsequent slots for this old address.
169
0
                    break;
170
0
                }
171
0
                to_be_filled.push(s);
172
            }
173
0
            if let Err(e) = c.seq.fill_slots(to_be_filled).await {
174
0
                tracing::error!("Sync: Failed to fill slots from {addr}: {e:?}");
175
                // We need to remove it from the watch list so the lyquid's next block won't be
176
                // advanced because the slots weren't filled properly.
177
0
                contracts.remove(&addr);
178
0
            }
179
        }
180
0
    }
181
182
    // This function is the core logic of SlotCrawler.
183
    //
184
    // It progressively tries to bring all contracts ("watch list") to be the same, latest block
185
    // known on the chain. While each query to the chain may bundle multiple contracts, the
186
    // philosophy is to still treat each contract's sync progress separately. This is good for
187
    // robust handling of LyquidBackend, because upon any errors during syncing, there is no
188
    // incorrectly advanced progress for any Lyquids.
189
    #[tracing::instrument(level = "trace", skip_all)]
190
0
    async fn sync(
191
0
        contracts: &mut HashMap<Address, Contract>, finality_tag: BlockNumber, blocks_per_request: usize,
192
0
        client: ClientHandle,
193
0
    ) -> anyhow::Result<()> {
194
        // Choose the contracts that should be considered during this sync.
195
        let mut chosen = Vec::new();
196
0
        for (addr, c) in contracts.iter().filter_map(|(addr, c)| {
197
0
            if c.seq.flow_control().should_stop_filling() {
198
0
                None
199
            } else {
200
0
                Some((*addr, c))
201
            }
202
0
        }) {
203
            match c.seq.next_block().await {
204
                Ok(bn) => {
205
                    chosen.push((addr, bn));
206
                }
207
                Err(e) => {
208
                    tracing::warn!("Sync: Failed to get the next block number for {addr}: {e:?}");
209
                    // We don't have to remove this contract from the watch list because it
210
                    // is not harmful to try again upon next sync().
211
                }
212
            }
213
        }
214
215
        if chosen.is_empty() {
216
            tracing::debug!("No contract need to fetch.");
217
            return Ok(());
218
        }
219
220
        // Let set a fix goal for this sync: up to the `finalized` block for all chosen contracts.
221
        let finalized: Block = client
222
            .request::<EthGetBlockByNumber, EthGetBlockByNumberResp>(EthGetBlockByNumber {
223
                block: finality_tag,
224
                full_tx: false,
225
            })
226
            .in_current_span()
227
            .await
228
0
            .map_err(|e| e.into())
229
0
            .and_then(|f: EthGetBlockByNumberResp| f.0.ok_or(anyhow::anyhow!("result is null")))
230
            .context("Failed obtain the last finalized block.")?;
231
232
        let finalized = match finalized.number {
233
            BlockNumber::Quantity(b) => b,
234
            _ => {
235
                return Err(anyhow::anyhow!("Invalid finalized block number."));
236
            }
237
        }
238
        .to::<u64>();
239
240
        // Because the next chain position for each chosen contract (lyquid) could be different
241
        // (some may just be added to the watch list of SlotCrawler), we need to bundle the queries
242
        // in a way that is:
243
        //
244
        // - Not always just for a single address (to reduce the number of queries).
245
        // - Not always ask for a lot of addreses whose corresponding blocks are already
246
        // queried (to reduce the redundant data returned by each query).
247
        //
248
        // A clean and simple approach is to sort them according to their respective next block to
249
        // crawl from, and start from the lowest to catch up with the second lowest, then we bundle
250
        // up the contracts in a rolling style so there is *no* redundant data received.
251
        //
252
        // At the end of the rolling, if there is no error or contract switching (which requires a
253
        // resync), all current contracts are brought to the same next block value.
254
255
        chosen.sort_by_key(|k| k.1);
256
        chosen.push((Address::ZERO, finalized + 1)); // sentinel pos, to flush all batches
257
258
        let mut next_block_number = None;
259
        let mut batch = Vec::new();
260
        let mut resync = false;
261
262
        for (addr, block) in chosen {
263
            if let Some(mut n) = next_block_number {
264
                if block == n {
265
                    // in the same batch (same next block), let's don't query yet
266
                    batch.push(addr);
267
                    continue;
268
                } else {
269
                    // Now we need to query with the existing items in `batch` because this
270
                    // candidate's next block is larger.
271
                    while n < block {
272
                        // `end` is inclusive
273
                        let mut end: u64 = n + blocks_per_request as u64 - 1;
274
                        end = end.min(block - 1);
275
                        tracing::info!("Fetching [{}..{}] ({} addrs).", n, end, batch.len());
276
                        let from_block = BlockNumber::Quantity(U256::from(n));
277
                        let to_block = BlockNumber::Quantity(U256::from(end));
278
                        let watched = Self::fetch_watched_logs_for_batch(&client, &batch, from_block, to_block).await?;
279
                        Self::update_contracts(contracts, &mut resync, watched.slots.into_iter()).await;
280
                        n = end + 1;
281
                    }
282
                    // Now n == block, we can change the next value and continue bundling more addresses.
283
                }
284
            }
285
            next_block_number = Some(block);
286
            batch.push(addr);
287
        }
288
289
        // Finalizing the sync by setting the next block (telling FCO about the progress) for each
290
        // Lyquid whose contract has successfully synced to n.
291
        if let Some(n) = next_block_number {
292
            assert_eq!(n, finalized + 1);
293
            for addr in batch {
294
                if let hash_map::Entry::Occupied(mut e) = contracts.entry(addr) {
295
                    let c = e.get_mut();
296
                    if let Some(new_addr) = c.new_addr.take() {
297
                        // This lyquid is switching the contract. So we need to update the address
298
                        // info in the watch list. And we should *not* advance the next block
299
                        // because this lyquid needs to check for the slots using the new adddress
300
                        // in the previous block.
301
                        let c = e.remove();
302
                        contracts.insert(new_addr, c);
303
                        tracing::debug!("SlotCrawler: Switching contract {addr} => {new_addr}.");
304
                    } else {
305
                        // Otherwise, we can safetly tell FCO to advance the progress because we
306
                        // have successfully filled all slots up to n.
307
                        if let Err(e) = c.seq.set_next_block(n).await {
308
                            tracing::error!("SlotCrawler: Failed to advance fetching progress for {addr}: {e:?}");
309
                            // We don't have to remove this contract from the watch list because it
310
                            // is not harmful to try again upon next sync().
311
                        }
312
                    }
313
                }
314
            }
315
        }
316
        if resync {
317
            Box::pin(Self::sync(contracts, finality_tag, blocks_per_request, client)).await?;
318
        }
319
        tracing::info!("SlotCrawler: Sync complete.");
320
        Ok(())
321
0
    }
322
}
323
324
#[derive(Clone)]
325
pub(super) struct SlotCrawler {
326
    client: ClientHandle,
327
    wake_tx: mpsc::Sender<()>,
328
    contract_tx: mpsc::UnboundedSender<(Address, Contract)>,
329
    task_tracker: TaskTracker,
330
    finality_tag: BlockNumber,
331
}
332
debug_struct_name!(SlotCrawler);
333
334
impl SlotCrawler {
335
0
    pub(super) fn new(
336
0
        client: ClientHandle, blocks_per_request: usize, finality_tag: BlockNumber, shutdown: CancellationToken,
337
0
    ) -> Self {
338
0
        let (wake_tx, wake_rx) = mpsc::channel(SLOT_CRAWLER_WAKE_CAPACITY);
339
0
        let (contract_tx, contract_rx) = mpsc::unbounded_channel();
340
0
        let task_tracker = TaskTracker::new();
341
0
        task_tracker.spawn(
342
0
            SlotCrawlerRuntime {
343
0
                contracts: HashMap::new(),
344
0
                blocks_per_request,
345
0
                finality_tag,
346
0
                client: client.clone(),
347
0
            }
348
0
            .run(wake_rx, contract_rx)
349
0
            .with_cancellation_token_owned(shutdown),
350
        );
351
0
        task_tracker.close();
352
0
        Self {
353
0
            wake_tx,
354
0
            contract_tx,
355
0
            task_tracker,
356
0
            client,
357
0
            finality_tag,
358
0
        }
359
0
    }
360
361
0
    pub(super) async fn wait_for_shutdown(&self) {
362
0
        self.task_tracker.wait().await;
363
0
    }
364
365
0
    pub(super) fn wake(&self) -> bool {
366
0
        match self.wake_tx.try_send(()) {
367
0
            Ok(()) | Err(TrySendError::Full(_)) => true,
368
0
            Err(TrySendError::Closed(_)) => false,
369
        }
370
0
    }
371
372
0
    /// Queues `contract` for registration on the crawler's watch list under `addr`.
    ///
    /// # Errors
    ///
    /// Fails when the receiving side of the contract channel is gone.
    pub(super) async fn add_contract(&self, addr: Address, contract: Contract) -> anyhow::Result<()> {
        match self.contract_tx.send((addr, contract)) {
            Ok(()) => Ok(()),
            Err(_) => Err(anyhow::anyhow!("SlotCrawler control loop already stopped")),
        }
    }
377
}
378
379
#[async_trait]
380
impl OracleInfoFetcher for SlotCrawler {
381
0
    async fn fetch_oracle_info(&self, msg: FetchOracleInfo) -> Option<lyquor_primitives::oracle::OracleEpochInfo> {
382
        let OracleServiceTarget::EVM { eth_contract, .. } = msg.target.target else {
383
            return None;
384
        };
385
        let topic = alloy_primitives::keccak256(msg.topic.as_bytes());
386
        fetch_oracle_info(&self.client, self.finality_tag, eth_contract, topic, msg.full_config).await
387
0
    }
388
}
389
390
0
async fn finalized_block_number(client: &ClientHandle, finality_tag: BlockNumber) -> Option<u64> {
391
0
    let finalized: Block = client
392
0
        .request::<EthGetBlockByNumber, EthGetBlockByNumberResp>(EthGetBlockByNumber {
393
0
            block: finality_tag,
394
0
            full_tx: false,
395
0
        })
396
0
        .await
397
0
        .ok()?
398
0
        .0?;
399
0
    match finalized.number {
400
0
        BlockNumber::Quantity(bn) => Some(bn.to()),
401
0
        _ => None,
402
    }
403
0
}
404
405
0
async fn fetch_oracle_info(
406
0
    client: &ClientHandle, finality_tag: BlockNumber, eth_contract: Address, topic: alloy_primitives::B256,
407
0
    full_config: bool,
408
0
) -> Option<lyquor_primitives::oracle::OracleEpochInfo> {
409
0
    let block_number = BlockNumber::Quantity(U256::from(finalized_block_number(client, finality_tag).await?));
410
0
    let get_epoch_input = eth::ISequenceBackend::getEpochCall { topic }.abi_encode();
411
0
    let epoch_ret: EthCallResp = client
412
0
        .request(EthCall {
413
0
            tx: EthCallTx {
414
0
                from: None,
415
0
                to: Some(eth_contract),
416
0
                gas: None,
417
0
                gas_price: None,
418
0
                value: None,
419
0
                data: Some(get_epoch_input.into()),
420
0
            },
421
0
            block_number: block_number.clone(),
422
0
        })
423
0
        .await
424
0
        .ok()?;
425
0
    let get_hash_input = eth::ISequenceBackend::getConfigHashCall { topic }.abi_encode();
426
0
    let hash_ret: EthCallResp = client
427
0
        .request(EthCall {
428
0
            tx: EthCallTx {
429
0
                from: None,
430
0
                to: Some(eth_contract),
431
0
                gas: None,
432
0
                gas_price: None,
433
0
                value: None,
434
0
                data: Some(get_hash_input.into()),
435
0
            },
436
0
            block_number: block_number.clone(),
437
0
        })
438
0
        .await
439
0
        .ok()?;
440
0
    let get_count_input = eth::ISequenceBackend::getChangeCountCall { topic }.abi_encode();
441
0
    let count_ret: EthCallResp = client
442
0
        .request(EthCall {
443
0
            tx: EthCallTx {
444
0
                from: None,
445
0
                to: Some(eth_contract),
446
0
                gas: None,
447
0
                gas_price: None,
448
0
                value: None,
449
0
                data: Some(get_count_input.into()),
450
0
            },
451
0
            block_number: block_number.clone(),
452
0
        })
453
0
        .await
454
0
        .ok()?;
455
0
    let epoch = eth::ISequenceBackend::getEpochCall::abi_decode_returns(epoch_ret.0.as_ref()).ok()?;
456
0
    let config_hash = eth::ISequenceBackend::getConfigHashCall::abi_decode_returns(hash_ret.0.as_ref()).ok()?;
457
0
    let change_count = eth::ISequenceBackend::getChangeCountCall::abi_decode_returns(count_ret.0.as_ref()).ok()?;
458
0
    let config_hash: lyquor_primitives::HashBytes = config_hash.0.into();
459
0
    let config = if full_config && config_hash != [0; 32].into() {
460
0
        let get_config_input = eth::ISequenceBackend::getConfigCall { topic }.abi_encode();
461
0
        let config_ret: EthCallResp = client
462
0
            .request(EthCall {
463
0
                tx: EthCallTx {
464
0
                    from: None,
465
0
                    to: Some(eth_contract),
466
0
                    gas: None,
467
0
                    gas_price: None,
468
0
                    value: None,
469
0
                    data: Some(get_config_input.into()),
470
0
                },
471
0
                block_number,
472
0
            })
473
0
            .await
474
0
            .ok()?;
475
0
        let config = eth::ISequenceBackend::getConfigCall::abi_decode_returns(config_ret.0.as_ref()).ok()?;
476
0
        let actual_hash: lyquor_primitives::Hash = alloy_primitives::keccak256(&config.abi_encode()).0.into();
477
0
        if actual_hash != config_hash.into_inner() {
478
0
            return None;
479
0
        }
480
        Some(lyquor_primitives::oracle::OracleConfig {
481
0
            committee: config
482
0
                .committee
483
0
                .into_iter()
484
0
                .map(|signer| lyquor_primitives::oracle::OracleSigner {
485
0
                    id: signer.id,
486
0
                    key: signer.nodeID.as_slice().to_vec().into(),
487
0
                })
488
0
                .collect(),
489
0
            threshold: config.threshold,
490
        })
491
    } else {
492
0
        None
493
    };
494
0
    Some(lyquor_primitives::oracle::OracleEpochInfo {
495
0
        epoch,
496
0
        config_hash,
497
0
        change_count,
498
0
        config,
499
0
    })
500
0
}
501
502
#[derive(Clone)]
503
pub(super) struct HeadWatcher {
504
    task_tracker: TaskTracker,
505
}
506
507
impl HeadWatcher {
508
0
    pub(super) fn new(client: ClientHandle, slot_crawler: SlotCrawler, shutdown: CancellationToken) -> Self {
509
0
        let task_tracker = TaskTracker::new();
510
0
        task_tracker.spawn(Self::head_watcher_loop(client, slot_crawler).with_cancellation_token_owned(shutdown));
511
0
        task_tracker.close();
512
0
        Self { task_tracker }
513
0
    }
514
515
0
    pub(super) async fn wait_for_shutdown(&self) {
516
0
        self.task_tracker.wait().await;
517
0
    }
518
519
0
    async fn subscribe_to_new_heads(
520
0
        client: &ClientHandle,
521
0
    ) -> Option<lyquor_jsonrpc::client::Subscription<EthNewHeadUpdate>> {
522
0
        match client.subscribe(EthSubscribe("newHeads".to_string())).await {
523
0
            Ok(subscription) => {
524
0
                tracing::debug!("Subscribed to new head");
525
0
                Some(subscription)
526
            }
527
0
            Err(e) => {
528
0
                tracing::error!("Failed to subscribe to new head: {e:?}");
529
0
                None
530
            }
531
        }
532
0
    }
533
534
0
    async fn head_watcher_loop(client: ClientHandle, slot_crawler: SlotCrawler) {
535
0
        let mut subscription = Self::subscribe_to_new_heads(&client).await;
536
537
        // NOTE: Hardhat Network may silently lose eth_subscribe state after a period of
538
        // inactivity (e.g., no block mining). Although the WebSocket connection remains
539
        // open, subscriptions can become stale — resulting in:
540
        // - No new calls being delivered, even when blocks are mined later
541
        // - eth_unsubscribe failing with "Subscription not found"
542
        // This is likely due to internal state cleanup or subscription manager desync in Hardhat's
543
        // in-memory event system. To work around this, we re-subscribe when activity resumes.
544
        //
545
        // This is not an issue for production chains like Ethereum.
546
0
        let mut periodic_poll = tokio::time::interval(Duration::from_secs(10));
547
0
        periodic_poll.set_missed_tick_behavior(MissedTickBehavior::Delay);
548
0
        periodic_poll.tick().await;
549
550
0
        let mut reset = tokio::time::interval(Duration::from_secs(60));
551
0
        reset.set_missed_tick_behavior(MissedTickBehavior::Delay);
552
0
        reset.tick().await;
553
554
        loop {
555
0
            tokio::select! {
556
0
                _ = periodic_poll.tick() => {
557
0
                    if !slot_crawler.wake() {
558
0
                        break;
559
0
                    }
560
                }
561
0
                _ = reset.tick() => {
562
0
                    tracing::debug!("Resetting new head subscription");
563
0
                    subscription = Self::subscribe_to_new_heads(&client).await;
564
                }
565
0
                update = async {
566
0
                    match &mut subscription {
567
0
                        Some(subscription) => subscription.next().await,
568
0
                        None => std::future::pending().await,
569
                    }
570
0
                } => match update {
571
0
                    Some(msg) => {
572
0
                        tracing::debug!("New head: {:?}.", msg);
573
0
                        if !slot_crawler.wake() {
574
0
                            break;
575
0
                        }
576
                    }
577
                    None => {
578
0
                        tracing::warn!("New head subscription ended; waiting for reset to retry");
579
0
                        subscription = None;
580
                    }
581
                }
582
            }
583
        }
584
0
    }
585
}