Skip to content

Commit

Permalink
Enforce uniqueness for EventGroupMetadataDescriptor message types.
Browse files Browse the repository at this point in the history
Note that this only affects the creation of new resources. Existing resources may exist that already violate this constraint.
  • Loading branch information
SanjayVas committed Feb 29, 2024
1 parent 30b67d9 commit 5a84e21
Show file tree
Hide file tree
Showing 10 changed files with 157 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.wfanet.measurement.internal.kingdom.GetEventGroupMetadataDescriptorRe
import org.wfanet.measurement.internal.kingdom.StreamEventGroupMetadataDescriptorsRequest
import org.wfanet.measurement.internal.kingdom.UpdateEventGroupMetadataDescriptorRequest
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.common.DataProviderNotFoundException
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.common.EventGroupMetadataDescriptorAlreadyExistsWithTypeException
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.common.EventGroupMetadataDescriptorNotFoundException
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.common.KingdomInternalException
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.queries.StreamEventGroupMetadataDescriptors
Expand All @@ -45,6 +46,8 @@ class SpannerEventGroupMetadataDescriptorsService(
return CreateEventGroupMetadataDescriptor(request).execute(client, idGenerator)
} catch (e: DataProviderNotFoundException) {
throw e.asStatusRuntimeException(Status.Code.NOT_FOUND, "DataProvider not found.")
} catch (e: EventGroupMetadataDescriptorAlreadyExistsWithTypeException) {
throw e.asStatusRuntimeException(Status.Code.ALREADY_EXISTS)
} catch (e: KingdomInternalException) {
throw e.asStatusRuntimeException(Status.Code.INTERNAL, "Unexpected internal error")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,19 @@ class EventGroupMetadataDescriptorNotFoundException(
)
}

class EventGroupMetadataDescriptorAlreadyExistsWithTypeException(
message: String = "EventGroupMetadataDescriptor with same protobuf type already exists",
cause: Throwable? = null,
) :
KingdomInternalException(
ErrorCode.EVENT_GROUP_METADATA_DESCRIPTOR_ALREADY_EXISTS_WITH_TYPE,
message,
cause,
) {
override val context: Map<String, String>
get() = emptyMap()
}

class RecurringExchangeNotFoundException(
val externalRecurringExchangeId: ExternalId,
provideDescription: () -> String = { "RecurringExchange not found" },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,27 @@

package org.wfanet.measurement.kingdom.deploy.gcloud.spanner.writers

import com.google.cloud.spanner.ErrorCode
import com.google.cloud.spanner.SpannerException
import org.wfanet.measurement.common.ProtoReflection
import org.wfanet.measurement.common.identity.ExternalId
import org.wfanet.measurement.common.identity.InternalId
import org.wfanet.measurement.gcloud.spanner.bufferInsertMutation
import org.wfanet.measurement.gcloud.spanner.set
import org.wfanet.measurement.gcloud.spanner.setJson
import org.wfanet.measurement.internal.kingdom.EventGroupMetadataDescriptor
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.common.DataProviderNotFoundException
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.common.EventGroupMetadataDescriptorAlreadyExistsWithTypeException
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.common.KingdomInternalException
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.readers.DataProviderReader
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.readers.EventGroupMetadataDescriptorReader

/**
* Creates a EventGroupMetadataDescriptor in the database.
*
* Throws a subclass of [KingdomInternalException] on [execute].
*
* @throws [DataProviderNotFoundException] DataProvider not found
* Throws one of the following subclasses of [KingdomInternalException] on [execute]:
* * [DataProviderNotFoundException]
* * [EventGroupMetadataDescriptorAlreadyExistsWithTypeException]
*/
class CreateEventGroupMetadataDescriptor(
private val eventGroupMetadataDescriptor: EventGroupMetadataDescriptor
Expand Down Expand Up @@ -58,6 +62,14 @@ class CreateEventGroupMetadataDescriptor(
return createNewEventGroupMetadataDescriptor(dataProviderId)
}

override suspend fun handleSpannerException(e: SpannerException): EventGroupMetadataDescriptor? {
if (e.errorCode == ErrorCode.ALREADY_EXISTS) {
throw EventGroupMetadataDescriptorAlreadyExistsWithTypeException(cause = e)
} else {
throw e
}
}

private fun TransactionScope.createNewEventGroupMetadataDescriptor(
dataProviderId: InternalId
): EventGroupMetadataDescriptor {
Expand All @@ -76,6 +88,20 @@ class CreateEventGroupMetadataDescriptor(
setJson("DescriptorDetailsJson" to eventGroupMetadataDescriptor.details)
}

val wellKnownTypeNames: Set<String> =
ProtoReflection.WELL_KNOWN_TYPES.flatMap { it.messageTypes }.map { it.fullName }.toSet()
val protobufTypeNames: List<String> =
ProtoReflection.buildDescriptors(listOf(eventGroupMetadataDescriptor.details.descriptorSet))
.map { it.fullName }
.filter { it !in wellKnownTypeNames }
for (protobufTypeName in protobufTypeNames) {
transactionContext.bufferInsertMutation("EventGroupMetadataDescriptorTypes") {
set("DataProviderId" to dataProviderId)
set("EventGroupMetadataDescriptorId" to internalDescriptorId)
set("ProtobufTypeName" to protobufTypeName)
}
}

return eventGroupMetadataDescriptor
.toBuilder()
.setExternalEventGroupMetadataDescriptorId(externalDescriptorId.value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ kt_jvm_library(
"//src/main/kotlin/org/wfanet/measurement/kingdom/deploy/common:llv2_protocol_config",
"//src/main/kotlin/org/wfanet/measurement/kingdom/deploy/common:ro_llv2_protocol_config",
"//src/main/kotlin/org/wfanet/measurement/kingdom/deploy/common/testing",
"//src/main/proto/wfa/measurement/api/v2alpha/event_group_metadata/testing:test_metadata_message_2_kt_jvm_proto",
"//src/main/proto/wfa/measurement/api/v2alpha/event_group_metadata/testing:test_metadata_message_kt_jvm_proto",
"@wfa_common_jvm//imports/java/com/google/cloud/spanner",
"@wfa_common_jvm//imports/java/com/google/common/truth",
"@wfa_common_jvm//imports/java/com/google/common/truth/extensions/proto",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package org.wfanet.measurement.kingdom.service.internal.testing

import com.google.common.truth.Truth.assertThat
import com.google.common.truth.extensions.proto.ProtoTruth.assertThat
import com.google.protobuf.DescriptorProtos.FileDescriptorSet
import io.grpc.Status
import io.grpc.StatusRuntimeException
import java.time.Clock
Expand All @@ -30,6 +29,9 @@ import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.JUnit4
import org.wfanet.measurement.api.Version
import org.wfanet.measurement.api.v2alpha.event_group_metadata.testing.TestMetadataMessage
import org.wfanet.measurement.api.v2alpha.event_group_metadata.testing.TestMetadataMessage2
import org.wfanet.measurement.common.ProtoReflection
import org.wfanet.measurement.common.identity.IdGenerator
import org.wfanet.measurement.common.identity.RandomIdGenerator
import org.wfanet.measurement.internal.kingdom.DataProvidersGrpcKt.DataProvidersCoroutineImplBase
Expand All @@ -48,7 +50,11 @@ import org.wfanet.measurement.kingdom.deploy.common.testing.DuchyIdSetter
private const val RANDOM_SEED = 1
private val DETAILS = details {
apiVersion = Version.V2_ALPHA.string
descriptorSet = FileDescriptorSet.getDefaultInstance()
descriptorSet = ProtoReflection.buildFileDescriptorSet(TestMetadataMessage.getDescriptor())
}
private val DETAILS_2 = details {
apiVersion = Version.V2_ALPHA.string
descriptorSet = ProtoReflection.buildFileDescriptorSet(TestMetadataMessage2.getDescriptor())
}

@RunWith(JUnit4::class)
Expand Down Expand Up @@ -163,6 +169,33 @@ abstract class EventGroupMetadataDescriptorsServiceTest<
assertThat(exception).hasMessageThat().contains("NOT_FOUND: DataProvider not found")
}

@Test
fun `createEventGroupMetadataDescriptor fails for duplicate message type`() = runBlocking {
val externalDataProviderId =
population.createDataProvider(dataProvidersService).externalDataProviderId
val externalDataProviderId2 =
population.createDataProvider(dataProvidersService).externalDataProviderId
eventGroupMetadataDescriptorService.createEventGroupMetadataDescriptor(
eventGroupMetadataDescriptor {
this.externalDataProviderId = externalDataProviderId
details = DETAILS
}
)

val exception =
assertFailsWith<StatusRuntimeException> {
eventGroupMetadataDescriptorService.createEventGroupMetadataDescriptor(
eventGroupMetadataDescriptor {
this.externalDataProviderId = externalDataProviderId2
details = DETAILS
}
)
}

assertThat(exception.status.code).isEqualTo(Status.Code.ALREADY_EXISTS)
assertThat(exception).hasMessageThat().contains("EventGroupMetadataDescriptor")
}

@Test
fun `createEventGroupMetadataDescriptor returns existing Descriptor by idempotency key`() =
runBlocking {
Expand Down Expand Up @@ -257,7 +290,7 @@ abstract class EventGroupMetadataDescriptorsServiceTest<
eventGroupMetadataDescriptorService.createEventGroupMetadataDescriptor(
eventGroupMetadataDescriptor {
this.externalDataProviderId = externalDataProviderId
details = DETAILS
details = DETAILS_2
}
)

Expand Down Expand Up @@ -303,7 +336,7 @@ abstract class EventGroupMetadataDescriptorsServiceTest<
eventGroupMetadataDescriptorService.createEventGroupMetadataDescriptor(
eventGroupMetadataDescriptor {
this.externalDataProviderId = externalDataProviderId
details = DETAILS
details = DETAILS_2
}
)

Expand Down Expand Up @@ -342,7 +375,7 @@ abstract class EventGroupMetadataDescriptorsServiceTest<
eventGroupMetadataDescriptorService.createEventGroupMetadataDescriptor(
eventGroupMetadataDescriptor {
this.externalDataProviderId = externalDataProviderId
details = DETAILS
details = DETAILS_2
}
)

Expand Down Expand Up @@ -383,7 +416,7 @@ abstract class EventGroupMetadataDescriptorsServiceTest<
eventGroupMetadataDescriptorService.createEventGroupMetadataDescriptor(
eventGroupMetadataDescriptor {
this.externalDataProviderId = externalDataProviderId
details = DETAILS
details = DETAILS_2
}
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,17 @@ kt_jvm_proto_library(
deps = [":test_metadata_message_proto"],
)

proto_library(
name = "test_metadata_message_2_proto",
srcs = ["test_metadata_message_2.proto"],
strip_import_prefix = IMPORT_PREFIX,
)

kt_jvm_proto_library(
name = "test_metadata_message_2_kt_jvm_proto",
deps = [":test_metadata_message_2_proto"],
)

proto_library(
name = "test_parent_metadata_message_proto",
srcs = ["test_parent_metadata_message.proto"],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@

// Copyright 2021 The Cross-Media Measurement Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

syntax = "proto3";

package wfa.measurement.api.v2alpha.event_group_metadata.testing;

option java_package = "org.wfanet.measurement.api.v2alpha.event_group_metadata.testing";
option java_multiple_files = true;

message TestMetadataMessage2 {
int32 publisher_id = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ enum ErrorCode {
EVENT_GROUP_INVALID_ARGS = 21;
/** EventGroupMetadataDescriptor could not be found. */
EVENT_GROUP_METADATA_DESCRIPTOR_NOT_FOUND = 22;
// An EventGroupMetadataDescriptor already exists with the same protobuf type.
EVENT_GROUP_METADATA_DESCRIPTOR_ALREADY_EXISTS_WITH_TYPE = 43;
/** RecurringExchange could not be found. */
RECURRING_EXCHANGE_NOT_FOUND = 23;
/** ExchangeStepAttempt could not be found. */
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
-- liquibase formatted sql

-- Copyright 2024 The Cross-Media Measurement Authors
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.

-- changeset sanjayvas:15 dbms:cloudspanner
-- comment: Add unique index for EventGroupMetadataDescriptor protobuf types.

CREATE TABLE EventGroupMetadataDescriptorTypes (
DataProviderId INT64 NOT NULL,
EventGroupMetadataDescriptorId INT64 NOT NULL,

-- Fully-qualified protobuf type name.
ProtobufTypeName STRING(MAX) NOT NULL,
) PRIMARY KEY (DataProviderId, EventGroupMetadataDescriptorId, ProtobufTypeName),
INTERLEAVE IN PARENT EventGroupMetadataDescriptors ON DELETE CASCADE;

CREATE UNIQUE INDEX EventGroupMetadataDescriptorTypesByType
ON EventGroupMetadataDescriptorTypes(ProtobufTypeName);
3 changes: 3 additions & 0 deletions src/main/resources/kingdom/spanner/changelog.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,6 @@ databaseChangeLog:
- include:
file: add-result-api-version.sql
relativeToChangeLogFile: true
- include:
file: add-event-group-metadata-descriptor-types.sql
relativeToChangeLogFile: true

0 comments on commit 5a84e21

Please sign in to comment.