Skip to content

Commit

Permalink
Refactor indices to accomodate replicas (#5358)
Browse files Browse the repository at this point in the history
  • Loading branch information
nadove-ucsc committed Aug 29, 2023
1 parent 6ff645a commit d4c3978
Show file tree
Hide file tree
Showing 11 changed files with 407 additions and 195 deletions.
190 changes: 142 additions & 48 deletions src/azul/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
)
from enum import (
Enum,
auto,
)
import functools
from itertools import (
Expand Down Expand Up @@ -81,6 +82,11 @@ def cache_per_thread(f, /):
mutable_furl = furl


class DocumentType(Enum):
contributions = auto()
aggregates = auto()


class Config:
"""
See `environment` for documentation of these settings.
Expand Down Expand Up @@ -932,14 +938,14 @@ def integration_test_catalogs(self) -> Mapping[CatalogName, Catalog]:
def es_index_name(self,
catalog: CatalogName,
entity_type: str,
aggregate: bool
doc_type: DocumentType
) -> str:
return str(IndexName(prefix=self._index_prefix,
version=2,
deployment=self.deployment_stage,
catalog=catalog,
entity_type=entity_type,
aggregate=aggregate))
doc_type=doc_type))

def parse_es_index_name(self, index_name: str) -> 'IndexName':
"""
Expand Down Expand Up @@ -1503,20 +1509,46 @@ class IndexName:
entity_type: str

#: Whether the documents in the index are contributions or aggregates
aggregate: bool = False
doc_type: DocumentType = DocumentType.contributions

index_name_version_re: ClassVar[re.Pattern] = re.compile(r'v(\d+)')

def __attrs_post_init__(self):
"""
>>> IndexName(prefix='azul', version=1, deployment='dev', entity_type='foo_bar')
IndexName(prefix='azul', version=1, deployment='dev', catalog=None, entity_type='foo_bar', aggregate=False)
>>> IndexName(prefix='azul', version=1, deployment='dev', catalog=None, entity_type='foo_bar')
IndexName(prefix='azul', version=1, deployment='dev', catalog=None, entity_type='foo_bar', aggregate=False)
>>> IndexName(prefix='azul', version=2, deployment='dev', catalog='main', entity_type='foo_bar')
IndexName(prefix='azul', version=2, deployment='dev', catalog='main', entity_type='foo_bar', aggregate=False)
>>> IndexName(prefix='azul',
... version=1,
... deployment='dev',
... entity_type='foo_bar') # doctest: +NORMALIZE_WHITESPACE
IndexName(prefix='azul',
version=1,
deployment='dev',
catalog=None,
entity_type='foo_bar',
doc_type=<DocumentType.contributions: 1>)
>>> IndexName(prefix='azul',
... version=1,
... deployment='dev',
... catalog=None,
... entity_type='foo_bar') # doctest: +NORMALIZE_WHITESPACE
IndexName(prefix='azul',
version=1,
deployment='dev',
catalog=None,
entity_type='foo_bar',
doc_type=<DocumentType.contributions: 1>)
>>> IndexName(prefix='azul',
... version=2,
... deployment='dev',
... catalog='main',
... entity_type='foo_bar') # doctest: +NORMALIZE_WHITESPACE
IndexName(prefix='azul',
version=2,
deployment='dev',
catalog='main',
entity_type='foo_bar',
doc_type=<DocumentType.contributions: 1>)
>>> IndexName(prefix='azul', version=1, deployment='dev', catalog='hca', entity_type='foo')
Traceback (most recent call last):
Expand Down Expand Up @@ -1573,20 +1605,45 @@ def parse(cls, index_name, expected_prefix=prefix) -> 'IndexName':
"""
Parse the name of an index from any deployment and any version of Azul.
>>> IndexName.parse('azul_foo_dev')
IndexName(prefix='azul', version=1, deployment='dev', catalog=None, entity_type='foo', aggregate=False)
>>> IndexName.parse('azul_foo_aggregate_dev')
IndexName(prefix='azul', version=1, deployment='dev', catalog=None, entity_type='foo', aggregate=True)
>>> IndexName.parse('azul_foo_bar_dev')
IndexName(prefix='azul', version=1, deployment='dev', catalog=None, entity_type='foo_bar', aggregate=False)
>>> IndexName.parse('azul_foo_bar_aggregate_dev')
IndexName(prefix='azul', version=1, deployment='dev', catalog=None, entity_type='foo_bar', aggregate=True)
>>> IndexName.parse('good_foo_dev', expected_prefix='good')
IndexName(prefix='good', version=1, deployment='dev', catalog=None, entity_type='foo', aggregate=False)
>>> IndexName.parse('azul_foo_dev') # doctest: +NORMALIZE_WHITESPACE
IndexName(prefix='azul',
version=1,
deployment='dev',
catalog=None,
entity_type='foo',
doc_type=<DocumentType.contributions: 1>)
>>> IndexName.parse('azul_foo_aggregate_dev') # doctest: +NORMALIZE_WHITESPACE
IndexName(prefix='azul',
version=1,
deployment='dev',
catalog=None,
entity_type='foo',
doc_type=<DocumentType.aggregates: 2>)
>>> IndexName.parse('azul_foo_bar_dev') # doctest: +NORMALIZE_WHITESPACE
IndexName(prefix='azul',
version=1,
deployment='dev',
catalog=None,
entity_type='foo_bar',
doc_type=<DocumentType.contributions: 1>)
>>> IndexName.parse('azul_foo_bar_aggregate_dev') # doctest: +NORMALIZE_WHITESPACE
IndexName(prefix='azul',
version=1,
deployment='dev',
catalog=None,
entity_type='foo_bar',
doc_type=<DocumentType.aggregates: 2>)
>>> IndexName.parse('good_foo_dev', expected_prefix='good') # doctest: +NORMALIZE_WHITESPACE
IndexName(prefix='good',
version=1,
deployment='dev',
catalog=None,
entity_type='foo',
doc_type=<DocumentType.contributions: 1>)
>>> IndexName.parse('bad_foo_dev')
Traceback (most recent call last):
Expand All @@ -1603,20 +1660,45 @@ def parse(cls, index_name, expected_prefix=prefix) -> 'IndexName':
...
azul.RequirementError: entity_type ... ''
>>> IndexName.parse('azul_v2_dev_main_foo')
IndexName(prefix='azul', version=2, deployment='dev', catalog='main', entity_type='foo', aggregate=False)
>>> IndexName.parse('azul_v2_dev_main_foo_aggregate')
IndexName(prefix='azul', version=2, deployment='dev', catalog='main', entity_type='foo', aggregate=True)
>>> IndexName.parse('azul_v2_dev_main_foo_bar')
IndexName(prefix='azul', version=2, deployment='dev', catalog='main', entity_type='foo_bar', aggregate=False)
>>> IndexName.parse('azul_v2_dev_main_foo_bar_aggregate')
IndexName(prefix='azul', version=2, deployment='dev', catalog='main', entity_type='foo_bar', aggregate=True)
>>> IndexName.parse('azul_v2_staging_hca_foo_bar_aggregate')
IndexName(prefix='azul', version=2, deployment='staging', catalog='hca', entity_type='foo_bar', aggregate=True)
>>> IndexName.parse('azul_v2_dev_main_foo') # doctest: +NORMALIZE_WHITESPACE
IndexName(prefix='azul',
version=2,
deployment='dev',
catalog='main',
entity_type='foo',
doc_type=<DocumentType.contributions: 1>)
>>> IndexName.parse('azul_v2_dev_main_foo_aggregate') # doctest: +NORMALIZE_WHITESPACE
IndexName(prefix='azul',
version=2,
deployment='dev',
catalog='main',
entity_type='foo',
doc_type=<DocumentType.aggregates: 2>)
>>> IndexName.parse('azul_v2_dev_main_foo_bar') # doctest: +NORMALIZE_WHITESPACE
IndexName(prefix='azul',
version=2,
deployment='dev',
catalog='main',
entity_type='foo_bar',
doc_type=<DocumentType.contributions: 1>)
>>> IndexName.parse('azul_v2_dev_main_foo_bar_aggregate') # doctest: +NORMALIZE_WHITESPACE
IndexName(prefix='azul',
version=2,
deployment='dev',
catalog='main',
entity_type='foo_bar',
doc_type=<DocumentType.aggregates: 2>)
>>> IndexName.parse('azul_v2_staging_hca_foo_bar_aggregate') # doctest: +NORMALIZE_WHITESPACE
IndexName(prefix='azul',
version=2,
deployment='staging',
catalog='hca',
entity_type='foo_bar',
doc_type=<DocumentType.aggregates: 2>)
>>> IndexName.parse('azul_v2_staging__foo_bar__aggregate') # doctest: +ELLIPSIS
Traceback (most recent call last):
Expand Down Expand Up @@ -1645,48 +1727,60 @@ def parse(cls, index_name, expected_prefix=prefix) -> 'IndexName':
*index_name, deployment = index_name
if index_name[-1] == 'aggregate':
*index_name, _ = index_name
aggregate = True
doc_type = DocumentType.aggregates
else:
aggregate = False
doc_type = DocumentType.contributions
entity_type = '_'.join(index_name)
Config.validate_entity_type(entity_type)
return cls(prefix=prefix,
version=version,
deployment=deployment,
catalog=catalog,
entity_type=entity_type,
aggregate=aggregate)
doc_type=doc_type)

def __str__(self) -> str:
"""
>>> str(IndexName(version=1, deployment='dev', entity_type='foo'))
'azul_foo_dev'
>>> str(IndexName(version=1, deployment='dev', entity_type='foo', aggregate=True))
>>> str(IndexName(version=1, deployment='dev', entity_type='foo', doc_type=DocumentType.aggregates))
'azul_foo_aggregate_dev'
>>> str(IndexName(version=1, deployment='dev', entity_type='foo_bar'))
'azul_foo_bar_dev'
>>> str(IndexName(version=1, deployment='dev', entity_type='foo_bar', aggregate=True))
>>> str(IndexName(version=1, deployment='dev', entity_type='foo_bar', doc_type=DocumentType.aggregates))
'azul_foo_bar_aggregate_dev'
>>> str(IndexName(version=2, deployment='dev', catalog='main', entity_type='foo'))
'azul_v2_dev_main_foo'
>>> str(IndexName(version=2, deployment='dev', catalog='main', entity_type='foo', aggregate=True))
>>> str(IndexName(version=2,
... deployment='dev',
... catalog='main',
... entity_type='foo',
... doc_type=DocumentType.aggregates))
'azul_v2_dev_main_foo_aggregate'
>>> str(IndexName(version=2, deployment='dev', catalog='main', entity_type='foo_bar'))
'azul_v2_dev_main_foo_bar'
>>> str(IndexName(version=2, deployment='dev', catalog='main', entity_type='foo_bar', aggregate=True))
>>> str(IndexName(version=2,
... deployment='dev',
... catalog='main',
... entity_type='foo_bar',
... doc_type=DocumentType.aggregates))
'azul_v2_dev_main_foo_bar_aggregate'
>>> str(IndexName(version=2, deployment='staging', catalog='hca', entity_type='foo_bar', aggregate=True))
>>> str(IndexName(version=2,
... deployment='staging',
... catalog='hca',
... entity_type='foo_bar',
... doc_type=DocumentType.aggregates))
'azul_v2_staging_hca_foo_bar_aggregate'
"""
aggregate = ['aggregate'] if self.aggregate else []
aggregate = ['aggregate'] if self.doc_type is DocumentType.aggregates else []
if self.version == 1:
require(self.catalog is None)
return '_'.join([
Expand Down
43 changes: 43 additions & 0 deletions src/azul/collections.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,49 @@ def dict_merge(dicts: Iterable[Mapping]) -> Mapping:
return dict(items)


def deep_dict_merge(dicts: Iterable[Mapping]) -> Mapping:
"""
Merge all dictionaries yielded by the argument. If more than one dictionary
contains a given key, and all values associated with this key are themselves
dictionaries, then the value present in the result is the recursive merging
of those nested dictionaries.
>>> deep_dict_merge(({0: 1}, {1: 0}))
{0: 1, 1: 0}
>>> deep_dict_merge(({0: {'a': 1}}, {0: {'b': 2}}))
{0: {'a': 1, 'b': 2}}
Key collisions where either value is not a dictionary raise an exception,
unless the values compare equal to each other, in which case the entries
from *earlier* dictionaries takes precedence. This behavior is the opposite
of `dict_merge`, where later entries take precedence.
>>> deep_dict_merge(({0: 1}, {0: 2}))
Traceback (most recent call last):
...
ValueError: 1 != 2
>>> l1, l2 = [], []
>>> d = deep_dict_merge(({0: l1}, {0: l2}))
>>> d
{0: []}
>>> id(d[0]) == id(l1)
True
"""
merged = {}
for m in dicts:
for k, v2 in m.items():
v1 = merged.setdefault(k, v2)
if v1 != v2:
if isinstance(v1, dict) and isinstance(v2, dict):
merged[k] = deep_dict_merge((v1, v2))
else:
raise ValueError(f'{v1!r} != {v2!r}')

return merged


K = TypeVar('K')
V = TypeVar('V')

Expand Down
Loading

0 comments on commit d4c3978

Please sign in to comment.