diff --git a/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/spanner/SpannerEventGroupMetadataDescriptorsService.kt b/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/spanner/SpannerEventGroupMetadataDescriptorsService.kt index 09f94f6a559..1e572818cca 100644 --- a/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/spanner/SpannerEventGroupMetadataDescriptorsService.kt +++ b/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/spanner/SpannerEventGroupMetadataDescriptorsService.kt @@ -27,6 +27,7 @@ import org.wfanet.measurement.internal.kingdom.GetEventGroupMetadataDescriptorRe import org.wfanet.measurement.internal.kingdom.StreamEventGroupMetadataDescriptorsRequest import org.wfanet.measurement.internal.kingdom.UpdateEventGroupMetadataDescriptorRequest import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.common.DataProviderNotFoundException +import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.common.EventGroupMetadataDescriptorAlreadyExistsWithTypeException import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.common.EventGroupMetadataDescriptorNotFoundException import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.common.KingdomInternalException import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.queries.StreamEventGroupMetadataDescriptors @@ -45,6 +46,8 @@ class SpannerEventGroupMetadataDescriptorsService( return CreateEventGroupMetadataDescriptor(request).execute(client, idGenerator) } catch (e: DataProviderNotFoundException) { throw e.asStatusRuntimeException(Status.Code.NOT_FOUND, "DataProvider not found.") + } catch (e: EventGroupMetadataDescriptorAlreadyExistsWithTypeException) { + throw e.asStatusRuntimeException(Status.Code.ALREADY_EXISTS) } catch (e: KingdomInternalException) { throw e.asStatusRuntimeException(Status.Code.INTERNAL, "Unexpected internal error") } diff --git a/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/spanner/common/KingdomInternalException.kt b/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/spanner/common/KingdomInternalException.kt index 9d05c83ae38..33ef66434a7 100644 --- a/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/spanner/common/KingdomInternalException.kt +++ b/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/spanner/common/KingdomInternalException.kt @@ -580,6 +580,19 @@ class EventGroupMetadataDescriptorNotFoundException( ) } +class EventGroupMetadataDescriptorAlreadyExistsWithTypeException( + message: String = "EventGroupMetadataDescriptor with same protobuf type already exists", + cause: Throwable? = null, +) : + KingdomInternalException( + ErrorCode.EVENT_GROUP_METADATA_DESCRIPTOR_ALREADY_EXISTS_WITH_TYPE, + message, + cause, + ) { + override val context: Map + get() = emptyMap() +} + class RecurringExchangeNotFoundException( val externalRecurringExchangeId: ExternalId, provideDescription: () -> String = { "RecurringExchange not found" }, diff --git a/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/spanner/writers/CreateEventGroupMetadataDescriptor.kt b/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/spanner/writers/CreateEventGroupMetadataDescriptor.kt index 424885ddf00..0de61734638 100644 --- a/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/spanner/writers/CreateEventGroupMetadataDescriptor.kt +++ b/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/spanner/writers/CreateEventGroupMetadataDescriptor.kt @@ -14,6 +14,9 @@ package org.wfanet.measurement.kingdom.deploy.gcloud.spanner.writers +import com.google.cloud.spanner.ErrorCode +import com.google.cloud.spanner.SpannerException +import org.wfanet.measurement.common.ProtoReflection import org.wfanet.measurement.common.identity.ExternalId import org.wfanet.measurement.common.identity.InternalId import org.wfanet.measurement.gcloud.spanner.bufferInsertMutation @@ -21,6 +24,7 @@ import org.wfanet.measurement.gcloud.spanner.set import org.wfanet.measurement.gcloud.spanner.setJson import org.wfanet.measurement.internal.kingdom.EventGroupMetadataDescriptor import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.common.DataProviderNotFoundException +import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.common.EventGroupMetadataDescriptorAlreadyExistsWithTypeException import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.common.KingdomInternalException import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.readers.DataProviderReader import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.readers.EventGroupMetadataDescriptorReader @@ -28,9 +32,9 @@ import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.readers.EventGroupMe /** * Creates a EventGroupMetadataDescriptor in the database. * - * Throws a subclass of [KingdomInternalException] on [execute]. - * - * @throws [DataProviderNotFoundException] DataProvider not found + * Throws one of the following subclasses of [KingdomInternalException] on [execute]: + * * [DataProviderNotFoundException] + * * [EventGroupMetadataDescriptorAlreadyExistsWithTypeException] */ class CreateEventGroupMetadataDescriptor( private val eventGroupMetadataDescriptor: EventGroupMetadataDescriptor @@ -58,6 +62,14 @@ class CreateEventGroupMetadataDescriptor( return createNewEventGroupMetadataDescriptor(dataProviderId) } + override suspend fun handleSpannerException(e: SpannerException): EventGroupMetadataDescriptor? { + if (e.errorCode == ErrorCode.ALREADY_EXISTS) { + throw EventGroupMetadataDescriptorAlreadyExistsWithTypeException(cause = e) + } else { + throw e + } + } + private fun TransactionScope.createNewEventGroupMetadataDescriptor( dataProviderId: InternalId ): EventGroupMetadataDescriptor { @@ -76,6 +88,20 @@ class CreateEventGroupMetadataDescriptor( setJson("DescriptorDetailsJson" to eventGroupMetadataDescriptor.details) } + val wellKnownTypeNames: Set = + ProtoReflection.WELL_KNOWN_TYPES.flatMap { it.messageTypes }.map { it.fullName }.toSet() + val protobufTypeNames: List = + ProtoReflection.buildDescriptors(listOf(eventGroupMetadataDescriptor.details.descriptorSet)) + .map { it.fullName } + .filter { it !in wellKnownTypeNames } + for (protobufTypeName in protobufTypeNames) { + transactionContext.bufferInsertMutation("EventGroupMetadataDescriptorTypes") { + set("DataProviderId" to dataProviderId) + set("EventGroupMetadataDescriptorId" to internalDescriptorId) + set("ProtobufTypeName" to protobufTypeName) + } + } + return eventGroupMetadataDescriptor .toBuilder() .setExternalEventGroupMetadataDescriptorId(externalDescriptorId.value) diff --git a/src/main/kotlin/org/wfanet/measurement/kingdom/service/internal/testing/BUILD.bazel b/src/main/kotlin/org/wfanet/measurement/kingdom/service/internal/testing/BUILD.bazel index 277e5253c7f..53681b28352 100644 --- a/src/main/kotlin/org/wfanet/measurement/kingdom/service/internal/testing/BUILD.bazel +++ b/src/main/kotlin/org/wfanet/measurement/kingdom/service/internal/testing/BUILD.bazel @@ -13,6 +13,8 @@ kt_jvm_library( "//src/main/kotlin/org/wfanet/measurement/kingdom/deploy/common:llv2_protocol_config", "//src/main/kotlin/org/wfanet/measurement/kingdom/deploy/common:ro_llv2_protocol_config", "//src/main/kotlin/org/wfanet/measurement/kingdom/deploy/common/testing", + "//src/main/proto/wfa/measurement/api/v2alpha/event_group_metadata/testing:test_metadata_message_2_kt_jvm_proto", + "//src/main/proto/wfa/measurement/api/v2alpha/event_group_metadata/testing:test_metadata_message_kt_jvm_proto", "@wfa_common_jvm//imports/java/com/google/cloud/spanner", "@wfa_common_jvm//imports/java/com/google/common/truth", "@wfa_common_jvm//imports/java/com/google/common/truth/extensions/proto", diff --git a/src/main/kotlin/org/wfanet/measurement/kingdom/service/internal/testing/EventGroupMetadataDescriptorsServiceTest.kt b/src/main/kotlin/org/wfanet/measurement/kingdom/service/internal/testing/EventGroupMetadataDescriptorsServiceTest.kt index 3fa8cfcdcd8..9200f945bf4 100644 --- a/src/main/kotlin/org/wfanet/measurement/kingdom/service/internal/testing/EventGroupMetadataDescriptorsServiceTest.kt +++ b/src/main/kotlin/org/wfanet/measurement/kingdom/service/internal/testing/EventGroupMetadataDescriptorsServiceTest.kt @@ -16,7 +16,6 @@ package org.wfanet.measurement.kingdom.service.internal.testing import com.google.common.truth.Truth.assertThat import com.google.common.truth.extensions.proto.ProtoTruth.assertThat -import com.google.protobuf.DescriptorProtos.FileDescriptorSet import io.grpc.Status import io.grpc.StatusRuntimeException import java.time.Clock @@ -30,6 +29,9 @@ import org.junit.Test import org.junit.runner.RunWith import org.junit.runners.JUnit4 import org.wfanet.measurement.api.Version +import org.wfanet.measurement.api.v2alpha.event_group_metadata.testing.TestMetadataMessage +import org.wfanet.measurement.api.v2alpha.event_group_metadata.testing.TestMetadataMessage2 +import org.wfanet.measurement.common.ProtoReflection import org.wfanet.measurement.common.identity.IdGenerator import org.wfanet.measurement.common.identity.RandomIdGenerator import org.wfanet.measurement.internal.kingdom.DataProvidersGrpcKt.DataProvidersCoroutineImplBase @@ -48,7 +50,11 @@ import org.wfanet.measurement.kingdom.deploy.common.testing.DuchyIdSetter private const val RANDOM_SEED = 1 private val DETAILS = details { apiVersion = Version.V2_ALPHA.string - descriptorSet = FileDescriptorSet.getDefaultInstance() + descriptorSet = ProtoReflection.buildFileDescriptorSet(TestMetadataMessage.getDescriptor()) +} +private val DETAILS_2 = details { + apiVersion = Version.V2_ALPHA.string + descriptorSet = ProtoReflection.buildFileDescriptorSet(TestMetadataMessage2.getDescriptor()) } @RunWith(JUnit4::class) @@ -163,6 +169,33 @@ abstract class EventGroupMetadataDescriptorsServiceTest< assertThat(exception).hasMessageThat().contains("NOT_FOUND: DataProvider not found") } + @Test + fun `createEventGroupMetadataDescriptor fails for duplicate message type`() = runBlocking { + val externalDataProviderId = + population.createDataProvider(dataProvidersService).externalDataProviderId + val externalDataProviderId2 = + population.createDataProvider(dataProvidersService).externalDataProviderId + eventGroupMetadataDescriptorService.createEventGroupMetadataDescriptor( + eventGroupMetadataDescriptor { + this.externalDataProviderId = externalDataProviderId + details = DETAILS + } + ) + + val exception = + assertFailsWith { + eventGroupMetadataDescriptorService.createEventGroupMetadataDescriptor( + eventGroupMetadataDescriptor { + this.externalDataProviderId = externalDataProviderId2 + details = DETAILS + } + ) + } + + assertThat(exception.status.code).isEqualTo(Status.Code.ALREADY_EXISTS) + assertThat(exception).hasMessageThat().contains("EventGroupMetadataDescriptor") + } + @Test fun `createEventGroupMetadataDescriptor returns existing Descriptor by idempotency key`() = runBlocking { @@ -257,7 +290,7 @@ abstract class EventGroupMetadataDescriptorsServiceTest< eventGroupMetadataDescriptorService.createEventGroupMetadataDescriptor( eventGroupMetadataDescriptor { this.externalDataProviderId = externalDataProviderId - details = DETAILS + details = DETAILS_2 } ) @@ -303,7 +336,7 @@ abstract class EventGroupMetadataDescriptorsServiceTest< eventGroupMetadataDescriptorService.createEventGroupMetadataDescriptor( eventGroupMetadataDescriptor { this.externalDataProviderId = externalDataProviderId - details = DETAILS + details = DETAILS_2 } ) @@ -342,7 +375,7 @@ abstract class EventGroupMetadataDescriptorsServiceTest< eventGroupMetadataDescriptorService.createEventGroupMetadataDescriptor( eventGroupMetadataDescriptor { this.externalDataProviderId = externalDataProviderId - details = DETAILS + details = DETAILS_2 } ) @@ -383,7 +416,7 @@ abstract class EventGroupMetadataDescriptorsServiceTest< eventGroupMetadataDescriptorService.createEventGroupMetadataDescriptor( eventGroupMetadataDescriptor { this.externalDataProviderId = externalDataProviderId - details = DETAILS + details = DETAILS_2 } ) diff --git a/src/main/proto/wfa/measurement/api/v2alpha/event_group_metadata/testing/BUILD.bazel b/src/main/proto/wfa/measurement/api/v2alpha/event_group_metadata/testing/BUILD.bazel index 4c584d6cb37..aeb5019ebdb 100644 --- a/src/main/proto/wfa/measurement/api/v2alpha/event_group_metadata/testing/BUILD.bazel +++ b/src/main/proto/wfa/measurement/api/v2alpha/event_group_metadata/testing/BUILD.bazel @@ -19,6 +19,17 @@ kt_jvm_proto_library( deps = [":test_metadata_message_proto"], ) +proto_library( + name = "test_metadata_message_2_proto", + srcs = ["test_metadata_message_2.proto"], + strip_import_prefix = IMPORT_PREFIX, +) + +kt_jvm_proto_library( + name = "test_metadata_message_2_kt_jvm_proto", + deps = [":test_metadata_message_2_proto"], +) + proto_library( name = "test_parent_metadata_message_proto", srcs = ["test_parent_metadata_message.proto"], diff --git a/src/main/proto/wfa/measurement/api/v2alpha/event_group_metadata/testing/test_metadata_message_2.proto b/src/main/proto/wfa/measurement/api/v2alpha/event_group_metadata/testing/test_metadata_message_2.proto new file mode 100644 index 00000000000..20f1c5329a6 --- /dev/null +++ b/src/main/proto/wfa/measurement/api/v2alpha/event_group_metadata/testing/test_metadata_message_2.proto @@ -0,0 +1,25 @@ + +// Copyright 2021 The Cross-Media Measurement Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package wfa.measurement.api.v2alpha.event_group_metadata.testing; + +option java_package = "org.wfanet.measurement.api.v2alpha.event_group_metadata.testing"; +option java_multiple_files = true; + +message TestMetadataMessage2 { + int32 publisher_id = 1; +} diff --git a/src/main/proto/wfa/measurement/internal/kingdom/error_code.proto b/src/main/proto/wfa/measurement/internal/kingdom/error_code.proto index 75f94f8065c..456e77015b3 100644 --- a/src/main/proto/wfa/measurement/internal/kingdom/error_code.proto +++ b/src/main/proto/wfa/measurement/internal/kingdom/error_code.proto @@ -68,6 +68,8 @@ enum ErrorCode { EVENT_GROUP_INVALID_ARGS = 21; /** EventGroupMetadataDescriptor could not be found. */ EVENT_GROUP_METADATA_DESCRIPTOR_NOT_FOUND = 22; + // An EventGroupMetadataDescriptor already exists with the same protobuf type. + EVENT_GROUP_METADATA_DESCRIPTOR_ALREADY_EXISTS_WITH_TYPE = 43; /** RecurringExchange could not be found. */ RECURRING_EXCHANGE_NOT_FOUND = 23; /** ExchangeStepAttempt could not be found. */ diff --git a/src/main/resources/kingdom/spanner/add-event-group-metadata-descriptor-types.sql b/src/main/resources/kingdom/spanner/add-event-group-metadata-descriptor-types.sql new file mode 100644 index 00000000000..91c86d07976 --- /dev/null +++ b/src/main/resources/kingdom/spanner/add-event-group-metadata-descriptor-types.sql @@ -0,0 +1,30 @@ +-- liquibase formatted sql + +-- Copyright 2024 The Cross-Media Measurement Authors +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +-- changeset sanjayvas:15 dbms:cloudspanner +-- comment: Add unique index for EventGroupMetadataDescriptor protobuf types. + +CREATE TABLE EventGroupMetadataDescriptorTypes ( + DataProviderId INT64 NOT NULL, + EventGroupMetadataDescriptorId INT64 NOT NULL, + + -- Fully-qualified protobuf type name. + ProtobufTypeName STRING(MAX) NOT NULL, +) PRIMARY KEY (DataProviderId, EventGroupMetadataDescriptorId, ProtobufTypeName), +INTERLEAVE IN PARENT EventGroupMetadataDescriptors ON DELETE CASCADE; + +CREATE UNIQUE INDEX EventGroupMetadataDescriptorTypesByType + ON EventGroupMetadataDescriptorTypes(ProtobufTypeName); diff --git a/src/main/resources/kingdom/spanner/changelog.yaml b/src/main/resources/kingdom/spanner/changelog.yaml index 9818e27f6f8..e59e19b113d 100644 --- a/src/main/resources/kingdom/spanner/changelog.yaml +++ b/src/main/resources/kingdom/spanner/changelog.yaml @@ -60,3 +60,6 @@ databaseChangeLog: - include: file: add-result-api-version.sql relativeToChangeLogFile: true +- include: + file: add-event-group-metadata-descriptor-types.sql + relativeToChangeLogFile: true