From 83264839dd537cbe72f15002f95e736bd6b91b02 Mon Sep 17 00:00:00 2001 From: Alex Zenla Date: Sat, 20 Jan 2024 12:52:51 -0800 Subject: [PATCH] hypha: implement enhanced heuristics for reducing the use of file descriptors --- hypha/src/image/mod.rs | 261 +++++++++++++++++++++++++++-------------- 1 file changed, 175 insertions(+), 86 deletions(-) diff --git a/hypha/src/image/mod.rs b/hypha/src/image/mod.rs index ef6aaad..54253c9 100644 --- a/hypha/src/image/mod.rs +++ b/hypha/src/image/mod.rs @@ -8,18 +8,29 @@ use crate::image::fetch::RegistryClient; use crate::image::name::ImageName; use backhand::{FilesystemWriter, NodeHeader}; use flate2::read::GzDecoder; -use log::{debug, trace}; -use oci_spec::image::{Descriptor, ImageConfiguration, ImageManifest, MediaType}; -use std::fs; +use log::{debug, trace, warn}; +use oci_spec::image::{Descriptor, ImageConfiguration, ImageManifest, MediaType, ToDockerV2S2}; use std::fs::File; -use std::io::{copy, BufReader, Seek, SeekFrom, Write}; +use std::io::{BufReader, Cursor}; use std::os::unix::fs::{FileTypeExt, MetadataExt, PermissionsExt}; use std::path::{Path, PathBuf}; -use tar::Entry; +use std::{fs, io}; +use tar::{Archive, Entry}; use uuid::Uuid; use walkdir::WalkDir; pub const IMAGE_SQUASHFS_VERSION: u64 = 1; +const LAYER_BUFFER_SIZE: usize = 128 * 1024; + +// we utilize in-memory buffers when generating the squashfs for files +// under this size. for files of or above this size, we open a file. +// the file is then read during writing. we want to reduce the number +// of open files during squashfs generation, so this limit should be set +// to something that limits the number of files on average, at the expense +// of increased memory usage. +// TODO: it may be wise to, during crawling of the image layers, infer this +// value from the size to file count ratio of all layers. +const SQUASHFS_MEMORY_BUFFER_LIMIT: usize = 8 * 1024 * 1024; pub struct ImageInfo { pub image_squashfs: PathBuf, @@ -45,6 +56,38 @@ pub struct ImageCompiler<'a> { cache: &'a ImageCache, } +#[derive(Debug)] +enum LayerCompressionType { + None, + Gzip, + Zstd, +} + +struct LayerFile { + digest: String, + compression: LayerCompressionType, + path: PathBuf, +} + +impl LayerFile { + fn open_reader(&self) -> Result> { + Ok(match self.compression { + LayerCompressionType::None => Box::new(BufReader::with_capacity( + LAYER_BUFFER_SIZE, + File::open(&self.path)?, + )), + LayerCompressionType::Gzip => Box::new(GzDecoder::new(BufReader::with_capacity( + LAYER_BUFFER_SIZE, + File::open(&self.path)?, + ))), + LayerCompressionType::Zstd => Box::new(zstd::Decoder::new(BufReader::with_capacity( + LAYER_BUFFER_SIZE, + File::open(&self.path)?, + ))?), + }) + } +} + impl ImageCompiler<'_> { pub fn new(cache: &ImageCache) -> Result { Ok(ImageCompiler { cache }) @@ -102,19 +145,34 @@ impl ImageCompiler<'_> { let config_bytes = client.get_blob(&image.name, manifest.config())?; let config: ImageConfiguration = serde_json::from_slice(&config_bytes)?; - let mut layers: Vec = Vec::new(); + let mut layers: Vec = Vec::new(); for layer in manifest.layers() { - let layer_path = self.download_layer(image, layer, layer_dir, &mut client)?; - layers.push(layer_path); + layers.push(self.download_layer(image, layer, layer_dir, &mut client)?); } for layer in layers { - let mut file = File::open(&layer)?; - self.process_whiteout_entries(&file, image_dir)?; - file.seek(SeekFrom::Start(0))?; - self.process_write_entries(&file, image_dir)?; - drop(file); - fs::remove_file(&layer)?; + debug!( + "ImageCompiler process layer digest={} compression={:?}", + &layer.digest, layer.compression + ); + let mut archive = Archive::new(layer.open_reader()?); + for entry in archive.entries()? { + let mut entry = entry?; + let path = entry.path()?; + let Some(name) = path.file_name() else { + return Err(HyphaError::new("unable to get file name")); + }; + let Some(name) = name.to_str() else { + return Err(HyphaError::new("unable to get file name as string")); + }; + + if name.starts_with(".wh.") { + self.process_whiteout_entry(&entry, name, &layer, image_dir)?; + } else { + self.process_write_entry(&mut entry, &layer, image_dir)?; + } + } + fs::remove_file(&layer.path)?; } self.squash(image_dir, squash_file)?; @@ -122,71 +180,99 @@ impl ImageCompiler<'_> { self.cache.store(&cache_digest, &info) } - fn process_whiteout_entries(&self, file: &File, image_dir: &PathBuf) -> Result<()> { - let mut archive = tar::Archive::new(file); - for entry in archive.entries()? { - let entry = entry?; - let dst = self.check_safe_entry(&entry, image_dir)?; - let Some(name) = dst.file_name() else { - return Err(HyphaError::new("unable to get file name")); - }; - let Some(name) = name.to_str() else { - return Err(HyphaError::new("unable to get file name as string")); - }; - if !name.starts_with(".wh.") { - continue; - } - let mut dst = dst.clone(); - dst.pop(); + fn process_whiteout_entry( + &self, + entry: &Entry, + name: &str, + layer: &LayerFile, + image_dir: &PathBuf, + ) -> Result<()> { + let dst = self.check_safe_entry(entry, image_dir)?; + let mut dst = dst.clone(); + dst.pop(); - let opaque = name == ".wh..wh..opq"; + let opaque = name == ".wh..wh..opq"; - if !opaque { - dst.push(name); - self.check_safe_path(&dst, image_dir)?; - } + if !opaque { + dst.push(name); + self.check_safe_path(&dst, image_dir)?; + } - if opaque { + trace!( + "ImageCompiler whiteout entry layer={} path={:?}", + &layer.digest, + entry.path()? + ); + + if opaque { + if dst.is_dir() { for entry in fs::read_dir(dst)? { let entry = entry?; let path = entry.path(); - if path.is_file() { + if path.is_symlink() || path.is_file() { fs::remove_file(&path)?; - } else { + } else if path.is_dir() { fs::remove_dir_all(&path)?; + } else { + return Err(HyphaError::new("opaque whiteout entry did not exist")); } } - } else if dst.is_file() { - fs::remove_file(&dst)?; } else { - fs::remove_dir(&dst)?; + warn!( + "ImageCompiler whiteout entry missing locally layer={} path={:?} local={:?}", + &layer.digest, + entry.path()?, + dst, + ); } + } else if dst.is_file() || dst.is_symlink() { + fs::remove_file(&dst)?; + } else if dst.is_dir() { + fs::remove_dir(&dst)?; + } else { + warn!( + "ImageCompiler whiteout entry missing locally layer={} path={:?} local={:?}", + &layer.digest, + entry.path()?, + dst, + ); } Ok(()) } - fn process_write_entries(&self, file: &File, image_dir: &PathBuf) -> Result<()> { - let mut archive = tar::Archive::new(file); - for entry in archive.entries()? { - let mut entry = entry?; - let dst = self.check_safe_entry(&entry, image_dir)?; - let Some(name) = dst.file_name() else { - return Err(HyphaError::new("unable to get file name")); - }; - let Some(name) = name.to_str() else { - return Err(HyphaError::new("unable to get file name as string")); - }; - if name.starts_with(".wh.") { - continue; - } - entry.unpack(dst)?; - } + fn process_write_entry( + &self, + entry: &mut Entry, + layer: &LayerFile, + image_dir: &PathBuf, + ) -> Result<()> { + trace!( + "ImageCompiler unpack entry layer={} path={:?} type={:?}", + &layer.digest, + entry.path()?, + entry.header().entry_type() + ); + entry.unpack_in(image_dir)?; Ok(()) } - fn check_safe_entry(&self, entry: &Entry<&File>, image_dir: &PathBuf) -> Result { + fn check_safe_entry( + &self, + entry: &Entry, + image_dir: &PathBuf, + ) -> Result { let mut dst = image_dir.clone(); dst.push(entry.path()?); + if let Some(name) = dst.file_name() { + if let Some(name) = name.to_str() { + if name.starts_with(".wh.") { + let copy = dst.clone(); + dst.pop(); + self.check_safe_path(&dst, image_dir)?; + return Ok(copy); + } + } + } self.check_safe_path(&dst, image_dir)?; Ok(dst) } @@ -205,7 +291,7 @@ impl ImageCompiler<'_> { layer: &Descriptor, layer_dir: &Path, client: &mut RegistryClient, - ) -> Result { + ) -> Result { debug!( "ImageCompiler download layer digest={} size={}", layer.digest(), @@ -226,31 +312,28 @@ impl ImageCompiler<'_> { } } - let compressed = match layer.media_type() { - MediaType::ImageLayer => false, - MediaType::ImageLayerGzip => { - let reader = File::open(&layer_path)?; - let mut decoder = GzDecoder::new(&reader); - let mut writer = File::create(&tmp_path)?; - copy(&mut decoder, &mut writer)?; - writer.flush()?; - true - } - MediaType::ImageLayerZstd => { - let reader = File::open(&layer_path)?; - let mut decoder = zstd::Decoder::new(&reader)?; - let mut writer = File::create(&tmp_path)?; - copy(&mut decoder, &mut writer)?; - writer.flush()?; - true - } - _ => return Err(HyphaError::new("found layer with unknown media type")), - }; + let mut media_type = layer.media_type().clone(); - if compressed { - fs::rename(tmp_path, &layer_path)?; + // docker layer compatibility + if media_type.to_string() == MediaType::ImageLayerGzip.to_docker_v2s2()? { + media_type = MediaType::ImageLayerGzip; } - Ok(layer_path) + + let compression = match media_type { + MediaType::ImageLayer => LayerCompressionType::None, + MediaType::ImageLayerGzip => LayerCompressionType::Gzip, + MediaType::ImageLayerZstd => LayerCompressionType::Zstd, + other => { + return Err(HyphaError::new( + format!("found layer with unknown media type: {}", other).as_str(), + )) + } + }; + Ok(LayerFile { + digest: layer.digest().clone(), + compression, + path: layer_path, + }) } fn squash(&self, image_dir: &PathBuf, squash_file: &PathBuf) -> Result<()> { @@ -294,8 +377,14 @@ impl ImageCompiler<'_> { } else if typ.is_dir() { writer.push_dir(rel, header)?; } else if typ.is_file() { - let reader = BufReader::new(File::open(entry.path())?); - writer.push_file(reader, rel, header)?; + if metadata.size() >= SQUASHFS_MEMORY_BUFFER_LIMIT as u64 { + let reader = + BufReader::with_capacity(LAYER_BUFFER_SIZE, File::open(entry.path())?); + writer.push_file(reader, rel, header)?; + } else { + let cursor = Cursor::new(fs::read(entry.path())?); + writer.push_file(cursor, rel, header)?; + } } else if typ.is_block_device() { let device = metadata.dev(); writer.push_block_device(device as u32, rel, header)?; @@ -313,9 +402,9 @@ impl ImageCompiler<'_> { .to_str() .ok_or_else(|| HyphaError::new("failed to convert squashfs string"))?; - let mut out = File::create(squash_file)?; + let mut file = File::create(squash_file)?; trace!("ImageCompiler squash generate: {}", squash_file_path); - writer.write(&mut out)?; + writer.write(&mut file)?; Ok(()) } }