first async refactor of fetch/index

This commit is contained in:
2024-11-17 02:31:44 +11:00
parent badc6dd39e
commit 8c7f0f23d5
13 changed files with 112 additions and 32 deletions

View File

@ -5,12 +5,11 @@
import Foundation import Foundation
import ArgumentParser import ArgumentParser
import darwin_apk
struct RepositoriesConfig { public struct ApkRepositoriesConfig {
let repositories: [ApkIndexRepository] public let repositories: [ApkIndexRepository]
init() async throws(ExitCode) { public init() async throws(ExitCode) {
do { do {
self.repositories = try await Self.readConfig(name: "repositories").flatMap { repo in self.repositories = try await Self.readConfig(name: "repositories").flatMap { repo in
Self.readConfig(name: "arch").map { arch in Self.readConfig(name: "arch").map { arch in
@ -23,12 +22,6 @@ struct RepositoriesConfig {
} }
} }
var localRepositories: [URL] {
self.repositories.map { repo in
URL(filePath: repo.localName, directoryHint: .notDirectory)
}
}
private static func readConfig(name: String) private static func readConfig(name: String)
-> AsyncFilterSequence<AsyncMapSequence<AsyncLineSequence<URL.AsyncBytes>, String>> { -> AsyncFilterSequence<AsyncMapSequence<AsyncLineSequence<URL.AsyncBytes>, String>> {
return URL(filePath: name, directoryHint: .notDirectory).lines return URL(filePath: name, directoryHint: .notDirectory).lines
@ -36,3 +29,9 @@ struct RepositoriesConfig {
.filter { !$0.isEmpty && $0.first != "#" } // Ignore empty & commented lines .filter { !$0.isEmpty && $0.first != "#" } // Ignore empty & commented lines
} }
} }
public extension ApkIndex {
@inlinable static func resolve(_ config: ApkRepositoriesConfig, fetch: ApkIndexFetchMode) async throws -> Self {
try await Self.resolve(config.repositories, fetch: fetch)
}
}

View File

@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0 * SPDX-License-Identifier: Apache-2.0
*/ */
public struct ApkIndex { public struct ApkIndex: Sendable {
public let packages: [ApkIndexPackage] public let packages: [ApkIndexPackage]
} }

View File

@ -5,7 +5,7 @@
import Foundation import Foundation
public struct ApkIndexDependency: Hashable { public struct ApkIndexDependency: Hashable, Sendable {
let requirement: ApkRequirement let requirement: ApkRequirement
init(requirement: ApkRequirement) { init(requirement: ApkRequirement) {

View File

@ -6,7 +6,7 @@
import Foundation import Foundation
import CryptoKit import CryptoKit
public struct ApkIndexDigest { public struct ApkIndexDigest: Sendable {
public let type: DigestType public let type: DigestType
public let data: Data public let data: Data
@ -89,7 +89,7 @@ extension ApkIndexDigest: Equatable, Hashable {
} }
public extension ApkIndexDigest { public extension ApkIndexDigest {
enum DigestType { enum DigestType: Sendable {
case md5, sha1, sha256 case md5, sha1, sha256
} }
} }

View File

@ -5,8 +5,9 @@
import Foundation import Foundation
struct ApkIndexDownloader { public struct ApkIndexDownloader {
func downloadFile(remote remoteURL: URL, destination destLocalURL: URL) { @available(*, deprecated, message: "This is stinky, use ApkIndexDownloader.fetch instead")
internal func downloadFile(remote remoteURL: URL, destination destLocalURL: URL) {
let sem = DispatchSemaphore.init(value: 0) let sem = DispatchSemaphore.init(value: 0)
let downloadTask = URLSession.shared.downloadTask(with: remoteURL) { url, response, error in let downloadTask = URLSession.shared.downloadTask(with: remoteURL) { url, response, error in
if let localURL = url { if let localURL = url {
@ -26,4 +27,49 @@ struct ApkIndexDownloader {
downloadTask.resume() downloadTask.resume()
sem.wait() sem.wait()
} }
public static func fetch(repository: ApkIndexRepository) async throws(FetchError) -> URL {
let localDestinationURL = URL(filePath: repository.localName)
let tempLocationURL: URL, response: URLResponse
do {
(tempLocationURL, response) = try await URLSession.shared.download(from: repository.url)
} catch {
throw .downloadFailed(error)
}
guard let httpResponse = response as? HTTPURLResponse,
httpResponse.statusCode == 200 else {
throw .invalidServerResponse((response as? HTTPURLResponse)?.statusCode ?? -1)
}
// Move index repository to destination location
do {
// Replace existing APKINDEX.tar.gz files
if FileManager.default.fileExists(atPath: localDestinationURL.path()) {
try FileManager.default.removeItem(at: localDestinationURL)
}
// Move downloaded file to the new location
try FileManager.default.moveItem(at: tempLocationURL, to: localDestinationURL)
return localDestinationURL
} catch let error {
throw .moveFailed(error)
}
}
}
public extension ApkIndexDownloader {
enum FetchError: Error, LocalizedError {
case invalidServerResponse(_ code: Int)
case downloadFailed(_ err: any Error)
case moveFailed(_ err: any Error)
public var errorDescription: String? {
switch self {
case .invalidServerResponse(let code): "Server responded with HTTP response code \(code)"
case .downloadFailed(let err): "Failed to create session, \(err.localizedDescription)"
case .moveFailed(let err): "Couldn't move index, \(err.localizedDescription)"
}
}
}
} }

View File

@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0 * SPDX-License-Identifier: Apache-2.0
*/ */
public struct ApkIndexInstallIf: Hashable { public struct ApkIndexInstallIf: Hashable, Sendable {
let requirement: ApkRequirement let requirement: ApkRequirement
init(requirement: ApkRequirement) { init(requirement: ApkRequirement) {

View File

@ -5,7 +5,7 @@
import Foundation import Foundation
public struct ApkIndexPackage: Hashable { public struct ApkIndexPackage: Hashable, Sendable {
public let indexChecksum: ApkIndexDigest public let indexChecksum: ApkIndexDigest
public let name: String public let name: String
public let version: String public let version: String

View File

@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0 * SPDX-License-Identifier: Apache-2.0
*/ */
public struct ApkIndexProvides: Hashable { public struct ApkIndexProvides: Hashable, Sendable {
let name: String let name: String
init(requirement: ApkRequirement) { init(requirement: ApkRequirement) {

View File

@ -30,6 +30,39 @@ public extension ApkIndex {
} }
} }
public extension ApkIndex {
static func resolve<S: Sequence>(_ repositories: S, fetch: ApkIndexFetchMode) async throws -> Self where S.Element == ApkIndexRepository {
try await withThrowingTaskGroup(of: Self.self) { group in
for repository in repositories {
group.addTask(priority: .userInitiated) {
let local: URL
switch fetch {
case .local:
local = URL(filePath: repository.localName)
case .lazy:
if !FileManager.default.fileExists(atPath: repository.localName) {
fallthrough
}
local = URL(filePath: repository.localName)
case .update:
print("Fetching \"\(repository.name)\"")
local = try await ApkIndexDownloader.fetch(repository: repository)
}
let index = try ApkIndex(readFrom: local)
return index
}
}
return try await ApkIndex.merge(group.reduce(into: []) { $0.append($1) })
}
}
}
public enum ApkIndexFetchMode: Sendable {
case update
case lazy
case local
}
public enum ApkIndexReadingError: Error, LocalizedError { public enum ApkIndexReadingError: Error, LocalizedError {
case missingSignature case missingSignature
case missingIndex case missingIndex

View File

@ -6,7 +6,7 @@
import Foundation import Foundation
import CryptoKit import CryptoKit
public struct ApkIndexRepository { public struct ApkIndexRepository: Sendable {
public let name: String public let name: String
public let arch: String public let arch: String
public let discriminator: String public let discriminator: String

View File

@ -29,7 +29,7 @@ public struct ApkIndexUpdater {
let graph: ApkPackageGraph let graph: ApkPackageGraph
do { do {
let tables = try self.repositories.map { try readIndex(URL(filePath: $0.localName)) } let tables = try self.repositories.map { try Self.readIndex(URL(filePath: $0.localName)) }
graph = ApkPackageGraph(index: ApkIndex.merge(tables)) graph = ApkPackageGraph(index: ApkIndex.merge(tables))
graph.buildGraphNode() graph.buildGraphNode()
@ -46,11 +46,11 @@ public struct ApkIndexUpdater {
} }
} }
private func readIndex(_ indexURL: URL) throws -> ApkIndex { public static func readIndex(_ indexURL: URL) throws -> ApkIndex {
let tarSignature: [TarReader.Entry] let tarSignature: [TarReader.Entry]
let tarRecords: [TarReader.Entry] let tarRecords: [TarReader.Entry]
print("Archive: \(indexURL.lastPathComponent)") let arcName = indexURL.lastPathComponent
let durFormat = Duration.UnitsFormatStyle( let durFormat = Duration.UnitsFormatStyle(
allowedUnits: [ .seconds, .milliseconds ], allowedUnits: [ .seconds, .milliseconds ],
@ -69,7 +69,7 @@ public struct ApkIndexUpdater {
fatalError(error.localizedDescription) fatalError(error.localizedDescription)
} }
print("Gzip time: \((ContinuousClock.now - gzipStart).formatted(durFormat))") print("\(arcName): Gzip time: \((ContinuousClock.now - gzipStart).formatted(durFormat))")
let untarStart = ContinuousClock.now let untarStart = ContinuousClock.now
let signatureStream = MemoryInputStream(buffer: tars[0]) let signatureStream = MemoryInputStream(buffer: tars[0])
@ -84,10 +84,10 @@ public struct ApkIndexUpdater {
guard let description = tarRecords.firstFile(name: "DESCRIPTION") guard let description = tarRecords.firstFile(name: "DESCRIPTION")
else { fatalError("DESCRIPTION missing") } else { fatalError("DESCRIPTION missing") }
print("TAR time: \((ContinuousClock.now - untarStart).formatted(durFormat))") print("\(arcName): TAR time: \((ContinuousClock.now - untarStart).formatted(durFormat))")
let indexStart = ContinuousClock.now let indexStart = ContinuousClock.now
defer { defer {
print("Index time: \((ContinuousClock.now - indexStart).formatted(durFormat))") print("\(arcName): Index time: \((ContinuousClock.now - indexStart).formatted(durFormat))")
} }
return try ApkIndex(raw: return try ApkIndex(raw:

View File

@ -41,10 +41,10 @@ struct DpkSearchCommand: AsyncParsableCommand {
let match: any PatternMatcher let match: any PatternMatcher
match = try matcher.init(patterns: patterns, ignoreCase: !self.caseSensitive) match = try matcher.init(patterns: patterns, ignoreCase: !self.caseSensitive)
let localRepositories = try await RepositoriesConfig().localRepositories let localRepositories = try await ApkRepositoriesConfig()
let index: ApkIndex let index: ApkIndex
do { do {
index = ApkIndex.merge(try localRepositories.map(ApkIndex.init)) index = try await ApkIndex.resolve(localRepositories, fetch: .local)
} catch { } catch {
print("Failed to build package index: \(error.localizedDescription)") print("Failed to build package index: \(error.localizedDescription)")
throw .failure throw .failure

View File

@ -13,11 +13,13 @@ struct DpkUpdateCommand: AsyncParsableCommand {
abstract: "Update the system package repositories.", abstract: "Update the system package repositories.",
aliases: [ "u" ]) aliases: [ "u" ])
@Flag(help: "Index on-disk cache")
var lazyDownload: Bool = false
func run() async throws { func run() async throws {
let repositories = try await ApkRepositoriesConfig().repositories
print("Updating package repositories") print("Updating package repositories")
let repositories = try await RepositoriesConfig().repositories let index = try await ApkIndex.resolve(repositories, fetch: self.lazyDownload ? .lazy : .update)
var updater = ApkIndexUpdater() print("Indexed \(index.packages.count) package(s)")
updater.repositories.append(contentsOf: repositories)
updater.update()
} }
} }