/home/runner/work/lyquor/lyquor/platform/db/src/lib.rs
Line | Count | Source |
1 | | #![doc(html_no_source)] // remove it upon open-source |
2 | | |
3 | | //! Concrete key-value backends for Lyquor persistence traits. |
4 | | //! |
5 | | //! `lyquor-db` implements the `lyquor_api::store` interfaces for both in-memory storage and |
6 | | //! RocksDB-backed storage. Higher layers use those interfaces for simple state, versioned state, |
7 | | //! artifact caches, and sequencing metadata; this crate keeps backend transactions and iteration |
8 | | //! behavior below that shared storage boundary. |
9 | | |
10 | | use std::collections::HashMap; |
11 | | use std::path::Path; |
12 | | use std::sync::Arc; |
13 | | |
14 | | pub use rocksdb; |
15 | | use rocksdb::{OptimisticTransactionDB, Options}; |
16 | | |
17 | | use lyquor_api::{ |
18 | | anyhow::{Context, anyhow}, |
19 | | parking_lot::RwLock, |
20 | | store::{KVStore, KVStoreError, Key, SortedMapping, StoreFuture, Value}, |
21 | | }; |
22 | | use tokio::sync::Mutex; |
23 | | |
24 | | /// In-memory key-value store used by tests and ephemeral node configurations. |
25 | | pub struct MemDB(RwLock<HashMap<Bytes, Bytes>>); |
26 | | |
27 | | impl MemDB { |
28 | | /// Create an empty in-memory store. |
29 | 445 | pub fn new() -> Self { |
30 | 445 | Self(RwLock::new(HashMap::new())) |
31 | 445 | } |
32 | | } |
33 | | |
34 | | impl Default for MemDB { |
35 | 0 | fn default() -> Self { |
36 | 0 | Self::new() |
37 | 0 | } |
38 | | } |
39 | | |
40 | | impl KVStore for MemDB { |
41 | 16.2k | fn atomic_write<'a>( |
42 | 16.2k | &'a self, changes: Box<dyn Iterator<Item = (Key, Option<Value>)> + 'a>, |
43 | 16.2k | ) -> Result<(), KVStoreError> { |
44 | 16.2k | let mut map = self.0.write(); |
45 | 47.1k | for (k, v) in changes16.2k { |
46 | 47.1k | match v { |
47 | 47.1k | Some(v) => { |
48 | 47.1k | map.insert(k.consolidated(), v); |
49 | 47.1k | } |
50 | 2 | None => { |
51 | 2 | map.remove(&k.consolidated()); |
52 | 2 | } |
53 | | } |
54 | | } |
55 | 16.2k | Ok(()) |
56 | 16.2k | } |
57 | | |
58 | 24 | fn contains(&self, key: Key) -> StoreFuture<bool> { |
59 | 24 | Box::pin(std::future::ready(self.0.read().contains_key(&key.consolidated()))) |
60 | 24 | } |
61 | | |
62 | 417k | fn get(&self, key: Key) -> StoreFuture<Result<Option<Value>, KVStoreError>> { |
63 | 417k | Box::pin(std::future::ready(Ok(self.0.read().get(&key.consolidated()).cloned()))) |
64 | 417k | } |
65 | | } |
66 | | |
67 | | /// RocksDB-backed key-value store implementing Lyquor's shared storage trait. |
68 | | #[derive(Clone)] |
69 | | pub struct RocksDB(Arc<rocksdb::OptimisticTransactionDB>); |
70 | | |
71 | | impl KVStore for RocksDB { |
72 | 7 | fn atomic_write<'a>( |
73 | 7 | &'a self, changes: Box<dyn Iterator<Item = (Key, Option<Value>)> + 'a>, |
74 | 7 | ) -> Result<(), KVStoreError> { |
75 | 7 | let mut wb = rocksdb::WriteBatchWithTransaction::default(); |
76 | 25 | for (k, v) in changes7 { |
77 | 25 | match v { |
78 | 25 | Some(v) => wb.put(k.consolidated(), v), |
79 | 0 | None => wb.delete(k.consolidated()), |
80 | | } |
81 | | } |
82 | 7 | Ok(self.0.write(wb).context("RocksDB write error.")?0 ) |
83 | 7 | } |
84 | | |
85 | 0 | fn contains(&self, key: Key) -> StoreFuture<bool> { |
86 | 0 | let db = self.0.clone(); |
87 | 0 | let key = key.consolidated(); |
88 | 0 | Box::pin(async move { |
89 | 0 | tokio::task::spawn_blocking(move || db.key_may_exist(key)) |
90 | 0 | .await |
91 | 0 | .unwrap_or(false) |
92 | 0 | }) |
93 | 0 | } |
94 | | |
95 | 290 | fn get(&self, key: Key) -> StoreFuture<Result<Option<Value>, KVStoreError>> { |
96 | 290 | let db = self.0.clone(); |
97 | 290 | let key = key.consolidated(); |
98 | 290 | Box::pin(async move { |
99 | 290 | tokio::task::spawn_blocking(move || { |
100 | 290 | let value = db.get(key).context("RocksDB read error.")?0 ; |
101 | 290 | Ok(value.map(|v| v276 .into276 ())) |
102 | 290 | }) |
103 | 290 | .await |
104 | 290 | .context("RocksDB read task join error.")?0 |
105 | 290 | }) |
106 | 290 | } |
107 | | } |
108 | | |
109 | | impl RocksDB { |
110 | | /// Open or create a RocksDB store at `path`. |
111 | 6 | pub fn new(path: &Path) -> Result<Self, KVStoreError> { |
112 | 6 | let mut opts = Options::default(); |
113 | 6 | opts.create_if_missing(true); |
114 | 6 | let db = OptimisticTransactionDB::open(&opts, path).context("RocksDB open error")?0 ; |
115 | 6 | Ok(Self(Arc::new(db))) |
116 | 6 | } |
117 | | } |
118 | | |
119 | | use lyquor_api::subkey_builder; |
120 | | use lyquor_primitives::{Bytes, decode_object, encode_object}; |
121 | | use serde::{Deserialize, Serialize}; |
122 | | use std::hash::Hash; |
123 | | |
124 | | /// Persistent sorted mapping backed by a Lyquor key-value store. |
125 | | pub struct SortedMappingStore< |
126 | | S: KVStore, |
127 | | K: Hash + Eq + Ord + Serialize + for<'a> Deserialize<'a> + Clone, |
128 | | V: Serialize + for<'a> Deserialize<'a> + Clone, |
129 | | > { |
130 | | state: Arc<Mutex<SortedMappingState<K, V>>>, |
131 | | store: Arc<S>, |
132 | | } |
133 | | |
134 | | struct SortedMappingState< |
135 | | K: Hash + Eq + Ord + Serialize + for<'a> Deserialize<'a> + Clone, |
136 | | V: Serialize + for<'a> Deserialize<'a> + Clone, |
137 | | > { |
138 | | cache: lru::LruCache<K, V>, |
139 | | max: Option<K>, |
140 | | first_idx: u64, |
141 | | next_idx: u64, |
142 | | } |
143 | | |
// Key layout used by `SortedMappingStore`. As used in this file, each entry yields a `Key`
// through a method on `SortedMappingSubkey`: `value(&[u8])` addresses the encoded value of a
// key, `index(&u64)` addresses the key stored at an append position, and `max()`,
// `first_idx()` and `next_idx()` address the persisted bookkeeping fields.
// NOTE(review): the bracketed bytes (0x00..0x04) look like per-entry tag prefixes — confirm
// against the `subkey_builder!` definition in `lyquor_api`.
subkey_builder!(SortedMappingSubkey(
    ([0x00])-value(&[u8]) => Key,
    ([0x01])-index(&u64) => Key,
    ([0x02])-max() => Key,
    ([0x03])-first_idx() => Key,
    ([0x04])-next_idx() => Key,
));
151 | | |
152 | | impl< |
153 | | S: KVStore, |
154 | | K: Hash + Eq + Ord + Serialize + for<'a> Deserialize<'a> + Clone, |
155 | | V: Serialize + for<'a> Deserialize<'a> + Clone, |
156 | | > SortedMappingStore<S, K, V> |
157 | | { |
158 | 86 | fn subkey() -> &'static SortedMappingSubkey { |
159 | | static KEY: std::sync::OnceLock<SortedMappingSubkey> = std::sync::OnceLock::new(); |
160 | 86 | KEY.get_or_init(|| SortedMappingSubkey::new6 (Value::new6 ().into6 ())) |
161 | 86 | } |
162 | | |
163 | | /// Open a sorted mapping view over `store` with an in-memory lookup cache. |
164 | 10 | pub async fn new(store: S, cache_size: std::num::NonZeroUsize) -> Result<Self, KVStoreError> { |
165 | 10 | let max9 : Option<K>9 = match store.get(Self::subkey().max()).await?0 { |
166 | 2 | Some(raw) => Some(decode_object(&raw).ok_or_else(|| anyhow!1 ("failed to decode sorted mapping max key"))?1 ), |
167 | 8 | None => None, |
168 | | }; |
169 | 9 | let first_idx: u64 = match store.get(Self::subkey().first_idx()).await?0 { |
170 | 0 | Some(raw) => decode_object(&raw).ok_or_else(|| anyhow!("failed to decode sorted mapping first index"))?, |
171 | 9 | None => 0, |
172 | | }; |
173 | 9 | let next_idx: u64 = match store.get(Self::subkey().next_idx()).await?0 { |
174 | 1 | Some(raw) => decode_object(&raw).ok_or_else(|| anyhow!0 ("failed to decode sorted mapping next index"))?0 , |
175 | 8 | None => 0, |
176 | | }; |
177 | | |
178 | 9 | Ok(Self { |
179 | 9 | state: Arc::new(Mutex::new(SortedMappingState { |
180 | 9 | cache: lru::LruCache::new(cache_size), |
181 | 9 | max, |
182 | 9 | first_idx, |
183 | 9 | next_idx, |
184 | 9 | })), |
185 | 9 | store: Arc::new(store), |
186 | 9 | }) |
187 | 10 | } |
188 | | } |
189 | | |
190 | | impl< |
191 | | S: KVStore + 'static, |
192 | | K: Hash + Eq + Ord + Serialize + for<'a> Deserialize<'a> + Clone + Send + 'static, |
193 | | V: Serialize + for<'a> Deserialize<'a> + Clone + Send + 'static, |
194 | | > SortedMapping<K, V> for SortedMappingStore<S, K, V> |
195 | | { |
196 | 8 | fn max_key(&self) -> StoreFuture<Option<K>> { |
197 | 8 | let state = self.state.clone(); |
198 | 8 | Box::pin(async move { state.lock().await.max.clone() }) |
199 | 8 | } |
200 | | |
201 | 2 | fn append(&self, key: K, value: V) -> StoreFuture<Result<bool, KVStoreError>> { |
202 | 2 | let state = self.state.clone(); |
203 | 2 | let store = self.store.clone(); |
204 | 2 | Box::pin(async move { |
205 | 2 | let mut state = state.lock().await; |
206 | 2 | Self::append_new_max(store.as_ref(), &mut state, key, value) |
207 | 2 | }) |
208 | 2 | } |
209 | | |
210 | 8 | fn append_or_update_max(&self, key: K, value: V) -> StoreFuture<Result<bool, KVStoreError>> { |
211 | 8 | let state = self.state.clone(); |
212 | 8 | let store = self.store.clone(); |
213 | 8 | Box::pin(async move { |
214 | 8 | let mut state = state.lock().await; |
215 | 8 | match state.max.clone() { |
216 | 2 | Some(max1 ) if key > max1 => Self::append_new_max1 (store.as_ref()1 , &mut state1 , key1 , value1 ), |
217 | 1 | Some(max) if key == max => { |
218 | 1 | let key_raw: Bytes = encode_object(&key).into(); |
219 | 1 | store.atomic_write(Box::new( |
220 | 1 | [(Self::subkey().value(&key_raw), Some(encode_object(&value).into()))].into_iter(), |
221 | 1 | ))?0 ; |
222 | 1 | state.cache.push(key.clone(), value.clone()); |
223 | 1 | Ok(true) |
224 | | } |
225 | 0 | Some(_) => Ok(false), |
226 | 6 | None => Self::append_new_max(store.as_ref(), &mut state, key, value), |
227 | | } |
228 | 8 | }) |
229 | 8 | } |
230 | | |
231 | 16 | fn get_le(&self, key: K) -> StoreFuture<Result<Option<V>, KVStoreError>> { |
232 | 16 | let state = self.state.clone(); |
233 | 16 | let store = self.store.clone(); |
234 | 16 | Box::pin(async move { |
235 | 16 | let mut state = state.lock().await; |
236 | 16 | let first_idx = state.first_idx; |
237 | 16 | let next_idx = state.next_idx; |
238 | 16 | let Some(raw8 ) = store.get(Self::subkey().index(&first_idx)).await?0 else { |
239 | 8 | return Ok(None); |
240 | | }; |
241 | 8 | let mut l_key = decode_object(&raw).ok_or_else(|| anyhow!0 ("failed to decode sorted mapping index key"))?0 ; |
242 | 8 | if key < l_key { |
243 | 0 | return Ok(None); |
244 | 8 | } |
245 | 8 | let mut l = first_idx; |
246 | 8 | let mut r = next_idx; |
247 | 10 | while r > l + 1 { |
248 | 2 | let mid = (l + r) >> 1; |
249 | 2 | let Some(raw) = store.get(Self::subkey().index(&mid)).await?0 else { |
250 | 0 | return Ok(None); |
251 | | }; |
252 | 2 | let mid_key = |
253 | 2 | decode_object(&raw).ok_or_else(|| anyhow!0 ("failed to decode sorted mapping index key"))?0 ; |
254 | 2 | if key < mid_key { |
255 | 1 | r = mid |
256 | 1 | } else { |
257 | 1 | l = mid; |
258 | 1 | l_key = mid_key; |
259 | 1 | } |
260 | | } |
261 | 8 | Self::get_locked(store, &mut state, l_key).await |
262 | 16 | }) |
263 | 16 | } |
264 | | |
265 | 2 | fn get(&self, key: K) -> StoreFuture<Result<Option<V>, KVStoreError>> { |
266 | 2 | let state = self.state.clone(); |
267 | 2 | let store = self.store.clone(); |
268 | 2 | Box::pin(async move { |
269 | 2 | let mut state = state.lock().await; |
270 | 2 | Self::get_locked(store, &mut state, key).await |
271 | 2 | }) |
272 | 2 | } |
273 | | } |
274 | | |
275 | | impl< |
276 | | S: KVStore + 'static, |
277 | | K: Hash + Eq + Ord + Serialize + for<'a> Deserialize<'a> + Clone + Send + 'static, |
278 | | V: Serialize + for<'a> Deserialize<'a> + Clone + Send + 'static, |
279 | | > SortedMappingStore<S, K, V> |
280 | | { |
281 | 9 | fn append_new_max(store: &S, state: &mut SortedMappingState<K, V>, key: K, value: V) -> Result<bool, KVStoreError> { |
282 | 9 | if !state.max.clone().map(|max| key1 > max1 ).unwrap_or(true) { |
283 | 0 | return Ok(false); |
284 | 9 | } |
285 | | |
286 | 9 | let idx = state.next_idx; |
287 | 9 | let next_idx = idx + 1; |
288 | 9 | let key_raw: Bytes = encode_object(&key).into(); |
289 | 9 | let mapping = [ |
290 | 9 | (Self::subkey().value(&key_raw), Some(encode_object(&value).into())), |
291 | 9 | (Self::subkey().index(&idx), Some(key_raw.clone())), |
292 | 9 | (Self::subkey().max(), Some(key_raw)), |
293 | 9 | (Self::subkey().next_idx(), Some(encode_object(&next_idx).into())), |
294 | 9 | ] |
295 | 9 | .into_iter(); |
296 | 9 | store.atomic_write(Box::new(mapping))?0 ; |
297 | 9 | state.next_idx = next_idx; |
298 | 9 | state.cache.push(key.clone(), value.clone()); |
299 | 9 | state.max = Some(key); |
300 | 9 | Ok(true) |
301 | 9 | } |
302 | | |
303 | 10 | async fn get_locked( |
304 | 10 | store: Arc<S>, state: &mut SortedMappingState<K, V>, key: K, |
305 | 10 | ) -> Result<Option<V>, KVStoreError> { |
306 | 10 | if let Some(value9 ) = state.cache.get(&key).cloned() { |
307 | 9 | return Ok(Some(value)); |
308 | 1 | } |
309 | | |
310 | 1 | let Some(raw) = store.get(Self::subkey().value(&encode_object(&key))).await?0 else { |
311 | 0 | return Ok(None); |
312 | | }; |
313 | 1 | let value0 : V0 = decode_object(&raw).ok_or_else(|| anyhow!("failed to decode sorted mapping value"))?; |
314 | 0 | state.cache.push(key.clone(), value.clone()); |
315 | 0 | Ok(Some(value)) |
316 | 10 | } |
317 | | } |
318 | | |
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::SortedMappingStore;
    use crate::MemDB;
    use lyquor_api::store::{KVStore, SortedMapping};
    use lyquor_primitives::{Bytes, encode_object};
    use lyquor_test::test;

    /// Cache capacity shared by every test in this module.
    fn cache_size() -> std::num::NonZeroUsize {
        std::num::NonZeroUsize::new(4).unwrap()
    }

    /// Re-submitting the current maximum key overwrites its value for every read path.
    #[test(tokio::test)]
    async fn sorted_mapping_store_updates_existing_max_key() {
        let mapping = SortedMappingStore::new(MemDB::new(), cache_size()).await.unwrap();

        // The first insert establishes 7 as the maximum key.
        assert!(mapping.append(7_u64, 10_u64).await.unwrap());
        // The same key again is accepted and replaces the value.
        assert!(mapping.append_or_update_max(7, 11).await.unwrap());

        assert_eq!(mapping.max_key().await, Some(7));
        assert_eq!(mapping.get(7).await.unwrap(), Some(11));
        assert_eq!(mapping.get_le(7).await.unwrap(), Some(11));
    }

    /// Undecodable persisted bytes surface as errors instead of being silently ignored.
    #[test(tokio::test)]
    async fn sorted_mapping_rejects_corrupt_reads() {
        // Case 1: an empty byte string stored as the maximum key makes opening fail.
        let corrupt_max = MemDB::new();
        let max_entry = (
            SortedMappingStore::<MemDB, u64, u64>::subkey().max(),
            Some(Bytes::new()),
        );
        corrupt_max.atomic_write(Box::new([max_entry].into_iter())).unwrap();

        let opened = SortedMappingStore::<_, u64, u64>::new(corrupt_max, cache_size()).await;
        assert!(opened.is_err());

        // Case 2: write a valid entry, then overwrite its value with empty bytes.
        let shared = Arc::new(MemDB::new());
        let writer = SortedMappingStore::new(shared.clone(), cache_size()).await.unwrap();
        assert!(writer.append(7_u64, 10_u64).await.unwrap());

        let key_raw: Bytes = encode_object(&7_u64).into();
        let value_entry = (
            SortedMappingStore::<Arc<MemDB>, u64, u64>::subkey().value(&key_raw),
            Some(Bytes::new()),
        );
        shared.atomic_write(Box::new([value_entry].into_iter())).unwrap();

        // A freshly opened view has an empty cache, so the read reaches the corrupt bytes.
        let reopened: SortedMappingStore<_, u64, u64> =
            SortedMappingStore::new(shared, cache_size()).await.unwrap();
        assert!(reopened.get(7_u64).await.is_err());
    }
}