Dev monitoring (#388)
add a function that ingests Sources data
fix the data format of multiple columns by converting them to JSON
o-mura authored Jan 11, 2024
1 parent 8bce2e0 commit d413c20
Showing 14 changed files with 118 additions and 5 deletions.
2 changes: 2 additions & 0 deletions scenarios/monitoring/basic_monitoring/README.md
@@ -29,6 +29,8 @@ Second, you register td.apikey as a secret.
You should set the lower_job_id option (in the initial_ingest_jobs task of the initial_ingest workflow) properly.
If you set too low an id, the initial_ingest workflow may take longer or cause a timeout error.
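A minimal sketch of that option on the jobs task in initial_ingest.dig (the id value is illustrative, and the exact parameter placement is an assumption):

```
+initial_inget_jobs:
  py>: scripts.ingest_job.run
  session_unixtime: ${session_unixtime}
  lower_job_id: '123456789'  # illustrative: a recent job id keeps the initial backfill small
```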

There is no pagination for the listing APIs (except jobs), so if you encounter an Internal Server Error (500), you need to comment out the target task.
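If a listing task does fail this way, commenting it out in the workflow file would look something like this (the task shown is hypothetical):

```
# +ingest_connection:
#   py>: scripts.ingest_connections.run
#   dest_db: ${td.database}
#   dest_table: ${td.tables.connections}
```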

# Relationships of Table and REST API

| table | REST API|
2 changes: 2 additions & 0 deletions scenarios/monitoring/basic_monitoring/common/settings.yaml
@@ -6,6 +6,8 @@ td:
    connections_history: connections_history
    schedules: schedules
    schedules_history: schedules_history
    sources: sources
    sources_history: sources_history
    users: users
    users_history: users_history
    user_assign_policies: user_assign_policies
17 changes: 17 additions & 0 deletions scenarios/monitoring/basic_monitoring/incremental_ingest.dig
@@ -37,6 +37,23 @@ schedule:
    _env:
      TD_API_KEY: ${secret:td.apikey}

+increment_ingest_source:
  +append_sources_history:
    td>:
      query: select * from ${td.tables.sources}
    database: ${td.database}
    insert_into: ${td.tables.sources_history}
  +ingest_schedule:
    py>: scripts.ingest_sources.run
    dest_db: ${td.database}
    dest_table: ${td.tables.sources}
    api_endpoint: ${td.api_endpoint}
    docker:
      image: "digdag/digdag-python:3.9"
    _env:
      TD_API_KEY: ${secret:td.apikey}


+increment_ingest_jobs:
  +check_old_running_jobs:
    td>:
10 changes: 10 additions & 0 deletions scenarios/monitoring/basic_monitoring/initial_ingest.dig
@@ -26,6 +26,16 @@ _export:
    _env:
      TD_API_KEY: ${secret:td.apikey}

+ingest_source:
  py>: scripts.ingest_sources.run
  dest_db: ${td.database}
  dest_table: ${td.tables.sources}
  api_endpoint: ${td.api_endpoint}
  docker:
    image: "digdag/digdag-python:3.9"
  _env:
    TD_API_KEY: ${secret:td.apikey}

+initial_inget_jobs:
  py>: scripts.ingest_job.run
  session_unixtime: ${session_unixtime}
@@ -22,9 +22,9 @@ def get_all_policy_with_permission(api_endpoint, headers, policyid_list):
        if res.status_code != requests.codes.ok:
            res.raise_for_status()
        permissions = res.json()
-       permissions['id'] = i
-
-       l.append(permissions)
+       if len(permissions) != 0:
+           data = {'id': i, 'permissions': json.dumps(permissions)}
+           l.append(data)
    return l

def get_all_policy_with_column_permission(api_endpoint, headers, policyid_list):
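The `json.dumps` pattern above — flattening a nested API response into a JSON string column — recurs throughout this commit. A standalone sketch of the idea, with made-up data:

```python
import json
import pandas as pd

# pytd's msgpack-based upload expects scalar cells; a cell holding a dict or
# list does not map cleanly onto a table column, so nested values are
# flattened into JSON strings before loading.
df = pd.DataFrame([{'id': 1, 'permissions': {'databases': ['db1', 'db2']}}])
df['permissions'] = df['permissions'].apply(json.dumps)
print(df.loc[0, 'permissions'])  # {"databases": ["db1", "db2"]}
```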
30 changes: 30 additions & 0 deletions scenarios/monitoring/basic_monitoring/scripts/ingest_sources.py
@@ -0,0 +1,30 @@
# https://api-docs.treasuredata.com/en/api/system-api/bulk-loads-api/

import requests
import pandas as pd
import pytd
import os
import json

def convert(s):
    return json.dumps(s)

def get_all_sources(url, headers):
    # The bulk_loads listing API is not paginated; this returns every Source.
    print(url)
    res = requests.get(url=url, headers=headers)
    if res.status_code != requests.codes.ok:
        res.raise_for_status()
    return res.json()

def run(dest_db, dest_table, api_endpoint='api.treasuredata.com'):
    url = 'https://%s/v3/bulk_loads/' % api_endpoint
    headers = {'Authorization': 'TD1 %s' % os.environ['TD_API_KEY']}
    l = get_all_sources(url, headers)
    if len(l) == 0:
        print('no import record')
        return
    df = pd.DataFrame(l)
    # config and config_diff are nested objects; serialize them to JSON
    # strings so they load as plain string columns.
    df['config'] = df['config'].apply(convert)
    df['config_diff'] = df['config_diff'].apply(convert)
    client = pytd.Client(apikey=os.environ['TD_API_KEY'], endpoint='https://%s' % api_endpoint, database=dest_db)
    client.load_table_from_dataframe(df, dest_table, if_exists='overwrite', fmt='msgpack')
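To exercise the new script outside digdag, something like the following should work (database and table names are illustrative; TD_API_KEY must already be set in the environment):

```python
import os
os.environ['TD_API_KEY'] = '<your apikey>'  # normally injected by digdag via _env

from scripts import ingest_sources

# Fetches all Sources and overwrites monitoring_db.sources with the result.
ingest_sources.run(dest_db='monitoring_db', dest_table='sources')
```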
@@ -43,7 +43,7 @@ _export:
    api_endpoint: ${td.api_endpoint}
    workflow_endpoint: ${td.workflow_endpoint}
    count: 100
-   lower_limit_session_id: '113468608'
+   lower_limit_session_id: '180059011'
    if_exists: "overwrite"
    docker:
      image: "digdag/digdag-python:3.9"
@@ -58,7 +58,7 @@ _export:
    workflow_endpoint: ${td.workflow_endpoint}
    count: 100
    if_exists: "overwrite"
-   lower_limit_attempt_id: '592849652'
+   lower_limit_attempt_id: '1040926253'
    docker:
      image: "digdag/digdag-python:3.9"
    _env:
@@ -5,10 +5,14 @@
import pytd
import os
from datetime import datetime as dt
import json

def convert(s):
    return int(dt.strptime(s, '%Y-%m-%dT%H:%M:%SZ').timestamp())

def convert_to_json(s):
    return json.dumps(s)

def get_attempt1(url, headers):
    print(url)
    res = requests.get(url=url, headers=headers)
@@ -41,6 +45,9 @@ def run(session_unixtime, dest_db, dest_table, api_endpoint='api.treasuredata.co
    df = pd.DataFrame(atmp_list)
    df['time'] = df['createdAt'].apply(convert)
    df['id'] = df['id'].astype('int')
    df['project'] = df['project'].apply(convert_to_json)
    df['workflow'] = df['workflow'].apply(convert_to_json)
    df['params'] = df['params'].apply(convert_to_json)
    df = df[df['id'] > int(lower_limit_attempt_id)]
    client = pytd.Client(apikey=os.environ['TD_API_KEY'], endpoint='https://%s' % api_endpoint, database=dest_db)
    client.load_table_from_dataframe(df, dest_table, if_exists=if_exists, fmt='msgpack')
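For context, an attempt record returned by the workflow API has roughly this shape (values invented); `project`, `workflow`, and `params` are the nested fields that the new lines JSON-encode:

```python
attempt = {
    'id': '1040926254',
    'project': {'id': '1', 'name': 'monitoring'},
    'workflow': {'name': 'incremental_ingest', 'id': '42'},
    'sessionTime': '2024-01-11T00:00:00+00:00',
    'params': {'last_session_time': '2024-01-10T00:00:00+00:00'},
    'createdAt': '2024-01-11T01:23:45Z',
    'done': True,
}
```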
@@ -4,6 +4,10 @@
import pandas as pd
import pytd
import os
import json

def convert_to_json(s):
    return json.dumps(s)

def get_schedules1(url, headers):
    print(url)
@@ -33,5 +37,7 @@ def run(session_unixtime, dest_db, dest_table, api_endpoint='api.treasuredata.co
    sches_list = get_all_schedule(workflow_url, headers)
    df = pd.DataFrame(sches_list)
    df['time'] = int(session_unixtime)
    df['project'] = df['project'].apply(convert_to_json)
    df['workflow'] = df['workflow'].apply(convert_to_json)
    client = pytd.Client(apikey=os.environ['TD_API_KEY'], endpoint='https://%s' % api_endpoint, database=dest_db)
    client.load_table_from_dataframe(df, dest_table, if_exists='overwrite', fmt='msgpack')
@@ -5,10 +5,14 @@
import pytd
import os
from datetime import datetime as dt
import json

def convert(session_time):
    return int(dt.strptime(session_time, '%Y-%m-%dT%H:%M:%S%z').timestamp())

def convert_to_json(s):
    return json.dumps(s)

def get_sessions1(url, headers):
    print(url)
    res = requests.get(url=url, headers=headers)
@@ -40,6 +44,9 @@ def run(session_unixtime, dest_db, dest_table, api_endpoint='api.treasuredata.co
    df = pd.DataFrame(ses_list)
    df['time'] = df['sessionTime'].apply(convert)
    df['id'] = df['id'].astype('int')
    df['project'] = df['project'].apply(convert_to_json)
    df['workflow'] = df['workflow'].apply(convert_to_json)
    df['lastAttempt'] = df['lastattempt'].apply(convert_to_json)
    df = df[df['id'] > int(lower_limit_session_id)]
    client = pytd.Client(apikey=os.environ['TD_API_KEY'], endpoint='https://%s' % api_endpoint, database=dest_db)
    client.load_table_from_dataframe(df, dest_table, if_exists=if_exists, fmt='msgpack')
@@ -2,6 +2,10 @@
import os
import pytd
import pandas as pd
import json

def convert_to_json(s):
    return json.dumps(s)

def get_task_info(base_url, headers, ids):
    l = []
@@ -20,6 +24,11 @@ def get_task_info(base_url, headers, ids):
def insert_task_info(import_unixtime, endpoint, apikey, dest_db, dest_table, tasks):
    df = pd.DataFrame(tasks)
    df['time'] = int(import_unixtime)
    df['config'] = df['config'].apply(convert_to_json)
    df['upstreams'] = df['upstreams'].apply(convert_to_json)
    df['exportparams'] = df['exportparams'].apply(convert_to_json)
    df['storeparams'] = df['storeparams'].apply(convert_to_json)
    df['error'] = df['error'].apply(convert_to_json)
    client = pytd.Client(apikey=apikey, endpoint=endpoint, database=dest_db)
    client.load_table_from_dataframe(df, dest_table, if_exists='append', fmt='msgpack')

@@ -4,6 +4,10 @@
import pandas as pd
import pytd
import os
import json

def convert_to_json(s):
    return json.dumps(s)

def get_workflows1(url, headers):
    print(url)
@@ -33,5 +37,7 @@ def run(session_unixtime, dest_db, dest_table, api_endpoint='api.treasuredata.co
    wfs_list = get_all_workflow(workflow_url, headers, count)
    df = pd.DataFrame(wfs_list)
    df['time'] = session_unixtime
    df['project'] = df['project'].apply(convert_to_json)
    df['config'] = df['config'].apply(convert_to_json)
    client = pytd.Client(apikey=os.environ['TD_API_KEY'], endpoint='https://%s' % api_endpoint, database=dest_db)
    client.load_table_from_dataframe(df, dest_table, if_exists='overwrite', fmt='msgpack')
@@ -4,6 +4,10 @@
import pytd
import os
from datetime import datetime as dt
import json

def convert_to_json(s):
    return json.dumps(s)

def convert(s):
    return int(dt.strptime(s, '%Y-%m-%dT%H:%M:%SZ').timestamp())
@@ -31,6 +35,11 @@ def get_attempt_info(base_url, headers, ids):
def insert_attempt_info(import_unixtime, endpoint, apikey, dest_db, dest_table, attempts):
    df = pd.DataFrame(attempts)
    df['time'] = df['createdAt'].apply(convert)
    df['id'] = df['id'].astype('int')
    df['project'] = df['project'].apply(convert_to_json)
    df['workflow'] = df['workflow'].apply(convert_to_json)
    df['params'] = df['params'].apply(convert_to_json)

    client = pytd.Client(apikey=apikey, endpoint=endpoint, database=dest_db)
    client.load_table_from_dataframe(df, dest_table, if_exists='append', fmt='msgpack')

@@ -3,10 +3,14 @@
import pytd
import os
from datetime import datetime as dt
import json

def convert(s):
    return int(dt.strptime(s, '%Y-%m-%dT%H:%M:%S%z').timestamp())

def convert_to_json(s):
    return json.dumps(s)

def delete_session_info(endpoint, apikey, dest_db, dest_table, ids):
    s = ''
    for i in ids:
@@ -30,6 +34,10 @@ def get_session_info(base_url, headers, ids):
def insert_session_info(endpoint, apikey, dest_db, dest_table, sessions, import_unix_time):
    df = pd.DataFrame(sessions)
    df['time'] = df['sessionTime'].apply(convert)
    df['id'] = df['id'].astype('int')
    df['project'] = df['project'].apply(convert_to_json)
    df['workflow'] = df['workflow'].apply(convert_to_json)
    df['lastattempt'] = df['lastattempt'].apply(convert_to_json)
    client = pytd.Client(apikey=apikey, endpoint=endpoint, database=dest_db)
    client.load_table_from_dataframe(df, dest_table, if_exists='append', fmt='msgpack')

