fix: Making introspection more efficient #36

Open · wants to merge 9 commits into base: main

Changes from all commits
43 changes: 37 additions & 6 deletions tap_snowflake/client.py
@@ -11,14 +11,36 @@
from pathlib import Path
from typing import Any, Iterable, List, Tuple
from uuid import uuid4
import datetime
import re

import sqlalchemy
from singer_sdk import SQLConnector, SQLStream, metrics
from singer_sdk.helpers._batch import BaseBatchFileEncoding, BatchConfig
from singer_sdk.streams.core import REPLICATION_FULL_TABLE, REPLICATION_INCREMENTAL
import singer_sdk.helpers._typing
from snowflake.sqlalchemy import URL
from sqlalchemy.sql import text

unpatched_conform = singer_sdk.helpers._typing._conform_primitive_property


def patched_conform(
    elem: Any,
    property_schema: dict,
) -> Any:
    """Overrides Singer SDK type conformance to prevent dates turning into datetimes.

    Converts a primitive (i.e. not object or array) to a JSON-compatible type.

    Returns:
        The appropriate JSON-compatible type.
    """
    if isinstance(elem, datetime.date):
        return elem.isoformat()
    return unpatched_conform(elem=elem, property_schema=property_schema)


singer_sdk.helpers._typing._conform_primitive_property = patched_conform


class ProfileStats(Enum):
"""Profile Statistics Enum."""
@@ -78,7 +100,7 @@ def create_engine(self) -> sqlalchemy.engine.Engine:
            self.sqlalchemy_url,
            echo=False,
            pool_timeout=10,
        )

    # overridden to filter out the information_schema from catalog discovery
    def discover_catalog_entries(self) -> list[dict]:
@@ -91,13 +113,22 @@ def discover_catalog_entries(self) -> list[dict]:
        tables = [t.lower() for t in self.config.get("tables", [])]
        engine = self.create_sqlalchemy_engine()
        inspected = sqlalchemy.inspect(engine)

-        schema_names = [
-            schema_name
-            for schema_name in self.get_schema_names(engine, inspected)
-            if schema_name.lower() != "information_schema"
-        ]
+        if self.config.get("schema"):
Contributor:

@edgarrmondragon I'm curious what you think about this. I know the schema setting was documented as the schema used to connect to the database, but not necessarily as a filter that limits discovery to data in that schema. It seems like this PR would implement it so that only data in the configured schema is considered.

Contributor (Author):

Hi, is there an update on this? We would really like to merge this in ASAP.

Contributor:

@edgarrmondragon pinging you on the previous message in case you have thoughts, but we discussed it in office hours and it looks like this is implementation-specific, not related to the SDK as I originally thought.

@nidhi-akkio sorry about the delay. A few more comments, but I think we can get this merged soon.

This is a breaking change for some users who assigned a schema to use for connecting but expect the status quo, where it's not used for filtering. If they upgrade, they'll silently stop syncing tables outside this schema. We can either:

  1. Accept the breaking change, document it well in the release notes, and update the tap setting description, which will also get pulled into the Hub docs.
  2. Add this as a new config option like schema_filter. That avoids a breaking change but likely makes the difference between the two options confusing for users, although schema in its current state is already confusing to me 😅.

@nidhi-akkio @edgarrmondragon what do you think?

Member:

I think, as you say @pnadolny13, that the current usage and description of the schema setting, i.e. "The initial schema for the Snowflake session", is rather confusing.

My preference would be to not introduce a breaking change: add a new setting schema_filter and think about what to do with the existing schema setting.

cc @nidhi-akkio
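
A minimal sketch of option 2 from the thread above (the schema_filter setting name and the helper are hypothetical, not part of this PR):

from typing import Any


def resolve_schema_names(config: dict[str, Any], all_schemas: list[str]) -> list[str]:
    # Hypothetical non-breaking variant: filter discovery via a dedicated
    # schema_filter setting, leaving the existing schema setting (the initial
    # session schema) untouched.
    if config.get("schema_filter"):
        return [config["schema_filter"]]
    return [s for s in all_schemas if s.lower() != "information_schema"]


# e.g. resolve_schema_names({"schema_filter": "analytics"},
#                           ["public", "analytics", "information_schema"])
# -> ["analytics"]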

+            schema_names = []
+            schema_names.append(self.config.get("schema"))
+        else:
+            schema_names = [
+                schema_name
+                for schema_name in self.get_schema_names(engine, inspected)
+                if schema_name.lower() != "information_schema"
+            ]

        for schema_name in schema_names:
            # Iterate through each table and view.
            # We shouldn't have to iterate through every table in a schema if
            # tables are provided; however, the only way to get is_view for a
            # table is self.get_object_names with the schema.
            for table_name, is_view in self.get_object_names(
                engine, inspected, schema_name
            ):
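For reference, a tap configuration exercising the new branch might look like the following (illustrative values; only schema and tables appear in this diff, and the remaining keys are typical tap-snowflake connection settings, assumed here):

{
  "account": "myorg-myaccount",
  "user": "loader",
  "database": "tpch",
  "schema": "analytics",
  "tables": ["orders", "customer"]
}

With a schema configured, discovery inspects only that schema instead of every non-information_schema schema in the database.
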
2 changes: 1 addition & 1 deletion tests/catalog.json
@@ -378,7 +378,7 @@
"type": ["number"]
},
"o_orderdate": {
"format": "date-time",
"format": "date",
"type": ["string"]
},
"o_orderpriority": {