From cc26a4307f70da575811a3702aab19b479322cc7 Mon Sep 17 00:00:00 2001
From: Tom Owers
Date: Tue, 4 Mar 2025 12:01:26 +0100
Subject: [PATCH] fix(data-warehouse): Handle compaction errors (#29462)

---
 .../data_imports/deltalake_compaction_job.py      | 13 +++++++++----
 .../data_imports/pipelines/pipeline/pipeline.py   |  4 +---
 2 files changed, 10 insertions(+), 7 deletions(-)

diff --git a/posthog/temporal/data_imports/deltalake_compaction_job.py b/posthog/temporal/data_imports/deltalake_compaction_job.py
index a07e4fd61fbb1..76b8279473b09 100644
--- a/posthog/temporal/data_imports/deltalake_compaction_job.py
+++ b/posthog/temporal/data_imports/deltalake_compaction_job.py
@@ -6,18 +6,19 @@
 from django.db import close_old_connections
 from temporalio import activity, workflow
 from temporalio.common import RetryPolicy
+from posthog.exceptions_capture import capture_exception
 from posthog.settings import TEST, DEBUG
 from posthog.temporal.common.base import PostHogWorkflow
 from posthog.temporal.common.heartbeat_sync import HeartbeaterSync
 from posthog.temporal.common.client import sync_connect
-from posthog.temporal.common.logger import bind_temporal_worker_logger_sync
+from posthog.temporal.common.logger import FilteringBoundLogger, bind_temporal_worker_logger_sync
 from posthog.temporal.data_imports.pipelines.pipeline.delta_table_helper import DeltaTableHelper
 from posthog.warehouse.models import ExternalDataJob, ExternalDataSchema
 from posthog.constants import DATA_WAREHOUSE_COMPACTION_TASK_QUEUE
 from temporalio.exceptions import WorkflowAlreadyStartedError


-def trigger_compaction_job(job: ExternalDataJob, schema: ExternalDataSchema) -> str:
+def trigger_compaction_job(job: ExternalDataJob, schema: ExternalDataSchema, logger: FilteringBoundLogger) -> str:
     temporal = sync_connect()

     workflow_id = f"{schema.id}-compaction"
@@ -39,7 +40,11 @@ def trigger_compaction_job(job: ExternalDataJob, schema: ExternalDataSchema) ->

         if not DEBUG and not TEST:
             # Wait for the compaction to complete before continuing
-            asyncio.run(handle.result())
+            try:
+                asyncio.run(handle.result())
+            except Exception as e:
+                capture_exception(e)
+                logger.exception(f"Compaction job failed with: {e}", exc_info=e)
     except WorkflowAlreadyStartedError:
         pass

@@ -80,7 +85,7 @@ async def run(self, inputs: DeltalakeCompactionJobWorkflowInputs):
         await workflow.execute_activity(
             run_compaction,
             inputs,
-            start_to_close_timeout=dt.timedelta(minutes=5),
+            start_to_close_timeout=dt.timedelta(minutes=60),
             retry_policy=RetryPolicy(
                 maximum_attempts=1,
             ),
diff --git a/posthog/temporal/data_imports/pipelines/pipeline/pipeline.py b/posthog/temporal/data_imports/pipelines/pipeline/pipeline.py
index aba7fd6967cf2..5c7643516a2ee 100644
--- a/posthog/temporal/data_imports/pipelines/pipeline/pipeline.py
+++ b/posthog/temporal/data_imports/pipelines/pipeline/pipeline.py
@@ -204,10 +204,8 @@ def _post_run_operations(self, row_count: int):
             self._logger.debug("No deltalake table, not continuing with post-run ops")
             return

-        self._logger.debug("SKIPPING deltatable compact and vacuuming")
-
         self._logger.debug("Triggering workflow to compact and vacuum")
-        compaction_job_id = trigger_compaction_job(self._job, self._schema)
+        compaction_job_id = trigger_compaction_job(self._job, self._schema, self._logger)
         self._logger.debug(f"Compaction workflow id: {compaction_job_id}")

         file_uris = delta_table.file_uris()
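
Note (not part of the patch): below is a minimal standalone sketch of the error-handling pattern the deltalake_compaction_job.py hunk introduces, where waiting on the compaction workflow is wrapped in try/except so a failure is captured and logged instead of propagating to the calling import job. The names FakeHandle, report_exception, and wait_for_compaction are illustrative stand-ins, not PostHog or Temporal APIs.

import asyncio
import logging

logger = logging.getLogger("compaction-sketch")


def report_exception(e: Exception) -> None:
    # Stand-in for an error-tracking hook; the patch uses
    # posthog.exceptions_capture.capture_exception here.
    logger.error("captured exception: %s", e)


class FakeHandle:
    # Illustrative stand-in for a Temporal workflow handle whose result() is awaitable.
    async def result(self) -> None:
        raise RuntimeError("compaction failed")


def wait_for_compaction(handle: FakeHandle) -> None:
    """Block on the compaction workflow, but treat a failure as non-fatal."""
    try:
        asyncio.run(handle.result())
    except Exception as e:
        report_exception(e)
        logger.exception(f"Compaction job failed with: {e}", exc_info=e)


if __name__ == "__main__":
    # The caller keeps going even though compaction raised; the error is only reported.
    wait_for_compaction(FakeHandle())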