From d413c20565230c0526dc54d9bad80ec6b4c49bc8 Mon Sep 17 00:00:00 2001
From: o-mura
Date: Thu, 11 Jan 2024 14:21:33 +0900
Subject: [PATCH] Dev monitoring (#388)

add a function for ingesting Sources data
fix the data format of multiple columns to JSON
---
 .../monitoring/basic_monitoring/README.md     |  2 ++
 .../basic_monitoring/common/settings.yaml     |  2 ++
 .../basic_monitoring/incremental_ingest.dig   | 17 +++++++++++
 .../basic_monitoring/initial_ingest.dig       | 10 +++++++
 .../basic_monitoring/scripts/ingest_policy.py |  6 ++--
 .../scripts/ingest_sources.py                 | 30 +++++++++++++++++++
 .../initial_ingest_session_attempt.dig        |  4 +--
 .../scripts/ingest_attempt.py                 |  7 +++++
 .../scripts/ingest_schedule.py                |  6 ++++
 .../scripts/ingest_session.py                 |  7 +++++
 .../scripts/ingest_task.py                    |  9 ++++++
 .../scripts/ingest_workflow.py                |  6 ++++
 .../scripts/update_attempt.py                 |  9 ++++++
 .../scripts/update_session.py                 |  8 +++++
 14 files changed, 118 insertions(+), 5 deletions(-)
 create mode 100644 scenarios/monitoring/basic_monitoring/scripts/ingest_sources.py

diff --git a/scenarios/monitoring/basic_monitoring/README.md b/scenarios/monitoring/basic_monitoring/README.md
index d956d593..490d9513 100644
--- a/scenarios/monitoring/basic_monitoring/README.md
+++ b/scenarios/monitoring/basic_monitoring/README.md
@@ -29,6 +29,8 @@ Second, you register td.apikey as a secret.
 
 You should set lower_job_id option (initial_ingest_jobs task of initial_ingest workflow) properly. If you set lower id, initial_ingest workflow may take longer or cause a timeout error.
 
+The list APIs (except the jobs API) have no pagination, so if you encounter an Internal Server Error (500), you need to comment out the target task.
+
 # Relationships of Table and REST API
 
 | table | REST API|
diff --git a/scenarios/monitoring/basic_monitoring/common/settings.yaml b/scenarios/monitoring/basic_monitoring/common/settings.yaml
index d2d293eb..db4584bd 100644
--- a/scenarios/monitoring/basic_monitoring/common/settings.yaml
+++ b/scenarios/monitoring/basic_monitoring/common/settings.yaml
@@ -6,6 +6,8 @@ td:
     connections_history: connections_history
     schedules: schedules
     schedules_history: schedules_history
+    sources: sources
+    sources_history: sources_history
     users: users
     users_history: users_history
     user_assign_policies: user_assign_policies
diff --git a/scenarios/monitoring/basic_monitoring/incremental_ingest.dig b/scenarios/monitoring/basic_monitoring/incremental_ingest.dig
index 1cc2239e..1a0d202b 100644
--- a/scenarios/monitoring/basic_monitoring/incremental_ingest.dig
+++ b/scenarios/monitoring/basic_monitoring/incremental_ingest.dig
@@ -37,6 +37,23 @@ schedule:
     _env:
       TD_API_KEY: ${secret:td.apikey}
 
++increment_ingest_source:
+  +append_sources_history:
+    td>:
+    query: select * from ${td.tables.sources}
+    database: ${td.database}
+    insert_into: ${td.tables.sources_history}
+  +ingest_sources:
+    py>: scripts.ingest_sources.run
+    dest_db: ${td.database}
+    dest_table: ${td.tables.sources}
+    api_endpoint: ${td.api_endpoint}
+    docker:
+      image: "digdag/digdag-python:3.9"
+    _env:
+      TD_API_KEY: ${secret:td.apikey}
+
+
 +increment_ingest_jobs:
   +check_old_running_jobs:
     td>:
diff --git a/scenarios/monitoring/basic_monitoring/initial_ingest.dig b/scenarios/monitoring/basic_monitoring/initial_ingest.dig
index be937f8d..5a84761e 100644
--- a/scenarios/monitoring/basic_monitoring/initial_ingest.dig
+++ b/scenarios/monitoring/basic_monitoring/initial_ingest.dig
@@ -26,6 +26,16 @@ _export:
     _env:
       TD_API_KEY: ${secret:td.apikey}
 
++ingest_source:
+  py>: scripts.ingest_sources.run
+  dest_db: ${td.database}
+  dest_table: ${td.tables.sources}
+  api_endpoint: ${td.api_endpoint}
+  docker:
+    image: "digdag/digdag-python:3.9"
+  _env:
+    TD_API_KEY: ${secret:td.apikey}
+
 +initial_inget_jobs:
   py>: scripts.ingest_job.run
   session_unixtime: ${session_unixtime}
diff --git a/scenarios/monitoring/basic_monitoring/scripts/ingest_policy.py b/scenarios/monitoring/basic_monitoring/scripts/ingest_policy.py
index 83018fbb..d3a009db 100644
--- a/scenarios/monitoring/basic_monitoring/scripts/ingest_policy.py
+++ b/scenarios/monitoring/basic_monitoring/scripts/ingest_policy.py
@@ -22,9 +22,9 @@ def get_all_policy_with_permission(api_endpoint, headers, policyid_list):
         if res.status_code != requests.codes.ok:
             res.raise_for_status()
         permissions = res.json()
-        permissions['id'] = i
-
-        l.append(permissions)
+        if len(permissions) != 0:
+            data = {'id': i, 'permissions': json.dumps(permissions)}
+            l.append(data)
     return l
 
 def get_all_policy_with_column_permission(api_endpoint, headers, policyid_list):
diff --git a/scenarios/monitoring/basic_monitoring/scripts/ingest_sources.py b/scenarios/monitoring/basic_monitoring/scripts/ingest_sources.py
new file mode 100644
index 00000000..f8b024f3
--- /dev/null
+++ b/scenarios/monitoring/basic_monitoring/scripts/ingest_sources.py
@@ -0,0 +1,30 @@
+# https://api-docs.treasuredata.com/en/api/system-api/bulk-loads-api/
+
+import requests
+import pandas as pd
+import pytd
+import os
+import json
+
+def convert(s):
+    return json.dumps(s)
+
+def get_all_sources(url, headers):
+    print(url)
+    res = requests.get(url=url, headers=headers)
+    if res.status_code != requests.codes.ok:
+        res.raise_for_status()
+    return res.json()
+
+def run(dest_db, dest_table, api_endpoint='api.treasuredata.com'):
+    url = 'https://%s/v3/bulk_loads/' % api_endpoint
+    headers = {'Authorization': 'TD1 %s' % os.environ['TD_API_KEY']}
+    l = get_all_sources(url, headers)
+    if len(l) == 0:
+        print('no import record')
+        return
+    df = pd.DataFrame(l)
+    df['config'] = df['config'].apply(convert)
+    df['config_diff'] = df['config_diff'].apply(convert)
+    client = pytd.Client(apikey=os.environ['TD_API_KEY'], endpoint='https://%s' % api_endpoint, database=dest_db)
+    client.load_table_from_dataframe(df, dest_table, if_exists='overwrite', fmt='msgpack')
diff --git a/scenarios/monitoring/workflow_monitoring/initial_ingest_session_attempt.dig b/scenarios/monitoring/workflow_monitoring/initial_ingest_session_attempt.dig
index 024d1124..47aba05d 100644
--- a/scenarios/monitoring/workflow_monitoring/initial_ingest_session_attempt.dig
+++ b/scenarios/monitoring/workflow_monitoring/initial_ingest_session_attempt.dig
@@ -43,7 +43,7 @@ _export:
     api_endpoint: ${td.api_endpoint}
     workflow_endpoint: ${td.workflow_endpoint}
     count: 100
-    lower_limit_session_id: '113468608'
+    lower_limit_session_id: '180059011'
     if_exists: "overwrite"
   docker:
     image: "digdag/digdag-python:3.9"
@@ -58,7 +58,7 @@ _export:
     workflow_endpoint: ${td.workflow_endpoint}
     count: 100
     if_exists: "overwrite"
-    lower_limit_attempt_id: '592849652'
+    lower_limit_attempt_id: '1040926253'
   docker:
     image: "digdag/digdag-python:3.9"
     _env:
diff --git a/scenarios/monitoring/workflow_monitoring/scripts/ingest_attempt.py b/scenarios/monitoring/workflow_monitoring/scripts/ingest_attempt.py
index 51f35be9..1e215ca1 100644
--- a/scenarios/monitoring/workflow_monitoring/scripts/ingest_attempt.py
+++ b/scenarios/monitoring/workflow_monitoring/scripts/ingest_attempt.py
@@ -5,10 +5,14 @@
 import pytd
 import os
 from datetime import datetime as dt
+import json
 
 def convert(s):
     return int(dt.strptime(s, '%Y-%m-%dT%H:%M:%SZ').timestamp())
 
+def convert_to_json(s):
+    return json.dumps(s)
+
 def get_attempt1(url, headers):
     print(url)
     res = requests.get(url=url, headers= headers)
@@ -41,6 +45,9 @@ def run(session_unixtime, dest_db, dest_table, api_endpoint='api.treasuredata.co
     df = pd.DataFrame(atmp_list)
     df['time'] = df['createdAt'].apply(convert)
     df['id'] = df['id'].astype('int')
+    df['project'] = df['project'].apply(convert_to_json)
+    df['workflow'] = df['workflow'].apply(convert_to_json)
+    df['params'] = df['params'].apply(convert_to_json)
     df = df[df['id'] > int(lower_limit_attempt_id)]
     client = pytd.Client(apikey=os.environ['TD_API_KEY'], endpoint='https://%s' % api_endpoint, database=dest_db)
     client.load_table_from_dataframe(df, dest_table, if_exists=if_exists, fmt='msgpack')
\ No newline at end of file
diff --git a/scenarios/monitoring/workflow_monitoring/scripts/ingest_schedule.py b/scenarios/monitoring/workflow_monitoring/scripts/ingest_schedule.py
index 5b9b43ce..199c9eac 100644
--- a/scenarios/monitoring/workflow_monitoring/scripts/ingest_schedule.py
+++ b/scenarios/monitoring/workflow_monitoring/scripts/ingest_schedule.py
@@ -4,6 +4,10 @@
 import pandas as pd
 import pytd
 import os
+import json
+
+def convert_to_json(s):
+    return json.dumps(s)
 
 def get_schedules1(url, headers):
     print(url)
@@ -33,5 +37,7 @@ def run(session_unixtime, dest_db, dest_table, api_endpoint='api.treasuredata.co
     sches_list = get_all_schedule(workflow_url, headers)
     df = pd.DataFrame(sches_list)
     df['time'] = int(session_unixtime)
+    df['project'] = df['project'].apply(convert_to_json)
+    df['workflow'] = df['workflow'].apply(convert_to_json)
     client = pytd.Client(apikey=os.environ['TD_API_KEY'], endpoint='https://%s' % api_endpoint, database=dest_db)
     client.load_table_from_dataframe(df, dest_table, if_exists='overwrite', fmt='msgpack')
\ No newline at end of file
diff --git a/scenarios/monitoring/workflow_monitoring/scripts/ingest_session.py b/scenarios/monitoring/workflow_monitoring/scripts/ingest_session.py
index 0a8c5283..6fd41415 100644
--- a/scenarios/monitoring/workflow_monitoring/scripts/ingest_session.py
+++ b/scenarios/monitoring/workflow_monitoring/scripts/ingest_session.py
@@ -5,10 +5,14 @@
 import pytd
 import os
 from datetime import datetime as dt
+import json
 
 def convert(session_time):
     return int(dt.strptime(session_time, '%Y-%m-%dT%H:%M:%S%z').timestamp())
 
+def convert_to_json(s):
+    return json.dumps(s)
+
 def get_sessions1(url, headers):
     print(url)
     res = requests.get(url=url, headers= headers)
@@ -40,6 +44,9 @@ def run(session_unixtime, dest_db, dest_table, api_endpoint='api.treasuredata.co
     df = pd.DataFrame(ses_list)
     df['time'] = df['sessionTime'].apply(convert)
     df['id'] = df['id'].astype('int')
+    df['project'] = df['project'].apply(convert_to_json)
+    df['workflow'] = df['workflow'].apply(convert_to_json)
+    df['lastAttempt'] = df['lastAttempt'].apply(convert_to_json)
     df = df[df['id'] > int(lower_limit_session_id)]
     client = pytd.Client(apikey=os.environ['TD_API_KEY'], endpoint='https://%s' % api_endpoint, database=dest_db)
     client.load_table_from_dataframe(df, dest_table, if_exists=if_exists, fmt='msgpack')
\ No newline at end of file
diff --git a/scenarios/monitoring/workflow_monitoring/scripts/ingest_task.py b/scenarios/monitoring/workflow_monitoring/scripts/ingest_task.py
index ffbe0071..9106416c 100644
--- a/scenarios/monitoring/workflow_monitoring/scripts/ingest_task.py
+++ b/scenarios/monitoring/workflow_monitoring/scripts/ingest_task.py
@@ -2,6 +2,10 @@
 import os
 import pytd
 import pandas as pd
+import json
+
+def convert_to_json(s):
+    return json.dumps(s)
 
 def get_task_info(base_url, headers, ids):
     l = []
@@ -20,6 +24,11 @@ def get_task_info(base_url, headers, ids):
 
 def insert_task_info(import_unixtime, endpoint, apikey, dest_db, dest_table, tasks):
     df = pd.DataFrame(tasks)
     df['time'] = int(import_unixtime)
+    df['config'] = df['config'].apply(convert_to_json)
+    df['upstreams'] = df['upstreams'].apply(convert_to_json)
+    df['exportparams'] = df['exportparams'].apply(convert_to_json)
+    df['storeparams'] = df['storeparams'].apply(convert_to_json)
+    df['error'] = df['error'].apply(convert_to_json)
     client = pytd.Client(apikey=apikey, endpoint=endpoint, database=dest_db)
     client.load_table_from_dataframe(df, dest_table, if_exists='append', fmt='msgpack')
diff --git a/scenarios/monitoring/workflow_monitoring/scripts/ingest_workflow.py b/scenarios/monitoring/workflow_monitoring/scripts/ingest_workflow.py
index 2448627a..f23fc9df 100644
--- a/scenarios/monitoring/workflow_monitoring/scripts/ingest_workflow.py
+++ b/scenarios/monitoring/workflow_monitoring/scripts/ingest_workflow.py
@@ -4,6 +4,10 @@
 import pandas as pd
 import pytd
 import os
+import json
+
+def convert_to_json(s):
+    return json.dumps(s)
 
 def get_workflows1(url, headers):
     print(url)
@@ -33,5 +37,7 @@ def run(session_unixtime, dest_db, dest_table, api_endpoint='api.treasuredata.co
     wfs_list = get_all_workflow(workflow_url, headers, count)
     df = pd.DataFrame(wfs_list)
     df['time'] = session_unixtime
+    df['project'] = df['project'].apply(convert_to_json)
+    df['config'] = df['config'].apply(convert_to_json)
     client = pytd.Client(apikey=os.environ['TD_API_KEY'], endpoint='https://%s' % api_endpoint, database=dest_db)
     client.load_table_from_dataframe(df, dest_table, if_exists='overwrite', fmt='msgpack')
\ No newline at end of file
diff --git a/scenarios/monitoring/workflow_monitoring/scripts/update_attempt.py b/scenarios/monitoring/workflow_monitoring/scripts/update_attempt.py
index 9accef94..463520f9 100644
--- a/scenarios/monitoring/workflow_monitoring/scripts/update_attempt.py
+++ b/scenarios/monitoring/workflow_monitoring/scripts/update_attempt.py
@@ -4,6 +4,10 @@
 import pytd
 import os
 from datetime import datetime as dt
+import json
+
+def convert_to_json(s):
+    return json.dumps(s)
 
 def convert(s):
     return int(dt.strptime(s, '%Y-%m-%dT%H:%M:%SZ').timestamp())
@@ -31,6 +35,11 @@ def get_attempt_info(base_url, headers, ids):
 
 def insert_attempt_info(import_unixtime, endpoint, apikey, dest_db, dest_table, attempts):
     df = pd.DataFrame(attempts)
     df['time'] = df['createdAt'].apply(convert)
+    df['id'] = df['id'].astype('int')
+    df['project'] = df['project'].apply(convert_to_json)
+    df['workflow'] = df['workflow'].apply(convert_to_json)
+    df['params'] = df['params'].apply(convert_to_json)
+
     client = pytd.Client(apikey=apikey, endpoint=endpoint, database=dest_db)
     client.load_table_from_dataframe(df, dest_table, if_exists='append', fmt='msgpack')
diff --git a/scenarios/monitoring/workflow_monitoring/scripts/update_session.py b/scenarios/monitoring/workflow_monitoring/scripts/update_session.py
index 132595f8..10023b14 100644
--- a/scenarios/monitoring/workflow_monitoring/scripts/update_session.py
+++ b/scenarios/monitoring/workflow_monitoring/scripts/update_session.py
@@ -3,10 +3,14 @@
 import pytd
 import os
 from datetime import datetime as dt
+import json
 
 def convert(s):
     return int(dt.strptime(s, '%Y-%m-%dT%H:%M:%S%z').timestamp())
 
+def convert_to_json(s):
+    return json.dumps(s)
+
 def delete_session_info(endpoint, apikey, dest_db, dest_table, ids):
     s = ''
     for i in ids:
@@ -30,6 +34,10 @@ def get_session_info(base_url, headers, ids):
 
 def insert_session_info(endpoint, apikey, dest_db, dest_table, sessions, import_unix_time):
     df = pd.DataFrame(sessions)
     df['time'] = df['sessionTime'].apply(convert)
+    df['id'] = df['id'].astype('int')
+    df['project'] = df['project'].apply(convert_to_json)
+    df['workflow'] = df['workflow'].apply(convert_to_json)
+    df['lastAttempt'] = df['lastAttempt'].apply(convert_to_json)
     client = pytd.Client(apikey=apikey, endpoint=endpoint, database=dest_db)
     client.load_table_from_dataframe(df, dest_table, if_exists='append', fmt='msgpack')
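
Note on the column-format fix in this patch: the TD and Digdag REST APIs return nested objects (project, workflow, params, config, and so on) that pandas keeps as Python dicts, and dict-valued columns do not survive the pytd msgpack upload path cleanly, which is presumably why every script now applies json.dumps before loading. Below is a minimal standalone sketch of that pattern; the sample record and its field values are hypothetical illustrations, not data from this patch:

    import json
    import pandas as pd

    # Hypothetical shape of one record returned by GET /api/attempts.
    attempts = [{
        'id': '1040926254',
        'project': {'id': '1', 'name': 'monitoring_demo'},
        'workflow': {'id': '42', 'name': 'initial_ingest'},
        'params': {'last_session_time': '2024-01-11T00:00:00+09:00'},
        'createdAt': '2024-01-11T05:21:33Z',
    }]

    df = pd.DataFrame(attempts)

    # Same conversion the patch applies: dict columns become JSON strings,
    # so the destination table stores queryable JSON text instead of raw dicts.
    for col in ('project', 'workflow', 'params'):
        df[col] = df[col].apply(json.dumps)

    print(df.dtypes)             # project/workflow/params are now plain strings
    print(df.loc[0, 'project'])  # {"id": "1", "name": "monitoring_demo"}

The JSON strings can then be unpacked downstream with Presto/Trino JSON functions (for example json_extract_scalar(project, '$.name')).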