From e03ae4488e46558fa25e915cb8b05d30db0765fe Mon Sep 17 00:00:00 2001 From: Viktor Danyliuk Date: Wed, 5 Oct 2022 16:50:39 +0300 Subject: [PATCH 1/5] Add bigquery subscription type support Signed-off-by: Viktor Danyliuk --- apis/pubsub/v1alpha1/subscription_types.go | 24 ++ apis/pubsub/v1alpha1/zz_generated.deepcopy.go | 20 ++ ...ubsub.gcp.crossplane.io_subscriptions.yaml | 25 ++ pkg/clients/subscription/subscription.go | 29 ++- pkg/clients/subscription/subscription_test.go | 223 ++++++++++++++++++ 5 files changed, 320 insertions(+), 1 deletion(-) diff --git a/apis/pubsub/v1alpha1/subscription_types.go b/apis/pubsub/v1alpha1/subscription_types.go index 9172a9081..86923f54d 100644 --- a/apis/pubsub/v1alpha1/subscription_types.go +++ b/apis/pubsub/v1alpha1/subscription_types.go @@ -87,6 +87,10 @@ type SubscriptionParameters struct { // +optional PushConfig *PushConfig `json:"pushConfig,omitempty"` + // BigQueryConfig is a parameter which configures bigquery delivery. + // +optional + BigQueryConfig *BigQueryConfig `json:"bigQueryConfig,omitempty"` + // RetainAckedMessages is a message which indicates whether to retain acknowledged // messages. If true, then messages are not expunged from the // subscription's backlog, even if they are acknowledged, until they @@ -143,6 +147,26 @@ type PushConfig struct { PushEndpoint string `json:"pushEndpoint,omitempty"` } +// BigQueryConfig contains configuration for a bigquery delivery endpoint. +type BigQueryConfig struct { + // Bigquery table to deliver messages to. + Table string `json:"table,omitempty"` + + // When enabled, the topic schema will be used when writing to BigQuery. Else, + // tes the message bytes to a column called data in BigQuery. + UseTopicSchema bool `json:"useTopicSchema,omitempty"` + + // When enabled, the metadata of each message is written to additional columns in + // the BigQuery table. Else, the metadata is not written to the BigQuery table. + // https://cloud.google.com/pubsub/docs/bigquery?hl=ru#write-metadata + WriteMetadata bool `json:"writeMetadata,omitempty"` + + // When enabled along with the "Use topic schema" option, any field that is present in + // the topic schema but not in the BigQuery schema will be dropped. Else, messages with extra fields are not written + // and remain in the subscription backlog. + DropUnknownFields bool `json:"dropUnknownFields,omitempty"` +} + // OidcToken contains information needed for generating an OpenID Connect token type OidcToken struct { // Audience is the "audience" to be used when generating OIDC token. diff --git a/apis/pubsub/v1alpha1/zz_generated.deepcopy.go b/apis/pubsub/v1alpha1/zz_generated.deepcopy.go index 142f26bd1..50ace383a 100644 --- a/apis/pubsub/v1alpha1/zz_generated.deepcopy.go +++ b/apis/pubsub/v1alpha1/zz_generated.deepcopy.go @@ -26,6 +26,21 @@ import ( runtime "k8s.io/apimachinery/pkg/runtime" ) +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *BigQueryConfig) DeepCopyInto(out *BigQueryConfig) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BigQueryConfig. +func (in *BigQueryConfig) DeepCopy() *BigQueryConfig { + if in == nil { + return nil + } + out := new(BigQueryConfig) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *DeadLetterPolicy) DeepCopyInto(out *DeadLetterPolicy) { *out = *in @@ -217,6 +232,11 @@ func (in *SubscriptionParameters) DeepCopyInto(out *SubscriptionParameters) { *out = new(PushConfig) (*in).DeepCopyInto(*out) } + if in.BigQueryConfig != nil { + in, out := &in.BigQueryConfig, &out.BigQueryConfig + *out = new(BigQueryConfig) + **out = **in + } if in.RetryPolicy != nil { in, out := &in.RetryPolicy, &out.RetryPolicy *out = new(RetryPolicy) diff --git a/package/crds/pubsub.gcp.crossplane.io_subscriptions.yaml b/package/crds/pubsub.gcp.crossplane.io_subscriptions.yaml index e1f028a61..63d42e499 100644 --- a/package/crds/pubsub.gcp.crossplane.io_subscriptions.yaml +++ b/package/crds/pubsub.gcp.crossplane.io_subscriptions.yaml @@ -65,6 +65,31 @@ spec: value of 10 seconds is used. format: int64 type: integer + bigQueryConfig: + description: BigQueryConfig is a parameter which configures bigquery + delivery. + properties: + dropUnknownFields: + description: When enabled along with the "Use topic schema" + option, any field that is present in the topic schema but + not in the BigQuery schema will be dropped. Else, messages + with extra fields are not written and remain in the subscription + backlog. + type: boolean + table: + description: Bigquery table to deliver messages to. + type: string + useTopicSchema: + description: When enabled, the topic schema will be used when + writing to BigQuery. Else, tes the message bytes to a column + called data in BigQuery. + type: boolean + writeMetadata: + description: When enabled, the metadata of each message is + written to additional columns in the BigQuery table. Else, + the metadata is not written to the BigQuery table. https://cloud.google.com/pubsub/docs/bigquery?hl=ru#write-metadata + type: boolean + type: object deadLetterPolicy: description: DeadLetterPolicy is the policy that specifies the conditions for dead lettering messages in this subscription. diff --git a/pkg/clients/subscription/subscription.go b/pkg/clients/subscription/subscription.go index 8a546e043..78e377749 100644 --- a/pkg/clients/subscription/subscription.go +++ b/pkg/clients/subscription/subscription.go @@ -53,6 +53,7 @@ func GenerateSubscription(projectID, name string, p v1alpha1.SubscriptionParamet setDeadLetterPolicy(projectID, p, s) setExpirationPolicy(p, s) setPushConfig(p, s) + setBigQueryConfig(p, s) setRetryPolicy(p, s) return s @@ -85,6 +86,18 @@ func setPushConfig(p v1alpha1.SubscriptionParameters, s *pubsub.Subscription) { } } +// setBigQueryConfig sets BigQueryConfig of subscription based on SubscriptionParameters. +func setBigQueryConfig(p v1alpha1.SubscriptionParameters, s *pubsub.Subscription) { + if p.BigQueryConfig != nil { + s.BigqueryConfig = &pubsub.BigQueryConfig{ + Table: p.BigQueryConfig.Table, + UseTopicSchema: p.BigQueryConfig.UseTopicSchema, + WriteMetadata: p.BigQueryConfig.WriteMetadata, + DropUnknownFields: p.BigQueryConfig.DropUnknownFields, + } + } +} + // setExpirationPolicy sets ExpirationPolicy of subscription based on SubscriptionParameters. func setExpirationPolicy(p v1alpha1.SubscriptionParameters, s *pubsub.Subscription) { if p.ExpirationPolicy != nil { @@ -169,6 +182,15 @@ func LateInitialize(p *v1alpha1.SubscriptionParameters, s pubsub.Subscription) { } } + if p.BigQueryConfig == nil && s.BigqueryConfig != nil { + p.BigQueryConfig = &v1alpha1.BigQueryConfig{ + Table: s.BigqueryConfig.Table, + DropUnknownFields: s.BigqueryConfig.DropUnknownFields, + UseTopicSchema: s.BigqueryConfig.UseTopicSchema, + WriteMetadata: s.BigqueryConfig.WriteMetadata, + } + } + if p.RetryPolicy == nil && s.RetryPolicy != nil { p.RetryPolicy = &v1alpha1.RetryPolicy{ MaximumBackoff: s.RetryPolicy.MaximumBackoff, @@ -195,7 +217,7 @@ func IsUpToDate(projectID string, p v1alpha1.SubscriptionParameters, s pubsub.Su // GenerateUpdateRequest produces an UpdateSubscriptionRequest with the difference // between SubscriptionParameters and Subscription. // enableMessageOrdering, deadLetterPolicy, topic are not mutable -func GenerateUpdateRequest(name string, p v1alpha1.SubscriptionParameters, s pubsub.Subscription) *pubsub.UpdateSubscriptionRequest { +func GenerateUpdateRequest(name string, p v1alpha1.SubscriptionParameters, s pubsub.Subscription) *pubsub.UpdateSubscriptionRequest { // nolint:gocyclo observed := &v1alpha1.SubscriptionParameters{} LateInitialize(observed, s) @@ -245,6 +267,11 @@ func GenerateUpdateRequest(name string, p v1alpha1.SubscriptionParameters, s pub setPushConfig(p, us.Subscription) } + if !cmp.Equal(p.BigQueryConfig, observed.BigQueryConfig) { + mask = append(mask, "bigQueryConfig") + setBigQueryConfig(p, us.Subscription) + } + if !cmp.Equal(p.RetryPolicy, observed.RetryPolicy) { mask = append(mask, "retryPolicy") setRetryPolicy(p, us.Subscription) diff --git a/pkg/clients/subscription/subscription_test.go b/pkg/clients/subscription/subscription_test.go index 8c9c9b74c..006df0f03 100644 --- a/pkg/clients/subscription/subscription_test.go +++ b/pkg/clients/subscription/subscription_test.go @@ -95,6 +95,67 @@ func subscription() *pubsub.Subscription { } } +func bigqueryParams() *v1alpha1.SubscriptionParameters { + return &v1alpha1.SubscriptionParameters{ + AckDeadlineSeconds: 15, + DeadLetterPolicy: &v1alpha1.DeadLetterPolicy{ + DeadLetterTopic: topicName, + MaxDeliveryAttempts: 5, + }, + Detached: true, + EnableMessageOrdering: true, + ExpirationPolicy: &v1alpha1.ExpirationPolicy{TTL: "1296000s"}, + Filter: "foo", + Labels: map[string]string{"example": "true"}, + MessageRetentionDuration: "864000s", + PushConfig: nil, + BigQueryConfig: &v1alpha1.BigQueryConfig{ + Table: "projects/my-project/subscriptions/my-bigquery-subscription", + UseTopicSchema: true, + WriteMetadata: true, + DropUnknownFields: true, + }, + RetryPolicy: &v1alpha1.RetryPolicy{ + MaximumBackoff: "100s", + MinimumBackoff: "15s", + }, + RetainAckedMessages: true, + Topic: topicName, + } +} + +func bigquerySubscription() *pubsub.Subscription { + return &pubsub.Subscription{ + Name: name, + AckDeadlineSeconds: 15, + DeadLetterPolicy: &pubsub.DeadLetterPolicy{ + DeadLetterTopic: topicNameExternal, + MaxDeliveryAttempts: 5, + }, + Detached: true, + EnableMessageOrdering: true, + ExpirationPolicy: &pubsub.ExpirationPolicy{Ttl: "1296000s"}, + Filter: "foo", + Labels: map[string]string{ + "example": "true", + }, + MessageRetentionDuration: "864000s", + PushConfig: nil, + BigqueryConfig: &pubsub.BigQueryConfig{ + Table: "projects/my-project/subscriptions/my-bigquery-subscription", + UseTopicSchema: true, + WriteMetadata: true, + DropUnknownFields: true, + }, + RetryPolicy: &pubsub.RetryPolicy{ + MaximumBackoff: "100s", + MinimumBackoff: "15s", + }, + RetainAckedMessages: true, + Topic: topicNameExternal, + } +} + func TestGenerateSubscription(t *testing.T) { type args struct { projectID string @@ -125,6 +186,36 @@ func TestGenerateSubscription(t *testing.T) { } } +func TestGenerateBigquerySubscription(t *testing.T) { + type args struct { + projectID string + name string + s v1alpha1.SubscriptionParameters + } + cases := map[string]struct { + args + out *pubsub.Subscription + }{ + "Full": { + args: args{ + projectID: projectID, + name: name, + s: *bigqueryParams(), + }, + out: bigquerySubscription(), + }, + } + + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + got := GenerateSubscription(tc.projectID, tc.name, tc.s) + if diff := cmp.Diff(tc.out, got); diff != "" { + t.Errorf("GenerateSubscription(...): -want, +got:\n%s", diff) + } + }) + } +} + func TestLateInitialize(t *testing.T) { type args struct { obs pubsub.Subscription @@ -179,6 +270,59 @@ func TestLateInitialize(t *testing.T) { } } +func TestLateInitializeBigquery(t *testing.T) { + type args struct { + obs pubsub.Subscription + param *v1alpha1.SubscriptionParameters + } + cases := map[string]struct { + args + out *v1alpha1.SubscriptionParameters + }{ + "Full": { + args: args{ + obs: *bigquerySubscription(), + param: &v1alpha1.SubscriptionParameters{ + AckDeadlineSeconds: 15, + DeadLetterPolicy: &v1alpha1.DeadLetterPolicy{ + DeadLetterTopic: topicName, + MaxDeliveryAttempts: 5, + }, + Detached: true, + EnableMessageOrdering: true, + ExpirationPolicy: &v1alpha1.ExpirationPolicy{TTL: "1296000s"}, + Filter: "foo", + Labels: map[string]string{"example": "true"}, + MessageRetentionDuration: "864000s", + PushConfig: nil, + BigQueryConfig: &v1alpha1.BigQueryConfig{ + Table: "projects/my-project/subscriptions/my-bigquery-subscription", + UseTopicSchema: true, + WriteMetadata: true, + DropUnknownFields: true, + }, + RetryPolicy: &v1alpha1.RetryPolicy{ + MaximumBackoff: "100s", + MinimumBackoff: "15s", + }, + RetainAckedMessages: true, + Topic: topicName, + }, + }, + out: bigqueryParams(), + }, + } + + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + LateInitialize(tc.args.param, tc.args.obs) + if diff := cmp.Diff(tc.args.param, tc.out); diff != "" { + t.Errorf("LateInitialize(...): -want, +got:\n%s", diff) + } + }) + } +} + func TestIsUpToDate(t *testing.T) { type args struct { obs pubsub.Subscription @@ -217,6 +361,44 @@ func TestIsUpToDate(t *testing.T) { } } +func TestIsUpToDateBigquery(t *testing.T) { + type args struct { + obs pubsub.Subscription + param v1alpha1.SubscriptionParameters + } + cases := map[string]struct { + args + result bool + }{ + "NotUpToDate": { + args: args{ + obs: *bigquerySubscription(), + param: v1alpha1.SubscriptionParameters{ + RetryPolicy: nil, + }, + }, + result: false, + }, + "UpToDate": { + args: args{ + obs: *bigquerySubscription(), + param: *bigqueryParams(), + }, + result: true, + }, + } + + IsUpToDate(projectID, *bigqueryParams(), *bigquerySubscription()) + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + got := IsUpToDate(projectID, tc.args.param, tc.args.obs) + if diff := cmp.Diff(tc.result, got); diff != "" { + t.Errorf("IsUpToDate(...): -want, +got:\n%s", diff) + } + }) + } +} + func TestGenerateUpdateRequest(t *testing.T) { mutableSubscription := subscription() mutableSubscription.Topic = "" @@ -257,3 +439,44 @@ func TestGenerateUpdateRequest(t *testing.T) { }) } } + +func TestGenerateUpdateRequestBigquery(t *testing.T) { + mutableSubscription := bigquerySubscription() + mutableSubscription.Topic = "" + mutableSubscription.EnableMessageOrdering = false + mutableSubscription.DeadLetterPolicy = nil + + type args struct { + projectID string + name string + obs pubsub.Subscription + param v1alpha1.SubscriptionParameters + } + + cases := map[string]struct { + args + result *pubsub.UpdateSubscriptionRequest + }{ + "Full": { + args: args{ + projectID: projectID, + name: name, + obs: pubsub.Subscription{}, + param: *bigqueryParams(), + }, + result: &pubsub.UpdateSubscriptionRequest{ + Subscription: mutableSubscription, + UpdateMask: "ackDeadlineSeconds,detached,filter,labels,messageRetentionDuration,retainAckedMessages,expirationPolicy,bigQueryConfig,retryPolicy", + }, + }, + } + + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + got := GenerateUpdateRequest(tc.args.name, tc.args.param, tc.args.obs) + if diff := cmp.Diff(tc.result, got); diff != "" { + t.Errorf("GenerateUpdateRequest(...): -want, +got:\n%s", diff) + } + }) + } +} From 2c67e5aa8c8196c2386085509823cc072b27a41b Mon Sep 17 00:00:00 2001 From: Viktor Danyliuk Date: Mon, 17 Oct 2022 19:20:13 +0300 Subject: [PATCH 2/5] Rename BigQueryConfig to BigqueryConfig to avoid undesired underscores on CamelCase to snale_case conversion Signed-off-by: Viktor Danyliuk --- apis/pubsub/v1alpha1/subscription_types.go | 8 +++--- apis/pubsub/v1alpha1/zz_generated.deepcopy.go | 14 +++++----- ...ubsub.gcp.crossplane.io_subscriptions.yaml | 4 +-- pkg/clients/subscription/subscription.go | 26 +++++++++---------- pkg/clients/subscription/subscription_test.go | 6 ++--- 5 files changed, 29 insertions(+), 29 deletions(-) diff --git a/apis/pubsub/v1alpha1/subscription_types.go b/apis/pubsub/v1alpha1/subscription_types.go index 86923f54d..c5fd08367 100644 --- a/apis/pubsub/v1alpha1/subscription_types.go +++ b/apis/pubsub/v1alpha1/subscription_types.go @@ -87,9 +87,9 @@ type SubscriptionParameters struct { // +optional PushConfig *PushConfig `json:"pushConfig,omitempty"` - // BigQueryConfig is a parameter which configures bigquery delivery. + // BigqueryConfig is a parameter which configures bigquery delivery. // +optional - BigQueryConfig *BigQueryConfig `json:"bigQueryConfig,omitempty"` + BigqueryConfig *BigqueryConfig `json:"bigqueryConfig,omitempty"` // RetainAckedMessages is a message which indicates whether to retain acknowledged // messages. If true, then messages are not expunged from the @@ -147,8 +147,8 @@ type PushConfig struct { PushEndpoint string `json:"pushEndpoint,omitempty"` } -// BigQueryConfig contains configuration for a bigquery delivery endpoint. -type BigQueryConfig struct { +// BigqueryConfig contains configuration for a bigquery delivery endpoint. +type BigqueryConfig struct { // Bigquery table to deliver messages to. Table string `json:"table,omitempty"` diff --git a/apis/pubsub/v1alpha1/zz_generated.deepcopy.go b/apis/pubsub/v1alpha1/zz_generated.deepcopy.go index 50ace383a..9463bfd3f 100644 --- a/apis/pubsub/v1alpha1/zz_generated.deepcopy.go +++ b/apis/pubsub/v1alpha1/zz_generated.deepcopy.go @@ -27,16 +27,16 @@ import ( ) // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *BigQueryConfig) DeepCopyInto(out *BigQueryConfig) { +func (in *BigqueryConfig) DeepCopyInto(out *BigqueryConfig) { *out = *in } -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BigQueryConfig. -func (in *BigQueryConfig) DeepCopy() *BigQueryConfig { +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BigqueryConfig. +func (in *BigqueryConfig) DeepCopy() *BigqueryConfig { if in == nil { return nil } - out := new(BigQueryConfig) + out := new(BigqueryConfig) in.DeepCopyInto(out) return out } @@ -232,9 +232,9 @@ func (in *SubscriptionParameters) DeepCopyInto(out *SubscriptionParameters) { *out = new(PushConfig) (*in).DeepCopyInto(*out) } - if in.BigQueryConfig != nil { - in, out := &in.BigQueryConfig, &out.BigQueryConfig - *out = new(BigQueryConfig) + if in.BigqueryConfig != nil { + in, out := &in.BigqueryConfig, &out.BigqueryConfig + *out = new(BigqueryConfig) **out = **in } if in.RetryPolicy != nil { diff --git a/package/crds/pubsub.gcp.crossplane.io_subscriptions.yaml b/package/crds/pubsub.gcp.crossplane.io_subscriptions.yaml index 63d42e499..2be89b8bc 100644 --- a/package/crds/pubsub.gcp.crossplane.io_subscriptions.yaml +++ b/package/crds/pubsub.gcp.crossplane.io_subscriptions.yaml @@ -65,8 +65,8 @@ spec: value of 10 seconds is used. format: int64 type: integer - bigQueryConfig: - description: BigQueryConfig is a parameter which configures bigquery + bigqueryConfig: + description: BigqueryConfig is a parameter which configures bigquery delivery. properties: dropUnknownFields: diff --git a/pkg/clients/subscription/subscription.go b/pkg/clients/subscription/subscription.go index 78e377749..f6473aaed 100644 --- a/pkg/clients/subscription/subscription.go +++ b/pkg/clients/subscription/subscription.go @@ -53,7 +53,7 @@ func GenerateSubscription(projectID, name string, p v1alpha1.SubscriptionParamet setDeadLetterPolicy(projectID, p, s) setExpirationPolicy(p, s) setPushConfig(p, s) - setBigQueryConfig(p, s) + setBigqueryConfig(p, s) setRetryPolicy(p, s) return s @@ -86,14 +86,14 @@ func setPushConfig(p v1alpha1.SubscriptionParameters, s *pubsub.Subscription) { } } -// setBigQueryConfig sets BigQueryConfig of subscription based on SubscriptionParameters. -func setBigQueryConfig(p v1alpha1.SubscriptionParameters, s *pubsub.Subscription) { - if p.BigQueryConfig != nil { +// setBigqueryConfig sets BigqueryConfig of subscription based on SubscriptionParameters. +func setBigqueryConfig(p v1alpha1.SubscriptionParameters, s *pubsub.Subscription) { + if p.BigqueryConfig != nil { s.BigqueryConfig = &pubsub.BigQueryConfig{ - Table: p.BigQueryConfig.Table, - UseTopicSchema: p.BigQueryConfig.UseTopicSchema, - WriteMetadata: p.BigQueryConfig.WriteMetadata, - DropUnknownFields: p.BigQueryConfig.DropUnknownFields, + Table: p.BigqueryConfig.Table, + UseTopicSchema: p.BigqueryConfig.UseTopicSchema, + WriteMetadata: p.BigqueryConfig.WriteMetadata, + DropUnknownFields: p.BigqueryConfig.DropUnknownFields, } } } @@ -182,8 +182,8 @@ func LateInitialize(p *v1alpha1.SubscriptionParameters, s pubsub.Subscription) { } } - if p.BigQueryConfig == nil && s.BigqueryConfig != nil { - p.BigQueryConfig = &v1alpha1.BigQueryConfig{ + if p.BigqueryConfig == nil && s.BigqueryConfig != nil { + p.BigqueryConfig = &v1alpha1.BigqueryConfig{ Table: s.BigqueryConfig.Table, DropUnknownFields: s.BigqueryConfig.DropUnknownFields, UseTopicSchema: s.BigqueryConfig.UseTopicSchema, @@ -267,9 +267,9 @@ func GenerateUpdateRequest(name string, p v1alpha1.SubscriptionParameters, s pub setPushConfig(p, us.Subscription) } - if !cmp.Equal(p.BigQueryConfig, observed.BigQueryConfig) { - mask = append(mask, "bigQueryConfig") - setBigQueryConfig(p, us.Subscription) + if !cmp.Equal(p.BigqueryConfig, observed.BigqueryConfig) { + mask = append(mask, "bigqueryConfig") + setBigqueryConfig(p, us.Subscription) } if !cmp.Equal(p.RetryPolicy, observed.RetryPolicy) { diff --git a/pkg/clients/subscription/subscription_test.go b/pkg/clients/subscription/subscription_test.go index 006df0f03..de7da5f6e 100644 --- a/pkg/clients/subscription/subscription_test.go +++ b/pkg/clients/subscription/subscription_test.go @@ -109,7 +109,7 @@ func bigqueryParams() *v1alpha1.SubscriptionParameters { Labels: map[string]string{"example": "true"}, MessageRetentionDuration: "864000s", PushConfig: nil, - BigQueryConfig: &v1alpha1.BigQueryConfig{ + BigqueryConfig: &v1alpha1.BigqueryConfig{ Table: "projects/my-project/subscriptions/my-bigquery-subscription", UseTopicSchema: true, WriteMetadata: true, @@ -295,7 +295,7 @@ func TestLateInitializeBigquery(t *testing.T) { Labels: map[string]string{"example": "true"}, MessageRetentionDuration: "864000s", PushConfig: nil, - BigQueryConfig: &v1alpha1.BigQueryConfig{ + BigqueryConfig: &v1alpha1.BigqueryConfig{ Table: "projects/my-project/subscriptions/my-bigquery-subscription", UseTopicSchema: true, WriteMetadata: true, @@ -466,7 +466,7 @@ func TestGenerateUpdateRequestBigquery(t *testing.T) { }, result: &pubsub.UpdateSubscriptionRequest{ Subscription: mutableSubscription, - UpdateMask: "ackDeadlineSeconds,detached,filter,labels,messageRetentionDuration,retainAckedMessages,expirationPolicy,bigQueryConfig,retryPolicy", + UpdateMask: "ackDeadlineSeconds,detached,filter,labels,messageRetentionDuration,retainAckedMessages,expirationPolicy,bigqueryConfig,retryPolicy", }, }, } From f1a15de5f2467eb2fb40c4cfac9e87caceddbe34 Mon Sep 17 00:00:00 2001 From: Viktor Danyliuk Date: Thu, 20 Oct 2022 19:05:34 +0300 Subject: [PATCH 3/5] Make BigqueryConfig properties optional, remove redundant tests Signed-off-by: Viktor Danyliuk --- apis/pubsub/v1alpha1/subscription_types.go | 3 + pkg/clients/subscription/subscription_test.go | 225 +----------------- 2 files changed, 10 insertions(+), 218 deletions(-) diff --git a/apis/pubsub/v1alpha1/subscription_types.go b/apis/pubsub/v1alpha1/subscription_types.go index c5fd08367..7c089b80e 100644 --- a/apis/pubsub/v1alpha1/subscription_types.go +++ b/apis/pubsub/v1alpha1/subscription_types.go @@ -154,16 +154,19 @@ type BigqueryConfig struct { // When enabled, the topic schema will be used when writing to BigQuery. Else, // tes the message bytes to a column called data in BigQuery. + // +optional UseTopicSchema bool `json:"useTopicSchema,omitempty"` // When enabled, the metadata of each message is written to additional columns in // the BigQuery table. Else, the metadata is not written to the BigQuery table. // https://cloud.google.com/pubsub/docs/bigquery?hl=ru#write-metadata + // +optional WriteMetadata bool `json:"writeMetadata,omitempty"` // When enabled along with the "Use topic schema" option, any field that is present in // the topic schema but not in the BigQuery schema will be dropped. Else, messages with extra fields are not written // and remain in the subscription backlog. + // +optional DropUnknownFields bool `json:"dropUnknownFields,omitempty"` } diff --git a/pkg/clients/subscription/subscription_test.go b/pkg/clients/subscription/subscription_test.go index de7da5f6e..81cc7e6e4 100644 --- a/pkg/clients/subscription/subscription_test.go +++ b/pkg/clients/subscription/subscription_test.go @@ -53,6 +53,12 @@ func params() *v1alpha1.SubscriptionParameters { }, PushEndpoint: "example.com", }, + BigqueryConfig: &v1alpha1.BigqueryConfig{ + Table: "projects/my-project/subscriptions/my-bigquery-subscription", + UseTopicSchema: true, + WriteMetadata: true, + DropUnknownFields: true, + }, RetryPolicy: &v1alpha1.RetryPolicy{ MaximumBackoff: "100s", MinimumBackoff: "15s", @@ -86,61 +92,6 @@ func subscription() *pubsub.Subscription { }, PushEndpoint: "example.com", }, - RetryPolicy: &pubsub.RetryPolicy{ - MaximumBackoff: "100s", - MinimumBackoff: "15s", - }, - RetainAckedMessages: true, - Topic: topicNameExternal, - } -} - -func bigqueryParams() *v1alpha1.SubscriptionParameters { - return &v1alpha1.SubscriptionParameters{ - AckDeadlineSeconds: 15, - DeadLetterPolicy: &v1alpha1.DeadLetterPolicy{ - DeadLetterTopic: topicName, - MaxDeliveryAttempts: 5, - }, - Detached: true, - EnableMessageOrdering: true, - ExpirationPolicy: &v1alpha1.ExpirationPolicy{TTL: "1296000s"}, - Filter: "foo", - Labels: map[string]string{"example": "true"}, - MessageRetentionDuration: "864000s", - PushConfig: nil, - BigqueryConfig: &v1alpha1.BigqueryConfig{ - Table: "projects/my-project/subscriptions/my-bigquery-subscription", - UseTopicSchema: true, - WriteMetadata: true, - DropUnknownFields: true, - }, - RetryPolicy: &v1alpha1.RetryPolicy{ - MaximumBackoff: "100s", - MinimumBackoff: "15s", - }, - RetainAckedMessages: true, - Topic: topicName, - } -} - -func bigquerySubscription() *pubsub.Subscription { - return &pubsub.Subscription{ - Name: name, - AckDeadlineSeconds: 15, - DeadLetterPolicy: &pubsub.DeadLetterPolicy{ - DeadLetterTopic: topicNameExternal, - MaxDeliveryAttempts: 5, - }, - Detached: true, - EnableMessageOrdering: true, - ExpirationPolicy: &pubsub.ExpirationPolicy{Ttl: "1296000s"}, - Filter: "foo", - Labels: map[string]string{ - "example": "true", - }, - MessageRetentionDuration: "864000s", - PushConfig: nil, BigqueryConfig: &pubsub.BigQueryConfig{ Table: "projects/my-project/subscriptions/my-bigquery-subscription", UseTopicSchema: true, @@ -186,36 +137,6 @@ func TestGenerateSubscription(t *testing.T) { } } -func TestGenerateBigquerySubscription(t *testing.T) { - type args struct { - projectID string - name string - s v1alpha1.SubscriptionParameters - } - cases := map[string]struct { - args - out *pubsub.Subscription - }{ - "Full": { - args: args{ - projectID: projectID, - name: name, - s: *bigqueryParams(), - }, - out: bigquerySubscription(), - }, - } - - for name, tc := range cases { - t.Run(name, func(t *testing.T) { - got := GenerateSubscription(tc.projectID, tc.name, tc.s) - if diff := cmp.Diff(tc.out, got); diff != "" { - t.Errorf("GenerateSubscription(...): -want, +got:\n%s", diff) - } - }) - } -} - func TestLateInitialize(t *testing.T) { type args struct { obs pubsub.Subscription @@ -270,59 +191,6 @@ func TestLateInitialize(t *testing.T) { } } -func TestLateInitializeBigquery(t *testing.T) { - type args struct { - obs pubsub.Subscription - param *v1alpha1.SubscriptionParameters - } - cases := map[string]struct { - args - out *v1alpha1.SubscriptionParameters - }{ - "Full": { - args: args{ - obs: *bigquerySubscription(), - param: &v1alpha1.SubscriptionParameters{ - AckDeadlineSeconds: 15, - DeadLetterPolicy: &v1alpha1.DeadLetterPolicy{ - DeadLetterTopic: topicName, - MaxDeliveryAttempts: 5, - }, - Detached: true, - EnableMessageOrdering: true, - ExpirationPolicy: &v1alpha1.ExpirationPolicy{TTL: "1296000s"}, - Filter: "foo", - Labels: map[string]string{"example": "true"}, - MessageRetentionDuration: "864000s", - PushConfig: nil, - BigqueryConfig: &v1alpha1.BigqueryConfig{ - Table: "projects/my-project/subscriptions/my-bigquery-subscription", - UseTopicSchema: true, - WriteMetadata: true, - DropUnknownFields: true, - }, - RetryPolicy: &v1alpha1.RetryPolicy{ - MaximumBackoff: "100s", - MinimumBackoff: "15s", - }, - RetainAckedMessages: true, - Topic: topicName, - }, - }, - out: bigqueryParams(), - }, - } - - for name, tc := range cases { - t.Run(name, func(t *testing.T) { - LateInitialize(tc.args.param, tc.args.obs) - if diff := cmp.Diff(tc.args.param, tc.out); diff != "" { - t.Errorf("LateInitialize(...): -want, +got:\n%s", diff) - } - }) - } -} - func TestIsUpToDate(t *testing.T) { type args struct { obs pubsub.Subscription @@ -361,44 +229,6 @@ func TestIsUpToDate(t *testing.T) { } } -func TestIsUpToDateBigquery(t *testing.T) { - type args struct { - obs pubsub.Subscription - param v1alpha1.SubscriptionParameters - } - cases := map[string]struct { - args - result bool - }{ - "NotUpToDate": { - args: args{ - obs: *bigquerySubscription(), - param: v1alpha1.SubscriptionParameters{ - RetryPolicy: nil, - }, - }, - result: false, - }, - "UpToDate": { - args: args{ - obs: *bigquerySubscription(), - param: *bigqueryParams(), - }, - result: true, - }, - } - - IsUpToDate(projectID, *bigqueryParams(), *bigquerySubscription()) - for name, tc := range cases { - t.Run(name, func(t *testing.T) { - got := IsUpToDate(projectID, tc.args.param, tc.args.obs) - if diff := cmp.Diff(tc.result, got); diff != "" { - t.Errorf("IsUpToDate(...): -want, +got:\n%s", diff) - } - }) - } -} - func TestGenerateUpdateRequest(t *testing.T) { mutableSubscription := subscription() mutableSubscription.Topic = "" @@ -425,48 +255,7 @@ func TestGenerateUpdateRequest(t *testing.T) { }, result: &pubsub.UpdateSubscriptionRequest{ Subscription: mutableSubscription, - UpdateMask: "ackDeadlineSeconds,detached,filter,labels,messageRetentionDuration,retainAckedMessages,expirationPolicy,pushConfig,retryPolicy", - }, - }, - } - - for name, tc := range cases { - t.Run(name, func(t *testing.T) { - got := GenerateUpdateRequest(tc.args.name, tc.args.param, tc.args.obs) - if diff := cmp.Diff(tc.result, got); diff != "" { - t.Errorf("GenerateUpdateRequest(...): -want, +got:\n%s", diff) - } - }) - } -} - -func TestGenerateUpdateRequestBigquery(t *testing.T) { - mutableSubscription := bigquerySubscription() - mutableSubscription.Topic = "" - mutableSubscription.EnableMessageOrdering = false - mutableSubscription.DeadLetterPolicy = nil - - type args struct { - projectID string - name string - obs pubsub.Subscription - param v1alpha1.SubscriptionParameters - } - - cases := map[string]struct { - args - result *pubsub.UpdateSubscriptionRequest - }{ - "Full": { - args: args{ - projectID: projectID, - name: name, - obs: pubsub.Subscription{}, - param: *bigqueryParams(), - }, - result: &pubsub.UpdateSubscriptionRequest{ - Subscription: mutableSubscription, - UpdateMask: "ackDeadlineSeconds,detached,filter,labels,messageRetentionDuration,retainAckedMessages,expirationPolicy,bigqueryConfig,retryPolicy", + UpdateMask: "ackDeadlineSeconds,detached,filter,labels,messageRetentionDuration,retainAckedMessages,expirationPolicy,pushConfig,bigqueryConfig,retryPolicy", }, }, } From d6bf450c0be4d8134c1ac0c9d714a6d0799fec4b Mon Sep 17 00:00:00 2001 From: Viktor Danyliuk Date: Mon, 24 Oct 2022 11:30:58 +0300 Subject: [PATCH 4/5] Use pointers for optional fields in BigQueryConfig struct Signed-off-by: Viktor Danyliuk --- apis/pubsub/v1alpha1/subscription_types.go | 6 +++--- apis/pubsub/v1alpha1/zz_generated.deepcopy.go | 17 ++++++++++++++++- pkg/clients/subscription/subscription.go | 13 +++++++------ pkg/clients/subscription/subscription_test.go | 7 ++++--- 4 files changed, 30 insertions(+), 13 deletions(-) diff --git a/apis/pubsub/v1alpha1/subscription_types.go b/apis/pubsub/v1alpha1/subscription_types.go index 7c089b80e..77e994c8a 100644 --- a/apis/pubsub/v1alpha1/subscription_types.go +++ b/apis/pubsub/v1alpha1/subscription_types.go @@ -155,19 +155,19 @@ type BigqueryConfig struct { // When enabled, the topic schema will be used when writing to BigQuery. Else, // tes the message bytes to a column called data in BigQuery. // +optional - UseTopicSchema bool `json:"useTopicSchema,omitempty"` + UseTopicSchema *bool `json:"useTopicSchema,omitempty"` // When enabled, the metadata of each message is written to additional columns in // the BigQuery table. Else, the metadata is not written to the BigQuery table. // https://cloud.google.com/pubsub/docs/bigquery?hl=ru#write-metadata // +optional - WriteMetadata bool `json:"writeMetadata,omitempty"` + WriteMetadata *bool `json:"writeMetadata,omitempty"` // When enabled along with the "Use topic schema" option, any field that is present in // the topic schema but not in the BigQuery schema will be dropped. Else, messages with extra fields are not written // and remain in the subscription backlog. // +optional - DropUnknownFields bool `json:"dropUnknownFields,omitempty"` + DropUnknownFields *bool `json:"dropUnknownFields,omitempty"` } // OidcToken contains information needed for generating an OpenID Connect token diff --git a/apis/pubsub/v1alpha1/zz_generated.deepcopy.go b/apis/pubsub/v1alpha1/zz_generated.deepcopy.go index 9463bfd3f..e9983ea74 100644 --- a/apis/pubsub/v1alpha1/zz_generated.deepcopy.go +++ b/apis/pubsub/v1alpha1/zz_generated.deepcopy.go @@ -29,6 +29,21 @@ import ( // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *BigqueryConfig) DeepCopyInto(out *BigqueryConfig) { *out = *in + if in.UseTopicSchema != nil { + in, out := &in.UseTopicSchema, &out.UseTopicSchema + *out = new(bool) + **out = **in + } + if in.WriteMetadata != nil { + in, out := &in.WriteMetadata, &out.WriteMetadata + *out = new(bool) + **out = **in + } + if in.DropUnknownFields != nil { + in, out := &in.DropUnknownFields, &out.DropUnknownFields + *out = new(bool) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BigqueryConfig. @@ -235,7 +250,7 @@ func (in *SubscriptionParameters) DeepCopyInto(out *SubscriptionParameters) { if in.BigqueryConfig != nil { in, out := &in.BigqueryConfig, &out.BigqueryConfig *out = new(BigqueryConfig) - **out = **in + (*in).DeepCopyInto(*out) } if in.RetryPolicy != nil { in, out := &in.RetryPolicy, &out.RetryPolicy diff --git a/pkg/clients/subscription/subscription.go b/pkg/clients/subscription/subscription.go index f6473aaed..aeca00757 100644 --- a/pkg/clients/subscription/subscription.go +++ b/pkg/clients/subscription/subscription.go @@ -21,6 +21,7 @@ import ( "strings" "github.com/crossplane-contrib/provider-gcp/apis/pubsub/v1alpha1" + gcp "github.com/crossplane-contrib/provider-gcp/pkg/clients" "github.com/crossplane-contrib/provider-gcp/pkg/clients/topic" "github.com/google/go-cmp/cmp" @@ -91,9 +92,9 @@ func setBigqueryConfig(p v1alpha1.SubscriptionParameters, s *pubsub.Subscription if p.BigqueryConfig != nil { s.BigqueryConfig = &pubsub.BigQueryConfig{ Table: p.BigqueryConfig.Table, - UseTopicSchema: p.BigqueryConfig.UseTopicSchema, - WriteMetadata: p.BigqueryConfig.WriteMetadata, - DropUnknownFields: p.BigqueryConfig.DropUnknownFields, + UseTopicSchema: gcp.BoolValue(p.BigqueryConfig.UseTopicSchema), + WriteMetadata: gcp.BoolValue(p.BigqueryConfig.WriteMetadata), + DropUnknownFields: gcp.BoolValue(p.BigqueryConfig.DropUnknownFields), } } } @@ -185,9 +186,9 @@ func LateInitialize(p *v1alpha1.SubscriptionParameters, s pubsub.Subscription) { if p.BigqueryConfig == nil && s.BigqueryConfig != nil { p.BigqueryConfig = &v1alpha1.BigqueryConfig{ Table: s.BigqueryConfig.Table, - DropUnknownFields: s.BigqueryConfig.DropUnknownFields, - UseTopicSchema: s.BigqueryConfig.UseTopicSchema, - WriteMetadata: s.BigqueryConfig.WriteMetadata, + DropUnknownFields: gcp.BoolPtr(s.BigqueryConfig.DropUnknownFields), + UseTopicSchema: gcp.BoolPtr(s.BigqueryConfig.UseTopicSchema), + WriteMetadata: gcp.BoolPtr(s.BigqueryConfig.WriteMetadata), } } diff --git a/pkg/clients/subscription/subscription_test.go b/pkg/clients/subscription/subscription_test.go index 81cc7e6e4..31faff1a3 100644 --- a/pkg/clients/subscription/subscription_test.go +++ b/pkg/clients/subscription/subscription_test.go @@ -23,6 +23,7 @@ import ( pubsub "google.golang.org/api/pubsub/v1" "github.com/crossplane-contrib/provider-gcp/apis/pubsub/v1alpha1" + gcp "github.com/crossplane-contrib/provider-gcp/pkg/clients" ) const ( @@ -55,9 +56,9 @@ func params() *v1alpha1.SubscriptionParameters { }, BigqueryConfig: &v1alpha1.BigqueryConfig{ Table: "projects/my-project/subscriptions/my-bigquery-subscription", - UseTopicSchema: true, - WriteMetadata: true, - DropUnknownFields: true, + UseTopicSchema: gcp.BoolPtr(true), + WriteMetadata: gcp.BoolPtr(true), + DropUnknownFields: gcp.BoolPtr(true), }, RetryPolicy: &v1alpha1.RetryPolicy{ MaximumBackoff: "100s", From af8e2ed37d72969dbd2da54709df30dbb8d9f04d Mon Sep 17 00:00:00 2001 From: Viktor Danyliuk Date: Tue, 1 Nov 2022 14:42:23 +0200 Subject: [PATCH 5/5] Add test cases for TestLateInitialize Signed-off-by: Viktor Danyliuk --- pkg/clients/subscription/subscription_test.go | 92 +++++++++++++++++++ 1 file changed, 92 insertions(+) diff --git a/pkg/clients/subscription/subscription_test.go b/pkg/clients/subscription/subscription_test.go index 31faff1a3..50600e989 100644 --- a/pkg/clients/subscription/subscription_test.go +++ b/pkg/clients/subscription/subscription_test.go @@ -180,6 +180,98 @@ func TestLateInitialize(t *testing.T) { }, out: params(), }, + "Minimal": { + args: args{ + obs: pubsub.Subscription{ + Name: name, + Topic: topicNameExternal, + }, + param: &v1alpha1.SubscriptionParameters{ + AckDeadlineSeconds: 15, + DeadLetterPolicy: &v1alpha1.DeadLetterPolicy{ + DeadLetterTopic: topicName, + MaxDeliveryAttempts: 5, + }, + Detached: true, + EnableMessageOrdering: true, + ExpirationPolicy: &v1alpha1.ExpirationPolicy{TTL: "1296000s"}, + Topic: topicName, + }, + }, + out: &v1alpha1.SubscriptionParameters{ + AckDeadlineSeconds: 15, + DeadLetterPolicy: &v1alpha1.DeadLetterPolicy{ + DeadLetterTopic: topicName, + MaxDeliveryAttempts: 5, + }, + Detached: true, + EnableMessageOrdering: true, + ExpirationPolicy: &v1alpha1.ExpirationPolicy{TTL: "1296000s"}, + Topic: topicName, + BigqueryConfig: nil, + PushConfig: nil, + }, + }, + "PushConfig": { + args: args{ + obs: pubsub.Subscription{ + Name: name, + Topic: topicNameExternal, + PushConfig: &pubsub.PushConfig{ + PushEndpoint: "example.com", + }, + }, + param: &v1alpha1.SubscriptionParameters{ + PushConfig: &v1alpha1.PushConfig{ + Attributes: map[string]string{"attribute": "my-attribute"}, + OidcToken: &v1alpha1.OidcToken{ + Audience: "my-audience", + }, + PushEndpoint: "example.com", + }, + }, + }, + out: &v1alpha1.SubscriptionParameters{ + PushConfig: &v1alpha1.PushConfig{ + Attributes: map[string]string{"attribute": "my-attribute"}, + OidcToken: &v1alpha1.OidcToken{ + Audience: "my-audience", + ServiceAccountEmail: "", + }, + PushEndpoint: "example.com", + }, + Topic: topicNameExternal, + BigqueryConfig: nil, + }, + }, + "BigqueryConfig": { + args: args{ + obs: pubsub.Subscription{ + Name: name, + Topic: topicNameExternal, + BigqueryConfig: &pubsub.BigQueryConfig{ + Table: "projects/my-project/subscriptions/my-bigquery-subscription", + DropUnknownFields: true, + }, + }, + param: &v1alpha1.SubscriptionParameters{ + BigqueryConfig: &v1alpha1.BigqueryConfig{ + Table: "projects/my-project/subscriptions/my-bigquery-subscription", + UseTopicSchema: gcp.BoolPtr(true), + }, + }, + }, + out: &v1alpha1.SubscriptionParameters{ + BigqueryConfig: &v1alpha1.BigqueryConfig{ + Table: "projects/my-project/subscriptions/my-bigquery-subscription", + UseTopicSchema: gcp.BoolPtr(true), + WriteMetadata: nil, + DropUnknownFields: nil, + }, + Topic: topicNameExternal, + PushConfig: nil, + }, + }, } for name, tc := range cases {