Merge pull request #381 from treasure-data/stp-335-account-reporting
Stp 335 account reporting
jangleakash authored Dec 7, 2023
2 parents 3485de9 + a1a5b60 commit 8e96b6e
Showing 4 changed files with 119 additions and 0 deletions.
26 changes: 26 additions & 0 deletions scenarios/account_reporting/README.md
@@ -0,0 +1,26 @@
# Workflow: Scenario (Account Reporting)

This scenario shows how to ingest account data from the Treasure Data APIs for account reporting.

# How to Run

1. Update the values in `config.yml`:
   - `td_api_baseurl` and `cdp_api_baseurl` - Set the API base URLs for your region, which are listed here: https://api-docs.treasuredata.com/en/overview/aboutendpoints/#treasure-data-api-baseurls
   - `target.database` - Sets the database the data will be ingested into. This database must exist before you run the workflow.
   - `target.tables` - Sets the table name for each report.

2. Upload the workflow with TD CLI.
```
$ td wf push account_reporting
```
3. Set the Treasure Data API Key as a workflow secret using the `td wf secrets` command.
```
# Set Treasure Data API Key workflow secret
$ td wf secrets --project account_reporting --set td.apikey=<treasuredata_master_api_key>
```
4. Finally, trigger the session manually.
```
# Run
$ td wf start account_reporting account_reporting --session now
```
If you have any questions, contact support@treasuredata.com.
16 changes: 16 additions & 0 deletions scenarios/account_reporting/account_reporting.dig
@@ -0,0 +1,16 @@
_export:
  !include : config.yml

+get_activations:
  if>: ${reports_to_run.activations_list}
  _do:
    +get_activations:
      py>: activations.get_list
      destination_db: ${target.database}
      destination_tbl: ${target.tables.activations_list}
      _env:
        TD_API_KEY: ${secret:td.apikey}
        TD_API_BASEURL: ${td_api_baseurl}
        CDP_API_BASEURL: ${cdp_api_baseurl}
      docker:
        image: "digdag/digdag-python:3.9"
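The `_env` block in this workflow hands the API key secret and the two base URLs to the Python task as environment variables. A minimal sketch of reading them defensively before building a CDP request, assuming two hypothetical helpers, `require_env` and `build_cdp_request`, that are not part of this commit. The `TD1` authorization prefix is taken from `activations.py` in this commit, and the key value below is a placeholder.

```python
import os


def require_env(name):
    """Return an environment variable's value or fail with a clear message.

    Hypothetical helper for illustration; not part of the commit.
    """
    value = os.environ.get(name, "").strip()
    if not value:
        raise RuntimeError(
            f"Environment variable {name} is not set; check _env in the .dig file"
        )
    return value


def build_cdp_request(endpoint):
    """Build the URL and headers for a CDP API call without sending it."""
    # Tolerate a trailing slash in the configured base URL.
    base_url = require_env("CDP_API_BASEURL").rstrip("/")
    headers = {"Authorization": "TD1 " + require_env("TD_API_KEY")}
    return base_url + endpoint, headers


if __name__ == "__main__":
    # Placeholder values for a local dry run; never hard-code a real API key.
    os.environ.setdefault("CDP_API_BASEURL", "https://api-cdp.treasuredata.com")
    os.environ.setdefault("TD_API_KEY", "dummy-key")
    url, headers = build_cdp_request("/master_segments")
    print(url)
```

Failing early on a missing variable tends to give a clearer error than a `KeyError` raised from inside a request helper.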
67 changes: 67 additions & 0 deletions scenarios/account_reporting/activations.py
@@ -0,0 +1,67 @@
import os
import pytd
import requests
import pandas as pd
import json

def get_list(destination_db, destination_tbl):
    # get all audiences
    audience_list = get_audience_list()
    print(f'{len(audience_list)} audiences found')

    # one DataFrame per audience, concatenated once after the loop
    frames = []

    # get activations list for each audience
    for audience in audience_list:
        try:
            activations = get_activations_by_audience(audience['id'])

            if len(activations) > 0:
                # DataFrame.append was removed in pandas 2.0, so build a frame per audience
                audience_df = pd.DataFrame(activations)
                # tag only this audience's rows with its name
                audience_df['audience'] = audience['name']
                frames.append(audience_df)
        except Exception as e:
            print(f"Error retrieving activation details for audience {audience['id']}: {e}")

    # nothing to clean or write if no audience returned activations
    if not frames:
        print('No activations found on account')
        return

    activations_df = pd.concat(frames, ignore_index=True)

    # clean activations data
    activations_df.rename(columns={'id': 'activationid'}, inplace=True)
    activations_df['lastsessionid'] = activations_df['executions'].apply(get_last_execution_detail, args=('workflowSessionId',))
    activations_df['lastsessiondate'] = activations_df['executions'].apply(get_last_execution_detail, args=('createdAt',))
    activations_df['lastsessionstatus'] = activations_df['executions'].apply(get_last_execution_detail, args=('status',))
    activations_df['createduser'] = activations_df['createdBy'].apply(get_name_from_user)
    activations_df['updateduser'] = activations_df['updatedBy'].apply(get_name_from_user)
    activations_df.drop(['executions', 'createdBy', 'updatedBy'], axis=1, inplace=True)

    # write activations to db
    apikey = os.environ["TD_API_KEY"]
    td_api_baseurl = os.environ["TD_API_BASEURL"]
    client = pytd.Client(endpoint=td_api_baseurl, apikey=apikey, database=destination_db, default_engine='presto')
    client.load_table_from_dataframe(activations_df, destination=destination_tbl, writer='bulk_import', if_exists='overwrite')

def get_activations_by_audience(audience_id):
    return cdp_get(f"/audiences/{audience_id}/syndications")

def get_audience_list():
    return cdp_get("/master_segments")

def cdp_get(endpoint):
    apikey = os.environ["TD_API_KEY"]
    headers = {'Authorization': 'TD1 ' + apikey}

    cdp_api_baseurl = os.environ["CDP_API_BASEURL"]
    request_url = cdp_api_baseurl + endpoint

    response = requests.get(url=request_url, headers=headers)
    # fail on HTTP errors instead of parsing an error body as data
    response.raise_for_status()
    return response.json()

def get_last_execution_detail(executions, field):
    # assumes executions[0] is the most recent execution
    if len(executions) == 0:
        return ''
    return executions[0][field]

def get_name_from_user(user):
    if len(user) == 0:
        return 'n/a'
    return user['name']
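The cleanup step in `get_list` reduces each activation to a handful of flat columns. A minimal sketch of that flattening on plain dictionaries, without pandas or network access. The sample record and the `flatten_activation` helper are hypothetical, and the field names are assumed from the columns the script reads. Like the script, it assumes `executions[0]` is the most recent execution.

```python
def get_last_execution_detail(executions, field):
    # Same logic as the commit: an empty history yields an empty string.
    if len(executions) == 0:
        return ''
    return executions[0][field]


def get_name_from_user(user):
    # Same logic as the commit: a missing user yields 'n/a'.
    if len(user) == 0:
        return 'n/a'
    return user['name']


def flatten_activation(activation, audience_name):
    """Hypothetical helper mirroring the DataFrame cleanup for one record."""
    executions = activation.get('executions', [])
    return {
        'activationid': activation['id'],
        'audience': audience_name,
        'lastsessionid': get_last_execution_detail(executions, 'workflowSessionId'),
        'lastsessiondate': get_last_execution_detail(executions, 'createdAt'),
        'lastsessionstatus': get_last_execution_detail(executions, 'status'),
        'createduser': get_name_from_user(activation.get('createdBy', {})),
        'updateduser': get_name_from_user(activation.get('updatedBy', {})),
    }


# Made-up sample data; real API responses likely carry more fields.
sample = {
    'id': '101',
    'executions': [
        {'workflowSessionId': '9001', 'createdAt': '2023-12-07T00:00:00Z', 'status': 'success'},
    ],
    'createdBy': {'name': 'Example User'},
    'updatedBy': {},
}

row = flatten_activation(sample, 'Example Audience')
print(row)
```

Working on one record at a time makes the empty-history and missing-user cases easy to check before the same helpers are applied column-wise.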
10 changes: 10 additions & 0 deletions scenarios/account_reporting/config.yml
@@ -0,0 +1,10 @@
td_api_baseurl: https://api.treasuredata.com
cdp_api_baseurl: https://api-cdp.treasuredata.com

target:
  database: account_reporting
  tables:
    activations_list: activations

reports_to_run:
  activations_list: true
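
The `reports_to_run` flags decide which reports the workflow executes, and each enabled report needs a matching table name under `target.tables`. A minimal sketch of a consistency check, assuming the YAML has already been parsed into a dictionary. `validate_config` is a hypothetical helper, not part of this commit.

```python
def validate_config(config):
    """Return a list of problems found in a parsed config.yml dictionary."""
    problems = []

    # Both base URLs are expected to be HTTPS endpoints.
    for key in ('td_api_baseurl', 'cdp_api_baseurl'):
        if not str(config.get(key, '')).startswith('https://'):
            problems.append(f'{key} should be an https:// URL')

    target = config.get('target', {})
    if not target.get('database'):
        problems.append('target.database is missing')

    # Every enabled report needs a destination table name.
    tables = target.get('tables', {})
    for report, enabled in config.get('reports_to_run', {}).items():
        if enabled and not tables.get(report):
            problems.append(f'target.tables.{report} is missing for an enabled report')

    return problems


# Mirrors the config.yml in this commit.
config = {
    'td_api_baseurl': 'https://api.treasuredata.com',
    'cdp_api_baseurl': 'https://api-cdp.treasuredata.com',
    'target': {
        'database': 'account_reporting',
        'tables': {'activations_list': 'activations'},
    },
    'reports_to_run': {'activations_list': True},
}

print(validate_config(config))
```

The check only inspects the file's shape. It cannot confirm that the target database actually exists in the account.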
