Compare commits

..

17 Commits

Author SHA1 Message Date
1dca770091 chore: release (#306)
Co-authored-by: edera-cultivation[bot] <165992271+edera-cultivation[bot]@users.noreply.github.com>
2024-08-14 06:37:52 +00:00
01a94ad23e feature(krata): prepare for workload rework (#276)
* chore(code): simple code cleanup

* chore(code): additional code cleanup

* feature(krata): rework api and make ip assignment persistent to database

* rework and cleanup

* fix daemon config references
2024-08-14 06:17:47 +00:00
2a107a370f build(deps): bump the dep-updates group with 2 updates (#332)
Bumps the dep-updates group with 2 updates: [ctrlc](https://github.com/Detegr/rust-ctrlc) and [indexmap](https://github.com/indexmap-rs/indexmap).


Updates `ctrlc` from 3.4.4 to 3.4.5
- [Release notes](https://github.com/Detegr/rust-ctrlc/releases)
- [Commits](https://github.com/Detegr/rust-ctrlc/compare/3.4.4...3.4.5)

Updates `indexmap` from 2.3.0 to 2.4.0
- [Changelog](https://github.com/indexmap-rs/indexmap/blob/master/RELEASES.md)
- [Commits](https://github.com/indexmap-rs/indexmap/compare/2.3.0...2.4.0)

---
updated-dependencies:
- dependency-name: ctrlc
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: dep-updates
- dependency-name: indexmap
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: dep-updates
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-08-14 05:39:00 +00:00
313d3f72a5 build(deps): bump docker/build-push-action in the dep-updates group (#331)
Bumps the dep-updates group with 1 update: [docker/build-push-action](https://github.com/docker/build-push-action).


Updates `docker/build-push-action` from 6.6.1 to 6.7.0
- [Release notes](https://github.com/docker/build-push-action/releases)
- [Commits](16ebe778df...5cd11c3a4c)

---
updated-dependencies:
- dependency-name: docker/build-push-action
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: dep-updates
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-08-14 05:23:20 +00:00
5ec3d9d5c1 xencall: use correct op for setting cpufreq governor (#327) 2024-08-14 03:11:08 +00:00
1cf03a460e fix(idm): reimplement packet processing algorithm (#330)
* chore(xen): rewrite event channel code

* fix(idm): repair idm bugs on the file backend
2024-08-13 23:18:27 +00:00
ffc9dcc0ea build(deps): bump serde from 1.0.206 to 1.0.207 in the dep-updates group (#324)
Bumps the dep-updates group with 1 update: [serde](https://github.com/serde-rs/serde).


Updates `serde` from 1.0.206 to 1.0.207
- [Release notes](https://github.com/serde-rs/serde/releases)
- [Commits](https://github.com/serde-rs/serde/compare/v1.0.206...v1.0.207)

---
updated-dependencies:
- dependency-name: serde
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: dep-updates
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-08-13 16:58:37 +00:00
0358c9c775 fix(power-trap-eacces): gracefully handle hypercall errors in power management (#325)
* daemon: reenable built-in power management policy

* xenruntime: gracefully handle power management errors
2024-08-13 08:22:05 +00:00
dcffaf110e build(deps): bump the dep-updates group with 2 updates (#316)
Bumps the dep-updates group with 2 updates: [sigstore/cosign-installer](https://github.com/sigstore/cosign-installer) and [docker/build-push-action](https://github.com/docker/build-push-action).


Updates `sigstore/cosign-installer` from 3.5.0 to 3.6.0
- [Release notes](https://github.com/sigstore/cosign-installer/releases)
- [Commits](59acb6260d...4959ce089c)

Updates `docker/build-push-action` from 6.5.0 to 6.6.1
- [Release notes](https://github.com/docker/build-push-action/releases)
- [Commits](5176d81f87...16ebe778df)

---
updated-dependencies:
- dependency-name: sigstore/cosign-installer
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: dep-updates
- dependency-name: docker/build-push-action
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: dep-updates
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-08-12 22:10:01 +00:00
b81ae5d01a build(deps): bump rust in /images in the dep-updates group (#321)
Bumps the dep-updates group in /images with 1 update: rust.


Updates `rust` from `596c7fa` to `1f5aff5`

---
updated-dependencies:
- dependency-name: rust
  dependency-type: direct:production
  dependency-group: dep-updates
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-08-12 22:08:50 +00:00
1756bc6647 build(deps): bump the dep-updates group across 1 directory with 3 updates (#323)
Bumps the dep-updates group with 3 updates in the / directory: [serde_json](https://github.com/serde-rs/json), [clap](https://github.com/clap-rs/clap) and [serde](https://github.com/serde-rs/serde).


Updates `serde_json` from 1.0.122 to 1.0.124
- [Release notes](https://github.com/serde-rs/json/releases)
- [Commits](https://github.com/serde-rs/json/compare/v1.0.122...v1.0.124)

Updates `clap` from 4.5.13 to 4.5.15
- [Release notes](https://github.com/clap-rs/clap/releases)
- [Changelog](https://github.com/clap-rs/clap/blob/master/CHANGELOG.md)
- [Commits](https://github.com/clap-rs/clap/compare/clap_complete-v4.5.13...v4.5.15)

Updates `serde` from 1.0.205 to 1.0.206
- [Release notes](https://github.com/serde-rs/serde/releases)
- [Commits](https://github.com/serde-rs/serde/compare/v1.0.205...v1.0.206)

---
updated-dependencies:
- dependency-name: serde_json
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: dep-updates
- dependency-name: clap
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: dep-updates
- dependency-name: serde
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: dep-updates
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-08-12 22:08:32 +00:00
6bf3741ec9 chore(o11y): add more debug logs to daemon & runtime (#318)
This change adds debug log lines to make it easier to tell where issues
are occuring during startup.
2024-08-08 19:11:51 +00:00
b7d41ee9f4 build(deps): bump serde from 1.0.204 to 1.0.205 in the dep-updates group (#315)
Bumps the dep-updates group with 1 update: [serde](https://github.com/serde-rs/serde).


Updates `serde` from 1.0.204 to 1.0.205
- [Release notes](https://github.com/serde-rs/serde/releases)
- [Commits](https://github.com/serde-rs/serde/compare/v1.0.204...v1.0.205)

---
updated-dependencies:
- dependency-name: serde
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: dep-updates
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-08-08 05:52:22 +00:00
53059e8cca fix(xenbus): avoid fd close race by forgetting copy of handle (#314)
This change addresses a race condition where the read-copy of a raw FD
is closed while the write-copy is still in use. Now the read-copy is not
closed, by utilizing mem::forget.

Co-authored-by: Alex Zenla <alex@edera.dev>
2024-08-07 23:55:04 +00:00
11bb99b1e4 build(deps): bump actions/upload-artifact in the dep-updates group (#312)
Bumps the dep-updates group with 1 update: [actions/upload-artifact](https://github.com/actions/upload-artifact).


Updates `actions/upload-artifact` from 4.3.5 to 4.3.6
- [Release notes](https://github.com/actions/upload-artifact/releases)
- [Commits](89ef406dd8...834a144ee9)

---
updated-dependencies:
- dependency-name: actions/upload-artifact
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: dep-updates
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-08-07 19:25:49 +00:00
eaa84089ce build(deps): bump hyper-util in the dep-updates group (#311)
Bumps the dep-updates group with 1 update: [hyper-util](https://github.com/hyperium/hyper-util).


Updates `hyper-util` from 0.1.6 to 0.1.7
- [Release notes](https://github.com/hyperium/hyper-util/releases)
- [Changelog](https://github.com/hyperium/hyper-util/blob/master/CHANGELOG.md)
- [Commits](https://github.com/hyperium/hyper-util/compare/v0.1.6...v0.1.7)

---
updated-dependencies:
- dependency-name: hyper-util
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: dep-updates
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-08-07 05:17:50 +00:00
680244fc5e build(deps): bump step-security/harden-runner in the dep-updates group (#308)
Bumps the dep-updates group with 1 update: [step-security/harden-runner](https://github.com/step-security/harden-runner).


Updates `step-security/harden-runner` from 2.9.0 to 2.9.1
- [Release notes](https://github.com/step-security/harden-runner/releases)
- [Commits](0d381219dd...5c7944e73c)

---
updated-dependencies:
- dependency-name: step-security/harden-runner
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: dep-updates
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-08-06 16:56:49 +00:00
68 changed files with 1697 additions and 1177 deletions

View File

@ -12,7 +12,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: harden runner
uses: step-security/harden-runner@0d381219ddf674d61a7572ddd19d7941e271515c # v2.9.0
uses: step-security/harden-runner@5c7944e73c4c2a096b17a9cb74d65b6c2bbafbde # v2.9.1
with:
egress-policy: audit
- name: checkout repository
@ -34,7 +34,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: harden runner
uses: step-security/harden-runner@0d381219ddf674d61a7572ddd19d7941e271515c # v2.9.0
uses: step-security/harden-runner@5c7944e73c4c2a096b17a9cb74d65b6c2bbafbde # v2.9.1
with:
egress-policy: audit
- name: checkout repository
@ -56,7 +56,7 @@ jobs:
name: full build linux-${{ matrix.arch }}
steps:
- name: harden runner
uses: step-security/harden-runner@0d381219ddf674d61a7572ddd19d7941e271515c # v2.9.0
uses: step-security/harden-runner@5c7944e73c4c2a096b17a9cb74d65b6c2bbafbde # v2.9.1
with:
egress-policy: audit
- name: checkout repository
@ -84,7 +84,7 @@ jobs:
name: full test linux-${{ matrix.arch }}
steps:
- name: harden runner
uses: step-security/harden-runner@0d381219ddf674d61a7572ddd19d7941e271515c # v2.9.0
uses: step-security/harden-runner@5c7944e73c4c2a096b17a9cb74d65b6c2bbafbde # v2.9.1
with:
egress-policy: audit
- name: checkout repository
@ -111,7 +111,7 @@ jobs:
name: full clippy linux-${{ matrix.arch }}
steps:
- name: harden runner
uses: step-security/harden-runner@0d381219ddf674d61a7572ddd19d7941e271515c # v2.9.0
uses: step-security/harden-runner@5c7944e73c4c2a096b17a9cb74d65b6c2bbafbde # v2.9.1
with:
egress-policy: audit
- name: checkout repository
@ -139,7 +139,7 @@ jobs:
name: zone initrd linux-${{ matrix.arch }}
steps:
- name: harden runner
uses: step-security/harden-runner@0d381219ddf674d61a7572ddd19d7941e271515c # v2.9.0
uses: step-security/harden-runner@5c7944e73c4c2a096b17a9cb74d65b6c2bbafbde # v2.9.1
with:
egress-policy: audit
- name: checkout repository
@ -176,7 +176,7 @@ jobs:
shell: bash
steps:
- name: harden runner
uses: step-security/harden-runner@0d381219ddf674d61a7572ddd19d7941e271515c # v2.9.0
uses: step-security/harden-runner@5c7944e73c4c2a096b17a9cb74d65b6c2bbafbde # v2.9.1
with:
egress-policy: audit
- name: configure git line endings

View File

@ -20,7 +20,7 @@ jobs:
name: nightly full build linux-${{ matrix.arch }}
steps:
- name: harden runner
uses: step-security/harden-runner@0d381219ddf674d61a7572ddd19d7941e271515c # v2.9.0
uses: step-security/harden-runner@5c7944e73c4c2a096b17a9cb74d65b6c2bbafbde # v2.9.1
with:
egress-policy: audit
- name: checkout repository
@ -37,7 +37,7 @@ jobs:
- name: build systemd bundle
run: ./hack/dist/bundle.sh
- name: upload systemd bundle
uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
uses: actions/upload-artifact@834a144ee995460fba8ed112a2fc961b36a5ec5a # v4.3.6
with:
name: krata-bundle-systemd-${{ matrix.arch }}
path: "target/dist/bundle-systemd-${{ matrix.arch }}.tgz"
@ -45,7 +45,7 @@ jobs:
- name: build deb package
run: ./hack/dist/deb.sh
- name: upload deb package
uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
uses: actions/upload-artifact@834a144ee995460fba8ed112a2fc961b36a5ec5a # v4.3.6
with:
name: krata-debian-${{ matrix.arch }}
path: "target/dist/*.deb"
@ -53,7 +53,7 @@ jobs:
- name: build apk package
run: ./hack/dist/apk.sh
- name: upload apk package
uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
uses: actions/upload-artifact@834a144ee995460fba8ed112a2fc961b36a5ec5a # v4.3.6
with:
name: krata-alpine-${{ matrix.arch }}
path: "target/dist/*_${{ matrix.arch }}.apk"
@ -79,7 +79,7 @@ jobs:
shell: bash
steps:
- name: harden runner
uses: step-security/harden-runner@0d381219ddf674d61a7572ddd19d7941e271515c # v2.9.0
uses: step-security/harden-runner@5c7944e73c4c2a096b17a9cb74d65b6c2bbafbde # v2.9.1
with:
egress-policy: audit
- name: configure git line endings
@ -104,13 +104,13 @@ jobs:
- name: cargo build kratactl
run: ./hack/build/cargo.sh build --release --bin kratactl
- name: upload kratactl
uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
uses: actions/upload-artifact@834a144ee995460fba8ed112a2fc961b36a5ec5a # v4.3.6
with:
name: kratactl-${{ matrix.platform.os }}-${{ matrix.platform.arch }}
path: "target/*/release/kratactl"
if: ${{ matrix.platform.os != 'windows' }}
- name: upload kratactl
uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
uses: actions/upload-artifact@834a144ee995460fba8ed112a2fc961b36a5ec5a # v4.3.6
with:
name: kratactl-${{ matrix.platform.os }}-${{ matrix.platform.arch }}
path: "target/*/release/kratactl.exe"
@ -132,7 +132,7 @@ jobs:
packages: write
steps:
- name: harden runner
uses: step-security/harden-runner@0d381219ddf674d61a7572ddd19d7941e271515c # v2.9.0
uses: step-security/harden-runner@5c7944e73c4c2a096b17a9cb74d65b6c2bbafbde # v2.9.1
with:
egress-policy: audit
- name: checkout repository
@ -140,7 +140,7 @@ jobs:
with:
submodules: recursive
- name: install cosign
uses: sigstore/cosign-installer@59acb6260d9c0ba8f4a2f9d9b48431a222b68e20 # v3.5.0
uses: sigstore/cosign-installer@4959ce089c160fddf62f7b42464195ba1a56d382 # v3.6.0
- name: setup docker buildx
uses: docker/setup-buildx-action@988b5a0280414f521da01fcc63a27aeeb4b104db # v3.6.1
- name: login to container registry
@ -150,7 +150,7 @@ jobs:
username: "${{ github.actor }}"
password: "${{ secrets.GITHUB_TOKEN }}"
- name: docker build and push ${{ matrix.component }}
uses: docker/build-push-action@5176d81f87c23d6fc96624dfdbcd9f3830bbe445 # v6.5.0
uses: docker/build-push-action@5cd11c3a4ced054e52742c5fd54dca954e0edd85 # v6.7.0
id: push
with:
file: ./images/Dockerfile.${{ matrix.component }}

View File

@ -27,7 +27,7 @@ jobs:
contents: write
steps:
- name: harden runner
uses: step-security/harden-runner@0d381219ddf674d61a7572ddd19d7941e271515c # v2.9.0
uses: step-security/harden-runner@5c7944e73c4c2a096b17a9cb74d65b6c2bbafbde # v2.9.1
with:
egress-policy: audit
- name: checkout repository
@ -81,7 +81,7 @@ jobs:
contents: write
steps:
- name: harden runner
uses: step-security/harden-runner@0d381219ddf674d61a7572ddd19d7941e271515c # v2.9.0
uses: step-security/harden-runner@5c7944e73c4c2a096b17a9cb74d65b6c2bbafbde # v2.9.1
with:
egress-policy: audit
- name: checkout repository
@ -129,7 +129,7 @@ jobs:
packages: write
steps:
- name: harden runner
uses: step-security/harden-runner@0d381219ddf674d61a7572ddd19d7941e271515c # v2.9.0
uses: step-security/harden-runner@5c7944e73c4c2a096b17a9cb74d65b6c2bbafbde # v2.9.1
with:
egress-policy: audit
- name: checkout repository
@ -137,7 +137,7 @@ jobs:
with:
submodules: recursive
- name: install cosign
uses: sigstore/cosign-installer@59acb6260d9c0ba8f4a2f9d9b48431a222b68e20 # v3.5.0
uses: sigstore/cosign-installer@4959ce089c160fddf62f7b42464195ba1a56d382 # v3.6.0
- name: setup docker buildx
uses: docker/setup-buildx-action@988b5a0280414f521da01fcc63a27aeeb4b104db # v3.6.1
- name: login to container registry
@ -151,7 +151,7 @@ jobs:
run: |
echo "KRATA_VERSION=$(./hack/dist/version.sh)" >> "${GITHUB_OUTPUT}"
- name: docker build and push ${{ matrix.component }}
uses: docker/build-push-action@5176d81f87c23d6fc96624dfdbcd9f3830bbe445 # v6.5.0
uses: docker/build-push-action@5cd11c3a4ced054e52742c5fd54dca954e0edd85 # v6.7.0
id: push
with:
file: ./images/Dockerfile.${{ matrix.component }}

View File

@ -15,7 +15,7 @@ jobs:
contents: write
steps:
- name: harden runner
uses: step-security/harden-runner@0d381219ddf674d61a7572ddd19d7941e271515c # v2.9.0
uses: step-security/harden-runner@5c7944e73c4c2a096b17a9cb74d65b6c2bbafbde # v2.9.1
with:
egress-policy: audit
- name: generate cultivator token

View File

@ -6,6 +6,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
## [0.0.16](https://github.com/edera-dev/krata/compare/v0.0.15...v0.0.16) - 2024-08-14
### Added
- *(krata)* prepare for workload rework ([#276](https://github.com/edera-dev/krata/pull/276))
### Fixed
- *(idm)* reimplement packet processing algorithm ([#330](https://github.com/edera-dev/krata/pull/330))
- *(power-trap-eacces)* gracefully handle hypercall errors in power management ([#325](https://github.com/edera-dev/krata/pull/325))
### Other
- *(o11y)* add more debug logs to daemon & runtime ([#318](https://github.com/edera-dev/krata/pull/318))
## [0.0.15](https://github.com/edera-dev/krata/compare/v0.0.14...v0.0.15) - 2024-08-06
### Fixed

742
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -18,7 +18,7 @@ members = [
resolver = "2"
[workspace.package]
version = "0.0.15"
version = "0.0.16"
homepage = "https://krata.dev"
license = "Apache-2.0"
repository = "https://github.com/edera-dev/krata"
@ -38,7 +38,7 @@ cgroups-rs = "0.3.4"
circular-buffer = "0.1.7"
comfy-table = "7.1.1"
crossterm = "0.27.0"
ctrlc = "3.4.4"
ctrlc = "3.4.5"
elf = "0.7.4"
env_logger = "0.11.5"
etherparse = "0.15.0"
@ -46,9 +46,9 @@ fancy-duration = "0.9.2"
flate2 = "1.0"
futures = "0.3.30"
hyper = "1.4.1"
hyper-util = "0.1.6"
hyper-util = "0.1.7"
human_bytes = "0.4"
indexmap = "2.3.0"
indexmap = "2.4.0"
indicatif = "0.17.8"
ipnetwork = "0.20.0"
libc = "0.2"
@ -74,7 +74,7 @@ redb = "2.1.1"
regex = "1.10.6"
rtnetlink = "0.14.1"
scopeguard = "1.2.0"
serde_json = "1.0.122"
serde_json = "1.0.124"
serde_yaml = "0.9"
sha256 = "1.5.0"
signal-hook = "0.3.17"
@ -93,7 +93,7 @@ walkdir = "2"
xz2 = "0.1"
[workspace.dependencies.clap]
version = "4.5.13"
version = "4.5.15"
features = ["derive"]
[workspace.dependencies.prost-reflect]
@ -106,7 +106,7 @@ default-features = false
features = ["rustls-tls"]
[workspace.dependencies.serde]
version = "1.0.204"
version = "1.0.207"
features = ["derive"]
[workspace.dependencies.sys-mount]

View File

@ -16,7 +16,7 @@ oci-spec = { workspace = true }
scopeguard = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
krata-oci = { path = "../oci", version = "^0.0.15" }
krata-oci = { path = "../oci", version = "^0.0.16" }
krata-tokio-tar = { workspace = true }
uuid = { workspace = true }

View File

@ -20,7 +20,7 @@ env_logger = { workspace = true }
fancy-duration = { workspace = true }
human_bytes = { workspace = true }
indicatif = { workspace = true }
krata = { path = "../krata", version = "^0.0.15" }
krata = { path = "../krata", version = "^0.0.16" }
log = { workspace = true }
prost-reflect = { workspace = true, features = ["serde"] }
prost-types = { workspace = true }

View File

@ -3,7 +3,7 @@ use clap::{Parser, ValueEnum};
use comfy_table::presets::UTF8_FULL_CONDENSED;
use comfy_table::{Cell, Table};
use krata::v1::control::{
control_service_client::ControlServiceClient, HostCpuTopologyClass, HostCpuTopologyRequest,
control_service_client::ControlServiceClient, GetHostCpuTopologyRequest, HostCpuTopologyClass,
};
use tonic::{transport::Channel, Request};
@ -31,7 +31,7 @@ pub struct HostCpuTopologyCommand {
impl HostCpuTopologyCommand {
pub async fn run(self, mut client: ControlServiceClient<Channel>) -> Result<()> {
let response = client
.get_host_cpu_topology(Request::new(HostCpuTopologyRequest {}))
.get_host_cpu_topology(Request::new(GetHostCpuTopologyRequest {}))
.await?
.into_inner();

View File

@ -1,17 +1,17 @@
use anyhow::Result;
use clap::Parser;
use krata::v1::control::{control_service_client::ControlServiceClient, IdentifyHostRequest};
use krata::v1::control::{control_service_client::ControlServiceClient, HostStatusRequest};
use tonic::{transport::Channel, Request};
#[derive(Parser)]
#[command(about = "Identify information about the host")]
pub struct HostIdentifyCommand {}
#[command(about = "Get information about the host")]
pub struct HostStatusCommand {}
impl HostIdentifyCommand {
impl HostStatusCommand {
pub async fn run(self, mut client: ControlServiceClient<Channel>) -> Result<()> {
let response = client
.identify_host(Request::new(IdentifyHostRequest {}))
.host_status(Request::new(HostStatusRequest {}))
.await?
.into_inner();
println!("Host UUID: {}", response.host_uuid);

View File

@ -6,7 +6,7 @@ use krata::events::EventStream;
use krata::v1::control::control_service_client::ControlServiceClient;
use crate::cli::host::cpu_topology::HostCpuTopologyCommand;
use crate::cli::host::identify::HostIdentifyCommand;
use crate::cli::host::identify::HostStatusCommand;
use crate::cli::host::idm_snoop::HostIdmSnoopCommand;
pub mod cpu_topology;
@ -33,7 +33,7 @@ impl HostCommand {
#[derive(Subcommand)]
pub enum HostCommands {
CpuTopology(HostCpuTopologyCommand),
Identify(HostIdentifyCommand),
Status(HostStatusCommand),
IdmSnoop(HostIdmSnoopCommand),
}
@ -46,7 +46,7 @@ impl HostCommands {
match self {
HostCommands::CpuTopology(cpu_topology) => cpu_topology.run(client).await,
HostCommands::Identify(identify) => identify.run(client).await,
HostCommands::Status(status) => status.run(client).await,
HostCommands::IdmSnoop(snoop) => snoop.run(client, events).await,
}

View File

@ -12,7 +12,7 @@ use clap::Parser;
use krata::{
client::ControlClientProvider,
events::EventStream,
v1::control::{control_service_client::ControlServiceClient, ResolveZoneRequest},
v1::control::{control_service_client::ControlServiceClient, ResolveZoneIdRequest},
};
use tonic::{transport::Channel, Request};
@ -51,7 +51,7 @@ impl ControlCommand {
ControlCommands::Device(device) => device.run(client, events).await,
ControlCommands::Host(snoop) => snoop.run(client, events).await,
ControlCommands::Host(host) => host.run(client, events).await,
}
}
}
@ -61,14 +61,14 @@ pub async fn resolve_zone(
name: &str,
) -> Result<String> {
let reply = client
.resolve_zone(Request::new(ResolveZoneRequest {
.resolve_zone_id(Request::new(ResolveZoneIdRequest {
name: name.to_string(),
}))
.await?
.into_inner();
if let Some(zone) = reply.zone {
Ok(zone.id)
if !reply.zone_id.is_empty() {
Ok(reply.zone_id)
} else {
Err(anyhow!("unable to resolve zone '{}'", name))
}

View File

@ -26,7 +26,7 @@ impl ZoneAttachCommand {
let input = StdioConsoleStream::stdin_stream(zone_id.clone()).await;
let output = client.attach_zone_console(input).await?.into_inner();
let stdout_handle =
tokio::task::spawn(async move { StdioConsoleStream::stdout(output).await });
tokio::task::spawn(async move { StdioConsoleStream::stdout(output, true).await });
let exit_hook_task = StdioConsoleStream::zone_exit_hook(zone_id.clone(), events).await?;
let code = select! {
x = stdout_handle => {

View File

@ -2,20 +2,16 @@ use anyhow::Result;
use clap::Parser;
use krata::{
events::EventStream,
v1::{
common::ZoneStatus,
control::{
control_service_client::ControlServiceClient, watch_events_reply::Event,
DestroyZoneRequest,
},
v1::control::{
control_service_client::ControlServiceClient, watch_events_reply::Event, DestroyZoneRequest,
},
};
use crate::cli::resolve_zone;
use krata::v1::common::ZoneState;
use log::error;
use tonic::{transport::Channel, Request};
use crate::cli::resolve_zone;
#[derive(Parser)]
#[command(about = "Destroy a zone")]
pub struct ZoneDestroyCommand {
@ -61,12 +57,12 @@ async fn wait_zone_destroyed(id: &str, events: EventStream) -> Result<()> {
continue;
}
let Some(state) = zone.state else {
let Some(status) = zone.status else {
continue;
};
if let Some(ref error) = state.error_info {
if state.status() == ZoneStatus::Failed {
if let Some(ref error) = status.error_status {
if status.state() == ZoneState::Failed {
error!("destroy failed: {}", error.message);
std::process::exit(1);
} else {
@ -74,7 +70,7 @@ async fn wait_zone_destroyed(id: &str, events: EventStream) -> Result<()> {
}
}
if state.status() == ZoneStatus::Destroyed {
if status.state() == ZoneState::Destroyed {
std::process::exit(0);
}
}

View File

@ -5,7 +5,7 @@ use anyhow::Result;
use clap::Parser;
use krata::v1::{
common::{ZoneTaskSpec, ZoneTaskSpecEnvVar},
control::{control_service_client::ControlServiceClient, ExecZoneRequest},
control::{control_service_client::ControlServiceClient, ExecInsideZoneRequest},
};
use tonic::{transport::Channel, Request};
@ -34,7 +34,7 @@ pub struct ZoneExecCommand {
impl ZoneExecCommand {
pub async fn run(self, mut client: ControlServiceClient<Channel>) -> Result<()> {
let zone_id: String = resolve_zone(&mut client, &self.zone).await?;
let initial = ExecZoneRequest {
let initial = ExecInsideZoneRequest {
zone_id,
task: Some(ZoneTaskSpec {
environment: env_map(&self.env.unwrap_or_default())
@ -52,7 +52,10 @@ impl ZoneExecCommand {
let stream = StdioConsoleStream::stdin_stream_exec(initial).await;
let response = client.exec_zone(Request::new(stream)).await?.into_inner();
let response = client
.exec_inside_zone(Request::new(stream))
.await?
.into_inner();
let code = StdioConsoleStream::exec_output(response).await?;
std::process::exit(code);

View File

@ -7,7 +7,7 @@ use krata::{
v1::{
common::{
zone_image_spec::Image, OciImageFormat, ZoneImageSpec, ZoneOciImageSpec, ZoneSpec,
ZoneSpecDevice, ZoneStatus, ZoneTaskSpec, ZoneTaskSpecEnvVar,
ZoneSpecDevice, ZoneState, ZoneTaskSpec, ZoneTaskSpecEnvVar,
},
control::{
control_service_client::ControlServiceClient, watch_events_reply::Event,
@ -120,7 +120,7 @@ impl ZoneLaunchCommand {
image: Some(image),
kernel,
initrd,
vcpus: self.cpus,
cpus: self.cpus,
mem: self.mem,
task: Some(ZoneTaskSpec {
environment: env_map(&self.env.unwrap_or_default())
@ -155,7 +155,7 @@ impl ZoneLaunchCommand {
let input = StdioConsoleStream::stdin_stream(id.clone()).await;
let output = client.attach_zone_console(input).await?.into_inner();
let stdout_handle =
tokio::task::spawn(async move { StdioConsoleStream::stdout(output).await });
tokio::task::spawn(async move { StdioConsoleStream::stdout(output, true).await });
let exit_hook_task = StdioConsoleStream::zone_exit_hook(id.clone(), events).await?;
select! {
x = stdout_handle => {
@ -209,12 +209,12 @@ async fn wait_zone_started(id: &str, events: EventStream) -> Result<()> {
continue;
}
let Some(state) = zone.state else {
let Some(status) = zone.status else {
continue;
};
if let Some(ref error) = state.error_info {
if state.status() == ZoneStatus::Failed {
if let Some(ref error) = status.error_status {
if status.state() == ZoneState::Failed {
error!("launch failed: {}", error.message);
std::process::exit(1);
} else {
@ -222,12 +222,12 @@ async fn wait_zone_started(id: &str, events: EventStream) -> Result<()> {
}
}
if state.status() == ZoneStatus::Destroyed {
if status.state() == ZoneState::Destroyed {
error!("zone destroyed");
std::process::exit(1);
}
if state.status() == ZoneStatus::Started {
if status.state() == ZoneState::Created {
break;
}
}

View File

@ -4,18 +4,19 @@ use comfy_table::{presets::UTF8_FULL_CONDENSED, Cell, Color, Table};
use krata::{
events::EventStream,
v1::{
common::{Zone, ZoneStatus},
common::Zone,
control::{
control_service_client::ControlServiceClient, ListZonesRequest, ResolveZoneRequest,
control_service_client::ControlServiceClient, ListZonesRequest, ResolveZoneIdRequest,
},
},
};
use crate::format::{kv2line, proto2dynamic, proto2kv, zone_simple_line, zone_state_text};
use krata::v1::common::ZoneState;
use krata::v1::control::GetZoneRequest;
use serde_json::Value;
use tonic::{transport::Channel, Request};
use crate::format::{kv2line, proto2dynamic, proto2kv, zone_simple_line, zone_status_text};
#[derive(ValueEnum, Clone, Debug, PartialEq, Eq)]
enum ZoneListFormat {
Table,
@ -44,11 +45,21 @@ impl ZoneListCommand {
) -> Result<()> {
let mut zones = if let Some(ref zone) = self.zone {
let reply = client
.resolve_zone(Request::new(ResolveZoneRequest { name: zone.clone() }))
.resolve_zone_id(Request::new(ResolveZoneIdRequest { name: zone.clone() }))
.await?
.into_inner();
if let Some(zone) = reply.zone {
vec![zone]
if !reply.zone_id.is_empty() {
let reply = client
.get_zone(Request::new(GetZoneRequest {
zone_id: reply.zone_id,
}))
.await?
.into_inner();
if let Some(zone) = reply.zone {
vec![zone]
} else {
return Err(anyhow!("unable to resolve zone '{}'", zone));
}
} else {
return Err(anyhow!("unable to resolve zone '{}'", zone));
}
@ -115,30 +126,30 @@ impl ZoneListCommand {
let mut table = Table::new();
table.load_preset(UTF8_FULL_CONDENSED);
table.set_content_arrangement(comfy_table::ContentArrangement::Dynamic);
table.set_header(vec!["name", "uuid", "status", "ipv4", "ipv6"]);
table.set_header(vec!["name", "uuid", "state", "ipv4", "ipv6"]);
for zone in zones {
let ipv4 = zone
.state
.status
.as_ref()
.and_then(|x| x.network.as_ref())
.and_then(|x| x.network_status.as_ref())
.map(|x| x.zone_ipv4.as_str())
.unwrap_or("n/a");
let ipv6 = zone
.state
.status
.as_ref()
.and_then(|x| x.network.as_ref())
.and_then(|x| x.network_status.as_ref())
.map(|x| x.zone_ipv6.as_str())
.unwrap_or("n/a");
let Some(spec) = zone.spec else {
continue;
};
let status = zone.state.as_ref().cloned().unwrap_or_default().status();
let status_text = zone_status_text(status);
let state = zone.status.as_ref().cloned().unwrap_or_default().state();
let status_text = zone_state_text(state);
let status_color = match status {
ZoneStatus::Destroyed | ZoneStatus::Failed => Color::Red,
ZoneStatus::Destroying | ZoneStatus::Exited | ZoneStatus::Starting => Color::Yellow,
ZoneStatus::Started => Color::Green,
let status_color = match state {
ZoneState::Destroyed | ZoneState::Failed => Color::Red,
ZoneState::Destroying | ZoneState::Exited | ZoneState::Creating => Color::Yellow,
ZoneState::Created => Color::Green,
_ => Color::Reset,
};

View File

@ -43,7 +43,7 @@ impl ZoneLogsCommand {
};
let output = client.attach_zone_console(input).await?.into_inner();
let stdout_handle =
tokio::task::spawn(async move { StdioConsoleStream::stdout(output).await });
tokio::task::spawn(async move { StdioConsoleStream::stdout(output, false).await });
let exit_hook_task = StdioConsoleStream::zone_exit_hook(zone_id.clone(), events).await?;
let code = select! {
x = stdout_handle => {

View File

@ -1,6 +1,6 @@
use anyhow::Result;
use clap::Parser;
use krata::v1::control::{control_service_client::ControlServiceClient, ResolveZoneRequest};
use krata::v1::control::{control_service_client::ControlServiceClient, ResolveZoneIdRequest};
use tonic::{transport::Channel, Request};
@ -14,13 +14,13 @@ pub struct ZoneResolveCommand {
impl ZoneResolveCommand {
pub async fn run(self, mut client: ControlServiceClient<Channel>) -> Result<()> {
let reply = client
.resolve_zone(Request::new(ResolveZoneRequest {
.resolve_zone_id(Request::new(ResolveZoneIdRequest {
name: self.zone.clone(),
}))
.await?
.into_inner();
if let Some(zone) = reply.zone {
println!("{}", zone.id);
if !reply.zone_id.is_empty() {
println!("{}", reply.zone_id);
} else {
std::process::exit(1);
}

View File

@ -24,7 +24,7 @@ use ratatui::{
};
use crate::{
format::zone_status_text,
format::zone_state_text,
metrics::{
lookup_metric_value, MultiMetricCollector, MultiMetricCollectorHandle, MultiMetricState,
},
@ -106,7 +106,7 @@ impl ZoneTopApp {
break;
}
}
};
}
}
Ok(())
}
@ -157,7 +157,7 @@ impl Widget for &mut ZoneTopApp {
continue;
};
let Some(ref state) = ms.zone.state else {
let Some(ref status) = ms.zone.status else {
continue;
};
@ -177,7 +177,7 @@ impl Widget for &mut ZoneTopApp {
let row = Row::new(vec![
spec.name.clone(),
ms.zone.id.clone(),
zone_status_text(state.status()),
zone_state_text(status.state()),
memory_total.unwrap_or_default(),
memory_used.unwrap_or_default(),
memory_free.unwrap_or_default(),

View File

@ -4,14 +4,12 @@ use crossterm::{
terminal::{disable_raw_mode, enable_raw_mode, is_raw_mode_enabled},
tty::IsTty,
};
use krata::v1::common::ZoneState;
use krata::{
events::EventStream,
v1::{
common::ZoneStatus,
control::{
watch_events_reply::Event, ExecZoneReply, ExecZoneRequest, ZoneConsoleReply,
ZoneConsoleRequest,
},
v1::control::{
watch_events_reply::Event, ExecInsideZoneReply, ExecInsideZoneRequest, ZoneConsoleReply,
ZoneConsoleRequest,
},
};
use log::debug;
@ -49,8 +47,8 @@ impl StdioConsoleStream {
}
pub async fn stdin_stream_exec(
initial: ExecZoneRequest,
) -> impl Stream<Item = ExecZoneRequest> {
initial: ExecInsideZoneRequest,
) -> impl Stream<Item = ExecInsideZoneRequest> {
let mut stdin = stdin();
stream! {
yield initial;
@ -68,13 +66,13 @@ impl StdioConsoleStream {
if size == 1 && buffer[0] == 0x1d {
break;
}
yield ExecZoneRequest { zone_id: String::default(), task: None, data };
yield ExecInsideZoneRequest { zone_id: String::default(), task: None, data };
}
}
}
pub async fn stdout(mut stream: Streaming<ZoneConsoleReply>) -> Result<()> {
if stdin().is_tty() {
pub async fn stdout(mut stream: Streaming<ZoneConsoleReply>, raw: bool) -> Result<()> {
if raw && stdin().is_tty() {
enable_raw_mode()?;
StdioConsoleStream::register_terminal_restore_hook()?;
}
@ -90,7 +88,7 @@ impl StdioConsoleStream {
Ok(())
}
pub async fn exec_output(mut stream: Streaming<ExecZoneReply>) -> Result<i32> {
pub async fn exec_output(mut stream: Streaming<ExecInsideZoneReply>) -> Result<i32> {
let mut stdout = stdout();
let mut stderr = stderr();
while let Some(reply) = stream.next().await {
@ -128,7 +126,7 @@ impl StdioConsoleStream {
continue;
};
let Some(state) = zone.state else {
let Some(status) = zone.status else {
continue;
};
@ -136,12 +134,12 @@ impl StdioConsoleStream {
continue;
}
if let Some(exit_info) = state.exit_info {
return Some(exit_info.code);
if let Some(exit_status) = status.exit_status {
return Some(exit_status.code);
}
let status = state.status();
if status == ZoneStatus::Destroying || status == ZoneStatus::Destroyed {
let state = status.state();
if state == ZoneState::Destroying || state == ZoneState::Destroyed {
return Some(10);
}
}

View File

@ -3,11 +3,12 @@ use std::{collections::HashMap, time::Duration};
use anyhow::Result;
use fancy_duration::FancyDuration;
use human_bytes::human_bytes;
use krata::v1::common::{Zone, ZoneMetricFormat, ZoneMetricNode, ZoneStatus};
use prost_reflect::{DynamicMessage, ReflectMessage};
use prost_types::Value;
use termtree::Tree;
use krata::v1::common::{Zone, ZoneMetricFormat, ZoneMetricNode, ZoneState};
pub fn proto2dynamic(proto: impl ReflectMessage) -> Result<DynamicMessage> {
Ok(DynamicMessage::decode(
proto.descriptor(),
@ -75,30 +76,30 @@ pub fn kv2line(map: HashMap<String, String>) -> String {
.join(" ")
}
pub fn zone_status_text(status: ZoneStatus) -> String {
pub fn zone_state_text(status: ZoneState) -> String {
match status {
ZoneStatus::Starting => "starting",
ZoneStatus::Started => "started",
ZoneStatus::Destroying => "destroying",
ZoneStatus::Destroyed => "destroyed",
ZoneStatus::Exited => "exited",
ZoneStatus::Failed => "failed",
ZoneState::Creating => "creating",
ZoneState::Created => "created",
ZoneState::Destroying => "destroying",
ZoneState::Destroyed => "destroyed",
ZoneState::Exited => "exited",
ZoneState::Failed => "failed",
_ => "unknown",
}
.to_string()
}
pub fn zone_simple_line(zone: &Zone) -> String {
let state = zone_status_text(
zone.state
let state = zone_state_text(
zone.status
.as_ref()
.map(|x| x.status())
.unwrap_or(ZoneStatus::Unknown),
.map(|x| x.state())
.unwrap_or(ZoneState::Unknown),
);
let name = zone.spec.as_ref().map(|x| x.name.as_str()).unwrap_or("");
let network = zone.state.as_ref().and_then(|x| x.network.as_ref());
let ipv4 = network.map(|x| x.zone_ipv4.as_str()).unwrap_or("");
let ipv6 = network.map(|x| x.zone_ipv6.as_str()).unwrap_or("");
let network_status = zone.status.as_ref().and_then(|x| x.network_status.as_ref());
let ipv4 = network_status.map(|x| x.zone_ipv4.as_str()).unwrap_or("");
let ipv6 = network_status.map(|x| x.zone_ipv6.as_str()).unwrap_or("");
format!("{}\t{}\t{}\t{}\t{}", zone.id, state, name, ipv4, ipv6)
}

View File

@ -1,8 +1,10 @@
use crate::format::metrics_value_pretty;
use anyhow::Result;
use krata::v1::common::ZoneState;
use krata::{
events::EventStream,
v1::{
common::{Zone, ZoneMetricNode, ZoneStatus},
common::{Zone, ZoneMetricNode},
control::{
control_service_client::ControlServiceClient, watch_events_reply::Event,
ListZonesRequest, ReadZoneMetricsRequest,
@ -19,8 +21,6 @@ use tokio::{
};
use tonic::transport::Channel;
use crate::format::metrics_value_pretty;
pub struct MetricState {
pub zone: Zone,
pub root: Option<ZoneMetricNode>,
@ -86,11 +86,11 @@ impl MultiMetricCollector {
let Some(zone) = changed.zone else {
continue;
};
let Some(ref state) = zone.state else {
let Some(ref status) = zone.status else {
continue;
};
zones.retain(|x| x.id != zone.id);
if state.status() != ZoneStatus::Destroying {
if status.state() != ZoneState::Destroying {
zones.push(zone);
}
false
@ -112,11 +112,11 @@ impl MultiMetricCollector {
let mut metrics = Vec::new();
for zone in &zones {
let Some(ref state) = zone.state else {
let Some(ref status) = zone.status else {
continue;
};
if state.status() != ZoneStatus::Started {
if status.state() != ZoneState::Created {
continue;
}

View File

@ -9,6 +9,7 @@ edition = "2021"
resolver = "2"
[dependencies]
krata-advmac = { workspace = true }
anyhow = { workspace = true }
async-stream = { workspace = true }
async-trait = { workspace = true }
@ -17,14 +18,16 @@ circular-buffer = { workspace = true }
clap = { workspace = true }
env_logger = { workspace = true }
futures = { workspace = true }
krata = { path = "../krata", version = "^0.0.15" }
krata-oci = { path = "../oci", version = "^0.0.15" }
krata-runtime = { path = "../runtime", version = "^0.0.15" }
ipnetwork = { workspace = true }
krata = { path = "../krata", version = "^0.0.16" }
krata-oci = { path = "../oci", version = "^0.0.16" }
krata-runtime = { path = "../runtime", version = "^0.0.16" }
log = { workspace = true }
prost = { workspace = true }
redb = { workspace = true }
scopeguard = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
signal-hook = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }

View File

@ -10,6 +10,8 @@ pub struct DaemonConfig {
pub oci: OciConfig,
#[serde(default)]
pub pci: DaemonPciConfig,
#[serde(default = "default_network")]
pub network: DaemonNetworkConfig,
}
#[derive(Serialize, Deserialize, Clone, Debug, Default)]
@ -49,6 +51,65 @@ pub enum DaemonPciDeviceRdmReservePolicy {
Relaxed,
}
#[derive(Serialize, Deserialize, Clone, Debug, Default)]
pub struct DaemonNetworkConfig {
#[serde(default = "default_network_nameservers")]
pub nameservers: Vec<String>,
#[serde(default = "default_network_ipv4")]
pub ipv4: DaemonIpv4NetworkConfig,
#[serde(default = "default_network_ipv6")]
pub ipv6: DaemonIpv6NetworkConfig,
}
#[derive(Serialize, Deserialize, Clone, Debug, Default)]
pub struct DaemonIpv4NetworkConfig {
#[serde(default = "default_network_ipv4_subnet")]
pub subnet: String,
}
#[derive(Serialize, Deserialize, Clone, Debug, Default)]
pub struct DaemonIpv6NetworkConfig {
#[serde(default = "default_network_ipv6_subnet")]
pub subnet: String,
}
fn default_network() -> DaemonNetworkConfig {
DaemonNetworkConfig {
nameservers: default_network_nameservers(),
ipv4: default_network_ipv4(),
ipv6: default_network_ipv6(),
}
}
fn default_network_nameservers() -> Vec<String> {
vec![
"1.1.1.1".to_string(),
"1.0.0.1".to_string(),
"2606:4700:4700::1111".to_string(),
"2606:4700:4700::1001".to_string(),
]
}
fn default_network_ipv4() -> DaemonIpv4NetworkConfig {
DaemonIpv4NetworkConfig {
subnet: default_network_ipv4_subnet(),
}
}
fn default_network_ipv4_subnet() -> String {
"10.75.80.0/24".to_string()
}
fn default_network_ipv6() -> DaemonIpv6NetworkConfig {
DaemonIpv6NetworkConfig {
subnet: default_network_ipv6_subnet(),
}
}
fn default_network_ipv6_subnet() -> String {
"fdd4:1476:6c7e::/48".to_string()
}
impl DaemonConfig {
pub async fn load(path: &Path) -> Result<DaemonConfig> {
if path.exists() {

View File

@ -1,5 +1,15 @@
use crate::db::zone::ZoneStore;
use crate::{
command::DaemonCommand, console::DaemonConsoleHandle, devices::DaemonDeviceManager,
event::DaemonEventContext, idm::DaemonIdmHandle, metrics::idm_metric_to_api,
oci::convert_oci_progress, zlt::ZoneLookupTable,
};
use async_stream::try_stream;
use futures::Stream;
use krata::v1::control::{
GetZoneReply, GetZoneRequest, SetHostPowerManagementPolicyReply,
SetHostPowerManagementPolicyRequest,
};
use krata::{
idm::internal::{
exec_stream_request_update::Update, request::Request as IdmRequestType,
@ -10,11 +20,11 @@ use krata::{
common::{OciImageFormat, Zone, ZoneState, ZoneStatus},
control::{
control_service_server::ControlService, CreateZoneReply, CreateZoneRequest,
DestroyZoneReply, DestroyZoneRequest, DeviceInfo, ExecZoneReply, ExecZoneRequest,
HostCpuTopologyInfo, HostCpuTopologyReply, HostCpuTopologyRequest,
HostPowerManagementPolicy, IdentifyHostReply, IdentifyHostRequest, ListDevicesReply,
DestroyZoneReply, DestroyZoneRequest, DeviceInfo, ExecInsideZoneReply,
ExecInsideZoneRequest, GetHostCpuTopologyReply, GetHostCpuTopologyRequest,
HostCpuTopologyInfo, HostStatusReply, HostStatusRequest, ListDevicesReply,
ListDevicesRequest, ListZonesReply, ListZonesRequest, PullImageReply, PullImageRequest,
ReadZoneMetricsReply, ReadZoneMetricsRequest, ResolveZoneReply, ResolveZoneRequest,
ReadZoneMetricsReply, ReadZoneMetricsRequest, ResolveZoneIdReply, ResolveZoneIdRequest,
SnoopIdmReply, SnoopIdmRequest, WatchEventsReply, WatchEventsRequest, ZoneConsoleReply,
ZoneConsoleRequest,
},
@ -36,12 +46,6 @@ use tokio_stream::StreamExt;
use tonic::{Request, Response, Status, Streaming};
use uuid::Uuid;
use crate::{
command::DaemonCommand, console::DaemonConsoleHandle, db::ZoneStore,
devices::DaemonDeviceManager, event::DaemonEventContext, idm::DaemonIdmHandle,
metrics::idm_metric_to_api, oci::convert_oci_progress, zlt::ZoneLookupTable,
};
pub struct ApiError {
message: String,
}
@ -112,8 +116,8 @@ enum PullImageSelect {
#[tonic::async_trait]
impl ControlService for DaemonControlService {
type ExecZoneStream =
Pin<Box<dyn Stream<Item = Result<ExecZoneReply, Status>> + Send + 'static>>;
type ExecInsideZoneStream =
Pin<Box<dyn Stream<Item = Result<ExecInsideZoneReply, Status>> + Send + 'static>>;
type AttachZoneConsoleStream =
Pin<Box<dyn Stream<Item = Result<ZoneConsoleReply, Status>> + Send + 'static>>;
@ -127,12 +131,12 @@ impl ControlService for DaemonControlService {
type SnoopIdmStream =
Pin<Box<dyn Stream<Item = Result<SnoopIdmReply, Status>> + Send + 'static>>;
async fn identify_host(
async fn host_status(
&self,
request: Request<IdentifyHostRequest>,
) -> Result<Response<IdentifyHostReply>, Status> {
request: Request<HostStatusRequest>,
) -> Result<Response<HostStatusReply>, Status> {
let _ = request.into_inner();
Ok(Response::new(IdentifyHostReply {
Ok(Response::new(HostStatusReply {
host_domid: self.glt.host_domid(),
host_uuid: self.glt.host_uuid().to_string(),
krata_version: DaemonCommand::version(),
@ -156,11 +160,11 @@ impl ControlService for DaemonControlService {
uuid,
Zone {
id: uuid.to_string(),
state: Some(ZoneState {
status: ZoneStatus::Starting.into(),
network: None,
exit_info: None,
error_info: None,
status: Some(ZoneStatus {
state: ZoneState::Creating.into(),
network_status: None,
exit_status: None,
error_status: None,
host: self.glt.host_uuid().to_string(),
domid: u32::MAX,
}),
@ -180,10 +184,10 @@ impl ControlService for DaemonControlService {
}))
}
async fn exec_zone(
async fn exec_inside_zone(
&self,
request: Request<Streaming<ExecZoneRequest>>,
) -> Result<Response<Self::ExecZoneStream>, Status> {
request: Request<Streaming<ExecInsideZoneRequest>>,
) -> Result<Response<Self::ExecInsideZoneStream>, Status> {
let mut input = request.into_inner();
let Some(request) = input.next().await else {
return Err(ApiError {
@ -232,7 +236,7 @@ impl ControlService for DaemonControlService {
loop {
select! {
x = input.next() => if let Some(update) = x {
let update: Result<ExecZoneRequest, Status> = update.map_err(|error| ApiError {
let update: Result<ExecInsideZoneRequest, Status> = update.map_err(|error| ApiError {
message: error.to_string()
}.into());
@ -252,7 +256,7 @@ impl ControlService for DaemonControlService {
let Some(IdmResponseType::ExecStream(update)) = response.response else {
break;
};
let reply = ExecZoneReply {
let reply = ExecInsideZoneReply {
exited: update.exited,
error: update.error,
exit_code: update.exit_code,
@ -265,11 +269,11 @@ impl ControlService for DaemonControlService {
break;
}
}
};
}
}
};
Ok(Response::new(Box::pin(output) as Self::ExecZoneStream))
Ok(Response::new(Box::pin(output) as Self::ExecInsideZoneStream))
}
async fn destroy_zone(
@ -287,16 +291,16 @@ impl ControlService for DaemonControlService {
.into());
};
zone.state = Some(zone.state.as_mut().cloned().unwrap_or_default());
zone.status = Some(zone.status.as_mut().cloned().unwrap_or_default());
if zone.state.as_ref().unwrap().status() == ZoneStatus::Destroyed {
if zone.status.as_ref().unwrap().state() == ZoneState::Destroyed {
return Err(ApiError {
message: "zone already destroyed".to_string(),
}
.into());
}
zone.state.as_mut().unwrap().status = ZoneStatus::Destroying.into();
zone.status.as_mut().unwrap().state = ZoneState::Destroying.into();
self.zones
.update(uuid, zone)
.await
@ -320,10 +324,10 @@ impl ControlService for DaemonControlService {
Ok(Response::new(ListZonesReply { zones }))
}
async fn resolve_zone(
async fn resolve_zone_id(
&self,
request: Request<ResolveZoneRequest>,
) -> Result<Response<ResolveZoneReply>, Status> {
request: Request<ResolveZoneIdRequest>,
) -> Result<Response<ResolveZoneIdReply>, Status> {
let request = request.into_inner();
let zones = self.zones.list().await.map_err(ApiError::from)?;
let zones = zones
@ -334,8 +338,8 @@ impl ControlService for DaemonControlService {
|| x.id == request.name
})
.collect::<Vec<Zone>>();
Ok(Response::new(ResolveZoneReply {
zone: zones.first().cloned(),
Ok(Response::new(ResolveZoneIdReply {
zone_id: zones.first().cloned().map(|x| x.id).unwrap_or_default(),
}))
}
@ -558,8 +562,8 @@ impl ControlService for DaemonControlService {
async fn get_host_cpu_topology(
&self,
request: Request<HostCpuTopologyRequest>,
) -> Result<Response<HostCpuTopologyReply>, Status> {
request: Request<GetHostCpuTopologyRequest>,
) -> Result<Response<GetHostCpuTopologyReply>, Status> {
let _ = request.into_inner();
let power = self
.runtime
@ -579,13 +583,13 @@ impl ControlService for DaemonControlService {
})
}
Ok(Response::new(HostCpuTopologyReply { cpus }))
Ok(Response::new(GetHostCpuTopologyReply { cpus }))
}
async fn set_host_power_management_policy(
&self,
request: Request<HostPowerManagementPolicy>,
) -> Result<Response<HostPowerManagementPolicy>, Status> {
request: Request<SetHostPowerManagementPolicyRequest>,
) -> Result<Response<SetHostPowerManagementPolicyReply>, Status> {
let policy = request.into_inner();
let power = self
.runtime
@ -603,9 +607,20 @@ impl ControlService for DaemonControlService {
.await
.map_err(ApiError::from)?;
Ok(Response::new(HostPowerManagementPolicy {
scheduler: scheduler.to_string(),
smt_awareness: policy.smt_awareness,
Ok(Response::new(SetHostPowerManagementPolicyReply {}))
}
async fn get_zone(
&self,
request: Request<GetZoneRequest>,
) -> Result<Response<GetZoneReply>, Status> {
let request = request.into_inner();
let zones = self.zones.list().await.map_err(ApiError::from)?;
let zone = zones.get(&Uuid::from_str(&request.zone_id).map_err(|error| ApiError {
message: error.to_string(),
})?);
Ok(Response::new(GetZoneReply {
zone: zone.cloned(),
}))
}
}

118
crates/daemon/src/db/ip.rs Normal file
View File

@ -0,0 +1,118 @@
use crate::db::KrataDatabase;
use advmac::MacAddr6;
use anyhow::Result;
use log::error;
use redb::{ReadableTable, TableDefinition};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::net::{Ipv4Addr, Ipv6Addr};
use uuid::Uuid;
const IP_RESERVATION_TABLE: TableDefinition<u128, &[u8]> = TableDefinition::new("ip-reservation");
#[derive(Clone)]
pub struct IpReservationStore {
db: KrataDatabase,
}
impl IpReservationStore {
pub fn open(db: KrataDatabase) -> Result<Self> {
let write = db.database.begin_write()?;
let _ = write.open_table(IP_RESERVATION_TABLE);
write.commit()?;
Ok(IpReservationStore { db })
}
pub async fn read(&self, id: Uuid) -> Result<Option<IpReservation>> {
let read = self.db.database.begin_read()?;
let table = read.open_table(IP_RESERVATION_TABLE)?;
let Some(entry) = table.get(id.to_u128_le())? else {
return Ok(None);
};
let bytes = entry.value();
Ok(Some(serde_json::from_slice(bytes)?))
}
pub async fn list(&self) -> Result<HashMap<Uuid, IpReservation>> {
enum ListEntry {
Valid(Uuid, IpReservation),
Invalid(Uuid),
}
let mut reservations: HashMap<Uuid, IpReservation> = HashMap::new();
let corruptions = {
let read = self.db.database.begin_read()?;
let table = read.open_table(IP_RESERVATION_TABLE)?;
table
.iter()?
.flat_map(|result| {
result.map(|(key, value)| {
let uuid = Uuid::from_u128_le(key.value());
match serde_json::from_slice::<IpReservation>(value.value()) {
Ok(reservation) => ListEntry::Valid(uuid, reservation),
Err(error) => {
error!(
"found invalid ip reservation in database for uuid {}: {}",
uuid, error
);
ListEntry::Invalid(uuid)
}
}
})
})
.filter_map(|entry| match entry {
ListEntry::Valid(uuid, reservation) => {
reservations.insert(uuid, reservation);
None
}
ListEntry::Invalid(uuid) => Some(uuid),
})
.collect::<Vec<Uuid>>()
};
if !corruptions.is_empty() {
let write = self.db.database.begin_write()?;
let mut table = write.open_table(IP_RESERVATION_TABLE)?;
for corruption in corruptions {
table.remove(corruption.to_u128_le())?;
}
}
Ok(reservations)
}
pub async fn update(&self, id: Uuid, entry: IpReservation) -> Result<()> {
let write = self.db.database.begin_write()?;
{
let mut table = write.open_table(IP_RESERVATION_TABLE)?;
let bytes = serde_json::to_vec(&entry)?;
table.insert(id.to_u128_le(), bytes.as_slice())?;
}
write.commit()?;
Ok(())
}
pub async fn remove(&self, id: Uuid) -> Result<()> {
let write = self.db.database.begin_write()?;
{
let mut table = write.open_table(IP_RESERVATION_TABLE)?;
table.remove(id.to_u128_le())?;
}
write.commit()?;
Ok(())
}
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct IpReservation {
pub uuid: String,
pub ipv4: Ipv4Addr,
pub ipv6: Ipv6Addr,
pub mac: MacAddr6,
pub ipv4_prefix: u8,
pub ipv6_prefix: u8,
pub gateway_ipv4: Ipv4Addr,
pub gateway_ipv6: Ipv6Addr,
pub gateway_mac: MacAddr6,
}

View File

@ -0,0 +1,21 @@
use anyhow::Result;
use redb::Database;
use std::path::Path;
use std::sync::Arc;
pub mod ip;
pub mod zone;
#[derive(Clone)]
pub struct KrataDatabase {
pub database: Arc<Database>,
}
impl KrataDatabase {
pub fn open(path: &Path) -> Result<Self> {
let database = Database::create(path)?;
Ok(KrataDatabase {
database: Arc::new(database),
})
}
}

View File

@ -1,33 +1,31 @@
use std::{collections::HashMap, path::Path, sync::Arc};
use std::collections::HashMap;
use crate::db::KrataDatabase;
use anyhow::Result;
use krata::v1::common::Zone;
use log::error;
use prost::Message;
use redb::{Database, ReadableTable, TableDefinition};
use redb::{ReadableTable, TableDefinition};
use uuid::Uuid;
const ZONES: TableDefinition<u128, &[u8]> = TableDefinition::new("zones");
const ZONE_TABLE: TableDefinition<u128, &[u8]> = TableDefinition::new("zone");
#[derive(Clone)]
pub struct ZoneStore {
database: Arc<Database>,
db: KrataDatabase,
}
impl ZoneStore {
pub fn open(path: &Path) -> Result<Self> {
let database = Database::create(path)?;
let write = database.begin_write()?;
let _ = write.open_table(ZONES);
pub fn open(db: KrataDatabase) -> Result<Self> {
let write = db.database.begin_write()?;
let _ = write.open_table(ZONE_TABLE);
write.commit()?;
Ok(ZoneStore {
database: Arc::new(database),
})
Ok(ZoneStore { db })
}
pub async fn read(&self, id: Uuid) -> Result<Option<Zone>> {
let read = self.database.begin_read()?;
let table = read.open_table(ZONES)?;
let read = self.db.database.begin_read()?;
let table = read.open_table(ZONE_TABLE)?;
let Some(entry) = table.get(id.to_u128_le())? else {
return Ok(None);
};
@ -37,8 +35,8 @@ impl ZoneStore {
pub async fn list(&self) -> Result<HashMap<Uuid, Zone>> {
let mut zones: HashMap<Uuid, Zone> = HashMap::new();
let read = self.database.begin_read()?;
let table = read.open_table(ZONES)?;
let read = self.db.database.begin_read()?;
let table = read.open_table(ZONE_TABLE)?;
for result in table.iter()? {
let (key, value) = result?;
let uuid = Uuid::from_u128_le(key.value());
@ -58,9 +56,9 @@ impl ZoneStore {
}
pub async fn update(&self, id: Uuid, entry: Zone) -> Result<()> {
let write = self.database.begin_write()?;
let write = self.db.database.begin_write()?;
{
let mut table = write.open_table(ZONES)?;
let mut table = write.open_table(ZONE_TABLE)?;
let bytes = entry.encode_to_vec();
table.insert(id.to_u128_le(), bytes.as_slice())?;
}
@ -69,9 +67,9 @@ impl ZoneStore {
}
pub async fn remove(&self, id: Uuid) -> Result<()> {
let write = self.database.begin_write()?;
let write = self.db.database.begin_write()?;
{
let mut table = write.open_table(ZONES)?;
let mut table = write.open_table(ZONE_TABLE)?;
table.remove(id.to_u128_le())?;
}
write.commit()?;

View File

@ -4,9 +4,10 @@ use std::{
time::Duration,
};
use crate::{db::ZoneStore, idm::DaemonIdmHandle};
use crate::db::zone::ZoneStore;
use crate::idm::DaemonIdmHandle;
use anyhow::Result;
use krata::v1::common::ZoneExitInfo;
use krata::v1::common::ZoneExitStatus;
use krata::{
idm::{internal::event::Event as EventType, internal::Event},
v1::common::{ZoneState, ZoneStatus},
@ -83,15 +84,15 @@ impl DaemonEventGenerator {
return Ok(());
};
let Some(ref state) = zone.state else {
let Some(ref status) = zone.status else {
return Ok(());
};
let status = state.status();
let state = status.state();
let id = Uuid::from_str(&zone.id)?;
let domid = state.domid;
match status {
ZoneStatus::Started => {
let domid = status.domid;
match state {
ZoneState::Created => {
if let Entry::Vacant(e) = self.idms.entry(domid) {
let client = self.idm.client_by_domid(domid).await?;
let mut receiver = client.subscribe().await?;
@ -111,7 +112,7 @@ impl DaemonEventGenerator {
}
}
ZoneStatus::Destroyed => {
ZoneState::Destroyed => {
if let Some((_, handle)) = self.idms.remove(&domid) {
handle.abort();
}
@ -131,13 +132,13 @@ impl DaemonEventGenerator {
async fn handle_exit_code(&mut self, id: Uuid, code: i32) -> Result<()> {
if let Some(mut zone) = self.zones.read(id).await? {
zone.state = Some(ZoneState {
status: ZoneStatus::Exited.into(),
network: zone.state.clone().unwrap_or_default().network,
exit_info: Some(ZoneExitInfo { code }),
error_info: None,
host: zone.state.clone().map(|x| x.host).unwrap_or_default(),
domid: zone.state.clone().map(|x| x.domid).unwrap_or(u32::MAX),
zone.status = Some(ZoneStatus {
state: ZoneState::Exited.into(),
network_status: zone.status.clone().unwrap_or_default().network_status,
exit_status: Some(ZoneExitStatus { code }),
error_status: None,
host: zone.status.clone().map(|x| x.host).unwrap_or_default(),
domid: zone.status.clone().map(|x| x.domid).unwrap_or(u32::MAX),
});
self.zones.update(id, zone).await?;

View File

@ -11,7 +11,7 @@ use krata::idm::{
transport::IdmTransportPacket,
};
use kratart::channel::ChannelService;
use log::{error, warn};
use log::{debug, error, warn};
use prost::Message;
use tokio::{
select,
@ -85,13 +85,18 @@ pub struct DaemonIdm {
impl DaemonIdm {
pub async fn new(glt: ZoneLookupTable) -> Result<DaemonIdm> {
debug!("allocating channel service for idm");
let (service, tx_raw_sender, rx_receiver) =
ChannelService::new("krata-channel".to_string(), None).await?;
let (tx_sender, tx_receiver) = channel(100);
let (snoop_sender, _) = broadcast::channel(100);
debug!("starting idm channel service");
let task = service.launch().await?;
let clients = Arc::new(Mutex::new(HashMap::new()));
let feeds = Arc::new(Mutex::new(HashMap::new()));
Ok(DaemonIdm {
glt,
rx_receiver,
@ -128,52 +133,99 @@ impl DaemonIdm {
})
}
async fn process_rx_packet(
&mut self,
domid: u32,
data: Option<Vec<u8>>,
buffers: &mut HashMap<u32, BytesMut>,
) -> Result<()> {
// check if data is present, if it is not, that signals a closed channel.
if let Some(data) = data {
let buffer = buffers.entry(domid).or_insert_with_key(|_| BytesMut::new());
buffer.extend_from_slice(&data);
loop {
// check if the buffer is less than the header size, if so, wait for more data
if buffer.len() < 6 {
break;
}
// check for the magic bytes 0xff, 0xff at the start of the message, if that doesn't
// exist, clear the buffer. this ensures that partial messages won't be processed.
if buffer[0] != 0xff || buffer[1] != 0xff {
buffer.clear();
return Ok(());
}
// read the size from the buffer as a little endian u32
let size = (buffer[2] as u32
| (buffer[3] as u32) << 8
| (buffer[4] as u32) << 16
| (buffer[5] as u32) << 24) as usize;
let needed = size + 6;
if buffer.len() < needed {
return Ok(());
}
let mut packet = buffer.split_to(needed);
// advance the buffer by the header, leaving only the raw data.
packet.advance(6);
match IdmTransportPacket::decode(packet) {
Ok(packet) => {
let _ =
client_or_create(domid, &self.tx_sender, &self.clients, &self.feeds)
.await?;
let guard = self.feeds.lock().await;
if let Some(feed) = guard.get(&domid) {
let _ = feed.try_send(packet.clone());
}
let _ = self.snoop_sender.send(DaemonIdmSnoopPacket {
from: domid,
to: 0,
packet,
});
}
Err(packet) => {
warn!("received invalid packet from domain {}: {}", domid, packet);
}
}
}
} else {
let mut clients = self.clients.lock().await;
let mut feeds = self.feeds.lock().await;
clients.remove(&domid);
feeds.remove(&domid);
}
Ok(())
}
async fn tx_packet(&mut self, domid: u32, packet: IdmTransportPacket) -> Result<()> {
let data = packet.encode_to_vec();
let mut buffer = vec![0u8; 6];
let length = data.len() as u32;
// magic bytes
buffer[0] = 0xff;
buffer[1] = 0xff;
// little endian u32 for message size
buffer[2] = length as u8;
buffer[3] = (length << 8) as u8;
buffer[4] = (length << 16) as u8;
buffer[5] = (length << 24) as u8;
buffer.extend_from_slice(&data);
self.tx_raw_sender.send((domid, buffer)).await?;
let _ = self.snoop_sender.send(DaemonIdmSnoopPacket {
from: 0,
to: domid,
packet,
});
Ok(())
}
async fn process(&mut self, buffers: &mut HashMap<u32, BytesMut>) -> Result<()> {
loop {
select! {
x = self.rx_receiver.recv() => match x {
Some((domid, data)) => {
if let Some(data) = data {
let buffer = buffers.entry(domid).or_insert_with_key(|_| BytesMut::new());
buffer.extend_from_slice(&data);
loop {
if buffer.len() < 6 {
break;
}
if buffer[0] != 0xff || buffer[1] != 0xff {
buffer.clear();
break;
}
let size = (buffer[2] as u32 | (buffer[3] as u32) << 8 | (buffer[4] as u32) << 16 | (buffer[5] as u32) << 24) as usize;
let needed = size + 6;
if buffer.len() < needed {
break;
}
let mut packet = buffer.split_to(needed);
packet.advance(6);
match IdmTransportPacket::decode(packet) {
Ok(packet) => {
let _ = client_or_create(domid, &self.tx_sender, &self.clients, &self.feeds).await?;
let guard = self.feeds.lock().await;
if let Some(feed) = guard.get(&domid) {
let _ = feed.try_send(packet.clone());
}
let _ = self.snoop_sender.send(DaemonIdmSnoopPacket { from: domid, to: 0, packet });
}
Err(packet) => {
warn!("received invalid packet from domain {}: {}", domid, packet);
}
}
}
} else {
let mut clients = self.clients.lock().await;
let mut feeds = self.feeds.lock().await;
clients.remove(&domid);
feeds.remove(&domid);
}
self.process_rx_packet(domid, data, buffers).await?;
},
None => {
@ -182,25 +234,14 @@ impl DaemonIdm {
},
x = self.tx_receiver.recv() => match x {
Some((domid, packet)) => {
let data = packet.encode_to_vec();
let mut buffer = vec![0u8; 6];
let length = data.len() as u32;
buffer[0] = 0xff;
buffer[1] = 0xff;
buffer[2] = length as u8;
buffer[3] = (length << 8) as u8;
buffer[4] = (length << 16) as u8;
buffer[5] = (length << 24) as u8;
buffer.extend_from_slice(&data);
self.tx_raw_sender.send((domid, buffer)).await?;
let _ = self.snoop_sender.send(DaemonIdmSnoopPacket { from: 0, to: domid, packet });
self.tx_packet(domid, packet).await?;
},
None => {
break;
}
}
};
}
}
Ok(())
}
@ -249,9 +290,9 @@ pub struct IdmDaemonBackend {
#[async_trait::async_trait]
impl IdmBackend for IdmDaemonBackend {
async fn recv(&mut self) -> Result<IdmTransportPacket> {
async fn recv(&mut self) -> Result<Vec<IdmTransportPacket>> {
if let Some(packet) = self.rx_receiver.recv().await {
Ok(packet)
Ok(vec![packet])
} else {
Err(anyhow!("idm receive channel closed"))
}

View File

@ -0,0 +1,179 @@
use advmac::MacAddr6;
use anyhow::{anyhow, Result};
use ipnetwork::{Ipv4Network, Ipv6Network};
use std::{
collections::HashMap,
net::{Ipv4Addr, Ipv6Addr},
sync::Arc,
};
use tokio::sync::RwLock;
use uuid::Uuid;
use crate::db::ip::{IpReservation, IpReservationStore};
#[derive(Default, Clone)]
pub struct IpAssignmentState {
pub ipv4: HashMap<Ipv4Addr, IpReservation>,
pub ipv6: HashMap<Ipv6Addr, IpReservation>,
}
#[derive(Clone)]
pub struct IpAssignment {
ipv4_network: Ipv4Network,
ipv6_network: Ipv6Network,
gateway_ipv4: Ipv4Addr,
gateway_ipv6: Ipv6Addr,
gateway_mac: MacAddr6,
store: IpReservationStore,
state: Arc<RwLock<IpAssignmentState>>,
}
impl IpAssignment {
pub async fn new(
host_uuid: Uuid,
ipv4_network: Ipv4Network,
ipv6_network: Ipv6Network,
store: IpReservationStore,
) -> Result<Self> {
let mut state = IpAssignment::fetch_current_state(&store).await?;
let reservation = if let Some(reservation) = store.read(host_uuid).await? {
reservation
} else {
IpAssignment::allocate(
&mut state,
&store,
host_uuid,
ipv4_network,
ipv6_network,
None,
None,
None,
)
.await?
};
let assignment = IpAssignment {
ipv4_network,
ipv6_network,
gateway_ipv4: reservation.ipv4,
gateway_ipv6: reservation.ipv6,
gateway_mac: reservation.gateway_mac,
store,
state: Arc::new(RwLock::new(state)),
};
Ok(assignment)
}
async fn fetch_current_state(store: &IpReservationStore) -> Result<IpAssignmentState> {
let reservations = store.list().await?;
let mut state = IpAssignmentState::default();
for reservation in reservations.values() {
state.ipv4.insert(reservation.ipv4, reservation.clone());
state.ipv6.insert(reservation.ipv6, reservation.clone());
}
Ok(state)
}
#[allow(clippy::too_many_arguments)]
async fn allocate(
state: &mut IpAssignmentState,
store: &IpReservationStore,
uuid: Uuid,
ipv4_network: Ipv4Network,
ipv6_network: Ipv6Network,
gateway_ipv4: Option<Ipv4Addr>,
gateway_ipv6: Option<Ipv6Addr>,
gateway_mac: Option<MacAddr6>,
) -> Result<IpReservation> {
let found_ipv4: Option<Ipv4Addr> = ipv4_network
.iter()
.filter(|ip| {
ip.is_private() && !(ip.is_loopback() || ip.is_multicast() || ip.is_broadcast())
})
.filter(|ip| {
let last = ip.octets()[3];
// filter for IPs ending in .1 to .250 because .250+ can have special meaning
last > 0 && last < 250
})
.find(|ip| !state.ipv4.contains_key(ip));
let found_ipv6: Option<Ipv6Addr> = ipv6_network
.iter()
.filter(|ip| !ip.is_loopback() && !ip.is_multicast())
.find(|ip| !state.ipv6.contains_key(ip));
let Some(ipv4) = found_ipv4 else {
return Err(anyhow!(
"unable to allocate ipv4 address, assigned network is exhausted"
));
};
let Some(ipv6) = found_ipv6 else {
return Err(anyhow!(
"unable to allocate ipv6 address, assigned network is exhausted"
));
};
let mut mac = MacAddr6::random();
mac.set_local(false);
mac.set_multicast(false);
let reservation = IpReservation {
uuid: uuid.to_string(),
ipv4,
ipv6,
mac,
ipv4_prefix: ipv4_network.prefix(),
ipv6_prefix: ipv6_network.prefix(),
gateway_ipv4: gateway_ipv4.unwrap_or(ipv4),
gateway_ipv6: gateway_ipv6.unwrap_or(ipv6),
gateway_mac: gateway_mac.unwrap_or(mac),
};
state.ipv4.insert(ipv4, reservation.clone());
state.ipv6.insert(ipv6, reservation.clone());
store.update(uuid, reservation.clone()).await?;
Ok(reservation)
}
pub async fn assign(&self, uuid: Uuid) -> Result<IpReservation> {
let mut state = self.state.write().await;
let reservation = IpAssignment::allocate(
&mut state,
&self.store,
uuid,
self.ipv4_network,
self.ipv6_network,
Some(self.gateway_ipv4),
Some(self.gateway_ipv6),
Some(self.gateway_mac),
)
.await?;
Ok(reservation)
}
pub async fn recall(&self, uuid: Uuid) -> Result<()> {
let mut state = self.state.write().await;
self.store.remove(uuid).await?;
state
.ipv4
.retain(|_, reservation| reservation.uuid != uuid.to_string());
state
.ipv6
.retain(|_, reservation| reservation.uuid != uuid.to_string());
Ok(())
}
pub async fn retrieve(&self, uuid: Uuid) -> Result<Option<IpReservation>> {
self.store.read(uuid).await
}
pub async fn reload(&self) -> Result<()> {
let mut state = self.state.write().await;
let intermediate = IpAssignment::fetch_current_state(&self.store).await?;
*state = intermediate;
Ok(())
}
pub async fn read(&self) -> Result<IpAssignmentState> {
Ok(self.state.read().await.clone())
}
}

View File

@ -0,0 +1 @@
pub mod assignment;

View File

@ -1,18 +1,22 @@
use std::{net::SocketAddr, path::PathBuf, str::FromStr, sync::Arc};
use crate::db::ip::IpReservationStore;
use crate::db::zone::ZoneStore;
use crate::db::KrataDatabase;
use crate::ip::assignment::IpAssignment;
use anyhow::{anyhow, Result};
use config::DaemonConfig;
use console::{DaemonConsole, DaemonConsoleHandle};
use control::DaemonControlService;
use db::ZoneStore;
use devices::DaemonDeviceManager;
use event::{DaemonEventContext, DaemonEventGenerator};
use idm::{DaemonIdm, DaemonIdmHandle};
use ipnetwork::{Ipv4Network, Ipv6Network};
use krata::{dial::ControlDialAddress, v1::control::control_service_server::ControlServiceServer};
use krataoci::{packer::service::OciPackerService, registry::OciPlatform};
use kratart::Runtime;
use log::info;
use log::{debug, info};
use reconcile::zone::ZoneReconciler;
use std::path::Path;
use std::{net::SocketAddr, path::PathBuf, str::FromStr, sync::Arc};
use tokio::{
fs,
net::UnixListener,
@ -32,6 +36,7 @@ pub mod db;
pub mod devices;
pub mod event;
pub mod idm;
pub mod ip;
pub mod metrics;
pub mod oci;
pub mod reconcile;
@ -58,18 +63,22 @@ const ZONE_RECONCILER_QUEUE_LEN: usize = 1000;
impl Daemon {
pub async fn new(store: String) -> Result<Self> {
let store_dir = PathBuf::from(store.clone());
debug!("loading configuration");
let mut config_path = store_dir.clone();
config_path.push("config.toml");
let config = DaemonConfig::load(&config_path).await?;
let config = Arc::new(config);
debug!("initializing device manager");
let devices = DaemonDeviceManager::new(config.clone());
debug!("validating image cache directory");
let mut image_cache_dir = store_dir.clone();
image_cache_dir.push("cache");
image_cache_dir.push("image");
fs::create_dir_all(&image_cache_dir).await?;
debug!("loading zone0 uuid");
let mut host_uuid_path = store_dir.clone();
host_uuid_path.push("host.uuid");
let host_uuid = if host_uuid_path.is_file() {
@ -89,29 +98,41 @@ impl Daemon {
generated
};
debug!("validating zone asset directories");
let initrd_path = detect_zone_path(&store, "initrd")?;
let kernel_path = detect_zone_path(&store, "kernel")?;
let addons_path = detect_zone_path(&store, "addons.squashfs")?;
debug!("initializing caches and hydrating zone state");
let seed = config.oci.seed.clone().map(PathBuf::from);
let packer = OciPackerService::new(seed, &image_cache_dir, OciPlatform::current()).await?;
let runtime = Runtime::new(host_uuid).await?;
let glt = ZoneLookupTable::new(0, host_uuid);
let zones_db_path = format!("{}/zones.db", store);
let zones = ZoneStore::open(&PathBuf::from(zones_db_path))?;
debug!("initializing core runtime");
let runtime = Runtime::new().await?;
let zlt = ZoneLookupTable::new(0, host_uuid);
let db_path = format!("{}/krata.db", store);
let database = KrataDatabase::open(Path::new(&db_path))?;
let zones = ZoneStore::open(database.clone())?;
let (zone_reconciler_notify, zone_reconciler_receiver) =
channel::<Uuid>(ZONE_RECONCILER_QUEUE_LEN);
let idm = DaemonIdm::new(glt.clone()).await?;
debug!("starting IDM service");
let idm = DaemonIdm::new(zlt.clone()).await?;
let idm = idm.launch().await?;
let console = DaemonConsole::new(glt.clone()).await?;
debug!("initializing console interfaces");
let console = DaemonConsole::new(zlt.clone()).await?;
let console = console.launch().await?;
let (events, generator) =
DaemonEventGenerator::new(zones.clone(), zone_reconciler_notify.clone(), idm.clone())
.await?;
let runtime_for_reconciler = runtime.dupe().await?;
let ipv4_network = Ipv4Network::from_str(&config.network.ipv4.subnet)?;
let ipv6_network = Ipv6Network::from_str(&config.network.ipv6.subnet)?;
let ip_reservation_store = IpReservationStore::open(database)?;
let ip_assignment =
IpAssignment::new(host_uuid, ipv4_network, ipv6_network, ip_reservation_store).await?;
debug!("initializing zone reconciler");
let zone_reconciler = ZoneReconciler::new(
devices.clone(),
glt.clone(),
zlt.clone(),
zones.clone(),
events.clone(),
runtime_for_reconciler,
@ -120,6 +141,8 @@ impl Daemon {
kernel_path,
initrd_path,
addons_path,
ip_assignment,
config.clone(),
)?;
let zone_reconciler_task = zone_reconciler.launch(zone_reconciler_receiver).await?;
@ -127,17 +150,18 @@ impl Daemon {
// TODO: Create a way of abstracting early init tasks in kratad.
// TODO: Make initial power management policy configurable.
// FIXME: Power management hypercalls fail when running as an L1 hypervisor.
// let power = runtime.power_management_context().await?;
// power.set_smt_policy(true).await?;
// power
// .set_scheduler_policy("performance".to_string())
// .await?;
let power = runtime.power_management_context().await?;
power.set_smt_policy(true).await?;
power
.set_scheduler_policy("performance".to_string())
.await?;
info!("power management initialized");
info!("krata daemon initialized");
Ok(Self {
store,
_config: config,
glt,
glt: zlt,
devices,
zones,
events,
@ -152,6 +176,7 @@ impl Daemon {
}
pub async fn listen(&mut self, addr: ControlDialAddress) -> Result<()> {
debug!("starting control service");
let control_service = DaemonControlService::new(
self.glt.clone(),
self.devices.clone(),

View File

@ -1,41 +1,41 @@
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use anyhow::{anyhow, Result};
use futures::StreamExt;
use krata::launchcfg::LaunchPackedFormat;
use krata::v1::common::ZoneOciImageSpec;
use krata::v1::common::{OciImageFormat, Zone, ZoneState, ZoneStatus};
use krataoci::packer::{service::OciPackerService, OciPackedFormat};
use kratart::launch::{PciBdf, PciDevice, PciRdmReservePolicy};
use kratart::launch::{PciBdf, PciDevice, PciRdmReservePolicy, ZoneLaunchNetwork};
use kratart::{launch::ZoneLaunchRequest, Runtime};
use log::info;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use crate::config::DaemonPciDeviceRdmReservePolicy;
use crate::config::{DaemonConfig, DaemonPciDeviceRdmReservePolicy};
use crate::devices::DaemonDeviceManager;
use crate::{
reconcile::zone::{zoneinfo_to_networkstate, ZoneReconcilerResult},
zlt::ZoneLookupTable,
};
use crate::ip::assignment::IpAssignment;
use crate::reconcile::zone::ip_reservation_to_network_status;
use crate::{reconcile::zone::ZoneReconcilerResult, zlt::ZoneLookupTable};
use krata::v1::common::zone_image_spec::Image;
use tokio::fs::{self, File};
use tokio::io::AsyncReadExt;
use tokio_tar::Archive;
use uuid::Uuid;
pub struct ZoneStarter<'a> {
pub struct ZoneCreator<'a> {
pub devices: &'a DaemonDeviceManager,
pub kernel_path: &'a Path,
pub initrd_path: &'a Path,
pub addons_path: &'a Path,
pub packer: &'a OciPackerService,
pub glt: &'a ZoneLookupTable,
pub ip_assignment: &'a IpAssignment,
pub zlt: &'a ZoneLookupTable,
pub runtime: &'a Runtime,
pub config: &'a DaemonConfig,
}
impl ZoneStarter<'_> {
impl ZoneCreator<'_> {
pub async fn oci_spec_tar_read_file(
&self,
file: &Path,
@ -75,7 +75,7 @@ impl ZoneStarter<'_> {
))
}
pub async fn start(&self, uuid: Uuid, zone: &mut Zone) -> Result<ZoneReconcilerResult> {
pub async fn create(&self, uuid: Uuid, zone: &mut Zone) -> Result<ZoneReconcilerResult> {
let Some(ref spec) = zone.spec else {
return Err(anyhow!("zone spec not specified"));
};
@ -174,6 +174,8 @@ impl ZoneStarter<'_> {
}
}
let reservation = self.ip_assignment.assign(uuid).await?;
let info = self
.runtime
.launch(ZoneLaunchRequest {
@ -187,7 +189,7 @@ impl ZoneStarter<'_> {
image,
kernel,
initrd,
vcpus: spec.vcpus,
vcpus: spec.cpus,
mem: spec.mem,
pcis,
env: task
@ -198,16 +200,26 @@ impl ZoneStarter<'_> {
run: empty_vec_optional(task.command.clone()),
debug: false,
addons_image: Some(self.addons_path.to_path_buf()),
network: ZoneLaunchNetwork {
ipv4: reservation.ipv4.to_string(),
ipv4_prefix: reservation.ipv4_prefix,
ipv6: reservation.ipv6.to_string(),
ipv6_prefix: reservation.ipv6_prefix,
gateway_ipv4: reservation.gateway_ipv4.to_string(),
gateway_ipv6: reservation.gateway_ipv6.to_string(),
zone_mac: reservation.mac,
nameservers: self.config.network.nameservers.clone(),
},
})
.await?;
self.glt.associate(uuid, info.domid).await;
info!("started zone {}", uuid);
zone.state = Some(ZoneState {
status: ZoneStatus::Started.into(),
network: Some(zoneinfo_to_networkstate(&info)),
exit_info: None,
error_info: None,
host: self.glt.host_uuid().to_string(),
self.zlt.associate(uuid, info.domid).await;
info!("created zone {}", uuid);
zone.status = Some(ZoneStatus {
state: ZoneState::Created.into(),
network_status: Some(ip_reservation_to_network_status(&reservation)),
exit_status: None,
error_status: None,
host: self.zlt.host_uuid().to_string(),
domid: info.domid,
});
success.store(true, Ordering::Release);

View File

@ -5,13 +5,23 @@ use std::{
time::Duration,
};
use self::create::ZoneCreator;
use crate::config::DaemonConfig;
use crate::db::ip::IpReservation;
use crate::ip::assignment::IpAssignment;
use crate::{
db::zone::ZoneStore,
devices::DaemonDeviceManager,
event::{DaemonEvent, DaemonEventContext},
zlt::ZoneLookupTable,
};
use anyhow::Result;
use krata::v1::{
common::{Zone, ZoneErrorInfo, ZoneExitInfo, ZoneNetworkState, ZoneState, ZoneStatus},
common::{Zone, ZoneErrorStatus, ZoneExitStatus, ZoneNetworkStatus, ZoneState, ZoneStatus},
control::ZoneChangedEvent,
};
use krataoci::packer::service::OciPackerService;
use kratart::{Runtime, ZoneInfo};
use kratart::Runtime;
use log::{error, info, trace, warn};
use tokio::{
select,
@ -24,16 +34,7 @@ use tokio::{
};
use uuid::Uuid;
use crate::{
db::ZoneStore,
devices::DaemonDeviceManager,
event::{DaemonEvent, DaemonEventContext},
zlt::ZoneLookupTable,
};
use self::start::ZoneStarter;
mod start;
mod create;
const PARALLEL_LIMIT: u32 = 5;
@ -68,6 +69,8 @@ pub struct ZoneReconciler {
tasks: Arc<Mutex<HashMap<Uuid, ZoneReconcilerEntry>>>,
zone_reconciler_notify: Sender<Uuid>,
zone_reconcile_lock: Arc<RwLock<()>>,
ip_assignment: IpAssignment,
config: Arc<DaemonConfig>,
}
impl ZoneReconciler {
@ -83,6 +86,8 @@ impl ZoneReconciler {
kernel_path: PathBuf,
initrd_path: PathBuf,
modules_path: PathBuf,
ip_assignment: IpAssignment,
config: Arc<DaemonConfig>,
) -> Result<Self> {
Ok(Self {
devices,
@ -97,6 +102,8 @@ impl ZoneReconciler {
tasks: Arc::new(Mutex::new(HashMap::new())),
zone_reconciler_notify,
zone_reconcile_lock: Arc::new(RwLock::with_max_readers((), PARALLEL_LIMIT)),
ip_assignment,
config,
})
}
@ -132,7 +139,7 @@ impl ZoneReconciler {
error!("runtime reconciler failed: {}", error);
}
}
};
}
}
}))
}
@ -166,21 +173,21 @@ impl ZoneReconciler {
let runtime_zone = runtime_zones.iter().find(|x| x.uuid == uuid);
match runtime_zone {
None => {
let mut state = stored_zone.state.as_mut().cloned().unwrap_or_default();
if state.status() == ZoneStatus::Started {
state.status = ZoneStatus::Starting.into();
let mut status = stored_zone.status.as_mut().cloned().unwrap_or_default();
if status.state() == ZoneState::Created {
status.state = ZoneState::Creating.into();
}
stored_zone.state = Some(state);
stored_zone.status = Some(status);
}
Some(runtime) => {
self.zlt.associate(uuid, runtime.domid).await;
let mut state = stored_zone.state.as_mut().cloned().unwrap_or_default();
let mut status = stored_zone.status.as_mut().cloned().unwrap_or_default();
if let Some(code) = runtime.state.exit_code {
state.status = ZoneStatus::Exited.into();
state.exit_info = Some(ZoneExitInfo { code });
status.state = ZoneState::Exited.into();
status.exit_status = Some(ZoneExitStatus { code });
} else {
state.status = ZoneStatus::Started.into();
status.state = ZoneState::Created.into();
}
for device in &stored_zone
@ -193,8 +200,11 @@ impl ZoneReconciler {
device_claims.insert(device.name.clone(), uuid);
}
state.network = Some(zoneinfo_to_networkstate(runtime));
stored_zone.state = Some(state);
if let Some(reservation) = self.ip_assignment.retrieve(uuid).await? {
status.network_status =
Some(ip_reservation_to_network_status(&reservation));
}
stored_zone.status = Some(status);
}
}
@ -228,20 +238,20 @@ impl ZoneReconciler {
zone: Some(zone.clone()),
}))?;
let start_status = zone.state.as_ref().map(|x| x.status()).unwrap_or_default();
let result = match start_status {
ZoneStatus::Starting => self.start(uuid, &mut zone).await,
ZoneStatus::Exited => self.exited(&mut zone).await,
ZoneStatus::Destroying => self.destroy(uuid, &mut zone).await,
let start_state = zone.status.as_ref().map(|x| x.state()).unwrap_or_default();
let result = match start_state {
ZoneState::Creating => self.create(uuid, &mut zone).await,
ZoneState::Exited => self.exited(&mut zone).await,
ZoneState::Destroying => self.destroy(uuid, &mut zone).await,
_ => Ok(ZoneReconcilerResult::Unchanged),
};
let result = match result {
Ok(result) => result,
Err(error) => {
zone.state = Some(zone.state.as_mut().cloned().unwrap_or_default());
zone.state.as_mut().unwrap().status = ZoneStatus::Failed.into();
zone.state.as_mut().unwrap().error_info = Some(ZoneErrorInfo {
zone.status = Some(zone.status.as_mut().cloned().unwrap_or_default());
zone.status.as_mut().unwrap().state = ZoneState::Failed.into();
zone.status.as_mut().unwrap().error_status = Some(ZoneErrorStatus {
message: error.to_string(),
});
warn!("failed to start zone {}: {}", zone.id, error);
@ -251,8 +261,8 @@ impl ZoneReconciler {
info!("reconciled zone {}", uuid);
let status = zone.state.as_ref().map(|x| x.status()).unwrap_or_default();
let destroyed = status == ZoneStatus::Destroyed;
let state = zone.status.as_ref().map(|x| x.state()).unwrap_or_default();
let destroyed = state == ZoneState::Destroyed;
let rerun = if let ZoneReconcilerResult::Changed { rerun } = result {
let event = DaemonEvent::ZoneChanged(ZoneChangedEvent {
@ -276,22 +286,24 @@ impl ZoneReconciler {
Ok(rerun)
}
async fn start(&self, uuid: Uuid, zone: &mut Zone) -> Result<ZoneReconcilerResult> {
let starter = ZoneStarter {
async fn create(&self, uuid: Uuid, zone: &mut Zone) -> Result<ZoneReconcilerResult> {
let starter = ZoneCreator {
devices: &self.devices,
kernel_path: &self.kernel_path,
initrd_path: &self.initrd_path,
addons_path: &self.addons_path,
packer: &self.packer,
glt: &self.zlt,
ip_assignment: &self.ip_assignment,
zlt: &self.zlt,
runtime: &self.runtime,
config: &self.config,
};
starter.start(uuid, zone).await
starter.create(uuid, zone).await
}
async fn exited(&self, zone: &mut Zone) -> Result<ZoneReconcilerResult> {
if let Some(ref mut state) = zone.state {
state.set_status(ZoneStatus::Destroying);
if let Some(ref mut status) = zone.status {
status.set_state(ZoneState::Destroying);
Ok(ZoneReconcilerResult::Changed { rerun: true })
} else {
Ok(ZoneReconcilerResult::Unchanged)
@ -303,18 +315,19 @@ impl ZoneReconciler {
trace!("failed to destroy runtime zone {}: {}", uuid, error);
}
let domid = zone.state.as_ref().map(|x| x.domid);
let domid = zone.status.as_ref().map(|x| x.domid);
if let Some(domid) = domid {
self.zlt.remove(uuid, domid).await;
}
info!("destroyed zone {}", uuid);
zone.state = Some(ZoneState {
status: ZoneStatus::Destroyed.into(),
network: None,
exit_info: None,
error_info: None,
self.ip_assignment.recall(uuid).await?;
zone.status = Some(ZoneStatus {
state: ZoneState::Destroyed.into(),
network_status: None,
exit_status: None,
error_status: None,
host: self.zlt.host_uuid().to_string(),
domid: domid.unwrap_or(u32::MAX),
});
@ -362,13 +375,13 @@ impl ZoneReconciler {
}
}
pub fn zoneinfo_to_networkstate(info: &ZoneInfo) -> ZoneNetworkState {
ZoneNetworkState {
zone_ipv4: info.zone_ipv4.map(|x| x.to_string()).unwrap_or_default(),
zone_ipv6: info.zone_ipv6.map(|x| x.to_string()).unwrap_or_default(),
zone_mac: info.zone_mac.as_ref().cloned().unwrap_or_default(),
gateway_ipv4: info.gateway_ipv4.map(|x| x.to_string()).unwrap_or_default(),
gateway_ipv6: info.gateway_ipv6.map(|x| x.to_string()).unwrap_or_default(),
gateway_mac: info.gateway_mac.as_ref().cloned().unwrap_or_default(),
pub fn ip_reservation_to_network_status(ip: &IpReservation) -> ZoneNetworkStatus {
ZoneNetworkStatus {
zone_ipv4: format!("{}/{}", ip.ipv4, ip.ipv4_prefix),
zone_ipv6: format!("{}/{}", ip.ipv6, ip.ipv6_prefix),
zone_mac: ip.mac.to_string().replace('-', ":"),
gateway_ipv4: format!("{}/{}", ip.gateway_ipv4, ip.ipv4_prefix),
gateway_ipv6: format!("{}/{}", ip.gateway_ipv6, ip.ipv6_prefix),
gateway_mac: ip.gateway_mac.to_string().replace('-', ":"),
}
}

View File

@ -11,7 +11,7 @@ import "google/protobuf/struct.proto";
message Zone {
string id = 1;
ZoneSpec spec = 2;
ZoneState state = 3;
ZoneStatus status = 3;
}
message ZoneSpec {
@ -21,7 +21,7 @@ message ZoneSpec {
ZoneImageSpec kernel = 3;
// If not specified, defaults to the daemon default initrd.
ZoneImageSpec initrd = 4;
uint32 vcpus = 5;
uint32 cpus = 5;
uint64 mem = 6;
ZoneTaskSpec task = 7;
repeated ZoneSpecAnnotation annotations = 8;
@ -67,26 +67,26 @@ message ZoneSpecDevice {
string name = 1;
}
message ZoneState {
ZoneStatus status = 1;
ZoneNetworkState network = 2;
ZoneExitInfo exit_info = 3;
ZoneErrorInfo error_info = 4;
message ZoneStatus {
ZoneState state = 1;
ZoneNetworkStatus network_status = 2;
ZoneExitStatus exit_status = 3;
ZoneErrorStatus error_status = 4;
string host = 5;
uint32 domid = 6;
}
enum ZoneStatus {
ZONE_STATUS_UNKNOWN = 0;
ZONE_STATUS_STARTING = 1;
ZONE_STATUS_STARTED = 2;
ZONE_STATUS_EXITED = 3;
ZONE_STATUS_DESTROYING = 4;
ZONE_STATUS_DESTROYED = 5;
ZONE_STATUS_FAILED = 6;
enum ZoneState {
ZONE_STATE_UNKNOWN = 0;
ZONE_STATE_CREATING = 1;
ZONE_STATE_CREATED = 2;
ZONE_STATE_EXITED = 3;
ZONE_STATE_DESTROYING = 4;
ZONE_STATE_DESTROYED = 5;
ZONE_STATE_FAILED = 6;
}
message ZoneNetworkState {
message ZoneNetworkStatus {
string zone_ipv4 = 1;
string zone_ipv6 = 2;
string zone_mac = 3;
@ -95,11 +95,11 @@ message ZoneNetworkState {
string gateway_mac = 6;
}
message ZoneExitInfo {
message ZoneExitStatus {
int32 code = 1;
}
message ZoneErrorInfo {
message ZoneErrorStatus {
string message = 1;
}

View File

@ -10,31 +10,34 @@ import "krata/idm/transport.proto";
import "krata/v1/common.proto";
service ControlService {
rpc IdentifyHost(IdentifyHostRequest) returns (IdentifyHostReply);
rpc CreateZone(CreateZoneRequest) returns (CreateZoneReply);
rpc DestroyZone(DestroyZoneRequest) returns (DestroyZoneReply);
rpc ResolveZone(ResolveZoneRequest) returns (ResolveZoneReply);
rpc ListZones(ListZonesRequest) returns (ListZonesReply);
rpc ListDevices(ListDevicesRequest) returns (ListDevicesReply);
rpc ExecZone(stream ExecZoneRequest) returns (stream ExecZoneReply);
rpc AttachZoneConsole(stream ZoneConsoleRequest) returns (stream ZoneConsoleReply);
rpc ReadZoneMetrics(ReadZoneMetricsRequest) returns (ReadZoneMetricsReply);
rpc HostStatus(HostStatusRequest) returns (HostStatusReply);
rpc SnoopIdm(SnoopIdmRequest) returns (stream SnoopIdmReply);
rpc WatchEvents(WatchEventsRequest) returns (stream WatchEventsReply);
rpc GetHostCpuTopology(GetHostCpuTopologyRequest) returns (GetHostCpuTopologyReply);
rpc SetHostPowerManagementPolicy(SetHostPowerManagementPolicyRequest) returns (SetHostPowerManagementPolicyReply);
rpc ListDevices(ListDevicesRequest) returns (ListDevicesReply);
rpc PullImage(PullImageRequest) returns (stream PullImageReply);
rpc GetHostCpuTopology(HostCpuTopologyRequest) returns (HostCpuTopologyReply);
rpc SetHostPowerManagementPolicy(HostPowerManagementPolicy) returns (HostPowerManagementPolicy);
rpc CreateZone(CreateZoneRequest) returns (CreateZoneReply);
rpc DestroyZone(DestroyZoneRequest) returns (DestroyZoneReply);
rpc ResolveZoneId(ResolveZoneIdRequest) returns (ResolveZoneIdReply);
rpc GetZone(GetZoneRequest) returns (GetZoneReply);
rpc ListZones(ListZonesRequest) returns (ListZonesReply);
rpc AttachZoneConsole(stream ZoneConsoleRequest) returns (stream ZoneConsoleReply);
rpc ExecInsideZone(stream ExecInsideZoneRequest) returns (stream ExecInsideZoneReply);
rpc ReadZoneMetrics(ReadZoneMetricsRequest) returns (ReadZoneMetricsReply);
rpc WatchEvents(WatchEventsRequest) returns (stream WatchEventsReply);
}
message IdentifyHostRequest {}
message HostStatusRequest {}
message IdentifyHostReply {
message HostStatusReply {
string host_uuid = 1;
uint32 host_domid = 2;
string krata_version = 3;
@ -45,36 +48,44 @@ message CreateZoneRequest {
}
message CreateZoneReply {
string Zone_id = 1;
string zone_id = 1;
}
message DestroyZoneRequest {
string Zone_id = 1;
string zone_id = 1;
}
message DestroyZoneReply {}
message ResolveZoneRequest {
message ResolveZoneIdRequest {
string name = 1;
}
message ResolveZoneReply {
krata.v1.common.Zone Zone = 1;
message ResolveZoneIdReply {
string zone_id = 1;
}
message GetZoneRequest {
string zone_id = 1;
}
message GetZoneReply {
krata.v1.common.Zone zone = 1;
}
message ListZonesRequest {}
message ListZonesReply {
repeated krata.v1.common.Zone Zones = 1;
repeated krata.v1.common.Zone zones = 1;
}
message ExecZoneRequest {
string Zone_id = 1;
message ExecInsideZoneRequest {
string zone_id = 1;
krata.v1.common.ZoneTaskSpec task = 2;
bytes data = 3;
}
message ExecZoneReply {
message ExecInsideZoneReply {
bool exited = 1;
string error = 2;
int32 exit_code = 3;
@ -83,7 +94,7 @@ message ExecZoneReply {
}
message ZoneConsoleRequest {
string Zone_id = 1;
string zone_id = 1;
bytes data = 2;
}
@ -95,16 +106,16 @@ message WatchEventsRequest {}
message WatchEventsReply {
oneof event {
ZoneChangedEvent Zone_changed = 1;
ZoneChangedEvent zone_changed = 1;
}
}
message ZoneChangedEvent {
krata.v1.common.Zone Zone = 1;
krata.v1.common.Zone zone = 1;
}
message ReadZoneMetricsRequest {
string Zone_id = 1;
string zone_id = 1;
}
message ReadZoneMetricsReply {
@ -219,15 +230,15 @@ message HostCpuTopologyInfo {
HostCpuTopologyClass class = 5;
}
message HostCpuTopologyRequest {}
message GetHostCpuTopologyRequest {}
message HostCpuTopologyReply {
message GetHostCpuTopologyReply {
repeated HostCpuTopologyInfo cpus = 1;
}
message HostPowerManagementPolicyRequest {}
message HostPowerManagementPolicy {
message SetHostPowerManagementPolicyRequest {
string scheduler = 1;
bool smt_awareness = 2;
}
message SetHostPowerManagementPolicyReply {}

View File

@ -9,13 +9,13 @@ use std::{
};
use anyhow::{anyhow, Result};
use bytes::{BufMut, BytesMut};
use bytes::{Buf, BufMut, BytesMut};
use log::{debug, error};
use nix::sys::termios::{cfmakeraw, tcgetattr, tcsetattr, SetArg};
use prost::Message;
use tokio::{
fs::File,
io::{unix::AsyncFd, AsyncReadExt, AsyncWriteExt},
io::{AsyncReadExt, AsyncWriteExt},
select,
sync::{
broadcast,
@ -34,7 +34,7 @@ use super::{
type OneshotRequestMap<R> = Arc<Mutex<HashMap<u64, oneshot::Sender<<R as IdmRequest>::Response>>>>;
type StreamRequestMap<R> = Arc<Mutex<HashMap<u64, Sender<<R as IdmRequest>::Response>>>>;
type StreamRequestUpdateMap<R> = Arc<Mutex<HashMap<u64, mpsc::Sender<R>>>>;
type StreamRequestUpdateMap<R> = Arc<Mutex<HashMap<u64, Sender<R>>>>;
pub type IdmInternalClient = IdmClient<internal::Request, internal::Event>;
const IDM_PACKET_QUEUE_LEN: usize = 100;
@ -43,12 +43,13 @@ const IDM_PACKET_MAX_SIZE: usize = 20 * 1024 * 1024;
#[async_trait::async_trait]
pub trait IdmBackend: Send {
async fn recv(&mut self) -> Result<IdmTransportPacket>;
async fn recv(&mut self) -> Result<Vec<IdmTransportPacket>>;
async fn send(&mut self, packet: IdmTransportPacket) -> Result<()>;
}
pub struct IdmFileBackend {
read_fd: Arc<Mutex<AsyncFd<File>>>,
read: Arc<Mutex<File>>,
read_buffer: BytesMut,
write: Arc<Mutex<File>>,
}
@ -57,7 +58,8 @@ impl IdmFileBackend {
IdmFileBackend::set_raw_port(&read_file)?;
IdmFileBackend::set_raw_port(&write_file)?;
Ok(IdmFileBackend {
read_fd: Arc::new(Mutex::new(AsyncFd::new(read_file)?)),
read: Arc::new(Mutex::new(read_file)),
read_buffer: BytesMut::new(),
write: Arc::new(Mutex::new(write_file)),
})
}
@ -72,26 +74,58 @@ impl IdmFileBackend {
#[async_trait::async_trait]
impl IdmBackend for IdmFileBackend {
async fn recv(&mut self) -> Result<IdmTransportPacket> {
let mut fd = self.read_fd.lock().await;
let mut guard = fd.readable_mut().await?;
let b1 = guard.get_inner_mut().read_u8().await?;
if b1 != 0xff {
return Ok(IdmTransportPacket::default());
}
let b2 = guard.get_inner_mut().read_u8().await?;
if b2 != 0xff {
return Ok(IdmTransportPacket::default());
}
let size = guard.get_inner_mut().read_u32_le().await?;
if size == 0 {
return Ok(IdmTransportPacket::default());
}
let mut buffer = vec![0u8; size as usize];
guard.get_inner_mut().read_exact(&mut buffer).await?;
match IdmTransportPacket::decode(buffer.as_slice()) {
Ok(packet) => Ok(packet),
Err(error) => Err(anyhow!("received invalid idm packet: {}", error)),
async fn recv(&mut self) -> Result<Vec<IdmTransportPacket>> {
let mut data = vec![0; 8192];
let mut first = true;
'read_more: loop {
let mut packets = Vec::new();
if !first {
if !packets.is_empty() {
return Ok(packets);
}
let size = self.read.lock().await.read(&mut data).await?;
self.read_buffer.extend_from_slice(&data[0..size]);
}
first = false;
loop {
if self.read_buffer.len() < 6 {
continue 'read_more;
}
let b1 = self.read_buffer[0];
let b2 = self.read_buffer[1];
if b1 != 0xff || b2 != 0xff {
self.read_buffer.clear();
continue 'read_more;
}
let size = (self.read_buffer[2] as u32
| (self.read_buffer[3] as u32) << 8
| (self.read_buffer[4] as u32) << 16
| (self.read_buffer[5] as u32) << 24) as usize;
let needed = size + 6;
if self.read_buffer.len() < needed {
continue 'read_more;
}
let mut packet = self.read_buffer.split_to(needed);
packet.advance(6);
match IdmTransportPacket::decode(packet) {
Ok(packet) => {
packets.push(packet);
}
Err(error) => {
return Err(anyhow!("received invalid idm packet: {}", error));
}
}
if self.read_buffer.is_empty() {
break;
}
}
return Ok(packets);
}
}
@ -403,8 +437,9 @@ impl<R: IdmRequest, E: IdmSerializable> IdmClient<R, E> {
loop {
select! {
x = backend.recv() => match x {
Ok(packet) => {
if packet.channel != channel {
Ok(packets) => {
for packet in packets {
if packet.channel != channel {
continue;
}
@ -478,6 +513,7 @@ impl<R: IdmRequest, E: IdmSerializable> IdmClient<R, E> {
_ => {},
}
}
},
Err(error) => {

View File

@ -16,7 +16,7 @@ clap = { workspace = true }
env_logger = { workspace = true }
etherparse = { workspace = true }
futures = { workspace = true }
krata = { path = "../krata", version = "^0.0.15" }
krata = { path = "../krata", version = "^0.0.16" }
krata-advmac = { workspace = true }
libc = { workspace = true }
log = { workspace = true }

View File

@ -76,44 +76,44 @@ impl AutoNetworkWatcher {
let mut networks: Vec<NetworkMetadata> = Vec::new();
for (uuid, zone) in &all_zones {
let Some(ref state) = zone.state else {
let Some(ref status) = zone.status else {
continue;
};
if state.domid == u32::MAX {
if status.domid == u32::MAX {
continue;
}
let Some(ref network) = state.network else {
let Some(ref network_status) = status.network_status else {
continue;
};
let Ok(zone_ipv4_cidr) = Ipv4Cidr::from_str(&network.zone_ipv4) else {
let Ok(zone_ipv4_cidr) = Ipv4Cidr::from_str(&network_status.zone_ipv4) else {
continue;
};
let Ok(zone_ipv6_cidr) = Ipv6Cidr::from_str(&network.zone_ipv6) else {
let Ok(zone_ipv6_cidr) = Ipv6Cidr::from_str(&network_status.zone_ipv6) else {
continue;
};
let Ok(zone_mac) = EthernetAddress::from_str(&network.zone_mac) else {
let Ok(zone_mac) = EthernetAddress::from_str(&network_status.zone_mac) else {
continue;
};
let Ok(gateway_ipv4_cidr) = Ipv4Cidr::from_str(&network.gateway_ipv4) else {
let Ok(gateway_ipv4_cidr) = Ipv4Cidr::from_str(&network_status.gateway_ipv4) else {
continue;
};
let Ok(gateway_ipv6_cidr) = Ipv6Cidr::from_str(&network.gateway_ipv6) else {
let Ok(gateway_ipv6_cidr) = Ipv6Cidr::from_str(&network_status.gateway_ipv6) else {
continue;
};
let Ok(gateway_mac) = EthernetAddress::from_str(&network.gateway_mac) else {
let Ok(gateway_mac) = EthernetAddress::from_str(&network_status.gateway_mac) else {
continue;
};
networks.push(NetworkMetadata {
domid: state.domid,
domid: status.domid,
uuid: *uuid,
zone: NetworkSide {
ipv4: zone_ipv4_cidr,
@ -187,7 +187,7 @@ impl AutoNetworkWatcher {
_ = sleep(Duration::from_secs(10)) => {
break;
}
};
}
}
Ok(())
}

View File

@ -25,7 +25,7 @@ async fn main() -> Result<()> {
let (context, mut receiver) = OciProgressContext::create();
tokio::task::spawn(async move {
loop {
if (receiver.changed().await).is_err() {
if receiver.changed().await.is_err() {
break;
}
let progress = receiver.borrow_and_update();

View File

@ -97,13 +97,13 @@ impl OciPackerBackend for OciPackerMkSquashfs {
status = &mut wait => {
break status;
}
};
}
} else {
select! {
status = &mut wait => {
break status;
}
};
}
}
};
if let Some(writer) = writer {
@ -172,13 +172,13 @@ impl OciPackerBackend for OciPackerMkfsErofs {
status = &mut wait => {
break status;
}
};
}
} else {
select! {
status = &mut wait => {
break status;
}
};
}
}
};
if let Some(writer) = writer {

View File

@ -228,7 +228,7 @@ impl OciBoundProgress {
context.update(&progress);
let mut receiver = self.context.subscribe();
tokio::task::spawn(async move {
while (receiver.changed().await).is_ok() {
while receiver.changed().await.is_ok() {
context
.sender
.send_replace(receiver.borrow_and_update().clone());

View File

@ -12,20 +12,20 @@ resolver = "2"
anyhow = { workspace = true }
backhand = { workspace = true }
ipnetwork = { workspace = true }
krata = { path = "../krata", version = "^0.0.15" }
krata = { path = "../krata", version = "^0.0.16" }
krata-advmac = { workspace = true }
krata-oci = { path = "../oci", version = "^0.0.15" }
krata-oci = { path = "../oci", version = "^0.0.16" }
log = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
uuid = { workspace = true }
krata-loopdev = { path = "../loopdev", version = "^0.0.15" }
krata-xencall = { path = "../xen/xencall", version = "^0.0.15" }
krata-xenclient = { path = "../xen/xenclient", version = "^0.0.15" }
krata-xenevtchn = { path = "../xen/xenevtchn", version = "^0.0.15" }
krata-xengnt = { path = "../xen/xengnt", version = "^0.0.15" }
krata-xenplatform = { path = "../xen/xenplatform", version = "^0.0.15" }
krata-xenstore = { path = "../xen/xenstore", version = "^0.0.15" }
krata-loopdev = { path = "../loopdev", version = "^0.0.16" }
krata-xencall = { path = "../xen/xencall", version = "^0.0.16" }
krata-xenclient = { path = "../xen/xenclient", version = "^0.0.16" }
krata-xenevtchn = { path = "../xen/xenevtchn", version = "^0.0.16" }
krata-xengnt = { path = "../xen/xengnt", version = "^0.0.16" }
krata-xenplatform = { path = "../xen/xenplatform", version = "^0.0.16" }
krata-xenstore = { path = "../xen/xenstore", version = "^0.0.16" }
walkdir = { workspace = true }
indexmap = { workspace = true }

View File

@ -8,14 +8,11 @@ use anyhow::{anyhow, Result};
use log::{debug, error};
use tokio::{
select,
sync::{
broadcast,
mpsc::{channel, Receiver, Sender},
},
sync::mpsc::{channel, Receiver, Sender},
task::JoinHandle,
time::sleep,
};
use xenevtchn::EventChannel;
use xenevtchn::EventChannelService;
use xengnt::{sys::GrantRef, GrantTab, MappedMemory};
use xenstore::{XsdClient, XsdInterface};
@ -43,7 +40,7 @@ pub struct ChannelService {
typ: String,
use_reserved_ref: Option<u64>,
backends: HashMap<u32, ChannelBackend>,
evtchn: EventChannel,
evtchn: EventChannelService,
store: XsdClient,
gnttab: GrantTab,
input_receiver: Receiver<(u32, Vec<u8>)>,
@ -62,14 +59,22 @@ impl ChannelService {
)> {
let (input_sender, input_receiver) = channel(GROUPED_CHANNEL_QUEUE_LEN);
let (output_sender, output_receiver) = channel(GROUPED_CHANNEL_QUEUE_LEN);
debug!("opening xenevtchn");
let evtchn = EventChannelService::open().await?;
debug!("opening xenstore");
let store = XsdClient::open().await?;
debug!("opening xengnt");
let gnttab = GrantTab::open()?;
Ok((
ChannelService {
typ,
use_reserved_ref,
backends: HashMap::new(),
evtchn: EventChannel::open().await?,
store: XsdClient::open().await?,
gnttab: GrantTab::open()?,
evtchn,
store,
gnttab,
input_sender: input_sender.clone(),
input_receiver,
output_sender,
@ -226,7 +231,7 @@ impl ChannelBackend {
domid: u32,
id: u32,
store: XsdClient,
evtchn: EventChannel,
evtchn: EventChannelService,
gnttab: GrantTab,
output_sender: Sender<(u32, Option<Vec<u8>>)>,
use_reserved_ref: Option<u64>,
@ -265,7 +270,7 @@ pub struct KrataChannelBackendProcessor {
id: u32,
domid: u32,
store: XsdClient,
evtchn: EventChannel,
evtchn: EventChannelService,
gnttab: GrantTab,
}
@ -484,28 +489,21 @@ impl KrataChannelBackendProcessor {
},
x = channel.receiver.recv() => match x {
Ok(_) => {
Some(_) => {
unsafe {
let buffer = self.read_output_buffer(channel.local_port, &memory).await?;
if !buffer.is_empty() {
sender.send((self.domid, Some(buffer))).await?;
}
};
channel.unmask_sender.send(channel.local_port).await?;
channel.unmask().await?;
},
Err(error) => {
match error {
broadcast::error::RecvError::Closed => {
break;
},
error => {
return Err(anyhow!("failed to receive event notification: {}", error));
}
}
None => {
break;
}
}
};
}
}
Ok(())
}

View File

@ -7,7 +7,7 @@ use std::{
use anyhow::{anyhow, Result};
use ipnetwork::{Ipv4Network, Ipv6Network};
use log::error;
use log::{debug, error};
use tokio::sync::RwLock;
use uuid::Uuid;
use xenstore::{XsdClient, XsdInterface};
@ -72,7 +72,9 @@ impl IpVendor {
ipv4_network: Ipv4Network,
ipv6_network: Ipv6Network,
) -> Result<Self> {
debug!("fetching state from xenstore");
let mut state = IpVendor::fetch_stored_state(&store).await?;
debug!("allocating IP set");
let (gateway_ipv4, gateway_ipv6) =
IpVendor::allocate_ipset(&mut state, host_uuid, ipv4_network, ipv6_network)?;
let vend = IpVendor {
@ -84,11 +86,14 @@ impl IpVendor {
gateway_ipv6,
state: Arc::new(RwLock::new(state)),
};
debug!("IP vendor initialized!");
Ok(vend)
}
async fn fetch_stored_state(store: &XsdClient) -> Result<IpVendorState> {
debug!("initializing default IP vendor state");
let mut state = IpVendorState::default();
debug!("iterating over xen domains");
for domid_candidate in store.list("/local/domain").await? {
let dom_path = format!("/local/domain/{}", domid_candidate);
let Some(uuid) = store
@ -119,6 +124,7 @@ impl IpVendor {
}
}
}
debug!("IP state hydrated");
Ok(state)
}

View File

@ -1,19 +1,21 @@
use std::collections::HashMap;
use std::fs;
use std::net::IpAddr;
use std::path::PathBuf;
use std::sync::Arc;
use advmac::MacAddr6;
use anyhow::{anyhow, Result};
use ipnetwork::IpNetwork;
use tokio::sync::Semaphore;
use uuid::Uuid;
use krata::launchcfg::{
LaunchInfo, LaunchNetwork, LaunchNetworkIpv4, LaunchNetworkIpv6, LaunchNetworkResolver,
LaunchPackedFormat, LaunchRoot,
};
use krataoci::packer::OciPackedImage;
use tokio::sync::Semaphore;
use uuid::Uuid;
pub use xenclient::{
pci::PciBdf, DomainPciDevice as PciDevice, DomainPciRdmReservePolicy as PciRdmReservePolicy,
};
use xenclient::{DomainChannel, DomainConfig, DomainDisk, DomainNetworkInterface};
use xenplatform::domain::BaseDomainConfig;
@ -22,10 +24,6 @@ use crate::RuntimeContext;
use super::{ZoneInfo, ZoneState};
pub use xenclient::{
pci::PciBdf, DomainPciDevice as PciDevice, DomainPciRdmReservePolicy as PciRdmReservePolicy,
};
pub struct ZoneLaunchRequest {
pub format: LaunchPackedFormat,
pub kernel: Vec<u8>,
@ -40,6 +38,18 @@ pub struct ZoneLaunchRequest {
pub debug: bool,
pub image: OciPackedImage,
pub addons_image: Option<PathBuf>,
pub network: ZoneLaunchNetwork,
}
pub struct ZoneLaunchNetwork {
pub ipv4: String,
pub ipv4_prefix: u8,
pub ipv6: String,
pub ipv6_prefix: u8,
pub gateway_ipv4: String,
pub gateway_ipv6: String,
pub zone_mac: MacAddr6,
pub nameservers: Vec<String>,
}
pub struct ZoneLauncher {
@ -58,15 +68,7 @@ impl ZoneLauncher {
) -> Result<ZoneInfo> {
let uuid = request.uuid.unwrap_or_else(Uuid::new_v4);
let xen_name = format!("krata-{uuid}");
let mut gateway_mac = MacAddr6::random();
gateway_mac.set_local(true);
gateway_mac.set_multicast(false);
let mut zone_mac = MacAddr6::random();
zone_mac.set_local(true);
zone_mac.set_multicast(false);
let _launch_permit = self.launch_semaphore.acquire().await?;
let mut ip = context.ipvendor.assign(uuid).await?;
let launch_config = LaunchInfo {
root: LaunchRoot {
format: request.format.clone(),
@ -81,20 +83,15 @@ impl ZoneLauncher {
network: Some(LaunchNetwork {
link: "eth0".to_string(),
ipv4: LaunchNetworkIpv4 {
address: format!("{}/{}", ip.ipv4, ip.ipv4_prefix),
gateway: ip.gateway_ipv4.to_string(),
address: format!("{}/{}", request.network.ipv4, request.network.ipv4_prefix),
gateway: request.network.gateway_ipv4,
},
ipv6: LaunchNetworkIpv6 {
address: format!("{}/{}", ip.ipv6, ip.ipv6_prefix),
gateway: ip.gateway_ipv6.to_string(),
address: format!("{}/{}", request.network.ipv6, request.network.ipv6_prefix),
gateway: request.network.gateway_ipv6.to_string(),
},
resolver: LaunchNetworkResolver {
nameservers: vec![
"1.1.1.1".to_string(),
"1.0.0.1".to_string(),
"2606:4700:4700::1111".to_string(),
"2606:4700:4700::1001".to_string(),
],
nameservers: request.network.nameservers,
},
}),
env: request.env,
@ -145,8 +142,7 @@ impl ZoneLauncher {
}
let cmdline = cmdline_options.join(" ");
let zone_mac_string = zone_mac.to_string().replace('-', ":");
let gateway_mac_string = gateway_mac.to_string().replace('-', ":");
let zone_mac_string = request.network.zone_mac.to_string().replace('-', ":");
let mut disks = vec![
DomainDisk {
@ -190,30 +186,6 @@ impl ZoneLauncher {
let mut extra_keys = vec![
("krata/uuid".to_string(), uuid.to_string()),
("krata/loops".to_string(), loops.join(",")),
(
"krata/network/zone/ipv4".to_string(),
format!("{}/{}", ip.ipv4, ip.ipv4_prefix),
),
(
"krata/network/zone/ipv6".to_string(),
format!("{}/{}", ip.ipv6, ip.ipv6_prefix),
),
(
"krata/network/zone/mac".to_string(),
zone_mac_string.clone(),
),
(
"krata/network/gateway/ipv4".to_string(),
format!("{}/{}", ip.gateway_ipv4, ip.ipv4_prefix),
),
(
"krata/network/gateway/ipv6".to_string(),
format!("{}/{}", ip.gateway_ipv6, ip.ipv6_prefix),
),
(
"krata/network/gateway/mac".to_string(),
gateway_mac_string.clone(),
),
];
if let Some(name) = request.name.as_ref() {
@ -251,29 +223,14 @@ impl ZoneLauncher {
extra_rw_paths: vec!["krata/zone".to_string()],
};
match context.xen.create(&config).await {
Ok(created) => {
ip.commit().await?;
Ok(ZoneInfo {
name: request.name.as_ref().map(|x| x.to_string()),
uuid,
domid: created.domid,
image: request.image.digest,
loops: vec![],
zone_ipv4: Some(IpNetwork::new(IpAddr::V4(ip.ipv4), ip.ipv4_prefix)?),
zone_ipv6: Some(IpNetwork::new(IpAddr::V6(ip.ipv6), ip.ipv6_prefix)?),
zone_mac: Some(zone_mac_string.clone()),
gateway_ipv4: Some(IpNetwork::new(
IpAddr::V4(ip.gateway_ipv4),
ip.ipv4_prefix,
)?),
gateway_ipv6: Some(IpNetwork::new(
IpAddr::V6(ip.gateway_ipv6),
ip.ipv6_prefix,
)?),
gateway_mac: Some(gateway_mac_string.clone()),
state: ZoneState { exit_code: None },
})
}
Ok(created) => Ok(ZoneInfo {
name: request.name.as_ref().map(|x| x.to_string()),
uuid,
domid: created.domid,
image: request.image.digest,
loops: vec![],
state: ZoneState { exit_code: None },
}),
Err(error) => {
let _ = context.autoloop.unloop(&image_squashfs_loop.path).await;
let _ = context.autoloop.unloop(&cfgblk_squashfs_loop.path).await;

View File

@ -1,12 +1,10 @@
use std::{fs, net::Ipv4Addr, path::PathBuf, str::FromStr, sync::Arc};
use std::{fs, path::PathBuf, str::FromStr, sync::Arc};
use anyhow::{anyhow, Result};
use ip::IpVendor;
use ipnetwork::{IpNetwork, Ipv4Network, Ipv6Network};
use krataloopdev::LoopControl;
use log::error;
use tokio::sync::Semaphore;
use uuid::Uuid;
use xenclient::XenClient;
use xenstore::{XsdClient, XsdInterface};
@ -19,7 +17,6 @@ use self::{
pub mod autoloop;
pub mod cfgblk;
pub mod channel;
pub mod ip;
pub mod launch;
pub mod power;
@ -48,12 +45,6 @@ pub struct ZoneInfo {
pub domid: u32,
pub image: String,
pub loops: Vec<ZoneLoopInfo>,
pub zone_ipv4: Option<IpNetwork>,
pub zone_ipv6: Option<IpNetwork>,
pub zone_mac: Option<String>,
pub gateway_ipv4: Option<IpNetwork>,
pub gateway_ipv6: Option<IpNetwork>,
pub gateway_mac: Option<String>,
pub state: ZoneState,
}
@ -61,20 +52,14 @@ pub struct ZoneInfo {
pub struct RuntimeContext {
pub autoloop: AutoLoop,
pub xen: XenClient<RuntimePlatform>,
pub ipvendor: IpVendor,
}
impl RuntimeContext {
pub async fn new(host_uuid: Uuid) -> Result<Self> {
pub async fn new() -> Result<Self> {
let xen = XenClient::new(0, RuntimePlatform::new()).await?;
let ipv4_network = Ipv4Network::new(Ipv4Addr::new(10, 75, 80, 0), 24)?;
let ipv6_network = Ipv6Network::from_str("fdd4:1476:6c7e::/48")?;
let ipvend =
IpVendor::new(xen.store.clone(), host_uuid, ipv4_network, ipv6_network).await?;
Ok(RuntimeContext {
autoloop: AutoLoop::new(LoopControl::open()?),
xen,
ipvendor: ipvend,
})
}
@ -115,61 +100,6 @@ impl RuntimeContext {
.store
.read_string(&format!("{}/krata/loops", &dom_path))
.await?;
let zone_ipv4 = self
.xen
.store
.read_string(&format!("{}/krata/network/zone/ipv4", &dom_path))
.await?;
let zone_ipv6 = self
.xen
.store
.read_string(&format!("{}/krata/network/zone/ipv6", &dom_path))
.await?;
let zone_mac = self
.xen
.store
.read_string(&format!("{}/krata/network/zone/mac", &dom_path))
.await?;
let gateway_ipv4 = self
.xen
.store
.read_string(&format!("{}/krata/network/gateway/ipv4", &dom_path))
.await?;
let gateway_ipv6 = self
.xen
.store
.read_string(&format!("{}/krata/network/gateway/ipv6", &dom_path))
.await?;
let gateway_mac = self
.xen
.store
.read_string(&format!("{}/krata/network/gateway/mac", &dom_path))
.await?;
let zone_ipv4 = if let Some(zone_ipv4) = zone_ipv4 {
IpNetwork::from_str(&zone_ipv4).ok()
} else {
None
};
let zone_ipv6 = if let Some(zone_ipv6) = zone_ipv6 {
IpNetwork::from_str(&zone_ipv6).ok()
} else {
None
};
let gateway_ipv4 = if let Some(gateway_ipv4) = gateway_ipv4 {
IpNetwork::from_str(&gateway_ipv4).ok()
} else {
None
};
let gateway_ipv6 = if let Some(gateway_ipv6) = gateway_ipv6 {
IpNetwork::from_str(&gateway_ipv6).ok()
} else {
None
};
let exit_code = self
.xen
.store
@ -190,12 +120,6 @@ impl RuntimeContext {
domid,
image,
loops,
zone_ipv4,
zone_ipv6,
zone_mac,
gateway_ipv4,
gateway_ipv6,
gateway_mac,
state,
});
}
@ -237,16 +161,14 @@ impl RuntimeContext {
#[derive(Clone)]
pub struct Runtime {
host_uuid: Uuid,
context: RuntimeContext,
launch_semaphore: Arc<Semaphore>,
}
impl Runtime {
pub async fn new(host_uuid: Uuid) -> Result<Self> {
let context = RuntimeContext::new(host_uuid).await?;
pub async fn new() -> Result<Self> {
let context = RuntimeContext::new().await?;
Ok(Self {
host_uuid,
context,
launch_semaphore: Arc::new(Semaphore::new(10)),
})
@ -282,11 +204,6 @@ impl Runtime {
return Err(anyhow!("unable to find krata uuid based on the domain",));
}
let uuid = Uuid::parse_str(&uuid)?;
let ip = self
.context
.ipvendor
.read_domain_assignment(uuid, domid)
.await?;
let loops = store
.read_string(format!("{}/krata/loops", dom_path).as_str())
.await?;
@ -306,16 +223,6 @@ impl Runtime {
}
}
}
if let Some(ip) = ip {
if let Err(error) = self.context.ipvendor.recall(&ip).await {
error!(
"failed to recall ip assignment for zone {}: {}",
uuid, error
);
}
}
Ok(uuid)
}
@ -324,11 +231,11 @@ impl Runtime {
}
pub async fn dupe(&self) -> Result<Runtime> {
Runtime::new(self.host_uuid).await
Runtime::new().await
}
pub async fn power_management_context(&self) -> Result<PowerManagementContext> {
let context = RuntimeContext::new(self.host_uuid).await?;
let context = RuntimeContext::new().await?;
Ok(PowerManagementContext { context })
}
}

View File

@ -1,5 +1,6 @@
use anyhow::Result;
use indexmap::IndexMap;
use log::info;
use xencall::sys::{CpuId, SysctlCputopo};
use crate::RuntimeContext;
@ -151,7 +152,10 @@ impl PowerManagementContext {
.xen
.call
.set_turbo_mode(CpuId::All, enable)
.await?;
.await
.unwrap_or_else(|error| {
info!("non-fatal error while setting SMT policy: {:?}", error);
});
Ok(())
}
@ -161,7 +165,13 @@ impl PowerManagementContext {
.xen
.call
.set_cpufreq_gov(CpuId::All, policy)
.await?;
.await
.unwrap_or_else(|error| {
info!(
"non-fatal error while setting scheduler policy: {:?}",
error
);
});
Ok(())
}
}

View File

@ -31,7 +31,7 @@ use sys::{
XEN_DOMCTL_MAX_INTERFACE_VERSION, XEN_DOMCTL_MIN_INTERFACE_VERSION, XEN_MEM_SET_MEMORY_MAP,
XEN_SYSCTL_CPUTOPOINFO, XEN_SYSCTL_MAX_INTERFACE_VERSION, XEN_SYSCTL_MIN_INTERFACE_VERSION,
XEN_SYSCTL_PHYSINFO, XEN_SYSCTL_PM_OP, XEN_SYSCTL_PM_OP_DISABLE_TURBO,
XEN_SYSCTL_PM_OP_ENABLE_TURBO,
XEN_SYSCTL_PM_OP_ENABLE_TURBO, XEN_SYSCTL_PM_OP_SET_CPUFREQ_GOV,
};
use tokio::sync::Semaphore;
use tokio::time::sleep;
@ -1038,7 +1038,7 @@ impl XenCall {
interface_version: self.sysctl_interface_version,
value: SysctlValue {
pm_op: SysctlPmOp {
cmd: XEN_SYSCTL_PM_OP_ENABLE_TURBO,
cmd: XEN_SYSCTL_PM_OP_SET_CPUFREQ_GOV,
cpuid,
value: SysctlPmOpValue {
set_gov: SysctlSetCpuFreqGov { scaling_governor },

View File

@ -771,6 +771,7 @@ pub const XEN_SYSCTL_CPUTOPOINFO: u32 = 16;
pub const XEN_SYSCTL_MIN_INTERFACE_VERSION: u32 = 0x00000015;
pub const XEN_SYSCTL_MAX_INTERFACE_VERSION: u32 = 0x00000020;
pub const XEN_SYSCTL_PM_OP_SET_CPUFREQ_GOV: u32 = 0x12;
pub const XEN_SYSCTL_PM_OP_SET_SCHED_OPT_STMT: u32 = 0x21;
pub const XEN_SYSCTL_PM_OP_ENABLE_TURBO: u32 = 0x26;
pub const XEN_SYSCTL_PM_OP_DISABLE_TURBO: u32 = 0x27;

View File

@ -13,9 +13,9 @@ async-trait = { workspace = true }
indexmap = { workspace = true }
libc = { workspace = true }
log = { workspace = true }
krata-xencall = { path = "../xencall", version = "^0.0.15" }
krata-xenplatform = { path = "../xenplatform", version = "^0.0.15" }
krata-xenstore = { path = "../xenstore", version = "^0.0.15" }
krata-xencall = { path = "../xencall", version = "^0.0.16" }
krata-xenplatform = { path = "../xenplatform", version = "^0.0.16" }
krata-xenstore = { path = "../xenstore", version = "^0.0.16" }
regex = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }

View File

@ -130,8 +130,7 @@ impl<P: BootSetupPlatform> XenClient<P> {
match self.init(created.domid, config, &created).await {
Ok(_) => Ok(created),
Err(err) => {
// ignore since destroying a domain is best
// effort when an error occurs
// ignore since destroying a domain is best-effort when an error occurs
let _ = self.domain_manager.destroy(created.domid).await;
Err(err)
}

View File

@ -9,6 +9,7 @@ edition = "2021"
resolver = "2"
[dependencies]
byteorder = { workspace = true }
libc = { workspace = true }
log = { workspace = true }
thiserror = { workspace = true }

View File

@ -1,9 +1,9 @@
use xenevtchn::error::Result;
use xenevtchn::EventChannel;
use xenevtchn::EventChannelService;
#[tokio::main]
async fn main() -> Result<()> {
let channel = EventChannel::open().await?;
let channel = EventChannelService::open().await?;
println!("channel opened");
let port = channel.bind_unbound_port(0).await?;
println!("port: {}", port);

View File

@ -8,6 +8,10 @@ pub enum Error {
Io(#[from] io::Error),
#[error("failed to send event channel wake: {0}")]
WakeSend(tokio::sync::broadcast::error::SendError<u32>),
#[error("failed to acquire lock")]
LockAcquireFailed,
#[error("event port already in use")]
PortInUse,
}
pub type Result<T> = std::result::Result<T, Error>;

View File

@ -1,82 +1,77 @@
pub mod error;
pub mod raw;
pub mod sys;
use crate::error::{Error, Result};
use crate::sys::{BindInterdomain, BindUnboundPort, BindVirq, Notify, UnbindPort};
use crate::raw::EVENT_CHANNEL_DEVICE;
use byteorder::{LittleEndian, ReadBytesExt};
use log::error;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::mem::size_of;
use std::os::fd::AsRawFd;
use std::os::raw::c_void;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::fs::{File, OpenOptions};
use tokio::io::AsyncReadExt;
use tokio::select;
use tokio::sync::broadcast::{
channel as broadcast_channel, Receiver as BroadcastReceiver, Sender as BroadastSender,
};
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tokio::sync::{Mutex, RwLock};
const UNBIND_CHANNEL_QUEUE_LEN: usize = 30;
const UNMASK_CHANNEL_QUEUE_LEN: usize = 30;
const BROADCAST_CHANNEL_QUEUE_LEN: usize = 30;
const CHANNEL_QUEUE_LEN: usize = 30;
type WakeMap = Arc<Mutex<HashMap<u32, BroadastSender<u32>>>>;
type WakeMap = Arc<RwLock<HashMap<u32, Sender<u32>>>>;
#[derive(Clone)]
pub struct EventChannel {
pub struct EventChannelService {
handle: Arc<Mutex<File>>,
wakes: WakeMap,
unbind_sender: Sender<u32>,
unmask_sender: Sender<u32>,
task: Arc<JoinHandle<()>>,
process_flag: Arc<AtomicBool>,
}
pub struct BoundEventChannel {
pub local_port: u32,
pub receiver: BroadcastReceiver<u32>,
unbind_sender: Sender<u32>,
pub unmask_sender: Sender<u32>,
pub receiver: Receiver<u32>,
pub service: EventChannelService,
}
impl BoundEventChannel {
pub async fn unmask(&self) -> Result<()> {
self.service.unmask(self.local_port).await
}
}
impl Drop for BoundEventChannel {
fn drop(&mut self) {
let _ = self.unbind_sender.try_send(self.local_port);
let service = self.service.clone();
let port = self.local_port;
tokio::task::spawn(async move {
let _ = service.unbind(port).await;
});
}
}
impl EventChannel {
pub async fn open() -> Result<EventChannel> {
let file = OpenOptions::new()
impl EventChannelService {
pub async fn open() -> Result<EventChannelService> {
let handle = OpenOptions::new()
.read(true)
.write(true)
.open("/dev/xen/evtchn")
.open(EVENT_CHANNEL_DEVICE)
.await?;
let wakes = Arc::new(Mutex::new(HashMap::new()));
let (unbind_sender, unbind_receiver) = channel(UNBIND_CHANNEL_QUEUE_LEN);
let (unmask_sender, unmask_receiver) = channel(UNMASK_CHANNEL_QUEUE_LEN);
let task = {
let file = file.try_clone().await?;
let wakes = wakes.clone();
tokio::task::spawn(async move {
if let Err(error) =
EventChannel::process(file, wakes, unmask_receiver, unbind_receiver).await
{
error!("event channel processor failed: {}", error);
}
})
let wakes = Arc::new(RwLock::new(HashMap::new()));
let flag = Arc::new(AtomicBool::new(false));
let processor = EventChannelProcessor {
flag: flag.clone(),
handle: handle.try_clone().await?.into_std().await,
wakes: wakes.clone(),
};
Ok(EventChannel {
handle: Arc::new(Mutex::new(file)),
processor.launch()?;
Ok(EventChannelService {
handle: Arc::new(Mutex::new(handle)),
wakes,
unbind_sender,
unmask_sender,
task: Arc::new(task),
process_flag: flag,
})
}
@ -109,11 +104,29 @@ impl EventChannel {
}
}
pub async fn unmask(&self, port: u32) -> Result<()> {
let handle = self.handle.lock().await;
let mut port = port;
let result = unsafe {
libc::write(
handle.as_raw_fd(),
&mut port as *mut u32 as *mut c_void,
size_of::<u32>(),
)
};
if result != size_of::<u32>() as isize {
return Err(Error::Io(std::io::Error::from_raw_os_error(result as i32)));
}
Ok(())
}
pub async fn unbind(&self, port: u32) -> Result<u32> {
let handle = self.handle.lock().await;
unsafe {
let mut request = UnbindPort { port };
Ok(sys::unbind(handle.as_raw_fd(), &mut request)? as u32)
let result = sys::unbind(handle.as_raw_fd(), &mut request)? as u32;
self.wakes.write().await.remove(&port);
Ok(result)
}
}
@ -132,95 +145,66 @@ impl EventChannel {
pub async fn bind(&self, domid: u32, port: u32) -> Result<BoundEventChannel> {
let local_port = self.bind_interdomain(domid, port).await?;
let (receiver, unmask_sender) = self.subscribe(local_port).await?;
let receiver = self.subscribe(local_port).await?;
let bound = BoundEventChannel {
local_port,
receiver,
unbind_sender: self.unbind_sender.clone(),
unmask_sender,
service: self.clone(),
};
Ok(bound)
}
pub async fn subscribe(&self, port: u32) -> Result<(BroadcastReceiver<u32>, Sender<u32>)> {
let mut wakes = self.wakes.lock().await;
pub async fn subscribe(&self, port: u32) -> Result<Receiver<u32>> {
let mut wakes = self.wakes.write().await;
let receiver = match wakes.entry(port) {
Entry::Occupied(entry) => entry.get().subscribe(),
Entry::Occupied(_) => {
return Err(Error::PortInUse);
}
Entry::Vacant(entry) => {
let (sender, receiver) = broadcast_channel::<u32>(BROADCAST_CHANNEL_QUEUE_LEN);
let (sender, receiver) = channel::<u32>(CHANNEL_QUEUE_LEN);
entry.insert(sender);
receiver
}
};
Ok((receiver, self.unmask_sender.clone()))
Ok(receiver)
}
}
async fn process(
mut file: File,
wakers: WakeMap,
mut unmask_receiver: Receiver<u32>,
mut unbind_receiver: Receiver<u32>,
) -> Result<()> {
loop {
select! {
result = file.read_u32_le() => {
match result {
Ok(port) => {
if let Some(sender) = wakers.lock().await.get(&port) {
if let Err(error) = sender.send(port) {
return Err(Error::WakeSend(error));
}
}
}
pub struct EventChannelProcessor {
flag: Arc<AtomicBool>,
handle: std::fs::File,
wakes: WakeMap,
}
Err(error) => return Err(Error::Io(error))
}
impl EventChannelProcessor {
pub fn launch(mut self) -> Result<()> {
std::thread::spawn(move || {
while let Err(error) = self.process() {
if self.flag.load(Ordering::Acquire) {
break;
}
result = unmask_receiver.recv() => {
match result {
Some(port) => {
unsafe {
let mut port = port;
let result = libc::write(file.as_raw_fd(), &mut port as *mut u32 as *mut c_void, size_of::<u32>());
if result != size_of::<u32>() as isize {
return Err(Error::Io(std::io::Error::from_raw_os_error(result as i32)));
}
}
}
None => {
break;
}
}
}
result = unbind_receiver.recv() => {
match result {
Some(port) => {
unsafe {
let mut request = UnbindPort { port };
sys::unbind(file.as_raw_fd(), &mut request)?;
}
}
None => {
break;
}
}
}
};
}
error!("failed to process event channel wakes: {}", error);
}
});
Ok(())
}
}
impl Drop for EventChannel {
fn drop(&mut self) {
if Arc::strong_count(&self.task) <= 1 {
self.task.abort();
pub fn process(&mut self) -> Result<()> {
loop {
let port = self.handle.read_u32::<LittleEndian>()?;
if let Some(wake) = self.wakes.blocking_read().get(&port) {
let _ = wake.try_send(port);
}
}
}
}
impl Drop for EventChannelService {
fn drop(&mut self) {
if Arc::strong_count(&self.handle) <= 1 {
self.process_flag.store(true, Ordering::Release);
}
}
}

View File

@ -0,0 +1,84 @@
use std::fs::{File, OpenOptions};
use std::os::fd::AsRawFd;
use std::sync::{Arc, Mutex};
use byteorder::{LittleEndian, ReadBytesExt};
use crate::error::{Error, Result};
use crate::sys;
pub const EVENT_CHANNEL_DEVICE: &str = "/dev/xen/evtchn";
#[derive(Clone)]
pub struct RawEventChannelService {
handle: Arc<Mutex<File>>,
}
impl RawEventChannelService {
pub fn open() -> Result<RawEventChannelService> {
let handle = OpenOptions::new()
.read(true)
.write(true)
.open(EVENT_CHANNEL_DEVICE)?;
let handle = Arc::new(Mutex::new(handle));
Ok(RawEventChannelService { handle })
}
pub fn from_handle(handle: File) -> Result<RawEventChannelService> {
Ok(RawEventChannelService {
handle: Arc::new(Mutex::new(handle)),
})
}
pub fn bind_virq(&self, virq: u32) -> Result<u32> {
let handle = self.handle.lock().map_err(|_| Error::LockAcquireFailed)?;
let mut request = sys::BindVirq { virq };
Ok(unsafe { sys::bind_virq(handle.as_raw_fd(), &mut request)? as u32 })
}
pub fn bind_interdomain(&self, domid: u32, port: u32) -> Result<u32> {
let handle = self.handle.lock().map_err(|_| Error::LockAcquireFailed)?;
let mut request = sys::BindInterdomain {
remote_domain: domid,
remote_port: port,
};
Ok(unsafe { sys::bind_interdomain(handle.as_raw_fd(), &mut request)? as u32 })
}
pub fn bind_unbound_port(&self, domid: u32) -> Result<u32> {
let handle = self.handle.lock().map_err(|_| Error::LockAcquireFailed)?;
let mut request = sys::BindUnboundPort {
remote_domain: domid,
};
Ok(unsafe { sys::bind_unbound_port(handle.as_raw_fd(), &mut request)? as u32 })
}
pub fn unbind(&self, port: u32) -> Result<u32> {
let handle = self.handle.lock().map_err(|_| Error::LockAcquireFailed)?;
let mut request = sys::UnbindPort { port };
Ok(unsafe { sys::unbind(handle.as_raw_fd(), &mut request)? as u32 })
}
pub fn notify(&self, port: u32) -> Result<u32> {
let handle = self.handle.lock().map_err(|_| Error::LockAcquireFailed)?;
let mut request = sys::Notify { port };
Ok(unsafe { sys::notify(handle.as_raw_fd(), &mut request)? as u32 })
}
pub fn reset(&self) -> Result<u32> {
let handle = self.handle.lock().map_err(|_| Error::LockAcquireFailed)?;
Ok(unsafe { sys::reset(handle.as_raw_fd())? as u32 })
}
pub fn pending(&self) -> Result<u32> {
let mut handle = self.handle.lock().map_err(|_| Error::LockAcquireFailed)?;
Ok(handle.read_u32::<LittleEndian>()?)
}
pub fn into_handle(self) -> Result<File> {
Arc::into_inner(self.handle)
.ok_or(Error::LockAcquireFailed)?
.into_inner()
.map_err(|_| Error::LockAcquireFailed)
}
}

View File

@ -16,7 +16,7 @@ flate2 = { workspace = true }
indexmap = { workspace = true }
libc = { workspace = true }
log = { workspace = true }
krata-xencall = { path = "../xencall", version = "^0.0.15" }
krata-xencall = { path = "../xencall", version = "^0.0.16" }
memchr = { workspace = true }
nix = { workspace = true }
regex = { workspace = true }

View File

@ -116,9 +116,11 @@ impl XsdSocket {
let rx_task = std::thread::Builder::new()
.name("xenstore-reader".to_string())
.spawn(move || {
if let Err(error) = XsdSocketProcessor::process_rx(read, rx_sender) {
let mut read = read;
if let Err(error) = XsdSocketProcessor::process_rx(&mut read, rx_sender) {
debug!("failed to process xen store bus: {}", error);
}
std::mem::forget(read);
})?;
Ok(XsdSocket {
@ -197,12 +199,11 @@ struct XsdSocketProcessor {
}
impl XsdSocketProcessor {
fn process_rx(mut read: std::fs::File, rx_sender: Sender<XsdMessage>) -> Result<()> {
fn process_rx(read: &mut std::fs::File, rx_sender: Sender<XsdMessage>) -> Result<()> {
let mut header_buffer: Vec<u8> = vec![0u8; XsdMessageHeader::SIZE];
let mut buffer: Vec<u8> = vec![0u8; XEN_BUS_MAX_PACKET_SIZE - XsdMessageHeader::SIZE];
loop {
let message =
XsdSocketProcessor::read_message(&mut header_buffer, &mut buffer, &mut read)?;
let message = XsdSocketProcessor::read_message(&mut header_buffer, &mut buffer, read)?;
rx_sender.blocking_send(message)?;
}
}
@ -297,7 +298,7 @@ impl XsdSocketProcessor {
break;
}
}
};
}
}
Ok(())
}

View File

@ -14,8 +14,8 @@ cgroups-rs = { workspace = true }
env_logger = { workspace = true }
futures = { workspace = true }
ipnetwork = { workspace = true }
krata = { path = "../krata", version = "^0.0.15" }
krata-xenstore = { path = "../xen/xenstore", version = "^0.0.15" }
krata = { path = "../krata", version = "^0.0.16" }
krata-xenstore = { path = "../xen/xenstore", version = "^0.0.16" }
libc = { workspace = true }
log = { workspace = true }
nix = { workspace = true, features = ["ioctl", "process", "fs"] }

View File

@ -95,7 +95,7 @@ impl ZoneBackground {
break;
}
}
};
}
}
Ok(())
}

View File

@ -1,4 +1,4 @@
FROM rust:1.80-alpine@sha256:596c7fa13f7458097b8c88ad83f33420da0341e2f5b544e34d9aa18a22fe11d0 AS build
FROM rust:1.80-alpine@sha256:1f5aff501e02c1384ec61bb47f89e3eebf60e287e6ed5d1c598077afc82e83d5 AS build
RUN apk update && apk add protoc protobuf-dev build-base && rm -rf /var/cache/apk/*
ENV TARGET_LIBC=musl TARGET_VENDOR=unknown

View File

@ -1,4 +1,4 @@
FROM rust:1.80-alpine@sha256:596c7fa13f7458097b8c88ad83f33420da0341e2f5b544e34d9aa18a22fe11d0 AS build
FROM rust:1.80-alpine@sha256:1f5aff501e02c1384ec61bb47f89e3eebf60e287e6ed5d1c598077afc82e83d5 AS build
RUN apk update && apk add protoc protobuf-dev build-base && rm -rf /var/cache/apk/*
ENV TARGET_LIBC=musl TARGET_VENDOR=unknown

View File

@ -1,4 +1,4 @@
FROM rust:1.80-alpine@sha256:596c7fa13f7458097b8c88ad83f33420da0341e2f5b544e34d9aa18a22fe11d0 AS build
FROM rust:1.80-alpine@sha256:1f5aff501e02c1384ec61bb47f89e3eebf60e287e6ed5d1c598077afc82e83d5 AS build
RUN apk update && apk add protoc protobuf-dev build-base && rm -rf /var/cache/apk/*
ENV TARGET_LIBC=musl TARGET_VENDOR=unknown

View File

@ -1,4 +1,4 @@
FROM rust:1.80-alpine@sha256:596c7fa13f7458097b8c88ad83f33420da0341e2f5b544e34d9aa18a22fe11d0 AS build
FROM rust:1.80-alpine@sha256:1f5aff501e02c1384ec61bb47f89e3eebf60e287e6ed5d1c598077afc82e83d5 AS build
RUN apk update && apk add protoc protobuf-dev build-base && rm -rf /var/cache/apk/*
ENV TARGET_LIBC=musl TARGET_VENDOR=unknown