add tier2 query to allthethings pipeline #267

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged · 3 commits · Nov 30, 2023

172 changes: 93 additions & 79 deletions coldfront/plugins/fasrc/utils.py
@@ -27,50 +27,69 @@ def produce_query_statement(self, vol_type, volumes=None):

query_dict = {
'quota': {
'match': '[r:HasQuota]-(e:Quota) MATCH (d:ConfigValue {Name: \'Quota.Invocation\'})',
'validation_query': 'NOT ((e.SizeGB IS null) OR (e.usedBytes = 0 AND e.SizeGB = 1024)) AND NOT (e.Path IS null)',
'relation': 'HasQuota',
'match': "(e:Quota) MATCH (d:ConfigValue {Name: 'Quota.Invocation'})",
'server': 'filesystem',
'validation_query':
"NOT ((e.SizeGB IS null) OR (e.usedBytes = 0 AND e.SizeGB = 1024)) \
AND (datetime() - duration('P31D') <= datetime(r.DotsLFSUpdateDate)) \
AND NOT (e.Path IS null)",
'r_updated': 'DotsLFSUpdateDate',
'storage_type': '\'Quota\'',
'storage_type': 'Quota',
'usedgb': 'usedGB',
'sizebytes': 'limitBytes',
'usedbytes': 'usedBytes',
'fs_path': 'Path',
'server': 'filesystem',
'server_replace': '/n/',
'path_replace': '/n//',
'unique':'datetime(e.DotsLFSUpdateDate) as begin_date'
},
'isilon': {
'match': '[r:Owns]-(e:IsilonPath) MATCH (d:ConfigValue {Name: \'IsilonPath.Invocation\'})',
'validation_query':"r.DotsUpdateDate = d.DotsUpdateDate \
AND NOT (e.Path =~ '.*/rc_admin/.*')\
AND (e.Path =~ '.*labs.*')\
AND NOT (e.SizeGB = 0)",
'r_updated': 'DotsUpdateDate',
'storage_type':'\'Isilon\'',
'relation': 'Owns',
'match': "(e:IsilonPath) MATCH (d:ConfigValue {Name: 'IsilonPath.Invocation'})",
'server': 'Isilon',
'validation_query': "r.DotsUpdateDate = d.DotsUpdateDate \
AND NOT (e.Path =~ '.*/rc_admin/.*')\
AND (e.Path =~ '.*labs.*')\
AND (datetime() - duration('P31D') <= datetime(r.DotsUpdateDate)) \
AND NOT (e.SizeGB = 0)",
'fs_path':'Path',
'server':'Isilon',
'r_updated': 'DotsUpdateDate',
'storage_type': 'Isilon',
'usedgb': 'UsedGB',
'sizebytes': 'SizeBytes',
'usedbytes': 'UsedBytes',
'server_replace': '01.rc.fas.harvard.edu',
'path_replace': '/ifs/',
'unique':'datetime(e.DotsUpdateDate) as begin_date'
}
'unique': 'datetime(e.DotsUpdateDate) as begin_date'
},
'volume': {
'relation': 'Owns',
'match': '(e:Volume)',
'server': 'Hostname',
'validation_query': 'NOT (e.SizeGB = 0)',
'r_updated': 'DotsLVSUpdateDate',
'storage_type': 'Volume',
'fs_path': 'LogicalVolume',
'path_replace': '/dev/data/',
'usedgb': 'UsedGB',
'sizebytes': 'SizeGB * 1073741824',
'usedbytes': 'UsedGB * 1073741824',
'server_replace': '.rc.fas.harvard.edu',
'unique': 'datetime(e.DotsLVSUpdateDate) as update_date, \
datetime(e.DotsLVDisplayUpdateDate) as display_date'
},
}
d = query_dict[vol_type]

if volumes:
volumes = '|'.join(volumes)
else:
volumes = '|'.join([r.name.split('/')[0] for r in Resource.objects.all()])
where = f"(e.{d['server']} =~ \'.*({volumes}).*\')"

if not volumes:
volumes = [r.name.split('/')[0] for r in Resource.objects.filter(resource_type__name='Storage')]
volumes = '|'.join(volumes)
where = f"(e.{d['server']} =~ '.*({volumes}).*')"
statement = {
'statement': f"MATCH p=(g:Group)-{d['match']} \
'statement': f"MATCH p=(g:Group)-[r:{d['relation']}]-{d['match']} \
WHERE {where} AND {d['validation_query']}\
AND NOT (g.ADSamAccountName =~ '.*(disabled|rc_admin).*')\
AND (datetime() - duration('P31D') <= datetime(r.{d['r_updated']})) \
RETURN \
{d['unique']}, \
g.ADSamAccountName as lab,\
@@ -79,7 +98,7 @@ def produce_query_statement(self, vol_type, volumes=None):
e.{d['usedbytes']} as byte_usage,\
(e.{d['usedgb']} / 1024.0) as tb_usage,\
replace(e.{d['fs_path']}, '{d['path_replace']}', '') as fs_path, \
{d['storage_type']} as storage_type, \
'{d['storage_type']}' as storage_type, \
datetime(r.{d['r_updated']}) as rel_updated, \
replace(e.{d['server']}, '{d['server_replace']}', '') as server"
}
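
(For context, a rough usage sketch rather than part of the diff: the new 'volume' entry is consumed the same way as the existing 'quota' and 'isilon' ones, and the volume names below are hypothetical placeholders.)

# Sketch only — volume names are placeholders, not real resources.
from coldfront.plugins.fasrc.utils import ATTAllocationQuery

query = ATTAllocationQuery()
query.produce_query_statement('volume', volumes=['holystore01', 'holylabs'])
# The staged statement matches (g:Group)-[r:Owns]-(e:Volume) rows whose
# Hostname matches the volume filter and whose Owns relationship was
# updated (DotsLVSUpdateDate) within the last 31 days; query.queries is
# what pull_quota_data later POSTs via AllTheThingsConn.post_query().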
@@ -89,28 +108,32 @@ def produce_query_statement(self, vol_type, volumes=None):
class QuotaDataPuller:
"""pull and standardize quota data"""

def pull(self, format):
standardizer = self.get_standardizer(format)
def __init__(self, volumes=None):
self.volumes = volumes

def pull(self, standard):
standardizer = self.get_standardizer(standard)
return standardizer()

def get_standardizer(self, format):
if format == 'ATTQuery':
def get_standardizer(self, standard):
if standard == 'ATTQuery':
return self._standardize_attquery
if format == 'NESEfile':
if standard == 'NESEfile':
return self._standardize_nesefile
raise ValueError(format)
raise ValueError(standard)

def _standardize_attquery(self):
attconn = AllTheThingsConn()
attconn = AllTheThingsConn(volumes=self.volumes)
resp_json = attconn.pull_quota_data()
return attconn.format_query_results(resp_json)

def _standardize_nesefile(self):
datafile = 'nese_data/pools'
header_file = 'nese_data/pools.header'
translator = dict((
kv.split('=') for kv in (l.strip('\n') for l in open('nese_data/groupkey'))
))
with open('nese_data/groupkey') as groupkey_file:
translator = dict((
kv.split('=') for kv in (l.strip('\n') for l in groupkey_file)
))
headers_df = pd.read_csv(header_file, header=0, delim_whitespace=True)
headers = headers_df.columns.values.tolist()
data = pd.read_csv(datafile, names=headers, delim_whitespace=True)
@@ -133,50 +156,44 @@ def _standardize_nesefile(self):
return nesedict



class AllTheThingsConn:

def __init__(self):
def __init__(self, volumes=None):
self.url = 'https://allthethings.rc.fas.harvard.edu:7473/db/data/transaction/commit'
self.token = import_from_settings('NEO4JP', '')
self.headers = generate_headers(self.token)
self.volumes = volumes

def post_query(self, query):
resp = requests.post(self.url, headers=self.headers, data=json.dumps(query), verify=False)
resp = requests.post(self.url, headers=self.headers, data=json.dumps(query))
return json.loads(resp.text)

def format_query_results(self, resp_json):
result_dicts = list(resp_json['results'])
return [dict(zip(rdict['columns'],entrydict['row'])) \
for rdict in result_dicts for entrydict in rdict['data'] ]
return [dict(zip(d['columns'], ed['row'])) for d in result_dicts for ed in d['data']]

def stage_user_member_query(self, groupsearch, pi=False):
match_statement = f'MATCH (u:User)-[r:MemberOf|ManagedBy]-(g:Group) \
WHERE (g.ADSamAccountName =~ \'{groupsearch}\')'
return_statement = 'type(r) AS relationship,\
g.ADManaged_By AS group_manager'
match_vars = '(u:User)-[r:MemberOf|ManagedBy]-(g:Group) WHERE'
return_vars = 'type(r) AS relationship, g.ADManaged_By AS group_manager'
if pi:
match_statement = f"MATCH (g:Group) WITH g\
MATCH (u:User)\
WHERE (g.ADSamAccountName =~ \'({groupsearch})\') \
AND u.ADSamAccountName = g.ADManaged_By"
return_statement = 'u.ADParentCanonicalName AS path, \
u.ADDepartment AS department, '
match_vars = '(g:Group) WITH g MATCH (u:User)\
WHERE u.ADSamAccountName = g.ADManaged_By AND'
return_vars = 'u.ADParentCanonicalName AS path, u.ADDepartment AS department'
query = {'statements': [{
'statement': f'{match_statement} \
RETURN \
u.ADgivenName AS first_name, \
u.ADsurname AS last_name, \
u.ADSamAccountName AS user_name, \
u.ADenabled AS user_enabled, \
g.ADSamAccountName AS group_name,\
{return_statement} \
g.ADManaged_By AS group_manager, \
u.ADgidNumber AS user_gid_number, \
u.ADTitle AS title, \
u.ADCompany AS company, \
g.ADgidNumber AS group_gid_number'
}]}
'statement': f"MATCH {match_vars} (g.ADSamAccountName =~ '({groupsearch})')\
RETURN \
u.ADgivenName AS first_name, \
u.ADsurname AS last_name, \
u.ADSamAccountName AS user_name, \
u.ADenabled AS user_enabled, \
g.ADSamAccountName AS group_name,\
{return_vars},\
g.ADManaged_By AS group_manager, \
u.ADgidNumber AS user_gid_number, \
u.ADTitle AS title, \
u.ADCompany AS company, \
g.ADgidNumber AS group_gid_number"
}]}
resp_json = self.post_query(query)
resp_json_formatted = self.format_query_results(resp_json)
return resp_json_formatted
@@ -188,27 +205,23 @@ def collect_group_membership(self, groupsearch):
resp_json_formatted = self.stage_user_member_query(groupsearch)
return resp_json_formatted


def collect_pi_data(self, grouplist):
"""collect information on pis for a given list of groups
"""
resp_json_formatted = self.stage_user_member_query(grouplist, pi=True)
return resp_json_formatted

def pull_quota_data(self, volumes=None):
def pull_quota_data(self):
"""Produce JSON file of quota data for LFS and Isilon from AlltheThings.
Parameters
----------
volumes : List of volume names to collect. Optional, default None.
"""
logger = logging.getLogger('import_quotas')
query = ATTAllocationQuery()
if volumes:
volumes = '|'.join(volumes)
else:
volumes = '|'.join([r.name.split('/')[0] for r in Resource.objects.all()])
query.produce_query_statement('isilon')
query.produce_query_statement('quota')
query.produce_query_statement('isilon', volumes=self.volumes)
query.produce_query_statement('quota', volumes=self.volumes)
query.produce_query_statement('volume', volumes=self.volumes)
resp_json = self.post_query(query.queries)
logger.debug(resp_json)
return resp_json
@@ -223,6 +236,7 @@ def matched_dict_processing(allocation, data_dicts, paired_allocs, log_message):
logger.warning('too many matches for allocation %s: %s', allocation, data_dicts)
return paired_allocs


def pair_allocations_data(project, quota_dicts):
"""pair allocations with usage dicts"""
logger = logging.getLogger('import_quotas')
@@ -253,7 +267,9 @@ def pair_allocations_data(project, quota_dicts):
]
unpaired_dicts = [d for d in unpaired_dicts if d not in paired_allocs.values()]
if unpaired_dicts or unpaired_allocs:
logger.warning("WARNING: unpaired allocation data: %s %s", unpaired_allocs, unpaired_dicts)
logger.warning(
"WARNING: unpaired allocation data: %s %s", unpaired_allocs, unpaired_dicts
)
return paired_allocs


@@ -311,9 +327,9 @@ def push_quota_data(result_file):
defaults={'value':True}
)
counts['complete'] += 1
except Exception as e:
except Exception as exc:
allocation_name = f"{data_dict['lab']}/{data_dict['server']}"
errored_allocations[allocation_name] = e
errored_allocations[allocation_name] = exc
log_missing('allocation', missing_allocations)
logger.warning('error counts: %s', counts)
logger.warning('errored_allocations:\n%s', errored_allocations)
@@ -330,10 +346,11 @@ def match_entries_with_projects(result_json):
[result_json.pop(t) for t in missing_proj_titles]
return result_json, proj_models


def pull_push_quota_data(volumes=None):
logger = logging.getLogger('import_quotas')
att_data = QuotaDataPuller().pull('ATTQuery')
nese_data = QuotaDataPuller().pull('NESEfile')
att_data = QuotaDataPuller(volumes=volumes).pull('ATTQuery')
nese_data = QuotaDataPuller(volumes=volumes).pull('NESEfile')
combined_data = att_data + nese_data
resp_json_by_lab = {entry['lab']:[] for entry in combined_data}
[resp_json_by_lab[e['lab']].append(e) for e in combined_data]
@@ -347,8 +364,5 @@ def generate_headers(token):
def generate_headers(token):
"""Generate 'headers' attribute by using the 'token' attribute.
"""
headers = {
'accept': 'application/json',
'Authorization': f'Bearer {token}',
}
headers = {'accept': 'application/json', 'Authorization': f'Bearer {token}'}
return headers
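
With these changes, an optional volumes filter threads from pull_push_quota_data all the way down to the Cypher WHERE clause. A minimal end-to-end sketch, assuming a Django shell inside the Coldfront deployment and hypothetical volume names:

# Minimal sketch — volume names are placeholders.
from coldfront.plugins.fasrc.utils import QuotaDataPuller, pull_push_quota_data

# Pull and standardize just the AllTheThings results for two volumes;
# passing volumes=None falls back to every Storage-type Resource.
att_data = QuotaDataPuller(volumes=['holylfs04', 'holystore01']).pull('ATTQuery')

# Or run the full pipeline, which also reads the NESE pools file and
# pushes the combined usage onto matching allocations.
pull_push_quota_data(volumes=['holylfs04', 'holystore01'])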