From 9012b2c6129475ab3174177d3fe398b153167e6a Mon Sep 17 00:00:00 2001
From: nutrina
Date: Mon, 29 Jul 2024 19:51:41 +0300
Subject: [PATCH] feat: extended the export command to include the scores for all models (#645)

* feat: extended the export command to include the scores for all models

* feat: update test cases for scorer_dump_data_model_score

* feat: update the cronjob exporting the model scores, to include the scores for all existing models and use the parquet file format
---
 .../commands/scorer_dump_data_model_score.py  |  76 ++++--
 ...st_cmd_scorer_dump_data_eth_model_score.py | 221 ++++++++++++++++--
 infra/aws/index.ts                            |   6 +-
 3 files changed, 264 insertions(+), 39 deletions(-)

diff --git a/api/data_model/management/commands/scorer_dump_data_model_score.py b/api/data_model/management/commands/scorer_dump_data_model_score.py
index 7984e0986..b0ebb7a37 100644
--- a/api/data_model/management/commands/scorer_dump_data_model_score.py
+++ b/api/data_model/management/commands/scorer_dump_data_model_score.py
@@ -1,22 +1,47 @@
 import json
 import traceback
+from contextlib import contextmanager
+from logging import getLogger
 from urllib.parse import urlparse
-
+import pyarrow as pa
+import pyarrow.parquet as pq
 from django.core.management.base import BaseCommand
+from django.core.serializers.json import DjangoJSONEncoder
+
+from data_model.models import Cache
 from scorer.export_utils import (
     export_data_for_model,
+    get_pa_schema,
     upload_to_s3,
 )
-from data_model.models import Cache
-from contextlib import contextmanager
-from django.core.serializers.json import DjangoJSONEncoder
-from logging import getLogger

 log = getLogger(__name__)


-def get_writer(output_file):
+def get_parquet_writer(output_file):
+    @contextmanager
+    def writer_context_manager(model):
+        schema = get_pa_schema(model)
+        try:
+            with pq.ParquetWriter(output_file, schema) as writer:
+
+                class WriterWrapper:
+                    def __init__(self, writer):
+                        self.writer = writer
+
+                    def write_batch(self, data):
+                        batch = pa.RecordBatch.from_pylist(data, schema=schema)
+                        self.writer.write_batch(batch)
+
+                yield WriterWrapper(writer)
+        finally:
+            pass
+
+    return writer_context_manager
+
+
+def get_jsonl_writer(output_file):
     @contextmanager
     def eth_stamp_writer_context_manager(queryset):
         try:
@@ -31,17 +56,10 @@ def write_batch(self, data):
                     try:
                         value = d["value"]
                         address = d["key_1"].lower()
+                        model = d["key_0"].lower()
                         self.file.write(
                             json.dumps(
-                                {
-                                    "address": address,
-                                    "data": {
-                                        "score": str(
-                                            value["data"]["human_probability"]
-                                        )
-                                    },
-                                    "updated_at": d["updated_at"],
-                                },
+                                d,
                                 cls=DjangoJSONEncoder,
                             )
                             + "\n"
@@ -88,19 +106,30 @@ def add_arguments(self, parser):
         )

         parser.add_argument("--filename", type=str, help="The output filename")
-
         parser.add_argument(
             "--s3-extra-args",
             type=str,
             help="""JSON object, that contains extra args for the files uploaded to S3.
This will be passed in as the `ExtraArgs` parameter to boto3's upload_file method.""", ) + parser.add_argument( + "--format", + type=str, + choices=["jsonl", "parquet"], + help="The output format", + default="jsonl", + ) def handle(self, *args, **options): batch_size = options["batch_size"] s3_uri = options["s3_uri"] filename = options["filename"] - data_model_name = options["data_model"] + format = options["format"] + data_model_names = ( + [n.strip() for n in options["data_model"].split(",")] + if options["data_model"] + else None + ) extra_args = ( json.loads(options["s3_extra_args"]) if options["s3_extra_args"] else None @@ -113,16 +142,19 @@ def handle(self, *args, **options): parsed_uri = urlparse(s3_uri) s3_bucket_name = parsed_uri.netloc s3_folder = parsed_uri.path.strip("/") + query = Cache.objects.all() + if data_model_names: + query = query.filter(key_0__in=data_model_names) try: export_data_for_model( - Cache.objects.filter( - key_0=data_model_name - ), # This will only filter the scores for eth_stamp_model (v1) + query, "id", batch_size, - get_writer(filename), - jsonfields_as_str=False, + get_parquet_writer(filename) + if format == "parquet" + else get_jsonl_writer(filename), + jsonfields_as_str=(format == "parquet"), ) self.stdout.write( diff --git a/api/data_model/test/test_cmd_scorer_dump_data_eth_model_score.py b/api/data_model/test/test_cmd_scorer_dump_data_eth_model_score.py index 5a1832017..a8d1959dd 100644 --- a/api/data_model/test/test_cmd_scorer_dump_data_eth_model_score.py +++ b/api/data_model/test/test_cmd_scorer_dump_data_eth_model_score.py @@ -1,13 +1,14 @@ import json from datetime import datetime, timezone +import pandas as pd import pytest from django.core.management import call_command -from data_model.models import Cache from django.core.serializers.json import DjangoJSONEncoder - from django.db import connections +from data_model.models import Cache + @pytest.fixture def create_cache_table(): @@ -44,8 +45,8 @@ def create_cache_table(): class TestGetStamps: @pytest.mark.django_db(databases=["default", "data_model"]) - def test_dump_data_eth_model_score(self, mocker, create_cache_table): - """Test the 'scorer_dump_data_model_score' command""" + def test_dump_data_eth_model_score_jsonl(self, mocker, create_cache_table): + """Test the 'scorer_dump_data_model_score' command export jsonl format""" ############################################################### # Create data in the DB @@ -54,21 +55,24 @@ def test_dump_data_eth_model_score(self, mocker, create_cache_table): for i in range(10): address = f"0x{i}" timestamp = datetime(2024, 4, 9, i, 0, 0, tzinfo=timezone.utc) - Cache.objects.create( + value = { + "data": {"human_probability": i}, + "meta": {"version": "v1", "Training_date": "2023/12/27"}, + } + c = Cache.objects.create( key_0="predict", key_1=address, - value={ - "data": {"human_probability": i}, - "meta": {"version": "v1", "Training_date": "2023/12/27"}, - }, + value=value, updated_at=timestamp, ) expected_data.append( json.loads( json.dumps( { - "address": address, - "data": {"score": str(i)}, + "id": c.id, + "key_0": "predict", + "key_1": address, + "value": value, "updated_at": timestamp, }, cls=DjangoJSONEncoder, @@ -106,8 +110,8 @@ def test_dump_data_eth_model_score(self, mocker, create_cache_table): # The data file will always be the 1st file in the list data_file = s3_upload_mock.call_args[0][0] data = [] - expected_keys = {"address", "data", "updated_at"} - expected_data_keys = {"score"} + expected_keys = {"id", "key_0", "key_1", "value", 
"updated_at"} + expected_data_keys = {"data", "meta"} with open(data_file, "r") as f: for idx, expected_record in enumerate(expected_data): line = f.readline() @@ -115,8 +119,197 @@ def test_dump_data_eth_model_score(self, mocker, create_cache_table): data.append(line_data) assert expected_keys == set(line_data.keys()) - assert expected_data_keys == set(line_data["data"].keys()) + assert expected_data_keys == set(line_data["value"].keys()) assert expected_data[idx] == line_data # We only expect the number of records we generated for the community that we filtered by assert len(data) == 10 + + @pytest.mark.django_db(databases=["default", "data_model"]) + def test_dump_data_eth_model_score_parquet(self, mocker, create_cache_table): + """Test the 'scorer_dump_data_model_score' command export parquet format""" + + ############################################################### + # Create data in the DB + ############################################################### + expected_data = [] + for i in range(10): + address = f"0x{i}" + timestamp = datetime(2024, 4, 9, i, 0, 0, tzinfo=timezone.utc) + value = { + "data": {"human_probability": i}, + "meta": {"version": "v1", "Training_date": "2023/12/27"}, + } + c = Cache.objects.create( + key_0="predict", + key_1=address, + value=value, + updated_at=timestamp, + ) + expected_data.append( + json.loads( + json.dumps( + { + "id": c.id, + "key_0": "predict", + "key_1": address, + "value": value, + "updated_at": timestamp, + }, + cls=DjangoJSONEncoder, + ) + ) + ) + # Also add data to cache for other models (differet value in first element in key) + # The export command should filter correctly + Cache.objects.create( + key_0="predict_nft", + key_1=address, + value={ + "data": {"human_probability": i}, + "meta": {"version": "v1", "Training_date": "2023/12/27"}, + }, + updated_at=timestamp, + ) + + s3_upload_mock = mocker.patch( + "data_model.management.commands.scorer_dump_data_model_score.upload_to_s3" + ) + call_command( + "scorer_dump_data_model_score", + *[], + **{ + "batch_size": 2, # set a small batch size, we want make sure it can handle pagination + "s3_uri": "s3://public.scorer.gitcoin.co/passport_scores/", + "filename": "eth_model_score.jsonl", + "data_model": "predict", + "format": "parquet", + }, + ) + + s3_upload_mock.assert_called_once() + + # The data file will always be the 1st file in the list + data_file = s3_upload_mock.call_args[0][0] + data = [] + expected_keys = {"id", "key_0", "key_1", "value", "updated_at"} + expected_data_keys = {"data", "meta"} + + # Load the Parquet file into a DataFrame + df = pd.read_parquet(data_file) + + for idx, row in df.iterrows(): + data.append(row) + row_dict = row.to_dict() + row_dict["updated_at"] = row_dict["updated_at"].isoformat() + "Z" + row_dict["value"] = json.loads(row_dict["value"]) + + row_value = json.loads(row.value) + assert expected_keys == set(df.columns) + assert expected_data_keys == set(row_value.keys()) + assert expected_data[idx] == row_dict + + # We only expect the number of records we generated for the community that we filtered by + assert len(data) == 10 + + @pytest.mark.django_db(databases=["default", "data_model"]) + def test_dump_data_eth_model_score_parquet_multiple_models( + self, mocker, create_cache_table + ): + """Test the 'scorer_dump_data_model_score' command export multiple model to parquet format""" + + ############################################################### + # Create data in the DB + ############################################################### + 
expected_data = [] + for i in range(10): + address = f"0x{i}" + timestamp = datetime(2024, 4, 9, i, 0, 0, tzinfo=timezone.utc) + value = { + "data": {"human_probability": i}, + "meta": {"version": "v1", "Training_date": "2023/12/27"}, + } + c = Cache.objects.create( + key_0="predict", + key_1=address, + value=value, + updated_at=timestamp, + ) + expected_data.append( + json.loads( + json.dumps( + { + "id": c.id, + "key_0": c.key_0, + "key_1": c.key_1, + "value": c.value, + "updated_at": timestamp, + }, + cls=DjangoJSONEncoder, + ) + ) + ) + # Also add data to cache for other models (differet value in first element in key) + # The export command should filter correctly + c = Cache.objects.create( + key_0="predict_nft", + key_1=address, + value={ + "data": {"human_probability": i}, + "meta": {"version": "v1", "Training_date": "2023/12/27"}, + }, + updated_at=timestamp, + ) + expected_data.append( + json.loads( + json.dumps( + { + "id": c.id, + "key_0": c.key_0, + "key_1": c.key_1, + "value": c.value, + "updated_at": timestamp, + }, + cls=DjangoJSONEncoder, + ) + ) + ) + s3_upload_mock = mocker.patch( + "data_model.management.commands.scorer_dump_data_model_score.upload_to_s3" + ) + call_command( + "scorer_dump_data_model_score", + *[], + **{ + "batch_size": 2, # set a small batch size, we want make sure it can handle pagination + "s3_uri": "s3://public.scorer.gitcoin.co/passport_scores/", + "filename": "eth_model_score.jsonl", + "data_model": "predict,predict_nft", + "format": "parquet", + }, + ) + + s3_upload_mock.assert_called_once() + + # The data file will always be the 1st file in the list + data_file = s3_upload_mock.call_args[0][0] + data = [] + expected_keys = {"id", "key_0", "key_1", "value", "updated_at"} + expected_data_keys = {"data", "meta"} + + # Load the Parquet file into a DataFrame + df = pd.read_parquet(data_file) + + for idx, row in df.iterrows(): + data.append(row) + row_dict = row.to_dict() + row_dict["updated_at"] = row_dict["updated_at"].isoformat() + "Z" + row_dict["value"] = json.loads(row_dict["value"]) + + row_value = json.loads(row.value) + assert expected_keys == set(df.columns) + assert expected_data_keys == set(row_value.keys()) + assert expected_data[idx] == row_dict + + # We only expect the number of records we generated for the community that we filtered by + assert len(data) == 20 diff --git a/infra/aws/index.ts b/infra/aws/index.ts index 6f19e95a9..a943e064c 100644 --- a/infra/aws/index.ts +++ b/infra/aws/index.ts @@ -1416,9 +1416,9 @@ export const frequentEthModelV2ScoreDataDumpTaskDefinitionForScorer = "python", "manage.py", "scorer_dump_data_model_score", - `--s3-uri=s3://${publicDataDomain}/eth_model_scores_v2/`, - "--filename=eth_model_scores.jsonl", - "--data-model=predict_eth_v2", + `--s3-uri=s3://${publicDataDomain}/model_scores/`, + "--filename=model_scores.parquet", + "--format=parquet", ].join(" "), scheduleExpression: "cron(*/30 * ? * * *)", // Run the task every 30 min alertTopic: pagerdutyTopic,
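
For reference, the extended command can be driven the same way the new tests drive it. A minimal sketch; the batch size, S3 URI, filename, and model list below are illustrative placeholders, not values the command requires:

# Minimal sketch of invoking the extended export command, mirroring the new tests.
# Batch size, S3 URI, filename and model list are placeholder values.
from django.core.management import call_command

call_command(
    "scorer_dump_data_model_score",
    batch_size=1000,                                        # placeholder batch size
    s3_uri="s3://public.scorer.gitcoin.co/model_scores/",   # placeholder bucket/prefix
    filename="model_scores.parquet",
    data_model="predict,predict_nft",   # comma-separated model list; omit to export all models
    format="parquet",                   # "jsonl" (the default) or "parquet"
)

The equivalent CLI form is what the updated cron task runs: python manage.py scorer_dump_data_model_score --s3-uri=... --filename=model_scores.parquet --format=parquet, with --data-model omitted so the scores for every model are exported.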
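The parquet tests read the export back with pandas, and a downstream consumer of the published file can do the same. A minimal sketch, assuming a locally downloaded copy of the file (the path is a placeholder) and, as in the export, JSON fields stored as strings:

# Sketch of reading the exported parquet file, following the same steps as the tests.
# "model_scores.parquet" is a placeholder path to a downloaded copy of the export.
import json

import pandas as pd

df = pd.read_parquet("model_scores.parquet")
for _, row in df.iterrows():
    # "value" is serialized as a JSON string in the parquet export (jsonfields_as_str=True)
    value = json.loads(row["value"])
    print(row["key_0"], row["key_1"], value["data"]["human_probability"])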