Skip to content

Commit 0e86d6c

Browse files
authored
Add cleanup of paused jobs (#44)
* Add cleanup of paused jobs * Update comment
1 parent a222ec7 commit 0e86d6c

File tree

2 files changed

+51
-10
lines changed

2 files changed

+51
-10
lines changed

Sources/JobsPostgres/PostgresJobsQueue+cleanup.swift

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,20 +24,24 @@ public struct PostgresJobCleanupParameters: Sendable & Codable {
2424
let completedJobs: PostgresJobQueue.JobCleanup
2525
let failedJobs: PostgresJobQueue.JobCleanup
2626
let cancelledJobs: PostgresJobQueue.JobCleanup
27+
let pausedJobs: PostgresJobQueue.JobCleanup
2728

2829
/// Initialize PostgresJobCleanupParameters
2930
/// - Parameters:
3031
/// - completedJobs: What to do with completed jobs
3132
/// - failedJobs: What to do with failed jobs
3233
/// - cancelledJobs: What to do with cancelled jobs
34+
/// - pausedJobs: What to do with paused jobs
3335
public init(
3436
completedJobs: PostgresJobQueue.JobCleanup = .doNothing,
3537
failedJobs: PostgresJobQueue.JobCleanup = .doNothing,
36-
cancelledJobs: PostgresJobQueue.JobCleanup = .doNothing
38+
cancelledJobs: PostgresJobQueue.JobCleanup = .doNothing,
39+
pausedJobs: PostgresJobQueue.JobCleanup = .doNothing
3740
) {
3841
self.failedJobs = failedJobs
3942
self.completedJobs = completedJobs
4043
self.cancelledJobs = cancelledJobs
44+
self.pausedJobs = pausedJobs
4145
}
4246
}
4347

@@ -51,9 +55,13 @@ extension PostgresJobQueue {
5155
}
5256
let rawValue: RawValue
5357

58+
/// Do nothing to jobs
5459
public static var doNothing: Self { .init(rawValue: .doNothing) }
60+
/// Re-queue jobs for running again
5561
public static var rerun: Self { .init(rawValue: .rerun) }
62+
/// Delete jobs
5663
public static var remove: Self { .init(rawValue: .remove(maxAge: nil)) }
64+
/// Delete jobs that are older than maxAge
5765
public static func remove(maxAge: Duration) -> Self { .init(rawValue: .remove(maxAge: maxAge)) }
5866
}
5967

@@ -75,6 +83,7 @@ extension PostgresJobQueue {
7583
completedJobs: parameters.completedJobs,
7684
failedJobs: parameters.failedJobs,
7785
cancelledJobs: parameters.cancelledJobs,
86+
pausedJobs: parameters.pausedJobs,
7887
logger: self.logger
7988
)
8089
}
@@ -83,23 +92,23 @@ extension PostgresJobQueue {
8392

8493
/// Cleanup job queues
8594
///
86-
/// This function is used to re-run or delete jobs in a certain state. Failed jobs can be
87-
/// pushed back into the pending queue to be re-run or removed. When called at startup in
88-
/// theory no job should be set to processing, or set to pending but not in the queue. but if
89-
/// your job server crashes these states are possible, so we also provide options to re-queue
90-
/// these jobs so they are run again.
95+
/// This function is used to re-run or delete jobs in a certain state. Failed, completed,
96+
/// cancelled and paused jobs can be pushed back into the pending queue to be re-run or removed.
97+
/// When called at startup in theory no job should be set to processing, or set to pending but
98+
/// not in the queue. but if your job server crashes these states are possible, so we also provide
99+
/// options to re-queue these jobs so they are run again.
91100
///
92-
/// The job queue needs to be running when you call cleanup. You can call `cleanup` with
93-
/// `failedJobs`` set to whatever you like at any point to re-queue failed jobs. Moving processing
94-
/// or pending jobs should only be done if you are certain there is nothing else processing
95-
/// the job queue.
101+
/// You can call `cleanup` with `failedJobs`, `completedJobs`, `cancelledJobs` or `pausedJobs` set
102+
/// to whatever you like at any point to re-queue failed jobs. Moving processing or pending jobs
103+
/// should only be done if you are certain there is nothing processing the job queue.
96104
///
97105
/// - Parameters:
98106
/// - pendingJobs: What to do with jobs tagged as pending
99107
/// - processingJobs: What to do with jobs tagged as processing
100108
/// - completedJobs: What to do with jobs tagged as completed
101109
/// - failedJobs: What to do with jobs tagged as failed
102110
/// - cancelledJobs: What to do with jobs tagged as cancelled
111+
/// - pausedJobs: What to do with jobs tagged as paused
103112
/// - logger: Optional logger to use when performing cleanup
104113
/// - Throws:
105114
public func cleanup(
@@ -108,6 +117,7 @@ extension PostgresJobQueue {
108117
completedJobs: JobCleanup = .doNothing,
109118
failedJobs: JobCleanup = .doNothing,
110119
cancelledJobs: JobCleanup = .doNothing,
120+
pausedJobs: JobCleanup = .doNothing,
111121
logger: Logger? = nil
112122
) async throws {
113123
let logger = logger ?? self.logger
@@ -121,6 +131,7 @@ extension PostgresJobQueue {
121131
try await self.updateJobsOnInit(withStatus: .failed, onInit: failedJobs, connection: connection)
122132
try await self.updateJobsOnInit(withStatus: .completed, onInit: completedJobs, connection: connection)
123133
try await self.updateJobsOnInit(withStatus: .cancelled, onInit: cancelledJobs, connection: connection)
134+
try await self.updateJobsOnInit(withStatus: .paused, onInit: pausedJobs, connection: connection)
124135
}
125136
} catch let error as PSQLError {
126137
logger.error(

Tests/JobsPostgresTests/JobsTests.swift

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -880,6 +880,36 @@ struct JobsTests {
880880
}
881881
}
882882

883+
@Test func testPausedJobRetention() async throws {
884+
let jobQueue = try await self.createJobQueue(
885+
configuration: .init(queueName: "testPausedJobRetention")
886+
)
887+
let jobName = JobName<Int>("testPausedJobRetention")
888+
jobQueue.registerJob(name: jobName) { _, _ in }
889+
890+
try await withThrowingTaskGroup(of: Void.self) { group in
891+
group.addTask {
892+
// run postgres client
893+
await jobQueue.queue.client.run()
894+
}
895+
try await jobQueue.queue.migrations.apply(client: jobQueue.queue.client, logger: jobQueue.logger, dryRun: false)
896+
897+
let jobId = try await jobQueue.push(jobName, parameters: 1)
898+
let jobId2 = try await jobQueue.push(jobName, parameters: 2)
899+
900+
try await jobQueue.pauseJob(jobID: jobId)
901+
try await jobQueue.pauseJob(jobID: jobId2)
902+
903+
var cancelledJobs = try await jobQueue.queue.getJobs(withStatus: .paused)
904+
#expect(cancelledJobs.count == 2)
905+
try await jobQueue.queue.cleanup(pausedJobs: .remove(maxAge: .seconds(0)))
906+
cancelledJobs = try await jobQueue.queue.getJobs(withStatus: .paused)
907+
#expect(cancelledJobs.count == 0)
908+
909+
group.cancelAll()
910+
}
911+
}
912+
883913
@Test func testCleanupProcessingJobs() async throws {
884914
let jobQueue = try await self.createJobQueue()
885915
let jobName = JobName<Int>("testCleanupProcessingJobs")

0 commit comments

Comments
 (0)