Coverage Report

Created: 2026-04-01 22:18

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/home/runner/work/lyquor/lyquor/seq/src/fco/advance.rs
Line
Count
Source
1
use lyquor_primitives::ChainPos;
2
3
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
4
enum AdvancePhase {
5
    Idle,
6
    WaitingInboundSync { advance_until: ChainPos },
7
    ApplyReady { advance_until: ChainPos },
8
    RetryScheduled { advance_until: ChainPos, token: u64 },
9
}
10
11
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
12
pub(super) enum AdvanceAction {
13
    Noop,
14
    SyncInbound { advance_until: ChainPos },
15
    ApplyReady { advance_until: ChainPos },
16
    ScheduleRetry { advance_until: ChainPos },
17
}
18
19
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
20
#[allow(clippy::enum_variant_names)]
21
pub(super) enum AdvanceEvent {
22
    SyncReady,
23
    SyncPending,
24
    SyncFailed,
25
}
26
27
#[derive(Debug)]
28
pub(super) struct AdvanceEngine {
29
    phase: AdvancePhase,
30
    may_advance: bool,
31
    next_retry_token: u64,
32
    last_advance: ChainPos,
33
    inbound_backfill_frontier: ChainPos,
34
}
35
36
impl AdvanceEngine {
37
271
    pub(super) fn new(initial_pos: ChainPos) -> Self {
38
271
        Self {
39
271
            phase: AdvancePhase::Idle,
40
271
            may_advance: false,
41
271
            next_retry_token: 0,
42
271
            last_advance: initial_pos,
43
271
            inbound_backfill_frontier: initial_pos,
44
271
        }
45
271
    }
46
47
12.4k
    pub(super) fn request_advance(&mut self) {
48
12.4k
        self.may_advance = true;
49
12.4k
        if matches!(self.phase, AdvancePhase::RetryScheduled { .. }) {
50
0
            self.phase = AdvancePhase::Idle;
51
12.4k
        }
52
12.4k
    }
53
54
12.4k
    fn current_advance_until(&mut self, next_to_sync: Option<ChainPos>) -> Option<ChainPos> {
55
12.4k
        if !self.may_advance {
56
1
            return None;
57
12.4k
        }
58
12.4k
        let next_to_sync = next_to_sync?;  [region counts: `next_to_sync` 12.4k, `?` 8]
59
12.4k
        let advance_until = std::cmp::max(next_to_sync, self.last_advance);
60
12.4k
        self.last_advance = advance_until;
61
12.4k
        Some(advance_until)
62
12.4k
    }
63
64
12.4k
    pub(super) fn next_step(&mut self, next_to_sync: Option<ChainPos>, selective_hosting: bool) -> AdvanceAction {
65
12.4k
        if !matches!(self.phase, AdvancePhase::Idle) {  [region count: 1 at `matches!`]
66
1
            return AdvanceAction::Noop;
67
12.4k
        }
68
12.4k
        let Some(advance_until) = self.current_advance_until(next_to_sync) else {  [region count: `advance_until` 12.4k]
69
9
            return AdvanceAction::Noop;
70
        };
71
72
12.4k
        if selective_hosting && self.inbound_backfill_frontier < advance_until {  [region count: `self.inbound_backfill_frontier < advance_until` 151]
73
76
            self.phase = AdvancePhase::WaitingInboundSync { advance_until };
74
76
            return AdvanceAction::SyncInbound { advance_until };
75
12.3k
        }
76
77
12.3k
        self.inbound_backfill_frontier = advance_until;
78
12.3k
        self.phase = AdvancePhase::ApplyReady { advance_until };
79
12.3k
        AdvanceAction::ApplyReady { advance_until }
80
12.4k
    }
81
82
75
    pub(super) fn on_inbound_sync_event(&mut self, advance_until: ChainPos, event: AdvanceEvent) -> AdvanceAction {
83
74
        match self.phase {
84
            AdvancePhase::WaitingInboundSync {
85
73
                advance_until: expected,
86
74
            } if expected == advance_until => {}  [region counts: 73 at end of guard, `{}` 73]
87
2
            _ => return AdvanceAction::Noop,
88
        }
89
73
        self.phase = AdvancePhase::Idle;
90
73
        match event {
91
            AdvanceEvent::SyncReady => {
92
35
                self.phase = AdvancePhase::ApplyReady { advance_until };
93
35
                AdvanceAction::ApplyReady { advance_until }
94
            }
95
38
            AdvanceEvent::SyncPending | AdvanceEvent::SyncFailed => AdvanceAction::ScheduleRetry { advance_until },
96
        }
97
75
    }
98
99
36
    pub(super) fn mark_inbound_backfill_frontier(&mut self, advance_until: ChainPos) {
100
36
        self.inbound_backfill_frontier = advance_until;
101
36
    }
102
103
12.3k
    pub(super) fn complete_apply_ready(&mut self) {
104
12.3k
        self.phase = AdvancePhase::Idle;
105
12.3k
        self.may_advance = false;
106
12.3k
    }
107
108
38
    pub(super) fn schedule_retry(&mut self, advance_until: ChainPos) -> Option<u64> {
109
38
        if matches!(self.phase, AdvancePhase::RetryScheduled { .. }) {
110
0
            return None;
111
38
        }
112
38
        let token = self.next_retry_token;
113
38
        self.next_retry_token = self.next_retry_token.saturating_add(1);
114
38
        self.phase = AdvancePhase::RetryScheduled { advance_until, token };
115
38
        self.may_advance = false;
116
38
        Some(token)
117
38
    }
118
119
39
    pub(super) fn on_retry_fired(&mut self, token: u64) -> bool {
120
38
        match self.phase {
121
38
            AdvancePhase::RetryScheduled { token: expected, .. } if expected == token => {  [region counts: `expected` 37, 37 at end of guard]
122
37
                self.phase = AdvancePhase::Idle;
123
37
                true
124
            }
125
2
            _ => false,
126
        }
127
39
    }
128
129
462
    pub(super) fn reset_to_idle(&mut self) {
130
462
        self.phase = AdvancePhase::Idle;
131
462
    }
132
133
28.7k
    pub(super) fn last_advance(&self) -> ChainPos {
134
28.7k
        self.last_advance
135
28.7k
    }
136
}
137
138
#[cfg(test)]
139
mod tests {
140
    use super::*;
141
    use lyquor_test::test;
142
143
    #[test]
144
    fn next_step_table_tests() {
145
        #[derive(Clone, Copy)]
146
        enum Setup {
147
            Fresh,
148
            Requested,
149
            RequestedWithFrontier(ChainPos),
150
            RequestedThenWaiting,
151
        }
152
153
        struct Case {
154
            name: &'static str,
155
            setup: Setup,
156
            next_to_sync: Option<ChainPos>,
157
            selective_hosting: bool,
158
            expected: AdvanceAction,
159
        }
160
161
        let p0 = ChainPos::new(0, 0);
162
        let p5 = ChainPos::new(5, 0);
163
        let p7 = ChainPos::new(7, 0);
164
        let cases = vec![
165
            Case {
166
                name: "no_request_no_step",
167
                setup: Setup::Fresh,
168
                next_to_sync: Some(p5),
169
                selective_hosting: true,
170
                expected: AdvanceAction::Noop,
171
            },
172
            Case {
173
                name: "requested_but_no_frontier",
174
                setup: Setup::Requested,
175
                next_to_sync: None,
176
                selective_hosting: true,
177
                expected: AdvanceAction::Noop,
178
            },
179
            Case {
180
                name: "needs_inbound_sync",
181
                setup: Setup::Requested,
182
                next_to_sync: Some(p5),
183
                selective_hosting: true,
184
                expected: AdvanceAction::SyncInbound { advance_until: p5 },
185
            },
186
            Case {
187
                name: "archive_mode_skips_inbound_sync",
188
                setup: Setup::Requested,
189
                next_to_sync: Some(p5),
190
                selective_hosting: false,
191
                expected: AdvanceAction::ApplyReady { advance_until: p5 },
192
            },
193
            Case {
194
                name: "frontier_caught_up_can_apply",
195
                setup: Setup::RequestedWithFrontier(p5),
196
                next_to_sync: Some(p5),
197
                selective_hosting: true,
198
                expected: AdvanceAction::ApplyReady { advance_until: p5 },
199
            },
200
            Case {
201
                name: "while_waiting_returns_noop",
202
                setup: Setup::RequestedThenWaiting,
203
                next_to_sync: Some(p7),
204
                selective_hosting: true,
205
                expected: AdvanceAction::Noop,
206
            },
207
            Case {
208
                name: "advance_until_never_moves_backwards",
209
                setup: Setup::RequestedWithFrontier(p0),
210
                next_to_sync: Some(p0),
211
                selective_hosting: true,
212
                expected: AdvanceAction::ApplyReady { advance_until: p0 },
213
            },
214
        ];
215
216
        for case in cases {
217
            let mut engine = AdvanceEngine::new(p0);
218
            match case.setup {
219
                Setup::Fresh => {}
220
                Setup::Requested => {
221
                    engine.request_advance();
222
                }
223
                Setup::RequestedWithFrontier(frontier) => {
224
                    engine.request_advance();
225
                    engine.mark_inbound_backfill_frontier(frontier);
226
                }
227
                Setup::RequestedThenWaiting => {
228
                    engine.request_advance();
229
                    let action = engine.next_step(Some(p5), true);
230
                    assert!(matches!(action, AdvanceAction::SyncInbound { .. }));
231
                }
232
            }
233
234
            let got = engine.next_step(case.next_to_sync, case.selective_hosting);
235
            assert_eq!(got, case.expected, "case={}", case.name);
236
        }
237
    }
238
239
    #[test]
240
    fn on_inbound_sync_event_table_tests() {
241
        struct Case {
242
            name: &'static str,
243
            setup_waiting_for: Option<ChainPos>,
244
            event_until: ChainPos,
245
            event: AdvanceEvent,
246
            expected: AdvanceAction,
247
        }
248
249
        let p0 = ChainPos::new(0, 0);
250
        let p5 = ChainPos::new(5, 0);
251
        let p6 = ChainPos::new(6, 0);
252
        let cases = vec![
253
            Case {
254
                name: "stale_when_not_waiting",
255
                setup_waiting_for: None,
256
                event_until: p5,
257
                event: AdvanceEvent::SyncReady,
258
                expected: AdvanceAction::Noop,
259
            },
260
            Case {
261
                name: "stale_when_mismatched_until",
262
                setup_waiting_for: Some(p5),
263
                event_until: p6,
264
                event: AdvanceEvent::SyncReady,
265
                expected: AdvanceAction::Noop,
266
            },
267
            Case {
268
                name: "ready_transitions_to_apply",
269
                setup_waiting_for: Some(p5),
270
                event_until: p5,
271
                event: AdvanceEvent::SyncReady,
272
                expected: AdvanceAction::ApplyReady { advance_until: p5 },
273
            },
274
            Case {
275
                name: "pending_requests_retry",
276
                setup_waiting_for: Some(p5),
277
                event_until: p5,
278
                event: AdvanceEvent::SyncPending,
279
                expected: AdvanceAction::ScheduleRetry { advance_until: p5 },
280
            },
281
            Case {
282
                name: "failure_requests_retry",
283
                setup_waiting_for: Some(p5),
284
                event_until: p5,
285
                event: AdvanceEvent::SyncFailed,
286
                expected: AdvanceAction::ScheduleRetry { advance_until: p5 },
287
            },
288
        ];
289
290
        for case in cases {
291
            let mut engine = AdvanceEngine::new(p0);
292
            if case.setup_waiting_for.is_some() {
293
                engine.request_advance();
294
                let got = engine.next_step(case.setup_waiting_for, true);
295
                assert!(matches!(got, AdvanceAction::SyncInbound { .. }));
296
            }
297
            let got = engine.on_inbound_sync_event(case.event_until, case.event);
298
            assert_eq!(got, case.expected, "case={}", case.name);
299
        }
300
    }
301
302
    #[test]
303
    fn on_retry_fired_table_tests() {
304
        struct Case {
305
            name: &'static str,
306
            fired_token_offset: Option<u64>,
307
            expected: bool,
308
        }
309
310
        let p0 = ChainPos::new(0, 0);
311
312
        let cases = vec![
313
            Case {
314
                name: "matching_token_fires",
315
                fired_token_offset: Some(0),
316
                expected: true,
317
            },
318
            Case {
319
                name: "stale_token_ignored",
320
                fired_token_offset: Some(1),
321
                expected: false,
322
            },
323
            Case {
324
                name: "no_retry_state_ignored",
325
                fired_token_offset: None,
326
                expected: false,
327
            },
328
        ];
329
330
        for case in cases {
331
            let mut engine = AdvanceEngine::new(p0);
332
            let fired = if let Some(offset) = case.fired_token_offset {
333
                engine.request_advance();
334
                let scheduled = engine
335
                    .schedule_retry(ChainPos::new(5, 0))
336
                    .expect("token should be scheduled");
337
                scheduled.saturating_add(offset)
338
            } else {
339
                0
340
            };
341
            assert_eq!(engine.on_retry_fired(fired), case.expected, "case={}", case.name);
342
        }
343
    }
344
}