add some data as monitoring target object #390

Merged — 4 commits, Jan 31, 2024
6 changes: 6 additions & 0 deletions scenarios/monitoring/cdp_monitoring/common/settings.yaml
@@ -7,6 +7,12 @@ td:
    entities_history: entities_history
    journey_statistics: journey_statistics
    journey_statistics_history: journey_statistics_history
    journey_summary: journey_summary
    journey_summary_history: journey_summary_history
    journey_activation: journey_activation
    journey_activation_history: journey_activation_history
    activations: activations
    activations_history: activations_history
  api_endpoint: api.treasuredata.com
  cdp_api_endpoint: api-cdp.treasuredata.com

84 changes: 81 additions & 3 deletions scenarios/monitoring/cdp_monitoring/incremental_ingest.dig
@@ -19,7 +19,7 @@ schedule:
    api_endpoint: ${td.api_endpoint}
    cdp_api_endpoint: ${td.cdp_api_endpoint}
    docker:
-      image: "digdag/digdag-python:3.9"
+      image: "digdag/digdag-python:3.10.1"
    _env:
      TD_API_KEY: ${secret:td.apikey}

@@ -44,7 +44,32 @@ schedule:
    api_endpoint: ${td.api_endpoint}
    cdp_api_endpoint: ${td.cdp_api_endpoint}
    docker:
-      image: "digdag/digdag-python:3.9"
+      image: "digdag/digdag-python:3.10.1"
    _env:
      TD_API_KEY: ${secret:td.apikey}

+incremental_ingest_activations:
  +append_activations_history:
    td>:
    query: select * from ${td.tables.activations}
    insert_into: ${td.tables.activations_history}

  +get_current_parent_segment_list:
    td>:
    query: select ARRAY_JOIN(ARRAY_AGG(id), ',') as ids from ${td.tables.parent_segments}
    store_last_results: true
    database: ${td.database}

  +ingest_activations:
    py>: scripts.ingest_activation.run
    session_unixtime: ${session_unixtime}
    dest_db: ${td.database}
    dest_table: ${td.tables.activations}
    ids: ${td.last_results.ids}
    api_endpoint: ${td.api_endpoint}
    cdp_api_endpoint: ${td.cdp_api_endpoint}
    docker:
      image: "digdag/digdag-python:3.10.1"
    _env:
      TD_API_KEY: ${secret:td.apikey}

@@ -70,6 +95,59 @@ schedule:
    api_endpoint: ${td.api_endpoint}
    cdp_api_endpoint: ${td.cdp_api_endpoint}
    docker:
-      image: "digdag/digdag-python:3.9"
+      image: "digdag/digdag-python:3.10.1"
    _env:
      TD_API_KEY: ${secret:td.apikey}


+incremental_ingest_journey_summary:
  +append_journey_summary_history:
    td>:
    query: select * from ${td.tables.journey_summary}
    insert_into: ${td.tables.journey_summary_history}
    database: ${td.database}

  +get_current_journey_list:
    td>:
    query: select ARRAY_JOIN(array_agg(id), ',') as ids from ${td.tables.entities} where type = 'journey'
    store_last_results: true
    database: ${td.database}

  +ingest_journey_summary:
    py>: scripts.ingest_journey_summary.run
    session_unixtime: ${session_unixtime}
    dest_db: ${td.database}
    dest_table: ${td.tables.journey_summary}
    journey_ids: ${td.last_results.ids}
    api_endpoint: ${td.api_endpoint}
    cdp_api_endpoint: ${td.cdp_api_endpoint}
    docker:
      image: "digdag/digdag-python:3.10.1"
    _env:
      TD_API_KEY: ${secret:td.apikey}

+incremental_ingest_journey_activation:
  +append_journey_activation_history:
    td>:
    query: select * from ${td.tables.journey_activation}
    insert_into: ${td.tables.journey_activation_history}
    database: ${td.database}

  +get_current_journey_list:
    td>:
    query: select ARRAY_JOIN(array_agg(id), ',') as ids from ${td.tables.entities} where type = 'journey'
    store_last_results: true
    database: ${td.database}

  +ingest_journey_activation:
    py>: scripts.ingest_journey_activation.run
    session_unixtime: ${session_unixtime}
    dest_db: ${td.database}
    dest_table: ${td.tables.journey_activation}
    journey_ids: ${td.last_results.ids}
    api_endpoint: ${td.api_endpoint}
    cdp_api_endpoint: ${td.cdp_api_endpoint}
    docker:
      image: "digdag/digdag-python:3.10.1"
    _env:
      TD_API_KEY: ${secret:td.apikey}
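These workflows hand the target id set from the `td>` task to the `py>` task as a single comma-joined string: `ARRAY_JOIN(ARRAY_AGG(id), ',')` produces it, `store_last_results: true` exposes it as `${td.last_results.ids}`, and the ingest scripts split it back apart. A minimal sketch of that round trip (function names are illustrative, not part of the workflow):

```python
# Sketch of the comma-joined id hand-off between the td> and py> tasks.
# join_ids mimics ARRAY_JOIN(ARRAY_AGG(id), ','); split_ids mimics what the
# ingest scripts do with ${td.last_results.ids}.

def join_ids(ids):
    return ','.join(str(i) for i in ids)

def split_ids(ids_str):
    # split(',') on '' yields [''], so empty entries are dropped
    return [i for i in ids_str.split(',') if i]

print(split_ids(join_ids([101, 102, 103])))  # -> ['101', '102', '103']
print(split_ids(''))                         # -> []
```

Passing ids as one string keeps the hand-off inside digdag's plain string parameters, at the cost of re-parsing on the Python side.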
60 changes: 60 additions & 0 deletions scenarios/monitoring/cdp_monitoring/initial_ingest.dig
@@ -40,6 +40,26 @@ _export:
    _env:
      TD_API_KEY: ${secret:td.apikey}

+initial_ingest_activations:
  +get_current_parent_segment_list:
    td>:
    query: select ARRAY_JOIN(ARRAY_AGG(id), ',') as ids from ${td.tables.parent_segments}
    store_last_results: true
    database: ${td.database}

  +ingest_activations:
    py>: scripts.ingest_activation.run
    session_unixtime: ${session_unixtime}
    dest_db: ${td.database}
    dest_table: ${td.tables.activations}
    ids: ${td.last_results.ids}
    api_endpoint: ${td.api_endpoint}
    cdp_api_endpoint: ${td.cdp_api_endpoint}
    docker:
      image: "digdag/digdag-python:3.9"
    _env:
      TD_API_KEY: ${secret:td.apikey}


+initial_ingest_journey_statistics:
  +get_current_journey_list:
    td>:
@@ -55,6 +75,46 @@ _export:
    journey_ids: ${td.last_results.ids}
    api_endpoint: ${td.api_endpoint}
    cdp_api_endpoint: ${td.cdp_api_endpoint}
    docker:
      image: "digdag/digdag-python:3.9"
    _env:
      TD_API_KEY: ${secret:td.apikey}

+initial_ingest_journey_summary:
  +get_current_journey_list:
    td>:
    query: select ARRAY_JOIN(array_agg(id), ',') as ids from ${td.tables.entities} where type = 'journey'
    store_last_results: true
    database: ${td.database}

  +ingest_journey_summary:
    py>: scripts.ingest_journey_summary.run
    session_unixtime: ${session_unixtime}
    dest_db: ${td.database}
    dest_table: ${td.tables.journey_summary}
    journey_ids: ${td.last_results.ids}
    api_endpoint: ${td.api_endpoint}
    cdp_api_endpoint: ${td.cdp_api_endpoint}
    docker:
      image: "digdag/digdag-python:3.9"
    _env:
      TD_API_KEY: ${secret:td.apikey}

+initial_ingest_journey_activation:
  +get_current_journey_list:
    td>:
    query: select ARRAY_JOIN(array_agg(id), ',') as ids from ${td.tables.entities} where type = 'journey'
    store_last_results: true
    database: ${td.database}

  +ingest_journey_activation:
    py>: scripts.ingest_journey_activation.run
    session_unixtime: ${session_unixtime}
    dest_db: ${td.database}
    dest_table: ${td.tables.journey_activation}
    journey_ids: ${td.last_results.ids}
    api_endpoint: ${td.api_endpoint}
    cdp_api_endpoint: ${td.cdp_api_endpoint}
    docker:
      image: "digdag/digdag-python:3.9"
    _env:
@@ -0,0 +1,45 @@
import requests
import pandas as pd
import pytd
import os
import json

def get_activations_per_audience(base_url, headers, id):
    url = base_url % id
    print(url)
    res = requests.get(url=url, headers=headers)
    if res.status_code != requests.codes.ok:
        res.raise_for_status()
    activations = res.json()
    for a in activations:
        a['ps_id'] = id
    return activations

def get_all_activations(base_url, headers, id_list):
    l = []
    for i in id_list:
        l.extend(get_activations_per_audience(base_url=base_url, headers=headers, id=i))
    return l

def insert_activations(import_unixtime, endpoint, apikey, dest_db, dest_table, activations):
    df = pd.DataFrame(activations)
    df['time'] = int(import_unixtime)
    # Nested fields are serialized to JSON strings so they load as flat varchar columns.
    for col in ['columns', 'connectorConfig', 'createdBy', 'updatedBy',
                'executions', 'notifyOn', 'emailRecipients']:
        df[col] = df[col].apply(json.dumps)
    client = pytd.Client(apikey=apikey, endpoint=endpoint, database=dest_db)
    client.load_table_from_dataframe(df, dest_table, if_exists='overwrite', fmt='msgpack')

def run(session_unixtime, dest_db, dest_table, ids, api_endpoint='api.treasuredata.com', cdp_api_endpoint='api-cdp.treasuredata.com'):
    # split(',') on an empty string yields [''], so drop empty entries before checking.
    id_list = [i for i in ids.split(',') if i]
    if not id_list:
        print('no parent id')
        return
    cdp_url = 'https://%s/audiences' % cdp_api_endpoint + '/%s/syndications'
    headers = {'Authorization': 'TD1 %s' % os.environ['TD_API_KEY']}
    l = get_all_activations(cdp_url, headers, id_list)
    insert_activations(session_unixtime, 'https://%s' % api_endpoint, os.environ['TD_API_KEY'], dest_db, dest_table, l)
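One detail worth noting in `run()`: the syndications URL template is built in two steps, so the endpoint host is substituted immediately while a second `%s` placeholder is left for `get_activations_per_audience` to fill per audience id via `base_url % id`. A small sketch of that mechanics (the endpoint value is just the script's default):

```python
# How run() builds the per-audience syndications URL: the first %s is filled
# with the host right away; the concatenated '/%s/syndications' keeps a
# placeholder that is filled later with each audience id.
cdp_api_endpoint = 'api-cdp.treasuredata.com'
base_url = 'https://%s/audiences' % cdp_api_endpoint + '/%s/syndications'

def build_url(base_url, audience_id):
    # mirrors `url = base_url % id` in get_activations_per_audience
    return base_url % audience_id

print(build_url(base_url, 12345))
# -> https://api-cdp.treasuredata.com/audiences/12345/syndications
```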
@@ -0,0 +1,51 @@
import requests
import pandas as pd
import pytd
import os
import json

def get_journey_activation(base_url, headers, id):
    url = base_url + '/entities/journeys/' + str(id) + '/activations'
    print(url)
    res = requests.get(url=url, headers=headers)
    if res.status_code != requests.codes.ok:
        res.raise_for_status()
    data = res.json()['data']
    if data is None or len(data) == 0:
        return None

    for d in data:
        d['journey_id'] = id

    return data

def get_all_journey_activation(base_url, headers, ids):
    l = []
    for i in ids:
        d = get_journey_activation(base_url, headers, i)
        if d is not None:
            l.extend(d)
    return l

def run(session_unixtime, dest_db, dest_table, journey_ids, api_endpoint='api.treasuredata.com', cdp_api_endpoint='api-cdp.treasuredata.com'):
    print('ingest journey activation')
    # split(',') on an empty string yields [''], so drop empty entries before checking.
    id_list = [i for i in journey_ids.split(',') if i]
    if not id_list:
        print('no journey id')
        return
    print('count of target journeys: ' + str(len(id_list)))
    base_url = 'https://%s' % cdp_api_endpoint
    headers = {'Authorization': 'TD1 %s' % os.environ['TD_API_KEY']}
    l = get_all_journey_activation(base_url, headers, id_list)
    if len(l) == 0:
        print('no import record')
        return
    df = pd.DataFrame(l)
    df['time'] = int(session_unixtime)
    df['attributes'] = df['attributes'].apply(json.dumps)
    df['relationships'] = df['relationships'].apply(json.dumps)
    client = pytd.Client(apikey=os.environ['TD_API_KEY'], endpoint='https://%s' % api_endpoint, database=dest_db)
    client.load_table_from_dataframe(df, dest_table, if_exists='overwrite', fmt='msgpack')
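The script flattens nested API fields (`attributes`, `relationships`) into JSON strings before loading, since the destination columns are plain varchar. A stdlib-only sketch of the same transformation (the record shape here is illustrative, not the actual API response):

```python
import json

# Stand-in for the df['attributes'].apply(json.dumps) step: dict-valued
# fields are serialized to JSON strings so they load as flat varchar columns.
records = [
    {'journey_id': 1,
     'attributes': {'name': 'welcome'},
     'relationships': {'stage': {'id': 9}}},
]

for r in records:
    r['attributes'] = json.dumps(r['attributes'])
    r['relationships'] = json.dumps(r['relationships'])

print(records[0]['attributes'])  # -> {"name": "welcome"}
```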
@@ -0,0 +1,51 @@
import requests
import pandas as pd
import pytd
import os
import json

def get_journey_summary(base_url, headers, id):
    url = base_url + '/entities/journeys/' + str(id)
    print(url)
    res = requests.get(url=url, headers=headers)
    if res.status_code != requests.codes.ok:
        res.raise_for_status()
    data = res.json()['data']
    if data is None or len(data) == 0:
        return None

    for k in data:
        if isinstance(data[k], dict):
            data[k] = json.dumps(data[k])
    data['journey_id'] = id

    return data

def get_all_journey_summary(base_url, headers, ids):
    l = []
    for i in ids:
        d = get_journey_summary(base_url, headers, i)
        if d is not None:
            l.append(d)
    return l

def run(session_unixtime, dest_db, dest_table, journey_ids, api_endpoint='api.treasuredata.com', cdp_api_endpoint='api-cdp.treasuredata.com'):
    print('ingest journey summary')
    # split(',') on an empty string yields [''], so drop empty entries before checking.
    id_list = [i for i in journey_ids.split(',') if i]
    if not id_list:
        print('no journey id')
        return
    print('count of target journeys: ' + str(len(id_list)))
    base_url = 'https://%s' % cdp_api_endpoint
    headers = {'Authorization': 'TD1 %s' % os.environ['TD_API_KEY']}
    l = get_all_journey_summary(base_url, headers, id_list)
    if len(l) == 0:
        print('no import record')
        return
    df = pd.DataFrame(l)
    df['time'] = int(session_unixtime)
    client = pytd.Client(apikey=os.environ['TD_API_KEY'], endpoint='https://%s' % api_endpoint, database=dest_db)
    client.load_table_from_dataframe(df, dest_table, if_exists='overwrite', fmt='msgpack')
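Unlike the activation script, `get_journey_summary` serializes dict-valued fields in place while walking the single response record, so the DataFrame needs no further flattening. A sketch of that in-place pass (field names are illustrative):

```python
import json

# In-place serialization as done inside get_journey_summary: any dict-valued
# field in the record is replaced by its JSON string before collection.
def flatten_dict_fields(data):
    for k in data:
        if isinstance(data[k], dict):
            data[k] = json.dumps(data[k])
    return data

d = flatten_dict_fields({'id': 7, 'stats': {'entered': 10}})
print(d['stats'])  # -> {"entered": 10}
```

Mutating values (not keys) during the iteration is safe here, since the key set never changes.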
@@ -10,5 +10,6 @@ td:
    sessions: sessions
    attempts: attempts
    tasks: tasks
    revisions: revisions
  api_endpoint: api.treasuredata.com
  workflow_endpoint: api-workflow.treasuredata.com
14 changes: 14 additions & 0 deletions scenarios/monitoring/workflow_monitoring/manual_ingest.dig
@@ -3,6 +3,9 @@ _export:
  attempt_ids:
    - 605508773
    - 605506079
  project_ids:
    - 627610
    - 686558

+manual_ingest_attempt_task:
  py>: scripts.ingest_task.run
@@ -14,3 +17,14 @@ _export:
    image: "digdag/digdag-python:3.9"
  _env:
    TD_API_KEY: ${secret:td.apikey}

+manual_ingest_project_revision:
  py>: scripts.ingest_revision.run
  session_unixtime: ${session_unixtime}
  dest_db: ${td.database}
  dest_table: ${td.tables.revisions}
  project_ids: ${project_ids}
  docker:
    image: "digdag/digdag-python:3.9"
  _env:
    TD_API_KEY: ${secret:td.apikey}