Coverage Report

Created: 2026-05-06 07:24

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/home/runner/work/lyquor/lyquor/db/src/lib.rs
Line
Count
Source
1
#![doc(html_no_source)] // remove it upon open-source
2
3
use std::collections::HashMap;
4
use std::path::Path;
5
6
pub use rocksdb;
7
use rocksdb::{OptimisticTransactionDB, Options};
8
9
use lyquor_api::{
10
    anyhow::Context,
11
    kvstore::{KVStore, KVStoreError, Key, Value},
12
    parking_lot::RwLock,
13
};
14
15
pub struct MemDB(RwLock<HashMap<Bytes, Bytes>>);
16
17
impl MemDB {
18
390
    pub fn new() -> Self {
19
390
        Self(RwLock::new(HashMap::new()))
20
390
    }
21
}
22
23
impl Default for MemDB {
24
0
    fn default() -> Self {
25
0
        Self::new()
26
0
    }
27
}
28
29
impl KVStore for MemDB {
30
15.4k
    fn atomic_write<'a>(
31
15.4k
        &'a self, changes: Box<dyn Iterator<Item = (Key, Option<Value>)> + 'a>,
32
15.4k
    ) -> Result<(), KVStoreError> {
33
15.4k
        let mut map = self.0.write();
34
45.4k
        for (k, v) in 
changes15.4k
{
35
45.4k
            match v {
36
45.4k
                Some(v) => {
37
45.4k
                    map.insert(k.consolidated(), v);
38
45.4k
                }
39
2
                None => {
40
2
                    map.remove(&k.consolidated());
41
2
                }
42
            }
43
        }
44
15.4k
        Ok(())
45
15.4k
    }
46
47
24
    fn contains(&self, key: Key) -> bool {
48
24
        self.0.read().contains_key(&key.consolidated())
49
24
    }
50
51
465k
    fn get(&self, key: Key) -> Result<Option<Value>, KVStoreError> {
52
465k
        Ok(self.0.read().get(&key.consolidated()).cloned())
53
465k
    }
54
}
55
56
pub struct RocksDB(rocksdb::OptimisticTransactionDB);
57
58
impl KVStore for RocksDB {
59
7
    fn atomic_write<'a>(
60
7
        &'a self, changes: Box<dyn Iterator<Item = (Key, Option<Value>)> + 'a>,
61
7
    ) -> Result<(), KVStoreError> {
62
7
        let mut wb = rocksdb::WriteBatchWithTransaction::default();
63
25
        for (k, v) in 
changes7
{
64
25
            match v {
65
25
                Some(v) => wb.put(k.consolidated(), v),
66
0
                None => wb.delete(k.consolidated()),
67
            }
68
        }
69
7
        Ok(self.0.write(wb).context("RocksDB write error.")
?0
)
70
7
    }
71
72
0
    fn contains(&self, key: Key) -> bool {
73
0
        self.0.key_may_exist(key.consolidated())
74
0
    }
75
76
385
    fn get(&self, key: Key) -> Result<Option<Value>, KVStoreError> {
77
385
        Ok(self
78
385
            .0
79
385
            .get(key.consolidated())
80
385
            .context("RocksDB read error.")
?0
81
385
            .map(|v| 
v371
.
into371
()))
82
385
    }
83
}
84
85
impl RocksDB {
86
6
    pub fn new(path: &Path) -> Result<Self, KVStoreError> {
87
6
        let mut opts = Options::default();
88
6
        opts.create_if_missing(true);
89
6
        let db = OptimisticTransactionDB::open(&opts, path).context("RocksDB open error")
?0
;
90
6
        Ok(Self(db))
91
6
    }
92
}
93
94
use lyquor_api::subkey_builder;
95
use lyquor_primitives::{Bytes, decode_object, encode_object};
96
use serde::{Deserialize, Serialize};
97
use std::hash::Hash;
98
99
pub struct SortedMappingStore<
100
    S: KVStore,
101
    K: Hash + Eq + Ord + Serialize + for<'a> Deserialize<'a> + Clone,
102
    V: Serialize + for<'a> Deserialize<'a> + Clone,
103
> {
104
    cache: lru::LruCache<K, V>,
105
    max: Option<K>,
106
    first_idx: u64,
107
    next_idx: u64,
108
    store: S,
109
}
110
111
subkey_builder!(SortedMappingSubkey(
112
    ([0x00])-value(&[u8]) => Key,
113
    ([0x01])-index(&u64) => Key,
114
    ([0x02])-max() => Key,
115
    ([0x03])-first_idx() => Key,
116
    ([0x04])-next_idx() => Key,
117
));
118
119
impl<
120
    S: KVStore,
121
    K: Hash + Eq + Ord + Serialize + for<'a> Deserialize<'a> + Clone,
122
    V: Serialize + for<'a> Deserialize<'a> + Clone,
123
> SortedMappingStore<S, K, V>
124
{
125
70
    fn subkey() -> &'static SortedMappingSubkey {
126
        static KEY: std::sync::OnceLock<SortedMappingSubkey> = std::sync::OnceLock::new();
127
70
        KEY.get_or_init(|| 
SortedMappingSubkey::new5
(
Value::new5
().
into5
()))
128
70
    }
129
130
7
    pub fn new(store: S, cache_size: std::num::NonZeroUsize) -> Result<Self, KVStoreError> {
131
7
        let max: Option<K> = match store.get(Self::subkey().max())
?0
{
132
0
            Some(raw) => decode_object(&raw),
133
7
            None => None,
134
        };
135
7
        let first_idx: u64 = store
136
7
            .get(Self::subkey().first_idx())
?0
137
7
            .and_then(|raw| 
decode_object0
(
&raw0
))
138
7
            .unwrap_or(0);
139
7
        let next_idx: u64 = store
140
7
            .get(Self::subkey().next_idx())
?0
141
7
            .and_then(|raw| 
decode_object0
(
&raw0
))
142
7
            .unwrap_or(0);
143
144
7
        Ok(Self {
145
7
            cache: lru::LruCache::new(cache_size),
146
7
            max,
147
7
            first_idx,
148
7
            next_idx,
149
7
            store,
150
7
        })
151
7
    }
152
153
24
    pub fn max_key(&self) -> Option<K> {
154
24
        self.max.clone()
155
24
    }
156
157
8
    pub fn append(&mut self, key: K, value: V) -> Result<bool, KVStoreError> {
158
8
        if !self.max_key().map(|max| 
key1
>
max1
).unwrap_or(true) {
159
0
            return Ok(false);
160
8
        }
161
162
8
        let idx = self.next_idx;
163
8
        self.next_idx += 1;
164
8
        self.cache.push(key.clone(), value.clone());
165
8
        self.max = Some(key.clone());
166
8
        let key_raw: Bytes = encode_object(&key).into();
167
8
        let mapping = [
168
8
            (Self::subkey().value(&key_raw), Some(encode_object(&value).into())),
169
8
            (Self::subkey().index(&idx), Some(key_raw.clone())),
170
8
            (Self::subkey().max(), Some(key_raw)),
171
8
            (Self::subkey().next_idx(), Some(encode_object(&self.next_idx).into())),
172
8
        ]
173
8
        .into_iter();
174
8
        self.store.atomic_write(Box::new(mapping))
?0
;
175
8
        Ok(true)
176
8
    }
177
178
8
    pub fn append_or_update_max(&mut self, key: K, value: V) -> Result<bool, KVStoreError> {
179
8
        match self.max_key() {
180
2
            Some(
max1
) if key > ma
x1
=>
self1
.
append1
(
key1
,
value1
),
181
1
            Some(max) if key == max => {
182
1
                self.cache.push(key.clone(), value.clone());
183
1
                let key_raw: Bytes = encode_object(&key).into();
184
1
                self.store.atomic_write(Box::new(
185
1
                    [(Self::subkey().value(&key_raw), Some(encode_object(&value).into()))].into_iter(),
186
1
                ))
?0
;
187
1
                Ok(true)
188
            }
189
0
            Some(_) => Ok(false),
190
6
            None => self.append(key, value),
191
        }
192
8
    }
193
194
14
    pub fn get_le(&mut self, key: K) -> Option<V> {
195
14
        let mut l = self.first_idx;
196
14
        let mut r = self.next_idx;
197
14
        let 
mut l_key8
= self
198
14
            .store
199
14
            .get(Self::subkey().index(&l))
200
14
            .ok()
201
14
            .flatten()
202
14
            .and_then(|b| 
decode_object8
(
&b8
))
?6
;
203
8
        if key < l_key {
204
0
            return None;
205
8
        }
206
10
        while r > l + 1 {
207
2
            let mid = (l + r) >> 1;
208
2
            let mid_key: K = self
209
2
                .store
210
2
                .get(Self::subkey().index(&mid))
211
2
                .ok()
212
2
                .flatten()
213
2
                .and_then(|b| decode_object(&b))
?0
;
214
2
            if key < mid_key {
215
1
                r = mid
216
1
            } else {
217
1
                l = mid;
218
1
                l_key = mid_key;
219
1
            }
220
        }
221
8
        let value = self.get(l_key.clone());
222
8
        if let Some(value) = &value &&
223
8
            l_key != key
224
0
        {
225
0
            self.cache.push(key, value.clone());
226
8
        }
227
8
        value
228
14
    }
229
230
9
    pub fn get(&mut self, key: K) -> Option<V> {
231
9
        match self.cache.get(&key) {
232
9
            Some(value) => Some(value.clone()),
233
            None => {
234
0
                let value: V = self
235
0
                    .store
236
0
                    .get(Self::subkey().value(&encode_object(&key)))
237
0
                    .ok()
238
0
                    .flatten()
239
0
                    .and_then(|b| decode_object(&b))?;
240
0
                self.cache.push(key.clone(), value.clone());
241
0
                Some(value)
242
            }
243
        }
244
9
    }
245
}
246
247
#[cfg(test)]
248
mod tests {
249
    use super::SortedMappingStore;
250
    use crate::MemDB;
251
252
    #[test]
253
1
    fn sorted_mapping_store_updates_existing_max_key() {
254
1
        let mut store = SortedMappingStore::new(MemDB::new(), 4.try_into().unwrap()).unwrap();
255
1
        assert!(store.append(7_u64, 10_u64).unwrap());
256
1
        assert!(store.append_or_update_max(7, 11).unwrap());
257
1
        assert_eq!(store.max_key(), Some(7));
258
1
        assert_eq!(store.get(7), Some(11));
259
1
        assert_eq!(store.get_le(7), Some(11));
260
1
    }
261
}