chore: better queue (#29353)
pauldambra authored Feb 28, 2025
1 parent dc460f2 commit a074f96
Showing 4 changed files with 121 additions and 15 deletions.
@@ -1,3 +1,4 @@
from datetime import datetime
import json
from typing import Any
import posthoganalytics
@@ -39,6 +40,11 @@
"when a count task for a playlist is unknown",
)

REPLAY_TEAM_PLAYLIST_COUNT_SKIPPED = Counter(
"replay_playlist_count_skipped",
"when a count task for a playlist is skipped because the cooldown period has not passed",
)

REPLAY_PLAYLIST_COUNT_TIMER = Histogram(
"replay_playlist_with_filters_count_timer_seconds",
"Time spent loading session recordings that match filters in a playlist in seconds",
@@ -141,14 +147,34 @@ def count_recordings_that_match_playlist_filters(playlist_id: int) -> None:
try:
with REPLAY_PLAYLIST_COUNT_TIMER.time():
playlist = SessionRecordingPlaylist.objects.get(id=playlist_id)
redis_client = get_client()

existing_value = redis_client.get(f"{PLAYLIST_COUNT_REDIS_PREFIX}{playlist.short_id}")
if existing_value:
existing_value = json.loads(existing_value)
else:
existing_value = {}

if existing_value.get("refreshed_at"):
last_refreshed_at = datetime.fromisoformat(existing_value["refreshed_at"])
seconds_since_refresh = int((datetime.now() - last_refreshed_at).total_seconds())

if seconds_since_refresh <= settings.PLAYLIST_COUNTER_PROCESSING_COOLDOWN_SECONDS:
REPLAY_TEAM_PLAYLIST_COUNT_SKIPPED.inc()
return

query = convert_universal_filters_to_recordings_query(playlist.filters)
(recordings, more_recordings_available, _) = list_recordings_from_query(
query, user=None, team=playlist.team
)

redis_client = get_client()
value_to_set = json.dumps(
{"session_ids": [r.session_id for r in recordings], "has_more": more_recordings_available}
{
"session_ids": [r.session_id for r in recordings],
"has_more": more_recordings_available,
"previous_ids": existing_value.get("session_ids", None),
"refreshed_at": datetime.now().isoformat(),
}
)
redis_client.setex(
f"{PLAYLIST_COUNT_REDIS_PREFIX}{playlist.short_id}", THIRTY_SIX_HOURS_IN_SECONDS, value_to_set
@@ -159,17 +185,27 @@ def count_recordings_that_match_playlist_filters(playlist_id: int) -> None:
logger.info("Playlist does not exist", playlist_id=playlist_id)
REPLAY_TEAM_PLAYLIST_COUNT_UNKNOWN.inc()
except Exception as e:
posthoganalytics.capture_exception(e)
posthoganalytics.capture_exception(
e, properties={"playlist_id": playlist_id, "posthog_feature": "session_replay_playlist_counters"}
)
logger.exception("Failed to count recordings that match playlist filters", playlist_id=playlist_id, error=e)
REPLAY_TEAM_PLAYLIST_COUNT_FAILED.inc()


def enqueue_recordings_that_match_playlist_filters() -> None:
teams_with_counter_processing = settings.PLAYLIST_COUNTER_PROCESSING_ALLOWED_TEAMS
if not settings.PLAYLIST_COUNTER_PROCESSING_MAX_ALLOWED_TEAM_ID or not isinstance(
settings.PLAYLIST_COUNTER_PROCESSING_MAX_ALLOWED_TEAM_ID, int
):
raise Exception("PLAYLIST_COUNTER_PROCESSING_MAX_ALLOWED_TEAM_ID is not set")

for team in teams_with_counter_processing:
all_playlists = SessionRecordingPlaylist.objects.filter(team_id=int(team), deleted=False, filters__isnull=False)
REPLAY_TEAM_PLAYLISTS_IN_TEAM_COUNT.inc(all_playlists.count())
if settings.PLAYLIST_COUNTER_PROCESSING_MAX_ALLOWED_TEAM_ID == 0:
# If we're not processing any teams, we don't need to enqueue anything
return

all_playlists = SessionRecordingPlaylist.objects.filter(
team_id__lte=int(settings.PLAYLIST_COUNTER_PROCESSING_MAX_ALLOWED_TEAM_ID), deleted=False, filters__isnull=False
)
REPLAY_TEAM_PLAYLISTS_IN_TEAM_COUNT.inc(all_playlists.count())

for playlist in all_playlists:
count_recordings_that_match_playlist_filters.delay(playlist.id)
for playlist in all_playlists:
count_recordings_that_match_playlist_filters.delay(playlist.id)
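
Taken together, the changes to this file gate recounting behind a Redis-backed cooldown: the cached payload now carries a refreshed_at timestamp, and the task returns early while that timestamp is still fresh. A minimal standalone sketch of the pattern, illustrative only and not part of the commit (the function and constant names here are hypothetical):

import json
from datetime import datetime

COOLDOWN_SECONDS = 3600  # mirrors the PLAYLIST_COUNTER_PROCESSING_COOLDOWN_SECONDS default

def should_recount(redis_client, key: str) -> bool:
    # Recount only when there is no cached value, the cached value predates the
    # refreshed_at field, or the cooldown window has elapsed.
    raw = redis_client.get(key)
    if not raw:
        return True
    cached = json.loads(raw)
    refreshed_at = cached.get("refreshed_at")
    if not refreshed_at:
        return True
    age_seconds = (datetime.now() - datetime.fromisoformat(refreshed_at)).total_seconds()
    return age_seconds > COOLDOWN_SECONDS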
@@ -1,4 +1,6 @@
from datetime import datetime, timedelta
import json
from unittest import mock
from unittest.mock import MagicMock, patch
from ee.session_recordings.playlist_counters.recordings_that_match_playlist_filters import (
count_recordings_that_match_playlist_filters,
@@ -38,7 +40,9 @@ def test_count_recordings_that_match_no_recordings(

assert json.loads(self.redis_client.get(f"{PLAYLIST_COUNT_REDIS_PREFIX}{playlist.short_id}")) == {
"session_ids": [],
"previous_ids": None,
"has_more": False,
"refreshed_at": mock.ANY,
}

@patch("posthoganalytics.capture_exception")
@@ -66,5 +70,64 @@ def test_count_recordings_that_match_recordings(

assert json.loads(self.redis_client.get(f"{PLAYLIST_COUNT_REDIS_PREFIX}{playlist.short_id}")) == {
"session_ids": ["123"],
"previous_ids": None,
"has_more": True,
"refreshed_at": mock.ANY,
}

@patch("posthoganalytics.capture_exception")
@patch("ee.session_recordings.playlist_counters.recordings_that_match_playlist_filters.list_recordings_from_query")
def test_count_recordings_that_match_recordings_records_previous_ids(
self, mock_list_recordings_from_query: MagicMock, mock_capture_exception: MagicMock
):
mock_list_recordings_from_query.return_value = (
[
SessionRecording.objects.create(
team=self.team,
session_id="123",
)
],
True,
None,
)
playlist = SessionRecordingPlaylist.objects.create(
team=self.team,
name="test",
filters={},
)
self.redis_client.set(
f"{PLAYLIST_COUNT_REDIS_PREFIX}{playlist.short_id}", json.dumps({"session_ids": ["245"], "has_more": True})
)
count_recordings_that_match_playlist_filters(playlist.id)
mock_capture_exception.assert_not_called()

assert json.loads(self.redis_client.get(f"{PLAYLIST_COUNT_REDIS_PREFIX}{playlist.short_id}")) == {
"session_ids": ["123"],
"has_more": True,
"previous_ids": ["245"],
"refreshed_at": mock.ANY,
}

@patch("posthoganalytics.capture_exception")
@patch("ee.session_recordings.playlist_counters.recordings_that_match_playlist_filters.list_recordings_from_query")
def test_count_recordings_that_match_recordings_skips_cooldown(
self, mock_list_recordings_from_query: MagicMock, mock_capture_exception: MagicMock
):
mock_list_recordings_from_query.return_value = ([], False, None)

playlist = SessionRecordingPlaylist.objects.create(
team=self.team,
name="test",
filters={},
)
existing_value = {"refreshed_at": (datetime.now() - timedelta(seconds=3600)).isoformat()}
self.redis_client.set(f"{PLAYLIST_COUNT_REDIS_PREFIX}{playlist.short_id}", json.dumps(existing_value))

count_recordings_that_match_playlist_filters(playlist.id)

mock_list_recordings_from_query.assert_not_called()
mock_capture_exception.assert_not_called()

assert self.redis_client.get(f"{PLAYLIST_COUNT_REDIS_PREFIX}{playlist.short_id}").decode("utf-8") == json.dumps(
existing_value
)
12 changes: 8 additions & 4 deletions posthog/settings/session_replay.py
@@ -45,10 +45,14 @@
SESSION_REPLAY_AI_DEFAULT_MODEL = get_from_env("SESSION_REPLAY_AI_DEFAULT_MODEL", "gpt-4o")
SESSION_REPLAY_AI_REGEX_MODEL = get_from_env("SESSION_REPLAY_AI_REGEX_MODEL", "gpt-4o-mini")

PLAYLIST_COUNTER_PROCESSING_ALLOWED_TEAMS = get_list(
get_from_env("PLAYLIST_COUNTER_PROCESSING_ALLOWED_TEAMS", "1,2" if settings.DEBUG else "")
PLAYLIST_COUNTER_PROCESSING_MAX_ALLOWED_TEAM_ID = get_from_env(
"PLAYLIST_COUNTER_PROCESSING_MAX_ALLOWED_TEAM_ID", default=2 if settings.DEBUG else 0, type_cast=int
)
# TODO want this to be 24 hours in prod but initial testing is better with 1 hour

PLAYLIST_COUNTER_PROCESSING_SCHEDULE_SECONDS = get_from_env(
"PLAYLIST_COUNTER_PROCESSING_SCHEDULE_SECONDS", 60 if settings.DEBUG else 3600, type_cast=int
"PLAYLIST_COUNTER_PROCESSING_SCHEDULE_SECONDS", default=60 if settings.DEBUG else 3600, type_cast=int
)

PLAYLIST_COUNTER_PROCESSING_COOLDOWN_SECONDS = get_from_env(
"PLAYLIST_COUNTER_PROCESSING_COOLDOWN_SECONDS", 3600, type_cast=int
)
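
get_from_env is PostHog's settings helper; the sketch below only illustrates the shape it is called with above (a string key, a default, and an optional type_cast) and is not the actual implementation:

import os
from typing import Any, Callable, Optional

def get_from_env(key: str, default: Any = None, *, type_cast: Optional[Callable[[str], Any]] = None) -> Any:
    # Fall back to the default when the variable is unset or empty; otherwise
    # cast the raw string (e.g. type_cast=int for the settings above).
    value = os.getenv(key)
    if value is None or value == "":
        return default
    return type_cast(value) if type_cast else value

# Example: PLAYLIST_COUNTER_PROCESSING_COOLDOWN_SECONDS=7200 in the environment yields 7200,
# while leaving it unset falls back to the default of 3600.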
7 changes: 5 additions & 2 deletions posthog/tasks/tasks.py
@@ -907,14 +907,17 @@ def ee_persist_finished_recordings() -> None:
persist_finished_recordings()


@shared_task(ignore_result=True)
@shared_task(
ignore_result=True,
queue=CeleryQueue.SESSION_REPLAY_GENERAL.value,
)
def ee_count_items_in_playlists() -> None:
try:
from ee.session_recordings.playlist_counters.recordings_that_match_playlist_filters import (
enqueue_recordings_that_match_playlist_filters,
)
except ImportError as ie:
posthoganalytics.capture_exception(ie)
posthoganalytics.capture_exception(ie, properties={"posthog_feature": "session_replay_playlist_counters"})
logger.exception("Failed to import task to count items in playlists", error=ie)
else:
enqueue_recordings_that_match_playlist_filters()
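
The decorator change above appears to be the queue improvement the commit title refers to: the task is now routed to the session replay Celery queue rather than the default one. A minimal sketch of that routing pattern, illustrative only (the app name, broker URL, and queue string below are hypothetical):

from celery import Celery

app = Celery("example", broker="redis://localhost:6379/0")

@app.task(ignore_result=True, queue="session_replay_general")
def count_playlist_items() -> None:
    # Work enqueued here is delivered to the "session_replay_general" queue
    # instead of Celery's default queue.
    ...

# A worker started with `celery -A example worker -Q session_replay_general` consumes only that
# queue, so playlist counting no longer competes with tasks running on the default queue.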