From 11b645f1905715123338654a0d3fcbea8e5f9e7f Mon Sep 17 00:00:00 2001
From: Alexander Kozlov
Date: Fri, 14 Feb 2025 13:48:19 +0000
Subject: [PATCH 1/3] add test_applying_prediction_on_best_model_only

---
 tests/test_complex_pipeline.py | 126 ++++++++++++++++++++++++++++++++-
 1 file changed, 124 insertions(+), 2 deletions(-)

diff --git a/tests/test_complex_pipeline.py b/tests/test_complex_pipeline.py
index 8368a45a..772834f5 100644
--- a/tests/test_complex_pipeline.py
+++ b/tests/test_complex_pipeline.py
@@ -6,7 +6,7 @@
 from sqlalchemy.sql.sqltypes import Integer, String
 
 from datapipe.compute import Catalog, Pipeline, Table, build_compute, run_steps
-from datapipe.datatable import DataStore
+from datapipe.datatable import DataStore, DataTable
 from datapipe.step.batch_generate import BatchGenerate
 from datapipe.step.batch_transform import BatchTransform
 from datapipe.store.database import TableStoreDB
@@ -325,7 +325,6 @@ def complex_transform_with_many_recordings(dbconn, N: int):
                     [
                         Column("image_id", Integer, primary_key=True),
                         Column("model_id", Integer, primary_key=True),
-                        Column("prediction__attribite", Integer),
                     ],
                     True,
                 )
@@ -446,3 +445,126 @@ def test_complex_transform_with_many_recordings_N1000(dbconn):
 @pytest.mark.skip(reason="fails on sqlite")
 def test_complex_transform_with_many_recordings_N10000(dbconn):
     complex_transform_with_many_recordings(dbconn, N=10000)
+
+
+def test_applying_prediction_on_best_model_only(dbconn):
+    N = 100
+    ds = DataStore(dbconn, create_meta_table=True)
+    catalog = Catalog(
+        {
+            "tbl_image": Table(
+                store=TableStoreDB(
+                    dbconn,
+                    "tbl_image",
+                    [
+                        Column("image_id", Integer, primary_key=True),
+                    ],
+                    True,
+                )
+            ),
+            "tbl_model": Table(
+                store=TableStoreDB(
+                    dbconn,
+                    "tbl_model",
+                    [
+                        Column("model_id", Integer, primary_key=True),
+                    ],
+                    True,
+                )
+            ),
+            "tbl_best_model": Table(
+                store=TableStoreDB(
+                    dbconn,
+                    "tbl_best_model",
+                    [
+                        Column("model_id", Integer, primary_key=True),
+                    ],
+                    True,
+                )
+            ),
+            "tbl_prediction": Table(
+                store=TableStoreDB(
+                    dbconn,
+                    "tbl_prediction",
+                    [
+                        Column("image_id", Integer, primary_key=True),
+                        Column("model_id", Integer, primary_key=True),
+                    ],
+                    True,
+                )
+            ),
+        }
+    )
+
+    def gen_tbls(df1, df2, df3):
+        yield df1, df2, df3
+
+    test_df__image = pd.DataFrame({"image_id": range(N)})
+    test_df__model = pd.DataFrame(
+        {
+            "model_id": [0, 1, 2, 3, 4]
+        }
+    )
+    test_df__best_model = pd.DataFrame({"model_id": [4]})
+
+    def inference_only_on_best_model(
+        df__image: pd.DataFrame,
+        df__model: pd.DataFrame,
+        df__best_model: pd.DataFrame,
+        idx: IndexDF,
+    ):
+        assert all([model_id == 4 for model_id in idx["model_id"]])
+        df__prediction = pd.merge(df__image, df__model, how="cross")
+        return df__prediction[["image_id", "model_id"]]
+
+    pipeline = Pipeline(
+        [
+            BatchGenerate(
+                func=gen_tbls,
+                outputs=[
+                    "tbl_image",
+                    "tbl_model",
+                    "tbl_best_model",
+                ],
+                kwargs=dict(
+                    df1=test_df__image,
+                    df2=test_df__model,
+                    df3=test_df__best_model,
+                ),
+            ),
+            BatchTransform(
+                func=inference_only_on_best_model,
+                inputs=[
+                    "tbl_image",  # image_id
+                    "tbl_model",  # model_id
+                    Required("tbl_best_model"),  # model_id
+                ],
+                outputs=["tbl_prediction"],
+                transform_keys=["image_id", "model_id"],
+            ),
+        ]
+    )
+    steps = build_compute(ds, catalog, pipeline)
+    run_steps(ds, steps)
+    test__df_prediction = pd.DataFrame(
+        {"image_id": range(N), "model_id": [4] * N}
+    )
+    assert_df_equal(
+        ds.get_table("tbl_prediction").get_data(),
+        test__df_prediction,
+        index_cols=["image_id", "model_id"],
+    )
+
+    test_df__new_best_model = pd.DataFrame({"model_id": [3]})
+    dt__tbl_best_model: DataTable = ds.get_table("tbl_best_model")
+    dt__tbl_best_model.delete_by_idx(dt__tbl_best_model.get_data())
+    dt__tbl_best_model.store_chunk(test_df__new_best_model)
+    run_steps(ds, steps)
+    test__new_df_prediction = pd.DataFrame(
+        {"image_id": range(N), "model_id": [3] * N}
+    )
+    assert_df_equal(
+        ds.get_table("tbl_prediction").get_data(),
+        test__new_df_prediction,
+        index_cols=["image_id", "model_id"],
+    )

From d3c80dfc5ebc9f4bc90e47ea41ed54180162afde Mon Sep 17 00:00:00 2001
From: Andrey Tatarinov
Date: Sat, 26 Jul 2025 21:05:52 +0400
Subject: [PATCH 2/3] reformat

---
 tests/test_complex_pipeline.py | 49 +++++++++-------------------------
 1 file changed, 13 insertions(+), 36 deletions(-)

diff --git a/tests/test_complex_pipeline.py b/tests/test_complex_pipeline.py
index a185b1e5..bcd19779 100644
--- a/tests/test_complex_pipeline.py
+++ b/tests/test_complex_pipeline.py
@@ -52,9 +52,7 @@
 )
 TEST__PREDICTION = pd.merge(TEST__PREDICTION_LEFT, TEST__PREDICTION_CENTER, how="cross")
 TEST__PREDICTION = pd.merge(TEST__PREDICTION, TEST__PREDICTION_RIGHT, how="cross")
-TEST__PREDICTION = TEST__PREDICTION[
-    ["item_id", "pipeline_id", "keypoint_name", "prediction__attribute"]
-]
+TEST__PREDICTION = TEST__PREDICTION[["item_id", "pipeline_id", "keypoint_name", "prediction__attribute"]]
 
 
 def test_complex_pipeline(dbconn):
@@ -122,9 +120,7 @@ def test_complex_pipeline(dbconn):
         }
     )
 
-    def complex_function(
-        df__item, df__pipeline, df__prediction, df__keypoint, idx: IndexDF
-    ):
+    def complex_function(df__item, df__pipeline, df__prediction, df__keypoint, idx: IndexDF):
         assert idx[idx[["item_id", "pipeline_id"]].duplicated()].empty
         assert len(df__keypoint) == len(TEST__KEYPOINT)
         df__output = pd.merge(df__item, df__prediction, on=["item_id"])
@@ -252,9 +248,7 @@ def train(
                 }
             ]
         )
-        df__pipeline__total = pd.concat(
-            [df__pipeline__total, df__pipeline], ignore_index=True
-        )
+        df__pipeline__total = pd.concat([df__pipeline__total, df__pipeline], ignore_index=True)
         df__pipeline__is_trained_on__frozen_dataset__total = pd.concat(
             [
                 df__pipeline__is_trained_on__frozen_dataset__total,
@@ -284,12 +278,10 @@ def train(
     ds.get_table("frozen_dataset").store_chunk(TEST__FROZEN_DATASET)
     ds.get_table("train_config").store_chunk(TEST__TRAIN_CONFIG)
     run_steps(ds, steps)
-    assert len(ds.get_table("pipeline").get_data()) == len(TEST__FROZEN_DATASET) * len(
+    assert len(ds.get_table("pipeline").get_data()) == len(TEST__FROZEN_DATASET) * len(TEST__TRAIN_CONFIG)
+    assert len(ds.get_table("pipeline__is_trained_on__frozen_dataset").get_data()) == len(TEST__FROZEN_DATASET) * len(
         TEST__TRAIN_CONFIG
     )
-    assert len(
-        ds.get_table("pipeline__is_trained_on__frozen_dataset").get_data()
-    ) == len(TEST__FROZEN_DATASET) * len(TEST__TRAIN_CONFIG)
 
 
 def complex_transform_with_many_recordings(dbconn, N: int):
@@ -324,6 +316,7 @@ def complex_transform_with_many_recordings(dbconn, N: int):
                     [
                         Column("image_id", Integer, primary_key=True),
                         Column("model_id", Integer, primary_key=True),
+                        Column("prediction__attribite", Integer),
                     ],
                     True,
                 )
@@ -357,9 +350,7 @@ def gen_tbls(df1, df2, df3, df4):
         yield df1, df2, df3, df4
 
    test_df__image = pd.DataFrame({"image_id": range(N)})
-    test_df__image__attribute = pd.DataFrame(
-        {"image_id": range(N), "attribute": [5 * x for x in range(N)]}
-    )
+    test_df__image__attribute = pd.DataFrame({"image_id": range(N), "attribute": [5 * x for x in range(N)]})
     test_df__prediction = pd.DataFrame(
         {
             "image_id": list(range(N)) * 5,
@@ -384,9 +375,7 @@ def get_some_prediction_only_on_best_model(
         df__prediction = pd.merge(df__prediction, df__best_model, on=["model_id"])
         df__image = pd.merge(df__image, df__image__attribute, on=["image_id"])
         df__result = pd.merge(df__image, df__prediction, on=["image_id"])
-        df__result["result"] = (
-            df__result["attribute"] - df__result["prediction__attribite"]
-        )
+        df__result["result"] = df__result["attribute"] - df__result["prediction__attribite"]
         return df__result[["image_id", "model_id", "result"]]
     pipeline = Pipeline(
         [
@@ -411,9 +400,7 @@ def get_some_prediction_only_on_best_model(
                 inputs=[
                     Required("tbl_image"),  # image_id
                     "tbl_image__attribute",  # image_id, attribute
-                    Required(
-                        "tbl_prediction"
-                    ),  # image_id, model_id, prediction__attribite
+                    Required("tbl_prediction"),  # image_id, model_id, prediction__attribite
                     Required("tbl_best_model"),  # model_id
                 ],
                 outputs=["tbl_output"],
@@ -423,9 +410,7 @@
     )
     steps = build_compute(ds, catalog, pipeline)
     run_steps(ds, steps)
-    test__df_output = pd.DataFrame(
-        {"image_id": range(N), "model_id": [4] * N, "result": [0] * N}
-    )
+    test__df_output = pd.DataFrame({"image_id": range(N), "model_id": [4] * N, "result": [0] * N})
     assert_df_equal(
         ds.get_table("tbl_output").get_data(),
         test__df_output,
@@ -499,11 +484,7 @@ def gen_tbls(df1, df2, df3):
         yield df1, df2, df3
 
     test_df__image = pd.DataFrame({"image_id": range(N)})
-    test_df__model = pd.DataFrame(
-        {
-            "model_id": [0, 1, 2, 3, 4]
-        }
-    )
+    test_df__model = pd.DataFrame({"model_id": [0, 1, 2, 3, 4]})
     test_df__best_model = pd.DataFrame({"model_id": [4]})
 
     def inference_only_on_best_model(
@@ -545,9 +526,7 @@ def inference_only_on_best_model(
     )
     steps = build_compute(ds, catalog, pipeline)
     run_steps(ds, steps)
-    test__df_prediction = pd.DataFrame(
-        {"image_id": range(N), "model_id": [4] * N}
-    )
+    test__df_prediction = pd.DataFrame({"image_id": range(N), "model_id": [4] * N})
     assert_df_equal(
         ds.get_table("tbl_prediction").get_data(),
         test__df_prediction,
@@ -559,9 +538,7 @@ def inference_only_on_best_model(
     dt__tbl_best_model.delete_by_idx(dt__tbl_best_model.get_data())
     dt__tbl_best_model.store_chunk(test_df__new_best_model)
     run_steps(ds, steps)
-    test__new_df_prediction = pd.DataFrame(
-        {"image_id": range(N), "model_id": [3] * N}
-    )
+    test__new_df_prediction = pd.DataFrame({"image_id": range(N), "model_id": [3] * N})
     assert_df_equal(
         ds.get_table("tbl_prediction").get_data(),
         test__new_df_prediction,

From e161017306213df73679c5b3295a376780129f70 Mon Sep 17 00:00:00 2001
From: Andrey Tatarinov
Date: Sat, 26 Jul 2025 22:14:04 +0400
Subject: [PATCH 3/3] trying to grok the problem

---
 tests/test_complex_pipeline.py | 34 ++++++++++++++--------------------
 1 file changed, 14 insertions(+), 20 deletions(-)

diff --git a/tests/test_complex_pipeline.py b/tests/test_complex_pipeline.py
index bcd19779..60685c42 100644
--- a/tests/test_complex_pipeline.py
+++ b/tests/test_complex_pipeline.py
@@ -431,9 +431,11 @@ def test_complex_transform_with_many_recordings_N10000(dbconn):
     complex_transform_with_many_recordings(dbconn, N=10000)
 
 
-def test_applying_prediction_on_best_model_only(dbconn):
-    N = 100
+def test_applying_prediction_on_best_model_only(dbconn) -> None:
+    # N = 100
+    N = 5
     ds = DataStore(dbconn, create_meta_table=True)
+
     catalog = Catalog(
         {
             "tbl_image": Table(
@@ -480,9 +482,6 @@
         }
     )
 
-    def gen_tbls(df1, df2, df3):
-        yield df1, df2, df3
-
     test_df__image = pd.DataFrame({"image_id": range(N)})
     test_df__model = pd.DataFrame({"model_id": [0, 1, 2, 3, 4]})
     test_df__best_model = pd.DataFrame({"model_id": [4]})
@@ -493,25 +492,11 @@ def inference_only_on_best_model(
         df__best_model: pd.DataFrame,
         idx: IndexDF,
     ):
-        assert all([model_id == 4 for model_id in idx["model_id"]])
         df__prediction = pd.merge(df__image, df__model, how="cross")
         return df__prediction[["image_id", "model_id"]]
 
     pipeline = Pipeline(
         [
-            BatchGenerate(
-                func=gen_tbls,
-                outputs=[
-                    "tbl_image",
-                    "tbl_model",
-                    "tbl_best_model",
-                ],
-                kwargs=dict(
-                    df1=test_df__image,
-                    df2=test_df__model,
-                    df3=test_df__best_model,
-                ),
-            ),
             BatchTransform(
                 func=inference_only_on_best_model,
                 inputs=[
@@ -524,8 +509,15 @@ def inference_only_on_best_model(
             ),
         ]
     )
+
     steps = build_compute(ds, catalog, pipeline)
+
+    ds.get_table("tbl_image").store_chunk(test_df__image)
+    ds.get_table("tbl_model").store_chunk(test_df__model)
+    ds.get_table("tbl_best_model").store_chunk(test_df__best_model)
+
     run_steps(ds, steps)
+
     test__df_prediction = pd.DataFrame({"image_id": range(N), "model_id": [4] * N})
     assert_df_equal(
         ds.get_table("tbl_prediction").get_data(),
         test__df_prediction,
         index_cols=["image_id", "model_id"],
     )
 
     test_df__new_best_model = pd.DataFrame({"model_id": [3]})
     dt__tbl_best_model: DataTable = ds.get_table("tbl_best_model")
-    dt__tbl_best_model.delete_by_idx(dt__tbl_best_model.get_data())
+    dt__tbl_best_model.delete_by_idx(cast(IndexDF, dt__tbl_best_model.get_data()))
     dt__tbl_best_model.store_chunk(test_df__new_best_model)
+
     run_steps(ds, steps)
+
     test__new_df_prediction = pd.DataFrame({"image_id": range(N), "model_id": [3] * N})
     assert_df_equal(
         ds.get_table("tbl_prediction").get_data(),