5
5
6
6
yaml = YAML ()
7
7
8
+ CONFIG_KEY = "config"
9
+
8
10
9
11
def create_glue_connection (conn_dict ):
10
- return GlueExtractor (conn_dict .get ("name" , "glue" ))
12
+ return GlueExtractor (conn_dict .get ("name" , "glue" )), {}
11
13
12
14
13
15
def create_postgres_connection (conn_dict ):
14
- return PostgresMetadataExtractor ()
16
+ extractor = PostgresMetadataExtractor ()
17
+ extractor_scope = extractor .get_scope ()
18
+ config = conn_dict .get (CONFIG_KEY , {})
19
+ included_schemas = config .get ("included_schemas" , ["public" ])
20
+ schemas_sql_list = "('{schemas}')" .format (schemas = "', '" .join (included_schemas ))
21
+ schema_where_clause = f"where table_schema in { schemas_sql_list } "
22
+
23
+ connection_string = config .get ("connection_string" )
24
+ if connection_string is None :
25
+ raise KeyError (
26
+ "connection_string is a required config item for PostgreSQL connections"
27
+ )
28
+
29
+ return PostgresMetadataExtractor (), {
30
+ f"{ extractor_scope } .{ PostgresMetadataExtractor .WHERE_CLAUSE_SUFFIX_KEY } " : schema_where_clause ,
31
+ f"{ extractor_scope } .extractor.sqlalchemy.conn_string" : connection_string ,
32
+ }
15
33
16
34
17
35
CONNECTION_FACTORIES = {
@@ -27,12 +45,23 @@ def parse_config(filename):
27
45
28
46
connections = parsed_data .get ("connections" , [])
29
47
30
- extractors = [
31
- CONNECTION_FACTORIES [conn_dict .get ("type" )](conn_dict )
32
- for conn_dict in connections
33
- ]
48
+ extractors = []
49
+ config = {}
50
+
51
+ for conn_dict in connections :
52
+ extractor , extractor_config = CONNECTION_FACTORIES [conn_dict .get ("type" )](
53
+ conn_dict
54
+ )
55
+ custom_config = {}
56
+ scope = extractor .get_scope ()
57
+ config = conn_dict .get (CONFIG_KEY , {})
58
+ for conf_key , conf_value in config .items ():
59
+ custom_config [f"{ scope } .{ conf_key } " ] = conf_value
60
+
61
+ config = {** config , ** extractor_config , ** custom_config }
62
+ extractors .append (extractor )
34
63
35
- return extractors , connections
64
+ return extractors , config
36
65
37
66
38
67
def _read_file (filename : str ):
0 commit comments