From 973db87b98a81b17f8acaeda917e54c6c2f97b68 Mon Sep 17 00:00:00 2001 From: Alex Zenla Date: Sun, 25 Feb 2024 05:38:23 +0000 Subject: [PATCH] controller: make image downloads async --- Cargo.toml | 3 + controller/Cargo.toml | 2 + controller/src/ctl/launch.rs | 6 +- controller/src/image/fetch.rs | 89 ++++++++++++++++++++--------- controller/src/image/mod.rs | 47 ++++++++------- libs/xen/xenclient/src/boot.rs | 13 ++--- libs/xen/xenclient/src/elfloader.rs | 6 +- libs/xen/xenclient/src/x86.rs | 12 ++-- 8 files changed, 112 insertions(+), 66 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 652599b..4ad9f5a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -72,6 +72,9 @@ features = ["derive"] version = "1.35.1" features = ["macros", "rt", "rt-multi-thread"] +[workspace.dependencies.reqwest] +version = "0.11.24" + [workspace.dependencies.serde] version = "1.0.196" features = ["derive"] diff --git a/controller/Cargo.toml b/controller/Cargo.toml index 5ee790a..22fce9e 100644 --- a/controller/Cargo.toml +++ b/controller/Cargo.toml @@ -18,6 +18,7 @@ serde_json = { workspace = true } sha256 = { workspace = true } url = { workspace = true } ureq = { workspace = true } +reqwest = { workspace = true } path-clean = { workspace = true } termion = { workspace = true } cli-tables = { workspace = true } @@ -28,6 +29,7 @@ uuid = { workspace = true } ipnetwork = { workspace = true } tokio = { workspace = true } futures = { workspace = true } +bytes = { workspace = true } [dependencies.krata] path = "../shared" diff --git a/controller/src/ctl/launch.rs b/controller/src/ctl/launch.rs index e76a53a..4d25dd7 100644 --- a/controller/src/ctl/launch.rs +++ b/controller/src/ctl/launch.rs @@ -37,7 +37,7 @@ impl ControllerLaunch<'_> { pub async fn perform(&mut self, request: ControllerLaunchRequest<'_>) -> Result<(Uuid, u32)> { let uuid = Uuid::new_v4(); let name = format!("krata-{uuid}"); - let image_info = self.compile(request.image)?; + let image_info = self.compile(request.image).await?; let mut gateway_mac = MacAddr6::random(); gateway_mac.set_local(true); @@ -220,9 +220,9 @@ impl ControllerLaunch<'_> { Ok(found.unwrap()) } - fn compile(&self, image: &str) -> Result { + async fn compile(&self, image: &str) -> Result { let image = ImageName::parse(image)?; let compiler = ImageCompiler::new(&self.context.image_cache)?; - compiler.compile(&image) + compiler.compile(&image).await } } diff --git a/controller/src/image/fetch.rs b/controller/src/image/fetch.rs index fcd9b99..6744891 100644 --- a/controller/src/image/fetch.rs +++ b/controller/src/image/fetch.rs @@ -1,56 +1,57 @@ use anyhow::{anyhow, Result}; +use bytes::Bytes; use oci_spec::image::{Arch, Descriptor, ImageIndex, ImageManifest, MediaType, Os, ToDockerV2S2}; -use std::io::copy; -use std::io::{Read, Write}; -use std::ops::DerefMut; -use ureq::{Agent, Request, Response}; +use reqwest::{Client, RequestBuilder, Response}; +use tokio::{fs::File, io::AsyncWriteExt}; use url::Url; const MANIFEST_PICKER_PLATFORM: Os = Os::Linux; const MANIFEST_PICKER_ARCHITECTURE: Arch = Arch::Amd64; pub struct RegistryClient { - agent: Agent, + agent: Client, url: Url, } impl RegistryClient { pub fn new(url: Url) -> Result { Ok(RegistryClient { - agent: Agent::new(), + agent: Client::new(), url, }) } - fn call(&mut self, req: Request) -> Result { - Ok(req.call()?) + async fn call(&mut self, req: RequestBuilder) -> Result { + self.agent.execute(req.build()?).await.map_err(|x| x.into()) } - pub fn get_blob(&mut self, name: &str, descriptor: &Descriptor) -> Result> { + pub async fn get_blob(&mut self, name: &str, descriptor: &Descriptor) -> Result { let url = self .url .join(&format!("/v2/{}/blobs/{}", name, descriptor.digest()))?; - let response = self.call(self.agent.get(url.as_str()))?; - let mut buffer: Vec = Vec::new(); - response.into_reader().read_to_end(&mut buffer)?; - Ok(buffer) + let response = self.call(self.agent.get(url.as_str())).await?; + Ok(response.bytes().await?) } - pub fn write_blob( + pub async fn write_blob_to_file( &mut self, name: &str, descriptor: &Descriptor, - dest: &mut dyn Write, + mut dest: File, ) -> Result { let url = self .url .join(&format!("/v2/{}/blobs/{}", name, descriptor.digest()))?; - let response = self.call(self.agent.get(url.as_str()))?; - let mut reader = response.into_reader(); - Ok(copy(reader.deref_mut(), dest)?) + let mut response = self.call(self.agent.get(url.as_str())).await?; + let mut size: u64 = 0; + while let Some(chunk) = response.chunk().await? { + dest.write_all(&chunk).await?; + size += chunk.len() as u64; + } + Ok(size) } - pub fn get_manifest_with_digest( + async fn get_raw_manifest_with_digest( &mut self, name: &str, reference: &str, @@ -65,24 +66,60 @@ impl RegistryClient { MediaType::ImageIndex, MediaType::ImageIndex.to_docker_v2s2()?, ); - let response = self.call(self.agent.get(url.as_str()).set("Accept", &accept))?; + let response = self + .call(self.agent.get(url.as_str()).header("Accept", &accept)) + .await?; + let digest = response + .headers() + .get("Docker-Content-Digest") + .ok_or_else(|| anyhow!("fetching manifest did not yield a content digest"))? + .to_str()? + .to_string(); + let manifest = serde_json::from_str(&response.text().await?)?; + Ok((manifest, digest)) + } + + pub async fn get_manifest_with_digest( + &mut self, + name: &str, + reference: &str, + ) -> Result<(ImageManifest, String)> { + let url = self + .url + .join(&format!("/v2/{}/manifests/{}", name, reference))?; + let accept = format!( + "{}, {}, {}, {}", + MediaType::ImageManifest.to_docker_v2s2()?, + MediaType::ImageManifest, + MediaType::ImageIndex, + MediaType::ImageIndex.to_docker_v2s2()?, + ); + let response = self + .call(self.agent.get(url.as_str()).header("Accept", &accept)) + .await?; let content_type = response - .header("Content-Type") - .ok_or_else(|| anyhow!("registry response did not have a Content-Type header"))?; + .headers() + .get("Content-Type") + .ok_or_else(|| anyhow!("registry response did not have a Content-Type header"))? + .to_str()?; if content_type == MediaType::ImageIndex.to_string() || content_type == MediaType::ImageIndex.to_docker_v2s2()? { - let index = ImageIndex::from_reader(response.into_reader())?; + let index = serde_json::from_str(&response.text().await?)?; let descriptor = self .pick_manifest(index) .ok_or_else(|| anyhow!("unable to pick manifest from index"))?; - return self.get_manifest_with_digest(name, descriptor.digest()); + return self + .get_raw_manifest_with_digest(name, descriptor.digest()) + .await; } let digest = response - .header("Docker-Content-Digest") + .headers() + .get("Docker-Content-Digest") .ok_or_else(|| anyhow!("fetching manifest did not yield a content digest"))? + .to_str()? .to_string(); - let manifest = ImageManifest::from_reader(response.into_reader())?; + let manifest = serde_json::from_str(&response.text().await?)?; Ok((manifest, digest)) } diff --git a/controller/src/image/mod.rs b/controller/src/image/mod.rs index a284ae4..51c6464 100644 --- a/controller/src/image/mod.rs +++ b/controller/src/image/mod.rs @@ -94,8 +94,8 @@ impl ImageCompiler<'_> { Ok(ImageCompiler { cache }) } - pub fn compile(&self, image: &ImageName) -> Result { - debug!("ImageCompiler compile image={image}"); + pub async fn compile(&self, image: &ImageName) -> Result { + debug!("compile image={image}"); let mut tmp_dir = std::env::temp_dir().clone(); tmp_dir.push(format!("krata-compile-{}", Uuid::new_v4())); @@ -109,12 +109,14 @@ impl ImageCompiler<'_> { let mut squash_file = tmp_dir.clone(); squash_file.push("image.squashfs"); - let info = self.download_and_compile(image, &layer_dir, &image_dir, &squash_file)?; + let info = self + .download_and_compile(image, &layer_dir, &image_dir, &squash_file) + .await?; fs::remove_dir_all(&tmp_dir)?; Ok(info) } - fn download_and_compile( + async fn download_and_compile( &self, image: &ImageName, layer_dir: &Path, @@ -122,11 +124,13 @@ impl ImageCompiler<'_> { squash_file: &PathBuf, ) -> Result { debug!( - "ImageCompiler download manifest image={image}, image_dir={}", + "download manifest image={image}, image_dir={}", image_dir.to_str().unwrap() ); let mut client = RegistryClient::new(image.registry_url()?)?; - let (manifest, digest) = client.get_manifest_with_digest(&image.name, &image.reference)?; + let (manifest, digest) = client + .get_manifest_with_digest(&image.name, &image.reference) + .await?; let cache_key = format!( "manifest={}:squashfs-version={}\n", digest, IMAGE_SQUASHFS_VERSION @@ -138,21 +142,24 @@ impl ImageCompiler<'_> { } debug!( - "ImageCompiler download config digest={} size={}", + "download config digest={} size={}", manifest.config().digest(), manifest.config().size(), ); - let config_bytes = client.get_blob(&image.name, manifest.config())?; + let config_bytes = client.get_blob(&image.name, manifest.config()).await?; let config: ImageConfiguration = serde_json::from_slice(&config_bytes)?; let mut layers: Vec = Vec::new(); for layer in manifest.layers() { - layers.push(self.download_layer(image, layer, layer_dir, &mut client)?); + layers.push( + self.download_layer(image, layer, layer_dir, &mut client) + .await?, + ); } for layer in layers { debug!( - "ImageCompiler process layer digest={} compression={:?}", + "process layer digest={} compression={:?}", &layer.digest, layer.compression ); let mut archive = Archive::new(layer.open_reader()?); @@ -199,7 +206,7 @@ impl ImageCompiler<'_> { } trace!( - "ImageCompiler whiteout entry layer={} path={:?}", + "whiteout entry layer={} path={:?}", &layer.digest, entry.path()? ); @@ -219,7 +226,7 @@ impl ImageCompiler<'_> { } } else { warn!( - "ImageCompiler whiteout entry missing locally layer={} path={:?} local={:?}", + "whiteout entry missing locally layer={} path={:?} local={:?}", &layer.digest, entry.path()?, dst, @@ -231,7 +238,7 @@ impl ImageCompiler<'_> { fs::remove_dir(&dst)?; } else { warn!( - "ImageCompiler whiteout entry missing locally layer={} path={:?} local={:?}", + "whiteout entry missing locally layer={} path={:?} local={:?}", &layer.digest, entry.path()?, dst, @@ -247,7 +254,7 @@ impl ImageCompiler<'_> { image_dir: &PathBuf, ) -> Result<()> { trace!( - "ImageCompiler unpack entry layer={} path={:?} type={:?}", + "unpack entry layer={} path={:?} type={:?}", &layer.digest, entry.path()?, entry.header().entry_type() @@ -285,7 +292,7 @@ impl ImageCompiler<'_> { Ok(()) } - fn download_layer( + async fn download_layer( &self, image: &ImageName, layer: &Descriptor, @@ -293,7 +300,7 @@ impl ImageCompiler<'_> { client: &mut RegistryClient, ) -> Result { debug!( - "ImageCompiler download layer digest={} size={}", + "download layer digest={} size={}", layer.digest(), layer.size() ); @@ -303,8 +310,8 @@ impl ImageCompiler<'_> { tmp_path.push(format!("{}.tmp", layer.digest())); { - let mut file = File::create(&layer_path)?; - let size = client.write_blob(&image.name, layer, &mut file)?; + let file = tokio::fs::File::create(&layer_path).await?; + let size = client.write_blob_to_file(&image.name, layer, file).await?; if layer.size() as u64 != size { return Err(anyhow!( "downloaded layer size differs from size in manifest", @@ -344,7 +351,7 @@ impl ImageCompiler<'_> { .to_str() .ok_or_else(|| anyhow!("failed to strip prefix of tmpdir"))?; let rel = format!("/{}", rel); - trace!("ImageCompiler squash write {}", rel); + trace!("squash write {}", rel); let typ = entry.file_type(); let metadata = fs::symlink_metadata(entry.path())?; let uid = metadata.uid(); @@ -400,7 +407,7 @@ impl ImageCompiler<'_> { .ok_or_else(|| anyhow!("failed to convert squashfs string"))?; let mut file = File::create(squash_file)?; - trace!("ImageCompiler squash generate: {}", squash_file_path); + trace!("squash generate: {}", squash_file_path); writer.write(&mut file)?; Ok(()) } diff --git a/libs/xen/xenclient/src/boot.rs b/libs/xen/xenclient/src/boot.rs index a0ffb8a..4acfc00 100644 --- a/libs/xen/xenclient/src/boot.rs +++ b/libs/xen/xenclient/src/boot.rs @@ -92,16 +92,13 @@ impl BootSetup<'_> { max_vcpus: u32, mem_mb: u64, ) -> Result { - debug!( - "BootSetup initialize max_vcpus={:?} mem_mb={:?}", - max_vcpus, mem_mb - ); + debug!("initialize max_vcpus={:?} mem_mb={:?}", max_vcpus, mem_mb); let total_pages = mem_mb << (20 - arch.page_shift()); self.initialize_memory(arch, total_pages)?; let image_info = image_loader.parse()?; - debug!("BootSetup initialize image_info={:?}", image_info); + debug!("initialize image_info={:?}", image_info); self.virt_alloc_end = image_info.virt_base; let kernel_segment = self.load_kernel_segment(arch, image_loader, &image_info)?; let mut p2m_segment: Option = None; @@ -152,7 +149,7 @@ impl BootSetup<'_> { console_evtchn, shared_info_frame: 0, }; - debug!("BootSetup initialize state={:?}", state); + debug!("initialize state={:?}", state); Ok(state) } @@ -259,7 +256,7 @@ impl BootSetup<'_> { slice.fill(0); segment.vend = self.virt_alloc_end; debug!( - "BootSetup alloc_segment {:#x} -> {:#x} (pfn {:#x} + {:#x} pages)", + "alloc_segment {:#x} -> {:#x} (pfn {:#x} + {:#x} pages)", start, segment.vend, segment.pfn, pages ); Ok(segment) @@ -270,7 +267,7 @@ impl BootSetup<'_> { let pfn = self.pfn_alloc_end; self.chk_alloc_pages(arch, 1)?; - debug!("BootSetup alloc_page {:#x} (pfn {:#x})", start, pfn); + debug!("alloc_page {:#x} (pfn {:#x})", start, pfn); Ok(DomainSegment { vstart: start, vend: (start + arch.page_size()) - 1, diff --git a/libs/xen/xenclient/src/elfloader.rs b/libs/xen/xenclient/src/elfloader.rs index 1bca930..655a675 100644 --- a/libs/xen/xenclient/src/elfloader.rs +++ b/libs/xen/xenclient/src/elfloader.rs @@ -254,7 +254,7 @@ impl BootImageLoader for ElfImageLoader { let segments = elf.segments().ok_or(Error::ElfInvalidImage)?; debug!( - "ElfImageLoader load dst={:#x} segments={}", + "load dst={:#x} segments={}", dst.as_ptr() as u64, segments.len() ); @@ -267,7 +267,7 @@ impl BootImageLoader for ElfImageLoader { let segment_dst = &mut dst[base_offset as usize..]; let copy_slice = &data[0..filesz as usize]; debug!( - "ElfImageLoader load copy hdr={:?} dst={:#x} len={}", + "load copy hdr={:?} dst={:#x} len={}", header, copy_slice.as_ptr() as u64, copy_slice.len() @@ -276,7 +276,7 @@ impl BootImageLoader for ElfImageLoader { if (memsz - filesz) > 0 { let remaining = &mut segment_dst[filesz as usize..memsz as usize]; debug!( - "ElfImageLoader load fill_zero hdr={:?} dst={:#x} len={}", + "load fill_zero hdr={:?} dst={:#x} len={}", header.p_offset, remaining.as_ptr() as u64, remaining.len() diff --git a/libs/xen/xenclient/src/x86.rs b/libs/xen/xenclient/src/x86.rs index c89dd32..11fb717 100644 --- a/libs/xen/xenclient/src/x86.rs +++ b/libs/xen/xenclient/src/x86.rs @@ -268,7 +268,7 @@ impl X86BootSetup { } debug!( - "BootSetup count_pgtables {:#x}/{}: {:#x} -> {:#x}, {} tables", + "count_pgtables {:#x}/{}: {:#x} -> {:#x}, {} tables", mask, bits, map.levels[l].from, map.levels[l].to, map.levels[l].pgtables ); map.area.pgtables += map.levels[l].pgtables; @@ -342,7 +342,7 @@ impl ArchBootSetup for X86BootSetup { let size = self.table.mappings[m].area.pgtables as u64 * X86_PAGE_SIZE; let segment = setup.alloc_segment(self, 0, size)?; debug!( - "BootSetup alloc_page_tables table={:?} segment={:?}", + "alloc_page_tables table={:?} segment={:?}", self.table, segment ); Ok(segment) @@ -387,7 +387,7 @@ impl ArchBootSetup for X86BootSetup { let mut pfn = ((max(from, lvl.from) - lvl.from) >> rhs) + lvl.pfn; debug!( - "BootSetup setup_page_tables lvl={} map_1={} map_2={} pfn={:#x} p_s={:#x} p_e={:#x}", + "setup_page_tables lvl={} map_1={} map_2={} pfn={:#x} p_s={:#x} p_e={:#x}", l, m1, m2, pfn, p_s, p_e ); @@ -439,7 +439,7 @@ impl ArchBootSetup for X86BootSetup { (*info).cmdline[i] = c as c_char; } (*info).cmdline[MAX_GUEST_CMDLINE - 1] = 0; - trace!("BootSetup setup_start_info start_info={:?}", *info); + trace!("setup_start_info start_info={:?}", *info); } Ok(()) } @@ -456,7 +456,7 @@ impl ArchBootSetup for X86BootSetup { for i in 0..32 { (*info).vcpu_info[i].evtchn_upcall_mask = 1; } - trace!("BootSetup setup_shared_info shared_info={:?}", *info); + trace!("setup_shared_info shared_info={:?}", *info); } Ok(()) } @@ -620,7 +620,7 @@ impl ArchBootSetup for X86BootSetup { vcpu.user_regs.cs = 0xe033; vcpu.kernel_ss = vcpu.user_regs.ss as u64; vcpu.kernel_sp = vcpu.user_regs.rsp; - debug!("vcpu context: {:?}", vcpu); + trace!("vcpu context: {:?}", vcpu); setup.call.set_vcpu_context(setup.domid, 0, &vcpu)?; Ok(()) }