diff --git a/eve_elastic/elastic.py b/eve_elastic/elastic.py index 466efcb..6ff298b 100644 --- a/eve_elastic/elastic.py +++ b/eve_elastic/elastic.py @@ -327,7 +327,7 @@ def set_filters(query, filters): def set_sort(query, sort): query["sort"] = [] - for (key, sortdir) in sort: + for key, sortdir in sort: sort_dict = dict([(key, "asc" if sortdir > 0 else "desc")]) query["sort"].append(sort_dict) @@ -376,23 +376,23 @@ def init_app(self, app): self.index = app.config["ELASTICSEARCH_INDEX"] self.es = get_es(app.config["ELASTICSEARCH_URL"], **self.kwargs) - def init_index(self): + def init_index(self, resource=None, raise_on_mapping_error=False): """Create indexes and put mapping.""" - for resource in self._get_elastic_resources(): - es = self.elastic(resource) - index = self._resource_index(resource) - settings = self._resource_config(resource, "SETTINGS") - mappings = self._resource_mapping(resource) + for _resource in self._get_elastic_resources(): + if resource and _resource != resource: + continue + es = self.elastic(_resource) + index = self._resource_index(_resource) + settings = self._resource_config(_resource, "SETTINGS") + mappings = self._resource_mapping(_resource) try: self._init_index(es, index, settings, mappings) except elasticsearch.exceptions.RequestError: - if app.config.get("DEBUG"): - raise - else: - logger.error( - "mapping error, updating settings resource=%s", resource - ) + if app.config.get("DEBUG") or raise_on_mapping_error: raise + logger.warning( + "mapping error, updating settings resource=%s", _resource + ) def _init_index(self, es, index, settings=None, mapping=None): if not es.indices.exists(index): @@ -523,8 +523,13 @@ def get_settings(self, resource): def get_index(self, resource): alias = self._resource_index(resource) - info = self.elastic(resource).indices.get_alias(name=alias) - return next(iter(info.keys())) + try: + info = self.elastic(resource).indices.get_alias(name=alias) + return next(iter(info.keys())) + except elasticsearch.exceptions.NotFoundError: + if self.elastic(resource).indices.exists(alias): + return alias + raise def get_index_by_alias(self, alias): """Get index name for given alias. @@ -1041,7 +1046,11 @@ def reindex(self, resource): alias = self._resource_index(resource) settings = self._resource_config(resource, "SETTINGS") mapping = self._resource_mapping(resource) - old_index = self.get_index(resource) + new_index = False + try: + old_index = self.get_index(resource) + except elasticsearch.exceptions.NotFoundError: + new_index = True # create new index index = generate_index_name(alias) @@ -1049,17 +1058,22 @@ def reindex(self, resource): self._create_index(es, index, settings) self._put_mapping(es, index, mapping) - # reindex data - print("reindex", alias, index) - reindex(es, alias, index) + if not new_index: + # reindex data + print("reindex", alias, index) + reindex(es, alias, index) - # remove old alias - print("remove alias", alias, old_index) - es.indices.delete_alias(index=old_index, name=alias) + # remove old alias + print("remove alias", alias, old_index) + try: + es.indices.delete_alias(index=old_index, name=alias) + except elasticsearch.exceptions.NotFoundError: + # this was not an alias, but an index. will be removed in next step + pass - # remove old index - print("remove index", old_index) - es.indices.delete(old_index) + # remove old index + print("remove index", old_index) + es.indices.delete(old_index) # create alias for new index print("put", alias, index) diff --git a/eve_elastic/helpers.py b/eve_elastic/helpers.py index 16e561e..6b1439d 100644 --- a/eve_elastic/helpers.py +++ b/eve_elastic/helpers.py @@ -363,7 +363,6 @@ def reindex( scan_kwargs={}, bulk_kwargs={}, ): - """ Reindex all documents from one index that satisfy a given query to another, potentially (if `target_client` is specified) on a different cluster. diff --git a/setup.py b/setup.py index 1e6f54b..b57e655 100755 --- a/setup.py +++ b/setup.py @@ -24,7 +24,7 @@ "arrow>=0.4.2", "ciso8601>=1.0.2,<2", "pytz>=2015.4", - "elasticsearch>=7.0.0,<7.14.0", + "elasticsearch>=7.0,<7.14", ], classifiers=[ "Development Status :: 4 - Beta", diff --git a/test/test_elastic.py b/test/test_elastic.py index 0f68fda..a108a9c 100644 --- a/test/test_elastic.py +++ b/test/test_elastic.py @@ -884,6 +884,39 @@ def test_filtered_query(self): ) self.assertEqual(1, docs.count()) + def test_reindex_old_index_without_alias(self): + resource = "items" + with self.app.app_context(): + elastic = self.app.data + es = elastic.elastic(resource) + alias = elastic._resource_index(resource) + index = elastic.get_index(resource) + + # drop index and alias, create index using alias name + es.indices.delete_alias(index, alias) + es.indices.delete(index) + es.indices.create(alias) + + elastic.reindex(resource) + + assert es.indices.exists_alias(alias) + + def test_reindex_without_old_index(self): + resource = "items" + with self.app.app_context(): + elastic = self.app.data + es = elastic.elastic(resource) + alias = elastic._resource_index(resource) + index = elastic.get_index(resource) + + # drop index and alias, create index using alias name + es.indices.delete_alias(index, alias) + es.indices.delete(index) + + elastic.reindex(resource) + + assert es.indices.exists_alias(alias) + class TestElasticSearchWithSettings(TestCase): resource = "items" @@ -923,13 +956,11 @@ def test_elastic_settings(self): analyzer = settings["settings"]["index"]["analysis"]["analyzer"] self.assertDictEqual( { - "phrase_prefix_analyzer": { - "tokenizer": "keyword", - "filter": ["lowercase"], - "type": "custom", - } + "tokenizer": "keyword", + "filter": ["lowercase"], + "type": "custom", }, - analyzer, + analyzer["phrase_prefix_analyzer"], ) def test_put_settings(self): @@ -938,13 +969,11 @@ def test_put_settings(self): analyzer = settings["settings"]["index"]["analysis"]["analyzer"] self.assertDictEqual( { - "phrase_prefix_analyzer": { - "tokenizer": "keyword", - "filter": ["lowercase"], - "type": "custom", - } + "tokenizer": "keyword", + "filter": ["lowercase"], + "type": "custom", }, - analyzer, + analyzer["phrase_prefix_analyzer"], ) new_settings = deepcopy(ELASTICSEARCH_SETTINGS) @@ -958,13 +987,11 @@ def test_put_settings(self): analyzer = settings["settings"]["index"]["analysis"]["analyzer"] self.assertDictEqual( { - "phrase_prefix_analyzer": { - "tokenizer": "whitespace", - "filter": ["uppercase"], - "type": "custom", - } + "tokenizer": "whitespace", + "filter": ["uppercase"], + "type": "custom", }, - analyzer, + analyzer["phrase_prefix_analyzer"], ) def test_put_settings_with_no_changes_existing_settings(self): @@ -983,8 +1010,6 @@ def test_put_settings_existing_index(self): "mapping": {"type": "text"}, } - return - new_settings = deepcopy(ELASTICSEARCH_SETTINGS) new_settings["settings"]["analysis"]["analyzer"]["prefix_analyzer"] = { "type": "custom", @@ -997,7 +1022,7 @@ def test_put_settings_existing_index(self): with self.assertLogs("elastic") as log: self.app.data.init_index() self.assertIn( - "ERROR:elastic:mapping error, updating settings resource=items", + "WARNING:elastic:mapping error, updating settings resource=items", log.output[0], )