Coverage Report

Created: 2025-12-04 08:57

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/home/runner/work/lyquor/lyquor/db/src/lib.rs
Line
Count
Source
1
#![doc(html_no_source)] // remove it upon open-source
2
3
use std::collections::HashMap;
4
use std::path::Path;
5
6
pub use rocksdb;
7
use rocksdb::{OptimisticTransactionDB, Options};
8
9
use lyquor_api::{
10
    anyhow::Context,
11
    kvstore::{KVStore, KVStoreError, Key, Value},
12
    parking_lot::RwLock,
13
};
14
15
pub struct MemDB(RwLock<HashMap<Bytes, Bytes>>);
16
17
impl MemDB {
18
219
    pub fn new() -> Self {
19
219
        Self(RwLock::new(HashMap::new()))
20
219
    }
21
}
22
23
impl KVStore for MemDB {
24
29.1k
    fn atomic_write<'a>(
25
29.1k
        &'a self, changes: Box<dyn Iterator<Item = (Key, Option<Value>)> + 'a>,
26
29.1k
    ) -> Result<(), KVStoreError> {
27
29.1k
        let mut map = self.0.write();
28
73.8k
        for (
k44.7k
,
v44.7k
) in changes {
29
44.7k
            match v {
30
44.7k
                Some(v) => {
31
44.7k
                    map.insert(k.consolidated(), v);
32
44.7k
                }
33
2
                None => {
34
2
                    map.remove(&k.consolidated());
35
2
                }
36
            }
37
        }
38
29.1k
        Ok(())
39
29.1k
    }
40
41
464k
    fn get(&self, key: Key) -> Result<Option<Value>, KVStoreError> {
42
464k
        Ok(self.0.read().get(&key.consolidated()).map(|v| 
v463k
.
clone463k
()))
43
464k
    }
44
}
45
46
pub struct RocksDB(rocksdb::OptimisticTransactionDB);
47
48
impl KVStore for RocksDB {
49
5
    fn atomic_write<'a>(
50
5
        &'a self, changes: Box<dyn Iterator<Item = (Key, Option<Value>)> + 'a>,
51
5
    ) -> Result<(), KVStoreError> {
52
5
        let mut wb = rocksdb::WriteBatchWithTransaction::default();
53
28
        for (
k23
,
v23
) in changes {
54
23
            match v {
55
23
                Some(v) => wb.put(k.consolidated(), v),
56
0
                None => wb.delete(k.consolidated()),
57
            }
58
        }
59
5
        Ok(self.0.write(wb).context("RocksDB write error.")
?0
)
60
5
    }
61
62
138
    fn get(&self, key: Key) -> Result<Option<Value>, KVStoreError> {
63
138
        Ok(self
64
138
            .0
65
138
            .get(key.consolidated())
66
138
            .context("RocksDB read error.")
?0
67
138
            .map(|v| 
v129
.
into129
()))
68
138
    }
69
}
70
71
impl RocksDB {
72
2
    pub fn new(path: &Path) -> Result<Self, KVStoreError> {
73
2
        let mut opts = Options::default();
74
2
        opts.create_if_missing(true);
75
2
        let db = OptimisticTransactionDB::open(&opts, path).context("RocksDB open error")
?0
;
76
2
        Ok(Self(db))
77
2
    }
78
}
79
80
use lyquor_api::subkey_builder;
81
use lyquor_primitives::{Bytes, decode_object, encode_object};
82
use serde::{Deserialize, Serialize};
83
use std::hash::Hash;
84
85
pub struct SortedMappingStore<
86
    S: KVStore,
87
    K: Hash + Eq + Ord + Serialize + for<'a> Deserialize<'a> + Clone,
88
    V: Serialize + for<'a> Deserialize<'a> + Clone,
89
> {
90
    cache: lru::LruCache<K, V>,
91
    max: Option<K>,
92
    first_idx: u64,
93
    next_idx: u64,
94
    store: S,
95
}
96
97
subkey_builder!(SortedMappingSubkey(
98
    ([0x00])-value(&[u8]) => Key,
99
    ([0x01])-index(&u64) => Key,
100
    ([0x02])-max() => Key,
101
    ([0x03])-first_idx() => Key,
102
    ([0x04])-next_idx() => Key,
103
));
104
105
impl<
106
    S: KVStore,
107
    K: Hash + Eq + Ord + Serialize + for<'a> Deserialize<'a> + Clone,
108
    V: Serialize + for<'a> Deserialize<'a> + Clone,
109
> SortedMappingStore<S, K, V>
110
{
111
0
    fn subkey() -> &'static SortedMappingSubkey {
112
        static KEY: std::sync::OnceLock<SortedMappingSubkey> = std::sync::OnceLock::new();
113
0
        KEY.get_or_init(|| SortedMappingSubkey::new(Value::new().into()))
114
0
    }
115
116
0
    pub fn new(store: S, cache_size: std::num::NonZeroUsize) -> Result<Self, KVStoreError> {
117
0
        let max: Option<K> = match store.get(Self::subkey().max())? {
118
0
            Some(raw) => decode_object(&raw),
119
0
            None => None,
120
        };
121
0
        let first_idx: u64 = store
122
0
            .get(Self::subkey().first_idx())?
123
0
            .and_then(|raw| decode_object(&raw))
124
0
            .unwrap_or(0);
125
0
        let next_idx: u64 = store
126
0
            .get(Self::subkey().next_idx())?
127
0
            .and_then(|raw| decode_object(&raw))
128
0
            .unwrap_or(0);
129
130
0
        Ok(Self {
131
0
            cache: lru::LruCache::new(cache_size),
132
0
            max,
133
0
            first_idx,
134
0
            next_idx,
135
0
            store,
136
0
        })
137
0
    }
138
139
0
    pub fn max_key(&self) -> Option<K> {
140
0
        self.max.clone()
141
0
    }
142
143
0
    pub fn append(&mut self, key: K, value: V) -> Result<bool, KVStoreError> {
144
0
        if !self.max_key().map(|max| key > max).unwrap_or(true) {
145
0
            return Ok(false)
146
0
        }
147
148
0
        let idx = self.next_idx;
149
0
        self.next_idx += 1;
150
0
        self.cache.push(key.clone(), value.clone());
151
0
        self.max = Some(key.clone());
152
0
        let key_raw: Bytes = encode_object(&key).into();
153
0
        let mapping = [
154
0
            (Self::subkey().value(&key_raw), Some(encode_object(&value).into())),
155
0
            (Self::subkey().index(&idx), Some(key_raw.clone())),
156
0
            (Self::subkey().max(), Some(key_raw)),
157
0
            (Self::subkey().next_idx(), Some(encode_object(&self.next_idx).into())),
158
0
        ]
159
0
        .into_iter();
160
0
        self.store.atomic_write(Box::new(mapping))?;
161
0
        Ok(true)
162
0
    }
163
164
0
    pub fn get_le(&mut self, key: K) -> Option<V> {
165
0
        let mut l = self.first_idx;
166
0
        let mut r = self.next_idx;
167
0
        let mut l_key = self
168
0
            .store
169
0
            .get(Self::subkey().index(&l))
170
0
            .ok()
171
0
            .flatten()
172
0
            .and_then(|b| decode_object(&b))?;
173
0
        if key < l_key {
174
0
            return None
175
0
        }
176
0
        while r > l + 1 {
177
0
            let mid = (l + r) >> 1;
178
0
            let mid_key: K = self
179
0
                .store
180
0
                .get(Self::subkey().index(&mid))
181
0
                .ok()
182
0
                .flatten()
183
0
                .and_then(|b| decode_object(&b))?;
184
0
            if key < mid_key {
185
0
                r = mid
186
0
            } else {
187
0
                l = mid;
188
0
                l_key = mid_key;
189
0
            }
190
        }
191
0
        let value = self.get(l_key.clone());
192
0
        if let Some(value) = &value {
193
0
            if l_key != key {
194
0
                self.cache.push(key, value.clone());
195
0
            }
196
0
        }
197
0
        value
198
0
    }
199
200
0
    pub fn get(&mut self, key: K) -> Option<V> {
201
0
        match self.cache.get(&key) {
202
0
            Some(value) => Some(value.clone()),
203
            None => {
204
0
                let value: V = self
205
0
                    .store
206
0
                    .get(Self::subkey().value(&encode_object(&key)))
207
0
                    .ok()
208
0
                    .flatten()
209
0
                    .and_then(|b| decode_object(&b))?;
210
0
                self.cache.push(key.clone(), value.clone());
211
0
                Some(value)
212
            }
213
        }
214
0
    }
215
}