Dev monitoring #388

Merged: 3 commits, Jan 11, 2024
2 changes: 2 additions & 0 deletions scenarios/monitoring/basic_monitoring/README.md
@@ -29,6 +29,8 @@ Second, you register td.apikey as a secret.
You should set the lower_job_id option (in the initial_ingest_jobs task of the initial_ingest workflow) properly.
If you set the id too low, the initial_ingest workflow may take longer or fail with a timeout error.

The listing APIs have no pagination (except the jobs API), so if you encounter an Internal Server Error (500), comment out the task for that target.

# Relationships of Table and REST API

| table | REST API|
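For illustration, here is a minimal sketch (not part of this change; the endpoint and error handling below are illustrative) of how such a failure surfaces. The listing scripts call raise_for_status(), so a 500 from one of the un-paginated endpoints fails the whole task, and commenting that task out is the workaround.

import os
import requests

# Probe one of the un-paginated listing endpoints directly. On a very large
# account the server may answer with HTTP 500; raise_for_status() then raises
# requests.exceptions.HTTPError, which is what makes the workflow task fail.
url = 'https://api.treasuredata.com/v3/bulk_loads/'
headers = {'Authorization': 'TD1 %s' % os.environ.get('TD_API_KEY', '')}

res = requests.get(url=url, headers=headers)
try:
    res.raise_for_status()
except requests.exceptions.HTTPError as err:
    print('listing API failed; consider commenting out the corresponding task:', err)
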
2 changes: 2 additions & 0 deletions scenarios/monitoring/basic_monitoring/common/settings.yaml
@@ -6,6 +6,8 @@ td:
connections_history: connections_history
schedules: schedules
schedules_history: schedules_history
sources: sources
sources_history: sources_history
users: users
users_history: users_history
user_assign_policies: user_assign_policies
17 changes: 17 additions & 0 deletions scenarios/monitoring/basic_monitoring/incremental_ingest.dig
@@ -37,6 +37,23 @@ schedule:
    _env:
      TD_API_KEY: ${secret:td.apikey}

+increment_ingest_source:
  +append_sources_history:
    td>:
      query: select * from ${td.tables.sources}
      database: ${td.database}
      insert_into: ${td.tables.sources_history}
  +ingest_schedule:
    py>: scripts.ingest_sources.run
    dest_db: ${td.database}
    dest_table: ${td.tables.sources}
    api_endpoint: ${td.api_endpoint}
    docker:
      image: "digdag/digdag-python:3.9"
    _env:
      TD_API_KEY: ${secret:td.apikey}


+increment_ingest_jobs:
  +check_old_running_jobs:
    td>:
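For context, a minimal sketch (placeholder database name, assuming the repo layout added in this PR) of calling the new script directly; this mirrors what the py> task above runs inside the digdag/digdag-python:3.9 container.

import os

# TD_API_KEY is read by scripts/ingest_sources.py; set it before running.
os.environ.setdefault('TD_API_KEY', '<your TD apikey>')

from scripts import ingest_sources

# 'td_monitoring' is a placeholder; use the database configured in settings.yaml.
ingest_sources.run(dest_db='td_monitoring',
                   dest_table='sources',
                   api_endpoint='api.treasuredata.com')
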
10 changes: 10 additions & 0 deletions scenarios/monitoring/basic_monitoring/initial_ingest.dig
@@ -26,6 +26,16 @@ _export:
  _env:
    TD_API_KEY: ${secret:td.apikey}

+ingest_source:
  py>: scripts.ingest_sources.run
  dest_db: ${td.database}
  dest_table: ${td.tables.sources}
  api_endpoint: ${td.api_endpoint}
  docker:
    image: "digdag/digdag-python:3.9"
  _env:
    TD_API_KEY: ${secret:td.apikey}

+initial_inget_jobs:
  py>: scripts.ingest_job.run
  session_unixtime: ${session_unixtime}
@@ -22,9 +22,9 @@ def get_all_policy_with_permission(api_endpoint, headers, policyid_list):
        if res.status_code != requests.codes.ok:
            res.raise_for_status()
        permissions = res.json()
        permissions['id'] = i

        l.append(permissions)
        if len(permissions) != 0:
            data = {'id': i, 'permissions': json.dumps(permissions)}
            l.append(data)
    return l

def get_all_policy_with_column_permission(api_endpoint, headers, policyid_list):
30 changes: 30 additions & 0 deletions scenarios/monitoring/basic_monitoring/scripts/ingest_sources.py
@@ -0,0 +1,30 @@
# https://api-docs.treasuredata.com/en/api/system-api/bulk-loads-api/

import requests
import pandas as pd
import pytd
import os
import json

def convert(s):
    return json.dumps(s)

def get_all_sources(url, headers):
    print(url)
    res = requests.get(url=url, headers=headers)
    if res.status_code != requests.codes.ok:
        res.raise_for_status()
    return res.json()

def run(dest_db, dest_table, api_endpoint='api.treasuredata.com'):
    url = 'https://%s/v3/bulk_loads/' % api_endpoint
    headers = {'Authorization': 'TD1 %s' % os.environ['TD_API_KEY']}
    l = get_all_sources(url, headers)
    if len(l) == 0:
        print('no import record')
        return
    df = pd.DataFrame(l)
    df['config'] = df['config'].apply(convert)
    df['config_diff'] = df['config_diff'].apply(convert)
    client = pytd.Client(apikey=os.environ['TD_API_KEY'], endpoint='https://%s' % api_endpoint, database=dest_db)
    client.load_table_from_dataframe(df, dest_table, if_exists='overwrite', fmt='msgpack')
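
For reference, a small self-contained sketch (hypothetical data) of the conversion above: nested objects returned by the API are serialized with json.dumps so that, presumably, every column holds plain scalars before pytd uploads the DataFrame as msgpack.

import json
import pandas as pd

# Hypothetical /v3/bulk_loads-style record with a nested config object.
sources = [
    {'id': 1, 'name': 'daily_s3_import',
     'config': {'type': 's3', 'bucket': 'example-bucket'},
     'config_diff': {}},
]
df = pd.DataFrame(sources)

# Without this step the config columns contain Python dicts; afterwards they
# are plain JSON strings.
df['config'] = df['config'].apply(json.dumps)
df['config_diff'] = df['config_diff'].apply(json.dumps)
print(df.dtypes)
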
@@ -43,7 +43,7 @@ _export:
api_endpoint: ${td.api_endpoint}
workflow_endpoint: ${td.workflow_endpoint}
count: 100
lower_limit_session_id: '113468608'
lower_limit_session_id: '180059011'
if_exists: "overwrite"
docker:
image: "digdag/digdag-python:3.9"
@@ -58,7 +58,7 @@ _export:
workflow_endpoint: ${td.workflow_endpoint}
count: 100
if_exists: "overwrite"
lower_limit_attempt_id: '592849652'
lower_limit_attempt_id: '1040926253'
docker:
image: "digdag/digdag-python:3.9"
_env:
@@ -5,10 +5,14 @@
import pytd
import os
from datetime import datetime as dt
import json

def convert(s):
    return int(dt.strptime(s, '%Y-%m-%dT%H:%M:%SZ').timestamp())

def convert_to_json(s):
    return json.dumps(s)

def get_attempt1(url, headers):
    print(url)
    res = requests.get(url=url, headers=headers)
@@ -41,6 +45,9 @@ def run(session_unixtime, dest_db, dest_table, api_endpoint='api.treasuredata.co
    df = pd.DataFrame(atmp_list)
    df['time'] = df['createdAt'].apply(convert)
    df['id'] = df['id'].astype('int')
    df['project'] = df['project'].apply(convert_to_json)
    df['workflow'] = df['workflow'].apply(convert_to_json)
    df['params'] = df['params'].apply(convert_to_json)
    df = df[df['id'] > int(lower_limit_attempt_id)]
    client = pytd.Client(apikey=os.environ['TD_API_KEY'], endpoint='https://%s' % api_endpoint, database=dest_db)
    client.load_table_from_dataframe(df, dest_table, if_exists=if_exists, fmt='msgpack')
@@ -4,6 +4,10 @@
import pandas as pd
import pytd
import os
import json

def convert_to_json(s):
    return json.dumps(s)

def get_schedules1(url, headers):
    print(url)
@@ -33,5 +37,7 @@ def run(session_unixtime, dest_db, dest_table, api_endpoint='api.treasuredata.co
    sches_list = get_all_schedule(workflow_url, headers)
    df = pd.DataFrame(sches_list)
    df['time'] = int(session_unixtime)
    df['project'] = df['project'].apply(convert_to_json)
    df['workflow'] = df['workflow'].apply(convert_to_json)
    client = pytd.Client(apikey=os.environ['TD_API_KEY'], endpoint='https://%s' % api_endpoint, database=dest_db)
    client.load_table_from_dataframe(df, dest_table, if_exists='overwrite', fmt='msgpack')
@@ -5,10 +5,14 @@
import pytd
import os
from datetime import datetime as dt
import json

def convert(session_time):
    return int(dt.strptime(session_time, '%Y-%m-%dT%H:%M:%S%z').timestamp())

def convert_to_json(s):
    return json.dumps(s)

def get_sessions1(url, headers):
    print(url)
    res = requests.get(url=url, headers=headers)
@@ -40,6 +44,9 @@ def run(session_unixtime, dest_db, dest_table, api_endpoint='api.treasuredata.co
    df = pd.DataFrame(ses_list)
    df['time'] = df['sessionTime'].apply(convert)
    df['id'] = df['id'].astype('int')
    df['project'] = df['project'].apply(convert_to_json)
    df['workflow'] = df['workflow'].apply(convert_to_json)
    df['lastAttempt'] = df['lastattempt'].apply(convert_to_json)
    df = df[df['id'] > int(lower_limit_session_id)]
    client = pytd.Client(apikey=os.environ['TD_API_KEY'], endpoint='https://%s' % api_endpoint, database=dest_db)
    client.load_table_from_dataframe(df, dest_table, if_exists=if_exists, fmt='msgpack')
@@ -2,6 +2,10 @@
import os
import pytd
import pandas as pd
import json

def convert_to_json(s):
    return json.dumps(s)

def get_task_info(base_url, headers, ids):
    l = []
@@ -20,6 +24,11 @@ def get_task_info(base_url, headers, ids):
def insert_task_info(import_unixtime, endpoint, apikey, dest_db, dest_table, tasks):
    df = pd.DataFrame(tasks)
    df['time'] = int(import_unixtime)
    df['config'] = df['config'].apply(convert_to_json)
    df['upstreams'] = df['upstreams'].apply(convert_to_json)
    df['exportparams'] = df['exportparams'].apply(convert_to_json)
    df['storeparams'] = df['storeparams'].apply(convert_to_json)
    df['error'] = df['error'].apply(convert_to_json)
    client = pytd.Client(apikey=apikey, endpoint=endpoint, database=dest_db)
    client.load_table_from_dataframe(df, dest_table, if_exists='append', fmt='msgpack')

@@ -4,6 +4,10 @@
import pandas as pd
import pytd
import os
import json

def convert_to_json(s):
    return json.dumps(s)

def get_workflows1(url, headers):
    print(url)
@@ -33,5 +37,7 @@ def run(session_unixtime, dest_db, dest_table, api_endpoint='api.treasuredata.co
    wfs_list = get_all_workflow(workflow_url, headers, count)
    df = pd.DataFrame(wfs_list)
    df['time'] = session_unixtime
    df['project'] = df['project'].apply(convert_to_json)
    df['config'] = df['config'].apply(convert_to_json)
    client = pytd.Client(apikey=os.environ['TD_API_KEY'], endpoint='https://%s' % api_endpoint, database=dest_db)
    client.load_table_from_dataframe(df, dest_table, if_exists='overwrite', fmt='msgpack')
@@ -4,6 +4,10 @@
import pytd
import os
from datetime import datetime as dt
import json

def convert_to_json(s):
    return json.dumps(s)

def convert(s):
    return int(dt.strptime(s, '%Y-%m-%dT%H:%M:%SZ').timestamp())
@@ -31,6 +35,11 @@ def get_attempt_info(base_url, headers, ids):
def insert_attempt_info(import_unixtime, endpoint, apikey, dest_db, dest_table, attempts):
    df = pd.DataFrame(attempts)
    df['time'] = df['createdAt'].apply(convert)
    df['id'] = df['id'].astype('int')
    df['project'] = df['project'].apply(convert_to_json)
    df['workflow'] = df['workflow'].apply(convert_to_json)
    df['params'] = df['params'].apply(convert_to_json)

    client = pytd.Client(apikey=apikey, endpoint=endpoint, database=dest_db)
    client.load_table_from_dataframe(df, dest_table, if_exists='append', fmt='msgpack')

@@ -3,10 +3,14 @@
import pytd
import os
from datetime import datetime as dt
import json

def convert(s):
    return int(dt.strptime(s, '%Y-%m-%dT%H:%M:%S%z').timestamp())

def convert_to_json(s):
    return json.dumps(s)

def delete_session_info(endpoint, apikey, dest_db, dest_table, ids):
    s = ''
    for i in ids:
@@ -30,6 +34,10 @@ def get_session_info(base_url, headers, ids):
def insert_session_info(endpoint, apikey, dest_db, dest_table, sessions, import_unix_time):
    df = pd.DataFrame(sessions)
    df['time'] = df['sessionTime'].apply(convert)
    df['id'] = df['id'].astype('int')
    df['project'] = df['project'].apply(convert_to_json)
    df['workflow'] = df['workflow'].apply(convert_to_json)
    df['lastattempt'] = df['lastattempt'].apply(convert_to_json)
    client = pytd.Client(apikey=apikey, endpoint=endpoint, database=dest_db)
    client.load_table_from_dataframe(df, dest_table, if_exists='append', fmt='msgpack')
