Skip to content

Commit

Permalink
job queue callback
Browse files Browse the repository at this point in the history
  • Loading branch information
timohuber authored and aronerben committed Feb 13, 2024
1 parent 2180ad1 commit d1250a2
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 7 deletions.
14 changes: 10 additions & 4 deletions sihl-queue/src/sihl_queue.ml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ module Make (Repo : Repo.Sig) : Sihl.Contract.Queue.Sig = struct
let registered_jobs : job' list ref = ref []
let stop_schedule : (unit -> unit) option ref = ref None

let dispatch ?ctx ?delay input (job : 'a job) =
let dispatch ?callback ?ctx ?delay input (job : 'a job) =
let open Sihl.Contract.Queue in
let config = Sihl.Configuration.read schema in
let force_async = config.force_async in
Expand All @@ -76,7 +76,10 @@ module Make (Repo : Repo.Sig) : Sihl.Contract.Queue.Sig = struct
Logs.debug (fun m -> m "Dispatching job %s" name);
let now = Ptime_clock.now () in
let job_instance = create_instance ?ctx input delay now job in
Repo.enqueue ?ctx job_instance)
let%lwt () = Repo.enqueue ?ctx job_instance in
match callback with
| None -> Lwt.return ()
| Some callback -> callback job_instance)
else (
Logs.info (fun m -> m "Skipping queue in development environment");
match%lwt job.handle ?ctx input with
Expand All @@ -86,7 +89,7 @@ module Make (Repo : Repo.Sig) : Sihl.Contract.Queue.Sig = struct
Lwt.return ())
;;

let dispatch_all ?ctx ?delay inputs job =
let dispatch_all ?callback ?ctx ?delay inputs job =
let config = Sihl.Configuration.read schema in
let force_async = config.force_async in
if Sihl.Configuration.is_production () || force_async
Expand All @@ -97,7 +100,10 @@ module Make (Repo : Repo.Sig) : Sihl.Contract.Queue.Sig = struct
let job_instances =
List.map (fun input -> create_instance ?ctx input delay now job) inputs
in
Repo.enqueue_all ?ctx job_instances)
let%lwt () = Repo.enqueue_all ?ctx job_instances in
match callback with
| None -> Lwt.return ()
| Some callback -> Lwt_list.iter_s callback job_instances)
else (
Logs.info (fun m -> m "Skipping queue in development environment");
let rec loop inputs =
Expand Down
12 changes: 10 additions & 2 deletions sihl/src/contract_queue.ml
Original file line number Diff line number Diff line change
Expand Up @@ -133,14 +133,18 @@ module type Sig = sig
(** [dispatch ?ctx ?delay input job] queues [job] for later processing and
returns [unit Lwt.t] once the job has been queued.
An optional [callback] function that will be called after the job has been
enqueued.
An optional [delay] determines the amount of time from now (when dispatch
is called) up until the job can be run. If no delay is specified, the job
is processed as soon as possible.
[input] is the input of the [handle] function which is used for job
processing. *)
val dispatch
: ?ctx:(string * string) list
: ?callback:(instance -> unit Lwt.t)
-> ?ctx:(string * string) list
-> ?delay:Ptime.span
-> 'a
-> 'a job
Expand All @@ -154,14 +158,18 @@ module type Sig = sig
If the queue backend supports transactions, [dispatch_all] guarantees that
either none or all jobs are queued.
An optional [callback] function that will be called after the jobs have been
enqueued.
An optional [delay] determines the amount of time from now (when dispatch
is called) up until the jobs can be run. If no delay is specified, the
jobs are processed as soon as possible.
[inputs] is the input of the [handle] function. It is a list of ['a], one
for each ['a job] instance. *)
val dispatch_all
: ?ctx:(string * string) list
: ?callback:(instance -> unit Lwt.t)
-> ?ctx:(string * string) list
-> ?delay:Ptime.span
-> 'a list
-> 'a job
Expand Down
2 changes: 1 addition & 1 deletion sihl/test/web_csrf.ml
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ let post_request_both_invalid_tokens_fails _ () =
[ CCFun.id; Sihl.Test.Session.set_value_req [ csrf_name, "garbage" ] ]
in
(* Cartesian product 4 requests, invalid/empty cookie and request *)
let reqs = CCList.product CCFun.( @@ ) add_cookie requests in
let reqs = CCList.product ( @@ ) add_cookie requests in
let allowed = ref 0 in
let handler _ =
allowed := !allowed + 1;
Expand Down

0 comments on commit d1250a2

Please sign in to comment.