diff --git a/akka-persistence/src/main/scala/akka/persistence/serialization/PayloadSerializer.scala b/akka-persistence/src/main/scala/akka/persistence/serialization/PayloadSerializer.scala index d1f18acfbdd..0f54f2237e2 100644 --- a/akka-persistence/src/main/scala/akka/persistence/serialization/PayloadSerializer.scala +++ b/akka-persistence/src/main/scala/akka/persistence/serialization/PayloadSerializer.scala @@ -17,6 +17,15 @@ import akka.protobufv3.internal.UnsafeByteOperations import akka.serialization.SerializationExtension import akka.serialization.Serializers +/** + * INTERNAL API + */ +@InternalApi +object PayloadSerializer { + case object NoDeserializableMetadataMarker + val NoDeserializableMetadataComposite = CompositeMetadata(Seq(NoDeserializableMetadataMarker)) +} + /** * INTERNAL API */ @@ -94,18 +103,22 @@ final class PayloadSerializer(val system: ExtendedActorSystem) val protoCompositeMetadata = mf.CompositeMetadata.parseFrom(bytes) val metadataEntries = - protoCompositeMetadata.getPayloadsList.iterator.asScala.map { persistentPayload => + protoCompositeMetadata.getPayloadsList.iterator.asScala.flatMap { persistentPayload => val manifest = if (persistentPayload.hasPayloadManifest) persistentPayload.getPayloadManifest.toStringUtf8 else "" + // if the metadata type is unknown to the deserializing service, we do not want to fail deserialization + // of the event as a whole, so such metadata payloads are ignored serialization .deserialize(persistentPayload.getPayload.toByteArray, persistentPayload.getSerializerId, manifest) - .get + .toOption }.toSeq - CompositeMetadata(metadataEntries) + if (metadataEntries.nonEmpty) CompositeMetadata(metadataEntries) + else + PayloadSerializer.NoDeserializableMetadataComposite // we always need to return a CompositeMetadata with at least one entry } } diff --git a/akka-persistence/src/test/scala/akka/persistence/serialization/PayloadSerializerSpec.scala b/akka-persistence/src/test/scala/akka/persistence/serialization/PayloadSerializerSpec.scala index f34eb741c54..365daf1f6a9 100644 --- a/akka-persistence/src/test/scala/akka/persistence/serialization/PayloadSerializerSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/serialization/PayloadSerializerSpec.scala @@ -7,10 +7,12 @@ package akka.persistence.serialization import akka.persistence.CompositeMetadata import akka.persistence.FilteredPayload import akka.persistence.SerializedEvent +import akka.persistence.serialization.{ MessageFormats => mf } import akka.serialization.SerializationExtension import akka.serialization.SerializerWithStringManifest import akka.serialization.Serializers import akka.testkit.AkkaSpec +import akka.protobufv3.internal.{ ByteString => ProtoByteString } class PayloadSerializerSpec extends AkkaSpec { @@ -62,6 +64,59 @@ class PayloadSerializerSpec extends AkkaSpec { deserialized.entries.head shouldBe "a" deserialized.entries(1) shouldBe 17L } + + "ignore some metadata unknown when deserializing rather than fail" in { + val stringSerializer = serialization.serializerFor(classOf[String]) + val stringMeta = "suchMetaWow!" + val metadataWithUnknownEntry = mf.CompositeMetadata + .newBuilder() + .addPayloads( + mf.PersistentPayload + .newBuilder() + .setSerializerId(Int.MinValue) + .setPayloadManifest(ProtoByteString.copyFromUtf8("Q")) + .setPayload(ProtoByteString.empty()) + .build()) + .addPayloads( + mf.PersistentPayload + .newBuilder() + .setSerializerId(stringSerializer.identifier) + .setPayloadManifest(ProtoByteString.empty()) + .setPayload(ProtoByteString.copyFrom(stringSerializer.toBinary(stringMeta)))) + .build() + val bytes = metadataWithUnknownEntry.toByteArray + + val someMeta = CompositeMetadata(List("dummy")) + val serializer = serialization.findSerializerFor(someMeta).asInstanceOf[SerializerWithStringManifest] + val manifest = serializer.manifest(someMeta) + val deserialized = + serialization.deserialize(bytes, serializer.identifier, manifest).get.asInstanceOf[CompositeMetadata] + + deserialized.entries should have size (1) + deserialized.entries shouldEqual Seq(stringMeta) + } + + "ignore all metadata unknown when deserializing rather than fail" in { + val metadataWithUnknownEntry = mf.CompositeMetadata + .newBuilder() + .addPayloads( + mf.PersistentPayload + .newBuilder() + .setSerializerId(Int.MinValue) + .setPayloadManifest(ProtoByteString.copyFromUtf8("Q")) + .setPayload(ProtoByteString.empty()) + .build()) + .build() + val bytes = metadataWithUnknownEntry.toByteArray + + val someMeta = CompositeMetadata(List("dummy")) + val serializer = serialization.findSerializerFor(someMeta).asInstanceOf[SerializerWithStringManifest] + val manifest = serializer.manifest(someMeta) + val deserialized = + serialization.deserialize(bytes, serializer.identifier, manifest).get.asInstanceOf[CompositeMetadata] + + deserialized shouldBe theSameInstanceAs(PayloadSerializer.NoDeserializableMetadataComposite) + } } }