Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,15 @@ import akka.protobufv3.internal.UnsafeByteOperations
import akka.serialization.SerializationExtension
import akka.serialization.Serializers

/**
* INTERNAL API
*/
@InternalApi
object PayloadSerializer {
case object NoDeserializableMetadataMarker
val NoDeserializableMetadataComposite = CompositeMetadata(Seq(NoDeserializableMetadataMarker))
}

/**
* INTERNAL API
*/
Expand Down Expand Up @@ -94,18 +103,22 @@ final class PayloadSerializer(val system: ExtendedActorSystem)
val protoCompositeMetadata = mf.CompositeMetadata.parseFrom(bytes)

val metadataEntries =
protoCompositeMetadata.getPayloadsList.iterator.asScala.map { persistentPayload =>
protoCompositeMetadata.getPayloadsList.iterator.asScala.flatMap { persistentPayload =>
val manifest =
if (persistentPayload.hasPayloadManifest)
persistentPayload.getPayloadManifest.toStringUtf8
else ""

// if the metadata type is unknown to the deserializing service, we do not want to fail deserialization
// of the event as a whole, so such metadata payloads are ignored
serialization
.deserialize(persistentPayload.getPayload.toByteArray, persistentPayload.getSerializerId, manifest)
.get
.toOption
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we have a marker trait to define that the metadata is mandatory and should fail deserialization instead of skipping? Then we have it as default to ignore, but can mark certain things, like ReplicatedEventMetadata.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We only have a serializer id and a manifest, no class on the deserializing side when unknown metadata type so we couldn't know that the marker is applied?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, didn't think (that far)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking that it could be only for unknown serializer id:s but where we hit it ourselves was there is an existing serializer for protos but the message type is unknown, so that doesn't work well enough either.

}.toSeq

CompositeMetadata(metadataEntries)
if (metadataEntries.nonEmpty) CompositeMetadata(metadataEntries)
else
PayloadSerializer.NoDeserializableMetadataComposite // we always need to return a CompositeMetadata with at least one entry
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ package akka.persistence.serialization
import akka.persistence.CompositeMetadata
import akka.persistence.FilteredPayload
import akka.persistence.SerializedEvent
import akka.persistence.serialization.{ MessageFormats => mf }
import akka.serialization.SerializationExtension
import akka.serialization.SerializerWithStringManifest
import akka.serialization.Serializers
import akka.testkit.AkkaSpec
import akka.protobufv3.internal.{ ByteString => ProtoByteString }

class PayloadSerializerSpec extends AkkaSpec {

Expand Down Expand Up @@ -62,6 +64,59 @@ class PayloadSerializerSpec extends AkkaSpec {
deserialized.entries.head shouldBe "a"
deserialized.entries(1) shouldBe 17L
}

"ignore some metadata unknown when deserializing rather than fail" in {
val stringSerializer = serialization.serializerFor(classOf[String])
val stringMeta = "suchMetaWow!"
val metadataWithUnknownEntry = mf.CompositeMetadata
.newBuilder()
.addPayloads(
mf.PersistentPayload
.newBuilder()
.setSerializerId(Int.MinValue)
.setPayloadManifest(ProtoByteString.copyFromUtf8("Q"))
.setPayload(ProtoByteString.empty())
.build())
.addPayloads(
mf.PersistentPayload
.newBuilder()
.setSerializerId(stringSerializer.identifier)
.setPayloadManifest(ProtoByteString.empty())
.setPayload(ProtoByteString.copyFrom(stringSerializer.toBinary(stringMeta))))
.build()
val bytes = metadataWithUnknownEntry.toByteArray

val someMeta = CompositeMetadata(List("dummy"))
val serializer = serialization.findSerializerFor(someMeta).asInstanceOf[SerializerWithStringManifest]
val manifest = serializer.manifest(someMeta)
val deserialized =
serialization.deserialize(bytes, serializer.identifier, manifest).get.asInstanceOf[CompositeMetadata]

deserialized.entries should have size (1)
deserialized.entries shouldEqual Seq(stringMeta)
}

"ignore all metadata unknown when deserializing rather than fail" in {
val metadataWithUnknownEntry = mf.CompositeMetadata
.newBuilder()
.addPayloads(
mf.PersistentPayload
.newBuilder()
.setSerializerId(Int.MinValue)
.setPayloadManifest(ProtoByteString.copyFromUtf8("Q"))
.setPayload(ProtoByteString.empty())
.build())
.build()
val bytes = metadataWithUnknownEntry.toByteArray

val someMeta = CompositeMetadata(List("dummy"))
val serializer = serialization.findSerializerFor(someMeta).asInstanceOf[SerializerWithStringManifest]
val manifest = serializer.manifest(someMeta)
val deserialized =
serialization.deserialize(bytes, serializer.identifier, manifest).get.asInstanceOf[CompositeMetadata]

deserialized shouldBe theSameInstanceAs(PayloadSerializer.NoDeserializableMetadataComposite)
}
}

}
Loading