Coverage Report

Created: 2026-06-13 00:22

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/home/runner/work/lyquor/lyquor/platform/db/src/lib.rs
Line
Count
Source
1
#![doc(html_no_source)] // remove it upon open-source
2
3
//! Concrete key-value backends for Lyquor persistence traits.
4
//!
5
//! `lyquor-db` implements the `lyquor_api::store` interfaces for both in-memory storage and
6
//! RocksDB-backed storage. Higher layers use those interfaces for simple state, versioned state,
7
//! artifact caches, and sequencing metadata; this crate keeps backend transactions and iteration
8
//! behavior below that shared storage boundary.
9
10
use std::collections::HashMap;
11
use std::path::Path;
12
use std::sync::Arc;
13
14
pub use rocksdb;
15
use rocksdb::{OptimisticTransactionDB, Options};
16
17
use lyquor_api::{
18
    anyhow::{Context, anyhow},
19
    parking_lot::RwLock,
20
    store::{KVStore, KVStoreError, Key, SortedMapping, StoreFuture, Value},
21
};
22
use tokio::sync::Mutex;
23
24
/// In-memory key-value store used by tests and ephemeral node configurations.
25
pub struct MemDB(RwLock<HashMap<Bytes, Bytes>>);
26
27
impl MemDB {
28
    /// Create an empty in-memory store.
29
445
    pub fn new() -> Self {
30
445
        Self(RwLock::new(HashMap::new()))
31
445
    }
32
}
33
34
impl Default for MemDB {
35
0
    fn default() -> Self {
36
0
        Self::new()
37
0
    }
38
}
39
40
impl KVStore for MemDB {
41
16.2k
    fn atomic_write<'a>(
42
16.2k
        &'a self, changes: Box<dyn Iterator<Item = (Key, Option<Value>)> + 'a>,
43
16.2k
    ) -> Result<(), KVStoreError> {
44
16.2k
        let mut map = self.0.write();
45
47.1k
        for (k, v) in 
changes16.2k
{
46
47.1k
            match v {
47
47.1k
                Some(v) => {
48
47.1k
                    map.insert(k.consolidated(), v);
49
47.1k
                }
50
2
                None => {
51
2
                    map.remove(&k.consolidated());
52
2
                }
53
            }
54
        }
55
16.2k
        Ok(())
56
16.2k
    }
57
58
24
    fn contains(&self, key: Key) -> StoreFuture<bool> {
59
24
        Box::pin(std::future::ready(self.0.read().contains_key(&key.consolidated())))
60
24
    }
61
62
417k
    fn get(&self, key: Key) -> StoreFuture<Result<Option<Value>, KVStoreError>> {
63
417k
        Box::pin(std::future::ready(Ok(self.0.read().get(&key.consolidated()).cloned())))
64
417k
    }
65
}
66
67
/// RocksDB-backed key-value store implementing Lyquor's shared storage trait.
68
#[derive(Clone)]
69
pub struct RocksDB(Arc<rocksdb::OptimisticTransactionDB>);
70
71
impl KVStore for RocksDB {
72
7
    fn atomic_write<'a>(
73
7
        &'a self, changes: Box<dyn Iterator<Item = (Key, Option<Value>)> + 'a>,
74
7
    ) -> Result<(), KVStoreError> {
75
7
        let mut wb = rocksdb::WriteBatchWithTransaction::default();
76
25
        for (k, v) in 
changes7
{
77
25
            match v {
78
25
                Some(v) => wb.put(k.consolidated(), v),
79
0
                None => wb.delete(k.consolidated()),
80
            }
81
        }
82
7
        Ok(self.0.write(wb).context("RocksDB write error.")
?0
)
83
7
    }
84
85
0
    fn contains(&self, key: Key) -> StoreFuture<bool> {
86
0
        let db = self.0.clone();
87
0
        let key = key.consolidated();
88
0
        Box::pin(async move {
89
0
            tokio::task::spawn_blocking(move || db.key_may_exist(key))
90
0
                .await
91
0
                .unwrap_or(false)
92
0
        })
93
0
    }
94
95
290
    fn get(&self, key: Key) -> StoreFuture<Result<Option<Value>, KVStoreError>> {
96
290
        let db = self.0.clone();
97
290
        let key = key.consolidated();
98
290
        Box::pin(async move {
99
290
            tokio::task::spawn_blocking(move || {
100
290
                let value = db.get(key).context("RocksDB read error.")
?0
;
101
290
                Ok(value.map(|v| 
v276
.
into276
()))
102
290
            })
103
290
            .await
104
290
            .context("RocksDB read task join error.")
?0
105
290
        })
106
290
    }
107
}
108
109
impl RocksDB {
110
    /// Open or create a RocksDB store at `path`.
111
6
    pub fn new(path: &Path) -> Result<Self, KVStoreError> {
112
6
        let mut opts = Options::default();
113
6
        opts.create_if_missing(true);
114
6
        let db = OptimisticTransactionDB::open(&opts, path).context("RocksDB open error")
?0
;
115
6
        Ok(Self(Arc::new(db)))
116
6
    }
117
}
118
119
use lyquor_api::subkey_builder;
120
use lyquor_primitives::{Bytes, decode_object, encode_object};
121
use serde::{Deserialize, Serialize};
122
use std::hash::Hash;
123
124
/// Persistent sorted mapping backed by a Lyquor key-value store.
125
pub struct SortedMappingStore<
126
    S: KVStore,
127
    K: Hash + Eq + Ord + Serialize + for<'a> Deserialize<'a> + Clone,
128
    V: Serialize + for<'a> Deserialize<'a> + Clone,
129
> {
130
    state: Arc<Mutex<SortedMappingState<K, V>>>,
131
    store: Arc<S>,
132
}
133
134
struct SortedMappingState<
135
    K: Hash + Eq + Ord + Serialize + for<'a> Deserialize<'a> + Clone,
136
    V: Serialize + for<'a> Deserialize<'a> + Clone,
137
> {
138
    cache: lru::LruCache<K, V>,
139
    max: Option<K>,
140
    first_idx: u64,
141
    next_idx: u64,
142
}
143
144
// Subkey families used by `SortedMappingStore`, as they are used in this file:
// - `value(key bytes)`: holds the encoded value of an entry.
// - `index(u64)`: holds the encoded key stored at that position in append order.
// - `max()`: holds the encoded largest key.
// - `first_idx()` / `next_idx()`: hold the encoded index bounds (`first_idx` is only read in the
//   code visible here).
// NOTE(review): the leading byte arrays presumably act as per-family key prefixes; confirm
// against the `subkey_builder!` definition.
subkey_builder!(SortedMappingSubkey(
145
    ([0x00])-value(&[u8]) => Key,
146
    ([0x01])-index(&u64) => Key,
147
    ([0x02])-max() => Key,
148
    ([0x03])-first_idx() => Key,
149
    ([0x04])-next_idx() => Key,
150
));
151
152
impl<
153
    S: KVStore,
154
    K: Hash + Eq + Ord + Serialize + for<'a> Deserialize<'a> + Clone,
155
    V: Serialize + for<'a> Deserialize<'a> + Clone,
156
> SortedMappingStore<S, K, V>
157
{
158
86
    fn subkey() -> &'static SortedMappingSubkey {
159
        static KEY: std::sync::OnceLock<SortedMappingSubkey> = std::sync::OnceLock::new();
160
86
        KEY.get_or_init(|| 
SortedMappingSubkey::new6
(
Value::new6
().
into6
()))
161
86
    }
162
163
    /// Open a sorted mapping view over `store` with an in-memory lookup cache.
164
10
    pub async fn new(store: S, cache_size: std::num::NonZeroUsize) -> Result<Self, KVStoreError> {
165
10
        let 
max9
:
Option<K>9
= match store.get(Self::subkey().max()).await
?0
{
166
2
            Some(raw) => Some(decode_object(&raw).ok_or_else(|| 
anyhow!1
("failed to decode sorted mapping max key"))
?1
),
167
8
            None => None,
168
        };
169
9
        let first_idx: u64 = match store.get(Self::subkey().first_idx()).await
?0
{
170
0
            Some(raw) => decode_object(&raw).ok_or_else(|| anyhow!("failed to decode sorted mapping first index"))?,
171
9
            None => 0,
172
        };
173
9
        let next_idx: u64 = match store.get(Self::subkey().next_idx()).await
?0
{
174
1
            Some(raw) => decode_object(&raw).ok_or_else(|| 
anyhow!0
("failed to decode sorted mapping next index"))
?0
,
175
8
            None => 0,
176
        };
177
178
9
        Ok(Self {
179
9
            state: Arc::new(Mutex::new(SortedMappingState {
180
9
                cache: lru::LruCache::new(cache_size),
181
9
                max,
182
9
                first_idx,
183
9
                next_idx,
184
9
            })),
185
9
            store: Arc::new(store),
186
9
        })
187
10
    }
188
}
189
190
impl<
191
    S: KVStore + 'static,
192
    K: Hash + Eq + Ord + Serialize + for<'a> Deserialize<'a> + Clone + Send + 'static,
193
    V: Serialize + for<'a> Deserialize<'a> + Clone + Send + 'static,
194
> SortedMapping<K, V> for SortedMappingStore<S, K, V>
195
{
196
8
    fn max_key(&self) -> StoreFuture<Option<K>> {
197
8
        let state = self.state.clone();
198
8
        Box::pin(async move { state.lock().await.max.clone() })
199
8
    }
200
201
2
    fn append(&self, key: K, value: V) -> StoreFuture<Result<bool, KVStoreError>> {
202
2
        let state = self.state.clone();
203
2
        let store = self.store.clone();
204
2
        Box::pin(async move {
205
2
            let mut state = state.lock().await;
206
2
            Self::append_new_max(store.as_ref(), &mut state, key, value)
207
2
        })
208
2
    }
209
210
8
    fn append_or_update_max(&self, key: K, value: V) -> StoreFuture<Result<bool, KVStoreError>> {
211
8
        let state = self.state.clone();
212
8
        let store = self.store.clone();
213
8
        Box::pin(async move {
214
8
            let mut state = state.lock().await;
215
8
            match state.max.clone() {
216
2
                Some(
max1
) if key > ma
x1
=>
Self::append_new_max1
(
store.as_ref()1
,
&mut state1
,
key1
,
value1
),
217
1
                Some(max) if key == max => {
218
1
                    let key_raw: Bytes = encode_object(&key).into();
219
1
                    store.atomic_write(Box::new(
220
1
                        [(Self::subkey().value(&key_raw), Some(encode_object(&value).into()))].into_iter(),
221
1
                    ))
?0
;
222
1
                    state.cache.push(key.clone(), value.clone());
223
1
                    Ok(true)
224
                }
225
0
                Some(_) => Ok(false),
226
6
                None => Self::append_new_max(store.as_ref(), &mut state, key, value),
227
            }
228
8
        })
229
8
    }
230
231
16
    fn get_le(&self, key: K) -> StoreFuture<Result<Option<V>, KVStoreError>> {
232
16
        let state = self.state.clone();
233
16
        let store = self.store.clone();
234
16
        Box::pin(async move {
235
16
            let mut state = state.lock().await;
236
16
            let first_idx = state.first_idx;
237
16
            let next_idx = state.next_idx;
238
16
            let Some(
raw8
) = store.get(Self::subkey().index(&first_idx)).await
?0
else {
239
8
                return Ok(None);
240
            };
241
8
            let mut l_key = decode_object(&raw).ok_or_else(|| 
anyhow!0
("failed to decode sorted mapping index key"))
?0
;
242
8
            if key < l_key {
243
0
                return Ok(None);
244
8
            }
245
8
            let mut l = first_idx;
246
8
            let mut r = next_idx;
247
10
            while r > l + 1 {
248
2
                let mid = (l + r) >> 1;
249
2
                let Some(raw) = store.get(Self::subkey().index(&mid)).await
?0
else {
250
0
                    return Ok(None);
251
                };
252
2
                let mid_key =
253
2
                    decode_object(&raw).ok_or_else(|| 
anyhow!0
("failed to decode sorted mapping index key"))
?0
;
254
2
                if key < mid_key {
255
1
                    r = mid
256
1
                } else {
257
1
                    l = mid;
258
1
                    l_key = mid_key;
259
1
                }
260
            }
261
8
            Self::get_locked(store, &mut state, l_key).await
262
16
        })
263
16
    }
264
265
2
    fn get(&self, key: K) -> StoreFuture<Result<Option<V>, KVStoreError>> {
266
2
        let state = self.state.clone();
267
2
        let store = self.store.clone();
268
2
        Box::pin(async move {
269
2
            let mut state = state.lock().await;
270
2
            Self::get_locked(store, &mut state, key).await
271
2
        })
272
2
    }
273
}
274
275
impl<
276
    S: KVStore + 'static,
277
    K: Hash + Eq + Ord + Serialize + for<'a> Deserialize<'a> + Clone + Send + 'static,
278
    V: Serialize + for<'a> Deserialize<'a> + Clone + Send + 'static,
279
> SortedMappingStore<S, K, V>
280
{
281
9
    fn append_new_max(store: &S, state: &mut SortedMappingState<K, V>, key: K, value: V) -> Result<bool, KVStoreError> {
282
9
        if !state.max.clone().map(|max| 
key1
>
max1
).unwrap_or(true) {
283
0
            return Ok(false);
284
9
        }
285
286
9
        let idx = state.next_idx;
287
9
        let next_idx = idx + 1;
288
9
        let key_raw: Bytes = encode_object(&key).into();
289
9
        let mapping = [
290
9
            (Self::subkey().value(&key_raw), Some(encode_object(&value).into())),
291
9
            (Self::subkey().index(&idx), Some(key_raw.clone())),
292
9
            (Self::subkey().max(), Some(key_raw)),
293
9
            (Self::subkey().next_idx(), Some(encode_object(&next_idx).into())),
294
9
        ]
295
9
        .into_iter();
296
9
        store.atomic_write(Box::new(mapping))
?0
;
297
9
        state.next_idx = next_idx;
298
9
        state.cache.push(key.clone(), value.clone());
299
9
        state.max = Some(key);
300
9
        Ok(true)
301
9
    }
302
303
10
    async fn get_locked(
304
10
        store: Arc<S>, state: &mut SortedMappingState<K, V>, key: K,
305
10
    ) -> Result<Option<V>, KVStoreError> {
306
10
        if let Some(
value9
) = state.cache.get(&key).cloned() {
307
9
            return Ok(Some(value));
308
1
        }
309
310
1
        let Some(raw) = store.get(Self::subkey().value(&encode_object(&key))).await
?0
else {
311
0
            return Ok(None);
312
        };
313
1
        let 
value0
:
V0
= decode_object(&raw).ok_or_else(|| anyhow!("failed to decode sorted mapping value"))?;
314
0
        state.cache.push(key.clone(), value.clone());
315
0
        Ok(Some(value))
316
10
    }
317
}
318
319
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::SortedMappingStore;
    use crate::MemDB;
    use lyquor_api::store::{KVStore, SortedMapping};
    use lyquor_primitives::{Bytes, encode_object};
    use lyquor_test::test;

    /// Cache capacity used by every store opened in these tests.
    fn cache_size() -> std::num::NonZeroUsize {
        4.try_into().unwrap()
    }

    /// Re-submitting the current max key through `append_or_update_max` replaces its value.
    #[test(tokio::test)]
    async fn sorted_mapping_store_updates_existing_max_key() {
        let mapping = SortedMappingStore::new(MemDB::new(), cache_size()).await.unwrap();

        let appended = mapping.append(7_u64, 10_u64).await.unwrap();
        assert!(appended);
        let updated = mapping.append_or_update_max(7, 11).await.unwrap();
        assert!(updated);

        assert_eq!(mapping.max_key().await, Some(7));
        assert_eq!(mapping.get(7).await.unwrap(), Some(11));
        assert_eq!(mapping.get_le(7).await.unwrap(), Some(11));
    }

    /// Undecodable bytes in the backing store surface as errors instead of being ignored.
    #[test(tokio::test)]
    async fn sorted_mapping_rejects_corrupt_reads() {
        // An empty max-key entry cannot be decoded, so opening the mapping must fail.
        let corrupt_max = MemDB::new();
        let max_subkey = SortedMappingStore::<MemDB, u64, u64>::subkey().max();
        corrupt_max
            .atomic_write(Box::new([(max_subkey, Some(Bytes::new()))].into_iter()))
            .unwrap();
        let opened = SortedMappingStore::<_, u64, u64>::new(corrupt_max, cache_size()).await;
        assert!(opened.is_err());

        // Write a valid entry, then overwrite its value bytes behind the mapping's back.
        let shared_db = Arc::new(MemDB::new());
        let mapping = SortedMappingStore::new(shared_db.clone(), cache_size())
            .await
            .unwrap();
        assert!(mapping.append(7_u64, 10_u64).await.unwrap());

        let key_raw: Bytes = encode_object(&7_u64).into();
        let value_subkey = SortedMappingStore::<Arc<MemDB>, u64, u64>::subkey().value(&key_raw);
        shared_db
            .atomic_write(Box::new([(value_subkey, Some(Bytes::new()))].into_iter()))
            .unwrap();

        // A freshly opened mapping starts with an empty cache, so the read hits the corrupt bytes.
        let reopened: SortedMappingStore<_, u64, u64> =
            SortedMappingStore::new(shared_db, cache_size()).await.unwrap();
        assert!(reopened.get(7_u64).await.is_err());
    }
}