upload file via celery task
MideO committed Nov 10, 2024
1 parent aacab1b commit 440f625
Showing 18 changed files with 163 additions and 53 deletions.
5 changes: 4 additions & 1 deletion .github/workflows/ci.yml
@@ -26,8 +26,11 @@ jobs:
       - name: Test with pytest
         run: make test
       - name: Set up docker composer
-        run: make devstack-up
+        run: make devstack-up STORAGE_PATH="/tmp/"
       - name: Run feature tests
         run: make feature-test
+      - name: Spool devstack logs
+        if: ${{ always() }}
+        run: make devstack-logs
       - name: Tear Down docker composer
         run: make devstack-down
20 changes: 16 additions & 4 deletions README.md
@@ -1,11 +1,23 @@
-## What is it
-### (WIP) Python/FastAPI application to upload files to S3 via a celery task and redis cache to prevent concurrent uploads of same file
+## What is it?
+##### Python/FastAPI application to save and delete AWS S3 files via celery tasks.
+##### A redis lock is used to handle concurrency across multiple instances
 
 ![workflow](https://github.com/MideO/s3-file-uploader/actions/workflows/ci.yml/badge.svg)
 
 ## TODO
-- Upload, List, Download and Delete from UI
-- S3 Upload Celery Task
+- Upload
+  - [x] UI
+  - [x] Celery Task
+- List
+  - [ ] UI
+  - [ ] Celery Task
+- Download
+  - [ ] UI
+  - [ ] Celery Task
+- Delete
+  - [ ] UI
+  - [ ] Celery Task
 
 
 ### Project structure:

44 changes: 41 additions & 3 deletions docker-compose.yml
@@ -1,3 +1,6 @@
+volumes:
+  shared_data:
+
 services:
   redis:
     container_name: redis
@@ -9,16 +12,49 @@ services:
       context: .
       target: builder
     stop_signal: SIGINT
-    container_name: celery-worker
+    container_name: celery-worker-1
     working_dir: /s3fileuploader/src
+    command: celery -A celery_app.celery worker --loglevel DEBUG
     environment:
       - CELERY_BROKER_URL=redis://redis:6379/0
       - CELERY_RESULT_BACKEND=redis://redis:6379/0
       - LOG_LEVEL=DEBUG
       - REDIS_HOST=redis
       - APP_HOST=0.0.0.0
       - APP_PORT=9000
+      - AWS_ENDPOINT=http://moto:3000
+      - AWS_ACCESS_KEY_ID=testing
+      - AWS_SECRET_ACCESS_KEY=testing
+      - AWS_SESSION_TOKEN=testing
+      - AWS_REGION=us-east-1
+      - STORAGE_PATH=${STORAGE_PATH:-/shared_data/}
+    volumes:
+      - .:/shared_data
     depends_on:
       - redis
+  celery-worker-2:
+    build:
+      context: .
+      target: builder
+    stop_signal: SIGINT
+    container_name: celery-worker-2
+    working_dir: /s3fileuploader/src
-    command: celery -A celery_app.celery worker --loglevel=info
+    command: celery -A celery_app.celery worker --loglevel DEBUG
+    environment:
+      - CELERY_BROKER_URL=redis://redis:6379/0
+      - CELERY_RESULT_BACKEND=redis://redis:6379/0
+      - LOG_LEVEL=DEBUG
+      - REDIS_HOST=redis
+      - APP_HOST=0.0.0.0
+      - APP_PORT=9000
+      - AWS_ENDPOINT=http://moto:3000
+      - AWS_ACCESS_KEY_ID=testing
+      - AWS_SECRET_ACCESS_KEY=testing
+      - AWS_SESSION_TOKEN=testing
+      - AWS_REGION=us-east-1
+      - STORAGE_PATH=${STORAGE_PATH:-/shared_data/}
+    volumes:
+      - .:/shared_data
+    depends_on:
+      - redis
   moto:
@@ -42,7 +78,9 @@ services:
       - AWS_SECRET_ACCESS_KEY=testing
       - AWS_SESSION_TOKEN=testing
       - AWS_REGION=us-east-1
-
+      - STORAGE_PATH=${STORAGE_PATH:-/shared_data/}
+    volumes:
+      - .:/shared_data
     build:
       context: .
       target: builder
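The `STORAGE_PATH=${STORAGE_PATH:-/shared_data/}` entries above use Compose's shell-style parameter interpolation: the host's `STORAGE_PATH` wins when it is set and non-empty, otherwise `/shared_data/` is substituted. A quick sketch of the same expansion in a POSIX shell:

```shell
# Unset (or empty): the :- default applies
unset STORAGE_PATH
echo "${STORAGE_PATH:-/shared_data/}"   # -> /shared_data/

# Set, as the CI job does with STORAGE_PATH="/tmp/": the override wins
STORAGE_PATH=/tmp/
echo "${STORAGE_PATH:-/shared_data/}"   # -> /tmp/
```

This is why the makefile can pass `STORAGE_PATH` straight through to `docker compose up` and still get a sensible default locally.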
10 changes: 10 additions & 0 deletions features/404.feature
@@ -0,0 +1,10 @@
+Feature: Not found pages redirected to index
+  As a user
+  I want to be redirected to the index page when I navigate to a non existent page url
+  So that I use the app
+
+Scenario: Upload File
+  When I visit the path "/404"
+  Then I expect the page "div" with "banner-content" to contain the text "Welcome to File Upload Hub"
+
+
12 changes: 10 additions & 2 deletions features/steps/steps.py
@@ -13,13 +13,21 @@ def visit_uri(context, path):
 
 @behave.then('I expect the page "{element}" with "{cls}" to contain the text "{text}"')
 def check_element_text(context, element, cls, text):
-    node = context.browser.page.find(element, class_=cls)
+    try:
+        node = context.browser.page.find(element, class_=cls)
+    except AttributeError as exc:
+        raise AssertionError(f"{element} with class {cls} not found on page") from exc
     assert text in node.text
 
 
 @behave.when('I upload a "{filename}"')
 def upload_file(context, filename):
-    context.browser.select_form('form[action="http://localhost:9000/uploads"]')
+    try:
+        context.browser.select_form('form[action="http://localhost:9000/uploads"]')
+    except AttributeError as exc:
+        raise AssertionError(
+            'form[action="http://localhost:9000/uploads"] not found on page'
+        ) from exc
     resource_file = f"{context.resources_directory}/{filename}"
     with open(resource_file, "rb") as resource:
         context.browser["file"] = resource
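The try/except blocks added above translate a bare `AttributeError` (what you get when `find`/`select_form` comes up empty) into an `AssertionError` with a readable failure message for the test report. A dependency-free sketch of the same pattern (the `Node`/`find_text` names are illustrative, not from the repo):

```python
from dataclasses import dataclass


@dataclass
class Node:
    text: str


def find_text(page: dict, element: str) -> str:
    """Return the text of `element`, failing the step with a clear message."""
    node = page.get(element)  # stand-in for BeautifulSoup's page.find(...)
    try:
        return node.text  # AttributeError when the lookup returned None
    except AttributeError as exc:
        raise AssertionError(f"{element} not found on page") from exc
```

The `raise ... from exc` form keeps the original traceback chained, so the underlying cause is still visible when the step fails.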
27 changes: 23 additions & 4 deletions makefile
@@ -6,7 +6,7 @@ help: ## Show this help.
 
 
 # End Help Text
-
+STORAGE_PATH=${GITHUB_WORKSPACE:-/shared_data/}
 # Linting
 lint: ## Run black formatter and pylint
 	@python3 -m flake8 s3fileuploader features
@@ -22,8 +22,26 @@ install-requirements: ## Run Install All Requirements
 
 # Local Stack Start
 devstack-up: ## Run docker compose stack
-	@docker compose up --build -d
+	@STORAGE_PATH=$(STORAGE_PATH) docker compose up --build -d
 
+devstack-logs: ## Spool Logs
+	@echo "\nDev Stack Logs\n"
+	@echo "redis logs"
+	@echo "============="
+	@docker logs redis
+	@echo "\nmoto logs"
+	@echo "============="
+	@docker logs moto
+	@echo "\ncelery-worker-1 logs"
+	@echo "===================="
+	@docker logs celery-worker-1
+	@echo "\ncelery-worker-2 logs"
+	@echo "===================="
+	@docker logs celery-worker-2
+	@echo "\ns3-file-uploader logs"
+	@echo "====================="
+	@docker logs s3-file-uploader
+	@echo "\n"
 devstack-down: ## Tear down docker compose stack
 	@docker compose down --volumes --remove-orphans
 # Local Stack End
@@ -37,6 +55,7 @@ test: ## Run pytest with coverage
 feature-test: ## Run behave feature tests
 	@python3 -m behave
 
 
 test-all: ## Run end to end tests
-	@make install-requirements lint test devstack-up feature-test devstack-down
+	@make install-requirements lint test devstack-up feature-test devstack-logs devstack-down
 # Testing End
2 changes: 1 addition & 1 deletion s3fileuploader/src/celery_app.py
@@ -1,3 +1,3 @@
 from dependency_factory import dependencies
 
-celery = dependencies.celery(__name__)
+celery = dependencies.celery
1 change: 1 addition & 0 deletions s3fileuploader/src/config/config.py
@@ -9,6 +9,7 @@
 class Config(BaseSettings):
     model_config = SettingsConfigDict(env_nested_delimiter="_")
     secret: str = Field(default="change me")
+    storage_path: str = Field(default="/shared_data/")
     logging: LoggingConfig = Field(default=LoggingConfig())
     redis: RedisConfig = Field(default=RedisConfig())
     app: AppConfig = Field(default=AppConfig())
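With pydantic's `BaseSettings`, the new `storage_path` field is populated from a `STORAGE_PATH` environment variable when one exists (even an empty one), and falls back to the declared default otherwise. A dependency-free sketch of that resolution (`resolve_storage_path` is illustrative, not the library API):

```python
import os


def resolve_storage_path(default: str = "/shared_data/") -> str:
    # An env var that is set but empty still wins over the default, which
    # is what lets the test suite pin storage_path to "" via STORAGE_PATH="".
    value = os.environ.get("STORAGE_PATH")
    return default if value is None else value
```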
20 changes: 9 additions & 11 deletions s3fileuploader/src/dependency_factory.py
@@ -2,27 +2,25 @@
 
 from clients import s3_client, redis_client
 from config import Config
-from services.aws import AwsS3Service
+from tasks.upload import UploadTask
 from utils.lock import Lock
 
 
 class DependencyFactory:
     def __init__(self, config: Config):
         self.config: Config = config
-        self._s3_service: AwsS3Service = AwsS3Service(
+        self.celery: Celery = Celery(
+            "s3fileuploaderCelery",
+            broker=self.config.redis.url,
+            backend=self.config.redis.url,
+        )
+        self.s3_service: UploadTask = UploadTask(
             s3_client(config.aws.region, config.aws.endpoint),
             lambda x: Lock(x, redis_client(config.redis.host, config.redis.port)),
         )
 
-    def s3_service(self):
-        return self._s3_service
-
-    def celery(self, name: str):
-        return Celery(
-            name,
-            broker=self.config.redis.url,
-            backend=self.config.redis.url,
-        )
+        # Register tasks
+        self.celery.register_task(self.s3_service)
 
 
 dependencies = DependencyFactory(Config())
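The factory now builds the Celery app and the task eagerly in `__init__` and registers the task instance on the app, replacing the old per-call factory methods. A Celery-free sketch of that wiring (all class names here are stand-ins for the real Celery/UploadTask objects):

```python
class FakeApp:
    """Stand-in for the Celery app; real Celery also keeps a task registry."""

    def __init__(self):
        self.tasks = {}

    def register_task(self, task):
        self.tasks[task.name] = task
        return task


class FakeTask:
    """Stand-in for UploadTask; `name` is how the registry keys the task."""

    name = "AwsS3Service"


class DependencyFactory:
    def __init__(self):
        self.celery = FakeApp()
        self.s3_service = FakeTask()
        # Registering at construction time means importing `dependencies`
        # is enough for a worker process to know about the task.
        self.celery.register_task(self.s3_service)


dependencies = DependencyFactory()  # module-level singleton, as in the diff
```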
12 changes: 9 additions & 3 deletions s3fileuploader/src/routers/index.py
@@ -1,9 +1,12 @@
+import shutil
+
 from fastapi import APIRouter, Request, UploadFile, File
 from fastapi.responses import HTMLResponse, RedirectResponse
 from fastapi.templating import Jinja2Templates
 
 from dependency_factory import dependencies
+
 
 router = APIRouter()
 templates = Jinja2Templates(directory="s3fileuploader/src/templates")
 
@@ -15,9 +18,12 @@ async def index(request: Request):
 
 @router.post("/uploads", response_class=HTMLResponse)
 async def upload_file(request: Request, file: UploadFile = File(...)) -> HTMLResponse:
-    dependencies.s3_service().upload_file(
-        content=file.file, filename=file.filename, bucket="test"
-    )
+    file_path = f"{dependencies.config.storage_path}{file.filename}"
+    with open(file_path, "wb") as buffer:
+        shutil.copyfileobj(file.file, buffer)
+
+    dependencies.s3_service.apply_async((file_path, file.filename, "test"))
+
     return templates.TemplateResponse(
         request=request,
         name="upload.html",
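The route now persists the upload to shared storage first and hands the worker a path rather than a file object, since an open file handle cannot cross process (let alone container) boundaries. A minimal sketch of that staging step, assuming a writable shared directory (`stage_upload` is an illustrative name, not from the repo):

```python
import os
import shutil
from io import BytesIO


def stage_upload(stream, filename: str, storage_path: str) -> str:
    """Copy an uploaded stream into shared storage and return the staged
    path that a worker in another container can read."""
    file_path = os.path.join(storage_path, filename)
    with open(file_path, "wb") as buffer:
        shutil.copyfileobj(stream, buffer)  # stream to disk without buffering it all
    return file_path
```

The returned path is what would then be queued, e.g. via `apply_async((file_path, filename, "test"))` as in the route above.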
File renamed without changes.
@@ -1,21 +1,30 @@
-from io import BytesIO
+import logging
+import os
 from typing import Callable
 
 from boto.s3.connection import S3Connection
 from botocore.exceptions import ClientError
+from celery import Task
 
 from utils.lock import Lock
 
+LOG = logging.getLogger("app")
 
 
-class AwsS3Service:
+class UploadTask(Task):
     def __init__(self, s3: S3Connection, lock_provider: Callable[[str], Lock]):
         self.name = "AwsS3Service"
         self.s3: S3Connection = s3
         self.lock_provider = lock_provider
 
-    def upload_file(self, content: BytesIO, filename: str, bucket: str):
+    def run(self, *args, **kwargs):
+        path, filename, bucket = args
         with self.lock_provider(filename):
             try:
                 self.s3.head_bucket(Bucket=bucket)
             except ClientError:
                 self.s3.create_bucket(Bucket=bucket)
-            self.s3.upload_fileobj(content, bucket, filename)
+            LOG.info("Uploading file %s", filename)
+            self.s3.upload_file(path, bucket, filename)
+            os.remove(path)
+            LOG.info("Uploaded file %s successfully", filename)
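Per worker, `run` unpacks `(path, filename, bucket)`, ensures the bucket exists, uploads the staged file, and deletes it from shared storage. A sketch of that flow with a dict-backed stand-in for the S3 client (the real code catches `botocore`'s `ClientError` from `head_bucket`; the stub raises `KeyError` instead, and locking is elided):

```python
import os


class StubS3:
    """Dict-backed stand-in for the S3 client calls the task makes."""

    def __init__(self):
        self.buckets = {}

    def head_bucket(self, Bucket):
        if Bucket not in self.buckets:
            raise KeyError(Bucket)  # real client raises botocore ClientError

    def create_bucket(self, Bucket):
        self.buckets.setdefault(Bucket, {})

    def upload_file(self, path, bucket, key):
        with open(path, "rb") as f:
            self.buckets[bucket][key] = f.read()


def run_upload(s3, path, filename, bucket):
    # Mirrors UploadTask.run: ensure the bucket, upload the staged file,
    # then remove it from shared storage so the directory doesn't grow.
    try:
        s3.head_bucket(Bucket=bucket)
    except KeyError:
        s3.create_bucket(Bucket=bucket)
    s3.upload_file(path, bucket, filename)
    os.remove(path)
```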
6 changes: 6 additions & 0 deletions s3fileuploader/src/utils/lock.py
@@ -1,5 +1,9 @@
+import logging
+
 from redis import Redis
 
+LOG = logging.getLogger("app")
+
 
 class LockAlreadyAcquiredError(Exception):
     pass
@@ -14,6 +18,8 @@ def __enter__(self):
         if self.redis.get(self.name):
             raise LockAlreadyAcquiredError(f"lock: {self.name} already exist")
         self.redis.set(name=self.name, value=1, nx=True)
+        LOG.info("Lock created %s", self.name)
 
     def __exit__(self, exc_type, exc_val, exc_tb):
         self.redis.delete(self.name)
+        LOG.info("Released Lock %s", self.name)
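The lock is a context manager over Redis `GET`/`SET NX`/`DEL`. One thing worth noting: checking `get` before `set` is two round trips, so two workers can both pass the check in the gap between them; relying on the return value of `SET NX` alone makes check-and-acquire a single atomic step. A dict-backed sketch of that variant (`StubRedis` mimics redis-py's `set(..., nx=True)` returning `None` when the key already exists):

```python
class StubRedis:
    """In-memory stand-in for the three Redis commands the lock uses."""

    def __init__(self):
        self.store = {}

    def set(self, name, value, nx=False):
        if nx and name in self.store:
            return None  # redis-py returns None when NX loses the race
        self.store[name] = value
        return True

    def delete(self, name):
        self.store.pop(name, None)


class LockAlreadyAcquiredError(Exception):
    pass


class Lock:
    def __init__(self, name, redis):
        self.name = name
        self.redis = redis

    def __enter__(self):
        # SET NX is atomic: its return value says whether we acquired,
        # leaving no window for another worker to sneak in between a
        # separate GET and SET.
        if not self.redis.set(self.name, 1, nx=True):
            raise LockAlreadyAcquiredError(f"lock: {self.name} already exists")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.redis.delete(self.name)
```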
2 changes: 2 additions & 0 deletions s3fileuploader/tests/config/test_config.py
@@ -27,6 +27,7 @@ def test_config_with_defaults():
         "SECRET": "super secret",
         "AWS_REGION": "eu-west-2",
         "AWS_ENDPOINT": "http://moto:3000",
+        "STORAGE_PATH": "",
     },
     clear=True,
 )
@@ -38,3 +39,4 @@ def test_config_with_environment_variables():
     assert config.app.url == "http://0.0.0.0:9000"
     assert config.aws.region == "eu-west-2"
     assert config.aws.endpoint == "http://moto:3000"
+    assert config.storage_path == ""
11 changes: 6 additions & 5 deletions s3fileuploader/tests/routers/test_index.py
@@ -8,10 +8,11 @@
 
 
 @pytest.mark.asyncio
-@patch("s3fileuploader.src.routers.index.dependencies.s3_service")
+@patch("s3fileuploader.src.routers.index.dependencies")
 @patch("s3fileuploader.src.routers.index.templates.TemplateResponse")
-async def test_upload_file(response, s3_service):
-    s3_service.upload_file = MagicMock()
+async def test_upload_file(response, dependencies):
+    dependencies.s3_service = MagicMock()
+    dependencies.config.storage_path = ""
     request = MagicMock()
     file_content = BytesIO(b"Test file content")
     file_name = "test_upload.txt"
@@ -24,6 +25,6 @@ async def test_upload_file(response, s3_service):
         name="upload.html",
         context={"message": "File successfully uploaded"},
     )
-    s3_service().upload_file.assert_called_with(
-        content=file_content, filename=file_name, bucket="test"
+    dependencies.s3_service.apply_async.assert_called_with(
+        (file_name, file_name, "test")
     )
File renamed without changes.