Skip to content

Commit

Permalink
tart pull: 284% faster pulls with default concurrency setting (#970)
Browse files Browse the repository at this point in the history
* DiskV2: avoid allocating zero chunk on each zeroSkippingWrite() call

* Increase hole granularity size from 64 KiB to 4 MiB

* Fetcher: never write to disk, thanks to URLSessionDataDelegate
  • Loading branch information
edigaryev authored Dec 11, 2024
1 parent 32ebc5b commit 31ab421
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 98 deletions.
20 changes: 1 addition & 19 deletions Package.resolved
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"originHash" : "6a15657d8cb1d3e2b447f31aff5b47d6a9655d2262e48ca76476ba525435269b",
"originHash" : "22b3726bc4e4c6e9c04ac97cb08a82967feb39960a93d2909768a16e11576748",
"pins" : [
{
"identity" : "antlr4",
Expand Down Expand Up @@ -55,15 +55,6 @@
"version" : "1.5.0"
}
},
{
"identity" : "swift-async-algorithms",
"kind" : "remoteSourceControl",
"location" : "https://github.com/apple/swift-async-algorithms",
"state" : {
"branch" : "main",
"revision" : "5c8bd186f48c16af0775972700626f0b74588278"
}
},
{
"identity" : "swift-atomics",
"kind" : "remoteSourceControl",
Expand All @@ -73,15 +64,6 @@
"version" : "1.2.0"
}
},
{
"identity" : "swift-collections",
"kind" : "remoteSourceControl",
"location" : "https://github.com/apple/swift-collections.git",
"state" : {
"revision" : "9bf03ff58ce34478e66aaee630e491823326fd06",
"version" : "1.1.3"
}
},
{
"identity" : "swift-log",
"kind" : "remoteSourceControl",
Expand Down
2 changes: 0 additions & 2 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ let package = Package(
.package(url: "https://github.com/apple/swift-argument-parser", from: "1.3.1"),
.package(url: "https://github.com/mhdhejazi/Dynamic", branch: "master"),
.package(url: "https://github.com/apple/swift-algorithms", from: "1.2.0"),
.package(url: "https://github.com/apple/swift-async-algorithms", branch: "main"),
.package(url: "https://github.com/malcommac/SwiftDate", from: "7.0.0"),
.package(url: "https://github.com/antlr/antlr4", exact: "4.13.2"),
.package(url: "https://github.com/apple/swift-atomics.git", .upToNextMajor(from: "1.2.0")),
Expand All @@ -29,7 +28,6 @@ let package = Package(
targets: [
.executableTarget(name: "tart", dependencies: [
.product(name: "Algorithms", package: "swift-algorithms"),
.product(name: "AsyncAlgorithms", package: "swift-async-algorithms"),
.product(name: "ArgumentParser", package: "swift-argument-parser"),
.product(name: "Dynamic", package: "Dynamic"),
.product(name: "SwiftDate", package: "SwiftDate"),
Expand Down
122 changes: 66 additions & 56 deletions Sources/tart/Fetcher.swift
Original file line number Diff line number Diff line change
@@ -1,80 +1,90 @@
import Foundation
import AsyncAlgorithms

fileprivate let urlSession = createURLSession()
fileprivate var urlSessionConfiguration: URLSessionConfiguration {
let config = URLSessionConfiguration.default

class DownloadDelegate: NSObject, URLSessionTaskDelegate {
let progress: Progress
init(_ progress: Progress) throws {
self.progress = progress
}
// Harbor expects a CSRF token to be present if the HTTP client
// carries a session cookie between its requests[1] and fails if
// it was not present[2].
//
// To fix that, we disable the automatic cookies carry in URLSession.
//
// [1]: https://github.com/goharbor/harbor/blob/a4c577f9ec4f18396207a5e686433a6ba203d4ef/src/server/middleware/csrf/csrf.go#L78
// [2]: https://github.com/cirruslabs/tart/issues/295
config.httpShouldSetCookies = false

func urlSession(_ session: URLSession, didCreateTask task: URLSessionTask) {
self.progress.addChild(task.progress, withPendingUnitCount: self.progress.totalUnitCount)
}
return config
}

class Fetcher {
static func fetch(_ request: URLRequest, viaFile: Bool = false, progress: Progress? = nil) async throws -> (AsyncThrowingChannel<Data, Error>, HTTPURLResponse) {
let delegate = progress != nil ? try DownloadDelegate(progress!) : nil
static func fetch(_ request: URLRequest, viaFile: Bool = false, progress: Progress? = nil) async throws -> (AsyncThrowingStream<Data, Error>, HTTPURLResponse) {
let delegate = Delegate()
let session = URLSession(configuration: urlSessionConfiguration, delegate: delegate, delegateQueue: nil)
let task = session.dataTask(with: request)

let stream = AsyncThrowingStream<Data, Error> { continuation in
delegate.streamContinuation = continuation
}

if viaFile {
return try await fetchViaFile(request, delegate: delegate)
let response = try await withCheckedThrowingContinuation { continuation in
delegate.responseContinuation = continuation
task.resume()
}

return try await fetchViaMemory(request, delegate: delegate)
return (stream, response as! HTTPURLResponse)
}
}

private static func fetchViaMemory(_ request: URLRequest, delegate: URLSessionTaskDelegate? = nil) async throws -> (AsyncThrowingChannel<Data, Error>, HTTPURLResponse) {
let dataCh = AsyncThrowingChannel<Data, Error>()
fileprivate class Delegate: NSObject, URLSessionDataDelegate {
var responseContinuation: CheckedContinuation<URLResponse, Error>?
var streamContinuation: AsyncThrowingStream<Data, Error>.Continuation?

let (data, response) = try await urlSession.data(for: request, delegate: delegate)
private var buffer: Data = Data()
private let bufferFlushSize = 16 * 1024 * 1024

Task {
await dataCh.send(data)
func urlSession(
_ session: URLSession,
dataTask: URLSessionDataTask,
didReceive response: URLResponse,
completionHandler: @escaping (URLSession.ResponseDisposition) -> Void
) {
// Soft-limit for the maximum buffer capacity
let capacity = min(response.expectedContentLength, Int64(bufferFlushSize))

dataCh.finish()
}
// Pre-initialize buffer as we now know the capacity
buffer = Data(capacity: Int(capacity))

return (dataCh, response as! HTTPURLResponse)
responseContinuation?.resume(returning: response)
completionHandler(.allow)
}

private static func fetchViaFile(_ request: URLRequest, delegate: URLSessionTaskDelegate? = nil) async throws -> (AsyncThrowingChannel<Data, Error>, HTTPURLResponse) {
let dataCh = AsyncThrowingChannel<Data, Error>()

let (fileURL, response) = try await urlSession.download(for: request, delegate: delegate)

// Acquire a handle to the downloaded file and then remove it.
//
// This keeps a working reference to that file, yet we don't
// have to deal with the cleanup any more.
let mappedFile = try Data(contentsOf: fileURL, options: [.alwaysMapped])
try FileManager.default.removeItem(at: fileURL)
func urlSession(
_ session: URLSession,
dataTask: URLSessionDataTask,
didReceive data: Data
) {
buffer.append(data)

Task {
for chunk in (0 ..< mappedFile.count).chunks(ofCount: 64 * 1024 * 1024) {
await dataCh.send(mappedFile.subdata(in: chunk))
}

dataCh.finish()
if buffer.count >= bufferFlushSize {
streamContinuation?.yield(buffer)
buffer.removeAll(keepingCapacity: true)
}

return (dataCh, response as! HTTPURLResponse)
}
}

fileprivate func createURLSession() -> URLSession {
let config = URLSessionConfiguration.default

// Harbor expects a CSRF token to be present if the HTTP client
// carries a session cookie between its requests[1] and fails if
// it was not present[2].
//
// To fix that, we disable the automatic cookies carry in URLSession.
//
// [1]: https://github.com/goharbor/harbor/blob/a4c577f9ec4f18396207a5e686433a6ba203d4ef/src/server/middleware/csrf/csrf.go#L78
// [2]: https://github.com/cirruslabs/tart/issues/295
config.httpShouldSetCookies = false
func urlSession(
_ session: URLSession,
task: URLSessionTask,
didCompleteWithError error: Error?
) {
if !buffer.isEmpty {
streamContinuation?.yield(buffer)
buffer.removeAll(keepingCapacity: true)
}

return URLSession(configuration: config)
if let error = error {
streamContinuation?.finish(throwing: error)
} else {
streamContinuation?.finish()
}
}
}
31 changes: 15 additions & 16 deletions Sources/tart/OCI/Layerizer/DiskV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,21 @@ class DiskV2: Disk {
private static let bufferSizeBytes = 4 * 1024 * 1024
private static let layerLimitBytes = 512 * 1024 * 1024

// A zero chunk for faster than byte-by-byte comparisons
//
// Assumes that the other Data(...) is equal in size, but it's fine to get a false-negative
// on the last block since it costs only 4 MiB of excess data per 512 MiB layer.
//
// Some simple benchmarks ("sync && sudo purge" command was used to negate the disk caching effects):
// +--------------------------------------+---------------------------------------------------+
// | Operation | time(1) result |
// +--------------------------------------+---------------------------------------------------+
// | Data(...) == zeroChunk | 2.16s user 11.71s system 73% cpu 18.928 total |
// | Data(...).contains(where: {$0 != 0}) | 603.68s user 12.97s system 99% cpu 10:22.85 total |
// +--------------------------------------+---------------------------------------------------+
private static let holeGranularityBytes = 4 * 1024 * 1024
private static let zeroChunk = Data(count: holeGranularityBytes)

static func push(diskURL: URL, registry: Registry, chunkSizeMb: Int, concurrency: UInt, progress: Progress) async throws -> [OCIManifestLayer] {
var pushedLayers: [(index: Int, pushedLayer: OCIManifestLayer)] = []

Expand Down Expand Up @@ -215,22 +230,6 @@ class DiskV2: Disk {
}

private static func zeroSkippingWrite(_ disk: FileHandle, _ rdisk: FileHandle?, _ fsBlockSize: UInt64, _ offset: UInt64, _ data: Data) throws -> UInt64 {
let holeGranularityBytes = 64 * 1024

// A zero chunk for faster than byte-by-byte comparisons
//
// Assumes that the other Data(...) is equal in size, but it's fine to get a false-negative
// on the last block since it costs only 64 KiB of excess data per 500 MB layer.
//
// Some simple benchmarks ("sync && sudo purge" command was used to negate the disk caching effects):
// +--------------------------------------+---------------------------------------------------+
// | Operation | time(1) result |
// +--------------------------------------+---------------------------------------------------+
// | Data(...) == zeroChunk | 2.16s user 11.71s system 73% cpu 18.928 total |
// | Data(...).contains(where: {$0 != 0}) | 603.68s user 12.97s system 99% cpu 10:22.85 total |
// +--------------------------------------+---------------------------------------------------+
let zeroChunk = Data(count: holeGranularityBytes)

var offset = offset

for chunk in data.chunks(ofCount: holeGranularityBytes) {
Expand Down
7 changes: 3 additions & 4 deletions Sources/tart/OCI/Registry.swift
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import Foundation
import Algorithms
import AsyncAlgorithms

enum RegistryError: Error {
case UnexpectedHTTPStatusCode(when: String, code: Int, details: String = "")
Expand Down Expand Up @@ -31,7 +30,7 @@ extension Data {
}
}

extension AsyncThrowingChannel<Data, Error> {
extension AsyncThrowingStream<Data, Error> {
func asData() async throws -> Data {
var result = Data()

Expand Down Expand Up @@ -307,7 +306,7 @@ class Registry {
body: Data? = nil,
doAuth: Bool = true,
viaFile: Bool = false
) async throws -> (AsyncThrowingChannel<Data, Error>, HTTPURLResponse) {
) async throws -> (AsyncThrowingStream<Data, Error>, HTTPURLResponse) {
var urlComponents = urlComponents

if urlComponents.queryItems == nil && !parameters.isEmpty {
Expand Down Expand Up @@ -413,7 +412,7 @@ class Registry {
return nil
}

private func authAwareRequest(request: URLRequest, viaFile: Bool = false, doAuth: Bool) async throws -> (AsyncThrowingChannel<Data, Error>, HTTPURLResponse) {
private func authAwareRequest(request: URLRequest, viaFile: Bool = false, doAuth: Bool) async throws -> (AsyncThrowingStream<Data, Error>, HTTPURLResponse) {
var request = request

if doAuth {
Expand Down
1 change: 0 additions & 1 deletion Sources/tart/VM.swift
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import Foundation
import Virtualization
import AsyncAlgorithms
import Semaphore

struct UnsupportedRestoreImageError: Error {
Expand Down

0 comments on commit 31ab421

Please sign in to comment.