Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop using S3 Select in indexer #4212

Merged
merged 12 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions lambdas/indexer/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<!-- markdownlint-disable line-length -->
# Changelog

Changes are listed in reverse chronological order (newer entries at the top).
The entry format is

```markdown
- [Verb] Change description ([#<PR-number>](https://github.com/quiltdata/quilt/pull/<PR-number>))
```

where verb is one of

- Removed
- Added
- Fixed
- Changed

## Changes

- [Changed] Stop using S3 select ([#4212](https://github.com/quiltdata/quilt/pull/4212))
- [Added] Bootstrap the change log ([#4212](https://github.com/quiltdata/quilt/pull/4212))
97 changes: 58 additions & 39 deletions lambdas/indexer/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@


import datetime
import functools
import json
import os
import pathlib
Expand Down Expand Up @@ -92,7 +93,6 @@
POINTER_PREFIX_V1,
get_available_memory,
get_quilt_logger,
query_manifest_content,
separated_env_to_iter,
)

Expand Down Expand Up @@ -168,12 +168,7 @@
# currently only affects .parquet, TODO: extend to other extensions
assert 'SKIP_ROWS_EXTS' in os.environ
SKIP_ROWS_EXTS = separated_env_to_iter('SKIP_ROWS_EXTS')
SELECT_PACKAGE_META = "SELECT * from S3Object o WHERE o.version IS NOT MISSING LIMIT 1"
# No WHERE clause needed for aggregations since S3 Select skips missing fields for aggs
SELECT_PACKAGE_STATS = (
"SELECT COALESCE(SUM(obj['size']), 0) as total_bytes,"
" COUNT(obj['size']) as total_files from S3Object obj"
)
DUCKDB_SELECT_LAMBDA_ARN = os.environ["DUCKDB_SELECT_LAMBDA_ARN"]
TEST_EVENT = "s3:TestEvent"
# we need to filter out GetObject and HeadObject calls generated by the present
# lambda in order to display accurate analytics in the Quilt catalog
Expand All @@ -182,6 +177,7 @@


logger = get_quilt_logger()
s3_client = boto3.client("s3", config=botocore.config.Config(user_agent_extra=USER_AGENT_EXTRA))


def now_like_boto3():
Expand Down Expand Up @@ -247,13 +243,10 @@
wrapper for retry and returning a string
"""
try:
raw = query_manifest_content(
s3_client,
bucket=bucket,
key=key,
sql_stmt=SELECT_PACKAGE_META
)
return json.load(raw)
body = s3_client.get_object(Bucket=bucket, Key=key)["Body"]
with body: # this *might* be needed to close the stream ASAP
for line in body.iter_lines():
return json.loads(line)

Check warning on line 249 in lambdas/indexer/index.py

View check run for this annotation

Codecov / codecov/patch/informational

lambdas/indexer/index.py#L246-L249

Added lines #L246 - L249 were not covered by tests
except (botocore.exceptions.ClientError, json.JSONDecodeError) as cle:
print(f"Unable to S3 select manifest: {cle}")

Expand Down Expand Up @@ -439,7 +432,7 @@
first = select_manifest_meta(s3_client, bucket, manifest_key)
if not first:
return
stats = select_package_stats(s3_client, bucket, manifest_key)
stats = select_package_stats(bucket, manifest_key)
if not stats:
return

Expand Down Expand Up @@ -472,33 +465,54 @@
return True


def select_package_stats(s3_client, bucket, manifest_key) -> str:
@functools.lru_cache(maxsize=None)
def get_bucket_region(bucket: str) -> str:
    """Return the AWS region of *bucket*, cached per bucket name.

    Uses HeadBucket and reads the ``x-amz-bucket-region`` response header.
    S3 also includes that header on HeadBucket *error* responses (e.g. a
    301 redirect for a cross-region bucket, or 403), so fall back to the
    error response instead of propagating the exception.
    """
    try:
        resp = s3_client.head_bucket(Bucket=bucket)
    except botocore.exceptions.ClientError as err:
        # Cross-region / restricted buckets make head_bucket raise, but
        # the region header is still present on the error response.
        resp = err.response
        if "x-amz-bucket-region" not in resp.get("ResponseMetadata", {}).get("HTTPHeaders", {}):
            raise
    return resp["ResponseMetadata"]["HTTPHeaders"]["x-amz-bucket-region"]

Check warning on line 471 in lambdas/indexer/index.py

View check run for this annotation

Codecov / codecov/patch/informational

lambdas/indexer/index.py#L470-L471

Added lines #L470 - L471 were not covered by tests


@functools.lru_cache(maxsize=None)
def get_presigner_client(bucket: str):
    """Return a cached S3 client for presigning URLs to *bucket*.

    The client is pinned to the bucket's region and forced to SigV4 —
    presumably because presigned URLs must be signed against the
    bucket's own region to be valid.
    """
    sigv4_config = botocore.config.Config(signature_version="s3v4")
    region = get_bucket_region(bucket)
    return boto3.client("s3", region_name=region, config=sigv4_config)


def select_package_stats(bucket, manifest_key) -> Optional[dict]:
"""use s3 select to generate file stats for package"""
logger_ = get_quilt_logger()
try:
raw_stats = query_manifest_content(
s3_client,
bucket=bucket,
key=manifest_key,
sql_stmt=SELECT_PACKAGE_STATS
).read()

if raw_stats:
stats = json.loads(raw_stats)
assert isinstance(stats['total_bytes'], int)
assert isinstance(stats['total_files'], int)

return stats

except (
AssertionError,
botocore.exceptions.ClientError,
json.JSONDecodeError,
KeyError,
) as err:
logger_.exception("Unable to compute package stats via S3 select")
presigner_client = get_presigner_client(bucket)
url = presigner_client.generate_presigned_url(

Check warning on line 487 in lambdas/indexer/index.py

View check run for this annotation

Codecov / codecov/patch/informational

lambdas/indexer/index.py#L486-L487

Added lines #L486 - L487 were not covered by tests
ClientMethod="get_object",
Params={
"Bucket": bucket,
"Key": manifest_key,
},
)
lambda_ = make_lambda_client()
q = f"""

Check warning on line 495 in lambdas/indexer/index.py

View check run for this annotation

Codecov / codecov/patch/informational

lambdas/indexer/index.py#L494-L495

Added lines #L494 - L495 were not covered by tests
SELECT
COALESCE(SUM(size), 0) AS total_bytes,
COUNT(size) AS total_files FROM read_ndjson('{url}', columns={{size: 'UBIGINT'}}) obj
"""
resp = lambda_.invoke(

Check warning on line 500 in lambdas/indexer/index.py

View check run for this annotation

Codecov / codecov/patch/informational

lambdas/indexer/index.py#L500

Added line #L500 was not covered by tests
FunctionName=DUCKDB_SELECT_LAMBDA_ARN,
Payload=json.dumps({"query": q, "user_agent": f"DuckDB Select {USER_AGENT_EXTRA}"}),
)

return None
payload = resp["Payload"].read()
if "FunctionError" in resp:
logger_.error("DuckDB select unhandled error: %s", payload)
return None
parsed = json.loads(payload)
if "error" in parsed:
logger_.error("DuckDB select error: %s", parsed["error"])
return None

Check warning on line 512 in lambdas/indexer/index.py

View check run for this annotation

Codecov / codecov/patch/informational

lambdas/indexer/index.py#L505-L512

Added lines #L505 - L512 were not covered by tests

rows = parsed["rows"]
return rows[0] if rows else None

Check warning on line 515 in lambdas/indexer/index.py

View check run for this annotation

Codecov / codecov/patch/informational

lambdas/indexer/index.py#L514-L515

Added lines #L514 - L515 were not covered by tests


def extract_pptx(fileobj, max_size: int) -> str:
Expand Down Expand Up @@ -732,6 +746,11 @@
return boto3.client("s3", config=configuration)


@functools.lru_cache(maxsize=None)
def make_lambda_client():
    """Return a process-wide AWS Lambda client, created on first use."""
    client = boto3.client("lambda")
    return client

Check warning on line 751 in lambdas/indexer/index.py

View check run for this annotation

Codecov / codecov/patch/informational

lambdas/indexer/index.py#L751

Added line #L751 was not covered by tests


def map_event_name(event: dict):
"""transform eventbridge names into S3-like ones"""
input_ = event["eventName"]
Expand Down
4 changes: 3 additions & 1 deletion lambdas/indexer/pytest.ini
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
[pytest]
env =
DUCKDB_SELECT_LAMBDA_ARN = "arn:aws:lambda:us-west-2:123456789012:function:select-lambda"
log_cli = True
# This is set above critical to prevent logger events from confusing output in CI
log_level = 51
log_level = 51
1 change: 1 addition & 0 deletions lambdas/indexer/test-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ pluggy==0.9
py==1.10.0
pytest==4.4.0
pytest-cov==2.6.1
pytest-env==0.6.2
responses==0.10.14
50 changes: 2 additions & 48 deletions lambdas/indexer/test/test_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import responses
from botocore import UNSIGNED
from botocore.client import Config
from botocore.exceptions import ParamValidationError
from botocore.stub import Stubber
from dateutil.tz import tzutc
from document_queue import EVENT_PREFIX, RetryError
Expand Down Expand Up @@ -979,7 +978,7 @@ def test_index_if_package_select_stats_fail(self, append_mock, select_meta_mock,
)

select_meta_mock.assert_called_once_with(self.s3_client, bucket, manifest_key)
select_stats_mock.assert_called_once_with(self.s3_client, bucket, manifest_key)
select_stats_mock.assert_called_once_with(bucket, manifest_key)
append_mock.assert_called_once_with({
"_index": bucket + PACKAGE_INDEX_SUFFIX,
"_id": key,
Expand Down Expand Up @@ -1023,7 +1022,7 @@ def test_index_if_package(self, append_mock, select_meta_mock, select_stats_mock
)

select_meta_mock.assert_called_once_with(self.s3_client, bucket, manifest_key)
select_stats_mock.assert_called_once_with(self.s3_client, bucket, manifest_key)
select_stats_mock.assert_called_once_with(bucket, manifest_key)
append_mock.assert_called_once_with({
"_index": bucket + PACKAGE_INDEX_SUFFIX,
"_id": key,
Expand Down Expand Up @@ -1182,51 +1181,6 @@ def test_extension_overrides(self):
assert self._get_contents('foo.txt', '.txt') == ""
assert self._get_contents('foo.ipynb', '.ipynb') == ""

@pytest.mark.xfail(
raises=ParamValidationError,
reason="boto bug https://github.com/boto/botocore/issues/1621",
strict=True,
)
def test_stub_select_object_content(self):
"""Demonstrate that mocking S3 select with boto3 is broken"""
sha_hash = "50f4d0fc2c22a70893a7f356a4929046ce529b53c1ef87e28378d92b884691a5"
manifest_key = f"{MANIFEST_PREFIX_V1}{sha_hash}"
# this SHOULD work, but due to botocore bugs it does not
self.s3_stubber.add_response(
method="select_object_content",
service_response={
"ResponseMetadata": ANY,
# it is sadly not possible to mock S3 select responses because
# boto incorrectly believes "Payload"'s value should be a dict
# but it's really an iterable in realworld code
# see https://github.com/boto/botocore/issues/1621
"Payload": [
{
"Stats": {}
},
{
"Records": {
"Payload": json.dumps(MANIFEST_DATA).encode(),
},
},
{
"End": {}
},
]
},
expected_params={
"Bucket": "test-bucket",
"Key": manifest_key,
"Expression": index.SELECT_PACKAGE_META,
"ExpressionType": "SQL",
"InputSerialization": {
'JSON': {'Type': 'LINES'},
'CompressionType': 'NONE'
},
"OutputSerialization": {'JSON': {'RecordDelimiter': '\n'}}
}
)

def test_synthetic_copy_event(self):
"""check synthetic ObjectCreated:Copy event vs organic obtained on 26-May-2020
(bucket versioning on)
Expand Down