diff --git a/backend/src/BwdServer/Server.fs b/backend/src/BwdServer/Server.fs index 17794354bc..ca19c6c81c 100644 --- a/backend/src/BwdServer/Server.fs +++ b/backend/src/BwdServer/Server.fs @@ -476,8 +476,8 @@ let initSerializers () = "Canvas.loadJsonFromDisk" Json.Vanilla.allow "Canvas.loadJsonFromDisk" // Json.Vanilla.allow "eventqueue storage" - // Json.Vanilla.allow - // "TraceCloudStorageFormat" + Json.Vanilla.allow + "TraceCloudStorageFormat" Json.Vanilla.allow "Rollbar" // for Pusher.com payloads diff --git a/backend/src/LibExecution/Execution.fs b/backend/src/LibExecution/Execution.fs index f8c4acd59e..62acd0adaf 100644 --- a/backend/src/LibExecution/Execution.fs +++ b/backend/src/LibExecution/Execution.fs @@ -220,23 +220,25 @@ let traceDvals () : Dictionary.T * RT.Tracing.TraceDval = (results, trace) -// let rec rteToString -// (state : RT.ExecutionState) -// (rte : RT.RuntimeError) -// : Ply = -// uply { -// let errorMessageFn = -// RT.FQFnName.fqPackage -// PackageIDs.Fn.LanguageTools.RuntimeErrors.Error.toErrorMessage - -// let rte = RT.RuntimeError.toDT rte - -// let! rteMessage = executeFunction state errorMessageFn [] (NEList.ofList rte []) - -// match rteMessage with -// | Ok(RT.DString msg) -> return msg -// | Ok(other) -> return string other -// | Error(_, rte) -> -// debuG "Error converting RTE to string" rte -// return! rteToString state rte -// } +let rec rteToString + (rteToDval : RT.RuntimeError.Error -> RT.Dval) + (state : RT.ExecutionState) + (rte : RT.RuntimeError.Error) + : Ply = + uply { + let errorMessageFn = + RT.FQFnName.fqPackage + PackageIDs.Fn.PrettyPrinter.RuntimeTypes.RuntimeError.toErrorMessage + + let rteDval = rteToDval rte + + let! rteMessage = + executeFunction state errorMessageFn [] (NEList.ofList rteDval []) + + match rteMessage with + | Ok(RT.DString msg) -> return msg + | Ok(other) -> return string other + | Error(rte) -> + debuG "Error converting RTE to string" rte + return! rteToString rteToDval state rte + } diff --git a/backend/src/ProdExec/ProdExec.fs b/backend/src/ProdExec/ProdExec.fs index 67b70e5017..9e05d4c86d 100644 --- a/backend/src/ProdExec/ProdExec.fs +++ b/backend/src/ProdExec/ProdExec.fs @@ -161,8 +161,8 @@ let initSerializers () = // Json.Vanilla.allow // "RoundtrippableSerializationFormatV0.Dval" //Json.Vanilla.allow "eventqueue storage" - // Json.Vanilla.allow - // "TraceCloudStorageFormat" + Json.Vanilla.allow + "TraceCloudStorageFormat" Json.Vanilla.allow "Rollbar" // for Pusher.com payloads diff --git a/backend/src/QueueWorker/QueueWorker.fs b/backend/src/QueueWorker/QueueWorker.fs index b46057126c..76332938bd 100644 --- a/backend/src/QueueWorker/QueueWorker.fs +++ b/backend/src/QueueWorker/QueueWorker.fs @@ -32,219 +32,219 @@ type ShouldRetry = | NoRetry -// /// The algorithm here is described in the chart in `docs/eventsV2.md`. -// /// The code below is annotated with names from chart. -// /// -// /// Notes: -// /// - `dequeueAndProcess` will block until it receives a notification. -// /// - Returns a Result containing the notification and the -// /// event on success, and just the notification and failure reason on failure. -// /// - Should not throw on error. -// let processNotification -// (notification : EQ.Notification) -// : Task> = -// task { -// use _span = Telemetry.createRoot "process" -// Telemetry.addTags -// [ "event.time_in_queue_ms", notification.timeInQueue.TotalMilliseconds -// "event.id", notification.data.id -// "event.canvas_id", notification.data.canvasID -// "event.delivery_attempt", notification.deliveryAttempt -// "event.pubsub.ack_id", notification.pubSubAckID -// "event.pubsub.message_id", notification.pubSubMessageID ] - -// // Function used to quit this event -// let stop -// (reason : string) -// (retry : ShouldRetry) -// : Task> = -// task { -// Telemetry.addTags -// [ "queue.completion_reason", reason -// "queue.success", false -// "queue.retrying", retry <> NoRetry ] -// match retry with -// | Retry delay -> return! EQ.requeueEvent notification delay -// | NoRetry -> return! EQ.acknowledgeEvent notification -// return Error(reason, notification) // no events executed -// } - -// // ------- -// // EventLoad -// // ------- -// match! EQ.loadEvent notification.data.canvasID notification.data.id with -// | None -> return! stop "EventMissing" NoRetry -// | Some event -> // EventPresent -// Telemetry.addTags -// [ "event.handler.name", event.name -// "event.handler.modifier", event.modifier -// "event.handler.module", event.module' -// "event.value.type", (event.value |> DvalReprDeveloper.toTypeName :> obj) -// "event.locked_at", event.lockedAt -// "event.enqueued_at", event.enqueuedAt ] - - -// // ------- -// // LockCheck -// // ------- -// let timeLeft = -// match event.lockedAt with -// | Some lockedAt -> // LockExpired -// let expiryTime = lockedAt.Plus(NodaTime.Duration.FromMinutes 5.0) -// // Date math is hard so let's spell it out. `timeLeft` measures how long is -// // left until the lock expires. If there is time left until the lock -// // expires, `timeLeft` is positive. So -// // -// // `timeLeft = expiryTime - now` -// // -// // as that way there is positive `timeLeft` if `expiryTime` is later than -// // `now`. -// expiryTime - Instant.now () -// | None -> NodaTime.Duration.FromSeconds 0.0 // LockNone - -// if timeLeft.TotalSeconds > 0 then -// // RETRY but it means something else is running it so doesn't matter -// return! stop "IsLocked" (Retry timeLeft) -// else // LockNone/LockExpired - -// // ------- -// // RuleCheck -// // ------- -// match! EQ.getRule notification.data.canvasID event with -// | Some rule -> -// // Drop the notification - we'll requeue it if someone unpauses -// Telemetry.addTags -// [ "queue.rule.type", rule.ruleType; "queue.rule.id", rule.id ] -// return! stop "RuleCheckPaused/Blocked" NoRetry -// | None -> // RuleNone -// // ------- -// // DeliveryCheck -// // Note that this happens after all the other checks, as we might have -// // multiple notifications for the same event and we don't want to delete -// // one that is being executed or isn't ready. We stop after 4 retries here -// // because the retries might happen for a reason that isn't strictly -// // retries, such as lockedAt. -// // ------- -// if notification.deliveryAttempt >= 5 then -// // DeliveryTooManyRetries -// do! EQ.deleteEvent event -// return! stop "DeliveryTooMany" NoRetry -// else // DeliveryPermitted - -// // ------- -// // LockClaim -// // ------- -// match! EQ.claimLock event notification with -// | Error msg -> -// // Someone else just claimed the lock! -// let retryTime = NodaTime.Duration.FromSeconds 300.0 -// return! stop $"LockClaimFailed: {msg}" (Retry retryTime) -// | Ok() -> // LockClaimed - -// // ------- -// // Process -// // ------- -// let! canvas = -// Exception.taskCatch (fun () -> -// task { -// return! -// Canvas.loadForEvent -// notification.data.canvasID -// event.module' -// event.name -// event.modifier -// }) -// match canvas with -// | None -> -// do! EQ.deleteEvent event -// return! stop "MissingCanvas" NoRetry -// | Some c -> -// let traceID = AT.TraceID.create () -// let desc = (event.module', event.name, event.modifier) -// Telemetry.addTags [ "canvasID", c.id; "trace_id", traceID ] - - -// // CLEANUP switch events and scheduling rules to use TLIDs instead of eventDescs -// let h = -// c.handlers -// |> Map.values -// |> List.filter (fun h -> -// Some desc = PTParser.Handler.Spec.toEventDesc h.spec) -// |> List.head - -// match h with -// | None -> -// // If an event gets put in the queue and there's no handler for it, -// // they're probably emiting to a handler they haven't created yet. -// // In this case, all they need to build is the trace. So just drop -// // this event immediately. - -// // TODO: reenable using CloudStorage -// // let! timestamp = TI.storeEvent c.id traceID desc event.value -// // Pusher.push -// // LibClientTypesToCloudTypes.Pusher.eventSerializer -// // c.id -// // (Pusher.New404( -// // event.module', -// // event.name, -// // event.modifier, -// // timestamp, -// // traceID -// // )) -// // None - -// do! EQ.deleteEvent event -// return! stop "MissingHandler" NoRetry -// | Some h -> - -// // If we acknowledge the event here, and the machine goes down, -// // PubSub will retry this once the ack deadline runs out -// do! EQ.extendDeadline notification - -// // CLEANUP Set a time limit of 3m -// try -// let! program = Canvas.toProgram c -// let! (result, traceResults) = -// CloudExecution.executeHandler -// LibClientTypesToCloudTypes.Pusher.eventSerializer -// (PT2RT.Handler.toRT h) -// program -// traceID -// (Map [ "event", event.value ]) -// (CloudExecution.InitialExecution( -// EQ.toEventDesc event, -// "event", -// event.value -// )) - -// Telemetry.addTags -// [ "result_type", DvalReprDeveloper.toTypeName result -// "queue.success", true -// "executed_tlids", HashSet.toList traceResults.tlids -// "queue.completion_reason", "completed" ] -// // ExecutesToCompletion - -// // ------- -// // Delete -// // ------- -// do! EQ.deleteEvent event -// do! EQ.acknowledgeEvent notification - -// // ------- -// // End -// // ------- -// return Ok(event, notification) -// with _ -> -// // This automatically increments the deliveryAttempt, so it might -// // be deleted at the next iteration. -// let timeLeft = NodaTime.Duration.FromSeconds 301.0 -// return! stop "RetryAllowed" (Retry timeLeft) -// } +/// The algorithm here is described in the chart in `docs/eventsV2.md`. +/// The code below is annotated with names from chart. +/// +/// Notes: +/// - `dequeueAndProcess` will block until it receives a notification. +/// - Returns a Result containing the notification and the +/// event on success, and just the notification and failure reason on failure. +/// - Should not throw on error. +let processNotification + (notification : EQ.Notification) + : Task> = + task { + use _span = Telemetry.createRoot "process" + Telemetry.addTags + [ "event.time_in_queue_ms", notification.timeInQueue.TotalMilliseconds + "event.id", notification.data.id + "event.canvas_id", notification.data.canvasID + "event.delivery_attempt", notification.deliveryAttempt + "event.pubsub.ack_id", notification.pubSubAckID + "event.pubsub.message_id", notification.pubSubMessageID ] + + // Function used to quit this event + let stop + (reason : string) + (retry : ShouldRetry) + : Task> = + task { + Telemetry.addTags + [ "queue.completion_reason", reason + "queue.success", false + "queue.retrying", retry <> NoRetry ] + match retry with + | Retry delay -> return! EQ.requeueEvent notification delay + | NoRetry -> return! EQ.acknowledgeEvent notification + return Error(reason, notification) // no events executed + } + + // ------- + // EventLoad + // ------- + match! EQ.loadEvent notification.data.canvasID notification.data.id with + | None -> return! stop "EventMissing" NoRetry + | Some event -> // EventPresent + Telemetry.addTags + [ "event.handler.name", event.name + "event.handler.modifier", event.modifier + "event.handler.module", event.module' + //"event.value.type", (event.value |> DvalReprDeveloper.toTypeName :> obj) + "event.locked_at", event.lockedAt + "event.enqueued_at", event.enqueuedAt ] + + + // ------- + // LockCheck + // ------- + let timeLeft = + match event.lockedAt with + | Some lockedAt -> // LockExpired + let expiryTime = lockedAt.Plus(NodaTime.Duration.FromMinutes 5.0) + // Date math is hard so let's spell it out. `timeLeft` measures how long is + // left until the lock expires. If there is time left until the lock + // expires, `timeLeft` is positive. So + // + // `timeLeft = expiryTime - now` + // + // as that way there is positive `timeLeft` if `expiryTime` is later than + // `now`. + expiryTime - Instant.now () + | None -> NodaTime.Duration.FromSeconds 0.0 // LockNone + + if timeLeft.TotalSeconds > 0 then + // RETRY but it means something else is running it so doesn't matter + return! stop "IsLocked" (Retry timeLeft) + else // LockNone/LockExpired + + // ------- + // RuleCheck + // ------- + match! EQ.getRule notification.data.canvasID event with + | Some rule -> + // Drop the notification - we'll requeue it if someone unpauses + Telemetry.addTags + [ "queue.rule.type", rule.ruleType; "queue.rule.id", rule.id ] + return! stop "RuleCheckPaused/Blocked" NoRetry + | None -> // RuleNone + // ------- + // DeliveryCheck + // Note that this happens after all the other checks, as we might have + // multiple notifications for the same event and we don't want to delete + // one that is being executed or isn't ready. We stop after 4 retries here + // because the retries might happen for a reason that isn't strictly + // retries, such as lockedAt. + // ------- + if notification.deliveryAttempt >= 5 then + // DeliveryTooManyRetries + do! EQ.deleteEvent event + return! stop "DeliveryTooMany" NoRetry + else // DeliveryPermitted + + // ------- + // LockClaim + // ------- + match! EQ.claimLock event notification with + | Error msg -> + // Someone else just claimed the lock! + let retryTime = NodaTime.Duration.FromSeconds 300.0 + return! stop $"LockClaimFailed: {msg}" (Retry retryTime) + | Ok() -> // LockClaimed + + // ------- + // Process + // ------- + let! canvas = + Exception.taskCatch (fun () -> + task { + return! + Canvas.loadForEvent + notification.data.canvasID + event.module' + event.name + event.modifier + }) + match canvas with + | None -> + do! EQ.deleteEvent event + return! stop "MissingCanvas" NoRetry + | Some c -> + let traceID = AT.TraceID.create () + let desc = (event.module', event.name, event.modifier) + Telemetry.addTags [ "canvasID", c.id; "trace_id", traceID ] + + + // CLEANUP switch events and scheduling rules to use TLIDs instead of eventDescs + let h = + c.handlers + |> Map.values + |> List.filter (fun h -> + Some desc = PTParser.Handler.Spec.toEventDesc h.spec) + |> List.head + + match h with + | None -> + // If an event gets put in the queue and there's no handler for it, + // they're probably emiting to a handler they haven't created yet. + // In this case, all they need to build is the trace. So just drop + // this event immediately. + + // TODO: reenable using CloudStorage + // let! timestamp = TI.storeEvent c.id traceID desc event.value + // Pusher.push + // LibClientTypesToCloudTypes.Pusher.eventSerializer + // c.id + // (Pusher.New404( + // event.module', + // event.name, + // event.modifier, + // timestamp, + // traceID + // )) + // None + + do! EQ.deleteEvent event + return! stop "MissingHandler" NoRetry + | Some _h -> + + // If we acknowledge the event here, and the machine goes down, + // PubSub will retry this once the ack deadline runs out + do! EQ.extendDeadline notification + + // CLEANUP Set a time limit of 3m + try + let! _program = Canvas.toProgram c + // let! (result, traceResults) = + // CloudExecution.executeHandler + // LibClientTypesToCloudTypes.Pusher.eventSerializer + // (PT2RT.Handler.toRT h) + // program + // traceID + // (Map [ "event", event.value ]) + // (CloudExecution.InitialExecution( + // EQ.toEventDesc event, + // "event", + // event.value + // )) + + Telemetry.addTags + [ //"result_type", DvalReprDeveloper.toTypeName result + "queue.success", true + //"executed_tlids", HashSet.toList traceResults.tlids + "queue.completion_reason", "completed" ] + // ExecutesToCompletion + + // ------- + // Delete + // ------- + do! EQ.deleteEvent event + do! EQ.acknowledgeEvent notification + + // ------- + // End + // ------- + return Ok(event, notification) + with _ -> + // This automatically increments the deliveryAttempt, so it might + // be deleted at the next iteration. + let timeLeft = NodaTime.Duration.FromSeconds 301.0 + return! stop "RetryAllowed" (Retry timeLeft) + } /// Run in the background, using the semaphore to track completion let runInBackground (semaphore : System.Threading.SemaphoreSlim) - //(notification : EQ.Notification) + (notification : EQ.Notification) : unit = // Ensure we get a lock before the background task starts. We should always get a // lock here, but if something goes awry it's better that we wait rather than fetch @@ -252,7 +252,7 @@ let runInBackground semaphore.Wait() backgroundTask { try - //let! (_ : Result<_, _>) = processNotification notification + let! (_ : Result<_, _>) = processNotification notification return () finally semaphore.Release() |> ignore @@ -309,8 +309,8 @@ let initSerializers () = // "RoundtrippableSerializationFormatV0.Dval" Json.Vanilla.allow "Canvas.loadJsonFromDisk" //Json.Vanilla.allow "eventqueue storage" - // Json.Vanilla.allow - // "TraceCloudStorageFormat" + Json.Vanilla.allow + "TraceCloudStorageFormat" Json.Vanilla.allow "Rollbar" // for Pusher.com payloads diff --git a/backend/tests/TestUtils/TestUtils.fs b/backend/tests/TestUtils/TestUtils.fs index 269aee6479..e30434e2d4 100644 --- a/backend/tests/TestUtils/TestUtils.fs +++ b/backend/tests/TestUtils/TestUtils.fs @@ -84,8 +84,8 @@ let initializeTestCanvas (name : string) : Task = // : PT.Handler.T = // { tlid = gid (); ast = ast; spec = PT.Handler.Cron(name, interval) } -// let testWorker (name : string) (ast : PT.Expr) : PT.Handler.T = -// { tlid = gid (); ast = ast; spec = PT.Handler.Worker name } +let testWorker (name : string) (ast : PT.Expr) : PT.Handler.T = + { tlid = gid (); ast = ast; spec = PT.Handler.Worker name } let testPackageFn (owner : string) @@ -1470,87 +1470,86 @@ let interestingInts : List = module Http = type T = { status : string; headers : (string * string) list; body : byte array } -// let setHeadersToCRLF (text : byte array) : byte array = -// // We keep our test files with an LF line ending, but the HTTP spec -// // requires headers (but not the body, nor the first line) to have CRLF -// // line endings -// let mutable justSawNewline = false -// let mutable inBody = false - -// text -// |> Array.toList -// |> List.collect (fun b -> -// if not inBody && b = byte '\n' then -// if justSawNewline then inBody <- true -// justSawNewline <- true -// [ byte '\r'; b ] -// else -// justSawNewline <- false -// [ b ]) -// |> List.toArray - -// let split (response : byte array) : T = -// // read a single line of bytes (a line ends with \r\n) -// let rec consume (existing : byte list) (l : byte list) : byte list * byte list = -// match l with -// | [] -> [], [] -// | 13uy :: 10uy :: tail -> existing, tail -// | head :: tail -> consume (existing @ [ head ]) tail - -// // read all headers (ends when we get two \r\n in a row), return headers -// // and remaining byte string (the body). Assumes the status line is not -// // present. Headers are returned reversed -// let rec consumeHeaders -// (headers : string list) -// (l : byte list) -// : string list * byte list = -// let (line, remaining) = consume [] l - -// if line = [] then -// (headers, remaining) -// else -// let str = line |> Array.ofList |> UTF8.ofBytesUnsafe -// consumeHeaders (str :: headers) remaining - -// let bytes = Array.toList response - -// // read the status like (eg HTTP 200 OK) -// let status, bytes = consume [] bytes - -// let headers, body = consumeHeaders [] bytes - -// let headers = -// headers -// |> List.reverse -// |> List.map (fun s -> -// match String.split ":" s with -// | k :: vs -> (k, vs |> String.concat ":" |> String.trimLeft) -// | _ -> Exception.raiseInternal $"not a valid header" [ "header", s ]) - - -// { status = status |> List.toArray |> UTF8.ofBytesUnsafe -// headers = headers -// body = List.toArray body } - -// // For an ASP.NET http server, remove the default loggers and add a file logger that -// // saves the output in rundir/logs -// open Microsoft.Extensions.Logging -// open Microsoft.Extensions.DependencyInjection -// open NReco.Logging.File - -// let configureLogging -// (name : string) -// (builder : Microsoft.Extensions.Logging.ILoggingBuilder) -// : unit = -// // This removes the default ConsoleLogger. Having two console loggers (this one and -// // also the one in Main), caused a deadlock (possibly from having two different -// // console logging threads). -// builder -// .ClearProviders() -// .Services.AddLogging(fun loggingBuilder -> -// loggingBuilder.AddFile($"{LibCloud.Config.logDir}{name}.log", append = false) -// |> ignore) -// |> ignore + let setHeadersToCRLF (text : byte array) : byte array = + // We keep our test files with an LF line ending, but the HTTP spec + // requires headers (but not the body, nor the first line) to have CRLF + // line endings + let mutable justSawNewline = false + let mutable inBody = false + + text + |> Array.toList + |> List.collect (fun b -> + if not inBody && b = byte '\n' then + if justSawNewline then inBody <- true + justSawNewline <- true + [ byte '\r'; b ] + else + justSawNewline <- false + [ b ]) + |> List.toArray + + let split (response : byte array) : T = + // read a single line of bytes (a line ends with \r\n) + let rec consume (existing : byte list) (l : byte list) : byte list * byte list = + match l with + | [] -> [], [] + | 13uy :: 10uy :: tail -> existing, tail + | head :: tail -> consume (existing @ [ head ]) tail + + // read all headers (ends when we get two \r\n in a row), return headers + // and remaining byte string (the body). Assumes the status line is not + // present. Headers are returned reversed + let rec consumeHeaders + (headers : string list) + (l : byte list) + : string list * byte list = + let (line, remaining) = consume [] l + + if line = [] then + (headers, remaining) + else + let str = line |> Array.ofList |> UTF8.ofBytesUnsafe + consumeHeaders (str :: headers) remaining + + let bytes = Array.toList response + + // read the status like (eg HTTP 200 OK) + let status, bytes = consume [] bytes + + let headers, body = consumeHeaders [] bytes + + let headers = + headers + |> List.reverse + |> List.map (fun s -> + match String.split ":" s with + | k :: vs -> (k, vs |> String.concat ":" |> String.trimLeft) + | _ -> Exception.raiseInternal $"not a valid header" [ "header", s ]) + + { status = status |> List.toArray |> UTF8.ofBytesUnsafe + headers = headers + body = List.toArray body } + +// For an ASP.NET http server, remove the default loggers and add a file logger that +// saves the output in rundir/logs +open Microsoft.Extensions.Logging +open Microsoft.Extensions.DependencyInjection +open NReco.Logging.File + +let configureLogging + (name : string) + (builder : Microsoft.Extensions.Logging.ILoggingBuilder) + : unit = + // This removes the default ConsoleLogger. Having two console loggers (this one and + // also the one in Main), caused a deadlock (possibly from having two different + // console logging threads). + builder + .ClearProviders() + .Services.AddLogging(fun loggingBuilder -> + loggingBuilder.AddFile($"{LibCloud.Config.logDir}{name}.log", append = false) + |> ignore) + |> ignore // let unwrapExecutionResult @@ -1579,27 +1578,27 @@ module Http = // | _ -> return RT.DString(string rte) // } -// let parsePTExpr (code : string) : Task = -// uply { -// let! (state : RT.ExecutionState) = -// let canvasID = System.Guid.NewGuid() -// executionStateFor pmPT canvasID false false Map.empty - -// let name = -// RT.FQFnName.FQFnName.Package PackageIDs.Fn.LanguageTools.Parser.parsePTExpr - -// let args = NEList.singleton (RT.DString code) -// let! execResult = LibExecution.Execution.executeFunction state name [] args - -// match execResult with -// | Ok dval -> -// match C2DT.Result.fromDT PT2DT.Expr.fromDT dval identity with -// | Ok expr -> return expr -// | Error _ -> -// return Exception.raiseInternal "Error converting Dval to PT.Expr" [] -// | _ -> return Exception.raiseInternal "Error executing parsePTExpr function" [] -// } -// |> Ply.toTask +let parsePTExpr (code : string) : Task = + uply { + let! (state : RT.ExecutionState) = + let canvasID = System.Guid.NewGuid() + executionStateFor pmPT canvasID false false //Map.empty + + let name = + RT.FQFnName.FQFnName.Package PackageIDs.Fn.LanguageTools.Parser.parsePTExpr + + let args = NEList.singleton (RT.DString code) + let! execResult = LibExecution.Execution.executeFunction state name [] args + + match execResult with + | Ok dval -> + match C2DT.Result.fromDT PT2DT.Expr.fromDT dval identity with + | Ok expr -> return expr + | Error _ -> + return Exception.raiseInternal "Error converting Dval to PT.Expr" [] + | _ -> return Exception.raiseInternal "Error executing parsePTExpr function" [] + } + |> Ply.toTask module Internal = module Test = diff --git a/backend/tests/Tests/HttpClient.Tests.fs b/backend/tests/Tests/HttpClient.Tests.fs index cac4f2c367..0ceaa50acd 100644 --- a/backend/tests/Tests/HttpClient.Tests.fs +++ b/backend/tests/Tests/HttpClient.Tests.fs @@ -26,96 +26,97 @@ type ConcurrentDictionary<'a, 'b> = open Prelude -// module RT = LibExecution.RuntimeTypes -// module PT = LibExecution.ProgramTypes -// module PT2RT = LibExecution.ProgramTypesToRuntimeTypes -// module Exe = LibExecution.Execution -// module PackageIDs = LibExecution.PackageIDs -// module C2DT = LibExecution.CommonToDarkTypes - -// open TestUtils.TestUtils - -// type TestCase = -// { -// /// What we expect the webserver to receive when the HttpClient is used -// expectedRequest : Http.T - -// /// The request that our webserver actually receives -// actualRequest : Option - -// /// This is the result that our mock HTTP is configured to return for the test, -// /// as set by the test file. -// responseToReturn : Http.T -// } - -// let testCases : ConcurrentDictionary = ConcurrentDictionary() - - -// let host = $"test.dlio.localhost:{TestConfig.httpClientPort}" - -// let normalizeHeaders -// (body : byte array) -// (headers : (string * string) list) -// : (string * string) list = -// headers -// |> List.map (fun (key, value) -> -// match value with -// // make writing tests easier -// | "HOST" when String.equalsCaseInsensitive "Host" key -> (key, host) -// // optionally change content length for writing responses more easily -// | "LENGTH" when String.equalsCaseInsensitive "Content-length" key -> -// key, string body.Length -// | other -> key, other) - -// let randomBytes = -// [ 0x2euy; 0x0Auy; 0xE8uy; 0xE6uy; 0xF1uy; 0xE0uy; 0x9Buy; 0xA6uy; 0xEuy ] - -// let updateBody (body : byte array) : byte array = -// // Test cases are parsed as strings, so we can't "add bytes" to the tests directly. -// // To get around this, we include the string "RANDOM_BYTES" in our test file, -// // and replace that during the test at run-time. -// let randomBytes = List.range 0 255 |> List.map (fun i -> byte i) - -// let rec find (bytes : byte list) : byte list = -// match bytes with -// // Note: "51 41 ..." is equivalent to "RANDOM_BYTES" -// | 0x52uy :: 0x41uy :: 0x4Euy :: 0x44uy :: 0x4Fuy :: 0x4Duy :: 0x5Fuy :: 0x42uy :: 0x59uy :: 0x54uy :: 0x45uy :: 0x53uy :: tail -> -// randomBytes @ find tail -// | [] -> [] -// | head :: tail -> head :: find tail - -// body |> List.fromArray |> find |> List.toArray - - - -// let pmPT = LibCloud.PackageManager.pt - -// let parseSingleTestFromFile -// (filename : string) -// (test : string) -// : Ply = -// uply { -// let! (state : RT.ExecutionState) = -// let canvasID = System.Guid.NewGuid() -// executionStateFor pmPT canvasID false false //Map.empty - -// let name = -// RT.FQFnName.FQFnName.Package -// PackageIDs.Fn.Internal.Test.parseSingleTestFromFile - -// let args = NEList.ofList (RT.DString filename) [ RT.DString test ] -// let! execResult = LibExecution.Execution.executeFunction state name [] args - -// match execResult with -// | Ok dval -> return Internal.Test.fromDT dval -// | Error(rte) -> -// let! rteString = Exe.rteToString state rte -// return -// Exception.raiseInternal -// "Error executing parseSingleTestFromFile function" -// [ "error", rteString ] - -// } +module RT = LibExecution.RuntimeTypes +module PT = LibExecution.ProgramTypes +module PT2RT = LibExecution.ProgramTypesToRuntimeTypes +module RT2DT = LibExecution.RuntimeTypesToDarkTypes +module Exe = LibExecution.Execution +module PackageIDs = LibExecution.PackageIDs +module C2DT = LibExecution.CommonToDarkTypes + +open TestUtils.TestUtils + +type TestCase = + { + /// What we expect the webserver to receive when the HttpClient is used + expectedRequest : Http.T + + /// The request that our webserver actually receives + actualRequest : Option + + /// This is the result that our mock HTTP is configured to return for the test, + /// as set by the test file. + responseToReturn : Http.T + } + +let testCases : ConcurrentDictionary = ConcurrentDictionary() + + +let host = $"test.dlio.localhost:{TestConfig.httpClientPort}" + +let normalizeHeaders + (body : byte array) + (headers : (string * string) list) + : (string * string) list = + headers + |> List.map (fun (key, value) -> + match value with + // make writing tests easier + | "HOST" when String.equalsCaseInsensitive "Host" key -> (key, host) + // optionally change content length for writing responses more easily + | "LENGTH" when String.equalsCaseInsensitive "Content-length" key -> + key, string body.Length + | other -> key, other) + +let randomBytes = + [ 0x2euy; 0x0Auy; 0xE8uy; 0xE6uy; 0xF1uy; 0xE0uy; 0x9Buy; 0xA6uy; 0xEuy ] + +let updateBody (body : byte array) : byte array = + // Test cases are parsed as strings, so we can't "add bytes" to the tests directly. + // To get around this, we include the string "RANDOM_BYTES" in our test file, + // and replace that during the test at run-time. + let randomBytes = List.range 0 255 |> List.map (fun i -> byte i) + + let rec find (bytes : byte list) : byte list = + match bytes with + // Note: "51 41 ..." is equivalent to "RANDOM_BYTES" + | 0x52uy :: 0x41uy :: 0x4Euy :: 0x44uy :: 0x4Fuy :: 0x4Duy :: 0x5Fuy :: 0x42uy :: 0x59uy :: 0x54uy :: 0x45uy :: 0x53uy :: tail -> + randomBytes @ find tail + | [] -> [] + | head :: tail -> head :: find tail + + body |> List.fromArray |> find |> List.toArray + + + +let pmPT = LibCloud.PackageManager.pt + +let parseSingleTestFromFile + (filename : string) + (test : string) + : Ply = + uply { + let! (state : RT.ExecutionState) = + let canvasID = System.Guid.NewGuid() + executionStateFor pmPT canvasID false false //Map.empty + + let name = + RT.FQFnName.FQFnName.Package + PackageIDs.Fn.Internal.Test.parseSingleTestFromFile + + let args = NEList.ofList (RT.DString filename) [ RT.DString test ] + let! execResult = LibExecution.Execution.executeFunction state name [] args + + match execResult with + | Ok dval -> return Internal.Test.fromDT dval + | Error(rte) -> + let! rteString = Exe.rteToString RT2DT.RuntimeError.toDT state rte + return + Exception.raiseInternal + "Error executing parseSingleTestFromFile function" + [ "error", rteString ] + + } // let parseTest (filename : string) (test : string) : Ply = // uply { @@ -130,283 +131,284 @@ open Prelude // name = ptTest.name } // } -// let makeTest versionName filename = -// // Parse the file contents now, rather than later, so that tests that refer to -// // other tests (that is, tests for redirects) will work. -// let shouldSkipTest = String.startsWith "_" filename -// let testName = -// let withoutPrefix = -// if shouldSkipTest then String.dropLeft 1 filename else filename -// withoutPrefix |> String.dropRight 5 // ".test" - -// let filename = $"{baseDirectory}/{versionName}/{filename}" -// let content = System.IO.File.ReadAllBytes filename |> UTF8.ofBytesUnsafe - -// // Parse Handler code, expected HTTP request, and (static) HTTP response to return -// let expectedRequest, response, darkCode = -// let m = -// Regex.Match( -// content, -// "^(\[expected-request\]\n(.*)\n)\[response\]\n(.*)\n\n\[test\]\n(.*)$", -// RegexOptions.Singleline -// ) - -// if not m.Success then -// Exception.raiseInternal $"incorrect format" [ "name", testName ] -// let g = m.Groups - -// (g[2].Value, g[3].Value, g[4].Value) - -// let expected = -// expectedRequest |> UTF8.toBytes |> Http.setHeadersToCRLF |> Http.split -// let newExpectedBody = updateBody expected.body -// let expected = -// { expected with -// headers = normalizeHeaders newExpectedBody expected.headers -// body = newExpectedBody } - -// let response = response |> UTF8.toBytes |> Http.setHeadersToCRLF |> Http.split -// let newResponseBody = updateBody response.body -// let response = -// { response with -// headers = normalizeHeaders newResponseBody response.headers -// body = newResponseBody } - -// let dictKey = $"{versionName}/{testName}" - -// testCases[dictKey] <- -// { expectedRequest = expected; actualRequest = None; responseToReturn = response } - - -// // Load the testcases first so that redirection works -// testTask testName { -// // debuG "expectedRequest" (toStr expectedRequest) -// // debuG "response" (toStr response) -// // debuG "darkCode" darkCode - -// if shouldSkipTest then -// skiptest $"underscore test - {testName}" -// else -// // Set up the canvas -// let canvasID = System.Guid.NewGuid() -// let! state = executionStateFor pmPT canvasID false true Map.empty - -// // Parse the Dark code -// let! (test : Internal.Test.RTTest) = -// darkCode -// |> String.replace "URL" $"{host}/{versionName}/{testName}" -// // CLEANUP: this doesn't use the correct length, as it might be latin1 or -// // compressed -// |> String.replace "LENGTH" (string response.body.Length) -// |> parseTest "httpclient.tests.fs" -// |> Ply.toTask - -// // Run the handler (call the HTTP client) -// // Note: this will update the corresponding value in `testCases` with the -// // actual request received -// let! actual = Exe.executeExpr state Map.empty test.actual - -// // First check: expected HTTP request matches actual HTTP request -// let tc = testCases[dictKey] -// match tc.actualRequest with -// | None -> -// // We failed to make a request - almost undoubtedly, the result will be some -// // sort of error -// () //Expect.equal 1 2 "Unexpected - no actual request has been saved" -// | Some actualRequest -> -// Expect.equal actualRequest tc.expectedRequest "requests don't match" - -// // Second check: expected result (Dval) matches actual result (Dval) -// let actual = Result.map normalizeDvalResult actual - -// let! expected = Exe.executeExpr state Map.empty test.expected -// match actual, expected with -// | Ok actual, Ok expected -> -// return Expect.equalDval actual expected $"Responses don't match" -// | _ -> Expect.equal actual expected $"Responses don't match" -// } - - -// // --------------- -// // This is the webserver that we will be testing against. -// // It records the received request, and returns the test case output. -// // --------------- -// open Microsoft.AspNetCore -// open Microsoft.AspNetCore.Builder -// open Microsoft.AspNetCore.Hosting -// open Microsoft.AspNetCore.Http -// open Microsoft.AspNetCore.Http.Extensions -// open Microsoft.Extensions.Hosting - -// type Compression = -// | Deflate -// | Brotli -// | Gzip - -// let runTestHandler (ctx : HttpContext) : Task = -// task { -// try -// let versionName, testName = -// let segments = System.Uri(ctx.Request.Path.Value).Segments - -// let versionName = segments[1] -// let versionName = -// if String.endsWith "/" versionName then -// String.dropRight 1 versionName -// else -// versionName - -// let testName = segments[2] -// let testName = -// if String.endsWith "/" testName then -// String.dropRight 1 testName -// else -// testName - -// versionName, testName - -// let dictKey = $"{versionName}/{testName}" - -// let testCase = -// try -// Some testCases[dictKey] -// with _ -> -// None - -// match testCase with -// | None -> -// ctx.Response.StatusCode <- 404 -// let body = "intentionally not found" |> UTF8.toBytes -// ctx.Response.ContentLength <- int64 body.Length -// do! ctx.Response.Body.WriteAsync(body, 0, body.Length) -// return ctx - -// | Some testCase -> -// // Gather status, headers, and body from the actual request -// let actualStatus = -// $"{ctx.Request.Method} {ctx.Request.GetEncodedPathAndQuery()} {ctx.Request.Protocol}" -// let actualHeaders = BwdServer.Server.getHeadersWithoutMergingKeys ctx -// let! actualBody = BwdServer.Server.getBody ctx -// let actualRequest : Http.T = -// { status = actualStatus; headers = actualHeaders; body = actualBody } - - -// // Update the TestCase with the actual request; -// // also, update the PATH in the status line of the expected request -// let updatedTestCase = -// { testCase with -// actualRequest = Some actualRequest -// expectedRequest = -// { testCase.expectedRequest with -// status = -// testCase.expectedRequest.status -// |> String.replace "PATH" ctx.Request.Path.Value } } -// testCases[dictKey] <- updatedTestCase - - -// // Return the response -// let mutable compression = None -// let mutable transcodeToLatin1 = false - -// ctx.Response.StatusCode <- -// testCase.responseToReturn.status -// |> String.split " " -// |> List.getAt 1 -// |> Exception.unwrapOptionInternal -// "invalid status code" -// [ "status", testCase.responseToReturn.status ] -// |> int - -// testCase.responseToReturn.headers -// |> List.iter (fun (k, v) -> -// if String.equalsCaseInsensitive k "Content-Encoding" then -// if v = "deflate" then compression <- Some Deflate -// else if v = "br" then compression <- Some Brotli -// else if v = "gzip" then compression <- Some Gzip -// else () -// elif String.equalsCaseInsensitive k "Content-Type" then -// if -// v.Contains "charset=iso-8859-1" -// || v.Contains "charset=latin1" -// || v.Contains "us-ascii" -// then -// transcodeToLatin1 <- true - -// BwdServer.Server.setResponseHeader ctx k v) - -// let data = -// if transcodeToLatin1 then -// System.Text.Encoding.Convert( -// System.Text.Encoding.UTF8, -// System.Text.Encoding.Latin1, -// testCase.responseToReturn.body -// ) -// else -// testCase.responseToReturn.body - -// match compression with -// | Some algo -> -// let stream : Stream = -// let body = ctx.Response.Body -// match algo with -// | Gzip -> new GZipStream(body, CompressionMode.Compress) -// | Brotli -> new BrotliStream(body, CompressionMode.Compress) -// | Deflate -> new DeflateStream(body, CompressionMode.Compress) -// do! stream.WriteAsync(data, 0, data.Length) -// do! stream.FlushAsync() -// do! stream.DisposeAsync() -// | None -> -// if ctx.Response.StatusCode <> 304 then -// do! ctx.Response.Body.WriteAsync(data, 0, data.Length) - -// return ctx -// with e -> -// // It might already have started, in which case let's just get the exception in -// // the body and hope that helps -// if not ctx.Response.HasStarted then ctx.Response.StatusCode <- 500 - -// let body = $"{e.Message}\n\n{e.StackTrace}" -// print $"{body}-{ctx.Request.Path}" -// let body = UTF8.toBytes body - -// do! ctx.Response.Body.WriteAsync(body, 0, body.Length) -// return ctx -// } - - - - -// let configureApp (app : IApplicationBuilder) = -// let handler (ctx : HttpContext) : Task = runTestHandler ctx -// app.Run(RequestDelegate handler) - -// let webserver () = -// Host.CreateDefaultBuilder() -// |> fun h -> h.ConfigureLogging(configureLogging "test-httpclient-server") -// |> fun h -> -// h.ConfigureWebHost(fun wh -> -// wh -// |> fun wh -> wh.UseKestrel() -// |> fun wh -> wh.UseUrls($"http://*:{TestConfig.httpClientPort}") -// |> fun wh -> wh.Configure(configureApp) -// |> ignore) -// |> fun h -> h.Build() +let makeTest versionName filename = + // Parse the file contents now, rather than later, so that tests that refer to + // other tests (that is, tests for redirects) will work. + let shouldSkipTest = String.startsWith "_" filename + let testName = + let withoutPrefix = + if shouldSkipTest then String.dropLeft 1 filename else filename + withoutPrefix |> String.dropRight 5 // ".test" + + let filename = $"{baseDirectory}/{versionName}/{filename}" + let content = System.IO.File.ReadAllBytes filename |> UTF8.ofBytesUnsafe + + // Parse Handler code, expected HTTP request, and (static) HTTP response to return + let expectedRequest, response, _darkCode = + let m = + Regex.Match( + content, + "^(\[expected-request\]\n(.*)\n)\[response\]\n(.*)\n\n\[test\]\n(.*)$", + RegexOptions.Singleline + ) + + if not m.Success then + Exception.raiseInternal $"incorrect format" [ "name", testName ] + let g = m.Groups + + (g[2].Value, g[3].Value, g[4].Value) + + let expected = + expectedRequest |> UTF8.toBytes |> Http.setHeadersToCRLF |> Http.split + let newExpectedBody = updateBody expected.body + let expected = + { expected with + headers = normalizeHeaders newExpectedBody expected.headers + body = newExpectedBody } + + let response = response |> UTF8.toBytes |> Http.setHeadersToCRLF |> Http.split + let newResponseBody = updateBody response.body + let response = + { response with + headers = normalizeHeaders newResponseBody response.headers + body = newResponseBody } + + let dictKey = $"{versionName}/{testName}" + + testCases[dictKey] <- + { expectedRequest = expected; actualRequest = None; responseToReturn = response } + + + // Load the testcases first so that redirection works + testTask testName { + // debuG "expectedRequest" (toStr expectedRequest) + // debuG "response" (toStr response) + // debuG "darkCode" darkCode + + if shouldSkipTest then + skiptest $"underscore test - {testName}" + else + // Set up the canvas + let canvasID = System.Guid.NewGuid() + let! _state = executionStateFor pmPT canvasID false true // Map.empty + + // // Parse the Dark code + // let! (test : Internal.Test.RTTest) = + // darkCode + // |> String.replace "URL" $"{host}/{versionName}/{testName}" + // // CLEANUP: this doesn't use the correct length, as it might be latin1 or + // // compressed + // |> String.replace "LENGTH" (string response.body.Length) + // |> parseTest "httpclient.tests.fs" + // |> Ply.toTask + + // // Run the handler (call the HTTP client) + // // Note: this will update the corresponding value in `testCases` with the + // // actual request received + // let! actual = Exe.executeExpr state Map.empty test.actual + + // // First check: expected HTTP request matches actual HTTP request + // let tc = testCases[dictKey] + // match tc.actualRequest with + // | None -> + // // We failed to make a request - almost undoubtedly, the result will be some + // // sort of error + // () //Expect.equal 1 2 "Unexpected - no actual request has been saved" + // | Some actualRequest -> + // Expect.equal actualRequest tc.expectedRequest "requests don't match" + + // // Second check: expected result (Dval) matches actual result (Dval) + // let actual = Result.map normalizeDvalResult actual + + // let! expected = Exe.executeExpr state Map.empty test.expected + // match actual, expected with + // | Ok actual, Ok expected -> + // return Expect.equalDval actual expected $"Responses don't match" + // | _ -> Expect.equal actual expected $"Responses don't match" + + Expect.equal 1 2 "test" + } + + +// --------------- +// This is the webserver that we will be testing against. +// It records the received request, and returns the test case output. +// --------------- +open Microsoft.AspNetCore +open Microsoft.AspNetCore.Builder +open Microsoft.AspNetCore.Hosting +open Microsoft.AspNetCore.Http +open Microsoft.AspNetCore.Http.Extensions +open Microsoft.Extensions.Hosting + +type Compression = + | Deflate + | Brotli + | Gzip + +let runTestHandler (ctx : HttpContext) : Task = + task { + try + let versionName, testName = + let segments = System.Uri(ctx.Request.Path.Value).Segments + + let versionName = segments[1] + let versionName = + if String.endsWith "/" versionName then + String.dropRight 1 versionName + else + versionName + + let testName = segments[2] + let testName = + if String.endsWith "/" testName then + String.dropRight 1 testName + else + testName + + versionName, testName + + let dictKey = $"{versionName}/{testName}" + + let testCase = + try + Some testCases[dictKey] + with _ -> + None + + match testCase with + | None -> + ctx.Response.StatusCode <- 404 + let body = "intentionally not found" |> UTF8.toBytes + ctx.Response.ContentLength <- int64 body.Length + do! ctx.Response.Body.WriteAsync(body, 0, body.Length) + return ctx + + | Some testCase -> + // Gather status, headers, and body from the actual request + let actualStatus = + $"{ctx.Request.Method} {ctx.Request.GetEncodedPathAndQuery()} {ctx.Request.Protocol}" + let actualHeaders = BwdServer.Server.getHeadersWithoutMergingKeys ctx + let! actualBody = BwdServer.Server.getBody ctx + let actualRequest : Http.T = + { status = actualStatus; headers = actualHeaders; body = actualBody } + + + // Update the TestCase with the actual request; + // also, update the PATH in the status line of the expected request + let updatedTestCase = + { testCase with + actualRequest = Some actualRequest + expectedRequest = + { testCase.expectedRequest with + status = + testCase.expectedRequest.status + |> String.replace "PATH" ctx.Request.Path.Value } } + testCases[dictKey] <- updatedTestCase + + + // Return the response + let mutable compression = None + let mutable transcodeToLatin1 = false + + ctx.Response.StatusCode <- + testCase.responseToReturn.status + |> String.split " " + |> List.getAt 1 + |> Exception.unwrapOptionInternal + "invalid status code" + [ "status", testCase.responseToReturn.status ] + |> int + + testCase.responseToReturn.headers + |> List.iter (fun (k, v) -> + if String.equalsCaseInsensitive k "Content-Encoding" then + if v = "deflate" then compression <- Some Deflate + else if v = "br" then compression <- Some Brotli + else if v = "gzip" then compression <- Some Gzip + else () + elif String.equalsCaseInsensitive k "Content-Type" then + if + v.Contains "charset=iso-8859-1" + || v.Contains "charset=latin1" + || v.Contains "us-ascii" + then + transcodeToLatin1 <- true + + BwdServer.Server.setResponseHeader ctx k v) + + let data = + if transcodeToLatin1 then + System.Text.Encoding.Convert( + System.Text.Encoding.UTF8, + System.Text.Encoding.Latin1, + testCase.responseToReturn.body + ) + else + testCase.responseToReturn.body + + match compression with + | Some algo -> + let stream : Stream = + let body = ctx.Response.Body + match algo with + | Gzip -> new GZipStream(body, CompressionMode.Compress) + | Brotli -> new BrotliStream(body, CompressionMode.Compress) + | Deflate -> new DeflateStream(body, CompressionMode.Compress) + do! stream.WriteAsync(data, 0, data.Length) + do! stream.FlushAsync() + do! stream.DisposeAsync() + | None -> + if ctx.Response.StatusCode <> 304 then + do! ctx.Response.Body.WriteAsync(data, 0, data.Length) + + return ctx + with e -> + // It might already have started, in which case let's just get the exception in + // the body and hope that helps + if not ctx.Response.HasStarted then ctx.Response.StatusCode <- 500 + + let body = $"{e.Message}\n\n{e.StackTrace}" + print $"{body}-{ctx.Request.Path}" + let body = UTF8.toBytes body + + do! ctx.Response.Body.WriteAsync(body, 0, body.Length) + return ctx + } + + + + +let configureApp (app : IApplicationBuilder) = + let handler (ctx : HttpContext) : Task = runTestHandler ctx + app.Run(RequestDelegate handler) + +let webserver () = + Host.CreateDefaultBuilder() + |> fun h -> h.ConfigureLogging(configureLogging "test-httpclient-server") + |> fun h -> + h.ConfigureWebHost(fun wh -> + wh + |> fun wh -> wh.UseKestrel() + |> fun wh -> wh.UseUrls($"http://*:{TestConfig.httpClientPort}") + |> fun wh -> wh.Configure(configureApp) + |> ignore) + |> fun h -> h.Build() // run a webserver to read test input -let init (_token : System.Threading.CancellationToken) : Task = - //(webserver ()).RunAsync(token) - Task.FromResult() +let init (token : System.Threading.CancellationToken) : Task = + (webserver ()).RunAsync(token) -// let testsFromFiles version = -// System.IO.Directory.GetFiles($"{baseDirectory}/{version}", "*.test") -// |> Array.map (System.IO.Path.GetFileName) -// |> Array.toList -// |> List.map (makeTest version) +let testsFromFiles version = + System.IO.Directory.GetFiles($"{baseDirectory}/{version}", "*.test") + |> Array.map (System.IO.Path.GetFileName) + |> Array.toList + |> List.map (makeTest version) let tests = + //versions [] - // versions - // |> List.map (fun versionName -> - // let tests = testsFromFiles versionName - // testList versionName tests) + |> List.map (fun versionName -> + let tests = testsFromFiles versionName + testList versionName tests) |> testList "HttpClient" diff --git a/backend/tests/Tests/Queue.Tests.fs b/backend/tests/Tests/Queue.Tests.fs index bfd1d01826..725c1973e8 100644 --- a/backend/tests/Tests/Queue.Tests.fs +++ b/backend/tests/Tests/Queue.Tests.fs @@ -27,431 +27,429 @@ module TCS = LibCloud.TraceCloudStorage let pmPT = LibCloud.PackageManager.pt -// // This doesn't actually test input, since it's a cron handler and not an actual event handler -// let initializeCanvas (name : string) : Task = -// task { -// // set up handler -// let! canvasID = initializeTestCanvas name -// let! e = parsePTExpr "let data = PACKAGE.Darklang.Stdlib.DateTime.now ()\n 123" -// let h = testWorker "test" e - -// do! Canvas.saveTLIDs canvasID [ (PT.Toplevel.TLHandler h, Serialize.NotDeleted) ] - -// return canvasID, h.tlid -// } - - -// let enqueueAtTime (canvasID : CanvasID) (time : Instant) : Task = -// // crons take inputs, so this could be anything -// EQ.enqueueAtTime canvasID "WORKER" "test" "_" time RT.DUnit - -// let enqueueNow (canvasID : CanvasID) : Task = -// enqueueAtTime canvasID (Instant.now ()) - - -// let checkExecutedTraces (canvasID : CanvasID) (count : int) : Task = -// task { -// // Saving happens in the background so wait for it -// let mutable traceIDs = [] -// for _ in 1..10 do -// if List.length traceIDs <> count then -// let! result = TCS.Test.listAllTraceIDs canvasID -// traceIDs <- result -// if List.length result <> count then do! Task.Delay 500 -// Expect.hasLength traceIDs count "wrong execution count" -// } - -// let rec waitForSuccess -// (canvasID : CanvasID) -// (tlid : tlid) -// (count : int) -// : Task = -// let rec getTrace -// (traceID) -// (remainingAttempts : int) -// : Task = -// task { -// if remainingAttempts <= 0 then -// return Exception.raiseInternal "no trace found" [] -// else -// try -// // This can fail if the background task uploading the trace data hasn't -// // finished yet -// return! TCS.getTraceData canvasID tlid traceID -// with -// | (:? Exception.InternalException) as e -> return Exception.reraise e -// | _ -> -// do! Task.Delay 500 -// return! getTrace traceID (remainingAttempts - 1) -// } - -// task { -// let! eventIDs = EQ.loadEventIDs canvasID ("WORKER", "test", "_") -// let! traceIDs = TCS.Test.listAllTraceIDs canvasID -// if List.length eventIDs <> 0 || List.length traceIDs <> count then -// do! Task.Delay 50 -// return! waitForSuccess canvasID tlid count -// else -// do! -// traceIDs -// |> Task.iterSequentially (fun traceID -> -// task { -// let! trace = getTrace traceID 2 -// let shapeIsAsExpected = -// match (Tuple2.second trace).functionResults with -// | [ (_, _, _, _, RT.DDateTime _) ] -> true -// | _ -> false -// return Expect.isTrue shapeIsAsExpected "should have a date here" -// }) -// } - - -// let checkSavedEvents (canvasID : CanvasID) (count : int) = -// task { -// let! queueIDs = EQ.loadEventIDs canvasID ("WORKER", "test", "_") -// Expect.hasLength queueIDs count "wrong stored event count" -// } - -// let mutable queueLastEmptyAt = Instant.MinValue - - -// let init () : unit = -// let timeout = System.TimeSpan.FromMilliseconds 10 -// let processContinuouslyInBackground () : unit = -// task { -// while true do -// match! EQ.dequeue timeout 1 with -// | [ notification ] -> -// let! _ = QueueWorker.processNotification notification -// return () -// | [] -> -// queueLastEmptyAt <- Instant.now () -// do! Task.Delay 100 -// | results -> -// return! -// Exception.raiseInternal -// "got more than 1" -// [ "count", List.length results ] - -// return () -// } -// |> fun x -> x.Result -// let thread = System.Threading.Thread processContinuouslyInBackground -// thread.IsBackground <- true -// thread.Name <- "Queue.Tests worker" -// thread.Start() - -// /// Tests that need to check that something isn't going to be run can wait until the -// /// queue is empty (locked/blocked items will be checked and then dropped) to prove -// /// that something isn't going to be run. -// let waitUntilQueueEmpty () : Task = -// task { -// let initialTime = Instant.now () -// while initialTime > queueLastEmptyAt do -// do! Task.Delay 10 -// } - - -// let testSuccess = -// testTask "event queue success" { -// let! (canvasID : CanvasID, tlid) = initializeCanvas "event-queue-success" -// do! enqueueNow canvasID -// do! waitForSuccess canvasID tlid 1 -// do! checkExecutedTraces canvasID 1 -// do! checkSavedEvents canvasID 0 -// } - -// let testSuccessThree = -// testTask "event queue success three" { -// let! (canvasID : CanvasID, tlid) = initializeCanvas "event-queue-success-three" -// do! enqueueNow canvasID -// do! enqueueNow canvasID -// do! enqueueNow canvasID -// do! waitForSuccess canvasID tlid 3 -// do! checkExecutedTraces canvasID 3 -// do! checkSavedEvents canvasID 0 -// } - -// let testSuccessLockExpired = -// testTask "success lock expired" { -// let! (canvasID : CanvasID, tlid) = initializeCanvas "success-lock-expired" - -// // Create the event, but don't have it run yet -// do! enqueueAtTime canvasID (Instant.now () + Duration.FromSeconds 3L) - -// // Lock it -// let earlier = Instant.now () + Duration.FromMinutes -6L -// do! -// Sql.query -// "UPDATE queue_events_v0 SET locked_at = @newValue WHERE canvas_id = @canvasID" -// |> Sql.parameters -// [ "canvasID", Sql.uuid canvasID -// "newValue", Sql.instantWithTimeZone earlier ] -// |> Sql.executeStatementAsync - -// // Wait for it to run -// do! waitForSuccess canvasID tlid 1 -// do! checkExecutedTraces canvasID 1 -// do! checkSavedEvents canvasID 0 -// } - -// let testFailLocked = -// testTask "fail locked" { -// let! (canvasID : CanvasID, _tlid) = initializeCanvas "fail-locked" - -// // Create the event, but don't have it run yet -// do! enqueueAtTime canvasID (Instant.now () + Duration.FromSeconds 3L) - -// // Lock it -// do! -// Sql.query -// "UPDATE queue_events_v0 SET locked_at = @newValue WHERE canvas_id = @canvasID" -// |> Sql.parameters -// [ "canvasID", Sql.uuid canvasID -// "newValue", Sql.instantWithTimeZone (Instant.now ()) ] -// |> Sql.executeStatementAsync - -// do! waitUntilQueueEmpty () -// do! checkExecutedTraces canvasID 0 -// do! checkSavedEvents canvasID 1 -// } - -// let testSuccessBlockAndUnblock = -// testTask "block and unblock" { -// let! (canvasID : CanvasID, tlid) = initializeCanvas "block-and-unblock" - -// // Block it -// do! EQ.blockWorker canvasID "test" - -// do! enqueueNow canvasID - -// // Check blocked -// do! waitUntilQueueEmpty () -// do! checkExecutedTraces canvasID 0 -// do! checkSavedEvents canvasID 1 - -// // Unblock it -// do! EQ.unblockWorker canvasID "test" - -// // Check unblocked -// do! waitForSuccess canvasID tlid 1 -// do! checkExecutedTraces canvasID 1 -// do! checkSavedEvents canvasID 0 -// } - -// let testSuccessPauseAndUnpause = -// testTask "pause and unpause" { -// let! (canvasID : CanvasID, tlid) = initializeCanvas "pause-and-unpause" -// // Pause it -// do! EQ.pauseWorker canvasID "test" - -// // Enqueue -// do! enqueueNow canvasID - -// // Check paused -// do! waitUntilQueueEmpty () -// do! checkExecutedTraces canvasID 0 -// do! checkSavedEvents canvasID 1 - -// // Unpause it -// do! EQ.unpauseWorker canvasID "test" - -// // Check unpaused -// do! waitForSuccess canvasID tlid 1 -// do! checkExecutedTraces canvasID 1 -// do! checkSavedEvents canvasID 0 -// } - -// let testFailPauseBlockAndUnpause = -// testTask "pause block and unpause" { -// let! (canvasID : CanvasID, _tlid) = initializeCanvas "pause-block-and-unpause" - -// // Pause it -// do! EQ.pauseWorker canvasID "test" - -// // Enqueue -// do! enqueueNow canvasID - -// // Check paused -// do! checkExecutedTraces canvasID 0 -// do! checkSavedEvents canvasID 1 - -// // Block and unpause it -// do! EQ.blockWorker canvasID "test" -// do! EQ.unpauseWorker canvasID "test" - -// // Check still paused -// do! waitUntilQueueEmpty () -// do! checkExecutedTraces canvasID 0 -// do! checkSavedEvents canvasID 1 -// } - -// let testFailPauseBlockAndUnblock = -// testTask "pause block and unblock" { -// let! (canvasID : CanvasID, _tlid) = initializeCanvas "pause-block-and-unblock" - -// // Pause it -// do! EQ.pauseWorker canvasID "test" - -// // Enqueue -// do! enqueueNow canvasID - -// // Check paused -// do! waitUntilQueueEmpty () -// do! checkExecutedTraces canvasID 0 -// do! checkSavedEvents canvasID 1 - -// // Block and unblock it -// do! EQ.blockWorker canvasID "test" -// do! EQ.unblockWorker canvasID "test" - -// // Check still paused -// do! waitUntilQueueEmpty () -// do! checkExecutedTraces canvasID 0 -// do! checkSavedEvents canvasID 1 -// } - -// let testFailBlockPauseAndUnpause = -// testTask "block pause and unpause" { -// let! (canvasID : CanvasID, _tlid) = initializeCanvas "block-pause-and-unpause" - -// // Block it -// do! EQ.blockWorker canvasID "test" - -// do! enqueueNow canvasID - -// // Check blocked -// do! waitUntilQueueEmpty () -// do! checkExecutedTraces canvasID 0 -// do! checkSavedEvents canvasID 1 - -// // Pause and unpause it -// do! EQ.pauseWorker canvasID "test" -// do! EQ.unpauseWorker canvasID "test" - -// // Check still blocked -// do! waitUntilQueueEmpty () -// do! checkExecutedTraces canvasID 0 -// do! checkSavedEvents canvasID 1 -// } - -// let testFailBlockPauseAndUnblock = -// testTask "block pause and unblock" { -// let! (canvasID : CanvasID, _tlid) = initializeCanvas "block-pause-and-unblock" - -// // Block it -// do! EQ.blockWorker canvasID "test" - -// // Enqueue -// do! enqueueNow canvasID - -// // Check blocked -// do! waitUntilQueueEmpty () -// do! checkExecutedTraces canvasID 0 -// do! checkSavedEvents canvasID 1 - -// // Pause and unblock it -// do! EQ.pauseWorker canvasID "test" -// do! EQ.unblockWorker canvasID "test" - -// // Check still paused -// do! waitUntilQueueEmpty () -// do! checkExecutedTraces canvasID 0 -// do! checkSavedEvents canvasID 1 -// } - -// let testUnpauseMulitpleTimesInSequence = -// testTask "unpause multiple times in sequence" { -// let! (canvasID : CanvasID, tlid) = -// initializeCanvas "unpause-multiple-times-in-secquence" - -// // Block it -// do! EQ.blockWorker canvasID "test" - -// // Enqueue -// do! enqueueNow canvasID - -// // Pause and unblock it -// do! EQ.unblockWorker canvasID "test" -// do! EQ.unblockWorker canvasID "test" -// do! EQ.unblockWorker canvasID "test" -// do! EQ.unblockWorker canvasID "test" -// do! EQ.unblockWorker canvasID "test" -// do! EQ.unblockWorker canvasID "test" -// do! EQ.unblockWorker canvasID "test" -// do! EQ.unblockWorker canvasID "test" -// do! EQ.unblockWorker canvasID "test" - -// do! waitForSuccess canvasID tlid 1 -// do! waitUntilQueueEmpty () -// do! checkExecutedTraces canvasID 1 -// do! checkSavedEvents canvasID 0 -// } - -// let testUnpauseMultipleTimesInParallel = -// testTask "unpause multiple times in parallel" { -// let! (canvasID : CanvasID, tlid) = -// initializeCanvas "unpause-multiple-times-in-parallel" - -// // Block it -// do! EQ.blockWorker canvasID "test" - -// // Enqueue -// do! enqueueNow canvasID - -// // Pause and unblock it -// do! EQ.unblockWorker canvasID "test" -// do! EQ.unblockWorker canvasID "test" -// do! EQ.unblockWorker canvasID "test" -// do! EQ.unblockWorker canvasID "test" -// do! EQ.unblockWorker canvasID "test" -// do! EQ.unblockWorker canvasID "test" -// do! EQ.unblockWorker canvasID "test" -// do! EQ.unblockWorker canvasID "test" -// do! EQ.unblockWorker canvasID "test" - -// do! waitForSuccess canvasID tlid 1 -// do! waitUntilQueueEmpty () -// do! checkExecutedTraces canvasID 1 -// do! checkSavedEvents canvasID 0 -// } - - -// let testCount = -// testTask "count is right" { -// let! (canvasID : CanvasID, tlid) = initializeCanvas "count-is-correct" -// do! EQ.blockWorker canvasID "test" -// do! enqueueNow canvasID -// do! enqueueNow canvasID -// do! enqueueNow canvasID -// do! enqueueNow canvasID -// do! enqueueNow canvasID - -// let! count = LibCloud.Stats.workerStats canvasID tlid -// Expect.equal count 5 "count should be 5" - -// do! EQ.unblockWorker canvasID "test" -// do! checkSavedEvents canvasID 5 -// } +// This doesn't actually test input, since it's a cron handler and not an actual event handler +let initializeCanvas (name : string) : Task = + task { + // set up handler + let! canvasID = initializeTestCanvas name + let! e = parsePTExpr "let data = PACKAGE.Darklang.Stdlib.DateTime.now ()\n 123" + let h = testWorker "test" e + + do! Canvas.saveTLIDs canvasID [ (PT.Toplevel.TLHandler h, Serialize.NotDeleted) ] + + return canvasID, h.tlid + } + + +let enqueueAtTime (canvasID : CanvasID) (time : Instant) : Task = + // crons take inputs, so this could be anything + EQ.enqueueAtTime canvasID "WORKER" "test" "_" time RT.DUnit + +let enqueueNow (canvasID : CanvasID) : Task = + enqueueAtTime canvasID (Instant.now ()) + + +let checkExecutedTraces (canvasID : CanvasID) (count : int) : Task = + task { + // Saving happens in the background so wait for it + let mutable traceIDs = [] + for _ in 1..10 do + if List.length traceIDs <> count then + let! result = TCS.Test.listAllTraceIDs canvasID + traceIDs <- result + if List.length result <> count then do! Task.Delay 500 + Expect.hasLength traceIDs count "wrong execution count" + } + +let rec waitForSuccess + (canvasID : CanvasID) + (tlid : tlid) + (count : int) + : Task = + let rec getTrace + (traceID) + (remainingAttempts : int) + : Task = + task { + if remainingAttempts <= 0 then + return Exception.raiseInternal "no trace found" [] + else + try + // This can fail if the background task uploading the trace data hasn't + // finished yet + return! TCS.getTraceData canvasID tlid traceID + with + | (:? Exception.InternalException) as e -> return Exception.reraise e + | _ -> + do! Task.Delay 500 + return! getTrace traceID (remainingAttempts - 1) + } + + task { + let! eventIDs = EQ.loadEventIDs canvasID ("WORKER", "test", "_") + let! traceIDs = TCS.Test.listAllTraceIDs canvasID + if List.length eventIDs <> 0 || List.length traceIDs <> count then + do! Task.Delay 50 + return! waitForSuccess canvasID tlid count + else + do! + traceIDs + |> Task.iterSequentially (fun traceID -> + task { + let! trace = getTrace traceID 2 + let shapeIsAsExpected = + match (Tuple2.second trace).functionResults with + | [ (_, _, _, _, RT.DDateTime _) ] -> true + | _ -> false + return Expect.isTrue shapeIsAsExpected "should have a date here" + }) + } + + +let checkSavedEvents (canvasID : CanvasID) (count : int) = + task { + let! queueIDs = EQ.loadEventIDs canvasID ("WORKER", "test", "_") + Expect.hasLength queueIDs count "wrong stored event count" + } + +let mutable queueLastEmptyAt = Instant.MinValue + + +let init () : unit = + let timeout = System.TimeSpan.FromMilliseconds 10 + let processContinuouslyInBackground () : unit = + task { + while true do + match! EQ.dequeue timeout 1 with + | [ notification ] -> + let! _ = QueueWorker.processNotification notification + return () + | [] -> + queueLastEmptyAt <- Instant.now () + do! Task.Delay 100 + | results -> + return! + Exception.raiseInternal + "got more than 1" + [ "count", List.length results ] + + return () + } + |> fun x -> x.Result + let thread = System.Threading.Thread processContinuouslyInBackground + thread.IsBackground <- true + thread.Name <- "Queue.Tests worker" + thread.Start() + +/// Tests that need to check that something isn't going to be run can wait until the +/// queue is empty (locked/blocked items will be checked and then dropped) to prove +/// that something isn't going to be run. +let waitUntilQueueEmpty () : Task = + task { + let initialTime = Instant.now () + while initialTime > queueLastEmptyAt do + do! Task.Delay 10 + } + + +let testSuccess = + testTask "event queue success" { + let! (canvasID : CanvasID, tlid) = initializeCanvas "event-queue-success" + do! enqueueNow canvasID + do! waitForSuccess canvasID tlid 1 + do! checkExecutedTraces canvasID 1 + do! checkSavedEvents canvasID 0 + } + +let testSuccessThree = + testTask "event queue success three" { + let! (canvasID : CanvasID, tlid) = initializeCanvas "event-queue-success-three" + do! enqueueNow canvasID + do! enqueueNow canvasID + do! enqueueNow canvasID + do! waitForSuccess canvasID tlid 3 + do! checkExecutedTraces canvasID 3 + do! checkSavedEvents canvasID 0 + } + +let testSuccessLockExpired = + testTask "success lock expired" { + let! (canvasID : CanvasID, tlid) = initializeCanvas "success-lock-expired" + + // Create the event, but don't have it run yet + do! enqueueAtTime canvasID (Instant.now () + Duration.FromSeconds 3L) + + // Lock it + let earlier = Instant.now () + Duration.FromMinutes -6L + do! + Sql.query + "UPDATE queue_events_v0 SET locked_at = @newValue WHERE canvas_id = @canvasID" + |> Sql.parameters + [ "canvasID", Sql.uuid canvasID + "newValue", Sql.instantWithTimeZone earlier ] + |> Sql.executeStatementAsync + + // Wait for it to run + do! waitForSuccess canvasID tlid 1 + do! checkExecutedTraces canvasID 1 + do! checkSavedEvents canvasID 0 + } + +let testFailLocked = + testTask "fail locked" { + let! (canvasID : CanvasID, _tlid) = initializeCanvas "fail-locked" + + // Create the event, but don't have it run yet + do! enqueueAtTime canvasID (Instant.now () + Duration.FromSeconds 3L) + + // Lock it + do! + Sql.query + "UPDATE queue_events_v0 SET locked_at = @newValue WHERE canvas_id = @canvasID" + |> Sql.parameters + [ "canvasID", Sql.uuid canvasID + "newValue", Sql.instantWithTimeZone (Instant.now ()) ] + |> Sql.executeStatementAsync + + do! waitUntilQueueEmpty () + do! checkExecutedTraces canvasID 0 + do! checkSavedEvents canvasID 1 + } + +let testSuccessBlockAndUnblock = + testTask "block and unblock" { + let! (canvasID : CanvasID, tlid) = initializeCanvas "block-and-unblock" + + // Block it + do! EQ.blockWorker canvasID "test" + + do! enqueueNow canvasID + + // Check blocked + do! waitUntilQueueEmpty () + do! checkExecutedTraces canvasID 0 + do! checkSavedEvents canvasID 1 + + // Unblock it + do! EQ.unblockWorker canvasID "test" + + // Check unblocked + do! waitForSuccess canvasID tlid 1 + do! checkExecutedTraces canvasID 1 + do! checkSavedEvents canvasID 0 + } + +let testSuccessPauseAndUnpause = + testTask "pause and unpause" { + let! (canvasID : CanvasID, tlid) = initializeCanvas "pause-and-unpause" + // Pause it + do! EQ.pauseWorker canvasID "test" + + // Enqueue + do! enqueueNow canvasID + + // Check paused + do! waitUntilQueueEmpty () + do! checkExecutedTraces canvasID 0 + do! checkSavedEvents canvasID 1 + + // Unpause it + do! EQ.unpauseWorker canvasID "test" + + // Check unpaused + do! waitForSuccess canvasID tlid 1 + do! checkExecutedTraces canvasID 1 + do! checkSavedEvents canvasID 0 + } + +let testFailPauseBlockAndUnpause = + testTask "pause block and unpause" { + let! (canvasID : CanvasID, _tlid) = initializeCanvas "pause-block-and-unpause" + + // Pause it + do! EQ.pauseWorker canvasID "test" + + // Enqueue + do! enqueueNow canvasID + + // Check paused + do! checkExecutedTraces canvasID 0 + do! checkSavedEvents canvasID 1 + + // Block and unpause it + do! EQ.blockWorker canvasID "test" + do! EQ.unpauseWorker canvasID "test" + + // Check still paused + do! waitUntilQueueEmpty () + do! checkExecutedTraces canvasID 0 + do! checkSavedEvents canvasID 1 + } + +let testFailPauseBlockAndUnblock = + testTask "pause block and unblock" { + let! (canvasID : CanvasID, _tlid) = initializeCanvas "pause-block-and-unblock" + + // Pause it + do! EQ.pauseWorker canvasID "test" + + // Enqueue + do! enqueueNow canvasID + + // Check paused + do! waitUntilQueueEmpty () + do! checkExecutedTraces canvasID 0 + do! checkSavedEvents canvasID 1 + + // Block and unblock it + do! EQ.blockWorker canvasID "test" + do! EQ.unblockWorker canvasID "test" + + // Check still paused + do! waitUntilQueueEmpty () + do! checkExecutedTraces canvasID 0 + do! checkSavedEvents canvasID 1 + } + +let testFailBlockPauseAndUnpause = + testTask "block pause and unpause" { + let! (canvasID : CanvasID, _tlid) = initializeCanvas "block-pause-and-unpause" + + // Block it + do! EQ.blockWorker canvasID "test" + + do! enqueueNow canvasID + + // Check blocked + do! waitUntilQueueEmpty () + do! checkExecutedTraces canvasID 0 + do! checkSavedEvents canvasID 1 + + // Pause and unpause it + do! EQ.pauseWorker canvasID "test" + do! EQ.unpauseWorker canvasID "test" + + // Check still blocked + do! waitUntilQueueEmpty () + do! checkExecutedTraces canvasID 0 + do! checkSavedEvents canvasID 1 + } + +let testFailBlockPauseAndUnblock = + testTask "block pause and unblock" { + let! (canvasID : CanvasID, _tlid) = initializeCanvas "block-pause-and-unblock" + + // Block it + do! EQ.blockWorker canvasID "test" + + // Enqueue + do! enqueueNow canvasID + + // Check blocked + do! waitUntilQueueEmpty () + do! checkExecutedTraces canvasID 0 + do! checkSavedEvents canvasID 1 + + // Pause and unblock it + do! EQ.pauseWorker canvasID "test" + do! EQ.unblockWorker canvasID "test" + + // Check still paused + do! waitUntilQueueEmpty () + do! checkExecutedTraces canvasID 0 + do! checkSavedEvents canvasID 1 + } + +let testUnpauseMulitpleTimesInSequence = + testTask "unpause multiple times in sequence" { + let! (canvasID : CanvasID, tlid) = + initializeCanvas "unpause-multiple-times-in-secquence" + + // Block it + do! EQ.blockWorker canvasID "test" + + // Enqueue + do! enqueueNow canvasID + + // Pause and unblock it + do! EQ.unblockWorker canvasID "test" + do! EQ.unblockWorker canvasID "test" + do! EQ.unblockWorker canvasID "test" + do! EQ.unblockWorker canvasID "test" + do! EQ.unblockWorker canvasID "test" + do! EQ.unblockWorker canvasID "test" + do! EQ.unblockWorker canvasID "test" + do! EQ.unblockWorker canvasID "test" + do! EQ.unblockWorker canvasID "test" + + do! waitForSuccess canvasID tlid 1 + do! waitUntilQueueEmpty () + do! checkExecutedTraces canvasID 1 + do! checkSavedEvents canvasID 0 + } + +let testUnpauseMultipleTimesInParallel = + testTask "unpause multiple times in parallel" { + let! (canvasID : CanvasID, tlid) = + initializeCanvas "unpause-multiple-times-in-parallel" + + // Block it + do! EQ.blockWorker canvasID "test" + + // Enqueue + do! enqueueNow canvasID + + // Pause and unblock it + do! EQ.unblockWorker canvasID "test" + do! EQ.unblockWorker canvasID "test" + do! EQ.unblockWorker canvasID "test" + do! EQ.unblockWorker canvasID "test" + do! EQ.unblockWorker canvasID "test" + do! EQ.unblockWorker canvasID "test" + do! EQ.unblockWorker canvasID "test" + do! EQ.unblockWorker canvasID "test" + do! EQ.unblockWorker canvasID "test" + + do! waitForSuccess canvasID tlid 1 + do! waitUntilQueueEmpty () + do! checkExecutedTraces canvasID 1 + do! checkSavedEvents canvasID 0 + } + + +let testCount = + testTask "count is right" { + let! (canvasID : CanvasID, tlid) = initializeCanvas "count-is-correct" + do! EQ.blockWorker canvasID "test" + do! enqueueNow canvasID + do! enqueueNow canvasID + do! enqueueNow canvasID + do! enqueueNow canvasID + do! enqueueNow canvasID + + let! count = LibCloud.Stats.workerStats canvasID tlid + Expect.equal count 5 "count should be 5" + + do! EQ.unblockWorker canvasID "test" + do! checkSavedEvents canvasID 5 + } let tests = - //init () + init () testSequencedGroup "Queue" (testList "Queue" - [ - // testSuccess - // testSuccessThree - // testSuccessLockExpired - // testFailLocked - // testSuccessBlockAndUnblock - // testSuccessPauseAndUnpause - // testFailPauseBlockAndUnpause - // testFailPauseBlockAndUnblock - // testFailBlockPauseAndUnpause - // testFailBlockPauseAndUnblock - // testUnpauseMulitpleTimesInSequence - // testUnpauseMultipleTimesInParallel - // testCount - ]) + [ testSuccess + testSuccessThree + testSuccessLockExpired + testFailLocked + testSuccessBlockAndUnblock + testSuccessPauseAndUnpause + testFailPauseBlockAndUnpause + testFailPauseBlockAndUnblock + testFailBlockPauseAndUnpause + testFailBlockPauseAndUnblock + testUnpauseMulitpleTimesInSequence + testUnpauseMultipleTimesInParallel + testCount ]) diff --git a/backend/tests/Tests/QueueSchedulingRules.Tests.fs b/backend/tests/Tests/QueueSchedulingRules.Tests.fs index ecf7734807..252a5d7458 100644 --- a/backend/tests/Tests/QueueSchedulingRules.Tests.fs +++ b/backend/tests/Tests/QueueSchedulingRules.Tests.fs @@ -7,48 +7,51 @@ open Expecto open Prelude -// open TestUtils.TestUtils - -// module PT = LibExecution.ProgramTypes -// module RT = LibExecution.RuntimeTypes -// module EQ2 = LibCloud.Queue -// module Canvas = LibCloud.Canvas -// module Serialize = LibCloud.Serialize -// module SR = LibCloud.QueueSchedulingRules - - -// let testGetWorkerSchedulesForCanvas = -// testTask "worker schedules for canvas" { -// let! canvasID = initializeTestCanvas "worker-schedules" - -// let! e1 = parsePTExpr "1L" -// let! e2 = parsePTExpr "1L" -// let! e3 = parsePTExpr "1L" -// let apple = testWorker "apple" e1 -// let banana = testWorker "banana" e2 -// let cherry = testWorker "cherry" e3 - -// do! -// ([ apple; banana; cherry ] -// |> List.map (fun h -> (PT.Toplevel.TLHandler h, Serialize.NotDeleted)) -// |> Canvas.saveTLIDs canvasID) - -// do! EQ2.pauseWorker canvasID "apple" -// do! EQ2.pauseWorker canvasID "banana" -// do! EQ2.blockWorker canvasID "banana" -// let! result = SR.getWorkerSchedules canvasID - -// let check (name : string) (value : SR.WorkerStates.State) = -// let actual = -// Map.get name result -// |> Exception.unwrapOptionInternal "missing workerstate" [ "name", name ] -// |> string -// let expected = string value -// Expect.equal actual expected ($"{name} is {expected}") - -// check "apple" SR.WorkerStates.Paused -// check "banana" SR.WorkerStates.Blocked -// check "cherry" SR.WorkerStates.Running -// } - -let tests = testSequencedGroup "eventQueue" (testList "eventQueue" []) // testGetWorkerSchedulesForCanvas ]) +open TestUtils.TestUtils + +module PT = LibExecution.ProgramTypes +module RT = LibExecution.RuntimeTypes +module EQ2 = LibCloud.Queue +module Canvas = LibCloud.Canvas +module Serialize = LibCloud.Serialize +module SR = LibCloud.QueueSchedulingRules + + +let testGetWorkerSchedulesForCanvas = + testTask "worker schedules for canvas" { + let! canvasID = initializeTestCanvas "worker-schedules" + + let! e1 = parsePTExpr "1L" + let! e2 = parsePTExpr "1L" + let! e3 = parsePTExpr "1L" + let apple = testWorker "apple" e1 + let banana = testWorker "banana" e2 + let cherry = testWorker "cherry" e3 + + do! + ([ apple; banana; cherry ] + |> List.map (fun h -> (PT.Toplevel.TLHandler h, Serialize.NotDeleted)) + |> Canvas.saveTLIDs canvasID) + + do! EQ2.pauseWorker canvasID "apple" + do! EQ2.pauseWorker canvasID "banana" + do! EQ2.blockWorker canvasID "banana" + let! result = SR.getWorkerSchedules canvasID + + let check (name : string) (value : SR.WorkerStates.State) = + let actual = + Map.get name result + |> Exception.unwrapOptionInternal "missing workerstate" [ "name", name ] + |> string + let expected = string value + Expect.equal actual expected ($"{name} is {expected}") + + check "apple" SR.WorkerStates.Paused + check "banana" SR.WorkerStates.Blocked + check "cherry" SR.WorkerStates.Running + } + +let tests = + testSequencedGroup + "eventQueue" + (testList "eventQueue" [ testGetWorkerSchedulesForCanvas ]) diff --git a/backend/tests/Tests/Tests.fs b/backend/tests/Tests/Tests.fs index 213d051b1d..b70fc1556d 100644 --- a/backend/tests/Tests/Tests.fs +++ b/backend/tests/Tests/Tests.fs @@ -40,34 +40,33 @@ let main (args : string array) : int = Tests.ProgramTypesToRuntimeTypes.tests Tests.Interpreter.tests Tests.AnalysisTypes.tests - // Tests.Execution.tests + Tests.Execution.tests Tests.Builtin.tests // Tests.DvalRepr.tests -- maybe this gets deleted TODO Tests.PackageManager.tests Tests.LibParser.tests - // Tests.NewParser.tests - // Tests.HttpClient.tests - + Tests.NewParser.tests + Tests.HttpClient.tests // cloud - // Tests.BwdServer.tests - // Tests.Canvas.tests - // Tests.Cron.tests - // Tests.QueueSchedulingRules.tests + Tests.BwdServer.tests + Tests.Canvas.tests + Tests.Cron.tests + Tests.QueueSchedulingRules.tests // TODO: bring back Tests.Queue.tests - // Tests.Routing.tests - // Tests.BinarySerialization.tests - // Tests.VanillaSerialization.tests - // Tests.DarkTypesSerialization.tests - // Tests.SqlCompiler.tests - // Tests.StorageTraces.tests + Tests.Routing.tests + Tests.BinarySerialization.tests + Tests.VanillaSerialization.tests + Tests.DarkTypesSerialization.tests + Tests.SqlCompiler.tests + Tests.StorageTraces.tests // cross-cutting Tests.LibExecution.tests.Force() ] let cancelationTokenSource = new System.Threading.CancellationTokenSource() // let bwdServerTestsTask = Tests.BwdServer.init cancelationTokenSource.Token - // let httpClientTestsTask = Tests.HttpClient.init cancelationTokenSource.Token + let httpClientTestsTask = Tests.HttpClient.init cancelationTokenSource.Token //Telemetry.Console.loadTelemetry "tests" Telemetry.TraceDBQueries // // Generate this so that we can see if the format has changed in a git diff @@ -82,7 +81,7 @@ let main (args : string array) : int = NonBlockingConsole.wait () // flush stdout cancelationTokenSource.Cancel() // bwdServerTestsTask.Wait() - // httpClientTestsTask.Wait() + httpClientTestsTask.Wait() // QueueWorker.shouldShutdown <- true exitCode with e ->