Skip to content

Commit

Permalink
Merge pull request #409 from astronomy-commons/sean/merge-asof
Browse files Browse the repository at this point in the history
Add merge_asof function to catalog
  • Loading branch information
smcguire-cmu authored Aug 27, 2024
2 parents 57dca96 + 55ede31 commit 67bcf5a
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 2 deletions.
48 changes: 48 additions & 0 deletions src/lsdb/catalog/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
join_catalog_data_nested,
join_catalog_data_on,
join_catalog_data_through,
merge_asof_catalog_data,
)
from lsdb.dask.partition_indexer import PartitionIndexer
from lsdb.io.schema import get_arrow_schema
Expand Down Expand Up @@ -378,6 +379,53 @@ def merge(
suffixes=suffixes,
)

def merge_asof(
    self,
    other: Catalog,
    direction: str = "backward",
    suffixes: Tuple[str, str] | None = None,
    output_catalog_name: str | None = None,
):
    """Uses the pandas `merge_asof` function to merge two catalogs on their indices by distance of keys

    Must be along catalog indices, and does not include margin caches, meaning results may be
    incomplete for merging points.

    This function is intended for use in special cases such as Dust Map Catalogs; for general
    merges, the `crossmatch` and `join` functions should be used.

    Args:
        other (lsdb.Catalog): the right catalog to merge to
        direction (str): the direction to perform the merge_asof
        suffixes (Tuple[str, str]): the suffixes to apply to each partition's column names.
            Defaults to the names of the two catalogs.
        output_catalog_name (str): the name of the resulting catalog. Defaults to
            ``{left_name}_merge_asof_{right_name}``.

    Returns:
        A new catalog with the columns from each of the input catalogs with their respective
        suffixes added, and the rows merged using merge_asof on the specified columns.

    Raises:
        ValueError: if `suffixes` is provided and is not a tuple of two strings.
    """
    if suffixes is None:
        suffixes = (f"_{self.name}", f"_{other.name}")

    if len(suffixes) != 2:
        raise ValueError("`suffixes` must be a tuple with two strings")

    ddf, ddf_map, alignment = merge_asof_catalog_data(self, other, suffixes=suffixes, direction=direction)

    if output_catalog_name is None:
        output_catalog_name = (
            f"{self.hc_structure.catalog_info.catalog_name}_merge_asof_"
            f"{other.hc_structure.catalog_info.catalog_name}"
        )

    # Columns in the merged frame carry the suffixes, so the catalog metadata must point
    # at the suffixed ra/dec columns of the left (self) catalog.
    new_catalog_info = dataclasses.replace(
        self.hc_structure.catalog_info,
        catalog_name=output_catalog_name,
        ra_column=self.hc_structure.catalog_info.ra_column + suffixes[0],
        dec_column=self.hc_structure.catalog_info.dec_column + suffixes[0],
    )
    hc_catalog = hc.catalog.Catalog(new_catalog_info, alignment.pixel_tree, schema=get_arrow_schema(ddf))
    return Catalog(ddf, ddf_map, hc_catalog)

def join(
self,
other: Catalog,
Expand Down
76 changes: 75 additions & 1 deletion src/lsdb/dask/join_catalog_data.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# pylint: disable=duplicate-code

from __future__ import annotations

import warnings
Expand All @@ -8,6 +7,7 @@
import dask
import nested_dask as nd
import nested_pandas as npd
import pandas as pd
from hipscat.catalog.association_catalog import AssociationCatalogInfo
from hipscat.catalog.catalog_info import CatalogInfo
from hipscat.catalog.margin_cache import MarginCacheCatalogInfo
Expand Down Expand Up @@ -228,6 +228,44 @@ def perform_join_through(
return merged


# pylint: disable=too-many-arguments, unused-argument
@dask.delayed
def perform_merge_asof(
    left: npd.NestedFrame,
    right: npd.NestedFrame,
    left_pixel: HealpixPixel,
    right_pixel: HealpixPixel,
    left_catalog_info: CatalogInfo,
    right_catalog_info: CatalogInfo,
    suffixes: Tuple[str, str],
    direction: str,
):
    """Performs a merge_asof on two catalog partitions

    Args:
        left (npd.NestedFrame): the left partition to merge
        right (npd.NestedFrame): the right partition to merge
        left_pixel (HealpixPixel): the HEALPix pixel of the left partition
        right_pixel (HealpixPixel): the HEALPix pixel of the right partition
        left_catalog_info (hc.CatalogInfo): the catalog info of the left catalog
        right_catalog_info (hc.CatalogInfo): the catalog info of the right catalog
        suffixes (Tuple[str,str]): the suffixes to apply to each partition's column names
        direction (str): The direction to perform the merge_asof

    Returns:
        A dataframe with the result of merging the left and right partitions on the specified
        columns with `merge_asof`
    """
    # When the right partition is at a finer order, keep only the left rows that fall
    # inside the right partition's pixel.
    if right_pixel.order > left_pixel.order:
        left = filter_by_hipscat_index_to_pixel(left, right_pixel.order, right_pixel.pixel)

    left_renamed, right_renamed = rename_columns_with_suffixes(left, right, suffixes)
    # merge_asof requires sorted keys; the spatial index is the merge key here.
    return pd.merge_asof(
        left_renamed.sort_index(),
        right_renamed.sort_index(),
        left_index=True,
        right_index=True,
        direction=direction,
    )


def join_catalog_data_on(
left: Catalog, right: Catalog, left_on: str, right_on: str, suffixes: Tuple[str, str]
) -> Tuple[nd.NestedFrame, DaskDFPixelMap, PixelAlignment]:
Expand Down Expand Up @@ -382,3 +420,39 @@ def join_catalog_data_through(
meta_df = generate_meta_df_for_joined_tables([left, extra_df, right], [suffixes[0], "", suffixes[1]])

return construct_catalog_args(joined_partitions, meta_df, alignment)


def merge_asof_catalog_data(
    left: Catalog, right: Catalog, suffixes: Tuple[str, str], direction: str = "backward"
) -> Tuple[nd.NestedFrame, DaskDFPixelMap, PixelAlignment]:
    """Uses the pandas `merge_asof` function to merge two catalogs on their indices by distance of keys

    Must be along catalog indices, and does not include margin caches, meaning results may be
    incomplete for merging points.

    This function is intended for use in special cases such as Dust Map Catalogs; for general
    merges, the `crossmatch` and `join` functions should be used.

    Args:
        left (lsdb.Catalog): the left catalog to merge
        right (lsdb.Catalog): the right catalog to merge
        suffixes (Tuple[str,str]): the suffixes to apply to each partition's column names
        direction (str): the direction to perform the merge_asof

    Returns:
        A tuple of the dask dataframe with the result of the merge, the pixel map from HEALPix
        pixel to partition index within the dataframe, and the PixelAlignment of the two input
        catalogs.
    """

    # Align the two catalogs' pixel trees so partitions can be merged pairwise.
    alignment = align_catalogs(left, right)

    left_pixels, right_pixels = get_healpix_pixels_from_alignment(alignment)

    # Build one delayed merge_asof task per aligned partition pair.
    joined_partitions = align_and_apply(
        [(left, left_pixels), (right, right_pixels)], perform_merge_asof, suffixes, direction
    )

    # Empty meta frame with the suffixed columns of both catalogs, for dask metadata.
    meta_df = generate_meta_df_for_joined_tables([left, right], suffixes)

    return construct_catalog_args(joined_partitions, meta_df, alignment)
33 changes: 32 additions & 1 deletion tests/lsdb/catalog/test_join.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import numpy as np
import pandas as pd
import pytest
from hipscat.pixel_math.hipscat_id import HIPSCAT_ID_COLUMN
from hipscat.pixel_math.hipscat_id import HIPSCAT_ID_COLUMN, hipscat_id_to_healpix


def test_small_sky_join_small_sky_order1(
Expand Down Expand Up @@ -214,3 +214,34 @@ def test_join_nested(small_sky_catalog, small_sky_order1_source_with_margin, ass
check_column_type=False,
check_index_type=False,
)


def test_merge_asof(small_sky_catalog, small_sky_xmatch_catalog, assert_divisions_are_correct):
    """merge_asof should match a per-partition pandas merge_asof for every direction."""
    suffixes = ("_a", "_b")
    left_renamed = small_sky_catalog.compute().rename(
        columns={col: col + suffixes[0] for col in small_sky_catalog.columns}
    )
    # Split the left rows by which order-1 right pixel they fall into.
    pixel_of_row = hipscat_id_to_healpix(left_renamed.index.to_numpy(), 1)
    left_parts = [
        left_renamed[pixel_of_row == pixel.pixel]
        for pixel in small_sky_xmatch_catalog.get_healpix_pixels()
    ]
    right_parts = [
        part.compute().rename(columns={col: col + suffixes[1] for col in small_sky_xmatch_catalog.columns})
        for part in small_sky_xmatch_catalog.partitions
    ]
    for direction in ["backward", "forward", "nearest"]:
        joined = small_sky_catalog.merge_asof(
            small_sky_xmatch_catalog, direction=direction, suffixes=suffixes
        )
        assert isinstance(joined._ddf, nd.NestedFrame)
        assert_divisions_are_correct(joined)
        joined_compute = joined.compute()
        assert isinstance(joined_compute, npd.NestedFrame)
        # Reference result: an independent pandas merge_asof of each partition pair.
        expected = pd.concat(
            [
                pd.merge_asof(lp, rp, direction=direction, left_index=True, right_index=True)
                for lp, rp in zip(left_parts, right_parts)
            ]
        )
        pd.testing.assert_frame_equal(joined_compute, expected)

0 comments on commit 67bcf5a

Please sign in to comment.