Merge pull request #16 from IvanildoBarauna/feature/AddedNewMetrics
feature/DataDogNewMetrics
IvanildoBarauna authored Aug 14, 2024
2 parents afc99a1 + 96a7837 commit cac1770
Showing 13 changed files with 85 additions and 36 deletions.
2 changes: 1 addition & 1 deletion .idea/data-consumer-pipeline.iml

2 changes: 1 addition & 1 deletion .idea/misc.xml

8 changes: 8 additions & 0 deletions .idea/runConfigurations/PipelineDeploy.xml

17 changes: 17 additions & 0 deletions .idea/runConfigurations/PipelineDeployment.xml

Some generated files are not rendered by default.

4 changes: 3 additions & 1 deletion Dockerfile
@@ -1,7 +1,9 @@
-FROM apache/beam_python3.9_sdk:2.57.0
+FROM apache/beam_python3.10_sdk:2.57.0

 WORKDIR /app

+ENV env=production
+
 # Update pip and install poetry
 RUN pip install --upgrade pip \
     && pip install poetry
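
One thing the base-image bump implies, though the diff does not state it: with a custom SDK container, the environment that submits the Dataflow job should also run Python 3.10, since mismatched submit-time and runtime interpreter versions are a common source of container startup failures. A minimal illustrative guard, purely an assumption about how one might check this:

import sys

# Illustrative check only: apache/beam_python3.10_sdk ships Python 3.10,
# so the launching interpreter should match the worker container.
assert sys.version_info[:2] == (3, 10), f"expected Python 3.10, got {sys.version}"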
3 changes: 3 additions & 0 deletions main.py
@@ -1,7 +1,10 @@
+import os
 import logging

 from pipeline.launcher import run

 if __name__ == "__main__":
+    if not os.getenv("env"):
+        os.environ["env"] = "test"
     logging.getLogger().setLevel(logging.INFO)
     run()
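
Taken together with the Dockerfile change, this gives `env` a layered default: the image bakes in `env=production`, and main.py falls back to `test` only when the variable is absent (e.g. a local run). A small self-contained sketch of that resolution, simulating a run outside the container:

import os

# Simulate a local run where the Dockerfile's ENV env=production is absent.
os.environ.pop("env", None)
if not os.getenv("env"):
    os.environ["env"] = "test"
print(os.getenv("env"))  # "test" locally; stays "production" inside the container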
40 changes: 28 additions & 12 deletions pipeline/launcher.py
@@ -1,3 +1,4 @@
+import os
 import apache_beam as beam
 from pipeline.config.beam_config import get_options
 from pipeline.config.gcp.constants import PUBSUB_SUBSCRIPTION
@@ -10,29 +11,44 @@
 metrics_client = MetricsClient()


-def increment_metric(element):
-    metrics_client.incr(metric_name=f"{APPLICATION_NAME}.extracted", value=1)
-    return element
+class IncrementMetricFn(beam.DoFn):
+    def process(self, element, action: str):
+        metrics_client.incr(metric_name=APPLICATION_NAME, action=action, value=1)
+        # yield element
+
+
+def increment_metric(element: str, action: str):
+    metrics_client.incr(metric_name=APPLICATION_NAME, action=action, value=1)
+
+    if action == "extracted":
+        return element


 def run():
     with beam.Pipeline(options=get_options()) as p:
-        (
+        data = (
             p
             | "ReadTopic" >> beam.io.ReadFromPubSub(subscription=PUBSUB_SUBSCRIPTION)
-            | "ExtractMetric" >> beam.Map(lambda x: increment_metric(x))
+            | "ExtractMetric"
+            >> beam.Map(lambda x: increment_metric(element=x, action="extracted"))
             | "Decode" >> beam.Map(lambda x: x.decode("utf-8"))
             | "CreateKey" >> beam.Map(set_key, "created_at", 8)
             | "Window" >> beam.WindowInto(beam.window.FixedWindows(5))
             | "GroupMessage" >> beam.GroupByKey()
             | "ExtractMessage" >> beam.Map(lambda x: x[1])
             | "ParseToJson" >> beam.Map(set_datetime, "ARRIVAL_DATE", "data")
             | "Flatten" >> beam.FlatMap(lambda x: x)
-            | "WriteToBigQuery"
-            >> beam.io.WriteToBigQuery(
-                table=STG.api_data.get_table_name(),
-                schema=STG.api_data.schema(),
-                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
-                method="STREAMING_INSERTS",
-            )
         )

+        # Write to BigQuery
+        data | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
+            table=STG.api_data.get_table_name(),
+            schema=STG.api_data.schema(),
+            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
+            method="STREAMING_INSERTS",
+        )
+
+        # Increment the metric after the write
+        data | "IncrementFinalMetric" >> beam.ParDo(
+            IncrementMetricFn(), "bigquery_writed"
+        )
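
IncrementMetricFn is used here as a terminal sink: process() yields nothing (the `# yield element` is commented out), so the step emits an empty PCollection and only counts elements. Note also that `data | "WriteToBigQuery"` and `data | "IncrementFinalMetric"` are sibling branches of the same PCollection, so the counter tracks elements entering the write branch rather than acknowledged inserts. A minimal DirectRunner sketch of the pattern, with hypothetical data and a print standing in for metrics_client:

import apache_beam as beam


class CountActionFn(beam.DoFn):
    """Terminal DoFn: counts elements without emitting any output."""

    def process(self, element, action: str):
        # Stand-in for metrics_client.incr(...): one counter hit per element.
        print(f"incr action={action} element={element!r}")


with beam.Pipeline() as p:  # DirectRunner by default
    data = p | beam.Create([b"msg-1", b"msg-2"])
    # Extra positional args to ParDo are forwarded to process().
    data | "CountWrites" >> beam.ParDo(CountActionFn(), "bigquery_writed")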
9 changes: 7 additions & 2 deletions pipeline/utils/send_metrics.py
@@ -2,6 +2,7 @@
 from pipeline.utils.instance_validator import Singleton
 from pipeline.config.gcp.constants import PROJECT_ID, DD_AGENT_SERVER_SECRET_ID
 from google.cloud import secretmanager
+import os


 class DataDogClient:
@@ -27,5 +28,9 @@ def __init__(self) -> None:
         DataDogClient()
         self.initialized = True

-    def incr(self, metric_name: str, value: int):
-        statsd.increment(metric=metric_name, value=value)
+    def incr(self, metric_name: str, action: str, value: int):
+        statsd.increment(
+            metric=metric_name,
+            value=value,
+            tags=["action:" + action, "env:" + os.getenv("env")],
+        )
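
The signature change moves the pipeline stage out of the metric name and into DogStatsD tags, so one counter can be sliced by `action` and `env` in DataDog instead of needing a metric per stage. (Note that `os.getenv("env")` must be set, or the string concatenation raises TypeError; main.py's fallback guarantees it.) A hedged usage sketch, where the metric name is illustrative rather than the repo's APPLICATION_NAME and a reachable DogStatsD agent is assumed:

import os

from datadog import statsd

os.environ.setdefault("env", "test")  # mirrors the fallback in main.py
statsd.increment(
    metric="data_consumer_pipeline",
    value=1,
    tags=["action:extracted", "env:" + os.getenv("env")],
)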
17 changes: 2 additions & 15 deletions pipeline_runner.sh → pipeline_deployment.sh
@@ -1,24 +1,11 @@
-## Using setup.py
-
-#python main.py --project ivanildobarauna \
-# --region us-east4 \
-# --runner Dataflowrunner \
-#--setup_file ./setup.py \
-#--staging_location gs://gcp-streaming-pipeline-dataflow/staging \
-#--temp_location gs://gcp-streaming-pipeline-dataflow/temp \
-#--save_main_session
-
-## Using container
-
 python main.py --project ivanildobarauna \
 --region us-east4 \
 --runner DataflowRunner \
 --staging_location gs://gcp-streaming-pipeline-dataflow/staging \
 --temp_location gs://gcp-streaming-pipeline-dataflow/temp \
 --sdk_container_image=us-central1-docker.pkg.dev/ivanildobarauna/dataflow-ar/custom-beam/img:latest \
 --sdk_location=container \
---worker_machine_type=n1-standard-1 \
+--worker_machine_type=e2-custom-1-3072 \
 --num_workers=1 \
 --max_num_workers=1 \
---autoscaling_algorithm=NONE
-
+--autoscaling_algorithm=NONE
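
For reference, the same flags expressed through Beam's PipelineOptions, roughly what get_options() in pipeline/config/beam_config.py presumably assembles (a sketch under that assumption, not the repo's actual code). `e2-custom-1-3072` is a custom machine type with 1 vCPU and 3 GB (3072 MB) of memory, a downsizing from n1-standard-1:

from apache_beam.options.pipeline_options import PipelineOptions

# Keyword arguments map one-to-one onto the CLI flags above.
options = PipelineOptions(
    project="ivanildobarauna",
    region="us-east4",
    runner="DataflowRunner",
    staging_location="gs://gcp-streaming-pipeline-dataflow/staging",
    temp_location="gs://gcp-streaming-pipeline-dataflow/temp",
    sdk_container_image=(
        "us-central1-docker.pkg.dev/ivanildobarauna/dataflow-ar/custom-beam/img:latest"
    ),
    sdk_location="container",
    worker_machine_type="e2-custom-1-3072",
    num_workers=1,
    max_num_workers=1,
    autoscaling_algorithm="NONE",
)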
11 changes: 11 additions & 0 deletions pipeline_test.sh
@@ -0,0 +1,11 @@
+python main.py --project ivanildobarauna \
+--region us-east4 \
+--runner DirectRunner \
+--staging_location gs://gcp-streaming-pipeline-dataflow/staging \
+--temp_location gs://gcp-streaming-pipeline-dataflow/temp \
+--sdk_container_image=us-central1-docker.pkg.dev/ivanildobarauna/dataflow-ar/custom-beam/img:latest \
+--sdk_location=container \
+--worker_machine_type=e2-custom-1-3072 \
+--num_workers=1 \
+--max_num_workers=1 \
+--autoscaling_algorithm=NONE
