/home/runner/work/lyquor/lyquor/db/src/lib.rs
Line | Count | Source |
1 | | #![doc(html_no_source)] // remove it upon open-source |
2 | | |
3 | | use std::collections::HashMap; |
4 | | use std::path::Path; |
5 | | |
6 | | pub use rocksdb; |
7 | | use rocksdb::{OptimisticTransactionDB, Options}; |
8 | | |
9 | | use lyquor_api::{ |
10 | | anyhow::Context, |
11 | | kvstore::{KVStore, KVStoreError, Key, Value}, |
12 | | parking_lot::RwLock, |
13 | | }; |
14 | | |
15 | | pub struct MemDB(RwLock<HashMap<Bytes, Bytes>>); |
16 | | |
17 | | impl MemDB { |
18 | 526 | pub fn new() -> Self { |
19 | 526 | Self(RwLock::new(HashMap::new())) |
20 | 526 | } |
21 | | } |
22 | | |
23 | | impl Default for MemDB { |
24 | 0 | fn default() -> Self { |
25 | 0 | Self::new() |
26 | 0 | } |
27 | | } |
28 | | |
29 | | impl KVStore for MemDB { |
30 | 44.3k | fn atomic_write<'a>( |
31 | 44.3k | &'a self, changes: Box<dyn Iterator<Item = (Key, Option<Value>)> + 'a>, |
32 | 44.3k | ) -> Result<(), KVStoreError> { |
33 | 44.3k | let mut map = self.0.write(); |
34 | 60.1k | for (k, v) in changes44.3k { |
35 | 60.1k | match v { |
36 | 60.1k | Some(v) => { |
37 | 60.1k | map.insert(k.consolidated(), v); |
38 | 60.1k | } |
39 | 2 | None => { |
40 | 2 | map.remove(&k.consolidated()); |
41 | 2 | } |
42 | | } |
43 | | } |
44 | 44.3k | Ok(()) |
45 | 44.3k | } |
46 | | |
47 | 24 | fn contains(&self, key: Key) -> bool { |
48 | 24 | self.0.read().contains_key(&key.consolidated()) |
49 | 24 | } |
50 | | |
51 | 480k | fn get(&self, key: Key) -> Result<Option<Value>, KVStoreError> { |
52 | 480k | Ok(self.0.read().get(&key.consolidated()).cloned()) |
53 | 480k | } |
54 | | } |
55 | | |
56 | | pub struct RocksDB(rocksdb::OptimisticTransactionDB); |
57 | | |
58 | | impl KVStore for RocksDB { |
59 | 7 | fn atomic_write<'a>( |
60 | 7 | &'a self, changes: Box<dyn Iterator<Item = (Key, Option<Value>)> + 'a>, |
61 | 7 | ) -> Result<(), KVStoreError> { |
62 | 7 | let mut wb = rocksdb::WriteBatchWithTransaction::default(); |
63 | 25 | for (k, v) in changes7 { |
64 | 25 | match v { |
65 | 25 | Some(v) => wb.put(k.consolidated(), v), |
66 | 0 | None => wb.delete(k.consolidated()), |
67 | | } |
68 | | } |
69 | 7 | Ok(self.0.write(wb).context("RocksDB write error.")?0 ) |
70 | 7 | } |
71 | | |
72 | 0 | fn contains(&self, key: Key) -> bool { |
73 | 0 | self.0.key_may_exist(key.consolidated()) |
74 | 0 | } |
75 | | |
76 | 385 | fn get(&self, key: Key) -> Result<Option<Value>, KVStoreError> { |
77 | 385 | Ok(self |
78 | 385 | .0 |
79 | 385 | .get(key.consolidated()) |
80 | 385 | .context("RocksDB read error.")?0 |
81 | 385 | .map(|v| v371 .into371 ())) |
82 | 385 | } |
83 | | } |
84 | | |
85 | | impl RocksDB { |
86 | 6 | pub fn new(path: &Path) -> Result<Self, KVStoreError> { |
87 | 6 | let mut opts = Options::default(); |
88 | 6 | opts.create_if_missing(true); |
89 | 6 | let db = OptimisticTransactionDB::open(&opts, path).context("RocksDB open error")?0 ; |
90 | 6 | Ok(Self(db)) |
91 | 6 | } |
92 | | } |
93 | | |
94 | | use lyquor_api::subkey_builder; |
95 | | use lyquor_primitives::{Bytes, decode_object, encode_object}; |
96 | | use serde::{Deserialize, Serialize}; |
97 | | use std::hash::Hash; |
98 | | |
99 | | pub struct SortedMappingStore< |
100 | | S: KVStore, |
101 | | K: Hash + Eq + Ord + Serialize + for<'a> Deserialize<'a> + Clone, |
102 | | V: Serialize + for<'a> Deserialize<'a> + Clone, |
103 | | > { |
104 | | cache: lru::LruCache<K, V>, |
105 | | max: Option<K>, |
106 | | first_idx: u64, |
107 | | next_idx: u64, |
108 | | store: S, |
109 | | } |
110 | | |
111 | | subkey_builder!(SortedMappingSubkey( |
112 | | ([0x00])-value(&[u8]) => Key, |
113 | | ([0x01])-index(&u64) => Key, |
114 | | ([0x02])-max() => Key, |
115 | | ([0x03])-first_idx() => Key, |
116 | | ([0x04])-next_idx() => Key, |
117 | | )); |
118 | | |
119 | | impl< |
120 | | S: KVStore, |
121 | | K: Hash + Eq + Ord + Serialize + for<'a> Deserialize<'a> + Clone, |
122 | | V: Serialize + for<'a> Deserialize<'a> + Clone, |
123 | | > SortedMappingStore<S, K, V> |
124 | | { |
125 | 67 | fn subkey() -> &'static SortedMappingSubkey { |
126 | | static KEY: std::sync::OnceLock<SortedMappingSubkey> = std::sync::OnceLock::new(); |
127 | 67 | KEY.get_or_init(|| SortedMappingSubkey::new4 (Value::new4 ().into4 ())) |
128 | 67 | } |
129 | | |
130 | 6 | pub fn new(store: S, cache_size: std::num::NonZeroUsize) -> Result<Self, KVStoreError> { |
131 | 6 | let max: Option<K> = match store.get(Self::subkey().max())?0 { |
132 | 0 | Some(raw) => decode_object(&raw), |
133 | 6 | None => None, |
134 | | }; |
135 | 6 | let first_idx: u64 = store |
136 | 6 | .get(Self::subkey().first_idx())?0 |
137 | 6 | .and_then(|raw| decode_object0 (&raw0 )) |
138 | 6 | .unwrap_or(0); |
139 | 6 | let next_idx: u64 = store |
140 | 6 | .get(Self::subkey().next_idx())?0 |
141 | 6 | .and_then(|raw| decode_object0 (&raw0 )) |
142 | 6 | .unwrap_or(0); |
143 | | |
144 | 6 | Ok(Self { |
145 | 6 | cache: lru::LruCache::new(cache_size), |
146 | 6 | max, |
147 | 6 | first_idx, |
148 | 6 | next_idx, |
149 | 6 | store, |
150 | 6 | }) |
151 | 6 | } |
152 | | |
153 | 14 | pub fn max_key(&self) -> Option<K> { |
154 | 14 | self.max.clone() |
155 | 14 | } |
156 | | |
157 | 7 | pub fn append(&mut self, key: K, value: V) -> Result<bool, KVStoreError> { |
158 | 7 | if !self.max_key().map(|max| key1 > max1 ).unwrap_or(true) { |
159 | 0 | return Ok(false); |
160 | 7 | } |
161 | | |
162 | 7 | let idx = self.next_idx; |
163 | 7 | self.next_idx += 1; |
164 | 7 | self.cache.push(key.clone(), value.clone()); |
165 | 7 | self.max = Some(key.clone()); |
166 | 7 | let key_raw: Bytes = encode_object(&key).into(); |
167 | 7 | let mapping = [ |
168 | 7 | (Self::subkey().value(&key_raw), Some(encode_object(&value).into())), |
169 | 7 | (Self::subkey().index(&idx), Some(key_raw.clone())), |
170 | 7 | (Self::subkey().max(), Some(key_raw)), |
171 | 7 | (Self::subkey().next_idx(), Some(encode_object(&self.next_idx).into())), |
172 | 7 | ] |
173 | 7 | .into_iter(); |
174 | 7 | self.store.atomic_write(Box::new(mapping))?0 ; |
175 | 7 | Ok(true) |
176 | 7 | } |
177 | | |
178 | 19 | pub fn get_le(&mut self, key: K) -> Option<V> { |
179 | 19 | let mut l = self.first_idx; |
180 | 19 | let mut r = self.next_idx; |
181 | 19 | let mut l_key7 = self |
182 | 19 | .store |
183 | 19 | .get(Self::subkey().index(&l)) |
184 | 19 | .ok() |
185 | 19 | .flatten() |
186 | 19 | .and_then(|b| decode_object7 (&b7 ))?12 ; |
187 | 7 | if key < l_key { |
188 | 0 | return None; |
189 | 7 | } |
190 | 9 | while r > l + 1 { |
191 | 2 | let mid = (l + r) >> 1; |
192 | 2 | let mid_key: K = self |
193 | 2 | .store |
194 | 2 | .get(Self::subkey().index(&mid)) |
195 | 2 | .ok() |
196 | 2 | .flatten() |
197 | 2 | .and_then(|b| decode_object(&b))?0 ; |
198 | 2 | if key < mid_key { |
199 | 1 | r = mid |
200 | 1 | } else { |
201 | 1 | l = mid; |
202 | 1 | l_key = mid_key; |
203 | 1 | } |
204 | | } |
205 | 7 | let value = self.get(l_key.clone()); |
206 | 7 | if let Some(value) = &value && |
207 | 7 | l_key != key |
208 | 0 | { |
209 | 0 | self.cache.push(key, value.clone()); |
210 | 7 | } |
211 | 7 | value |
212 | 19 | } |
213 | | |
214 | 7 | pub fn get(&mut self, key: K) -> Option<V> { |
215 | 7 | match self.cache.get(&key) { |
216 | 7 | Some(value) => Some(value.clone()), |
217 | | None => { |
218 | 0 | let value: V = self |
219 | 0 | .store |
220 | 0 | .get(Self::subkey().value(&encode_object(&key))) |
221 | 0 | .ok() |
222 | 0 | .flatten() |
223 | 0 | .and_then(|b| decode_object(&b))?; |
224 | 0 | self.cache.push(key.clone(), value.clone()); |
225 | 0 | Some(value) |
226 | | } |
227 | | } |
228 | 7 | } |
229 | | } |