/home/runner/work/lyquor/lyquor/db/src/lib.rs
Line | Count | Source |
1 | | #![doc(html_no_source)] // remove it upon open-source |
2 | | |
3 | | use std::collections::HashMap; |
4 | | use std::path::Path; |
5 | | |
6 | | pub use rocksdb; |
7 | | use rocksdb::{OptimisticTransactionDB, Options}; |
8 | | |
9 | | use lyquor_api::{ |
10 | | anyhow::Context, |
11 | | kvstore::{KVStore, KVStoreError, Key, Value}, |
12 | | parking_lot::RwLock, |
13 | | }; |
14 | | |
15 | | pub struct MemDB(RwLock<HashMap<Bytes, Bytes>>); |
16 | | |
17 | | impl MemDB { |
18 | 390 | pub fn new() -> Self { |
19 | 390 | Self(RwLock::new(HashMap::new())) |
20 | 390 | } |
21 | | } |
22 | | |
23 | | impl Default for MemDB { |
24 | 0 | fn default() -> Self { |
25 | 0 | Self::new() |
26 | 0 | } |
27 | | } |
28 | | |
29 | | impl KVStore for MemDB { |
30 | 15.4k | fn atomic_write<'a>( |
31 | 15.4k | &'a self, changes: Box<dyn Iterator<Item = (Key, Option<Value>)> + 'a>, |
32 | 15.4k | ) -> Result<(), KVStoreError> { |
33 | 15.4k | let mut map = self.0.write(); |
34 | 45.4k | for (k, v) in changes15.4k { |
35 | 45.4k | match v { |
36 | 45.4k | Some(v) => { |
37 | 45.4k | map.insert(k.consolidated(), v); |
38 | 45.4k | } |
39 | 2 | None => { |
40 | 2 | map.remove(&k.consolidated()); |
41 | 2 | } |
42 | | } |
43 | | } |
44 | 15.4k | Ok(()) |
45 | 15.4k | } |
46 | | |
47 | 24 | fn contains(&self, key: Key) -> bool { |
48 | 24 | self.0.read().contains_key(&key.consolidated()) |
49 | 24 | } |
50 | | |
51 | 465k | fn get(&self, key: Key) -> Result<Option<Value>, KVStoreError> { |
52 | 465k | Ok(self.0.read().get(&key.consolidated()).cloned()) |
53 | 465k | } |
54 | | } |
55 | | |
56 | | pub struct RocksDB(rocksdb::OptimisticTransactionDB); |
57 | | |
58 | | impl KVStore for RocksDB { |
59 | 7 | fn atomic_write<'a>( |
60 | 7 | &'a self, changes: Box<dyn Iterator<Item = (Key, Option<Value>)> + 'a>, |
61 | 7 | ) -> Result<(), KVStoreError> { |
62 | 7 | let mut wb = rocksdb::WriteBatchWithTransaction::default(); |
63 | 25 | for (k, v) in changes7 { |
64 | 25 | match v { |
65 | 25 | Some(v) => wb.put(k.consolidated(), v), |
66 | 0 | None => wb.delete(k.consolidated()), |
67 | | } |
68 | | } |
69 | 7 | Ok(self.0.write(wb).context("RocksDB write error.")?0 ) |
70 | 7 | } |
71 | | |
72 | 0 | fn contains(&self, key: Key) -> bool { |
73 | 0 | self.0.key_may_exist(key.consolidated()) |
74 | 0 | } |
75 | | |
76 | 385 | fn get(&self, key: Key) -> Result<Option<Value>, KVStoreError> { |
77 | 385 | Ok(self |
78 | 385 | .0 |
79 | 385 | .get(key.consolidated()) |
80 | 385 | .context("RocksDB read error.")?0 |
81 | 385 | .map(|v| v371 .into371 ())) |
82 | 385 | } |
83 | | } |
84 | | |
85 | | impl RocksDB { |
86 | 6 | pub fn new(path: &Path) -> Result<Self, KVStoreError> { |
87 | 6 | let mut opts = Options::default(); |
88 | 6 | opts.create_if_missing(true); |
89 | 6 | let db = OptimisticTransactionDB::open(&opts, path).context("RocksDB open error")?0 ; |
90 | 6 | Ok(Self(db)) |
91 | 6 | } |
92 | | } |
93 | | |
94 | | use lyquor_api::subkey_builder; |
95 | | use lyquor_primitives::{Bytes, decode_object, encode_object}; |
96 | | use serde::{Deserialize, Serialize}; |
97 | | use std::hash::Hash; |
98 | | |
99 | | pub struct SortedMappingStore< |
100 | | S: KVStore, |
101 | | K: Hash + Eq + Ord + Serialize + for<'a> Deserialize<'a> + Clone, |
102 | | V: Serialize + for<'a> Deserialize<'a> + Clone, |
103 | | > { |
104 | | cache: lru::LruCache<K, V>, |
105 | | max: Option<K>, |
106 | | first_idx: u64, |
107 | | next_idx: u64, |
108 | | store: S, |
109 | | } |
110 | | |
111 | | subkey_builder!(SortedMappingSubkey( |
112 | | ([0x00])-value(&[u8]) => Key, |
113 | | ([0x01])-index(&u64) => Key, |
114 | | ([0x02])-max() => Key, |
115 | | ([0x03])-first_idx() => Key, |
116 | | ([0x04])-next_idx() => Key, |
117 | | )); |
118 | | |
119 | | impl< |
120 | | S: KVStore, |
121 | | K: Hash + Eq + Ord + Serialize + for<'a> Deserialize<'a> + Clone, |
122 | | V: Serialize + for<'a> Deserialize<'a> + Clone, |
123 | | > SortedMappingStore<S, K, V> |
124 | | { |
125 | 70 | fn subkey() -> &'static SortedMappingSubkey { |
126 | | static KEY: std::sync::OnceLock<SortedMappingSubkey> = std::sync::OnceLock::new(); |
127 | 70 | KEY.get_or_init(|| SortedMappingSubkey::new5 (Value::new5 ().into5 ())) |
128 | 70 | } |
129 | | |
130 | 7 | pub fn new(store: S, cache_size: std::num::NonZeroUsize) -> Result<Self, KVStoreError> { |
131 | 7 | let max: Option<K> = match store.get(Self::subkey().max())?0 { |
132 | 0 | Some(raw) => decode_object(&raw), |
133 | 7 | None => None, |
134 | | }; |
135 | 7 | let first_idx: u64 = store |
136 | 7 | .get(Self::subkey().first_idx())?0 |
137 | 7 | .and_then(|raw| decode_object0 (&raw0 )) |
138 | 7 | .unwrap_or(0); |
139 | 7 | let next_idx: u64 = store |
140 | 7 | .get(Self::subkey().next_idx())?0 |
141 | 7 | .and_then(|raw| decode_object0 (&raw0 )) |
142 | 7 | .unwrap_or(0); |
143 | | |
144 | 7 | Ok(Self { |
145 | 7 | cache: lru::LruCache::new(cache_size), |
146 | 7 | max, |
147 | 7 | first_idx, |
148 | 7 | next_idx, |
149 | 7 | store, |
150 | 7 | }) |
151 | 7 | } |
152 | | |
153 | 24 | pub fn max_key(&self) -> Option<K> { |
154 | 24 | self.max.clone() |
155 | 24 | } |
156 | | |
157 | 8 | pub fn append(&mut self, key: K, value: V) -> Result<bool, KVStoreError> { |
158 | 8 | if !self.max_key().map(|max| key1 > max1 ).unwrap_or(true) { |
159 | 0 | return Ok(false); |
160 | 8 | } |
161 | | |
162 | 8 | let idx = self.next_idx; |
163 | 8 | self.next_idx += 1; |
164 | 8 | self.cache.push(key.clone(), value.clone()); |
165 | 8 | self.max = Some(key.clone()); |
166 | 8 | let key_raw: Bytes = encode_object(&key).into(); |
167 | 8 | let mapping = [ |
168 | 8 | (Self::subkey().value(&key_raw), Some(encode_object(&value).into())), |
169 | 8 | (Self::subkey().index(&idx), Some(key_raw.clone())), |
170 | 8 | (Self::subkey().max(), Some(key_raw)), |
171 | 8 | (Self::subkey().next_idx(), Some(encode_object(&self.next_idx).into())), |
172 | 8 | ] |
173 | 8 | .into_iter(); |
174 | 8 | self.store.atomic_write(Box::new(mapping))?0 ; |
175 | 8 | Ok(true) |
176 | 8 | } |
177 | | |
178 | 8 | pub fn append_or_update_max(&mut self, key: K, value: V) -> Result<bool, KVStoreError> { |
179 | 8 | match self.max_key() { |
180 | 2 | Some(max1 ) if key > max1 => self1 .append1 (key1 , value1 ), |
181 | 1 | Some(max) if key == max => { |
182 | 1 | self.cache.push(key.clone(), value.clone()); |
183 | 1 | let key_raw: Bytes = encode_object(&key).into(); |
184 | 1 | self.store.atomic_write(Box::new( |
185 | 1 | [(Self::subkey().value(&key_raw), Some(encode_object(&value).into()))].into_iter(), |
186 | 1 | ))?0 ; |
187 | 1 | Ok(true) |
188 | | } |
189 | 0 | Some(_) => Ok(false), |
190 | 6 | None => self.append(key, value), |
191 | | } |
192 | 8 | } |
193 | | |
194 | 14 | pub fn get_le(&mut self, key: K) -> Option<V> { |
195 | 14 | let mut l = self.first_idx; |
196 | 14 | let mut r = self.next_idx; |
197 | 14 | let mut l_key8 = self |
198 | 14 | .store |
199 | 14 | .get(Self::subkey().index(&l)) |
200 | 14 | .ok() |
201 | 14 | .flatten() |
202 | 14 | .and_then(|b| decode_object8 (&b8 ))?6 ; |
203 | 8 | if key < l_key { |
204 | 0 | return None; |
205 | 8 | } |
206 | 10 | while r > l + 1 { |
207 | 2 | let mid = (l + r) >> 1; |
208 | 2 | let mid_key: K = self |
209 | 2 | .store |
210 | 2 | .get(Self::subkey().index(&mid)) |
211 | 2 | .ok() |
212 | 2 | .flatten() |
213 | 2 | .and_then(|b| decode_object(&b))?0 ; |
214 | 2 | if key < mid_key { |
215 | 1 | r = mid |
216 | 1 | } else { |
217 | 1 | l = mid; |
218 | 1 | l_key = mid_key; |
219 | 1 | } |
220 | | } |
221 | 8 | let value = self.get(l_key.clone()); |
222 | 8 | if let Some(value) = &value && |
223 | 8 | l_key != key |
224 | 0 | { |
225 | 0 | self.cache.push(key, value.clone()); |
226 | 8 | } |
227 | 8 | value |
228 | 14 | } |
229 | | |
230 | 9 | pub fn get(&mut self, key: K) -> Option<V> { |
231 | 9 | match self.cache.get(&key) { |
232 | 9 | Some(value) => Some(value.clone()), |
233 | | None => { |
234 | 0 | let value: V = self |
235 | 0 | .store |
236 | 0 | .get(Self::subkey().value(&encode_object(&key))) |
237 | 0 | .ok() |
238 | 0 | .flatten() |
239 | 0 | .and_then(|b| decode_object(&b))?; |
240 | 0 | self.cache.push(key.clone(), value.clone()); |
241 | 0 | Some(value) |
242 | | } |
243 | | } |
244 | 9 | } |
245 | | } |
246 | | |
#[cfg(test)]
mod tests {
    use super::{MemDB, SortedMappingStore};

    /// Writing the current maximum key again through `append_or_update_max` must
    /// replace its value while leaving the maximum key itself unchanged.
    #[test]
    fn sorted_mapping_store_updates_existing_max_key() {
        let cache_size = std::num::NonZeroUsize::new(4).unwrap();
        let mut mapping = SortedMappingStore::new(MemDB::new(), cache_size).unwrap();

        let appended = mapping.append(7_u64, 10_u64).unwrap();
        assert!(appended);

        let updated = mapping.append_or_update_max(7, 11).unwrap();
        assert!(updated);

        assert_eq!(mapping.max_key(), Some(7));
        assert_eq!(mapping.get(7), Some(11));
        assert_eq!(mapping.get_le(7), Some(11));
    }
}