Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mass M2M join in Table object #566

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 119 additions & 0 deletions piccolo/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,125 @@ def remove_m2m(self, *rows: Table, m2m: M2M) -> M2MRemoveRelated:
m2m=m2m,
)

async def __join_field(self, field: str, ignore: bool = False) -> list:
"""
Runs get_m2m for a FIELD of object. Catches ValueError, when there are
no relations in M2M table and returns empty list(). If ignore flag is
True, also returns empty list.

:param field:
The field to join.
:param ignore:
Flag for include and exclude logic of join_m2m().
:returns:
A list of related objects, or an empty one if none exist.

"""
if ignore:
return list()
try:
return await self.get_m2m(self.__getattribute__(field)).run()
except ValueError:
return list()

async def join_m2m(
self,
include_fields: t.Union[set[str], t.List[str], None] = None,
exclude_fields: t.Union[set[str], t.List[str], None] = None,
):
"""
Runs get_m2m() method for all M2M fields of object. Can be useful for
complex PyDantic models in READ actions. Returns empty list() for an
attribute, if there are no relations to this object.

Optional, you can include or exclude fields to define which attrs
should be joined. Setting either include_fields, and exclude_fields
will raise AssertionError.

Model example:

.. code-block:: python

class Band(Table):
genres = m2m.M2M(LazyTableReference(
"BandtoGenre",
module_path=__name_
)
)
concerts = m2m.M2M(
LazyTableReference(
"BandToConcert",
module_path=__name__
)
)

.. code-block:: python

>>> band = await Band.objects().get(Band.name == "Pythonistas")
>>> await band.join_m2m()
>>> band.genres
[<Genre: 1>, <Genre: 2>]
>>> band.concerts
[<Concert: 1>,<Concert: 2>,<Concert: 3>]

Include_fields example:

.. code-block:: python
>>> await band.join_m2m(include_fields=['genres'])
>>> band.genres
[<Genre: 1>, <Genre: 2>]
>>> band.concerts
[]

Exclude_fields example:

.. code-block:: python
>>> await band.join_m2m(exclude_fields=['genres'])
>>> band.genres
[]
>>> band.concerts
[<Concert: 1>,<Concert: 2>,<Concert: 3>]

:param include_fields:
Only these fields will be joined to base model`s object. Defaults
to None.
:param exclude_fields:
Only these fields will be excluded from the join. Defaults to None.

"""
assert (include_fields is None) or (
exclude_fields is None
), "Only one of FIELDS arguments can exist"
if include_fields is not None:
assert isinstance(
include_fields, (set, list)
), "include_fields MUST be set, list or None"
if exclude_fields is not None:
assert isinstance(
exclude_fields, (set, list)
), "exclude_fields MUST be set, list or None"
m2m_fields: set = set(
[
field
for field, object in inspect.getmembers(
self, lambda a: (isinstance(a, M2M))
)
]
)
ignore_fields: list = list()
if include_fields:
ignore_fields = list(m2m_fields.difference(set(include_fields)))
if exclude_fields:
ignore_fields = list(m2m_fields.intersection(set(exclude_fields)))
for field_ in list(m2m_fields):
ignore: bool = False
if field_ in ignore_fields:
ignore = True
self.__setattr__(
field_, # M2M attr name
await self.__join_field(field=field_, ignore=ignore),
)

def to_dict(self, *columns: Column) -> t.Dict[str, t.Any]:
"""
A convenience method which returns a dictionary, mapping column names
Expand Down