Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions performance_test/create_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,15 @@ def add_timestamp(obj, create, extracted, **kwargs):
ObjectRecordFactory.create_batch(
5000,
object__object_type=object_type,
_object_type=object_type,
start_at="2020-01-01",
version=1,
data={"identifier": "63f473de-a7a6-4000-9421-829e146499e3", "foo": "bar"},
add_timestamp=True,
)
ObjectRecordFactory.create(
object__object_type=object_type,
_object_type=object_type,
start_at="2020-01-01",
version=1,
data={"identifier": "ec5cde18-40a0-4135-8d97-3500d1730e60", "foo": "bar"},
Expand Down
3 changes: 3 additions & 0 deletions requirements/base.in
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,6 @@ celery>=5.5.0
kombu>=5.4.0

psycopg[pool]

# Progress indicator for scripts/migration
tqdm
2 changes: 2 additions & 0 deletions requirements/base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,8 @@ structlog==25.4.0
# open-api-framework
tornado==6.5
# via flower
tqdm==4.67.1
# via -r requirements/base.in
typing-extensions==4.9.0
# via
# mozilla-django-oidc-db
Expand Down
4 changes: 4 additions & 0 deletions requirements/ci.txt
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,10 @@ tornado==6.5
# -c requirements/base.txt
# -r requirements/base.txt
# flower
tqdm==4.67.1
# via
# -c requirements/base.txt
# -r requirements/base.txt
typing-extensions==4.9.0
# via
# -c requirements/base.txt
Expand Down
4 changes: 4 additions & 0 deletions requirements/dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -917,6 +917,10 @@ tornado==6.5
# -c requirements/ci.txt
# -r requirements/ci.txt
# flower
tqdm==4.67.1
# via
# -c requirements/ci.txt
# -r requirements/ci.txt
typing-extensions==4.9.0
# via
# -c requirements/ci.txt
Expand Down
2 changes: 1 addition & 1 deletion src/objects/api/kanalen.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def get_kenmerken(
data = data or {}
return {
kenmerk: (
data.get("type") or obj.object.object_type.url
data.get("type") or obj._object_type.url
if kenmerk == "object_type"
else data.get(kenmerk, getattr(obj, kenmerk))
)
Expand Down
8 changes: 5 additions & 3 deletions src/objects/api/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class ObjectSerializer(DynamicFieldsMixin, serializers.HyperlinkedModelSerialize
type = ObjectTypeField(
min_length=1,
max_length=1000,
source="object.object_type",
source="_object_type",
queryset=ObjectType.objects.all(),
help_text=_("Url reference to OBJECTTYPE in Objecttypes API"),
validators=[IsImmutableValidator()],
Expand All @@ -119,7 +119,9 @@ class Meta:

@transaction.atomic
def create(self, validated_data):
object_data = validated_data.pop("object")
object_data = validated_data.pop("object", {})
if object_type := validated_data.pop("_object_type"):
object_data["object_type"] = object_type
object = Object.objects.create(**object_data)

validated_data["object"] = object
Expand Down Expand Up @@ -156,7 +158,7 @@ def update(self, instance, validated_data):
logger.info(
"object_updated",
object_uuid=str(record.object.uuid),
objecttype_uuid=str(record.object.object_type.uuid),
objecttype_uuid=str(record._object_type.uuid),
objecttype_version=record.version,
token_identifier=token_auth.identifier,
token_application=token_auth.application,
Expand Down
2 changes: 1 addition & 1 deletion src/objects/api/v2/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def clean(self):

class ObjectRecordFilterSet(FilterSet):
type = ObjectTypeFilter(
field_name="object__object_type",
field_name="_object_type",
help_text=_("Url reference to OBJECTTYPE in Objecttypes API"),
queryset=ObjectType.objects.all(),
min_length=1,
Expand Down
19 changes: 11 additions & 8 deletions src/objects/api/v2/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,16 @@
class ObjectViewSet(
ObjectNotificationMixin, SearchMixin, GeoMixin, viewsets.ModelViewSet
):
queryset = ObjectRecord.objects.select_related(
"object",
"object__object_type",
"object__object_type__service",
"correct",
"corrected",
).order_by("-pk")
queryset = (
ObjectRecord.objects.select_related(
"_object_type",
"_object_type__service",
"correct",
"corrected",
)
.prefetch_related("object")
.order_by("-pk")
)
serializer_class = ObjectSerializer
filterset_class = ObjectRecordFilterSet
filter_backends = [FilterBackend, OrderingBackend]
Expand All @@ -105,7 +108,7 @@ def get_queryset(self):
# prefetch permissions for DB optimization. Used in DynamicFieldsMixin
base = base.prefetch_related(
models.Prefetch(
"object__object_type__permissions",
"_object_type__permissions",
queryset=Permission.objects.filter(token_auth=token_auth),
to_attr="token_permissions",
),
Expand Down
12 changes: 5 additions & 7 deletions src/objects/api/validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@ def __call__(self, attrs, serializer):

# create
if not instance:
object_type = attrs.get("object", {}).get("object_type")
object_type = attrs.get("_object_type")
version = attrs.get("version")
data = attrs.get("data", {})

# update
else:
object_type = (
attrs.get("object", {}).get("object_type")
if "object" in attrs
else instance.object.object_type
attrs.get("_object_type")
if "_object_type" in attrs
else instance._object_type
)
version = attrs.get("version") if "version" in attrs else instance.version
data = attrs.get("data", {}) if "data" in attrs else instance.data
Expand Down Expand Up @@ -124,9 +124,7 @@ class GeometryValidator:

def __call__(self, attrs, serializer):
instance = getattr(serializer, "instance", None)
object_type = (
attrs.get("object", {}).get("object_type") or instance.object.object_type
)
object_type = attrs.get("_object_type") or instance._object_type
geometry = attrs.get("geometry")

if not geometry:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
# Generated by Django 5.2.3 on 2025-09-09 08:24

import django.contrib.postgres.indexes
from django.db import migrations
from django.contrib.postgres.operations import AddIndexConcurrently
from django.db import migrations


class Migration(migrations.Migration):
atomic = False

dependencies = [
('core', '0029_alter_object_object_type'),
("core", "0029_alter_object_object_type"),
]

operations = [
AddIndexConcurrently(
model_name='objectrecord',
index=django.contrib.postgres.indexes.GinIndex(fields=['data'], name='idx_objectrecord_data_gin'),
model_name="objectrecord",
index=django.contrib.postgres.indexes.GinIndex(
fields=["data"], name="idx_objectrecord_data_gin"
),
),
]
25 changes: 25 additions & 0 deletions src/objects/core/migrations/0032_objectrecord__object_type.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Generated by Django 5.2.3 on 2025-09-29 09:54

import django.db.models.deletion
from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("core", "0031_object_created_on_object_modified_on_and_more"),
]

operations = [
migrations.AddField(
model_name="objectrecord",
name="_object_type",
field=models.ForeignKey(
blank=True,
db_index=False,
help_text="OBJECTTYPE in Objecttypes API",
null=True,
on_delete=django.db.models.deletion.PROTECT,
to="core.objecttype",
),
),
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
# Generated by Django 5.2.3 on 2025-09-29 08:19

import os
import random
import threading
from concurrent.futures import ThreadPoolExecutor
from time import sleep

from django.conf import settings
from django.db import connection, migrations, transaction

from structlog import get_logger
from tqdm import tqdm

logger = get_logger(__name__)


BATCH_SIZE = int(os.getenv("OBJECTRECORD_MIGRATION_0033_BATCH_SIZE", 2_000))
NUM_WORKERS = int(os.getenv("OBJECTRECORD_MIGRATION_0033_NUM_WORKERS", 2))

# Pooling does not work with multiple workers
if "pool" in settings.DATABASES["default"].get("OPTIONS", {}):
NUM_WORKERS = 1


def backfill_object_type_batch_concurrent(cursor):
"""
Grab a batch of rows with _object_type_id IS NULL and update them.
SKIP LOCKED ensures multiple workers won't update the same row.
"""
cursor.execute(
"""
WITH cte AS (
SELECT r.id, o.object_type_id
FROM core_objectrecord r
JOIN core_object o ON r.object_id = o.id
WHERE r._object_type_id IS NULL
FOR UPDATE SKIP LOCKED
LIMIT %s
)
UPDATE core_objectrecord r
SET _object_type_id = cte.object_type_id
FROM cte
WHERE r.id = cte.id;
""",
[BATCH_SIZE],
)

return cursor.rowcount


def worker(apps, progress):
"""
Worker that keeps grabbing batches until none are left.
"""
# Stagger the workers to avoid synchronized bursts of commit I/O
delay = random.uniform(0.1, 0.3)
sleep(delay)

while True:
with transaction.atomic():
with connection.cursor() as cursor:
cursor.execute("SET LOCAL synchronous_commit = OFF;")
num_updated = backfill_object_type_batch_concurrent(cursor)

if num_updated == 0:
sleep(0.5)
break

progress.update(num_updated)


def forward(apps, schema_editor):
"""
Spin up NUM_WORKERS parallel workers to process the table concurrently.
"""
ObjectRecord = apps.get_model("core", "ObjectRecord")
total_records = ObjectRecord.objects.count()

# Progress bar
lock = threading.Lock()
tqdm.set_lock(lock) # make tqdm thread-safe
progress = tqdm(
total=total_records, desc="Backfilling ObjectRecords", smoothing=0.1
)
if NUM_WORKERS == 1:
worker(apps, progress)
else:
with ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
futures = [executor.submit(worker, apps, progress) for _ in range(NUM_WORKERS)]

for f in futures:
f.result()


class Migration(migrations.Migration):
atomic = False
dependencies = [
("core", "0032_objectrecord__object_type"),
]

operations = [
migrations.RunPython(forward, migrations.RunPython.noop),
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Generated by Django 5.2.6 on 2025-10-02 09:37

import django.db.models.deletion
from django.contrib.postgres.operations import AddIndexConcurrently
from django.db import migrations, models


class Migration(migrations.Migration):
atomic = False
dependencies = [
("core", "0033_objectrecord__backfill_denormalized_fields"),
]

operations = [
migrations.AlterField(
model_name="objectrecord",
name="_object_type",
field=models.ForeignKey(
db_index=True,
help_text="OBJECTTYPE in Objecttypes API",
on_delete=django.db.models.deletion.PROTECT,
to="core.objecttype",
),
),
AddIndexConcurrently(
model_name="objectrecord",
index=models.Index(
fields=["_object_type_id", "-index"], name="idx_objectrecord_type_index"
),
),
AddIndexConcurrently(
model_name="objectrecord",
index=models.Index(
fields=["_object_type_id", "id"], name="idx_objectrecord_type_id"
),
),
AddIndexConcurrently(
model_name="objectrecord",
index=models.Index(
fields=["_object_type_id", "start_at", "end_at", "object", "-index"],
name="idx_type_start_end_object_idx",
),
),
]
Loading
Loading