[r] Track replica hub IDs (#5360, PR #5987)
dsotirho-ucsc committed Mar 9, 2024
2 parents 63c51a5 + 3827495 commit 82110e8
Showing 12 changed files with 231 additions and 85 deletions.
4 changes: 4 additions & 0 deletions environment.py
@@ -187,6 +187,10 @@ def env() -> Mapping[str, Optional[str]]:
# Whether to create and populate an index for replica documents.
'AZUL_ENABLE_REPLICAS': '1',

# Maximum number of conflicts to allow before giving up when writing
# replica documents.
'AZUL_REPLICA_CONFLICT_LIMIT': '10',

# The name of the current deployment. This variable controls the name of
# all cloud resources and is the main vehicle for isolating cloud
# resources between deployments.
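The AZUL_REPLICA_CONFLICT_LIMIT default above is parsed as a decimal integer by config.replica_conflict_limit (see src/azul/__init__.py below). As a minimal sketch, assuming the usual pattern of a per-deployment environment.py whose entries are merged over these project-wide defaults, a deployment whose snapshots share many entities could raise the limit like this (the value 50 is invented for illustration):

from typing import Mapping, Optional


def env() -> Mapping[str, Optional[str]]:
    return {
        # Hypothetical per-deployment override of the project-wide
        # default of '10' set above.
        'AZUL_REPLICA_CONFLICT_LIMIT': '50'
    }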
4 changes: 4 additions & 0 deletions src/azul/__init__.py
@@ -765,6 +765,10 @@ def es_volume_size(self) -> int:
def enable_replicas(self) -> bool:
return self._boolean(self.environ['AZUL_ENABLE_REPLICAS'])

@property
def replica_conflict_limit(self) -> int:
return int(self.environ['AZUL_REPLICA_CONFLICT_LIMIT'])

# Because this property is relatively expensive to produce and frequently
# used we are applying aggressive caching here, knowing very well that
# this eliminates the option to reconfigure the running process by
43 changes: 32 additions & 11 deletions src/azul/indexer/document.py
@@ -1100,6 +1100,10 @@ class OpType(Enum):
#: Remove the document from the index or fail if it does not exist
delete = auto()

#: Modify a document in the index via a scripted update or create it if it
#: does not exist
update = auto()


C = TypeVar('C', bound=DocumentCoordinates)

@@ -1326,11 +1330,7 @@ def to_index(self,
if op_type is OpType.delete else
{
'_source' if bulk else 'body':
self.translate_fields(doc=self.to_json(),
field_types=field_types[coordinates.entity.catalog],
forward=True)
if self.needs_translation else
self.to_json()
self._body(field_types[coordinates.entity.catalog])
}
),
'_id' if bulk else 'id': self.coordinates.document_id
@@ -1361,6 +1361,14 @@ def to_index(self,
def op_type(self) -> OpType:
raise NotImplementedError

def _body(self, field_types: FieldTypes) -> JSON:
body = self.to_json()
if self.needs_translation:
body = self.translate_fields(doc=body,
field_types=field_types,
forward=True)
return body


class DocumentSource(SourceRef[SimpleSourceSpec, SourceRef]):
pass
@@ -1537,10 +1545,6 @@ class Replica(Document[ReplicaCoordinates[E]]):

hub_ids: list[EntityID]

#: The version_type attribute will change to VersionType.none if writing
#: to Elasticsearch fails with 409
version_type: VersionType = VersionType.create_only

needs_translation: ClassVar[bool] = False

def __attrs_post_init__(self):
@@ -1555,11 +1559,28 @@ def field_types(cls, field_types: FieldTypes) -> FieldTypes:
def to_json(self) -> JSON:
return dict(super().to_json(),
replica_type=self.replica_type,
hub_ids=self.hub_ids)
# Ensure that the index contents are deterministic for unit tests
hub_ids=sorted(set(self.hub_ids)))

@property
def op_type(self) -> OpType:
return OpType.create
assert self.version_type is VersionType.none, self.version_type
return OpType.update

def _body(self, field_types: FieldTypes) -> JSON:
return {
'script': {
'source': '''
Stream stream = Stream.concat(ctx._source.hub_ids.stream(),
params.hub_ids.stream());
ctx._source.hub_ids = stream.sorted().distinct().collect(Collectors.toList());
''',
'params': {
'hub_ids': self.hub_ids
}
},
'upsert': super()._body(field_types)
}


CataloguedContribution = Contribution[CataloguedEntityReference]
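For context on the scripted update above: with OpType.update, the writer ends up issuing an Elasticsearch update request whose body carries the 'script' and 'upsert' keys produced by Replica._body, so a replica that does not exist yet is created from the upsert document, while an existing one has the incoming hub IDs merged in. The Painless one-liner is equivalent to this rough Python sketch (illustrative only, not code from this PR):

def merge_hub_ids(stored: list[str], incoming: list[str]) -> list[str]:
    # Mirrors Stream.concat(...).sorted().distinct().collect(Collectors.toList())
    # in the Painless script: take the union of both lists, then sort and
    # de-duplicate, so repeated updates with the same hubs are idempotent.
    return sorted(set(stored) | set(incoming))


assert merge_hub_ids(['f2', 'f1'], ['f1', 'f3']) == ['f1', 'f2', 'f3']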
12 changes: 8 additions & 4 deletions src/azul/indexer/index_controller.py
@@ -185,10 +185,14 @@ def contribute(self, event: Iterable[SQSRecord], *, retry=False):
for entity, num_contributions in tallies.items()]

if replicas:
log.info('Writing %i replicas to index.', len(replicas))
num_written, num_present = self.index_service.replicate(catalog, replicas)
log.info('Successfully wrote %i replicas; %i were already present',
num_written, num_present)
if delete:
# FIXME: Replica index does not support deletions
# https://github.com/DataBiosphere/azul/issues/5846
log.warning('Deletion of replicas is not supported')
else:
log.info('Writing %i replicas to index.', len(replicas))
num_written = self.index_service.replicate(catalog, replicas)
log.info('Successfully wrote %i replicas', num_written)
else:
log.info('No replicas to write.')

49 changes: 24 additions & 25 deletions src/azul/indexer/index_service.py
@@ -227,7 +228,8 @@ def delete(self, catalog: CatalogName, bundle: Bundle) -> None:
# number of contributions per bundle.
# https://github.com/DataBiosphere/azul/issues/610
tallies.update(self.contribute(catalog, contributions))
self.replicate(catalog, replicas)
# FIXME: Replica index does not support deletions
# https://github.com/DataBiosphere/azul/issues/5846
self.aggregate(tallies)

def deep_transform(self,
@@ -417,7 +418,7 @@ def contribute(self,
with a count of 0 may exist. This is ok. See description of aggregate().
"""
tallies = Counter()
writer = self._create_writer(catalog)
writer = self._create_writer(DocumentType.contribution, catalog)
while contributions:
writer.write(contributions)
retry_contributions = []
@@ -451,7 +452,7 @@ def aggregate(self, tallies: CataloguedTallies):
catalogs.
"""
# Use catalog specified in each tally
writer = self._create_writer(catalog=None)
writer = self._create_writer(DocumentType.aggregate, catalog=None)
while True:
# Read the aggregates
old_aggregates = self._read_aggregates(tallies)
@@ -505,33 +506,24 @@ def replicate(self,
catalog: CatalogName,
replicas: list[Replica]
) -> tuple[int, int]:
writer = self._create_writer(catalog)
writer = self._create_writer(DocumentType.replica, catalog)
num_replicas = len(replicas)
num_written, num_present = 0, 0
num_written = 0
while replicas:
writer.write(replicas)
retry_replicas = []
for r in replicas:
if r.coordinates in writer.retries:
conflicts = writer.conflicts[r.coordinates]
if conflicts == 0:
retry_replicas.append(r)
elif conflicts == 1:
# FIXME: Track replica hub IDs
# https://github.com/DataBiosphere/azul/issues/5360
writer.conflicts.pop(r.coordinates)
num_present += 1
else:
assert False, (conflicts, r.coordinates)
retry_replicas.append(r)
else:
num_written += 1
replicas = retry_replicas

writer.raise_on_errors()
assert num_written + num_present == num_replicas, (
num_written, num_present, num_replicas
assert num_written == num_replicas, (
num_written, num_replicas
)
return num_written, num_present
return num_written

def _read_aggregates(self,
entities: CataloguedTallies
@@ -790,16 +782,25 @@ def _reconcile(self,
for entity_type, entities in result.items()
}

def _create_writer(self, catalog: Optional[CatalogName]) -> 'IndexWriter':
def _create_writer(self,
doc_type: DocumentType,
catalog: Optional[CatalogName]
) -> 'IndexWriter':
# We allow one conflict retry in the case of duplicate notifications and
# switch from 'add' to 'update'. After that, there should be no
# conflicts because we use an SQS FIFO message group per entity. For
# other errors we use SQS message redelivery to take care of the
# retries.
# conflicts because we use an SQS FIFO message group per entity.
# Conflicts are common when writing replicas due to entities being
# shared between bundles. For other errors we use SQS message redelivery
# to take care of the retries.
limits = {
DocumentType.contribution: 1,
DocumentType.aggregate: 1,
DocumentType.replica: config.replica_conflict_limit
}
return IndexWriter(catalog,
self.catalogued_field_types(),
refresh=False,
conflict_retry_limit=1,
conflict_retry_limit=limits[doc_type],
error_retry_limit=0)


@@ -855,8 +856,6 @@ def write(self, documents: list[Document]):
def _write_individually(self, documents: Iterable[Document]):
log.info('Writing documents individually')
for doc in documents:
if isinstance(doc, Replica):
assert doc.version_type is VersionType.create_only, doc
try:
method = getattr(self.es_client, doc.op_type.name)
method(refresh=self.refresh, **doc.to_index(self.catalog, self.field_types))
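One point worth spelling out about the conflict limits chosen in _create_writer: replicas conflict routinely because their coordinates are derived only from the entity and a hash of its contents (see _replica in src/azul/indexer/transform.py below), not from the bundle being indexed, so every bundle that shares an entity targets the same replica document. A rough sketch of that collision, with an invented stand-in in place of the real ReplicaCoordinates and json_hash:

import hashlib
import json


def replica_document_id(contents: dict, entity_id: str) -> str:
    # Stand-in for ReplicaCoordinates: a function of the entity and its
    # contents only, independent of which bundle is being indexed.
    content_hash = hashlib.sha256(
        json.dumps(contents, sort_keys=True).encode()
    ).hexdigest()
    return f'{entity_id}_{content_hash}'


# Two bundles sharing this biosample target the same document ID, so the
# second write gets a 409 conflict and is retried as a scripted update that
# merges its hub_ids into the stored list, up to the configured limit.
contents = {'biosample_id': 'b1', 'anatomical_site': 'blood'}
id_from_bundle_1 = replica_document_id(contents, 'b1')
id_from_bundle_2 = replica_document_id(contents, 'b1')
assert id_from_bundle_1 == id_from_bundle_2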
8 changes: 6 additions & 2 deletions src/azul/indexer/transform.py
@@ -125,14 +125,18 @@ def _contribution(self,
source=self.bundle.fqid.source,
contents=contents)

def _replica(self, contents: MutableJSON, entity: EntityReference) -> Replica:
def _replica(self,
contents: MutableJSON,
entity: EntityReference,
hub_ids: list[EntityID]
) -> Replica:
coordinates = ReplicaCoordinates(content_hash=json_hash(contents).hexdigest(),
entity=entity)
return Replica(coordinates=coordinates,
version=None,
replica_type=self.replica_type(entity),
contents=contents,
hub_ids=[])
hub_ids=hub_ids)

@classmethod
@abstractmethod
55 changes: 43 additions & 12 deletions src/azul/plugins/metadata/anvil/indexer/transform.py
@@ -186,11 +186,14 @@ def _transform(self, entity: EntityReference) -> Transform:
def _add_replica(self,
contribution: Optional[MutableJSON],
entity: EntityReference,
hub_ids: list[EntityID]
) -> Transform:
no_replica = not config.enable_replicas or self.entity_type() == 'bundles'
return (
None if contribution is None else self._contribution(contribution, entity),
None if no_replica else self._replica(self.bundle.entities[entity], entity)
None if no_replica else self._replica(self.bundle.entities[entity],
entity,
hub_ids)
)

def _pluralize(self, entity_type: str) -> str:
@@ -474,8 +477,8 @@ def _complete_dataset_keys(cls) -> AbstractSet[str]:

class SingletonTransformer(BaseTransformer, metaclass=ABCMeta):

def _transform(self, entity: EntityReference) -> Transform:
contents = dict(
def _contents(self) -> MutableJSON:
return dict(
activities=self._entities(self._activity, chain.from_iterable(
self._entities_by_type[activity_type]
for activity_type in self._activity_polymorphic_types
@@ -486,7 +489,6 @@ def _transform(self, entity: EntityReference) -> Transform:
donors=self._entities(self._donor, self._entities_by_type['donor']),
files=self._entities(self._file, self._entities_by_type['file'])
)
return self._add_replica(contents, entity)

@classmethod
def field_types(cls) -> FieldTypes:
@@ -527,15 +529,17 @@ def entity_type(cls) -> str:

def _transform(self, entity: EntityReference) -> Transform:
linked = self._linked_entities(entity)
files = linked['file']
contents = dict(
activities=[self._activity(entity)],
biosamples=self._entities(self._biosample, linked['biosample']),
datasets=[self._dataset(self._only_dataset())],
diagnoses=self._entities(self._diagnosis, linked['diagnosis']),
donors=self._entities(self._donor, linked['donor']),
files=self._entities(self._file, linked['file']),
files=self._entities(self._file, files),
)
return self._add_replica(contents, entity)
hub_ids = [f.entity_id for f in files]
return self._add_replica(contents, entity, hub_ids)


class BiosampleTransformer(BaseTransformer):
@@ -546,6 +550,7 @@ def entity_type(cls) -> str:

def _transform(self, entity: EntityReference) -> Transform:
linked = self._linked_entities(entity)
files = linked['file']
contents = dict(
activities=self._entities(self._activity, chain.from_iterable(
linked[activity_type]
@@ -555,15 +560,18 @@ def _transform(self, entity: EntityReference) -> Transform:
datasets=[self._dataset(self._only_dataset())],
diagnoses=self._entities(self._diagnosis, linked['diagnosis']),
donors=self._entities(self._donor, linked['donor']),
files=self._entities(self._file, linked['file']),
files=self._entities(self._file, files),
)
return self._add_replica(contents, entity)
hub_ids = [f.entity_id for f in files]
return self._add_replica(contents, entity, hub_ids)


class DiagnosisTransformer(BaseTransformer):

def _transform(self, entity: EntityReference) -> Transform:
return self._add_replica(None, entity)
files = self._linked_entities(entity)['file']
hub_ids = [f.entity_id for f in files]
return self._add_replica(None, entity, hub_ids)

@classmethod
def entity_type(cls) -> EntityType:
@@ -580,6 +588,11 @@ def _singleton(self) -> EntityReference:
return EntityReference(entity_type='bundle',
entity_id=self.bundle.uuid)

def _transform(self, entity: EntityReference) -> Transform:
contents = self._contents()
hub_ids = [f.entity_id for f in self._entities_by_type['file']]
return self._add_replica(contents, entity, hub_ids)


class DatasetTransformer(SingletonTransformer):

@@ -590,6 +603,18 @@ def entity_type(cls) -> str:
def _singleton(self) -> EntityReference:
return self._only_dataset()

def _transform(self, entity: EntityReference) -> Transform:
contents = self._contents()
# Every file in a snapshot is linked to that snapshot's singular
# dataset, making an explicit list of hub IDs for the dataset both
# redundant and impractically large (we observe that for large
# snapshots, trying to track this many files in a single data structure
# causes a prohibitively high rate of conflicts during replica updates).
# Therefore, we leave the hub IDs field empty for datasets and rely on
# the tenet that every file is an implicit hub of its parent dataset.
hub_ids = []
return self._add_replica(contents, entity, hub_ids)


class DonorTransformer(BaseTransformer):

@@ -599,6 +624,7 @@ def entity_type(cls) -> str:

def _transform(self, entity: EntityReference) -> Transform:
linked = self._linked_entities(entity)
files = linked['file']
contents = dict(
activities=self._entities(self._activity, chain.from_iterable(
linked[activity_type]
@@ -608,9 +634,10 @@ def _transform(self, entity: EntityReference) -> Transform:
datasets=[self._dataset(self._only_dataset())],
diagnoses=self._entities(self._diagnosis, linked['diagnosis']),
donors=[self._donor(entity)],
files=self._entities(self._file, linked['file']),
files=self._entities(self._file, files),
)
return self._add_replica(contents, entity)
hub_ids = [f.entity_id for f in files]
return self._add_replica(contents, entity, hub_ids)


class FileTransformer(BaseTransformer):
@@ -632,4 +659,8 @@ def _transform(self, entity: EntityReference) -> Transform:
donors=self._entities(self._donor, linked['donor']),
files=[self._file(entity)],
)
return self._add_replica(contents, entity)
# The result of the link traversal does not include the starting entity,
# so without this step the file itself wouldn't be included in its hubs
files = (entity, *linked['file'])
hub_ids = [f.entity_id for f in files]
return self._add_replica(contents, entity, hub_ids)
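Taken together with the scripted update in document.py, the hub ID wiring above means that a replica for an entity referenced by several files converges on the sorted union of those files' IDs, while dataset replicas keep an empty list and rely on files as implicit hubs. A hypothetical end state for a biosample linked to two files (the IDs and the replica_type label are invented; fields other than those added by Replica.to_json are omitted):

# Hypothetical document in the replica index after the bundles containing
# files 'f1' and 'f2' have both been indexed.
shared_biosample_replica = {
    'replica_type': 'anvil_biosample',  # assumed label, for illustration only
    'hub_ids': ['f1', 'f2']             # sorted union across both updates
}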