From 4d61ebb746adad6862f2ef380aef7b17052b448c Mon Sep 17 00:00:00 2001 From: Alisha Date: Thu, 3 Apr 2025 13:09:18 -0700 Subject: [PATCH 1/2] turn on postgres schema drift --- .../worker/workflow/datasync-workflow.go | 31 +++++++------------ .../worker/workflow/postgres_test.go | 4 +-- worker/internal/cmds/worker/serve/serve.go | 2 -- .../activities/sync-activity-opts/activity.go | 9 ++---- .../datasync/workflow/register/register.go | 3 +- .../workflows/datasync/workflow/workflow.go | 11 ++----- 6 files changed, 19 insertions(+), 41 deletions(-) diff --git a/internal/integration-tests/worker/workflow/datasync-workflow.go b/internal/integration-tests/worker/workflow/datasync-workflow.go index 4b39cf2635..0092209a99 100644 --- a/internal/integration-tests/worker/workflow/datasync-workflow.go +++ b/internal/integration-tests/worker/workflow/datasync-workflow.go @@ -35,14 +35,13 @@ import ( type Option func(*TestWorkflowEnv) type TestWorkflowEnv struct { - neosyncApi *tcneosyncapi.NeosyncApiTestClient - redisconfig *neosync_redis.RedisConfig - fakeEELicense *testutil.FakeEELicense - pageLimit int - maxIterations int - postgresSchemaDrift bool - TestEnv *testsuite.TestWorkflowEnvironment - Redisclient redis.UniversalClient + neosyncApi *tcneosyncapi.NeosyncApiTestClient + redisconfig *neosync_redis.RedisConfig + fakeEELicense *testutil.FakeEELicense + pageLimit int + maxIterations int + TestEnv *testsuite.TestWorkflowEnvironment + Redisclient redis.UniversalClient } // WithRedis creates redis client with provided URL @@ -78,12 +77,6 @@ func WithMaxIterations(maxIterations int) Option { } } -func WithPostgresSchemaDrift() Option { - return func(c *TestWorkflowEnv) { - c.postgresSchemaDrift = true - } -} - // NewTestDataSyncWorkflowEnv creates and configures a new test datasync workflow environment func NewTestDataSyncWorkflowEnv( t testing.TB, @@ -94,11 +87,10 @@ func NewTestDataSyncWorkflowEnv( t.Helper() workflowEnv := &TestWorkflowEnv{ - neosyncApi: neosyncApi, - 
fakeEELicense: testutil.NewFakeEELicense(), - pageLimit: 10, - maxIterations: 5, - postgresSchemaDrift: false, + neosyncApi: neosyncApi, + fakeEELicense: testutil.NewFakeEELicense(), + pageLimit: 10, + maxIterations: 5, } for _, opt := range opts { @@ -134,7 +126,6 @@ func NewTestDataSyncWorkflowEnv( workflowEnv.Redisclient, false, workflowEnv.pageLimit, - workflowEnv.postgresSchemaDrift, ) schemainit_workflow_register.Register( diff --git a/internal/integration-tests/worker/workflow/postgres_test.go b/internal/integration-tests/worker/workflow/postgres_test.go index 0cacd5b550..4c38e4d5a0 100644 --- a/internal/integration-tests/worker/workflow/postgres_test.go +++ b/internal/integration-tests/worker/workflow/postgres_test.go @@ -1423,7 +1423,7 @@ func test_postgres_schema_reconciliation( }) destinationId := job.GetDestinations()[0].GetId() - testworkflow := NewTestDataSyncWorkflowEnv(t, neosyncApi, dbManagers, WithPostgresSchemaDrift(), WithMaxIterations(100), WithPageLimit(10000)) + testworkflow := NewTestDataSyncWorkflowEnv(t, neosyncApi, dbManagers, WithMaxIterations(100), WithPageLimit(10000)) testworkflow.RequireActivitiesCompletedSuccessfully(t) testworkflow.ExecuteTestDataSyncWorkflow(job.GetId()) require.Truef(t, testworkflow.TestEnv.IsWorkflowCompleted(), "Workflow did not complete. Test: schema_drift") @@ -1470,7 +1470,7 @@ func test_postgres_schema_reconciliation( updatedMappings = append(updatedMappings, pg_schema_init.GetAlteredSyncJobMappings(schema)...) 
job = updateJobMappings(t, ctx, jobclient, job.GetId(), updatedMappings, job.GetSource()) - testworkflow = NewTestDataSyncWorkflowEnv(t, neosyncApi, dbManagers, WithPostgresSchemaDrift(), WithMaxIterations(100), WithPageLimit(1000)) + testworkflow = NewTestDataSyncWorkflowEnv(t, neosyncApi, dbManagers, WithMaxIterations(100), WithPageLimit(1000)) testworkflow.RequireActivitiesCompletedSuccessfully(t) testworkflow.ExecuteTestDataSyncWorkflow(job.GetId()) require.Truef(t, testworkflow.TestEnv.IsWorkflowCompleted(), "Workflow did not complete. Test: postgres-schema-reconciliation-run-2") diff --git a/worker/internal/cmds/worker/serve/serve.go b/worker/internal/cmds/worker/serve/serve.go index b661f7e4b5..ff54470704 100644 --- a/worker/internal/cmds/worker/serve/serve.go +++ b/worker/internal/cmds/worker/serve/serve.go @@ -406,14 +406,12 @@ func serve(ctx context.Context) error { cascadelicense, ) - postgresSchemaDrift := false datasync_workflow_register.Register( w, userclient, jobclient, connclient, transformerclient, sqlmanager, cascadelicense, redisclient, otelconfig.IsEnabled, pageLimit, - postgresSchemaDrift, ) if cascadelicense.IsValid() { diff --git a/worker/pkg/workflows/datasync/activities/sync-activity-opts/activity.go b/worker/pkg/workflows/datasync/activities/sync-activity-opts/activity.go index 78528890a6..9b0928fdce 100644 --- a/worker/pkg/workflows/datasync/activities/sync-activity-opts/activity.go +++ b/worker/pkg/workflows/datasync/activities/sync-activity-opts/activity.go @@ -15,17 +15,14 @@ import ( ) type Activity struct { - jobclient mgmtv1alpha1connect.JobServiceClient - postgresSchemaDrift bool + jobclient mgmtv1alpha1connect.JobServiceClient } func New( jobclient mgmtv1alpha1connect.JobServiceClient, - postgresSchemaDrift bool, ) *Activity { return &Activity{ - jobclient: jobclient, - postgresSchemaDrift: postgresSchemaDrift, + jobclient: jobclient, } } @@ -37,7 +34,6 @@ type RetrieveActivityOptionsResponse struct { AccountId string 
RequestedRecordCount *uint64 Destinations []*mgmtv1alpha1.JobDestination - PostgresSchemaDrift bool } func (a *Activity) RetrieveActivityOptions( @@ -66,7 +62,6 @@ func (a *Activity) RetrieveActivityOptions( AccountId: job.GetAccountId(), RequestedRecordCount: getRequestedRecordCount(job), Destinations: job.GetDestinations(), - PostgresSchemaDrift: a.postgresSchemaDrift, }, nil } diff --git a/worker/pkg/workflows/datasync/workflow/register/register.go b/worker/pkg/workflows/datasync/workflow/register/register.go index 368ad22568..536623d1e3 100644 --- a/worker/pkg/workflows/datasync/workflow/register/register.go +++ b/worker/pkg/workflows/datasync/workflow/register/register.go @@ -30,7 +30,6 @@ func Register( redisclient redis.UniversalClient, isOtelEnabled bool, pageLimit int, - postgresSchemaDrift bool, ) { genbenthosActivity := genbenthosconfigs_activity.New( jobclient, @@ -41,7 +40,7 @@ func Register( pageLimit, ) - retrieveActivityOpts := syncactivityopts_activity.New(jobclient, postgresSchemaDrift) + retrieveActivityOpts := syncactivityopts_activity.New(jobclient) accountStatusActivity := accountstatus_activity.New(userclient) runPostTableSyncActivity := posttablesync_activity.New(jobclient, sqlmanager, connclient) jobhookByTimingActivity := jobhooks_by_timing_activity.New( diff --git a/worker/pkg/workflows/datasync/workflow/workflow.go b/worker/pkg/workflows/datasync/workflow/workflow.go index 5b9a8e77a3..56a312b335 100644 --- a/worker/pkg/workflows/datasync/workflow/workflow.go +++ b/worker/pkg/workflows/datasync/workflow/workflow.go @@ -195,7 +195,6 @@ func executeWorkflow(wfctx workflow.Context, req *WorkflowRequest) (*WorkflowRes req.JobId, info.WorkflowExecution.ID, actOptResp.Destinations, - actOptResp.PostgresSchemaDrift, ) if err != nil { return nil, err @@ -487,7 +486,6 @@ func runSchemaInitWorkflowByDestination( logger log.Logger, accountId, jobId, jobRunId string, destinations []*mgmtv1alpha1.JobDestination, - postgresSchemaDrift bool, ) error { 
initSchemaActivityOptions := &workflow.ActivityOptions{ StartToCloseTimeout: 5 * time.Minute, @@ -498,7 +496,7 @@ func runSchemaInitWorkflowByDestination( } for _, destination := range destinations { // right now only mysql supports schema drift - schemaDrift := shouldUseSchemaDrift(destination, postgresSchemaDrift) + schemaDrift := shouldUseSchemaDrift(destination) logger.Info( "scheduling Schema Initialization workflow for execution.", "destinationId", @@ -534,11 +532,8 @@ func runSchemaInitWorkflowByDestination( return nil } -func shouldUseSchemaDrift(destination *mgmtv1alpha1.JobDestination, postgresSchemaDrift bool) bool { - if destination.GetOptions().GetPostgresOptions() != nil && postgresSchemaDrift { - return true - } - return destination.GetOptions().GetMysqlOptions() != nil +func shouldUseSchemaDrift(destination *mgmtv1alpha1.JobDestination) bool { + return destination.GetOptions().GetMysqlOptions() != nil || destination.GetOptions().GetPostgresOptions() != nil } func retrieveActivityOptions( From 0e4e19d85033840a969bbea350608055b28b47ae Mon Sep 17 00:00:00 2001 From: Alisha Date: Thu, 3 Apr 2025 13:12:38 -0700 Subject: [PATCH 2/2] fix comment and test --- .../datasync/activities/sync-activity-opts/activity_test.go | 4 ++-- worker/pkg/workflows/datasync/workflow/workflow.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/worker/pkg/workflows/datasync/activities/sync-activity-opts/activity_test.go b/worker/pkg/workflows/datasync/activities/sync-activity-opts/activity_test.go index 4bce179155..5bfc06a022 100644 --- a/worker/pkg/workflows/datasync/activities/sync-activity-opts/activity_test.go +++ b/worker/pkg/workflows/datasync/activities/sync-activity-opts/activity_test.go @@ -23,7 +23,7 @@ import ( ) func Test_New(t *testing.T) { - a := New(mgmtv1alpha1connect.NewMockJobServiceClient(t), false) + a := New(mgmtv1alpha1connect.NewMockJobServiceClient(t)) require.NotNil(t, a) } @@ -145,7 +145,7 @@ func Test_Activity(t *testing.T) { srv 
:= startHTTPServer(t, mux) jobclient := mgmtv1alpha1connect.NewJobServiceClient(srv.Client(), srv.URL) - activity := New(jobclient, false) + activity := New(jobclient) env.RegisterActivity(activity.RetrieveActivityOptions) t.Run("sync activity options", func(t *testing.T) { diff --git a/worker/pkg/workflows/datasync/workflow/workflow.go b/worker/pkg/workflows/datasync/workflow/workflow.go index 56a312b335..dc85419538 100644 --- a/worker/pkg/workflows/datasync/workflow/workflow.go +++ b/worker/pkg/workflows/datasync/workflow/workflow.go @@ -495,7 +495,7 @@ func runSchemaInitWorkflowByDestination( HeartbeatTimeout: 1 * time.Minute, } for _, destination := range destinations { - // right now only mysql supports schema drift + // right now only mysql and postgres support schema drift schemaDrift := shouldUseSchemaDrift(destination) logger.Info( "scheduling Schema Initialization workflow for execution.",