/home/runner/work/lyquor/lyquor/seq/src/eth/crawler.rs
Line | Count | Source |
1 | | use super::{ChainPos, Contract, Slot, SwitchContract}; |
2 | | use crate::OracleInfoFetcher; |
3 | | use alloy_sol_types::{SolCall, SolEvent, SolValue}; |
4 | | use async_trait::async_trait; |
5 | | use lyquor_api::actor::FetchOracleInfo; |
6 | | use lyquor_api::anyhow::{self, Context as _Context}; |
7 | | use lyquor_eth::eth; |
8 | | use lyquor_jsonrpc::client::ClientHandle; |
9 | | use lyquor_jsonrpc::types::{ |
10 | | Block, BlockNumber, EthCall, EthCallResp, EthCallTx, EthGetBlockByNumber, EthGetBlockByNumberResp, EthGetLogs, |
11 | | EthGetLogsResp, EthNewHeadUpdate, EthSubscribe, |
12 | | }; |
13 | | use lyquor_primitives::{Address, U256, alloy_primitives, debug_struct_name, oracle::OracleServiceTarget}; |
14 | | use std::collections::{HashMap, hash_map}; |
15 | | use std::time::Duration; |
16 | | use tokio::sync::mpsc; |
17 | | use tokio::sync::mpsc::error::TrySendError; |
18 | | use tokio::time::MissedTickBehavior; |
19 | | use tokio_util::sync::CancellationToken; |
20 | | use tokio_util::task::TaskTracker; |
21 | | use tokio_util::time::FutureExt; |
22 | | use tracing::Instrument; |
23 | | |
/// Classifies contract log entries that the crawler tracks, keyed by the
/// event signature hash (topic0) of the log.
enum WatchedLogKind {
    /// A `Slot` event (matched by `eth::Slot::SIGNATURE_HASH`).
    Slot,
}
27 | | |
28 | | impl WatchedLogKind { |
29 | 0 | fn from_topic0(topic0: Option<alloy_primitives::B256>) -> Option<Self> { |
30 | 0 | match topic0 { |
31 | 0 | Some(t) if t == eth::Slot::SIGNATURE_HASH => Some(Self::Slot), |
32 | 0 | _ => None, |
33 | | } |
34 | 0 | } |
35 | | |
36 | 0 | fn topic0_filter() -> Vec<alloy_primitives::B256> { |
37 | 0 | vec![eth::Slot::SIGNATURE_HASH] |
38 | 0 | } |
39 | | } |
40 | | |
/// Decoded results of one `eth_getLogs` response, grouped by the kind of
/// watched event.
#[derive(Default)]
struct WatchedLogsBatch {
    // Decoded `Slot` events, grouped by the emitting contract address.
    slots: HashMap<Address, Vec<Slot>>,
}
45 | | |
// Wakeups are edge-triggered: one queued wake is enough to force another sync
// pass, so the channel never needs to buffer more than a single notification.
const SLOT_CRAWLER_WAKE_CAPACITY: usize = 1;
48 | | |
/// State owned by the background task that crawls slot events from the chain.
pub(super) struct SlotCrawlerRuntime {
    // JSON-RPC client used for all chain queries.
    client: ClientHandle,
    // Watch list: contract address -> per-lyquid contract handle.
    contracts: HashMap<Address, Contract>,
    // Maximum block-range width of a single `eth_getLogs` query.
    blocks_per_request: usize,
    // Block tag (e.g. `finalized`) used as the sync target on each pass.
    finality_tag: BlockNumber,
}
55 | | |
impl SlotCrawlerRuntime {
    /// Main control loop of the crawler.
    ///
    /// Blocks until either a new contract registration or a wake signal
    /// arrives, drains any further pending messages from both channels so one
    /// pass covers everything queued, and then runs a full [`Self::sync`].
    /// The loop exits when both channels are closed (the `else` branch).
    async fn run(
        mut self, mut wake_rx: mpsc::Receiver<()>, mut contract_rx: mpsc::UnboundedReceiver<(Address, Contract)>,
    ) {
        loop {
            tokio::select! {
                Some((addr, contract)) = contract_rx.recv() => {
                    self.handle_contract(addr, contract);
                }
                Some(()) = wake_rx.recv() => {}
                else => break,
            }

            // Drain whatever else is already queued before syncing once.
            while let Ok((addr, contract)) = contract_rx.try_recv() {
                self.handle_contract(addr, contract);
            }
            while let Ok(()) = wake_rx.try_recv() {}

            if let Err(e) = Self::sync(
                &mut self.contracts,
                self.finality_tag,
                self.blocks_per_request,
                self.client.clone(),
            )
            .await
            {
                // Sync errors are not fatal: the next wake retries from the
                // per-contract progress, which is only advanced on success.
                tracing::error!("SlotCrawler: Error during sync: {e:?}");
            }
        }
    }

    /// Adds (or replaces) a contract in the watch list.
    fn handle_contract(&mut self, addr: Address, contract: Contract) {
        tracing::debug!("SlotCrawler: Added contract {addr}.");
        self.contracts.insert(addr, contract);
    }

    /// Decodes an `eth_getLogs` response into a [`WatchedLogsBatch`], grouping
    /// decoded `Slot` events by emitting contract address. Logs that fail to
    /// decode, or whose log index does not fit into 32 bits, are skipped.
    fn extract_watched_logs(resp: EthGetLogsResp) -> WatchedLogsBatch {
        let mut out = WatchedLogsBatch::default();
        for e in resp.0.into_iter() {
            match WatchedLogKind::from_topic0(e.topics.first().copied()) {
                Some(WatchedLogKind::Slot) => {
                    // From https://docs.rs/alloy/latest/alloy/dyn_abi/index.html:
                    // We strongly recommend using the static encoder/decoder when possible. The
                    // dynamic encoder/decoder is significantly more expensive, especially for complex
                    // types. It is also significantly more error prone, as the mapping between
                    // solidity types and rust types is not enforced by the compiler.
                    let rlp_decoded = alloy_primitives::LogData::new_unchecked(e.topics, e.data);
                    let le = match eth::Slot::decode_log_data(&rlp_decoded).ok() {
                        Some(le) => le,
                        None => continue,
                    };
                    let mut slot = Slot::from(le);
                    // NOTE(review): assumed that logs returned for a concrete
                    // block range always carry a quantity block number — the
                    // other variants are treated as impossible here.
                    let block_number = match e.block_number {
                        BlockNumber::Quantity(q) => q,
                        _ => unreachable!(),
                    };
                    let bn: u64 = block_number.to();
                    let ln: u64 = e.log_index.to();
                    // ChainPos only reserves 32 bits for the log index.
                    if ln >> 32 != 0 {
                        tracing::warn!("Skipped a block that's too big.");
                        continue;
                    } // block number uses 44 bits
                    slot.pos = ChainPos::new(bn, ln as u32);
                    tracing::debug!("SlotCrawler: extracted slot={slot:?}.");
                    out.slots.entry(e.address).or_default().push(slot);
                }
                None => {}
            }
        }
        out
    }

    /// Runs one `eth_getLogs` query over `[from_block, to_block]` for all
    /// addresses in `batch` and decodes the result into a batch of watched
    /// events.
    async fn fetch_watched_logs_for_batch(
        client: &ClientHandle, batch: &[Address], from_block: BlockNumber, to_block: BlockNumber,
    ) -> anyhow::Result<WatchedLogsBatch> {
        let resp: EthGetLogsResp = client
            .request::<EthGetLogs, EthGetLogsResp>(EthGetLogs {
                from_block,
                to_block,
                address: batch.to_vec(),
                topics: vec![WatchedLogKind::topic0_filter()],
            })
            .await
            .with_context(|| format!("Failed to retrieve {}..{}.", from_block, to_block))?;
        Ok(Self::extract_watched_logs(resp))
    }

    /// Delivers freshly crawled slots to each contract's backend.
    ///
    /// A slot carrying a `NextContractID::Eth` switch notifies the lyquid via
    /// `SwitchContract`, records the new address, flags `resync`, and drops
    /// all subsequent slots for the old address. Contracts whose backend is
    /// unreachable, or whose slot fill fails, are removed from the watch list
    /// so their progress cannot be advanced incorrectly.
    async fn update_contracts(
        contracts: &mut HashMap<Address, Contract>, resync: &mut bool,
        updates: impl Iterator<Item = (Address, Vec<Slot>)>,
    ) {
        'outer: for (addr, slots) in updates {
            let c = match contracts.get_mut(&addr) {
                Some(c) => c,
                None => {
                    tracing::debug!("Contract {addr} was removed.");
                    continue;
                }
            };
            let mut to_be_filled = Vec::new();
            for s in slots.into_iter() {
                if let crate::NextContractID::Eth(new_addr) = s.switch_contract {
                    if c.lyquid.send(SwitchContract { pos: s.pos }).await.is_err() {
                        tracing::warn!("Sync: LyquidBackend of {addr} already stopped.");
                        // We need to remove it from the watch list because the LyquidBackend actor
                        // is already unreachable.
                        contracts.remove(&addr);
                        continue 'outer;
                    }
                    assert!(c.new_addr.is_none());
                    c.new_addr = Some(new_addr);
                    *resync = true;
                    // Ignore all subsequent slots for this old address.
                    break;
                }
                to_be_filled.push(s);
            }
            if let Err(e) = c.seq.fill_slots(to_be_filled).await {
                tracing::error!("Sync: Failed to fill slots from {addr}: {e:?}");
                // We need to remove it from the watch list so the lyquid's next block won't be
                // advanced because the slots weren't filled properly.
                contracts.remove(&addr);
            }
        }
    }

    // This function is the core logic of SlotCrawler.
    //
    // It progressively tries to bring all contracts ("watch list") to be the same, latest block
    // known on the chain. While each query to the chain may bundle multiple contracts, the
    // philosophy is to still treat each contract's sync progress separately. This is good for
    // robust handling of LyquidBackend, because upon any errors during syncing, there is no
    // incorrectly advanced progress for any Lyquids.
    #[tracing::instrument(level = "trace", skip_all)]
    async fn sync(
        contracts: &mut HashMap<Address, Contract>, finality_tag: BlockNumber, blocks_per_request: usize,
        client: ClientHandle,
    ) -> anyhow::Result<()> {
        // Choose the contracts that should be considered during this sync.
        let mut chosen = Vec::new();
        for (addr, c) in contracts.iter().filter_map(|(addr, c)| {
            if c.seq.flow_control().should_stop_filling() {
                None
            } else {
                Some((*addr, c))
            }
        }) {
            match c.seq.next_block().await {
                Ok(bn) => {
                    chosen.push((addr, bn));
                }
                Err(e) => {
                    tracing::warn!("Sync: Failed to get the next block number for {addr}: {e:?}");
                    // We don't have to remove this contract from the watch list because it
                    // is not harmful to try again upon next sync().
                }
            }
        }

        if chosen.is_empty() {
            tracing::debug!("No contract need to fetch.");
            return Ok(());
        }

        // Set a fixed goal for this sync: up to the `finalized` block for all chosen contracts.
        let finalized: Block = client
            .request::<EthGetBlockByNumber, EthGetBlockByNumberResp>(EthGetBlockByNumber {
                block: finality_tag,
                full_tx: false,
            })
            .in_current_span()
            .await
            .map_err(|e| e.into())
            .and_then(|f: EthGetBlockByNumberResp| f.0.ok_or(anyhow::anyhow!("result is null")))
            .context("Failed obtain the last finalized block.")?;

        let finalized = match finalized.number {
            BlockNumber::Quantity(b) => b,
            _ => {
                return Err(anyhow::anyhow!("Invalid finalized block number."));
            }
        }
        .to::<u64>();

        // Because the next chain position for each chosen contract (lyquid) could be different
        // (some may just be added to the watch list of SlotCrawler), we need to bundle the queries
        // in a way that is:
        //
        // - Not always just for a single address (to reduce the number of queries).
        // - Not always ask for a lot of addresses whose corresponding blocks are already
        //   queried (to reduce the redundant data returned by each query).
        //
        // A clean and simple approach is to sort them according to their respective next block to
        // crawl from, and start from the lowest to catch up with the second lowest, then we bundle
        // up the contracts in a rolling style so there is *no* redundant data received.
        //
        // At the end of the rolling, if there is no error or contract switching (which requires a
        // resync), all current contracts are brought to the same next block value.

        chosen.sort_by_key(|k| k.1);
        chosen.push((Address::ZERO, finalized + 1)); // sentinel pos, to flush all batches

        let mut next_block_number = None;
        let mut batch = Vec::new();
        let mut resync = false;

        for (addr, block) in chosen {
            if let Some(mut n) = next_block_number {
                if block == n {
                    // in the same batch (same next block), let's don't query yet
                    batch.push(addr);
                    continue;
                } else {
                    // Now we need to query with the existing items in `batch` because this
                    // candidate's next block is larger.
                    while n < block {
                        // `end` is inclusive
                        let mut end: u64 = n + blocks_per_request as u64 - 1;
                        end = end.min(block - 1);
                        tracing::info!("Fetching [{}..{}] ({} addrs).", n, end, batch.len());
                        let from_block = BlockNumber::Quantity(U256::from(n));
                        let to_block = BlockNumber::Quantity(U256::from(end));
                        let watched = Self::fetch_watched_logs_for_batch(&client, &batch, from_block, to_block).await?;
                        Self::update_contracts(contracts, &mut resync, watched.slots.into_iter()).await;
                        n = end + 1;
                    }
                    // Now n == block, we can change the next value and continue bundling more addresses.
                }
            }
            next_block_number = Some(block);
            batch.push(addr);
        }

        // Finalizing the sync by setting the next block (telling FCO about the progress) for each
        // Lyquid whose contract has successfully synced to n.
        if let Some(n) = next_block_number {
            assert_eq!(n, finalized + 1);
            for addr in batch {
                // The sentinel Address::ZERO (and any contract removed during
                // update_contracts) is simply absent here and gets skipped.
                if let hash_map::Entry::Occupied(mut e) = contracts.entry(addr) {
                    let c = e.get_mut();
                    if let Some(new_addr) = c.new_addr.take() {
                        // This lyquid is switching the contract. So we need to update the address
                        // info in the watch list. And we should *not* advance the next block
                        // because this lyquid needs to check for the slots using the new address
                        // in the previous block.
                        let c = e.remove();
                        contracts.insert(new_addr, c);
                        tracing::debug!("SlotCrawler: Switching contract {addr} => {new_addr}.");
                    } else {
                        // Otherwise, we can safely tell FCO to advance the progress because we
                        // have successfully filled all slots up to n.
                        if let Err(e) = c.seq.set_next_block(n).await {
                            tracing::error!("SlotCrawler: Failed to advance fetching progress for {addr}: {e:?}");
                            // We don't have to remove this contract from the watch list because it
                            // is not harmful to try again upon next sync().
                        }
                    }
                }
            }
        }
        if resync {
            // Recurse once per switch cascade; Box::pin keeps the async fn's
            // future sized despite the self-reference.
            Box::pin(Self::sync(contracts, finality_tag, blocks_per_request, client)).await?;
        }
        tracing::info!("SlotCrawler: Sync complete.");
        Ok(())
    }
}
323 | | |
/// Cheap-to-clone handle to the background [`SlotCrawlerRuntime`] task.
#[derive(Clone)]
pub(super) struct SlotCrawler {
    // JSON-RPC client, shared with the runtime (used here for oracle queries).
    client: ClientHandle,
    // Edge-triggered wake channel (capacity SLOT_CRAWLER_WAKE_CAPACITY).
    wake_tx: mpsc::Sender<()>,
    // Channel for registering new contracts with the runtime.
    contract_tx: mpsc::UnboundedSender<(Address, Contract)>,
    // Tracks the spawned runtime task for shutdown coordination.
    task_tracker: TaskTracker,
    // Block tag used when answering oracle info requests.
    finality_tag: BlockNumber,
}
debug_struct_name!(SlotCrawler);
333 | | |
334 | | impl SlotCrawler { |
335 | 0 | pub(super) fn new( |
336 | 0 | client: ClientHandle, blocks_per_request: usize, finality_tag: BlockNumber, shutdown: CancellationToken, |
337 | 0 | ) -> Self { |
338 | 0 | let (wake_tx, wake_rx) = mpsc::channel(SLOT_CRAWLER_WAKE_CAPACITY); |
339 | 0 | let (contract_tx, contract_rx) = mpsc::unbounded_channel(); |
340 | 0 | let task_tracker = TaskTracker::new(); |
341 | 0 | task_tracker.spawn( |
342 | 0 | SlotCrawlerRuntime { |
343 | 0 | contracts: HashMap::new(), |
344 | 0 | blocks_per_request, |
345 | 0 | finality_tag, |
346 | 0 | client: client.clone(), |
347 | 0 | } |
348 | 0 | .run(wake_rx, contract_rx) |
349 | 0 | .with_cancellation_token_owned(shutdown), |
350 | | ); |
351 | 0 | task_tracker.close(); |
352 | 0 | Self { |
353 | 0 | wake_tx, |
354 | 0 | contract_tx, |
355 | 0 | task_tracker, |
356 | 0 | client, |
357 | 0 | finality_tag, |
358 | 0 | } |
359 | 0 | } |
360 | | |
361 | 0 | pub(super) async fn wait_for_shutdown(&self) { |
362 | 0 | self.task_tracker.wait().await; |
363 | 0 | } |
364 | | |
365 | 0 | pub(super) fn wake(&self) -> bool { |
366 | 0 | match self.wake_tx.try_send(()) { |
367 | 0 | Ok(()) | Err(TrySendError::Full(_)) => true, |
368 | 0 | Err(TrySendError::Closed(_)) => false, |
369 | | } |
370 | 0 | } |
371 | | |
372 | 0 | pub(super) async fn add_contract(&self, addr: Address, contract: Contract) -> anyhow::Result<()> { |
373 | 0 | self.contract_tx |
374 | 0 | .send((addr, contract)) |
375 | 0 | .map_err(|_| anyhow::anyhow!("SlotCrawler control loop already stopped")) |
376 | 0 | } |
377 | | } |
378 | | |
379 | | #[async_trait] |
380 | | impl OracleInfoFetcher for SlotCrawler { |
381 | 0 | async fn fetch_oracle_info(&self, msg: FetchOracleInfo) -> Option<lyquor_primitives::oracle::OracleEpochInfo> { |
382 | | let OracleServiceTarget::EVM { eth_contract, .. } = msg.target.target else { |
383 | | return None; |
384 | | }; |
385 | | let topic = alloy_primitives::keccak256(msg.topic.as_bytes()); |
386 | | fetch_oracle_info(&self.client, self.finality_tag, eth_contract, topic, msg.full_config).await |
387 | 0 | } |
388 | | } |
389 | | |
390 | 0 | async fn finalized_block_number(client: &ClientHandle, finality_tag: BlockNumber) -> Option<u64> { |
391 | 0 | let finalized: Block = client |
392 | 0 | .request::<EthGetBlockByNumber, EthGetBlockByNumberResp>(EthGetBlockByNumber { |
393 | 0 | block: finality_tag, |
394 | 0 | full_tx: false, |
395 | 0 | }) |
396 | 0 | .await |
397 | 0 | .ok()? |
398 | 0 | .0?; |
399 | 0 | match finalized.number { |
400 | 0 | BlockNumber::Quantity(bn) => Some(bn.to()), |
401 | 0 | _ => None, |
402 | | } |
403 | 0 | } |
404 | | |
405 | 0 | async fn fetch_oracle_info( |
406 | 0 | client: &ClientHandle, finality_tag: BlockNumber, eth_contract: Address, topic: alloy_primitives::B256, |
407 | 0 | full_config: bool, |
408 | 0 | ) -> Option<lyquor_primitives::oracle::OracleEpochInfo> { |
409 | 0 | let block_number = BlockNumber::Quantity(U256::from(finalized_block_number(client, finality_tag).await?)); |
410 | 0 | let get_epoch_input = eth::ISequenceBackend::getEpochCall { topic }.abi_encode(); |
411 | 0 | let epoch_ret: EthCallResp = client |
412 | 0 | .request(EthCall { |
413 | 0 | tx: EthCallTx { |
414 | 0 | from: None, |
415 | 0 | to: Some(eth_contract), |
416 | 0 | gas: None, |
417 | 0 | gas_price: None, |
418 | 0 | value: None, |
419 | 0 | data: Some(get_epoch_input.into()), |
420 | 0 | }, |
421 | 0 | block_number: block_number.clone(), |
422 | 0 | }) |
423 | 0 | .await |
424 | 0 | .ok()?; |
425 | 0 | let get_hash_input = eth::ISequenceBackend::getConfigHashCall { topic }.abi_encode(); |
426 | 0 | let hash_ret: EthCallResp = client |
427 | 0 | .request(EthCall { |
428 | 0 | tx: EthCallTx { |
429 | 0 | from: None, |
430 | 0 | to: Some(eth_contract), |
431 | 0 | gas: None, |
432 | 0 | gas_price: None, |
433 | 0 | value: None, |
434 | 0 | data: Some(get_hash_input.into()), |
435 | 0 | }, |
436 | 0 | block_number: block_number.clone(), |
437 | 0 | }) |
438 | 0 | .await |
439 | 0 | .ok()?; |
440 | 0 | let get_count_input = eth::ISequenceBackend::getChangeCountCall { topic }.abi_encode(); |
441 | 0 | let count_ret: EthCallResp = client |
442 | 0 | .request(EthCall { |
443 | 0 | tx: EthCallTx { |
444 | 0 | from: None, |
445 | 0 | to: Some(eth_contract), |
446 | 0 | gas: None, |
447 | 0 | gas_price: None, |
448 | 0 | value: None, |
449 | 0 | data: Some(get_count_input.into()), |
450 | 0 | }, |
451 | 0 | block_number: block_number.clone(), |
452 | 0 | }) |
453 | 0 | .await |
454 | 0 | .ok()?; |
455 | 0 | let epoch = eth::ISequenceBackend::getEpochCall::abi_decode_returns(epoch_ret.0.as_ref()).ok()?; |
456 | 0 | let config_hash = eth::ISequenceBackend::getConfigHashCall::abi_decode_returns(hash_ret.0.as_ref()).ok()?; |
457 | 0 | let change_count = eth::ISequenceBackend::getChangeCountCall::abi_decode_returns(count_ret.0.as_ref()).ok()?; |
458 | 0 | let config_hash: lyquor_primitives::HashBytes = config_hash.0.into(); |
459 | 0 | let config = if full_config && config_hash != [0; 32].into() { |
460 | 0 | let get_config_input = eth::ISequenceBackend::getConfigCall { topic }.abi_encode(); |
461 | 0 | let config_ret: EthCallResp = client |
462 | 0 | .request(EthCall { |
463 | 0 | tx: EthCallTx { |
464 | 0 | from: None, |
465 | 0 | to: Some(eth_contract), |
466 | 0 | gas: None, |
467 | 0 | gas_price: None, |
468 | 0 | value: None, |
469 | 0 | data: Some(get_config_input.into()), |
470 | 0 | }, |
471 | 0 | block_number, |
472 | 0 | }) |
473 | 0 | .await |
474 | 0 | .ok()?; |
475 | 0 | let config = eth::ISequenceBackend::getConfigCall::abi_decode_returns(config_ret.0.as_ref()).ok()?; |
476 | 0 | let actual_hash: lyquor_primitives::Hash = alloy_primitives::keccak256(&config.abi_encode()).0.into(); |
477 | 0 | if actual_hash != config_hash.into_inner() { |
478 | 0 | return None; |
479 | 0 | } |
480 | | Some(lyquor_primitives::oracle::OracleConfig { |
481 | 0 | committee: config |
482 | 0 | .committee |
483 | 0 | .into_iter() |
484 | 0 | .map(|signer| lyquor_primitives::oracle::OracleSigner { |
485 | 0 | id: signer.id, |
486 | 0 | key: signer.nodeID.as_slice().to_vec().into(), |
487 | 0 | }) |
488 | 0 | .collect(), |
489 | 0 | threshold: config.threshold, |
490 | | }) |
491 | | } else { |
492 | 0 | None |
493 | | }; |
494 | 0 | Some(lyquor_primitives::oracle::OracleEpochInfo { |
495 | 0 | epoch, |
496 | 0 | config_hash, |
497 | 0 | change_count, |
498 | 0 | config, |
499 | 0 | }) |
500 | 0 | } |
501 | | |
/// Handle to the background task that watches chain heads and wakes the
/// [`SlotCrawler`] when new blocks arrive.
#[derive(Clone)]
pub(super) struct HeadWatcher {
    // Tracks the spawned watcher loop for shutdown coordination.
    task_tracker: TaskTracker,
}
506 | | |
impl HeadWatcher {
    /// Spawns the head-watching loop, tied to the given shutdown token.
    pub(super) fn new(client: ClientHandle, slot_crawler: SlotCrawler, shutdown: CancellationToken) -> Self {
        let task_tracker = TaskTracker::new();
        task_tracker.spawn(Self::head_watcher_loop(client, slot_crawler).with_cancellation_token_owned(shutdown));
        task_tracker.close();
        Self { task_tracker }
    }

    /// Resolves once the watcher loop has fully terminated.
    pub(super) async fn wait_for_shutdown(&self) {
        self.task_tracker.wait().await;
    }

    /// Opens a `newHeads` subscription, returning `None` (after logging) on
    /// failure so the caller can fall back to periodic polling.
    async fn subscribe_to_new_heads(
        client: &ClientHandle,
    ) -> Option<lyquor_jsonrpc::client::Subscription<EthNewHeadUpdate>> {
        match client.subscribe(EthSubscribe("newHeads".to_string())).await {
            Ok(subscription) => {
                tracing::debug!("Subscribed to new head");
                Some(subscription)
            }
            Err(e) => {
                tracing::error!("Failed to subscribe to new head: {e:?}");
                None
            }
        }
    }

    /// Drives the crawler from three stimuli: new-head subscription updates,
    /// a 10s fallback poll, and a 60s forced re-subscription. Exits when
    /// `wake()` reports that the crawler loop is gone.
    async fn head_watcher_loop(client: ClientHandle, slot_crawler: SlotCrawler) {
        let mut subscription = Self::subscribe_to_new_heads(&client).await;

        // NOTE: Hardhat Network may silently lose eth_subscribe state after a period of
        // inactivity (e.g., no block mining). Although the WebSocket connection remains
        // open, subscriptions can become stale — resulting in:
        // - No new calls being delivered, even when blocks are mined later
        // - eth_unsubscribe failing with "Subscription not found"
        // This is likely due to internal state cleanup or subscription manager desync in Hardhat's
        // in-memory event system. To work around this, we re-subscribe when activity resumes.
        //
        // This is not an issue for production chains like Ethereum.
        let mut periodic_poll = tokio::time::interval(Duration::from_secs(10));
        periodic_poll.set_missed_tick_behavior(MissedTickBehavior::Delay);
        // Consume the immediate first tick so the first poll waits a full period.
        periodic_poll.tick().await;

        let mut reset = tokio::time::interval(Duration::from_secs(60));
        reset.set_missed_tick_behavior(MissedTickBehavior::Delay);
        reset.tick().await;

        loop {
            tokio::select! {
                _ = periodic_poll.tick() => {
                    // wake() is false only when the crawler runtime stopped.
                    if !slot_crawler.wake() {
                        break;
                    }
                }
                _ = reset.tick() => {
                    tracing::debug!("Resetting new head subscription");
                    subscription = Self::subscribe_to_new_heads(&client).await;
                }
                // With no live subscription this branch pends forever, leaving
                // only the two timers to fire.
                update = async {
                    match &mut subscription {
                        Some(subscription) => subscription.next().await,
                        None => std::future::pending().await,
                    }
                } => match update {
                    Some(msg) => {
                        tracing::debug!("New head: {:?}.", msg);
                        if !slot_crawler.wake() {
                            break;
                        }
                    }
                    None => {
                        tracing::warn!("New head subscription ended; waiting for reset to retry");
                        subscription = None;
                    }
                }
            }
        }
    }
}