Skip to content

Commit

Permalink
fix(data-warehouse): Handle compaction errors (#29462)
Browse files: browse the repository at this point in the history
  • Loading branch information
Gilbert09 authored Mar 4, 2025
1 parent 39ac6d9 commit cc26a43
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 7 deletions.
13 changes: 9 additions & 4 deletions posthog/temporal/data_imports/deltalake_compaction_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,19 @@
from django.db import close_old_connections
from temporalio import activity, workflow
from temporalio.common import RetryPolicy
from posthog.exceptions_capture import capture_exception
from posthog.settings import TEST, DEBUG
from posthog.temporal.common.base import PostHogWorkflow
from posthog.temporal.common.heartbeat_sync import HeartbeaterSync
from posthog.temporal.common.client import sync_connect
from posthog.temporal.common.logger import bind_temporal_worker_logger_sync
from posthog.temporal.common.logger import FilteringBoundLogger, bind_temporal_worker_logger_sync
from posthog.temporal.data_imports.pipelines.pipeline.delta_table_helper import DeltaTableHelper
from posthog.warehouse.models import ExternalDataJob, ExternalDataSchema
from posthog.constants import DATA_WAREHOUSE_COMPACTION_TASK_QUEUE
from temporalio.exceptions import WorkflowAlreadyStartedError


def trigger_compaction_job(job: ExternalDataJob, schema: ExternalDataSchema) -> str:
def trigger_compaction_job(job: ExternalDataJob, schema: ExternalDataSchema, logger: FilteringBoundLogger) -> str:
temporal = sync_connect()
workflow_id = f"{schema.id}-compaction"

Expand All @@ -39,7 +40,11 @@ def trigger_compaction_job(job: ExternalDataJob, schema: ExternalDataSchema) ->

if not DEBUG and not TEST:
# Wait for the compaction to complete before continuing
asyncio.run(handle.result())
try:
asyncio.run(handle.result())
except Exception as e:
capture_exception(e)
logger.exception(f"Compaction job failed with: {e}", exc_info=e)
except WorkflowAlreadyStartedError:
pass

Expand Down Expand Up @@ -80,7 +85,7 @@ async def run(self, inputs: DeltalakeCompactionJobWorkflowInputs):
await workflow.execute_activity(
run_compaction,
inputs,
start_to_close_timeout=dt.timedelta(minutes=5),
start_to_close_timeout=dt.timedelta(minutes=60),
retry_policy=RetryPolicy(
maximum_attempts=1,
),
Expand Down
4 changes: 1 addition & 3 deletions posthog/temporal/data_imports/pipelines/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,8 @@ def _post_run_operations(self, row_count: int):
self._logger.debug("No deltalake table, not continuing with post-run ops")
return

self._logger.debug("SKIPPING deltatable compact and vacuuming")

self._logger.debug("Triggering workflow to compact and vacuum")
compaction_job_id = trigger_compaction_job(self._job, self._schema)
compaction_job_id = trigger_compaction_job(self._job, self._schema, self._logger)
self._logger.debug(f"Compaction workflow id: {compaction_job_id}")

file_uris = delta_table.file_uris()
Expand Down

0 comments on commit cc26a43

Please sign in to comment.