/home/runner/work/lyquor/lyquor/seq/src/fco/advance.rs
Line | Count | Source |
1 | | use lyquor_primitives::ChainPos; |
2 | | |
3 | | #[derive(Debug, Clone, Copy, PartialEq, Eq)] |
4 | | enum AdvancePhase { |
5 | | Idle, |
6 | | WaitingInboundSync { advance_until: ChainPos }, |
7 | | ApplyReady { advance_until: ChainPos }, |
8 | | RetryScheduled { advance_until: ChainPos, token: u64 }, |
9 | | } |
10 | | |
11 | | #[derive(Debug, Clone, Copy, PartialEq, Eq)] |
12 | | pub(super) enum AdvanceAction { |
13 | | Noop, |
14 | | SyncInbound { advance_until: ChainPos }, |
15 | | ApplyReady { advance_until: ChainPos }, |
16 | | ScheduleRetry { advance_until: ChainPos }, |
17 | | } |
18 | | |
19 | | #[derive(Debug, Clone, Copy, PartialEq, Eq)] |
20 | | #[allow(clippy::enum_variant_names)] |
21 | | pub(super) enum AdvanceEvent { |
22 | | SyncReady, |
23 | | SyncPending, |
24 | | SyncFailed, |
25 | | } |
26 | | |
27 | | #[derive(Debug)] |
28 | | pub(super) struct AdvanceEngine { |
29 | | phase: AdvancePhase, |
30 | | may_advance: bool, |
31 | | next_retry_token: u64, |
32 | | last_advance: ChainPos, |
33 | | inbound_backfill_frontier: ChainPos, |
34 | | } |
35 | | |
36 | | impl AdvanceEngine { |
37 | 271 | pub(super) fn new(initial_pos: ChainPos) -> Self { |
38 | 271 | Self { |
39 | 271 | phase: AdvancePhase::Idle, |
40 | 271 | may_advance: false, |
41 | 271 | next_retry_token: 0, |
42 | 271 | last_advance: initial_pos, |
43 | 271 | inbound_backfill_frontier: initial_pos, |
44 | 271 | } |
45 | 271 | } |
46 | | |
47 | 12.4k | pub(super) fn request_advance(&mut self) { |
48 | 12.4k | self.may_advance = true; |
49 | 12.4k | if matches!(self.phase, AdvancePhase::RetryScheduled { .. }) { |
50 | 0 | self.phase = AdvancePhase::Idle; |
51 | 12.4k | } |
52 | 12.4k | } |
53 | | |
54 | 12.4k | fn current_advance_until(&mut self, next_to_sync: Option<ChainPos>) -> Option<ChainPos> { |
55 | 12.4k | if !self.may_advance { |
56 | 1 | return None; |
57 | 12.4k | } |
58 | 12.4k | let next_to_sync12.4k = next_to_sync?8 ; |
59 | 12.4k | let advance_until = std::cmp::max(next_to_sync, self.last_advance); |
60 | 12.4k | self.last_advance = advance_until; |
61 | 12.4k | Some(advance_until) |
62 | 12.4k | } |
63 | | |
64 | 12.4k | pub(super) fn next_step(&mut self, next_to_sync: Option<ChainPos>, selective_hosting: bool) -> AdvanceAction { |
65 | 12.4k | if !matches!1 (self.phase, AdvancePhase::Idle) { |
66 | 1 | return AdvanceAction::Noop; |
67 | 12.4k | } |
68 | 12.4k | let Some(advance_until12.4k ) = self.current_advance_until(next_to_sync) else { |
69 | 9 | return AdvanceAction::Noop; |
70 | | }; |
71 | | |
72 | 12.4k | if selective_hosting && self.inbound_backfill_frontier < advance_until151 { |
73 | 76 | self.phase = AdvancePhase::WaitingInboundSync { advance_until }; |
74 | 76 | return AdvanceAction::SyncInbound { advance_until }; |
75 | 12.3k | } |
76 | | |
77 | 12.3k | self.inbound_backfill_frontier = advance_until; |
78 | 12.3k | self.phase = AdvancePhase::ApplyReady { advance_until }; |
79 | 12.3k | AdvanceAction::ApplyReady { advance_until } |
80 | 12.4k | } |
81 | | |
82 | 75 | pub(super) fn on_inbound_sync_event(&mut self, advance_until: ChainPos, event: AdvanceEvent) -> AdvanceAction { |
83 | 74 | match self.phase { |
84 | | AdvancePhase::WaitingInboundSync { |
85 | 73 | advance_until: expected, |
86 | 74 | } if expected == advance_until73 => {}73 |
87 | 2 | _ => return AdvanceAction::Noop, |
88 | | } |
89 | 73 | self.phase = AdvancePhase::Idle; |
90 | 73 | match event { |
91 | | AdvanceEvent::SyncReady => { |
92 | 35 | self.phase = AdvancePhase::ApplyReady { advance_until }; |
93 | 35 | AdvanceAction::ApplyReady { advance_until } |
94 | | } |
95 | 38 | AdvanceEvent::SyncPending | AdvanceEvent::SyncFailed => AdvanceAction::ScheduleRetry { advance_until }, |
96 | | } |
97 | 75 | } |
98 | | |
99 | 36 | pub(super) fn mark_inbound_backfill_frontier(&mut self, advance_until: ChainPos) { |
100 | 36 | self.inbound_backfill_frontier = advance_until; |
101 | 36 | } |
102 | | |
103 | 12.3k | pub(super) fn complete_apply_ready(&mut self) { |
104 | 12.3k | self.phase = AdvancePhase::Idle; |
105 | 12.3k | self.may_advance = false; |
106 | 12.3k | } |
107 | | |
108 | 38 | pub(super) fn schedule_retry(&mut self, advance_until: ChainPos) -> Option<u64> { |
109 | 38 | if matches!(self.phase, AdvancePhase::RetryScheduled { .. }) { |
110 | 0 | return None; |
111 | 38 | } |
112 | 38 | let token = self.next_retry_token; |
113 | 38 | self.next_retry_token = self.next_retry_token.saturating_add(1); |
114 | 38 | self.phase = AdvancePhase::RetryScheduled { advance_until, token }; |
115 | 38 | self.may_advance = false; |
116 | 38 | Some(token) |
117 | 38 | } |
118 | | |
119 | 39 | pub(super) fn on_retry_fired(&mut self, token: u64) -> bool { |
120 | 38 | match self.phase { |
121 | 38 | AdvancePhase::RetryScheduled { token: expected37 , .. } if expected == token37 => { |
122 | 37 | self.phase = AdvancePhase::Idle; |
123 | 37 | true |
124 | | } |
125 | 2 | _ => false, |
126 | | } |
127 | 39 | } |
128 | | |
129 | 462 | pub(super) fn reset_to_idle(&mut self) { |
130 | 462 | self.phase = AdvancePhase::Idle; |
131 | 462 | } |
132 | | |
133 | 28.7k | pub(super) fn last_advance(&self) -> ChainPos { |
134 | 28.7k | self.last_advance |
135 | 28.7k | } |
136 | | } |
137 | | |
#[cfg(test)]
mod tests {
    //! Table-driven tests for the `AdvanceEngine` state machine.

    use super::*;
    use lyquor_test::test;

    /// Covers `next_step` from a range of starting states.
    #[test]
    fn next_step_table_tests() {
        /// How the engine is prepared before the `next_step` call under test.
        #[derive(Clone, Copy)]
        enum Setup {
            /// Freshly constructed, no advance requested.
            Fresh,
            /// Advance requested, nothing else.
            Requested,
            /// Advance requested and the inbound frontier moved to the position.
            RequestedWithFrontier(ChainPos),
            /// Advance requested and an inbound sync (up to p5) already in flight.
            RequestedThenWaiting,
            /// One full apply cycle up to the position has completed and a new
            /// advance has been requested afterwards.
            RequestedAfterAdvanceTo(ChainPos),
        }

        struct Case {
            name: &'static str,
            setup: Setup,
            next_to_sync: Option<ChainPos>,
            selective_hosting: bool,
            expected: AdvanceAction,
        }

        let p0 = ChainPos::new(0, 0);
        let p5 = ChainPos::new(5, 0);
        let p7 = ChainPos::new(7, 0);
        let cases = vec![
            Case {
                name: "no_request_no_step",
                setup: Setup::Fresh,
                next_to_sync: Some(p5),
                selective_hosting: true,
                expected: AdvanceAction::Noop,
            },
            Case {
                name: "requested_but_no_frontier",
                setup: Setup::Requested,
                next_to_sync: None,
                selective_hosting: true,
                expected: AdvanceAction::Noop,
            },
            Case {
                name: "needs_inbound_sync",
                setup: Setup::Requested,
                next_to_sync: Some(p5),
                selective_hosting: true,
                expected: AdvanceAction::SyncInbound { advance_until: p5 },
            },
            Case {
                name: "archive_mode_skips_inbound_sync",
                setup: Setup::Requested,
                next_to_sync: Some(p5),
                selective_hosting: false,
                expected: AdvanceAction::ApplyReady { advance_until: p5 },
            },
            Case {
                name: "frontier_caught_up_can_apply",
                setup: Setup::RequestedWithFrontier(p5),
                next_to_sync: Some(p5),
                selective_hosting: true,
                expected: AdvanceAction::ApplyReady { advance_until: p5 },
            },
            Case {
                name: "while_waiting_returns_noop",
                setup: Setup::RequestedThenWaiting,
                next_to_sync: Some(p7),
                selective_hosting: true,
                expected: AdvanceAction::Noop,
            },
            Case {
                name: "initial_position_can_apply",
                setup: Setup::RequestedWithFrontier(p0),
                next_to_sync: Some(p0),
                selective_hosting: true,
                expected: AdvanceAction::ApplyReady { advance_until: p0 },
            },
            Case {
                // The engine already advanced to p5; offering the older p0 must
                // be clamped to p5 instead of moving the target backwards.
                name: "advance_until_never_moves_backwards",
                setup: Setup::RequestedAfterAdvanceTo(p5),
                next_to_sync: Some(p0),
                selective_hosting: true,
                expected: AdvanceAction::ApplyReady { advance_until: p5 },
            },
        ];

        for case in cases {
            let mut engine = AdvanceEngine::new(p0);
            match case.setup {
                Setup::Fresh => {}
                Setup::Requested => {
                    engine.request_advance();
                }
                Setup::RequestedWithFrontier(frontier) => {
                    engine.request_advance();
                    engine.mark_inbound_backfill_frontier(frontier);
                }
                Setup::RequestedThenWaiting => {
                    engine.request_advance();
                    let action = engine.next_step(Some(p5), true);
                    assert!(matches!(action, AdvanceAction::SyncInbound { .. }));
                }
                Setup::RequestedAfterAdvanceTo(reached) => {
                    engine.request_advance();
                    // Non-selective hosting goes straight to ApplyReady.
                    let action = engine.next_step(Some(reached), false);
                    assert_eq!(
                        action,
                        AdvanceAction::ApplyReady { advance_until: reached },
                        "case={} (setup)",
                        case.name
                    );
                    engine.complete_apply_ready();
                    assert_eq!(engine.last_advance(), reached, "case={} (setup)", case.name);
                    engine.request_advance();
                }
            }

            let got = engine.next_step(case.next_to_sync, case.selective_hosting);
            assert_eq!(got, case.expected, "case={}", case.name);
        }
    }

    /// Covers `on_inbound_sync_event` for stale and matching events.
    #[test]
    fn on_inbound_sync_event_table_tests() {
        struct Case {
            name: &'static str,
            setup_waiting_for: Option<ChainPos>,
            event_until: ChainPos,
            event: AdvanceEvent,
            expected: AdvanceAction,
        }

        let p0 = ChainPos::new(0, 0);
        let p5 = ChainPos::new(5, 0);
        let p6 = ChainPos::new(6, 0);
        let cases = vec![
            Case {
                name: "stale_when_not_waiting",
                setup_waiting_for: None,
                event_until: p5,
                event: AdvanceEvent::SyncReady,
                expected: AdvanceAction::Noop,
            },
            Case {
                name: "stale_when_mismatched_until",
                setup_waiting_for: Some(p5),
                event_until: p6,
                event: AdvanceEvent::SyncReady,
                expected: AdvanceAction::Noop,
            },
            Case {
                name: "ready_transitions_to_apply",
                setup_waiting_for: Some(p5),
                event_until: p5,
                event: AdvanceEvent::SyncReady,
                expected: AdvanceAction::ApplyReady { advance_until: p5 },
            },
            Case {
                name: "pending_requests_retry",
                setup_waiting_for: Some(p5),
                event_until: p5,
                event: AdvanceEvent::SyncPending,
                expected: AdvanceAction::ScheduleRetry { advance_until: p5 },
            },
            Case {
                name: "failure_requests_retry",
                setup_waiting_for: Some(p5),
                event_until: p5,
                event: AdvanceEvent::SyncFailed,
                expected: AdvanceAction::ScheduleRetry { advance_until: p5 },
            },
        ];

        for case in cases {
            let mut engine = AdvanceEngine::new(p0);
            if let Some(waiting_for) = case.setup_waiting_for {
                engine.request_advance();
                let got = engine.next_step(Some(waiting_for), true);
                assert!(matches!(got, AdvanceAction::SyncInbound { .. }));
            }
            let got = engine.on_inbound_sync_event(case.event_until, case.event);
            assert_eq!(got, case.expected, "case={}", case.name);
        }
    }

    /// Covers `on_retry_fired` for matching, stale and unexpected tokens.
    #[test]
    fn on_retry_fired_table_tests() {
        struct Case {
            name: &'static str,
            /// `Some(n)`: schedule a retry and fire `token + n`.
            /// `None`: fire token 0 without scheduling anything.
            fired_token_offset: Option<u64>,
            expected: bool,
        }

        let p0 = ChainPos::new(0, 0);

        let cases = vec![
            Case {
                name: "matching_token_fires",
                fired_token_offset: Some(0),
                expected: true,
            },
            Case {
                name: "stale_token_ignored",
                fired_token_offset: Some(1),
                expected: false,
            },
            Case {
                name: "no_retry_state_ignored",
                fired_token_offset: None,
                expected: false,
            },
        ];

        for case in cases {
            let mut engine = AdvanceEngine::new(p0);
            let fired = if let Some(offset) = case.fired_token_offset {
                engine.request_advance();
                let scheduled = engine
                    .schedule_retry(ChainPos::new(5, 0))
                    .expect("token should be scheduled");
                scheduled.saturating_add(offset)
            } else {
                0
            };
            assert_eq!(engine.on_retry_fired(fired), case.expected, "case={}", case.name);
        }
    }
}