/home/runner/work/lyquor/lyquor/db/src/lib.rs
Line | Count | Source |
1 | | #![doc(html_no_source)] // remove it upon open-source |
2 | | |
3 | | use std::collections::HashMap; |
4 | | use std::path::Path; |
5 | | |
6 | | pub use rocksdb; |
7 | | use rocksdb::{OptimisticTransactionDB, Options}; |
8 | | |
9 | | use lyquor_api::{ |
10 | | anyhow::Context, |
11 | | kvstore::{KVStore, KVStoreError, Key, Value}, |
12 | | parking_lot::RwLock, |
13 | | }; |
14 | | |
15 | | pub struct MemDB(RwLock<HashMap<Bytes, Bytes>>); |
16 | | |
17 | | impl MemDB { |
18 | 219 | pub fn new() -> Self { |
19 | 219 | Self(RwLock::new(HashMap::new())) |
20 | 219 | } |
21 | | } |
22 | | |
23 | | impl KVStore for MemDB { |
24 | 29.1k | fn atomic_write<'a>( |
25 | 29.1k | &'a self, changes: Box<dyn Iterator<Item = (Key, Option<Value>)> + 'a>, |
26 | 29.1k | ) -> Result<(), KVStoreError> { |
27 | 29.1k | let mut map = self.0.write(); |
28 | 44.7k | for (k, v) in changes29.1k { |
29 | 44.7k | match v { |
30 | 44.7k | Some(v) => { |
31 | 44.7k | map.insert(k.consolidated(), v); |
32 | 44.7k | } |
33 | 2 | None => { |
34 | 2 | map.remove(&k.consolidated()); |
35 | 2 | } |
36 | | } |
37 | | } |
38 | 29.1k | Ok(()) |
39 | 29.1k | } |
40 | | |
41 | 24 | fn contains(&self, key: Key) -> bool { |
42 | 24 | self.0.read().contains_key(&key.consolidated()) |
43 | 24 | } |
44 | | |
45 | 464k | fn get(&self, key: Key) -> Result<Option<Value>, KVStoreError> { |
46 | 464k | Ok(self.0.read().get(&key.consolidated()).map(|v| v463k .clone463k ())) |
47 | 464k | } |
48 | | } |
49 | | |
50 | | pub struct RocksDB(rocksdb::OptimisticTransactionDB); |
51 | | |
52 | | impl KVStore for RocksDB { |
53 | 5 | fn atomic_write<'a>( |
54 | 5 | &'a self, changes: Box<dyn Iterator<Item = (Key, Option<Value>)> + 'a>, |
55 | 5 | ) -> Result<(), KVStoreError> { |
56 | 5 | let mut wb = rocksdb::WriteBatchWithTransaction::default(); |
57 | 23 | for (k, v) in changes5 { |
58 | 23 | match v { |
59 | 23 | Some(v) => wb.put(k.consolidated(), v), |
60 | 0 | None => wb.delete(k.consolidated()), |
61 | | } |
62 | | } |
63 | 5 | Ok(self.0.write(wb).context("RocksDB write error.")?0 ) |
64 | 5 | } |
65 | | |
66 | 0 | fn contains(&self, key: Key) -> bool { |
67 | 0 | self.0.key_may_exist(key.consolidated()) |
68 | 0 | } |
69 | | |
70 | 381 | fn get(&self, key: Key) -> Result<Option<Value>, KVStoreError> { |
71 | 381 | Ok(self |
72 | 381 | .0 |
73 | 381 | .get(key.consolidated()) |
74 | 381 | .context("RocksDB read error.")?0 |
75 | 381 | .map(|v| v369 .into369 ())) |
76 | 381 | } |
77 | | } |
78 | | |
79 | | impl RocksDB { |
80 | 2 | pub fn new(path: &Path) -> Result<Self, KVStoreError> { |
81 | 2 | let mut opts = Options::default(); |
82 | 2 | opts.create_if_missing(true); |
83 | 2 | let db = OptimisticTransactionDB::open(&opts, path).context("RocksDB open error")?0 ; |
84 | 2 | Ok(Self(db)) |
85 | 2 | } |
86 | | } |
87 | | |
88 | | use lyquor_api::subkey_builder; |
89 | | use lyquor_primitives::{Bytes, decode_object, encode_object}; |
90 | | use serde::{Deserialize, Serialize}; |
91 | | use std::hash::Hash; |
92 | | |
93 | | pub struct SortedMappingStore< |
94 | | S: KVStore, |
95 | | K: Hash + Eq + Ord + Serialize + for<'a> Deserialize<'a> + Clone, |
96 | | V: Serialize + for<'a> Deserialize<'a> + Clone, |
97 | | > { |
98 | | cache: lru::LruCache<K, V>, |
99 | | max: Option<K>, |
100 | | first_idx: u64, |
101 | | next_idx: u64, |
102 | | store: S, |
103 | | } |
104 | | |
105 | | subkey_builder!(SortedMappingSubkey( |
106 | | ([0x00])-value(&[u8]) => Key, |
107 | | ([0x01])-index(&u64) => Key, |
108 | | ([0x02])-max() => Key, |
109 | | ([0x03])-first_idx() => Key, |
110 | | ([0x04])-next_idx() => Key, |
111 | | )); |
112 | | |
113 | | impl< |
114 | | S: KVStore, |
115 | | K: Hash + Eq + Ord + Serialize + for<'a> Deserialize<'a> + Clone, |
116 | | V: Serialize + for<'a> Deserialize<'a> + Clone, |
117 | | > SortedMappingStore<S, K, V> |
118 | | { |
119 | 0 | fn subkey() -> &'static SortedMappingSubkey { |
120 | | static KEY: std::sync::OnceLock<SortedMappingSubkey> = std::sync::OnceLock::new(); |
121 | 0 | KEY.get_or_init(|| SortedMappingSubkey::new(Value::new().into())) |
122 | 0 | } |
123 | | |
124 | 0 | pub fn new(store: S, cache_size: std::num::NonZeroUsize) -> Result<Self, KVStoreError> { |
125 | 0 | let max: Option<K> = match store.get(Self::subkey().max())? { |
126 | 0 | Some(raw) => decode_object(&raw), |
127 | 0 | None => None, |
128 | | }; |
129 | 0 | let first_idx: u64 = store |
130 | 0 | .get(Self::subkey().first_idx())? |
131 | 0 | .and_then(|raw| decode_object(&raw)) |
132 | 0 | .unwrap_or(0); |
133 | 0 | let next_idx: u64 = store |
134 | 0 | .get(Self::subkey().next_idx())? |
135 | 0 | .and_then(|raw| decode_object(&raw)) |
136 | 0 | .unwrap_or(0); |
137 | | |
138 | 0 | Ok(Self { |
139 | 0 | cache: lru::LruCache::new(cache_size), |
140 | 0 | max, |
141 | 0 | first_idx, |
142 | 0 | next_idx, |
143 | 0 | store, |
144 | 0 | }) |
145 | 0 | } |
146 | | |
147 | 0 | pub fn max_key(&self) -> Option<K> { |
148 | 0 | self.max.clone() |
149 | 0 | } |
150 | | |
151 | 0 | pub fn append(&mut self, key: K, value: V) -> Result<bool, KVStoreError> { |
152 | 0 | if !self.max_key().map(|max| key > max).unwrap_or(true) { |
153 | 0 | return Ok(false); |
154 | 0 | } |
155 | | |
156 | 0 | let idx = self.next_idx; |
157 | 0 | self.next_idx += 1; |
158 | 0 | self.cache.push(key.clone(), value.clone()); |
159 | 0 | self.max = Some(key.clone()); |
160 | 0 | let key_raw: Bytes = encode_object(&key).into(); |
161 | 0 | let mapping = [ |
162 | 0 | (Self::subkey().value(&key_raw), Some(encode_object(&value).into())), |
163 | 0 | (Self::subkey().index(&idx), Some(key_raw.clone())), |
164 | 0 | (Self::subkey().max(), Some(key_raw)), |
165 | 0 | (Self::subkey().next_idx(), Some(encode_object(&self.next_idx).into())), |
166 | 0 | ] |
167 | 0 | .into_iter(); |
168 | 0 | self.store.atomic_write(Box::new(mapping))?; |
169 | 0 | Ok(true) |
170 | 0 | } |
171 | | |
172 | 0 | pub fn get_le(&mut self, key: K) -> Option<V> { |
173 | 0 | let mut l = self.first_idx; |
174 | 0 | let mut r = self.next_idx; |
175 | 0 | let mut l_key = self |
176 | 0 | .store |
177 | 0 | .get(Self::subkey().index(&l)) |
178 | 0 | .ok() |
179 | 0 | .flatten() |
180 | 0 | .and_then(|b| decode_object(&b))?; |
181 | 0 | if key < l_key { |
182 | 0 | return None; |
183 | 0 | } |
184 | 0 | while r > l + 1 { |
185 | 0 | let mid = (l + r) >> 1; |
186 | 0 | let mid_key: K = self |
187 | 0 | .store |
188 | 0 | .get(Self::subkey().index(&mid)) |
189 | 0 | .ok() |
190 | 0 | .flatten() |
191 | 0 | .and_then(|b| decode_object(&b))?; |
192 | 0 | if key < mid_key { |
193 | 0 | r = mid |
194 | 0 | } else { |
195 | 0 | l = mid; |
196 | 0 | l_key = mid_key; |
197 | 0 | } |
198 | | } |
199 | 0 | let value = self.get(l_key.clone()); |
200 | 0 | if let Some(value) = &value { |
201 | 0 | if l_key != key { |
202 | 0 | self.cache.push(key, value.clone()); |
203 | 0 | } |
204 | 0 | } |
205 | 0 | value |
206 | 0 | } |
207 | | |
208 | 0 | pub fn get(&mut self, key: K) -> Option<V> { |
209 | 0 | match self.cache.get(&key) { |
210 | 0 | Some(value) => Some(value.clone()), |
211 | | None => { |
212 | 0 | let value: V = self |
213 | 0 | .store |
214 | 0 | .get(Self::subkey().value(&encode_object(&key))) |
215 | 0 | .ok() |
216 | 0 | .flatten() |
217 | 0 | .and_then(|b| decode_object(&b))?; |
218 | 0 | self.cache.push(key.clone(), value.clone()); |
219 | 0 | Some(value) |
220 | | } |
221 | | } |
222 | 0 | } |
223 | | } |