Coverage Report

Created: 2026-06-13 03:29

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/home/runner/work/lyquor/lyquor/platform/hosting/src/lvm.rs
Line
Count
Source
1
use std::sync::Arc;
2
use std::time::{Duration, SystemTime, UNIX_EPOCH};
3
4
use async_trait::async_trait;
5
use lyquor_api::{
6
    anyhow,
7
    call::CallParams,
8
    interface::{
9
        FetchOracleInfo, GetAddressByEd25519, GetAddressByEd25519Service, GetEd25519ByAddress,
10
        GetEd25519ByAddressService, GetEthContractAddr, GetEthContractAddrService, InterCallService, OnInterCall,
11
        OracleInfoService, SubmitCall, SubmitService,
12
    },
13
    store::{KVStore, KVStoreError, StoreFuture},
14
};
15
use lyquor_crypto as crypto;
16
use lyquor_primitives::oracle::OracleTarget;
17
use lyquor_primitives::{
18
    Address, Bytes, ConsoleSink, LyquidID, LyquidNumber, NodeID, SequenceBackendID, encode_object,
19
};
20
use lyquor_upc::{CallHeader, MulticastContext, RemoteObj};
21
use lyquor_vm::{RuntimeEnv, lyquid};
22
use thiserror::Error;
23
use tokio::sync::mpsc;
24
use tower::{ServiceExt, util::BoxCloneSyncService};
25
26
/// Per-Lyquid stores required to construct a VM instance.
27
pub struct ProcessStores {
28
    pub vm: lyquor_vm::InstanceStores,
29
    pub progress: Arc<dyn KVStore>,
30
}
31
32
/// Factory that returns namespaced stores for a Lyquid process.
33
pub type ProcessStoreFactory = Arc<dyn Fn(LyquidID) -> StoreFuture<Result<ProcessStores, KVStoreError>> + Send + Sync>;
34
35
/// Sequencing services exposed to hosted Lyquid VM host APIs.
36
#[derive(Clone)]
37
pub struct Sequencer {
38
    pub inter: InterCallService,
39
    pub submit: SubmitService,
40
    pub fetch_oracle_info: OracleInfoService,
41
}
42
43
/// Bartender lookup services exposed to hosted Lyquid VM host APIs.
44
#[derive(Clone, Debug)]
45
pub struct Bartender {
46
    pub get_address_by_ed25519: GetAddressByEd25519Service,
47
    pub get_ed25519_by_address: GetEd25519ByAddressService,
48
    pub get_eth_contract_address: GetEthContractAddrService,
49
    pub sequence_backend_id: SequenceBackendID,
50
}
51
52
/// Console output emitted by a hosted Lyquid.
53
#[derive(Debug, Clone)]
54
pub struct ConsoleOutput {
55
    pub sink: ConsoleSink,
56
    pub data: String,
57
}
58
59
/// Trigger registration or removal requested by a hosted Lyquid (see [`lyquor_primitives::TriggerMode`]).
60
#[derive(Debug, Clone)]
61
pub struct Trigger {
62
    pub group: String,
63
    pub method: String,
64
    pub input: Vec<u8>,
65
    pub version: LyquidNumber,
66
    pub mode: lyquor_primitives::TriggerMode,
67
}
68
69
/// Service used by VM host APIs to register or stop triggers.
70
pub type TriggerService = BoxCloneSyncService<Trigger, (), anyhow::Error>;
71
/// Shared VM instance type for hosting runtime environments.
72
pub type Instance = lyquor_vm::Instance<Env>;
73
74
// TODO: make those configurable (from Lyquid? and/or with some guard at Node level)
75
const HTTP_CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
76
const HTTP_REQUEST_TIMEOUT: Duration = Duration::from_secs(30);
77
const HTTP_POOL_IDLE_TIMEOUT: Duration = Duration::from_secs(90);
78
const HTTP_POOL_MAX_IDLE_PER_HOST: usize = 8;
79
const HTTP_TCP_KEEPALIVE: Duration = Duration::from_secs(60);
80
81
#[derive(Clone)]
82
struct HttpClient {
83
    client: Result<reqwest::Client, String>,
84
}
85
86
impl HttpClient {
87
39
    fn new() -> Self {
88
39
        let client = reqwest::Client::builder()
89
39
            .connect_timeout(HTTP_CONNECT_TIMEOUT)
90
39
            .timeout(HTTP_REQUEST_TIMEOUT)
91
39
            .pool_max_idle_per_host(HTTP_POOL_MAX_IDLE_PER_HOST)
92
39
            .pool_idle_timeout(Some(HTTP_POOL_IDLE_TIMEOUT))
93
39
            .tcp_keepalive(Some(HTTP_TCP_KEEPALIVE))
94
39
            .build()
95
39
            .map_err(|err| 
err0
.
to_string0
());
96
97
39
        if let Err(
ref err0
) = client {
98
0
            tracing::warn!(error = %err, "failed to initialize instance http client");
99
        } else {
100
39
            tracing::trace!("initialized instance http client");
101
        }
102
103
39
        Self { client }
104
39
    }
105
106
4
    fn client(&self) -> Result<&reqwest::Client, lyquid::LyquidError> {
107
4
        self.client
108
4
            .as_ref()
109
4
            .map_err(|err| lyquid::LyquidError::LyquorRuntime(
format!0
("http client init failed: {err}")))
110
4
    }
111
}
112
113
#[derive(Clone)]
114
struct NodeEnv {
115
    upc: lyquor_upc::Requester<lyquor_vm::instance::Error>,
116
    node_id: NodeID,
117
    sequencer: Option<Sequencer>,
118
    sig_provider: Arc<crypto::SigProvider>,
119
    bartender: Option<Bartender>,
120
}
121
122
#[derive(Clone)]
123
struct LyquidEnv {
124
    lyquid_id: LyquidID,
125
    console: Option<mpsc::Sender<ConsoleOutput>>,
126
    http_client: HttpClient,
127
    trigger: Option<TriggerService>,
128
}
129
130
/// Runtime environment captured by hosted Lyquid VM instances.
131
#[derive(Clone)]
132
pub struct Env {
133
    node: NodeEnv,
134
    lyquid: LyquidEnv,
135
}
136
137
impl Env {
138
2
    fn sequencer(&self) -> Option<&Sequencer> {
139
2
        self.node.sequencer.as_ref()
140
2
    }
141
}
142
143
impl lyquor_vm::RuntimeEnv for Env {
144
257
    fn get_lyquid_id(&self) -> LyquidID {
145
257
        self.lyquid.lyquid_id
146
257
    }
147
148
169
    fn get_node_id(&self) -> NodeID {
149
169
        self.node.node_id
150
169
    }
151
}
152
153
/// Errors returned while constructing or wiring hosted VM runtime state.
154
#[derive(Debug, Error)]
155
pub enum VmRuntimeError {
156
    #[error("VM: {0}")]
157
    VM(#[from] lyquor_vm::Error),
158
    #[error("UPC: {0}")]
159
    UPC(#[from] lyquor_upc::Error<lyquor_vm::instance::Error>),
160
}
161
162
type HostAPIFunc = lyquor_vm::barrel::HostAPIFunc<lyquor_vm::instance::Host<Env>>;
163
164
lazy_static::lazy_static! {
165
134
    static ref LVM_CONSOLE: HostAPIFunc = lyquor_vm::host_api!((host: Host<Env>; sink: ConsoleSink, data: String) {
166
        // Silently drop the message if the output is not available.
167
134
        if let Some(
console0
) = &host.env.lyquid.console {
168
0
            console.send(ConsoleOutput {
169
0
                sink,
170
0
                data,
171
0
            }).await.ok();
172
134
        }
173
134
        Ok(())
174
    });
175
176
    static ref LVM_UPC: HostAPIFunc = lyquor_vm::host_api!((host: Host<Env>;
177
7
            target: LyquidID, group: Option<String>, method: String, input: Vec<u8>, client_params: Option<Bytes>, timeout_ms: Option<u64>) -> Vec<u8> {
178
7
        if host.category != lyquor_primitives::StateCategory::Instance {
179
1
            return Err(lyquid::LyquidError::LyquorRuntime(
180
1
                "`universal_procedural_call` only allowed in instance functions".to_string(),
181
1
            ));
182
6
        }
183
184
6
        tracing::trace!("Host handling UPC {:?} -> {:?} (group={:?}, method={}, input=<{} bytes>, client_params={:?})",
185
0
            host.env.lyquid.lyquid_id, target, group, method, input.len(), client_params);
186
187
6
        let number = if target == host.env.lyquid.lyquid_id {
188
3
            Some(*host.version.read().await)
189
        } else {
190
3
            None
191
        };
192
6
        let timeout = timeout_ms
193
6
            .map(Duration::from_millis)
194
6
            .unwrap_or_else(|| tokio::time::Duration::from_secs(lyquor_upc::message::UPC_PROCEDURE_TIMEOUT_DEFAULT));
195
6
        host.env.node.upc.call(CallHeader {
196
6
            lyquid: target,
197
6
            method,
198
6
            number,
199
6
            group,
200
6
            timeout,
201
6
            context: None,
202
6
        }, RemoteObj::from_encoded(input.into()), client_params, None).await
203
6
            .map_err(|e| lyquid::LyquidError::LyquorRuntime(
format!0
("UPC failed: {}", e)))
204
6
            .map(|r| Vec::from(r.to_encoded()))
205
    });
206
207
    static ref LVM_INTER_LYQUID_CALL: HostAPIFunc = lyquor_vm::host_api!((host: Host<Env>;
208
2
            callee: LyquidID, method: String, input: Vec<u8>) -> Vec<u8> {
209
2
        if host.category != lyquor_primitives::StateCategory::Network {
210
0
            return Err(lyquid::LyquidError::LyquorRuntime(
211
0
                "`inter_lyquid_call` only allowed in network functions".to_string(),
212
0
            ));
213
2
        }
214
215
2
        let pos = *host.chain_pos.read().await;
216
2
        if let Some(sys) = host.env.sequencer() {
217
2
            let caller = host.env.lyquid.lyquid_id;
218
2
            sys.inter
219
2
                .clone()
220
2
                .oneshot(OnInterCall {
221
2
                    origin: caller.into(), // TODO: pass on the real origin
222
2
                    caller,
223
2
                    callee,
224
2
                    method,
225
2
                    input,
226
2
                    pos,
227
2
                })
228
2
                .await
229
2
                .map_err(|e| lyquid::LyquidError::LyquorRuntime(
format!0
("error during service call: {:?}", e)))
230
        } else {
231
0
            Err(lyquid::LyquidError::LyquorRuntime("InterCallSys not available".into()))
232
        }
233
    });
234
235
0
    static ref LVM_SUBMIT_CALL: HostAPIFunc = lyquor_vm::host_api!((host: Host<Env>; call: CallParams, signed: bool) -> Vec<u8> {
236
0
        if host.category != lyquor_primitives::StateCategory::Instance {
237
0
            return Err(lyquid::LyquidError::LyquorRuntime(
238
0
                "`submit_call` only allowed in instance functions".to_string(),
239
0
            ));
240
0
        }
241
242
0
        let lyquid = host.env.lyquid.lyquid_id;
243
0
        if let Some(sys) = host.env.sequencer() {
244
0
            sys.submit.clone().oneshot(SubmitCall {
245
0
                lyquid,
246
0
                params: call,
247
0
                signed,
248
0
            }).await.map_err(|e|
249
0
                lyquid::LyquidError::LyquorRuntime(format!("submit call error: {:?}", e))
250
            )
251
        } else {
252
0
            Err(lyquid::LyquidError::LyquorRuntime("submit call recipient unavailable".into()))
253
        }
254
    });
255
256
    static ref LVM_SIGN: HostAPIFunc = lyquor_vm::host_api!((host: Host<Env>;
257
            msg: Bytes,
258
0
            cipher: lyquor_primitives::Cipher) -> lyquor_primitives::Signature {
259
0
        if host.category != lyquor_primitives::StateCategory::Instance {
260
0
            return Err(lyquid::LyquidError::LyquorRuntime(
261
0
                "`sign` only allowed in instance functions".to_string(),
262
0
            ));
263
0
        }
264
265
0
        let sig = host.env.node.sig_provider.sign(cipher, msg).await.unwrap_or_else(Bytes::new);
266
0
        Ok(sig)
267
    });
268
269
    static ref LVM_VERIFY: HostAPIFunc = lyquor_vm::host_api!((host: Host<Env>;
270
            msg: Bytes,
271
            cipher: lyquor_primitives::Cipher,
272
            sig: Bytes,
273
0
            pubkey: Bytes) -> bool {
274
0
        Ok(host.env.node.sig_provider.verify(cipher, msg, sig, pubkey).await)
275
    });
276
277
0
    static ref LVM_RNG: HostAPIFunc = lyquor_vm::host_api!((host: Host<Env>; length: usize) -> Vec<u8> {
278
0
        if host.category != lyquor_primitives::StateCategory::Instance {
279
0
            return Err(lyquid::LyquidError::LyquorRuntime(
280
0
                "random_bytes only allowed in instance functions (for now)".to_string(),
281
0
            ));
282
0
        }
283
284
0
        let len = if length > 256 {
285
0
            256
286
        } else {
287
0
            length
288
        };
289
        use rand::Rng;
290
0
        let mut bytes = vec![0u8; len];
291
0
        rand::rng().fill_bytes(&mut bytes);
292
0
        Ok(bytes)
293
    });
294
295
0
    static ref LVM_GET_TIME: HostAPIFunc = lyquor_vm::host_api!((host: Host<Env>;) -> u64 {
296
0
        if host.category != lyquor_primitives::StateCategory::Instance {
297
0
            return Err(lyquid::LyquidError::LyquorRuntime(
298
0
                "`systime` only allowed in instance functions".to_string(),
299
0
            ));
300
0
        }
301
302
0
        let now = SystemTime::now()
303
0
            .duration_since(UNIX_EPOCH)
304
0
            .map_err(|e| lyquid::LyquidError::LyquorRuntime(format!("System time before unix epoch: {e}.")))?;
305
0
        u64::try_from(now.as_millis())
306
0
            .map_err(|_| lyquid::LyquidError::LyquorRuntime("Timestamp overflow.".to_string()))
307
    });
308
309
    static ref LVM_HTTP_REQUEST: HostAPIFunc = lyquor_vm::host_api!((host: Host<Env>;
310
            request: lyquid::http::Request,
311
4
            options: Option<lyquid::http::RequestOptions>) -> lyquid::http::Response {
312
4
        if host.category != lyquor_primitives::StateCategory::Instance {
313
0
            return Err(lyquid::LyquidError::LyquorRuntime(
314
0
                "`http_request` only allowed in instance functions".to_string(),
315
0
            ));
316
4
        }
317
318
        let lyquid::http::Request {
319
4
            method,
320
4
            url,
321
4
            headers,
322
4
            body,
323
        } = request;
324
325
4
        let method = match method {
326
4
            lyquid::http::Method::Get => reqwest::Method::GET,
327
0
            lyquid::http::Method::Head => reqwest::Method::HEAD,
328
0
            lyquid::http::Method::Post => reqwest::Method::POST,
329
0
            lyquid::http::Method::Put => reqwest::Method::PUT,
330
0
            lyquid::http::Method::Delete => reqwest::Method::DELETE,
331
0
            lyquid::http::Method::Connect => reqwest::Method::CONNECT,
332
0
            lyquid::http::Method::Options => reqwest::Method::OPTIONS,
333
0
            lyquid::http::Method::Trace => reqwest::Method::TRACE,
334
0
            lyquid::http::Method::Patch => reqwest::Method::PATCH,
335
0
            lyquid::http::Method::Other(method) => reqwest::Method::from_bytes(method.as_bytes()).map_err(|_| {
336
0
                lyquid::LyquidError::LyquorRuntime(format!("invalid http method: {method}"))
337
0
            })?,
338
        };
339
340
4
        let http_client = host.env.lyquid.http_client.clone();
341
4
        tracing::debug!(method = %method, url = %url, "dispatching http_request");
342
343
4
        let client = http_client.client()
?0
;
344
4
        let mut req = client.request(method, url);
345
4
        if let Some(
options0
) = options
346
0
            && let Some(timeout_ms) = options.timeout_ms
347
0
        {
348
0
            req = req.timeout(Duration::from_millis(timeout_ms));
349
4
        }
350
4
        for 
header0
in headers {
351
0
            let name = reqwest::header::HeaderName::from_bytes(header.name.as_bytes()).map_err(|e| {
352
0
                lyquid::LyquidError::LyquorRuntime(format!("invalid http header name: {e}"))
353
0
            })?;
354
0
            let value = reqwest::header::HeaderValue::from_bytes(&header.value).map_err(|e| {
355
0
                lyquid::LyquidError::LyquorRuntime(format!("invalid http header value: {e}"))
356
0
            })?;
357
0
            req = req.header(name, value);
358
        }
359
4
        if let Some(
body0
) = body {
360
0
            req = req.body(body);
361
4
        }
362
363
4
        let response = req
364
4
            .send()
365
4
            .await
366
4
            .map_err(|e| lyquid::LyquidError::LyquorRuntime(
format!0
("http request failed: {e}")))
?0
;
367
4
        let status = response.status().as_u16();
368
4
        let headers = response
369
4
            .headers()
370
4
            .iter()
371
4
            .map(|(name, value)| lyquid::http::Header {
372
8
                name: name.as_str().to_string(),
373
8
                value: value.as_bytes().to_vec(),
374
8
            })
375
4
            .collect();
376
4
        let body = response
377
4
            .bytes()
378
4
            .await
379
4
            .map_err(|e| lyquid::LyquidError::LyquorRuntime(
format!0
("http body read failed: {e}")))
?0
;
380
381
4
        Ok(lyquid::http::Response {
382
4
            status,
383
4
            headers,
384
4
            body: body.to_vec(),
385
4
        })
386
    });
387
388
0
    static ref LVM_GET_ED25519_QXY: HostAPIFunc = lyquor_vm::host_api!((host: Host<Env>; pubkey: [u8; 32]) -> (lyquor_primitives::U256, lyquor_primitives::U256) {
389
0
        let ret = crypto::ed25519::SCLPubkey::new(pubkey);
390
0
        Ok((ret.qx, ret.qy))
391
    });
392
393
0
    static ref LVM_GET_ADDRESS_BY_ED25519: HostAPIFunc = lyquor_vm::host_api!((host: Host<Env>; pubkey: [u8; 32]) -> Option<Address> {
394
        // NOTE: The result of this function is deterministic for network function, because we use the
395
        // same chain position here for the address retrieval.
396
0
        let pos = *host.chain_pos.read().await;
397
0
        host.env.node.bartender
398
0
            .as_ref()
399
0
            .ok_or(lyquid::LyquidError::LyquorRuntime("GetAddressByEd25519 function not supported.".into()))?
400
0
            .get_address_by_ed25519.clone()
401
0
            .oneshot(GetAddressByEd25519 {
402
0
                pos,
403
0
                id: pubkey.into(),
404
0
            })
405
0
            .await
406
0
            .map_err(|e| lyquid::LyquidError::LyquorRuntime(format!("GetAddressByEd25519 failed: {}", e)))
407
    });
408
409
0
    static ref LVM_GET_ED25519_BY_ADDRESS: HostAPIFunc = lyquor_vm::host_api!((host: Host<Env>; address: Address) -> Option<NodeID> {
410
0
        let pos = *host.chain_pos.read().await;
411
0
        host.env.node.bartender
412
0
            .as_ref()
413
0
            .ok_or(lyquid::LyquidError::LyquorRuntime("GetEd25519ByAddress function not supported.".into()))?
414
0
            .get_ed25519_by_address.clone()
415
0
            .oneshot(GetEd25519ByAddress {
416
0
                pos,
417
0
                address,
418
0
            })
419
0
            .await
420
0
            .map_err(|e| lyquid::LyquidError::LyquorRuntime(format!("GetEd25519ByAddress failed: {}", e)))
421
    });
422
423
12
    static ref LVM_ETH_CONTRACT: HostAPIFunc = lyquor_vm::host_api!((host: Host<Env>;) -> Option<Address> {
424
        // NOTE: The result of this function is deterministic for network function, because we use the
425
        // same chain position here for the address retrieval.
426
12
        let pos = *host.chain_pos.read().await;
427
12
        let id = host.env.get_lyquid_id();
428
12
        host.env.node.bartender
429
12
            .as_ref()
430
12
            .ok_or(lyquid::LyquidError::LyquorRuntime("eth_contract function not supported.".into()))
?0
431
            .get_eth_contract_address
432
12
            .clone()
433
12
            .oneshot(GetEthContractAddr { pos, id })
434
12
            .await
435
12
            .map_err(|e| lyquid::LyquidError::LyquorRuntime(
format!0
("GetEthContractAddr failed: {}", e)))
436
    });
437
438
13
    static ref LVM_SEQUENCE_BACKEND_ID: HostAPIFunc = lyquor_vm::host_api!((host: Host<Env>;) -> SequenceBackendID {
439
13
        host.env
440
13
            .node
441
13
            .bartender
442
13
            .as_ref()
443
13
            .map(|b| b.sequence_backend_id)
444
13
            .ok_or(lyquid::LyquidError::LyquorRuntime("sequence_backend_id function not supported.".into()))
445
    });
446
447
    static ref LVM_FETCH_ORACLE_INFO: HostAPIFunc = lyquor_vm::host_api!((host: Host<Env>;
448
            topic: String,
449
            target: OracleTarget,
450
0
            full_config: bool) -> Option<lyquor_primitives::oracle::OracleEpochInfo> {
451
0
        if host.category != lyquor_primitives::StateCategory::Instance {
452
0
            return Err(lyquid::LyquidError::LyquorRuntime(
453
0
                "`fetch_oracle_info` only allowed in instance functions".to_string(),
454
0
            ));
455
0
        }
456
0
        let local_seq_id = host
457
0
            .env
458
0
            .node
459
0
            .bartender
460
0
            .as_ref()
461
0
            .ok_or(lyquid::LyquidError::LyquorRuntime("fetch_oracle_info not supported.".into()))?
462
            .sequence_backend_id;
463
0
        if target.seq_id != local_seq_id {
464
0
            return Ok(None);
465
0
        }
466
467
0
        let query = FetchOracleInfo {
468
0
            pos: *host.chain_pos.read().await,
469
0
            topic,
470
0
            target,
471
0
            full_config,
472
        };
473
        // NOTE: Query routing is currently local-only and does not dispatch by
474
        // `query.target.seq_id`. Cross-backend settlement should route by seq_id.
475
0
        host.env
476
0
            .sequencer()
477
0
            .ok_or(lyquid::LyquidError::LyquorRuntime(
478
0
                "fetch_oracle_info service unavailable.".into(),
479
0
            ))?
480
            .fetch_oracle_info
481
0
            .clone()
482
0
            .oneshot(query)
483
0
            .await
484
0
            .map_err(|e| lyquid::LyquidError::LyquorRuntime(format!("FetchOracleInfo failed: {}", e)))
485
    });
486
487
0
    static ref LVM_TRIGGER: HostAPIFunc = lyquor_vm::host_api!((host: Host<Env>; group: String, method: String, input: Vec<u8>, mode: lyquor_primitives::TriggerMode) -> () {
488
        // NOTE: Network and instance functions can both trigger instance functions directly, but
489
        // commit-mode triggers are only valid from network functions. This function should not
490
        // return anything.
491
0
        if matches!(mode, lyquor_primitives::TriggerMode::Commit)
492
0
            && host.category != lyquor_primitives::StateCategory::Network
493
        {
494
0
            return Err(lyquid::LyquidError::LyquorRuntime(
495
0
                "`TriggerMode::Commit` only allowed in network functions".to_string(),
496
0
            ));
497
0
        }
498
499
0
        let version = *host.version.read().await;
500
0
        host.env.lyquid.trigger
501
0
            .as_ref()
502
0
            .ok_or(lyquid::LyquidError::LyquorRuntime("`trigger` not supported.".into()))?
503
0
            .clone()
504
0
            .oneshot(Trigger {
505
0
                    group,
506
0
                    method,
507
0
                    input,
508
0
                    version,
509
0
                    mode,
510
0
                })
511
0
            .await
512
0
            .map_err(|_| lyquid::LyquidError::LyquorRuntime("Trigger failed: trigger queue is closed".into()))?;
513
0
        Ok(())
514
    });
515
}
516
517
struct Endpoint {
518
    instance: Instance,
519
}
520
521
impl std::fmt::Debug for Endpoint {
522
0
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
523
0
        let id = self.instance.id();
524
0
        f.debug_struct("Endpoint").field("instance", &id).finish()
525
0
    }
526
}
527
528
#[derive(Clone)]
529
struct UpcCacheEntry {
530
    barrel: Arc<lyquor_vm::instance::Barrel<Env>>,
531
    cache_ptr: Option<lyquid::upc::CachePtr>,
532
}
533
534
#[async_trait]
535
impl lyquor_upc::Caller for Endpoint {
536
    type Error = lyquor_vm::instance::Error;
537
538
    #[tracing::instrument(level = "trace", skip(context))]
539
    async fn on_prepare(
540
        &self, header: &mut CallHeader, client_params: Bytes, context: &MulticastContext<Self::Error>,
541
    ) -> Result<Vec<NodeID>, Self::Error> {
542
        let input = lyquid::upc::PrepareInput {
543
            client_params: client_params.clone(),
544
        };
545
        let barrel = {
546
            let number = match header.number {
547
                Some(n) => n,
548
                // NOTE: Only execute latest_number() if not given.
549
                None => self.instance.latest_number().await,
550
            };
551
            // Write back exact lyquor number to header
552
            header.number = Some(number);
553
            self.instance
554
                .get_instance_barrel(number)
555
                .await
556
0
                .map_err(|e| lyquor_vm::instance::Error::Setup(e.into()))?
557
        };
558
559
        let call_params = CallParams::builder()
560
            .caller(Address::ZERO)
561
            .method(header.method.clone())
562
            .input(encode_object(&input).into())
563
            .group({
564
                let mut group = lyquor_primitives::GROUP_UPC_PREPARE.to_string();
565
                if let Some(ref suffix) = header.group {
566
                    group.push_str("::");
567
                    group.push_str(suffix);
568
                }
569
                group
570
            })
571
            .build();
572
        let call = barrel.call_func_decoded(
573
            call_params,
574
            Some((
575
                lyquor_vm::scheduler::RunOptions::new(lyquor_vm::scheduler::RunSource::UpcPrepare),
576
                self.instance.scheduler(),
577
            )),
578
        );
579
0
        let result: Result<lyquid::upc::PrepareOutput, Self::Error> = call.await.map_err(|e| e.into());
580
        let result = result?;
581
582
        {
583
            let mut cache = context.cache.lock().await;
584
            *cache = Some(Box::new(UpcCacheEntry {
585
                barrel,
586
                cache_ptr: result.cache,
587
            }));
588
        }
589
590
        Ok(result.result)
591
    }
592
593
    #[tracing::instrument(level = "trace", skip(context, returned))]
594
    async fn on_return(
595
        &self, header: CallHeader, from: NodeID, context: &MulticastContext<Self::Error>,
596
        returned: Result<RemoteObj, lyquor_upc::Error<Self::Error>>,
597
    ) -> Result<Option<RemoteObj>, lyquor_upc::Error<Self::Error>> {
598
        let id = context.id;
599
        let returned = returned?;
600
601
        let mut cache = context.cache.lock().await;
602
        if cache.is_none() {
603
            let number = match header.number {
604
                Some(n) => n,
605
                // NOTE: Only execute latest_number() if not given.
606
                None => self.instance.latest_number().await,
607
            };
608
            *cache = Some(Box::new(UpcCacheEntry {
609
                barrel: self
610
                    .instance
611
                    .get_instance_barrel(number)
612
                    .await
613
                    .map_err(lyquor_upc::Error::Call)?,
614
                cache_ptr: None,
615
            }));
616
        }
617
        let cache_entry = cache
618
            .as_mut()
619
            .unwrap()
620
            .as_mut()
621
            .downcast_mut::<UpcCacheEntry>()
622
            .unwrap();
623
624
        let input = lyquid::upc::ResponseInput {
625
            from,
626
            id,
627
            returned: returned.to_encoded().into(),
628
            cache: cache_entry.cache_ptr,
629
        };
630
        let call_params = CallParams::builder()
631
            .caller(Address::ZERO)
632
            .method(header.method)
633
            .input(encode_object(&input).into())
634
            .group({
635
                let mut group = lyquor_primitives::GROUP_UPC_RESP.to_string();
636
                if let Some(ref suffix) = header.group {
637
                    group.push_str("::");
638
                    group.push_str(suffix);
639
                }
640
                group
641
            })
642
            .build();
643
        let call = cache_entry.barrel.call_func_decoded(
644
            call_params,
645
            Some((
646
                lyquor_vm::scheduler::RunOptions::new(lyquor_vm::scheduler::RunSource::UpcResponse),
647
                self.instance.scheduler(),
648
            )),
649
        );
650
        let result: Result<lyquid::upc::ResponseOutput, lyquor_vm::instance::Error> =
651
            call.await.map_err(lyquor_vm::instance::Error::Barrel);
652
        let result = match result {
653
            Err(lyquor_vm::instance::Error::Barrel(lyquor_vm::barrel::Error::FuncNotFound)) => {
654
                // by default if upc_response event handler is not implemented, it will just
655
                // return the first seen result
656
                return Ok(Some(returned));
657
            }
658
            r => r,
659
        };
660
        let result = result.map_err(lyquor_upc::Error::Call)?;
661
662
        Ok(match result {
663
            lyquid::upc::ResponseOutput::Continue(cache) => {
664
                cache_entry.cache_ptr = cache;
665
                None
666
            }
667
            lyquid::upc::ResponseOutput::Return(result) => {
668
                *cache = None;
669
                Some(RemoteObj::from_encoded(result.into()))
670
            }
671
        })
672
    }
673
}
674
675
#[async_trait]
676
impl lyquor_upc::Callee for Endpoint {
677
    type Error = lyquor_vm::instance::Error;
678
679
    #[tracing::instrument(level = "trace", skip(input))]
680
    async fn on_call(&self, header: CallHeader, from: NodeID, input: RemoteObj) -> Result<RemoteObj, Self::Error> {
681
        let input = encode_object(&lyquid::upc::RequestInput {
682
            from,
683
            id: 0,
684
            input: input.as_encoded().unwrap().into(),
685
        })
686
        .into();
687
        let call = {
688
            // TODO: Now, header should always have LYQNUM present. However this leads to a problem
689
            // which the LYQNUM may not be a valid one on callee(which is me here). For now let's
690
            // check the existance of this number and if not let's go with the latest.
691
            let version = match header.number {
692
                Some(n) => n,
693
                // NOTE: Only execute latest_number() if not given.
694
                None => self.instance.latest_number().await,
695
            };
696
            let number = if self.instance.check_version(version).await {
697
                version
698
            } else {
699
                self.instance.latest_number().await
700
            };
701
702
            self.instance
703
                .call_instance_func(
704
                    number,
705
                    CallParams::builder()
706
                        .caller(Address::ZERO)
707
                        .method(header.method)
708
                        .input(input)
709
                        .group({
710
                            let mut group = lyquor_primitives::GROUP_UPC_REQ.to_string();
711
                            if let Some(ref suffix) = header.group {
712
                                group.push_str("::");
713
                                group.push_str(suffix);
714
                            }
715
                            group
716
                        })
717
                        .build(),
718
                    lyquor_vm::scheduler::RunOptions::new(lyquor_vm::scheduler::RunSource::UpcRequest),
719
                )
720
                .await
721
        };
722
        let result: lyquid::upc::RequestOutput = call.await?;
723
        Ok(RemoteObj::from_encoded(result.into()))
724
    }
725
}
726
727
/// Hosting-owned VM runtime plus UPC registration state.
728
pub struct VmRuntime {
729
    vm_engine: lyquor_vm::VmEngine,
730
    upc: lyquor_upc::UPC<lyquor_vm::instance::Error>,
731
    sequencer: Option<Sequencer>,
732
}
733
734
impl VmRuntime {
735
    /// Create a VM runtime bound to a network hub and optional sequencer services.
736
32
    pub fn new(
737
32
        runtime: tokio::runtime::Handle, network: Arc<lyquor_net::hub::Hub>, sequencer: Option<Sequencer>,
738
32
    ) -> Result<Self, VmRuntimeError> {
739
        Ok(Self {
740
32
            vm_engine: lyquor_vm::VmEngine::new(runtime)
?0
,
741
32
            upc: lyquor_upc::UPC::new(network),
742
32
            sequencer,
743
        })
744
32
    }
745
746
    /// Replace sequencer services exposed to subsequently created instances.
747
0
    pub fn set_sequencer(&mut self, sequencer: Option<Sequencer>) {
748
0
        self.sequencer = sequencer;
749
0
    }
750
751
    /// Add a remote UPC peer to the runtime.
752
50
    pub async fn add_remote(
753
50
        &mut self, node: NodeID, address: Option<String>,
754
50
    ) -> Result<(), lyquor_upc::Error<lyquor_vm::instance::Error>> {
755
50
        self.upc.add_remote(node, address).await
756
50
    }
757
758
    /// Return a UPC requester for hosted Lyquid calls.
759
23
    pub fn requester(&self) -> lyquor_upc::Requester<lyquor_vm::instance::Error> {
760
23
        self.upc.requester()
761
23
    }
762
763
    /// Create and register a hosted VM instance with all Lyquor host APIs.
764
39
    pub async fn add_instance(
765
39
        &mut self, id: LyquidID, stores: lyquor_vm::InstanceStores, console: Option<mpsc::Sender<ConsoleOutput>>,
766
39
        bartender: Option<Bartender>, trigger: Option<TriggerService>,
767
39
        image_source: Option<lyquor_vm::instance::ImageSource>,
768
39
    ) -> Result<Instance, VmRuntimeError> {
769
39
        let upc = self.upc.requester();
770
39
        let node_id = *self.upc.node_id();
771
39
        let sig_provider = Arc::new(crypto::SigProvider::new(self.upc.signing_key().as_ref().clone()));
772
39
        let http_client = HttpClient::new();
773
39
        let builder = self
774
39
            .vm_engine
775
39
            .instance_builder(
776
39
                id,
777
39
                stores,
778
39
                Env {
779
39
                    node: NodeEnv {
780
39
                        upc,
781
39
                        node_id,
782
39
                        sequencer: self.sequencer.clone(),
783
39
                        sig_provider,
784
39
                        bartender,
785
39
                    },
786
39
                    lyquid: LyquidEnv {
787
39
                        lyquid_id: id,
788
39
                        console,
789
39
                        http_client,
790
39
                        trigger,
791
39
                    },
792
39
                },
793
39
            )
794
39
            .await
?0
;
795
39
        let instance = builder
796
39
            .with_image_source(image_source)
797
39
            .host_api("console_output", LVM_CONSOLE.clone())
798
39
            .host_api("universal_procedural_call", LVM_UPC.clone())
799
39
            .host_api("inter_lyquid_call", LVM_INTER_LYQUID_CALL.clone())
800
39
            .host_api("submit_call", LVM_SUBMIT_CALL.clone())
801
39
            .host_api("sign", LVM_SIGN.clone())
802
39
            .host_api("verify", LVM_VERIFY.clone())
803
39
            .host_api("random_bytes", LVM_RNG.clone())
804
39
            .host_api("systime", LVM_GET_TIME.clone())
805
39
            .host_api("http_request", LVM_HTTP_REQUEST.clone())
806
39
            .host_api("get_ed25519_qxy", LVM_GET_ED25519_QXY.clone())
807
39
            .host_api("get_address_by_ed25519", LVM_GET_ADDRESS_BY_ED25519.clone())
808
39
            .host_api("get_ed25519_by_address", LVM_GET_ED25519_BY_ADDRESS.clone())
809
39
            .host_api("eth_contract", LVM_ETH_CONTRACT.clone())
810
39
            .host_api("sequence_backend_id", LVM_SEQUENCE_BACKEND_ID.clone())
811
39
            .host_api("fetch_oracle_info", LVM_FETCH_ORACLE_INFO.clone())
812
39
            .host_api("trigger", LVM_TRIGGER.clone())
813
39
            .build()
814
39
            .await
815
39
            .map_err(lyquor_vm::Error::from)
?0
;
816
817
39
        let instance = Arc::new(instance);
818
39
        self.upc
819
39
            .register_caller(
820
39
                &id,
821
39
                Endpoint {
822
39
                    instance: instance.clone(),
823
39
                },
824
39
            )
825
39
            .await
?0
;
826
39
        if let Err(
err0
) = self
827
39
            .upc
828
39
            .register_callee(
829
39
                &id,
830
39
                Endpoint {
831
39
                    instance: instance.clone(),
832
39
                },
833
            )
834
39
            .await
835
        {
836
0
            self.upc.deregister_caller(&id);
837
0
            return Err(err.into());
838
39
        }
839
39
        Ok(instance)
840
39
    }
841
842
    /// Deregister a hosted instance from UPC caller and callee tables.
843
6
    pub fn remove_instance(&mut self, id: &LyquidID) {
844
6
        self.upc.deregister_caller(id);
845
6
        self.upc.deregister_callee(id);
846
6
    }
847
}