Work a Q
WIP!!
The QL Web app can offload the execution of some tasks to a work queue. In this configuration, a set of QL Web servers add tasks to a queue and a pool of QL queue worker processes fetch tasks from the queue and run them asynchronously, optionally retrying failed tasks. The queue data sits in Redis and QL relies on RQ to manage it, provide worker processes to run tasks and do all the necessary bookkeeping—e.g. purging data past a configured time to live (TTL).
The QL Web app comes with a REST API to manage work queue tasks. Clients can query, retrieve and inspect task inputs as well as task runtime information, count tasks satisfying a given predicate to gauge system load, and delete tasks. The implementation features efficient algorithms to boost performance. In particular the space complexity of queries is constant thanks to the extensive use of stream processing techniques.
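To illustrate the constant-space idea, here's a minimal sketch (not the actual QL code) of counting tasks that match a predicate over a stream, without ever materialising the full task list:

```python
def count_matching(tasks, predicate):
    """Count tasks satisfying the predicate in O(1) space.

    `tasks` can be any iterable, e.g. a generator streaming task
    records out of the queue backend one at a time, so memory use
    stays constant no matter how many tasks there are.
    """
    return sum(1 for t in tasks if predicate(t))

# Usage: a generator expression never holds all 1000 records at once.
task_stream = ({'id': n, 'status': 'failed' if n % 3 == 0 else 'succeeded'}
               for n in range(1000))
failed = count_matching(task_stream, lambda t: t['status'] == 'failed')
# failed → 334
```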
The design is modular. Components hide their implementation behind interfaces and use other components only through their provided interfaces. A layered approach makes it possible to write and manage tasks at a high level, independently of the queue backend—RQ, as noted earlier. By the same token, it's possible to implement an alternative queue backend without modifying existing tasks and high-level task management modules.
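The layering can be sketched with an abstract interface. The names below are made up for illustration, not QL's actual modules:

```python
from abc import ABC, abstractmethod

class TaskQueue(ABC):
    """Abstract queue backend that high-level task code depends on."""

    @abstractmethod
    def enqueue(self, task_data): ...

    @abstractmethod
    def dequeue(self): ...

class InMemoryQueue(TaskQueue):
    """Trivial alternative backend; an RQ-based one would fill the same slot."""

    def __init__(self):
        self._items = []

    def enqueue(self, task_data):
        self._items.append(task_data)

    def dequeue(self):
        # FIFO order; return None when the queue is empty.
        return self._items.pop(0) if self._items else None
```

Task management code written against `TaskQueue` keeps working unchanged if the RQ backend is swapped out, which is the point of the layering.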
At the moment, QL only uses the work queue for NGSI notifications, if
configured to do so. When an entity payload comes through the notify
endpoint, the Web app turns the payload into a task to save it to the
DB, adds the task to the queue and returns a 200 to the client immediately.
A separate instance of QL, configured as a queue worker, fetches the
task from the queue and runs it to actually insert the NGSI entities
into the DB, possibly retrying the insert at a later time if it fails.
Clients connect to the Web app to manage notify tasks in the queue.
TODO:
- Layering: reporter, wq-ql, cli /—>/ tasks, mgmt /—>/ rts /—>/ RQ, Redis
- Components, connectors and interfaces
- Behaviours and data flows—task stuff covered below, mgmt API needs some love
The way the QL Web app, the queue worker, RQ and Redis collaborate to process tasks is a key aspect of the QL work queue architecture. In a nutshell, the Web app creates a task and stores it, through RQ, in a Redis hash containing the queue data. Then the worker, through RQ, fetches the task from Redis and runs it. If the task fails and retries are configured, the worker asks RQ to schedule the task to run again later and RQ puts it back into Redis. The worker retries failed tasks for a configured maximum number of times before giving up. Regardless of retries, as soon as a task runs to completion successfully, the worker notifies RQ which, in turn, saves the task back to Redis in the set of successful tasks, taking care of specifying a time to live (TTL) so Redis can automatically remove it from the set and reclaim storage past that TTL. Similarly, in the case a task fails and no retries are configured or there aren't any retries left, the worker notifies RQ which puts it into a Redis set of failed tasks, again setting a TTL so Redis can delete it automatically past that time.
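The flow is easier to see in code. Here's a deliberately toy, single-process model of the collaboration—no RQ, no Redis, no TTLs, just the enqueue/fetch/retry bookkeeping described above:

```python
from collections import deque

class ToyQueue:
    """Toy model of the Web app / worker / RQ collaboration.

    Everything here is illustrative: real QL stores tasks in Redis via
    RQ, retries are spaced out in time, and done tasks expire past a TTL.
    """

    def __init__(self, max_retries=2):
        self.pending = deque()          # stand-in for the Redis queue
        self.succeeded = []             # stand-in for the successful-tasks set
        self.failed = []                # stand-in for the failed-tasks set
        self.max_retries = max_retries

    def enqueue(self, task):            # what the Web app does
        self.pending.append((task, 0))  # 0 retries so far

    def work(self):                     # what a queue worker does
        while self.pending:
            task, r = self.pending.popleft()
            try:
                task()
                self.succeeded.append(task)
            except Exception:
                if r < self.max_retries:
                    self.pending.append((task, r + 1))  # schedule a retry
                else:
                    self.failed.append(task)            # give up
```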
The interactions sketched out so far are based on an abstract specification of the work queue task life-cycle. In fact, the above components carry out the various steps in the task life-cycle according to a finite state machine that models, with a fair degree of accuracy, how computation actually happens in the work queue and explains some important aspects we glossed over earlier, such as the relationships among retries, time and events. The UML state chart below is a visual representation of the task state machine.
A task begins its life when the QL Web app adds it to the work queue—enqueue event in the diagram. At this point the task is in the Queued state and is waiting for a queue worker process to fetch it and run it for the first time.
Workers can retry failed tasks for up to a configured number of times M. (M is a non-negative integer.) RQ keeps track of how many times workers have retried a task. When the task enters the Queued state, the current number of retries r is set to 0. A worker can retry a task only if r < M. So if M is set to 0, workers never retry failed tasks.
A worker initially fetches a task in the Queued state (fetch event) at a certain time t₀ and tries to run it once. While a task runs, it's in the Running state. After running a task, the worker checks if the task completed successfully. If so, the task transitions from the Running state to the Succeeded state—succeed event. On the other hand, if the task failed, two transitions out of the Running state are possible, depending on the current number of retries—fail event. If r = M, the task enters the Failed state, whereas if r < M, the worker asks RQ to schedule another execution attempt at a later time and the task enters the Scheduled state.
The worker uses an exponential retry schedule σ. Retries get spaced out by an exponentially growing number of seconds defined by the sequence s = { c⋅2ⁿ | n ∈ ℕ } = (c, 2c, 4c, 8c, 16c, …) where c is a constant number of seconds. (In the current implementation c = 20.) The retry schedule σ is the sequence of time points defined recursively by
- σ₀ = t₀
- σₙ₊₁ = σₙ + sₙ
So σ = (t₀, t₀ + c, t₀ + c + 2c, …) and the zeroth schedule is the initial task execution at time t₀ when the worker fetched the task from the queue for the first time, the first schedule is the time point t₀ + c at which the worker retries the task for the first time if the initial run at t₀ failed, the second schedule is the time point t₀ + c + 2c at which the worker retries the task for the second time if the first retry at t₀ + c failed, and so on. So the task may run at time point σₖ with 0 ≤ k ≤ M.
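A quick sketch of the schedule computation, consistent with the definitions above (with the document's c = 20; this is illustrative, not the QL source):

```python
def retry_schedule(t0, c, max_retries):
    """Return the time points sigma_0..sigma_M at which a task may run.

    sigma_0 = t0 is the initial run; each later point adds an
    exponentially growing delay: c, 2c, 4c, ...
    """
    points = [t0]
    delay = c
    for _ in range(max_retries):
        points.append(points[-1] + delay)
        delay *= 2
    return points

retry_schedule(0, 20, 3)  # → [0, 20, 60, 140]
```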
In particular, if a task sits in the Scheduled state, at time point σᵣ₊₁ the worker fetches it and tries to run it again—fetch event. Again, while the task runs, it's in the Running state. In transitioning from Scheduled to Running, the current number of retries r is increased by one.
A task in the Queued, Running or Scheduled state is also in the Pending state. This is just a convenience composite state to capture the idea that the system still doesn't know what the task outcome is, either because the task is waiting to be run for the first time, is scheduled for a retry, or is actually busy running. The two possible task outcomes in the model are captured by the Succeeded and Failed states, respectively.
The Succeeded and Failed states aren't final states. In fact, as noted earlier, RQ keeps successful and failed tasks in Redis for a configured success and failure TTL, respectively. When that TTL expires, Redis automatically deletes the task, at which point the state machine reaches its final state.
To simplify things, the state machine model assumes fetching a task from the queue and running it take an amount of time x less than c. That should be true in most cases for QL, but I haven't tested what RQ would do if x > c—think long-running task that fails.
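The state chart boils down to a small transition table. This is just a reading aid for the diagram, not QL code:

```python
# (current state, event) -> next state, per the state chart above.
# None stands for the initial pseudo-state before the task exists.
TRANSITIONS = {
    (None,        'enqueue'): 'Queued',
    ('Queued',    'fetch'):   'Running',
    ('Scheduled', 'fetch'):   'Running',   # r is incremented on this one
    ('Running',   'succeed'): 'Succeeded',
}

def on_fail(r, max_retries):
    """Transition out of Running on the fail event: retry if r < M, else give up."""
    return 'Scheduled' if r < max_retries else 'Failed'
```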
this section is still a work in progress!
- Distributed computation: producers (QL Web app instances) and consumers (QL queue workers) communicate asynchronously through a queue data store (RQ / Redis).
- Producers don't need to know the outcome of running a task, so there's no need for a coordination algorithm with consumers.
- Consumers have to coordinate to fetch tasks from the queue; RQ manages the distribution of work to consumers.
- Each consumer is a sequential (single-threaded) RQ process running a Python interpreter loaded with all the QL packages and dependent libraries. This is because tasks usually aren't self-contained and may reference any other QL package—the notify task is a case in point.
- The consumer forks a child process (the RQ "work horse") to run each task it fetches from the queue. But it still runs one task at a time, no two work horses ever get forked simultaneously. The point of the work horse is reliability, not performance: runaway tasks can't bring down the consumer—e.g. task memory leaks won't affect the consumer.
- To increase throughput, start more than one consumer to process tasks in parallel.
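The work-horse idea can be sketched with a plain `fork`: the parent (consumer) survives whatever the child does. This is a simplification of what RQ actually does, which also involves timeouts and result reporting:

```python
import os

def run_in_work_horse(task):
    """Run `task` in a forked child; the parent only observes the outcome."""
    pid = os.fork()
    if pid == 0:            # child: the "work horse"
        try:
            task()
            os._exit(0)     # success
        except Exception:
            os._exit(1)     # failure; leaks or crashes die with the child
    # Parent blocks until the child exits: one task at a time.
    _, status = os.waitpid(pid, 0)
    return os.WEXITSTATUS(status) == 0
```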
TODO:
- Mapping functional & process view to code
- Packages, modules and dependencies
- Stream processing techniques—this comment to #707 explains most of what you need to know
TODO:
- Benchmark stuff
- Test driver CLI, running w/ or w/o Docker, e.g.
python -m wq.tests.benchmark --no-docker
- Docker stuff
this section is still a work in progress!
When using a work queue, there are two sets of QL processes: a set
of Web servers that add tasks to a queue and a pool of queue worker
processes that fetch tasks from the queue and run them asynchronously.
The Web servers are QL Web app instances configured to offload tasks to the work queue through the various `WQ_*` environment variables available for the Web app. A worker process is a Python interpreter loaded with the same code as the Web app but started with a different entry point. There are several options to start worker processes:
- Use the built-in CLI. Typically `python wq up`, but have a look at `wq.core.cli` for other options.
- A Supervisor configuration is available to manage a pool of worker processes—have a look at the config file in `src/wq` for the details.
- A built-in worker process pool is also available as an alternative to external tools like Supervisor. Warning: experimental, only use for testing. E.g. `python wq up -w 2` runs a pool of two workers to process tasks in parallel.
- The QL Docker image can start queue workers too, but you'll have to override the Docker entry point—the default command starts the QL Web app.
  - To start a Supervisor-managed pool of workers, set the `WQ_WORKERS` env var to specify the pool size and override the Docker entry point with: `supervisord -n -c ./wq/supervisord.conf`.
  - To start a single worker w/o Supervisor, just override the Docker entry point w/: `python wq up`.
- Start workers with QL's built-in telemetry to gauge performance. It works the same as for QL Web apps: there is a tutorial about collecting and analysing data from a QL Web app, and the same principles apply to queue worker telemetry. For actual examples of queue worker telemetry, look at the benchmark in `src/wq/tests/benchmark`. In particular, there are metrics to gauge overall system throughput (number of DB rows inserted per second) and sample the queue size.
- Log files when using Supervisor go in `/tmp`: one log file for Supervisor, plus two log files for each worker to capture `stderr` and `stdout`, respectively.
TODO
Here are a few examples to cover the most common scenarios for the task management API—refer to the QL Swagger spec for the details. Before trying the examples, load some data, e.g.
curl -v localhost:8668/v2/notify \
-H 'Content-Type: application/json' \
-H 'fiware-service: x' \
-H 'fiware-servicepath: /' \
-d @notify-payload.json
and while you do that, bring down the DB back-end so some of the tasks fail.
(1) Get all tasks for a given tenant and service path
curl -v localhost:8668/wq/notifications \
-H 'fiware-service: x' \
-H 'fiware-servicepath: /'
(2) Same as (1), but additionally filter by task status—`pending`, `succeeded` or `failed`
curl -v localhost:8668/wq/notifications?taskStatus=succeeded \
-H 'fiware-service: x' \
-H 'fiware-servicepath: /'
(3) Get a summary of all tasks for a given tenant and service path (note: you can always use a correlation ID too, if you'd like to narrow the result set further, possibly to just one task)
curl -v localhost:8668/wq/notifications/summary \
-H 'fiware-service: x' \
-H 'fiware-servicepath: /'
Notice this is conceptually the same as (1), except the returned task objects don't have the inputs in them. As in (2), you can additionally filter by status.
(4) Count how many tasks there are in the queue:
curl localhost:8668/wq/notifications/count
Notice this includes everything, even stuff that got processed or failed and won't be touched again.
(5) Count how many tasks have been run successfully, regardless of tenant or service path
curl localhost:8668/wq/notifications/count?taskStatus=succeeded
(6) Optionally, retrieve a count for just a tenant and/or service path (note: you can always use a correlation ID too, if you'd like to narrow the result set further, possibly to just one task)
curl localhost:8668/wq/notifications/count?taskStatus=succeeded \
-H 'fiware-service: x' \
-H 'fiware-servicepath: /'
(7) Delete all tasks for a tenant and service path
curl -X DELETE localhost:8668/wq/notifications \
-H 'fiware-service: x' \
-H 'fiware-servicepath: /'
Keep in mind that, all things being equal, you won't need to use this endpoint since tasks come w/ a configurable time to live: past the number of seconds specified in the failure or success TTL, the task gets deleted automatically for you.
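For scripting against the API, here's a tiny helper mirroring the curl examples above. The base URL and header names are exactly those used in the examples; the helper itself is hypothetical:

```python
from urllib.request import Request

def notifications_request(base='http://localhost:8668', path='notifications',
                          task_status=None, tenant=None, service_path=None):
    """Build (but don't send) a work queue task management API request."""
    url = f'{base}/wq/{path}'
    if task_status:
        url += f'?taskStatus={task_status}'
    headers = {}
    if tenant:
        headers['fiware-service'] = tenant
    if service_path:
        headers['fiware-servicepath'] = service_path
    return Request(url, headers=headers)

# Example (5)/(6): count succeeded tasks for tenant x, service path /.
req = notifications_request(path='notifications/count',
                            task_status='succeeded',
                            tenant='x', service_path='/')
# urllib.request.urlopen(req) would then perform the call.
```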
Example interactive session:
>>> from redis import Redis
>>> from rq import Queue
>>> q = Queue(connection=Redis())
>>> ids = q.finished_job_registry.get_job_ids()
>>> js = [q.fetch_job(i) for i in ids]
Example interactive session:
>>> from redis import Redis
>>> r = Redis()
>>> ks = r.keys()
>>> ts = [r.type(k) for k in ks]
>>> ks
[b'rq:queue:default', b'rq:job:a9a9cef1-cfad-4939-be50-79093ab4f63a', b'rq:job:4d5690dc-fb5a-48f8-9121-1f10cca98ed6', b'rq:queues']
>>> ts
[b'list', b'hash', b'hash', b'set']
>>> j = r.hgetall('rq:job:a9a9cef1-cfad-4939-be50-79093ab4f63a')
>>> job_ks = r.keys('rq:job:*')
>>> job_ks
[b'rq:job:a9a9cef1-cfad-4939-be50-79093ab4f63a', b'rq:job:4d5690dc-fb5a-48f8-9121-1f10cca98ed6']
>>> r.lrange('rq:queue:default', 0, -1)
[b'4d5690dc-fb5a-48f8-9121-1f10cca98ed6', b'a9a9cef1-cfad-4939-be50-79093ab4f63a']
>>> r.smembers('rq:queues')
{b'rq:queue:default'}
>>> r.zrange('rq:finished:default', 0, -1)
[b'eA==:Lw==:"":NWU0MDhjZDNkZDllNDQ1ZDhhYTZkNTVkODg2MTcyYWM=', b'eA==:Lw==:"":ZDNhODBhOWE5NGM5NDZmZWJhMmE0MmM4YjBkNDFkYjc=', b'eA==:Lw==:"":YzI3MGEzZjVhOGRiNDI0OWI5ZjAwZGUwMTNmYWUwMWQ=', b'eA==:Lw==:"":MWNjMWYwM2I1ZmIwNGVhYzhhOThkYmY0ODdiN2VlMDg=']
>>> r.zscan('rq:finished:default', cursor=0, match='eA==:Lw==*')
(0, [(b'eA==:Lw==:"":NWU0MDhjZDNkZDllNDQ1ZDhhYTZkNTVkODg2MTcyYWM=', 1619795739.0), (b'eA==:Lw==:"":ZDNhODBhOWE5NGM5NDZmZWJhMmE0MmM4YjBkNDFkYjc=', 1619795739.0), (b'eA==:Lw==:"":YzI3MGEzZjVhOGRiNDI0OWI5ZjAwZGUwMTNmYWUwMWQ=', 1619795740.0), (b'eA==:Lw==:"":MWNjMWYwM2I1ZmIwNGVhYzhhOThkYmY0ODdiN2VlMDg=', 1619795741.0)])
>>> r.zscan('rq:finished:default', cursor=0, match='eA==:Lw==1*')
(0, [])
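The members of the finished set look opaque, but each appears to be four colon-separated, base64-encoded segments—tenant, service path, correlation ID and task ID. Decoding one of them shows the first two segments are indeed the tenant `x` and path `/` matched by the `eA==:Lw==*` pattern above:

```python
import base64

member = b'eA==:Lw==:"":NWU0MDhjZDNkZDllNDQ1ZDhhYTZkNTVkODg2MTcyYWM='
tenant, path, correlation, task_id = member.split(b':')

base64.b64decode(tenant)   # → b'x'  (the fiware-service header value)
base64.b64decode(path)     # → b'/'  (the fiware-servicepath header value)
base64.b64decode(task_id)  # → b'5e408cd3dd9e445d8aa6d55d886172ac'
```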
Connect to Postgres as QL user
psql postgresql://quantumleap:*@localhost
See the worker process tree
pstree -s python
See the computational resources the workers are using
htop -F python -t