mirror of
https://github.com/edera-dev/krata.git
synced 2025-08-04 13:41:31 +00:00
kratart: async image extraction
This commit is contained in:
@ -1,138 +1,147 @@
|
||||
use std::{path::PathBuf, pin::Pin};
|
||||
|
||||
use anyhow::{anyhow, Result};
|
||||
use bytes::Bytes;
|
||||
use oci_spec::image::{Arch, Descriptor, ImageIndex, ImageManifest, MediaType, Os, ToDockerV2S2};
|
||||
use reqwest::{Client, RequestBuilder, Response};
|
||||
use tokio::{fs::File, io::AsyncWriteExt};
|
||||
use url::Url;
|
||||
use async_compression::tokio::bufread::{GzipDecoder, ZstdDecoder};
|
||||
use log::debug;
|
||||
use oci_spec::image::{Descriptor, ImageConfiguration, ImageManifest, MediaType, ToDockerV2S2};
|
||||
use tokio::{
|
||||
fs::File,
|
||||
io::{AsyncRead, BufReader},
|
||||
};
|
||||
use tokio_tar::Archive;
|
||||
|
||||
const MANIFEST_PICKER_PLATFORM: Os = Os::Linux;
|
||||
const MANIFEST_PICKER_ARCHITECTURE: Arch = Arch::Amd64;
|
||||
use super::{
|
||||
name::ImageName,
|
||||
registry::{OciRegistryClient, OciRegistryPlatform},
|
||||
};
|
||||
|
||||
pub struct RegistryClient {
|
||||
agent: Client,
|
||||
url: Url,
|
||||
pub struct OciImageDownloader {
|
||||
storage: PathBuf,
|
||||
platform: OciRegistryPlatform,
|
||||
}
|
||||
|
||||
impl RegistryClient {
|
||||
pub fn new(url: Url) -> Result<RegistryClient> {
|
||||
Ok(RegistryClient {
|
||||
agent: Client::new(),
|
||||
url,
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
pub enum OciImageLayerCompression {
|
||||
None,
|
||||
Gzip,
|
||||
Zstd,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct OciImageLayer {
|
||||
pub path: PathBuf,
|
||||
pub digest: String,
|
||||
pub compression: OciImageLayerCompression,
|
||||
}
|
||||
|
||||
impl OciImageLayer {
|
||||
pub async fn decompress(&self) -> Result<Pin<Box<dyn AsyncRead + Send>>> {
|
||||
let file = File::open(&self.path).await?;
|
||||
let reader = BufReader::new(file);
|
||||
let reader: Pin<Box<dyn AsyncRead + Send>> = match self.compression {
|
||||
OciImageLayerCompression::None => Box::pin(reader),
|
||||
OciImageLayerCompression::Gzip => Box::pin(GzipDecoder::new(reader)),
|
||||
OciImageLayerCompression::Zstd => Box::pin(ZstdDecoder::new(reader)),
|
||||
};
|
||||
Ok(reader)
|
||||
}
|
||||
|
||||
pub async fn archive(&self) -> Result<Archive<Pin<Box<dyn AsyncRead + Send>>>> {
|
||||
let decompress = self.decompress().await?;
|
||||
Ok(Archive::new(decompress))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct OciResolvedImage {
|
||||
pub name: ImageName,
|
||||
pub digest: String,
|
||||
pub manifest: ImageManifest,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct OciLocalImage {
|
||||
pub image: OciResolvedImage,
|
||||
pub config: ImageConfiguration,
|
||||
pub layers: Vec<OciImageLayer>,
|
||||
}
|
||||
|
||||
impl OciImageDownloader {
|
||||
pub fn new(storage: PathBuf, platform: OciRegistryPlatform) -> OciImageDownloader {
|
||||
OciImageDownloader { storage, platform }
|
||||
}
|
||||
|
||||
pub async fn resolve(&self, image: ImageName) -> Result<OciResolvedImage> {
|
||||
debug!("download manifest image={}", image);
|
||||
let mut client = OciRegistryClient::new(image.registry_url()?, self.platform.clone())?;
|
||||
let (manifest, digest) = client
|
||||
.get_manifest_with_digest(&image.name, &image.reference)
|
||||
.await?;
|
||||
Ok(OciResolvedImage {
|
||||
name: image,
|
||||
digest,
|
||||
manifest,
|
||||
})
|
||||
}
|
||||
|
||||
async fn call(&mut self, req: RequestBuilder) -> Result<Response> {
|
||||
self.agent.execute(req.build()?).await.map_err(|x| x.into())
|
||||
}
|
||||
|
||||
pub async fn get_blob(&mut self, name: &str, descriptor: &Descriptor) -> Result<Bytes> {
|
||||
let url = self
|
||||
.url
|
||||
.join(&format!("/v2/{}/blobs/{}", name, descriptor.digest()))?;
|
||||
let response = self.call(self.agent.get(url.as_str())).await?;
|
||||
Ok(response.bytes().await?)
|
||||
}
|
||||
|
||||
pub async fn write_blob_to_file(
|
||||
&mut self,
|
||||
name: &str,
|
||||
descriptor: &Descriptor,
|
||||
mut dest: File,
|
||||
) -> Result<u64> {
|
||||
let url = self
|
||||
.url
|
||||
.join(&format!("/v2/{}/blobs/{}", name, descriptor.digest()))?;
|
||||
let mut response = self.call(self.agent.get(url.as_str())).await?;
|
||||
let mut size: u64 = 0;
|
||||
while let Some(chunk) = response.chunk().await? {
|
||||
dest.write_all(&chunk).await?;
|
||||
size += chunk.len() as u64;
|
||||
pub async fn download(&self, image: OciResolvedImage) -> Result<OciLocalImage> {
|
||||
let mut client = OciRegistryClient::new(image.name.registry_url()?, self.platform.clone())?;
|
||||
let config_bytes = client
|
||||
.get_blob(&image.name.name, image.manifest.config())
|
||||
.await?;
|
||||
let config: ImageConfiguration = serde_json::from_slice(&config_bytes)?;
|
||||
let mut layers = Vec::new();
|
||||
for layer in image.manifest.layers() {
|
||||
layers.push(self.download_layer(&image.name, layer, &mut client).await?);
|
||||
}
|
||||
Ok(size)
|
||||
Ok(OciLocalImage {
|
||||
image,
|
||||
config,
|
||||
layers,
|
||||
})
|
||||
}
|
||||
|
||||
async fn get_raw_manifest_with_digest(
|
||||
&mut self,
|
||||
name: &str,
|
||||
reference: &str,
|
||||
) -> Result<(ImageManifest, String)> {
|
||||
let url = self
|
||||
.url
|
||||
.join(&format!("/v2/{}/manifests/{}", name, reference))?;
|
||||
let accept = format!(
|
||||
"{}, {}, {}, {}",
|
||||
MediaType::ImageManifest.to_docker_v2s2()?,
|
||||
MediaType::ImageManifest,
|
||||
MediaType::ImageIndex,
|
||||
MediaType::ImageIndex.to_docker_v2s2()?,
|
||||
async fn download_layer(
|
||||
&self,
|
||||
image: &ImageName,
|
||||
layer: &Descriptor,
|
||||
client: &mut OciRegistryClient,
|
||||
) -> Result<OciImageLayer> {
|
||||
debug!(
|
||||
"download layer digest={} size={}",
|
||||
layer.digest(),
|
||||
layer.size()
|
||||
);
|
||||
let response = self
|
||||
.call(self.agent.get(url.as_str()).header("Accept", &accept))
|
||||
.await?;
|
||||
let digest = response
|
||||
.headers()
|
||||
.get("Docker-Content-Digest")
|
||||
.ok_or_else(|| anyhow!("fetching manifest did not yield a content digest"))?
|
||||
.to_str()?
|
||||
.to_string();
|
||||
let manifest = serde_json::from_str(&response.text().await?)?;
|
||||
Ok((manifest, digest))
|
||||
}
|
||||
let mut layer_path = self.storage.clone();
|
||||
layer_path.push(format!("{}.layer", layer.digest()));
|
||||
|
||||
pub async fn get_manifest_with_digest(
|
||||
&mut self,
|
||||
name: &str,
|
||||
reference: &str,
|
||||
) -> Result<(ImageManifest, String)> {
|
||||
let url = self
|
||||
.url
|
||||
.join(&format!("/v2/{}/manifests/{}", name, reference))?;
|
||||
let accept = format!(
|
||||
"{}, {}, {}, {}",
|
||||
MediaType::ImageManifest.to_docker_v2s2()?,
|
||||
MediaType::ImageManifest,
|
||||
MediaType::ImageIndex,
|
||||
MediaType::ImageIndex.to_docker_v2s2()?,
|
||||
);
|
||||
let response = self
|
||||
.call(self.agent.get(url.as_str()).header("Accept", &accept))
|
||||
.await?;
|
||||
let content_type = response
|
||||
.headers()
|
||||
.get("Content-Type")
|
||||
.ok_or_else(|| anyhow!("registry response did not have a Content-Type header"))?
|
||||
.to_str()?;
|
||||
if content_type == MediaType::ImageIndex.to_string()
|
||||
|| content_type == MediaType::ImageIndex.to_docker_v2s2()?
|
||||
{
|
||||
let index = serde_json::from_str(&response.text().await?)?;
|
||||
let descriptor = self
|
||||
.pick_manifest(index)
|
||||
.ok_or_else(|| anyhow!("unable to pick manifest from index"))?;
|
||||
return self
|
||||
.get_raw_manifest_with_digest(name, descriptor.digest())
|
||||
.await;
|
||||
}
|
||||
let digest = response
|
||||
.headers()
|
||||
.get("Docker-Content-Digest")
|
||||
.ok_or_else(|| anyhow!("fetching manifest did not yield a content digest"))?
|
||||
.to_str()?
|
||||
.to_string();
|
||||
let manifest = serde_json::from_str(&response.text().await?)?;
|
||||
Ok((manifest, digest))
|
||||
}
|
||||
|
||||
fn pick_manifest(&mut self, index: ImageIndex) -> Option<Descriptor> {
|
||||
for item in index.manifests() {
|
||||
if let Some(platform) = item.platform() {
|
||||
if *platform.os() == MANIFEST_PICKER_PLATFORM
|
||||
&& *platform.architecture() == MANIFEST_PICKER_ARCHITECTURE
|
||||
{
|
||||
return Some(item.clone());
|
||||
}
|
||||
let file = tokio::fs::File::create(&layer_path).await?;
|
||||
let size = client.write_blob_to_file(&image.name, layer, file).await?;
|
||||
if layer.size() as u64 != size {
|
||||
return Err(anyhow!(
|
||||
"downloaded layer size differs from size in manifest",
|
||||
));
|
||||
}
|
||||
}
|
||||
None
|
||||
|
||||
let mut media_type = layer.media_type().clone();
|
||||
|
||||
// docker layer compatibility
|
||||
if media_type.to_string() == MediaType::ImageLayerGzip.to_docker_v2s2()? {
|
||||
media_type = MediaType::ImageLayerGzip;
|
||||
}
|
||||
|
||||
let compression = match media_type {
|
||||
MediaType::ImageLayer => OciImageLayerCompression::None,
|
||||
MediaType::ImageLayerGzip => OciImageLayerCompression::Gzip,
|
||||
MediaType::ImageLayerZstd => OciImageLayerCompression::Zstd,
|
||||
other => return Err(anyhow!("found layer with unknown media type: {}", other)),
|
||||
};
|
||||
Ok(OciImageLayer {
|
||||
path: layer_path,
|
||||
digest: layer.digest().clone(),
|
||||
compression,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user