Skip to content
This repository has been archived by the owner on Jan 22, 2021. It is now read-only.

Dmitry-Erokhin/zq

Repository files navigation

ZQ

ZQ – is a multi-producer-single-consumer durable queue based on the PostgreSQL. All operations are performed by calling stored procedures, so it is a reasonable choice if you already have a PostgreSQL in your zoo and need a simple queue solution.

Inspired by pgq.

Way of work

In nutshell work with queue (after it was created) is as simple as:

  • Enqueue data
  • Open batch of desired size
  • Consume data
  • Close or cancel batch
    • If batch was closed (for instance in case of successful data processing):
      • data will be removed from queue
      • next batch will be started from item followed by last item of closed batch
    • If batch was canceled (like in case of temporary impediments of data processing):
      • data stays in the queue
      • next batch will be started from same item

PostgresSQL API

Queues management

  • Create queue: zq.create_queue(queue_name TEXT) : BOOLEAN

    • Tries to create a queue "queue_name"
    • Returns true if queue was successfully created
    • Returns false if queue is already exists
  • Delete queue: zq.drop_queue(queue_name TEXT) : BOOLEAN

    • Tries to delete queue "queue_name"
    • Returns true if queue was successfully deleted
    • Returns false if queue is not exists

Data management

  • Enqueue data: zq.enqueue(queue_name TEXT, data TEXT[]) : VOID

    • Inserts items from data array as events in the queue 'queue_name' preserving order
    • Throws an exception if given queue is not exists
  • Consume data: zq.dequeue(queue_name TEXT) : TABLE

    • Returns content of the current batch for the queue "queue_name" a table with following columns:
      • "timestamp" TIMESTAMP
      • "data" TEXT
    • Throws an exception if given queue is not exists
    • Throws an exception if there is batch was not created for given queue

Batch management

  • Create new batch: zq.open_batch(queue_name TEXT, max_batch_size INT) : INT

    • Creates new batch of maximum size "max_batch_size" for the queue "queue_name", if there are new events
    • Returns actual size of the newly created batch (<="max_batch_size") or 0 if no new events are available and batch was not created
    • Throws an exception if given queue is not exists
    • Throws an exception if "max_batch_size" is a non-positive integer
    • Throws an exception if non-committed batch for given queue is already exists
  • Commit current batch: zq.close_batch(queue_name TEXT) : VOID

    • Commits previously created batch for the queue "queue_name"
    • Throws an exception if given queue is not exists
    • Throws an exception if there is no batch was created for given queue
  • Cancel current batch: zq.cancel_batch(queue_name TEXT) : VOID

    • Cancel previously created batch for the queue "queue_name"
    • Throws an exception if given queue is not exists
    • Throws an exception if there is no batch was created for given queue

About

No description, website, or topics provided.

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published