Skip to content

Commit

Permalink
aggregate/saga mapping (#311)
Browse files Browse the repository at this point in the history
* Improve identifier mapping scenarios

* Add expression header mapping

* Add test scenario for headers mapping
  • Loading branch information
dgafka authored Apr 7, 2024
1 parent 44f0eac commit 57796ce
Show file tree
Hide file tree
Showing 27 changed files with 599 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public function hasAnnotation(Type $annotationClass): bool
public function getAnnotation(Type $annotationClass): object
{
foreach ($this->annotations as $annotation) {
if (TypeDescriptor::createFromVariable($annotation)->equals($annotationClass)) {
if (TypeDescriptor::createFromVariable($annotation)->isCompatibleWith($annotationClass)) {
return $annotation;
}
}
Expand Down Expand Up @@ -181,6 +181,6 @@ public function isProtected(): bool

public function __toString()
{
return $this->visibility . $this->name . $this->type->toString();
return '`' . $this->visibility . ':' . $this->name . '` (' . $this->type->toString() . ')';
}
}
12 changes: 6 additions & 6 deletions packages/Ecotone/src/Messaging/Handler/InterfaceToCall.php
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public function getClassAnnotations(): iterable
public function hasMethodAnnotation(Type $className): bool
{
foreach ($this->methodAnnotations as $methodAnnotation) {
if (TypeDescriptor::createFromVariable($methodAnnotation)->equals($className)) {
if (TypeDescriptor::createFromVariable($methodAnnotation)->isCompatibleWith($className)) {
return true;
}
}
Expand All @@ -123,7 +123,7 @@ public function hasMethodAnnotation(Type $className): bool
public function hasClassAnnotation(Type $className): bool
{
foreach ($this->getClassAnnotations() as $classAnnotation) {
if (TypeDescriptor::createFromVariable($classAnnotation)->equals($className)) {
if (TypeDescriptor::createFromVariable($classAnnotation)->isCompatibleWith($className)) {
return true;
}
}
Expand All @@ -134,7 +134,7 @@ public function hasClassAnnotation(Type $className): bool
public function getSingleClassAnnotationOf(Type $className): object
{
foreach ($this->getClassAnnotations() as $classAnnotation) {
if (TypeDescriptor::createFromVariable($classAnnotation)->equals($className)) {
if (TypeDescriptor::createFromVariable($classAnnotation)->isCompatibleWith($className)) {
return $classAnnotation;
}
}
Expand All @@ -149,7 +149,7 @@ public function getClassAnnotationOf(Type $className): array
{
$annotations = [];
foreach ($this->getClassAnnotations() as $classAnnotation) {
if (TypeDescriptor::createFromVariable($classAnnotation)->equals($className)) {
if (TypeDescriptor::createFromVariable($classAnnotation)->isCompatibleWith($className)) {
$annotations[] = $classAnnotation;
}
}
Expand All @@ -165,7 +165,7 @@ public function getClassAnnotationOf(Type $className): array
public function getSingleMethodAnnotationOf(Type $className): object
{
foreach ($this->methodAnnotations as $methodAnnotation) {
if (TypeDescriptor::createFromVariable($methodAnnotation)->equals($className)) {
if (TypeDescriptor::createFromVariable($methodAnnotation)->isCompatibleWith($className)) {
return $methodAnnotation;
}
}
Expand All @@ -180,7 +180,7 @@ public function getMethodAnnotationsOf(Type $className): array
{
$methodAnnotations = [];
foreach ($this->methodAnnotations as $methodAnnotation) {
if (TypeDescriptor::createFromVariable($methodAnnotation)->equals($className)) {
if (TypeDescriptor::createFromVariable($methodAnnotation)->isCompatibleWith($className)) {
$methodAnnotations[] = $methodAnnotation;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public function load(Message $message): ?Message
foreach ($aggregateIdentifiers as $identifierName => $aggregateIdentifier) {
if (is_null($aggregateIdentifier)) {
$messageType = TypeDescriptor::createFromVariable($message->getPayload());
throw AggregateNotFoundException::create("Aggregate identifier {$identifierName} definition found in {$messageType->toString()}, but is null. Can't load aggregate {$this->aggregateClassName} to call {$this->aggregateMethod}.");
throw AggregateNotFoundException::create("Can't call Aggregate {$this->aggregateClassName}:{$this->aggregateMethod} as value for identifier `{$identifierName}` is missing. Please check your identifier mapping in {$messageType->toString()}. Have you forgot to add #[TargetIdentifier] in your Command or `aggregate.id` in metadata?");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
use Ecotone\Messaging\Conversion\MediaType;
use Ecotone\Messaging\Handler\Enricher\PropertyPath;
use Ecotone\Messaging\Handler\Enricher\PropertyReaderAccessor;
use Ecotone\Messaging\Handler\ExpressionEvaluationService;
use Ecotone\Messaging\Handler\TypeDescriptor;
use Ecotone\Messaging\Message;
use Ecotone\Messaging\MessageHeaders;
use Ecotone\Messaging\Support\Assert;
use Ecotone\Messaging\Support\MessageBuilder;

/**
Expand All @@ -18,26 +20,25 @@
*/
class AggregateIdentifierRetrevingService
{
private ConversionService $conversionService;
private TypeDescriptor $typeToConvertTo;
private array $payloadIdentifierMapping;
private array $metadataIdentifierMapping;
private PropertyReaderAccessor $propertyReaderAccessor;

public function __construct(private string $aggregateClassName, ConversionService $conversionService, PropertyReaderAccessor $propertyReaderAccessor, TypeDescriptor $typeToConvertTo, array $metadataIdentifierMapping, array $payloadIdentifierMapping)
public function __construct(
private string $aggregateClassName,
private ConversionService $conversionService,
private PropertyReaderAccessor $propertyReaderAccessor,
private TypeDescriptor $typeToConvertTo,
private array $metadataIdentifierMapping,
private array $messageIdentifierMapping,
private array $identifierMapping,
private ExpressionEvaluationService $expressionEvaluationService,
)
{
$this->conversionService = $conversionService;
$this->propertyReaderAccessor = $propertyReaderAccessor;
$this->metadataIdentifierMapping = $metadataIdentifierMapping;
$this->payloadIdentifierMapping = $payloadIdentifierMapping;
$this->typeToConvertTo = $typeToConvertTo;

}

public function convert(Message $message): Message
{
if ($message->getHeaders()->containsKey(AggregateMessage::OVERRIDE_AGGREGATE_IDENTIFIER)) {
$aggregateIds = $message->getHeaders()->get(AggregateMessage::OVERRIDE_AGGREGATE_IDENTIFIER);
$aggregateIds = is_array($aggregateIds) ? $aggregateIds : [array_key_first($this->payloadIdentifierMapping) => $aggregateIds];
$aggregateIds = is_array($aggregateIds) ? $aggregateIds : [array_key_first($this->messageIdentifierMapping) => $aggregateIds];

return MessageBuilder::fromMessage($message)
->setHeader(AggregateMessage::AGGREGATE_ID, AggregateIdResolver::resolveArrayOfIdentifiers($this->aggregateClassName, $aggregateIds))
Expand Down Expand Up @@ -65,7 +66,7 @@ public function convert(Message $message): Message
}

$aggregateIdentifiers = [];
foreach ($this->payloadIdentifierMapping as $aggregateIdentifierName => $aggregateIdentifierMappingName) {
foreach ($this->messageIdentifierMapping as $aggregateIdentifierName => $aggregateIdentifierMappingName) {
if (is_null($aggregateIdentifierMappingName)) {
$aggregateIdentifiers[$aggregateIdentifierName] = null;
continue;
Expand All @@ -82,6 +83,12 @@ public function convert(Message $message): Message
$aggregateIdentifiers[$identifierName] = $metadata[$headerName];
}
}
foreach ($this->identifierMapping as $identifierName => $expression) {
$aggregateIdentifiers[$identifierName] = $this->expressionEvaluationService->evaluate($expression, [
'headers' => $metadata,
'payload' => $payload,
]);
}

return MessageBuilder::fromMessage($message)
->setHeader(AggregateMessage::AGGREGATE_ID, AggregateIdResolver::resolveArrayOfIdentifiers($this->aggregateClassName, $aggregateIdentifiers))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Ecotone\Messaging\Conversion\ConversionService;
use Ecotone\Messaging\Handler\ClassDefinition;
use Ecotone\Messaging\Handler\Enricher\PropertyReaderAccessor;
use Ecotone\Messaging\Handler\ExpressionEvaluationService;
use Ecotone\Messaging\Handler\InputOutputMessageHandlerBuilder;
use Ecotone\Messaging\Handler\InterfaceToCall;
use Ecotone\Messaging\Handler\InterfaceToCallRegistry;
Expand All @@ -27,24 +28,23 @@
*/
class AggregateIdentifierRetrevingServiceBuilder extends InputOutputMessageHandlerBuilder implements MessageHandlerBuilder
{
private ?ClassDefinition $messageClassNameToConvertTo;
private ClassDefinition $aggregateClassName;
private array $metadataIdentifierMapping;
private TypeDescriptor $typeToConvertTo;
private array $payloadIdentifierMapping;

private function __construct(ClassDefinition $aggregateClassName, array $metadataIdentifierMapping, ?ClassDefinition $messageClassNameToConvertTo, InterfaceToCallRegistry $interfaceToCallRegistry)
private array $messageIdentifierMapping;

private function __construct(
private ClassDefinition $aggregateClassName,
private array $metadataIdentifierMapping,
private array $identifierMapping,
?ClassDefinition $messageClassNameToConvertTo,
InterfaceToCallRegistry $interfaceToCallRegistry
)
{
$this->messageClassNameToConvertTo = $messageClassNameToConvertTo;
$this->aggregateClassName = $aggregateClassName;
$this->metadataIdentifierMapping = $metadataIdentifierMapping;

$this->initialize($interfaceToCallRegistry, $aggregateClassName, $messageClassNameToConvertTo, $metadataIdentifierMapping);
$this->initialize($interfaceToCallRegistry, $aggregateClassName, $messageClassNameToConvertTo, $metadataIdentifierMapping, $identifierMapping);
}

public static function createWith(ClassDefinition $aggregateClassName, array $metadataIdentifierMapping, ?ClassDefinition $messageClassNameToConvertTo, InterfaceToCallRegistry $interfaceToCallRegistry): self
public static function createWith(ClassDefinition $aggregateClassName, array $metadataIdentifierMapping, array $identifierMapping, ?ClassDefinition $messageClassNameToConvertTo, InterfaceToCallRegistry $interfaceToCallRegistry): self
{
return new self($aggregateClassName, $metadataIdentifierMapping, $messageClassNameToConvertTo, $interfaceToCallRegistry);
return new self($aggregateClassName, $metadataIdentifierMapping, $identifierMapping, $messageClassNameToConvertTo, $interfaceToCallRegistry);
}

/**
Expand All @@ -56,11 +56,13 @@ public function compile(MessagingContainerBuilder $builder): Definition
AggregateIdentifierRetrevingService::class,
[
$this->aggregateClassName->getClassType()->toString(),
new Reference(ConversionService::REFERENCE_NAME),
new Reference(PropertyReaderAccessor::class),
Reference::to(ConversionService::REFERENCE_NAME),
Reference::to(PropertyReaderAccessor::class),
$this->typeToConvertTo,
$this->metadataIdentifierMapping,
$this->payloadIdentifierMapping,
$this->messageIdentifierMapping,
$this->identifierMapping,
Reference::to(ExpressionEvaluationService::REFERENCE),
]
);
$serviceActivatorBuilder = ServiceActivatorBuilder::createWithDefinition($aggregateIdentifierRetrevingService, 'convert')
Expand Down Expand Up @@ -101,15 +103,21 @@ private function hasAccordingIdentifier(InterfaceToCallRegistry $interfaceToCall
return false;
}

private function initialize(InterfaceToCallRegistry $interfaceToCallRegistry, ClassDefinition $aggregateClassDefinition, ?ClassDefinition $handledMessageClassNameDefinition, array $metadataIdentifierMapping): void
private function initialize(
InterfaceToCallRegistry $interfaceToCallRegistry,
ClassDefinition $aggregateClassDefinition,
?ClassDefinition $handledMessageClassNameDefinition,
array $metadataIdentifierMapping,
array $identifierMapping
): void
{
$aggregatePayloadIdentifiersMapping = [];
$messageIdentifiersMapping = [];

$aggregateIdentifierAnnotation = TypeDescriptor::create(AggregateIdentifier::class);
$aggregateIdentifierMethod = TypeDescriptor::create(AggregateIdentifierMethod::class);
foreach ($aggregateClassDefinition->getProperties() as $property) {
if ($property->hasAnnotation($aggregateIdentifierAnnotation)) {
$aggregatePayloadIdentifiersMapping[$property->getName()] = null;
$messageIdentifiersMapping[$property->getName()] = null;
}
}
foreach ($aggregateClassDefinition->getPublicMethodNames() as $method) {
Expand All @@ -118,11 +126,11 @@ private function initialize(InterfaceToCallRegistry $interfaceToCallRegistry, Cl
if ($methodToCheck->hasMethodAnnotation($aggregateIdentifierMethod)) {
/** @var AggregateIdentifierMethod $attribute */
$attribute = $methodToCheck->getSingleMethodAnnotationOf($aggregateIdentifierMethod);
$aggregatePayloadIdentifiersMapping[$attribute->getIdentifierPropertyName()] = null;
$messageIdentifiersMapping[$attribute->getIdentifierPropertyName()] = null;
}
}

if (empty($aggregatePayloadIdentifiersMapping)) {
if (empty($messageIdentifiersMapping)) {
throw InvalidArgumentException::create("Aggregate {$aggregateClassDefinition} has no identifiers defined. How you forgot to mark #[AggregateIdentifier]?");
}

Expand All @@ -131,6 +139,11 @@ private function initialize(InterfaceToCallRegistry $interfaceToCallRegistry, Cl
throw InvalidArgumentException::create("Aggregate {$aggregateClassDefinition} has no identifier {$propertyName} for metadata identifier mapping.");
}
}
foreach ($identifierMapping as $propertyName => $mappingName) {
if (! $this->hasAccordingIdentifier($interfaceToCallRegistry, $aggregateClassDefinition, $propertyName)) {
throw InvalidArgumentException::create("Aggregate {$aggregateClassDefinition} has no identifier {$propertyName} for identifier mapping.");
}
}

$messageProperties = [];
if ($handledMessageClassNameDefinition) {
Expand All @@ -142,21 +155,15 @@ private function initialize(InterfaceToCallRegistry $interfaceToCallRegistry, Cl
$mappingName = $annotation->identifierName ? $annotation->identifierName : $property->getName();

if ($aggregateClassDefinition->hasProperty($mappingName) && $aggregateClassDefinition->getProperty($mappingName)->hasAnnotation($aggregateIdentifierAnnotation)) {
$aggregatePayloadIdentifiersMapping[$mappingName] = $property->getName();
$messageIdentifiersMapping[$mappingName] = $property->getName();
}
}
}

$messageProperties = $handledMessageClassNameDefinition->getProperties();
}

foreach ($this->metadataIdentifierMapping as $identifierName => $mapping) {
if (! in_array($identifierName, array_keys($aggregatePayloadIdentifiersMapping))) {
throw ConfigurationException::create("Aggregate {$aggregateClassDefinition} for {$handledMessageClassNameDefinition} has metadata mapping for non existing identifier key {$identifierName}. It should be {\"aggregateId\":\"metadataIdKey\"}?");
}
}

foreach ($aggregatePayloadIdentifiersMapping as $aggregateIdentifierName => $aggregateIdentifierMappingKey) {
foreach ($messageIdentifiersMapping as $aggregateIdentifierName => $aggregateIdentifierMappingKey) {
if (is_null($aggregateIdentifierMappingKey)) {
$mappingKey = null;
foreach ($messageProperties as $property) {
Expand All @@ -166,21 +173,21 @@ private function initialize(InterfaceToCallRegistry $interfaceToCallRegistry, Cl
}

if (is_null($handledMessageClassNameDefinition) && is_null($mappingKey)) {
$aggregatePayloadIdentifiersMapping[$aggregateIdentifierName] = $aggregateIdentifierName;
} elseif (is_null($mappingKey) && ! $this->hasIdentifierMappingInMetadata($metadataIdentifierMapping, $aggregateIdentifierName)) {
$messageIdentifiersMapping[$aggregateIdentifierName] = $aggregateIdentifierName;
} elseif (is_null($mappingKey) && ! $this->hasRuntimeIdentifierMapping($metadataIdentifierMapping, $aggregateIdentifierName) && ! $this->hasRuntimeIdentifierMapping($identifierMapping, $aggregateIdentifierName)) {
/** NO mapping available, identifier should come from message headers under "aggregate.id" */
$aggregatePayloadIdentifiersMapping = [];
$messageIdentifiersMapping = [];
} else {
$aggregatePayloadIdentifiersMapping[$aggregateIdentifierName] = $mappingKey;
$messageIdentifiersMapping[$aggregateIdentifierName] = $mappingKey;
}
}
}

$this->payloadIdentifierMapping = $aggregatePayloadIdentifiersMapping;
$this->messageIdentifierMapping = $messageIdentifiersMapping;
$this->typeToConvertTo = $handledMessageClassNameDefinition ? $handledMessageClassNameDefinition->getClassType() : TypeDescriptor::createArrayType();
}

private function hasIdentifierMappingInMetadata(array $metadataIdentifierMapping, $aggregateIdentifierName): bool
private function hasRuntimeIdentifierMapping(array $metadataIdentifierMapping, $aggregateIdentifierName): bool
{
foreach ($metadataIdentifierMapping as $identifierNameHeaderMapping => $headerName) {
if ($aggregateIdentifierName == $identifierNameHeaderMapping) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
use Attribute;
use Ecotone\Messaging\Support\Assert;

/**
* @deprecated Ecotone 2.0 will drop this attribute. Use #[IdentifierMethod] instead
*/
#[Attribute(Attribute::TARGET_METHOD)]
final class AggregateIdentifierMethod
class AggregateIdentifierMethod
{
private string $identifierPropertyName;

Expand Down
3 changes: 3 additions & 0 deletions packages/Ecotone/src/Modelling/Attribute/AggregateVersion.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@

use Attribute;

/**
* @deprecated Ecotone 2.0 will drop this attribute. Use #[Version] instead
*/
#[Attribute(Attribute::TARGET_PROPERTY | Attribute::TARGET_PARAMETER)]
class AggregateVersion
{
Expand Down
Loading

0 comments on commit 57796ce

Please sign in to comment.