Skip to content

Commit

Permalink
Move job processing to new JobManager class.
Browse files Browse the repository at this point in the history
  • Loading branch information
emmiegit committed Jan 21, 2024
1 parent 8422238 commit 79647d0
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 80 deletions.
78 changes: 2 additions & 76 deletions yellowstone/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,42 +5,26 @@
and processes new tasks to be run in response.
"""

import json
import logging
import time
from typing import NoReturn, TypedDict, cast
from typing import NoReturn

import pugsql

from .config import Config, getenv
from .exception import UnknownJobError
from .job import (
JobManager,
JobType,
get_site,
get_user,
get_user_avatar,
index_site_members,
)
from .job.index_site_members import START_OFFSET as START_MEMBER_OFFSET
from .s3 import S3
from .types import Json
from .wikidot import Wikidot

MAX_RETRIES = 4
FULL_WORKLOAD_PAUSE = 10

logger = logging.getLogger(__name__)


class JobDict(TypedDict):
job_id: int
job_type: str
job_object: str
attempts: int
data: Json


class BackupDispatcher:
__slots__ = (
"config",
Expand Down Expand Up @@ -98,63 +82,5 @@ def process_all_jobs(self) -> None:
job = self.database.get_job()
if job is None:
break
self.process_job(job)
self.job.process(self, job)
logger.info("No more jobs received, done")

def has_jobs(self) -> bool:
row = self.database.has_jobs()
exists = row["exists"]
assert isinstance(exists, bool)
return exists

def process_job(self, job: JobDict) -> None:
job_type = JobType(job["job_type"])
data = job["data"]
logger.info("Processing job %r", job)
try:
match job_type:
case JobType.INDEX_SITE_PAGES:
raise NotImplementedError
case JobType.INDEX_SITE_FORUMS:
raise NotImplementedError
case JobType.INDEX_SITE_MEMBERS:
index_site_members.run(
self,
cast(index_site_members.SiteMemberJob, data),
)
case JobType.FETCH_USER:
get_user.run(
self,
cast(get_user.GetUserJob, data),
)
case JobType.FETCH_USER_AVATAR:
get_user_avatar.run(
self,
cast(get_user_avatar.GetUserAvatarJob, data),
)
case _:
raise UnknownJobError(f"Unknown job type: {job_type}")
except UnknownJobError:
logger.error("Fatal: No job implementation", exc_info=True)
raise
except Exception as _:
logger.error("Error occurred while processing job", exc_info=True)
if job["attempts"] < MAX_RETRIES:
logger.debug(
"Adding to attempt count, currently at %d",
job["attempts"],
)
self.database.fail_job(job_id=job["job_id"])
else:
logger.error("Job failed too many times, sending to dead letter queue")
with self.database.transaction():
self.database.delete_job(job_id=job["job_id"])
self.database.add_dead_job(
job_id=job["job_id"],
job_type=job["job_type"],
job_object=job["job_object"],
data=json.dumps(job["data"]),
)
else:
logger.debug("Job completed successfully, removing from queue")
self.database.delete_job(job_id=job["job_id"])
80 changes: 76 additions & 4 deletions yellowstone/job/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,24 @@
"""

import json
import logging
from enum import Enum, unique
from typing import cast
from typing import TYPE_CHECKING, TypedDict, cast

from ..exception import UnknownJobError
from ..types import Json
from . import get_user, get_user_avatar, index_site_members
from .get_user import GetUserJob
from .get_user_avatar import GetUserAvatarJob
from .index_site_members import SiteMemberJob

if TYPE_CHECKING:
from ..core import BackupDispatcher

logger = logging.getLogger(__name__)

MAX_RETRIES = 4


@unique
class JobType(Enum):
Expand All @@ -26,6 +36,14 @@ class JobType(Enum):
FETCH_USER_AVATAR = "fetch-user-avatar"


class JobDict(TypedDict):
job_id: int
job_type: str
job_object: str
attempts: int
data: Json


class JobManager:
__slots__ = ("database",)

Expand Down Expand Up @@ -53,6 +71,60 @@ def fetch_user(self, data: GetUserJob) -> None:
def fetch_user_avatar(self, data: GetUserAvatarJob) -> None:
self.add_raw(JobType.FETCH_USER_AVATAR, cast(Json, data))

def process_job(self):
# TODO
...
def has(self) -> bool:
row = self.database.has_jobs()
exists = row["exists"]
assert isinstance(exists, bool)
return exists

def process(self, core: "BackupDispatcher", job: JobDict) -> None:
job_type = JobType(job["job_type"])
data = job["data"]
logger.info("Processing job %r", job)
try:
match job_type:
case JobType.INDEX_SITE_PAGES:
raise NotImplementedError
case JobType.INDEX_SITE_FORUMS:
raise NotImplementedError
case JobType.INDEX_SITE_MEMBERS:
index_site_members.run(
core,
cast(index_site_members.SiteMemberJob, data),
)
case JobType.FETCH_USER:
get_user.run(
core,
cast(get_user.GetUserJob, data),
)
case JobType.FETCH_USER_AVATAR:
get_user_avatar.run(
core,
cast(get_user_avatar.GetUserAvatarJob, data),
)
case _:
raise UnknownJobError(f"Unknown job type: {job_type}")
except UnknownJobError:
logger.error("Fatal: No job implementation", exc_info=True)
raise
except Exception as _:
logger.error("Error occurred while processing job", exc_info=True)
if job["attempts"] < MAX_RETRIES:
logger.debug(
"Adding to attempt count, currently at %d",
job["attempts"],
)
self.database.fail_job(job_id=job["job_id"])
else:
logger.error("Job failed too many times, sending to dead letter queue")
with self.database.transaction():
self.database.delete_job(job_id=job["job_id"])
self.database.add_dead_job(
job_id=job["job_id"],
job_type=job["job_type"],
job_object=job["job_object"],
data=json.dumps(job["data"]),
)
else:
logger.debug("Job completed successfully, removing from queue")
self.database.delete_job(job_id=job["job_id"])

0 comments on commit 79647d0

Please sign in to comment.