Skip to content

Commit

Permalink
[Refactor] turn Kyu into and actor
Browse files Browse the repository at this point in the history
  • Loading branch information
reddavis committed Jul 23, 2021
1 parent 0ed3d50 commit cddf589
Show file tree
Hide file tree
Showing 7 changed files with 246 additions and 165 deletions.
38 changes: 13 additions & 25 deletions Kyu.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
A40D705825583F3B00FF03AC /* Kyu.swift in Sources */ = {isa = PBXBuildFile; fileRef = A40D705725583F3B00FF03AC /* Kyu.swift */; };
A40D705C2558485800FF03AC /* Job.swift in Sources */ = {isa = PBXBuildFile; fileRef = A40D705B2558485800FF03AC /* Job.swift */; };
A4307F4F25664614005844B5 /* AppendNewLineJob.swift in Sources */ = {isa = PBXBuildFile; fileRef = A4307F4E25664614005844B5 /* AppendNewLineJob.swift */; };
A4B6D92125666A6C00AFB32C /* XCTest+Helpers.swift in Sources */ = {isa = PBXBuildFile; fileRef = A4B6D92025666A6C00AFB32C /* XCTest+Helpers.swift */; };
A47AD49D26AACD090075439D /* Assertions.swift in Sources */ = {isa = PBXBuildFile; fileRef = A47AD49C26AACD090075439D /* Assertions.swift */; };
A4B6D96B256AE3D500AFB32C /* Logger.swift in Sources */ = {isa = PBXBuildFile; fileRef = A4B6D96A256AE3D500AFB32C /* Logger.swift */; };
A4B6D974256BC49000AFB32C /* Directory.swift in Sources */ = {isa = PBXBuildFile; fileRef = A4B6D973256BC49000AFB32C /* Directory.swift */; };
A4E50846256CDEF800C21B6A /* DirectoryTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = A4E50845256CDEF800C21B6A /* DirectoryTests.swift */; };
Expand All @@ -39,7 +39,7 @@
A40D705725583F3B00FF03AC /* Kyu.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Kyu.swift; sourceTree = "<group>"; };
A40D705B2558485800FF03AC /* Job.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Job.swift; sourceTree = "<group>"; };
A4307F4E25664614005844B5 /* AppendNewLineJob.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AppendNewLineJob.swift; sourceTree = "<group>"; };
A4B6D92025666A6C00AFB32C /* XCTest+Helpers.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "XCTest+Helpers.swift"; sourceTree = "<group>"; };
A47AD49C26AACD090075439D /* Assertions.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Assertions.swift; sourceTree = "<group>"; };
A4B6D96A256AE3D500AFB32C /* Logger.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Logger.swift; sourceTree = "<group>"; };
A4B6D973256BC49000AFB32C /* Directory.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Directory.swift; sourceTree = "<group>"; };
A4E50845256CDEF800C21B6A /* DirectoryTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = DirectoryTests.swift; sourceTree = "<group>"; };
Expand Down Expand Up @@ -135,9 +135,9 @@
A40D705425583E4E00FF03AC /* Source */ = {
isa = PBXGroup;
children = (
A4B6D91F25666A4200AFB32C /* Extensions */,
A4307F4D25664580005844B5 /* Models */,
A4307F4C25664579005844B5 /* Tests */,
A4307F4E25664614005844B5 /* AppendNewLineJob.swift */,
A47AD49C26AACD090075439D /* Assertions.swift */,
);
path = Source;
sourceTree = "<group>";
Expand All @@ -151,22 +151,6 @@
path = Tests;
sourceTree = "<group>";
};
A4307F4D25664580005844B5 /* Models */ = {
isa = PBXGroup;
children = (
A4307F4E25664614005844B5 /* AppendNewLineJob.swift */,
);
path = Models;
sourceTree = "<group>";
};
A4B6D91F25666A4200AFB32C /* Extensions */ = {
isa = PBXGroup;
children = (
A4B6D92025666A6C00AFB32C /* XCTest+Helpers.swift */,
);
path = Extensions;
sourceTree = "<group>";
};
A4E50849256CE1AF00C21B6A /* Deployment */ = {
isa = PBXGroup;
children = (
Expand Down Expand Up @@ -305,7 +289,7 @@
files = (
A4307F4F25664614005844B5 /* AppendNewLineJob.swift in Sources */,
A40D704425583D1300FF03AC /* KyuTests.swift in Sources */,
A4B6D92125666A6C00AFB32C /* XCTest+Helpers.swift in Sources */,
A47AD49D26AACD090075439D /* Assertions.swift in Sources */,
A4E50846256CDEF800C21B6A /* DirectoryTests.swift in Sources */,
);
runOnlyForDeploymentPostprocessing = 0;
Expand Down Expand Up @@ -373,7 +357,7 @@
GCC_WARN_UNINITIALIZED_AUTOS = YES_AGGRESSIVE;
GCC_WARN_UNUSED_FUNCTION = YES;
GCC_WARN_UNUSED_VARIABLE = YES;
MACOSX_DEPLOYMENT_TARGET = 11.0;
MACOSX_DEPLOYMENT_TARGET = 12.0;
MTL_ENABLE_DEBUG_INFO = INCLUDE_SOURCE;
MTL_FAST_MATH = YES;
ONLY_ACTIVE_ARCH = YES;
Expand Down Expand Up @@ -431,7 +415,7 @@
GCC_WARN_UNINITIALIZED_AUTOS = YES_AGGRESSIVE;
GCC_WARN_UNUSED_FUNCTION = YES;
GCC_WARN_UNUSED_VARIABLE = YES;
MACOSX_DEPLOYMENT_TARGET = 11.0;
MACOSX_DEPLOYMENT_TARGET = 12.0;
MTL_ENABLE_DEBUG_INFO = NO;
MTL_FAST_MATH = YES;
SDKROOT = macosx;
Expand Down Expand Up @@ -460,9 +444,10 @@
"@executable_path/../Frameworks",
"@loader_path/Frameworks",
);
MACOSX_DEPLOYMENT_TARGET = 10.15;
MACOSX_DEPLOYMENT_TARGET = 12.0;
PRODUCT_BUNDLE_IDENTIFIER = com.reddavis.Kyu;
PRODUCT_NAME = "$(TARGET_NAME:c99extidentifier)";
SDKROOT = iphoneos;
SKIP_INSTALL = YES;
SWIFT_OPTIMIZATION_LEVEL = "-Onone";
SWIFT_VERSION = 5.0;
Expand All @@ -487,9 +472,10 @@
"@executable_path/../Frameworks",
"@loader_path/Frameworks",
);
MACOSX_DEPLOYMENT_TARGET = 10.15;
MACOSX_DEPLOYMENT_TARGET = 12.0;
PRODUCT_BUNDLE_IDENTIFIER = com.reddavis.Kyu;
PRODUCT_NAME = "$(TARGET_NAME:c99extidentifier)";
SDKROOT = iphoneos;
SKIP_INSTALL = YES;
SWIFT_VERSION = 5.0;
};
Expand All @@ -510,6 +496,7 @@
);
PRODUCT_BUNDLE_IDENTIFIER = com.reddavis.KyuTests;
PRODUCT_NAME = "$(TARGET_NAME)";
SDKROOT = iphoneos;
SWIFT_VERSION = 5.0;
};
name = Debug;
Expand All @@ -529,6 +516,7 @@
);
PRODUCT_BUNDLE_IDENTIFIER = com.reddavis.KyuTests;
PRODUCT_NAME = "$(TARGET_NAME)";
SDKROOT = iphoneos;
SWIFT_VERSION = 5.0;
};
name = Release;
Expand Down
10 changes: 1 addition & 9 deletions Kyu/Source/Job.swift
Original file line number Diff line number Diff line change
@@ -1,10 +1,3 @@
//
// Job.swift
// Kyu
//
// Created by Red Davis on 08/11/2020.
//

import Foundation


Expand All @@ -29,8 +22,7 @@ public protocol Job: Codable
var retryCooldown: TimeInterval { get }

/// Execute the job.
/// - Parameter onComplete: A closure to be called when execution is complete.
func execute(onComplete: @escaping (_ result: Result<Void, Error>) -> Void)
func execute() async throws

/// Increment the `numberOfRetries` and `executionDate`.
mutating func incrementRetryCount()
Expand Down
112 changes: 64 additions & 48 deletions Kyu/Source/Kyu.swift
Original file line number Diff line number Diff line change
@@ -1,15 +1,7 @@
//
// Kyu.swift
// Kyu
//
// Created by Red Davis on 08/11/2020.
//

import Foundation
import os.log


public final class Kyu<T> where T: Job
public actor Kyu<T> where T: Job
{
// Public
public typealias WillExecuteJob = (_ job: inout T) -> Void
Expand All @@ -29,7 +21,7 @@ public final class Kyu<T> where T: Job
/// A closure called before a job is executed.
/// It can be used to change a Job configuration before it is executed
/// or pass it variables that are not `Codable` friendly.
public var onWillExecuteJob: WillExecuteJob?
private let onWillExecuteJob: WillExecuteJob?

// Private
private let logger: Logger
Expand All @@ -40,11 +32,15 @@ public final class Kyu<T> where T: Job
private var isProcessing = false {
didSet
{
if !self.isProcessing { self.processNextJob() }
if !self.isProcessing
{
Task {
await self.processNextJob()
}
}
}
}

private let jobProcessingDispatchQueue: DispatchQueue
private let jobDirectoryObserverDispatchQueue: DispatchQueue
private var jobDirectoryObserver: DispatchSourceFileSystemObject!

Expand All @@ -64,25 +60,38 @@ public final class Kyu<T> where T: Job
/// - Parameters:
/// - url: The root directory that Kyu uses to store jobs.
/// - logLevel: The level of logging required. Defaults to `.fault`.
/// - onWillExecuteJob: A closure called before a job is executed.
/// It can be used to change a Job configuration before it is executed
/// or pass it variables.
/// - Throws:
/// - `Directory.SetupError.fileExistsInDirectoryURL(URL)`
public required init(url: URL, logLevel: Logger.LogLevel = .fault) throws
public required init(
url: URL,
logLevel: Logger.LogLevel = .fault,
onWillExecuteJob: WillExecuteJob? = nil
) throws
{
self.url = url
self.onWillExecuteJob = onWillExecuteJob

self.pendingDirectory = Directory(url: url.appendingPathComponent(self.pendingDirectoryName, isDirectory: true))
self.completedDirectory = Directory(url: url.appendingPathComponent(self.completedDirectoryName, isDirectory: true))
self.failedDirectory = Directory(url: url.appendingPathComponent(self.failedDirectoryName, isDirectory: true))
self.tempDirectory = Directory(url: url.appendingPathComponent(self.tempDirectoryName, isDirectory: true))

self.jobProcessingDispatchQueue = DispatchQueue(label: "com.reddavis.Kyu.jobProcessingDispatchQueue.\(UUID())", qos: .utility)
self.jobDirectoryObserverDispatchQueue = DispatchQueue(label: "com.reddavis.Kyu.jobDirectoryObserverDispatchQueue.\(UUID())", qos: .background)
self.jobDirectoryObserverDispatchQueue = DispatchQueue(label: "com.reddavis.Kyu.jobDirectoryObserverDispatchQueue.\(UUID())", qos: .utility)

self.logger = Logger(subsystem: "com.reddavis.kyu", category: "Kyu[\(url.absoluteString)]")
self.logger.logLevel = logLevel

try self.setup()
}

deinit
{
self.jobDirectoryObserver.cancel()
}

// MARK: Setup

private func setup() throws
Expand All @@ -98,46 +107,53 @@ public final class Kyu<T> where T: Job
{
let fileDesciptor = open(self.pendingDirectory.url.path, O_EVTONLY)
self.jobDirectoryObserver = DispatchSource.makeFileSystemObjectSource(fileDescriptor: fileDesciptor, eventMask: .write, queue: self.jobDirectoryObserverDispatchQueue)
self.jobDirectoryObserver.setEventHandler { [weak self] in self?.processNextJob() }
self.jobDirectoryObserver.setEventHandler { [weak self] in
guard let self = self else { return }
Task {
await self.processNextJob()
}
}
self.jobDirectoryObserver.resume()
}

// MARK: Jobs

private func processNextJob()
private func processNextJob() async
{
self.jobProcessingDispatchQueue.async {
guard !self.isProcessing,
var job = self.nextExecutablePendingJob() else { return }
self.isProcessing = true

// On will execute
self.onWillExecuteJob?(&job)

// Execute job
job.execute { result in
defer { self.isProcessing = false }

switch result
guard !self.isProcessing,
var job = self.nextExecutablePendingJob() else { return }
defer { self.isProcessing = false }
self.isProcessing = true

// On will execute
self.onWillExecuteJob?(&job)

// Execute job
do
{
try await job.execute()
self.move(job: job, from: self.pendingDirectory, to: self.completedDirectory)
}
catch
{
if job.numberOfRetries >= job.maximumNumberOfRetries
{
self.move(job: job, from: self.pendingDirectory, to: self.failedDirectory)
}
else
{
do
{
var job = job
job.incrementRetryCount()

let dataURL = self.pendingDirectory.payloadURL(for: job)
let data = try self.encoder.encode(job)
try data.write(to: dataURL)
}
catch
{
case .success:
self.move(job: job, from: self.pendingDirectory, to: self.completedDirectory)
case .failure where job.numberOfRetries >= job.maximumNumberOfRetries:
self.move(job: job, from: self.pendingDirectory, to: self.failedDirectory)
case .failure:
do
{
var job = job
job.incrementRetryCount()

let dataURL = self.pendingDirectory.payloadURL(for: job)
let data = try self.encoder.encode(job)
try data.write(to: dataURL)
}
catch
{
self.logger.fault("Failed to increment job (\(job.id)) retry count. Error: \(error)")
}
self.logger.fault("Failed to increment job (\(job.id)) retry count. Error: \(error)")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,32 +37,22 @@ struct AppendNewLineJob: Job

// MARK: Job

func execute(onComplete: @escaping (Result<Void, Error>) -> Void)
func execute() async throws
{
do
let fileHandle = try FileHandle(forWritingTo: self.fileURL)
try fileHandle.seekToEnd()

if self.numberOfRetries >= self.numberOfTimesToFail
{
let fileHandle = try FileHandle(forWritingTo: self.fileURL)
try fileHandle.seekToEnd()

let result: Result<Void, Error>
if self.numberOfRetries >= self.numberOfTimesToFail
{
fileHandle.write("\(self.string)\n".data(using: .utf8)!)
result = .success(Void())
}
else
{
fileHandle.write("\(self.failureString)\n".data(using: .utf8)!)
result = .failure(ExecutionError.failureForced)
}

fileHandle.closeFile()
onComplete(result)
fileHandle.write("\(self.string)\n".data(using: .utf8)!)
}
catch
else
{
onComplete(.failure(error))
fileHandle.write("\(self.failureString)\n".data(using: .utf8)!)
throw ExecutionError.failureForced
}

fileHandle.closeFile()
}
}

Expand Down
Loading

0 comments on commit cddf589

Please sign in to comment.