Skip to content

Commit

Permalink
feat: Add REDIS WORKER TIMEOUT var
Browse files Browse the repository at this point in the history
  • Loading branch information
drorganvidez committed Oct 11, 2024
1 parent a86d7b6 commit 1269a00
Show file tree
Hide file tree
Showing 9 changed files with 40 additions and 28 deletions.
1 change: 1 addition & 0 deletions .env.docker.example
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ MARIADB_PASSWORD=uvlhubdb_password
MARIADB_ROOT_PASSWORD=uvlhubdb_root_password
WORKING_DIR=/app/
REDIS_URL=redis://redis:6379
REDIS_WORKER_TIMEOUT=180
3 changes: 2 additions & 1 deletion .env.docker.production.example
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ MARIADB_PASSWORD=<CHANGE_THIS>
MARIADB_ROOT_PASSWORD=<CHANGE_THIS>
WEBHOOK_TOKEN=<CHANGE_THIS>
WORKING_DIR=/app/
REDIS_URL=redis://redis:6379
REDIS_URL=redis://redis:6379
REDIS_WORKER_TIMEOUT=180
3 changes: 2 additions & 1 deletion .env.local.example
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ MARIADB_USER=uvlhubdb_user
MARIADB_PASSWORD=uvlhubdb_password
MARIADB_ROOT_PASSWORD=uvlhubdb_root_password
WORKING_DIR=""
REDIS_URL=redis://localhost:6379
REDIS_URL=redis://localhost:6379
REDIS_WORKER_TIMEOUT=180
3 changes: 2 additions & 1 deletion .env.vagrant.example
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ MARIADB_USER=uvlhubdb_user
MARIADB_PASSWORD=uvlhubdb_password
MARIADB_ROOT_PASSWORD=uvlhubdb_root_password
WORKING_DIR=/vagrant/
REDIS_URL=redis://localhost:6379
REDIS_URL=redis://localhost:6379
REDIS_WORKER_TIMEOUT=180
3 changes: 2 additions & 1 deletion app/modules/hubfile/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,5 +125,6 @@ def hubfile_aupdated_listener(mapper, connection, target):
task_manager = TaskQueueManager()
task_manager.enqueue_task(
"app.modules.hubfile.tasks.transform_uvl",
path=path
path=path,
timeout=300
)
20 changes: 10 additions & 10 deletions app/modules/hubfile/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,16 @@ def transform_uvl(path, retries=5, delay=2):
except Exception as e:
logger.error(f"Error in JSON transformation: {e}")

# SPLOT SPLX Transformation
splot_dir = os.path.join(base_dir, "splot")
create_directory_if_not_exists(splot_dir)
splx_path = os.path.join(splot_dir, os.path.basename(path).replace(".uvl", ".splx"))
try:
SPLOTWriter(splx_path, fm).transform()
logger.info(f"SPLX file created at: {splx_path}")
except Exception as e:
logger.error(f"Error in SPLX transformation: {e}")

# DIMACS CNF Transformation
dimacs_dir = os.path.join(base_dir, "dimacs")
create_directory_if_not_exists(dimacs_dir)
Expand All @@ -62,13 +72,3 @@ def transform_uvl(path, retries=5, delay=2):
logger.info(f"CNF file created at: {cnf_path}")
except Exception as e:
logger.error(f"Error in CNF transformation: {e}")

# SPLOT SPLX Transformation
splot_dir = os.path.join(base_dir, "splot")
create_directory_if_not_exists(splot_dir)
splx_path = os.path.join(splot_dir, os.path.basename(path).replace(".uvl", ".splx"))
try:
SPLOTWriter(splx_path, fm).transform()
logger.info(f"SPLX file created at: {splx_path}")
except Exception as e:
logger.error(f"Error in SPLX transformation: {e}")
3 changes: 2 additions & 1 deletion app/modules/hubfile/tests/test_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,6 @@ def test_create_hubfile_calls_enqueue_task(test_client):
# Verificar que enqueue_task fue llamado correctamente
mock_enqueue_task.assert_called_once_with(
"app.modules.hubfile.tasks.transform_uvl", # Nombre de la tarea
path=path # Parámetro que recibe la tarea
path=path, # Parámetro que recibe la tarea
timeout=300
)
1 change: 1 addition & 0 deletions core/managers/config_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class Config:
SESSION_USE_SIGNER = True
REDIS_URL = os.getenv("REDIS_URL", 'redis://redis:6379')
SESSION_REDIS = redis.from_url(REDIS_URL)
REDIS_WORKER_TIMEOUT = os.getenv("REDIS_WORKER_TIMEOUT", 180)


class DevelopmentConfig(Config):
Expand Down
31 changes: 18 additions & 13 deletions core/managers/task_queue_manager.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from datetime import datetime
import logging
from flask import current_app
import pytz
import redis
from rq import Queue


logger = logging.getLogger(__name__)


Expand All @@ -18,18 +19,22 @@ def __new__(cls, *args, **kwargs):
return cls._instance

def _initialize(self):
self.redis = redis.from_url('redis://redis:6379')
self.queue = Queue(connection=self.redis)
self.queue = Queue(connection=current_app.config["SESSION_REDIS"])
self.redis_worker_timeout = current_app.config["REDIS_WORKER_TIMEOUT"]
logger.info("TaskQueueManager initialized with Redis connection.")

def enqueue_task(self, task_name: str, *args, **kwargs):
"""
Método para encolar una tarea personalizada.
def enqueue_task(self, task_name: str, *args, timeout=None, **kwargs):
'''
Method to queue a custom task.
:param task_name: Full name of the method or function to queue (e.g. ‘app.modules.hubfile.process_task_worker’).
:param args: Positional arguments required by the task.
:param timeout: Maximum task execution time in seconds (default: 180s).
:param kwargs: Named arguments required by the task.
'''
if timeout is None:
timeout = self.redis_worker_timeout # Asigna el timeout de la instancia si no se provee uno.

:param task_name: Nombre completo del método o función a encolar (ej. "app.modules.hubfile.process_task_worker")
:param args: Argumentos posicionales que requiere la tarea.
:param kwargs: Argumentos nombrados que requiere la tarea.
"""
task_metadata = {
"task_name": task_name,
"args": args,
Expand All @@ -38,6 +43,6 @@ def enqueue_task(self, task_name: str, *args, **kwargs):
}
logger.info(f"Enqueueing task: {task_metadata}")

# Encolar la tarea personalizada en RQ
self.queue.enqueue(task_name, *args, **kwargs)
logger.info(f"Task '{task_name}' enqueued with arguments: {args}, {kwargs}")
# Bind the custom task to RQ with timeout
self.queue.enqueue(task_name, *args, **kwargs, job_timeout=timeout)
logger.info(f"Task '{task_name}' enqueued with arguments: {args}, {kwargs} and timeout: {timeout}")

0 comments on commit 1269a00

Please sign in to comment.