Coverage Report

Created: 2026-05-21 08:33

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/home/runner/work/lyquor/lyquor/seq/src/eth/crawler.rs
Line
Count
Source
1
use super::{ChainPos, Contract, Slot};
2
use alloy_sol_types::SolEvent;
3
use futures::FutureExt as _;
4
use lyquor_api::anyhow::{self, Context as _Context};
5
use lyquor_eth::eth;
6
use lyquor_jsonrpc::client::ClientHandle;
7
use lyquor_jsonrpc::types::{
8
    Block, BlockNumber, EthGetBlockByNumber, EthGetBlockByNumberResp, EthGetLogs, EthGetLogsResp, EthNewHeadUpdate,
9
    EthSubscribe,
10
};
11
use lyquor_primitives::{Address, LyquidID, U256, alloy_primitives, debug_struct_name};
12
use std::collections::HashMap;
13
use std::time::Duration;
14
use tokio::sync::mpsc;
15
use tokio::sync::mpsc::error::TrySendError;
16
use tokio::time::MissedTickBehavior;
17
use tokio_util::sync::CancellationToken;
18
use tokio_util::task::TaskTracker;
19
use tokio_util::time::FutureExt as _;
20
use tracing::Instrument;
21
22
enum WatchedLogKind {
23
    Slot,
24
}
25
26
impl WatchedLogKind {
27
0
    fn from_topic0(topic0: Option<alloy_primitives::B256>) -> Option<Self> {
28
0
        match topic0 {
29
0
            Some(t) if t == eth::Slot::SIGNATURE_HASH => Some(Self::Slot),
30
0
            _ => None,
31
        }
32
0
    }
33
34
0
    fn topic0_filter() -> Vec<alloy_primitives::B256> {
35
0
        vec![eth::Slot::SIGNATURE_HASH]
36
0
    }
37
}
38
39
#[derive(Default)]
40
struct WatchedLogsBatch {
41
    slots: HashMap<Address, Vec<Slot>>,
42
}
43
44
/// Buffer size of the crawler's wake channel.
///
/// Wakeups are edge-triggered: a single queued wake already forces one more sync pass, so any
/// additional buffered wakes would be redundant.
const SLOT_CRAWLER_WAKE_CAPACITY: usize = 1;
46
47
struct SlotCrawlerWorker {
48
    client: ClientHandle,
49
    contracts: HashMap<LyquidID, Contract>,
50
    blocks_per_request: usize,
51
    finality_tag: BlockNumber,
52
}
53
54
enum ContractCommand {
55
    Upsert(Contract),
56
    RemoveLyquid(LyquidID),
57
}
58
59
impl SlotCrawlerWorker {
60
0
    async fn run(mut self, mut wake_rx: mpsc::Receiver<()>, mut contract_rx: mpsc::UnboundedReceiver<ContractCommand>) {
61
        loop {
62
0
            tokio::select! {
63
0
                Some(cmd) = contract_rx.recv() => {
64
0
                    self.handle_contract_command(cmd);
65
0
                }
66
0
                Some(()) = wake_rx.recv() => {}
67
0
                else => break,
68
            }
69
70
0
            while let Ok(cmd) = contract_rx.try_recv() {
71
0
                self.handle_contract_command(cmd);
72
0
            }
73
0
            while let Ok(()) = wake_rx.try_recv() {}
74
75
0
            if let Err(e) = Self::sync(
76
0
                &mut self.contracts,
77
0
                self.finality_tag,
78
0
                self.blocks_per_request,
79
0
                self.client.clone(),
80
            )
81
0
            .await
82
            {
83
0
                tracing::error!("SlotCrawler: Error during sync: {e:?}");
84
0
            }
85
        }
86
0
    }
87
88
0
    fn handle_contract_command(&mut self, cmd: ContractCommand) {
89
0
        match cmd {
90
0
            ContractCommand::Upsert(contract) => {
91
0
                tracing::debug!(
92
                    "SlotCrawler: Added contract {} for lyquid {}.",
93
                    contract.addr,
94
                    contract.id
95
                );
96
0
                self.contracts.insert(contract.id, contract);
97
            }
98
0
            ContractCommand::RemoveLyquid(id) => {
99
0
                if self.contracts.remove(&id).is_some() {
100
0
                    tracing::debug!("SlotCrawler: Removed contract watch for lyquid {id}.");
101
0
                }
102
            }
103
        }
104
0
    }
105
106
0
    fn extract_watched_logs(resp: EthGetLogsResp) -> WatchedLogsBatch {
107
0
        let mut out = WatchedLogsBatch::default();
108
0
        for e in resp.0.into_iter() {
109
0
            match WatchedLogKind::from_topic0(e.topics.first().copied()) {
110
                Some(WatchedLogKind::Slot) => {
111
                    // From https://docs.rs/alloy/latest/alloy/dyn_abi/index.html:
112
                    // We strongly recommend using the static encoder/decoder when possible. The
113
                    // dynamic encoder/decoder is significantly more expensive, especially for complex
114
                    // types. It is also significantly more error prone, as the mapping between
115
                    // solidity types and rust types is not enforced by the compiler.
116
0
                    let rlp_decoded = alloy_primitives::LogData::new_unchecked(e.topics, e.data);
117
0
                    let le = match eth::Slot::decode_log_data(&rlp_decoded).ok() {
118
0
                        Some(le) => le,
119
0
                        None => continue,
120
                    };
121
0
                    let mut slot = Slot::from(le);
122
0
                    let block_number = match e.block_number {
123
0
                        BlockNumber::Quantity(q) => q,
124
0
                        _ => unreachable!(),
125
                    };
126
0
                    let bn: u64 = block_number.to();
127
0
                    let ln: u64 = e.log_index.to();
128
0
                    if ln >> 32 != 0 {
129
0
                        tracing::warn!("Skipped a block that's too big.");
130
0
                        continue;
131
0
                    } // block number uses 44 bits
132
0
                    slot.pos = ChainPos::new(bn, ln as u32);
133
0
                    tracing::debug!("SlotCrawler: extracted slot={slot:?}.");
134
0
                    out.slots.entry(e.address).or_default().push(slot);
135
                }
136
0
                None => {}
137
            }
138
        }
139
0
        out
140
0
    }
141
142
0
    async fn fetch_watched_logs_for_batch(
143
0
        client: &ClientHandle, batch: impl IntoIterator<Item = &Address>, from_block: BlockNumber,
144
0
        to_block: BlockNumber,
145
0
    ) -> anyhow::Result<WatchedLogsBatch> {
146
0
        let resp: EthGetLogsResp = client
147
0
            .request::<EthGetLogs, EthGetLogsResp>(EthGetLogs {
148
0
                from_block,
149
0
                to_block,
150
0
                address: batch.into_iter().copied().collect(),
151
0
                topics: vec![WatchedLogKind::topic0_filter()],
152
0
            })
153
0
            .await
154
0
            .with_context(|| format!("Failed to retrieve {}..{}.", from_block, to_block))?;
155
0
        Ok(Self::extract_watched_logs(resp))
156
0
    }
157
158
0
    async fn update_contracts(
159
0
        contracts: &mut HashMap<LyquidID, Contract>, batch_ids_by_addr: &HashMap<Address, LyquidID>, resync: &mut bool,
160
0
        updates: impl Iterator<Item = (Address, Vec<Slot>)>,
161
0
    ) {
162
0
        for (addr, slots) in updates {
163
0
            let id = match batch_ids_by_addr.get(&addr).copied() {
164
0
                Some(id) => id,
165
                None => {
166
0
                    tracing::debug!("Skipped stale logs for unknown contract address {addr}.");
167
0
                    continue;
168
                }
169
            };
170
0
            let mut remove_self = false;
171
            {
172
0
                let c = match contracts.get_mut(&id) {
173
0
                    Some(c) => c,
174
                    None => {
175
0
                        tracing::debug!("Lyquid {id} was removed.");
176
0
                        continue;
177
                    }
178
                };
179
0
                if c.addr != addr {
180
0
                    tracing::debug!(
181
                        "Skipped stale logs for Lyquid {id} because current address {} does not match {}.",
182
                        c.addr,
183
                        addr
184
                    );
185
0
                    continue;
186
0
                }
187
0
                let mut to_be_filled = Vec::new();
188
0
                for s in slots.into_iter() {
189
0
                    if let crate::NextContractID::Eth(new_addr) = s.switch_contract {
190
0
                        if let Err(e) = c.lyquid.switch_contract(s.pos).await {
191
0
                            tracing::warn!("Sync: LyquidBackend of {id} is unavailable: {e:?}");
192
                            // We need to remove it from the watch list because the LyquidBackend
193
                            // worker is already unreachable.
194
0
                            remove_self = true;
195
                        } else {
196
0
                            tracing::debug!("SlotCrawler: Switching contract for {id}: {addr} => {new_addr}.");
197
0
                            c.addr = new_addr;
198
0
                            *resync = true;
199
                        }
200
                        // Ignore all subsequent slots for this old address.
201
0
                        break;
202
0
                    }
203
0
                    to_be_filled.push(s);
204
                }
205
0
                if !remove_self {
206
0
                    if let Err(e) = c.seq.fill_slots(to_be_filled).await {
207
0
                        tracing::error!("Sync: Failed to fill slots from {id} ({addr}): {e:?}");
208
                        // We need to remove it from the watch list so the lyquid's next block won't be
209
                        // advanced because the slots weren't filled properly.
210
0
                        remove_self = true;
211
0
                    }
212
0
                }
213
            }
214
0
            if remove_self {
215
0
                contracts.remove(&id);
216
0
                continue;
217
0
            }
218
        }
219
0
    }
220
221
    // This function is the core logic of SlotCrawler.
222
    //
223
    // It progressively tries to bring all contracts ("watch list") to be the same, latest block
224
    // known on the chain. While each query to the chain may bundle multiple contracts, the
225
    // philosophy is to still treat each contract's sync progress separately. This is good for
226
    // robust handling of LyquidBackend, because upon any errors during syncing, there is no
227
    // incorrectly advanced progress for any Lyquids.
228
    #[tracing::instrument(level = "trace", skip_all)]
229
0
    async fn sync(
230
0
        contracts: &mut HashMap<LyquidID, Contract>, finality_tag: BlockNumber, blocks_per_request: usize,
231
0
        client: ClientHandle,
232
0
    ) -> anyhow::Result<()> {
233
        // Choose the contracts that should be considered during this sync.
234
        let mut chosen = {
235
0
            let next_blocks = contracts.iter().filter_map(|(id, c)| {
236
0
                if c.seq.flow_control().should_stop_filling() {
237
0
                    None
238
                } else {
239
0
                    Some(c.seq.next_block().map(|bn| (bn, *id, c.addr)))
240
                }
241
0
            });
242
            futures::future::join_all(next_blocks)
243
                .await
244
                .into_iter()
245
0
                .filter_map(|(res, id, addr)| {
246
0
                    res.map_err(|e| tracing::warn!("Sync: Failed to get the next block number for {addr}: {e:?}"))
247
                        // We don't have to remove this contract from the watch list because it
248
                        // is not harmful to try again upon next sync().
249
0
                        .ok()
250
0
                        .map(|bn| (bn, id, addr))
251
0
                })
252
                .collect::<Vec<_>>()
253
        };
254
255
        if chosen.is_empty() {
256
            tracing::debug!("No contract need to fetch.");
257
            return Ok(());
258
        }
259
260
        // Let set a fix goal for this sync: up to the `finalized` block for all chosen contracts.
261
        let finalized: Block = client
262
            .request::<EthGetBlockByNumber, EthGetBlockByNumberResp>(EthGetBlockByNumber {
263
                block: finality_tag,
264
                full_tx: false,
265
            })
266
            .in_current_span()
267
            .await
268
0
            .map_err(|e| e.into())
269
0
            .and_then(|f: EthGetBlockByNumberResp| f.0.ok_or(anyhow::anyhow!("result is null")))
270
            .context("Failed obtain the last finalized block.")?;
271
272
        let finalized = match finalized.number {
273
            BlockNumber::Quantity(b) => b,
274
            _ => {
275
                return Err(anyhow::anyhow!("Invalid finalized block number."));
276
            }
277
        }
278
        .to::<u64>();
279
280
        // Because the next chain position for each chosen contract (lyquid) could be different
281
        // (some may just be added to the watch list of SlotCrawler), we need to bundle the queries
282
        // in a way that is:
283
        //
284
        // - Not always just for a single address (to reduce the number of queries).
285
        // - Not always ask for a lot of addreses whose corresponding blocks are already
286
        // queried (to reduce the redundant data returned by each query).
287
        //
288
        // A clean and simple approach is to sort them according to their respective next block to
289
        // crawl from, and start from the lowest to catch up with the second lowest, then we bundle
290
        // up the contracts in a rolling style so there is *no* redundant data received.
291
        //
292
        // At the end of the rolling, if there is no error or contract switching (which requires a
293
        // resync), all current contracts are brought to the same next block value.
294
295
        chosen.sort_by_key(|k| k.0);
296
        let sentinel_id = LyquidID::from(Address::ZERO);
297
        chosen.push((finalized + 1, sentinel_id, Address::ZERO)); // sentinel pos, to flush all batches
298
299
        let mut next_block_number = None;
300
        let mut batch = HashMap::new();
301
        let mut resync = false;
302
303
        for (block, id, addr) in chosen {
304
            if let Some(mut n) = next_block_number {
305
                if block == n {
306
                    // in the same batch (same next block), let's don't query yet
307
                    batch.insert(addr, id);
308
                    continue;
309
                } else {
310
                    // Now we need to query with the existing items in `batch` because this
311
                    // candidate's next block is larger.
312
                    while n < block {
313
                        // `end` is inclusive
314
                        let mut end: u64 = n + blocks_per_request as u64 - 1;
315
                        end = end.min(block - 1);
316
                        tracing::info!("Fetching [{}..{}] ({} addrs).", n, end, batch.len());
317
                        let from_block = BlockNumber::Quantity(U256::from(n));
318
                        let to_block = BlockNumber::Quantity(U256::from(end));
319
                        let watched =
320
                            Self::fetch_watched_logs_for_batch(&client, batch.keys(), from_block, to_block).await?;
321
                        Self::update_contracts(contracts, &batch, &mut resync, watched.slots.into_iter()).await;
322
                        n = end + 1;
323
                    }
324
                    // Now n == block, we can change the next value and continue bundling more addresses.
325
                }
326
            }
327
            next_block_number = Some(block);
328
            batch.insert(addr, id);
329
        }
330
331
        // Finalizing the sync by setting the next block (telling FCO about the progress) for each
332
        // Lyquid whose contract has successfully synced to n.
333
        if let Some(n) = next_block_number {
334
            assert_eq!(n, finalized + 1);
335
            for (addr, id) in batch {
336
                if let Some(c) = contracts.get_mut(&id) {
337
                    if c.addr != addr {
338
                        tracing::debug!(
339
                            "SlotCrawler: Skipping stale batch entry for {id}; current address is {}, expected {}.",
340
                            c.addr,
341
                            addr
342
                        );
343
                        continue;
344
                    }
345
                    if let Err(e) = c.seq.set_next_block(n).await {
346
                        // Otherwise, we can safetly tell FCO to advance the progress because we
347
                        // have successfully filled all slots up to n.
348
                        tracing::error!("SlotCrawler: Failed to advance fetching progress for {id} ({addr}): {e:?}");
349
                        // We don't have to remove this contract from the watch list because it
350
                        // is not harmful to try again upon next sync().
351
                    }
352
                }
353
            }
354
        }
355
        if resync {
356
            Box::pin(Self::sync(contracts, finality_tag, blocks_per_request, client)).await?;
357
        }
358
        tracing::info!("SlotCrawler: Sync complete.");
359
        Ok(())
360
0
    }
361
}
362
363
// TODO: Drop Clone once the outer backend ownership is explicit enough that only
364
// SlotCrawlerHandle needs to be cloned.
365
#[derive(Clone)]
366
pub(super) struct SlotCrawler {
367
    handle: SlotCrawlerHandle,
368
    task_tracker: TaskTracker,
369
}
370
debug_struct_name!(SlotCrawler);
371
372
#[derive(Clone)]
373
pub(super) struct SlotCrawlerHandle {
374
    wake_tx: mpsc::Sender<()>,
375
    contract_tx: mpsc::UnboundedSender<ContractCommand>,
376
}
377
debug_struct_name!(SlotCrawlerHandle);
378
379
impl SlotCrawler {
380
0
    pub(super) fn new(
381
0
        client: ClientHandle, blocks_per_request: usize, finality_tag: BlockNumber, shutdown: CancellationToken,
382
0
    ) -> Self {
383
0
        let (wake_tx, wake_rx) = mpsc::channel(SLOT_CRAWLER_WAKE_CAPACITY);
384
0
        let (contract_tx, contract_rx) = mpsc::unbounded_channel();
385
0
        let task_tracker = TaskTracker::new();
386
0
        task_tracker.spawn(
387
0
            SlotCrawlerWorker {
388
0
                contracts: HashMap::new(),
389
0
                blocks_per_request,
390
0
                finality_tag,
391
0
                client: client.clone(),
392
0
            }
393
0
            .run(wake_rx, contract_rx)
394
0
            .with_cancellation_token_owned(shutdown),
395
        );
396
0
        task_tracker.close();
397
0
        Self {
398
0
            handle: SlotCrawlerHandle { wake_tx, contract_tx },
399
0
            task_tracker,
400
0
        }
401
0
    }
402
403
0
    pub(super) fn handle(&self) -> SlotCrawlerHandle {
404
0
        self.handle.clone()
405
0
    }
406
407
0
    pub(super) async fn wait_for_shutdown(&self) {
408
0
        self.task_tracker.wait().await;
409
0
    }
410
}
411
412
impl SlotCrawlerHandle {
413
0
    pub(super) fn wake(&self) -> bool {
414
0
        match self.wake_tx.try_send(()) {
415
0
            Ok(()) | Err(TrySendError::Full(_)) => true,
416
0
            Err(TrySendError::Closed(_)) => false,
417
        }
418
0
    }
419
420
    // TODO: consider return a token that automatically removes the contract from the watch list when dropped,
421
    // to avoid the need of explicit removal and the potential risk of forgetting to remove.
422
0
    pub(super) async fn add_contract(&self, contract: Contract) -> anyhow::Result<()> {
423
0
        self.contract_tx
424
0
            .send(ContractCommand::Upsert(contract))
425
0
            .map_err(|_| anyhow::anyhow!("SlotCrawler control loop already stopped"))
426
0
    }
427
428
0
    pub(super) fn remove_lyquid(&self, lyquid: LyquidID) -> anyhow::Result<()> {
429
0
        self.contract_tx
430
0
            .send(ContractCommand::RemoveLyquid(lyquid))
431
0
            .map_err(|_| anyhow::anyhow!("SlotCrawler control loop already stopped"))
432
0
    }
433
}
434
435
#[derive(Clone)]
436
pub(super) struct HeadWatcher {
437
    task_tracker: TaskTracker,
438
}
439
440
impl HeadWatcher {
441
0
    pub(super) fn new(client: ClientHandle, slot_crawler: SlotCrawlerHandle, shutdown: CancellationToken) -> Self {
442
0
        let task_tracker = TaskTracker::new();
443
0
        task_tracker.spawn(Self::head_watcher_loop(client, slot_crawler).with_cancellation_token_owned(shutdown));
444
0
        task_tracker.close();
445
0
        Self { task_tracker }
446
0
    }
447
448
0
    pub(super) async fn wait_for_shutdown(&self) {
449
0
        self.task_tracker.wait().await;
450
0
    }
451
452
0
    async fn subscribe_to_new_heads(
453
0
        client: &ClientHandle,
454
0
    ) -> Option<lyquor_jsonrpc::client::Subscription<EthNewHeadUpdate>> {
455
0
        match client.subscribe(EthSubscribe("newHeads".to_string())).await {
456
0
            Ok(subscription) => {
457
0
                tracing::debug!("Subscribed to new head");
458
0
                Some(subscription)
459
            }
460
0
            Err(e) => {
461
0
                tracing::error!("Failed to subscribe to new head: {e:?}");
462
0
                None
463
            }
464
        }
465
0
    }
466
467
0
    async fn head_watcher_loop(client: ClientHandle, slot_crawler: SlotCrawlerHandle) {
468
0
        let mut subscription = Self::subscribe_to_new_heads(&client).await;
469
470
        // NOTE: Hardhat Network may silently lose eth_subscribe state after a period of
471
        // inactivity (e.g., no block mining). Although the WebSocket connection remains
472
        // open, subscriptions can become stale — resulting in:
473
        // - No new calls being delivered, even when blocks are mined later
474
        // - eth_unsubscribe failing with "Subscription not found"
475
        // This is likely due to internal state cleanup or subscription manager desync in Hardhat's
476
        // in-memory event system. To work around this, we re-subscribe when activity resumes.
477
        //
478
        // This is not an issue for production chains like Ethereum.
479
0
        let mut periodic_poll = tokio::time::interval(Duration::from_secs(10));
480
0
        periodic_poll.set_missed_tick_behavior(MissedTickBehavior::Delay);
481
0
        periodic_poll.tick().await;
482
483
0
        let mut reset = tokio::time::interval(Duration::from_secs(60));
484
0
        reset.set_missed_tick_behavior(MissedTickBehavior::Delay);
485
0
        reset.tick().await;
486
487
        loop {
488
0
            tokio::select! {
489
0
                _ = periodic_poll.tick() => {
490
0
                    if !slot_crawler.wake() {
491
0
                        break;
492
0
                    }
493
                }
494
0
                _ = reset.tick() => {
495
0
                    tracing::debug!("Resetting new head subscription");
496
0
                    subscription = Self::subscribe_to_new_heads(&client).await;
497
                }
498
0
                update = async {
499
0
                    match &mut subscription {
500
0
                        Some(subscription) => subscription.next().await,
501
0
                        None => std::future::pending().await,
502
                    }
503
0
                } => match update {
504
0
                    Some(msg) => {
505
0
                        tracing::debug!("New head: {:?}.", msg);
506
0
                        if !slot_crawler.wake() {
507
0
                            break;
508
0
                        }
509
                    }
510
                    None => {
511
0
                        tracing::warn!("New head subscription ended; waiting for reset to retry");
512
0
                        subscription = None;
513
                    }
514
                }
515
            }
516
        }
517
0
    }
518
}