/home/runner/work/lyquor/lyquor/platform/hosting/src/lvm.rs
Line | Count | Source |
1 | | use std::sync::Arc; |
2 | | use std::time::{Duration, SystemTime, UNIX_EPOCH}; |
3 | | |
4 | | use async_trait::async_trait; |
5 | | use lyquor_api::{ |
6 | | anyhow, |
7 | | call::CallParams, |
8 | | interface::{ |
9 | | FetchOracleInfo, GetAddressByEd25519, GetAddressByEd25519Service, GetEd25519ByAddress, |
10 | | GetEd25519ByAddressService, GetEthContractAddr, GetEthContractAddrService, InterCallService, OnInterCall, |
11 | | OracleInfoService, SubmitCall, SubmitService, |
12 | | }, |
13 | | store::{KVStore, KVStoreError, StoreFuture}, |
14 | | }; |
15 | | use lyquor_crypto as crypto; |
16 | | use lyquor_primitives::oracle::OracleTarget; |
17 | | use lyquor_primitives::{ |
18 | | Address, Bytes, ConsoleSink, LyquidID, LyquidNumber, NodeID, SequenceBackendID, encode_object, |
19 | | }; |
20 | | use lyquor_upc::{CallHeader, MulticastContext, RemoteObj}; |
21 | | use lyquor_vm::{RuntimeEnv, lyquid}; |
22 | | use thiserror::Error; |
23 | | use tokio::sync::mpsc; |
24 | | use tower::{ServiceExt, util::BoxCloneSyncService}; |
25 | | |
26 | | /// Per-Lyquid stores required to construct a VM instance. |
27 | | pub struct ProcessStores { |
28 | | pub vm: lyquor_vm::InstanceStores, |
29 | | pub progress: Arc<dyn KVStore>, |
30 | | } |
31 | | |
32 | | /// Factory that returns namespaced stores for a Lyquid process. |
33 | | pub type ProcessStoreFactory = Arc<dyn Fn(LyquidID) -> StoreFuture<Result<ProcessStores, KVStoreError>> + Send + Sync>; |
34 | | |
35 | | /// Sequencing services exposed to hosted Lyquid VM host APIs. |
36 | | #[derive(Clone)] |
37 | | pub struct Sequencer { |
38 | | pub inter: InterCallService, |
39 | | pub submit: SubmitService, |
40 | | pub fetch_oracle_info: OracleInfoService, |
41 | | } |
42 | | |
43 | | /// Bartender lookup services exposed to hosted Lyquid VM host APIs. |
44 | | #[derive(Clone, Debug)] |
45 | | pub struct Bartender { |
46 | | pub get_address_by_ed25519: GetAddressByEd25519Service, |
47 | | pub get_ed25519_by_address: GetEd25519ByAddressService, |
48 | | pub get_eth_contract_address: GetEthContractAddrService, |
49 | | pub sequence_backend_id: SequenceBackendID, |
50 | | } |
51 | | |
52 | | /// Console output emitted by a hosted Lyquid. |
53 | | #[derive(Debug, Clone)] |
54 | | pub struct ConsoleOutput { |
55 | | pub sink: ConsoleSink, |
56 | | pub data: String, |
57 | | } |
58 | | |
59 | | /// Trigger registration or removal requested by a hosted Lyquid (see [`lyquor_primitives::TriggerMode`]). |
60 | | #[derive(Debug, Clone)] |
61 | | pub struct Trigger { |
62 | | pub group: String, |
63 | | pub method: String, |
64 | | pub input: Vec<u8>, |
65 | | pub version: LyquidNumber, |
66 | | pub mode: lyquor_primitives::TriggerMode, |
67 | | } |
68 | | |
69 | | /// Service used by VM host APIs to register or stop triggers. |
70 | | pub type TriggerService = BoxCloneSyncService<Trigger, (), anyhow::Error>; |
71 | | /// Shared VM instance type for hosting runtime environments. |
72 | | pub type Instance = lyquor_vm::Instance<Env>; |
73 | | |
74 | | // TODO: make those configurable (from Lyquid? and/or with some guard at Node level) |
75 | | const HTTP_CONNECT_TIMEOUT: Duration = Duration::from_secs(5); |
76 | | const HTTP_REQUEST_TIMEOUT: Duration = Duration::from_secs(30); |
77 | | const HTTP_POOL_IDLE_TIMEOUT: Duration = Duration::from_secs(90); |
78 | | const HTTP_POOL_MAX_IDLE_PER_HOST: usize = 8; |
79 | | const HTTP_TCP_KEEPALIVE: Duration = Duration::from_secs(60); |
80 | | |
81 | | #[derive(Clone)] |
82 | | struct HttpClient { |
83 | | client: Result<reqwest::Client, String>, |
84 | | } |
85 | | |
86 | | impl HttpClient { |
87 | 39 | fn new() -> Self { |
88 | 39 | let client = reqwest::Client::builder() |
89 | 39 | .connect_timeout(HTTP_CONNECT_TIMEOUT) |
90 | 39 | .timeout(HTTP_REQUEST_TIMEOUT) |
91 | 39 | .pool_max_idle_per_host(HTTP_POOL_MAX_IDLE_PER_HOST) |
92 | 39 | .pool_idle_timeout(Some(HTTP_POOL_IDLE_TIMEOUT)) |
93 | 39 | .tcp_keepalive(Some(HTTP_TCP_KEEPALIVE)) |
94 | 39 | .build() |
95 | 39 | .map_err(|err| err0 .to_string0 ()); |
96 | | |
97 | 39 | if let Err(ref err0 ) = client { |
98 | 0 | tracing::warn!(error = %err, "failed to initialize instance http client"); |
99 | | } else { |
100 | 39 | tracing::trace!("initialized instance http client"); |
101 | | } |
102 | | |
103 | 39 | Self { client } |
104 | 39 | } |
105 | | |
106 | 4 | fn client(&self) -> Result<&reqwest::Client, lyquid::LyquidError> { |
107 | 4 | self.client |
108 | 4 | .as_ref() |
109 | 4 | .map_err(|err| lyquid::LyquidError::LyquorRuntime(format!0 ("http client init failed: {err}"))) |
110 | 4 | } |
111 | | } |
112 | | |
113 | | #[derive(Clone)] |
114 | | struct NodeEnv { |
115 | | upc: lyquor_upc::Requester<lyquor_vm::instance::Error>, |
116 | | node_id: NodeID, |
117 | | sequencer: Option<Sequencer>, |
118 | | sig_provider: Arc<crypto::SigProvider>, |
119 | | bartender: Option<Bartender>, |
120 | | } |
121 | | |
122 | | #[derive(Clone)] |
123 | | struct LyquidEnv { |
124 | | lyquid_id: LyquidID, |
125 | | console: Option<mpsc::Sender<ConsoleOutput>>, |
126 | | http_client: HttpClient, |
127 | | trigger: Option<TriggerService>, |
128 | | } |
129 | | |
130 | | /// Runtime environment captured by hosted Lyquid VM instances. |
131 | | #[derive(Clone)] |
132 | | pub struct Env { |
133 | | node: NodeEnv, |
134 | | lyquid: LyquidEnv, |
135 | | } |
136 | | |
137 | | impl Env { |
138 | 2 | fn sequencer(&self) -> Option<&Sequencer> { |
139 | 2 | self.node.sequencer.as_ref() |
140 | 2 | } |
141 | | } |
142 | | |
143 | | impl lyquor_vm::RuntimeEnv for Env { |
144 | 257 | fn get_lyquid_id(&self) -> LyquidID { |
145 | 257 | self.lyquid.lyquid_id |
146 | 257 | } |
147 | | |
148 | 169 | fn get_node_id(&self) -> NodeID { |
149 | 169 | self.node.node_id |
150 | 169 | } |
151 | | } |
152 | | |
153 | | /// Errors returned while constructing or wiring hosted VM runtime state. |
154 | | #[derive(Debug, Error)] |
155 | | pub enum VmRuntimeError { |
156 | | #[error("VM: {0}")] |
157 | | VM(#[from] lyquor_vm::Error), |
158 | | #[error("UPC: {0}")] |
159 | | UPC(#[from] lyquor_upc::Error<lyquor_vm::instance::Error>), |
160 | | } |
161 | | |
162 | | type HostAPIFunc = lyquor_vm::barrel::HostAPIFunc<lyquor_vm::instance::Host<Env>>; |
163 | | |
164 | | lazy_static::lazy_static! { |
165 | 134 | static ref LVM_CONSOLE: HostAPIFunc = lyquor_vm::host_api!((host: Host<Env>; sink: ConsoleSink, data: String) { |
166 | | // Silently drop the message if the output is not available. |
167 | 134 | if let Some(console0 ) = &host.env.lyquid.console { |
168 | 0 | console.send(ConsoleOutput { |
169 | 0 | sink, |
170 | 0 | data, |
171 | 0 | }).await.ok(); |
172 | 134 | } |
173 | 134 | Ok(()) |
174 | | }); |
175 | | |
176 | | static ref LVM_UPC: HostAPIFunc = lyquor_vm::host_api!((host: Host<Env>; |
177 | 7 | target: LyquidID, group: Option<String>, method: String, input: Vec<u8>, client_params: Option<Bytes>, timeout_ms: Option<u64>) -> Vec<u8> { |
178 | 7 | if host.category != lyquor_primitives::StateCategory::Instance { |
179 | 1 | return Err(lyquid::LyquidError::LyquorRuntime( |
180 | 1 | "`universal_procedural_call` only allowed in instance functions".to_string(), |
181 | 1 | )); |
182 | 6 | } |
183 | | |
184 | 6 | tracing::trace!("Host handling UPC {:?} -> {:?} (group={:?}, method={}, input=<{} bytes>, client_params={:?})", |
185 | 0 | host.env.lyquid.lyquid_id, target, group, method, input.len(), client_params); |
186 | | |
187 | 6 | let number = if target == host.env.lyquid.lyquid_id { |
188 | 3 | Some(*host.version.read().await) |
189 | | } else { |
190 | 3 | None |
191 | | }; |
192 | 6 | let timeout = timeout_ms |
193 | 6 | .map(Duration::from_millis) |
194 | 6 | .unwrap_or_else(|| tokio::time::Duration::from_secs(lyquor_upc::message::UPC_PROCEDURE_TIMEOUT_DEFAULT)); |
195 | 6 | host.env.node.upc.call(CallHeader { |
196 | 6 | lyquid: target, |
197 | 6 | method, |
198 | 6 | number, |
199 | 6 | group, |
200 | 6 | timeout, |
201 | 6 | context: None, |
202 | 6 | }, RemoteObj::from_encoded(input.into()), client_params, None).await |
203 | 6 | .map_err(|e| lyquid::LyquidError::LyquorRuntime(format!0 ("UPC failed: {}", e))) |
204 | 6 | .map(|r| Vec::from(r.to_encoded())) |
205 | | }); |
206 | | |
207 | | static ref LVM_INTER_LYQUID_CALL: HostAPIFunc = lyquor_vm::host_api!((host: Host<Env>; |
208 | 2 | callee: LyquidID, method: String, input: Vec<u8>) -> Vec<u8> { |
209 | 2 | if host.category != lyquor_primitives::StateCategory::Network { |
210 | 0 | return Err(lyquid::LyquidError::LyquorRuntime( |
211 | 0 | "`inter_lyquid_call` only allowed in network functions".to_string(), |
212 | 0 | )); |
213 | 2 | } |
214 | | |
215 | 2 | let pos = *host.chain_pos.read().await; |
216 | 2 | if let Some(sys) = host.env.sequencer() { |
217 | 2 | let caller = host.env.lyquid.lyquid_id; |
218 | 2 | sys.inter |
219 | 2 | .clone() |
220 | 2 | .oneshot(OnInterCall { |
221 | 2 | origin: caller.into(), // TODO: pass on the real origin |
222 | 2 | caller, |
223 | 2 | callee, |
224 | 2 | method, |
225 | 2 | input, |
226 | 2 | pos, |
227 | 2 | }) |
228 | 2 | .await |
229 | 2 | .map_err(|e| lyquid::LyquidError::LyquorRuntime(format!0 ("error during service call: {:?}", e))) |
230 | | } else { |
231 | 0 | Err(lyquid::LyquidError::LyquorRuntime("InterCallSys not available".into())) |
232 | | } |
233 | | }); |
234 | | |
235 | 0 | static ref LVM_SUBMIT_CALL: HostAPIFunc = lyquor_vm::host_api!((host: Host<Env>; call: CallParams, signed: bool) -> Vec<u8> { |
236 | 0 | if host.category != lyquor_primitives::StateCategory::Instance { |
237 | 0 | return Err(lyquid::LyquidError::LyquorRuntime( |
238 | 0 | "`submit_call` only allowed in instance functions".to_string(), |
239 | 0 | )); |
240 | 0 | } |
241 | | |
242 | 0 | let lyquid = host.env.lyquid.lyquid_id; |
243 | 0 | if let Some(sys) = host.env.sequencer() { |
244 | 0 | sys.submit.clone().oneshot(SubmitCall { |
245 | 0 | lyquid, |
246 | 0 | params: call, |
247 | 0 | signed, |
248 | 0 | }).await.map_err(|e| |
249 | 0 | lyquid::LyquidError::LyquorRuntime(format!("submit call error: {:?}", e)) |
250 | | ) |
251 | | } else { |
252 | 0 | Err(lyquid::LyquidError::LyquorRuntime("submit call recipient unavailable".into())) |
253 | | } |
254 | | }); |
255 | | |
256 | | static ref LVM_SIGN: HostAPIFunc = lyquor_vm::host_api!((host: Host<Env>; |
257 | | msg: Bytes, |
258 | 0 | cipher: lyquor_primitives::Cipher) -> lyquor_primitives::Signature { |
259 | 0 | if host.category != lyquor_primitives::StateCategory::Instance { |
260 | 0 | return Err(lyquid::LyquidError::LyquorRuntime( |
261 | 0 | "`sign` only allowed in instance functions".to_string(), |
262 | 0 | )); |
263 | 0 | } |
264 | | |
265 | 0 | let sig = host.env.node.sig_provider.sign(cipher, msg).await.unwrap_or_else(Bytes::new); |
266 | 0 | Ok(sig) |
267 | | }); |
268 | | |
269 | | static ref LVM_VERIFY: HostAPIFunc = lyquor_vm::host_api!((host: Host<Env>; |
270 | | msg: Bytes, |
271 | | cipher: lyquor_primitives::Cipher, |
272 | | sig: Bytes, |
273 | 0 | pubkey: Bytes) -> bool { |
274 | 0 | Ok(host.env.node.sig_provider.verify(cipher, msg, sig, pubkey).await) |
275 | | }); |
276 | | |
277 | 0 | static ref LVM_RNG: HostAPIFunc = lyquor_vm::host_api!((host: Host<Env>; length: usize) -> Vec<u8> { |
278 | 0 | if host.category != lyquor_primitives::StateCategory::Instance { |
279 | 0 | return Err(lyquid::LyquidError::LyquorRuntime( |
280 | 0 | "random_bytes only allowed in instance functions (for now)".to_string(), |
281 | 0 | )); |
282 | 0 | } |
283 | | |
284 | 0 | let len = if length > 256 { |
285 | 0 | 256 |
286 | | } else { |
287 | 0 | length |
288 | | }; |
289 | | use rand::Rng; |
290 | 0 | let mut bytes = vec![0u8; len]; |
291 | 0 | rand::rng().fill_bytes(&mut bytes); |
292 | 0 | Ok(bytes) |
293 | | }); |
294 | | |
295 | 0 | static ref LVM_GET_TIME: HostAPIFunc = lyquor_vm::host_api!((host: Host<Env>;) -> u64 { |
296 | 0 | if host.category != lyquor_primitives::StateCategory::Instance { |
297 | 0 | return Err(lyquid::LyquidError::LyquorRuntime( |
298 | 0 | "`systime` only allowed in instance functions".to_string(), |
299 | 0 | )); |
300 | 0 | } |
301 | | |
302 | 0 | let now = SystemTime::now() |
303 | 0 | .duration_since(UNIX_EPOCH) |
304 | 0 | .map_err(|e| lyquid::LyquidError::LyquorRuntime(format!("System time before unix epoch: {e}.")))?; |
305 | 0 | u64::try_from(now.as_millis()) |
306 | 0 | .map_err(|_| lyquid::LyquidError::LyquorRuntime("Timestamp overflow.".to_string())) |
307 | | }); |
308 | | |
309 | | static ref LVM_HTTP_REQUEST: HostAPIFunc = lyquor_vm::host_api!((host: Host<Env>; |
310 | | request: lyquid::http::Request, |
311 | 4 | options: Option<lyquid::http::RequestOptions>) -> lyquid::http::Response { |
312 | 4 | if host.category != lyquor_primitives::StateCategory::Instance { |
313 | 0 | return Err(lyquid::LyquidError::LyquorRuntime( |
314 | 0 | "`http_request` only allowed in instance functions".to_string(), |
315 | 0 | )); |
316 | 4 | } |
317 | | |
318 | | let lyquid::http::Request { |
319 | 4 | method, |
320 | 4 | url, |
321 | 4 | headers, |
322 | 4 | body, |
323 | | } = request; |
324 | | |
325 | 4 | let method = match method { |
326 | 4 | lyquid::http::Method::Get => reqwest::Method::GET, |
327 | 0 | lyquid::http::Method::Head => reqwest::Method::HEAD, |
328 | 0 | lyquid::http::Method::Post => reqwest::Method::POST, |
329 | 0 | lyquid::http::Method::Put => reqwest::Method::PUT, |
330 | 0 | lyquid::http::Method::Delete => reqwest::Method::DELETE, |
331 | 0 | lyquid::http::Method::Connect => reqwest::Method::CONNECT, |
332 | 0 | lyquid::http::Method::Options => reqwest::Method::OPTIONS, |
333 | 0 | lyquid::http::Method::Trace => reqwest::Method::TRACE, |
334 | 0 | lyquid::http::Method::Patch => reqwest::Method::PATCH, |
335 | 0 | lyquid::http::Method::Other(method) => reqwest::Method::from_bytes(method.as_bytes()).map_err(|_| { |
336 | 0 | lyquid::LyquidError::LyquorRuntime(format!("invalid http method: {method}")) |
337 | 0 | })?, |
338 | | }; |
339 | | |
340 | 4 | let http_client = host.env.lyquid.http_client.clone(); |
341 | 4 | tracing::debug!(method = %method, url = %url, "dispatching http_request"); |
342 | | |
343 | 4 | let client = http_client.client()?0 ; |
344 | 4 | let mut req = client.request(method, url); |
345 | 4 | if let Some(options0 ) = options |
346 | 0 | && let Some(timeout_ms) = options.timeout_ms |
347 | 0 | { |
348 | 0 | req = req.timeout(Duration::from_millis(timeout_ms)); |
349 | 4 | } |
350 | 4 | for header0 in headers { |
351 | 0 | let name = reqwest::header::HeaderName::from_bytes(header.name.as_bytes()).map_err(|e| { |
352 | 0 | lyquid::LyquidError::LyquorRuntime(format!("invalid http header name: {e}")) |
353 | 0 | })?; |
354 | 0 | let value = reqwest::header::HeaderValue::from_bytes(&header.value).map_err(|e| { |
355 | 0 | lyquid::LyquidError::LyquorRuntime(format!("invalid http header value: {e}")) |
356 | 0 | })?; |
357 | 0 | req = req.header(name, value); |
358 | | } |
359 | 4 | if let Some(body0 ) = body { |
360 | 0 | req = req.body(body); |
361 | 4 | } |
362 | | |
363 | 4 | let response = req |
364 | 4 | .send() |
365 | 4 | .await |
366 | 4 | .map_err(|e| lyquid::LyquidError::LyquorRuntime(format!0 ("http request failed: {e}")))?0 ; |
367 | 4 | let status = response.status().as_u16(); |
368 | 4 | let headers = response |
369 | 4 | .headers() |
370 | 4 | .iter() |
371 | 4 | .map(|(name, value)| lyquid::http::Header { |
372 | 8 | name: name.as_str().to_string(), |
373 | 8 | value: value.as_bytes().to_vec(), |
374 | 8 | }) |
375 | 4 | .collect(); |
376 | 4 | let body = response |
377 | 4 | .bytes() |
378 | 4 | .await |
379 | 4 | .map_err(|e| lyquid::LyquidError::LyquorRuntime(format!0 ("http body read failed: {e}")))?0 ; |
380 | | |
381 | 4 | Ok(lyquid::http::Response { |
382 | 4 | status, |
383 | 4 | headers, |
384 | 4 | body: body.to_vec(), |
385 | 4 | }) |
386 | | }); |
387 | | |
388 | 0 | static ref LVM_GET_ED25519_QXY: HostAPIFunc = lyquor_vm::host_api!((host: Host<Env>; pubkey: [u8; 32]) -> (lyquor_primitives::U256, lyquor_primitives::U256) { |
389 | 0 | let ret = crypto::ed25519::SCLPubkey::new(pubkey); |
390 | 0 | Ok((ret.qx, ret.qy)) |
391 | | }); |
392 | | |
393 | 0 | static ref LVM_GET_ADDRESS_BY_ED25519: HostAPIFunc = lyquor_vm::host_api!((host: Host<Env>; pubkey: [u8; 32]) -> Option<Address> { |
394 | | // NOTE: The result of this function is deterministic for network function, because we use the |
395 | | // same chain position here for the address retrieval. |
396 | 0 | let pos = *host.chain_pos.read().await; |
397 | 0 | host.env.node.bartender |
398 | 0 | .as_ref() |
399 | 0 | .ok_or(lyquid::LyquidError::LyquorRuntime("GetAddressByEd25519 function not supported.".into()))? |
400 | 0 | .get_address_by_ed25519.clone() |
401 | 0 | .oneshot(GetAddressByEd25519 { |
402 | 0 | pos, |
403 | 0 | id: pubkey.into(), |
404 | 0 | }) |
405 | 0 | .await |
406 | 0 | .map_err(|e| lyquid::LyquidError::LyquorRuntime(format!("GetAddressByEd25519 failed: {}", e))) |
407 | | }); |
408 | | |
409 | 0 | static ref LVM_GET_ED25519_BY_ADDRESS: HostAPIFunc = lyquor_vm::host_api!((host: Host<Env>; address: Address) -> Option<NodeID> { |
410 | 0 | let pos = *host.chain_pos.read().await; |
411 | 0 | host.env.node.bartender |
412 | 0 | .as_ref() |
413 | 0 | .ok_or(lyquid::LyquidError::LyquorRuntime("GetEd25519ByAddress function not supported.".into()))? |
414 | 0 | .get_ed25519_by_address.clone() |
415 | 0 | .oneshot(GetEd25519ByAddress { |
416 | 0 | pos, |
417 | 0 | address, |
418 | 0 | }) |
419 | 0 | .await |
420 | 0 | .map_err(|e| lyquid::LyquidError::LyquorRuntime(format!("GetEd25519ByAddress failed: {}", e))) |
421 | | }); |
422 | | |
423 | 12 | static ref LVM_ETH_CONTRACT: HostAPIFunc = lyquor_vm::host_api!((host: Host<Env>;) -> Option<Address> { |
424 | | // NOTE: The result of this function is deterministic for network function, because we use the |
425 | | // same chain position here for the address retrieval. |
426 | 12 | let pos = *host.chain_pos.read().await; |
427 | 12 | let id = host.env.get_lyquid_id(); |
428 | 12 | host.env.node.bartender |
429 | 12 | .as_ref() |
430 | 12 | .ok_or(lyquid::LyquidError::LyquorRuntime("eth_contract function not supported.".into()))?0 |
431 | | .get_eth_contract_address |
432 | 12 | .clone() |
433 | 12 | .oneshot(GetEthContractAddr { pos, id }) |
434 | 12 | .await |
435 | 12 | .map_err(|e| lyquid::LyquidError::LyquorRuntime(format!0 ("GetEthContractAddr failed: {}", e))) |
436 | | }); |
437 | | |
438 | 13 | static ref LVM_SEQUENCE_BACKEND_ID: HostAPIFunc = lyquor_vm::host_api!((host: Host<Env>;) -> SequenceBackendID { |
439 | 13 | host.env |
440 | 13 | .node |
441 | 13 | .bartender |
442 | 13 | .as_ref() |
443 | 13 | .map(|b| b.sequence_backend_id) |
444 | 13 | .ok_or(lyquid::LyquidError::LyquorRuntime("sequence_backend_id function not supported.".into())) |
445 | | }); |
446 | | |
447 | | static ref LVM_FETCH_ORACLE_INFO: HostAPIFunc = lyquor_vm::host_api!((host: Host<Env>; |
448 | | topic: String, |
449 | | target: OracleTarget, |
450 | 0 | full_config: bool) -> Option<lyquor_primitives::oracle::OracleEpochInfo> { |
451 | 0 | if host.category != lyquor_primitives::StateCategory::Instance { |
452 | 0 | return Err(lyquid::LyquidError::LyquorRuntime( |
453 | 0 | "`fetch_oracle_info` only allowed in instance functions".to_string(), |
454 | 0 | )); |
455 | 0 | } |
456 | 0 | let local_seq_id = host |
457 | 0 | .env |
458 | 0 | .node |
459 | 0 | .bartender |
460 | 0 | .as_ref() |
461 | 0 | .ok_or(lyquid::LyquidError::LyquorRuntime("fetch_oracle_info not supported.".into()))? |
462 | | .sequence_backend_id; |
463 | 0 | if target.seq_id != local_seq_id { |
464 | 0 | return Ok(None); |
465 | 0 | } |
466 | | |
467 | 0 | let query = FetchOracleInfo { |
468 | 0 | pos: *host.chain_pos.read().await, |
469 | 0 | topic, |
470 | 0 | target, |
471 | 0 | full_config, |
472 | | }; |
473 | | // NOTE: Query routing is currently local-only and does not dispatch by |
474 | | // `query.target.seq_id`. Cross-backend settlement should route by seq_id. |
475 | 0 | host.env |
476 | 0 | .sequencer() |
477 | 0 | .ok_or(lyquid::LyquidError::LyquorRuntime( |
478 | 0 | "fetch_oracle_info service unavailable.".into(), |
479 | 0 | ))? |
480 | | .fetch_oracle_info |
481 | 0 | .clone() |
482 | 0 | .oneshot(query) |
483 | 0 | .await |
484 | 0 | .map_err(|e| lyquid::LyquidError::LyquorRuntime(format!("FetchOracleInfo failed: {}", e))) |
485 | | }); |
486 | | |
487 | 0 | static ref LVM_TRIGGER: HostAPIFunc = lyquor_vm::host_api!((host: Host<Env>; group: String, method: String, input: Vec<u8>, mode: lyquor_primitives::TriggerMode) -> () { |
488 | | // NOTE: Network and instance functions can both trigger instance functions directly, but |
489 | | // commit-mode triggers are only valid from network functions. This function should not |
490 | | // return anything. |
491 | 0 | if matches!(mode, lyquor_primitives::TriggerMode::Commit) |
492 | 0 | && host.category != lyquor_primitives::StateCategory::Network |
493 | | { |
494 | 0 | return Err(lyquid::LyquidError::LyquorRuntime( |
495 | 0 | "`TriggerMode::Commit` only allowed in network functions".to_string(), |
496 | 0 | )); |
497 | 0 | } |
498 | | |
499 | 0 | let version = *host.version.read().await; |
500 | 0 | host.env.lyquid.trigger |
501 | 0 | .as_ref() |
502 | 0 | .ok_or(lyquid::LyquidError::LyquorRuntime("`trigger` not supported.".into()))? |
503 | 0 | .clone() |
504 | 0 | .oneshot(Trigger { |
505 | 0 | group, |
506 | 0 | method, |
507 | 0 | input, |
508 | 0 | version, |
509 | 0 | mode, |
510 | 0 | }) |
511 | 0 | .await |
512 | 0 | .map_err(|_| lyquid::LyquidError::LyquorRuntime("Trigger failed: trigger queue is closed".into()))?; |
513 | 0 | Ok(()) |
514 | | }); |
515 | | } |
516 | | |
517 | | struct Endpoint { |
518 | | instance: Instance, |
519 | | } |
520 | | |
521 | | impl std::fmt::Debug for Endpoint { |
522 | 0 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
523 | 0 | let id = self.instance.id(); |
524 | 0 | f.debug_struct("Endpoint").field("instance", &id).finish() |
525 | 0 | } |
526 | | } |
527 | | |
528 | | #[derive(Clone)] |
529 | | struct UpcCacheEntry { |
530 | | barrel: Arc<lyquor_vm::instance::Barrel<Env>>, |
531 | | cache_ptr: Option<lyquid::upc::CachePtr>, |
532 | | } |
533 | | |
534 | | #[async_trait] |
535 | | impl lyquor_upc::Caller for Endpoint { |
536 | | type Error = lyquor_vm::instance::Error; |
537 | | |
538 | | #[tracing::instrument(level = "trace", skip(context))] |
539 | | async fn on_prepare( |
540 | | &self, header: &mut CallHeader, client_params: Bytes, context: &MulticastContext<Self::Error>, |
541 | | ) -> Result<Vec<NodeID>, Self::Error> { |
542 | | let input = lyquid::upc::PrepareInput { |
543 | | client_params: client_params.clone(), |
544 | | }; |
545 | | let barrel = { |
546 | | let number = match header.number { |
547 | | Some(n) => n, |
548 | | // NOTE: Only execute latest_number() if not given. |
549 | | None => self.instance.latest_number().await, |
550 | | }; |
551 | | // Write back exact lyquor number to header |
552 | | header.number = Some(number); |
553 | | self.instance |
554 | | .get_instance_barrel(number) |
555 | | .await |
556 | 0 | .map_err(|e| lyquor_vm::instance::Error::Setup(e.into()))? |
557 | | }; |
558 | | |
559 | | let call_params = CallParams::builder() |
560 | | .caller(Address::ZERO) |
561 | | .method(header.method.clone()) |
562 | | .input(encode_object(&input).into()) |
563 | | .group({ |
564 | | let mut group = lyquor_primitives::GROUP_UPC_PREPARE.to_string(); |
565 | | if let Some(ref suffix) = header.group { |
566 | | group.push_str("::"); |
567 | | group.push_str(suffix); |
568 | | } |
569 | | group |
570 | | }) |
571 | | .build(); |
572 | | let call = barrel.call_func_decoded( |
573 | | call_params, |
574 | | Some(( |
575 | | lyquor_vm::scheduler::RunOptions::new(lyquor_vm::scheduler::RunSource::UpcPrepare), |
576 | | self.instance.scheduler(), |
577 | | )), |
578 | | ); |
579 | 0 | let result: Result<lyquid::upc::PrepareOutput, Self::Error> = call.await.map_err(|e| e.into()); |
580 | | let result = result?; |
581 | | |
582 | | { |
583 | | let mut cache = context.cache.lock().await; |
584 | | *cache = Some(Box::new(UpcCacheEntry { |
585 | | barrel, |
586 | | cache_ptr: result.cache, |
587 | | })); |
588 | | } |
589 | | |
590 | | Ok(result.result) |
591 | | } |
592 | | |
593 | | #[tracing::instrument(level = "trace", skip(context, returned))] |
594 | | async fn on_return( |
595 | | &self, header: CallHeader, from: NodeID, context: &MulticastContext<Self::Error>, |
596 | | returned: Result<RemoteObj, lyquor_upc::Error<Self::Error>>, |
597 | | ) -> Result<Option<RemoteObj>, lyquor_upc::Error<Self::Error>> { |
598 | | let id = context.id; |
599 | | let returned = returned?; |
600 | | |
601 | | let mut cache = context.cache.lock().await; |
602 | | if cache.is_none() { |
603 | | let number = match header.number { |
604 | | Some(n) => n, |
605 | | // NOTE: Only execute latest_number() if not given. |
606 | | None => self.instance.latest_number().await, |
607 | | }; |
608 | | *cache = Some(Box::new(UpcCacheEntry { |
609 | | barrel: self |
610 | | .instance |
611 | | .get_instance_barrel(number) |
612 | | .await |
613 | | .map_err(lyquor_upc::Error::Call)?, |
614 | | cache_ptr: None, |
615 | | })); |
616 | | } |
617 | | let cache_entry = cache |
618 | | .as_mut() |
619 | | .unwrap() |
620 | | .as_mut() |
621 | | .downcast_mut::<UpcCacheEntry>() |
622 | | .unwrap(); |
623 | | |
624 | | let input = lyquid::upc::ResponseInput { |
625 | | from, |
626 | | id, |
627 | | returned: returned.to_encoded().into(), |
628 | | cache: cache_entry.cache_ptr, |
629 | | }; |
630 | | let call_params = CallParams::builder() |
631 | | .caller(Address::ZERO) |
632 | | .method(header.method) |
633 | | .input(encode_object(&input).into()) |
634 | | .group({ |
635 | | let mut group = lyquor_primitives::GROUP_UPC_RESP.to_string(); |
636 | | if let Some(ref suffix) = header.group { |
637 | | group.push_str("::"); |
638 | | group.push_str(suffix); |
639 | | } |
640 | | group |
641 | | }) |
642 | | .build(); |
643 | | let call = cache_entry.barrel.call_func_decoded( |
644 | | call_params, |
645 | | Some(( |
646 | | lyquor_vm::scheduler::RunOptions::new(lyquor_vm::scheduler::RunSource::UpcResponse), |
647 | | self.instance.scheduler(), |
648 | | )), |
649 | | ); |
650 | | let result: Result<lyquid::upc::ResponseOutput, lyquor_vm::instance::Error> = |
651 | | call.await.map_err(lyquor_vm::instance::Error::Barrel); |
652 | | let result = match result { |
653 | | Err(lyquor_vm::instance::Error::Barrel(lyquor_vm::barrel::Error::FuncNotFound)) => { |
654 | | // by default if upc_response event handler is not implemented, it will just |
655 | | // return the first seen result |
656 | | return Ok(Some(returned)); |
657 | | } |
658 | | r => r, |
659 | | }; |
660 | | let result = result.map_err(lyquor_upc::Error::Call)?; |
661 | | |
662 | | Ok(match result { |
663 | | lyquid::upc::ResponseOutput::Continue(cache) => { |
664 | | cache_entry.cache_ptr = cache; |
665 | | None |
666 | | } |
667 | | lyquid::upc::ResponseOutput::Return(result) => { |
668 | | *cache = None; |
669 | | Some(RemoteObj::from_encoded(result.into())) |
670 | | } |
671 | | }) |
672 | | } |
673 | | } |
674 | | |
675 | | #[async_trait] |
676 | | impl lyquor_upc::Callee for Endpoint { |
677 | | type Error = lyquor_vm::instance::Error; |
678 | | |
679 | | #[tracing::instrument(level = "trace", skip(input))] |
680 | | async fn on_call(&self, header: CallHeader, from: NodeID, input: RemoteObj) -> Result<RemoteObj, Self::Error> { |
681 | | let input = encode_object(&lyquid::upc::RequestInput { |
682 | | from, |
683 | | id: 0, |
684 | | input: input.as_encoded().unwrap().into(), |
685 | | }) |
686 | | .into(); |
687 | | let call = { |
688 | | // TODO: Now, header should always have LYQNUM present. However this leads to a problem |
689 | | // which the LYQNUM may not be a valid one on callee(which is me here). For now let's |
690 | | // check the existance of this number and if not let's go with the latest. |
691 | | let version = match header.number { |
692 | | Some(n) => n, |
693 | | // NOTE: Only execute latest_number() if not given. |
694 | | None => self.instance.latest_number().await, |
695 | | }; |
696 | | let number = if self.instance.check_version(version).await { |
697 | | version |
698 | | } else { |
699 | | self.instance.latest_number().await |
700 | | }; |
701 | | |
702 | | self.instance |
703 | | .call_instance_func( |
704 | | number, |
705 | | CallParams::builder() |
706 | | .caller(Address::ZERO) |
707 | | .method(header.method) |
708 | | .input(input) |
709 | | .group({ |
710 | | let mut group = lyquor_primitives::GROUP_UPC_REQ.to_string(); |
711 | | if let Some(ref suffix) = header.group { |
712 | | group.push_str("::"); |
713 | | group.push_str(suffix); |
714 | | } |
715 | | group |
716 | | }) |
717 | | .build(), |
718 | | lyquor_vm::scheduler::RunOptions::new(lyquor_vm::scheduler::RunSource::UpcRequest), |
719 | | ) |
720 | | .await |
721 | | }; |
722 | | let result: lyquid::upc::RequestOutput = call.await?; |
723 | | Ok(RemoteObj::from_encoded(result.into())) |
724 | | } |
725 | | } |
726 | | |
727 | | /// Hosting-owned VM runtime plus UPC registration state. |
728 | | pub struct VmRuntime { |
729 | | vm_engine: lyquor_vm::VmEngine, |
730 | | upc: lyquor_upc::UPC<lyquor_vm::instance::Error>, |
731 | | sequencer: Option<Sequencer>, |
732 | | } |
733 | | |
734 | | impl VmRuntime { |
735 | | /// Create a VM runtime bound to a network hub and optional sequencer services. |
736 | 32 | pub fn new( |
737 | 32 | runtime: tokio::runtime::Handle, network: Arc<lyquor_net::hub::Hub>, sequencer: Option<Sequencer>, |
738 | 32 | ) -> Result<Self, VmRuntimeError> { |
739 | | Ok(Self { |
740 | 32 | vm_engine: lyquor_vm::VmEngine::new(runtime)?0 , |
741 | 32 | upc: lyquor_upc::UPC::new(network), |
742 | 32 | sequencer, |
743 | | }) |
744 | 32 | } |
745 | | |
746 | | /// Replace sequencer services exposed to subsequently created instances. |
747 | 0 | pub fn set_sequencer(&mut self, sequencer: Option<Sequencer>) { |
748 | 0 | self.sequencer = sequencer; |
749 | 0 | } |
750 | | |
751 | | /// Add a remote UPC peer to the runtime. |
752 | 50 | pub async fn add_remote( |
753 | 50 | &mut self, node: NodeID, address: Option<String>, |
754 | 50 | ) -> Result<(), lyquor_upc::Error<lyquor_vm::instance::Error>> { |
755 | 50 | self.upc.add_remote(node, address).await |
756 | 50 | } |
757 | | |
758 | | /// Return a UPC requester for hosted Lyquid calls. |
759 | 23 | pub fn requester(&self) -> lyquor_upc::Requester<lyquor_vm::instance::Error> { |
760 | 23 | self.upc.requester() |
761 | 23 | } |
762 | | |
763 | | /// Create and register a hosted VM instance with all Lyquor host APIs. |
764 | 39 | pub async fn add_instance( |
765 | 39 | &mut self, id: LyquidID, stores: lyquor_vm::InstanceStores, console: Option<mpsc::Sender<ConsoleOutput>>, |
766 | 39 | bartender: Option<Bartender>, trigger: Option<TriggerService>, |
767 | 39 | image_source: Option<lyquor_vm::instance::ImageSource>, |
768 | 39 | ) -> Result<Instance, VmRuntimeError> { |
769 | 39 | let upc = self.upc.requester(); |
770 | 39 | let node_id = *self.upc.node_id(); |
771 | 39 | let sig_provider = Arc::new(crypto::SigProvider::new(self.upc.signing_key().as_ref().clone())); |
772 | 39 | let http_client = HttpClient::new(); |
773 | 39 | let builder = self |
774 | 39 | .vm_engine |
775 | 39 | .instance_builder( |
776 | 39 | id, |
777 | 39 | stores, |
778 | 39 | Env { |
779 | 39 | node: NodeEnv { |
780 | 39 | upc, |
781 | 39 | node_id, |
782 | 39 | sequencer: self.sequencer.clone(), |
783 | 39 | sig_provider, |
784 | 39 | bartender, |
785 | 39 | }, |
786 | 39 | lyquid: LyquidEnv { |
787 | 39 | lyquid_id: id, |
788 | 39 | console, |
789 | 39 | http_client, |
790 | 39 | trigger, |
791 | 39 | }, |
792 | 39 | }, |
793 | 39 | ) |
794 | 39 | .await?0 ; |
795 | 39 | let instance = builder |
796 | 39 | .with_image_source(image_source) |
797 | 39 | .host_api("console_output", LVM_CONSOLE.clone()) |
798 | 39 | .host_api("universal_procedural_call", LVM_UPC.clone()) |
799 | 39 | .host_api("inter_lyquid_call", LVM_INTER_LYQUID_CALL.clone()) |
800 | 39 | .host_api("submit_call", LVM_SUBMIT_CALL.clone()) |
801 | 39 | .host_api("sign", LVM_SIGN.clone()) |
802 | 39 | .host_api("verify", LVM_VERIFY.clone()) |
803 | 39 | .host_api("random_bytes", LVM_RNG.clone()) |
804 | 39 | .host_api("systime", LVM_GET_TIME.clone()) |
805 | 39 | .host_api("http_request", LVM_HTTP_REQUEST.clone()) |
806 | 39 | .host_api("get_ed25519_qxy", LVM_GET_ED25519_QXY.clone()) |
807 | 39 | .host_api("get_address_by_ed25519", LVM_GET_ADDRESS_BY_ED25519.clone()) |
808 | 39 | .host_api("get_ed25519_by_address", LVM_GET_ED25519_BY_ADDRESS.clone()) |
809 | 39 | .host_api("eth_contract", LVM_ETH_CONTRACT.clone()) |
810 | 39 | .host_api("sequence_backend_id", LVM_SEQUENCE_BACKEND_ID.clone()) |
811 | 39 | .host_api("fetch_oracle_info", LVM_FETCH_ORACLE_INFO.clone()) |
812 | 39 | .host_api("trigger", LVM_TRIGGER.clone()) |
813 | 39 | .build() |
814 | 39 | .await |
815 | 39 | .map_err(lyquor_vm::Error::from)?0 ; |
816 | | |
817 | 39 | let instance = Arc::new(instance); |
818 | 39 | self.upc |
819 | 39 | .register_caller( |
820 | 39 | &id, |
821 | 39 | Endpoint { |
822 | 39 | instance: instance.clone(), |
823 | 39 | }, |
824 | 39 | ) |
825 | 39 | .await?0 ; |
826 | 39 | if let Err(err0 ) = self |
827 | 39 | .upc |
828 | 39 | .register_callee( |
829 | 39 | &id, |
830 | 39 | Endpoint { |
831 | 39 | instance: instance.clone(), |
832 | 39 | }, |
833 | | ) |
834 | 39 | .await |
835 | | { |
836 | 0 | self.upc.deregister_caller(&id); |
837 | 0 | return Err(err.into()); |
838 | 39 | } |
839 | 39 | Ok(instance) |
840 | 39 | } |
841 | | |
842 | | /// Deregister a hosted instance from UPC caller and callee tables. |
843 | 6 | pub fn remove_instance(&mut self, id: &LyquidID) { |
844 | 6 | self.upc.deregister_caller(id); |
845 | 6 | self.upc.deregister_callee(id); |
846 | 6 | } |
847 | | } |