Coverage Report

Created: 2026-03-22 03:56

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/home/runner/work/lyquor/lyquor/db/src/lib.rs
Line
Count
Source
1
#![doc(html_no_source)] // remove it upon open-source
2
3
use std::collections::HashMap;
4
use std::path::Path;
5
6
pub use rocksdb;
7
use rocksdb::{OptimisticTransactionDB, Options};
8
9
use lyquor_api::{
10
    anyhow::Context,
11
    kvstore::{KVStore, KVStoreError, Key, Value},
12
    parking_lot::RwLock,
13
};
14
15
pub struct MemDB(RwLock<HashMap<Bytes, Bytes>>);
16
17
impl MemDB {
18
526
    pub fn new() -> Self {
19
526
        Self(RwLock::new(HashMap::new()))
20
526
    }
21
}
22
23
impl Default for MemDB {
24
0
    fn default() -> Self {
25
0
        Self::new()
26
0
    }
27
}
28
29
impl KVStore for MemDB {
30
44.3k
    fn atomic_write<'a>(
31
44.3k
        &'a self, changes: Box<dyn Iterator<Item = (Key, Option<Value>)> + 'a>,
32
44.3k
    ) -> Result<(), KVStoreError> {
33
44.3k
        let mut map = self.0.write();
34
60.1k
        for (k, v) in 
changes44.3k
{
35
60.1k
            match v {
36
60.1k
                Some(v) => {
37
60.1k
                    map.insert(k.consolidated(), v);
38
60.1k
                }
39
2
                None => {
40
2
                    map.remove(&k.consolidated());
41
2
                }
42
            }
43
        }
44
44.3k
        Ok(())
45
44.3k
    }
46
47
24
    fn contains(&self, key: Key) -> bool {
48
24
        self.0.read().contains_key(&key.consolidated())
49
24
    }
50
51
480k
    fn get(&self, key: Key) -> Result<Option<Value>, KVStoreError> {
52
480k
        Ok(self.0.read().get(&key.consolidated()).cloned())
53
480k
    }
54
}
55
56
pub struct RocksDB(rocksdb::OptimisticTransactionDB);
57
58
impl KVStore for RocksDB {
59
7
    fn atomic_write<'a>(
60
7
        &'a self, changes: Box<dyn Iterator<Item = (Key, Option<Value>)> + 'a>,
61
7
    ) -> Result<(), KVStoreError> {
62
7
        let mut wb = rocksdb::WriteBatchWithTransaction::default();
63
25
        for (k, v) in 
changes7
{
64
25
            match v {
65
25
                Some(v) => wb.put(k.consolidated(), v),
66
0
                None => wb.delete(k.consolidated()),
67
            }
68
        }
69
7
        Ok(self.0.write(wb).context("RocksDB write error.")
?0
)
70
7
    }
71
72
0
    fn contains(&self, key: Key) -> bool {
73
0
        self.0.key_may_exist(key.consolidated())
74
0
    }
75
76
385
    fn get(&self, key: Key) -> Result<Option<Value>, KVStoreError> {
77
385
        Ok(self
78
385
            .0
79
385
            .get(key.consolidated())
80
385
            .context("RocksDB read error.")
?0
81
385
            .map(|v| 
v371
.
into371
()))
82
385
    }
83
}
84
85
impl RocksDB {
86
6
    pub fn new(path: &Path) -> Result<Self, KVStoreError> {
87
6
        let mut opts = Options::default();
88
6
        opts.create_if_missing(true);
89
6
        let db = OptimisticTransactionDB::open(&opts, path).context("RocksDB open error")
?0
;
90
6
        Ok(Self(db))
91
6
    }
92
}
93
94
use lyquor_api::subkey_builder;
95
use lyquor_primitives::{Bytes, decode_object, encode_object};
96
use serde::{Deserialize, Serialize};
97
use std::hash::Hash;
98
99
pub struct SortedMappingStore<
100
    S: KVStore,
101
    K: Hash + Eq + Ord + Serialize + for<'a> Deserialize<'a> + Clone,
102
    V: Serialize + for<'a> Deserialize<'a> + Clone,
103
> {
104
    cache: lru::LruCache<K, V>,
105
    max: Option<K>,
106
    first_idx: u64,
107
    next_idx: u64,
108
    store: S,
109
}
110
111
// Declares `SortedMappingSubkey`, the key builders used by `SortedMappingStore`.
// As used in this file, the builders address:
//   value(key bytes) -> the encoded value stored for that key
//   index(u64)       -> the encoded key recorded at that append position
//   max()            -> the encoded largest key appended so far
//   first_idx()      -> the encoded index where positional lookups start (0 when absent)
//   next_idx()       -> the encoded index the next append will use
// NOTE(review): the bracketed byte presumably becomes a one-byte key prefix that keeps the
// five kinds apart — confirm against the `subkey_builder!` definition.
subkey_builder!(SortedMappingSubkey(
112
    ([0x00])-value(&[u8]) => Key,
113
    ([0x01])-index(&u64) => Key,
114
    ([0x02])-max() => Key,
115
    ([0x03])-first_idx() => Key,
116
    ([0x04])-next_idx() => Key,
117
));
118
119
impl<
120
    S: KVStore,
121
    K: Hash + Eq + Ord + Serialize + for<'a> Deserialize<'a> + Clone,
122
    V: Serialize + for<'a> Deserialize<'a> + Clone,
123
> SortedMappingStore<S, K, V>
124
{
125
67
    fn subkey() -> &'static SortedMappingSubkey {
126
        static KEY: std::sync::OnceLock<SortedMappingSubkey> = std::sync::OnceLock::new();
127
67
        KEY.get_or_init(|| 
SortedMappingSubkey::new4
(
Value::new4
().
into4
()))
128
67
    }
129
130
6
    pub fn new(store: S, cache_size: std::num::NonZeroUsize) -> Result<Self, KVStoreError> {
131
6
        let max: Option<K> = match store.get(Self::subkey().max())
?0
{
132
0
            Some(raw) => decode_object(&raw),
133
6
            None => None,
134
        };
135
6
        let first_idx: u64 = store
136
6
            .get(Self::subkey().first_idx())
?0
137
6
            .and_then(|raw| 
decode_object0
(
&raw0
))
138
6
            .unwrap_or(0);
139
6
        let next_idx: u64 = store
140
6
            .get(Self::subkey().next_idx())
?0
141
6
            .and_then(|raw| 
decode_object0
(
&raw0
))
142
6
            .unwrap_or(0);
143
144
6
        Ok(Self {
145
6
            cache: lru::LruCache::new(cache_size),
146
6
            max,
147
6
            first_idx,
148
6
            next_idx,
149
6
            store,
150
6
        })
151
6
    }
152
153
14
    pub fn max_key(&self) -> Option<K> {
154
14
        self.max.clone()
155
14
    }
156
157
7
    pub fn append(&mut self, key: K, value: V) -> Result<bool, KVStoreError> {
158
7
        if !self.max_key().map(|max| 
key1
>
max1
).unwrap_or(true) {
159
0
            return Ok(false);
160
7
        }
161
162
7
        let idx = self.next_idx;
163
7
        self.next_idx += 1;
164
7
        self.cache.push(key.clone(), value.clone());
165
7
        self.max = Some(key.clone());
166
7
        let key_raw: Bytes = encode_object(&key).into();
167
7
        let mapping = [
168
7
            (Self::subkey().value(&key_raw), Some(encode_object(&value).into())),
169
7
            (Self::subkey().index(&idx), Some(key_raw.clone())),
170
7
            (Self::subkey().max(), Some(key_raw)),
171
7
            (Self::subkey().next_idx(), Some(encode_object(&self.next_idx).into())),
172
7
        ]
173
7
        .into_iter();
174
7
        self.store.atomic_write(Box::new(mapping))
?0
;
175
7
        Ok(true)
176
7
    }
177
178
19
    pub fn get_le(&mut self, key: K) -> Option<V> {
179
19
        let mut l = self.first_idx;
180
19
        let mut r = self.next_idx;
181
19
        let 
mut l_key7
= self
182
19
            .store
183
19
            .get(Self::subkey().index(&l))
184
19
            .ok()
185
19
            .flatten()
186
19
            .and_then(|b| 
decode_object7
(
&b7
))
?12
;
187
7
        if key < l_key {
188
0
            return None;
189
7
        }
190
9
        while r > l + 1 {
191
2
            let mid = (l + r) >> 1;
192
2
            let mid_key: K = self
193
2
                .store
194
2
                .get(Self::subkey().index(&mid))
195
2
                .ok()
196
2
                .flatten()
197
2
                .and_then(|b| decode_object(&b))
?0
;
198
2
            if key < mid_key {
199
1
                r = mid
200
1
            } else {
201
1
                l = mid;
202
1
                l_key = mid_key;
203
1
            }
204
        }
205
7
        let value = self.get(l_key.clone());
206
7
        if let Some(value) = &value &&
207
7
            l_key != key
208
0
        {
209
0
            self.cache.push(key, value.clone());
210
7
        }
211
7
        value
212
19
    }
213
214
7
    pub fn get(&mut self, key: K) -> Option<V> {
215
7
        match self.cache.get(&key) {
216
7
            Some(value) => Some(value.clone()),
217
            None => {
218
0
                let value: V = self
219
0
                    .store
220
0
                    .get(Self::subkey().value(&encode_object(&key)))
221
0
                    .ok()
222
0
                    .flatten()
223
0
                    .and_then(|b| decode_object(&b))?;
224
0
                self.cache.push(key.clone(), value.clone());
225
0
                Some(value)
226
            }
227
        }
228
7
    }
229
}