
Commit

[FIX] Remove requirement for equal {participants x sessions} across pipelines/assessments (#144)

* remove constraint on equal {participants x sessions}

* add check for duplicate entries for an event in input

* test get_duplicate_entries()

* update error message for duplicate entries

* switch to returning str for single event id column name

* add smoke test for long-to-wide input data transform

* remove leftover func for checking equal participants x sessions

* remove CSV with unequal participants x sessions as invalid input test case

* update comment

* clearer docstrings and return types

* add TODO regarding NaN sessions

* test NaN handling in get_pipelines_overview()
alyssadai authored Mar 18, 2024
1 parent a1536da commit 08381a8
Showing 4 changed files with 262 additions and 25 deletions.
6 changes: 6 additions & 0 deletions digest/app.py
@@ -151,6 +151,12 @@ def process_bagel(upload_contents, available_digest_nclicks, filenames):
)
is None
):
# Convert session column to string so numeric labels are not treated as numbers
#
# TODO: Any existing NaNs will currently be turned into "nan". (See open issue https://github.com/pandas-dev/pandas/issues/25353)
# Another side effect of allowing NaN sessions is that if this column has integer values, they will be read in as floats
# (before being converted to str) if there are NaNs in the column.
# This should not be a problem after we disallow NaN values in "participant_id" and "session" columns, https://github.com/neurobagel/digest/issues/20
bagel["session"] = bagel["session"].astype(str)
session_list = bagel["session"].unique().tolist()
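
As a minimal sketch (not part of the diff) of the NaN casting behavior described in the TODO above, assuming only plain pandas and numpy:

import numpy as np
import pandas as pd

# A session column holding integer labels plus a NaN is read in as float64,
# so 1 becomes 1.0; casting to str then turns NaN into the literal string "nan"
sessions = pd.Series([1, np.nan])
print(sessions.astype(str).tolist())  # ['1.0', 'nan']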

64 changes: 40 additions & 24 deletions digest/utility.py
@@ -117,16 +117,23 @@ def get_missing_required_columns(bagel: pd.DataFrame, schema_file: str) -> set:
)


def get_event_id_columns(bagel: pd.DataFrame, schema: str) -> Union[list, str]:
"""Returns names of columns which identify a unique assessment or processing pipeline."""
def get_event_id_columns(
bagel: pd.DataFrame, schema: str
) -> Union[str, list, None]:
"""
Returns name(s) of columns which identify a unique assessment or processing pipeline.
When there is only one relevant column, we return a string instead of a list to avoid grouper problems when the column name is used in pandas groupby.
"""
if schema == "imaging":
return ["pipeline_name", "pipeline_version"]
elif schema == "phenotypic":
if schema == "phenotypic":
return (
["assessment_name", "assessment_version"]
if "assessment_version" in bagel.columns
else "assessment_name"
)
return None
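
A quick illustration (not part of the diff) of the grouper quirk the docstring above alludes to, assuming pandas >= 2.0, where single-element list keys yield length-1 tuple group labels:

import pandas as pd

df = pd.DataFrame({"assessment_name": ["moca", "updrs"], "score": [21.0, 12.0]})

# A plain string key gives scalar group labels...
print([key for key, _ in df.groupby("assessment_name")])    # ['moca', 'updrs']
# ...while a single-element list gives length-1 tuples (pandas >= 2.0)
print([key for key, _ in df.groupby(["assessment_name"])])  # [('moca',), ('updrs',)]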


def extract_pipelines(bagel: pd.DataFrame, schema: str) -> dict:
@@ -171,23 +178,9 @@ def get_id_columns(data: pd.DataFrame) -> list:
)


def are_subjects_same_across_pipelines(
bagel: pd.DataFrame, schema: str
) -> bool:
"""Checks if subjects and sessions are the same across pipelines in the input."""
pipelines_dict = extract_pipelines(bagel, schema)

pipeline_subject_sessions = []
for df in pipelines_dict.values():
# per pipeline, rows are sorted first in case participants/sessions are out of order
pipeline_subject_sessions.append(
df.sort_values(get_id_columns(bagel)).loc[:, get_id_columns(bagel)]
)

return all(
pipeline.equals(pipeline_subject_sessions[0])
for pipeline in pipeline_subject_sessions
)
def get_duplicate_entries(data: pd.DataFrame, subset: list) -> pd.DataFrame:
"""Returns a dataframe containing all duplicate entries in the input data."""
return data[data.duplicated(subset=subset, keep=False)]
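
As a usage sketch (not part of the diff): duplicated(subset=..., keep=False) flags every member of a duplicated group, not just the repeats after the first occurrence:

import pandas as pd

df = pd.DataFrame(
    {
        "participant_id": ["sub-1", "sub-1", "sub-2"],
        "session": ["1", "1", "1"],
        "assessment_name": ["moca", "moca", "moca"],
    }
)
# Both sub-1 rows are returned, because they share the same key columns
print(df[df.duplicated(subset=["participant_id", "session", "assessment_name"], keep=False)])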


def count_unique_subjects(data: pd.DataFrame) -> int:
@@ -207,9 +200,16 @@ def count_unique_records(data: pd.DataFrame) -> int:

def get_pipelines_overview(bagel: pd.DataFrame, schema: str) -> pd.DataFrame:
"""
Constructs a dataframe containing global statuses of pipelines in bagel.csv
(based on "pipeline_complete" column) for each participant and session.
Constructs a wide-format dataframe from the long-format input file,
with one row per participant-session pair and one column per event (e.g., pipeline, assessment).
"""
# NOTE: pd.pivot_table has more flexibility in terms of replacing all NaN values in the pivoted table and handling duplicate entries (not really needed in our case),
# but has known issues where it silently drops NaNs, regardless of the dropna parameter value.
# For now we don't need the extra flexibility, so we use the simpler pd.pivot instead.
#
# Related issues:
# https://github.com/pandas-dev/pandas/issues/21969
# https://github.com/pandas-dev/pandas/issues/17595
pipeline_complete_df = bagel.pivot(
index=get_id_columns(bagel),
columns=get_event_id_columns(bagel, schema),
@@ -228,6 +228,8 @@

pipeline_complete_df = (
# Enforce original order of sessions as they appear in input (pivot automatically sorts them)
# NOTE: .reindex only works correctly when there are no NaN values in the index level
# (Here, the entire "session" column should have already been cast to a string)
pipeline_complete_df.reindex(
index=bagel["session"].unique(), level="session"
)
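
For orientation (not part of the diff), a long-to-wide sketch of what the pivot plus reindex above do, assuming a recent pandas (>= 1.1, where pivot accepts list-like index arguments) and an illustrative phenotypic-style values column:

import pandas as pd

long_df = pd.DataFrame(
    {
        "participant_id": ["sub-1", "sub-1", "sub-2"],
        "session": ["1", "1", "1"],
        "assessment_name": ["moca", "updrs", "moca"],
        "assessment_score": [21.0, 12.0, 24.0],
    }
)
# One row per participant-session pair, one column per assessment
wide_df = long_df.pivot(
    index=["participant_id", "session"],
    columns="assessment_name",
    values="assessment_score",
)
# Restore the original session order (pivot sorts the index);
# level="session" reindexes only that level of the MultiIndex
wide_df = wide_df.reindex(index=long_df["session"].unique(), level="session")
print(wide_df)  # columns: moca, updrs; index: (participant_id, session) pairs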
@@ -269,6 +271,14 @@ def get_schema_validation_errors(
"""Checks that the input CSV adheres to the schema for the selected bagel type. If not, returns an informative error message as a string."""
error_msg = None

# Get the columns that uniquely identify a participant-session's value for an event,
# to be able to check for duplicate entries before transforming the data to wide format later on
unique_value_id_columns = get_id_columns(bagel) + (
get_event_id_columns(bagel, schema)
if isinstance(get_event_id_columns(bagel, schema), list)
else [get_event_id_columns(bagel, schema)]
)

if (
len(
missing_req_cols := get_missing_required_columns(
@@ -278,8 +288,14 @@
> 0
):
error_msg = f"The selected CSV is missing the following required {schema} metadata columns: {missing_req_cols}. Please try again."
elif not are_subjects_same_across_pipelines(bagel, schema):
error_msg = "The pipelines in the selected CSV do not have the same number of subjects and sessions. Please try again."
elif (
get_duplicate_entries(
data=bagel, subset=unique_value_id_columns
).shape[0]
> 0
):
# TODO: Switch to warning once alerts are implemented for errors?
error_msg = f"The selected CSV contains duplicate entries in the combination of: {unique_value_id_columns}. Please double check your input."

return error_msg
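
For concreteness (not part of the diff), and assuming get_id_columns() returns ["participant_id", "session"] for an input without a "bids_id" column, the duplicate-entry key assembled above works out as follows:

# Hypothetical phenotypic input without an "assessment_version" column:
event_cols = "assessment_name"  # get_event_id_columns() returns a plain string here
unique_value_id_columns = ["participant_id", "session"] + (
    event_cols if isinstance(event_cols, list) else [event_cols]
)
print(unique_value_id_columns)  # ['participant_id', 'session', 'assessment_name']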

1 change: 0 additions & 1 deletion tests/test_app.py
@@ -81,7 +81,6 @@ def test_002_upload_invalid_imaging_bagel(test_server, bagels_path):
"""
invalid_input_output = {
"example_missing-col_bagel.csv": "missing the following required imaging metadata columns: {'pipeline_starttime'}",
"example_mismatch-subs_bagel.csv": "do not have the same number of subjects and sessions",
"example_pheno_bagel.csv": "missing the following required imaging metadata columns",
}

216 changes: 216 additions & 0 deletions tests/test_utility.py
@@ -20,6 +20,222 @@ def test_invalid_filetype_returns_informative_error(filename):
assert "Invalid file type" in upload_error


@pytest.mark.parametrize(
"original_df,duplicates_df",
[
(
pd.DataFrame(
{
"participant_id": ["sub-1", "sub-1", "sub-2", "sub-2"],
"session": [1, 2, 1, 2],
"assessment_name": ["moca", "moca", "moca", "moca"],
"assessment_score": [21.0, 24.0, np.nan, 24.0],
}
),
# Have to specify column dtypes as well because the df will otherwise not evaluate as equal to an empty subset of the above df
pd.DataFrame(
{
"participant_id": pd.Series([], dtype="object"),
"session": pd.Series([], dtype="int64"),
"assessment_name": pd.Series([], dtype="object"),
"assessment_score": pd.Series([], dtype="float64"),
}
),
),
(
pd.DataFrame(
{
"participant_id": ["sub-1", "sub-1", "sub-2", "sub-2"],
"session": [1, 1, 1, 2],
"assessment_name": ["moca", "moca", "moca", "moca"],
"assessment_score": [21.0, 24.0, np.nan, 24.0],
}
),
pd.DataFrame(
{
"participant_id": ["sub-1", "sub-1"],
"session": [1, 1],
"assessment_name": ["moca", "moca"],
"assessment_score": [21.0, 24.0],
}
),
),
(
pd.DataFrame(
{
"participant_id": ["sub-1", "sub-1", "sub-2", "sub-2"],
"session": [np.nan, np.nan, 1, 2],
"assessment_name": ["moca", "moca", "moca", "moca"],
"assessment_score": [21.0, 24.0, np.nan, 24.0],
}
),
pd.DataFrame(
{
"participant_id": ["sub-1", "sub-1"],
"session": [np.nan, np.nan],
"assessment_name": ["moca", "moca"],
"assessment_score": [21.0, 24.0],
}
),
),
# TODO: Revisit this example when we want to handle NaN values in identifier columns differently (e.g., see https://github.com/neurobagel/digest/issues/20)
(
pd.DataFrame(
{
"participant_id": ["sub-1", "sub-1", "sub-2", "sub-2"],
"session": [1, np.nan, 1, 2],
"assessment_name": ["moca", "moca", "moca", "moca"],
"assessment_score": [21.0, 24.0, np.nan, 24.0],
}
),
pd.DataFrame(
{
"participant_id": pd.Series([], dtype="object"),
"session": pd.Series([], dtype="float64"),
"assessment_name": pd.Series([], dtype="object"),
"assessment_score": pd.Series([], dtype="float64"),
}
),
),
],
)
def test_get_duplicate_entries(original_df, duplicates_df):
"""Test that get_duplicate_entries() returns a dataframe containing the duplicate entries in a given dataframe."""

unique_value_id_columns = ["participant_id", "session", "assessment_name"]
assert util.get_duplicate_entries(
data=original_df, subset=unique_value_id_columns
).equals(duplicates_df)


@pytest.mark.parametrize(
"bagel_path,schema,expected_columns,expected_n_records",
[
(
"example_mismatch-subs_bagel.csv",
"imaging",
[
"participant_id",
"session",
"fmriprep-20.2.7",
"freesurfer-6.0.1",
"freesurfer-7.3.2",
],
6,
),
(
"example_imaging_bagel.csv",
"imaging",
[
"participant_id",
"bids_id",
"session",
"fmriprep-20.2.7",
"freesurfer-6.0.1",
"freesurfer-7.3.2",
],
7,
),
(
"example_pheno_bagel.csv",
"phenotypic",
[
"participant_id",
"bids_id",
"session",
"group",
"moca_total",
"updrs_3_total",
],
7,
),
],
)
def test_get_pipelines_overview(
bagels_path, bagel_path, schema, expected_columns, expected_n_records
):
"""
Smoke test that get_pipelines_overview() returns a dataframe with the expected columns and number of participant-session rows
after reshaping data into a wide format.
"""
bagel = pd.read_csv(bagels_path / bagel_path)
bagel["session"] = bagel["session"].astype(str)
overview_df = util.get_pipelines_overview(bagel=bagel, schema=schema)

assert overview_df.columns.tolist() == expected_columns
assert len(overview_df) == expected_n_records


@pytest.mark.parametrize(
"bagel,expected_overview_df",
[
(
# TODO: Update once https://github.com/neurobagel/digest/issues/20 is addressed
pd.DataFrame(
{
"participant_id": ["sub-1", "sub-1", "sub-2", "sub-2"],
"session": [1, np.nan, 1, 2],
"assessment_name": ["moca", "moca", "moca", "moca"],
"assessment_score": [21.0, 24.0, np.nan, 24.0],
}
),
pd.DataFrame(
{
"participant_id": ["sub-1", "sub-1", "sub-2", "sub-2"],
"session": ["1.0", "nan", "1.0", "2.0"],
"moca": [21.0, 24.0, np.nan, 24.0],
}
),
),
(
# This example also provides a test that original session order is preserved
pd.DataFrame(
{
"participant_id": [
"sub-1",
"sub-1",
"sub-1",
"sub-1",
"sub-1",
],
"session": [
"intake",
"baseline",
"follow-up",
"intake",
"baseline",
],
"assessment_name": [
"moca",
"moca",
"moca",
"updrs",
"updrs",
],
"assessment_score": [np.nan, 24.0, np.nan, 12, 12],
}
),
pd.DataFrame(
{
"participant_id": ["sub-1", "sub-1", "sub-1"],
"session": ["intake", "baseline", "follow-up"],
"moca": [np.nan, 24.0, np.nan],
"updrs": [12.0, 12.0, np.nan],
}
),
),
],
)
def test_get_pipelines_overview_handles_nan_correctly(
bagel, expected_overview_df
):
"""Test that get_pipelines_overview() handles NaN values in the original long-format data as expected."""
bagel["session"] = bagel["session"].astype(str)
overview_df = util.get_pipelines_overview(bagel=bagel, schema="phenotypic")

assert overview_df.equals(expected_overview_df), overview_df


def test_reset_column_dtypes():
"""
Test that reset_column_dtypes() infers more appropriate dtypes for columns whose values were erroneously stored as strings,
