/home/runner/work/lyquor/lyquor/hosting/src/image.rs
Line | Count | Source |
1 | | use std::collections::{HashMap, HashSet}; |
2 | | use std::sync::{Arc, RwLock}; |
3 | | |
4 | | use lyquor_image_store::{Error as ImageRepoError, LyquidImageRepo}; |
5 | | use lyquor_oci::pack::{LyquidPack, LyquidPackDigest}; |
6 | | use lyquor_oci::registry::{OCIRegistryClient, PinnedImage}; |
7 | | use lyquor_primitives::B256; |
8 | | use thiserror::Error; |
9 | | |
10 | | #[derive(Debug, Error)] |
11 | | pub enum Error { |
12 | | #[error("Image repository lookup: {0}")] |
13 | | ImageRepo(#[from] ImageRepoError), |
14 | | #[error("Failed to store resolved image in the local repository.\nDetail: {0}")] |
15 | | ImageRepoWrite(String), |
16 | | #[error("No image source candidates are known for digest {0:?}.")] |
17 | | NoCandidates(B256), |
18 | | #[error("Failed to resolve image digest {image_digest:?}.\nAttempts: {attempts}")] |
19 | | ResolveFailed { image_digest: B256, attempts: String }, |
20 | | } |
21 | | |
22 | | pub struct ImageResolver { |
23 | | image_repo: Arc<dyn LyquidImageRepo>, |
24 | | repo_hints_by_digest: RwLock<HashMap<B256, Vec<String>>>, |
25 | | fallback_repository_roots: Vec<String>, |
26 | | } |
27 | | |
28 | | impl ImageResolver { |
29 | 10 | pub fn new(image_repo: Arc<dyn LyquidImageRepo>, fallback_repository_roots: Vec<String>) -> Self { |
30 | 10 | Self { |
31 | 10 | image_repo, |
32 | 10 | repo_hints_by_digest: RwLock::new(HashMap::new()), |
33 | 10 | fallback_repository_roots, |
34 | 10 | } |
35 | 10 | } |
36 | | |
37 | 6 | pub fn image_repo(&self) -> Arc<dyn LyquidImageRepo> { |
38 | 6 | self.image_repo.clone() |
39 | 6 | } |
40 | | |
41 | 7 | pub async fn ensure_local(&self, image_digest: B256) -> Result<(), Error> { |
42 | 7 | if self.image_repo.contains(image_digest).await?0 { |
43 | 7 | return Ok(()); |
44 | 0 | } |
45 | | |
46 | 0 | let candidates = self.repository_candidates(image_digest); |
47 | 0 | if candidates.is_empty() { |
48 | 0 | return Err(Error::NoCandidates(image_digest)); |
49 | 0 | } |
50 | | |
51 | 0 | let mut failures = Vec::new(); |
52 | 0 | for candidate in candidates { |
53 | 0 | tracing::debug!("Resolving lyquid image digest {image_digest:?} from {candidate}"); |
54 | 0 | match self.pull_by_digest(&candidate, image_digest).await { |
55 | 0 | Ok(pack) => { |
56 | 0 | self.image_repo |
57 | 0 | .put(pack) |
58 | 0 | .await |
59 | 0 | .map_err(|e| Error::ImageRepoWrite(e.to_string()))?; |
60 | 0 | self.remember_repository_hint(image_digest, &candidate); |
61 | 0 | return Ok(()); |
62 | | } |
63 | 0 | Err(err) => failures.push(format!("{candidate} ({err})")), |
64 | | } |
65 | | } |
66 | | |
67 | 0 | Err(Error::ResolveFailed { |
68 | 0 | image_digest, |
69 | 0 | attempts: failures.join(", "), |
70 | 0 | }) |
71 | 7 | } |
72 | | |
73 | 5 | pub fn remember_repository_hint(&self, image_digest: B256, repo_hint: &str) { |
74 | 5 | if repo_hint.is_empty() { |
75 | 0 | return; |
76 | 5 | } |
77 | | |
78 | 5 | let mut repo_hints_by_digest = self.repo_hints_by_digest.write().expect("repo hint cache poisoned"); |
79 | 5 | let repo_hints = repo_hints_by_digest.entry(image_digest).or_default(); |
80 | 5 | if let Some(pos1 ) = repo_hints.iter().position(|candidate| candidate1 == repo_hint1 ) { |
81 | 1 | repo_hints.remove(pos); |
82 | 4 | } |
83 | 5 | repo_hints.insert(0, repo_hint.to_owned()); |
84 | 5 | } |
85 | | |
86 | 3 | fn repository_candidates(&self, image_digest: B256) -> Vec<String> { |
87 | 3 | let cached = self |
88 | 3 | .repo_hints_by_digest |
89 | 3 | .read() |
90 | 3 | .expect("repo hint cache poisoned") |
91 | 3 | .get(&image_digest) |
92 | 3 | .cloned() |
93 | 3 | .unwrap_or_default(); |
94 | | |
95 | 3 | let mut candidates = Vec::new(); |
96 | 3 | let mut seen = HashSet::new(); |
97 | | |
98 | 3 | for candidate in &cached { |
99 | 3 | if seen.insert(candidate.clone()) { |
100 | 3 | candidates.push(candidate.clone()); |
101 | 3 | }0 |
102 | | } |
103 | | |
104 | 4 | for fallback_root in &self.fallback_repository_roots3 { |
105 | 4 | if fallback_root.is_empty() { |
106 | 0 | continue; |
107 | 4 | } |
108 | 4 | if seen.insert(fallback_root.clone()) { |
109 | 3 | candidates.push(fallback_root.clone()); |
110 | 3 | }1 |
111 | | } |
112 | | |
113 | 3 | candidates |
114 | 3 | } |
115 | | |
116 | 0 | async fn pull_by_digest(&self, repo_hint: &str, image_digest: B256) -> Result<LyquidPack, String> { |
117 | 0 | let pinned = PinnedImage::new(repo_hint, LyquidPackDigest::new(image_digest)).map_err(|e| e.to_string())?; |
118 | 0 | let oci = OCIRegistryClient::new(pinned.protocol()); |
119 | 0 | oci.pull_full_pinned(&pinned).await.map_err(|e| e.to_string()) |
120 | 0 | } |
121 | | } |
122 | | |
123 | | #[cfg(test)] |
124 | | mod tests { |
125 | | use super::*; |
126 | | use lyquor_image_store::DirectoryRepo; |
127 | | use lyquor_test::test; |
128 | | use tempfile::TempDir; |
129 | | |
130 | | struct TempImageRepo { |
131 | | _dir: TempDir, |
132 | | repo: Arc<dyn LyquidImageRepo>, |
133 | | } |
134 | | |
135 | 4 | async fn temp_image_repo() -> TempImageRepo { |
136 | 4 | let dir = tempfile::tempdir().expect("temp dir must initialize"); |
137 | 4 | let repo = Arc::new( |
138 | 4 | DirectoryRepo::new(dir.path()) |
139 | 4 | .await |
140 | 4 | .expect("image repo must initialize"), |
141 | | ); |
142 | | |
143 | 4 | TempImageRepo { _dir: dir, repo } |
144 | 4 | } |
145 | | |
146 | 5 | fn remember_repository_hint(resolver: &ImageResolver, repo_hint: &str, pack: &LyquidPack) { |
147 | 5 | resolver.remember_repository_hint(*pack.digest().digest(), repo_hint); |
148 | 5 | } |
149 | | |
150 | 4 | fn rebuilt_pack(name: &str, description: &str) -> LyquidPack { |
151 | 4 | let pack = lyquid_examples::hello(); |
152 | 4 | let metadata = lyquor_oci::pack::LyquidPackMetadata::new(name, None, Some(description), None, None) |
153 | 4 | .with_abi_str(pack.metadata().abi_str.clone()); |
154 | 4 | LyquidPack::build_with_binary( |
155 | 4 | pack.wasm().clone(), |
156 | 4 | pack.evm_deployment_bytecode().clone(), |
157 | 4 | pack.evm_auxiliary_bytecodes().cloned(), |
158 | 4 | pack.assets().cloned(), |
159 | 4 | metadata, |
160 | | ) |
161 | 4 | } |
162 | | |
163 | | #[test(tokio::test)] |
164 | | async fn local_hit_short_circuits_remote_fetch() { |
165 | | let pack = rebuilt_pack("hello", "local-hit"); |
166 | | let image_repo = temp_image_repo().await; |
167 | | image_repo.repo.put(pack.clone()).await.expect("put should succeed"); |
168 | | |
169 | | let resolver = ImageResolver::new(image_repo.repo.clone(), vec!["unused/root".to_owned()]); |
170 | | remember_repository_hint(&resolver, "not a valid repo hint", &pack); |
171 | | |
172 | | resolver |
173 | | .ensure_local(*pack.digest().digest()) |
174 | | .await |
175 | | .expect("local image should resolve"); |
176 | | } |
177 | | |
178 | | #[test(tokio::test)] |
179 | | async fn cached_repo_hints_are_tried_before_fallback_roots() { |
180 | | let pack = rebuilt_pack("hello", "cached-before-fallback"); |
181 | | let image_repo = temp_image_repo().await; |
182 | | let resolver = ImageResolver::new(image_repo.repo.clone(), vec!["fallback.registry/lyquids".to_owned()]); |
183 | | remember_repository_hint(&resolver, "bad.registry/ns/hello", &pack); |
184 | | |
185 | | assert_eq!( |
186 | | resolver.repository_candidates(*pack.digest().digest()), |
187 | | vec![ |
188 | | "bad.registry/ns/hello".to_owned(), |
189 | | "fallback.registry/lyquids".to_owned(), |
190 | | ] |
191 | | ); |
192 | | } |
193 | | |
194 | | #[test(tokio::test)] |
195 | | async fn duplicate_candidates_are_only_attempted_once() { |
196 | | let pack = rebuilt_pack("hello", "dedupe"); |
197 | | let image_repo = temp_image_repo().await; |
198 | | let resolver = ImageResolver::new(image_repo.repo.clone(), vec!["same.registry/root/hello".to_owned()]); |
199 | | remember_repository_hint(&resolver, "same.registry/root/hello", &pack); |
200 | | remember_repository_hint(&resolver, "same.registry/root/hello", &pack); |
201 | | |
202 | | assert_eq!( |
203 | | resolver.repository_candidates(*pack.digest().digest()), |
204 | | vec!["same.registry/root/hello".to_owned()] |
205 | | ); |
206 | | } |
207 | | |
208 | | #[test(tokio::test)] |
209 | | async fn failed_source_does_not_stop_later_candidates() { |
210 | | let pack = rebuilt_pack("hello", "fallback-chain"); |
211 | | let image_repo = temp_image_repo().await; |
212 | | let resolver = ImageResolver::new( |
213 | | image_repo.repo.clone(), |
214 | | vec!["fallback-one".to_owned(), "fallback-two".to_owned()], |
215 | | ); |
216 | | remember_repository_hint(&resolver, "first.registry/ns/hello", &pack); |
217 | | |
218 | | assert_eq!( |
219 | | resolver.repository_candidates(*pack.digest().digest()), |
220 | | vec![ |
221 | | "first.registry/ns/hello".to_owned(), |
222 | | "fallback-one".to_owned(), |
223 | | "fallback-two".to_owned(), |
224 | | ] |
225 | | ); |
226 | | } |
227 | | } |