Skip to content

Commit

Permalink
Make the job cancellation mutations more robust
Browse files Browse the repository at this point in the history
  • Loading branch information
janbaykara committed Feb 6, 2025
1 parent e620ede commit d233056
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 11 deletions.
39 changes: 28 additions & 11 deletions hub/graphql/mutations.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,9 @@ async def trigger_update(
await models.ExternalDataSource.objects.aget(id=external_data_source_id)
)

batch_request = await BatchRequest.objects.acreate(user=get_current_user(info))
batch_request = await BatchRequest.objects.acreate(
user=get_current_user(info), type=BatchRequest.BatchRequestType.Update
)
request_id = str(batch_request.id)

await data_source.schedule_refresh_all(request_id=request_id)
Expand Down Expand Up @@ -250,7 +252,9 @@ async def import_all(
data_source: models.ExternalDataSource = (
await models.ExternalDataSource.objects.aget(id=external_data_source_id)
)
batch_request = await BatchRequest.objects.acreate(user=get_current_user(info))
batch_request = await BatchRequest.objects.acreate(
user=get_current_user(info), type=BatchRequest.BatchRequestType.Import
)

request_id = str(batch_request.id)
requested_at = now().isoformat()
Expand All @@ -263,22 +267,33 @@ async def import_all(

@strawberry_django.mutation(extensions=[IsAuthenticated()])
def cancel_import(
info: Info, external_data_source_id: str, request_id: str
info: Info, external_data_source_id: str, request_id: str, all: bool = False
) -> ExternalDataSourceAction:
data_source: models.ExternalDataSource = models.ExternalDataSource.objects.get(
id=external_data_source_id
)
# Confirm user has access to this source
user = get_current_user(info)
assert user_can_manage_source(user, data_source)
# Execute
data_source.cancel_jobs(type=BatchRequest.BatchRequestType.Import, all=all)
# Update all remaining procrastinate jobs, cancel them
return ExternalDataSourceAction(id=request_id, external_data_source=data_source)


@strawberry_django.mutation(extensions=[IsAuthenticated()])
def cancel_update(
info: Info, external_data_source_id: str, request_id: str, all: bool = False
) -> ExternalDataSourceAction:
data_source: models.ExternalDataSource = models.ExternalDataSource.objects.get(
id=external_data_source_id
)
# Confirm user has access to this source
user = get_current_user(info)
assert user_can_manage_source(user, data_source)
# Execute
data_source.cancel_jobs(type=BatchRequest.BatchRequestType.Update, all=all)
# Update all remaining procrastinate jobs, cancel them
ProcrastinateJob.objects.filter(
args__external_data_source_id=external_data_source_id,
status__in=["todo", "doing"],
args__request_id=request_id,
).update(status="cancelled")
BatchRequest.objects.filter(id=request_id).update(status="cancelled")
#
return ExternalDataSourceAction(id=request_id, external_data_source=data_source)


Expand Down Expand Up @@ -422,7 +437,9 @@ def creator_fn() -> tuple[models.ExternalDataSource, bool]: # noqa: F811
source, created = creator_fn()

if created:
batch_request = BatchRequest.objects.create(user=get_current_user(info))
batch_request = BatchRequest.objects.create(
user=get_current_user(info), type=BatchRequest.BatchRequestType.Import
)
request_id = str(batch_request.id)
requested_at = now().isoformat()
async_to_sync(source.schedule_import_all)(
Expand Down
3 changes: 3 additions & 0 deletions hub/graphql/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,9 @@ class Mutation:
cancel_import: mutation_types.ExternalDataSourceAction = (
mutation_types.cancel_import
)
cancel_update: mutation_types.ExternalDataSourceAction = (
mutation_types.cancel_update
)
delete_all_records: model_types.ExternalDataSource = (
mutation_types.delete_all_records
)
Expand Down
67 changes: 67 additions & 0 deletions hub/migrations/0156_batchrequest_created_at_and_more.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Generated by Django 4.2.11 on 2025-02-06 13:01

from django.db import migrations, models
import django.utils.timezone
import django_choices_field.fields
import hub.models
import utils.django_json


class Migration(migrations.Migration):

dependencies = [
("hub", "0155_externaldatasource_field_definitions"),
]

operations = [
migrations.AddField(
model_name="batchrequest",
name="created_at",
field=models.DateTimeField(
auto_now_add=True, default=django.utils.timezone.now
),
preserve_default=False,
),
migrations.AddField(
model_name="batchrequest",
name="is_cancelled_by_user",
field=models.BooleanField(default=False),
),
migrations.AddField(
model_name="batchrequest",
name="type",
field=django_choices_field.fields.TextChoicesField(
choices=[("Import", "Import"), ("Update", "Update")],
choices_enum=hub.models.BatchRequest.BatchRequestType,
default="Import",
max_length=6,
),
preserve_default=False,
),
migrations.AddField(
model_name="batchrequest",
name="updated_at",
field=models.DateTimeField(auto_now=True),
),
migrations.AlterField(
model_name="areadata",
name="json",
field=models.JSONField(
blank=True, encoder=utils.django_json.DBJSONEncoder, null=True
),
),
migrations.AlterField(
model_name="genericdata",
name="json",
field=models.JSONField(
blank=True, encoder=utils.django_json.DBJSONEncoder, null=True
),
),
migrations.AlterField(
model_name="persondata",
name="json",
field=models.JSONField(
blank=True, encoder=utils.django_json.DBJSONEncoder, null=True
),
),
]
42 changes: 42 additions & 0 deletions hub/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2848,6 +2848,37 @@ def filter(self, filter: dict) -> dict:
"""
raise NotImplementedError("Lookup not implemented for this data source type.")

def cancel_jobs(
self,
type: "BatchRequest.BatchRequestType" = None,
# If ALL, then cancel all jobs, regardless of whether they're batched or not
all=False,
):
"""
Cancel all imports for this data source.
"""
job_filters = dict(
args__external_data_source_id=self.id,
status__in=["todo", "doing"],
)
if not all:
# Cancel all BatchRequests
BatchRequest.objects.filter(
data_source=self,
is_cancelled_by_user=None,
**({"type": type} if type else {}),
).update(is_cancelled_by_user=True)
# Cancel all BatchRequests
cancelled_request_ids = BatchRequest.objects.filter(
data_source=self,
is_cancelled_by_user=True,
)
job_filters.update(
args__request_id__in=cancelled_request_ids.values_list("id", flat=True)
)
# Cancel all user-requested procrastinate import jobs
return ProcrastinateJob.objects.filter(**job_filters).update(status="cancelled")


class DatabaseJSONSource(ExternalDataSource):
crm_type = "DatabaseJSONSource"
Expand Down Expand Up @@ -4971,8 +5002,19 @@ def update_apitoken_cache_on_delete(sender, instance, *args, **kwargs):


class BatchRequest(models.Model):
# enum of request types
class BatchRequestType(models.TextChoices):
# User requested import of data to Mapped
Import = "Import"
# User requested update of data to third party data source
Update = "Update"

id = models.UUIDField(primary_key=True, default=uuid.uuid4)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
user = models.ForeignKey(User, on_delete=models.CASCADE)
send_email = models.BooleanField(default=False)
sent_email = models.BooleanField(default=False)
status = models.CharField(max_length=32, default="todo")
is_cancelled_by_user = models.BooleanField(default=False)
type = TextChoicesField(choices_enum=BatchRequestType)

0 comments on commit d233056

Please sign in to comment.