mirror of
https://github.com/edera-dev/krata.git
synced 2025-08-03 05:10:55 +00:00
chore(control): split out all of the rpc calls into their own files (#357)
This commit is contained in:
@ -1,753 +0,0 @@
|
|||||||
use crate::db::zone::ZoneStore;
|
|
||||||
use crate::ip::assignment::IpAssignment;
|
|
||||||
use crate::{
|
|
||||||
command::DaemonCommand, console::DaemonConsoleHandle, devices::DaemonDeviceManager,
|
|
||||||
event::DaemonEventContext, idm::DaemonIdmHandle, metrics::idm_metric_to_api,
|
|
||||||
oci::convert_oci_progress, zlt::ZoneLookupTable,
|
|
||||||
};
|
|
||||||
use async_stream::try_stream;
|
|
||||||
use futures::Stream;
|
|
||||||
use krata::v1::common::ZoneResourceStatus;
|
|
||||||
use krata::v1::control::{
|
|
||||||
GetZoneReply, GetZoneRequest, SetHostPowerManagementPolicyReply,
|
|
||||||
SetHostPowerManagementPolicyRequest,
|
|
||||||
};
|
|
||||||
use krata::{
|
|
||||||
idm::internal::{
|
|
||||||
exec_stream_request_update::Update, request::Request as IdmRequestType,
|
|
||||||
response::Response as IdmResponseType, ExecEnvVar, ExecStreamRequestStart,
|
|
||||||
ExecStreamRequestStdin, ExecStreamRequestUpdate, MetricsRequest, Request as IdmRequest,
|
|
||||||
},
|
|
||||||
v1::{
|
|
||||||
common::{OciImageFormat, Zone, ZoneState, ZoneStatus},
|
|
||||||
control::{
|
|
||||||
control_service_server::ControlService, CreateZoneReply, CreateZoneRequest,
|
|
||||||
DestroyZoneReply, DestroyZoneRequest, DeviceInfo, ExecInsideZoneReply,
|
|
||||||
ExecInsideZoneRequest, GetHostCpuTopologyReply, GetHostCpuTopologyRequest,
|
|
||||||
HostCpuTopologyInfo, HostStatusReply, HostStatusRequest, ListDevicesReply,
|
|
||||||
ListDevicesRequest, ListZonesReply, ListZonesRequest, PullImageReply, PullImageRequest,
|
|
||||||
ReadHypervisorConsoleReply, ReadHypervisorConsoleRequest, ReadZoneMetricsReply,
|
|
||||||
ReadZoneMetricsRequest, ResolveZoneIdReply, ResolveZoneIdRequest, SnoopIdmReply,
|
|
||||||
SnoopIdmRequest, UpdateZoneResourcesReply, UpdateZoneResourcesRequest,
|
|
||||||
WatchEventsReply, WatchEventsRequest, ZoneConsoleReply, ZoneConsoleRequest,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
};
|
|
||||||
use krataoci::{
|
|
||||||
name::ImageName,
|
|
||||||
packer::{service::OciPackerService, OciPackedFormat, OciPackedImage},
|
|
||||||
progress::{OciProgress, OciProgressContext},
|
|
||||||
};
|
|
||||||
use kratart::Runtime;
|
|
||||||
use std::{pin::Pin, str::FromStr};
|
|
||||||
use tokio::{
|
|
||||||
select,
|
|
||||||
sync::mpsc::{channel, Sender},
|
|
||||||
task::JoinError,
|
|
||||||
};
|
|
||||||
use tokio_stream::StreamExt;
|
|
||||||
use tonic::{Request, Response, Status, Streaming};
|
|
||||||
use uuid::Uuid;
|
|
||||||
|
|
||||||
pub struct ApiError {
|
|
||||||
message: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<anyhow::Error> for ApiError {
|
|
||||||
fn from(value: anyhow::Error) -> Self {
|
|
||||||
ApiError {
|
|
||||||
message: value.to_string(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<ApiError> for Status {
|
|
||||||
fn from(value: ApiError) -> Self {
|
|
||||||
Status::unknown(value.message)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct DaemonControlService {
|
|
||||||
zlt: ZoneLookupTable,
|
|
||||||
devices: DaemonDeviceManager,
|
|
||||||
events: DaemonEventContext,
|
|
||||||
console: DaemonConsoleHandle,
|
|
||||||
idm: DaemonIdmHandle,
|
|
||||||
zones: ZoneStore,
|
|
||||||
ip: IpAssignment,
|
|
||||||
zone_reconciler_notify: Sender<Uuid>,
|
|
||||||
packer: OciPackerService,
|
|
||||||
runtime: Runtime,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl DaemonControlService {
|
|
||||||
#[allow(clippy::too_many_arguments)]
|
|
||||||
pub fn new(
|
|
||||||
zlt: ZoneLookupTable,
|
|
||||||
devices: DaemonDeviceManager,
|
|
||||||
events: DaemonEventContext,
|
|
||||||
console: DaemonConsoleHandle,
|
|
||||||
idm: DaemonIdmHandle,
|
|
||||||
zones: ZoneStore,
|
|
||||||
ip: IpAssignment,
|
|
||||||
zone_reconciler_notify: Sender<Uuid>,
|
|
||||||
packer: OciPackerService,
|
|
||||||
runtime: Runtime,
|
|
||||||
) -> Self {
|
|
||||||
Self {
|
|
||||||
zlt,
|
|
||||||
devices,
|
|
||||||
events,
|
|
||||||
console,
|
|
||||||
idm,
|
|
||||||
zones,
|
|
||||||
ip,
|
|
||||||
zone_reconciler_notify,
|
|
||||||
packer,
|
|
||||||
runtime,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
enum ConsoleDataSelect {
|
|
||||||
Read(Option<Vec<u8>>),
|
|
||||||
Write(Option<Result<ZoneConsoleRequest, Status>>),
|
|
||||||
}
|
|
||||||
|
|
||||||
enum PullImageSelect {
|
|
||||||
Progress(Option<OciProgress>),
|
|
||||||
Completed(Result<Result<OciPackedImage, anyhow::Error>, JoinError>),
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tonic::async_trait]
|
|
||||||
impl ControlService for DaemonControlService {
|
|
||||||
type ExecInsideZoneStream =
|
|
||||||
Pin<Box<dyn Stream<Item = Result<ExecInsideZoneReply, Status>> + Send + 'static>>;
|
|
||||||
|
|
||||||
type AttachZoneConsoleStream =
|
|
||||||
Pin<Box<dyn Stream<Item = Result<ZoneConsoleReply, Status>> + Send + 'static>>;
|
|
||||||
|
|
||||||
type PullImageStream =
|
|
||||||
Pin<Box<dyn Stream<Item = Result<PullImageReply, Status>> + Send + 'static>>;
|
|
||||||
|
|
||||||
type WatchEventsStream =
|
|
||||||
Pin<Box<dyn Stream<Item = Result<WatchEventsReply, Status>> + Send + 'static>>;
|
|
||||||
|
|
||||||
type SnoopIdmStream =
|
|
||||||
Pin<Box<dyn Stream<Item = Result<SnoopIdmReply, Status>> + Send + 'static>>;
|
|
||||||
|
|
||||||
async fn host_status(
|
|
||||||
&self,
|
|
||||||
request: Request<HostStatusRequest>,
|
|
||||||
) -> Result<Response<HostStatusReply>, Status> {
|
|
||||||
let _ = request.into_inner();
|
|
||||||
let host_reservation =
|
|
||||||
self.ip
|
|
||||||
.retrieve(self.zlt.host_uuid())
|
|
||||||
.await
|
|
||||||
.map_err(|x| ApiError {
|
|
||||||
message: x.to_string(),
|
|
||||||
})?;
|
|
||||||
Ok(Response::new(HostStatusReply {
|
|
||||||
host_domid: self.zlt.host_domid(),
|
|
||||||
host_uuid: self.zlt.host_uuid().to_string(),
|
|
||||||
krata_version: DaemonCommand::version(),
|
|
||||||
host_ipv4: host_reservation
|
|
||||||
.as_ref()
|
|
||||||
.map(|x| format!("{}/{}", x.ipv4, x.ipv4_prefix))
|
|
||||||
.unwrap_or_default(),
|
|
||||||
host_ipv6: host_reservation
|
|
||||||
.as_ref()
|
|
||||||
.map(|x| format!("{}/{}", x.ipv6, x.ipv6_prefix))
|
|
||||||
.unwrap_or_default(),
|
|
||||||
host_mac: host_reservation
|
|
||||||
.as_ref()
|
|
||||||
.map(|x| x.mac.to_string().to_lowercase().replace('-', ":"))
|
|
||||||
.unwrap_or_default(),
|
|
||||||
}))
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn create_zone(
|
|
||||||
&self,
|
|
||||||
request: Request<CreateZoneRequest>,
|
|
||||||
) -> Result<Response<CreateZoneReply>, Status> {
|
|
||||||
let request = request.into_inner();
|
|
||||||
let Some(spec) = request.spec else {
|
|
||||||
return Err(ApiError {
|
|
||||||
message: "zone spec not provided".to_string(),
|
|
||||||
}
|
|
||||||
.into());
|
|
||||||
};
|
|
||||||
let uuid = Uuid::new_v4();
|
|
||||||
self.zones
|
|
||||||
.update(
|
|
||||||
uuid,
|
|
||||||
Zone {
|
|
||||||
id: uuid.to_string(),
|
|
||||||
status: Some(ZoneStatus {
|
|
||||||
state: ZoneState::Creating.into(),
|
|
||||||
network_status: None,
|
|
||||||
exit_status: None,
|
|
||||||
error_status: None,
|
|
||||||
resource_status: None,
|
|
||||||
host: self.zlt.host_uuid().to_string(),
|
|
||||||
domid: u32::MAX,
|
|
||||||
}),
|
|
||||||
spec: Some(spec),
|
|
||||||
},
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.map_err(ApiError::from)?;
|
|
||||||
self.zone_reconciler_notify
|
|
||||||
.send(uuid)
|
|
||||||
.await
|
|
||||||
.map_err(|x| ApiError {
|
|
||||||
message: x.to_string(),
|
|
||||||
})?;
|
|
||||||
Ok(Response::new(CreateZoneReply {
|
|
||||||
zone_id: uuid.to_string(),
|
|
||||||
}))
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn exec_inside_zone(
|
|
||||||
&self,
|
|
||||||
request: Request<Streaming<ExecInsideZoneRequest>>,
|
|
||||||
) -> Result<Response<Self::ExecInsideZoneStream>, Status> {
|
|
||||||
let mut input = request.into_inner();
|
|
||||||
let Some(request) = input.next().await else {
|
|
||||||
return Err(ApiError {
|
|
||||||
message: "expected to have at least one request".to_string(),
|
|
||||||
}
|
|
||||||
.into());
|
|
||||||
};
|
|
||||||
let request = request?;
|
|
||||||
|
|
||||||
let Some(task) = request.task else {
|
|
||||||
return Err(ApiError {
|
|
||||||
message: "task is missing".to_string(),
|
|
||||||
}
|
|
||||||
.into());
|
|
||||||
};
|
|
||||||
|
|
||||||
let uuid = Uuid::from_str(&request.zone_id).map_err(|error| ApiError {
|
|
||||||
message: error.to_string(),
|
|
||||||
})?;
|
|
||||||
let idm = self.idm.client(uuid).await.map_err(|error| ApiError {
|
|
||||||
message: error.to_string(),
|
|
||||||
})?;
|
|
||||||
|
|
||||||
let idm_request = IdmRequest {
|
|
||||||
request: Some(IdmRequestType::ExecStream(ExecStreamRequestUpdate {
|
|
||||||
update: Some(Update::Start(ExecStreamRequestStart {
|
|
||||||
environment: task
|
|
||||||
.environment
|
|
||||||
.into_iter()
|
|
||||||
.map(|x| ExecEnvVar {
|
|
||||||
key: x.key,
|
|
||||||
value: x.value,
|
|
||||||
})
|
|
||||||
.collect(),
|
|
||||||
command: task.command,
|
|
||||||
working_directory: task.working_directory,
|
|
||||||
tty: task.tty,
|
|
||||||
})),
|
|
||||||
})),
|
|
||||||
};
|
|
||||||
|
|
||||||
let output = try_stream! {
|
|
||||||
let mut handle = idm.send_stream(idm_request).await.map_err(|x| ApiError {
|
|
||||||
message: x.to_string(),
|
|
||||||
})?;
|
|
||||||
|
|
||||||
loop {
|
|
||||||
select! {
|
|
||||||
x = input.next() => if let Some(update) = x {
|
|
||||||
let update: Result<ExecInsideZoneRequest, Status> = update.map_err(|error| ApiError {
|
|
||||||
message: error.to_string()
|
|
||||||
}.into());
|
|
||||||
|
|
||||||
if let Ok(update) = update {
|
|
||||||
if !update.stdin.is_empty() {
|
|
||||||
let _ = handle.update(IdmRequest {
|
|
||||||
request: Some(IdmRequestType::ExecStream(ExecStreamRequestUpdate {
|
|
||||||
update: Some(Update::Stdin(ExecStreamRequestStdin {
|
|
||||||
data: update.stdin,
|
|
||||||
closed: update.stdin_closed,
|
|
||||||
})),
|
|
||||||
}))}).await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
x = handle.receiver.recv() => match x {
|
|
||||||
Some(response) => {
|
|
||||||
let Some(IdmResponseType::ExecStream(update)) = response.response else {
|
|
||||||
break;
|
|
||||||
};
|
|
||||||
let reply = ExecInsideZoneReply {
|
|
||||||
exited: update.exited,
|
|
||||||
error: update.error,
|
|
||||||
exit_code: update.exit_code,
|
|
||||||
stdout: update.stdout,
|
|
||||||
stderr: update.stderr,
|
|
||||||
};
|
|
||||||
yield reply;
|
|
||||||
},
|
|
||||||
None => {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(Response::new(Box::pin(output) as Self::ExecInsideZoneStream))
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn destroy_zone(
|
|
||||||
&self,
|
|
||||||
request: Request<DestroyZoneRequest>,
|
|
||||||
) -> Result<Response<DestroyZoneReply>, Status> {
|
|
||||||
let request = request.into_inner();
|
|
||||||
let uuid = Uuid::from_str(&request.zone_id).map_err(|error| ApiError {
|
|
||||||
message: error.to_string(),
|
|
||||||
})?;
|
|
||||||
let Some(mut zone) = self.zones.read(uuid).await.map_err(ApiError::from)? else {
|
|
||||||
return Err(ApiError {
|
|
||||||
message: "zone not found".to_string(),
|
|
||||||
}
|
|
||||||
.into());
|
|
||||||
};
|
|
||||||
|
|
||||||
zone.status = Some(zone.status.as_mut().cloned().unwrap_or_default());
|
|
||||||
|
|
||||||
if zone.status.as_ref().unwrap().state() == ZoneState::Destroyed {
|
|
||||||
return Err(ApiError {
|
|
||||||
message: "zone already destroyed".to_string(),
|
|
||||||
}
|
|
||||||
.into());
|
|
||||||
}
|
|
||||||
|
|
||||||
zone.status.as_mut().unwrap().state = ZoneState::Destroying.into();
|
|
||||||
self.zones
|
|
||||||
.update(uuid, zone)
|
|
||||||
.await
|
|
||||||
.map_err(ApiError::from)?;
|
|
||||||
self.zone_reconciler_notify
|
|
||||||
.send(uuid)
|
|
||||||
.await
|
|
||||||
.map_err(|x| ApiError {
|
|
||||||
message: x.to_string(),
|
|
||||||
})?;
|
|
||||||
Ok(Response::new(DestroyZoneReply {}))
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn list_zones(
|
|
||||||
&self,
|
|
||||||
request: Request<ListZonesRequest>,
|
|
||||||
) -> Result<Response<ListZonesReply>, Status> {
|
|
||||||
let _ = request.into_inner();
|
|
||||||
let zones = self.zones.list().await.map_err(ApiError::from)?;
|
|
||||||
let zones = zones.into_values().collect::<Vec<Zone>>();
|
|
||||||
Ok(Response::new(ListZonesReply { zones }))
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn resolve_zone_id(
|
|
||||||
&self,
|
|
||||||
request: Request<ResolveZoneIdRequest>,
|
|
||||||
) -> Result<Response<ResolveZoneIdReply>, Status> {
|
|
||||||
let request = request.into_inner();
|
|
||||||
let zones = self.zones.list().await.map_err(ApiError::from)?;
|
|
||||||
let zones = zones
|
|
||||||
.into_values()
|
|
||||||
.filter(|x| {
|
|
||||||
let comparison_spec = x.spec.as_ref().cloned().unwrap_or_default();
|
|
||||||
(!request.name.is_empty() && comparison_spec.name == request.name)
|
|
||||||
|| x.id == request.name
|
|
||||||
})
|
|
||||||
.collect::<Vec<Zone>>();
|
|
||||||
Ok(Response::new(ResolveZoneIdReply {
|
|
||||||
zone_id: zones.first().cloned().map(|x| x.id).unwrap_or_default(),
|
|
||||||
}))
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn attach_zone_console(
|
|
||||||
&self,
|
|
||||||
request: Request<Streaming<ZoneConsoleRequest>>,
|
|
||||||
) -> Result<Response<Self::AttachZoneConsoleStream>, Status> {
|
|
||||||
let mut input = request.into_inner();
|
|
||||||
let Some(request) = input.next().await else {
|
|
||||||
return Err(ApiError {
|
|
||||||
message: "expected to have at least one request".to_string(),
|
|
||||||
}
|
|
||||||
.into());
|
|
||||||
};
|
|
||||||
let request = request?;
|
|
||||||
let uuid = Uuid::from_str(&request.zone_id).map_err(|error| ApiError {
|
|
||||||
message: error.to_string(),
|
|
||||||
})?;
|
|
||||||
let (sender, mut receiver) = channel(100);
|
|
||||||
let console = self
|
|
||||||
.console
|
|
||||||
.attach(uuid, sender)
|
|
||||||
.await
|
|
||||||
.map_err(|error| ApiError {
|
|
||||||
message: format!("failed to attach to console: {}", error),
|
|
||||||
})?;
|
|
||||||
|
|
||||||
let output = try_stream! {
|
|
||||||
yield ZoneConsoleReply { data: console.initial.clone(), };
|
|
||||||
loop {
|
|
||||||
let what = select! {
|
|
||||||
x = receiver.recv() => ConsoleDataSelect::Read(x),
|
|
||||||
x = input.next() => ConsoleDataSelect::Write(x),
|
|
||||||
};
|
|
||||||
|
|
||||||
match what {
|
|
||||||
ConsoleDataSelect::Read(Some(data)) => {
|
|
||||||
yield ZoneConsoleReply { data, };
|
|
||||||
},
|
|
||||||
|
|
||||||
ConsoleDataSelect::Read(None) => {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
ConsoleDataSelect::Write(Some(request)) => {
|
|
||||||
let request = request?;
|
|
||||||
if !request.data.is_empty() {
|
|
||||||
console.send(request.data).await.map_err(|error| ApiError {
|
|
||||||
message: error.to_string(),
|
|
||||||
})?;
|
|
||||||
}
|
|
||||||
},
|
|
||||||
|
|
||||||
ConsoleDataSelect::Write(None) => {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(Response::new(
|
|
||||||
Box::pin(output) as Self::AttachZoneConsoleStream
|
|
||||||
))
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn read_zone_metrics(
|
|
||||||
&self,
|
|
||||||
request: Request<ReadZoneMetricsRequest>,
|
|
||||||
) -> Result<Response<ReadZoneMetricsReply>, Status> {
|
|
||||||
let request = request.into_inner();
|
|
||||||
let uuid = Uuid::from_str(&request.zone_id).map_err(|error| ApiError {
|
|
||||||
message: error.to_string(),
|
|
||||||
})?;
|
|
||||||
let client = self.idm.client(uuid).await.map_err(|error| ApiError {
|
|
||||||
message: error.to_string(),
|
|
||||||
})?;
|
|
||||||
|
|
||||||
let response = client
|
|
||||||
.send(IdmRequest {
|
|
||||||
request: Some(IdmRequestType::Metrics(MetricsRequest {})),
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
.map_err(|error| ApiError {
|
|
||||||
message: error.to_string(),
|
|
||||||
})?;
|
|
||||||
|
|
||||||
let mut reply = ReadZoneMetricsReply::default();
|
|
||||||
if let Some(IdmResponseType::Metrics(metrics)) = response.response {
|
|
||||||
reply.root = metrics.root.map(idm_metric_to_api);
|
|
||||||
}
|
|
||||||
Ok(Response::new(reply))
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn pull_image(
|
|
||||||
&self,
|
|
||||||
request: Request<PullImageRequest>,
|
|
||||||
) -> Result<Response<Self::PullImageStream>, Status> {
|
|
||||||
let request = request.into_inner();
|
|
||||||
let name = ImageName::parse(&request.image).map_err(|err| ApiError {
|
|
||||||
message: err.to_string(),
|
|
||||||
})?;
|
|
||||||
let format = match request.format() {
|
|
||||||
OciImageFormat::Unknown => OciPackedFormat::Squashfs,
|
|
||||||
OciImageFormat::Squashfs => OciPackedFormat::Squashfs,
|
|
||||||
OciImageFormat::Erofs => OciPackedFormat::Erofs,
|
|
||||||
OciImageFormat::Tar => OciPackedFormat::Tar,
|
|
||||||
};
|
|
||||||
let (context, mut receiver) = OciProgressContext::create();
|
|
||||||
let our_packer = self.packer.clone();
|
|
||||||
|
|
||||||
let output = try_stream! {
|
|
||||||
let mut task = tokio::task::spawn(async move {
|
|
||||||
our_packer.request(name, format, request.overwrite_cache, request.update, context).await
|
|
||||||
});
|
|
||||||
let abort_handle = task.abort_handle();
|
|
||||||
let _task_cancel_guard = scopeguard::guard(abort_handle, |handle| {
|
|
||||||
handle.abort();
|
|
||||||
});
|
|
||||||
|
|
||||||
loop {
|
|
||||||
let what = select! {
|
|
||||||
x = receiver.changed() => match x {
|
|
||||||
Ok(_) => PullImageSelect::Progress(Some(receiver.borrow_and_update().clone())),
|
|
||||||
Err(_) => PullImageSelect::Progress(None),
|
|
||||||
},
|
|
||||||
x = &mut task => PullImageSelect::Completed(x),
|
|
||||||
};
|
|
||||||
match what {
|
|
||||||
PullImageSelect::Progress(Some(progress)) => {
|
|
||||||
let reply = PullImageReply {
|
|
||||||
progress: Some(convert_oci_progress(progress)),
|
|
||||||
digest: String::new(),
|
|
||||||
format: OciImageFormat::Unknown.into(),
|
|
||||||
};
|
|
||||||
yield reply;
|
|
||||||
},
|
|
||||||
|
|
||||||
PullImageSelect::Completed(result) => {
|
|
||||||
let result = result.map_err(|err| ApiError {
|
|
||||||
message: err.to_string(),
|
|
||||||
})?;
|
|
||||||
let packed = result.map_err(|err| ApiError {
|
|
||||||
message: err.to_string(),
|
|
||||||
})?;
|
|
||||||
let reply = PullImageReply {
|
|
||||||
progress: None,
|
|
||||||
digest: packed.digest,
|
|
||||||
format: match packed.format {
|
|
||||||
OciPackedFormat::Squashfs => OciImageFormat::Squashfs.into(),
|
|
||||||
OciPackedFormat::Erofs => OciImageFormat::Erofs.into(),
|
|
||||||
OciPackedFormat::Tar => OciImageFormat::Tar.into(),
|
|
||||||
},
|
|
||||||
};
|
|
||||||
yield reply;
|
|
||||||
break;
|
|
||||||
},
|
|
||||||
|
|
||||||
_ => {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
Ok(Response::new(Box::pin(output) as Self::PullImageStream))
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn watch_events(
|
|
||||||
&self,
|
|
||||||
request: Request<WatchEventsRequest>,
|
|
||||||
) -> Result<Response<Self::WatchEventsStream>, Status> {
|
|
||||||
let _ = request.into_inner();
|
|
||||||
let mut events = self.events.subscribe();
|
|
||||||
let output = try_stream! {
|
|
||||||
while let Ok(event) = events.recv().await {
|
|
||||||
yield WatchEventsReply { event: Some(event), };
|
|
||||||
}
|
|
||||||
};
|
|
||||||
Ok(Response::new(Box::pin(output) as Self::WatchEventsStream))
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn snoop_idm(
|
|
||||||
&self,
|
|
||||||
request: Request<SnoopIdmRequest>,
|
|
||||||
) -> Result<Response<Self::SnoopIdmStream>, Status> {
|
|
||||||
let _ = request.into_inner();
|
|
||||||
let mut messages = self.idm.snoop();
|
|
||||||
let zlt = self.zlt.clone();
|
|
||||||
let output = try_stream! {
|
|
||||||
while let Ok(event) = messages.recv().await {
|
|
||||||
let Some(from_uuid) = zlt.lookup_uuid_by_domid(event.from).await else {
|
|
||||||
continue;
|
|
||||||
};
|
|
||||||
let Some(to_uuid) = zlt.lookup_uuid_by_domid(event.to).await else {
|
|
||||||
continue;
|
|
||||||
};
|
|
||||||
yield SnoopIdmReply { from: from_uuid.to_string(), to: to_uuid.to_string(), packet: Some(event.packet) };
|
|
||||||
}
|
|
||||||
};
|
|
||||||
Ok(Response::new(Box::pin(output) as Self::SnoopIdmStream))
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn list_devices(
|
|
||||||
&self,
|
|
||||||
request: Request<ListDevicesRequest>,
|
|
||||||
) -> Result<Response<ListDevicesReply>, Status> {
|
|
||||||
let _ = request.into_inner();
|
|
||||||
let mut devices = Vec::new();
|
|
||||||
let state = self.devices.copy().await.map_err(|error| ApiError {
|
|
||||||
message: error.to_string(),
|
|
||||||
})?;
|
|
||||||
for (name, state) in state {
|
|
||||||
devices.push(DeviceInfo {
|
|
||||||
name,
|
|
||||||
claimed: state.owner.is_some(),
|
|
||||||
owner: state.owner.map(|x| x.to_string()).unwrap_or_default(),
|
|
||||||
});
|
|
||||||
}
|
|
||||||
Ok(Response::new(ListDevicesReply { devices }))
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn get_host_cpu_topology(
|
|
||||||
&self,
|
|
||||||
request: Request<GetHostCpuTopologyRequest>,
|
|
||||||
) -> Result<Response<GetHostCpuTopologyReply>, Status> {
|
|
||||||
let _ = request.into_inner();
|
|
||||||
let power = self
|
|
||||||
.runtime
|
|
||||||
.power_management_context()
|
|
||||||
.await
|
|
||||||
.map_err(ApiError::from)?;
|
|
||||||
let cputopo = power.cpu_topology().await.map_err(ApiError::from)?;
|
|
||||||
let mut cpus = vec![];
|
|
||||||
|
|
||||||
for cpu in cputopo {
|
|
||||||
cpus.push(HostCpuTopologyInfo {
|
|
||||||
core: cpu.core,
|
|
||||||
socket: cpu.socket,
|
|
||||||
node: cpu.node,
|
|
||||||
thread: cpu.thread,
|
|
||||||
class: cpu.class as i32,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(Response::new(GetHostCpuTopologyReply { cpus }))
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn set_host_power_management_policy(
|
|
||||||
&self,
|
|
||||||
request: Request<SetHostPowerManagementPolicyRequest>,
|
|
||||||
) -> Result<Response<SetHostPowerManagementPolicyReply>, Status> {
|
|
||||||
let policy = request.into_inner();
|
|
||||||
let power = self
|
|
||||||
.runtime
|
|
||||||
.power_management_context()
|
|
||||||
.await
|
|
||||||
.map_err(ApiError::from)?;
|
|
||||||
let scheduler = &policy.scheduler;
|
|
||||||
|
|
||||||
power
|
|
||||||
.set_smt_policy(policy.smt_awareness)
|
|
||||||
.await
|
|
||||||
.map_err(ApiError::from)?;
|
|
||||||
power
|
|
||||||
.set_scheduler_policy(scheduler)
|
|
||||||
.await
|
|
||||||
.map_err(ApiError::from)?;
|
|
||||||
|
|
||||||
Ok(Response::new(SetHostPowerManagementPolicyReply {}))
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn get_zone(
|
|
||||||
&self,
|
|
||||||
request: Request<GetZoneRequest>,
|
|
||||||
) -> Result<Response<GetZoneReply>, Status> {
|
|
||||||
let request = request.into_inner();
|
|
||||||
let zones = self.zones.list().await.map_err(ApiError::from)?;
|
|
||||||
let zone = zones.get(&Uuid::from_str(&request.zone_id).map_err(|error| ApiError {
|
|
||||||
message: error.to_string(),
|
|
||||||
})?);
|
|
||||||
Ok(Response::new(GetZoneReply {
|
|
||||||
zone: zone.cloned(),
|
|
||||||
}))
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn update_zone_resources(
|
|
||||||
&self,
|
|
||||||
request: Request<UpdateZoneResourcesRequest>,
|
|
||||||
) -> Result<Response<UpdateZoneResourcesReply>, Status> {
|
|
||||||
let request = request.into_inner();
|
|
||||||
let uuid = Uuid::from_str(&request.zone_id).map_err(|error| ApiError {
|
|
||||||
message: error.to_string(),
|
|
||||||
})?;
|
|
||||||
let Some(mut zone) = self.zones.read(uuid).await.map_err(ApiError::from)? else {
|
|
||||||
return Err(ApiError {
|
|
||||||
message: "zone not found".to_string(),
|
|
||||||
}
|
|
||||||
.into());
|
|
||||||
};
|
|
||||||
|
|
||||||
let Some(ref mut status) = zone.status else {
|
|
||||||
return Err(ApiError {
|
|
||||||
message: "zone state not available".to_string(),
|
|
||||||
}
|
|
||||||
.into());
|
|
||||||
};
|
|
||||||
|
|
||||||
if status.state() != ZoneState::Created {
|
|
||||||
return Err(ApiError {
|
|
||||||
message: "zone is in an invalid state".to_string(),
|
|
||||||
}
|
|
||||||
.into());
|
|
||||||
}
|
|
||||||
|
|
||||||
if status.domid == 0 || status.domid == u32::MAX {
|
|
||||||
return Err(ApiError {
|
|
||||||
message: "zone domid is invalid".to_string(),
|
|
||||||
}
|
|
||||||
.into());
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut resources = request.resources.unwrap_or_default();
|
|
||||||
if resources.target_memory > resources.max_memory {
|
|
||||||
resources.max_memory = resources.target_memory;
|
|
||||||
}
|
|
||||||
|
|
||||||
if resources.target_cpus < 1 {
|
|
||||||
resources.target_cpus = 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
let initial_resources = zone
|
|
||||||
.spec
|
|
||||||
.clone()
|
|
||||||
.unwrap_or_default()
|
|
||||||
.initial_resources
|
|
||||||
.unwrap_or_default();
|
|
||||||
if resources.target_cpus > initial_resources.max_cpus {
|
|
||||||
resources.target_cpus = initial_resources.max_cpus;
|
|
||||||
}
|
|
||||||
resources.max_cpus = initial_resources.max_cpus;
|
|
||||||
|
|
||||||
self.runtime
|
|
||||||
.set_memory_resources(
|
|
||||||
status.domid,
|
|
||||||
resources.target_memory * 1024 * 1024,
|
|
||||||
resources.max_memory * 1024 * 1024,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.map_err(|error| ApiError {
|
|
||||||
message: format!("failed to set memory resources: {}", error),
|
|
||||||
})?;
|
|
||||||
self.runtime
|
|
||||||
.set_cpu_resources(status.domid, resources.target_cpus)
|
|
||||||
.await
|
|
||||||
.map_err(|error| ApiError {
|
|
||||||
message: format!("failed to set cpu resources: {}", error),
|
|
||||||
})?;
|
|
||||||
status.resource_status = Some(ZoneResourceStatus {
|
|
||||||
active_resources: Some(resources),
|
|
||||||
});
|
|
||||||
|
|
||||||
self.zones
|
|
||||||
.update(uuid, zone)
|
|
||||||
.await
|
|
||||||
.map_err(ApiError::from)?;
|
|
||||||
Ok(Response::new(UpdateZoneResourcesReply {}))
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn read_hypervisor_console(
|
|
||||||
&self,
|
|
||||||
_request: Request<ReadHypervisorConsoleRequest>,
|
|
||||||
) -> Result<Response<ReadHypervisorConsoleReply>, Status> {
|
|
||||||
let data = self
|
|
||||||
.runtime
|
|
||||||
.read_hypervisor_console(false)
|
|
||||||
.await
|
|
||||||
.map_err(|error| ApiError {
|
|
||||||
message: error.to_string(),
|
|
||||||
})?;
|
|
||||||
Ok(Response::new(ReadHypervisorConsoleReply {
|
|
||||||
data: data.to_string(),
|
|
||||||
}))
|
|
||||||
}
|
|
||||||
}
|
|
82
crates/daemon/src/control/attach_zone_console.rs
Normal file
82
crates/daemon/src/control/attach_zone_console.rs
Normal file
@ -0,0 +1,82 @@
|
|||||||
|
use std::pin::Pin;
|
||||||
|
use std::str::FromStr;
|
||||||
|
|
||||||
|
use anyhow::{anyhow, Result};
|
||||||
|
use async_stream::try_stream;
|
||||||
|
use tokio::select;
|
||||||
|
use tokio::sync::mpsc::channel;
|
||||||
|
use tokio_stream::{Stream, StreamExt};
|
||||||
|
use tonic::{Status, Streaming};
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
use krata::v1::control::{ZoneConsoleReply, ZoneConsoleRequest};
|
||||||
|
|
||||||
|
use crate::console::DaemonConsoleHandle;
|
||||||
|
use crate::control::ApiError;
|
||||||
|
|
||||||
|
enum ConsoleDataSelect {
|
||||||
|
Read(Option<Vec<u8>>),
|
||||||
|
Write(Option<Result<ZoneConsoleRequest, Status>>),
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct AttachZoneConsoleRpc {
|
||||||
|
console: DaemonConsoleHandle,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AttachZoneConsoleRpc {
|
||||||
|
pub fn new(console: DaemonConsoleHandle) -> Self {
|
||||||
|
Self { console }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn process(
|
||||||
|
self,
|
||||||
|
mut input: Streaming<ZoneConsoleRequest>,
|
||||||
|
) -> Result<Pin<Box<dyn Stream<Item = Result<ZoneConsoleReply, Status>> + Send + 'static>>>
|
||||||
|
{
|
||||||
|
let Some(request) = input.next().await else {
|
||||||
|
return Err(anyhow!("expected to have at least one request"));
|
||||||
|
};
|
||||||
|
let request = request?;
|
||||||
|
let uuid = Uuid::from_str(&request.zone_id)?;
|
||||||
|
let (sender, mut receiver) = channel(100);
|
||||||
|
let console = self
|
||||||
|
.console
|
||||||
|
.attach(uuid, sender)
|
||||||
|
.await
|
||||||
|
.map_err(|error| anyhow!("failed to attach to console: {}", error))?;
|
||||||
|
|
||||||
|
let output = try_stream! {
|
||||||
|
yield ZoneConsoleReply { data: console.initial.clone(), };
|
||||||
|
loop {
|
||||||
|
let what = select! {
|
||||||
|
x = receiver.recv() => ConsoleDataSelect::Read(x),
|
||||||
|
x = input.next() => ConsoleDataSelect::Write(x),
|
||||||
|
};
|
||||||
|
|
||||||
|
match what {
|
||||||
|
ConsoleDataSelect::Read(Some(data)) => {
|
||||||
|
yield ZoneConsoleReply { data, };
|
||||||
|
},
|
||||||
|
|
||||||
|
ConsoleDataSelect::Read(None) => {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
ConsoleDataSelect::Write(Some(request)) => {
|
||||||
|
let request = request?;
|
||||||
|
if !request.data.is_empty() {
|
||||||
|
console.send(request.data).await.map_err(|error| ApiError {
|
||||||
|
message: error.to_string(),
|
||||||
|
})?;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
ConsoleDataSelect::Write(None) => {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
Ok(Box::pin(output))
|
||||||
|
}
|
||||||
|
}
|
56
crates/daemon/src/control/create_zone.rs
Normal file
56
crates/daemon/src/control/create_zone.rs
Normal file
@ -0,0 +1,56 @@
|
|||||||
|
use crate::db::zone::ZoneStore;
|
||||||
|
use crate::zlt::ZoneLookupTable;
|
||||||
|
use anyhow::{anyhow, Result};
|
||||||
|
use krata::v1::common::{Zone, ZoneState, ZoneStatus};
|
||||||
|
use krata::v1::control::{CreateZoneReply, CreateZoneRequest};
|
||||||
|
use tokio::sync::mpsc::Sender;
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
pub struct CreateZoneRpc {
|
||||||
|
zones: ZoneStore,
|
||||||
|
zlt: ZoneLookupTable,
|
||||||
|
zone_reconciler_notify: Sender<Uuid>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl CreateZoneRpc {
|
||||||
|
pub fn new(
|
||||||
|
zones: ZoneStore,
|
||||||
|
zlt: ZoneLookupTable,
|
||||||
|
zone_reconciler_notify: Sender<Uuid>,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
zones,
|
||||||
|
zlt,
|
||||||
|
zone_reconciler_notify,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn process(self, request: CreateZoneRequest) -> Result<CreateZoneReply> {
|
||||||
|
let Some(spec) = request.spec else {
|
||||||
|
return Err(anyhow!("zone spec not provided"));
|
||||||
|
};
|
||||||
|
let uuid = Uuid::new_v4();
|
||||||
|
self.zones
|
||||||
|
.update(
|
||||||
|
uuid,
|
||||||
|
Zone {
|
||||||
|
id: uuid.to_string(),
|
||||||
|
status: Some(ZoneStatus {
|
||||||
|
state: ZoneState::Creating.into(),
|
||||||
|
network_status: None,
|
||||||
|
exit_status: None,
|
||||||
|
error_status: None,
|
||||||
|
resource_status: None,
|
||||||
|
host: self.zlt.host_uuid().to_string(),
|
||||||
|
domid: u32::MAX,
|
||||||
|
}),
|
||||||
|
spec: Some(spec),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
self.zone_reconciler_notify.send(uuid).await?;
|
||||||
|
Ok(CreateZoneReply {
|
||||||
|
zone_id: uuid.to_string(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
42
crates/daemon/src/control/destroy_zone.rs
Normal file
42
crates/daemon/src/control/destroy_zone.rs
Normal file
@ -0,0 +1,42 @@
|
|||||||
|
use std::str::FromStr;
|
||||||
|
|
||||||
|
use anyhow::{anyhow, Result};
|
||||||
|
use tokio::sync::mpsc::Sender;
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
use krata::v1::common::ZoneState;
|
||||||
|
use krata::v1::control::{DestroyZoneReply, DestroyZoneRequest};
|
||||||
|
|
||||||
|
use crate::db::zone::ZoneStore;
|
||||||
|
|
||||||
|
pub struct DestroyZoneRpc {
|
||||||
|
zones: ZoneStore,
|
||||||
|
zone_reconciler_notify: Sender<Uuid>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DestroyZoneRpc {
|
||||||
|
pub fn new(zones: ZoneStore, zone_reconciler_notify: Sender<Uuid>) -> Self {
|
||||||
|
Self {
|
||||||
|
zones,
|
||||||
|
zone_reconciler_notify,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn process(self, request: DestroyZoneRequest) -> Result<DestroyZoneReply> {
|
||||||
|
let uuid = Uuid::from_str(&request.zone_id)?;
|
||||||
|
let Some(mut zone) = self.zones.read(uuid).await? else {
|
||||||
|
return Err(anyhow!("zone not found"));
|
||||||
|
};
|
||||||
|
|
||||||
|
zone.status = Some(zone.status.as_mut().cloned().unwrap_or_default());
|
||||||
|
|
||||||
|
if zone.status.as_ref().unwrap().state() == ZoneState::Destroyed {
|
||||||
|
return Err(anyhow!("zone already destroyed"));
|
||||||
|
}
|
||||||
|
|
||||||
|
zone.status.as_mut().unwrap().state = ZoneState::Destroying.into();
|
||||||
|
self.zones.update(uuid, zone).await?;
|
||||||
|
self.zone_reconciler_notify.send(uuid).await?;
|
||||||
|
Ok(DestroyZoneReply {})
|
||||||
|
}
|
||||||
|
}
|
116
crates/daemon/src/control/exec_inside_zone.rs
Normal file
116
crates/daemon/src/control/exec_inside_zone.rs
Normal file
@ -0,0 +1,116 @@
|
|||||||
|
use std::pin::Pin;
|
||||||
|
use std::str::FromStr;
|
||||||
|
|
||||||
|
use anyhow::{anyhow, Result};
|
||||||
|
use async_stream::try_stream;
|
||||||
|
use tokio::select;
|
||||||
|
use tokio_stream::{Stream, StreamExt};
|
||||||
|
use tonic::{Status, Streaming};
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
use krata::idm::internal::Request;
|
||||||
|
use krata::{
|
||||||
|
idm::internal::{
|
||||||
|
exec_stream_request_update::Update, request::Request as IdmRequestType,
|
||||||
|
response::Response as IdmResponseType, ExecEnvVar, ExecStreamRequestStart,
|
||||||
|
ExecStreamRequestStdin, ExecStreamRequestUpdate, Request as IdmRequest,
|
||||||
|
},
|
||||||
|
v1::control::{ExecInsideZoneReply, ExecInsideZoneRequest},
|
||||||
|
};
|
||||||
|
|
||||||
|
use crate::control::ApiError;
|
||||||
|
use crate::idm::DaemonIdmHandle;
|
||||||
|
|
||||||
|
pub struct ExecInsideZoneRpc {
|
||||||
|
idm: DaemonIdmHandle,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ExecInsideZoneRpc {
|
||||||
|
pub fn new(idm: DaemonIdmHandle) -> Self {
|
||||||
|
Self { idm }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn process(
|
||||||
|
self,
|
||||||
|
mut input: Streaming<ExecInsideZoneRequest>,
|
||||||
|
) -> Result<Pin<Box<dyn Stream<Item = Result<ExecInsideZoneReply, Status>> + Send + 'static>>>
|
||||||
|
{
|
||||||
|
let Some(request) = input.next().await else {
|
||||||
|
return Err(anyhow!("expected to have at least one request"));
|
||||||
|
};
|
||||||
|
let request = request?;
|
||||||
|
|
||||||
|
let Some(task) = request.task else {
|
||||||
|
return Err(anyhow!("task is missing"));
|
||||||
|
};
|
||||||
|
|
||||||
|
let uuid = Uuid::from_str(&request.zone_id)?;
|
||||||
|
let idm = self.idm.client(uuid).await?;
|
||||||
|
|
||||||
|
let idm_request = Request {
|
||||||
|
request: Some(IdmRequestType::ExecStream(ExecStreamRequestUpdate {
|
||||||
|
update: Some(Update::Start(ExecStreamRequestStart {
|
||||||
|
environment: task
|
||||||
|
.environment
|
||||||
|
.into_iter()
|
||||||
|
.map(|x| ExecEnvVar {
|
||||||
|
key: x.key,
|
||||||
|
value: x.value,
|
||||||
|
})
|
||||||
|
.collect(),
|
||||||
|
command: task.command,
|
||||||
|
working_directory: task.working_directory,
|
||||||
|
tty: task.tty,
|
||||||
|
})),
|
||||||
|
})),
|
||||||
|
};
|
||||||
|
|
||||||
|
let output = try_stream! {
|
||||||
|
let mut handle = idm.send_stream(idm_request).await.map_err(|x| ApiError {
|
||||||
|
message: x.to_string(),
|
||||||
|
})?;
|
||||||
|
|
||||||
|
loop {
|
||||||
|
select! {
|
||||||
|
x = input.next() => if let Some(update) = x {
|
||||||
|
let update: Result<ExecInsideZoneRequest, Status> = update.map_err(|error| ApiError {
|
||||||
|
message: error.to_string()
|
||||||
|
}.into());
|
||||||
|
|
||||||
|
if let Ok(update) = update {
|
||||||
|
if !update.stdin.is_empty() {
|
||||||
|
let _ = handle.update(IdmRequest {
|
||||||
|
request: Some(IdmRequestType::ExecStream(ExecStreamRequestUpdate {
|
||||||
|
update: Some(Update::Stdin(ExecStreamRequestStdin {
|
||||||
|
data: update.stdin,
|
||||||
|
closed: update.stdin_closed,
|
||||||
|
})),
|
||||||
|
}))}).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
x = handle.receiver.recv() => match x {
|
||||||
|
Some(response) => {
|
||||||
|
let Some(IdmResponseType::ExecStream(update)) = response.response else {
|
||||||
|
break;
|
||||||
|
};
|
||||||
|
let reply = ExecInsideZoneReply {
|
||||||
|
exited: update.exited,
|
||||||
|
error: update.error,
|
||||||
|
exit_code: update.exit_code,
|
||||||
|
stdout: update.stdout,
|
||||||
|
stderr: update.stderr,
|
||||||
|
};
|
||||||
|
yield reply;
|
||||||
|
},
|
||||||
|
None => {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(Box::pin(output))
|
||||||
|
}
|
||||||
|
}
|
33
crates/daemon/src/control/get_host_cpu_topology.rs
Normal file
33
crates/daemon/src/control/get_host_cpu_topology.rs
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
use anyhow::Result;
|
||||||
|
use krata::v1::control::{GetHostCpuTopologyReply, GetHostCpuTopologyRequest, HostCpuTopologyInfo};
|
||||||
|
use kratart::Runtime;
|
||||||
|
|
||||||
|
pub struct GetHostCpuTopologyRpc {
|
||||||
|
runtime: Runtime,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl GetHostCpuTopologyRpc {
|
||||||
|
pub fn new(runtime: Runtime) -> Self {
|
||||||
|
Self { runtime }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn process(
|
||||||
|
self,
|
||||||
|
_request: GetHostCpuTopologyRequest,
|
||||||
|
) -> Result<GetHostCpuTopologyReply> {
|
||||||
|
let power = self.runtime.power_management_context().await?;
|
||||||
|
let cpu_topology = power.cpu_topology().await?;
|
||||||
|
let mut cpus = vec![];
|
||||||
|
|
||||||
|
for cpu in cpu_topology {
|
||||||
|
cpus.push(HostCpuTopologyInfo {
|
||||||
|
core: cpu.core,
|
||||||
|
socket: cpu.socket,
|
||||||
|
node: cpu.node,
|
||||||
|
thread: cpu.thread,
|
||||||
|
class: cpu.class as i32,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
Ok(GetHostCpuTopologyReply { cpus })
|
||||||
|
}
|
||||||
|
}
|
24
crates/daemon/src/control/get_zone.rs
Normal file
24
crates/daemon/src/control/get_zone.rs
Normal file
@ -0,0 +1,24 @@
|
|||||||
|
use std::str::FromStr;
|
||||||
|
|
||||||
|
use anyhow::Result;
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
use krata::v1::control::{GetZoneReply, GetZoneRequest};
|
||||||
|
|
||||||
|
use crate::db::zone::ZoneStore;
|
||||||
|
|
||||||
|
pub struct GetZoneRpc {
|
||||||
|
zones: ZoneStore,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl GetZoneRpc {
|
||||||
|
pub fn new(zones: ZoneStore) -> Self {
|
||||||
|
Self { zones }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn process(self, request: GetZoneRequest) -> Result<GetZoneReply> {
|
||||||
|
let mut zones = self.zones.list().await?;
|
||||||
|
let zone = zones.remove(&Uuid::from_str(&request.zone_id)?);
|
||||||
|
Ok(GetZoneReply { zone })
|
||||||
|
}
|
||||||
|
}
|
37
crates/daemon/src/control/host_status.rs
Normal file
37
crates/daemon/src/control/host_status.rs
Normal file
@ -0,0 +1,37 @@
|
|||||||
|
use crate::command::DaemonCommand;
|
||||||
|
use crate::ip::assignment::IpAssignment;
|
||||||
|
use crate::zlt::ZoneLookupTable;
|
||||||
|
use anyhow::Result;
|
||||||
|
use krata::v1::control::{HostStatusReply, HostStatusRequest};
|
||||||
|
|
||||||
|
pub struct HostStatusRpc {
|
||||||
|
ip: IpAssignment,
|
||||||
|
zlt: ZoneLookupTable,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl HostStatusRpc {
|
||||||
|
pub fn new(ip: IpAssignment, zlt: ZoneLookupTable) -> Self {
|
||||||
|
Self { ip, zlt }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn process(self, _request: HostStatusRequest) -> Result<HostStatusReply> {
|
||||||
|
let host_reservation = self.ip.retrieve(self.zlt.host_uuid()).await?;
|
||||||
|
Ok(HostStatusReply {
|
||||||
|
host_domid: self.zlt.host_domid(),
|
||||||
|
host_uuid: self.zlt.host_uuid().to_string(),
|
||||||
|
krata_version: DaemonCommand::version(),
|
||||||
|
host_ipv4: host_reservation
|
||||||
|
.as_ref()
|
||||||
|
.map(|x| format!("{}/{}", x.ipv4, x.ipv4_prefix))
|
||||||
|
.unwrap_or_default(),
|
||||||
|
host_ipv6: host_reservation
|
||||||
|
.as_ref()
|
||||||
|
.map(|x| format!("{}/{}", x.ipv6, x.ipv6_prefix))
|
||||||
|
.unwrap_or_default(),
|
||||||
|
host_mac: host_reservation
|
||||||
|
.as_ref()
|
||||||
|
.map(|x| x.mac.to_string().to_lowercase().replace('-', ":"))
|
||||||
|
.unwrap_or_default(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
28
crates/daemon/src/control/list_devices.rs
Normal file
28
crates/daemon/src/control/list_devices.rs
Normal file
@ -0,0 +1,28 @@
|
|||||||
|
use anyhow::Result;
|
||||||
|
|
||||||
|
use krata::v1::control::{DeviceInfo, ListDevicesReply, ListDevicesRequest};
|
||||||
|
|
||||||
|
use crate::devices::DaemonDeviceManager;
|
||||||
|
|
||||||
|
pub struct ListDevicesRpc {
|
||||||
|
devices: DaemonDeviceManager,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ListDevicesRpc {
|
||||||
|
pub fn new(devices: DaemonDeviceManager) -> Self {
|
||||||
|
Self { devices }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn process(self, _request: ListDevicesRequest) -> Result<ListDevicesReply> {
|
||||||
|
let mut devices = Vec::new();
|
||||||
|
let state = self.devices.copy().await?;
|
||||||
|
for (name, state) in state {
|
||||||
|
devices.push(DeviceInfo {
|
||||||
|
name,
|
||||||
|
claimed: state.owner.is_some(),
|
||||||
|
owner: state.owner.map(|x| x.to_string()).unwrap_or_default(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Ok(ListDevicesReply { devices })
|
||||||
|
}
|
||||||
|
}
|
21
crates/daemon/src/control/list_zones.rs
Normal file
21
crates/daemon/src/control/list_zones.rs
Normal file
@ -0,0 +1,21 @@
|
|||||||
|
use anyhow::Result;
|
||||||
|
use krata::v1::common::Zone;
|
||||||
|
use krata::v1::control::{ListZonesReply, ListZonesRequest};
|
||||||
|
|
||||||
|
use crate::db::zone::ZoneStore;
|
||||||
|
|
||||||
|
pub struct ListZonesRpc {
|
||||||
|
zones: ZoneStore,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ListZonesRpc {
|
||||||
|
pub fn new(zones: ZoneStore) -> Self {
|
||||||
|
Self { zones }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn process(self, _request: ListZonesRequest) -> Result<ListZonesReply> {
|
||||||
|
let zones = self.zones.list().await?;
|
||||||
|
let zones = zones.into_values().collect::<Vec<Zone>>();
|
||||||
|
Ok(ListZonesReply { zones })
|
||||||
|
}
|
||||||
|
}
|
351
crates/daemon/src/control/mod.rs
Normal file
351
crates/daemon/src/control/mod.rs
Normal file
@ -0,0 +1,351 @@
|
|||||||
|
use std::pin::Pin;
|
||||||
|
|
||||||
|
use anyhow::Error;
|
||||||
|
use futures::Stream;
|
||||||
|
use tokio::sync::mpsc::Sender;
|
||||||
|
use tonic::{Request, Response, Status, Streaming};
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
use krata::v1::control::{
|
||||||
|
control_service_server::ControlService, CreateZoneReply, CreateZoneRequest, DestroyZoneReply,
|
||||||
|
DestroyZoneRequest, ExecInsideZoneReply, ExecInsideZoneRequest, GetHostCpuTopologyReply,
|
||||||
|
GetHostCpuTopologyRequest, HostStatusReply, HostStatusRequest, ListDevicesReply,
|
||||||
|
ListDevicesRequest, ListZonesReply, ListZonesRequest, PullImageReply, PullImageRequest,
|
||||||
|
ReadHypervisorConsoleReply, ReadHypervisorConsoleRequest, ReadZoneMetricsReply,
|
||||||
|
ReadZoneMetricsRequest, ResolveZoneIdReply, ResolveZoneIdRequest, SnoopIdmReply,
|
||||||
|
SnoopIdmRequest, UpdateZoneResourcesReply, UpdateZoneResourcesRequest, WatchEventsReply,
|
||||||
|
WatchEventsRequest, ZoneConsoleReply, ZoneConsoleRequest,
|
||||||
|
};
|
||||||
|
use krata::v1::control::{
|
||||||
|
GetZoneReply, GetZoneRequest, SetHostPowerManagementPolicyReply,
|
||||||
|
SetHostPowerManagementPolicyRequest,
|
||||||
|
};
|
||||||
|
use krataoci::packer::service::OciPackerService;
|
||||||
|
use kratart::Runtime;
|
||||||
|
|
||||||
|
use crate::control::attach_zone_console::AttachZoneConsoleRpc;
|
||||||
|
use crate::control::create_zone::CreateZoneRpc;
|
||||||
|
use crate::control::destroy_zone::DestroyZoneRpc;
|
||||||
|
use crate::control::exec_inside_zone::ExecInsideZoneRpc;
|
||||||
|
use crate::control::get_host_cpu_topology::GetHostCpuTopologyRpc;
|
||||||
|
use crate::control::get_zone::GetZoneRpc;
|
||||||
|
use crate::control::host_status::HostStatusRpc;
|
||||||
|
use crate::control::list_devices::ListDevicesRpc;
|
||||||
|
use crate::control::list_zones::ListZonesRpc;
|
||||||
|
use crate::control::pull_image::PullImageRpc;
|
||||||
|
use crate::control::read_hypervisor_console::ReadHypervisorConsoleRpc;
|
||||||
|
use crate::control::read_zone_metrics::ReadZoneMetricsRpc;
|
||||||
|
use crate::control::resolve_zone_id::ResolveZoneIdRpc;
|
||||||
|
use crate::control::set_host_power_management_policy::SetHostPowerManagementPolicyRpc;
|
||||||
|
use crate::control::snoop_idm::SnoopIdmRpc;
|
||||||
|
use crate::control::update_zone_resources::UpdateZoneResourcesRpc;
|
||||||
|
use crate::control::watch_events::WatchEventsRpc;
|
||||||
|
use crate::db::zone::ZoneStore;
|
||||||
|
use crate::ip::assignment::IpAssignment;
|
||||||
|
use crate::{
|
||||||
|
console::DaemonConsoleHandle, devices::DaemonDeviceManager, event::DaemonEventContext,
|
||||||
|
idm::DaemonIdmHandle, zlt::ZoneLookupTable,
|
||||||
|
};
|
||||||
|
|
||||||
|
pub mod attach_zone_console;
|
||||||
|
pub mod create_zone;
|
||||||
|
pub mod destroy_zone;
|
||||||
|
pub mod exec_inside_zone;
|
||||||
|
pub mod get_host_cpu_topology;
|
||||||
|
pub mod get_zone;
|
||||||
|
pub mod host_status;
|
||||||
|
pub mod list_devices;
|
||||||
|
pub mod list_zones;
|
||||||
|
pub mod pull_image;
|
||||||
|
pub mod read_hypervisor_console;
|
||||||
|
pub mod read_zone_metrics;
|
||||||
|
pub mod resolve_zone_id;
|
||||||
|
pub mod set_host_power_management_policy;
|
||||||
|
pub mod snoop_idm;
|
||||||
|
pub mod update_zone_resources;
|
||||||
|
pub mod watch_events;
|
||||||
|
|
||||||
|
pub struct ApiError {
|
||||||
|
message: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<Error> for ApiError {
|
||||||
|
fn from(value: Error) -> Self {
|
||||||
|
ApiError {
|
||||||
|
message: value.to_string(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<ApiError> for Status {
|
||||||
|
fn from(value: ApiError) -> Self {
|
||||||
|
Status::unknown(value.message)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct DaemonControlService {
|
||||||
|
zlt: ZoneLookupTable,
|
||||||
|
devices: DaemonDeviceManager,
|
||||||
|
events: DaemonEventContext,
|
||||||
|
console: DaemonConsoleHandle,
|
||||||
|
idm: DaemonIdmHandle,
|
||||||
|
zones: ZoneStore,
|
||||||
|
ip: IpAssignment,
|
||||||
|
zone_reconciler_notify: Sender<Uuid>,
|
||||||
|
packer: OciPackerService,
|
||||||
|
runtime: Runtime,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DaemonControlService {
|
||||||
|
#[allow(clippy::too_many_arguments)]
|
||||||
|
pub fn new(
|
||||||
|
zlt: ZoneLookupTable,
|
||||||
|
devices: DaemonDeviceManager,
|
||||||
|
events: DaemonEventContext,
|
||||||
|
console: DaemonConsoleHandle,
|
||||||
|
idm: DaemonIdmHandle,
|
||||||
|
zones: ZoneStore,
|
||||||
|
ip: IpAssignment,
|
||||||
|
zone_reconciler_notify: Sender<Uuid>,
|
||||||
|
packer: OciPackerService,
|
||||||
|
runtime: Runtime,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
zlt,
|
||||||
|
devices,
|
||||||
|
events,
|
||||||
|
console,
|
||||||
|
idm,
|
||||||
|
zones,
|
||||||
|
ip,
|
||||||
|
zone_reconciler_notify,
|
||||||
|
packer,
|
||||||
|
runtime,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tonic::async_trait]
|
||||||
|
impl ControlService for DaemonControlService {
|
||||||
|
async fn host_status(
|
||||||
|
&self,
|
||||||
|
request: Request<HostStatusRequest>,
|
||||||
|
) -> Result<Response<HostStatusReply>, Status> {
|
||||||
|
let request = request.into_inner();
|
||||||
|
adapt(
|
||||||
|
HostStatusRpc::new(self.ip.clone(), self.zlt.clone())
|
||||||
|
.process(request)
|
||||||
|
.await,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
type SnoopIdmStream =
|
||||||
|
Pin<Box<dyn Stream<Item = Result<SnoopIdmReply, Status>> + Send + 'static>>;
|
||||||
|
|
||||||
|
async fn snoop_idm(
|
||||||
|
&self,
|
||||||
|
request: Request<SnoopIdmRequest>,
|
||||||
|
) -> Result<Response<Self::SnoopIdmStream>, Status> {
|
||||||
|
let request = request.into_inner();
|
||||||
|
adapt(
|
||||||
|
SnoopIdmRpc::new(self.idm.clone(), self.zlt.clone())
|
||||||
|
.process(request)
|
||||||
|
.await,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_host_cpu_topology(
|
||||||
|
&self,
|
||||||
|
request: Request<GetHostCpuTopologyRequest>,
|
||||||
|
) -> Result<Response<GetHostCpuTopologyReply>, Status> {
|
||||||
|
let request = request.into_inner();
|
||||||
|
adapt(
|
||||||
|
GetHostCpuTopologyRpc::new(self.runtime.clone())
|
||||||
|
.process(request)
|
||||||
|
.await,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn set_host_power_management_policy(
|
||||||
|
&self,
|
||||||
|
request: Request<SetHostPowerManagementPolicyRequest>,
|
||||||
|
) -> Result<Response<SetHostPowerManagementPolicyReply>, Status> {
|
||||||
|
let request = request.into_inner();
|
||||||
|
adapt(
|
||||||
|
SetHostPowerManagementPolicyRpc::new(self.runtime.clone())
|
||||||
|
.process(request)
|
||||||
|
.await,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn list_devices(
|
||||||
|
&self,
|
||||||
|
request: Request<ListDevicesRequest>,
|
||||||
|
) -> Result<Response<ListDevicesReply>, Status> {
|
||||||
|
let request = request.into_inner();
|
||||||
|
adapt(
|
||||||
|
ListDevicesRpc::new(self.devices.clone())
|
||||||
|
.process(request)
|
||||||
|
.await,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
type PullImageStream =
|
||||||
|
Pin<Box<dyn Stream<Item = Result<PullImageReply, Status>> + Send + 'static>>;
|
||||||
|
|
||||||
|
async fn pull_image(
|
||||||
|
&self,
|
||||||
|
request: Request<PullImageRequest>,
|
||||||
|
) -> Result<Response<Self::PullImageStream>, Status> {
|
||||||
|
let request = request.into_inner();
|
||||||
|
adapt(
|
||||||
|
PullImageRpc::new(self.packer.clone())
|
||||||
|
.process(request)
|
||||||
|
.await,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn create_zone(
|
||||||
|
&self,
|
||||||
|
request: Request<CreateZoneRequest>,
|
||||||
|
) -> Result<Response<CreateZoneReply>, Status> {
|
||||||
|
let request = request.into_inner();
|
||||||
|
adapt(
|
||||||
|
CreateZoneRpc::new(
|
||||||
|
self.zones.clone(),
|
||||||
|
self.zlt.clone(),
|
||||||
|
self.zone_reconciler_notify.clone(),
|
||||||
|
)
|
||||||
|
.process(request)
|
||||||
|
.await,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn destroy_zone(
|
||||||
|
&self,
|
||||||
|
request: Request<DestroyZoneRequest>,
|
||||||
|
) -> Result<Response<DestroyZoneReply>, Status> {
|
||||||
|
let request = request.into_inner();
|
||||||
|
adapt(
|
||||||
|
DestroyZoneRpc::new(self.zones.clone(), self.zone_reconciler_notify.clone())
|
||||||
|
.process(request)
|
||||||
|
.await,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn resolve_zone_id(
|
||||||
|
&self,
|
||||||
|
request: Request<ResolveZoneIdRequest>,
|
||||||
|
) -> Result<Response<ResolveZoneIdReply>, Status> {
|
||||||
|
let request = request.into_inner();
|
||||||
|
adapt(
|
||||||
|
ResolveZoneIdRpc::new(self.zones.clone())
|
||||||
|
.process(request)
|
||||||
|
.await,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_zone(
|
||||||
|
&self,
|
||||||
|
request: Request<GetZoneRequest>,
|
||||||
|
) -> Result<Response<GetZoneReply>, Status> {
|
||||||
|
let request = request.into_inner();
|
||||||
|
adapt(GetZoneRpc::new(self.zones.clone()).process(request).await)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn update_zone_resources(
|
||||||
|
&self,
|
||||||
|
request: Request<UpdateZoneResourcesRequest>,
|
||||||
|
) -> Result<Response<UpdateZoneResourcesReply>, Status> {
|
||||||
|
let request = request.into_inner();
|
||||||
|
adapt(
|
||||||
|
UpdateZoneResourcesRpc::new(self.runtime.clone(), self.zones.clone())
|
||||||
|
.process(request)
|
||||||
|
.await,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn list_zones(
|
||||||
|
&self,
|
||||||
|
request: Request<ListZonesRequest>,
|
||||||
|
) -> Result<Response<ListZonesReply>, Status> {
|
||||||
|
let request = request.into_inner();
|
||||||
|
adapt(ListZonesRpc::new(self.zones.clone()).process(request).await)
|
||||||
|
}
|
||||||
|
|
||||||
|
type AttachZoneConsoleStream =
|
||||||
|
Pin<Box<dyn Stream<Item = Result<ZoneConsoleReply, Status>> + Send + 'static>>;
|
||||||
|
|
||||||
|
async fn attach_zone_console(
|
||||||
|
&self,
|
||||||
|
request: Request<Streaming<ZoneConsoleRequest>>,
|
||||||
|
) -> Result<Response<Self::AttachZoneConsoleStream>, Status> {
|
||||||
|
let input = request.into_inner();
|
||||||
|
adapt(
|
||||||
|
AttachZoneConsoleRpc::new(self.console.clone())
|
||||||
|
.process(input)
|
||||||
|
.await,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
type ExecInsideZoneStream =
|
||||||
|
Pin<Box<dyn Stream<Item = Result<ExecInsideZoneReply, Status>> + Send + 'static>>;
|
||||||
|
|
||||||
|
async fn exec_inside_zone(
|
||||||
|
&self,
|
||||||
|
request: Request<Streaming<ExecInsideZoneRequest>>,
|
||||||
|
) -> Result<Response<Self::ExecInsideZoneStream>, Status> {
|
||||||
|
let input = request.into_inner();
|
||||||
|
adapt(
|
||||||
|
ExecInsideZoneRpc::new(self.idm.clone())
|
||||||
|
.process(input)
|
||||||
|
.await,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn read_zone_metrics(
|
||||||
|
&self,
|
||||||
|
request: Request<ReadZoneMetricsRequest>,
|
||||||
|
) -> Result<Response<ReadZoneMetricsReply>, Status> {
|
||||||
|
let request = request.into_inner();
|
||||||
|
adapt(
|
||||||
|
ReadZoneMetricsRpc::new(self.idm.clone())
|
||||||
|
.process(request)
|
||||||
|
.await,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
type WatchEventsStream =
|
||||||
|
Pin<Box<dyn Stream<Item = Result<WatchEventsReply, Status>> + Send + 'static>>;
|
||||||
|
|
||||||
|
async fn watch_events(
|
||||||
|
&self,
|
||||||
|
request: Request<WatchEventsRequest>,
|
||||||
|
) -> Result<Response<Self::WatchEventsStream>, Status> {
|
||||||
|
let request = request.into_inner();
|
||||||
|
adapt(
|
||||||
|
WatchEventsRpc::new(self.events.clone())
|
||||||
|
.process(request)
|
||||||
|
.await,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn read_hypervisor_console(
|
||||||
|
&self,
|
||||||
|
request: Request<ReadHypervisorConsoleRequest>,
|
||||||
|
) -> Result<Response<ReadHypervisorConsoleReply>, Status> {
|
||||||
|
let request = request.into_inner();
|
||||||
|
adapt(
|
||||||
|
ReadHypervisorConsoleRpc::new(self.runtime.clone())
|
||||||
|
.process(request)
|
||||||
|
.await,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn adapt<T>(result: anyhow::Result<T>) -> Result<Response<T>, Status> {
|
||||||
|
result
|
||||||
|
.map(Response::new)
|
||||||
|
.map_err(|error| Status::unknown(error.to_string()))
|
||||||
|
}
|
100
crates/daemon/src/control/pull_image.rs
Normal file
100
crates/daemon/src/control/pull_image.rs
Normal file
@ -0,0 +1,100 @@
|
|||||||
|
use crate::control::ApiError;
|
||||||
|
use crate::oci::convert_oci_progress;
|
||||||
|
use anyhow::Result;
|
||||||
|
use async_stream::try_stream;
|
||||||
|
use krata::v1::common::OciImageFormat;
|
||||||
|
use krata::v1::control::{PullImageReply, PullImageRequest};
|
||||||
|
use krataoci::name::ImageName;
|
||||||
|
use krataoci::packer::service::OciPackerService;
|
||||||
|
use krataoci::packer::{OciPackedFormat, OciPackedImage};
|
||||||
|
use krataoci::progress::{OciProgress, OciProgressContext};
|
||||||
|
use std::pin::Pin;
|
||||||
|
use tokio::select;
|
||||||
|
use tokio::task::JoinError;
|
||||||
|
use tokio_stream::Stream;
|
||||||
|
use tonic::Status;
|
||||||
|
|
||||||
|
enum PullImageSelect {
|
||||||
|
Progress(Option<OciProgress>),
|
||||||
|
Completed(Result<Result<OciPackedImage, anyhow::Error>, JoinError>),
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct PullImageRpc {
|
||||||
|
packer: OciPackerService,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PullImageRpc {
|
||||||
|
pub fn new(packer: OciPackerService) -> Self {
|
||||||
|
Self { packer }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn process(
|
||||||
|
self,
|
||||||
|
request: PullImageRequest,
|
||||||
|
) -> Result<Pin<Box<dyn Stream<Item = Result<PullImageReply, Status>> + Send + 'static>>> {
|
||||||
|
let name = ImageName::parse(&request.image)?;
|
||||||
|
let format = match request.format() {
|
||||||
|
OciImageFormat::Unknown => OciPackedFormat::Squashfs,
|
||||||
|
OciImageFormat::Squashfs => OciPackedFormat::Squashfs,
|
||||||
|
OciImageFormat::Erofs => OciPackedFormat::Erofs,
|
||||||
|
OciImageFormat::Tar => OciPackedFormat::Tar,
|
||||||
|
};
|
||||||
|
let (context, mut receiver) = OciProgressContext::create();
|
||||||
|
let our_packer = self.packer;
|
||||||
|
|
||||||
|
let output = try_stream! {
|
||||||
|
let mut task = tokio::task::spawn(async move {
|
||||||
|
our_packer.request(name, format, request.overwrite_cache, request.update, context).await
|
||||||
|
});
|
||||||
|
let abort_handle = task.abort_handle();
|
||||||
|
let _task_cancel_guard = scopeguard::guard(abort_handle, |handle| {
|
||||||
|
handle.abort();
|
||||||
|
});
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let what = select! {
|
||||||
|
x = receiver.changed() => match x {
|
||||||
|
Ok(_) => PullImageSelect::Progress(Some(receiver.borrow_and_update().clone())),
|
||||||
|
Err(_) => PullImageSelect::Progress(None),
|
||||||
|
},
|
||||||
|
x = &mut task => PullImageSelect::Completed(x),
|
||||||
|
};
|
||||||
|
match what {
|
||||||
|
PullImageSelect::Progress(Some(progress)) => {
|
||||||
|
let reply = PullImageReply {
|
||||||
|
progress: Some(convert_oci_progress(progress)),
|
||||||
|
digest: String::new(),
|
||||||
|
format: OciImageFormat::Unknown.into(),
|
||||||
|
};
|
||||||
|
yield reply;
|
||||||
|
},
|
||||||
|
|
||||||
|
PullImageSelect::Completed(result) => {
|
||||||
|
let result = result.map_err(|err| ApiError {
|
||||||
|
message: err.to_string(),
|
||||||
|
})?;
|
||||||
|
let packed = result.map_err(|err| ApiError {
|
||||||
|
message: err.to_string(),
|
||||||
|
})?;
|
||||||
|
let reply = PullImageReply {
|
||||||
|
progress: None,
|
||||||
|
digest: packed.digest,
|
||||||
|
format: match packed.format {
|
||||||
|
OciPackedFormat::Squashfs => OciImageFormat::Squashfs.into(),
|
||||||
|
OciPackedFormat::Erofs => OciImageFormat::Erofs.into(),
|
||||||
|
OciPackedFormat::Tar => OciImageFormat::Tar.into(),
|
||||||
|
},
|
||||||
|
};
|
||||||
|
yield reply;
|
||||||
|
break;
|
||||||
|
},
|
||||||
|
|
||||||
|
_ => {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
Ok(Box::pin(output))
|
||||||
|
}
|
||||||
|
}
|
23
crates/daemon/src/control/read_hypervisor_console.rs
Normal file
23
crates/daemon/src/control/read_hypervisor_console.rs
Normal file
@ -0,0 +1,23 @@
|
|||||||
|
use anyhow::Result;
|
||||||
|
use krata::v1::control::{ReadHypervisorConsoleReply, ReadHypervisorConsoleRequest};
|
||||||
|
use kratart::Runtime;
|
||||||
|
|
||||||
|
pub struct ReadHypervisorConsoleRpc {
|
||||||
|
runtime: Runtime,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ReadHypervisorConsoleRpc {
|
||||||
|
pub fn new(runtime: Runtime) -> Self {
|
||||||
|
Self { runtime }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn process(
|
||||||
|
self,
|
||||||
|
_: ReadHypervisorConsoleRequest,
|
||||||
|
) -> Result<ReadHypervisorConsoleReply> {
|
||||||
|
let data = self.runtime.read_hypervisor_console(false).await?;
|
||||||
|
Ok(ReadHypervisorConsoleReply {
|
||||||
|
data: data.to_string(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
40
crates/daemon/src/control/read_zone_metrics.rs
Normal file
40
crates/daemon/src/control/read_zone_metrics.rs
Normal file
@ -0,0 +1,40 @@
|
|||||||
|
use std::str::FromStr;
|
||||||
|
|
||||||
|
use anyhow::Result;
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
use krata::idm::internal::MetricsRequest;
|
||||||
|
use krata::idm::internal::{
|
||||||
|
request::Request as IdmRequestType, response::Response as IdmResponseType,
|
||||||
|
Request as IdmRequest,
|
||||||
|
};
|
||||||
|
use krata::v1::control::{ReadZoneMetricsReply, ReadZoneMetricsRequest};
|
||||||
|
|
||||||
|
use crate::idm::DaemonIdmHandle;
|
||||||
|
use crate::metrics::idm_metric_to_api;
|
||||||
|
|
||||||
|
pub struct ReadZoneMetricsRpc {
|
||||||
|
idm: DaemonIdmHandle,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ReadZoneMetricsRpc {
|
||||||
|
pub fn new(idm: DaemonIdmHandle) -> Self {
|
||||||
|
Self { idm }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn process(self, request: ReadZoneMetricsRequest) -> Result<ReadZoneMetricsReply> {
|
||||||
|
let uuid = Uuid::from_str(&request.zone_id)?;
|
||||||
|
let client = self.idm.client(uuid).await?;
|
||||||
|
let response = client
|
||||||
|
.send(IdmRequest {
|
||||||
|
request: Some(IdmRequestType::Metrics(MetricsRequest {})),
|
||||||
|
})
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let mut reply = ReadZoneMetricsReply::default();
|
||||||
|
if let Some(IdmResponseType::Metrics(metrics)) = response.response {
|
||||||
|
reply.root = metrics.root.map(idm_metric_to_api);
|
||||||
|
}
|
||||||
|
Ok(reply)
|
||||||
|
}
|
||||||
|
}
|
30
crates/daemon/src/control/resolve_zone_id.rs
Normal file
30
crates/daemon/src/control/resolve_zone_id.rs
Normal file
@ -0,0 +1,30 @@
|
|||||||
|
use anyhow::Result;
|
||||||
|
use krata::v1::common::Zone;
|
||||||
|
use krata::v1::control::{ResolveZoneIdReply, ResolveZoneIdRequest};
|
||||||
|
|
||||||
|
use crate::db::zone::ZoneStore;
|
||||||
|
|
||||||
|
pub struct ResolveZoneIdRpc {
|
||||||
|
zones: ZoneStore,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ResolveZoneIdRpc {
|
||||||
|
pub fn new(zones: ZoneStore) -> Self {
|
||||||
|
Self { zones }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn process(self, request: ResolveZoneIdRequest) -> Result<ResolveZoneIdReply> {
|
||||||
|
let zones = self.zones.list().await?;
|
||||||
|
let zones = zones
|
||||||
|
.into_values()
|
||||||
|
.filter(|x| {
|
||||||
|
let comparison_spec = x.spec.as_ref().cloned().unwrap_or_default();
|
||||||
|
(!request.name.is_empty() && comparison_spec.name == request.name)
|
||||||
|
|| x.id == request.name
|
||||||
|
})
|
||||||
|
.collect::<Vec<Zone>>();
|
||||||
|
Ok(ResolveZoneIdReply {
|
||||||
|
zone_id: zones.first().cloned().map(|x| x.id).unwrap_or_default(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,25 @@
|
|||||||
|
use anyhow::Result;
|
||||||
|
use krata::v1::control::{SetHostPowerManagementPolicyReply, SetHostPowerManagementPolicyRequest};
|
||||||
|
use kratart::Runtime;
|
||||||
|
|
||||||
|
pub struct SetHostPowerManagementPolicyRpc {
|
||||||
|
runtime: Runtime,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SetHostPowerManagementPolicyRpc {
|
||||||
|
pub fn new(runtime: Runtime) -> Self {
|
||||||
|
Self { runtime }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn process(
|
||||||
|
self,
|
||||||
|
request: SetHostPowerManagementPolicyRequest,
|
||||||
|
) -> Result<SetHostPowerManagementPolicyReply> {
|
||||||
|
let power = self.runtime.power_management_context().await?;
|
||||||
|
let scheduler = &request.scheduler;
|
||||||
|
|
||||||
|
power.set_smt_policy(request.smt_awareness).await?;
|
||||||
|
power.set_scheduler_policy(scheduler).await?;
|
||||||
|
Ok(SetHostPowerManagementPolicyReply {})
|
||||||
|
}
|
||||||
|
}
|
39
crates/daemon/src/control/snoop_idm.rs
Normal file
39
crates/daemon/src/control/snoop_idm.rs
Normal file
@ -0,0 +1,39 @@
|
|||||||
|
use crate::idm::DaemonIdmHandle;
|
||||||
|
use crate::zlt::ZoneLookupTable;
|
||||||
|
use anyhow::Result;
|
||||||
|
use async_stream::try_stream;
|
||||||
|
use krata::v1::control::{SnoopIdmReply, SnoopIdmRequest};
|
||||||
|
use std::pin::Pin;
|
||||||
|
use tokio_stream::Stream;
|
||||||
|
use tonic::Status;
|
||||||
|
|
||||||
|
pub struct SnoopIdmRpc {
|
||||||
|
idm: DaemonIdmHandle,
|
||||||
|
zlt: ZoneLookupTable,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SnoopIdmRpc {
|
||||||
|
pub fn new(idm: DaemonIdmHandle, zlt: ZoneLookupTable) -> Self {
|
||||||
|
Self { idm, zlt }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn process(
|
||||||
|
self,
|
||||||
|
_request: SnoopIdmRequest,
|
||||||
|
) -> Result<Pin<Box<dyn Stream<Item = Result<SnoopIdmReply, Status>> + Send + 'static>>> {
|
||||||
|
let mut messages = self.idm.snoop();
|
||||||
|
let zlt = self.zlt.clone();
|
||||||
|
let output = try_stream! {
|
||||||
|
while let Ok(event) = messages.recv().await {
|
||||||
|
let Some(from_uuid) = zlt.lookup_uuid_by_domid(event.from).await else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
let Some(to_uuid) = zlt.lookup_uuid_by_domid(event.to).await else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
yield SnoopIdmReply { from: from_uuid.to_string(), to: to_uuid.to_string(), packet: Some(event.packet) };
|
||||||
|
}
|
||||||
|
};
|
||||||
|
Ok(Box::pin(output))
|
||||||
|
}
|
||||||
|
}
|
82
crates/daemon/src/control/update_zone_resources.rs
Normal file
82
crates/daemon/src/control/update_zone_resources.rs
Normal file
@ -0,0 +1,82 @@
|
|||||||
|
use std::str::FromStr;
|
||||||
|
|
||||||
|
use anyhow::{anyhow, Result};
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
use krata::v1::common::{ZoneResourceStatus, ZoneState};
|
||||||
|
use krata::v1::control::{UpdateZoneResourcesReply, UpdateZoneResourcesRequest};
|
||||||
|
use kratart::Runtime;
|
||||||
|
|
||||||
|
use crate::db::zone::ZoneStore;
|
||||||
|
|
||||||
|
pub struct UpdateZoneResourcesRpc {
|
||||||
|
runtime: Runtime,
|
||||||
|
zones: ZoneStore,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl UpdateZoneResourcesRpc {
|
||||||
|
pub fn new(runtime: Runtime, zones: ZoneStore) -> Self {
|
||||||
|
Self { runtime, zones }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn process(
|
||||||
|
self,
|
||||||
|
request: UpdateZoneResourcesRequest,
|
||||||
|
) -> Result<UpdateZoneResourcesReply> {
|
||||||
|
let uuid = Uuid::from_str(&request.zone_id)?;
|
||||||
|
let Some(mut zone) = self.zones.read(uuid).await? else {
|
||||||
|
return Err(anyhow!("zone not found"));
|
||||||
|
};
|
||||||
|
|
||||||
|
let Some(ref mut status) = zone.status else {
|
||||||
|
return Err(anyhow!("zone state not available"));
|
||||||
|
};
|
||||||
|
|
||||||
|
if status.state() != ZoneState::Created {
|
||||||
|
return Err(anyhow!("zone is in an invalid state"));
|
||||||
|
}
|
||||||
|
|
||||||
|
if status.domid == 0 || status.domid == u32::MAX {
|
||||||
|
return Err(anyhow!("zone domid is invalid"));
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut resources = request.resources.unwrap_or_default();
|
||||||
|
if resources.target_memory > resources.max_memory {
|
||||||
|
resources.max_memory = resources.target_memory;
|
||||||
|
}
|
||||||
|
|
||||||
|
if resources.target_cpus < 1 {
|
||||||
|
resources.target_cpus = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
let initial_resources = zone
|
||||||
|
.spec
|
||||||
|
.clone()
|
||||||
|
.unwrap_or_default()
|
||||||
|
.initial_resources
|
||||||
|
.unwrap_or_default();
|
||||||
|
if resources.target_cpus > initial_resources.max_cpus {
|
||||||
|
resources.target_cpus = initial_resources.max_cpus;
|
||||||
|
}
|
||||||
|
resources.max_cpus = initial_resources.max_cpus;
|
||||||
|
|
||||||
|
self.runtime
|
||||||
|
.set_memory_resources(
|
||||||
|
status.domid,
|
||||||
|
resources.target_memory * 1024 * 1024,
|
||||||
|
resources.max_memory * 1024 * 1024,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.map_err(|error| anyhow!("failed to set memory resources: {}", error))?;
|
||||||
|
self.runtime
|
||||||
|
.set_cpu_resources(status.domid, resources.target_cpus)
|
||||||
|
.await
|
||||||
|
.map_err(|error| anyhow!("failed to set cpu resources: {}", error))?;
|
||||||
|
status.resource_status = Some(ZoneResourceStatus {
|
||||||
|
active_resources: Some(resources),
|
||||||
|
});
|
||||||
|
|
||||||
|
self.zones.update(uuid, zone).await?;
|
||||||
|
Ok(UpdateZoneResourcesReply {})
|
||||||
|
}
|
||||||
|
}
|
31
crates/daemon/src/control/watch_events.rs
Normal file
31
crates/daemon/src/control/watch_events.rs
Normal file
@ -0,0 +1,31 @@
|
|||||||
|
use crate::event::DaemonEventContext;
|
||||||
|
use anyhow::Result;
|
||||||
|
use async_stream::try_stream;
|
||||||
|
use krata::v1::control::{WatchEventsReply, WatchEventsRequest};
|
||||||
|
use std::pin::Pin;
|
||||||
|
use tokio_stream::Stream;
|
||||||
|
use tonic::Status;
|
||||||
|
|
||||||
|
pub struct WatchEventsRpc {
|
||||||
|
events: DaemonEventContext,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl WatchEventsRpc {
|
||||||
|
pub fn new(events: DaemonEventContext) -> Self {
|
||||||
|
Self { events }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn process(
|
||||||
|
self,
|
||||||
|
_request: WatchEventsRequest,
|
||||||
|
) -> Result<Pin<Box<dyn Stream<Item = Result<WatchEventsReply, Status>> + Send + 'static>>>
|
||||||
|
{
|
||||||
|
let mut events = self.events.subscribe();
|
||||||
|
let output = try_stream! {
|
||||||
|
while let Ok(event) = events.recv().await {
|
||||||
|
yield WatchEventsReply { event: Some(event), };
|
||||||
|
}
|
||||||
|
};
|
||||||
|
Ok(Box::pin(output))
|
||||||
|
}
|
||||||
|
}
|
@ -41,7 +41,6 @@ pub mod metrics;
|
|||||||
pub mod oci;
|
pub mod oci;
|
||||||
pub mod reconcile;
|
pub mod reconcile;
|
||||||
pub mod zlt;
|
pub mod zlt;
|
||||||
|
|
||||||
pub struct Daemon {
|
pub struct Daemon {
|
||||||
store: String,
|
store: String,
|
||||||
_config: Arc<DaemonConfig>,
|
_config: Arc<DaemonConfig>,
|
||||||
|
Reference in New Issue
Block a user