diff --git a/Cargo.toml b/Cargo.toml
index 213ce49..4da9d62 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -51,11 +51,11 @@ sha256 = "1.5.0"
 signal-hook = "0.3.17"
 slice-copy = "0.3.0"
 smoltcp = "0.11.0"
-tar = "0.4.40"
 termion = "3.0.0"
 thiserror = "1.0"
 tokio-listener = "0.3.1"
 tokio-native-tls = "0.3.1"
+tokio-tar = "0.3.1"
 tokio-tun = "0.11.2"
 tonic-build = "0.11.0"
 tower = "0.4.13"
@@ -66,6 +66,9 @@ walkdir = "2"
 xz2 = "0.1"
 zstd = "0.13.0"
 
+[workspace.dependencies.async-compression]
+version = "0.4.6"
+
 [workspace.dependencies.clap]
 version = "4.4.18"
 features = ["derive"]
diff --git a/crates/kratart/Cargo.toml b/crates/kratart/Cargo.toml
index ec39449..f87e9ed 100644
--- a/crates/kratart/Cargo.toml
+++ b/crates/kratart/Cargo.toml
@@ -7,11 +7,11 @@ resolver = "2"
 [dependencies]
 advmac = { path = "../vendor/advmac" }
 anyhow = { workspace = true }
+async-compression = { workspace = true, features = ["tokio", "gzip", "zstd"] }
 async-stream = { workspace = true }
 async-trait = { workspace = true }
 backhand = { workspace = true }
 bytes = { workspace = true }
-flate2 = { workspace = true }
 ipnetwork = { workspace = true }
 krata = { path = "../krata" }
 log = { workspace = true }
@@ -23,16 +23,22 @@ reqwest = { workspace = true }
 serde = { workspace = true }
 serde_json = { workspace = true }
 sha256 = { workspace = true }
-tar = { workspace = true }
 tokio = { workspace = true }
 tokio-stream = { workspace = true }
+tokio-tar = { workspace = true }
 tonic = { workspace = true, features = ["tls"] }
 url = { workspace = true }
 uuid = { workspace = true }
 walkdir = { workspace = true }
-zstd = { workspace = true }
 xenclient = { path = "../xen/xenclient" }
 xenstore = { path = "../xen/xenstore" }
 
 [lib]
 name = "kratart"
+
+[dev-dependencies]
+env_logger = { workspace = true }
+
+[[example]]
+name = "kratart-squashify"
+path = "examples/squashify.rs"
diff --git a/crates/kratart/examples/squashify.rs b/crates/kratart/examples/squashify.rs
new file mode 100644
index 0000000..89ae2ac
--- /dev/null
+++ b/crates/kratart/examples/squashify.rs
@@ -0,0 +1,26 @@
+use std::{env::args, path::PathBuf};
+
+use anyhow::Result;
+use env_logger::Env;
+use kratart::image::{cache::ImageCache, compiler::ImageCompiler, name::ImageName};
+use tokio::fs;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    env_logger::Builder::from_env(Env::default().default_filter_or("warn")).init();
+
+    let image = ImageName::parse(&args().nth(1).unwrap())?;
+    let cache_dir = PathBuf::from("krata-cache");
+    if !cache_dir.exists() {
+        fs::create_dir(&cache_dir).await?;
+    }
+    let cache = ImageCache::new(&cache_dir)?;
+    let compiler = ImageCompiler::new(&cache)?;
+    let info = compiler.compile(&image).await?;
+    println!(
+        "generated squashfs of {} to {}",
+        image,
+        info.image_squashfs.to_string_lossy()
+    );
+    Ok(())
+}
diff --git a/crates/kratart/src/cfgblk.rs b/crates/kratart/src/cfgblk.rs
index d8921b0..571f83c 100644
--- a/crates/kratart/src/cfgblk.rs
+++ b/crates/kratart/src/cfgblk.rs
@@ -1,4 +1,4 @@
-use crate::image::ImageInfo;
+use crate::image::compiler::ImageInfo;
 use anyhow::Result;
 use backhand::{FilesystemWriter, NodeHeader};
 use krata::launchcfg::LaunchInfo;
diff --git a/crates/kratart/src/image/cache.rs b/crates/kratart/src/image/cache.rs
index d8c1b37..7ced47b 100644
--- a/crates/kratart/src/image/cache.rs
+++ b/crates/kratart/src/image/cache.rs
@@ -1,4 +1,4 @@
-use super::ImageInfo;
+use super::compiler::ImageInfo;
 use anyhow::Result;
 use log::debug;
 use oci_spec::image::{ImageConfiguration, ImageManifest};
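The `kratart-squashify` example above exercises the new compiler end to end. Assuming a workspace checkout and network access to the registry (the image reference below is only an illustration), it can be invoked roughly as:

    cargo run --example kratart-squashify -- docker.io/library/alpine:latest

This creates a `krata-cache` directory in the working directory for the image cache and prints the path of the generated squashfs.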
diff --git a/crates/kratart/src/image/compiler.rs b/crates/kratart/src/image/compiler.rs
new file mode 100644
index 0000000..dffe1ac
--- /dev/null
+++ b/crates/kratart/src/image/compiler.rs
@@ -0,0 +1,324 @@
+use crate::image::cache::ImageCache;
+use crate::image::name::ImageName;
+use crate::image::registry::OciRegistryPlatform;
+use anyhow::{anyhow, Result};
+use backhand::compression::Compressor;
+use backhand::{FilesystemCompressor, FilesystemWriter, NodeHeader};
+use log::{debug, trace, warn};
+use oci_spec::image::{ImageConfiguration, ImageManifest};
+use std::fs::File;
+use std::io::{BufReader, Cursor};
+use std::os::unix::fs::{FileTypeExt, MetadataExt, PermissionsExt};
+use std::path::{Path, PathBuf};
+use std::pin::Pin;
+use tokio::fs;
+use tokio::io::AsyncRead;
+use tokio_stream::StreamExt;
+use tokio_tar::{Archive, Entry};
+use uuid::Uuid;
+use walkdir::WalkDir;
+
+use crate::image::fetch::{OciImageDownloader, OciImageLayer};
+
+pub const IMAGE_SQUASHFS_VERSION: u64 = 1;
+const LAYER_BUFFER_SIZE: usize = 128 * 1024;
+
+// we utilize in-memory buffers when generating the squashfs for files
+// under this size. for files of or above this size, we open a file.
+// the file is then read during writing. we want to reduce the number
+// of open files during squashfs generation, so this limit should be set
+// to something that limits the number of files on average, at the expense
+// of increased memory usage.
+// TODO: it may be wise to, during crawling of the image layers, infer this
+// value from the size to file count ratio of all layers.
+const SQUASHFS_MEMORY_BUFFER_LIMIT: usize = 8 * 1024 * 1024;
+
+pub struct ImageInfo {
+    pub image_squashfs: PathBuf,
+    pub manifest: ImageManifest,
+    pub config: ImageConfiguration,
+}
+
+impl ImageInfo {
+    pub fn new(
+        squashfs: PathBuf,
+        manifest: ImageManifest,
+        config: ImageConfiguration,
+    ) -> Result<ImageInfo> {
+        Ok(ImageInfo {
+            image_squashfs: squashfs,
+            manifest,
+            config,
+        })
+    }
+}
+
+pub struct ImageCompiler<'a> {
+    cache: &'a ImageCache,
+}
+
+impl ImageCompiler<'_> {
+    pub fn new(cache: &ImageCache) -> Result<ImageCompiler> {
+        Ok(ImageCompiler { cache })
+    }
+
+    pub async fn compile(&self, image: &ImageName) -> Result<ImageInfo> {
+        debug!("compile image={image}");
+        let mut tmp_dir = std::env::temp_dir().clone();
+        tmp_dir.push(format!("krata-compile-{}", Uuid::new_v4()));
+
+        let mut image_dir = tmp_dir.clone();
+        image_dir.push("image");
+        fs::create_dir_all(&image_dir).await?;
+
+        let mut layer_dir = tmp_dir.clone();
+        layer_dir.push("layer");
+        fs::create_dir_all(&layer_dir).await?;
+
+        let mut squash_file = tmp_dir.clone();
+        squash_file.push("image.squashfs");
+        let info = self
+            .download_and_compile(image, &layer_dir, &image_dir, &squash_file)
+            .await?;
+        fs::remove_dir_all(&tmp_dir).await?;
+        Ok(info)
+    }
+
+    async fn download_and_compile(
+        &self,
+        image: &ImageName,
+        layer_dir: &Path,
+        image_dir: &Path,
+        squash_file: &Path,
+    ) -> Result<ImageInfo> {
+        let downloader =
+            OciImageDownloader::new(layer_dir.to_path_buf(), OciRegistryPlatform::current());
+        let resolved = downloader.resolve(image.clone()).await?;
+        let cache_key = format!(
+            "manifest={}:squashfs-version={}\n",
+            resolved.digest, IMAGE_SQUASHFS_VERSION
+        );
+        let cache_digest = sha256::digest(cache_key);
+
+        if let Some(cached) = self.cache.recall(&cache_digest).await? {
+            return Ok(cached);
+        }
+
+        let local = downloader.download(resolved).await?;
+        for layer in local.layers {
+            debug!(
+                "process layer digest={} compression={:?}",
+                &layer.digest, layer.compression
+            );
+            let mut archive = layer.archive().await?;
+
+            let mut entries = archive.entries()?;
+            while let Some(entry) = entries.next().await {
+                let mut entry: Entry<Archive<Pin<Box<dyn AsyncRead + Send>>>> =
+                    entry?;
+                let path = entry.path()?;
+                let Some(name) = path.file_name() else {
+                    return Err(anyhow!("unable to get file name"));
+                };
+                let Some(name) = name.to_str() else {
+                    return Err(anyhow!("unable to get file name as string"));
+                };
+
+                if name.starts_with(".wh.") {
+                    self.process_whiteout_entry(&entry, name, &layer, image_dir)
+                        .await?;
+                } else {
+                    self.process_write_entry(&mut entry, &layer, image_dir)
+                        .await?;
+                }
+            }
+            fs::remove_file(&layer.path).await?;
+        }
+
+        self.squash(image_dir, squash_file)?;
+        let info = ImageInfo::new(
+            squash_file.to_path_buf(),
+            local.image.manifest,
+            local.config,
+        )?;
+        self.cache.store(&cache_digest, &info).await
+    }
+
+    async fn process_whiteout_entry(
+        &self,
+        entry: &Entry<Archive<Pin<Box<dyn AsyncRead + Send>>>>,
+        name: &str,
+        layer: &OciImageLayer,
+        image_dir: &Path,
+    ) -> Result<()> {
+        let dst = self.check_safe_entry(entry, image_dir)?;
+        let mut dst = dst.clone();
+        dst.pop();
+
+        let opaque = name == ".wh..wh..opq";
+
+        if !opaque {
+            dst.push(name);
+            self.check_safe_path(&dst, image_dir)?;
+        }
+
+        trace!(
+            "whiteout entry layer={} path={:?}",
+            &layer.digest,
+            entry.path()?
+        );
+
+        if opaque {
+            if dst.is_dir() {
+                let mut reader = fs::read_dir(dst).await?;
+                while let Some(entry) = reader.next_entry().await? {
+                    let path = entry.path();
+                    if path.is_symlink() || path.is_file() {
+                        fs::remove_file(&path).await?;
+                    } else if path.is_dir() {
+                        fs::remove_dir_all(&path).await?;
+                    } else {
+                        return Err(anyhow!("opaque whiteout entry did not exist"));
+                    }
+                }
+            } else {
+                warn!(
+                    "whiteout entry missing locally layer={} path={:?} local={:?}",
+                    &layer.digest,
+                    entry.path()?,
+                    dst,
+                );
+            }
+        } else if dst.is_file() || dst.is_symlink() {
+            fs::remove_file(&dst).await?;
+        } else if dst.is_dir() {
+            fs::remove_dir(&dst).await?;
+        } else {
+            warn!(
+                "whiteout entry missing locally layer={} path={:?} local={:?}",
+                &layer.digest,
+                entry.path()?,
+                dst,
+            );
+        }
+        Ok(())
+    }
+
+    async fn process_write_entry(
+        &self,
+        entry: &mut Entry<Archive<Pin<Box<dyn AsyncRead + Send>>>>,
+        layer: &OciImageLayer,
+        image_dir: &Path,
+    ) -> Result<()> {
+        trace!(
+            "unpack entry layer={} path={:?} type={:?}",
+            &layer.digest,
+            entry.path()?,
+            entry.header().entry_type()
+        );
+        entry.unpack_in(image_dir).await?;
+        Ok(())
+    }
+
+    fn check_safe_entry(
+        &self,
+        entry: &Entry<Archive<Pin<Box<dyn AsyncRead + Send>>>>,
+        image_dir: &Path,
+    ) -> Result<PathBuf> {
+        let mut dst = image_dir.to_path_buf();
+        dst.push(entry.path()?);
+        if let Some(name) = dst.file_name() {
+            if let Some(name) = name.to_str() {
+                if name.starts_with(".wh.") {
+                    let copy = dst.clone();
+                    dst.pop();
+                    self.check_safe_path(&dst, image_dir)?;
+                    return Ok(copy);
+                }
+            }
+        }
+        self.check_safe_path(&dst, image_dir)?;
+        Ok(dst)
+    }
+
+    fn check_safe_path(&self, dst: &Path, image_dir: &Path) -> Result<()> {
+        let resolved = path_clean::clean(dst);
+        if !resolved.starts_with(image_dir) {
+            return Err(anyhow!("layer attempts to work outside image dir"));
+        }
+        Ok(())
+    }
+
+    fn squash(&self, image_dir: &Path, squash_file: &Path) -> Result<()> {
+        let mut writer = FilesystemWriter::default();
+        writer.set_compressor(FilesystemCompressor::new(Compressor::Gzip, None)?);
+        let walk = WalkDir::new(image_dir).follow_links(false);
+        for entry in walk {
+            let entry = entry?;
+            let rel = entry
+                .path()
+                .strip_prefix(image_dir)?
+                .to_str()
+                .ok_or_else(|| anyhow!("failed to strip prefix of tmpdir"))?;
+            let rel = format!("/{}", rel);
+            trace!("squash write {}", rel);
+            let typ = entry.file_type();
+            let metadata = std::fs::symlink_metadata(entry.path())?;
+            let uid = metadata.uid();
+            let gid = metadata.gid();
+            let mode = metadata.permissions().mode();
+            let mtime = metadata.mtime();
+
+            if rel == "/" {
+                writer.set_root_uid(uid);
+                writer.set_root_gid(gid);
+                writer.set_root_mode(mode as u16);
+                continue;
+            }
+
+            let header = NodeHeader {
+                permissions: mode as u16,
+                uid,
+                gid,
+                mtime: mtime as u32,
+            };
+            if typ.is_symlink() {
+                let symlink = std::fs::read_link(entry.path())?;
+                let symlink = symlink
+                    .to_str()
+                    .ok_or_else(|| anyhow!("failed to read symlink"))?;
+                writer.push_symlink(symlink, rel, header)?;
+            } else if typ.is_dir() {
+                writer.push_dir(rel, header)?;
+            } else if typ.is_file() {
+                if metadata.size() >= SQUASHFS_MEMORY_BUFFER_LIMIT as u64 {
+                    let reader =
+                        BufReader::with_capacity(LAYER_BUFFER_SIZE, File::open(entry.path())?);
+                    writer.push_file(reader, rel, header)?;
+                } else {
+                    let cursor = Cursor::new(std::fs::read(entry.path())?);
+                    writer.push_file(cursor, rel, header)?;
+                }
+            } else if typ.is_block_device() {
+                let device = metadata.dev();
+                writer.push_block_device(device as u32, rel, header)?;
+            } else if typ.is_char_device() {
+                let device = metadata.dev();
+                writer.push_char_device(device as u32, rel, header)?;
+            } else {
+                return Err(anyhow!("invalid file type"));
+            }
+        }
+
+        std::fs::remove_dir_all(image_dir)?;
+
+        let squash_file_path = squash_file
+            .to_str()
+            .ok_or_else(|| anyhow!("failed to convert squashfs string"))?;
+
+        let mut file = File::create(squash_file)?;
+        trace!("squash generate: {}", squash_file_path);
+        writer.write(&mut file)?;
+        Ok(())
+    }
+}
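For context on `process_whiteout_entry` above: OCI image layers encode deletions from lower layers as special tar entries, where `.wh.<name>` whites out a single path and `.wh..wh..opq` marks the containing directory as opaque, dropping everything beneath it from lower layers. A minimal sketch of that naming convention, using a hypothetical helper that is not part of this patch:

    enum Whiteout {
        // ".wh..wh..opq": clear the parent directory's lower-layer contents
        Opaque,
        // ".wh.<name>": delete <name> inherited from a lower layer
        Single(String),
    }

    fn classify_whiteout(name: &str) -> Option<Whiteout> {
        if name == ".wh..wh..opq" {
            Some(Whiteout::Opaque)
        } else if let Some(target) = name.strip_prefix(".wh.") {
            Some(Whiteout::Single(target.to_string()))
        } else {
            None
        }
    }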
diff --git a/crates/kratart/src/image/fetch.rs b/crates/kratart/src/image/fetch.rs
index 6744891..3fe3a72 100644
--- a/crates/kratart/src/image/fetch.rs
+++ b/crates/kratart/src/image/fetch.rs
@@ -1,138 +1,147 @@
+use std::{path::PathBuf, pin::Pin};
+
 use anyhow::{anyhow, Result};
-use bytes::Bytes;
-use oci_spec::image::{Arch, Descriptor, ImageIndex, ImageManifest, MediaType, Os, ToDockerV2S2};
-use reqwest::{Client, RequestBuilder, Response};
-use tokio::{fs::File, io::AsyncWriteExt};
-use url::Url;
+use async_compression::tokio::bufread::{GzipDecoder, ZstdDecoder};
+use log::debug;
+use oci_spec::image::{Descriptor, ImageConfiguration, ImageManifest, MediaType, ToDockerV2S2};
+use tokio::{
+    fs::File,
+    io::{AsyncRead, BufReader},
+};
+use tokio_tar::Archive;
 
-const MANIFEST_PICKER_PLATFORM: Os = Os::Linux;
-const MANIFEST_PICKER_ARCHITECTURE: Arch = Arch::Amd64;
+use super::{
+    name::ImageName,
+    registry::{OciRegistryClient, OciRegistryPlatform},
+};
 
-pub struct RegistryClient {
-    agent: Client,
-    url: Url,
+pub struct OciImageDownloader {
+    storage: PathBuf,
+    platform: OciRegistryPlatform,
 }
 
-impl RegistryClient {
-    pub fn new(url: Url) -> Result<RegistryClient> {
-        Ok(RegistryClient {
-            agent: Client::new(),
-            url,
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub enum OciImageLayerCompression {
+    None,
+    Gzip,
+    Zstd,
+}
+
+#[derive(Clone, Debug)]
+pub struct OciImageLayer {
+    pub path: PathBuf,
+    pub digest: String,
+    pub compression: OciImageLayerCompression,
+}
+
+impl OciImageLayer {
+    pub async fn decompress(&self) -> Result<Pin<Box<dyn AsyncRead + Send>>> {
+        let file = File::open(&self.path).await?;
+        let reader = BufReader::new(file);
+        let reader: Pin<Box<dyn AsyncRead + Send>> = match self.compression {
+            OciImageLayerCompression::None => Box::pin(reader),
+            OciImageLayerCompression::Gzip => Box::pin(GzipDecoder::new(reader)),
+            OciImageLayerCompression::Zstd => Box::pin(ZstdDecoder::new(reader)),
+        };
+        Ok(reader)
+    }
+
+    pub async fn archive(&self) -> Result<Archive<Pin<Box<dyn AsyncRead + Send>>>> {
+        let decompress = self.decompress().await?;
+        Ok(Archive::new(decompress))
+    }
+}
+
+#[derive(Clone, Debug)]
+pub struct OciResolvedImage {
+    pub name: ImageName,
+    pub digest: String,
+    pub manifest: ImageManifest,
+}
+
+#[derive(Clone, Debug)]
+pub struct OciLocalImage {
+    pub image: OciResolvedImage,
+    pub config: ImageConfiguration,
+    pub layers: Vec<OciImageLayer>,
+}
+
+impl OciImageDownloader {
+    pub fn new(storage: PathBuf, platform: OciRegistryPlatform) -> OciImageDownloader {
+        OciImageDownloader { storage, platform }
+    }
+
+    pub async fn resolve(&self, image: ImageName) -> Result<OciResolvedImage> {
+        debug!("download manifest image={}", image);
+        let mut client = OciRegistryClient::new(image.registry_url()?, self.platform.clone())?;
+        let (manifest, digest) = client
+            .get_manifest_with_digest(&image.name, &image.reference)
+            .await?;
+        Ok(OciResolvedImage {
+            name: image,
+            digest,
+            manifest,
         })
     }
 
-    async fn call(&mut self, req: RequestBuilder) -> Result<Response> {
-        self.agent.execute(req.build()?).await.map_err(|x| x.into())
-    }
-
-    pub async fn get_blob(&mut self, name: &str, descriptor: &Descriptor) -> Result<Bytes> {
-        let url = self
-            .url
-            .join(&format!("/v2/{}/blobs/{}", name, descriptor.digest()))?;
-        let response = self.call(self.agent.get(url.as_str())).await?;
-        Ok(response.bytes().await?)
-    }
-
-    pub async fn write_blob_to_file(
-        &mut self,
-        name: &str,
-        descriptor: &Descriptor,
-        mut dest: File,
-    ) -> Result<u64> {
-        let url = self
-            .url
-            .join(&format!("/v2/{}/blobs/{}", name, descriptor.digest()))?;
-        let mut response = self.call(self.agent.get(url.as_str())).await?;
-        let mut size: u64 = 0;
-        while let Some(chunk) = response.chunk().await? {
-            dest.write_all(&chunk).await?;
-            size += chunk.len() as u64;
+    pub async fn download(&self, image: OciResolvedImage) -> Result<OciLocalImage> {
+        let mut client = OciRegistryClient::new(image.name.registry_url()?, self.platform.clone())?;
+        let config_bytes = client
+            .get_blob(&image.name.name, image.manifest.config())
+            .await?;
+        let config: ImageConfiguration = serde_json::from_slice(&config_bytes)?;
+        let mut layers = Vec::new();
+        for layer in image.manifest.layers() {
+            layers.push(self.download_layer(&image.name, layer, &mut client).await?);
         }
-        Ok(size)
+        Ok(OciLocalImage {
+            image,
+            config,
+            layers,
+        })
     }
 
-    async fn get_raw_manifest_with_digest(
-        &mut self,
-        name: &str,
-        reference: &str,
-    ) -> Result<(ImageManifest, String)> {
-        let url = self
-            .url
-            .join(&format!("/v2/{}/manifests/{}", name, reference))?;
-        let accept = format!(
-            "{}, {}, {}, {}",
-            MediaType::ImageManifest.to_docker_v2s2()?,
-            MediaType::ImageManifest,
-            MediaType::ImageIndex,
-            MediaType::ImageIndex.to_docker_v2s2()?,
+    async fn download_layer(
+        &self,
+        image: &ImageName,
+        layer: &Descriptor,
+        client: &mut OciRegistryClient,
+    ) -> Result<OciImageLayer> {
+        debug!(
+            "download layer digest={} size={}",
+            layer.digest(),
+            layer.size()
         );
-        let response = self
-            .call(self.agent.get(url.as_str()).header("Accept", &accept))
-            .await?;
-        let digest = response
-            .headers()
-            .get("Docker-Content-Digest")
-            .ok_or_else(|| anyhow!("fetching manifest did not yield a content digest"))?
-            .to_str()?
-            .to_string();
-        let manifest = serde_json::from_str(&response.text().await?)?;
-        Ok((manifest, digest))
-    }
+        let mut layer_path = self.storage.clone();
+        layer_path.push(format!("{}.layer", layer.digest()));
 
-    pub async fn get_manifest_with_digest(
-        &mut self,
-        name: &str,
-        reference: &str,
-    ) -> Result<(ImageManifest, String)> {
-        let url = self
-            .url
-            .join(&format!("/v2/{}/manifests/{}", name, reference))?;
-        let accept = format!(
-            "{}, {}, {}, {}",
-            MediaType::ImageManifest.to_docker_v2s2()?,
-            MediaType::ImageManifest,
-            MediaType::ImageIndex,
-            MediaType::ImageIndex.to_docker_v2s2()?,
-        );
-        let response = self
-            .call(self.agent.get(url.as_str()).header("Accept", &accept))
-            .await?;
-        let content_type = response
-            .headers()
-            .get("Content-Type")
-            .ok_or_else(|| anyhow!("registry response did not have a Content-Type header"))?
-            .to_str()?;
-        if content_type == MediaType::ImageIndex.to_string()
-            || content_type == MediaType::ImageIndex.to_docker_v2s2()?
-        {
-            let index = serde_json::from_str(&response.text().await?)?;
-            let descriptor = self
-                .pick_manifest(index)
-                .ok_or_else(|| anyhow!("unable to pick manifest from index"))?;
-            return self
-                .get_raw_manifest_with_digest(name, descriptor.digest())
-                .await;
-        }
-        let digest = response
-            .headers()
-            .get("Docker-Content-Digest")
-            .ok_or_else(|| anyhow!("fetching manifest did not yield a content digest"))?
-            .to_str()?
-            .to_string();
-        let manifest = serde_json::from_str(&response.text().await?)?;
-        Ok((manifest, digest))
-    }
-
-    fn pick_manifest(&mut self, index: ImageIndex) -> Option<Descriptor> {
-        for item in index.manifests() {
-            if let Some(platform) = item.platform() {
-                if *platform.os() == MANIFEST_PICKER_PLATFORM
-                    && *platform.architecture() == MANIFEST_PICKER_ARCHITECTURE
-                {
-                    return Some(item.clone());
-                }
-            }
+        {
+            let file = tokio::fs::File::create(&layer_path).await?;
+            let size = client.write_blob_to_file(&image.name, layer, file).await?;
+            if layer.size() as u64 != size {
+                return Err(anyhow!(
+                    "downloaded layer size differs from size in manifest",
+                ));
+            }
         }
-        None
+
+        let mut media_type = layer.media_type().clone();
+
+        // docker layer compatibility
+        if media_type.to_string() == MediaType::ImageLayerGzip.to_docker_v2s2()? {
+            media_type = MediaType::ImageLayerGzip;
+        }
+
+        let compression = match media_type {
+            MediaType::ImageLayer => OciImageLayerCompression::None,
+            MediaType::ImageLayerGzip => OciImageLayerCompression::Gzip,
+            MediaType::ImageLayerZstd => OciImageLayerCompression::Zstd,
+            other => return Err(anyhow!("found layer with unknown media type: {}", other)),
+        };
+        Ok(OciImageLayer {
+            path: layer_path,
+            digest: layer.digest().clone(),
+            compression,
+        })
     }
 }
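The new `OciImageLayer::decompress` above swaps the synchronous flate2/zstd readers for async-compression's tokio decoders, so a layer can be streamed straight into tokio-tar without blocking the runtime. A standalone sketch of the same pipeline (assumes a local gzip-compressed tarball path; illustration only, not part of this patch):

    use async_compression::tokio::bufread::GzipDecoder;
    use tokio::{fs::File, io::BufReader};
    use tokio_stream::StreamExt;
    use tokio_tar::Archive;

    async fn list_entries(path: &str) -> anyhow::Result<()> {
        // BufReader satisfies the AsyncBufRead bound that GzipDecoder needs,
        // and the decoder itself is an AsyncRead that Archive can consume.
        let reader = BufReader::new(File::open(path).await?);
        let mut archive = Archive::new(GzipDecoder::new(reader));
        let mut entries = archive.entries()?;
        while let Some(entry) = entries.next().await {
            println!("{}", entry?.path()?.display());
        }
        Ok(())
    }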
diff --git a/crates/kratart/src/image/mod.rs b/crates/kratart/src/image/mod.rs
index e0d15da..e7272b6 100644
--- a/crates/kratart/src/image/mod.rs
+++ b/crates/kratart/src/image/mod.rs
@@ -1,414 +1,5 @@
 pub mod cache;
+pub mod compiler;
 pub mod fetch;
 pub mod name;
-
-use crate::image::cache::ImageCache;
-use crate::image::fetch::RegistryClient;
-use crate::image::name::ImageName;
-use anyhow::{anyhow, Result};
-use backhand::compression::Compressor;
-use backhand::{FilesystemCompressor, FilesystemWriter, NodeHeader};
-use flate2::read::GzDecoder;
-use log::{debug, trace, warn};
-use oci_spec::image::{Descriptor, ImageConfiguration, ImageManifest, MediaType, ToDockerV2S2};
-use std::fs::File;
-use std::io::{BufReader, Cursor};
-use std::os::unix::fs::{FileTypeExt, MetadataExt, PermissionsExt};
-use std::path::{Path, PathBuf};
-use std::{fs, io};
-use tar::{Archive, Entry};
-use uuid::Uuid;
-use walkdir::WalkDir;
-
-pub const IMAGE_SQUASHFS_VERSION: u64 = 1;
-const LAYER_BUFFER_SIZE: usize = 128 * 1024;
-
-// we utilize in-memory buffers when generating the squashfs for files
-// under this size. for files of or above this size, we open a file.
-// the file is then read during writing. we want to reduce the number
-// of open files during squashfs generation, so this limit should be set
-// to something that limits the number of files on average, at the expense
-// of increased memory usage.
-// TODO: it may be wise to, during crawling of the image layers, infer this
-// value from the size to file count ratio of all layers.
-const SQUASHFS_MEMORY_BUFFER_LIMIT: usize = 8 * 1024 * 1024;
-
-pub struct ImageInfo {
-    pub image_squashfs: PathBuf,
-    pub manifest: ImageManifest,
-    pub config: ImageConfiguration,
-}
-
-impl ImageInfo {
-    fn new(
-        squashfs: PathBuf,
-        manifest: ImageManifest,
-        config: ImageConfiguration,
-    ) -> Result<ImageInfo> {
-        Ok(ImageInfo {
-            image_squashfs: squashfs,
-            manifest,
-            config,
-        })
-    }
-}
-
-pub struct ImageCompiler<'a> {
-    cache: &'a ImageCache,
-}
-
-#[derive(Debug)]
-enum LayerCompressionType {
-    None,
-    Gzip,
-    Zstd,
-}
-
-struct LayerFile {
-    digest: String,
-    compression: LayerCompressionType,
-    path: PathBuf,
-}
-
-impl LayerFile {
-    fn open_reader(&self) -> Result<Box<dyn io::Read>> {
-        Ok(match self.compression {
-            LayerCompressionType::None => Box::new(BufReader::with_capacity(
-                LAYER_BUFFER_SIZE,
-                File::open(&self.path)?,
-            )),
-            LayerCompressionType::Gzip => Box::new(GzDecoder::new(BufReader::with_capacity(
-                LAYER_BUFFER_SIZE,
-                File::open(&self.path)?,
-            ))),
-            LayerCompressionType::Zstd => Box::new(zstd::Decoder::new(BufReader::with_capacity(
-                LAYER_BUFFER_SIZE,
-                File::open(&self.path)?,
-            ))?),
-        })
-    }
-}
-
-impl ImageCompiler<'_> {
-    pub fn new(cache: &ImageCache) -> Result<ImageCompiler> {
-        Ok(ImageCompiler { cache })
-    }
-
-    pub async fn compile(&self, image: &ImageName) -> Result<ImageInfo> {
-        debug!("compile image={image}");
-        let mut tmp_dir = std::env::temp_dir().clone();
-        tmp_dir.push(format!("krata-compile-{}", Uuid::new_v4()));
-
-        let mut image_dir = tmp_dir.clone();
-        image_dir.push("image");
-        fs::create_dir_all(&image_dir)?;
-
-        let mut layer_dir = tmp_dir.clone();
-        layer_dir.push("layer");
-        fs::create_dir_all(&layer_dir)?;
-
-        let mut squash_file = tmp_dir.clone();
-        squash_file.push("image.squashfs");
-        let info = self
-            .download_and_compile(image, &layer_dir, &image_dir, &squash_file)
-            .await?;
-        fs::remove_dir_all(&tmp_dir)?;
-        Ok(info)
-    }
-
-    async fn download_and_compile(
-        &self,
-        image: &ImageName,
-        layer_dir: &Path,
-        image_dir: &PathBuf,
-        squash_file: &PathBuf,
-    ) -> Result<ImageInfo> {
-        debug!(
-            "download manifest image={image}, image_dir={}",
-            image_dir.to_str().unwrap()
-        );
-        let mut client = RegistryClient::new(image.registry_url()?)?;
-        let (manifest, digest) = client
-            .get_manifest_with_digest(&image.name, &image.reference)
-            .await?;
-        let cache_key = format!(
-            "manifest={}:squashfs-version={}\n",
-            digest, IMAGE_SQUASHFS_VERSION
-        );
-        let cache_digest = sha256::digest(cache_key);
-
-        if let Some(cached) = self.cache.recall(&cache_digest).await? {
-            return Ok(cached);
-        }
-
-        debug!(
-            "download config digest={} size={}",
-            manifest.config().digest(),
-            manifest.config().size(),
-        );
-        let config_bytes = client.get_blob(&image.name, manifest.config()).await?;
-        let config: ImageConfiguration = serde_json::from_slice(&config_bytes)?;
-
-        let mut layers: Vec<LayerFile> = Vec::new();
-        for layer in manifest.layers() {
-            layers.push(
-                self.download_layer(image, layer, layer_dir, &mut client)
-                    .await?,
-            );
-        }
-
-        for layer in layers {
-            debug!(
-                "process layer digest={} compression={:?}",
-                &layer.digest, layer.compression
-            );
-            let mut archive = Archive::new(layer.open_reader()?);
-            for entry in archive.entries()? {
-                let mut entry = entry?;
-                let path = entry.path()?;
-                let Some(name) = path.file_name() else {
-                    return Err(anyhow!("unable to get file name"));
-                };
-                let Some(name) = name.to_str() else {
-                    return Err(anyhow!("unable to get file name as string"));
-                };
-
-                if name.starts_with(".wh.") {
-                    self.process_whiteout_entry(&entry, name, &layer, image_dir)?;
-                } else {
-                    self.process_write_entry(&mut entry, &layer, image_dir)?;
-                }
-            }
-            fs::remove_file(&layer.path)?;
-        }
-
-        self.squash(image_dir, squash_file)?;
-        let info = ImageInfo::new(squash_file.clone(), manifest.clone(), config)?;
-        self.cache.store(&cache_digest, &info).await
-    }
-
-    fn process_whiteout_entry(
-        &self,
-        entry: &Entry<Box<dyn io::Read>>,
-        name: &str,
-        layer: &LayerFile,
-        image_dir: &PathBuf,
-    ) -> Result<()> {
-        let dst = self.check_safe_entry(entry, image_dir)?;
-        let mut dst = dst.clone();
-        dst.pop();
-
-        let opaque = name == ".wh..wh..opq";
-
-        if !opaque {
-            dst.push(name);
-            self.check_safe_path(&dst, image_dir)?;
-        }
-
-        trace!(
-            "whiteout entry layer={} path={:?}",
-            &layer.digest,
-            entry.path()?
-        );
-
-        if opaque {
-            if dst.is_dir() {
-                for entry in fs::read_dir(dst)? {
-                    let entry = entry?;
-                    let path = entry.path();
-                    if path.is_symlink() || path.is_file() {
-                        fs::remove_file(&path)?;
-                    } else if path.is_dir() {
-                        fs::remove_dir_all(&path)?;
-                    } else {
-                        return Err(anyhow!("opaque whiteout entry did not exist"));
-                    }
-                }
-            } else {
-                warn!(
-                    "whiteout entry missing locally layer={} path={:?} local={:?}",
-                    &layer.digest,
-                    entry.path()?,
-                    dst,
-                );
-            }
-        } else if dst.is_file() || dst.is_symlink() {
-            fs::remove_file(&dst)?;
-        } else if dst.is_dir() {
-            fs::remove_dir(&dst)?;
-        } else {
-            warn!(
-                "whiteout entry missing locally layer={} path={:?} local={:?}",
-                &layer.digest,
-                entry.path()?,
-                dst,
-            );
-        }
-        Ok(())
-    }
-
-    fn process_write_entry(
-        &self,
-        entry: &mut Entry<Box<dyn io::Read>>,
-        layer: &LayerFile,
-        image_dir: &PathBuf,
-    ) -> Result<()> {
-        trace!(
-            "unpack entry layer={} path={:?} type={:?}",
-            &layer.digest,
-            entry.path()?,
-            entry.header().entry_type()
-        );
-        entry.unpack_in(image_dir)?;
-        Ok(())
-    }
-
-    fn check_safe_entry(
-        &self,
-        entry: &Entry<Box<dyn io::Read>>,
-        image_dir: &PathBuf,
-    ) -> Result<PathBuf> {
-        let mut dst = image_dir.clone();
-        dst.push(entry.path()?);
-        if let Some(name) = dst.file_name() {
-            if let Some(name) = name.to_str() {
-                if name.starts_with(".wh.") {
-                    let copy = dst.clone();
-                    dst.pop();
-                    self.check_safe_path(&dst, image_dir)?;
-                    return Ok(copy);
-                }
-            }
-        }
-        self.check_safe_path(&dst, image_dir)?;
-        Ok(dst)
-    }
-
-    fn check_safe_path(&self, dst: &PathBuf, image_dir: &PathBuf) -> Result<()> {
-        let resolved = path_clean::clean(dst);
-        if !resolved.starts_with(image_dir) {
-            return Err(anyhow!("layer attempts to work outside image dir"));
-        }
-        Ok(())
-    }
-
-    async fn download_layer(
-        &self,
-        image: &ImageName,
-        layer: &Descriptor,
-        layer_dir: &Path,
-        client: &mut RegistryClient,
-    ) -> Result<LayerFile> {
-        debug!(
-            "download layer digest={} size={}",
-            layer.digest(),
-            layer.size()
-        );
-        let mut layer_path = layer_dir.to_path_buf();
-        layer_path.push(layer.digest());
-        let mut tmp_path = layer_dir.to_path_buf();
-        tmp_path.push(format!("{}.tmp", layer.digest()));
-
-        {
-            let file = tokio::fs::File::create(&layer_path).await?;
-            let size = client.write_blob_to_file(&image.name, layer, file).await?;
-            if layer.size() as u64 != size {
-                return Err(anyhow!(
-                    "downloaded layer size differs from size in manifest",
-                ));
-            }
-        }
-
-        let mut media_type = layer.media_type().clone();
-
-        // docker layer compatibility
-        if media_type.to_string() == MediaType::ImageLayerGzip.to_docker_v2s2()? {
-            media_type = MediaType::ImageLayerGzip;
-        }
-
-        let compression = match media_type {
-            MediaType::ImageLayer => LayerCompressionType::None,
-            MediaType::ImageLayerGzip => LayerCompressionType::Gzip,
-            MediaType::ImageLayerZstd => LayerCompressionType::Zstd,
-            other => return Err(anyhow!("found layer with unknown media type: {}", other)),
-        };
-        Ok(LayerFile {
-            digest: layer.digest().clone(),
-            compression,
-            path: layer_path,
-        })
-    }
-
-    fn squash(&self, image_dir: &PathBuf, squash_file: &PathBuf) -> Result<()> {
-        let mut writer = FilesystemWriter::default();
-        writer.set_compressor(FilesystemCompressor::new(Compressor::Gzip, None)?);
-        let walk = WalkDir::new(image_dir).follow_links(false);
-        for entry in walk {
-            let entry = entry?;
-            let rel = entry
-                .path()
-                .strip_prefix(image_dir)?
-                .to_str()
-                .ok_or_else(|| anyhow!("failed to strip prefix of tmpdir"))?;
-            let rel = format!("/{}", rel);
-            trace!("squash write {}", rel);
-            let typ = entry.file_type();
-            let metadata = fs::symlink_metadata(entry.path())?;
-            let uid = metadata.uid();
-            let gid = metadata.gid();
-            let mode = metadata.permissions().mode();
-            let mtime = metadata.mtime();
-
-            if rel == "/" {
-                writer.set_root_uid(uid);
-                writer.set_root_gid(gid);
-                writer.set_root_mode(mode as u16);
-                continue;
-            }
-
-            let header = NodeHeader {
-                permissions: mode as u16,
-                uid,
-                gid,
-                mtime: mtime as u32,
-            };
-            if typ.is_symlink() {
-                let symlink = fs::read_link(entry.path())?;
-                let symlink = symlink
-                    .to_str()
-                    .ok_or_else(|| anyhow!("failed to read symlink"))?;
-                writer.push_symlink(symlink, rel, header)?;
-            } else if typ.is_dir() {
-                writer.push_dir(rel, header)?;
-            } else if typ.is_file() {
-                if metadata.size() >= SQUASHFS_MEMORY_BUFFER_LIMIT as u64 {
-                    let reader =
-                        BufReader::with_capacity(LAYER_BUFFER_SIZE, File::open(entry.path())?);
-                    writer.push_file(reader, rel, header)?;
-                } else {
-                    let cursor = Cursor::new(fs::read(entry.path())?);
-                    writer.push_file(cursor, rel, header)?;
-                }
-            } else if typ.is_block_device() {
-                let device = metadata.dev();
-                writer.push_block_device(device as u32, rel, header)?;
-            } else if typ.is_char_device() {
-                let device = metadata.dev();
-                writer.push_char_device(device as u32, rel, header)?;
-            } else {
-                return Err(anyhow!("invalid file type"));
-            }
-        }
-
-        fs::remove_dir_all(image_dir)?;
-
-        let squash_file_path = squash_file
-            .to_str()
-            .ok_or_else(|| anyhow!("failed to convert squashfs string"))?;
-
-        let mut file = File::create(squash_file)?;
-        trace!("squash generate: {}", squash_file_path);
-        writer.write(&mut file)?;
-        Ok(())
-    }
-}
+pub mod registry;
diff --git a/crates/kratart/src/image/name.rs b/crates/kratart/src/image/name.rs
index a6b1d2c..8dffbcf 100644
--- a/crates/kratart/src/image/name.rs
+++ b/crates/kratart/src/image/name.rs
@@ -72,7 +72,6 @@ impl ImageName {
         })
     }
 
-    /// URL for OCI distribution API endpoint
     pub fn registry_url(&self) -> Result<Url> {
         let hostname = if let Some(port) = self.port {
             format!("{}:{}", self.hostname, port)
diff --git a/crates/kratart/src/image/registry.rs b/crates/kratart/src/image/registry.rs
new file mode 100644
index 0000000..633f51f
--- /dev/null
+++ b/crates/kratart/src/image/registry.rs
@@ -0,0 +1,173 @@
+use anyhow::{anyhow, Result};
+use bytes::Bytes;
+use oci_spec::image::{Arch, Descriptor, ImageIndex, ImageManifest, MediaType, Os, ToDockerV2S2};
+use reqwest::{Client, RequestBuilder, Response};
+use tokio::{fs::File, io::AsyncWriteExt};
+use url::Url;
+
+#[derive(Clone, Debug)]
+pub struct OciRegistryPlatform {
+    pub os: Os,
+    pub arch: Arch,
+}
+
+impl OciRegistryPlatform {
+    #[cfg(target_arch = "x86_64")]
+    const CURRENT_ARCH: Arch = Arch::Amd64;
+    #[cfg(target_arch = "aarch64")]
+    const CURRENT_ARCH: Arch = Arch::ARM64;
+
+    pub fn new(os: Os, arch: Arch) -> OciRegistryPlatform {
+        OciRegistryPlatform { os, arch }
+    }
+
+    pub fn current() -> OciRegistryPlatform {
+        OciRegistryPlatform {
+            os: Os::Linux,
+            arch: OciRegistryPlatform::CURRENT_ARCH,
+        }
+    }
+}
+
+pub struct OciRegistryClient {
+    agent: Client,
+    url: Url,
+    platform: OciRegistryPlatform,
+}
+
+impl OciRegistryClient {
+    pub fn new(url: Url, platform: OciRegistryPlatform) -> Result<OciRegistryClient> {
+        Ok(OciRegistryClient {
+            agent: Client::new(),
+            url,
+            platform,
+        })
+    }
+
+    async fn call(&mut self, req: RequestBuilder) -> Result<Response> {
+        self.agent.execute(req.build()?).await.map_err(|x| x.into())
+    }
+
+    pub async fn get_blob<N: AsRef<str>>(
+        &mut self,
+        name: N,
+        descriptor: &Descriptor,
+    ) -> Result<Bytes> {
+        let url = self.url.join(&format!(
+            "/v2/{}/blobs/{}",
+            name.as_ref(),
+            descriptor.digest()
+        ))?;
+        let response = self.call(self.agent.get(url.as_str())).await?;
+        Ok(response.bytes().await?)
+    }
+
+    pub async fn write_blob_to_file<N: AsRef<str>>(
+        &mut self,
+        name: N,
+        descriptor: &Descriptor,
+        mut dest: File,
+    ) -> Result<u64> {
+        let url = self.url.join(&format!(
+            "/v2/{}/blobs/{}",
+            name.as_ref(),
+            descriptor.digest()
+        ))?;
+        let mut response = self.call(self.agent.get(url.as_str())).await?;
+        let mut size: u64 = 0;
+        while let Some(chunk) = response.chunk().await? {
+            dest.write_all(&chunk).await?;
+            size += chunk.len() as u64;
+        }
+        Ok(size)
+    }
+
+    async fn get_raw_manifest_with_digest<N: AsRef<str>, R: AsRef<str>>(
+        &mut self,
+        name: N,
+        reference: R,
+    ) -> Result<(ImageManifest, String)> {
+        let url = self.url.join(&format!(
+            "/v2/{}/manifests/{}",
+            name.as_ref(),
+            reference.as_ref()
+        ))?;
+        let accept = format!(
+            "{}, {}, {}, {}",
+            MediaType::ImageManifest.to_docker_v2s2()?,
+            MediaType::ImageManifest,
+            MediaType::ImageIndex,
+            MediaType::ImageIndex.to_docker_v2s2()?,
+        );
+        let response = self
+            .call(self.agent.get(url.as_str()).header("Accept", &accept))
+            .await?;
+        let digest = response
+            .headers()
+            .get("Docker-Content-Digest")
+            .ok_or_else(|| anyhow!("fetching manifest did not yield a content digest"))?
+            .to_str()?
+            .to_string();
+        let manifest = serde_json::from_str(&response.text().await?)?;
+        Ok((manifest, digest))
+    }
+
+    pub async fn get_manifest_with_digest<N: AsRef<str>, R: AsRef<str>>(
+        &mut self,
+        name: N,
+        reference: R,
+    ) -> Result<(ImageManifest, String)> {
+        let url = self.url.join(&format!(
+            "/v2/{}/manifests/{}",
+            name.as_ref(),
+            reference.as_ref()
+        ))?;
+        let accept = format!(
+            "{}, {}, {}, {}",
+            MediaType::ImageManifest.to_docker_v2s2()?,
+            MediaType::ImageManifest,
+            MediaType::ImageIndex,
+            MediaType::ImageIndex.to_docker_v2s2()?,
+        );
+        let response = self
+            .call(self.agent.get(url.as_str()).header("Accept", &accept))
+            .await?;
+        let content_type = response
+            .headers()
+            .get("Content-Type")
+            .ok_or_else(|| anyhow!("registry response did not have a Content-Type header"))?
+            .to_str()?;
+        if content_type == MediaType::ImageIndex.to_string()
+            || content_type == MediaType::ImageIndex.to_docker_v2s2()?
+        {
+            let index = serde_json::from_str(&response.text().await?)?;
+            let descriptor = self
+                .pick_manifest(index)
+                .ok_or_else(|| anyhow!("unable to pick manifest from index"))?;
+            return self
+                .get_raw_manifest_with_digest(name, descriptor.digest())
+                .await;
+        }
+        let digest = response
+            .headers()
+            .get("Docker-Content-Digest")
+            .ok_or_else(|| anyhow!("fetching manifest did not yield a content digest"))?
+            .to_str()?
+            .to_string();
+        let manifest = serde_json::from_str(&response.text().await?)?;
+        Ok((manifest, digest))
+    }
+
+    fn pick_manifest(&mut self, index: ImageIndex) -> Option<Descriptor> {
+        for item in index.manifests() {
+            if let Some(platform) = item.platform() {
+                if *platform.os() == self.platform.os
+                    && *platform.architecture() == self.platform.arch
+                {
+                    return Some(item.clone());
+                }
+            }
+        }
+        None
+    }
+}
diff --git a/crates/kratart/src/launch/mod.rs b/crates/kratart/src/launch/mod.rs
index 940ba4a..d522d3d 100644
--- a/crates/kratart/src/launch/mod.rs
+++ b/crates/kratart/src/launch/mod.rs
@@ -12,7 +12,11 @@ use xenclient::{DomainConfig, DomainDisk, DomainNetworkInterface};
 use xenstore::XsdInterface;
 
 use crate::cfgblk::ConfigBlock;
-use crate::image::{cache::ImageCache, name::ImageName, ImageCompiler, ImageInfo};
+use crate::image::{
+    cache::ImageCache,
+    compiler::{ImageCompiler, ImageInfo},
+    name::ImageName,
+};
 use crate::RuntimeContext;
 
 use super::{GuestInfo, GuestState};
diff --git a/crates/xen/xenclient/src/elfloader.rs b/crates/xen/xenclient/src/elfloader.rs
index 655a675..7822ea9 100644
--- a/crates/xen/xenclient/src/elfloader.rs
+++ b/crates/xen/xenclient/src/elfloader.rs
@@ -62,11 +62,6 @@ impl ElfImageLoader {
         ElfImageLoader { data }
     }
 
-    pub fn load_file(path: &str) -> Result<ElfImageLoader> {
-        let data = std::fs::read(path)?;
-        Ok(ElfImageLoader::new(data))
-    }
-
     pub fn load_gz(data: &[u8]) -> Result<ElfImageLoader> {
         let buff = BufReader::new(data);
         let image = ElfImageLoader::read_one_stream(&mut GzDecoder::new(buff))?;
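Taken together, the refactor splits the old monolithic `image/mod.rs` into `compiler`, `fetch`, and `registry` modules, with `OciRegistryClient` now picking manifests for the host platform instead of a hard-coded Linux/amd64. A hypothetical standalone use of the new client (registry URL and image reference are placeholders):

    use kratart::image::registry::{OciRegistryClient, OciRegistryPlatform};
    use url::Url;

    async fn resolve_digest() -> anyhow::Result<String> {
        let url = Url::parse("https://registry-1.docker.io")?;
        let mut client = OciRegistryClient::new(url, OciRegistryPlatform::current())?;
        // Content negotiation: an image index is resolved to the manifest
        // matching the client's platform; a plain manifest is returned as-is.
        let (manifest, digest) = client
            .get_manifest_with_digest("library/busybox", "latest")
            .await?;
        println!("manifest has {} layers", manifest.layers().len());
        Ok(digest)
    }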