Skip to content

Commit

Permalink
Added extra links for Comprehend operators (apache#46031)
Browse files Browse the repository at this point in the history
* Added extra links for Comprehend operators

* Fixed broken test
  • Loading branch information
ellisms authored Jan 26, 2025
1 parent ff1e3a6 commit f2aa80b
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 0 deletions.
41 changes: 41 additions & 0 deletions providers/src/airflow/providers/amazon/aws/links/comprehend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from airflow.providers.amazon.aws.links.base_aws import BASE_AWS_CONSOLE_LINK, BaseAwsLink


class ComprehendPiiEntitiesDetectionLink(BaseAwsLink):
"""Helper class for constructing Amazon Comprehend PII Detection console link."""

name = "PII Detection Job"
key = "comprehend_pii_detection"
format_str = (
BASE_AWS_CONSOLE_LINK
+ "/comprehend/home?region={region_name}#"
+ "/analysis-job-details/pii/{job_id}"
)


class ComprehendDocumentClassifierLink(BaseAwsLink):
"""Helper class for constructing Amazon Comprehend Document Classifier console link."""

name = "Document Classifier"
key = "comprehend_document_classifier"
format_str = (
BASE_AWS_CONSOLE_LINK + "/comprehend/home?region={region_name}#" + "classifier-version-details/{arn}"
)
36 changes: 36 additions & 0 deletions providers/src/airflow/providers/amazon/aws/operators/comprehend.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.hooks.comprehend import ComprehendHook
from airflow.providers.amazon.aws.links.comprehend import (
ComprehendDocumentClassifierLink,
ComprehendPiiEntitiesDetectionLink,
)
from airflow.providers.amazon.aws.operators.base_aws import AwsBaseOperator
from airflow.providers.amazon.aws.triggers.comprehend import (
ComprehendCreateDocumentClassifierCompletedTrigger,
Expand Down Expand Up @@ -120,6 +124,8 @@ class ComprehendStartPiiEntitiesDetectionJobOperator(ComprehendBaseOperator):
https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
"""

operator_extra_links = (ComprehendPiiEntitiesDetectionLink(),)

def __init__(
self,
input_data_config: dict,
Expand Down Expand Up @@ -166,6 +172,19 @@ def execute(self, context: Context) -> str:
**self.start_pii_entities_kwargs,
)["JobId"]

job_url = ComprehendPiiEntitiesDetectionLink.format_str.format(
aws_domain=ComprehendPiiEntitiesDetectionLink.get_aws_domain(self.hook.conn_partition),
region_name=self.hook.conn_region_name,
job_id=job_id,
)
ComprehendPiiEntitiesDetectionLink.persist(
context=context,
operator=self,
region_name=self.hook.conn_region_name,
aws_partition=self.hook.conn_partition,
job_id=job_id,
)
self.log.info("You can view the PII entities detection job at %s", job_url)
message_description = f"start pii entities detection job {job_id} to complete."
if self.deferrable:
self.log.info("Deferring %s", message_description)
Expand Down Expand Up @@ -238,6 +257,7 @@ class ComprehendCreateDocumentClassifierOperator(AwsBaseOperator[ComprehendHook]
"""

aws_hook_class = ComprehendHook
operator_extra_links = (ComprehendDocumentClassifierLink(),)

template_fields: Sequence[str] = aws_template_fields(
"document_classifier_name",
Expand Down Expand Up @@ -300,6 +320,22 @@ def execute(self, context: Context) -> str:
**self.document_classifier_kwargs,
)["DocumentClassifierArn"]

# create the link to console
job_url = ComprehendDocumentClassifierLink.format_str.format(
aws_domain=ComprehendDocumentClassifierLink.get_aws_domain(self.hook.conn_partition),
region_name=self.hook.conn_region_name,
arn=document_classifier_arn,
)

ComprehendDocumentClassifierLink.persist(
context=context,
operator=self,
region_name=self.hook.conn_region_name,
aws_partition=self.hook.conn_partition,
arn=document_classifier_arn,
)
self.log.info("You can monitor the classifier at %s", job_url)

message_description = f"document classifier {document_classifier_arn} to complete."
if self.deferrable:
self.log.info("Deferring %s", message_description)
Expand Down
3 changes: 3 additions & 0 deletions providers/src/airflow/providers/amazon/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -887,6 +887,9 @@ extra-links:
- airflow.providers.amazon.aws.links.sagemaker.SageMakerTransformJobLink
- airflow.providers.amazon.aws.links.step_function.StateMachineDetailsLink
- airflow.providers.amazon.aws.links.step_function.StateMachineExecutionsDetailsLink
- airflow.providers.amazon.aws.links.comprehend.ComprehendPiiEntitiesDetectionLink
- airflow.providers.amazon.aws.links.comprehend.ComprehendDocumentClassifierLink


connection-types:
- hook-class-name: airflow.providers.amazon.aws.hooks.base_aws.AwsGenericHook
Expand Down
56 changes: 56 additions & 0 deletions providers/tests/amazon/aws/links/test_comprehend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from airflow.providers.amazon.aws.links.comprehend import (
ComprehendDocumentClassifierLink,
ComprehendPiiEntitiesDetectionLink,
)

from providers.tests.amazon.aws.links.test_base_aws import BaseAwsLinksTestCase


class TestComprehendPiiEntitiesDetectionLink(BaseAwsLinksTestCase):
link_class = ComprehendPiiEntitiesDetectionLink

def test_extra_link(self):
test_job_id = "123-345-678"
self.assert_extra_link_url(
expected_url=(
f"https://console.aws.amazon.com/comprehend/home?region=eu-west-1#/analysis-job-details/pii/{test_job_id}"
),
region_name="eu-west-1",
aws_partition="aws",
job_id=test_job_id,
)


class TestComprehendDocumentClassifierLink(BaseAwsLinksTestCase):
link_class = ComprehendDocumentClassifierLink

def test_extra_link(self):
test_job_id = (
"arn:aws:comprehend:us-east-1:0123456789:document-classifier/test-custom-document-classifier"
)
self.assert_extra_link_url(
expected_url=(
f"https://console.aws.amazon.com/comprehend/home?region=us-east-1#classifier-version-details/{test_job_id}"
),
region_name="us-east-1",
aws_partition="aws",
arn=test_job_id,
)

0 comments on commit f2aa80b

Please sign in to comment.