Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion .github/workflows/beam_Inference_Python_Benchmarks_Dataflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ jobs:
${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Sentiment_Streaming_DistilBert_Base_Uncased.txt
${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Sentiment_Batch_DistilBert_Base_Uncased.txt
${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_VLLM_Gemma_Batch.txt
${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Table_Row_Inference_Base.txt
# The env variables are created and populated in the test-arguments-action as "<github.job>_test_arguments_<argument_file_paths_index>"
- name: get current time
run: echo "NOW_UTC=$(date '+%m%d%H%M%S' --utc)" >> $GITHUB_ENV
Expand Down Expand Up @@ -189,4 +190,26 @@ jobs:
-Prunner=DataflowRunner \
-PpythonVersion=3.10 \
-PloadTest.requirementsTxtFile=apache_beam/ml/inference/torch_tests_requirements.txt \
'-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_5 }} --job_name=benchmark-tests-pytorch-imagenet-python-gpu-${{env.NOW_UTC}} --output=gs://temp-storage-for-end-to-end-tests/torch/result_resnet152_gpu-${{env.NOW_UTC}}.txt'
'-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_5 }} --job_name=benchmark-tests-pytorch-imagenet-python-gpu-${{env.NOW_UTC}} --output=gs://temp-storage-for-end-to-end-tests/torch/result_resnet152_gpu-${{env.NOW_UTC}}.txt'
- name: run Table Row Inference Sklearn Batch
uses: ./.github/actions/gradle-command-self-hosted-action
timeout-minutes: 180
with:
gradle-command: :sdks:python:apache_beam:testing:load_tests:run
arguments: |
-PloadTest.mainClass=apache_beam.testing.benchmarks.inference.table_row_inference_benchmark \
-Prunner=DataflowRunner \
-PpythonVersion=3.10 \
-PloadTest.requirementsTxtFile=apache_beam/ml/inference/table_row_inference_requirements.txt \
'-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_9 }} --autoscaling_algorithm=NONE --metrics_table=result_table_row_inference_batch --influx_measurement=result_table_row_inference_batch --mode=batch --input_file=gs://apache-beam-ml/testing/inputs/table_rows_100k_benchmark.jsonl --input_expand_factor=100 --output_table=apache-beam-testing:beam_run_inference.result_table_row_inference_batch_outputs --job_name=benchmark-tests-table-row-inference-batch-${{env.NOW_UTC}}'
- name: run Table Row Inference Sklearn Stream
uses: ./.github/actions/gradle-command-self-hosted-action
timeout-minutes: 180
with:
gradle-command: :sdks:python:apache_beam:testing:load_tests:run
arguments: |
-PloadTest.mainClass=apache_beam.testing.benchmarks.inference.table_row_inference_benchmark \
-Prunner=DataflowRunner \
-PpythonVersion=3.10 \
-PloadTest.requirementsTxtFile=apache_beam/ml/inference/table_row_inference_requirements.txt \
'-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_9 }} --autoscaling_algorithm=THROUGHPUT_BASED --max_num_workers=20 --metrics_table=result_table_row_inference_stream --influx_measurement=result_table_row_inference_stream --mode=streaming --input_subscription=projects/apache-beam-testing/subscriptions/table_row_inference_benchmark --window_size_sec=60 --trigger_interval_sec=30 --timeout_ms=900000 --output_table=apache-beam-testing:beam_run_inference.result_table_row_inference_stream_outputs --job_name=benchmark-tests-table-row-inference-stream-${{env.NOW_UTC}}'
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

--project=apache-beam-testing
--region=us-central1
--worker_machine_type=n1-standard-4
--num_workers=10
--disk_size_gb=50
--autoscaling_algorithm=NONE
--staging_location=gs://temp-storage-for-perf-tests/loadtests
--temp_location=gs://temp-storage-for-perf-tests/loadtests
--requirements_file=apache_beam/ml/inference/table_row_inference_requirements.txt
--publish_to_big_query=true
--metrics_dataset=beam_run_inference
--metrics_table=result_table_row_inference_batch
--input_options={}
--influx_measurement=result_table_row_inference_batch
--mode=batch
--input_file=gs://apache-beam-ml/testing/inputs/table_rows_100k_benchmark.jsonl
# 100k lines × 100 = 10M rows; use 1000 for 100M rows
--input_expand_factor=100
--model_path=gs://apache-beam-ml/models/sklearn_table_classifier.pkl
--feature_columns=feature1,feature2,feature3,feature4,feature5
--output_table=apache-beam-testing:beam_run_inference.result_table_row_inference_batch_outputs
--runner=DataflowRunner
--experiments=use_runner_v2
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

--project=apache-beam-testing
--region=us-central1
--worker_machine_type=n1-standard-4
--num_workers=10
--disk_size_gb=50
--autoscaling_algorithm=THROUGHPUT_BASED
--max_num_workers=20
--staging_location=gs://temp-storage-for-perf-tests/loadtests
--temp_location=gs://temp-storage-for-perf-tests/loadtests
--requirements_file=apache_beam/ml/inference/table_row_inference_requirements.txt
--publish_to_big_query=true
--metrics_dataset=beam_run_inference
--metrics_table=result_table_row_inference_stream
--input_options={}
--influx_measurement=result_table_row_inference_stream
--mode=streaming
--input_subscription=projects/apache-beam-testing/subscriptions/table_row_inference_benchmark
--window_size_sec=60
--trigger_interval_sec=30
--timeout_ms=1800000
--model_path=gs://apache-beam-ml/models/sklearn_table_classifier.pkl
--feature_columns=feature1,feature2,feature3,feature4,feature5
--output_table=apache-beam-testing:beam_run_inference.result_table_row_inference_stream_outputs
--runner=DataflowRunner
--experiments=use_runner_v2
3 changes: 2 additions & 1 deletion .test-infra/tools/refresh_looker_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@
("82", ["263", "264", "265", "266", "267"]), # PyTorch Sentiment Streaming DistilBERT base uncased
("85", ["268", "269", "270", "271", "272"]), # PyTorch Sentiment Batch DistilBERT base uncased
("86", ["284", "285", "286", "287", "288"]), # VLLM Batch Gemma
("96", ["270", "304", "305", "353", "354"]), # Table Row Inference Sklearn Batch
("106", ["355", "356", "357", "358", "359"]) # Table Row Inference Sklearn Streaming
]


def get_look(id: str) -> models.Look:
look = next(iter(sdk.search_looks(id=id)), None)
if not look:
Expand Down
66 changes: 66 additions & 0 deletions sdks/python/apache_beam/examples/inference/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -968,4 +968,70 @@ and produce the following result in your output file location:
An emperor penguin is an adorable creature that lives in Antarctica.
```

---
## Table row inference

[`table_row_inference.py`](./table_row_inference.py) contains an implementation for a RunInference pipeline that processes structured table rows from a file or Pub/Sub, runs ML inference while preserving the table schema, and writes results to BigQuery. It supports both batch (file input) and streaming (Pub/Sub) modes.

### Prerequisites for table row inference

Install the dependencies manually, or install them from `apache_beam/ml/inference/table_row_inference_requirements.txt` (path relative to the `sdks/python` directory):

```sh
pip install apache-beam[gcp] scikit-learn google-cloud-pubsub
```

For streaming mode you need a Pub/Sub topic and subscription, a BigQuery dataset, and a GCS bucket for model and temp files.

### Model and data for table row inference

1. Create a scikit-learn model and sample data using the provided utilities:

```sh
python -m apache_beam.examples.inference.table_row_inference_utils --action=create_model --output_path=model.pkl --num_features=3
python -m apache_beam.examples.inference.table_row_inference_utils --action=generate_data --output_path=input_data.jsonl --num_rows=1000 --num_features=3
```

2. Input data should be JSONL with an `id` field and feature columns, for example:

```json
{"id": "row_1", "feature1": 1.5, "feature2": 2.3, "feature3": 3.7}
```

### Running `table_row_inference.py` (batch)

To run the table row inference pipeline in batch mode locally:

```sh
python -m apache_beam.examples.inference.table_row_inference \
--mode=batch \
--input_file=input_data.jsonl \
--output_table=PROJECT:DATASET.predictions \
--model_path=model.pkl \
--feature_columns=feature1,feature2,feature3 \
--runner=DirectRunner
```

### Running `table_row_inference.py` (streaming)

For streaming mode, use a Pub/Sub subscription and DataflowRunner. Set up a topic and subscription first, then run:

```sh
python -m apache_beam.examples.inference.table_row_inference \
--mode=streaming \
--input_subscription=projects/PROJECT/subscriptions/SUBSCRIPTION \
--output_table=PROJECT:DATASET.predictions \
--model_path=gs://BUCKET/model.pkl \
--feature_columns=feature1,feature2,feature3 \
--runner=DataflowRunner \
--project=PROJECT \
--region=us-central1 \
--temp_location=gs://BUCKET/temp \
--staging_location=gs://BUCKET/staging
```

See the script for full pipeline options (window size, trigger interval, worker settings, etc.).

Output is written to the BigQuery table with columns such as `row_key`, `prediction`, and the original input feature columns.

---
194 changes: 194 additions & 0 deletions sdks/python/apache_beam/examples/inference/table_row_batch_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Complete batch inference example with sample data generation.

This script demonstrates how to use the batch table row inference pipeline
with automatically generated sample data and model.

Usage:
# Run complete local example
python table_row_batch_example.py

# Run with custom parameters
python table_row_batch_example.py --num_rows=1000 --num_features=5
"""

import argparse
import json
import logging
import os
import pickle
import subprocess
import sys
import tempfile

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def create_sample_data_and_model(tmpdir, num_rows=100, num_features=3):
    """Build a throwaway sklearn regression model plus matching JSONL rows.

    Args:
        tmpdir: Directory into which the artifacts are written.
        num_rows: Number of data rows to generate.
        num_features: Number of feature columns per row.

    Returns:
        Tuple of (model_path, data_path, feature_columns).
    """
    # sklearn/numpy are optional deps of the SDK, so import lazily and
    # fail with an actionable message rather than a bare traceback.
    try:
        import numpy as np
        from sklearn.linear_model import LinearRegression
    except ImportError:
        logger.error(
            'sklearn and numpy are required. '
            'Install with: pip install scikit-learn numpy')
        sys.exit(1)

    logger.info('Creating sample model with %s features...', num_features)
    # Fit on random data; the target is (approximately) the feature sum.
    regressor = LinearRegression()
    train_inputs = np.random.randn(100, num_features)
    train_targets = np.sum(train_inputs, axis=1) + np.random.randn(100) * 0.1
    regressor.fit(train_inputs, train_targets)

    model_path = os.path.join(tmpdir, 'model.pkl')
    with open(model_path, 'wb') as model_file:
        pickle.dump(regressor, model_file)
    logger.info('  ✓ Model saved to %s', model_path)

    logger.info('Generating %s sample data rows...', num_rows)
    data_path = os.path.join(tmpdir, 'input_data.jsonl')
    feature_columns = [f'feature{i+1}' for i in range(num_features)]

    # One JSON object per line: an 'id' key followed by the feature columns.
    with open(data_path, 'w') as data_file:
        for row_index in range(num_rows):
            record = {'id': f'row_{row_index}'}
            record.update(
                (name, float(np.random.randn())) for name in feature_columns)
            data_file.write(json.dumps(record) + '\n')
    logger.info('  ✓ Data saved to %s', data_path)

    return model_path, data_path, feature_columns


def run_example(num_rows=100, num_features=3):
    """Run complete batch inference example.

    Creates a throwaway model and JSONL input in a temporary directory,
    runs ``table_row_inference.py`` in batch mode through a subprocess,
    then logs a summary of the resulting predictions.

    Args:
        num_rows: Number of data rows to generate.
        num_features: Number of features per row.
    """
    logger.info('=' * 70)
    logger.info('BATCH TABLE ROW INFERENCE - COMPLETE EXAMPLE')
    logger.info('=' * 70)

    with tempfile.TemporaryDirectory() as tmpdir:
        logger.info('\n[Step 1/4] Creating sample model and data...')
        model_path, data_path, feature_columns = create_sample_data_and_model(
            tmpdir, num_rows, num_features)

        logger.info('\n[Step 2/4] Setting up output paths...')
        output_file = os.path.join(tmpdir, 'predictions')
        logger.info('  Output file: %s.jsonl', output_file)

        logger.info('\n[Step 3/4] Running batch inference pipeline...')
        logger.info('  Features: %s', feature_columns)
        _run_inference_subprocess(
            data_path, model_path, feature_columns, output_file)
        logger.info('  ✓ Pipeline completed successfully!')

        logger.info('\n[Step 4/4] Viewing results...')
        _report_results(f'{output_file}.jsonl', feature_columns)

        logger.info('\n' + '=' * 70)
        logger.info('EXAMPLE COMPLETED SUCCESSFULLY!')
        logger.info('=' * 70)
        logger.info('\n📖 Next steps:')
        logger.info('  1. Review the generated predictions above')
        logger.info(
            '  2. Try with your own data: '
            'python table_row_inference.py --help')
        logger.info('  3. Deploy to Dataflow with --runner=DataflowRunner')
        logger.info('\n✨ You now understand batch table row inference!')


def _run_inference_subprocess(
        data_path, model_path, feature_columns, output_file):
    """Run table_row_inference.py in batch mode; exit the process on failure.

    Uses the module-level ``subprocess`` import rather than a mid-function
    import, keeping the file's dependencies visible at the top.
    """
    cmd = [
        sys.executable,
        'table_row_inference.py',
        '--mode=batch',
        f'--input_file={data_path}',
        f'--model_path={model_path}',
        f'--feature_columns={",".join(feature_columns)}',
        f'--output_file={output_file}',
        '--runner=DirectRunner'
    ]
    logger.info('  Command: %s', ' '.join(cmd))

    # Run from this file's directory so the relative script path resolves
    # regardless of the caller's working directory.
    script_dir = os.path.dirname(os.path.abspath(__file__))
    result = subprocess.run(
        cmd, capture_output=True, text=True, check=False, cwd=script_dir)

    if result.returncode != 0:
        logger.error('Pipeline failed!')
        logger.error(result.stderr)
        sys.exit(1)


def _report_results(output_path, feature_columns):
    """Log the prediction count and up to five sample rows from output_path."""
    if not os.path.exists(output_path):
        logger.warning('  Output file not found: %s', output_path)
        return

    with open(output_path, 'r') as f:
        lines = f.readlines()

    logger.info('  Total predictions: %s', len(lines))
    logger.info('\n  Sample predictions (first 5):')

    for i, line in enumerate(lines[:5]):
        prediction = json.loads(line)
        # Show at most the first three echoed input features per row.
        feats = [prediction[f'input_{f}'] for f in feature_columns][:3]
        logger.info(
            '  %s. Key: %10s | Prediction: %8.4f | Features: %s...',
            i + 1,
            prediction['row_key'],
            prediction['prediction'],
            feats)

    logger.info('\n  Full output saved to: %s', output_path)


def main():
    """Parse command-line flags and launch the batch inference example."""
    arg_parser = argparse.ArgumentParser(
        description='Run batch table row inference example')
    # Both flags are integers; declare them from a small spec table.
    flag_specs = (
        ('--num_rows', 100, 'Number of data rows to generate (default: 100)'),
        ('--num_features', 3, 'Number of features per row (default: 3)'),
    )
    for flag, default, help_text in flag_specs:
        arg_parser.add_argument(flag, type=int, default=default, help=help_text)

    options = arg_parser.parse_args()
    run_example(num_rows=options.num_rows, num_features=options.num_features)


if __name__ == '__main__':
    main()
Loading
Loading