Skip to content
14 changes: 12 additions & 2 deletions src/aap_eda/core/tasking/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,14 +286,24 @@ def __init__(
)


def enqueue_delay(queue_name: str, delay: int, *args, **kwargs) -> Job:
def enqueue_delay(
queue_name: str, job_id: str, delay: int, *args, **kwargs
) -> Job:
"""Enqueue a job to run after specific seconds."""
scheduler = get_scheduler(name=queue_name)
return scheduler.enqueue_at(
datetime.utcnow() + timedelta(seconds=delay), *args, **kwargs
datetime.utcnow() + timedelta(seconds=delay),
job_id=job_id,
*args,
**kwargs,
)


def queue_cancel_job(queue_name: str, job_id: str) -> None:
scheduler = get_scheduler(name=queue_name)
scheduler.cancel(job_id)


def unique_enqueue(queue_name: str, job_id: str, *args, **kwargs) -> Job:
"""Enqueue a new job if it is not already enqueued.

Expand Down
12 changes: 11 additions & 1 deletion src/aap_eda/services/activation/activation_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from aap_eda.services.activation.engine import exceptions as engine_exceptions
from aap_eda.services.activation.engine.common import ContainerRequest
from aap_eda.services.activation.restart_helper import (
system_cancel_restart_activation,
system_restart_activation,
)

Expand Down Expand Up @@ -772,6 +773,8 @@ def delete(self):
)
LOGGER.error(msg)

# Save the id; once the db instance is deleted the id is set to None.
saved_id = self.db_instance.id
try:
self.db_instance.delete()
except (ObjectDoesNotExist, ValueError):
Expand All @@ -781,8 +784,15 @@ def delete(self):
)
LOGGER.error(msg)
raise exceptions.ActivationManagerError(msg) from None

# Cancel any outstanding restart.
system_cancel_restart_activation(
self.db_instance_type,
saved_id,
)

LOGGER.info(
f"Delete operation for activation id: {self.db_instance.id} "
f"Delete operation for activation id: {saved_id} "
"Activation deleted.",
)

Expand Down
26 changes: 20 additions & 6 deletions src/aap_eda/services/activation/restart_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,27 @@

import aap_eda.tasks.activation_request_queue as requests_queue
from aap_eda.core.enums import ActivationRequest
from aap_eda.core.tasking import enqueue_delay
from aap_eda.core.tasking import enqueue_delay, queue_cancel_job

LOGGER = logging.getLogger(__name__)


def auto_start_job_id(process_parent_type: str, id: int) -> str:
"""Generate the auto-start job id for use in enqueuing and cancelling."""
return f"auto-start-{process_parent_type}-{id}"


def system_cancel_restart_activation(
process_parent_type: str, id: int
) -> None:
"""Cancel the restart for the activation.

The restart may not exist.
"""
LOGGER.debug(f"Cancelling auto-start for {process_parent_type} {id}")
queue_cancel_job("default", auto_start_job_id(process_parent_type, id))


def system_restart_activation(
process_parent_type: str, id: int, delay_seconds: int
) -> None:
Expand All @@ -37,6 +53,7 @@ def system_restart_activation(
)
enqueue_delay(
"default",
auto_start_job_id(process_parent_type, id),
delay_seconds,
_queue_auto_start,
process_parent_type,
Expand All @@ -50,8 +67,5 @@ def _queue_auto_start(process_parent_type: str, id: int) -> None:
requests_queue.push(
process_parent_type, id, ActivationRequest.AUTO_START
)
except IntegrityError:
LOGGER.warning(
f"{process_parent_type} {id} no longer exists, "
"auto-start request will not be processed",
)
except IntegrityError as exc:
LOGGER.warning(exc)
25 changes: 23 additions & 2 deletions src/aap_eda/tasks/activation_request_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,40 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from django.db import transaction
from django.db.models.query import QuerySet
from django.db.utils import IntegrityError

from aap_eda.core.enums import ActivationRequest
from aap_eda.core.models import ActivationRequestQueue
from aap_eda.core.enums import ActivationRequest, ProcessParentType
from aap_eda.core.models import Activation, ActivationRequestQueue, EventStream

from .exceptions import UnknownProcessParentType


@transaction.atomic
def push(parent_type: str, parent_id: int, request: ActivationRequest) -> None:
if parent_type == ProcessParentType.ACTIVATION:
model = Activation
elif parent_type == ProcessParentType.EVENT_STREAM:
model = EventStream
else:
raise UnknownProcessParentType(
f"Unknown parent type {parent_type}",
)

ActivationRequestQueue.objects.create(
process_parent_type=parent_type,
process_parent_id=parent_id,
request=request,
)

# Check that the parent referenced still exists.
if not model.objects.filter(id=parent_id).exists():
raise IntegrityError(
f"{parent_type} {parent_id} no longer exists, "
f"{request} request will not be processed",
)


def peek_all(parent_type: str, parent_id: int) -> list[ActivationRequestQueue]:
requests = ActivationRequestQueue.objects.filter(
Expand Down
19 changes: 19 additions & 0 deletions src/aap_eda/tasks/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Copyright 2024 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


class UnknownProcessParentType(Exception):
"""Raised when the process parent type is unknown."""

...
8 changes: 2 additions & 6 deletions src/aap_eda/tasks/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
StatusManager,
)

from .exceptions import UnknownProcessParentType

LOGGER = logging.getLogger(__name__)


Expand All @@ -47,12 +49,6 @@ class HealthyQueueNotFoundError(Exception):
...


class UnknownProcessParentType(Exception):
"""Raised when the process parent type is unknown."""

...


def _manage_process_job_id(process_parent_type: str, id: int) -> str:
"""Return the unique job id for the activation manager task."""
return f"{process_parent_type}-{id}"
Expand Down
32 changes: 32 additions & 0 deletions tests/integration/services/test_restart_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import time
from unittest import mock

import pytest
from django_rq import get_scheduler

from aap_eda.core import models
from aap_eda.core.enums import ActivationRequest, ProcessParentType
from aap_eda.services.activation.restart_helper import (
_queue_auto_start,
auto_start_job_id,
system_cancel_restart_activation,
system_restart_activation,
)

Expand All @@ -39,12 +43,40 @@ def activation():
)


@pytest.mark.django_db
def test_system_cancel_restart_activation(activation):
job_id = auto_start_job_id(ProcessParentType.ACTIVATION, activation.id)

scheduler = get_scheduler(name="default")

delay = 5
system_restart_activation(
ProcessParentType.ACTIVATION, activation.id, delay
)

# Sleep for half the delay and verify the job is still in the scheduler.
time.sleep(delay / 2)
assert job_id in scheduler

# Cancel the job and verify that it's not in the scheduler.
system_cancel_restart_activation(
ProcessParentType.ACTIVATION, activation.id
)
assert job_id not in scheduler

# Sleep for half the delay and verify the job didn't somehow run; i.e., we
# really canceled it.
time.sleep(delay / 2)
assert models.ActivationRequestQueue.objects.count() == 0


@pytest.mark.django_db
@mock.patch("aap_eda.services.activation.restart_helper.enqueue_delay")
def test_system_restart_activation(enqueue_mock, activation):
system_restart_activation(ProcessParentType.ACTIVATION, activation.id, 5)
enqueue_args = [
"default",
auto_start_job_id(ProcessParentType.ACTIVATION, activation.id),
5,
_queue_auto_start,
ProcessParentType.ACTIVATION,
Expand Down
24 changes: 24 additions & 0 deletions tests/integration/tasks/test_activation_request_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
# limitations under the License.

import pytest
from django.db.utils import IntegrityError

import aap_eda.tasks.activation_request_queue as queue
from aap_eda.core import models
from aap_eda.core.enums import ActivationRequest, ProcessParentType
from aap_eda.tasks.exceptions import UnknownProcessParentType


@pytest.fixture()
Expand Down Expand Up @@ -76,6 +78,28 @@ def test_queue(activations):
)


@pytest.mark.django_db
def test_queue_push_exceptions():
parent_type = "unknown"
parent_id = 1

with pytest.raises(UnknownProcessParentType) as info:
queue.push(parent_type, parent_id, ActivationRequest.AUTO_START)
assert str(info.value) == f"Unknown parent type {parent_type}"

with pytest.raises(IntegrityError) as info:
queue.push(
ProcessParentType.ACTIVATION,
parent_id,
ActivationRequest.AUTO_START,
)
assert (
str(info.value)
== f"{ProcessParentType.ACTIVATION} {parent_id} no longer exists, "
f"{ActivationRequest.AUTO_START} request will not be processed"
)


@pytest.mark.parametrize(
"requests",
[
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/test_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
from aap_eda.core.models import Activation, EventStream, RulebookProcess
from aap_eda.settings import default
from aap_eda.tasks import orchestrator
from aap_eda.tasks.exceptions import UnknownProcessParentType
from aap_eda.tasks.orchestrator import (
HealthyQueueNotFoundError,
UnknownProcessParentType,
check_rulebook_queue_health,
get_least_busy_queue_name,
get_process_parent,
Expand Down