Skip to content

Commit

Permalink
improve reindexing and allow init intex for single resource (#102)
Browse files Browse the repository at this point in the history
* improve reindexing and allow init intex for single resource

- when reindexing missing index just create an empty one
- handle reindexing of an old index which does not have alias
- allow init index for single resource
  • Loading branch information
petrjasek authored Aug 28, 2023
1 parent ed0fdd4 commit 4efbb6b
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 48 deletions.
64 changes: 39 additions & 25 deletions eve_elastic/elastic.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ def set_filters(query, filters):

def set_sort(query, sort):
query["sort"] = []
for (key, sortdir) in sort:
for key, sortdir in sort:
sort_dict = dict([(key, "asc" if sortdir > 0 else "desc")])
query["sort"].append(sort_dict)

Expand Down Expand Up @@ -376,23 +376,23 @@ def init_app(self, app):
self.index = app.config["ELASTICSEARCH_INDEX"]
self.es = get_es(app.config["ELASTICSEARCH_URL"], **self.kwargs)

def init_index(self):
def init_index(self, resource=None, raise_on_mapping_error=False):
"""Create indexes and put mapping."""
for resource in self._get_elastic_resources():
es = self.elastic(resource)
index = self._resource_index(resource)
settings = self._resource_config(resource, "SETTINGS")
mappings = self._resource_mapping(resource)
for _resource in self._get_elastic_resources():
if resource and _resource != resource:
continue
es = self.elastic(_resource)
index = self._resource_index(_resource)
settings = self._resource_config(_resource, "SETTINGS")
mappings = self._resource_mapping(_resource)
try:
self._init_index(es, index, settings, mappings)
except elasticsearch.exceptions.RequestError:
if app.config.get("DEBUG"):
raise
else:
logger.error(
"mapping error, updating settings resource=%s", resource
)
if app.config.get("DEBUG") or raise_on_mapping_error:
raise
logger.warning(
"mapping error, updating settings resource=%s", _resource
)

def _init_index(self, es, index, settings=None, mapping=None):
if not es.indices.exists(index):
Expand Down Expand Up @@ -523,8 +523,13 @@ def get_settings(self, resource):

def get_index(self, resource):
alias = self._resource_index(resource)
info = self.elastic(resource).indices.get_alias(name=alias)
return next(iter(info.keys()))
try:
info = self.elastic(resource).indices.get_alias(name=alias)
return next(iter(info.keys()))
except elasticsearch.exceptions.NotFoundError:
if self.elastic(resource).indices.exists(alias):
return alias
raise

def get_index_by_alias(self, alias):
"""Get index name for given alias.
Expand Down Expand Up @@ -1041,25 +1046,34 @@ def reindex(self, resource):
alias = self._resource_index(resource)
settings = self._resource_config(resource, "SETTINGS")
mapping = self._resource_mapping(resource)
old_index = self.get_index(resource)
new_index = False
try:
old_index = self.get_index(resource)
except elasticsearch.exceptions.NotFoundError:
new_index = True

# create new index
index = generate_index_name(alias)
print("create", index)
self._create_index(es, index, settings)
self._put_mapping(es, index, mapping)

# reindex data
print("reindex", alias, index)
reindex(es, alias, index)
if not new_index:
# reindex data
print("reindex", alias, index)
reindex(es, alias, index)

# remove old alias
print("remove alias", alias, old_index)
es.indices.delete_alias(index=old_index, name=alias)
# remove old alias
print("remove alias", alias, old_index)
try:
es.indices.delete_alias(index=old_index, name=alias)
except elasticsearch.exceptions.NotFoundError:
# this was not an alias, but an index. will be removed in next step
pass

# remove old index
print("remove index", old_index)
es.indices.delete(old_index)
# remove old index
print("remove index", old_index)
es.indices.delete(old_index)

# create alias for new index
print("put", alias, index)
Expand Down
1 change: 0 additions & 1 deletion eve_elastic/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,6 @@ def reindex(
scan_kwargs={},
bulk_kwargs={},
):

"""
Reindex all documents from one index that satisfy a given query
to another, potentially (if `target_client` is specified) on a different cluster.
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
"arrow>=0.4.2",
"ciso8601>=1.0.2,<2",
"pytz>=2015.4",
"elasticsearch>=7.0.0,<7.14.0",
"elasticsearch>=7.0,<7.14",
],
classifiers=[
"Development Status :: 4 - Beta",
Expand Down
67 changes: 46 additions & 21 deletions test/test_elastic.py
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,39 @@ def test_filtered_query(self):
)
self.assertEqual(1, docs.count())

def test_reindex_old_index_without_alias(self):
resource = "items"
with self.app.app_context():
elastic = self.app.data
es = elastic.elastic(resource)
alias = elastic._resource_index(resource)
index = elastic.get_index(resource)

# drop index and alias, create index using alias name
es.indices.delete_alias(index, alias)
es.indices.delete(index)
es.indices.create(alias)

elastic.reindex(resource)

assert es.indices.exists_alias(alias)

def test_reindex_without_old_index(self):
resource = "items"
with self.app.app_context():
elastic = self.app.data
es = elastic.elastic(resource)
alias = elastic._resource_index(resource)
index = elastic.get_index(resource)

# drop index and alias, create index using alias name
es.indices.delete_alias(index, alias)
es.indices.delete(index)

elastic.reindex(resource)

assert es.indices.exists_alias(alias)


class TestElasticSearchWithSettings(TestCase):
resource = "items"
Expand Down Expand Up @@ -923,13 +956,11 @@ def test_elastic_settings(self):
analyzer = settings["settings"]["index"]["analysis"]["analyzer"]
self.assertDictEqual(
{
"phrase_prefix_analyzer": {
"tokenizer": "keyword",
"filter": ["lowercase"],
"type": "custom",
}
"tokenizer": "keyword",
"filter": ["lowercase"],
"type": "custom",
},
analyzer,
analyzer["phrase_prefix_analyzer"],
)

def test_put_settings(self):
Expand All @@ -938,13 +969,11 @@ def test_put_settings(self):
analyzer = settings["settings"]["index"]["analysis"]["analyzer"]
self.assertDictEqual(
{
"phrase_prefix_analyzer": {
"tokenizer": "keyword",
"filter": ["lowercase"],
"type": "custom",
}
"tokenizer": "keyword",
"filter": ["lowercase"],
"type": "custom",
},
analyzer,
analyzer["phrase_prefix_analyzer"],
)

new_settings = deepcopy(ELASTICSEARCH_SETTINGS)
Expand All @@ -958,13 +987,11 @@ def test_put_settings(self):
analyzer = settings["settings"]["index"]["analysis"]["analyzer"]
self.assertDictEqual(
{
"phrase_prefix_analyzer": {
"tokenizer": "whitespace",
"filter": ["uppercase"],
"type": "custom",
}
"tokenizer": "whitespace",
"filter": ["uppercase"],
"type": "custom",
},
analyzer,
analyzer["phrase_prefix_analyzer"],
)

def test_put_settings_with_no_changes_existing_settings(self):
Expand All @@ -983,8 +1010,6 @@ def test_put_settings_existing_index(self):
"mapping": {"type": "text"},
}

return

new_settings = deepcopy(ELASTICSEARCH_SETTINGS)
new_settings["settings"]["analysis"]["analyzer"]["prefix_analyzer"] = {
"type": "custom",
Expand All @@ -997,7 +1022,7 @@ def test_put_settings_existing_index(self):
with self.assertLogs("elastic") as log:
self.app.data.init_index()
self.assertIn(
"ERROR:elastic:mapping error, updating settings resource=items",
"WARNING:elastic:mapping error, updating settings resource=items",
log.output[0],
)

Expand Down

0 comments on commit 4efbb6b

Please sign in to comment.