controller: make image downloads async

This commit is contained in:
Alex Zenla 2024-02-25 05:38:23 +00:00
parent f66ab25521
commit 973db87b98
No known key found for this signature in database
GPG Key ID: 067B238899B51269
8 changed files with 112 additions and 66 deletions

View File

@ -72,6 +72,9 @@ features = ["derive"]
version = "1.35.1"
features = ["macros", "rt", "rt-multi-thread"]
[workspace.dependencies.reqwest]
version = "0.11.24"
[workspace.dependencies.serde]
version = "1.0.196"
features = ["derive"]

View File

@ -18,6 +18,7 @@ serde_json = { workspace = true }
sha256 = { workspace = true }
url = { workspace = true }
ureq = { workspace = true }
reqwest = { workspace = true }
path-clean = { workspace = true }
termion = { workspace = true }
cli-tables = { workspace = true }
@ -28,6 +29,7 @@ uuid = { workspace = true }
ipnetwork = { workspace = true }
tokio = { workspace = true }
futures = { workspace = true }
bytes = { workspace = true }
[dependencies.krata]
path = "../shared"

View File

@ -37,7 +37,7 @@ impl ControllerLaunch<'_> {
pub async fn perform(&mut self, request: ControllerLaunchRequest<'_>) -> Result<(Uuid, u32)> {
let uuid = Uuid::new_v4();
let name = format!("krata-{uuid}");
let image_info = self.compile(request.image)?;
let image_info = self.compile(request.image).await?;
let mut gateway_mac = MacAddr6::random();
gateway_mac.set_local(true);
@ -220,9 +220,9 @@ impl ControllerLaunch<'_> {
Ok(found.unwrap())
}
fn compile(&self, image: &str) -> Result<ImageInfo> {
async fn compile(&self, image: &str) -> Result<ImageInfo> {
let image = ImageName::parse(image)?;
let compiler = ImageCompiler::new(&self.context.image_cache)?;
compiler.compile(&image)
compiler.compile(&image).await
}
}

View File

@ -1,56 +1,57 @@
use anyhow::{anyhow, Result};
use bytes::Bytes;
use oci_spec::image::{Arch, Descriptor, ImageIndex, ImageManifest, MediaType, Os, ToDockerV2S2};
use std::io::copy;
use std::io::{Read, Write};
use std::ops::DerefMut;
use ureq::{Agent, Request, Response};
use reqwest::{Client, RequestBuilder, Response};
use tokio::{fs::File, io::AsyncWriteExt};
use url::Url;
const MANIFEST_PICKER_PLATFORM: Os = Os::Linux;
const MANIFEST_PICKER_ARCHITECTURE: Arch = Arch::Amd64;
pub struct RegistryClient {
agent: Agent,
agent: Client,
url: Url,
}
impl RegistryClient {
pub fn new(url: Url) -> Result<RegistryClient> {
Ok(RegistryClient {
agent: Agent::new(),
agent: Client::new(),
url,
})
}
fn call(&mut self, req: Request) -> Result<Response> {
Ok(req.call()?)
async fn call(&mut self, req: RequestBuilder) -> Result<Response> {
self.agent.execute(req.build()?).await.map_err(|x| x.into())
}
pub fn get_blob(&mut self, name: &str, descriptor: &Descriptor) -> Result<Vec<u8>> {
pub async fn get_blob(&mut self, name: &str, descriptor: &Descriptor) -> Result<Bytes> {
let url = self
.url
.join(&format!("/v2/{}/blobs/{}", name, descriptor.digest()))?;
let response = self.call(self.agent.get(url.as_str()))?;
let mut buffer: Vec<u8> = Vec::new();
response.into_reader().read_to_end(&mut buffer)?;
Ok(buffer)
let response = self.call(self.agent.get(url.as_str())).await?;
Ok(response.bytes().await?)
}
pub fn write_blob(
pub async fn write_blob_to_file(
&mut self,
name: &str,
descriptor: &Descriptor,
dest: &mut dyn Write,
mut dest: File,
) -> Result<u64> {
let url = self
.url
.join(&format!("/v2/{}/blobs/{}", name, descriptor.digest()))?;
let response = self.call(self.agent.get(url.as_str()))?;
let mut reader = response.into_reader();
Ok(copy(reader.deref_mut(), dest)?)
let mut response = self.call(self.agent.get(url.as_str())).await?;
let mut size: u64 = 0;
while let Some(chunk) = response.chunk().await? {
dest.write_all(&chunk).await?;
size += chunk.len() as u64;
}
Ok(size)
}
pub fn get_manifest_with_digest(
async fn get_raw_manifest_with_digest(
&mut self,
name: &str,
reference: &str,
@ -65,24 +66,60 @@ impl RegistryClient {
MediaType::ImageIndex,
MediaType::ImageIndex.to_docker_v2s2()?,
);
let response = self.call(self.agent.get(url.as_str()).set("Accept", &accept))?;
let response = self
.call(self.agent.get(url.as_str()).header("Accept", &accept))
.await?;
let digest = response
.headers()
.get("Docker-Content-Digest")
.ok_or_else(|| anyhow!("fetching manifest did not yield a content digest"))?
.to_str()?
.to_string();
let manifest = serde_json::from_str(&response.text().await?)?;
Ok((manifest, digest))
}
pub async fn get_manifest_with_digest(
&mut self,
name: &str,
reference: &str,
) -> Result<(ImageManifest, String)> {
let url = self
.url
.join(&format!("/v2/{}/manifests/{}", name, reference))?;
let accept = format!(
"{}, {}, {}, {}",
MediaType::ImageManifest.to_docker_v2s2()?,
MediaType::ImageManifest,
MediaType::ImageIndex,
MediaType::ImageIndex.to_docker_v2s2()?,
);
let response = self
.call(self.agent.get(url.as_str()).header("Accept", &accept))
.await?;
let content_type = response
.header("Content-Type")
.ok_or_else(|| anyhow!("registry response did not have a Content-Type header"))?;
.headers()
.get("Content-Type")
.ok_or_else(|| anyhow!("registry response did not have a Content-Type header"))?
.to_str()?;
if content_type == MediaType::ImageIndex.to_string()
|| content_type == MediaType::ImageIndex.to_docker_v2s2()?
{
let index = ImageIndex::from_reader(response.into_reader())?;
let index = serde_json::from_str(&response.text().await?)?;
let descriptor = self
.pick_manifest(index)
.ok_or_else(|| anyhow!("unable to pick manifest from index"))?;
return self.get_manifest_with_digest(name, descriptor.digest());
return self
.get_raw_manifest_with_digest(name, descriptor.digest())
.await;
}
let digest = response
.header("Docker-Content-Digest")
.headers()
.get("Docker-Content-Digest")
.ok_or_else(|| anyhow!("fetching manifest did not yield a content digest"))?
.to_str()?
.to_string();
let manifest = ImageManifest::from_reader(response.into_reader())?;
let manifest = serde_json::from_str(&response.text().await?)?;
Ok((manifest, digest))
}

View File

@ -94,8 +94,8 @@ impl ImageCompiler<'_> {
Ok(ImageCompiler { cache })
}
pub fn compile(&self, image: &ImageName) -> Result<ImageInfo> {
debug!("ImageCompiler compile image={image}");
pub async fn compile(&self, image: &ImageName) -> Result<ImageInfo> {
debug!("compile image={image}");
let mut tmp_dir = std::env::temp_dir().clone();
tmp_dir.push(format!("krata-compile-{}", Uuid::new_v4()));
@ -109,12 +109,14 @@ impl ImageCompiler<'_> {
let mut squash_file = tmp_dir.clone();
squash_file.push("image.squashfs");
let info = self.download_and_compile(image, &layer_dir, &image_dir, &squash_file)?;
let info = self
.download_and_compile(image, &layer_dir, &image_dir, &squash_file)
.await?;
fs::remove_dir_all(&tmp_dir)?;
Ok(info)
}
fn download_and_compile(
async fn download_and_compile(
&self,
image: &ImageName,
layer_dir: &Path,
@ -122,11 +124,13 @@ impl ImageCompiler<'_> {
squash_file: &PathBuf,
) -> Result<ImageInfo> {
debug!(
"ImageCompiler download manifest image={image}, image_dir={}",
"download manifest image={image}, image_dir={}",
image_dir.to_str().unwrap()
);
let mut client = RegistryClient::new(image.registry_url()?)?;
let (manifest, digest) = client.get_manifest_with_digest(&image.name, &image.reference)?;
let (manifest, digest) = client
.get_manifest_with_digest(&image.name, &image.reference)
.await?;
let cache_key = format!(
"manifest={}:squashfs-version={}\n",
digest, IMAGE_SQUASHFS_VERSION
@ -138,21 +142,24 @@ impl ImageCompiler<'_> {
}
debug!(
"ImageCompiler download config digest={} size={}",
"download config digest={} size={}",
manifest.config().digest(),
manifest.config().size(),
);
let config_bytes = client.get_blob(&image.name, manifest.config())?;
let config_bytes = client.get_blob(&image.name, manifest.config()).await?;
let config: ImageConfiguration = serde_json::from_slice(&config_bytes)?;
let mut layers: Vec<LayerFile> = Vec::new();
for layer in manifest.layers() {
layers.push(self.download_layer(image, layer, layer_dir, &mut client)?);
layers.push(
self.download_layer(image, layer, layer_dir, &mut client)
.await?,
);
}
for layer in layers {
debug!(
"ImageCompiler process layer digest={} compression={:?}",
"process layer digest={} compression={:?}",
&layer.digest, layer.compression
);
let mut archive = Archive::new(layer.open_reader()?);
@ -199,7 +206,7 @@ impl ImageCompiler<'_> {
}
trace!(
"ImageCompiler whiteout entry layer={} path={:?}",
"whiteout entry layer={} path={:?}",
&layer.digest,
entry.path()?
);
@ -219,7 +226,7 @@ impl ImageCompiler<'_> {
}
} else {
warn!(
"ImageCompiler whiteout entry missing locally layer={} path={:?} local={:?}",
"whiteout entry missing locally layer={} path={:?} local={:?}",
&layer.digest,
entry.path()?,
dst,
@ -231,7 +238,7 @@ impl ImageCompiler<'_> {
fs::remove_dir(&dst)?;
} else {
warn!(
"ImageCompiler whiteout entry missing locally layer={} path={:?} local={:?}",
"whiteout entry missing locally layer={} path={:?} local={:?}",
&layer.digest,
entry.path()?,
dst,
@ -247,7 +254,7 @@ impl ImageCompiler<'_> {
image_dir: &PathBuf,
) -> Result<()> {
trace!(
"ImageCompiler unpack entry layer={} path={:?} type={:?}",
"unpack entry layer={} path={:?} type={:?}",
&layer.digest,
entry.path()?,
entry.header().entry_type()
@ -285,7 +292,7 @@ impl ImageCompiler<'_> {
Ok(())
}
fn download_layer(
async fn download_layer(
&self,
image: &ImageName,
layer: &Descriptor,
@ -293,7 +300,7 @@ impl ImageCompiler<'_> {
client: &mut RegistryClient,
) -> Result<LayerFile> {
debug!(
"ImageCompiler download layer digest={} size={}",
"download layer digest={} size={}",
layer.digest(),
layer.size()
);
@ -303,8 +310,8 @@ impl ImageCompiler<'_> {
tmp_path.push(format!("{}.tmp", layer.digest()));
{
let mut file = File::create(&layer_path)?;
let size = client.write_blob(&image.name, layer, &mut file)?;
let file = tokio::fs::File::create(&layer_path).await?;
let size = client.write_blob_to_file(&image.name, layer, file).await?;
if layer.size() as u64 != size {
return Err(anyhow!(
"downloaded layer size differs from size in manifest",
@ -344,7 +351,7 @@ impl ImageCompiler<'_> {
.to_str()
.ok_or_else(|| anyhow!("failed to strip prefix of tmpdir"))?;
let rel = format!("/{}", rel);
trace!("ImageCompiler squash write {}", rel);
trace!("squash write {}", rel);
let typ = entry.file_type();
let metadata = fs::symlink_metadata(entry.path())?;
let uid = metadata.uid();
@ -400,7 +407,7 @@ impl ImageCompiler<'_> {
.ok_or_else(|| anyhow!("failed to convert squashfs string"))?;
let mut file = File::create(squash_file)?;
trace!("ImageCompiler squash generate: {}", squash_file_path);
trace!("squash generate: {}", squash_file_path);
writer.write(&mut file)?;
Ok(())
}

View File

@ -92,16 +92,13 @@ impl BootSetup<'_> {
max_vcpus: u32,
mem_mb: u64,
) -> Result<BootState> {
debug!(
"BootSetup initialize max_vcpus={:?} mem_mb={:?}",
max_vcpus, mem_mb
);
debug!("initialize max_vcpus={:?} mem_mb={:?}", max_vcpus, mem_mb);
let total_pages = mem_mb << (20 - arch.page_shift());
self.initialize_memory(arch, total_pages)?;
let image_info = image_loader.parse()?;
debug!("BootSetup initialize image_info={:?}", image_info);
debug!("initialize image_info={:?}", image_info);
self.virt_alloc_end = image_info.virt_base;
let kernel_segment = self.load_kernel_segment(arch, image_loader, &image_info)?;
let mut p2m_segment: Option<DomainSegment> = None;
@ -152,7 +149,7 @@ impl BootSetup<'_> {
console_evtchn,
shared_info_frame: 0,
};
debug!("BootSetup initialize state={:?}", state);
debug!("initialize state={:?}", state);
Ok(state)
}
@ -259,7 +256,7 @@ impl BootSetup<'_> {
slice.fill(0);
segment.vend = self.virt_alloc_end;
debug!(
"BootSetup alloc_segment {:#x} -> {:#x} (pfn {:#x} + {:#x} pages)",
"alloc_segment {:#x} -> {:#x} (pfn {:#x} + {:#x} pages)",
start, segment.vend, segment.pfn, pages
);
Ok(segment)
@ -270,7 +267,7 @@ impl BootSetup<'_> {
let pfn = self.pfn_alloc_end;
self.chk_alloc_pages(arch, 1)?;
debug!("BootSetup alloc_page {:#x} (pfn {:#x})", start, pfn);
debug!("alloc_page {:#x} (pfn {:#x})", start, pfn);
Ok(DomainSegment {
vstart: start,
vend: (start + arch.page_size()) - 1,

View File

@ -254,7 +254,7 @@ impl BootImageLoader for ElfImageLoader {
let segments = elf.segments().ok_or(Error::ElfInvalidImage)?;
debug!(
"ElfImageLoader load dst={:#x} segments={}",
"load dst={:#x} segments={}",
dst.as_ptr() as u64,
segments.len()
);
@ -267,7 +267,7 @@ impl BootImageLoader for ElfImageLoader {
let segment_dst = &mut dst[base_offset as usize..];
let copy_slice = &data[0..filesz as usize];
debug!(
"ElfImageLoader load copy hdr={:?} dst={:#x} len={}",
"load copy hdr={:?} dst={:#x} len={}",
header,
copy_slice.as_ptr() as u64,
copy_slice.len()
@ -276,7 +276,7 @@ impl BootImageLoader for ElfImageLoader {
if (memsz - filesz) > 0 {
let remaining = &mut segment_dst[filesz as usize..memsz as usize];
debug!(
"ElfImageLoader load fill_zero hdr={:?} dst={:#x} len={}",
"load fill_zero hdr={:?} dst={:#x} len={}",
header.p_offset,
remaining.as_ptr() as u64,
remaining.len()

View File

@ -268,7 +268,7 @@ impl X86BootSetup {
}
debug!(
"BootSetup count_pgtables {:#x}/{}: {:#x} -> {:#x}, {} tables",
"count_pgtables {:#x}/{}: {:#x} -> {:#x}, {} tables",
mask, bits, map.levels[l].from, map.levels[l].to, map.levels[l].pgtables
);
map.area.pgtables += map.levels[l].pgtables;
@ -342,7 +342,7 @@ impl ArchBootSetup for X86BootSetup {
let size = self.table.mappings[m].area.pgtables as u64 * X86_PAGE_SIZE;
let segment = setup.alloc_segment(self, 0, size)?;
debug!(
"BootSetup alloc_page_tables table={:?} segment={:?}",
"alloc_page_tables table={:?} segment={:?}",
self.table, segment
);
Ok(segment)
@ -387,7 +387,7 @@ impl ArchBootSetup for X86BootSetup {
let mut pfn = ((max(from, lvl.from) - lvl.from) >> rhs) + lvl.pfn;
debug!(
"BootSetup setup_page_tables lvl={} map_1={} map_2={} pfn={:#x} p_s={:#x} p_e={:#x}",
"setup_page_tables lvl={} map_1={} map_2={} pfn={:#x} p_s={:#x} p_e={:#x}",
l, m1, m2, pfn, p_s, p_e
);
@ -439,7 +439,7 @@ impl ArchBootSetup for X86BootSetup {
(*info).cmdline[i] = c as c_char;
}
(*info).cmdline[MAX_GUEST_CMDLINE - 1] = 0;
trace!("BootSetup setup_start_info start_info={:?}", *info);
trace!("setup_start_info start_info={:?}", *info);
}
Ok(())
}
@ -456,7 +456,7 @@ impl ArchBootSetup for X86BootSetup {
for i in 0..32 {
(*info).vcpu_info[i].evtchn_upcall_mask = 1;
}
trace!("BootSetup setup_shared_info shared_info={:?}", *info);
trace!("setup_shared_info shared_info={:?}", *info);
}
Ok(())
}
@ -620,7 +620,7 @@ impl ArchBootSetup for X86BootSetup {
vcpu.user_regs.cs = 0xe033;
vcpu.kernel_ss = vcpu.user_regs.ss as u64;
vcpu.kernel_sp = vcpu.user_regs.rsp;
debug!("vcpu context: {:?}", vcpu);
trace!("vcpu context: {:?}", vcpu);
setup.call.set_vcpu_context(setup.domid, 0, &vcpu)?;
Ok(())
}