From 5d39d73b4a1c1ac620ac855201b980eeeae10c01 Mon Sep 17 00:00:00 2001 From: Shubham Pathak Date: Wed, 14 Jan 2026 14:45:35 +0530 Subject: [PATCH 01/11] Add AWS glue community contributed connector --- .../aws-glue-connector/Dockerfile | 35 +++++ .../aws-glue-connector/README.md | 79 +++++++++++ .../build_and_push_docker.sh | 95 +++++++++++++ .../aws-glue-connector/config.json | 10 ++ .../aws-glue-connector/main.py | 8 ++ .../aws-glue-connector/pyspark_job.py | 74 ++++++++++ .../aws-glue-connector/request.json | 19 +++ .../aws-glue-connector/requirements.txt | 5 + .../scripts/grant_SA_dataproc_roles.sh | 37 +++++ .../aws-glue-connector/src/.DS_Store | Bin 0 -> 6148 bytes .../src/aws_glue_connector.py | 91 +++++++++++++ .../aws-glue-connector/src/bootstrap.py | 51 +++++++ .../aws-glue-connector/src/constants.py | 23 ++++ .../aws-glue-connector/src/entry_builder.py | 128 ++++++++++++++++++ .../aws-glue-connector/src/gcs_uploader.py | 34 +++++ .../aws-glue-connector/src/name_builder.py | 47 +++++++ .../aws-glue-connector/src/secret_manager.py | 22 +++ 17 files changed, 758 insertions(+) create mode 100644 managed-connectivity/community-contributed-connectors/aws-glue-connector/Dockerfile create mode 100644 managed-connectivity/community-contributed-connectors/aws-glue-connector/README.md create mode 100755 managed-connectivity/community-contributed-connectors/aws-glue-connector/build_and_push_docker.sh create mode 100644 managed-connectivity/community-contributed-connectors/aws-glue-connector/config.json create mode 100644 managed-connectivity/community-contributed-connectors/aws-glue-connector/main.py create mode 100644 managed-connectivity/community-contributed-connectors/aws-glue-connector/pyspark_job.py create mode 100644 managed-connectivity/community-contributed-connectors/aws-glue-connector/request.json create mode 100644 managed-connectivity/community-contributed-connectors/aws-glue-connector/requirements.txt create mode 100644 
managed-connectivity/community-contributed-connectors/aws-glue-connector/scripts/grant_SA_dataproc_roles.sh create mode 100644 managed-connectivity/community-contributed-connectors/aws-glue-connector/src/.DS_Store create mode 100644 managed-connectivity/community-contributed-connectors/aws-glue-connector/src/aws_glue_connector.py create mode 100644 managed-connectivity/community-contributed-connectors/aws-glue-connector/src/bootstrap.py create mode 100644 managed-connectivity/community-contributed-connectors/aws-glue-connector/src/constants.py create mode 100644 managed-connectivity/community-contributed-connectors/aws-glue-connector/src/entry_builder.py create mode 100644 managed-connectivity/community-contributed-connectors/aws-glue-connector/src/gcs_uploader.py create mode 100644 managed-connectivity/community-contributed-connectors/aws-glue-connector/src/name_builder.py create mode 100644 managed-connectivity/community-contributed-connectors/aws-glue-connector/src/secret_manager.py diff --git a/managed-connectivity/community-contributed-connectors/aws-glue-connector/Dockerfile b/managed-connectivity/community-contributed-connectors/aws-glue-connector/Dockerfile new file mode 100644 index 00000000..a687432a --- /dev/null +++ b/managed-connectivity/community-contributed-connectors/aws-glue-connector/Dockerfile @@ -0,0 +1,35 @@ +# Step 1: Use an official OpenJDK base image, as Spark requires Java +FROM openjdk:11-jre-slim + +# Step 2: Set environment variables for Spark and Python +ENV SPARK_VERSION=3.5.0 +ENV HADOOP_VERSION=3 +ENV SPARK_HOME=/opt/spark +ENV PATH=$SPARK_HOME/bin:$PATH +ENV PYTHONUNBUFFERED=1 + +# Step 3: Install Python, pip, and other necessary tools +RUN apt-get update && \ + apt-get install -y python3 python3-pip curl && \ + rm -rf /var/lib/apt/lists/* + +# Step 4: Download and install Spark +RUN curl -fSL "https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz" -o /tmp/spark.tgz && \ 
+ tar -xvf /tmp/spark.tgz -C /opt/ && \ + mv /opt/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION} ${SPARK_HOME} && \ + rm /tmp/spark.tgz + +# Step 5: Set up the application directory +WORKDIR /app + +# Step 6: Copy and install Python dependencies +COPY requirements.txt . +RUN pip3 install --no-cache-dir -r requirements.txt + +# Step 7: Copy your application source code +COPY src ./src +COPY config.json . +COPY pyspark_job.py . + +# Step 8: Define the entry point for running the PySpark job +ENTRYPOINT ["spark-submit", "pyspark_job.py"] diff --git a/managed-connectivity/community-contributed-connectors/aws-glue-connector/README.md b/managed-connectivity/community-contributed-connectors/aws-glue-connector/README.md new file mode 100644 index 00000000..db98b96e --- /dev/null +++ b/managed-connectivity/community-contributed-connectors/aws-glue-connector/README.md @@ -0,0 +1,79 @@ +# AWS Glue to Google Cloud Dataplex Connector + +This connector extracts metadata from AWS Glue and transforms it into a format that can be imported into Google Cloud Dataplex. It captures database, table, and lineage information from AWS Glue and prepares it for ingestion into Dataplex, allowing you to catalog your AWS data assets within Google Cloud. + +This connector is designed to be run from a Python virtual environment. + +*** + +## Prerequisites + +Before using this connector, you need to have the following set up: + +1. **AWS Credentials**: You will need an AWS access key ID and a secret access key with permissions to access AWS Glue. +2. **Google Cloud Project**: A Google Cloud project is required to run the script and store the output. +3. **GCP Secret Manager**: The AWS credentials must be stored in a secret in Google Cloud Secret Manager. The secret value must be a JSON object containing the keys `access_key_id` and `secret_access_key` (e.g., `{"access_key_id": "your_access_key_id", "secret_access_key": "your_secret_access_key"}`), which is the format parsed by `src/secret_manager.py`. +4. **Python 3** and **pip** installed.
+ +*** + +## Configuration + +The connector is configured using the `config.json` file. Here is a description of the parameters: + +| Parameter | Description | +| :--- | :--- | +| **`aws_region`** | The AWS region where your Glue Data Catalog is located (e.g., "eu-north-1"). | +| **`project_id`** | Your Google Cloud Project ID. | +| **`location_id`** | The Google Cloud region where you want to run the script (e.g., "us-central1"). | +| **`entry_group_id`** | The Dataplex entry group ID where the metadata will be imported. | +| **`gcs_bucket`** | The Google Cloud Storage bucket where the output metadata file will be stored. | +| **`aws_account_id`** | Your AWS account ID. | +| **`output_folder`** | The folder within the GCS bucket where the output file will be stored. | +| **`gcp_secret_id`** | The ID of the secret in GCP Secret Manager that contains your AWS credentials. | + +*** + +## Running the Connector + +You can run the connector from your local machine using a Python virtual environment. + +### Setup and Execution + +1. **Create a virtual environment:** + ```bash + python3 -m venv venv + source venv/bin/activate + ``` +2. **Install the required dependencies:** + ```bash + pip install -r requirements.txt + ``` +3. **Run the connector:** + Execute the `main.py` script and provide the path to your configuration file. + ```bash + python3 main.py --config=config.json + ``` + +*** + +## Output + +The connector generates a JSONL file in the specified GCS bucket and folder. This file contains the extracted metadata in a format that can be imported into Dataplex. + +*** + +## Importing Metadata into Dataplex + +Once the metadata file has been generated, you can import it into Dataplex using a metadata import job. You can use the provided `request.json` file as a template for the import request. + +The `request.json` file specifies the source GCS URI for the metadata, the entry group to import into, and other settings for the import job. 
+ To initiate the import, use the following `curl` command. Make sure to replace the placeholders `{project-id}`, `{location}`, and `{job-id}` with your actual project ID, location, and a unique job ID. + +```bash +curl -X POST \ + -H "Authorization: Bearer $(gcloud auth print-access-token)" \ + -H "Content-Type: application/json; charset=utf-8" \ + -d @request.json \ + "https://dataplex.googleapis.com/v1/projects/{project-id}/locations/{location}/metadataJobs?metadataJobId={job-id}" \ No newline at end of file diff --git a/managed-connectivity/community-contributed-connectors/aws-glue-connector/build_and_push_docker.sh b/managed-connectivity/community-contributed-connectors/aws-glue-connector/build_and_push_docker.sh new file mode 100755 index 00000000..6efa8391 --- /dev/null +++ b/managed-connectivity/community-contributed-connectors/aws-glue-connector/build_and_push_docker.sh @@ -0,0 +1,95 @@ +#!/bin/bash + +# Terminate script on error +set -e + +# --- Read script arguments --- +POSITIONAL=() +while [[ $# -gt 0 ]] +do +key="$1" + +case $key in + -p|--project_id) + PROJECT_ID="$2" + shift # past argument + shift # past value + ;; + -r|--repo) + REPO="$2" + shift # past argument + shift # past value + ;; + -i|--image_name) + IMAGE_NAME="$2" + shift # past argument + shift # past value + ;; + *) # unknown option + POSITIONAL+=("$1") # save it in an array for later + shift # past argument + ;; +esac +done +set -- "${POSITIONAL[@]}" # restore positional parameters + +# --- Validate arguments --- +if [ -z "$PROJECT_ID" ]; then + echo "Project ID not provided. Please provide project ID with the -p flag."
+ exit 1 +fi + +if [ -z "$REPO" ]; then + # Default to gcr.io/[PROJECT_ID] if no repo is provided + REPO="gcr.io/${PROJECT_ID}" + echo "Repository not provided, defaulting to: ${REPO}" +fi + +if [ -z "$IMAGE_NAME" ]; then + IMAGE_NAME="aws-glue-to-dataplex-pyspark" + echo "Image name not provided, defaulting to: ${IMAGE_NAME}" +fi + +IMAGE_TAG="latest" +IMAGE_URI="${REPO}/${IMAGE_NAME}:${IMAGE_TAG}" + +# --- Build the Docker Image --- +echo "Building Docker image: ${IMAGE_URI}..." +# Use the connector's Dockerfile (this repo ships "Dockerfile", not "Dockerfile.pyspark") +docker build -t "${IMAGE_URI}" -f Dockerfile . + +if [ $? -ne 0 ]; then + echo "Docker build failed." + exit 1 +fi +echo "Docker build successful." + +# --- Run the Docker Container --- +echo "Running the PySpark job in a Docker container..." +echo "Using local gcloud credentials for authentication." + +# We mount the local gcloud config directory into the container. +# This allows the container to use your Application Default Credentials. +# Make sure you have run 'gcloud auth application-default login' on your machine. +docker run --rm \ + -v ~/.config/gcloud:/root/.config/gcloud \ + "${IMAGE_URI}" + +if [ $? -ne 0 ]; then + echo "Docker run failed." + exit 1 +fi + +echo "PySpark job completed successfully." + +# --- Optional: Push to Google Container Registry --- +read -p "Do you want to push the image to ${REPO}? (y/n) " -n 1 -r +echo +if [[ $REPLY =~ ^[Yy]$ ]] +then + echo "Pushing image to ${REPO}..." + gcloud auth configure-docker + docker push "${IMAGE_URI}" + echo "Image pushed successfully."
+fi + diff --git a/managed-connectivity/community-contributed-connectors/aws-glue-connector/config.json b/managed-connectivity/community-contributed-connectors/aws-glue-connector/config.json new file mode 100644 index 00000000..ee17b3ea --- /dev/null +++ b/managed-connectivity/community-contributed-connectors/aws-glue-connector/config.json @@ -0,0 +1,10 @@ +{ + "aws_region": "eu-north-1", + "project_id": "gcve-demo-408018", + "location_id": "us-central1", + "entry_group_id": "aws-glue-assets", + "gcs_bucket": "udp-test-sp", + "aws_account_id": "003083320909", + "output_folder": "aws_output", + "gcp_secret_id": "aws-glue-secret" +} \ No newline at end of file diff --git a/managed-connectivity/community-contributed-connectors/aws-glue-connector/main.py b/managed-connectivity/community-contributed-connectors/aws-glue-connector/main.py new file mode 100644 index 00000000..d6bfdeae --- /dev/null +++ b/managed-connectivity/community-contributed-connectors/aws-glue-connector/main.py @@ -0,0 +1,8 @@ +import sys +from src import bootstrap + +# Allow shared files to be found when running from command line +sys.path.insert(1, '../src/shared') + +if __name__ == '__main__': + bootstrap.run() diff --git a/managed-connectivity/community-contributed-connectors/aws-glue-connector/pyspark_job.py b/managed-connectivity/community-contributed-connectors/aws-glue-connector/pyspark_job.py new file mode 100644 index 00000000..f884398d --- /dev/null +++ b/managed-connectivity/community-contributed-connectors/aws-glue-connector/pyspark_job.py @@ -0,0 +1,74 @@ +import json +from pyspark.sql import SparkSession +from src.aws_glue_connector import AWSGlueConnector +from src.entry_builder import build_database_entry, build_dataset_entry +from src.gcs_uploader import GCSUploader +from src.secret_manager import SecretManager + +def main(): + """ + Main function to run the AWS Glue to Dataplex metadata connector as a PySpark job. 
+ """ + # Initialize Spark Session + spark = SparkSession.builder.appName("AWSGlueToDataplexConnector").getOrCreate() + + # Load configuration from a local file + # In a real cluster environment, this might be passed differently + with open('config.json', 'r') as f: + config = json.load(f) + + print("Configuration loaded.") + + # Fetch AWS credentials from Secret Manager + print("Fetching AWS credentials from GCP Secret Manager...") + aws_access_key_id, aws_secret_access_key = SecretManager.get_aws_credentials( + project_id=config["project_id"], + secret_id=config["gcp_secret_id"] + ) + print("Credentials fetched successfully.") + + # Initialize AWS Glue Connector + glue_connector = AWSGlueConnector( + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key, + aws_region=config['aws_region'] + ) + + # Fetch metadata and lineage + print("Fetching metadata from AWS Glue...") + metadata = glue_connector.get_databases() + print(f"Found {len(metadata)} databases.") + + print("Fetching lineage info from AWS Glue jobs...") + lineage_info = glue_connector.get_lineage_info() + print(f"Found {len(lineage_info)} lineage relationships.") + + # Prepare entries for Dataplex + dataplex_entries = [] + for db_name, tables in metadata.items(): + dataplex_entries.append(build_database_entry(config, db_name)) + for table in tables: + dataplex_entries.append(build_dataset_entry(config, db_name, table, lineage_info)) + + print(f"Prepared {len(dataplex_entries)} entries for Dataplex.") + + # Initialize GCSUploader + gcs_uploader = GCSUploader( + project_id=config['project_id'], + bucket_name=config['gcs_bucket'] + ) + + # Upload to GCS + print(f"Uploading entries to GCS bucket: {config['gcs_bucket']}/{config['output_folder']}...") + gcs_uploader.upload_entries( + entries=dataplex_entries, + aws_region=config['aws_region'], + output_folder=config['output_folder'] + ) + print("Upload complete.") + + # Stop the Spark Session + spark.stop() + +if __name__ == 
'__main__': + main() diff --git a/managed-connectivity/community-contributed-connectors/aws-glue-connector/request.json b/managed-connectivity/community-contributed-connectors/aws-glue-connector/request.json new file mode 100644 index 00000000..1b5218f2 --- /dev/null +++ b/managed-connectivity/community-contributed-connectors/aws-glue-connector/request.json @@ -0,0 +1,19 @@ +{ + "type": "IMPORT", + "import_spec": { + "source_storage_uri": "gs://udp-test-sp/aws_output/", + "entry_sync_mode": "FULL", + "aspect_sync_mode": "INCREMENTAL", + "log_level": "DEBUG", + "scope": { + "entry_groups": ["projects/gcve-demo-408018/locations/us-central1/entryGroups/aws-glue-assets"], + "entry_types": [ + "projects/gcve-demo-408018/locations/us-central1/entryTypes/aws-glue-table" + ], + "aspect_types": [ + "projects/dataplex-types/locations/global/aspectTypes/schema", + "projects/gcve-demo-408018/locations/us-central1/aspectTypes/aws-glue-table" + ] + } + } +} \ No newline at end of file diff --git a/managed-connectivity/community-contributed-connectors/aws-glue-connector/requirements.txt b/managed-connectivity/community-contributed-connectors/aws-glue-connector/requirements.txt new file mode 100644 index 00000000..a3a54826 --- /dev/null +++ b/managed-connectivity/community-contributed-connectors/aws-glue-connector/requirements.txt @@ -0,0 +1,5 @@ +google-cloud-dataplex>=2.4.0 +boto3 +google-cloud-secret-manager +google-cloud-storage +pyspark==3.5.0 diff --git a/managed-connectivity/community-contributed-connectors/aws-glue-connector/scripts/grant_SA_dataproc_roles.sh b/managed-connectivity/community-contributed-connectors/aws-glue-connector/scripts/grant_SA_dataproc_roles.sh new file mode 100644 index 00000000..42077d17 --- /dev/null +++ b/managed-connectivity/community-contributed-connectors/aws-glue-connector/scripts/grant_SA_dataproc_roles.sh @@ -0,0 +1,37 @@ +#!/bin/bash + +# Set variables +PROJECT_ID="your-project-id" # Replace with your Google Cloud project ID 
+SERVICE_ACCOUNT_EMAIL="your-service-account@your-project-id.iam.gserviceaccount.com" # Replace with your service account email + +# Roles to be granted for running the Dataplex metadata extract as a Dataproc Serverless job +ROLES=( + "roles/dataplex.catalogEditor" + "roles/dataplex.entryGroupOwner" + "roles/dataplex.metadataJobOwner" + "roles/dataproc.admin" + "roles/dataproc.editor" + "roles/dataproc.worker" + "roles/iam.serviceAccountUser" + "roles/logging.logWriter" + "roles/secretmanager.secretAccessor" + "roles/workflows.invoker" +) + +# Loop through the roles and grant each one +for ROLE in "${ROLES[@]}"; do + echo "Granting role: $ROLE to service account: $SERVICE_ACCOUNT_EMAIL" + + gcloud projects add-iam-policy-binding "$PROJECT_ID" \ + --member="serviceAccount:$SERVICE_ACCOUNT_EMAIL" \ + --role="$ROLE" + + if [[ $? -eq 0 ]]; then + echo "Successfully granted $ROLE" + else + echo "Error granting $ROLE. Check the gcloud command above for details." + exit 1 # Exit script with error if any role grant fails. + fi +done + +echo "Finished granting roles."
diff --git a/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/.DS_Store b/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..f25b6f09cc3b46a6a2d229f4105bcc6ca237580d GIT binary patch literal 6148 zcmeHKK~BRk5FEEX6eyxdAaOy;3o7x0P|ATb4}hetDx|cj(pIRq+<6S*F+7G7%-R;B zPKw}y0PR-xIL?lDJ(2Aifa$MBXFwM~n?^ehzVQc`iOW8m>n`DQ;C-lC2~!s<}~1S89}@YtQj@s zu64(~YinHd5bzJE)zE6T;v6&P@JZI@rNW-kf65%b5W6u6*oTZO-$k@qa)ha!dBTjg zsMfp)MmQobXWlntd_#riQ!ox6)&8skJhR2xdk(c$1yli5;9UXPA0igP*kkF?ZXK-b z6@b`cw>7rqE+L%AW9+eX$Tu|Os6FIRnVg)p5z8%$nD~_r lErpdoj%`DZ;u98Yd>2WB7<()ovWKRB1gs2NsRBQ$z#DPZpVI&U literal 0 HcmV?d00001 diff --git a/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/aws_glue_connector.py b/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/aws_glue_connector.py new file mode 100644 index 00000000..edd47e4b --- /dev/null +++ b/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/aws_glue_connector.py @@ -0,0 +1,91 @@ +import boto3 +import re + +class AWSGlueConnector: + def __init__(self, aws_access_key_id, aws_secret_access_key, aws_region): + self.access_key_id = self._clean_credential(aws_access_key_id) + self.secret_access_key = self._clean_credential(aws_secret_access_key) + self.region = aws_region.strip() + + try: + self.__glue_client = boto3.client( + 'glue', + region_name=self.region, + aws_access_key_id=self.access_key_id, + aws_secret_access_key=self.secret_access_key + ) + except Exception as e: + raise ValueError(f"Failed to create AWS Glue client: {e}") + + def _clean_credential(self, credential): + """Clean and validate credential string""" + if not credential: + raise ValueError("Empty credential provided") + cleaned = re.sub(r'[\r\n\t\s]', '', credential) + if not cleaned or len(cleaned) < 10: + raise ValueError("Invalid credential format") + return cleaned + + 
def get_databases(self, include_databases=None): + """Fetches metadata from AWS Glue Data Catalog.""" + if include_databases is None: + include_databases = [] + metadata = {} + try: + paginator = self.__glue_client.get_paginator('get_databases') + for page in paginator.paginate(): + for db in page['DatabaseList']: + db_name = db['Name'] + if not include_databases or db_name in include_databases: + metadata[db_name] = self._get_tables(db_name) + except Exception as e: + raise RuntimeError(f"Failed to get databases from AWS Glue: {e}") + return metadata + + def _get_tables(self, db_name): + """Fetches tables from a specific database.""" + tables = [] + try: + paginator = self.__glue_client.get_paginator('get_tables') + for page in paginator.paginate(DatabaseName=db_name): + tables.extend(page['TableList']) + except Exception as e: + raise RuntimeError(f"Failed to get tables from AWS Glue for database {db_name}: {e}") + return tables + + def get_lineage_info(self): + """ + Scans AWS Glue jobs to derive lineage information by inspecting job run graphs. + Returns a dictionary mapping target table names to a list of their source table names. 
+ """ + lineage_map = {} + paginator = self.__glue_client.get_paginator('get_jobs') + + print("Fetching lineage info from AWS Glue jobs...") + try: + for page in paginator.paginate(): + for job in page['Jobs']: + job_name = job['Name'] + job_runs = self.__glue_client.get_job_runs(JobName=job_name) + for job_run in job_runs.get('JobRuns', []): + if job_run.get('JobRunState') == 'SUCCEEDED': + graph = self.__glue_client.get_dataflow_graph(PythonScript=job['Command']['ScriptLocation']) + if graph: + sources = [edge['Source'] for edge in graph.get('Edges', [])] + targets = [edge['Target'] for edge in graph.get('Edges', [])] + for i, target_id in enumerate(targets): + target_node = next((node for node in graph.get('Nodes', []) if node['Id'] == target_id), None) + if target_node and target_node['NodeType'] == 'DataSink': + target_table_name = target_node.get('Name') + source_id = sources[i] + source_node = next((node for node in graph.get('Nodes', []) if node['Id'] == source_id), None) + if source_node and source_node['NodeType'] == 'DataSource': + source_table_name = source_node.get('Name') + if target_table_name not in lineage_map: + lineage_map[target_table_name] = [] + lineage_map[target_table_name].append(source_table_name) + except Exception as e: + print(f"Warning: Could not fetch lineage information. 
Error: {e}") + + print(f"Found {len(lineage_map)} lineage relationships.") + return lineage_map \ No newline at end of file diff --git a/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/bootstrap.py b/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/bootstrap.py new file mode 100644 index 00000000..28ce60ec --- /dev/null +++ b/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/bootstrap.py @@ -0,0 +1,51 @@ +import json +from src.aws_glue_connector import AWSGlueConnector +from src.entry_builder import build_database_entry, build_dataset_entry +from src.gcs_uploader import GCSUploader +from src.secret_manager import SecretManager + +def run(): + # Load configuration + with open('config.json', 'r') as f: + config = json.load(f) + + # Fetch AWS credentials from Secret Manager + aws_access_key_id, aws_secret_access_key = SecretManager.get_aws_credentials( + project_id=config["project_id"], + secret_id=config["gcp_secret_id"] + ) + + # Initialize AWS Glue Connector + glue_connector = AWSGlueConnector( + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key, + aws_region=config['aws_region'] + ) + + # Fetch metadata and lineage + metadata = glue_connector.get_databases() + lineage_info = glue_connector.get_lineage_info() + + # Prepare entries for Dataplex + dataplex_entries = [] + for db_name, tables in metadata.items(): + dataplex_entries.append(build_database_entry(config, db_name)) + for table in tables: + dataplex_entries.append(build_dataset_entry(config, db_name, table, lineage_info)) + + # Initialize GCSUploader + gcs_uploader = GCSUploader( + project_id=config['project_id'], + bucket_name=config['gcs_bucket'] + ) + + # Upload to GCS using the correct method + gcs_uploader.upload_entries( + entries=dataplex_entries, + aws_region=config['aws_region'], + output_folder=config['output_folder'] + ) + print(f"Successfully uploaded entries to GCS bucket: 
{config['gcs_bucket']}/{config['output_folder']}") + +if __name__ == '__main__': + run() \ No newline at end of file diff --git a/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/constants.py b/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/constants.py new file mode 100644 index 00000000..f6633ae7 --- /dev/null +++ b/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/constants.py @@ -0,0 +1,23 @@ +"""Constants that are used in the different files.""" +import enum + +SOURCE_TYPE = "aws_glue" + +# Short keys for the aspects map +SCHEMA_ASPECT_KEY = "dataplex-types.global.schema" +LINEAGE_ASPECT_KEY = "gcve-demo-408018.us-central1.aws-lineage-aspect" + +# New keys for custom marker aspects +DATABASE_ASPECT_KEY = "gcve-demo-408018.us-central1.aws-glue-database" +TABLE_ASPECT_KEY = "gcve-demo-408018.us-central1.aws-glue-table" +VIEW_ASPECT_KEY = "gcve-demo-408018.us-central1.aws-glue-view" + +# Full paths for the aspect_type field +SCHEMA_ASPECT_PATH = "projects/dataplex-types/locations/global/aspectTypes/schema" +LINEAGE_ASPECT_PATH = "projects/{project}/locations/{location}/aspectTypes/aws-lineage-aspect" + +class EntryType(enum.Enum): + """Types of AWS Glue entries.""" + DATABASE: str = "projects/{project}/locations/{location}/entryTypes/aws-glue-database" + TABLE: str = "projects/{project}/locations/{location}/entryTypes/aws-glue-table" + VIEW: str = "projects/{project}/locations/{location}/entryTypes/aws-glue-view" \ No newline at end of file diff --git a/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/entry_builder.py b/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/entry_builder.py new file mode 100644 index 00000000..07d82150 --- /dev/null +++ b/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/entry_builder.py @@ -0,0 +1,128 @@ +import re +from src.constants import * +import src.name_builder 
as nb + +def choose_metadata_type(data_type: str): + """Choose the metadata type based on AWS Glue native type.""" + data_type = data_type.lower() + if data_type in ['integer', 'int', 'smallint', 'tinyint', 'bigint', 'long', 'float', 'double', 'decimal']: + return "NUMBER" + if 'char' in data_type or 'string' in data_type: + return "STRING" + if data_type in ['binary', 'array', 'struct', 'map']: + return "BYTES" + if data_type == 'timestamp': + return "TIMESTAMP" + if data_type == 'date': + return "DATE" + return "OTHER" + +def build_database_entry(config, db_name): + """Builds a database entry, mimicking the successful Oracle format.""" + entry_type = EntryType.DATABASE + full_entry_type = entry_type.value.format( + project=config["project_id"], + location=config["location_id"]) + + aspects = { + DATABASE_ASPECT_KEY: { + "aspect_type": DATABASE_ASPECT_KEY, + "data": {} + } + } + + entry = { + "name": nb.create_name(config, entry_type, db_name), + "fully_qualified_name": nb.create_fqn(config, entry_type, db_name), + "entry_type": full_entry_type, + "aspects": aspects + } + return { + "entry": entry, + "aspect_keys": [DATABASE_ASPECT_KEY], + "update_mask": "aspects" + } + +def build_dataset_entry(config, db_name, table_info, job_lineage): + """Builds a table or view entry, mimicking the successful Oracle format.""" + table_name = table_info['Name'] + table_type = table_info.get('TableType') + + entry_type = EntryType.VIEW if table_type == 'VIRTUAL_VIEW' else EntryType.TABLE + + # --- Build Schema Aspect --- + columns = [] + if 'StorageDescriptor' in table_info and 'Columns' in table_info['StorageDescriptor']: + for col in table_info['StorageDescriptor']['Columns']: + columns.append({ + "name": col.get("Name"), + "dataType": col.get("Type"), + "mode": "NULLABLE", + "metadataType": choose_metadata_type(col.get("Type", "")) + }) + + aspects = { + SCHEMA_ASPECT_KEY: { + "aspect_type": SCHEMA_ASPECT_PATH, + "data": { "fields": columns } + } + } + aspect_keys = 
[SCHEMA_ASPECT_KEY] + + # --- Add Custom Marker Aspect --- + if entry_type == EntryType.TABLE: + aspects[TABLE_ASPECT_KEY] = {"aspect_type": TABLE_ASPECT_KEY, "data": {}} + aspect_keys.append(TABLE_ASPECT_KEY) + elif entry_type == EntryType.VIEW: + aspects[VIEW_ASPECT_KEY] = {"aspect_type": VIEW_ASPECT_KEY, "data": {}} + aspect_keys.append(VIEW_ASPECT_KEY) + + # --- Build Lineage Aspect --- + source_assets = [] + if entry_type == EntryType.VIEW and 'ViewOriginalText' in table_info: + sql = table_info['ViewOriginalText'] + source_tables = re.findall(r'(?:FROM|JOIN)\s+`?(\w+)`?', sql, re.IGNORECASE) + source_assets.extend(set(source_tables)) + + if table_name in job_lineage: + source_assets.extend(job_lineage[table_name]) + + if source_assets: + full_lineage_aspect_path = LINEAGE_ASPECT_PATH.format( + project=config["project_id"], + location=config["location_id"] + ) + lineage_aspect = { + LINEAGE_ASPECT_KEY: { + "aspect_type": full_lineage_aspect_path, + "data": { + "links": [{ + "source": { "fully_qualified_name": nb.create_fqn(config, EntryType.TABLE, db_name, src) }, + "target": { "fully_qualified_name": nb.create_fqn(config, entry_type, db_name, table_name) } + } for src in set(source_assets)] + } + } + } + aspects.update(lineage_aspect) + aspect_keys.append(LINEAGE_ASPECT_KEY) + + # --- Build General Entry Info --- + full_entry_type = entry_type.value.format( + project=config["project_id"], + location=config["location_id"]) + parent_name = nb.create_parent_name(config, entry_type, db_name) + + entry = { + "name": nb.create_name(config, entry_type, db_name, table_name), + "fully_qualified_name": nb.create_fqn(config, entry_type, db_name, table_name), + "parent_entry": parent_name, + "entry_type": full_entry_type, + "entry_source": { "display_name": table_name, "system": SOURCE_TYPE }, + "aspects": aspects + } + + return { + "entry": entry, + "aspect_keys": list(set(aspect_keys)), + "update_mask": "aspects" + } \ No newline at end of file diff --git 
a/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/gcs_uploader.py b/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/gcs_uploader.py new file mode 100644 index 00000000..de411f81 --- /dev/null +++ b/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/gcs_uploader.py @@ -0,0 +1,34 @@ +import json +import os +from google.cloud import storage + +class GCSUploader: + def __init__(self, project_id: str, bucket_name: str): + self.client = storage.Client(project=project_id) + self.bucket = self.client.bucket(bucket_name) + + def upload_entries(self, entries: list, aws_region: str, output_folder: str = None): + """ + Converts a list of dictionary entries to a JSONL file and uploads it to GCS, + optionally within a specified folder. + """ + if not entries: + print("No entries to upload.") + return + + # Define the output file name + file_name = f"aws-glue-output-{aws_region}.jsonl" + + # Convert list of entries to a JSONL formatted string + content = "\n".join(json.dumps(entry) for entry in entries) + + # If an output folder is provided, create the full destination path + if output_folder: + blob_name = os.path.join(output_folder, file_name) + else: + blob_name = file_name + + blob = self.bucket.blob(blob_name) + blob.upload_from_string(content) + + # The final print statement is now in bootstrap.py for better context \ No newline at end of file diff --git a/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/name_builder.py b/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/name_builder.py new file mode 100644 index 00000000..9d78cd55 --- /dev/null +++ b/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/name_builder.py @@ -0,0 +1,47 @@ +from src.constants import EntryType, SOURCE_TYPE + +def create_name(config, entry_type, db_name, asset_name=None): + """Creates the 'name' for a Dataplex entry within an Entry 
Group.""" + project = config['project_id'] + location = config['location_id'] + entry_group = config['entry_group_id'] + + # Sanitize all components + db_name_sanitized = db_name.replace('-', '_') + + if entry_type in [EntryType.TABLE, EntryType.VIEW]: + asset_name_sanitized = asset_name.replace('.', '_').replace('-', '_') + # The entry name for a table should be unique. Combining db and table is robust. + return f"projects/{project}/locations/{location}/entryGroups/{entry_group}/entries/{db_name_sanitized}_{asset_name_sanitized}" + + # Return None for database to signify it should not have a standalone entry + return None + +def create_fqn(config, entry_type, db_name, asset_name=None): + """Creates the 'fully_qualified_name' for a Table or View.""" + system = SOURCE_TYPE + + aws_account_id = config.get('aws_account_id') + aws_region = config.get('aws_region') + + if not aws_account_id or not aws_region: + raise ValueError("AWS Account ID and Region are missing from the configuration.") + + # --- THIS IS THE CRITICAL FIX --- + # Sanitize both region and database names by replacing hyphens. + # region_sanitized = aws_region.replace('-', '_') + # db_name_sanitized = db_name.replace('-', '_') + + # FQN is only defined for Tables and Views + if entry_type in [EntryType.TABLE, EntryType.VIEW]: + asset_name_sanitized = asset_name.replace('-', '_') + path = (f"table:{aws_region}.{aws_account_id}." 
+ f"{db_name}.{asset_name_sanitized}") + return f"{system}:{path}" + + # Return None for other types as they don't have a supported FQN + return None + +def create_parent_name(config, entry_type, db_name): + """Parent Entry is not used in this model as there is no DB entry.""" + return None \ No newline at end of file diff --git a/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/secret_manager.py b/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/secret_manager.py new file mode 100644 index 00000000..46157d7b --- /dev/null +++ b/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/secret_manager.py @@ -0,0 +1,22 @@ +from google.cloud import secretmanager +import json + +class SecretManager: + @staticmethod + def get_aws_credentials(project_id, secret_id): + """Fetches AWS credentials from GCP Secret Manager.""" + client = secretmanager.SecretManagerServiceClient() + name = f"projects/{project_id}/secrets/{secret_id}/versions/latest" + try: + response = client.access_secret_version(name=name) + payload = response.payload.data.decode("UTF-8").strip() + credentials = json.loads(payload) + access_key = credentials['access_key_id'].strip() + secret_key = credentials['secret_access_key'].strip() + if not access_key or not secret_key: + raise ValueError("Empty credentials found in secret") + return access_key, secret_key + except (json.JSONDecodeError, KeyError) as e: + raise ValueError(f"Invalid credentials format in secret: {e}") + except Exception as e: + raise RuntimeError(f"Failed to access secret: {e}") \ No newline at end of file From d4a83bd450c8ff2f5d447a55749cb03d8aa39c03 Mon Sep 17 00:00:00 2001 From: Shubham Pathak Date: Wed, 14 Jan 2026 14:56:52 +0530 Subject: [PATCH 02/11] Add AWS glue community contributed connector --- .../aws-glue-connector/config.json | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git 
a/managed-connectivity/community-contributed-connectors/aws-glue-connector/config.json b/managed-connectivity/community-contributed-connectors/aws-glue-connector/config.json index ee17b3ea..c71496a8 100644 --- a/managed-connectivity/community-contributed-connectors/aws-glue-connector/config.json +++ b/managed-connectivity/community-contributed-connectors/aws-glue-connector/config.json @@ -1,10 +1,10 @@ { - "aws_region": "eu-north-1", - "project_id": "gcve-demo-408018", - "location_id": "us-central1", - "entry_group_id": "aws-glue-assets", - "gcs_bucket": "udp-test-sp", - "aws_account_id": "003083320909", - "output_folder": "aws_output", - "gcp_secret_id": "aws-glue-secret" + "aws_region": "", + "project_id": "", + "location_id": "", + "entry_group_id": "", + "gcs_bucket": "", + "aws_account_id": "", + "output_folder": "", + "gcp_secret_id": "" } \ No newline at end of file From a79ac295d0c246a4f0dcaa3808435ab31eeb620b Mon Sep 17 00:00:00 2001 From: Shubham Pathak Date: Wed, 14 Jan 2026 15:28:26 +0530 Subject: [PATCH 03/11] Add AWS glue community contributed connector with few changes --- .../aws-glue-connector/README.md | 40 +++++++++----- .../build_and_push_docker.sh | 2 +- .../aws-glue-connector/request.json | 13 +++-- .../aws-glue-connector/requirements.txt | 3 +- .../src/aws_glue_connector.py | 50 ++++++++++++------ .../aws-glue-connector/src/constants.py | 13 +++-- .../aws-glue-connector/src/entry_builder.py | 52 ++++++++++++------- .../aws-glue-connector/src/name_builder.py | 21 +++++--- 8 files changed, 127 insertions(+), 67 deletions(-) diff --git a/managed-connectivity/community-contributed-connectors/aws-glue-connector/README.md b/managed-connectivity/community-contributed-connectors/aws-glue-connector/README.md index db98b96e..d63b2439 100644 --- a/managed-connectivity/community-contributed-connectors/aws-glue-connector/README.md +++ b/managed-connectivity/community-contributed-connectors/aws-glue-connector/README.md @@ -12,14 +12,20 @@ Before using this 
connector, you need to have the following set up: 1. **AWS Credentials**: You will need an AWS access key ID and a secret access key with permissions to access AWS Glue. 2. **Google Cloud Project**: A Google Cloud project is required to run the script and store the output. -3. **GCP Secret Manager**: The AWS credentials must be stored in a secret in Google Cloud Secret Manager. The secret should be a single string with the access key ID and secret access key, separated by a comma (e.g., `"your_access_key_id,your_secret_access_key"`). +3. **GCP Secret Manager**: The AWS credentials must be stored in a secret in Google Cloud Secret Manager. The secret payload must be a **JSON object** with the following format: + ```json + { + "access_key_id": "YOUR_AWS_ACCESS_KEY_ID", + "secret_access_key": "YOUR_AWS_SECRET_ACCESS_KEY" + } + ``` 4. **Python 3** and **pip** installed. *** ## Configuration -The connector is configured using the `config.json` file. Here is a description of the parameters: +The connector is configured using the `config.json` file. Ensure this file is present in the same directory as `main.py`. Here is a description of the parameters: | Parameter | Description | | :--- | :--- | @@ -50,9 +56,9 @@ You can run the connector from your local machine using a Python virtual environ pip install -r requirements.txt ``` 3. **Run the connector:** - Execute the `main.py` script and provide the path to your configuration file. + Execute the `main.py` script. It will read settings from `config.json` in the current directory. ```bash - python3 main.py --config=config.json + python3 main.py ``` *** @@ -65,15 +71,23 @@ The connector generates a JSONL file in the specified GCS bucket and folder. Thi ## Importing Metadata into Dataplex -Once the metadata file has been generated, you can import it into Dataplex using a metadata import job. You can use the provided `request.json` file as a template for the import request. 
+Once the metadata file has been generated, you can import it into Dataplex using a metadata import job. -The `request.json` file specifies the source GCS URI for the metadata, the entry group to import into, and other settings for the import job. +1. **Prepare the Request File:** + Open the `request.json` file and replace the following placeholders with your actual values: + * `<GCS_BUCKET>`: The bucket where the output file was saved. + * `<OUTPUT_FOLDER>`: The folder where the output file was saved. + * `<PROJECT_ID>`: Your Google Cloud Project ID. + * `<LOCATION>`: Your Google Cloud Location (e.g., `us-central1`). + * `<ENTRY_GROUP_ID>`: The Dataplex Entry Group ID. -To initiate the import, use the following `curl` command. Make sure to replace the placeholders `{project-id}`, `{location}`, and `{job-id}` with your actual project ID, location, and a unique job ID. +2. **Run the Import Command:** + Use the following `curl` command to initiate the import. Make sure to replace `{project-id}`, `{location}`, and `{job-id}` in the URL with your actual project ID, location, and a unique job ID.
-```bash -curl -X POST \ - -H "Authorization: Bearer $(gcloud auth print-access-token)" \ - -H "Content-Type: application/json; charset=utf-8" \ - -d @request.json \ - "[https://dataplex.googleapis.com/v1/projects/](https://dataplex.googleapis.com/v1/projects/){project-id}/locations/{location}/metadataJobs?metadataJobId={job-id}" \ No newline at end of file + ```bash + curl -X POST \ + -H "Authorization: Bearer $(gcloud auth print-access-token)" \ + -H "Content-Type: application/json; charset=utf-8" \ + -d @request.json \ + "https://dataplex.googleapis.com/v1/projects/{project-id}/locations/{location}/metadataJobs?metadataJobId={job-id}" + ``` diff --git a/managed-connectivity/community-contributed-connectors/aws-glue-connector/build_and_push_docker.sh b/managed-connectivity/community-contributed-connectors/aws-glue-connector/build_and_push_docker.sh index 6efa8391..b1ccbca8 100755 --- a/managed-connectivity/community-contributed-connectors/aws-glue-connector/build_and_push_docker.sh +++ b/managed-connectivity/community-contributed-connectors/aws-glue-connector/build_and_push_docker.sh @@ -56,7 +56,7 @@ IMAGE_URI="${REPO}/${IMAGE_NAME}:${IMAGE_TAG}" # --- Build the Docker Image --- echo "Building Docker image: ${IMAGE_URI}..." # Use the Dockerfile for PySpark -docker build -t "${IMAGE_URI}" -f Dockerfile.pyspark . +docker build -t "${IMAGE_URI}" -f Dockerfile . if [ $? -ne 0 ]; then echo "Docker build failed." 
diff --git a/managed-connectivity/community-contributed-connectors/aws-glue-connector/request.json b/managed-connectivity/community-contributed-connectors/aws-glue-connector/request.json index 1b5218f2..a05b83fe 100644 --- a/managed-connectivity/community-contributed-connectors/aws-glue-connector/request.json +++ b/managed-connectivity/community-contributed-connectors/aws-glue-connector/request.json @@ -1,18 +1,23 @@ { "type": "IMPORT", "import_spec": { - "source_storage_uri": "gs://udp-test-sp/aws_output/", + "source_storage_uri": "gs://<GCS_BUCKET>/<OUTPUT_FOLDER>/", "entry_sync_mode": "FULL", "aspect_sync_mode": "INCREMENTAL", "log_level": "DEBUG", "scope": { - "entry_groups": ["projects/gcve-demo-408018/locations/us-central1/entryGroups/aws-glue-assets"], + "entry_groups": ["projects/<PROJECT_ID>/locations/<LOCATION>/entryGroups/<ENTRY_GROUP_ID>"], "entry_types": [ - "projects/gcve-demo-408018/locations/us-central1/entryTypes/aws-glue-table" + "projects/<PROJECT_ID>/locations/<LOCATION>/entryTypes/aws-glue-database", + "projects/<PROJECT_ID>/locations/<LOCATION>/entryTypes/aws-glue-table", + "projects/<PROJECT_ID>/locations/<LOCATION>/entryTypes/aws-glue-view" ], "aspect_types": [ "projects/dataplex-types/locations/global/aspectTypes/schema", - "projects/gcve-demo-408018/locations/us-central1/aspectTypes/aws-glue-table" + "projects/<PROJECT_ID>/locations/<LOCATION>/aspectTypes/aws-glue-database", + "projects/<PROJECT_ID>/locations/<LOCATION>/aspectTypes/aws-glue-table", + "projects/<PROJECT_ID>/locations/<LOCATION>/aspectTypes/aws-glue-view", + "projects/<PROJECT_ID>/locations/<LOCATION>/aspectTypes/aws-lineage-aspect" ] } } diff --git a/managed-connectivity/community-contributed-connectors/aws-glue-connector/requirements.txt b/managed-connectivity/community-contributed-connectors/aws-glue-connector/requirements.txt index a3a54826..2d7d4039 100644 --- a/managed-connectivity/community-contributed-connectors/aws-glue-connector/requirements.txt +++ b/managed-connectivity/community-contributed-connectors/aws-glue-connector/requirements.txt @@ -1,5 +1,4 @@ google-cloud-dataplex>=2.4.0 boto3 google-cloud-secret-manager -google-cloud-storage -pyspark==3.5.0 +google-cloud-storage \
No newline at end of file diff --git a/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/aws_glue_connector.py b/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/aws_glue_connector.py index edd47e4b..ea1ae445 100644 --- a/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/aws_glue_connector.py +++ b/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/aws_glue_connector.py @@ -66,26 +66,42 @@ def get_lineage_info(self): for page in paginator.paginate(): for job in page['Jobs']: job_name = job['Name'] - job_runs = self.__glue_client.get_job_runs(JobName=job_name) + # Optimization: Limit to latest run to reduce API calls + job_runs = self.__glue_client.get_job_runs(JobName=job_name, MaxResults=1) + for job_run in job_runs.get('JobRuns', []): if job_run.get('JobRunState') == 'SUCCEEDED': - graph = self.__glue_client.get_dataflow_graph(PythonScript=job['Command']['ScriptLocation']) - if graph: - sources = [edge['Source'] for edge in graph.get('Edges', [])] - targets = [edge['Target'] for edge in graph.get('Edges', [])] - for i, target_id in enumerate(targets): - target_node = next((node for node in graph.get('Nodes', []) if node['Id'] == target_id), None) - if target_node and target_node['NodeType'] == 'DataSink': - target_table_name = target_node.get('Name') - source_id = sources[i] - source_node = next((node for node in graph.get('Nodes', []) if node['Id'] == source_id), None) - if source_node and source_node['NodeType'] == 'DataSource': - source_table_name = source_node.get('Name') - if target_table_name not in lineage_map: - lineage_map[target_table_name] = [] - lineage_map[target_table_name].append(source_table_name) + script_location = job.get('Command', {}).get('ScriptLocation') + if not script_location: + continue + + try: + # Note: get_dataflow_graph expects the actual Python script content, not just the S3 URI. 
+ # Passing the URI here will likely result in an empty graph or failure. + # To fix this, we would need to download the script from S3, which requires S3 permissions. + # For now, we attempt it, but catch errors. + graph = self.__glue_client.get_dataflow_graph(PythonScript=script_location) + if graph: + sources = [edge['Source'] for edge in graph.get('Edges', [])] + targets = [edge['Target'] for edge in graph.get('Edges', [])] + for i, target_id in enumerate(targets): + target_node = next((node for node in graph.get('Nodes', []) if node['Id'] == target_id), None) + if target_node and target_node['NodeType'] == 'DataSink': + target_table_name = target_node.get('Name') + source_id = sources[i] + source_node = next((node for node in graph.get('Nodes', []) if node['Id'] == source_id), None) + if source_node and source_node['NodeType'] == 'DataSource': + source_table_name = source_node.get('Name') + if target_table_name not in lineage_map: + lineage_map[target_table_name] = [] + lineage_map[target_table_name].append(source_table_name) + except Exception as e: + print(f"Warning: Could not get dataflow graph for job {job_name}. Note that direct S3 URI usage might not be supported. Error: {e}") + + # We only need one successful run to guess lineage + break except Exception as e: print(f"Warning: Could not fetch lineage information. 
Error: {e}") print(f"Found {len(lineage_map)} lineage relationships.") - return lineage_map \ No newline at end of file + return lineage_map diff --git a/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/constants.py b/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/constants.py index f6633ae7..04e29494 100644 --- a/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/constants.py +++ b/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/constants.py @@ -5,16 +5,19 @@ # Short keys for the aspects map SCHEMA_ASPECT_KEY = "dataplex-types.global.schema" -LINEAGE_ASPECT_KEY = "gcve-demo-408018.us-central1.aws-lineage-aspect" -# New keys for custom marker aspects -DATABASE_ASPECT_KEY = "gcve-demo-408018.us-central1.aws-glue-database" -TABLE_ASPECT_KEY = "gcve-demo-408018.us-central1.aws-glue-table" -VIEW_ASPECT_KEY = "gcve-demo-408018.us-central1.aws-glue-view" +# Keys for custom marker aspects (templates) +DATABASE_ASPECT_KEY_TEMPLATE = "{project}.{location}.aws-glue-database" +TABLE_ASPECT_KEY_TEMPLATE = "{project}.{location}.aws-glue-table" +VIEW_ASPECT_KEY_TEMPLATE = "{project}.{location}.aws-glue-view" +LINEAGE_ASPECT_KEY_TEMPLATE = "{project}.{location}.aws-lineage-aspect" # Full paths for the aspect_type field SCHEMA_ASPECT_PATH = "projects/dataplex-types/locations/global/aspectTypes/schema" LINEAGE_ASPECT_PATH = "projects/{project}/locations/{location}/aspectTypes/aws-lineage-aspect" +DATABASE_ASPECT_PATH = "projects/{project}/locations/{location}/aspectTypes/aws-glue-database" +TABLE_ASPECT_PATH = "projects/{project}/locations/{location}/aspectTypes/aws-glue-table" +VIEW_ASPECT_PATH = "projects/{project}/locations/{location}/aspectTypes/aws-glue-view" class EntryType(enum.Enum): """Types of AWS Glue entries.""" diff --git a/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/entry_builder.py 
b/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/entry_builder.py index 07d82150..c9dedc16 100644 --- a/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/entry_builder.py +++ b/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/entry_builder.py @@ -24,9 +24,17 @@ def build_database_entry(config, db_name): project=config["project_id"], location=config["location_id"]) + # Construct dynamic keys and paths + database_aspect_key = DATABASE_ASPECT_KEY_TEMPLATE.format( + project=config["project_id"], + location=config["location_id"]) + database_aspect_path = DATABASE_ASPECT_PATH.format( + project=config["project_id"], + location=config["location_id"]) + aspects = { - DATABASE_ASPECT_KEY: { - "aspect_type": DATABASE_ASPECT_KEY, + database_aspect_key: { + "aspect_type": database_aspect_path, "data": {} } } @@ -39,8 +47,8 @@ def build_database_entry(config, db_name): } return { "entry": entry, - "aspect_keys": [DATABASE_ASPECT_KEY], - "update_mask": "aspects" + "aspect_keys": list(aspects.keys()), + "update_mask": ["aspects"] } def build_dataset_entry(config, db_name, table_info, job_lineage): @@ -67,15 +75,23 @@ def build_dataset_entry(config, db_name, table_info, job_lineage): "data": { "fields": columns } } } - aspect_keys = [SCHEMA_ASPECT_KEY] # --- Add Custom Marker Aspect --- if entry_type == EntryType.TABLE: - aspects[TABLE_ASPECT_KEY] = {"aspect_type": TABLE_ASPECT_KEY, "data": {}} - aspect_keys.append(TABLE_ASPECT_KEY) + table_aspect_key = TABLE_ASPECT_KEY_TEMPLATE.format( + project=config["project_id"], location=config["location_id"]) + table_aspect_path = TABLE_ASPECT_PATH.format( + project=config["project_id"], location=config["location_id"]) + + aspects[table_aspect_key] = {"aspect_type": table_aspect_path, "data": {}} + elif entry_type == EntryType.VIEW: - aspects[VIEW_ASPECT_KEY] = {"aspect_type": VIEW_ASPECT_KEY, "data": {}} - aspect_keys.append(VIEW_ASPECT_KEY) + 
view_aspect_key = VIEW_ASPECT_KEY_TEMPLATE.format( + project=config["project_id"], location=config["location_id"]) + view_aspect_path = VIEW_ASPECT_PATH.format( + project=config["project_id"], location=config["location_id"]) + + aspects[view_aspect_key] = {"aspect_type": view_aspect_path, "data": {}} # --- Build Lineage Aspect --- source_assets = [] @@ -88,13 +104,14 @@ def build_dataset_entry(config, db_name, table_info, job_lineage): source_assets.extend(job_lineage[table_name]) if source_assets: - full_lineage_aspect_path = LINEAGE_ASPECT_PATH.format( - project=config["project_id"], - location=config["location_id"] - ) + lineage_aspect_key = LINEAGE_ASPECT_KEY_TEMPLATE.format( + project=config["project_id"], location=config["location_id"]) + lineage_aspect_path = LINEAGE_ASPECT_PATH.format( + project=config["project_id"], location=config["location_id"]) + lineage_aspect = { - LINEAGE_ASPECT_KEY: { - "aspect_type": full_lineage_aspect_path, + lineage_aspect_key: { + "aspect_type": lineage_aspect_path, "data": { "links": [{ "source": { "fully_qualified_name": nb.create_fqn(config, EntryType.TABLE, db_name, src) }, @@ -104,7 +121,6 @@ def build_dataset_entry(config, db_name, table_info, job_lineage): } } aspects.update(lineage_aspect) - aspect_keys.append(LINEAGE_ASPECT_KEY) # --- Build General Entry Info --- full_entry_type = entry_type.value.format( @@ -123,6 +139,6 @@ def build_dataset_entry(config, db_name, table_info, job_lineage): return { "entry": entry, - "aspect_keys": list(set(aspect_keys)), - "update_mask": "aspects" + "aspect_keys": list(aspects.keys()), + "update_mask": ["aspects"] } \ No newline at end of file diff --git a/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/name_builder.py b/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/name_builder.py index 9d78cd55..9982c634 100644 --- a/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/name_builder.py +++ 
b/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/name_builder.py @@ -14,7 +14,9 @@ def create_name(config, entry_type, db_name, asset_name=None): # The entry name for a table should be unique. Combining db and table is robust. return f"projects/{project}/locations/{location}/entryGroups/{entry_group}/entries/{db_name_sanitized}_{asset_name_sanitized}" - # Return None for database to signify it should not have a standalone entry + if entry_type == EntryType.DATABASE: + return f"projects/{project}/locations/{location}/entryGroups/{entry_group}/entries/{db_name_sanitized}" + return None def create_fqn(config, entry_type, db_name, asset_name=None): @@ -27,16 +29,19 @@ def create_fqn(config, entry_type, db_name, asset_name=None): if not aws_account_id or not aws_region: raise ValueError("AWS Account ID and Region are missing from the configuration.") - # --- THIS IS THE CRITICAL FIX --- # Sanitize both region and database names by replacing hyphens. - # region_sanitized = aws_region.replace('-', '_') - # db_name_sanitized = db_name.replace('-', '_') + region_sanitized = aws_region.replace('-', '_') + db_name_sanitized = db_name.replace('-', '_') # FQN is only defined for Tables and Views if entry_type in [EntryType.TABLE, EntryType.VIEW]: asset_name_sanitized = asset_name.replace('-', '_') - path = (f"table:{aws_region}.{aws_account_id}." - f"{db_name}.{asset_name_sanitized}") + path = (f"table:{region_sanitized}.{aws_account_id}." 
+ f"{db_name_sanitized}.{asset_name_sanitized}") + return f"{system}:{path}" + + if entry_type == EntryType.DATABASE: + path = f"database:{region_sanitized}.{aws_account_id}.{db_name_sanitized}" return f"{system}:{path}" # Return None for other types as they don't have a supported FQN @@ -44,4 +49,6 @@ def create_fqn(config, entry_type, db_name, asset_name=None): def create_parent_name(config, entry_type, db_name): """Parent Entry is not used in this model as there is no DB entry.""" - return None \ No newline at end of file + # If we are now creating DB entries, we might want to link them. + # But for now, returning None is safe if we don't want hierarchy. + return None From 2a4e74f8aff2dc556e4ad087c135d015c4cafaf7 Mon Sep 17 00:00:00 2001 From: Shubham Pathak <149017703+shubhampathakk@users.noreply.github.com> Date: Mon, 26 Jan 2026 19:06:18 +0530 Subject: [PATCH 04/11] Update entry_builder.py --- .../aws-glue-connector/src/entry_builder.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/entry_builder.py b/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/entry_builder.py index c9dedc16..101e7a8b 100644 --- a/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/entry_builder.py +++ b/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/entry_builder.py @@ -18,7 +18,7 @@ def choose_metadata_type(data_type: str): return "OTHER" def build_database_entry(config, db_name): - """Builds a database entry, mimicking the successful Oracle format.""" + """Builds a database entry""" entry_type = EntryType.DATABASE full_entry_type = entry_type.value.format( project=config["project_id"], @@ -52,7 +52,7 @@ def build_database_entry(config, db_name): } def build_dataset_entry(config, db_name, table_info, job_lineage): - """Builds a table or view entry, mimicking the successful Oracle format.""" + """Builds 
a table or view entry""" table_name = table_info['Name'] table_type = table_info.get('TableType') @@ -141,4 +141,4 @@ def build_dataset_entry(config, db_name, table_info, job_lineage): "entry": entry, "aspect_keys": list(aspects.keys()), "update_mask": ["aspects"] - } \ No newline at end of file + } From ae477ab15ecfe51039b206f4c2884088fe643084 Mon Sep 17 00:00:00 2001 From: Shubham Pathak <149017703+shubhampathakk@users.noreply.github.com> Date: Mon, 26 Jan 2026 19:09:46 +0530 Subject: [PATCH 05/11] Update aws_glue_connector.py --- .../src/aws_glue_connector.py | 65 +++++++++++++------ 1 file changed, 44 insertions(+), 21 deletions(-) diff --git a/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/aws_glue_connector.py b/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/aws_glue_connector.py index ea1ae445..c133ba69 100644 --- a/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/aws_glue_connector.py +++ b/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/aws_glue_connector.py @@ -1,5 +1,6 @@ import boto3 import re +from urllib.parse import urlparse class AWSGlueConnector: def __init__(self, aws_access_key_id, aws_secret_access_key, aws_region): @@ -14,8 +15,15 @@ def __init__(self, aws_access_key_id, aws_secret_access_key, aws_region): aws_access_key_id=self.access_key_id, aws_secret_access_key=self.secret_access_key ) + # Initialize S3 client to download scripts for lineage + self.__s3_client = boto3.client( + 's3', + region_name=self.region, + aws_access_key_id=self.access_key_id, + aws_secret_access_key=self.secret_access_key + ) except Exception as e: - raise ValueError(f"Failed to create AWS Glue client: {e}") + raise ValueError(f"Failed to create AWS clients: {e}") def _clean_credential(self, credential): """Clean and validate credential string""" @@ -53,6 +61,19 @@ def _get_tables(self, db_name): raise RuntimeError(f"Failed to get tables from AWS Glue 
for database {db_name}: {e}") return tables + def _read_script_from_s3(self, s3_uri): + """Downloads the script content from S3.""" + try: + parsed = urlparse(s3_uri) + bucket = parsed.netloc + key = parsed.path.lstrip('/') + + response = self.__s3_client.get_object(Bucket=bucket, Key=key) + return response['Body'].read().decode('utf-8') + except Exception as e: + print(f"Warning: Failed to download script from {s3_uri}: {e}") + return None + def get_lineage_info(self): """ Scans AWS Glue jobs to derive lineage information by inspecting job run graphs. @@ -76,27 +97,29 @@ def get_lineage_info(self): continue try: - # Note: get_dataflow_graph expects the actual Python script content, not just the S3 URI. - # Passing the URI here will likely result in an empty graph or failure. - # To fix this, we would need to download the script from S3, which requires S3 permissions. - # For now, we attempt it, but catch errors. - graph = self.__glue_client.get_dataflow_graph(PythonScript=script_location) - if graph: - sources = [edge['Source'] for edge in graph.get('Edges', [])] - targets = [edge['Target'] for edge in graph.get('Edges', [])] - for i, target_id in enumerate(targets): - target_node = next((node for node in graph.get('Nodes', []) if node['Id'] == target_id), None) - if target_node and target_node['NodeType'] == 'DataSink': - target_table_name = target_node.get('Name') - source_id = sources[i] - source_node = next((node for node in graph.get('Nodes', []) if node['Id'] == source_id), None) - if source_node and source_node['NodeType'] == 'DataSource': - source_table_name = source_node.get('Name') - if target_table_name not in lineage_map: - lineage_map[target_table_name] = [] - lineage_map[target_table_name].append(source_table_name) + # Fetch the actual script code from S3 + script_code = self._read_script_from_s3(script_location) + + if script_code: + # Pass the code content, not the URI + graph = self.__glue_client.get_dataflow_graph(PythonScript=script_code) + + if 
graph: + sources = [edge['Source'] for edge in graph.get('Edges', [])] + targets = [edge['Target'] for edge in graph.get('Edges', [])] + for i, target_id in enumerate(targets): + target_node = next((node for node in graph.get('Nodes', []) if node['Id'] == target_id), None) + if target_node and target_node['NodeType'] == 'DataSink': + target_table_name = target_node.get('Name') + source_id = sources[i] + source_node = next((node for node in graph.get('Nodes', []) if node['Id'] == source_id), None) + if source_node and source_node['NodeType'] == 'DataSource': + source_table_name = source_node.get('Name') + if target_table_name not in lineage_map: + lineage_map[target_table_name] = [] + lineage_map[target_table_name].append(source_table_name) except Exception as e: - print(f"Warning: Could not get dataflow graph for job {job_name}. Note that direct S3 URI usage might not be supported. Error: {e}") + print(f"Warning: Could not get dataflow graph for job {job_name}. Error: {e}") # We only need one successful run to guess lineage break From 5f930817ef1190820ba153c66af29770537b99f9 Mon Sep 17 00:00:00 2001 From: Shubham Pathak <149017703+shubhampathakk@users.noreply.github.com> Date: Mon, 26 Jan 2026 19:12:38 +0530 Subject: [PATCH 06/11] Update README.md --- .../aws-glue-connector/README.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/managed-connectivity/community-contributed-connectors/aws-glue-connector/README.md b/managed-connectivity/community-contributed-connectors/aws-glue-connector/README.md index d63b2439..d61d5150 100644 --- a/managed-connectivity/community-contributed-connectors/aws-glue-connector/README.md +++ b/managed-connectivity/community-contributed-connectors/aws-glue-connector/README.md @@ -91,3 +91,10 @@ Once the metadata file has been generated, you can import it into Dataplex using -d @request.json \ "https://dataplex.googleapis.com/v1/projects/{project-id}/locations/{location}/metadataJobs?metadataJobId={job-id}" ``` + +## Info + +1. 
Metadata extracted: example https://github.com/GoogleCloudPlatform/cloud-dataplex/tree/main/managed-connectivity/community-contributed-connectors/oracle-connector#target-objects-and-schemas +2. How to fetch AWS credentials if non-trivial +3. What resources need to be created in the project for import: example https://github.com/GoogleCloudPlatform/cloud-dataplex/tree/main/managed-connectivity/community-contributed-connectors/oracle-connector#target-objects-and-schemas +4. Docker setup: https://github.com/GoogleCloudPlatform/cloud-dataplex/tree/main/managed-connectivity/community-contributed-connectors/oracle-connector#target-objects-and-schemas From e5119b34fffb37ad60988f552161a0e382d3847f Mon Sep 17 00:00:00 2001 From: Shubham Pathak <149017703+shubhampathakk@users.noreply.github.com> Date: Mon, 26 Jan 2026 19:13:42 +0530 Subject: [PATCH 07/11] Update aws_glue_connector.py From 97f660fc5b562df4d19342b74f077cc515df0588 Mon Sep 17 00:00:00 2001 From: Shubham Pathak Date: Tue, 27 Jan 2026 11:27:25 +0530 Subject: [PATCH 08/11] adding modifications for fixes --- .../aws-glue-connector/README.md | 70 ++++++++++++++++-- .../aws-glue-connector/src/__init__.py | 0 .../src/aws_glue_connector.py | 73 +++++++++++++++---- .../aws-glue-connector/src/entry_builder.py | 62 +++++++++++++--- 4 files changed, 176 insertions(+), 29 deletions(-) create mode 100644 managed-connectivity/community-contributed-connectors/aws-glue-connector/src/__init__.py diff --git a/managed-connectivity/community-contributed-connectors/aws-glue-connector/README.md b/managed-connectivity/community-contributed-connectors/aws-glue-connector/README.md index d61d5150..569047fa 100644 --- a/managed-connectivity/community-contributed-connectors/aws-glue-connector/README.md +++ b/managed-connectivity/community-contributed-connectors/aws-glue-connector/README.md @@ -92,9 +92,69 @@ Once the metadata file has been generated, you can import it into Dataplex using 
"https://dataplex.googleapis.com/v1/projects/{project-id}/locations/{location}/metadataJobs?metadataJobId={job-id}" ``` -## Info +## Metadata Extracted -1. Metadata extracted: example https://github.com/GoogleCloudPlatform/cloud-dataplex/tree/main/managed-connectivity/community-contributed-connectors/oracle-connector#target-objects-and-schemas -2. How to fetch AWS credentials if non-trivial -3. What resources need to be created in the project for import: example https://github.com/GoogleCloudPlatform/cloud-dataplex/tree/main/managed-connectivity/community-contributed-connectors/oracle-connector#target-objects-and-schemas -4. Docker setup: https://github.com/GoogleCloudPlatform/cloud-dataplex/tree/main/managed-connectivity/community-contributed-connectors/oracle-connector#target-objects-and-schemas +The connector maps AWS Glue objects to Dataplex entries as follows: + +| AWS Glue Object | Dataplex Entry Type | Schema Mapping | +| :--- | :--- | :--- | +| **Database** | `aws-glue-database` | N/A | +| **Table** | `aws-glue-table` | `int/bigint` -> `NUMBER`, `string` -> `STRING`, `array/struct` -> `BYTES` | +| **View** | `aws-glue-view` | Parsed SQL used to generate Lineage from source tables | +| **Partition Keys** | N/A | Included as columns in the `schema` aspect | + +### Lineage +The connector also parses AWS Glue Job scripts (Python/Scala) to extract lineage: +- **Source**: `DataSource` nodes in Glue Job graph. +- **Target**: `DataSink` nodes in Glue Job graph. +- **Result**: Lineage is visualized in Dataplex from Source Table -> Target Table. + +*** + +## Resources Required + +To run this connector and import metadata, you need the following resources: + +1. **GCP Project**: To host the execution and Dataplex Metastore. +2. **Secret Manager Secret**: To store AWS Credentials securely. +3. **GCS Bucket**: To store the intermediate JSONL output file. +4. **Dataplex Entry Group**: The destination for the imported metadata. +5. 
**Dataplex Aspect Types & Entry Types**: (Optional) Custom types if you want rich UI rendering, though standard types are used for schema. + +*** + +## AWS Credentials + +This connector requires an IAM User with `GlueConsoleFullAccess` (or read-only equivalent) and `S3ReadOnly` (to download job scripts for lineage). + +1. Create an IAM User in AWS Console. +2. Attach policies: `AWSGlueConsoleFullAccess`, `AmazonS3ReadOnlyAccess`. +3. Generate an **Access Key ID** and **Secret Access Key**. +4. Store these in GCP Secret Manager as a JSON object: + ```json + {"access_key_id": "...", "secret_access_key": "..."} + ``` + +*** + +## Docker Setup + +You can containerize this connector to run on Cloud Run, Dataproc, or Kubernetes. + +1. **Build the Image**: + ```bash + docker build -t aws-glue-connector:latest . + ``` + +2. **Run Locally** (passing config): + Ensure `config.json` is in the current directory or mounted. + ```bash + docker run -v $(pwd)/config.json:/app/config.json -v $(pwd)/src:/app/src aws-glue-connector:latest + ``` + +3. 
**Push to GCR/Artifact Registry**: + ```bash + gcloud auth configure-docker + docker tag aws-glue-connector:latest gcr.io/YOUR_PROJECT/aws-glue-connector:latest + docker push gcr.io/YOUR_PROJECT/aws-glue-connector:latest + ``` diff --git a/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/__init__.py b/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/aws_glue_connector.py b/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/aws_glue_connector.py index c133ba69..aa04adf4 100644 --- a/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/aws_glue_connector.py +++ b/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/aws_glue_connector.py @@ -105,19 +105,11 @@ def get_lineage_info(self): graph = self.__glue_client.get_dataflow_graph(PythonScript=script_code) if graph: - sources = [edge['Source'] for edge in graph.get('Edges', [])] - targets = [edge['Target'] for edge in graph.get('Edges', [])] - for i, target_id in enumerate(targets): - target_node = next((node for node in graph.get('Nodes', []) if node['Id'] == target_id), None) - if target_node and target_node['NodeType'] == 'DataSink': - target_table_name = target_node.get('Name') - source_id = sources[i] - source_node = next((node for node in graph.get('Nodes', []) if node['Id'] == source_id), None) - if source_node and source_node['NodeType'] == 'DataSource': - source_table_name = source_node.get('Name') - if target_table_name not in lineage_map: - lineage_map[target_table_name] = [] - lineage_map[target_table_name].append(source_table_name) + job_lineage = self._get_lineage_from_graph(graph) + for target, sources in job_lineage.items(): + if target not in lineage_map: + lineage_map[target] = [] + lineage_map[target].extend(sources) except 
Exception as e: print(f"Warning: Could not get dataflow graph for job {job_name}. Error: {e}") @@ -128,3 +120,58 @@ def get_lineage_info(self): print(f"Found {len(lineage_map)} lineage relationships.") return lineage_map + + def _get_lineage_from_graph(self, graph): + """ + Traverses the dataflow graph backwards from DataSink to DataSource to find lineage. + Returns a dict of {target_table: [source_tables]}. + """ + lineage = {} + + nodes = {node['Id']: node for node in graph.get('Nodes', [])} + edges = graph.get('Edges', []) + + # Build reverse adjacency list (Target -> Sources) + reverse_adj = {} + for edge in edges: + if edge['Target'] not in reverse_adj: + reverse_adj[edge['Target']] = [] + reverse_adj[edge['Target']].append(edge['Source']) + + # Find all DataSink nodes (Targets) + sinks = [node for node in nodes.values() if node['NodeType'] == 'DataSink'] + + for sink in sinks: + target_name = sink.get('Name') + if not target_name: + continue + + # BFS/DFS backwards to find DataSources + visited = set() + queue = [sink['Id']] + found_sources = set() + + while queue: + current_id = queue.pop(0) + if current_id in visited: + continue + visited.add(current_id) + + current_node = nodes.get(current_id) + if current_node and current_node['NodeType'] == 'DataSource': + source_name = current_node.get('Name') + if source_name: + found_sources.add(source_name) + # Stop traversing this path once a source is found? + # Usually yes for direct lineage, but let's continue to be safe if there are multiple inputs. 
+ + # Add parents to queue + if current_id in reverse_adj: + queue.extend(reverse_adj[current_id]) + + if found_sources: + if target_name not in lineage: + lineage[target_name] = [] + lineage[target_name].extend(list(found_sources)) + + return lineage diff --git a/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/entry_builder.py b/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/entry_builder.py index 101e7a8b..2b09a2d6 100644 --- a/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/entry_builder.py +++ b/managed-connectivity/community-contributed-connectors/aws-glue-connector/src/entry_builder.py @@ -5,18 +5,40 @@ def choose_metadata_type(data_type: str): """Choose the metadata type based on AWS Glue native type.""" data_type = data_type.lower() + # Check for complex types FIRST to avoid 'array' matching 'string' + if any(k in data_type for k in ['binary', 'array', 'struct', 'map']): + return "BYTES" if data_type in ['integer', 'int', 'smallint', 'tinyint', 'bigint', 'long', 'float', 'double', 'decimal']: return "NUMBER" if 'char' in data_type or 'string' in data_type: return "STRING" - if data_type in ['binary', 'array', 'struct', 'map']: - return "BYTES" if data_type == 'timestamp': return "TIMESTAMP" if data_type == 'date': return "DATE" return "OTHER" +# ... (omitted code) ... + + # --- Build Lineage Aspect --- + source_assets = [] + if entry_type == EntryType.VIEW and 'ViewOriginalText' in table_info: + sql = table_info['ViewOriginalText'] + # Updates: Captures broader set of characters after FROM/JOIN, then cleans quotes. + # This handles `db`.`table`, "db"."table", etc. by capturing the whole block and stripping quotes. + # Regex: (?:FROM|JOIN)\s+ -> Match FROM/JOIN + # ([`"\w.]+) -> Capture anything looking like a name, dot, or quote. 
+ raw_matches = re.findall(r'(?:FROM|JOIN)\s+([`"\w.]+)', sql, re.IGNORECASE) + + cleaned_matches = [] + for match in raw_matches: + # Remove backticks and quotes + clean = match.replace('`', '').replace('"', '').replace("'", "") + if clean and not clean.isnumeric(): # Simple guard against edge cases + cleaned_matches.append(clean) + + source_assets.extend(set(cleaned_matches)) + def build_database_entry(config, db_name): """Builds a database entry""" entry_type = EntryType.DATABASE @@ -60,14 +82,22 @@ def build_dataset_entry(config, db_name, table_info, job_lineage): # --- Build Schema Aspect --- columns = [] + + # Process both Partition Keys and normal columns + # AWS Glue separates them, but Dataplex expects them all in the schema. + all_columns = [] + if 'PartitionKeys' in table_info: + all_columns.extend(table_info['PartitionKeys']) if 'StorageDescriptor' in table_info and 'Columns' in table_info['StorageDescriptor']: - for col in table_info['StorageDescriptor']['Columns']: - columns.append({ - "name": col.get("Name"), - "dataType": col.get("Type"), - "mode": "NULLABLE", - "metadataType": choose_metadata_type(col.get("Type", "")) - }) + all_columns.extend(table_info['StorageDescriptor']['Columns']) + + for col in all_columns: + columns.append({ + "name": col.get("Name"), + "dataType": col.get("Type"), + "mode": "NULLABLE", + "metadataType": choose_metadata_type(col.get("Type", "")) + }) aspects = { SCHEMA_ASPECT_KEY: { @@ -97,8 +127,18 @@ def build_dataset_entry(config, db_name, table_info, job_lineage): source_assets = [] if entry_type == EntryType.VIEW and 'ViewOriginalText' in table_info: sql = table_info['ViewOriginalText'] - source_tables = re.findall(r'(?:FROM|JOIN)\s+`?(\w+)`?', sql, re.IGNORECASE) - source_assets.extend(set(source_tables)) + # Updates: Captures broader set of characters after FROM/JOIN, then cleans quotes. + # This handles `db`.`table`, "db"."table", etc. by capturing the whole block and stripping quotes. 
+ raw_matches = re.findall(r'(?:FROM|JOIN)\s+([`"\w.]+)', sql, re.IGNORECASE) + + cleaned_matches = [] + for match in raw_matches: + # Remove backticks and quotes + clean = match.replace('`', '').replace('"', '').replace("'", "") + if clean and not clean.isnumeric(): + cleaned_matches.append(clean) + + source_assets.extend(set(cleaned_matches)) if table_name in job_lineage: source_assets.extend(job_lineage[table_name]) From df3a34e448d2cccac51af37ba24dd1f3b01e3494 Mon Sep 17 00:00:00 2001 From: Shubham Pathak Date: Mon, 2 Feb 2026 14:54:19 +0530 Subject: [PATCH 09/11] updating readme for dataplex entryGroup, entryTypes and aspectTypes --- .../aws-glue-connector/README.md | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/managed-connectivity/community-contributed-connectors/aws-glue-connector/README.md b/managed-connectivity/community-contributed-connectors/aws-glue-connector/README.md index 569047fa..833b4c05 100644 --- a/managed-connectivity/community-contributed-connectors/aws-glue-connector/README.md +++ b/managed-connectivity/community-contributed-connectors/aws-glue-connector/README.md @@ -92,6 +92,18 @@ Once the metadata file has been generated, you can import it into Dataplex using "https://dataplex.googleapis.com/v1/projects/{project-id}/locations/{location}/metadataJobs?metadataJobId={job-id}" ``` +### Required Catalog Objects + +Note before importing metadata, the Entry Group and all Entry Types and Aspect Types found in the metadata import file must exist in the target project and location. 
This connector requires the following Entry Group, Entry Types and Aspect Types: + +| Catalog Object | IDs required by connector | +| :--- | :--- | +| **Entry Group** | Defined in `config.json` as `entry_group_id` | +| **Entry Types** | `aws-glue-database`  `aws-glue-table`  `aws-glue-view` | +| **Aspect Types** | `aws-glue-database`  `aws-glue-table`  `aws-glue-view`  `aws-lineage-aspect` | + +See [manage entries and create custom sources](https://cloud.google.com/dataplex/docs/ingest-custom-sources) for instructions on creating Entry Groups, Entry Types, and Aspect Types. + ## Metadata Extracted The connector maps AWS Glue objects to Dataplex entries as follows: From 7b2107cd7d91cd0fb794b0ee4314b0ed8059e25f Mon Sep 17 00:00:00 2001 From: Shubham Pathak Date: Tue, 3 Feb 2026 19:43:22 +0530 Subject: [PATCH 10/11] adding dataplex obejct creation and change in readme --- .../aws-glue-connector/README.md | 97 +++++++++++++-- .../scripts/setup_dataplex_resources.sh | 114 ++++++++++++++++++ 2 files changed, 201 insertions(+), 10 deletions(-) create mode 100644 managed-connectivity/community-contributed-connectors/aws-glue-connector/scripts/setup_dataplex_resources.sh diff --git a/managed-connectivity/community-contributed-connectors/aws-glue-connector/README.md b/managed-connectivity/community-contributed-connectors/aws-glue-connector/README.md index 833b4c05..a01fcaf4 100644 --- a/managed-connectivity/community-contributed-connectors/aws-glue-connector/README.md +++ b/managed-connectivity/community-contributed-connectors/aws-glue-connector/README.md @@ -92,6 +92,8 @@ Once the metadata file has been generated, you can import it into Dataplex using "https://dataplex.googleapis.com/v1/projects/{project-id}/locations/{location}/metadataJobs?metadataJobId={job-id}" ``` +## Setup Resources + ### Required Catalog Objects Note before importing metadata, the Entry Group and all Entry Types and Aspect Types found in the metadata import file must exist in the target project and 
location. This connector requires the following Entry Group, Entry Types and Aspect Types: @@ -104,6 +106,91 @@ Note before importing metadata, the Entry Group and all Entry Types and Aspect T See [manage entries and create custom sources](https://cloud.google.com/dataplex/docs/ingest-custom-sources) for instructions on creating Entry Groups, Entry Types, and Aspect Types. +### Automated Setup +To run this connector, you must first create the required Dataplex resources. Run the provided script to create all resources automatically: + + ```bash + # Set your project and location + export PROJECT_ID=your-project-id + export LOCATION=us-central1 + export ENTRY_GROUP_ID=aws-glue-entries + + # Run the setup script + chmod +x scripts/setup_dataplex_resources.sh + ./scripts/setup_dataplex_resources.sh + ``` + + ### Manual Setup & Schema Definitions + + If you prefer to create them manually, ensure you define the following: + + #### Entry Types + * `aws-glue-database` + * `aws-glue-table` + * `aws-glue-view` + + #### Aspect Types + + **1. `aws-lineage-aspect`** + Used to store lineage relationships. + + * **JSON Schema**: + ```json + { + "type": "record", + "recordFields": [ + { + "name": "links", + "type": "array", + "index": 1, + "arrayItems": { + "type": "record", + "recordFields": [ + { + "name": "source", + "type": "record", + "index": 1, + "recordFields": [ + { "name": "fully_qualified_name", "type": "string", "index": 1 } + ] + }, + { + "name": "target", + "type": "record", + "index": 2, + "recordFields": [ + { "name": "fully_qualified_name", "type": "string", "index": 1 } + ] + } + ] + } + } + ] + } + ``` + + **2. Marker Aspects** + * `aws-glue-database` + * `aws-glue-table` + * `aws-glue-view` + + These aspects are used primarily for tagging. 
You can use a minimal schema: + ```json + { + "type": "record", + "recordFields": [ + { + "name": "description", + "type": "string", + "index": 1, + "constraints": { "required": false } + } + ] + } + ``` + + See [manage entries and create custom sources](https://cloud.google.com/dataplex/docs/ingest-custom-sources) for more details. + ## Metadata Extracted The connector maps AWS Glue objects to Dataplex entries as follows: @@ -123,17 +210,7 @@ The connector also parses AWS Glue Job scripts (Python/Scala) to extract lineage *** -## Resources Required - -To run this connector and import metadata, you need the following resources: -1. **GCP Project**: To host the execution and Dataplex Metastore. -2. **Secret Manager Secret**: To store AWS Credentials securely. -3. **GCS Bucket**: To store the intermediate JSONL output file. -4. **Dataplex Entry Group**: The destination for the imported metadata. -5. **Dataplex Aspect Types & Entry Types**: (Optional) Custom types if you want rich UI rendering, though standard types are used for schema. - -*** ## AWS Credentials diff --git a/managed-connectivity/community-contributed-connectors/aws-glue-connector/scripts/setup_dataplex_resources.sh b/managed-connectivity/community-contributed-connectors/aws-glue-connector/scripts/setup_dataplex_resources.sh new file mode 100644 index 00000000..1de183fa --- /dev/null +++ b/managed-connectivity/community-contributed-connectors/aws-glue-connector/scripts/setup_dataplex_resources.sh @@ -0,0 +1,114 @@ +#!/bin/bash +set -e + +# Configuration +# Replace these values with your actual project and location +PROJECT_ID="${PROJECT_ID:-YOUR_PROJECT_ID}" +LOCATION="${LOCATION:-us-central1}" +ENTRY_GROUP_ID="${ENTRY_GROUP_ID:-aws-glue-entries}" + +echo "Using Project: $PROJECT_ID" +echo "Using Location: $LOCATION" +echo "Target Entry Group: $ENTRY_GROUP_ID" + +# 1. 
Create Entry Group +echo "----------------------------------------------------------------" +echo "Creating Entry Group: $ENTRY_GROUP_ID..." +gcloud dataplex entry-groups create "$ENTRY_GROUP_ID" \ + --project="$PROJECT_ID" \ + --location="$LOCATION" \ + --description="Entry group for AWS Glue metadata" || echo "Entry Group might already exist." + +# 2. Create Entry Types +ENTRY_TYPES=("aws-glue-database" "aws-glue-table" "aws-glue-view") + +for TYPE in "${ENTRY_TYPES[@]}"; do + echo "----------------------------------------------------------------" + echo "Creating Entry Type: $TYPE..." + gcloud dataplex entry-types create "$TYPE" \ + --project="$PROJECT_ID" \ + --location="$LOCATION" \ + --description="Entry type for $TYPE" || echo "Entry Type $TYPE might already exist." +done + +# 3. Create Aspect Types +echo "----------------------------------------------------------------" +echo "Creating Aspect Types..." + +# 3a. Marker Aspect Types (Database, Table, View) +# We define a minimal schema for these markers since they are primarily used for identification. +# Marker schema definition moved into loop below + +MARKER_ASPECTS=("aws-glue-database" "aws-glue-table" "aws-glue-view") + +for ASPECT in "${MARKER_ASPECTS[@]}"; do + echo "Creating Aspect Type: $ASPECT..." 
+ cat > "${ASPECT}.yaml" < target) +cat > lineage_aspect.yaml < Date: Wed, 4 Feb 2026 12:26:55 +0530 Subject: [PATCH 11/11] adding dataplex entry changes and updating readme --- .../aws-glue-connector/README.md | 231 ++++++++---------- .../scripts/setup_dataplex_resources.sh | 49 ++-- 2 files changed, 137 insertions(+), 143 deletions(-) diff --git a/managed-connectivity/community-contributed-connectors/aws-glue-connector/README.md b/managed-connectivity/community-contributed-connectors/aws-glue-connector/README.md index a01fcaf4..b5be2fac 100644 --- a/managed-connectivity/community-contributed-connectors/aws-glue-connector/README.md +++ b/managed-connectivity/community-contributed-connectors/aws-glue-connector/README.md @@ -12,20 +12,115 @@ Before using this connector, you need to have the following set up: 1. **AWS Credentials**: You will need an AWS access key ID and a secret access key with permissions to access AWS Glue. 2. **Google Cloud Project**: A Google Cloud project is required to run the script and store the output. -3. **GCP Secret Manager**: The AWS credentials must be stored in a secret in Google Cloud Secret Manager. The secret payload must be a **JSON object** with the following format: +3. **GCP Secret Manager**: The AWS credentials must be stored in a secret in Google Cloud Secret Manager. +4. **Python 3** and **pip** installed. + +*** + +## AWS Credentials Setup + +This connector requires an IAM User with `GlueConsoleFullAccess` (or read-only equivalent) and `S3ReadOnly` (to download job scripts for lineage). + +1. Create an IAM User in AWS Console. +2. Attach policies: `AWSGlueConsoleFullAccess`, `AmazonS3ReadOnlyAccess`. +3. Generate an **Access Key ID** and **Secret Access Key**. +4. Store these in GCP Secret Manager as a **JSON object**: ```json { "access_key_id": "YOUR_AWS_ACCESS_KEY_ID", "secret_access_key": "YOUR_AWS_SECRET_ACCESS_KEY" } ``` -4. **Python 3** and **pip** installed. 
+ +*** + +## Setup Resources + +To run this connector, you must first create the required Dataplex resources. + +### Required Catalog Objects + +Note: Before importing metadata, the Entry Group and all Entry Types and Aspect Types found in the metadata import file must exist in the target project and location. This connector requires the following Entry Group, Entry Types and Aspect Types: + +| Catalog Object | IDs required by connector | +| :--- | :--- | +| **Entry Group** | Defined in `config.json` as `entry_group_id` | +| **Entry Types** | `aws-glue-database`  `aws-glue-table`  `aws-glue-view` | +| **Aspect Types** | `aws-glue-database`  `aws-glue-table`  `aws-glue-view`  `aws-lineage-aspect` | + +See [manage entries and create custom sources](https://cloud.google.com/dataplex/docs/ingest-custom-sources) for instructions on creating Entry Groups, Entry Types, and Aspect Types. + +### Option 1: Automated Setup (Recommended) +Run the provided script to create all resources automatically: + +```bash +# Set your project and location +export PROJECT_ID=your-project-id +export LOCATION=us-central1 +export ENTRY_GROUP_ID=aws-glue-entries + +# Run the setup script +chmod +x scripts/setup_dataplex_resources.sh +./scripts/setup_dataplex_resources.sh +``` + +### Option 2: Manual Setup +If you prefer to create them manually, ensure you define the following: + +**Entry Types:** +* `aws-glue-database` +* `aws-glue-table` +* `aws-glue-view` + +**Aspect Types:** +* `aws-glue-database`, `aws-glue-table`, `aws-glue-view` (Marker Aspects) +* `aws-lineage-aspect` (Schema below) + +
+Click to see Schema for aws-lineage-aspect + +```json +{ + "type": "record", + "recordFields": [ + { + "name": "links", + "type": "array", + "index": 1, + "arrayItems": { + "type": "record", + "recordFields": [ + { + "name": "source", + "type": "record", + "index": 1, + "recordFields": [ + { "name": "fully_qualified_name", "type": "string", "index": 1 } + ] + }, + { + "name": "target", + "type": "record", + "index": 2, + "recordFields": [ + { "name": "fully_qualified_name", "type": "string", "index": 1 } + ] + } + ] + } + } + ] +} +``` +
+ +For more details see [manage entries and create custom sources](https://cloud.google.com/dataplex/docs/ingest-custom-sources). *** ## Configuration -The connector is configured using the `config.json` file. Ensure this file is present in the same directory as `main.py`. Here is a description of the parameters: +The connector is configured using the `config.json` file. Ensure this file is present in the same directory as `main.py`. | Parameter | Description | | :--- | :--- | @@ -82,7 +177,7 @@ Once the metadata file has been generated, you can import it into Dataplex using * ``: The Dataplex Entry Group ID. 2. **Run the Import Command:** - Use the following `curl` command to initiate the import. Make sure to replace `{project-id}`, `{location}`, and `{job-id}` in the URL with your actual project ID, location, and a unique job ID. + Use `curl` to initiate the import. Replace `{project-id}`, `{location}`, and `{job-id}` in the URL. ```bash curl -X POST \ @@ -92,140 +187,26 @@ Once the metadata file has been generated, you can import it into Dataplex using "https://dataplex.googleapis.com/v1/projects/{project-id}/locations/{location}/metadataJobs?metadataJobId={job-id}" ``` -## Setup Resources - -### Required Catalog Objects - -Note before importing metadata, the Entry Group and all Entry Types and Aspect Types found in the metadata import file must exist in the target project and location. This connector requires the following Entry Group, Entry Types and Aspect Types: - -| Catalog Object | IDs required by connector | -| :--- | :--- | -| **Entry Group** | Defined in `config.json` as `entry_group_id` | -| **Entry Types** | `aws-glue-database`  `aws-glue-table`  `aws-glue-view` | -| **Aspect Types** | `aws-glue-database`  `aws-glue-table`  `aws-glue-view`  `aws-lineage-aspect` | - -See [manage entries and create custom sources](https://cloud.google.com/dataplex/docs/ingest-custom-sources) for instructions on creating Entry Groups, Entry Types, and Aspect Types. 
- -### Automated Setup -To run this connector, you must first create the required Dataplex resources. Run the provided script to create all resources automatically: - - ```bash - # Set your project and location - export PROJECT_ID=your-project-id - export LOCATION=us-central1 - export ENTRY_GROUP_ID=aws-glue-entries - - # Run the setup script - chmod +x scripts/setup_dataplex_resources.sh - ./scripts/setup_dataplex_resources.sh - ``` - - ### Manual Setup & Schema Definitions - - If you prefer to create them manually, ensure you define the following: - - #### Entry Types - * `aws-glue-database` - * `aws-glue-table` - * `aws-glue-view` - - #### Aspect Types - - **1. `aws-lineage-aspect`** - Used to store lineage relationships. - - * **JSON Schema**: - ```json - { - "type": "record", - "recordFields": [ - { - "name": "links", - "type": "array", - "index": 1, - "arrayItems": { - "type": "record", - "recordFields": [ - { - "name": "source", - "type": "record", - "index": 1, - "recordFields": [ - { "name": "fully_qualified_name", "type": "string", "index": 1 } - ] - }, - { - "name": "target", - "type": "record", - "index": 2, - "recordFields": [ - { "name": "fully_qualified_name", "type": "string", "index": 1 } - ] - } - ] - } - } - ] - } - ``` - - **2. Marker Aspects** - * `aws-glue-database` - * `aws-glue-table` - * `aws-glue-view` - - These aspects are used primarily for tagging. You can use a minimal schema: - ```json - { - "type": "record", - "recordFields": [ - { - "name": "description", - "type": "string", - "index": 1, - "constraints": { "required": false } - } - ] - } - ``` - - See [manage entries and create custom sources](https://cloud.google.com/dataplex/docs/ingest-custom-sources) for more details. 
+*** ## Metadata Extracted The connector maps AWS Glue objects to Dataplex entries as follows: -| AWS Glue Object | Dataplex Entry Type | Schema Mapping | -| :--- | :--- | :--- | -| **Database** | `aws-glue-database` | N/A | -| **Table** | `aws-glue-table` | `int/bigint` -> `NUMBER`, `string` -> `STRING`, `array/struct` -> `BYTES` | -| **View** | `aws-glue-view` | Parsed SQL used to generate Lineage from source tables | -| **Partition Keys** | N/A | Included as columns in the `schema` aspect | +| AWS Glue Object | Dataplex Entry Type | +| :--- | :--- | +| **Database** | `aws-glue-database` | +| **Table** | `aws-glue-table` | +| **View** | `aws-glue-view` | ### Lineage -The connector also parses AWS Glue Job scripts (Python/Scala) to extract lineage: +The connector parses AWS Glue Job scripts (Python/Scala) to extract lineage: - **Source**: `DataSource` nodes in Glue Job graph. - **Target**: `DataSink` nodes in Glue Job graph. - **Result**: Lineage is visualized in Dataplex from Source Table -> Target Table. *** - - -## AWS Credentials - -This connector requires an IAM User with `GlueConsoleFullAccess` (or read-only equivalent) and `S3ReadOnly` (to download job scripts for lineage). - -1. Create an IAM User in AWS Console. -2. Attach policies: `AWSGlueConsoleFullAccess`, `AmazonS3ReadOnlyAccess`. -3. Generate an **Access Key ID** and **Secret Access Key**. -4. Store these in GCP Secret Manager as a JSON object: - ```json - {"access_key_id": "...", "secret_access_key": "..."} - ``` - -*** - ## Docker Setup You can containerize this connector to run on Cloud Run, Dataproc, or Kubernetes. 
diff --git a/managed-connectivity/community-contributed-connectors/aws-glue-connector/scripts/setup_dataplex_resources.sh b/managed-connectivity/community-contributed-connectors/aws-glue-connector/scripts/setup_dataplex_resources.sh index 1de183fa..857e9fcf 100644 --- a/managed-connectivity/community-contributed-connectors/aws-glue-connector/scripts/setup_dataplex_resources.sh +++ b/managed-connectivity/community-contributed-connectors/aws-glue-connector/scripts/setup_dataplex_resources.sh @@ -19,26 +19,11 @@ gcloud dataplex entry-groups create "$ENTRY_GROUP_ID" \ --location="$LOCATION" \ --description="Entry group for AWS Glue metadata" || echo "Entry Group might already exist." -# 2. Create Entry Types -ENTRY_TYPES=("aws-glue-database" "aws-glue-table" "aws-glue-view") - -for TYPE in "${ENTRY_TYPES[@]}"; do - echo "----------------------------------------------------------------" - echo "Creating Entry Type: $TYPE..." - gcloud dataplex entry-types create "$TYPE" \ - --project="$PROJECT_ID" \ - --location="$LOCATION" \ - --description="Entry type for $TYPE" || echo "Entry Type $TYPE might already exist." -done - -# 3. Create Aspect Types +# 2. Create Aspect Types echo "----------------------------------------------------------------" echo "Creating Aspect Types..." -# 3a. Marker Aspect Types (Database, Table, View) -# We define a minimal schema for these markers since they are primarily used for identification. -# Marker schema definition moved into loop below - +# 2a. Marker Aspect Types (Database, Table, View) MARKER_ASPECTS=("aws-glue-database" "aws-glue-table" "aws-glue-view") for ASPECT in "${MARKER_ASPECTS[@]}"; do @@ -62,7 +47,7 @@ EOF rm "${ASPECT}.yaml" done -# 3b. Lineage Aspect Type +# 2b. Lineage Aspect Type # Defines the schema for lineage links (source -> target) cat > lineage_aspect.yaml <