Skip to content

Commit

Permalink
add activations list to data
Browse files Browse the repository at this point in the history
  • Loading branch information
Yuu Ohmura committed Jan 29, 2024
1 parent 547663d commit eebceaf
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 0 deletions.
2 changes: 2 additions & 0 deletions scenarios/monitoring/cdp_monitoring/common/settings.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ td:
entities_history: entities_history
journey_statistics: journey_statistics
journey_statistics_history: journey_statistics_history
activations: activations
activations_history: activations_history
api_endpoint: api.treasuredata.com
cdp_api_endpoint: api-cdp.treasuredata.com

25 changes: 25 additions & 0 deletions scenarios/monitoring/cdp_monitoring/incremental_ingest.dig
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,31 @@ schedule:
_env:
TD_API_KEY: ${secret:td.apikey}

# Incremental refresh of the activations table: archive the current snapshot,
# then re-fetch activations for every known parent segment.
+incremental_ingest_activations:
# Copy the current activations rows into the history table before the ingest
# script below replaces the activations table (it loads with if_exists='overwrite').
+append_activations_history:
td>:
query: select * from ${td.tables.activations}
insert_into: ${td.tables.activations_history}

# Collect all parent segment ids as ONE comma-separated string; the Python
# script splits this value on ',' to recover the individual ids.
+get_current_parent_segment_list:
td>:
query: select ARRAY_JOIN(ARRAY_AGG(id), ',') as ids from ${td.tables.parent_segments}
# Expose the single result row as ${td.last_results.*} to the following task.
store_last_results: true
database: ${td.database}

# Call scripts/ingest_activation.py:run with the ids gathered above.
+ingest_activations:
py>: scripts.ingest_activation.run
session_unixtime: ${session_unixtime}
dest_db: ${td.database}
dest_table: ${td.tables.activations}
ids: ${td.last_results.ids}
api_endpoint: ${td.api_endpoint}
cdp_api_endpoint: ${td.cdp_api_endpoint}
docker:
image: "digdag/digdag-python:3.9"
# The script reads the API key from the TD_API_KEY environment variable.
_env:
TD_API_KEY: ${secret:td.apikey}

+incremental_ingest_journey_statistics:
+append_journey_statistics_history:
td>:
Expand Down
20 changes: 20 additions & 0 deletions scenarios/monitoring/cdp_monitoring/initial_ingest.dig
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,26 @@ _export:
_env:
TD_API_KEY: ${secret:td.apikey}

# Initial load of the activations table: fetch activations for every known
# parent segment and write them to ${td.tables.activations}.
+initial_ingest_activations:
  # Collect all parent segment ids as ONE comma-separated string. The Python
  # script recovers the ids with ids.split(','), so the array must be joined
  # here (same query as the incremental workflow) rather than passed as a raw
  # ARRAY_AGG result.
  +get_current_parent_segment_list:
    td>:
    query: select ARRAY_JOIN(ARRAY_AGG(id), ',') as ids from ${td.tables.parent_segments}
    # Expose the single result row as ${td.last_results.*} to the following task.
    store_last_results: true
    database: ${td.database}

  # Call scripts/ingest_activation.py:run with the ids gathered above.
  +ingest_activations:
    py>: scripts.ingest_activation.run
    session_unixtime: ${session_unixtime}
    dest_db: ${td.database}
    dest_table: ${td.tables.activations}
    ids: ${td.last_results.ids}
    api_endpoint: ${td.api_endpoint}
    cdp_api_endpoint: ${td.cdp_api_endpoint}
    docker:
      image: "digdag/digdag-python:3.9"
    # The script reads the API key from the TD_API_KEY environment variable.
    _env:
      TD_API_KEY: ${secret:td.apikey}


+initial_ingest_journey_statistics:
+get_current_journey_list:
td>:
Expand Down
45 changes: 45 additions & 0 deletions scenarios/monitoring/cdp_monitoring/scripts/ingest_activation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import requests
import pandas as pd
import pytd
import os
import json

def get_activations_per_audience(base_url, headers, id, timeout=60):
    """Fetch the activations of a single parent segment and tag them with its id.

    Args:
        base_url: URL template with one ``%s`` placeholder; it is filled with
            ``id`` to build the request URL.
        headers: HTTP headers sent with the GET request.
        id: Parent segment id. The name shadows the builtin but is kept
            because callers pass it by keyword.
        timeout: Seconds ``requests`` waits for the server before giving up.
            Without a timeout a stalled connection would block the workflow
            indefinitely.

    Returns:
        The decoded JSON body of the response, with ``ps_id`` set to ``id`` on
        every element (presumably a list of dicts, one per activation).

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
        requests.Timeout: if the server does not respond within ``timeout``.
    """
    url = base_url % id
    print(url)
    res = requests.get(url=url, headers=headers, timeout=timeout)
    # raise_for_status() does nothing for successful responses, so no explicit
    # status-code comparison is needed in front of it.
    res.raise_for_status()
    activations = res.json()
    # Remember which parent segment each activation came from.
    for a in activations:
        a['ps_id'] = id
    return activations

def get_all_activations(base_url, headers, id_list):
    """Return one flat list with the activations of every id in ``id_list``.

    Each id is fetched in order through ``get_activations_per_audience`` and
    the per-id results are concatenated. An empty ``id_list`` yields ``[]``
    without issuing any request.
    """
    return [
        activation
        for parent_id in id_list
        for activation in get_activations_per_audience(
            base_url=base_url, headers=headers, id=parent_id
        )
    ]

def insert_activations(import_unixtime, endpoint, apikey, dest_db, dest_table, activations):
    """Load the fetched activations into ``dest_db.dest_table``, replacing its contents.

    Args:
        import_unixtime: Value written to the ``time`` column of every row
            (converted with ``int``).
        endpoint: API endpoint URL handed to ``pytd.Client``.
        apikey: API key handed to ``pytd.Client``.
        dest_db: Destination database name.
        dest_table: Destination table name.
        activations: Records to load, as accepted by ``pandas.DataFrame``.

    Returns:
        None. When ``activations`` is empty nothing is uploaded and the
        destination table is left untouched.
    """
    # An empty payload would produce a column-less DataFrame and the column
    # accesses below would raise KeyError, so stop early instead.
    if not activations:
        print('no activations')
        return

    df = pd.DataFrame(activations)
    df['time'] = int(import_unixtime)

    # These fields hold nested structures; serialise them to JSON strings so
    # they can be stored as plain columns. Fields missing from the payload
    # are skipped rather than failing the whole import.
    json_columns = (
        'columns',
        'connectorConfig',
        'createdBy',
        'updatedBy',
        'executions',
        'notifyOn',
        'emailRecipients',
    )
    for column in json_columns:
        if column in df.columns:
            df[column] = df[column].apply(json.dumps)

    client = pytd.Client(apikey=apikey, endpoint=endpoint, database=dest_db)
    client.load_table_from_dataframe(df, dest_table, if_exists='overwrite', fmt='msgpack')

def _parse_parent_segment_ids(ids):
    """Turn the ``ids`` workflow parameter into a list of non-empty id strings."""
    if ids is None:
        return []
    if isinstance(ids, (list, tuple)):
        tokens = [str(i) for i in ids]
    else:
        tokens = str(ids).split(',')
    # Tolerate surrounding whitespace and JSON-array decoration ('[', ']',
    # quotes) in case the value arrives as a rendered array instead of a
    # plain comma-separated string.
    cleaned = (token.strip(' \t\r\n[]"\'') for token in tokens)
    return [token for token in cleaned if token]


def run(session_unixtime, dest_db, dest_table, ids, api_endpoint='api.treasuredata.com', cdp_api_endpoint='api-cdp.treasuredata.com'):
    """Fetch the activations of every parent segment and load them into a table.

    Args:
        session_unixtime: Value stored in the ``time`` column of the loaded rows.
        dest_db: Destination database name.
        dest_table: Destination table name.
        ids: Comma-separated parent segment ids (a list/tuple is accepted too).
        api_endpoint: Host name used to build the endpoint URL for the upload.
        cdp_api_endpoint: Host name used to build the activations request URL.

    Returns:
        None. When ``ids`` contains no id, ``'no parent id'`` is printed and
        nothing is fetched or loaded.

    Raises:
        KeyError: if the ``TD_API_KEY`` environment variable is not set.
    """
    # ''.split(',') yields [''], so a length check on the raw split never
    # detects "no ids"; parse and drop empty tokens first.
    id_list = _parse_parent_segment_ids(ids)
    if not id_list:
        print('no parent id')
        return
    # The '%s' left in the URL is filled with each parent segment id later.
    cdp_url = ('https://%s/audiences' % cdp_api_endpoint) + '/%s/syndications'
    apikey = os.environ['TD_API_KEY']
    headers = {'Authorization': 'TD1 %s' % apikey}
    activations = get_all_activations(cdp_url, headers, id_list)
    insert_activations(session_unixtime, 'https://%s' % api_endpoint, apikey, dest_db, dest_table, activations)

0 comments on commit eebceaf

Please sign in to comment.