Skip to content

Commit

Permalink
Added background tasks for prefetching using Celery, Redis, and Flower
Browse files Browse the repository at this point in the history
  • Loading branch information
deshetti committed Nov 28, 2022
1 parent 95c568f commit a2c3a38
Show file tree
Hide file tree
Showing 12 changed files with 537 additions and 24 deletions.
6 changes: 5 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,8 @@ MONGODB_PASSWORD=example
# S3 CONFIGURATION
S3_ENDPOINT_URL=CHANGE_ME
S3_ACCESS_KEY_ID=CHANGE_ME
S3_SECRET_ACCESS_KEY=CHANGE_ME
S3_SECRET_ACCESS_KEY=CHANGE_ME

# CELERY CONFIGURATION
CELERY_BROKER_URL=redis://:password@redis:6379/0
CELERY_RESULT_BACKEND=redis://:password@redis:6379/0
25 changes: 24 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -152,4 +152,27 @@ dmypy.json
# Cython debug symbols
cython_debug/

# End of https://www.toptal.com/developers/gitignore/api/python,
# End of https://www.toptal.com/developers/gitignore/api/python,# Created by https://www.toptal.com/developers/gitignore/api/visualstudiocode
# Edit at https://www.toptal.com/developers/gitignore?templates=visualstudiocode

### VisualStudioCode ###
.vscode/*
!.vscode/settings.json
!.vscode/tasks.json
!.vscode/launch.json
!.vscode/extensions.json
!.vscode/*.code-snippets

# Local History for Visual Studio Code
.history/

# Built Visual Studio Code Extensions
*.vsix

### VisualStudioCode Patch ###
# Ignore all local history of files
.history
.ionide

# End of https://www.toptal.com/developers/gitignore/api/visualstudiocode
n
15 changes: 7 additions & 8 deletions app/api/api_v1/routers/prefetch.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from fastapi import APIRouter

from app.models.prefetch import Prefetch
from app.utils.profile_db import save_profile
from app.utils.tasks import prefetch_profiles

prefetch_router = router = APIRouter()


@router.post("/prefetch/")
async def prefetch_profiles(prefetch: Prefetch):
async def prefetch_profiles_background(prefetch: Prefetch):
"""Prefetch and save Profiles for a list of Datasets
Args:
Expand All @@ -25,10 +25,9 @@ async def prefetch_profiles(prefetch: Prefetch):
minimal = prefetch.minimal
samples_to_fetch = prefetch.samples_to_fetch

for url in urls:
await save_profile(url, minimal, samples_to_fetch)
# Prefetch Profiles as a background job
result = prefetch_profiles.delay(
urls=urls, minimal=minimal, samples_to_fetch=samples_to_fetch
)

# Step 2: Implement prefetching as a background task
# Step 3: Create a TaskID and return it to the user
# Step 4: Implement MongoDB insert as async
return None
return result.id
4 changes: 2 additions & 2 deletions app/api/api_v1/routers/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
from app.models.scatter import Scatter
from app.models.table import Table
from app.models.variables import Variables
from app.utils.dataframes import get_dataframe_async
from app.utils.profile_db import get_profile
from app.utils.util_functions import get_dataframe

profile_router = router = APIRouter()
setting = Settings()
Expand All @@ -40,7 +40,7 @@ async def provide_raw_profiling(
response (json): Pandas-Profile in json
"""

dataframe = await get_dataframe(source)
dataframe = await get_dataframe_async(source)

# WHAT?: Change sample sizes based on number of rows
# WHY?: Fow smaller dataset number of samples to
Expand Down
10 changes: 10 additions & 0 deletions app/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,16 @@ class Settings(BaseSettings):
S3_ACCESS_KEY_ID: str = "CHANGE_ME"
S3_SECRET_ACCESS_KEY: str = "CHANGE_ME"

# REDIS CONFIGURATION
REDIS_HOST: str = "redis"
REDIS_PORT: int = 6379
REDIS_DB: int = 0
REDIS_PASSWORD: str = "password"

# CELERY CONFIGURATION
CELERY_BROKER_URL: str = "redis://:password@redis:6379/0"
CELERY_RESULT_BACKEND: str = "redis://:password@redis:6379/0"

# CORS PARAMS
CORS_ORIGINS: List[str] = ["*"]
CORS_METHODS: List[str] = ["*"]
Expand Down
59 changes: 53 additions & 6 deletions app/utils/util_functions.py → app/utils/dataframes.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,23 @@ def get_encoding(obj=None, url=None):
return encoding


async def get_dataframe_honouring_encoding(file_url: str) -> pl.DataFrame:
async def get_dataframe_honouring_encoding_async(
file_url: str,
) -> pl.DataFrame:
try:
df = pl.read_csv(file_url, null_values="NA", infer_schema_length=0)
except UnicodeDecodeError:
encoding = get_encoding(url=file_url)
df = pl.read_csv(
file_url,
null_values="NA",
encoding=encoding,
infer_schema_length=0,
)
return df


def get_dataframe_honouring_encoding(file_url: str) -> pl.DataFrame:
try:
df = pl.read_csv(file_url, null_values="NA", infer_schema_length=0)
except UnicodeDecodeError:
Expand Down Expand Up @@ -50,13 +66,44 @@ def json_conversion_objects(obj):
return obj.item()


async def get_dataframe(file_url: str, source="url"):
async def get_dataframe_async(file_url: str):
"""Functionality to provide dataframe from various sources
Args:
file_url (str): URL to the file to be read
Returns:
[type]: [description]
"""
# read any thing and provide proper dataframe instance
# link : str, validate as proper url
# use link from file present in mande Studio

url = urlparse(file_url)

if url.scheme == "http" or url.scheme == "https":
df = await get_dataframe_honouring_encoding_async(file_url)
return df

elif url.scheme == "s3":

fs = s3fs.S3FileSystem(
key=setting.S3_ACCESS_KEY_ID,
secret=setting.S3_SECRET_ACCESS_KEY,
client_kwargs={"endpoint_url": setting.S3_ENDPOINT_URL},
)

with fs.open(f"{url.netloc}{url.path}") as f:
df = await get_dataframe_honouring_encoding_async(f)

return df


def get_dataframe(file_url: str):
"""Functionality to provide dataframe from various sources
Args:
file_url (str): URL to the file to be read
source (str, optional): Various sources from where file can be read.\
Defaults to "url".
Returns:
[type]: [description]
Expand All @@ -68,7 +115,7 @@ async def get_dataframe(file_url: str, source="url"):
url = urlparse(file_url)

if url.scheme == "http" or url.scheme == "https":
df = await get_dataframe_honouring_encoding(file_url)
df = get_dataframe_honouring_encoding(file_url)
return df

elif url.scheme == "s3":
Expand All @@ -80,6 +127,6 @@ async def get_dataframe(file_url: str, source="url"):
)

with fs.open(f"{url.netloc}{url.path}") as f:
df = await get_dataframe_honouring_encoding(f)
df = get_dataframe_honouring_encoding(f)

return df
4 changes: 2 additions & 2 deletions app/utils/profile_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@

from app.core.config import Settings
from app.db.mongo import profiles_collection
from app.utils.dataframes import get_dataframe_async
from app.utils.profile_segments import ProfileSegments
from app.utils.util_functions import get_dataframe

setting = Settings()

Expand All @@ -25,7 +25,7 @@ async def save_profile(
samples_to_fetch (int, optional): Samples of Dataset rows to fetch. Defaults to 10. # noqa: E501
"""

dataframe = await get_dataframe(url)
dataframe = await get_dataframe_async(url)

if dataframe.shape[0] < 100:
samples_to_fetch = 5
Expand Down
70 changes: 70 additions & 0 deletions app/utils/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from fastapi.encoders import jsonable_encoder
from pandas_profiling import ProfileReport

from app.db.mongo import profiles_collection
from app.utils.dataframes import get_dataframe
from app.utils.profile_segments import ProfileSegments
from app.worker import celery


@celery.task(name="prefetch_profile")
def prefetch_profile(
    url: str, minimal: bool = True, samples_to_fetch: int = 10
):
    """Build a pandas-profiling report for one dataset and upsert it into MongoDB.

    Runs synchronously inside a Celery worker (I/O is blocking by design here,
    unlike the async `save_profile` path used by the API).

    Args:
        url (str): URL of the dataset (http(s) or s3 scheme is handled by `get_dataframe`). # noqa: E501
        minimal (bool, optional): Whether to generate the minimal profile. Defaults to True. # noqa: E501
        samples_to_fetch (int, optional): Number of head/tail sample rows to include. Defaults to 10. # noqa: E501
    """

    dataframe = get_dataframe(url)

    # For small datasets, cap the number of sample rows shown in the profile.
    if dataframe.shape[0] < 100:
        samples_to_fetch = 5

    profile = ProfileReport(
        dataframe.to_pandas(),
        minimal=minimal,
        samples={"head": samples_to_fetch, "tail": samples_to_fetch},
        show_variable_description=False,
        progress_bar=False,
    )

    # Use `ProfileSegments` to extract the JSON-serializable description
    # (including the duplicates section) from the pandas-profiling report.
    profile_segment = ProfileSegments(profile, columns=list(dataframe.columns))

    description = profile_segment.description()

    # Add `url` to the description so the stored profile can be looked up later.
    description["url"] = url

    # Upsert the json-encoded description keyed by URL, so re-profiling the
    # same dataset replaces the previous document instead of duplicating it.
    profiles_collection.update_one(
        {"url": url}, {"$set": jsonable_encoder(description)}, upsert=True
    )


@celery.task(name="prefetch_profiles")
def prefetch_profiles(
    urls: list, minimal: bool = True, samples_to_fetch: int = 10
):
    """Fan out one `prefetch_profile` subtask per URL.

    This task does not save anything itself; it only enqueues a
    `prefetch_profile` job for each URL and returns immediately, so each
    dataset is profiled independently (one slow/failing URL does not block
    the others).

    Args:
        urls (list[str]): List of dataset URLs for which profiles should be prefetched. # noqa: E501
        example_url (https://raw.githubusercontent.com/datasciencedojo/datasets/master/titanic.csv) # noqa: E501
        minimal (bool, optional): Mode of Profile that needs to be fetched. Defaults to True. # noqa: E501
        samples_to_fetch (int, optional): Samples of Dataset rows to fetch. Defaults to 10. # noqa: E501
    """

    for url in urls:
        prefetch_profile.delay(url, minimal, samples_to_fetch)
15 changes: 15 additions & 0 deletions app/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
"""Celery application instance shared by the API (to enqueue tasks) and the worker."""

from celery import Celery

from app.core.config import Settings

# Settings are read from the environment (see `.env` / `app.core.config`).
settings = Settings()

# Celery app: broker and result backend both point at Redis per configuration.
# NOTE(review): accepting "application/x-python-serialize" means the worker
# will unpickle task payloads; pickle deserialization executes arbitrary code,
# so the Redis broker must never be reachable by untrusted clients — confirm
# the deployment keeps it internal, or switch to JSON serialization.
celery = Celery(
    __name__,
    broker=settings.CELERY_BROKER_URL,
    backend=settings.CELERY_RESULT_BACKEND,
    # Modules to import when the worker starts, so tasks get registered.
    include=["app.utils.tasks"],
    task_serializer="pickle",
    result_serializer="pickle",
    accept_content=["application/json", "application/x-python-serialize"],
)
36 changes: 33 additions & 3 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ version: "3.7"
services:

##########################################################################
###### CACHE #######
###### BACKGROUND TASKS #######
##########################################################################

redis:
Expand All @@ -12,7 +12,37 @@ services:
- 6379:6379
volumes:
- ./volumes/redis:/data
command: redis-server --appendonly yes --requirepass redispass
command: redis-server --appendonly yes --requirepass password
networks:
- hunting

celery:
build:
context: .
dockerfile: Dockerfile.dev
env_file:
- .env
volumes:
- .:/app
command: celery -A app.worker worker -l info
depends_on:
- redis
networks:
- hunting

flower:
build:
context: .
dockerfile: Dockerfile.dev
command: celery -A app.worker flower --port=5555
ports:
- 5555:5555
environment:
- CELERY_BROKER_URL=redis://:password@redis:6379/0
- CELERY_RESULT_BACKEND=redis://:password@redis:6379/0
depends_on:
- redis
- celery
networks:
- hunting

Expand All @@ -24,7 +54,6 @@ services:
environment:
MONGO_INITDB_ROOT_USERNAME: root
MONGO_INITDB_ROOT_PASSWORD: example
MONGO_INITDB_DATABASE: hunting
ports:
- 27017:27017
volumes:
Expand Down Expand Up @@ -54,5 +83,6 @@ services:
networks:
- hunting


networks:
hunting:
Loading

0 comments on commit a2c3a38

Please sign in to comment.