Skip to content

Commit

Permalink
add ingesting parent segment configuration function to monitoring wor…
Browse files Browse the repository at this point in the history
…kflow
  • Loading branch information
Yuu Ohmura committed Jul 10, 2024
1 parent 83bf2e8 commit d086ce7
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 1 deletion.
2 changes: 2 additions & 0 deletions scenarios/monitoring/cdp_monitoring/common/settings.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ td:
journey_activation_history: journey_activation_history
activations: activations
activations_history: activations_history
parent_segments_configuration: parent_segments_configuration
parent_segments_configuration_history: parent_segments_configuration_history
api_endpoint: api.treasuredata.com
cdp_api_endpoint: api-cdp.treasuredata.com

18 changes: 18 additions & 0 deletions scenarios/monitoring/cdp_monitoring/incremental_ingest.dig
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,24 @@ schedule:
_env:
TD_API_KEY: ${secret:td.apikey}

+incremental_ingest_ps_configuration:
+append_ps_configuration_history:
td>:
query: select * from ${td.tables.parent_segments_configuration}
insert_into: ${td.tables.parent_segments_configuration_history}
database: ${td.database}
+ingest_ps_configuration:
py>: scripts.ingest_ps_configuration.run
session_unixtime: ${session_unixtime}
dest_db: ${td.database}
dest_table: ${td.tables.parent_segments_configuration}
api_endpoint: ${td.api_endpoint}
cdp_api_endpoint: ${td.cdp_api_endpoint}
docker:
image: "digdag/digdag-python:3.9"
_env:
TD_API_KEY: ${secret:td.apikey}

+incremental_ingest_entities:
+append_entities_history:
td>:
Expand Down
14 changes: 13 additions & 1 deletion scenarios/monitoring/cdp_monitoring/initial_ingest.dig
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,19 @@ _export:
api_endpoint: ${td.api_endpoint}
cdp_api_endpoint: ${td.cdp_api_endpoint}
docker:
image: "digdag/digdag-python:3.9"
image: "digdag/digdag-python:3.10.1"
_env:
TD_API_KEY: ${secret:td.apikey}

+initial_ingest_ps_configuration:
py>: scripts.ingest_ps_configuration.run
session_unixtime: ${session_unixtime}
dest_db: ${td.database}
dest_table: ${td.tables.parent_segments_configuration}
api_endpoint: ${td.api_endpoint}
cdp_api_endpoint: ${td.cdp_api_endpoint}
docker:
image: "digdag/digdag-python:3.10.1"
_env:
TD_API_KEY: ${secret:td.apikey}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import requests
import pandas as pd
import pytd
import os
import json

def convert_to_json(s):
return json.dumps(s)

def get_all_parent_segment_configuration(url, headers):
print(url)
res = requests.get(url=url, headers=headers)
if res.status_code != requests.codes.ok:
res.raise_for_status()

data = res.json()
for d in data:
for k in d.keys():
if type(d[k]) is dict:
d[k] = json.dumps(d[k])

return data

def run(session_unixtime, dest_db, dest_table, api_endpoint='api.treasuredata.com', cdp_api_endpoint='api-cdp.treasuredata.com'):
url = 'https://%s/audiences' % cdp_api_endpoint
headers = {'Authorization': 'TD1 %s' % os.environ['TD_API_KEY']}
l = get_all_parent_segment_configuration(url, headers)
if len(l) == 0:
print('no import record')
return
df = pd.DataFrame(l)
df['time'] = int(session_unixtime)
df['attributes'] = df['attributes'].apply(convert_to_json)
df['behaviors'] = df['behaviors'].apply(convert_to_json)

client = pytd.Client(apikey=os.environ['TD_API_KEY'], endpoint='https://%s' % api_endpoint, database=dest_db)
client.load_table_from_dataframe(df, dest_table, if_exists='overwrite', fmt='msgpack')

0 comments on commit d086ce7

Please sign in to comment.