diff --git a/scenarios/account_reporting/README.md b/scenarios/account_reporting/README.md new file mode 100644 index 00000000..9c467ae9 --- /dev/null +++ b/scenarios/account_reporting/README.md @@ -0,0 +1,26 @@ +# Workflow: Scenario (Account Reporting) + +This scenario shows how you can ingest account data from Treasure Data API's for account reporting. + +# How to Run + +1. Update the values in `config.yml`: +- The API base urls for your region can be found here: https://api-docs.treasuredata.com/en/overview/aboutendpoints/#treasure-data-api-baseurls +- `target.database` - Sets the database the data will be ingested to. This database must exist prior to running the workflow. +- `target.tables` - Sets the table names for each report. + +2. Upload the workflow with TD CLI. +``` + $ td wf push account_reporting +``` +3. Set the Treasure Data API Key as a workflow secret using the `td wf secrets` command. +``` + # Set Treasure Data API Key workflow secret + $ td wf secrets --project account_reporting --set td.apikey= +``` +Finally, you can trigger the session manually. +``` + # Run + $ td wf start account_reporting account_reporting --session now +``` +If you have any questions, contact to support@treasuredata.com. diff --git a/scenarios/account_reporting/account_reporting.dig b/scenarios/account_reporting/account_reporting.dig new file mode 100644 index 00000000..9bfdd9e2 --- /dev/null +++ b/scenarios/account_reporting/account_reporting.dig @@ -0,0 +1,16 @@ +_export: + !include : config.yml + ++get_activations: + if>: ${reports_to_run.activations_list} + _do: + +get_activations: + py>: activations.get_list + destination_db: ${target.database} + destination_tbl: ${target.tables.activations_list} + _env: + TD_API_KEY: ${secret:td.apikey} + TD_API_BASEURL: ${td_api_baseurl} + CDP_API_BASEURL: ${cdp_api_baseurl} + docker: + image: "digdag/digdag-python:3.9" \ No newline at end of file diff --git a/scenarios/account_reporting/activations.py b/scenarios/account_reporting/activations.py new file mode 100644 index 00000000..78c51714 --- /dev/null +++ b/scenarios/account_reporting/activations.py @@ -0,0 +1,67 @@ +import os +import pytd +import requests +import pandas as pd +import json + +def get_list(destination_db, destination_tbl): + # get all audiences + audience_list = get_audience_list() + print(f'{len(audience_list)} audiences found') + + activations_df = pd.DataFrame() + + # get activations list for each audience + for audience in audience_list: + try: + activations = get_activations_by_audience(audience['id']) + + if len(activations) > 0: + activations_df = activations_df.append(activations,ignore_index=True) + activations_df['audience'] = audience['name'] + except: + print('Error retrieving activation details for audience ' + audience['id']) + + # clean activations data + activations_df.rename(columns={'id':'activationid'}, inplace=True) + activations_df['lastsessionid'] = activations_df["executions"].apply(get_last_execution_detail, args = ('workflowSessionId',)) + activations_df['lastsessiondate'] = activations_df["executions"].apply(get_last_execution_detail, args = ('createdAt',)) + activations_df['lastsessionstatus'] = activations_df["executions"].apply(get_last_execution_detail, args = ('status',)) + activations_df['createduser'] = activations_df['createdBy'].apply(get_name_from_user) + activations_df['updateduser'] = activations_df['updatedBy'].apply(get_name_from_user) + activations_df.drop(['executions','createdBy','updatedBy'],axis=1,inplace=True) + + if activations_df.empty: + print('No activations found on account') + else: + # write activations to db + apikey = os.environ["TD_API_KEY"] + td_api_baseurl = os.environ["TD_API_BASEURL"] + client = pytd.Client(endpoint=td_api_baseurl,apikey=apikey,database=destination_db,default_engine='presto') + client.load_table_from_dataframe(activations_df,destination=destination_tbl,writer='bulk_import',if_exists='overwrite') + +def get_activations_by_audience(audience_id): + return cdp_get(f"/audiences/{audience_id}/syndications") + +def get_audience_list(): + return cdp_get("/master_segments") + +def cdp_get(endpoint): + apikey = os.environ["TD_API_KEY"] + headers = {'Authorization': 'TD1 ' + apikey} + + cdp_api_baseurl = os.environ["CDP_API_BASEURL"] + request_url = cdp_api_baseurl + endpoint + + response = requests.get(url = request_url, headers = headers) + return response.json() + +def get_last_execution_detail(executions, field): + if len(executions) == 0: + return '' + return executions[0][field] + +def get_name_from_user(user): + if len(user) == 0: + return 'n/a' + return user['name'] \ No newline at end of file diff --git a/scenarios/account_reporting/config.yml b/scenarios/account_reporting/config.yml new file mode 100644 index 00000000..311557ac --- /dev/null +++ b/scenarios/account_reporting/config.yml @@ -0,0 +1,10 @@ +td_api_baseurl: https://api.treasuredata.com +cdp_api_baseurl: https://api-cdp.treasuredata.com + +target: + database: account_reporting + tables: + activations_list: activations + +reports_to_run: + activations_list: true \ No newline at end of file