Skip to content
This repository has been archived by the owner on Sep 9, 2024. It is now read-only.

Latest commit

 

History

History
94 lines (66 loc) · 2.7 KB

README.md

File metadata and controls

94 lines (66 loc) · 2.7 KB

pydantic-sqs

Convert your pydantic models to and from AWS SQS messages.

Latest Commit GitHub release (latest by date)
GitHub Workflow Status Test and Lint (branch)
Package version

Main Dependencies

Getting Started

from pydantic_sqs import SQSModel, SQSQueue
from pydantic import Field
import asyncio
from pprint import pprint
import os


class ThisModel(SQSModel):
    foo: str = Field(..., description="Foo")


class ThatModel(SQSModel):
    bar: str = Field(..., description="bar")


async def main():
    queue_kwargs = {
        "queue_url": os.environ.get("SQS_QUEUE_URL"),
        "endpoint_url": os.environ.get("SQS_ENDPOINT_URL", None),
        "use_ssl": os.environ.get("SQS_USE_SSL", "true").lower() == "true",
    }
    if queue_kwargs["endpoint_url"] is None:
        del queue_kwargs["endpoint_url"]

    queue = SQSQueue(**queue_kwargs)

    queue.register_model(ThisModel)
    queue.register_model(ThatModel)

    this_thing = ThisModel(foo="1234")
    that_thing = ThatModel(bar="5678")
    await this_thing.to_sqs()
    await that_thing.to_sqs()

    new_things = await queue.from_sqs(max_messages=10, wait_time_seconds=90)
    pprint(new_things)
    for thing in new_things:
        await thing.delete_from_queue()

    print("deleted all the messages we got from the queue")
    pprint(new_things)


if __name__ == "__main__":
    asyncio.run(main())

Examples

Examples are in the examples/ directory of this repo.

Installation

Install the package

pip install pydantic-sqs

Contributing

Contributions are very welcome. To learn more, see the Contributor Guide

License

Licensed under the MIT License