Skip to content

Conversation

@NJManganelli
Copy link
Collaborator

This PR is a precursor to both parquet reading being re-enabled (pre-processing parquet data, most importantly) and column-joining (where dataset specifications become very complicated and hard-to-construct/validate by hand). This exposes and expands the explicit Dataclasses using Pydantic models. preprocess is updated to handle these dataclasses explicitly. Column-joining imposes some requirements on the status of a dataset for automatic column-determination, so a joinable() method is introduced to check that status after preprocessing, though some followup work may be generated by propagating the pydantic models through all the followup PRs.

Sidenote: These dataclasses are especially crucial for column-joining, as creating a joinable dataset spec becomes complicated when one must combine two disparate datasets (with different forms, if one wishes to understand their structure and determine necessary columns automatically) AND a non-trivial join specification (which tells column-joining how to handle the inputs to deliver a joined dataset to the user), and that spec needs to be preprocessed (separately for root and parquet sub-datasets to be joined) then recomposed into a joinable (i.e. non-Optional FileSpec) version of the joinable spec

For now, most of the simpler functions interacting with datasets have been updated with dual-path code to simultaneously support legacy pure-dictionary inputs and the pydantic models. This includes e.g. apply_to_(file|data)set, slice_(chunks|files), preprocess, etc. An example notebook including usage is introduced. In the future, once these are thoroughly tested in the wild, the old legacy dictionaries can be removed from code paths, since the Pydantic models already handle conversions trivially)

A followup PR will introduce the preprocess_parquet function. Another will be needed to handle threading the classes through the runner (where there's an open question of whether to harmonize the runner and preprocess+apply_to_fileset interfaces first). Another is expected to finish the changes for processing parquet again.

Tests should cover the overwhelming majority of cases, originally generated by Copilot, with heavy editing, expansion, corrections. The test code is not concise, unfortunately.

Pydantic automatically buys us serialization, as they can be saved into json via each model's .model_dump() function

This PR is an alternative to #1395

NOTE: considering the huge number of commits, squash and merge may be desired instead...

@NJManganelli
Copy link
Collaborator Author

@lgray @nsmith- @ikrommyd This is ready for a fresh set of eyes, particularly for anything I may have missed/overlooked so far.

Keep in mind, supporting runner, preprocess_parquet needs to come in followup PRs (latter will need to be converted for pydantic, former I have not considered yet - do we want to (semi-)unify runner and preprocess+apply_to_fileset interfaces first?)

@NJManganelli NJManganelli force-pushed the parquet-precursor-pydantic-datafactory branch from 581245f to 187113f Compare August 20, 2025 00:47
@NJManganelli NJManganelli requested a review from Copilot August 20, 2025 01:51
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR introduces Pydantic-based dataclasses to replace dictionary-based dataset specifications, providing type safety, validation, and serialization capabilities for Coffea's dataset processing workflow. This is a foundational change to support upcoming parquet reading and column-joining features.

Key changes:

  • Introduces comprehensive Pydantic models for file specifications (UprootFileSpec, ParquetFileSpec, and their Coffea variants)
  • Adds DatasetSpec and FilesetSpec classes with validation and serialization
  • Updates preprocessing and manipulation functions to support both legacy dictionaries and new Pydantic models
  • Provides dual-path compatibility for gradual migration

Reviewed Changes

Copilot reviewed 9 out of 10 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
src/coffea/dataset_tools/filespec.py Core Pydantic models defining file and dataset specifications with validation
tests/test_dataset_tools_filespec.py Comprehensive test suite for all new Pydantic models and validation logic
src/coffea/dataset_tools/preprocess.py Updated preprocessing to handle both dict and Pydantic model inputs
src/coffea/dataset_tools/manipulations.py Modified manipulation functions for dual-path compatibility
tests/test_dataset_tools.py Extended existing tests to validate both dict and Pydantic model workflows
src/coffea/dataset_tools/apply_processor.py Updated processor application to handle new model types
src/coffea/dataset_tools/__init__.py Added exports for new Pydantic classes
pyproject.toml Added Pydantic dependency
docs/source/examples.rst Added reference to new filespec notebook

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

@NJManganelli
Copy link
Collaborator Author

Ping @lgray

@lgray
Copy link
Collaborator

lgray commented Aug 27, 2025

@NJManganelli looking at the volume of code that's been produced I assume this is AI generated for the most part? Not that it matters, just understanding how to approach it.

@NJManganelli
Copy link
Collaborator Author

The core code (non-tests/notebooks code, in particular filespec.py) is 100% human-written (w/ occasional copilot suggestions/fixes, but it was the pydantic conversion of non-assisted code I wrote before).

For the tests, it was a mix, lets call it 2/3rds Copilot-initiated and 1/3rd by-hand (edits + consolidating via parametrization for the copilot tests, and updating older tests to explicitly parametrize Pydantic variation).

Notebook is almost entirely Copilot, with some edits for style, presentation, and corrections.

Hopefully that helps in what/how to review them respectively

@lgray
Copy link
Collaborator

lgray commented Aug 27, 2025

An initial comment looking through the user-facing partsfilespec.ipynb the very last tutorial just stops without explaining everything it sets out to do. The rest of the tutorials seem well covering.


def identify_file_format(name_or_directory: str) -> str:
root_expression = re.compile(r"\.root")
parquet_expression = re.compile(r"\.parq(?:uet)?$")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've also seen .pq running around as well for parquet files, may want to add this.

The right way to do it for both would be to check the magic bytes in the files but that's very slow for this purpose.

Copy link
Collaborator

@lgray lgray Aug 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though, riffing on that, when we go to process the files in preprocess we could also validate the determined file type against the magic bytes. However, this is largely already accomplished just by assuming the extension is correct and opening with the appropriate package.

Maybe we could give a more helpful error message? Then again I expect this kind of error (file with wrong extension) is very rare.

Moreover the actual filename is more a bookkeeping tool at the end of the day, so perhaps it's better to fall back to trying to open whatever ways we have available and notify the user (possibly by failing) if we exhausted all those options.

@lgray
Copy link
Collaborator

lgray commented Aug 27, 2025

Aside from that one additional comment and some pondering, I think this looks reasonable.

I appreciate the automation of writing core functionality unit tests.

@ikrommyd anything that catches your attention?

@lgray lgray changed the title feat: Introduce pydantic dataclasses feat: Make preprocess rigorous with IOFactory and pydantic dataclasses Aug 27, 2025
@lgray
Copy link
Collaborator

lgray commented Aug 27, 2025

As for unifying the actual processing and running interfaces.

My idea around apply_to_fileset in the old-style processor based framework is starting settle towards it producing generator of all the futures much like apply_to_fileset does in the dask case

Dask vs. non-dask can be driven by a boolean flag passed to apply_to_fileset, which should be the same as the mode flag for nanoevents and start out unset.

Then we introduce one new function coffea.compute(computable, runner=None, **dask_kwargs), where computable is a dask collection or the future container that we produce from apply_to_fileset in the non-dask case. It then returns the result of dask.compute on the dask collection or it returns the accumulated result of running the given futures.

@NJManganelli
Copy link
Collaborator Author

An initial comment looking through the user-facing partsfilespec.ipynb the very last tutorial just stops without explaining everything it sets out to do. The rest of the tutorials seem well covering.

Yeah, I deleted Copilot's suggestion for the IOFactory, partially because it was lying, and partially because while that class was the User-facing gateway to using the pure dataclass implementation (and Copilot treated it as such), Pydantic automates so much (for example, I had a function which would recurse down and convert the dataclass to pure dictionary, when the right option was set, or would only 'unwrap' the top-most layer and give you a dictionary with possibly-dataclass values(). With pydantic, these are respectively accomplished by just calling AModel.model_dump() and dict(AModel)). I was starting to think the IOFactory might be removed completely, but it's also the base class for the column-join IOJoinFactory, and that has a fair bit more work to do, and I haven't yet propagated this all through to see if that can be eliminated/converted to standalone functions yet.

In short, I was thinking of not advertising the IOFactory much anymore for users, in case it makes more sense to break into standalone functions.

But I'll write an example for the few use-cases it still has

Let me also add the .pq support and consolidate the joinable/check code.

For the magic-byte checking, lemme have a look into that in tandem with the preprocess_parquet PR. If I get that, I'll add it for both preprocess and preprocess_parquet (and maybe I can upstream some of the functionality from join_preprocess that's in column-join to coffea, a third of which is just dispatching root/parquet preprocessing to the appropriate function variation and re-ziping the fileset together at the end. That would let us just unify into a single user-facing function too.

@lgray
Copy link
Collaborator

lgray commented Aug 28, 2025

Ah - OK if it's not user facing then we shouldn't talk about it unless we have to. :-) It just sorta stuck out to me!

For the magic-byte checking I started thinking too much about the right way to do it. The end of that thinking on my side is:

  • I don't think we need to check the magic bytes, uproot and pyarrow.parquet do this for us
  • what we should do is protect the user against feeding incomprehensible data to the system, and also tell the user when a file is mislabeled.

i.e. add in some logic if a file doesn't open to check if a root file is actually parquet (and vice versa), and if it's not either valid input format then we mark the file as bad, otherwise open the file as what it really is based on the filename guess and then make a note that it has the wrong postfix.

This is more a creature comfort than anything else, consider it low priority.

@NJManganelli
Copy link
Collaborator Author

Sounds appropriate to me, do you think the format checking should be exclusively in e.g. preprocess, or both preprocess and apply_to_*set?

@lgray
Copy link
Collaborator

lgray commented Aug 28, 2025

That's a good question.

My first instinct is that once it's in apply_to_*set we should error rather than politely inform, since the user should providing that level of processing sanitized inputs of known-to-be-good files which it's at scale. When it's just messing around (like process/threads executors) we should allow globs, the jobs are typically fast enough at that scale to where time is not really wasted.

This is just the beginning of an idea though.

Nick Manganelli and others added 12 commits August 29, 2025 12:32
…as precursor to column-joining, with a hard-requirement that a form is stored and can be decoded with checking
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
… point about dict(AModel) vs AModel.model_dump() methods
@NJManganelli NJManganelli force-pushed the parquet-precursor-pydantic-datafactory branch from 0442203 to 23ee888 Compare August 29, 2025 19:32
@NJManganelli
Copy link
Collaborator Author

Okay, .pq formats accounted for, more tests for formats, factorized code for joinable and the datasetspec validation function (renamed), and some docs in the notebook for IOFactory showing off the still-relevant bits.

Branch was out of date, rebased, and the tests are off to the races...

Ciao

@lgray
Copy link
Collaborator

lgray commented Aug 29, 2025

Giving it another look through I think it looks pretty good to me. I think it sets us up for a much more robust future when it comes to dataset communication, manipulation, and tracking.

@nsmith- @ikrommyd if y'all could have a look as well, at whatever depth of attention you can spare, I'd appreciate it.

@ikrommyd ikrommyd self-requested a review August 30, 2025 06:46
Copy link
Collaborator

@ikrommyd ikrommyd left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've never used pydantic so I don't know good and bad patterns with pydantic. I've looked at this by looking at the tests and the example notebook mainly and I like the features and I think it sets up a good path for good dataset specifications and also the unification of executor-dask preprocessing.

@lgray
Copy link
Collaborator

lgray commented Sep 2, 2025

@nsmith- Could you give this a look since you've got the most pydantic experience among people I know in HEP.

@NJManganelli
Copy link
Collaborator Author

NJManganelli commented Sep 8, 2025

@nsmith- do you have any preliminary thoughts that might dramatically change things? Otherwise I will start to work on the next couple PRs in the second half of this month, hoping only minor changes will be requested

@nsmith- nsmith- self-requested a review September 9, 2025 15:00
Copy link
Member

@nsmith- nsmith- left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left a few comments now just to better understand the idea. Will look more.

Comment on lines 94 to 99
Union[
CoffeaUprootFileSpec,
CoffeaParquetFileSpec,
CoffeaUprootFileSpecOptional,
CoffeaParquetFileSpecOptional,
],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be an opportunity to use a discriminated union using format as the discriminator. Though, if we want to keep separate types for the preprocessed and input file sets then we need a function discriminator. Personally I would separate the preprocessed and input at this level: CoffeaFileDict -> InputFiles, PreprocessedFiles

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean by separating the preprocessed and input levels? I suppose I've been thinking of this as "maybe-maybenot" ready-to-use and "definitively ready-to-use"; the latter may be accomplished via preprocessing, but may be manually set (if e.g. one doesn't need/care about the saved_form). As this all coalesces, maybe it makes sense to be even stricter, so that's not an objection, but where do the boundaries get defined?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've separted this into separate InputFiles (with automatic promotion) and PreprocessedFiles. There's not yet a discriminated union implementation, but I think that can be deferred to later, since at least for the moment that would mostly be a validation optimization, if I understand the purpose correctly.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My personal preference (see also the later review comment on the return type of preprocess()) is we want to go for a type hierarchy more like

class InputFileSpec(BaseModel):
    steps: Annotated[list[StepPair], Field(min_length=1)] | None
    format: Literal["root", "parquet"]

class ROOTInputSpec(InputFileSpec):
    format = "root"
    object_path: str
    uuid: str | None = None

class ParquetInputSpec(InputFileSpec):
    format = "parquet"

class InputFilesSpec(
    RootModel[
        dict[
            str,
            Union[
                ROOTInputSpec,
                ParquetInputSpec,
            ],
        ]
    ]
)

class FileSpec(BaseModel):
    path: str
    steps: Annotated[list[StepPair], Field(min_length=1)]
    format: Literal["root", "parquet"]

class CoffeaROOTFileSpec(FileSpec):
    format = "root"
    object_path: str
    uuid: str

class CoffeaParquetFileSpec(FileSpec):
    format = "parquet"
    uuid: str

class PreprocessedFilesSpec(
    RootModel[
        list[
            Union[
                CoffeaROOTFileSpec,
                CoffeaParquetFileSpec,
            ],
        ]
    ],
)

# and then similarly for
class InputDatasetSpec: ...

class DatasetSpec: ...

steps: Annotated[list[StepPair], Field(min_length=1)] | StepPair
num_entries: Annotated[int, Field(ge=0)]
uuid: str
# directory: Literal[True, False] #identify whether it's a directory of parquet files or a single parquet file, may be useful or necessary to distinguish
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is_directory: bool

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we consider support for directories with e.g. xrootd/root files in general, too? Similar to the format validation/magic-bytes discussion @lgray brought up, that's only possible to check rigorously by opening the paths/files, and I'm unsure what/how much is supported via fsspec and what alternative paths would be needed. But if this is the right moment to crystallize some handling of directories, lets do that?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Name is swapped, but this needs more thinking on how exactly is_directory is implemented, tested for (e.g. in preprocessing where directories/files are actually opened), and utilized (I think there are instances, particularly with parquet, where being able to treat things as a directory is advantageous. On the other hand, for column-joining, there'll be optimizations when the file-level view is utilized)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want to support wildcard paths as well, then that's another variant. Basically, the key of the InputFiles dictionary is not so trivial: could be a local file, URL, local directory, directory URL, glob pattern, ...

@NJManganelli
Copy link
Collaborator Author

Would appreciate another look now, I addressed most comments. Things are not perfect, but in the interest of perfect not becoming the enemy of good, I hope it's reasonably close enough. Would like to get this merged and propagate changes to preprocessing and column-joining before circling back to those kinds of changes (the is_directory feature might make more sense after those downstream changes, too). And we have the unification of the runner / apply_to_X to consider as well. I might refactor preprocess in that followup PR to similarly try to force to the pydantic class and only fallback if it fails, which is the more aggressive form of getting things tested in the wild.

@lgray
Copy link
Collaborator

lgray commented Oct 10, 2025

@nsmith- could you please review after Nick's updates? Thanks!

@nsmith- nsmith- self-requested a review October 13, 2025 14:46
Copy link
Member

@nsmith- nsmith- left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First, thanks for implementing the last round of review! I'm trying to keep in mind the balance of not breaking existing types but we have a chance to really improve the ability to type-check the processing tools and I think its worth it even at this early stage of refactoring.

files = dataset.files
events = NanoEventsFactory.from_root(
files,
files.model_dump(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NanoEventsFactory.from_root could accept type InputFiles. This could be for a future PR since this changeset focuses on dataset_tools only. If so, let's make an issue

Comment on lines +65 to 67
maybe_base_form = dataset.form
if maybe_base_form is not None:
maybe_base_form = awkward.forms.from_json(decompress_form(maybe_base_form))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DatasetSpec.form should be a property that decompresses and returns the form if it is available, otherwise None. Then this code can be simplified to passing known_base_form=dataset.form

metadata = copy.deepcopy(dataset.metadata)
if metadata is None:
metadata = {}
metadata.setdefault("dataset", name)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is an example where the choice of dict[str, UnnamedThing] vs. list[NamedThing] makes it a little unergonomic: we could just pass a named dataset object to apply_to_dataset rather than copy and alter the metadata here. It's also not great for schemas to have high key cardinality. But we're locked into this by the legacy structure for now.



class CoffeaROOTFileSpecOptional(ROOTFileSpec):
num_entries: Annotated[int, Field(ge=0)] | None = None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could num_entries be a computed property, deduced from steps[-1][1]?


class GenericFileSpec(BaseModel):
object_path: str | None = None
steps: Annotated[list[StepPair], Field(min_length=1)] | StepPair | None = None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the use case for a bare StepPair?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think basically all these free functions would be much more user-friendly if they were implemented on the FileSpec and related types themselves with method-chaining patterns, e.g.

spec = (
    spec.limit_steps(4, per_file=False)
    .limit_files(3)
    .filter_files(pattern='*DY*.root')
    .repartition(step_size=1000)
)

The methods can be implemented once in their most generic type, e.g. GenericFileSpec.limit_steps, with the parent containers also having the same function name that maps over their items.

We may want to leave these in for backwards-compatibility but they can then just call the method-chain versions.

select_datasets_by_criteria is another candidate for chaining, but I would suggest an interface more like spec.filter_datasets(pattern="ttbar*", metadata={"key": "value"})

Also, we're using steps and chunks in various places. Since uproot has consistently gone with steps we should probably move towards that for the names of the bound methods.

class InputFiles(
RootModel[
dict[
str,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we require the dict keys to be URLs here?

Comment on lines +484 to +489
if is_datasetspec:
out_updated[name].form = compressed_union_form
out_available[name].form = compressed_union_form
else:
out_updated[name]["form"] = compressed_union_form
out_available[name]["form"] = compressed_union_form
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not always upconvert to DatasetSpec, mainpulate under that assumption, and then at the end downconvert if the user passed in a legacy object?


_trivial_file_fields = {"run", "luminosityBlock", "event"}


Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optional: add

from typing import overload

@overload
def preprocess(
    fileset: FilesetSpec,
    step_size: None | int = None,
    align_clusters: bool = False,
    recalculate_steps: bool = False,
    files_per_batch: int = 1,
    skip_bad_files: bool = False,
    file_exceptions: Exception | Warning | tuple[Exception | Warning] = (OSError,),
    save_form: bool = False,
    scheduler: None | Callable | str = None,
    uproot_options: dict = {},
    step_size_safety_factor: float = 0.5,
    allow_empty_datasets: bool = False,
) -> tuple[FilesetSpec, FilesetSpec]: ...

@overload
def preprocess(
    fileset: dict,
    step_size: None | int = None,
    align_clusters: bool = False,
    recalculate_steps: bool = False,
    files_per_batch: int = 1,
    skip_bad_files: bool = False,
    file_exceptions: Exception | Warning | tuple[Exception | Warning] = (OSError,),
    save_form: bool = False,
    scheduler: None | Callable | str = None,
    uproot_options: dict = {},
    step_size_safety_factor: float = 0.5,
    allow_empty_datasets: bool = False,
) -> tuple[dict, dict]: ...

step_size_safety_factor: float = 0.5,
allow_empty_datasets: bool = False,
) -> tuple[FilesetSpec, FilesetSpecOptional]:
) -> tuple[FilesetSpec | dict]:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be tuple[FilesetSpec, FilesetSpec] | tuple[dict, dict]

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants