/home/runner/work/lyquor/lyquor/db/src/lib.rs
Line | Count | Source |
1 | | #![doc(html_no_source)] // remove it upon open-source |
2 | | |
3 | | use std::collections::HashMap; |
4 | | use std::path::Path; |
5 | | |
6 | | pub use rocksdb; |
7 | | use rocksdb::{OptimisticTransactionDB, Options}; |
8 | | |
9 | | use lyquor_api::{ |
10 | | anyhow::Context, |
11 | | kvstore::{KVStore, KVStoreError, Key, Value}, |
12 | | parking_lot::RwLock, |
13 | | }; |
14 | | |
15 | | pub struct MemDB(RwLock<HashMap<Bytes, Bytes>>); |
16 | | |
17 | | impl MemDB { |
18 | 219 | pub fn new() -> Self { |
19 | 219 | Self(RwLock::new(HashMap::new())) |
20 | 219 | } |
21 | | } |
22 | | |
23 | | impl KVStore for MemDB { |
24 | 29.1k | fn atomic_write<'a>( |
25 | 29.1k | &'a self, changes: Box<dyn Iterator<Item = (Key, Option<Value>)> + 'a>, |
26 | 29.1k | ) -> Result<(), KVStoreError> { |
27 | 29.1k | let mut map = self.0.write(); |
28 | 73.8k | for (k44.7k , v44.7k ) in changes { |
29 | 44.7k | match v { |
30 | 44.7k | Some(v) => { |
31 | 44.7k | map.insert(k.consolidated(), v); |
32 | 44.7k | } |
33 | 2 | None => { |
34 | 2 | map.remove(&k.consolidated()); |
35 | 2 | } |
36 | | } |
37 | | } |
38 | 29.1k | Ok(()) |
39 | 29.1k | } |
40 | | |
41 | 464k | fn get(&self, key: Key) -> Result<Option<Value>, KVStoreError> { |
42 | 464k | Ok(self.0.read().get(&key.consolidated()).map(|v| v463k .clone463k ())) |
43 | 464k | } |
44 | | } |
45 | | |
46 | | pub struct RocksDB(rocksdb::OptimisticTransactionDB); |
47 | | |
48 | | impl KVStore for RocksDB { |
49 | 5 | fn atomic_write<'a>( |
50 | 5 | &'a self, changes: Box<dyn Iterator<Item = (Key, Option<Value>)> + 'a>, |
51 | 5 | ) -> Result<(), KVStoreError> { |
52 | 5 | let mut wb = rocksdb::WriteBatchWithTransaction::default(); |
53 | 28 | for (k23 , v23 ) in changes { |
54 | 23 | match v { |
55 | 23 | Some(v) => wb.put(k.consolidated(), v), |
56 | 0 | None => wb.delete(k.consolidated()), |
57 | | } |
58 | | } |
59 | 5 | Ok(self.0.write(wb).context("RocksDB write error.")?0 ) |
60 | 5 | } |
61 | | |
62 | 138 | fn get(&self, key: Key) -> Result<Option<Value>, KVStoreError> { |
63 | 138 | Ok(self |
64 | 138 | .0 |
65 | 138 | .get(key.consolidated()) |
66 | 138 | .context("RocksDB read error.")?0 |
67 | 138 | .map(|v| v129 .into129 ())) |
68 | 138 | } |
69 | | } |
70 | | |
71 | | impl RocksDB { |
72 | 2 | pub fn new(path: &Path) -> Result<Self, KVStoreError> { |
73 | 2 | let mut opts = Options::default(); |
74 | 2 | opts.create_if_missing(true); |
75 | 2 | let db = OptimisticTransactionDB::open(&opts, path).context("RocksDB open error")?0 ; |
76 | 2 | Ok(Self(db)) |
77 | 2 | } |
78 | | } |
79 | | |
80 | | use lyquor_api::subkey_builder; |
81 | | use lyquor_primitives::{Bytes, decode_object, encode_object}; |
82 | | use serde::{Deserialize, Serialize}; |
83 | | use std::hash::Hash; |
84 | | |
85 | | pub struct SortedMappingStore< |
86 | | S: KVStore, |
87 | | K: Hash + Eq + Ord + Serialize + for<'a> Deserialize<'a> + Clone, |
88 | | V: Serialize + for<'a> Deserialize<'a> + Clone, |
89 | | > { |
90 | | cache: lru::LruCache<K, V>, |
91 | | max: Option<K>, |
92 | | first_idx: u64, |
93 | | next_idx: u64, |
94 | | store: S, |
95 | | } |
96 | | |
// Declares `SortedMappingSubkey`, the builder for the keys that
// `SortedMappingStore` reads and writes. As used in this file, it is
// constructed with `SortedMappingSubkey::new(prefix)` and exposes one method
// per entry below, each returning a `Key`:
//   value(&[u8])  - the value stored for an encoded user key
//   index(&u64)   - the encoded user key recorded at a sequential index
//   max()         - the largest encoded user key appended so far
//   first_idx()   - the persisted first index
//   next_idx()    - the persisted next index
// NOTE(review): the bracketed byte (0x00..0x04) presumably tags each key
// family so they do not collide under the same prefix; confirm against the
// `subkey_builder!` definition in `lyquor_api`.
subkey_builder!(SortedMappingSubkey(
    ([0x00])-value(&[u8]) => Key,
    ([0x01])-index(&u64) => Key,
    ([0x02])-max() => Key,
    ([0x03])-first_idx() => Key,
    ([0x04])-next_idx() => Key,
));
104 | | |
105 | | impl< |
106 | | S: KVStore, |
107 | | K: Hash + Eq + Ord + Serialize + for<'a> Deserialize<'a> + Clone, |
108 | | V: Serialize + for<'a> Deserialize<'a> + Clone, |
109 | | > SortedMappingStore<S, K, V> |
110 | | { |
111 | 0 | fn subkey() -> &'static SortedMappingSubkey { |
112 | | static KEY: std::sync::OnceLock<SortedMappingSubkey> = std::sync::OnceLock::new(); |
113 | 0 | KEY.get_or_init(|| SortedMappingSubkey::new(Value::new().into())) |
114 | 0 | } |
115 | | |
116 | 0 | pub fn new(store: S, cache_size: std::num::NonZeroUsize) -> Result<Self, KVStoreError> { |
117 | 0 | let max: Option<K> = match store.get(Self::subkey().max())? { |
118 | 0 | Some(raw) => decode_object(&raw), |
119 | 0 | None => None, |
120 | | }; |
121 | 0 | let first_idx: u64 = store |
122 | 0 | .get(Self::subkey().first_idx())? |
123 | 0 | .and_then(|raw| decode_object(&raw)) |
124 | 0 | .unwrap_or(0); |
125 | 0 | let next_idx: u64 = store |
126 | 0 | .get(Self::subkey().next_idx())? |
127 | 0 | .and_then(|raw| decode_object(&raw)) |
128 | 0 | .unwrap_or(0); |
129 | | |
130 | 0 | Ok(Self { |
131 | 0 | cache: lru::LruCache::new(cache_size), |
132 | 0 | max, |
133 | 0 | first_idx, |
134 | 0 | next_idx, |
135 | 0 | store, |
136 | 0 | }) |
137 | 0 | } |
138 | | |
139 | 0 | pub fn max_key(&self) -> Option<K> { |
140 | 0 | self.max.clone() |
141 | 0 | } |
142 | | |
143 | 0 | pub fn append(&mut self, key: K, value: V) -> Result<bool, KVStoreError> { |
144 | 0 | if !self.max_key().map(|max| key > max).unwrap_or(true) { |
145 | 0 | return Ok(false) |
146 | 0 | } |
147 | | |
148 | 0 | let idx = self.next_idx; |
149 | 0 | self.next_idx += 1; |
150 | 0 | self.cache.push(key.clone(), value.clone()); |
151 | 0 | self.max = Some(key.clone()); |
152 | 0 | let key_raw: Bytes = encode_object(&key).into(); |
153 | 0 | let mapping = [ |
154 | 0 | (Self::subkey().value(&key_raw), Some(encode_object(&value).into())), |
155 | 0 | (Self::subkey().index(&idx), Some(key_raw.clone())), |
156 | 0 | (Self::subkey().max(), Some(key_raw)), |
157 | 0 | (Self::subkey().next_idx(), Some(encode_object(&self.next_idx).into())), |
158 | 0 | ] |
159 | 0 | .into_iter(); |
160 | 0 | self.store.atomic_write(Box::new(mapping))?; |
161 | 0 | Ok(true) |
162 | 0 | } |
163 | | |
164 | 0 | pub fn get_le(&mut self, key: K) -> Option<V> { |
165 | 0 | let mut l = self.first_idx; |
166 | 0 | let mut r = self.next_idx; |
167 | 0 | let mut l_key = self |
168 | 0 | .store |
169 | 0 | .get(Self::subkey().index(&l)) |
170 | 0 | .ok() |
171 | 0 | .flatten() |
172 | 0 | .and_then(|b| decode_object(&b))?; |
173 | 0 | if key < l_key { |
174 | 0 | return None |
175 | 0 | } |
176 | 0 | while r > l + 1 { |
177 | 0 | let mid = (l + r) >> 1; |
178 | 0 | let mid_key: K = self |
179 | 0 | .store |
180 | 0 | .get(Self::subkey().index(&mid)) |
181 | 0 | .ok() |
182 | 0 | .flatten() |
183 | 0 | .and_then(|b| decode_object(&b))?; |
184 | 0 | if key < mid_key { |
185 | 0 | r = mid |
186 | 0 | } else { |
187 | 0 | l = mid; |
188 | 0 | l_key = mid_key; |
189 | 0 | } |
190 | | } |
191 | 0 | let value = self.get(l_key.clone()); |
192 | 0 | if let Some(value) = &value { |
193 | 0 | if l_key != key { |
194 | 0 | self.cache.push(key, value.clone()); |
195 | 0 | } |
196 | 0 | } |
197 | 0 | value |
198 | 0 | } |
199 | | |
200 | 0 | pub fn get(&mut self, key: K) -> Option<V> { |
201 | 0 | match self.cache.get(&key) { |
202 | 0 | Some(value) => Some(value.clone()), |
203 | | None => { |
204 | 0 | let value: V = self |
205 | 0 | .store |
206 | 0 | .get(Self::subkey().value(&encode_object(&key))) |
207 | 0 | .ok() |
208 | 0 | .flatten() |
209 | 0 | .and_then(|b| decode_object(&b))?; |
210 | 0 | self.cache.push(key.clone(), value.clone()); |
211 | 0 | Some(value) |
212 | | } |
213 | | } |
214 | 0 | } |
215 | | } |