Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch input for iid_list_exists #34

Merged
merged 4 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 19 additions & 12 deletions leap_data_management_utils/cmip_transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import zarr
from google.cloud import bigquery
from pangeo_forge_recipes.transforms import Indexed, T
from tqdm.auto import tqdm

from leap_data_management_utils.cmip_testing import test_all
from leap_data_management_utils.data_management_transforms import BQInterface
Expand Down Expand Up @@ -136,23 +137,14 @@ def iid_exists(self, iid: str) -> bool:
"""Check if iid exists in the table"""
return self._get_iid_results(iid).exists

def iid_list_exists(self, iids: list[str]) -> list[str]:
def _iid_list_exists_batch(self, iids: list[str]) -> list[str]:
"""More efficient way to check if a list of iids exists in the table
Passes the entire list to a single SQL query.
Returns a list of iids that exist in the table
Only supports list up to 10k elements. If you want to check more, you should
work in batches:
```
iids = df['instance_id'].tolist()
iids_in_bq = []
batchsize = 10000
iid_batches = [iids[i : i + batchsize] for i in range(0, len(iids), batchsize)]
for iids_batch in tqdm(iid_batches):
iids_in_bq_batch = bq.iid_list_exists(iids_batch)
iids_in_bq.extend(iids_in_bq_batch)
```
"""
assert len(iids) <= 10000
if len(iids) > 10000:
raise ValueError('List of iids is too long. Please work in batches.')

# source: https://stackoverflow.com/questions/26441928/how-do-i-check-if-multiple-values-exists-in-database
query = f"""
Expand All @@ -164,6 +156,21 @@ def iid_list_exists(self, iids: list[str]) -> list[str]:
# this is a full row iterator, for now just return the iids
return list(set([r['instance_id'] for r in results]))

def iid_list_exists(self, iids: list[str]) -> list[str]:
    """Return the iids from `iids` that exist in the table.

    The input is split into batches and every batch is checked with a single
    SQL query through `_iid_list_exists_batch`, which refuses more than 10k
    elements per call.

    Duplicate entries in `iids` are dropped before batching, so an iid that
    would otherwise land in more than one batch is neither queried nor
    returned twice.

    Args:
        iids: instance ids to look up.

    Returns:
        The iids reported as present by the per-batch queries. The order is
        not guaranteed to follow the input order, because each batch result
        is built from a set.
    """
    # bq cannot handle more than 10k elements in a single query
    batchsize = 10000
    # dict.fromkeys removes duplicates while keeping first-seen order
    unique_iids = list(dict.fromkeys(iids))
    iid_batches = [
        unique_iids[start : start + batchsize]
        for start in range(0, len(unique_iids), batchsize)
    ]
    iids_in_bq = []
    for iids_batch in tqdm(iid_batches):
        iids_in_bq.extend(self._iid_list_exists_batch(iids_batch))
    return iids_in_bq


# ----------------------------------------------------------------------------------------------
# apache Beam stages
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ dependencies = [
[project.optional-dependencies]

pangeo-forge=[
"tqdm",
"db_dtypes",
"google-api-core",
"google-cloud-bigquery",
Expand Down