feat(api): avoid caching already concrete tables
jcrist committed Aug 30, 2024
1 parent 4f85430 commit 6c42221
Showing 4 changed files with 80 additions and 12 deletions.
ibis/backends/__init__.py (52 changes: 41 additions & 11 deletions)
@@ -805,17 +805,22 @@ def _cached_table(self, table: ir.Table) -> ir.CachedTable:
             Cached table
         """
         entry = self._cache_op_to_entry.get(table.op())
-        if entry is None or (cached_op := entry.cached_op_ref()) is None:
-            cached_op = self._create_cached_table(util.gen_name("cached"), table).op()
-            entry = CacheEntry(
-                table.op(),
-                weakref.ref(cached_op),
-                weakref.finalize(
-                    cached_op, self._finalize_cached_table, cached_op.name
-                ),
-            )
-            self._cache_op_to_entry[table.op()] = entry
-            self._cache_name_to_entry[cached_op.name] = entry
+        if entry is not None and (cached_op := entry.cached_op_ref()) is not None:
+            # Entry was already cached
+            return ir.CachedTable(cached_op)
+
+        if not self._should_cache_table_expr(table):
+            # Expression isn't worth caching, no-op
+            return ir.CachedTable(table.op())
+
+        cached_op = self._create_cached_table(util.gen_name("cached"), table).op()
+        entry = CacheEntry(
+            table.op(),
+            weakref.ref(cached_op),
+            weakref.finalize(cached_op, self._finalize_cached_table, cached_op.name),
+        )
+        self._cache_op_to_entry[table.op()] = entry
+        self._cache_name_to_entry[cached_op.name] = entry
         return ir.CachedTable(cached_op)

     def _finalize_cached_table(self, name: str) -> None:
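A rough sketch of the resulting control flow from the caller's side; the connection, table, and column names below are hypothetical and a DuckDB backend is assumed:

```python
import ibis

con = ibis.duckdb.connect()
t = con.create_table("raw", schema=ibis.schema({"x": "int64"}))

expensive = t.filter(t.x > 0).mutate(y=t.x * 2)
c1 = expensive.cache()  # materializes a temp table and records a CacheEntry
c2 = expensive.cache()  # same op, live weakref: reuses the existing entry

t.cache()  # physical table deemed not worth caching: returns a no-op CachedTable
```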
@@ -838,6 +843,31 @@ def _finalize_cached_table(self, name: str) -> None:
             if not sys.is_finalizing():
                 raise

+    def _should_cache_table_expr(self, expr: ir.Table) -> bool:
+        """Checks if a given table expression is worth caching."""
+        op = expr.op()
+
+        # Don't cache if an expression is a column subselection of a physical table.
+        while isinstance(op, (ops.Project, ops.DropColumns)):
+            if isinstance(op, ops.Project) and not all(
+                isinstance(v, ops.Field) for v in op.values.values()
+            ):
+                return True
+            op = op.parent
+
+        return not isinstance(
+            op, ops.PhysicalTable
+        ) or self._should_cache_physical_table(op)
+
+    def _should_cache_physical_table(self, op: ops.PhysicalTable) -> bool:
+        """Check whether a PhysicalTable node is worth caching.
+
+        By default we don't cache any PhysicalTable ops. Some backends need
+        to override this method to allow for caching of tables backed by
+        potentially expensive IO (e.g. a TEMP VIEW backed by data on S3).
+        """
+        return False
+
     def _create_cached_table(self, name: str, expr: ir.Table) -> ir.Table:
         return self.create_table(name, expr, schema=expr.schema(), temp=True)
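As a rough illustration of what the new check treats as worth caching, the sketch below calls the private helper directly on a few expressions; the backend, table, and column names are hypothetical and the direct call is only for demonstration:

```python
import ibis

con = ibis.duckdb.connect()
t = con.create_table("events", schema=ibis.schema({"a": "int64", "b": "string"}))

con._should_cache_table_expr(t)                    # False: already a physical table
con._should_cache_table_expr(t.select("a"))        # False: plain column subselection
con._should_cache_table_expr(t.drop("b"))          # False: DropColumns over a table
con._should_cache_table_expr(t.filter(t.a > 0))    # True: arbitrary computation
con._should_cache_table_expr(t.mutate(c=t.a + 1))  # True: computed columns
```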

ibis/backends/duckdb/__init__.py (18 changes: 18 additions & 0 deletions)
@@ -1589,6 +1589,24 @@ def _get_schema_using_query(self, query: str) -> sch.Schema:
             }
         )

+    def _should_cache_physical_table(self, op: ops.PhysicalTable) -> bool:
+        if isinstance(op, (ops.DatabaseTable, ops.UnboundTable)):
+            # Cache temp views since they're used for `read_csv`/`read_parquet`
+            # and may point to remote data, don't cache anything else.
+            sql = (
+                sg.select(sg.func("any_value", C.table_type.eq("VIEW")))
+                .from_(sg.table("tables", db="information_schema"))
+                .where(
+                    C.table_catalog.eq(op.namespace.catalog or self.current_catalog),
+                    C.table_schema.eq(op.namespace.database or self.current_database),
+                )
+                .sql(self.dialect)
+            )
+            with self._safe_raw_sql(sql) as cur:
+                result = cur.fetchone()
+            return True if result is None else result[0]
+        return False
+
     def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
         name = op.name
         try:
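For reference, the sqlglot expression above renders to roughly the SQL shown below. This standalone reconstruction uses plain sqlglot column helpers in place of ibis's internal `C` shorthand, and the `'memory'`/`'main'` literals are hypothetical placeholders for the resolved catalog and schema:

```python
import sqlglot as sg

query = (
    sg.select(sg.func("any_value", sg.column("table_type").eq("VIEW")))
    .from_(sg.table("tables", db="information_schema"))
    .where(
        sg.column("table_catalog").eq("memory"),
        sg.column("table_schema").eq("main"),
    )
    .sql("duckdb")
)
print(query)
# Roughly: SELECT ANY_VALUE(table_type = 'VIEW') FROM information_schema.tables
#          WHERE table_catalog = 'memory' AND table_schema = 'main'
```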
ibis/backends/pyspark/__init__.py (19 changes: 19 additions & 0 deletions)
@@ -721,6 +721,25 @@ def _drop_cached_table(self, name):
         t.unpersist()
         assert not t.is_cached

+    def _should_cache_physical_table(self, op: ops.PhysicalTable) -> bool:
+        if isinstance(op, (ops.DatabaseTable, ops.UnboundTable)):
+            # Cache temp views since they're used for `read_csv`/`read_parquet`
+            # and may point to remote data, don't cache anything else.
+            sql = (
+                f"SHOW VIEWS IN {op.namespace.database}"
+                if op.namespace.database
+                else "SHOW VIEWS"
+            )
+            with self._active_catalog(op.namespace.catalog):
+                for view in self._session.sql(sql).collect():
+                    if view.viewName == op.name:
+                        # already cached tables are also exposed as temp views,
+                        # check the view isn't backed by the cache
+                        return (
+                            view.isTemporary and op.name not in self._cached_dataframes
+                        )
+        return False
+
     def read_delta(
         self,
         path: str | Path,
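A rough sketch of the intended effect on the PySpark backend; the Spark session setup, S3 path, and table names are hypothetical:

```python
from pyspark.sql import SparkSession

import ibis

spark = SparkSession.builder.getOrCreate()
con = ibis.pyspark.connect(session=spark)

# read_parquet/read_csv expose their input as temp views that may point at
# remote data, so caching such an expression still persists it.
logs = con.read_parquet("s3://bucket/logs/")
logs.cache()

# A table that already exists in the catalog is concrete, so .cache() is a no-op.
events = con.table("warehouse_events")
events.cache()
```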
ibis/expr/types/relations.py (3 changes: 2 additions & 1 deletion)
@@ -4900,7 +4900,8 @@ def __enter__(self):
     def release(self):
         """Release the underlying expression from the cache."""
         current_backend = self._find_backend(use_default=True)
-        return current_backend._finalize_cached_table(self.op().name)
+        if isinstance(op := self.op(), ops.PhysicalTable):
+            current_backend._finalize_cached_table(op.name)


 public(Table=Table, CachedTable=CachedTable)
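A rough sketch of why the guard matters; the connection and table names are hypothetical. When caching was skipped, the `CachedTable` wraps a non-physical op with no backing temp table, so `release()` simply does nothing for it:

```python
import ibis

con = ibis.duckdb.connect()
t = con.create_table("src", schema=ibis.schema({"a": "int64", "b": "string"}))

noop = t.select("a").cache()  # not worth caching: nothing was materialized
noop.release()                # op isn't a PhysicalTable, so nothing to drop

real = t.filter(t.a > 0).cache()
real.release()                # drops the backing temp table as before
```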
