mirror of https://github.com/edera-dev/krata.git
krata: implement image seeding backend
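In short, the seeding backend lets the OCI downloader satisfy manifests, image configurations, and layers from a local seed tarball laid out like an OCI image layout (an index.json at the root and content-addressed blobs under blobs/<algorithm>/<hex>), falling back to the registry for anything the seed does not contain. The sketch below restates that lookup convention outside the diff; the function names (blob_path, find_seed_blob) and the choice to return the blob as a String are illustrative only and are not part of the crate's API.

use anyhow::{anyhow, Result};
use tokio::{fs::File, io::AsyncReadExt};
use tokio_stream::StreamExt;
use tokio_tar::Archive;

// Map a descriptor digest such as "sha256:abcd..." to the path used inside an
// OCI image layout archive: "blobs/sha256/abcd...".
fn blob_path(digest: &str) -> Result<String> {
    let (algorithm, hex) = digest
        .split_once(':')
        .ok_or_else(|| anyhow!("digest content was not properly formatted"))?;
    Ok(format!("blobs/{}/{}", algorithm, hex))
}

// Scan a seed tarball for the blob matching `digest` and return its contents,
// or None if the archive does not contain it. The path handling mirrors the
// load_seed_json/extract_seed_blob helpers added in this commit.
async fn find_seed_blob(seed: &str, digest: &str) -> Result<Option<String>> {
    let want = blob_path(digest)?;
    let mut archive = Archive::new(File::open(seed).await?);
    let mut entries = archive.entries()?;
    while let Some(entry) = entries.next().await {
        let mut entry = entry?;
        let path = String::from_utf8(entry.path_bytes().to_vec())?;
        if path == want {
            let mut content = String::new();
            entry.read_to_string(&mut content).await?;
            return Ok(Some(content));
        }
    }
    Ok(None)
}

As the resolve() change below shows, the seed's index.json is matched against the requested image via the io.containerd.image.name annotation and the current platform, so seed tarballs are expected to carry that annotation on their manifest descriptors.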
@@ -10,12 +10,15 @@ async fn main() -> Result<()> {
     env_logger::Builder::from_env(Env::default().default_filter_or("warn")).init();
 
     let image = ImageName::parse(&args().nth(1).unwrap())?;
+    let seed = args().nth(2).map(PathBuf::from);
+
     let cache_dir = PathBuf::from("krata-cache");
     if !cache_dir.exists() {
        fs::create_dir(&cache_dir).await?;
    }
 
     let cache = ImageCache::new(&cache_dir)?;
-    let compiler = ImageCompiler::new(&cache)?;
+    let compiler = ImageCompiler::new(&cache, seed)?;
     let info = compiler.compile(&image).await?;
     println!(
         "generated squashfs of {} to {}",
@@ -55,11 +55,12 @@ impl ImageInfo {
 
 pub struct ImageCompiler<'a> {
     cache: &'a ImageCache,
+    seed: Option<PathBuf>,
 }
 
 impl ImageCompiler<'_> {
-    pub fn new(cache: &ImageCache) -> Result<ImageCompiler> {
-        Ok(ImageCompiler { cache })
+    pub fn new(cache: &ImageCache, seed: Option<PathBuf>) -> Result<ImageCompiler> {
+        Ok(ImageCompiler { cache, seed })
     }
 
     pub async fn compile(&self, image: &ImageName) -> Result<ImageInfo> {
@@ -91,8 +92,11 @@ impl ImageCompiler<'_> {
         image_dir: &Path,
         squash_file: &Path,
     ) -> Result<ImageInfo> {
-        let downloader =
-            OciImageDownloader::new(layer_dir.to_path_buf(), OciRegistryPlatform::current());
+        let downloader = OciImageDownloader::new(
+            self.seed.clone(),
+            layer_dir.to_path_buf(),
+            OciRegistryPlatform::current(),
+        );
         let resolved = downloader.resolve(image.clone()).await?;
         let cache_key = format!(
             "manifest={}:squashfs-version={}\n",
@@ -1,13 +1,20 @@
-use std::{path::PathBuf, pin::Pin};
+use std::{
+    path::{Path, PathBuf},
+    pin::Pin,
+};
 
 use anyhow::{anyhow, Result};
 use async_compression::tokio::bufread::{GzipDecoder, ZstdDecoder};
 use log::debug;
-use oci_spec::image::{Descriptor, ImageConfiguration, ImageManifest, MediaType, ToDockerV2S2};
+use oci_spec::image::{
+    Descriptor, ImageConfiguration, ImageIndex, ImageManifest, MediaType, ToDockerV2S2,
+};
+use serde::de::DeserializeOwned;
 use tokio::{
     fs::File,
-    io::{AsyncRead, BufReader},
+    io::{AsyncRead, AsyncReadExt, BufReader},
 };
+use tokio_stream::StreamExt;
 use tokio_tar::Archive;
 
 use super::{
@@ -16,6 +23,7 @@ use super::{
 };
 
 pub struct OciImageDownloader {
+    seed: Option<PathBuf>,
     storage: PathBuf,
     platform: OciRegistryPlatform,
 }
@@ -67,12 +75,122 @@ pub struct OciLocalImage {
 }
 
 impl OciImageDownloader {
-    pub fn new(storage: PathBuf, platform: OciRegistryPlatform) -> OciImageDownloader {
-        OciImageDownloader { storage, platform }
+    pub fn new(
+        seed: Option<PathBuf>,
+        storage: PathBuf,
+        platform: OciRegistryPlatform,
+    ) -> OciImageDownloader {
+        OciImageDownloader {
+            seed,
+            storage,
+            platform,
+        }
+    }
+
+    async fn load_seed_json_blob<T: DeserializeOwned>(
+        &self,
+        descriptor: &Descriptor,
+    ) -> Result<Option<T>> {
+        let digest = descriptor.digest();
+        let Some((digest_type, digest_content)) = digest.split_once(':') else {
+            return Err(anyhow!("digest content was not properly formatted"));
+        };
+        let want = format!("blobs/{}/{}", digest_type, digest_content);
+        self.load_seed_json(&want).await
+    }
+
+    async fn load_seed_json<T: DeserializeOwned>(&self, want: &str) -> Result<Option<T>> {
+        let Some(ref seed) = self.seed else {
+            return Ok(None);
+        };
+
+        let file = File::open(seed).await?;
+        let mut archive = Archive::new(file);
+        let mut entries = archive.entries()?;
+        while let Some(entry) = entries.next().await {
+            let mut entry = entry?;
+            let path = String::from_utf8(entry.path_bytes().to_vec())?;
+            if path == want {
+                let mut content = String::new();
+                entry.read_to_string(&mut content).await?;
+                let data = serde_json::from_str::<T>(&content)?;
+                return Ok(Some(data));
+            }
+        }
+        Ok(None)
+    }
+
+    async fn extract_seed_blob(&self, descriptor: &Descriptor, to: &Path) -> Result<bool> {
+        let Some(ref seed) = self.seed else {
+            return Ok(false);
+        };
+
+        let digest = descriptor.digest();
+        let Some((digest_type, digest_content)) = digest.split_once(':') else {
+            return Err(anyhow!("digest content was not properly formatted"));
+        };
+        let want = format!("blobs/{}/{}", digest_type, digest_content);
+
+        let seed = File::open(seed).await?;
+        let mut archive = Archive::new(seed);
+        let mut entries = archive.entries()?;
+        while let Some(entry) = entries.next().await {
+            let mut entry = entry?;
+            let path = String::from_utf8(entry.path_bytes().to_vec())?;
+            if path == want {
+                let mut file = File::create(to).await?;
+                tokio::io::copy(&mut entry, &mut file).await?;
+                return Ok(true);
+            }
+        }
+        Ok(false)
     }
 
     pub async fn resolve(&self, image: ImageName) -> Result<OciResolvedImage> {
-        debug!("download manifest image={}", image);
+        debug!("resolve manifest image={}", image);
+
+        if let Some(index) = self.load_seed_json::<ImageIndex>("index.json").await? {
+            let mut found: Option<&Descriptor> = None;
+            for manifest in index.manifests() {
+                let Some(annotations) = manifest.annotations() else {
+                    continue;
+                };
+
+                let Some(image_name) = annotations.get("io.containerd.image.name") else {
+                    continue;
+                };
+
+                if *image_name != image.to_string() {
+                    continue;
+                }
+
+                if let Some(platform) = manifest.platform() {
+                    if *platform.architecture() != self.platform.arch
+                        || *platform.os() != self.platform.os
+                    {
+                        continue;
+                    }
+                }
+
+                found = Some(manifest);
+                break;
+            }
+
+            if let Some(found) = found {
+                if let Some(manifest) = self.load_seed_json_blob(found).await? {
+                    debug!(
+                        "found seeded manifest image={} manifest={}",
+                        image,
+                        found.digest()
+                    );
+                    return Ok(OciResolvedImage {
+                        name: image,
+                        digest: found.digest().clone(),
+                        manifest,
+                    });
+                }
+            }
+        }
+
         let mut client = OciRegistryClient::new(image.registry_url()?, self.platform.clone())?;
         let (manifest, digest) = client
             .get_manifest_with_digest(&image.name, &image.reference)
@@ -85,14 +203,23 @@ impl OciImageDownloader {
     }
 
     pub async fn download(&self, image: OciResolvedImage) -> Result<OciLocalImage> {
+        let config: ImageConfiguration;
+
         let mut client = OciRegistryClient::new(image.name.registry_url()?, self.platform.clone())?;
-        let config_bytes = client
-            .get_blob(&image.name.name, image.manifest.config())
-            .await?;
-        let config: ImageConfiguration = serde_json::from_slice(&config_bytes)?;
+        if let Some(seeded) = self
+            .load_seed_json_blob::<ImageConfiguration>(image.manifest.config())
+            .await?
+        {
+            config = seeded;
+        } else {
+            let config_bytes = client
+                .get_blob(&image.name.name, image.manifest.config())
+                .await?;
+            config = serde_json::from_slice(&config_bytes)?;
+        }
         let mut layers = Vec::new();
         for layer in image.manifest.layers() {
-            layers.push(self.download_layer(&image.name, layer, &mut client).await?);
+            layers.push(self.acquire_layer(&image.name, layer, &mut client).await?);
         }
         Ok(OciLocalImage {
             image,
@@ -101,22 +228,23 @@ impl OciImageDownloader {
         })
     }
 
-    async fn download_layer(
+    async fn acquire_layer(
         &self,
         image: &ImageName,
         layer: &Descriptor,
         client: &mut OciRegistryClient,
     ) -> Result<OciImageLayer> {
         debug!(
-            "download layer digest={} size={}",
+            "acquire layer digest={} size={}",
             layer.digest(),
             layer.size()
         );
         let mut layer_path = self.storage.clone();
         layer_path.push(format!("{}.layer", layer.digest()));
 
-        {
-            let file = tokio::fs::File::create(&layer_path).await?;
+        let seeded = self.extract_seed_blob(layer, &layer_path).await?;
+        if !seeded {
+            let file = File::create(&layer_path).await?;
             let size = client.write_blob_to_file(&image.name, layer, file).await?;
             if layer.size() as u64 != size {
                 return Err(anyhow!(
@@ -218,7 +218,7 @@ impl GuestLauncher {
 
     async fn compile(&self, image: &str, image_cache: &ImageCache) -> Result<ImageInfo> {
         let image = ImageName::parse(image)?;
-        let compiler = ImageCompiler::new(image_cache)?;
+        let compiler = ImageCompiler::new(image_cache, None)?;
         compiler.compile(&image).await
     }
 