Coverage Report

Created: 2026-02-04 05:42

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/home/runner/work/lyquor/lyquor/db/src/lib.rs
Line
Count
Source
1
#![doc(html_no_source)] // remove it upon open-source
2
3
use std::collections::HashMap;
4
use std::path::Path;
5
6
pub use rocksdb;
7
use rocksdb::{OptimisticTransactionDB, Options};
8
9
use lyquor_api::{
10
    anyhow::Context,
11
    kvstore::{KVStore, KVStoreError, Key, Value},
12
    parking_lot::RwLock,
13
};
14
15
pub struct MemDB(RwLock<HashMap<Bytes, Bytes>>);
16
17
impl MemDB {
18
219
    pub fn new() -> Self {
19
219
        Self(RwLock::new(HashMap::new()))
20
219
    }
21
}
22
23
impl KVStore for MemDB {
24
29.1k
    fn atomic_write<'a>(
25
29.1k
        &'a self, changes: Box<dyn Iterator<Item = (Key, Option<Value>)> + 'a>,
26
29.1k
    ) -> Result<(), KVStoreError> {
27
29.1k
        let mut map = self.0.write();
28
44.7k
        for (k, v) in 
changes29.1k
{
29
44.7k
            match v {
30
44.7k
                Some(v) => {
31
44.7k
                    map.insert(k.consolidated(), v);
32
44.7k
                }
33
2
                None => {
34
2
                    map.remove(&k.consolidated());
35
2
                }
36
            }
37
        }
38
29.1k
        Ok(())
39
29.1k
    }
40
41
24
    fn contains(&self, key: Key) -> bool {
42
24
        self.0.read().contains_key(&key.consolidated())
43
24
    }
44
45
464k
    fn get(&self, key: Key) -> Result<Option<Value>, KVStoreError> {
46
464k
        Ok(self.0.read().get(&key.consolidated()).map(|v| 
v463k
.
clone463k
()))
47
464k
    }
48
}
49
50
pub struct RocksDB(rocksdb::OptimisticTransactionDB);
51
52
impl KVStore for RocksDB {
53
5
    fn atomic_write<'a>(
54
5
        &'a self, changes: Box<dyn Iterator<Item = (Key, Option<Value>)> + 'a>,
55
5
    ) -> Result<(), KVStoreError> {
56
5
        let mut wb = rocksdb::WriteBatchWithTransaction::default();
57
23
        for (k, v) in 
changes5
{
58
23
            match v {
59
23
                Some(v) => wb.put(k.consolidated(), v),
60
0
                None => wb.delete(k.consolidated()),
61
            }
62
        }
63
5
        Ok(self.0.write(wb).context("RocksDB write error.")
?0
)
64
5
    }
65
66
0
    fn contains(&self, key: Key) -> bool {
67
0
        self.0.key_may_exist(key.consolidated())
68
0
    }
69
70
381
    fn get(&self, key: Key) -> Result<Option<Value>, KVStoreError> {
71
381
        Ok(self
72
381
            .0
73
381
            .get(key.consolidated())
74
381
            .context("RocksDB read error.")
?0
75
381
            .map(|v| 
v369
.
into369
()))
76
381
    }
77
}
78
79
impl RocksDB {
80
2
    pub fn new(path: &Path) -> Result<Self, KVStoreError> {
81
2
        let mut opts = Options::default();
82
2
        opts.create_if_missing(true);
83
2
        let db = OptimisticTransactionDB::open(&opts, path).context("RocksDB open error")
?0
;
84
2
        Ok(Self(db))
85
2
    }
86
}
87
88
use lyquor_api::subkey_builder;
89
use lyquor_primitives::{Bytes, decode_object, encode_object};
90
use serde::{Deserialize, Serialize};
91
use std::hash::Hash;
92
93
pub struct SortedMappingStore<
94
    S: KVStore,
95
    K: Hash + Eq + Ord + Serialize + for<'a> Deserialize<'a> + Clone,
96
    V: Serialize + for<'a> Deserialize<'a> + Clone,
97
> {
98
    cache: lru::LruCache<K, V>,
99
    max: Option<K>,
100
    first_idx: u64,
101
    next_idx: u64,
102
    store: S,
103
}
104
105
subkey_builder!(SortedMappingSubkey(
106
    ([0x00])-value(&[u8]) => Key,
107
    ([0x01])-index(&u64) => Key,
108
    ([0x02])-max() => Key,
109
    ([0x03])-first_idx() => Key,
110
    ([0x04])-next_idx() => Key,
111
));
112
113
impl<
114
    S: KVStore,
115
    K: Hash + Eq + Ord + Serialize + for<'a> Deserialize<'a> + Clone,
116
    V: Serialize + for<'a> Deserialize<'a> + Clone,
117
> SortedMappingStore<S, K, V>
118
{
119
0
    fn subkey() -> &'static SortedMappingSubkey {
120
        static KEY: std::sync::OnceLock<SortedMappingSubkey> = std::sync::OnceLock::new();
121
0
        KEY.get_or_init(|| SortedMappingSubkey::new(Value::new().into()))
122
0
    }
123
124
0
    pub fn new(store: S, cache_size: std::num::NonZeroUsize) -> Result<Self, KVStoreError> {
125
0
        let max: Option<K> = match store.get(Self::subkey().max())? {
126
0
            Some(raw) => decode_object(&raw),
127
0
            None => None,
128
        };
129
0
        let first_idx: u64 = store
130
0
            .get(Self::subkey().first_idx())?
131
0
            .and_then(|raw| decode_object(&raw))
132
0
            .unwrap_or(0);
133
0
        let next_idx: u64 = store
134
0
            .get(Self::subkey().next_idx())?
135
0
            .and_then(|raw| decode_object(&raw))
136
0
            .unwrap_or(0);
137
138
0
        Ok(Self {
139
0
            cache: lru::LruCache::new(cache_size),
140
0
            max,
141
0
            first_idx,
142
0
            next_idx,
143
0
            store,
144
0
        })
145
0
    }
146
147
0
    pub fn max_key(&self) -> Option<K> {
148
0
        self.max.clone()
149
0
    }
150
151
0
    pub fn append(&mut self, key: K, value: V) -> Result<bool, KVStoreError> {
152
0
        if !self.max_key().map(|max| key > max).unwrap_or(true) {
153
0
            return Ok(false);
154
0
        }
155
156
0
        let idx = self.next_idx;
157
0
        self.next_idx += 1;
158
0
        self.cache.push(key.clone(), value.clone());
159
0
        self.max = Some(key.clone());
160
0
        let key_raw: Bytes = encode_object(&key).into();
161
0
        let mapping = [
162
0
            (Self::subkey().value(&key_raw), Some(encode_object(&value).into())),
163
0
            (Self::subkey().index(&idx), Some(key_raw.clone())),
164
0
            (Self::subkey().max(), Some(key_raw)),
165
0
            (Self::subkey().next_idx(), Some(encode_object(&self.next_idx).into())),
166
0
        ]
167
0
        .into_iter();
168
0
        self.store.atomic_write(Box::new(mapping))?;
169
0
        Ok(true)
170
0
    }
171
172
0
    pub fn get_le(&mut self, key: K) -> Option<V> {
173
0
        let mut l = self.first_idx;
174
0
        let mut r = self.next_idx;
175
0
        let mut l_key = self
176
0
            .store
177
0
            .get(Self::subkey().index(&l))
178
0
            .ok()
179
0
            .flatten()
180
0
            .and_then(|b| decode_object(&b))?;
181
0
        if key < l_key {
182
0
            return None;
183
0
        }
184
0
        while r > l + 1 {
185
0
            let mid = (l + r) >> 1;
186
0
            let mid_key: K = self
187
0
                .store
188
0
                .get(Self::subkey().index(&mid))
189
0
                .ok()
190
0
                .flatten()
191
0
                .and_then(|b| decode_object(&b))?;
192
0
            if key < mid_key {
193
0
                r = mid
194
0
            } else {
195
0
                l = mid;
196
0
                l_key = mid_key;
197
0
            }
198
        }
199
0
        let value = self.get(l_key.clone());
200
0
        if let Some(value) = &value {
201
0
            if l_key != key {
202
0
                self.cache.push(key, value.clone());
203
0
            }
204
0
        }
205
0
        value
206
0
    }
207
208
0
    pub fn get(&mut self, key: K) -> Option<V> {
209
0
        match self.cache.get(&key) {
210
0
            Some(value) => Some(value.clone()),
211
            None => {
212
0
                let value: V = self
213
0
                    .store
214
0
                    .get(Self::subkey().value(&encode_object(&key)))
215
0
                    .ok()
216
0
                    .flatten()
217
0
                    .and_then(|b| decode_object(&b))?;
218
0
                self.cache.push(key.clone(), value.clone());
219
0
                Some(value)
220
            }
221
        }
222
0
    }
223
}