@@ -51,7 +51,7 @@ def __init__(self, file_path):
51
51
config = AppConfig (args .config )
52
52
53
53
# set up Elastic logger for the initialization time
54
- logging .getLogger ('elasticsearch' ).setLevel (level = logging .ERROR )
54
+ logging .getLogger ('elasticsearch' ).setLevel (level = logging .INFO )
55
55
56
56
# initialize the elastic source
57
57
source_params = config .params ['source' ]
@@ -62,6 +62,9 @@ def __init__(self, file_path):
62
62
client_key_path = src_sec ['client-key-path' ])
63
63
es_source_conf = ElasticConnectorConfig (hosts = source_params ['es' ]['hosts' ],
64
64
ssl_config = source_ssl_config )
65
+ elif 'extra_params' in source_params ['es' ]:
66
+ e_params = source_params ['es' ]['extra_params' ]
67
+ es_source_conf = ElasticConnectorConfig (hosts = source_params ['es' ]['hosts' ], extra_params = e_params )
65
68
else :
66
69
es_source_conf = ElasticConnectorConfig (hosts = source_params ['es' ]['hosts' ])
67
70
es_source_conn = ElasticConnector (es_source_conf )
@@ -79,13 +82,14 @@ def __init__(self, file_path):
79
82
client_cert_path = sink_sec ['client-cert-path' ],
80
83
client_key_path = sink_sec ['client-key-path' ])
81
84
82
- es_sink_conf = ElasticConnectorConfig (hosts = source_params ['es' ]['hosts' ],
85
+ es_sink_conf = ElasticConnectorConfig (hosts = sink_params ['es' ]['hosts' ],
83
86
ssl_config = sink_ssl_config )
84
87
elif 'extra_params' in sink_params ['es' ]:
85
88
e_params = sink_params ['es' ]['extra_params' ]
86
89
es_sink_conf = ElasticConnectorConfig (hosts = sink_params ['es' ]['hosts' ], extra_params = e_params )
87
90
else :
88
91
es_sink_conf = ElasticConnectorConfig (hosts = sink_params ['es' ]['hosts' ])
92
+
89
93
es_sink_conn = ElasticConnector (es_sink_conf )
90
94
es_sink = ElasticIndexer (es_sink_conn , sink_params ['es' ]['index-name' ])
91
95
0 commit comments