hypha: implement enhanced heuristics for reducing the use of file descriptors

Alex Zenla 2024-01-20 12:52:51 -08:00
parent 2567a93512
commit 83264839dd


@@ -8,18 +8,29 @@ use crate::image::fetch::RegistryClient;
 use crate::image::name::ImageName;
 use backhand::{FilesystemWriter, NodeHeader};
 use flate2::read::GzDecoder;
-use log::{debug, trace};
-use oci_spec::image::{Descriptor, ImageConfiguration, ImageManifest, MediaType};
-use std::fs;
+use log::{debug, trace, warn};
+use oci_spec::image::{Descriptor, ImageConfiguration, ImageManifest, MediaType, ToDockerV2S2};
 use std::fs::File;
-use std::io::{copy, BufReader, Seek, SeekFrom, Write};
+use std::io::{BufReader, Cursor};
 use std::os::unix::fs::{FileTypeExt, MetadataExt, PermissionsExt};
 use std::path::{Path, PathBuf};
-use tar::Entry;
+use std::{fs, io};
+use tar::{Archive, Entry};
 use uuid::Uuid;
 use walkdir::WalkDir;
 
 pub const IMAGE_SQUASHFS_VERSION: u64 = 1;
+const LAYER_BUFFER_SIZE: usize = 128 * 1024;
+
+// we utilize in-memory buffers when generating the squashfs for files
+// under this size. for files of or above this size, we open a file.
+// the file is then read during writing. we want to reduce the number
+// of open files during squashfs generation, so this limit should be set
+// to something that limits the number of files on average, at the expense
+// of increased memory usage.
+// TODO: it may be wise to, during crawling of the image layers, infer this
+// value from the size to file count ratio of all layers.
+const SQUASHFS_MEMORY_BUFFER_LIMIT: usize = 8 * 1024 * 1024;
 
 pub struct ImageInfo {
     pub image_squashfs: PathBuf,
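
The TODO in the comment block above leaves the 8 MiB threshold hard-coded. A minimal sketch of the inference it describes, assuming hypothetical `LayerStat` bookkeeping collected while crawling the layers; neither name exists in this commit:

    // Hypothetical: derive the in-memory buffer threshold from the
    // size-to-file-count ratio of all layers, clamped to sane bounds.
    struct LayerStat {
        total_bytes: u64,
        file_count: u64,
    }

    fn infer_buffer_limit(stats: &[LayerStat]) -> usize {
        let total: u64 = stats.iter().map(|s| s.total_bytes).sum();
        let files = stats.iter().map(|s| s.file_count).sum::<u64>().max(1);
        // buffer files up to ~4x the mean file size in memory
        let mean = total / files;
        (mean.saturating_mul(4) as usize).clamp(128 * 1024, 64 * 1024 * 1024)
    }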
@@ -45,6 +56,38 @@ pub struct ImageCompiler<'a> {
     cache: &'a ImageCache,
 }
 
+#[derive(Debug)]
+enum LayerCompressionType {
+    None,
+    Gzip,
+    Zstd,
+}
+
+struct LayerFile {
+    digest: String,
+    compression: LayerCompressionType,
+    path: PathBuf,
+}
+
+impl LayerFile {
+    fn open_reader(&self) -> Result<Box<dyn io::Read>> {
+        Ok(match self.compression {
+            LayerCompressionType::None => Box::new(BufReader::with_capacity(
+                LAYER_BUFFER_SIZE,
+                File::open(&self.path)?,
+            )),
+            LayerCompressionType::Gzip => Box::new(GzDecoder::new(BufReader::with_capacity(
+                LAYER_BUFFER_SIZE,
+                File::open(&self.path)?,
+            ))),
+            LayerCompressionType::Zstd => Box::new(zstd::Decoder::new(BufReader::with_capacity(
+                LAYER_BUFFER_SIZE,
+                File::open(&self.path)?,
+            ))?),
+        })
+    }
+}
+
 impl ImageCompiler<'_> {
     pub fn new(cache: &ImageCache) -> Result<ImageCompiler> {
         Ok(ImageCompiler { cache })
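
A note on the design above: `open_reader` erases the concrete decoder behind `Box<dyn io::Read>`, so the compile loop in the next hunk can stream any layer through `tar::Archive` while holding exactly one file descriptor per layer, decompressing lazily as entries are read. A minimal standalone sketch with a hand-built `LayerFile` (the digest and path are hypothetical):

    // Illustrative fragment, not part of the commit.
    fn walk_layer_once() -> Result<()> {
        let layer = LayerFile {
            digest: "sha256:example".into(), // hypothetical digest
            compression: LayerCompressionType::Gzip,
            path: PathBuf::from("/tmp/layer.tar.gz"), // hypothetical path
        };
        // one open file, decompressed on the fly as tar pulls bytes
        let mut archive = Archive::new(layer.open_reader()?);
        for entry in archive.entries()? {
            println!("{:?}", entry?.path()?);
        }
        Ok(())
    }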
@@ -102,19 +145,34 @@ impl ImageCompiler<'_> {
         let config_bytes = client.get_blob(&image.name, manifest.config())?;
         let config: ImageConfiguration = serde_json::from_slice(&config_bytes)?;
-        let mut layers: Vec<PathBuf> = Vec::new();
+        let mut layers: Vec<LayerFile> = Vec::new();
         for layer in manifest.layers() {
-            let layer_path = self.download_layer(image, layer, layer_dir, &mut client)?;
-            layers.push(layer_path);
+            layers.push(self.download_layer(image, layer, layer_dir, &mut client)?);
         }
 
         for layer in layers {
-            let mut file = File::open(&layer)?;
-            self.process_whiteout_entries(&file, image_dir)?;
-            file.seek(SeekFrom::Start(0))?;
-            self.process_write_entries(&file, image_dir)?;
-            drop(file);
-            fs::remove_file(&layer)?;
+            debug!(
+                "ImageCompiler process layer digest={} compression={:?}",
+                &layer.digest, layer.compression
+            );
+            let mut archive = Archive::new(layer.open_reader()?);
+            for entry in archive.entries()? {
+                let mut entry = entry?;
+                let path = entry.path()?;
+                let Some(name) = path.file_name() else {
+                    return Err(HyphaError::new("unable to get file name"));
+                };
+                let Some(name) = name.to_str() else {
+                    return Err(HyphaError::new("unable to get file name as string"));
+                };
+                if name.starts_with(".wh.") {
+                    self.process_whiteout_entry(&entry, name, &layer, image_dir)?;
+                } else {
+                    self.process_write_entry(&mut entry, &layer, image_dir)?;
+                }
+            }
+            fs::remove_file(&layer.path)?;
         }
 
         self.squash(image_dir, squash_file)?;
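
For context, the `.wh.` prefix this loop dispatches on is the OCI image-spec whiteout convention: an entry named `.wh.<name>` deletes `<name>` from the merged rootfs, and the special entry `.wh..wh..opq` marks its directory as opaque, hiding everything the lower layers placed beneath it. With a hypothetical two-layer image:

    layer 1: etc/app/a.conf
             etc/app/b.conf
    layer 2: etc/app/.wh.a.conf      -> a.conf is absent from the merged rootfs
             var/cache/.wh..wh..opq  -> var/cache contents from layer 1 are hidden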
@@ -122,71 +180,99 @@ impl ImageCompiler<'_> {
         self.cache.store(&cache_digest, &info)
     }
 
-    fn process_whiteout_entries(&self, file: &File, image_dir: &PathBuf) -> Result<()> {
-        let mut archive = tar::Archive::new(file);
-        for entry in archive.entries()? {
-            let entry = entry?;
-            let dst = self.check_safe_entry(&entry, image_dir)?;
-            let Some(name) = dst.file_name() else {
-                return Err(HyphaError::new("unable to get file name"));
-            };
-            let Some(name) = name.to_str() else {
-                return Err(HyphaError::new("unable to get file name as string"));
-            };
-            if !name.starts_with(".wh.") {
-                continue;
-            }
-            let mut dst = dst.clone();
-            dst.pop();
+    fn process_whiteout_entry<T: io::Read>(
+        &self,
+        entry: &Entry<T>,
+        name: &str,
+        layer: &LayerFile,
+        image_dir: &PathBuf,
+    ) -> Result<()> {
+        let dst = self.check_safe_entry(entry, image_dir)?;
+        let mut dst = dst.clone();
+        dst.pop();
 
         let opaque = name == ".wh..wh..opq";
         if !opaque {
             dst.push(name);
             self.check_safe_path(&dst, image_dir)?;
         }
 
-        if opaque {
+        trace!(
+            "ImageCompiler whiteout entry layer={} path={:?}",
+            &layer.digest,
+            entry.path()?
+        );
+
+        if opaque {
+            if dst.is_dir() {
                 for entry in fs::read_dir(dst)? {
                     let entry = entry?;
                     let path = entry.path();
-                    if path.is_file() {
+                    if path.is_symlink() || path.is_file() {
                         fs::remove_file(&path)?;
-                    } else {
+                    } else if path.is_dir() {
                         fs::remove_dir_all(&path)?;
+                    } else {
+                        return Err(HyphaError::new("opaque whiteout entry did not exist"));
                     }
                 }
-        } else if dst.is_file() {
-            fs::remove_file(&dst)?;
-        } else {
-            fs::remove_dir(&dst)?;
-        }
+            } else {
+                warn!(
+                    "ImageCompiler whiteout entry missing locally layer={} path={:?} local={:?}",
+                    &layer.digest,
+                    entry.path()?,
+                    dst,
+                );
+            }
+        } else if dst.is_file() || dst.is_symlink() {
+            fs::remove_file(&dst)?;
+        } else if dst.is_dir() {
+            fs::remove_dir(&dst)?;
+        } else {
+            warn!(
+                "ImageCompiler whiteout entry missing locally layer={} path={:?} local={:?}",
+                &layer.digest,
+                entry.path()?,
+                dst,
+            );
+        }
         Ok(())
     }
 
-    fn process_write_entries(&self, file: &File, image_dir: &PathBuf) -> Result<()> {
-        let mut archive = tar::Archive::new(file);
-        for entry in archive.entries()? {
-            let mut entry = entry?;
-            let dst = self.check_safe_entry(&entry, image_dir)?;
-            let Some(name) = dst.file_name() else {
-                return Err(HyphaError::new("unable to get file name"));
-            };
-            let Some(name) = name.to_str() else {
-                return Err(HyphaError::new("unable to get file name as string"));
-            };
-            if name.starts_with(".wh.") {
-                continue;
-            }
-            entry.unpack(dst)?;
-        }
+    fn process_write_entry<T: io::Read>(
+        &self,
+        entry: &mut Entry<T>,
+        layer: &LayerFile,
+        image_dir: &PathBuf,
+    ) -> Result<()> {
+        trace!(
+            "ImageCompiler unpack entry layer={} path={:?} type={:?}",
+            &layer.digest,
+            entry.path()?,
+            entry.header().entry_type()
+        );
+        entry.unpack_in(image_dir)?;
         Ok(())
     }
 
-    fn check_safe_entry(&self, entry: &Entry<&File>, image_dir: &PathBuf) -> Result<PathBuf> {
+    fn check_safe_entry<T: io::Read>(
+        &self,
+        entry: &Entry<T>,
+        image_dir: &PathBuf,
+    ) -> Result<PathBuf> {
         let mut dst = image_dir.clone();
         dst.push(entry.path()?);
+        if let Some(name) = dst.file_name() {
+            if let Some(name) = name.to_str() {
+                if name.starts_with(".wh.") {
+                    let copy = dst.clone();
+                    dst.pop();
+                    self.check_safe_path(&dst, image_dir)?;
+                    return Ok(copy);
+                }
+            }
+        }
         self.check_safe_path(&dst, image_dir)?;
         Ok(dst)
     }
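
Two safety properties are worth spelling out here. First, the write path now relies on tar's `Entry::unpack_in`, which itself refuses to unpack entries whose destination would land outside the given directory. Second, `check_safe_entry` still routes every destination through `check_safe_path`, whose body is not part of this diff; a sketch of what such a guard typically looks like, offered as an assumption rather than the actual implementation:

    // Hypothetical sketch; the real check_safe_path body is not shown in
    // this diff. Reject any destination that escapes image_dir.
    fn check_safe_path(dst: &Path, image_dir: &Path) -> Result<()> {
        use std::path::Component;
        let remainder = dst.strip_prefix(image_dir).unwrap_or(dst);
        // only plain segments allowed: `..`, `/`, and prefixes would escape
        if remainder.components().all(|c| matches!(c, Component::Normal(_))) {
            Ok(())
        } else {
            Err(HyphaError::new("tar entry path escapes image directory"))
        }
    }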
@@ -205,7 +291,7 @@ impl ImageCompiler<'_> {
         layer: &Descriptor,
         layer_dir: &Path,
         client: &mut RegistryClient,
-    ) -> Result<PathBuf> {
+    ) -> Result<LayerFile> {
         debug!(
             "ImageCompiler download layer digest={} size={}",
             layer.digest(),
@@ -226,31 +312,28 @@ impl ImageCompiler<'_> {
             }
         }
 
-        let compressed = match layer.media_type() {
-            MediaType::ImageLayer => false,
-            MediaType::ImageLayerGzip => {
-                let reader = File::open(&layer_path)?;
-                let mut decoder = GzDecoder::new(&reader);
-                let mut writer = File::create(&tmp_path)?;
-                copy(&mut decoder, &mut writer)?;
-                writer.flush()?;
-                true
-            }
-            MediaType::ImageLayerZstd => {
-                let reader = File::open(&layer_path)?;
-                let mut decoder = zstd::Decoder::new(&reader)?;
-                let mut writer = File::create(&tmp_path)?;
-                copy(&mut decoder, &mut writer)?;
-                writer.flush()?;
-                true
-            }
-            _ => return Err(HyphaError::new("found layer with unknown media type")),
-        };
-        if compressed {
-            fs::rename(tmp_path, &layer_path)?;
+        let mut media_type = layer.media_type().clone();
+
+        // docker layer compatibility
+        if media_type.to_string() == MediaType::ImageLayerGzip.to_docker_v2s2()? {
+            media_type = MediaType::ImageLayerGzip;
         }
-        Ok(layer_path)
+
+        let compression = match media_type {
+            MediaType::ImageLayer => LayerCompressionType::None,
+            MediaType::ImageLayerGzip => LayerCompressionType::Gzip,
+            MediaType::ImageLayerZstd => LayerCompressionType::Zstd,
+            other => {
+                return Err(HyphaError::new(
+                    format!("found layer with unknown media type: {}", other).as_str(),
+                ))
+            }
+        };
+        Ok(LayerFile {
+            digest: layer.digest().clone(),
+            compression,
+            path: layer_path,
+        })
     }
 
     fn squash(&self, image_dir: &PathBuf, squash_file: &PathBuf) -> Result<()> {
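
The `// docker layer compatibility` branch above exists because Docker v2 schema 2 manifests name their gzip layers in the vnd.docker namespace, while oci-spec's `MediaType` displays the OCI name; `ToDockerV2S2` bridges the two. Spelled out as a small helper (a sketch restating the diff's comparison; the media type strings are the published identifiers):

    // OCI:         application/vnd.oci.image.layer.v1.tar+gzip
    // Docker v2s2: application/vnd.docker.image.rootfs.diff.tar.gzip
    fn is_docker_gzip_layer(media_type: &MediaType) -> Result<bool> {
        Ok(media_type.to_string() == MediaType::ImageLayerGzip.to_docker_v2s2()?)
    }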
@@ -294,8 +377,14 @@ impl ImageCompiler<'_> {
             } else if typ.is_dir() {
                 writer.push_dir(rel, header)?;
             } else if typ.is_file() {
-                let reader = BufReader::new(File::open(entry.path())?);
-                writer.push_file(reader, rel, header)?;
+                if metadata.size() >= SQUASHFS_MEMORY_BUFFER_LIMIT as u64 {
+                    let reader =
+                        BufReader::with_capacity(LAYER_BUFFER_SIZE, File::open(entry.path())?);
+                    writer.push_file(reader, rel, header)?;
+                } else {
+                    let cursor = Cursor::new(fs::read(entry.path())?);
+                    writer.push_file(cursor, rel, header)?;
+                }
             } else if typ.is_block_device() {
                 let device = metadata.dev();
                 writer.push_block_device(device as u32, rel, header)?;
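
The comment on `SQUASHFS_MEMORY_BUFFER_LIMIT` notes that large files are opened and then read during writing, so their descriptors stay open until `writer.write()` runs, while small files are materialized immediately by `fs::read`. A back-of-envelope illustration under assumed numbers (hypothetical, not measured in this commit):

    // Assume a rootfs with 10_000 small files averaging 16 KiB
    // and 20 files at or above the 8 MiB threshold.
    let open_fds = 20u32; // only large files hold descriptors until write()
    let resident_bytes = 10_000u64 * 16 * 1024; // ~156 MiB of in-memory buffers
    println!("fds={open_fds} resident={resident_bytes}");
    // before this change, every regular file kept a descriptor open: ~10_020 at once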
@@ -313,9 +402,9 @@ impl ImageCompiler<'_> {
             .to_str()
             .ok_or_else(|| HyphaError::new("failed to convert squashfs string"))?;
 
-        let mut out = File::create(squash_file)?;
+        let mut file = File::create(squash_file)?;
         trace!("ImageCompiler squash generate: {}", squash_file_path);
-        writer.write(&mut out)?;
+        writer.write(&mut file)?;
         Ok(())
     }
 }