Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor indices to accommodate replicas (#5358) #5589

Merged
merged 1 commit into from
Oct 4, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
337 changes: 0 additions & 337 deletions src/azul/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1456,343 +1456,6 @@ def docker_registry(self) -> str:
# Module-level Config instance. The definitions below reach it through the
# name `config`, e.g. for validating index name components.
config: Config = Config() # yes, the type hint does help PyCharm


class DocumentType(Enum):
    """
    The two kinds of documents an index can hold. Each member's value is
    identical to its name.
    """
    contribution = 'contribution'
    aggregate = 'aggregate'

    def __repr__(self) -> str:
        # Compact form without the member value, e.g. <DocumentType.aggregate>
        return f'<{type(self).__name__}.{self.name}>'


@attr.s(frozen=True, kw_only=True, auto_attribs=True)
class IndexName:
    """
    The name of an Elasticsearch index used by an Azul deployment, parsed into
    its components. The index naming scheme underwent a number of changes during
    the evolution of Azul. The different naming schemes are captured in a
    `version` component. Note that the first version of the index name syntax
    did not carry an explicit version. The resulting ambiguity requires entity
    types to not match the version regex below.

    The two supported layouts, as produced by `__str__` and understood by
    `parse`, are:

    v1: ``{prefix}_{entity_type}[_aggregate]_{deployment}``

    v2: ``{prefix}_v2_{deployment}_{catalog}_{entity_type}[_aggregate]``
    """
    #: Every index name starts with this prefix
    prefix: str = 'azul'

    #: The version of the index naming scheme
    version: int

    #: The name of the deployment the index belongs to
    deployment: str

    #: The catalog the index belongs to or None for v1 indices.
    catalog: Optional[CatalogName] = attr.ib(default=None)

    #: The type of entities this index contains metadata about
    entity_type: str

    #: Whether the documents in the index are contributions or aggregates
    doc_type: DocumentType = DocumentType.contribution

    #: Matches the version component of an index name, e.g. `v2`
    index_name_version_re: ClassVar[re.Pattern] = re.compile(r'v(\d+)')

    def __attrs_post_init__(self):
        """
        Validate the components of a newly created instance. A violation is
        reported as a RequirementError.

        >>> IndexName(prefix='azul',
        ...           version=1,
        ...           deployment='dev',
        ...           entity_type='foo_bar') # doctest: +NORMALIZE_WHITESPACE
        IndexName(prefix='azul',
                  version=1,
                  deployment='dev',
                  catalog=None,
                  entity_type='foo_bar',
                  doc_type=<DocumentType.contribution>)

        >>> IndexName(prefix='azul',
        ...           version=1,
        ...           deployment='dev',
        ...           catalog=None,
        ...           entity_type='foo_bar') # doctest: +NORMALIZE_WHITESPACE
        IndexName(prefix='azul',
                  version=1,
                  deployment='dev',
                  catalog=None,
                  entity_type='foo_bar',
                  doc_type=<DocumentType.contribution>)

        >>> IndexName(prefix='azul',
        ...           version=2,
        ...           deployment='dev',
        ...           catalog='main',
        ...           entity_type='foo_bar') # doctest: +NORMALIZE_WHITESPACE
        IndexName(prefix='azul',
                  version=2,
                  deployment='dev',
                  catalog='main',
                  entity_type='foo_bar',
                  doc_type=<DocumentType.contribution>)

        >>> IndexName(prefix='azul', version=1, deployment='dev', catalog='hca', entity_type='foo')
        Traceback (most recent call last):
        ...
        azul.RequirementError: Version 1 prohibits a catalog name ('hca').

        >>> IndexName(prefix='azul', version=2, deployment='dev', entity_type='foo')
        Traceback (most recent call last):
        ...
        azul.RequirementError: Version 2 requires a catalog name (None).

        >>> IndexName(prefix='azul', version=2, deployment='dev', catalog=None, entity_type='foo')
        Traceback (most recent call last):
        ...
        azul.RequirementError: Version 2 requires a catalog name (None).

        >>> IndexName(prefix='_', version=2, deployment='dev', catalog='foo', entity_type='bar')
        Traceback (most recent call last):
        ...
        azul.RequirementError: Prefix '_' is to short, too long or contains invalid characters.

        >>> IndexName(prefix='azul', version=2, deployment='_', catalog='foo', entity_type='bar')
        Traceback (most recent call last):
        ...
        azul.RequirementError: Deployment name '_' is to short, too long or contains invalid characters.

        >>> IndexName(prefix='azul', version=2, deployment='dev', catalog='_', entity_type='bar')
        Traceback (most recent call last):
        ...
        azul.RequirementError: ('Catalog name is invalid', '_')

        >>> IndexName(prefix='azul', version=2, deployment='dev', catalog='foo', entity_type='_')
        Traceback (most recent call last):
        ...
        azul.RequirementError: entity_type is either too short, too long or contains invalid characters: '_'
        """
        config.validate_prefix(self.prefix)
        require(self.version > 0, f'Version must be at least 1, not {self.version}.')
        config.validate_deployment_name(self.deployment)
        if self.version == 1:
            require(self.catalog is None,
                    f'Version {self.version} prohibits a catalog name ({self.catalog!r}).')
        else:
            require(self.catalog is not None,
                    f'Version {self.version} requires a catalog name ({self.catalog!r}).')
            config.Catalog.validate_name(self.catalog)
        config.validate_entity_type(self.entity_type)
        # The underscore separates the components of an index name. Of all
        # components, only the entity type may contain one, otherwise parsing
        # the string form would be ambiguous.
        assert '_' not in self.prefix, self.prefix
        assert '_' not in self.deployment, self.deployment
        assert self.catalog is None or '_' not in self.catalog, self.catalog

    @classmethod
    def create(cls,
               *,
               catalog: CatalogName,
               entity_type: str,
               doc_type: 'DocumentType'
               ) -> 'IndexName':
        """
        Create the name of a version 2 index, taking the prefix from
        `config.index_prefix` and the deployment from `config.deployment_stage`.

        :param catalog: the name of the catalog the index belongs to

        :param entity_type: the type of entities the index contains

        :param doc_type: whether the index holds contributions or aggregates
        """
        return cls(prefix=config.index_prefix,
                   version=2,
                   deployment=config.deployment_stage,
                   catalog=catalog,
                   entity_type=entity_type,
                   doc_type=doc_type)

    @classmethod
    def parse(cls, index_name: str) -> 'IndexName':
        """
        Parse the name of an index from any deployment and any version of Azul.
        A name that cannot be parsed is reported as a RequirementError.

        >>> IndexName.parse('azul_foo_dev') # doctest: +NORMALIZE_WHITESPACE
        IndexName(prefix='azul',
                  version=1,
                  deployment='dev',
                  catalog=None,
                  entity_type='foo',
                  doc_type=<DocumentType.contribution>)

        >>> IndexName.parse('azul_foo_aggregate_dev') # doctest: +NORMALIZE_WHITESPACE
        IndexName(prefix='azul',
                  version=1,
                  deployment='dev',
                  catalog=None,
                  entity_type='foo',
                  doc_type=<DocumentType.aggregate>)

        >>> IndexName.parse('azul_foo_bar_dev') # doctest: +NORMALIZE_WHITESPACE
        IndexName(prefix='azul',
                  version=1,
                  deployment='dev',
                  catalog=None,
                  entity_type='foo_bar',
                  doc_type=<DocumentType.contribution>)

        >>> IndexName.parse('azul_foo_bar_aggregate_dev') # doctest: +NORMALIZE_WHITESPACE
        IndexName(prefix='azul',
                  version=1,
                  deployment='dev',
                  catalog=None,
                  entity_type='foo_bar',
                  doc_type=<DocumentType.aggregate>)

        >>> IndexName.parse('good_foo_dev') # doctest: +NORMALIZE_WHITESPACE
        IndexName(prefix='good',
                  version=1,
                  deployment='dev',
                  catalog=None,
                  entity_type='foo',
                  doc_type=<DocumentType.contribution>)

        >>> IndexName.parse('azul_dev')
        Traceback (most recent call last):
        ...
        azul.RequirementError: ['azul', 'dev']

        >>> IndexName.parse('azul_aggregate_dev') # doctest: +ELLIPSIS
        Traceback (most recent call last):
        ...
        azul.RequirementError: entity_type ... ''

        >>> IndexName.parse('azul_v2_dev_main_foo') # doctest: +NORMALIZE_WHITESPACE
        IndexName(prefix='azul',
                  version=2,
                  deployment='dev',
                  catalog='main',
                  entity_type='foo',
                  doc_type=<DocumentType.contribution>)

        >>> IndexName.parse('azul_v2_dev_main_foo_aggregate') # doctest: +NORMALIZE_WHITESPACE
        IndexName(prefix='azul',
                  version=2,
                  deployment='dev',
                  catalog='main',
                  entity_type='foo',
                  doc_type=<DocumentType.aggregate>)

        >>> IndexName.parse('azul_v2_dev_main_foo_bar') # doctest: +NORMALIZE_WHITESPACE
        IndexName(prefix='azul',
                  version=2,
                  deployment='dev',
                  catalog='main',
                  entity_type='foo_bar',
                  doc_type=<DocumentType.contribution>)

        >>> IndexName.parse('azul_v2_dev_main_foo_bar_aggregate') # doctest: +NORMALIZE_WHITESPACE
        IndexName(prefix='azul',
                  version=2,
                  deployment='dev',
                  catalog='main',
                  entity_type='foo_bar',
                  doc_type=<DocumentType.aggregate>)

        >>> IndexName.parse('azul_v2_staging_hca_foo_bar_aggregate') # doctest: +NORMALIZE_WHITESPACE
        IndexName(prefix='azul',
                  version=2,
                  deployment='staging',
                  catalog='hca',
                  entity_type='foo_bar',
                  doc_type=<DocumentType.aggregate>)

        >>> IndexName.parse('azul_v2_staging__foo_bar__aggregate') # doctest: +ELLIPSIS
        Traceback (most recent call last):
        ...
        azul.RequirementError: entity_type ... 'foo_bar_'

        >>> IndexName.parse('azul_v3_bla')
        Traceback (most recent call last):
        ...
        azul.RequirementError: 3

        A version 2 name that is missing the catalog or the entity type is
        rejected like any other malformed name.

        >>> IndexName.parse('azul_v2_dev')
        Traceback (most recent call last):
        ...
        azul.RequirementError: ['azul', 'v2', 'dev']

        >>> IndexName.parse('azul_v2_dev_main')
        Traceback (most recent call last):
        ...
        azul.RequirementError: ['azul', 'v2', 'dev', 'main']
        """
        components = index_name.split('_')
        require(len(components) > 2, components)
        prefix, *rest = components
        match = cls.index_name_version_re.fullmatch(rest[0])
        if match:
            version = int(match.group(1))
            require(version == 2, version)
            # A v2 name consists of prefix, version, deployment, catalog and
            # at least one more component. Without this check, a truncated
            # name would fail with a ValueError or IndexError below instead
            # of a RequirementError.
            require(len(components) > 4, components)
            _, deployment, catalog, *rest = rest
        else:
            # Names without an explicit version follow the v1 scheme in which
            # the deployment comes last and there is no catalog.
            version = 1
            catalog = None
            *rest, deployment = rest
        if rest[-1] == 'aggregate':
            *rest, _ = rest
            doc_type = DocumentType.aggregate
        else:
            doc_type = DocumentType.contribution
        entity_type = '_'.join(rest)
        # The constructor validates the entity type, too, but only after the
        # other components. Validating it here first ensures that an invalid
        # entity type is what gets reported.
        config.validate_entity_type(entity_type)
        return cls(prefix=prefix,
                   version=version,
                   deployment=deployment,
                   catalog=catalog,
                   entity_type=entity_type,
                   doc_type=doc_type)

    def __str__(self) -> str:
        """
        The string form of this index name, the inverse of `parse`. Only
        versions 1 and 2 can be rendered.

        >>> str(IndexName(version=1, deployment='dev', entity_type='foo'))
        'azul_foo_dev'

        >>> str(IndexName(version=1, deployment='dev', entity_type='foo', doc_type=DocumentType.aggregate))
        'azul_foo_aggregate_dev'

        >>> str(IndexName(version=1, deployment='dev', entity_type='foo_bar'))
        'azul_foo_bar_dev'

        >>> str(IndexName(version=1, deployment='dev', entity_type='foo_bar', doc_type=DocumentType.aggregate))
        'azul_foo_bar_aggregate_dev'

        >>> str(IndexName(version=2, deployment='dev', catalog='main', entity_type='foo'))
        'azul_v2_dev_main_foo'

        >>> str(IndexName(version=2,
        ...               deployment='dev',
        ...               catalog='main',
        ...               entity_type='foo',
        ...               doc_type=DocumentType.aggregate))
        'azul_v2_dev_main_foo_aggregate'

        >>> str(IndexName(version=2, deployment='dev', catalog='main', entity_type='foo_bar'))
        'azul_v2_dev_main_foo_bar'

        >>> str(IndexName(version=2,
        ...               deployment='dev',
        ...               catalog='main',
        ...               entity_type='foo_bar',
        ...               doc_type=DocumentType.aggregate))
        'azul_v2_dev_main_foo_bar_aggregate'

        >>> str(IndexName(version=2,
        ...               deployment='staging',
        ...               catalog='hca',
        ...               entity_type='foo_bar',
        ...               doc_type=DocumentType.aggregate))
        'azul_v2_staging_hca_foo_bar_aggregate'
        """
        # Contributions carry no marker, aggregates get an extra component
        aggregate = ['aggregate'] if self.doc_type is DocumentType.aggregate else []
        if self.version == 1:
            require(self.catalog is None)
            return '_'.join([
                self.prefix,
                self.entity_type,
                *aggregate,
                self.deployment
            ])
        elif self.version == 2:
            require(self.catalog is not None, self.catalog)
            return '_'.join([
                self.prefix,
                f'v{self.version}',
                self.deployment,
                self.catalog,
                self.entity_type,
                *aggregate,
            ])
        else:
            assert False, self.version


class RequirementError(RuntimeError):
"""
Unlike assertions, unsatisfied requirements do not constitute a bug in the program.
Expand Down
Loading