Skip to content

Commit

Permalink
perf: Prune unused operations from sql (#1365)
Browse files (browse the repository at this point in the history)
  • Loading branch information
TrevorBergeron authored Feb 6, 2025
1 parent 0c55b07 commit 923da03
Show file tree
Hide file tree
Showing 12 changed files with 489 additions and 252 deletions.
24 changes: 18 additions & 6 deletions bigframes/core/__init__.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -304,18 +304,25 @@ def assign(self, source_id: str, destination_id: str) -> ArrayValue:
if destination_id in self.column_ids: # Mutate case
exprs = [
(
ex.deref(source_id if (col_id == destination_id) else col_id),
ids.ColumnId(col_id),
bigframes.core.nodes.AliasedRef(
ex.deref(source_id if (col_id == destination_id) else col_id),
ids.ColumnId(col_id),
)
)
for col_id in self.column_ids
]
else: # append case
self_projection = (
(ex.deref(col_id), ids.ColumnId(col_id)) for col_id in self.column_ids
bigframes.core.nodes.AliasedRef.identity(ids.ColumnId(col_id))
for col_id in self.column_ids
)
exprs = [
*self_projection,
(ex.deref(source_id), ids.ColumnId(destination_id)),
(
bigframes.core.nodes.AliasedRef(
ex.deref(source_id), ids.ColumnId(destination_id)
)
),
]
return ArrayValue(
nodes.SelectionNode(
Expand All @@ -337,7 +344,10 @@ def create_constant(

def select_columns(self, column_ids: typing.Sequence[str]) -> ArrayValue:
# This basically just drops and reorders columns - logically a no-op except as a final step
selections = ((ex.deref(col_id), ids.ColumnId(col_id)) for col_id in column_ids)
selections = (
bigframes.core.nodes.AliasedRef.identity(ids.ColumnId(col_id))
for col_id in column_ids
)
return ArrayValue(
nodes.SelectionNode(
child=self.node,
Expand Down Expand Up @@ -488,7 +498,9 @@ def prepare_join_names(
nodes.SelectionNode(
other.node,
tuple(
(ex.deref(old_id), ids.ColumnId(new_id))
bigframes.core.nodes.AliasedRef(
ex.deref(old_id), ids.ColumnId(new_id)
)
for old_id, new_id in r_mapping.items()
),
),
Expand Down
2 changes: 1 addition & 1 deletion bigframes/core/compile/compiled.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -184,7 +184,7 @@ def _to_ibis_expr(
# Special case for empty tables, since we can't create an empty
# projection.
if not self._columns:
return bigframes_vendored.ibis.memtable([])
return self._table.select([bigframes_vendored.ibis.literal(1)])

table = self._table.select(self._columns)
if fraction is not None:
Expand Down
22 changes: 15 additions & 7 deletions bigframes/core/compile/compiler.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -62,9 +62,11 @@ def compile_sql(
if ordered:
node, limit = rewrites.pullup_limit_from_slice(node)
node = nodes.bottom_up(node, rewrites.rewrite_slice)
# TODO: Extract out CTEs
node, ordering = rewrites.pull_up_order(
node, order_root=True, ordered_joins=self.strict
)
node = rewrites.column_pruning(node)
ir = self.compile_node(node)
return ir.to_sql(
order_by=ordering.all_ordering_columns,
Expand All @@ -76,6 +78,7 @@ def compile_sql(
node, _ = rewrites.pull_up_order(
node, order_root=False, ordered_joins=self.strict
)
node = rewrites.column_pruning(node)
ir = self.compile_node(node)
return ir.to_sql(selections=output_ids)

Expand All @@ -86,6 +89,7 @@ def compile_peek_sql(self, node: nodes.BigFrameNode, n_rows: int) -> str:
node, _ = rewrites.pull_up_order(
node, order_root=False, ordered_joins=self.strict
)
node = rewrites.column_pruning(node)
return self.compile_node(node).to_sql(limit=n_rows, selections=ids)

def compile_raw(
Expand All @@ -97,6 +101,7 @@ def compile_raw(
node = nodes.bottom_up(node, rewrites.rewrite_slice)
node = nodes.top_down(node, rewrites.rewrite_timedelta_ops)
node, ordering = rewrites.pull_up_order(node, ordered_joins=self.strict)
node = rewrites.column_pruning(node)
ir = self.compile_node(node)
sql = ir.to_sql()
return sql, node.schema.to_bigquery(), ordering
Expand Down Expand Up @@ -192,31 +197,34 @@ def compile_readtable(self, node: nodes.ReadTableNode):
return self.compile_read_table_unordered(node.source, node.scan_list)

def read_table_as_unordered_ibis(
self, source: nodes.BigqueryDataSource
self,
source: nodes.BigqueryDataSource,
scan_cols: typing.Sequence[str],
) -> ibis_types.Table:
full_table_name = f"{source.table.project_id}.{source.table.dataset_id}.{source.table.table_id}"
used_columns = tuple(col.name for col in source.table.physical_schema)
# Physical schema might include unused columns, unsupported datatypes like JSON
physical_schema = ibis_bigquery.BigQuerySchema.to_ibis(
list(i for i in source.table.physical_schema if i.name in used_columns)
list(source.table.physical_schema)
)
if source.at_time is not None or source.sql_predicate is not None:
import bigframes.session._io.bigquery

sql = bigframes.session._io.bigquery.to_query(
full_table_name,
columns=used_columns,
columns=scan_cols,
sql_predicate=source.sql_predicate,
time_travel_timestamp=source.at_time,
)
return ibis_bigquery.Backend().sql(schema=physical_schema, query=sql)
else:
return ibis_api.table(physical_schema, full_table_name)
return ibis_api.table(physical_schema, full_table_name).select(scan_cols)

def compile_read_table_unordered(
self, source: nodes.BigqueryDataSource, scan: nodes.ScanList
):
ibis_table = self.read_table_as_unordered_ibis(source)
ibis_table = self.read_table_as_unordered_ibis(
source, scan_cols=[col.source_id for col in scan.items]
)
return compiled.UnorderedIR(
ibis_table,
tuple(
Expand Down Expand Up @@ -291,7 +299,7 @@ def set_output_names(
return nodes.SelectionNode(
node,
tuple(
(ex.DerefOp(old_id), ids.ColumnId(out_id))
bigframes.core.nodes.AliasedRef(ex.DerefOp(old_id), ids.ColumnId(out_id))
for old_id, out_id in zip(node.ids, output_ids)
),
)
Loading

0 comments on commit 923da03

Please sign in to comment.