Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion common/types/mapper/thrift/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -5831,6 +5831,7 @@ func FromStartWorkflowExecutionRequest(t *types.StartWorkflowExecutionRequest) *
if t == nil {
return nil
}
thriftPolicy := FromActiveClusterSelectionPolicy(t.ActiveClusterSelectionPolicy)
return &shared.StartWorkflowExecutionRequest{
Domain: &t.Domain,
WorkflowId: &t.WorkflowID,
Expand All @@ -5851,7 +5852,7 @@ func FromStartWorkflowExecutionRequest(t *types.StartWorkflowExecutionRequest) *
JitterStartSeconds: t.JitterStartSeconds,
FirstRunAtTimestamp: t.FirstRunAtTimeStamp,
CronOverlapPolicy: FromCronOverlapPolicy(t.CronOverlapPolicy),
ActiveClusterSelectionPolicy: FromActiveClusterSelectionPolicy(t.ActiveClusterSelectionPolicy),
ActiveClusterSelectionPolicy: thriftPolicy,
}
}

Expand Down
2 changes: 2 additions & 0 deletions config/dynamicconfig/development.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ system.domainAuditLogTTL:
- value: "15m"
matching.enableClientAutoConfig:
- value: true
frontend.enableActiveClusterSelectionPolicyInStartWorkflow:
- value: true
shardDistributor.migrationMode:
- value: "onboarded"
- value: "local_pass"
Expand Down
12 changes: 12 additions & 0 deletions tools/cli/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ const (
FlagNumReadPartitions = "num_read_partitions"
FlagNumWritePartitions = "num_write_partitions"
FlagCronOverlapPolicy = "cron_overlap_policy"
FlagClusterAttributeScope = "cluster_attribute_scope"
FlagClusterAttributeName = "cluster_attribute_name"

FlagClustersUsage = "Clusters (example: --clusters clusterA,clusterB or --cl clusterA --cl clusterB)"
)
Expand Down Expand Up @@ -445,6 +447,16 @@ func getFlagsForStart() []cli.Flag {
Name: FirstRunAtTime,
Usage: "Optional workflow's first run start time in RFC3339 format, like \"1970-01-01T00:00:00Z\". If set, first run of the workflow will start at the specified time.",
},
&cli.StringFlag{
Name: FlagClusterAttributeScope,
Usage: "Optional cluster attribute to specify how to select the active cluster. Examples might be 'region' or 'location'",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: As the command will fail if one of these two isn't specified I'd recommend adding something like

... Required if ClusterAttributeName is specified.

Aliases: []string{"cascope"},
},
&cli.StringFlag{
Name: FlagClusterAttributeName,
Usage: "Optional cluster attribute to be set for the workflow, used to determine, in active-active domains. This specifies which attribute to tie the workflow to, for example, if the scope is 'region' and the name is 'Lisbon' or 'San Francisco'",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: As the command will fail if one of these two isn't specified I'd recommend adding something like ... Required if ClusterAttributeScope is specified.

NIT: I think rephrasing to remove some commas is possible, e.g:

Optional cluster attribute name, paired with a cluster attribute scope, to specify how to select the active cluster. This specifies which attribute to tie the workflow to, for example, if the scope is 'region' and the name is 'Lisbon' or 'San Francisco'

(This doesn't actually reduce the number of commas but I think is more clear 😅 ).

Aliases: []string{"caname"},
},
}
}

Expand Down
23 changes: 23 additions & 0 deletions tools/cli/workflow_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,10 @@ func constructStartWorkflowRequest(c *cli.Context) (*types.StartWorkflowExecutio
if err != nil {
return nil, commoncli.Problem("Error in starting wf request: ", err)
}
activeClusterSelectionPolicy, err := parseClusterAttributes(c.String(FlagClusterAttributeScope), c.String(FlagClusterAttributeName))
if err != nil {
return nil, commoncli.Problem("Error parsing cluster attributes: ", err)
}
startRequest := &types.StartWorkflowExecutionRequest{
RequestID: uuid.New(),
Domain: domain,
Expand All @@ -431,6 +435,7 @@ func constructStartWorkflowRequest(c *cli.Context) (*types.StartWorkflowExecutio
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(int32(dt)),
Identity: getCliIdentity(),
WorkflowIDReusePolicy: reusePolicy,
ActiveClusterSelectionPolicy: activeClusterSelectionPolicy,
}
if c.IsSet(FlagCronSchedule) {
startRequest.CronSchedule = c.String(FlagCronSchedule)
Expand Down Expand Up @@ -856,6 +861,7 @@ func constructSignalWithStartWorkflowRequest(c *cli.Context) (*types.SignalWithS
DelayStartSeconds: startRequest.DelayStartSeconds,
JitterStartSeconds: startRequest.JitterStartSeconds,
FirstRunAtTimestamp: startRequest.FirstRunAtTimeStamp,
ActiveClusterSelectionPolicy: startRequest.ActiveClusterSelectionPolicy,
}, nil
}

Expand Down Expand Up @@ -2692,3 +2698,20 @@ func mapQueryRejectConditionFromFlag(flag string) (types.QueryRejectCondition, e

return rejectCondition, nil
}

func parseClusterAttributes(clusterAttributeScope string, clusterAttributeName string) (*types.ActiveClusterSelectionPolicy, error) {
if clusterAttributeScope == "" && clusterAttributeName == "" {
// default case, these values are optional so most workflows will not use them
return nil, nil
}
if clusterAttributeScope == "" || clusterAttributeName == "" {
return nil, fmt.Errorf("invalid cluster attribute, scope or name is empty, either use both or none to start workflows. got %q.%q", clusterAttributeScope, clusterAttributeName)
}
policy := &types.ActiveClusterSelectionPolicy{
ClusterAttribute: &types.ClusterAttribute{
Scope: clusterAttributeScope,
Name: clusterAttributeName,
},
}
return policy, nil
}
62 changes: 62 additions & 0 deletions tools/cli/workflow_commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3134,3 +3134,65 @@ func TestMapQueryRejectConditionFromFlag(t *testing.T) {
})
}
}

func TestParseClusterAttributes(t *testing.T) {
testCases := []struct {
name string
clusterAttributeScope string
clusterAttributeName string
expectedPolicy *types.ActiveClusterSelectionPolicy
expectError bool
expectedErrorSubstring string
}{
{
name: "both empty - should return nil",
clusterAttributeScope: "",
clusterAttributeName: "",
expectedPolicy: nil,
expectError: false,
},
{
name: "both provided - should return valid policy",
clusterAttributeScope: "test-scope",
clusterAttributeName: "test-name",
expectedPolicy: &types.ActiveClusterSelectionPolicy{
ClusterAttribute: &types.ClusterAttribute{
Scope: "test-scope",
Name: "test-name",
},
},
expectError: false,
},
{
name: "empty scope with name provided - should error",
clusterAttributeScope: "",
clusterAttributeName: "test-name",
expectedPolicy: nil,
expectError: true,
expectedErrorSubstring: "invalid cluster attribute",
},
{
name: "scope provided with empty name - should error",
clusterAttributeScope: "test-scope",
clusterAttributeName: "",
expectedPolicy: nil,
expectError: true,
expectedErrorSubstring: "invalid cluster attribute",
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
result, err := parseClusterAttributes(tc.clusterAttributeScope, tc.clusterAttributeName)

if tc.expectError {
assert.Error(t, err)
assert.Contains(t, err.Error(), tc.expectedErrorSubstring)
assert.Nil(t, result)
} else {
assert.NoError(t, err)
assert.Equal(t, tc.expectedPolicy, result)
}
})
}
}
Loading