Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update readme, examples and IT tests for table creation. #197

Merged
merged 2 commits
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Next

* Support creation of new table in BigQuery sink. This is integrated with Datastream and Table/SQL API.
* Remove need for BigQuerySchemaProvider in BigQuery sink configs.
* Deprecate unbounded source. To be completely removed in next release.

## 0.4.0 - 2024-11-04

* Support exactly-once consistency in BigQuery sink. This is integrated with Datastream and Table/SQL API.
Expand Down
428 changes: 224 additions & 204 deletions README.md

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions cloudbuild/nightly/cloudbuild.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,10 @@ steps:

# 4. Start the nested schema table e2e test.
- name: 'gcr.io/$PROJECT_ID/dataproc-flink-bigquery-connector-nightly'
id: 'e2e-bounded-nested-schema-table-test'
id: 'e2e-bounded-nested-schema-test'
waitFor: ['create-clusters-bounded-small-table']
entrypoint: 'bash'
args: ['/workspace/cloudbuild/nightly/nightly.sh', 'e2e_bounded_nested_schema_table_test']
args: ['/workspace/cloudbuild/nightly/nightly.sh', 'e2e_bounded_nested_schema_test']
env:
- 'GCS_JAR_LOCATION=${_GCS_JAR_LOCATION}'
- 'PROJECT_ID=${_PROJECT_ID}'
Expand All @@ -120,7 +120,7 @@ steps:
# 5. Table API nested schema table e2e test.
- name: 'gcr.io/$PROJECT_ID/dataproc-flink-bigquery-connector-nightly'
id: 'e2e-bounded-table-api-nested-schema-test'
waitFor: ['e2e-bounded-nested-schema-table-test']
waitFor: ['e2e-bounded-nested-schema-test']
entrypoint: 'bash'
args: ['/workspace/cloudbuild/nightly/nightly.sh', 'e2e_bounded_table_api_nested_schema_test']
env:
Expand Down
41 changes: 16 additions & 25 deletions cloudbuild/nightly/nightly.sh
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,13 @@ run_read_write_test(){
SINK_PARALLELISM=${11}
# Take default value = false in case not provided.
IS_SQL=${12:-False}
ENABLE_TABLE_CREATION=${13:-False}
# Get the final region and the cluster name.
export REGION=$(cat "$REGION_FILE")
export CLUSTER_NAME=$(cat "$CLUSTER_FILE")

# Run the simple bounded write table test.
source cloudbuild/nightly/scripts/table_write.sh "$PROJECT_ID" "$CLUSTER_NAME" "$REGION" "$PROJECT_NAME" "$DATASET_NAME" "$SOURCE" "$DESTINATION_TABLE_NAME" "$IS_EXACTLY_ONCE_ENABLED" "$MODE" "$PROPERTIES" "$SINK_PARALLELISM" "$IS_SQL"
# Run the test.
source cloudbuild/nightly/scripts/table_write.sh "$PROJECT_ID" "$CLUSTER_NAME" "$REGION" "$PROJECT_NAME" "$DATASET_NAME" "$SOURCE" "$DESTINATION_TABLE_NAME" "$IS_EXACTLY_ONCE_ENABLED" "$MODE" "$PROPERTIES" "$SINK_PARALLELISM" "$IS_SQL" "$ENABLE_TABLE_CREATION"
}

# Function to run the test to check BQ Table Read and Write.
Expand Down Expand Up @@ -144,75 +145,65 @@ case $STEP in
;;

# Run the nested schema bounded e2e test.
e2e_bounded_nested_schema_table_test)
jayehwhyehentee marked this conversation as resolved.
Show resolved Hide resolved
IS_EXACTLY_ONCE_ENABLED=False
run_read_write_test "$PROJECT_ID" "$REGION_SMALL_TEST_FILE" "$CLUSTER_SMALL_TEST_FILE" "$PROJECT_NAME" "$DATASET_NAME" "$TABLE_NAME_SOURCE_COMPLEX_SCHEMA_TABLE" "$TABLE_NAME_DESTINATION_COMPLEX_SCHEMA_TABLE" "$IS_EXACTLY_ONCE_ENABLED" "bounded" "$PROPERTIES_SMALL_BOUNDED_JOB" "$SINK_PARALLELISM_SMALL_BOUNDED_JOB"
e2e_bounded_nested_schema_test)
IS_SQL=False
ENABLE_TABLE_CREATION=True
IS_EXACTLY_ONCE_ENABLED=True
run_read_write_test "$PROJECT_ID" "$REGION_SMALL_TEST_FILE" "$CLUSTER_SMALL_TEST_FILE" "$PROJECT_NAME" "$DATASET_NAME" "$TABLE_NAME_SOURCE_COMPLEX_SCHEMA_TABLE" "$TABLE_NAME_DESTINATION_COMPLEX_SCHEMA_TABLE" "$IS_EXACTLY_ONCE_ENABLED" "bounded" "$PROPERTIES_SMALL_BOUNDED_JOB" "$SINK_PARALLELISM_SMALL_BOUNDED_JOB"
run_read_write_test "$PROJECT_ID" "$REGION_SMALL_TEST_FILE" "$CLUSTER_SMALL_TEST_FILE" "$PROJECT_NAME" "$DATASET_NAME" "$TABLE_NAME_SOURCE_COMPLEX_SCHEMA_TABLE" "$TABLE_NAME_DESTINATION_COMPLEX_SCHEMA_TABLE" "$IS_EXACTLY_ONCE_ENABLED" "bounded" "$PROPERTIES_SMALL_BOUNDED_JOB" "$SINK_PARALLELISM_SMALL_BOUNDED_JOB" "$IS_SQL" "$ENABLE_TABLE_CREATION"
exit
;;

# Run the nested schema bounded Table API e2e test.
e2e_bounded_table_api_nested_schema_test)
IS_SQL=True
ENABLE_TABLE_CREATION=True
IS_EXACTLY_ONCE_ENABLED=False
run_read_write_test "$PROJECT_ID" "$REGION_SMALL_TEST_FILE" "$CLUSTER_SMALL_TEST_FILE" "$PROJECT_NAME" "$DATASET_NAME" "$TABLE_NAME_SOURCE_COMPLEX_SCHEMA_TABLE" "$TABLE_NAME_DESTINATION_COMPLEX_SCHEMA_TABLE" "$IS_EXACTLY_ONCE_ENABLED" "bounded" "$PROPERTIES_SMALL_BOUNDED_JOB" "$SINK_PARALLELISM_SMALL_BOUNDED_JOB" "$IS_SQL"
IS_EXACTLY_ONCE_ENABLED=True
run_read_write_test "$PROJECT_ID" "$REGION_SMALL_TEST_FILE" "$CLUSTER_SMALL_TEST_FILE" "$PROJECT_NAME" "$DATASET_NAME" "$TABLE_NAME_SOURCE_COMPLEX_SCHEMA_TABLE" "$TABLE_NAME_DESTINATION_COMPLEX_SCHEMA_TABLE" "$IS_EXACTLY_ONCE_ENABLED" "bounded" "$PROPERTIES_SMALL_BOUNDED_JOB" "$SINK_PARALLELISM_SMALL_BOUNDED_JOB" "$IS_SQL"
run_read_write_test "$PROJECT_ID" "$REGION_SMALL_TEST_FILE" "$CLUSTER_SMALL_TEST_FILE" "$PROJECT_NAME" "$DATASET_NAME" "$TABLE_NAME_SOURCE_COMPLEX_SCHEMA_TABLE" "$TABLE_NAME_DESTINATION_COMPLEX_SCHEMA_TABLE" "$IS_EXACTLY_ONCE_ENABLED" "bounded" "$PROPERTIES_SMALL_BOUNDED_JOB" "$SINK_PARALLELISM_SMALL_BOUNDED_JOB" "$IS_SQL" "$ENABLE_TABLE_CREATION"
exit
;;

# Run the all datatypes bounded Table API e2e test.
e2e_bounded_table_api_all_datatypes_test)
IS_SQL=True
ENABLE_TABLE_CREATION=True
IS_EXACTLY_ONCE_ENABLED=False
run_read_write_test "$PROJECT_ID" "$REGION_SMALL_TEST_FILE" "$CLUSTER_SMALL_TEST_FILE" "$PROJECT_NAME" "$DATASET_NAME" "$TABLE_NAME_SOURCE_ALL_DATATYPES_TABLE" "$TABLE_NAME_DESTINATION_ALL_DATATYPES_TABLE" "$IS_EXACTLY_ONCE_ENABLED" "bounded" "$PROPERTIES_SMALL_BOUNDED_JOB" "$SINK_PARALLELISM_SMALL_BOUNDED_JOB" "$IS_SQL"
IS_EXACTLY_ONCE_ENABLED=True
run_read_write_test "$PROJECT_ID" "$REGION_SMALL_TEST_FILE" "$CLUSTER_SMALL_TEST_FILE" "$PROJECT_NAME" "$DATASET_NAME" "$TABLE_NAME_SOURCE_ALL_DATATYPES_TABLE" "$TABLE_NAME_DESTINATION_ALL_DATATYPES_TABLE" "$IS_EXACTLY_ONCE_ENABLED" "bounded" "$PROPERTIES_SMALL_BOUNDED_JOB" "$SINK_PARALLELISM_SMALL_BOUNDED_JOB" "$IS_SQL"
run_read_write_test "$PROJECT_ID" "$REGION_SMALL_TEST_FILE" "$CLUSTER_SMALL_TEST_FILE" "$PROJECT_NAME" "$DATASET_NAME" "$TABLE_NAME_SOURCE_ALL_DATATYPES_TABLE" "$TABLE_NAME_DESTINATION_ALL_DATATYPES_TABLE" "$IS_EXACTLY_ONCE_ENABLED" "bounded" "$PROPERTIES_SMALL_BOUNDED_JOB" "$SINK_PARALLELISM_SMALL_BOUNDED_JOB" "$IS_SQL" "$ENABLE_TABLE_CREATION"
exit
;;

# Run the query bounded e2e test.
# Run the query bounded e2e test.
e2e_bounded_query_test)
run_read_only_test_delete_cluster "$PROJECT_ID" "$REGION_SMALL_TEST_FILE" "$CLUSTER_SMALL_TEST_FILE" "$PROJECT_NAME" "$DATASET_NAME" "" "" "$QUERY" "bounded" "$PROPERTIES_SMALL_BOUNDED_JOB"
exit
;;

# Run the large table O(GB's) bounded e2e test.
# Run the large table bounded e2e test.
e2e_bounded_large_table_test)
# Run the large table test.
IS_EXACTLY_ONCE_ENABLED=False
run_read_write_test "$PROJECT_ID" "$REGION_LARGE_TABLE_TEST_FILE" "$CLUSTER_LARGE_TABLE_TEST_FILE" "$PROJECT_NAME" "$DATASET_NAME" "$TABLE_NAME_SOURCE_LARGE_TABLE" "$TABLE_NAME_DESTINATION_LARGE_TABLE" "$IS_EXACTLY_ONCE_ENABLED" "bounded" "$PROPERTIES_LARGE_BOUNDED_JOB" "$SINK_PARALLELISM_LARGE_BOUNDED_JOB"
IS_EXACTLY_ONCE_ENABLED=True
run_read_write_test_delete_cluster "$PROJECT_ID" "$REGION_LARGE_TABLE_TEST_FILE" "$CLUSTER_LARGE_TABLE_TEST_FILE" "$PROJECT_NAME" "$DATASET_NAME" "$TABLE_NAME_SOURCE_LARGE_TABLE" "$TABLE_NAME_DESTINATION_LARGE_TABLE" "$IS_EXACTLY_ONCE_ENABLED" "bounded" "$PROPERTIES_LARGE_BOUNDED_JOB" "$SINK_PARALLELISM_LARGE_BOUNDED_JOB"
exit
;;

# Run the large table O(GB's) bounded e2e test.
# Run the Table API large table bounded e2e test.
e2e_bounded_table_api_large_table_test)
# Run the large table test.
IS_SQL=True
IS_EXACTLY_ONCE_ENABLED=False
run_read_write_test "$PROJECT_ID" "$REGION_TABLE_API_LARGE_TABLE_TEST_FILE" "$CLUSTER_TABLE_API_LARGE_TABLE_TEST_FILE" "$PROJECT_NAME" "$DATASET_NAME" "$TABLE_NAME_SOURCE_TABLE_API_LARGE_TABLE" "$TABLE_NAME_DESTINATION_TABLE_API_LARGE_TABLE" "$IS_EXACTLY_ONCE_ENABLED" "bounded" "$PROPERTIES_LARGE_BOUNDED_JOB" "$SINK_PARALLELISM_LARGE_BOUNDED_JOB" "$IS_SQL"
IS_EXACTLY_ONCE_ENABLED=True
run_read_write_test_delete_cluster "$PROJECT_ID" "$REGION_TABLE_API_LARGE_TABLE_TEST_FILE" "$CLUSTER_TABLE_API_LARGE_TABLE_TEST_FILE" "$PROJECT_NAME" "$DATASET_NAME" "$TABLE_NAME_SOURCE_TABLE_API_LARGE_TABLE" "$TABLE_NAME_DESTINATION_TABLE_API_LARGE_TABLE" "$IS_EXACTLY_ONCE_ENABLED" "bounded" "$PROPERTIES_LARGE_BOUNDED_JOB" "$SINK_PARALLELISM_LARGE_BOUNDED_JOB" "$IS_SQL"
exit
;;

# Run the unbounded table e2e test.
# Run the unbounded e2e test.
e2e_unbounded_test)
IS_EXACTLY_ONCE_ENABLED=False
run_read_write_test "$PROJECT_ID" "$REGION_UNBOUNDED_TABLE_TEST_FILE" "$CLUSTER_UNBOUNDED_TABLE_TEST_FILE" "$PROJECT_NAME" "$DATASET_NAME" "$GCS_SOURCE_URI" "$TABLE_NAME_DESTINATION_UNBOUNDED_TABLE" "$IS_EXACTLY_ONCE_ENABLED" "unbounded" "$PROPERTIES_UNBOUNDED_JOB" "$SINK_PARALLELISM_UNBOUNDED_JOB"
IS_EXACTLY_ONCE_ENABLED=True
run_read_write_test_delete_cluster "$PROJECT_ID" "$REGION_UNBOUNDED_TABLE_TEST_FILE" "$CLUSTER_UNBOUNDED_TABLE_TEST_FILE" "$PROJECT_NAME" "$DATASET_NAME" "$GCS_SOURCE_URI" "$TABLE_NAME_DESTINATION_UNBOUNDED_TABLE" "$IS_EXACTLY_ONCE_ENABLED" "unbounded" "$PROPERTIES_UNBOUNDED_JOB" "$SINK_PARALLELISM_UNBOUNDED_JOB"
exit
;;

# Run the unbounded table e2e test.
# Run the Table API unbounded e2e test.
e2e_table_api_unbounded_test)
jayehwhyehentee marked this conversation as resolved.
Show resolved Hide resolved
IS_SQL=True
IS_EXACTLY_ONCE_ENABLED=False
run_read_write_test "$PROJECT_ID" "$REGION_TABLE_API_UNBOUNDED_TABLE_TEST_FILE" "$CLUSTER_TABLE_API_UNBOUNDED_TABLE_TEST_FILE" "$PROJECT_NAME" "$DATASET_NAME" "$GCS_SOURCE_URI" "$TABLE_NAME_DESTINATION_UNBOUNDED_TABLE" "$IS_EXACTLY_ONCE_ENABLED" "unbounded" "$PROPERTIES_UNBOUNDED_JOB" "$SINK_PARALLELISM_UNBOUNDED_JOB" "$IS_SQL"
IS_EXACTLY_ONCE_ENABLED=True
run_read_write_test_delete_cluster "$PROJECT_ID" "$REGION_TABLE_API_UNBOUNDED_TABLE_TEST_FILE" "$CLUSTER_TABLE_API_UNBOUNDED_TABLE_TEST_FILE" "$PROJECT_NAME" "$DATASET_NAME" "$GCS_SOURCE_URI" "$TABLE_NAME_DESTINATION_UNBOUNDED_TABLE" "$IS_EXACTLY_ONCE_ENABLED" "unbounded" "$PROPERTIES_UNBOUNDED_JOB" "$SINK_PARALLELISM_UNBOUNDED_JOB" "$IS_SQL"
exit
Expand Down
15 changes: 10 additions & 5 deletions cloudbuild/nightly/scripts/bounded_table_write.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,16 @@ PROPERTIES=$1
BOUNDED_JOB_SINK_PARALLELISM=$2
IS_SQL=$3
IS_EXACTLY_ONCE_ENABLED=$4
ENABLE_TABLE_CREATION=$5

# We won't run this async as we can wait for a bounded job to succeed or fail.
# Create the destination table from the source table schema.
python3 cloudbuild/nightly/scripts/python-scripts/create_sink_table.py -- --project_name "$PROJECT_NAME" --dataset_name "$DATASET_NAME" --source_table_name "$SOURCE" --destination_table_name "$DESTINATION_TABLE_NAME"
# Set the expiration time to 1 hour.
bq update --expiration 3600 "$DATASET_NAME"."$DESTINATION_TABLE_NAME"
# Pre-create the BigQuery destination table only when the connector's own
# table-creation feature is disabled for this test run; when it is enabled,
# the sink under test is expected to create the table itself.
# NOTE(review): callers appear to pass the literal strings "True"/"False"
# (see ENABLE_TABLE_CREATION=${13:-False} upstream) — confirm exact casing.
# Use [[ ]] with a quoted literal: '==' inside single-bracket [ ] is a
# non-portable bashism, and the bare word False was previously unquoted.
if [[ "$ENABLE_TABLE_CREATION" == "False" ]]
then
  echo "Creating destination table before test"
  # Create the destination table from the source table's schema.
  python3 cloudbuild/nightly/scripts/python-scripts/create_sink_table.py -- --project_name "$PROJECT_NAME" --dataset_name "$DATASET_NAME" --source_table_name "$SOURCE" --destination_table_name "$DESTINATION_TABLE_NAME"
  # Set the table expiration to 1 hour so nightly test tables clean themselves up.
  bq update --expiration 3600 "$DATASET_NAME"."$DESTINATION_TABLE_NAME"
fi
# Run the sink JAR JOB
gcloud dataproc jobs submit flink --id "$JOB_ID" --jar="$GCS_JAR_LOCATION" --cluster="$CLUSTER_NAME" --region="$REGION" --properties="$PROPERTIES" -- --gcp-source-project "$PROJECT_NAME" --bq-source-dataset "$DATASET_NAME" --bq-source-table "$SOURCE" --gcp-dest-project "$PROJECT_NAME" --bq-dest-dataset "$DATASET_NAME" --bq-dest-table "$DESTINATION_TABLE_NAME" --sink-parallelism "$BOUNDED_JOB_SINK_PARALLELISM" --is-sql "$IS_SQL" --exactly-once "$IS_EXACTLY_ONCE_ENABLED"
gcloud dataproc jobs submit flink --id "$JOB_ID" --jar="$GCS_JAR_LOCATION" --cluster="$CLUSTER_NAME" --region="$REGION" --properties="$PROPERTIES" -- --gcp-source-project "$PROJECT_NAME" --bq-source-dataset "$DATASET_NAME" --bq-source-table "$SOURCE" --gcp-dest-project "$PROJECT_NAME" --bq-dest-dataset "$DATASET_NAME" --bq-dest-table "$DESTINATION_TABLE_NAME" --sink-parallelism "$BOUNDED_JOB_SINK_PARALLELISM" --is-sql "$IS_SQL" --exactly-once "$IS_EXACTLY_ONCE_ENABLED" --enable-table-creation "$ENABLE_TABLE_CREATION"
prashastia marked this conversation as resolved.
Show resolved Hide resolved
3 changes: 2 additions & 1 deletion cloudbuild/nightly/scripts/table_write.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ MODE=$9
PROPERTIES=${10}
SINK_PARALLELISM=${11}
IS_SQL=${12}
ENABLE_TABLE_CREATION=${13}
set -euxo pipefail
gcloud config set project "$PROJECT_ID"

Expand Down Expand Up @@ -55,7 +56,7 @@ then
echo "At least once is Enabled!"
DESTINATION_TABLE_NAME="$DESTINATION_TABLE_NAME"-ALO
fi
source cloudbuild/nightly/scripts/bounded_table_write.sh "$PROPERTIES" "$SINK_PARALLELISM" "$IS_SQL" "$IS_EXACTLY_ONCE_ENABLED"
source cloudbuild/nightly/scripts/bounded_table_write.sh "$PROPERTIES" "$SINK_PARALLELISM" "$IS_SQL" "$IS_EXACTLY_ONCE_ENABLED" "$ENABLE_TABLE_CREATION"
elif [ "$MODE" == "unbounded" ]
then
echo [LOGS: "$PROJECT_NAME" "$SOURCE" Write Test in Unbounded Mode] Created JOB ID: "$JOB_ID"
Expand Down
Loading
Loading