forked from amundsen-io/amundsendatabuilder
-
Notifications
You must be signed in to change notification settings - Fork 0
/
table_source.py
99 lines (85 loc) · 3.65 KB
/
table_source.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0
from typing import Any, Dict, List, Optional
from databuilder.models.neo4j_csv_serde import Neo4jCsvSerializable, NODE_KEY, \
NODE_LABEL, RELATION_START_KEY, RELATION_START_LABEL, RELATION_END_KEY, \
RELATION_END_LABEL, RELATION_TYPE, RELATION_REVERSE_TYPE
from databuilder.models.table_metadata import TableMetadata
class TableSource(Neo4jCsvSerializable):
    """
    Hive table source model.

    Emits one ``Source`` node that records where a table's source lives
    (``source``, e.g. a file location, plus its ``source_type``). It also
    emits one relation linking that ``Source`` node to the table node.
    """
    LABEL = 'Source'
    # Key of the Source node: the table key (see get_metadata_model_key)
    # with a '/_source' suffix appended.
    KEY_FORMAT = '{db}://{cluster}.{schema}/{tbl}/_source'
    # Relation type written for the Source -> table direction.
    SOURCE_TABLE_RELATION_TYPE = 'SOURCE_OF'
    # Relation type written as the reverse (table -> Source) direction.
    TABLE_SOURCE_RELATION_TYPE = 'SOURCE'

    def __init__(self,
                 db_name: str,
                 schema: str,
                 table_name: str,
                 cluster: str,
                 source: str,
                 source_type: str = 'github',
                 ) -> None:
        """
        :param db_name: Database name; used as the key prefix ('{db}://...').
        :param schema: Schema of the table.
        :param table_name: Name of the table.
        :param cluster: Cluster of the table. A falsy value (None or '')
            falls back to 'gold'.
        :param source: Location of the table's source file.
        :param source_type: Kind of source location; defaults to 'github'.
        """
        self.db = db_name
        self.schema = schema
        self.table = table_name
        self.cluster = cluster if cluster else 'gold'
        # source is the source file location
        self.source = source
        self.source_type = source_type

        # Node and relation records are built eagerly here and then handed
        # out one at a time by create_next_node / create_next_relation.
        self._node_iter = iter(self.create_nodes())
        self._relation_iter = iter(self.create_relation())

    def create_next_node(self) -> Optional[Dict[str, Any]]:
        """
        Return the next node record, or None once every node has been emitted.
        """
        try:
            return next(self._node_iter)
        except StopIteration:
            return None

    def create_next_relation(self) -> Optional[Dict[str, Any]]:
        """
        Return the next relation record, or None once every relation has
        been emitted.
        """
        try:
            return next(self._relation_iter)
        except StopIteration:
            return None

    def get_source_model_key(self) -> str:
        """
        Return the key of the Source node:
        '{db}://{cluster}.{schema}/{tbl}/_source'.
        """
        return TableSource.KEY_FORMAT.format(db=self.db,
                                             cluster=self.cluster,
                                             schema=self.schema,
                                             tbl=self.table)

    def get_metadata_model_key(self) -> str:
        """
        Return the key of the table node this source belongs to:
        '{db}://{cluster}.{schema}/{table}'.
        """
        # NOTE(review): presumably mirrors the table key format used by
        # TableMetadata; confirm the two stay in sync.
        return '{db}://{cluster}.{schema}/{table}'.format(db=self.db,
                                                          cluster=self.cluster,
                                                          schema=self.schema,
                                                          table=self.table)

    def create_nodes(self) -> List[Dict[str, Any]]:
        """
        Create the list of Neo4j node records: a single Source node that
        carries the 'source' and 'source_type' properties.
        :return: List containing one node record dict.
        """
        results = [{
            NODE_KEY: self.get_source_model_key(),
            NODE_LABEL: TableSource.LABEL,
            'source': self.source,
            'source_type': self.source_type
        }]
        return results

    def create_relation(self) -> List[Dict[str, Any]]:
        """
        Create the list of relation records linking the Source node to its
        table node. The forward type is SOURCE_OF (Source -> table) and the
        reverse type is SOURCE.
        :return: List containing one relation record dict.
        """
        results = [{
            RELATION_START_KEY: self.get_source_model_key(),
            RELATION_START_LABEL: TableSource.LABEL,
            RELATION_END_KEY: self.get_metadata_model_key(),
            RELATION_END_LABEL: TableMetadata.TABLE_NODE_LABEL,
            RELATION_TYPE: TableSource.SOURCE_TABLE_RELATION_TYPE,
            RELATION_REVERSE_TYPE: TableSource.TABLE_SOURCE_RELATION_TYPE
        }]
        return results

    def __repr__(self) -> str:
        # Field order is db, cluster, schema, table, source. This does not
        # match the __init__ argument order, and source_type is not shown.
        return 'TableSource({!r}, {!r}, {!r}, {!r}, {!r})'.format(self.db,
                                                                  self.cluster,
                                                                  self.schema,
                                                                  self.table,
                                                                  self.source)