Compare commits

..

8 Commits

47 changed files with 1449 additions and 898 deletions

View File

@@ -6,6 +6,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
## [0.0.19](https://github.com/edera-dev/krata/compare/v0.0.18...v0.0.19) - 2024-08-25
### Added
- *(config)* write default config to config.toml on startup ([#356](https://github.com/edera-dev/krata/pull/356))
- *(ctl)* add --format option to host status and improve cpu topology format ([#355](https://github.com/edera-dev/krata/pull/355))
### Fixed
- *(zone-exec)* ensure that the underlying process is killed when rpc is closed ([#361](https://github.com/edera-dev/krata/pull/361))
- *(rpc)* rename HostStatus to GetHostStatus ([#360](https://github.com/edera-dev/krata/pull/360))
- *(console)* don't replay history when attaching to the console ([#358](https://github.com/edera-dev/krata/pull/358))
- *(zone-exec)* catch panic errors and show all errors immediately ([#359](https://github.com/edera-dev/krata/pull/359))
### Other
- *(control)* split out all of the rpc calls into their own files ([#357](https://github.com/edera-dev/krata/pull/357))
## [0.0.18](https://github.com/edera-dev/krata/compare/v0.0.17...v0.0.18) - 2024-08-22
### Added

31
Cargo.lock generated
View File

@ -1297,7 +1297,7 @@ dependencies = [
[[package]]
name = "krata"
version = "0.0.18"
version = "0.0.19"
dependencies = [
"anyhow",
"async-trait",
@ -1337,7 +1337,7 @@ dependencies = [
[[package]]
name = "krata-buildtools"
version = "0.0.18"
version = "0.0.19"
dependencies = [
"anyhow",
"env_logger",
@ -1352,7 +1352,7 @@ dependencies = [
[[package]]
name = "krata-ctl"
version = "0.0.18"
version = "0.0.19"
dependencies = [
"anyhow",
"async-stream",
@ -1382,7 +1382,7 @@ dependencies = [
[[package]]
name = "krata-daemon"
version = "0.0.18"
version = "0.0.19"
dependencies = [
"anyhow",
"async-stream",
@ -1414,14 +1414,14 @@ dependencies = [
[[package]]
name = "krata-loopdev"
version = "0.0.18"
version = "0.0.19"
dependencies = [
"libc",
]
[[package]]
name = "krata-network"
version = "0.0.18"
version = "0.0.19"
dependencies = [
"anyhow",
"async-trait",
@ -1445,7 +1445,7 @@ dependencies = [
[[package]]
name = "krata-oci"
version = "0.0.18"
version = "0.0.19"
dependencies = [
"anyhow",
"async-compression",
@ -1472,7 +1472,7 @@ dependencies = [
[[package]]
name = "krata-runtime"
version = "0.0.18"
version = "0.0.19"
dependencies = [
"anyhow",
"backhand",
@ -1513,7 +1513,7 @@ dependencies = [
[[package]]
name = "krata-xencall"
version = "0.0.18"
version = "0.0.19"
dependencies = [
"env_logger",
"libc",
@ -1526,7 +1526,7 @@ dependencies = [
[[package]]
name = "krata-xenclient"
version = "0.0.18"
version = "0.0.19"
dependencies = [
"async-trait",
"env_logger",
@ -1544,7 +1544,7 @@ dependencies = [
[[package]]
name = "krata-xenevtchn"
version = "0.0.18"
version = "0.0.19"
dependencies = [
"byteorder",
"libc",
@ -1556,7 +1556,7 @@ dependencies = [
[[package]]
name = "krata-xengnt"
version = "0.0.18"
version = "0.0.19"
dependencies = [
"libc",
"nix 0.29.0",
@ -1565,7 +1565,7 @@ dependencies = [
[[package]]
name = "krata-xenplatform"
version = "0.0.18"
version = "0.0.19"
dependencies = [
"async-trait",
"c2rust-bitfields",
@ -1588,7 +1588,7 @@ dependencies = [
[[package]]
name = "krata-xenstore"
version = "0.0.18"
version = "0.0.19"
dependencies = [
"byteorder",
"env_logger",
@ -1600,7 +1600,7 @@ dependencies = [
[[package]]
name = "krata-zone"
version = "0.0.18"
version = "0.0.19"
dependencies = [
"anyhow",
"cgroups-rs",
@ -1622,6 +1622,7 @@ dependencies = [
"sys-mount",
"sysinfo",
"tokio",
"tokio-util",
]
[[package]]

View File

@ -18,7 +18,7 @@ members = [
resolver = "2"
[workspace.package]
version = "0.0.18"
version = "0.0.19"
homepage = "https://krata.dev"
license = "Apache-2.0"
repository = "https://github.com/edera-dev/krata"
@ -85,6 +85,7 @@ sysinfo = "0.31.2"
termtree = "0.5.1"
thiserror = "1.0"
tokio-tun = "0.11.5"
tokio-util = "0.7.11"
toml = "0.8.19"
tonic-build = "0.12.1"
tower = "0.5.0"

View File

@ -16,7 +16,7 @@ oci-spec = { workspace = true }
scopeguard = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
krata-oci = { path = "../oci", version = "^0.0.18" }
krata-oci = { path = "../oci", version = "^0.0.19" }
krata-tokio-tar = { workspace = true }
uuid = { workspace = true }

View File

@ -20,7 +20,7 @@ env_logger = { workspace = true }
fancy-duration = { workspace = true }
human_bytes = { workspace = true }
indicatif = { workspace = true }
krata = { path = "../krata", version = "^0.0.18" }
krata = { path = "../krata", version = "^0.0.19" }
log = { workspace = true }
prost-reflect = { workspace = true, features = ["serde"] }
prost-types = { workspace = true }

View File

@ -23,7 +23,7 @@ enum DeviceListFormat {
}
#[derive(Parser)]
#[command(about = "List the devices on the isolation engine")]
#[command(about = "List device information")]
pub struct DeviceListCommand {
#[arg(short, long, default_value = "table", help = "Output format")]
format: DeviceListFormat,

View File

@ -5,7 +5,9 @@ use comfy_table::{Cell, Table};
use krata::v1::control::{
control_service_client::ControlServiceClient, GetHostCpuTopologyRequest, HostCpuTopologyClass,
};
use serde_json::Value;
use crate::format::{kv2line, proto2dynamic, proto2kv};
use tonic::{transport::Channel, Request};
fn class_to_str(input: HostCpuTopologyClass) -> String {
@ -19,6 +21,11 @@ fn class_to_str(input: HostCpuTopologyClass) -> String {
/// Output format for the `host cpu-topology` command, selected via `--format`.
#[derive(ValueEnum, Clone, Debug, PartialEq, Eq)]
enum HostCpuTopologyFormat {
// Human-readable table (default).
Table,
// Compact JSON array of CPU entries.
Json,
// Pretty-printed JSON array.
JsonPretty,
// One JSON object per line (JSON Lines).
Jsonl,
// YAML document.
Yaml,
// key=value line per CPU, via proto2kv/kv2line.
KeyValue,
}
#[derive(Parser)]
@ -35,6 +42,8 @@ impl HostCpuTopologyCommand {
.await?
.into_inner();
match self.format {
HostCpuTopologyFormat::Table => {
let mut table = Table::new();
table.load_preset(UTF8_FULL_CONDENSED);
table.set_content_arrangement(comfy_table::ContentArrangement::Dynamic);
@ -54,6 +63,41 @@ impl HostCpuTopologyCommand {
if !table.is_empty() {
println!("{}", table);
}
}
HostCpuTopologyFormat::Json
| HostCpuTopologyFormat::JsonPretty
| HostCpuTopologyFormat::Yaml => {
let mut values = Vec::new();
for cpu in response.cpus {
let message = proto2dynamic(cpu)?;
values.push(serde_json::to_value(message)?);
}
let value = Value::Array(values);
let encoded = if self.format == HostCpuTopologyFormat::JsonPretty {
serde_json::to_string_pretty(&value)?
} else if self.format == HostCpuTopologyFormat::Yaml {
serde_yaml::to_string(&value)?
} else {
serde_json::to_string(&value)?
};
println!("{}", encoded.trim());
}
HostCpuTopologyFormat::Jsonl => {
for cpu in response.cpus {
let message = proto2dynamic(cpu)?;
println!("{}", serde_json::to_string(&message)?);
}
}
HostCpuTopologyFormat::KeyValue => {
for cpu in response.cpus {
let kvs = proto2kv(cpu)?;
println!("{}", kv2line(kvs),);
}
}
}
Ok(())
}

View File

@ -1,25 +0,0 @@
use anyhow::Result;
use clap::Parser;
use krata::v1::control::{control_service_client::ControlServiceClient, HostStatusRequest};
use tonic::{transport::Channel, Request};
#[derive(Parser)]
#[command(about = "Get information about the host")]
pub struct HostStatusCommand {}
impl HostStatusCommand {
pub async fn run(self, mut client: ControlServiceClient<Channel>) -> Result<()> {
let response = client
.host_status(Request::new(HostStatusRequest {}))
.await?
.into_inner();
println!("Host UUID: {}", response.host_uuid);
println!("Host Domain: {}", response.host_domid);
println!("Krata Version: {}", response.krata_version);
println!("Host IPv4: {}", response.host_ipv4);
println!("Host IPv6: {}", response.host_ipv6);
println!("Host Ethernet Address: {}", response.host_mac);
Ok(())
}
}

View File

@ -7,13 +7,13 @@ use krata::v1::control::control_service_client::ControlServiceClient;
use crate::cli::host::cpu_topology::HostCpuTopologyCommand;
use crate::cli::host::hv_console::HostHvConsoleCommand;
use crate::cli::host::identify::HostStatusCommand;
use crate::cli::host::idm_snoop::HostIdmSnoopCommand;
use crate::cli::host::status::HostStatusCommand;
pub mod cpu_topology;
pub mod hv_console;
pub mod identify;
pub mod idm_snoop;
pub mod status;
#[derive(Parser)]
#[command(about = "Manage the host of the isolation engine")]

View File

@@ -0,0 +1,60 @@
use anyhow::Result;
use clap::{Parser, ValueEnum};
use krata::v1::control::{control_service_client::ControlServiceClient, GetHostStatusRequest};
use crate::format::{kv2line, proto2dynamic, proto2kv};
use tonic::{transport::Channel, Request};
/// Output format for the `host status` command, selected via `--format`.
#[derive(ValueEnum, Clone, Debug, PartialEq, Eq)]
enum HostStatusFormat {
// Human-readable "Label: value" lines (default).
Simple,
// Compact JSON object.
Json,
// Pretty-printed JSON object.
JsonPretty,
// YAML document.
Yaml,
// Flat key=value line, via proto2kv/kv2line.
KeyValue,
}
/// CLI command that queries and prints the host status.
#[derive(Parser)]
#[command(about = "Get information about the host")]
pub struct HostStatusCommand {
// Which serialization of the reply to print; defaults to the simple text form.
#[arg(short, long, default_value = "simple", help = "Output format")]
format: HostStatusFormat,
}
impl HostStatusCommand {
    /// Fetch the host status over the control channel and print it in the
    /// format chosen by `--format`.
    pub async fn run(self, mut client: ControlServiceClient<Channel>) -> Result<()> {
        let reply = client
            .get_host_status(Request::new(GetHostStatusRequest {}))
            .await?
            .into_inner();
        match self.format {
            HostStatusFormat::Simple => {
                // Plain labelled lines for interactive use.
                println!("Host UUID: {}", reply.host_uuid);
                println!("Host Domain: {}", reply.host_domid);
                println!("Krata Version: {}", reply.krata_version);
                println!("Host IPv4: {}", reply.host_ipv4);
                println!("Host IPv6: {}", reply.host_ipv6);
                println!("Host Ethernet Address: {}", reply.host_mac);
            }
            HostStatusFormat::Json | HostStatusFormat::JsonPretty | HostStatusFormat::Yaml => {
                // Convert the protobuf reply into a serde_json::Value first so
                // all three structured encoders share one representation.
                let value = serde_json::to_value(proto2dynamic(reply)?)?;
                let encoded = match self.format {
                    HostStatusFormat::JsonPretty => serde_json::to_string_pretty(&value)?,
                    HostStatusFormat::Yaml => serde_yaml::to_string(&value)?,
                    _ => serde_json::to_string(&value)?,
                };
                println!("{}", encoded.trim());
            }
            HostStatusFormat::KeyValue => {
                // Flatten the reply into key=value pairs on a single line.
                println!("{}", kv2line(proto2kv(reply)?));
            }
        }
        Ok(())
    }
}

View File

@ -23,7 +23,7 @@ impl ZoneAttachCommand {
events: EventStream,
) -> Result<()> {
let zone_id: String = resolve_zone(&mut client, &self.zone).await?;
let input = StdioConsoleStream::stdin_stream(zone_id.clone()).await;
let input = StdioConsoleStream::stdin_stream(zone_id.clone(), false).await;
let output = client.attach_zone_console(input).await?.into_inner();
let stdout_handle =
tokio::task::spawn(async move { StdioConsoleStream::stdout(output, true).await });

View File

@ -187,7 +187,7 @@ impl ZoneLaunchCommand {
}
let code = if self.attach {
let input = StdioConsoleStream::stdin_stream(id.clone()).await;
let input = StdioConsoleStream::stdin_stream(id.clone(), true).await;
let output = client.attach_zone_console(input).await?.into_inner();
let stdout_handle =
tokio::task::spawn(async move { StdioConsoleStream::stdout(output, true).await });

View File

@ -29,7 +29,7 @@ enum ZoneListFormat {
}
#[derive(Parser)]
#[command(about = "List the zones on the isolation engine")]
#[command(about = "List zone information")]
pub struct ZoneListCommand {
#[arg(short, long, default_value = "table", help = "Output format")]
format: ZoneListFormat,

View File

@ -33,7 +33,7 @@ impl ZoneLogsCommand {
let zone_id_stream = zone_id.clone();
let follow = self.follow;
let input = stream! {
yield ZoneConsoleRequest { zone_id: zone_id_stream, data: Vec::new() };
yield ZoneConsoleRequest { zone_id: zone_id_stream, replay_history: true, data: Vec::new() };
if follow {
let mut pending = pending::<ZoneConsoleRequest>();
while let Some(x) = pending.next().await {

View File

@ -1,4 +1,4 @@
use anyhow::{anyhow, Result};
use anyhow::Result;
use async_stream::stream;
use crossterm::{
terminal::{disable_raw_mode, enable_raw_mode, is_raw_mode_enabled},
@ -23,10 +23,13 @@ use tonic::Streaming;
pub struct StdioConsoleStream;
impl StdioConsoleStream {
pub async fn stdin_stream(zone: String) -> impl Stream<Item = ZoneConsoleRequest> {
pub async fn stdin_stream(
zone: String,
replay_history: bool,
) -> impl Stream<Item = ZoneConsoleRequest> {
let mut stdin = stdin();
stream! {
yield ZoneConsoleRequest { zone_id: zone, data: vec![] };
yield ZoneConsoleRequest { zone_id: zone, replay_history, data: vec![] };
let mut buffer = vec![0u8; 60];
loop {
@ -41,7 +44,7 @@ impl StdioConsoleStream {
if size == 1 && buffer[0] == 0x1d {
break;
}
yield ZoneConsoleRequest { zone_id: String::default(), data };
yield ZoneConsoleRequest { zone_id: String::default(), replay_history, data };
}
}
}
@ -115,7 +118,12 @@ impl StdioConsoleStream {
return if reply.error.is_empty() {
Ok(reply.exit_code)
} else {
Err(anyhow!("exec failed: {}", reply.error))
StdioConsoleStream::restore_terminal_mode();
stderr
.write_all(format!("Error: exec failed: {}\n", reply.error).as_bytes())
.await?;
stderr.flush().await?;
Ok(-1)
};
}
}

View File

@ -19,9 +19,9 @@ clap = { workspace = true }
env_logger = { workspace = true }
futures = { workspace = true }
ipnetwork = { workspace = true }
krata = { path = "../krata", version = "^0.0.18" }
krata-oci = { path = "../oci", version = "^0.0.18" }
krata-runtime = { path = "../runtime", version = "^0.0.18" }
krata = { path = "../krata", version = "^0.0.19" }
krata-oci = { path = "../oci", version = "^0.0.19" }
krata-runtime = { path = "../runtime", version = "^0.0.19" }
log = { workspace = true }
prost = { workspace = true }
redb = { workspace = true }

View File

@ -112,13 +112,13 @@ fn default_network_ipv6_subnet() -> String {
impl DaemonConfig {
pub async fn load(path: &Path) -> Result<DaemonConfig> {
if path.exists() {
if !path.exists() {
let config: DaemonConfig = toml::from_str("")?;
let content = toml::to_string_pretty(&config)?;
fs::write(&path, content).await?;
}
let content = fs::read_to_string(path).await?;
let config: DaemonConfig = toml::from_str(&content)?;
Ok(config)
} else {
fs::write(&path, "").await?;
Ok(DaemonConfig::default())
}
}
}

View File

@@ -1,753 +0,0 @@
use crate::db::zone::ZoneStore;
use crate::ip::assignment::IpAssignment;
use crate::{
command::DaemonCommand, console::DaemonConsoleHandle, devices::DaemonDeviceManager,
event::DaemonEventContext, idm::DaemonIdmHandle, metrics::idm_metric_to_api,
oci::convert_oci_progress, zlt::ZoneLookupTable,
};
use async_stream::try_stream;
use futures::Stream;
use krata::v1::common::ZoneResourceStatus;
use krata::v1::control::{
GetZoneReply, GetZoneRequest, SetHostPowerManagementPolicyReply,
SetHostPowerManagementPolicyRequest,
};
use krata::{
idm::internal::{
exec_stream_request_update::Update, request::Request as IdmRequestType,
response::Response as IdmResponseType, ExecEnvVar, ExecStreamRequestStart,
ExecStreamRequestStdin, ExecStreamRequestUpdate, MetricsRequest, Request as IdmRequest,
},
v1::{
common::{OciImageFormat, Zone, ZoneState, ZoneStatus},
control::{
control_service_server::ControlService, CreateZoneReply, CreateZoneRequest,
DestroyZoneReply, DestroyZoneRequest, DeviceInfo, ExecInsideZoneReply,
ExecInsideZoneRequest, GetHostCpuTopologyReply, GetHostCpuTopologyRequest,
HostCpuTopologyInfo, HostStatusReply, HostStatusRequest, ListDevicesReply,
ListDevicesRequest, ListZonesReply, ListZonesRequest, PullImageReply, PullImageRequest,
ReadHypervisorConsoleReply, ReadHypervisorConsoleRequest, ReadZoneMetricsReply,
ReadZoneMetricsRequest, ResolveZoneIdReply, ResolveZoneIdRequest, SnoopIdmReply,
SnoopIdmRequest, UpdateZoneResourcesReply, UpdateZoneResourcesRequest,
WatchEventsReply, WatchEventsRequest, ZoneConsoleReply, ZoneConsoleRequest,
},
},
};
use krataoci::{
name::ImageName,
packer::{service::OciPackerService, OciPackedFormat, OciPackedImage},
progress::{OciProgress, OciProgressContext},
};
use kratart::Runtime;
use std::{pin::Pin, str::FromStr};
use tokio::{
select,
sync::mpsc::{channel, Sender},
task::JoinError,
};
use tokio_stream::StreamExt;
use tonic::{Request, Response, Status, Streaming};
use uuid::Uuid;
pub struct ApiError {
message: String,
}
impl From<anyhow::Error> for ApiError {
fn from(value: anyhow::Error) -> Self {
ApiError {
message: value.to_string(),
}
}
}
impl From<ApiError> for Status {
fn from(value: ApiError) -> Self {
Status::unknown(value.message)
}
}
#[derive(Clone)]
pub struct DaemonControlService {
zlt: ZoneLookupTable,
devices: DaemonDeviceManager,
events: DaemonEventContext,
console: DaemonConsoleHandle,
idm: DaemonIdmHandle,
zones: ZoneStore,
ip: IpAssignment,
zone_reconciler_notify: Sender<Uuid>,
packer: OciPackerService,
runtime: Runtime,
}
impl DaemonControlService {
#[allow(clippy::too_many_arguments)]
pub fn new(
zlt: ZoneLookupTable,
devices: DaemonDeviceManager,
events: DaemonEventContext,
console: DaemonConsoleHandle,
idm: DaemonIdmHandle,
zones: ZoneStore,
ip: IpAssignment,
zone_reconciler_notify: Sender<Uuid>,
packer: OciPackerService,
runtime: Runtime,
) -> Self {
Self {
zlt,
devices,
events,
console,
idm,
zones,
ip,
zone_reconciler_notify,
packer,
runtime,
}
}
}
enum ConsoleDataSelect {
Read(Option<Vec<u8>>),
Write(Option<Result<ZoneConsoleRequest, Status>>),
}
enum PullImageSelect {
Progress(Option<OciProgress>),
Completed(Result<Result<OciPackedImage, anyhow::Error>, JoinError>),
}
#[tonic::async_trait]
impl ControlService for DaemonControlService {
type ExecInsideZoneStream =
Pin<Box<dyn Stream<Item = Result<ExecInsideZoneReply, Status>> + Send + 'static>>;
type AttachZoneConsoleStream =
Pin<Box<dyn Stream<Item = Result<ZoneConsoleReply, Status>> + Send + 'static>>;
type PullImageStream =
Pin<Box<dyn Stream<Item = Result<PullImageReply, Status>> + Send + 'static>>;
type WatchEventsStream =
Pin<Box<dyn Stream<Item = Result<WatchEventsReply, Status>> + Send + 'static>>;
type SnoopIdmStream =
Pin<Box<dyn Stream<Item = Result<SnoopIdmReply, Status>> + Send + 'static>>;
async fn host_status(
&self,
request: Request<HostStatusRequest>,
) -> Result<Response<HostStatusReply>, Status> {
let _ = request.into_inner();
let host_reservation =
self.ip
.retrieve(self.zlt.host_uuid())
.await
.map_err(|x| ApiError {
message: x.to_string(),
})?;
Ok(Response::new(HostStatusReply {
host_domid: self.zlt.host_domid(),
host_uuid: self.zlt.host_uuid().to_string(),
krata_version: DaemonCommand::version(),
host_ipv4: host_reservation
.as_ref()
.map(|x| format!("{}/{}", x.ipv4, x.ipv4_prefix))
.unwrap_or_default(),
host_ipv6: host_reservation
.as_ref()
.map(|x| format!("{}/{}", x.ipv6, x.ipv6_prefix))
.unwrap_or_default(),
host_mac: host_reservation
.as_ref()
.map(|x| x.mac.to_string().to_lowercase().replace('-', ":"))
.unwrap_or_default(),
}))
}
async fn create_zone(
&self,
request: Request<CreateZoneRequest>,
) -> Result<Response<CreateZoneReply>, Status> {
let request = request.into_inner();
let Some(spec) = request.spec else {
return Err(ApiError {
message: "zone spec not provided".to_string(),
}
.into());
};
let uuid = Uuid::new_v4();
self.zones
.update(
uuid,
Zone {
id: uuid.to_string(),
status: Some(ZoneStatus {
state: ZoneState::Creating.into(),
network_status: None,
exit_status: None,
error_status: None,
resource_status: None,
host: self.zlt.host_uuid().to_string(),
domid: u32::MAX,
}),
spec: Some(spec),
},
)
.await
.map_err(ApiError::from)?;
self.zone_reconciler_notify
.send(uuid)
.await
.map_err(|x| ApiError {
message: x.to_string(),
})?;
Ok(Response::new(CreateZoneReply {
zone_id: uuid.to_string(),
}))
}
async fn exec_inside_zone(
&self,
request: Request<Streaming<ExecInsideZoneRequest>>,
) -> Result<Response<Self::ExecInsideZoneStream>, Status> {
let mut input = request.into_inner();
let Some(request) = input.next().await else {
return Err(ApiError {
message: "expected to have at least one request".to_string(),
}
.into());
};
let request = request?;
let Some(task) = request.task else {
return Err(ApiError {
message: "task is missing".to_string(),
}
.into());
};
let uuid = Uuid::from_str(&request.zone_id).map_err(|error| ApiError {
message: error.to_string(),
})?;
let idm = self.idm.client(uuid).await.map_err(|error| ApiError {
message: error.to_string(),
})?;
let idm_request = IdmRequest {
request: Some(IdmRequestType::ExecStream(ExecStreamRequestUpdate {
update: Some(Update::Start(ExecStreamRequestStart {
environment: task
.environment
.into_iter()
.map(|x| ExecEnvVar {
key: x.key,
value: x.value,
})
.collect(),
command: task.command,
working_directory: task.working_directory,
tty: task.tty,
})),
})),
};
let output = try_stream! {
let mut handle = idm.send_stream(idm_request).await.map_err(|x| ApiError {
message: x.to_string(),
})?;
loop {
select! {
x = input.next() => if let Some(update) = x {
let update: Result<ExecInsideZoneRequest, Status> = update.map_err(|error| ApiError {
message: error.to_string()
}.into());
if let Ok(update) = update {
if !update.stdin.is_empty() {
let _ = handle.update(IdmRequest {
request: Some(IdmRequestType::ExecStream(ExecStreamRequestUpdate {
update: Some(Update::Stdin(ExecStreamRequestStdin {
data: update.stdin,
closed: update.stdin_closed,
})),
}))}).await;
}
}
},
x = handle.receiver.recv() => match x {
Some(response) => {
let Some(IdmResponseType::ExecStream(update)) = response.response else {
break;
};
let reply = ExecInsideZoneReply {
exited: update.exited,
error: update.error,
exit_code: update.exit_code,
stdout: update.stdout,
stderr: update.stderr,
};
yield reply;
},
None => {
break;
}
}
}
}
};
Ok(Response::new(Box::pin(output) as Self::ExecInsideZoneStream))
}
async fn destroy_zone(
&self,
request: Request<DestroyZoneRequest>,
) -> Result<Response<DestroyZoneReply>, Status> {
let request = request.into_inner();
let uuid = Uuid::from_str(&request.zone_id).map_err(|error| ApiError {
message: error.to_string(),
})?;
let Some(mut zone) = self.zones.read(uuid).await.map_err(ApiError::from)? else {
return Err(ApiError {
message: "zone not found".to_string(),
}
.into());
};
zone.status = Some(zone.status.as_mut().cloned().unwrap_or_default());
if zone.status.as_ref().unwrap().state() == ZoneState::Destroyed {
return Err(ApiError {
message: "zone already destroyed".to_string(),
}
.into());
}
zone.status.as_mut().unwrap().state = ZoneState::Destroying.into();
self.zones
.update(uuid, zone)
.await
.map_err(ApiError::from)?;
self.zone_reconciler_notify
.send(uuid)
.await
.map_err(|x| ApiError {
message: x.to_string(),
})?;
Ok(Response::new(DestroyZoneReply {}))
}
async fn list_zones(
&self,
request: Request<ListZonesRequest>,
) -> Result<Response<ListZonesReply>, Status> {
let _ = request.into_inner();
let zones = self.zones.list().await.map_err(ApiError::from)?;
let zones = zones.into_values().collect::<Vec<Zone>>();
Ok(Response::new(ListZonesReply { zones }))
}
async fn resolve_zone_id(
&self,
request: Request<ResolveZoneIdRequest>,
) -> Result<Response<ResolveZoneIdReply>, Status> {
let request = request.into_inner();
let zones = self.zones.list().await.map_err(ApiError::from)?;
let zones = zones
.into_values()
.filter(|x| {
let comparison_spec = x.spec.as_ref().cloned().unwrap_or_default();
(!request.name.is_empty() && comparison_spec.name == request.name)
|| x.id == request.name
})
.collect::<Vec<Zone>>();
Ok(Response::new(ResolveZoneIdReply {
zone_id: zones.first().cloned().map(|x| x.id).unwrap_or_default(),
}))
}
async fn attach_zone_console(
&self,
request: Request<Streaming<ZoneConsoleRequest>>,
) -> Result<Response<Self::AttachZoneConsoleStream>, Status> {
let mut input = request.into_inner();
let Some(request) = input.next().await else {
return Err(ApiError {
message: "expected to have at least one request".to_string(),
}
.into());
};
let request = request?;
let uuid = Uuid::from_str(&request.zone_id).map_err(|error| ApiError {
message: error.to_string(),
})?;
let (sender, mut receiver) = channel(100);
let console = self
.console
.attach(uuid, sender)
.await
.map_err(|error| ApiError {
message: format!("failed to attach to console: {}", error),
})?;
let output = try_stream! {
yield ZoneConsoleReply { data: console.initial.clone(), };
loop {
let what = select! {
x = receiver.recv() => ConsoleDataSelect::Read(x),
x = input.next() => ConsoleDataSelect::Write(x),
};
match what {
ConsoleDataSelect::Read(Some(data)) => {
yield ZoneConsoleReply { data, };
},
ConsoleDataSelect::Read(None) => {
break;
}
ConsoleDataSelect::Write(Some(request)) => {
let request = request?;
if !request.data.is_empty() {
console.send(request.data).await.map_err(|error| ApiError {
message: error.to_string(),
})?;
}
},
ConsoleDataSelect::Write(None) => {
break;
}
}
}
};
Ok(Response::new(
Box::pin(output) as Self::AttachZoneConsoleStream
))
}
async fn read_zone_metrics(
&self,
request: Request<ReadZoneMetricsRequest>,
) -> Result<Response<ReadZoneMetricsReply>, Status> {
let request = request.into_inner();
let uuid = Uuid::from_str(&request.zone_id).map_err(|error| ApiError {
message: error.to_string(),
})?;
let client = self.idm.client(uuid).await.map_err(|error| ApiError {
message: error.to_string(),
})?;
let response = client
.send(IdmRequest {
request: Some(IdmRequestType::Metrics(MetricsRequest {})),
})
.await
.map_err(|error| ApiError {
message: error.to_string(),
})?;
let mut reply = ReadZoneMetricsReply::default();
if let Some(IdmResponseType::Metrics(metrics)) = response.response {
reply.root = metrics.root.map(idm_metric_to_api);
}
Ok(Response::new(reply))
}
async fn pull_image(
&self,
request: Request<PullImageRequest>,
) -> Result<Response<Self::PullImageStream>, Status> {
let request = request.into_inner();
let name = ImageName::parse(&request.image).map_err(|err| ApiError {
message: err.to_string(),
})?;
let format = match request.format() {
OciImageFormat::Unknown => OciPackedFormat::Squashfs,
OciImageFormat::Squashfs => OciPackedFormat::Squashfs,
OciImageFormat::Erofs => OciPackedFormat::Erofs,
OciImageFormat::Tar => OciPackedFormat::Tar,
};
let (context, mut receiver) = OciProgressContext::create();
let our_packer = self.packer.clone();
let output = try_stream! {
let mut task = tokio::task::spawn(async move {
our_packer.request(name, format, request.overwrite_cache, request.update, context).await
});
let abort_handle = task.abort_handle();
let _task_cancel_guard = scopeguard::guard(abort_handle, |handle| {
handle.abort();
});
loop {
let what = select! {
x = receiver.changed() => match x {
Ok(_) => PullImageSelect::Progress(Some(receiver.borrow_and_update().clone())),
Err(_) => PullImageSelect::Progress(None),
},
x = &mut task => PullImageSelect::Completed(x),
};
match what {
PullImageSelect::Progress(Some(progress)) => {
let reply = PullImageReply {
progress: Some(convert_oci_progress(progress)),
digest: String::new(),
format: OciImageFormat::Unknown.into(),
};
yield reply;
},
PullImageSelect::Completed(result) => {
let result = result.map_err(|err| ApiError {
message: err.to_string(),
})?;
let packed = result.map_err(|err| ApiError {
message: err.to_string(),
})?;
let reply = PullImageReply {
progress: None,
digest: packed.digest,
format: match packed.format {
OciPackedFormat::Squashfs => OciImageFormat::Squashfs.into(),
OciPackedFormat::Erofs => OciImageFormat::Erofs.into(),
OciPackedFormat::Tar => OciImageFormat::Tar.into(),
},
};
yield reply;
break;
},
_ => {
continue;
}
}
}
};
Ok(Response::new(Box::pin(output) as Self::PullImageStream))
}
async fn watch_events(
&self,
request: Request<WatchEventsRequest>,
) -> Result<Response<Self::WatchEventsStream>, Status> {
let _ = request.into_inner();
let mut events = self.events.subscribe();
let output = try_stream! {
while let Ok(event) = events.recv().await {
yield WatchEventsReply { event: Some(event), };
}
};
Ok(Response::new(Box::pin(output) as Self::WatchEventsStream))
}
async fn snoop_idm(
&self,
request: Request<SnoopIdmRequest>,
) -> Result<Response<Self::SnoopIdmStream>, Status> {
let _ = request.into_inner();
let mut messages = self.idm.snoop();
let zlt = self.zlt.clone();
let output = try_stream! {
while let Ok(event) = messages.recv().await {
let Some(from_uuid) = zlt.lookup_uuid_by_domid(event.from).await else {
continue;
};
let Some(to_uuid) = zlt.lookup_uuid_by_domid(event.to).await else {
continue;
};
yield SnoopIdmReply { from: from_uuid.to_string(), to: to_uuid.to_string(), packet: Some(event.packet) };
}
};
Ok(Response::new(Box::pin(output) as Self::SnoopIdmStream))
}
async fn list_devices(
&self,
request: Request<ListDevicesRequest>,
) -> Result<Response<ListDevicesReply>, Status> {
let _ = request.into_inner();
let mut devices = Vec::new();
let state = self.devices.copy().await.map_err(|error| ApiError {
message: error.to_string(),
})?;
for (name, state) in state {
devices.push(DeviceInfo {
name,
claimed: state.owner.is_some(),
owner: state.owner.map(|x| x.to_string()).unwrap_or_default(),
});
}
Ok(Response::new(ListDevicesReply { devices }))
}
async fn get_host_cpu_topology(
&self,
request: Request<GetHostCpuTopologyRequest>,
) -> Result<Response<GetHostCpuTopologyReply>, Status> {
let _ = request.into_inner();
let power = self
.runtime
.power_management_context()
.await
.map_err(ApiError::from)?;
let cputopo = power.cpu_topology().await.map_err(ApiError::from)?;
let mut cpus = vec![];
for cpu in cputopo {
cpus.push(HostCpuTopologyInfo {
core: cpu.core,
socket: cpu.socket,
node: cpu.node,
thread: cpu.thread,
class: cpu.class as i32,
})
}
Ok(Response::new(GetHostCpuTopologyReply { cpus }))
}
async fn set_host_power_management_policy(
&self,
request: Request<SetHostPowerManagementPolicyRequest>,
) -> Result<Response<SetHostPowerManagementPolicyReply>, Status> {
let policy = request.into_inner();
let power = self
.runtime
.power_management_context()
.await
.map_err(ApiError::from)?;
let scheduler = &policy.scheduler;
power
.set_smt_policy(policy.smt_awareness)
.await
.map_err(ApiError::from)?;
power
.set_scheduler_policy(scheduler)
.await
.map_err(ApiError::from)?;
Ok(Response::new(SetHostPowerManagementPolicyReply {}))
}
async fn get_zone(
&self,
request: Request<GetZoneRequest>,
) -> Result<Response<GetZoneReply>, Status> {
let request = request.into_inner();
let zones = self.zones.list().await.map_err(ApiError::from)?;
let zone = zones.get(&Uuid::from_str(&request.zone_id).map_err(|error| ApiError {
message: error.to_string(),
})?);
Ok(Response::new(GetZoneReply {
zone: zone.cloned(),
}))
}
async fn update_zone_resources(
&self,
request: Request<UpdateZoneResourcesRequest>,
) -> Result<Response<UpdateZoneResourcesReply>, Status> {
let request = request.into_inner();
let uuid = Uuid::from_str(&request.zone_id).map_err(|error| ApiError {
message: error.to_string(),
})?;
let Some(mut zone) = self.zones.read(uuid).await.map_err(ApiError::from)? else {
return Err(ApiError {
message: "zone not found".to_string(),
}
.into());
};
let Some(ref mut status) = zone.status else {
return Err(ApiError {
message: "zone state not available".to_string(),
}
.into());
};
if status.state() != ZoneState::Created {
return Err(ApiError {
message: "zone is in an invalid state".to_string(),
}
.into());
}
if status.domid == 0 || status.domid == u32::MAX {
return Err(ApiError {
message: "zone domid is invalid".to_string(),
}
.into());
}
let mut resources = request.resources.unwrap_or_default();
if resources.target_memory > resources.max_memory {
resources.max_memory = resources.target_memory;
}
if resources.target_cpus < 1 {
resources.target_cpus = 1;
}
let initial_resources = zone
.spec
.clone()
.unwrap_or_default()
.initial_resources
.unwrap_or_default();
if resources.target_cpus > initial_resources.max_cpus {
resources.target_cpus = initial_resources.max_cpus;
}
resources.max_cpus = initial_resources.max_cpus;
self.runtime
.set_memory_resources(
status.domid,
resources.target_memory * 1024 * 1024,
resources.max_memory * 1024 * 1024,
)
.await
.map_err(|error| ApiError {
message: format!("failed to set memory resources: {}", error),
})?;
self.runtime
.set_cpu_resources(status.domid, resources.target_cpus)
.await
.map_err(|error| ApiError {
message: format!("failed to set cpu resources: {}", error),
})?;
status.resource_status = Some(ZoneResourceStatus {
active_resources: Some(resources),
});
self.zones
.update(uuid, zone)
.await
.map_err(ApiError::from)?;
Ok(Response::new(UpdateZoneResourcesReply {}))
}
    /// Reads the hypervisor (Xen) console buffer and returns it as a string.
    ///
    /// Errors from the runtime are wrapped in [`ApiError`] and surfaced to the
    /// client as a tonic `Status`.
    async fn read_hypervisor_console(
        &self,
        _request: Request<ReadHypervisorConsoleRequest>,
    ) -> Result<Response<ReadHypervisorConsoleReply>, Status> {
        // `false` is passed straight through to the runtime; presumably it
        // selects a non-clearing/non-blocking read — TODO confirm against
        // the Runtime::read_hypervisor_console contract.
        let data = self
            .runtime
            .read_hypervisor_console(false)
            .await
            .map_err(|error| ApiError {
                message: error.to_string(),
            })?;
        Ok(Response::new(ReadHypervisorConsoleReply {
            data: data.to_string(),
        }))
    }
}

View File

@ -0,0 +1,84 @@
use std::pin::Pin;
use std::str::FromStr;
use anyhow::{anyhow, Result};
use async_stream::try_stream;
use tokio::select;
use tokio::sync::mpsc::channel;
use tokio_stream::{Stream, StreamExt};
use tonic::{Status, Streaming};
use uuid::Uuid;
use krata::v1::control::{ZoneConsoleReply, ZoneConsoleRequest};
use crate::console::DaemonConsoleHandle;
use crate::control::ApiError;
/// Outcome of the `select!` between console output and client input, so the
/// two race arms can be handled in a single `match` below.
enum ConsoleDataSelect {
    /// Bytes read from the zone's console (`None` = console channel closed).
    Read(Option<Vec<u8>>),
    /// Next client request from the RPC stream (`None` = client closed).
    Write(Option<Result<ZoneConsoleRequest, Status>>),
}
/// Handles the bidirectional AttachZoneConsole RPC: relays console output to
/// the client and client keystrokes back into the zone console.
pub struct AttachZoneConsoleRpc {
    console: DaemonConsoleHandle,
}
impl AttachZoneConsoleRpc {
    pub fn new(console: DaemonConsoleHandle) -> Self {
        Self { console }
    }
    /// Consumes the first request to learn the target zone, attaches to its
    /// console, then returns a stream that pumps data in both directions
    /// until either side closes.
    pub async fn process(
        self,
        mut input: Streaming<ZoneConsoleRequest>,
    ) -> Result<Pin<Box<dyn Stream<Item = Result<ZoneConsoleReply, Status>> + Send + 'static>>>
    {
        // The first message must identify the zone to attach to.
        let Some(request) = input.next().await else {
            return Err(anyhow!("expected to have at least one request"));
        };
        let request = request?;
        let uuid = Uuid::from_str(&request.zone_id)?;
        let (sender, mut receiver) = channel(100);
        let console = self
            .console
            .attach(uuid, sender)
            .await
            .map_err(|error| anyhow!("failed to attach to console: {}", error))?;
        let output = try_stream! {
            // Only send the buffered console history when the client
            // explicitly asked for it (see PR #358).
            if request.replay_history {
                yield ZoneConsoleReply { data: console.initial.clone(), };
            }
            loop {
                let what = select! {
                    x = receiver.recv() => ConsoleDataSelect::Read(x),
                    x = input.next() => ConsoleDataSelect::Write(x),
                };
                match what {
                    ConsoleDataSelect::Read(Some(data)) => {
                        yield ZoneConsoleReply { data, };
                    },
                    // Console side closed: end the stream.
                    ConsoleDataSelect::Read(None) => {
                        break;
                    }
                    ConsoleDataSelect::Write(Some(request)) => {
                        let request = request?;
                        // Empty payloads are keep-alives; only forward real data.
                        if !request.data.is_empty() {
                            console.send(request.data).await.map_err(|error| ApiError {
                                message: error.to_string(),
                            })?;
                        }
                    },
                    // Client side closed: end the stream.
                    ConsoleDataSelect::Write(None) => {
                        break;
                    }
                }
            }
        };
        Ok(Box::pin(output))
    }
}

View File

@ -0,0 +1,56 @@
use crate::db::zone::ZoneStore;
use crate::zlt::ZoneLookupTable;
use anyhow::{anyhow, Result};
use krata::v1::common::{Zone, ZoneState, ZoneStatus};
use krata::v1::control::{CreateZoneReply, CreateZoneRequest};
use tokio::sync::mpsc::Sender;
use uuid::Uuid;
/// Handles the CreateZone RPC: records a new zone in the "Creating" state
/// and notifies the reconciler so it begins provisioning.
pub struct CreateZoneRpc {
    zones: ZoneStore,
    zlt: ZoneLookupTable,
    zone_reconciler_notify: Sender<Uuid>,
}
impl CreateZoneRpc {
    pub fn new(
        zones: ZoneStore,
        zlt: ZoneLookupTable,
        zone_reconciler_notify: Sender<Uuid>,
    ) -> Self {
        Self {
            zones,
            zlt,
            zone_reconciler_notify,
        }
    }
    /// Persists a zone built from the request's spec and returns the freshly
    /// generated zone id. Fails if the request carries no spec.
    pub async fn process(self, request: CreateZoneRequest) -> Result<CreateZoneReply> {
        let spec = request
            .spec
            .ok_or_else(|| anyhow!("zone spec not provided"))?;
        let uuid = Uuid::new_v4();
        let status = ZoneStatus {
            state: ZoneState::Creating.into(),
            network_status: None,
            exit_status: None,
            error_status: None,
            resource_status: None,
            host: self.zlt.host_uuid().to_string(),
            // The real domid is unknown until the reconciler launches the
            // zone; u32::MAX marks it as unassigned.
            domid: u32::MAX,
        };
        let zone = Zone {
            id: uuid.to_string(),
            status: Some(status),
            spec: Some(spec),
        };
        self.zones.update(uuid, zone).await?;
        self.zone_reconciler_notify.send(uuid).await?;
        Ok(CreateZoneReply {
            zone_id: uuid.to_string(),
        })
    }
}

View File

@ -0,0 +1,42 @@
use std::str::FromStr;
use anyhow::{anyhow, Result};
use tokio::sync::mpsc::Sender;
use uuid::Uuid;
use krata::v1::common::ZoneState;
use krata::v1::control::{DestroyZoneReply, DestroyZoneRequest};
use crate::db::zone::ZoneStore;
/// Handles the DestroyZone RPC: marks a zone as "Destroying" and notifies
/// the reconciler to tear it down.
pub struct DestroyZoneRpc {
    zones: ZoneStore,
    zone_reconciler_notify: Sender<Uuid>,
}
impl DestroyZoneRpc {
    pub fn new(zones: ZoneStore, zone_reconciler_notify: Sender<Uuid>) -> Self {
        Self {
            zones,
            zone_reconciler_notify,
        }
    }
    /// Transitions the zone identified by `request.zone_id` to the
    /// `Destroying` state.
    ///
    /// Errors if the id is not a valid UUID, the zone does not exist, or the
    /// zone is already destroyed.
    pub async fn process(self, request: DestroyZoneRequest) -> Result<DestroyZoneReply> {
        let uuid = Uuid::from_str(&request.zone_id)?;
        let Some(mut zone) = self.zones.read(uuid).await? else {
            return Err(anyhow!("zone not found"));
        };
        // Materialize a default status in place when the record has none,
        // so the state transition below always has somewhere to land.
        // (Replaces the previous clone-and-unwrap dance.)
        let status = zone.status.get_or_insert_with(Default::default);
        if status.state() == ZoneState::Destroyed {
            return Err(anyhow!("zone already destroyed"));
        }
        status.state = ZoneState::Destroying.into();
        self.zones.update(uuid, zone).await?;
        self.zone_reconciler_notify.send(uuid).await?;
        Ok(DestroyZoneReply {})
    }
}

View File

@ -0,0 +1,116 @@
use std::pin::Pin;
use std::str::FromStr;
use anyhow::{anyhow, Result};
use async_stream::try_stream;
use tokio::select;
use tokio_stream::{Stream, StreamExt};
use tonic::{Status, Streaming};
use uuid::Uuid;
use krata::idm::internal::Request;
use krata::{
idm::internal::{
exec_stream_request_update::Update, request::Request as IdmRequestType,
response::Response as IdmResponseType, ExecEnvVar, ExecStreamRequestStart,
ExecStreamRequestStdin, ExecStreamRequestUpdate, Request as IdmRequest,
},
v1::control::{ExecInsideZoneReply, ExecInsideZoneRequest},
};
use crate::control::ApiError;
use crate::idm::DaemonIdmHandle;
/// Handles the bidirectional ExecInsideZone RPC: starts a process inside a
/// zone over IDM and relays stdin/stdout/stderr between the client and the
/// zone's exec stream.
pub struct ExecInsideZoneRpc {
    idm: DaemonIdmHandle,
}
impl ExecInsideZoneRpc {
    pub fn new(idm: DaemonIdmHandle) -> Self {
        Self { idm }
    }
    /// Consumes the first request (which must carry the task definition and
    /// zone id), opens an IDM exec stream, and returns a stream of replies
    /// carrying process output and exit information.
    pub async fn process(
        self,
        mut input: Streaming<ExecInsideZoneRequest>,
    ) -> Result<Pin<Box<dyn Stream<Item = Result<ExecInsideZoneReply, Status>> + Send + 'static>>>
    {
        let Some(request) = input.next().await else {
            return Err(anyhow!("expected to have at least one request"));
        };
        let request = request?;
        let Some(task) = request.task else {
            return Err(anyhow!("task is missing"));
        };
        let uuid = Uuid::from_str(&request.zone_id)?;
        let idm = self.idm.client(uuid).await?;
        // Build the initial "start" message describing command, environment,
        // working directory and tty mode for the exec stream.
        let idm_request = Request {
            request: Some(IdmRequestType::ExecStream(ExecStreamRequestUpdate {
                update: Some(Update::Start(ExecStreamRequestStart {
                    environment: task
                        .environment
                        .into_iter()
                        .map(|x| ExecEnvVar {
                            key: x.key,
                            value: x.value,
                        })
                        .collect(),
                    command: task.command,
                    working_directory: task.working_directory,
                    tty: task.tty,
                })),
            })),
        };
        let output = try_stream! {
            let mut handle = idm.send_stream(idm_request).await.map_err(|x| ApiError {
                message: x.to_string(),
            })?;
            loop {
                select! {
                    // Client sent another request: forward any stdin bytes
                    // (and the stdin-closed flag) into the exec stream.
                    // NOTE(review): when `input.next()` yields None (client
                    // closed the request stream) this arm does nothing and
                    // the loop keeps waiting on the IDM side — presumably
                    // process teardown is handled by the IDM stream-closed
                    // path; confirm against PR #361.
                    x = input.next() => if let Some(update) = x {
                        let update: Result<ExecInsideZoneRequest, Status> = update.map_err(|error| ApiError {
                            message: error.to_string()
                        }.into());
                        if let Ok(update) = update {
                            if !update.stdin.is_empty() {
                                let _ = handle.update(IdmRequest {
                                    request: Some(IdmRequestType::ExecStream(ExecStreamRequestUpdate {
                                        update: Some(Update::Stdin(ExecStreamRequestStdin {
                                            data: update.stdin,
                                            closed: update.stdin_closed,
                                        })),
                                    }))}).await;
                            }
                        }
                    },
                    // Zone sent an exec-stream update: translate it into a
                    // reply; any non-exec response ends the stream.
                    x = handle.receiver.recv() => match x {
                        Some(response) => {
                            let Some(IdmResponseType::ExecStream(update)) = response.response else {
                                break;
                            };
                            let reply = ExecInsideZoneReply {
                                exited: update.exited,
                                error: update.error,
                                exit_code: update.exit_code,
                                stdout: update.stdout,
                                stderr: update.stderr,
                            };
                            yield reply;
                        },
                        None => {
                            break;
                        }
                    }
                }
            }
        };
        Ok(Box::pin(output))
    }
}

View File

@ -0,0 +1,33 @@
use anyhow::Result;
use krata::v1::control::{GetHostCpuTopologyReply, GetHostCpuTopologyRequest, HostCpuTopologyInfo};
use kratart::Runtime;
/// Handles the GetHostCpuTopology RPC by querying the runtime's
/// power-management context for per-CPU topology details.
pub struct GetHostCpuTopologyRpc {
    runtime: Runtime,
}
impl GetHostCpuTopologyRpc {
    pub fn new(runtime: Runtime) -> Self {
        Self { runtime }
    }
    /// Returns one [`HostCpuTopologyInfo`] entry per logical CPU on the host.
    pub async fn process(
        self,
        _request: GetHostCpuTopologyRequest,
    ) -> Result<GetHostCpuTopologyReply> {
        let power = self.runtime.power_management_context().await?;
        let cpus = power
            .cpu_topology()
            .await?
            .into_iter()
            .map(|cpu| HostCpuTopologyInfo {
                core: cpu.core,
                socket: cpu.socket,
                node: cpu.node,
                thread: cpu.thread,
                class: cpu.class as i32,
            })
            .collect();
        Ok(GetHostCpuTopologyReply { cpus })
    }
}

View File

@ -0,0 +1,37 @@
use crate::command::DaemonCommand;
use crate::ip::assignment::IpAssignment;
use crate::zlt::ZoneLookupTable;
use anyhow::Result;
use krata::v1::control::{GetHostStatusReply, GetHostStatusRequest};
pub struct GetHostStatusRpc {
ip: IpAssignment,
zlt: ZoneLookupTable,
}
impl GetHostStatusRpc {
pub fn new(ip: IpAssignment, zlt: ZoneLookupTable) -> Self {
Self { ip, zlt }
}
pub async fn process(self, _request: GetHostStatusRequest) -> Result<GetHostStatusReply> {
let host_reservation = self.ip.retrieve(self.zlt.host_uuid()).await?;
Ok(GetHostStatusReply {
host_domid: self.zlt.host_domid(),
host_uuid: self.zlt.host_uuid().to_string(),
krata_version: DaemonCommand::version(),
host_ipv4: host_reservation
.as_ref()
.map(|x| format!("{}/{}", x.ipv4, x.ipv4_prefix))
.unwrap_or_default(),
host_ipv6: host_reservation
.as_ref()
.map(|x| format!("{}/{}", x.ipv6, x.ipv6_prefix))
.unwrap_or_default(),
host_mac: host_reservation
.as_ref()
.map(|x| x.mac.to_string().to_lowercase().replace('-', ":"))
.unwrap_or_default(),
})
}
}

View File

@ -0,0 +1,24 @@
use std::str::FromStr;
use anyhow::Result;
use uuid::Uuid;
use krata::v1::control::{GetZoneReply, GetZoneRequest};
use crate::db::zone::ZoneStore;
/// Handles the GetZone RPC: looks up a single zone by id.
pub struct GetZoneRpc {
    zones: ZoneStore,
}
impl GetZoneRpc {
    pub fn new(zones: ZoneStore) -> Self {
        Self { zones }
    }
    /// Returns the zone for `request.zone_id`, or `None` in the reply when
    /// no such zone exists. Errors if the id is not a valid UUID.
    pub async fn process(self, request: GetZoneRequest) -> Result<GetZoneReply> {
        let mut listing = self.zones.list().await?;
        let uuid = Uuid::from_str(&request.zone_id)?;
        // Take the matching entry out of the listing map, if present.
        let zone = listing.remove(&uuid);
        Ok(GetZoneReply { zone })
    }
}

View File

@ -0,0 +1,28 @@
use anyhow::Result;
use krata::v1::control::{DeviceInfo, ListDevicesReply, ListDevicesRequest};
use crate::devices::DaemonDeviceManager;
/// Handles the ListDevices RPC: reports every known device and its claim
/// state.
pub struct ListDevicesRpc {
    devices: DaemonDeviceManager,
}
impl ListDevicesRpc {
    pub fn new(devices: DaemonDeviceManager) -> Self {
        Self { devices }
    }
    /// Returns one [`DeviceInfo`] per device; `owner` is empty when the
    /// device is unclaimed.
    pub async fn process(self, _request: ListDevicesRequest) -> Result<ListDevicesReply> {
        let snapshot = self.devices.copy().await?;
        let devices = snapshot
            .into_iter()
            .map(|(name, state)| DeviceInfo {
                name,
                claimed: state.owner.is_some(),
                owner: state.owner.map(|x| x.to_string()).unwrap_or_default(),
            })
            .collect();
        Ok(ListDevicesReply { devices })
    }
}

View File

@ -0,0 +1,21 @@
use anyhow::Result;
use krata::v1::common::Zone;
use krata::v1::control::{ListZonesReply, ListZonesRequest};
use crate::db::zone::ZoneStore;
/// Handles the ListZones RPC: returns every zone known to the store.
pub struct ListZonesRpc {
    zones: ZoneStore,
}
impl ListZonesRpc {
    pub fn new(zones: ZoneStore) -> Self {
        Self { zones }
    }
    /// Lists all zones in store iteration order.
    pub async fn process(self, _request: ListZonesRequest) -> Result<ListZonesReply> {
        let zones: Vec<Zone> = self.zones.list().await?.into_values().collect();
        Ok(ListZonesReply { zones })
    }
}

View File

@ -0,0 +1,351 @@
use std::pin::Pin;
use anyhow::Error;
use futures::Stream;
use tokio::sync::mpsc::Sender;
use tonic::{Request, Response, Status, Streaming};
use uuid::Uuid;
use krata::v1::control::{
control_service_server::ControlService, CreateZoneReply, CreateZoneRequest, DestroyZoneReply,
DestroyZoneRequest, ExecInsideZoneReply, ExecInsideZoneRequest, GetHostCpuTopologyReply,
GetHostCpuTopologyRequest, GetHostStatusReply, GetHostStatusRequest, ListDevicesReply,
ListDevicesRequest, ListZonesReply, ListZonesRequest, PullImageReply, PullImageRequest,
ReadHypervisorConsoleReply, ReadHypervisorConsoleRequest, ReadZoneMetricsReply,
ReadZoneMetricsRequest, ResolveZoneIdReply, ResolveZoneIdRequest, SnoopIdmReply,
SnoopIdmRequest, UpdateZoneResourcesReply, UpdateZoneResourcesRequest, WatchEventsReply,
WatchEventsRequest, ZoneConsoleReply, ZoneConsoleRequest,
};
use krata::v1::control::{
GetZoneReply, GetZoneRequest, SetHostPowerManagementPolicyReply,
SetHostPowerManagementPolicyRequest,
};
use krataoci::packer::service::OciPackerService;
use kratart::Runtime;
use crate::control::attach_zone_console::AttachZoneConsoleRpc;
use crate::control::create_zone::CreateZoneRpc;
use crate::control::destroy_zone::DestroyZoneRpc;
use crate::control::exec_inside_zone::ExecInsideZoneRpc;
use crate::control::get_host_cpu_topology::GetHostCpuTopologyRpc;
use crate::control::get_host_status::GetHostStatusRpc;
use crate::control::get_zone::GetZoneRpc;
use crate::control::list_devices::ListDevicesRpc;
use crate::control::list_zones::ListZonesRpc;
use crate::control::pull_image::PullImageRpc;
use crate::control::read_hypervisor_console::ReadHypervisorConsoleRpc;
use crate::control::read_zone_metrics::ReadZoneMetricsRpc;
use crate::control::resolve_zone_id::ResolveZoneIdRpc;
use crate::control::set_host_power_management_policy::SetHostPowerManagementPolicyRpc;
use crate::control::snoop_idm::SnoopIdmRpc;
use crate::control::update_zone_resources::UpdateZoneResourcesRpc;
use crate::control::watch_events::WatchEventsRpc;
use crate::db::zone::ZoneStore;
use crate::ip::assignment::IpAssignment;
use crate::{
console::DaemonConsoleHandle, devices::DaemonDeviceManager, event::DaemonEventContext,
idm::DaemonIdmHandle, zlt::ZoneLookupTable,
};
pub mod attach_zone_console;
pub mod create_zone;
pub mod destroy_zone;
pub mod exec_inside_zone;
pub mod get_host_cpu_topology;
pub mod get_host_status;
pub mod get_zone;
pub mod list_devices;
pub mod list_zones;
pub mod pull_image;
pub mod read_hypervisor_console;
pub mod read_zone_metrics;
pub mod resolve_zone_id;
pub mod set_host_power_management_policy;
pub mod snoop_idm;
pub mod update_zone_resources;
pub mod watch_events;
/// Lightweight error wrapper used by the control-plane RPC handlers to
/// surface failures to tonic as `Status::unknown` with a message.
///
/// `Debug` is derived so the type can appear in logs and `Result` debug
/// output (public-facing types should be `Debug`).
#[derive(Debug)]
pub struct ApiError {
    message: String,
}
impl From<Error> for ApiError {
    fn from(value: Error) -> Self {
        ApiError {
            message: value.to_string(),
        }
    }
}
impl From<ApiError> for Status {
    fn from(value: ApiError) -> Self {
        Status::unknown(value.message)
    }
}
/// The daemon's implementation of the krata control-plane gRPC service.
///
/// Holds cloneable handles to every subsystem the individual RPC handlers
/// need; each handler clones what it requires and delegates to a dedicated
/// per-RPC struct in the sibling modules.
#[derive(Clone)]
pub struct DaemonControlService {
    // Zone lookup table: maps uuids <-> domids and knows the host identity.
    zlt: ZoneLookupTable,
    // Host device manager (claim/release of passthrough devices).
    devices: DaemonDeviceManager,
    // Daemon event bus used by the watch-events stream.
    events: DaemonEventContext,
    // Handle for attaching to zone consoles.
    console: DaemonConsoleHandle,
    // Handle to the inter-domain messaging (IDM) subsystem.
    idm: DaemonIdmHandle,
    // Persistent zone database.
    zones: ZoneStore,
    // IP reservation/assignment state.
    ip: IpAssignment,
    // Channel to wake the zone reconciler for a given zone uuid.
    zone_reconciler_notify: Sender<Uuid>,
    // OCI image packer service used by pull-image.
    packer: OciPackerService,
    // Hypervisor runtime (memory/cpu control, consoles, power management).
    runtime: Runtime,
}
impl DaemonControlService {
    /// Creates the service from its subsystem handles.
    /// The argument count mirrors the daemon's subsystems one-to-one, hence
    /// the clippy allowance.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        zlt: ZoneLookupTable,
        devices: DaemonDeviceManager,
        events: DaemonEventContext,
        console: DaemonConsoleHandle,
        idm: DaemonIdmHandle,
        zones: ZoneStore,
        ip: IpAssignment,
        zone_reconciler_notify: Sender<Uuid>,
        packer: OciPackerService,
        runtime: Runtime,
    ) -> Self {
        Self {
            zlt,
            devices,
            events,
            console,
            idm,
            zones,
            ip,
            zone_reconciler_notify,
            packer,
            runtime,
        }
    }
}
// Thin delegation layer: every RPC unwraps the tonic request, forwards it to
// the matching per-RPC handler struct (one file per call under
// `crate::control`), and adapts the `anyhow::Result` back into a tonic
// response via `adapt`. No business logic lives here.
#[tonic::async_trait]
impl ControlService for DaemonControlService {
    async fn get_host_status(
        &self,
        request: Request<GetHostStatusRequest>,
    ) -> Result<Response<GetHostStatusReply>, Status> {
        let request = request.into_inner();
        adapt(
            GetHostStatusRpc::new(self.ip.clone(), self.zlt.clone())
                .process(request)
                .await,
        )
    }
    type SnoopIdmStream =
        Pin<Box<dyn Stream<Item = Result<SnoopIdmReply, Status>> + Send + 'static>>;
    async fn snoop_idm(
        &self,
        request: Request<SnoopIdmRequest>,
    ) -> Result<Response<Self::SnoopIdmStream>, Status> {
        let request = request.into_inner();
        adapt(
            SnoopIdmRpc::new(self.idm.clone(), self.zlt.clone())
                .process(request)
                .await,
        )
    }
    async fn get_host_cpu_topology(
        &self,
        request: Request<GetHostCpuTopologyRequest>,
    ) -> Result<Response<GetHostCpuTopologyReply>, Status> {
        let request = request.into_inner();
        adapt(
            GetHostCpuTopologyRpc::new(self.runtime.clone())
                .process(request)
                .await,
        )
    }
    async fn set_host_power_management_policy(
        &self,
        request: Request<SetHostPowerManagementPolicyRequest>,
    ) -> Result<Response<SetHostPowerManagementPolicyReply>, Status> {
        let request = request.into_inner();
        adapt(
            SetHostPowerManagementPolicyRpc::new(self.runtime.clone())
                .process(request)
                .await,
        )
    }
    async fn list_devices(
        &self,
        request: Request<ListDevicesRequest>,
    ) -> Result<Response<ListDevicesReply>, Status> {
        let request = request.into_inner();
        adapt(
            ListDevicesRpc::new(self.devices.clone())
                .process(request)
                .await,
        )
    }
    type PullImageStream =
        Pin<Box<dyn Stream<Item = Result<PullImageReply, Status>> + Send + 'static>>;
    async fn pull_image(
        &self,
        request: Request<PullImageRequest>,
    ) -> Result<Response<Self::PullImageStream>, Status> {
        let request = request.into_inner();
        adapt(
            PullImageRpc::new(self.packer.clone())
                .process(request)
                .await,
        )
    }
    async fn create_zone(
        &self,
        request: Request<CreateZoneRequest>,
    ) -> Result<Response<CreateZoneReply>, Status> {
        let request = request.into_inner();
        adapt(
            CreateZoneRpc::new(
                self.zones.clone(),
                self.zlt.clone(),
                self.zone_reconciler_notify.clone(),
            )
            .process(request)
            .await,
        )
    }
    async fn destroy_zone(
        &self,
        request: Request<DestroyZoneRequest>,
    ) -> Result<Response<DestroyZoneReply>, Status> {
        let request = request.into_inner();
        adapt(
            DestroyZoneRpc::new(self.zones.clone(), self.zone_reconciler_notify.clone())
                .process(request)
                .await,
        )
    }
    async fn resolve_zone_id(
        &self,
        request: Request<ResolveZoneIdRequest>,
    ) -> Result<Response<ResolveZoneIdReply>, Status> {
        let request = request.into_inner();
        adapt(
            ResolveZoneIdRpc::new(self.zones.clone())
                .process(request)
                .await,
        )
    }
    async fn get_zone(
        &self,
        request: Request<GetZoneRequest>,
    ) -> Result<Response<GetZoneReply>, Status> {
        let request = request.into_inner();
        adapt(GetZoneRpc::new(self.zones.clone()).process(request).await)
    }
    async fn update_zone_resources(
        &self,
        request: Request<UpdateZoneResourcesRequest>,
    ) -> Result<Response<UpdateZoneResourcesReply>, Status> {
        let request = request.into_inner();
        adapt(
            UpdateZoneResourcesRpc::new(self.runtime.clone(), self.zones.clone())
                .process(request)
                .await,
        )
    }
    async fn list_zones(
        &self,
        request: Request<ListZonesRequest>,
    ) -> Result<Response<ListZonesReply>, Status> {
        let request = request.into_inner();
        adapt(ListZonesRpc::new(self.zones.clone()).process(request).await)
    }
    type AttachZoneConsoleStream =
        Pin<Box<dyn Stream<Item = Result<ZoneConsoleReply, Status>> + Send + 'static>>;
    // Streaming RPCs hand the raw tonic `Streaming` input to the handler.
    async fn attach_zone_console(
        &self,
        request: Request<Streaming<ZoneConsoleRequest>>,
    ) -> Result<Response<Self::AttachZoneConsoleStream>, Status> {
        let input = request.into_inner();
        adapt(
            AttachZoneConsoleRpc::new(self.console.clone())
                .process(input)
                .await,
        )
    }
    type ExecInsideZoneStream =
        Pin<Box<dyn Stream<Item = Result<ExecInsideZoneReply, Status>> + Send + 'static>>;
    async fn exec_inside_zone(
        &self,
        request: Request<Streaming<ExecInsideZoneRequest>>,
    ) -> Result<Response<Self::ExecInsideZoneStream>, Status> {
        let input = request.into_inner();
        adapt(
            ExecInsideZoneRpc::new(self.idm.clone())
                .process(input)
                .await,
        )
    }
    async fn read_zone_metrics(
        &self,
        request: Request<ReadZoneMetricsRequest>,
    ) -> Result<Response<ReadZoneMetricsReply>, Status> {
        let request = request.into_inner();
        adapt(
            ReadZoneMetricsRpc::new(self.idm.clone())
                .process(request)
                .await,
        )
    }
    type WatchEventsStream =
        Pin<Box<dyn Stream<Item = Result<WatchEventsReply, Status>> + Send + 'static>>;
    async fn watch_events(
        &self,
        request: Request<WatchEventsRequest>,
    ) -> Result<Response<Self::WatchEventsStream>, Status> {
        let request = request.into_inner();
        adapt(
            WatchEventsRpc::new(self.events.clone())
                .process(request)
                .await,
        )
    }
    async fn read_hypervisor_console(
        &self,
        request: Request<ReadHypervisorConsoleRequest>,
    ) -> Result<Response<ReadHypervisorConsoleReply>, Status> {
        let request = request.into_inner();
        adapt(
            ReadHypervisorConsoleRpc::new(self.runtime.clone())
                .process(request)
                .await,
        )
    }
}
fn adapt<T>(result: anyhow::Result<T>) -> Result<Response<T>, Status> {
result
.map(Response::new)
.map_err(|error| Status::unknown(error.to_string()))
}

View File

@ -0,0 +1,100 @@
use crate::control::ApiError;
use crate::oci::convert_oci_progress;
use anyhow::Result;
use async_stream::try_stream;
use krata::v1::common::OciImageFormat;
use krata::v1::control::{PullImageReply, PullImageRequest};
use krataoci::name::ImageName;
use krataoci::packer::service::OciPackerService;
use krataoci::packer::{OciPackedFormat, OciPackedImage};
use krataoci::progress::{OciProgress, OciProgressContext};
use std::pin::Pin;
use tokio::select;
use tokio::task::JoinError;
use tokio_stream::Stream;
use tonic::Status;
/// Outcome of the `select!` race between packer progress updates and the
/// completion of the background pack task.
enum PullImageSelect {
    /// A progress snapshot (`None` = progress channel closed).
    Progress(Option<OciProgress>),
    /// The pack task finished; outer `Result` is the join result, inner is
    /// the packer's own result.
    Completed(Result<Result<OciPackedImage, anyhow::Error>, JoinError>),
}
/// Handles the PullImage RPC: pulls/packs an OCI image in a background task
/// while streaming progress updates, finishing with a reply that carries the
/// final digest and format.
pub struct PullImageRpc {
    packer: OciPackerService,
}
impl PullImageRpc {
    pub fn new(packer: OciPackerService) -> Self {
        Self { packer }
    }
    /// Starts the pull and returns a stream of progress replies followed by
    /// one terminal reply with the packed image's digest/format.
    pub async fn process(
        self,
        request: PullImageRequest,
    ) -> Result<Pin<Box<dyn Stream<Item = Result<PullImageReply, Status>> + Send + 'static>>> {
        let name = ImageName::parse(&request.image)?;
        // Unspecified format defaults to squashfs.
        let format = match request.format() {
            OciImageFormat::Unknown => OciPackedFormat::Squashfs,
            OciImageFormat::Squashfs => OciPackedFormat::Squashfs,
            OciImageFormat::Erofs => OciPackedFormat::Erofs,
            OciImageFormat::Tar => OciPackedFormat::Tar,
        };
        let (context, mut receiver) = OciProgressContext::create();
        let our_packer = self.packer;
        let output = try_stream! {
            let mut task = tokio::task::spawn(async move {
                our_packer.request(name, format, request.overwrite_cache, request.update, context).await
            });
            // Abort the pack task if this stream is dropped (e.g. the client
            // disconnects), so the work doesn't run on unobserved.
            let abort_handle = task.abort_handle();
            let _task_cancel_guard = scopeguard::guard(abort_handle, |handle| {
                handle.abort();
            });
            loop {
                let what = select! {
                    x = receiver.changed() => match x {
                        Ok(_) => PullImageSelect::Progress(Some(receiver.borrow_and_update().clone())),
                        Err(_) => PullImageSelect::Progress(None),
                    },
                    x = &mut task => PullImageSelect::Completed(x),
                };
                match what {
                    PullImageSelect::Progress(Some(progress)) => {
                        // Intermediate reply: progress only, no digest yet.
                        let reply = PullImageReply {
                            progress: Some(convert_oci_progress(progress)),
                            digest: String::new(),
                            format: OciImageFormat::Unknown.into(),
                        };
                        yield reply;
                    },
                    PullImageSelect::Completed(result) => {
                        // Surface both join errors (panic/abort) and packer
                        // errors to the client.
                        let result = result.map_err(|err| ApiError {
                            message: err.to_string(),
                        })?;
                        let packed = result.map_err(|err| ApiError {
                            message: err.to_string(),
                        })?;
                        let reply = PullImageReply {
                            progress: None,
                            digest: packed.digest,
                            format: match packed.format {
                                OciPackedFormat::Squashfs => OciImageFormat::Squashfs.into(),
                                OciPackedFormat::Erofs => OciImageFormat::Erofs.into(),
                                OciPackedFormat::Tar => OciImageFormat::Tar.into(),
                            },
                        };
                        yield reply;
                        break;
                    },
                    // Progress channel closed without completion: keep
                    // waiting for the task to finish.
                    _ => {
                        continue;
                    }
                }
            }
        };
        Ok(Box::pin(output))
    }
}

View File

@ -0,0 +1,23 @@
use anyhow::Result;
use krata::v1::control::{ReadHypervisorConsoleReply, ReadHypervisorConsoleRequest};
use kratart::Runtime;
/// Handles the ReadHypervisorConsole RPC: returns the hypervisor console
/// buffer as a string.
pub struct ReadHypervisorConsoleRpc {
    runtime: Runtime,
}
impl ReadHypervisorConsoleRpc {
    pub fn new(runtime: Runtime) -> Self {
        Self { runtime }
    }
    /// Reads the hypervisor console and returns its contents.
    pub async fn process(
        self,
        _: ReadHypervisorConsoleRequest,
    ) -> Result<ReadHypervisorConsoleReply> {
        // `false` is forwarded to the runtime as-is; presumably a
        // non-clearing read — TODO confirm against the Runtime contract.
        let buffer = self.runtime.read_hypervisor_console(false).await?;
        Ok(ReadHypervisorConsoleReply {
            data: buffer.to_string(),
        })
    }
}

View File

@ -0,0 +1,40 @@
use std::str::FromStr;
use anyhow::Result;
use uuid::Uuid;
use krata::idm::internal::MetricsRequest;
use krata::idm::internal::{
request::Request as IdmRequestType, response::Response as IdmResponseType,
Request as IdmRequest,
};
use krata::v1::control::{ReadZoneMetricsReply, ReadZoneMetricsRequest};
use crate::idm::DaemonIdmHandle;
use crate::metrics::idm_metric_to_api;
pub struct ReadZoneMetricsRpc {
idm: DaemonIdmHandle,
}
impl ReadZoneMetricsRpc {
pub fn new(idm: DaemonIdmHandle) -> Self {
Self { idm }
}
pub async fn process(self, request: ReadZoneMetricsRequest) -> Result<ReadZoneMetricsReply> {
let uuid = Uuid::from_str(&request.zone_id)?;
let client = self.idm.client(uuid).await?;
let response = client
.send(IdmRequest {
request: Some(IdmRequestType::Metrics(MetricsRequest {})),
})
.await?;
let mut reply = ReadZoneMetricsReply::default();
if let Some(IdmResponseType::Metrics(metrics)) = response.response {
reply.root = metrics.root.map(idm_metric_to_api);
}
Ok(reply)
}
}

View File

@ -0,0 +1,30 @@
use anyhow::Result;
use krata::v1::common::Zone;
use krata::v1::control::{ResolveZoneIdReply, ResolveZoneIdRequest};
use crate::db::zone::ZoneStore;
/// Handles the ResolveZoneId RPC: resolves a zone name (or literal id) to a
/// zone id.
pub struct ResolveZoneIdRpc {
    zones: ZoneStore,
}
impl ResolveZoneIdRpc {
    pub fn new(zones: ZoneStore) -> Self {
        Self { zones }
    }
    /// Matches either on the zone spec's `name` (only when the requested
    /// name is non-empty) or on the zone's id itself, returning the first
    /// match in listing order, or an empty `zone_id` when nothing matches.
    pub async fn process(self, request: ResolveZoneIdRequest) -> Result<ResolveZoneIdReply> {
        let zones = self.zones.list().await?;
        // Stop at the first match instead of cloning every spec and
        // collecting all matching zones just to take the first one.
        let zone_id = zones
            .into_values()
            .find(|zone| {
                let name = zone
                    .spec
                    .as_ref()
                    .map(|spec| spec.name.as_str())
                    .unwrap_or_default();
                (!request.name.is_empty() && name == request.name) || zone.id == request.name
            })
            .map(|zone| zone.id)
            .unwrap_or_default();
        Ok(ResolveZoneIdReply { zone_id })
    }
}

View File

@ -0,0 +1,25 @@
use anyhow::Result;
use krata::v1::control::{SetHostPowerManagementPolicyReply, SetHostPowerManagementPolicyRequest};
use kratart::Runtime;
/// Handles the SetHostPowerManagementPolicy RPC: applies SMT awareness and
/// the scheduler policy to the host's power-management context.
pub struct SetHostPowerManagementPolicyRpc {
    runtime: Runtime,
}
impl SetHostPowerManagementPolicyRpc {
    pub fn new(runtime: Runtime) -> Self {
        Self { runtime }
    }
    /// Applies the requested policy: SMT awareness first, then the
    /// scheduler policy, in that order.
    pub async fn process(
        self,
        request: SetHostPowerManagementPolicyRequest,
    ) -> Result<SetHostPowerManagementPolicyReply> {
        let power = self.runtime.power_management_context().await?;
        power.set_smt_policy(request.smt_awareness).await?;
        power.set_scheduler_policy(&request.scheduler).await?;
        Ok(SetHostPowerManagementPolicyReply {})
    }
}

View File

@ -0,0 +1,39 @@
use crate::idm::DaemonIdmHandle;
use crate::zlt::ZoneLookupTable;
use anyhow::Result;
use async_stream::try_stream;
use krata::v1::control::{SnoopIdmReply, SnoopIdmRequest};
use std::pin::Pin;
use tokio_stream::Stream;
use tonic::Status;
/// Handles the SnoopIdm RPC: streams every IDM packet observed on the host,
/// annotated with the sender and receiver zone uuids.
pub struct SnoopIdmRpc {
    idm: DaemonIdmHandle,
    zlt: ZoneLookupTable,
}
impl SnoopIdmRpc {
    pub fn new(idm: DaemonIdmHandle, zlt: ZoneLookupTable) -> Self {
        Self { idm, zlt }
    }
    /// Subscribes to the IDM snoop channel and returns a stream of replies.
    /// The stream ends when the snoop channel errors/closes.
    pub async fn process(
        self,
        _request: SnoopIdmRequest,
    ) -> Result<Pin<Box<dyn Stream<Item = Result<SnoopIdmReply, Status>> + Send + 'static>>> {
        let mut messages = self.idm.snoop();
        let zlt = self.zlt.clone();
        let output = try_stream! {
            while let Ok(event) = messages.recv().await {
                // Skip packets whose endpoints can't be resolved to a zone
                // uuid (e.g. a domid the daemon doesn't know about).
                let Some(from_uuid) = zlt.lookup_uuid_by_domid(event.from).await else {
                    continue;
                };
                let Some(to_uuid) = zlt.lookup_uuid_by_domid(event.to).await else {
                    continue;
                };
                yield SnoopIdmReply { from: from_uuid.to_string(), to: to_uuid.to_string(), packet: Some(event.packet) };
            }
        };
        Ok(Box::pin(output))
    }
}

View File

@ -0,0 +1,82 @@
use std::str::FromStr;
use anyhow::{anyhow, Result};
use uuid::Uuid;
use krata::v1::common::{ZoneResourceStatus, ZoneState};
use krata::v1::control::{UpdateZoneResourcesReply, UpdateZoneResourcesRequest};
use kratart::Runtime;
use crate::db::zone::ZoneStore;
/// Handles the UpdateZoneResources RPC: adjusts a running zone's memory and
/// CPU allocation and records the new active resources in the store.
pub struct UpdateZoneResourcesRpc {
    runtime: Runtime,
    zones: ZoneStore,
}
impl UpdateZoneResourcesRpc {
    pub fn new(runtime: Runtime, zones: ZoneStore) -> Self {
        Self { runtime, zones }
    }
    /// Validates the zone (must exist, be in `Created` state, and have a
    /// real domid), clamps the requested resources, applies them via the
    /// runtime, and persists the resulting resource status.
    pub async fn process(
        self,
        request: UpdateZoneResourcesRequest,
    ) -> Result<UpdateZoneResourcesReply> {
        let uuid = Uuid::from_str(&request.zone_id)?;
        let Some(mut zone) = self.zones.read(uuid).await? else {
            return Err(anyhow!("zone not found"));
        };
        let Some(ref mut status) = zone.status else {
            return Err(anyhow!("zone state not available"));
        };
        if status.state() != ZoneState::Created {
            return Err(anyhow!("zone is in an invalid state"));
        }
        // domid 0 is the host; u32::MAX is the "unassigned" sentinel used at
        // creation time — neither can have resources updated.
        if status.domid == 0 || status.domid == u32::MAX {
            return Err(anyhow!("zone domid is invalid"));
        }
        let mut resources = request.resources.unwrap_or_default();
        // Keep the target/max relationship consistent: a target above max
        // raises max to match.
        if resources.target_memory > resources.max_memory {
            resources.max_memory = resources.target_memory;
        }
        if resources.target_cpus < 1 {
            resources.target_cpus = 1;
        }
        let initial_resources = zone
            .spec
            .clone()
            .unwrap_or_default()
            .initial_resources
            .unwrap_or_default();
        // CPUs are capped at the zone's initially provisioned max_cpus.
        // NOTE(review): memory is not similarly capped against the initial
        // spec — presumably intentional, but worth confirming.
        if resources.target_cpus > initial_resources.max_cpus {
            resources.target_cpus = initial_resources.max_cpus;
        }
        resources.max_cpus = initial_resources.max_cpus;
        self.runtime
            .set_memory_resources(
                status.domid,
                // Request values are in MiB; the runtime expects bytes.
                resources.target_memory * 1024 * 1024,
                resources.max_memory * 1024 * 1024,
            )
            .await
            .map_err(|error| anyhow!("failed to set memory resources: {}", error))?;
        self.runtime
            .set_cpu_resources(status.domid, resources.target_cpus)
            .await
            .map_err(|error| anyhow!("failed to set cpu resources: {}", error))?;
        // Record what is now actually active so status queries reflect it.
        status.resource_status = Some(ZoneResourceStatus {
            active_resources: Some(resources),
        });
        self.zones.update(uuid, zone).await?;
        Ok(UpdateZoneResourcesReply {})
    }
}

View File

@ -0,0 +1,31 @@
use crate::event::DaemonEventContext;
use anyhow::Result;
use async_stream::try_stream;
use krata::v1::control::{WatchEventsReply, WatchEventsRequest};
use std::pin::Pin;
use tokio_stream::Stream;
use tonic::Status;
/// Handles the WatchEvents RPC: streams daemon events to the client.
pub struct WatchEventsRpc {
    events: DaemonEventContext,
}
impl WatchEventsRpc {
    pub fn new(events: DaemonEventContext) -> Self {
        Self { events }
    }
    /// Subscribes to the daemon event bus and forwards each event as a
    /// reply. The stream ends when the subscription errors/closes (e.g. on
    /// broadcast lag or daemon shutdown).
    pub async fn process(
        self,
        _request: WatchEventsRequest,
    ) -> Result<Pin<Box<dyn Stream<Item = Result<WatchEventsReply, Status>> + Send + 'static>>>
    {
        let mut events = self.events.subscribe();
        let output = try_stream! {
            while let Ok(event) = events.recv().await {
                yield WatchEventsReply { event: Some(event), };
            }
        };
        Ok(Box::pin(output))
    }
}

View File

@ -16,6 +16,7 @@ use kratart::Runtime;
use log::{debug, info};
use reconcile::zone::ZoneReconciler;
use std::path::Path;
use std::time::Duration;
use std::{net::SocketAddr, path::PathBuf, str::FromStr, sync::Arc};
use tokio::{
fs,
@ -41,7 +42,6 @@ pub mod metrics;
pub mod oci;
pub mod reconcile;
pub mod zlt;
pub struct Daemon {
store: String,
_config: Arc<DaemonConfig>,
@ -209,6 +209,8 @@ impl Daemon {
server = server.tls_config(tls_config)?;
}
server = server.http2_keepalive_interval(Some(Duration::from_secs(10)));
let server = server.add_service(ControlServiceServer::new(control_service));
info!("listening on address {}", addr);
match addr {

View File

@ -27,7 +27,7 @@ use tokio::{
select,
sync::{
mpsc::{channel, Receiver, Sender},
Mutex, RwLock,
RwLock,
},
task::JoinHandle,
time::sleep,
@ -45,16 +45,9 @@ enum ZoneReconcilerResult {
}
struct ZoneReconcilerEntry {
task: JoinHandle<()>,
sender: Sender<()>,
}
impl Drop for ZoneReconcilerEntry {
fn drop(&mut self) {
self.task.abort();
}
}
#[derive(Clone)]
pub struct ZoneReconciler {
devices: DaemonDeviceManager,
@ -66,7 +59,7 @@ pub struct ZoneReconciler {
kernel_path: PathBuf,
initrd_path: PathBuf,
addons_path: PathBuf,
tasks: Arc<Mutex<HashMap<Uuid, ZoneReconcilerEntry>>>,
tasks: Arc<RwLock<HashMap<Uuid, ZoneReconcilerEntry>>>,
zone_reconciler_notify: Sender<Uuid>,
zone_reconcile_lock: Arc<RwLock<()>>,
ip_assignment: IpAssignment,
@ -99,7 +92,7 @@ impl ZoneReconciler {
kernel_path,
initrd_path,
addons_path: modules_path,
tasks: Arc::new(Mutex::new(HashMap::new())),
tasks: Arc::new(RwLock::new(HashMap::new())),
zone_reconciler_notify,
zone_reconcile_lock: Arc::new(RwLock::with_max_readers((), PARALLEL_LIMIT)),
ip_assignment,
@ -125,7 +118,7 @@ impl ZoneReconciler {
error!("failed to start zone reconciler task {}: {}", uuid, error);
}
let map = self.tasks.lock().await;
let map = self.tasks.read().await;
if let Some(entry) = map.get(&uuid) {
if let Err(error) = entry.sender.send(()).await {
error!("failed to notify zone reconciler task {}: {}", uuid, error);
@ -271,7 +264,7 @@ impl ZoneReconciler {
if destroyed {
self.zones.remove(uuid).await?;
let mut map = self.tasks.lock().await;
let mut map = self.tasks.write().await;
map.remove(&uuid);
} else {
self.zones.update(uuid, zone.clone()).await?;
@ -337,7 +330,7 @@ impl ZoneReconciler {
}
async fn launch_task_if_needed(&self, uuid: Uuid) -> Result<()> {
let mut map = self.tasks.lock().await;
let mut map = self.tasks.write().await;
match map.entry(uuid) {
Entry::Occupied(_) => {}
Entry::Vacant(entry) => {
@ -350,7 +343,7 @@ impl ZoneReconciler {
async fn launch_task(&self, uuid: Uuid) -> Result<ZoneReconcilerEntry> {
let this = self.clone();
let (sender, mut receiver) = channel(10);
let task = tokio::task::spawn(async move {
tokio::task::spawn(async move {
'notify_loop: loop {
if receiver.recv().await.is_none() {
break 'notify_loop;
@ -372,7 +365,7 @@ impl ZoneReconciler {
}
}
});
Ok(ZoneReconcilerEntry { task, sender })
Ok(ZoneReconcilerEntry { sender })
}
}

View File

@ -10,7 +10,7 @@ import "krata/idm/transport.proto";
import "krata/v1/common.proto";
service ControlService {
rpc HostStatus(HostStatusRequest) returns (HostStatusReply);
rpc GetHostStatus(GetHostStatusRequest) returns (GetHostStatusReply);
rpc SnoopIdm(SnoopIdmRequest) returns (stream SnoopIdmReply);
rpc GetHostCpuTopology(GetHostCpuTopologyRequest) returns (GetHostCpuTopologyReply);
rpc SetHostPowerManagementPolicy(SetHostPowerManagementPolicyRequest) returns (SetHostPowerManagementPolicyReply);
@ -39,9 +39,9 @@ service ControlService {
rpc ReadHypervisorConsole(ReadHypervisorConsoleRequest) returns (ReadHypervisorConsoleReply);
}
message HostStatusRequest {}
message GetHostStatusRequest {}
message HostStatusReply {
message GetHostStatusReply {
string host_uuid = 1;
uint32 host_domid = 2;
string krata_version = 3;
@ -104,6 +104,7 @@ message ExecInsideZoneReply {
message ZoneConsoleRequest {
string zone_id = 1;
bytes data = 2;
bool replay_history = 3;
}
message ZoneConsoleReply {

View File

@ -495,6 +495,7 @@ impl<R: IdmRequest, E: IdmSerializable> IdmClient<R, E> {
IdmTransportPacketForm::StreamRequestClosed => {
let mut update_streams = request_update_streams.lock().await;
update_streams.remove(&packet.id);
println!("stream request closed: {}", packet.id);
}
IdmTransportPacketForm::StreamResponseUpdate => {

View File

@ -16,7 +16,7 @@ clap = { workspace = true }
env_logger = { workspace = true }
etherparse = { workspace = true }
futures = { workspace = true }
krata = { path = "../krata", version = "^0.0.18" }
krata = { path = "../krata", version = "^0.0.19" }
krata-advmac = { workspace = true }
libc = { workspace = true }
log = { workspace = true }

View File

@ -9,7 +9,7 @@ use krata::{
dial::ControlDialAddress,
v1::{
common::Zone,
control::{control_service_client::ControlServiceClient, HostStatusRequest},
control::{control_service_client::ControlServiceClient, GetHostStatusRequest},
},
};
use log::warn;
@ -47,7 +47,7 @@ impl NetworkService {
pub async fn new(control_address: ControlDialAddress) -> Result<NetworkService> {
let mut control = ControlClientProvider::dial(control_address).await?;
let host_status = control
.host_status(Request::new(HostStatusRequest {}))
.get_host_status(Request::new(GetHostStatusRequest {}))
.await?
.into_inner();
let host_ipv4 = Ipv4Cidr::from_str(&host_status.host_ipv4)

View File

@ -12,20 +12,20 @@ resolver = "2"
anyhow = { workspace = true }
backhand = { workspace = true }
ipnetwork = { workspace = true }
krata = { path = "../krata", version = "^0.0.18" }
krata = { path = "../krata", version = "^0.0.19" }
krata-advmac = { workspace = true }
krata-oci = { path = "../oci", version = "^0.0.18" }
krata-oci = { path = "../oci", version = "^0.0.19" }
log = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
uuid = { workspace = true }
krata-loopdev = { path = "../loopdev", version = "^0.0.18" }
krata-xencall = { path = "../xen/xencall", version = "^0.0.18" }
krata-xenclient = { path = "../xen/xenclient", version = "^0.0.18" }
krata-xenevtchn = { path = "../xen/xenevtchn", version = "^0.0.18" }
krata-xengnt = { path = "../xen/xengnt", version = "^0.0.18" }
krata-xenplatform = { path = "../xen/xenplatform", version = "^0.0.18" }
krata-xenstore = { path = "../xen/xenstore", version = "^0.0.18" }
krata-loopdev = { path = "../loopdev", version = "^0.0.19" }
krata-xencall = { path = "../xen/xencall", version = "^0.0.19" }
krata-xenclient = { path = "../xen/xenclient", version = "^0.0.19" }
krata-xenevtchn = { path = "../xen/xenevtchn", version = "^0.0.19" }
krata-xengnt = { path = "../xen/xengnt", version = "^0.0.19" }
krata-xenplatform = { path = "../xen/xenplatform", version = "^0.0.19" }
krata-xenstore = { path = "../xen/xenstore", version = "^0.0.19" }
walkdir = { workspace = true }
indexmap = { workspace = true }

View File

@ -13,9 +13,9 @@ async-trait = { workspace = true }
indexmap = { workspace = true }
libc = { workspace = true }
log = { workspace = true }
krata-xencall = { path = "../xencall", version = "^0.0.18" }
krata-xenplatform = { path = "../xenplatform", version = "^0.0.18" }
krata-xenstore = { path = "../xenstore", version = "^0.0.18" }
krata-xencall = { path = "../xencall", version = "^0.0.19" }
krata-xenplatform = { path = "../xenplatform", version = "^0.0.19" }
krata-xenstore = { path = "../xenstore", version = "^0.0.19" }
regex = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }

View File

@ -16,7 +16,7 @@ flate2 = { workspace = true }
indexmap = { workspace = true }
libc = { workspace = true }
log = { workspace = true }
krata-xencall = { path = "../xencall", version = "^0.0.18" }
krata-xencall = { path = "../xencall", version = "^0.0.19" }
memchr = { workspace = true }
nix = { workspace = true }
regex = { workspace = true }

View File

@ -14,8 +14,8 @@ cgroups-rs = { workspace = true }
env_logger = { workspace = true }
futures = { workspace = true }
ipnetwork = { workspace = true }
krata = { path = "../krata", version = "^0.0.18" }
krata-xenstore = { path = "../xen/xenstore", version = "^0.0.18" }
krata = { path = "../krata", version = "^0.0.19" }
krata-xenstore = { path = "../xen/xenstore", version = "^0.0.19" }
libc = { workspace = true }
log = { workspace = true }
nix = { workspace = true, features = ["ioctl", "process", "fs"] }
@ -29,6 +29,7 @@ serde_json = { workspace = true }
sys-mount = { workspace = true }
sysinfo = { workspace = true }
tokio = { workspace = true }
tokio-util = { workspace = true }
[lib]
name = "kratazone"

View File

@ -1,5 +1,6 @@
use std::{collections::HashMap, process::Stdio};
use crate::childwait::ChildWait;
use anyhow::{anyhow, Result};
use krata::idm::{
client::IdmClientStreamResponseHandle,
@ -16,9 +17,9 @@ use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
join,
process::Command,
select,
};
use crate::childwait::ChildWait;
use tokio_util::sync::CancellationToken;
pub struct ZoneExecTask {
pub wait: ChildWait,
@ -70,16 +71,21 @@ impl ZoneExecTask {
if start.tty {
let pty = Pty::new().map_err(|error| anyhow!("unable to allocate pty: {}", error))?;
pty.resize(Size::new(24, 80))?;
let mut child = ChildDropGuard {
inner: pty_process::Command::new(exe)
let pts = pty
.pts()
.map_err(|error| anyhow!("unable to allocate pts: {}", error))?;
let child = std::panic::catch_unwind(move || {
let pts = pts;
pty_process::Command::new(exe)
.args(cmd)
.envs(env)
.current_dir(dir)
.spawn(
&pty.pts()
.map_err(|error| anyhow!("unable to allocate pts: {}", error))?,
)
.map_err(|error| anyhow!("failed to spawn: {}", error))?,
.spawn(&pts)
})
.map_err(|_| anyhow!("internal error"))
.map_err(|error| anyhow!("failed to spawn: {}", error))??;
let mut child = ChildDropGuard {
inner: child,
kill: true,
};
let pid = child
@ -111,9 +117,12 @@ impl ZoneExecTask {
}
});
let cancel = CancellationToken::new();
let stdin_cancel = cancel.clone();
let stdin_task = tokio::task::spawn(async move {
loop {
let Some(request) = receiver.recv().await else {
stdin_cancel.cancel();
break;
};
@ -136,19 +145,33 @@ impl ZoneExecTask {
});
code = loop {
if let Ok(event) = wait_subscription.recv().await {
select! {
result = wait_subscription.recv() => match result {
Ok(event) => {
if event.pid.as_raw() as u32 == pid {
child.kill = false;
break event.status;
}
}
_ => {
child.inner.start_kill()?;
child.kill = false;
break -1;
}
},
_ = cancel.cancelled() => {
child.inner.start_kill()?;
child.kill = false;
break -1;
}
}
};
child.kill = false;
let _ = join!(pty_read_task);
stdin_task.abort();
} else {
let mut child = Command::new(exe)
let mut child = std::panic::catch_unwind(|| {
Command::new(exe)
.args(cmd)
.envs(env)
.current_dir(dir)
@ -157,7 +180,9 @@ impl ZoneExecTask {
.stderr(Stdio::piped())
.kill_on_drop(true)
.spawn()
.map_err(|error| anyhow!("failed to spawn: {}", error))?;
})
.map_err(|_| anyhow!("internal error"))
.map_err(|error| anyhow!("failed to spawn: {}", error))??;
let pid = child.id().ok_or_else(|| anyhow!("pid is not provided"))?;
let mut stdin = child
@ -221,9 +246,12 @@ impl ZoneExecTask {
}
});
let cancel = CancellationToken::new();
let stdin_cancel = cancel.clone();
let stdin_task = tokio::task::spawn(async move {
loop {
let Some(request) = receiver.recv().await else {
stdin_cancel.cancel();
break;
};
@ -247,11 +275,23 @@ impl ZoneExecTask {
});
code = loop {
if let Ok(event) = wait_subscription.recv().await {
select! {
result = wait_subscription.recv() => match result {
Ok(event) => {
if event.pid.as_raw() as u32 == pid {
break event.status;
}
}
_ => {
child.start_kill()?;
break -1;
}
},
_ = cancel.cancelled() => {
child.start_kill()?;
break -1;
}
}
};
data_task.await?;
}