diff --git a/client.go b/client.go index 244c6a0a7..f4529daa5 100644 --- a/client.go +++ b/client.go @@ -21,6 +21,7 @@ package cadence import ( + "context" "time" "github.com/uber-go/tally" @@ -40,14 +41,14 @@ type ( // StartWorkflow starts a workflow execution // The user can use this to start using a function or workflow type name. // Either by - // StartWorkflow(options, "workflowTypeName", input) + // StartWorkflow(ctx, options, "workflowTypeName", input) // or - // StartWorkflow(options, workflowExecuteFn, arg1, arg2, arg3) + // StartWorkflow(ctx, options, workflowExecuteFn, arg1, arg2, arg3) // The errors it can return: // - EntityNotExistsError // - BadRequestError // - WorkflowExecutionAlreadyStartedError - StartWorkflow(options StartWorkflowOptions, workflow interface{}, args ...interface{}) (*WorkflowExecution, error) + StartWorkflow(ctx context.Context, options StartWorkflowOptions, workflow interface{}, args ...interface{}) (*WorkflowExecution, error) // SignalWorkflow sends a signals to a workflow in execution // - workflow ID of the workflow. @@ -56,7 +57,7 @@ type ( // The errors it can return: // - EntityNotExistsError // - InternalServiceError - SignalWorkflow(workflowID string, runID string, signalName string, arg interface{}) error + SignalWorkflow(ctx context.Context, workflowID string, runID string, signalName string, arg interface{}) error // CancelWorkflow cancels a workflow in execution // - workflow ID of the workflow. @@ -65,7 +66,7 @@ type ( // - EntityNotExistsError // - BadRequestError // - InternalServiceError - CancelWorkflow(workflowID string, runID string) error + CancelWorkflow(ctx context.Context, workflowID string, runID string) error // TerminateWorkflow terminates a workflow execution. // workflowID is required, other parameters are optional. @@ -75,7 +76,7 @@ type ( // - EntityNotExistsError // - BadRequestError // - InternalServiceError - TerminateWorkflow(workflowID string, runID string, reason string, details []byte) error + TerminateWorkflow(ctx context.Context, workflowID string, runID string, reason string, details []byte) error // GetWorkflowHistory gets history of a particular workflow. // - workflow ID of the workflow. @@ -84,7 +85,7 @@ type ( // - EntityNotExistsError // - BadRequestError // - InternalServiceError - GetWorkflowHistory(workflowID string, runID string) (*s.History, error) + GetWorkflowHistory(ctx context.Context, workflowID string, runID string) (*s.History, error) // GetWorkflowStackTrace gets a stack trace of all goroutines of a particular workflow. // atDecisionTaskCompletedEventID is the eventID of the CompleteDecisionTask event at which stack trace should be taken. @@ -94,7 +95,7 @@ type ( // - EntityNotExistsError // - BadRequestError // - InternalServiceError - GetWorkflowStackTrace(workflowID string, runID string, atDecisionTaskCompletedEventID int64) (string, error) + GetWorkflowStackTrace(ctx context.Context, workflowID string, runID string, atDecisionTaskCompletedEventID int64) (string, error) // CompleteActivity reports activity completed. // activity Execute method can return cadence.ErrActivityResultPending to @@ -109,28 +110,28 @@ type ( // To fail the activity with an error. // CompleteActivity(token, nil, NewErrorWithDetails("reason", details) // The activity can fail with below errors ErrorWithDetails, TimeoutError, CanceledError. - CompleteActivity(taskToken []byte, result interface{}, err error) error + CompleteActivity(ctx context.Context, taskToken []byte, result interface{}, err error) error // RecordActivityHeartbeat records heartbeat for an activity. // details - is the progress you want to record along with heart beat for this activity. // The errors it can return: // - EntityNotExistsError // - InternalServiceError - RecordActivityHeartbeat(taskToken []byte, details ...interface{}) error + RecordActivityHeartbeat(ctx context.Context, taskToken []byte, details ...interface{}) error // ListClosedWorkflow gets closed workflow executions based on request filters // The errors it can return: // - BadRequestError // - InternalServiceError // - EntityNotExistError - ListClosedWorkflow(request *s.ListClosedWorkflowExecutionsRequest) (*s.ListClosedWorkflowExecutionsResponse, error) + ListClosedWorkflow(ctx context.Context, request *s.ListClosedWorkflowExecutionsRequest) (*s.ListClosedWorkflowExecutionsResponse, error) // ListClosedWorkflow gets open workflow executions based on request filters // The errors it can return: // - BadRequestError // - InternalServiceError // - EntityNotExistError - ListOpenWorkflow(request *s.ListOpenWorkflowExecutionsRequest) (*s.ListOpenWorkflowExecutionsResponse, error) + ListOpenWorkflow(ctx context.Context, request *s.ListOpenWorkflowExecutionsRequest) (*s.ListOpenWorkflowExecutionsResponse, error) // QueryWorkflow queries a given workflow execution and returns the query result synchronously. Parameter workflowID // and queryType are required, other parameters are optional. The workflowID and runID (optional) identify the @@ -150,7 +151,7 @@ type ( // - InternalServiceError // - EntityNotExistError // - QueryFailError - QueryWorkflow(workflowID string, runID string, queryType string, args ...interface{}) (EncodedValue, error) + QueryWorkflow(ctx context.Context, workflowID string, runID string, queryType string, args ...interface{}) (EncodedValue, error) } // ClientOptions are optional parameters for Client creation. @@ -191,7 +192,7 @@ type ( // - DomainAlreadyExistsError // - BadRequestError // - InternalServiceError - Register(request *s.RegisterDomainRequest) error + Register(ctx context.Context, request *s.RegisterDomainRequest) error // Describe a domain. The domain has two part of information. // DomainInfo - Which has Name, Status, Description, Owner Email. @@ -200,7 +201,7 @@ type ( // - EntityNotExistsError // - BadRequestError // - InternalServiceError - Describe(name string) (*s.DomainInfo, *s.DomainConfiguration, error) + Describe(ctx context.Context, name string) (*s.DomainInfo, *s.DomainConfiguration, error) // Update a domain. The domain has two part of information. // UpdateDomainInfo - To update domain Description and Owner Email. @@ -209,7 +210,7 @@ type ( // - EntityNotExistsError // - BadRequestError // - InternalServiceError - Update(name string, domainInfo *s.UpdateDomainInfo, domainConfig *s.DomainConfiguration) error + Update(ctx context.Context, name string, domainInfo *s.UpdateDomainInfo, domainConfig *s.DomainConfiguration) error } ) diff --git a/internal_task_handlers.go b/internal_task_handlers.go index f1e60d141..e3b1c3e5b 100644 --- a/internal_task_handlers.go +++ b/internal_task_handlers.go @@ -904,7 +904,7 @@ func (i *cadenceInvoker) Heartbeat(details []byte) error { func (i *cadenceInvoker) internalHeartBeat(details []byte) (bool, error) { isActivityCancelled := false - err := recordActivityHeartbeat(i.service, i.identity, i.taskToken, details, i.retryPolicy) + err := recordActivityHeartbeat(context.Background(), i.service, i.identity, i.taskToken, details, i.retryPolicy) switch err.(type) { case *CanceledError: @@ -1030,6 +1030,7 @@ func createNewDecision(decisionType s.DecisionType) *s.Decision { } func recordActivityHeartbeat( + ctx context.Context, service m.TChanWorkflowService, identity string, taskToken, details []byte, @@ -1043,11 +1044,11 @@ func recordActivityHeartbeat( var heartbeatResponse *s.RecordActivityTaskHeartbeatResponse heartbeatErr := backoff.Retry( func() error { - ctx, cancel := newTChannelContext() + tchCtx, cancel := newTChannelContext(ctx) defer cancel() var err error - heartbeatResponse, err = service.RecordActivityTaskHeartbeat(ctx, request) + heartbeatResponse, err = service.RecordActivityTaskHeartbeat(tchCtx, request) return err }, retryPolicy, isServiceTransientError) diff --git a/internal_task_handlers_test.go b/internal_task_handlers_test.go index c681533cb..1a7c8bf33 100644 --- a/internal_task_handlers_test.go +++ b/internal_task_handlers_test.go @@ -565,7 +565,7 @@ func (t *TaskHandlersTestSuite) TestGetWorkflowStackTraceByID() { domain := "testDomain" workflowClient := NewClient(service, domain, nil) - dump, err := workflowClient.GetWorkflowStackTrace("id1", "runId1", 0) + dump, err := workflowClient.GetWorkflowStackTrace(context.Background(), "id1", "runId1", 0) t.NoError(err) t.NotNil(dump) t.True(strings.Contains(dump, ".Receive]")) @@ -643,7 +643,7 @@ func (t *TaskHandlersTestSuite) TestGetWorkflowStackTraceByIDAndDecisionTaskComp domain := "testDomain" workflowClient := NewClient(service, domain, nil) - dump, err := workflowClient.GetWorkflowStackTrace("id1", "runId1", 5) + dump, err := workflowClient.GetWorkflowStackTrace(context.Background(), "id1", "runId1", 5) t.NoError(err) t.NotNil(dump) t.True(strings.Contains(dump, ".Receive]")) diff --git a/internal_task_pollers.go b/internal_task_pollers.go index 68f5b0470..edb8aece4 100644 --- a/internal_task_pollers.go +++ b/internal_task_pollers.go @@ -165,19 +165,19 @@ func (wtp *workflowTaskPoller) ProcessTask(task interface{}) error { // Respond task completion. err = backoff.Retry( func() error { - ctx, cancel := newTChannelContext() + tchCtx, cancel := newTChannelContext(context.Background()) defer cancel() var err1 error switch request := completedRequest.(type) { case *s.RespondDecisionTaskCompletedRequest: - err1 = wtp.service.RespondDecisionTaskCompleted(ctx, request) + err1 = wtp.service.RespondDecisionTaskCompleted(tchCtx, request) if err1 != nil { traceLog(func() { wtp.logger.Debug("RespondDecisionTaskCompleted failed.", zap.Error(err1)) }) } case *s.RespondQueryTaskCompletedRequest: - err1 = wtp.service.RespondQueryTaskCompleted(ctx, request) + err1 = wtp.service.RespondQueryTaskCompleted(tchCtx, request) if err1 != nil { traceLog(func() { wtp.logger.Debug("RespondQueryTaskCompleted failed.", zap.Error(err1)) @@ -217,10 +217,10 @@ func (wtp *workflowTaskPoller) poll() (*workflowTask, error) { Identity: common.StringPtr(wtp.identity), } - ctx, cancel := newTChannelContext(tchanTimeout(pollTaskServiceTimeOut), tchanRetryOption(retryNeverOptions)) + tchCtx, cancel := newTChannelContext(context.Background(), tchanTimeout(pollTaskServiceTimeOut), tchanRetryOption(retryNeverOptions)) defer cancel() - response, err := wtp.service.PollForDecisionTask(ctx, request) + response, err := wtp.service.PollForDecisionTask(tchCtx, request) if err != nil { if isServiceTransientError(err) { wtp.metricsScope.Counter(metrics.DecisionPollTransientFailedCounter).Inc(1) @@ -236,7 +236,7 @@ func (wtp *workflowTaskPoller) poll() (*workflowTask, error) { } execution := response.GetWorkflowExecution() - iterator := newGetHistoryPageFunc(wtp.service, wtp.domain, execution, math.MaxInt64, wtp.metricsScope) + iterator := newGetHistoryPageFunc(context.Background(), wtp.service, wtp.domain, execution, math.MaxInt64, wtp.metricsScope) task := &workflowTask{task: response, getHistoryPageFunc: iterator, pollStartTime: startTime} wtp.metricsScope.Counter(metrics.DecisionPollSucceedCounter).Inc(1) wtp.metricsScope.Timer(metrics.DecisionPollLatency).Record(time.Now().Sub(startTime)) @@ -244,6 +244,7 @@ func (wtp *workflowTaskPoller) poll() (*workflowTask, error) { } func newGetHistoryPageFunc( + ctx context.Context, service m.TChanWorkflowService, domain string, execution *s.WorkflowExecution, @@ -256,11 +257,11 @@ func newGetHistoryPageFunc( var resp *s.GetWorkflowExecutionHistoryResponse err := backoff.Retry( func() error { - ctx, cancel := newTChannelContext() + tchCtx, cancel := newTChannelContext(ctx) defer cancel() var err1 error - resp, err1 = service.GetWorkflowExecutionHistory(ctx, &s.GetWorkflowExecutionHistoryRequest{ + resp, err1 = service.GetWorkflowExecutionHistory(tchCtx, &s.GetWorkflowExecutionHistoryRequest{ Domain: common.StringPtr(domain), Execution: execution, NextPageToken: nextPageToken, @@ -317,10 +318,10 @@ func (atp *activityTaskPoller) poll() (*activityTask, error) { Identity: common.StringPtr(atp.identity), } - ctx, cancel := newTChannelContext(tchanTimeout(pollTaskServiceTimeOut), tchanRetryOption(retryNeverOptions)) + tchCtx, cancel := newTChannelContext(context.Background(), tchanTimeout(pollTaskServiceTimeOut), tchanRetryOption(retryNeverOptions)) defer cancel() - response, err := atp.service.PollForActivityTask(ctx, request) + response, err := atp.service.PollForActivityTask(tchCtx, request) if err != nil { if isServiceTransientError(err) { atp.metricsScope.Counter(metrics.ActivityPollTransientFailedCounter).Inc(1) @@ -375,7 +376,7 @@ func (atp *activityTaskPoller) ProcessTask(task interface{}) error { } responseStartTime := time.Now() - reportErr := reportActivityComplete(atp.service, request, atp.metricsScope) + reportErr := reportActivityComplete(context.Background(), atp.service, request, atp.metricsScope) if reportErr != nil { atp.metricsScope.Counter(metrics.ActivityResponseFailedCounter).Inc(1) traceLog(func() { @@ -389,30 +390,30 @@ func (atp *activityTaskPoller) ProcessTask(task interface{}) error { return nil } -func reportActivityComplete(service m.TChanWorkflowService, request interface{}, metricsScope tally.Scope) error { +func reportActivityComplete(ctx context.Context, service m.TChanWorkflowService, request interface{}, metricsScope tally.Scope) error { if request == nil { // nothing to report return nil } - ctx, cancel := newTChannelContext() + tchCtx, cancel := newTChannelContext(ctx) defer cancel() var reportErr error switch request := request.(type) { case *s.RespondActivityTaskCanceledRequest: reportErr = backoff.Retry( func() error { - return service.RespondActivityTaskCanceled(ctx, request) + return service.RespondActivityTaskCanceled(tchCtx, request) }, serviceOperationRetryPolicy, isServiceTransientError) case *s.RespondActivityTaskFailedRequest: reportErr = backoff.Retry( func() error { - return service.RespondActivityTaskFailed(ctx, request) + return service.RespondActivityTaskFailed(tchCtx, request) }, serviceOperationRetryPolicy, isServiceTransientError) case *s.RespondActivityTaskCompletedRequest: reportErr = backoff.Retry( func() error { - return service.RespondActivityTaskCompleted(ctx, request) + return service.RespondActivityTaskCompleted(tchCtx, request) }, serviceOperationRetryPolicy, isServiceTransientError) } if reportErr == nil { diff --git a/internal_utils.go b/internal_utils.go index 724e7f058..566e66cd8 100644 --- a/internal_utils.go +++ b/internal_utils.go @@ -69,8 +69,11 @@ func tchanRetryOption(retryOpt *tchannel.RetryOptions) func(builder *tchannel.Co } // newTChannelContext - Get a tchannel context -func newTChannelContext(options ...func(builder *tchannel.ContextBuilder)) (tchannel.ContextWithHeaders, context.CancelFunc) { +func newTChannelContext(ctx context.Context, options ...func(builder *tchannel.ContextBuilder)) (tchannel.ContextWithHeaders, context.CancelFunc) { builder := tchannel.NewContextBuilder(defaultRPCTimeout) + if ctx != nil { + builder.SetParentContext(ctx) + } builder.SetRetryOptions(retryDefaultOptions) builder.AddHeader(versionHeaderName, LibraryVersion) for _, opt := range options { diff --git a/internal_worker.go b/internal_worker.go index f622b8b4c..7c09ca2cd 100644 --- a/internal_worker.go +++ b/internal_worker.go @@ -164,9 +164,9 @@ func ensureRequiredParams(params *workerExecutionParameters) { func verifyDomainExist(client m.TChanWorkflowService, domain string, logger *zap.Logger) error { descDomainOp := func() error { - ctx, cancel := newTChannelContext() + tchCtx, cancel := newTChannelContext(context.Background()) defer cancel() - _, err := client.DescribeDomain(ctx, &shared.DescribeDomainRequest{Name: &domain}) + _, err := client.DescribeDomain(tchCtx, &shared.DescribeDomainRequest{Name: &domain}) if err != nil { if _, ok := err.(*shared.EntityNotExistsError); ok { logger.Error("domain does not exist", zap.String("domain", domain), zap.Error(err)) diff --git a/internal_worker_interfaces_test.go b/internal_worker_interfaces_test.go index b529fd2e1..3e86151d1 100644 --- a/internal_worker_interfaces_test.go +++ b/internal_worker_interfaces_test.go @@ -179,7 +179,7 @@ func (s *InterfacesTestSuite) TestInterface() { DecisionTaskStartToCloseTimeout: 10 * time.Second, } workflowClient := NewClient(service, domain, nil) - wfExecution, err := workflowClient.StartWorkflow(workflowOptions, "workflowType") + wfExecution, err := workflowClient.StartWorkflow(context.Background(), workflowOptions, "workflowType") s.NoError(err) fmt.Printf("Started workflow: %v \n", wfExecution) } diff --git a/internal_worker_test.go b/internal_worker_test.go index 03a6b6bc0..2898de0b3 100644 --- a/internal_worker_test.go +++ b/internal_worker_test.go @@ -369,13 +369,13 @@ func TestCompleteActivity(t *testing.T) { failedRequest = args.Get(1).(*s.RespondActivityTaskFailedRequest) }) - wfClient.CompleteActivity([]byte("task-token"), nil, nil) + wfClient.CompleteActivity(context.Background(), []byte("task-token"), nil, nil) require.NotNil(t, completedRequest) - wfClient.CompleteActivity([]byte("task-token"), nil, NewCanceledError()) + wfClient.CompleteActivity(context.Background(), []byte("task-token"), nil, NewCanceledError()) require.NotNil(t, canceledRequest) - wfClient.CompleteActivity([]byte("task-token"), nil, errors.New("")) + wfClient.CompleteActivity(context.Background(), []byte("task-token"), nil, errors.New("")) require.NotNil(t, failedRequest) } @@ -391,8 +391,8 @@ func TestRecordActivityHeartbeat(t *testing.T) { heartbeatRequest = args.Get(1).(*s.RecordActivityTaskHeartbeatRequest) }) - wfClient.RecordActivityHeartbeat(nil) - wfClient.RecordActivityHeartbeat(nil, "testStack", "customerObjects", 4) + wfClient.RecordActivityHeartbeat(context.Background(), nil) + wfClient.RecordActivityHeartbeat(context.Background(), nil, "testStack", "customerObjects", 4) require.NotNil(t, heartbeatRequest) } diff --git a/internal_workflow_client.go b/internal_workflow_client.go index 85a443f94..1fd325743 100644 --- a/internal_workflow_client.go +++ b/internal_workflow_client.go @@ -70,6 +70,7 @@ type ( // or // StartWorkflow(options, workflowExecuteFn, arg1, arg2, arg3) func (wc *workflowClient) StartWorkflow( + ctx context.Context, options StartWorkflowOptions, workflowFunc interface{}, args ...interface{}, @@ -118,11 +119,11 @@ func (wc *workflowClient) StartWorkflow( // Start creating workflow request. err = backoff.Retry( func() error { - ctx, cancel := newTChannelContext() + tchCtx, cancel := newTChannelContext(ctx) defer cancel() var err1 error - response, err1 = wc.workflowService.StartWorkflowExecution(ctx, startRequest) + response, err1 = wc.workflowService.StartWorkflowExecution(tchCtx, startRequest) return err1 }, serviceOperationRetryPolicy, isServiceTransientError) @@ -141,7 +142,7 @@ func (wc *workflowClient) StartWorkflow( } // SignalWorkflow signals a workflow in execution. -func (wc *workflowClient) SignalWorkflow(workflowID string, runID string, signalName string, arg interface{}) error { +func (wc *workflowClient) SignalWorkflow(ctx context.Context, workflowID string, runID string, signalName string, arg interface{}) error { var input []byte if arg != nil { var err error @@ -163,14 +164,14 @@ func (wc *workflowClient) SignalWorkflow(workflowID string, runID string, signal return backoff.Retry( func() error { - ctx, cancel := newTChannelContext() + tchCtx, cancel := newTChannelContext(ctx) defer cancel() - return wc.workflowService.SignalWorkflowExecution(ctx, request) + return wc.workflowService.SignalWorkflowExecution(tchCtx, request) }, serviceOperationRetryPolicy, isServiceTransientError) } // CancelWorkflow cancels a workflow in execution. -func (wc *workflowClient) CancelWorkflow(workflowID string, runID string) error { +func (wc *workflowClient) CancelWorkflow(ctx context.Context, workflowID string, runID string) error { request := &s.RequestCancelWorkflowExecutionRequest{ Domain: common.StringPtr(wc.domain), WorkflowExecution: &s.WorkflowExecution{ @@ -182,16 +183,16 @@ func (wc *workflowClient) CancelWorkflow(workflowID string, runID string) error return backoff.Retry( func() error { - ctx, cancel := newTChannelContext() + tchCtx, cancel := newTChannelContext(ctx) defer cancel() - return wc.workflowService.RequestCancelWorkflowExecution(ctx, request) + return wc.workflowService.RequestCancelWorkflowExecution(tchCtx, request) }, serviceOperationRetryPolicy, isServiceTransientError) } // TerminateWorkflow terminates a workflow execution. // workflowID is required, other parameters are optional. // If runID is omit, it will terminate currently running workflow (if there is one) based on the workflowID. -func (wc *workflowClient) TerminateWorkflow(workflowID string, runID string, reason string, details []byte) error { +func (wc *workflowClient) TerminateWorkflow(ctx context.Context, workflowID string, runID string, reason string, details []byte) error { request := &s.TerminateWorkflowExecutionRequest{ Domain: common.StringPtr(wc.domain), WorkflowExecution: &s.WorkflowExecution{ @@ -204,16 +205,16 @@ func (wc *workflowClient) TerminateWorkflow(workflowID string, runID string, rea err := backoff.Retry( func() error { - ctx, cancel := newTChannelContext() + tchCtx, cancel := newTChannelContext(ctx) defer cancel() - return wc.workflowService.TerminateWorkflowExecution(ctx, request) + return wc.workflowService.TerminateWorkflowExecution(tchCtx, request) }, serviceOperationRetryPolicy, isServiceTransientError) return err } // GetWorkflowHistory gets history of a particular workflow. -func (wc *workflowClient) GetWorkflowHistory(workflowID string, runID string) (*s.History, error) { +func (wc *workflowClient) GetWorkflowHistory(ctx context.Context, workflowID string, runID string) (*s.History, error) { history := s.NewHistory() history.Events = make([]*s.HistoryEvent, 0) var nextPageToken []byte @@ -233,9 +234,9 @@ GetHistoryLoop: err := backoff.Retry( func() error { var err1 error - ctx, cancel := newTChannelContext() + tchCtx, cancel := newTChannelContext(ctx) defer cancel() - response, err1 = wc.workflowService.GetWorkflowExecutionHistory(ctx, request) + response, err1 = wc.workflowService.GetWorkflowExecutionHistory(tchCtx, request) return err1 }, serviceOperationRetryPolicy, isServiceTransientError) if err != nil { @@ -250,8 +251,9 @@ GetHistoryLoop: return history, nil } -func (wc *workflowClient) GetWorkflowStackTrace(workflowID string, runID string, atDecisionTaskCompletedEventID int64) (string, error) { +func (wc *workflowClient) GetWorkflowStackTrace(ctx context.Context, workflowID string, runID string, atDecisionTaskCompletedEventID int64) (string, error) { getHistoryPage := newGetHistoryPageFunc( + ctx, wc.workflowService, wc.domain, &s.WorkflowExecution{WorkflowId: common.StringPtr(workflowID), RunId: common.StringPtr(runID)}, @@ -314,7 +316,7 @@ func getWorkflowStackTraceImpl(workflowID string, runID string, getHistoryPage G // should be called when that activity is completed with the actual result and error. If err is nil, activity task // completed event will be reported; if err is CanceledError, activity task cancelled event will be reported; otherwise, // activity task failed event will be reported. -func (wc *workflowClient) CompleteActivity(taskToken []byte, result interface{}, err error) error { +func (wc *workflowClient) CompleteActivity(ctx context.Context, taskToken []byte, result interface{}, err error) error { if taskToken == nil { return errors.New("invalid task token provided") } @@ -328,16 +330,16 @@ func (wc *workflowClient) CompleteActivity(taskToken []byte, result interface{}, } } request := convertActivityResultToRespondRequest(wc.identity, taskToken, data, err) - return reportActivityComplete(wc.workflowService, request, wc.metricsScope) + return reportActivityComplete(ctx, wc.workflowService, request, wc.metricsScope) } // RecordActivityHeartbeat records heartbeat for an activity. -func (wc *workflowClient) RecordActivityHeartbeat(taskToken []byte, details ...interface{}) error { +func (wc *workflowClient) RecordActivityHeartbeat(ctx context.Context, taskToken []byte, details ...interface{}) error { data, err := getHostEnvironment().encodeArgs(details) if err != nil { return err } - return recordActivityHeartbeat(wc.workflowService, wc.identity, taskToken, data, serviceOperationRetryPolicy) + return recordActivityHeartbeat(ctx, wc.workflowService, wc.identity, taskToken, data, serviceOperationRetryPolicy) } // ListClosedWorkflow gets closed workflow executions based on request filters @@ -345,7 +347,7 @@ func (wc *workflowClient) RecordActivityHeartbeat(taskToken []byte, details ...i // - BadRequestError // - InternalServiceError // - EntityNotExistError -func (wc *workflowClient) ListClosedWorkflow(request *s.ListClosedWorkflowExecutionsRequest) (*s.ListClosedWorkflowExecutionsResponse, error) { +func (wc *workflowClient) ListClosedWorkflow(ctx context.Context, request *s.ListClosedWorkflowExecutionsRequest) (*s.ListClosedWorkflowExecutionsResponse, error) { if len(request.GetDomain()) == 0 { request.Domain = common.StringPtr(wc.domain) } @@ -353,9 +355,9 @@ func (wc *workflowClient) ListClosedWorkflow(request *s.ListClosedWorkflowExecut err := backoff.Retry( func() error { var err1 error - ctx, cancel := newTChannelContext() + tchCtx, cancel := newTChannelContext(ctx) defer cancel() - response, err1 = wc.workflowService.ListClosedWorkflowExecutions(ctx, request) + response, err1 = wc.workflowService.ListClosedWorkflowExecutions(tchCtx, request) return err1 }, serviceOperationRetryPolicy, isServiceTransientError) if err != nil { @@ -369,7 +371,7 @@ func (wc *workflowClient) ListClosedWorkflow(request *s.ListClosedWorkflowExecut // - BadRequestError // - InternalServiceError // - EntityNotExistError -func (wc *workflowClient) ListOpenWorkflow(request *s.ListOpenWorkflowExecutionsRequest) (*s.ListOpenWorkflowExecutionsResponse, error) { +func (wc *workflowClient) ListOpenWorkflow(ctx context.Context, request *s.ListOpenWorkflowExecutionsRequest) (*s.ListOpenWorkflowExecutionsResponse, error) { if len(request.GetDomain()) == 0 { request.Domain = common.StringPtr(wc.domain) } @@ -377,9 +379,9 @@ func (wc *workflowClient) ListOpenWorkflow(request *s.ListOpenWorkflowExecutions err := backoff.Retry( func() error { var err1 error - ctx, cancel := newTChannelContext() + tchCtx, cancel := newTChannelContext(ctx) defer cancel() - response, err1 = wc.workflowService.ListOpenWorkflowExecutions(ctx, request) + response, err1 = wc.workflowService.ListOpenWorkflowExecutions(tchCtx, request) return err1 }, serviceOperationRetryPolicy, isServiceTransientError) if err != nil { @@ -400,7 +402,7 @@ func (wc *workflowClient) ListOpenWorkflow(request *s.ListOpenWorkflowExecutions // - InternalServiceError // - EntityNotExistError // - QueryFailError -func (wc *workflowClient) QueryWorkflow(workflowID string, runID string, queryType string, args ...interface{}) (EncodedValue, error) { +func (wc *workflowClient) QueryWorkflow(ctx context.Context, workflowID string, runID string, queryType string, args ...interface{}) (EncodedValue, error) { var input []byte if len(args) > 0 { var err error @@ -423,10 +425,10 @@ func (wc *workflowClient) QueryWorkflow(workflowID string, runID string, queryTy var resp *s.QueryWorkflowResponse err := backoff.Retry( func() error { - ctx, cancel := newTChannelContext() + tchCtx, cancel := newTChannelContext(ctx) defer cancel() var err error - resp, err = wc.workflowService.QueryWorkflow(ctx, request) + resp, err = wc.workflowService.QueryWorkflow(tchCtx, request) return err }, serviceOperationRetryPolicy, isServiceTransientError) if err != nil { @@ -441,12 +443,12 @@ func (wc *workflowClient) QueryWorkflow(workflowID string, runID string, queryTy // - DomainAlreadyExistsError // - BadRequestError // - InternalServiceError -func (dc *domainClient) Register(request *s.RegisterDomainRequest) error { +func (dc *domainClient) Register(ctx context.Context, request *s.RegisterDomainRequest) error { return backoff.Retry( func() error { - ctx, cancel := newTChannelContext() + tchCtx, cancel := newTChannelContext(ctx) defer cancel() - return dc.workflowService.RegisterDomain(ctx, request) + return dc.workflowService.RegisterDomain(tchCtx, request) }, serviceOperationRetryPolicy, isServiceTransientError) } @@ -457,7 +459,7 @@ func (dc *domainClient) Register(request *s.RegisterDomainRequest) error { // - EntityNotExistsError // - BadRequestError // - InternalServiceError -func (dc *domainClient) Describe(name string) (*s.DomainInfo, *s.DomainConfiguration, error) { +func (dc *domainClient) Describe(ctx context.Context, name string) (*s.DomainInfo, *s.DomainConfiguration, error) { request := &s.DescribeDomainRequest{ Name: common.StringPtr(name), } @@ -465,10 +467,10 @@ func (dc *domainClient) Describe(name string) (*s.DomainInfo, *s.DomainConfigura var response *s.DescribeDomainResponse err := backoff.Retry( func() error { - ctx, cancel := newTChannelContext() + tchCtx, cancel := newTChannelContext(ctx) defer cancel() var err error - response, err = dc.workflowService.DescribeDomain(ctx, request) + response, err = dc.workflowService.DescribeDomain(tchCtx, request) return err }, serviceOperationRetryPolicy, isServiceTransientError) if err != nil { @@ -484,7 +486,7 @@ func (dc *domainClient) Describe(name string) (*s.DomainInfo, *s.DomainConfigura // - EntityNotExistsError // - BadRequestError // - InternalServiceError -func (dc *domainClient) Update(name string, domainInfo *s.UpdateDomainInfo, domainConfig *s.DomainConfiguration) error { +func (dc *domainClient) Update(ctx context.Context, name string, domainInfo *s.UpdateDomainInfo, domainConfig *s.DomainConfiguration) error { request := &s.UpdateDomainRequest{ Name: common.StringPtr(name), UpdatedInfo: domainInfo, @@ -493,9 +495,9 @@ func (dc *domainClient) Update(name string, domainInfo *s.UpdateDomainInfo, doma return backoff.Retry( func() error { - ctx, cancel := newTChannelContext() + tchCtx, cancel := newTChannelContext(ctx) defer cancel() - _, err := dc.workflowService.UpdateDomain(ctx, request) + _, err := dc.workflowService.UpdateDomain(tchCtx, request) return err }, serviceOperationRetryPolicy, isServiceTransientError) }