Skip to content

Commit

Permalink
Make session heartbeat timeout customizable (#930)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Jan 16, 2020
1 parent f4fba53 commit 0b9ff3d
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 5 deletions.
21 changes: 17 additions & 4 deletions internal/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,14 @@ type (
// ExecutionTimeout: required, no default
// Specifies the maximum amount of time the session can run
// CreationTimeout: required, no default
// Specfifies how long session creation can take before returning an error
// Specifies how long session creation can take before returning an error
// HeartbeatTimeout: optional, default 20s
// Specifies the heartbeat timeout. If heartbeat is not received by server
// within the timeout, the session will be declared as failed
SessionOptions struct {
ExecutionTimeout time.Duration
CreationTimeout time.Duration
HeartbeatTimeout time.Duration
}

recreateSessionParams struct {
Expand Down Expand Up @@ -117,7 +121,8 @@ const (

errTooManySessionsMsg string = "too many outstanding sessions"

sessionHeartBeatTimeout time.Duration = time.Second * 10
defaultSessionHeartBeatTimeout time.Duration = time.Second * 20
maxSessionHeartBeatInterval time.Duration = time.Second * 10
)

var (
Expand Down Expand Up @@ -300,11 +305,15 @@ func createSession(ctx Context, creationTasklist string, options *SessionOptions
},
}

heartbeatTimeout := defaultSessionHeartBeatTimeout
if options.HeartbeatTimeout != time.Duration(0) {
heartbeatTimeout = options.HeartbeatTimeout
}
ao := ActivityOptions{
TaskList: creationTasklist,
ScheduleToStartTimeout: options.CreationTimeout,
StartToCloseTimeout: options.ExecutionTimeout,
HeartbeatTimeout: sessionHeartBeatTimeout,
HeartbeatTimeout: heartbeatTimeout,
}
if retryable {
ao.RetryPolicy = retryPolicy
Expand Down Expand Up @@ -400,7 +409,11 @@ func sessionCreationActivity(ctx context.Context, sessionID string) error {
}

activityEnv := getActivityEnv(ctx)
ticker := time.NewTicker(activityEnv.heartbeatTimeout / 2)
heartbeatInterval := activityEnv.heartbeatTimeout / 3
if heartbeatInterval > maxSessionHeartBeatInterval {
heartbeatInterval = maxSessionHeartBeatInterval
}
ticker := time.NewTicker(heartbeatInterval)
defer ticker.Stop()

for {
Expand Down
5 changes: 4 additions & 1 deletion workflow/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ type (
// ExecutionTimeout: required, no default
// Specifies the maximum amount of time the session can run
// CreationTimeout: required, no default
// Specfifies how long session creation can take before returning an error
// Specifies how long session creation can take before returning an error
// HeartbeatTimeout: optional, default 20s
// Specifies the heartbeat timeout. If heartbeat is not received by server
// within the timeout, the session will be declared as failed
SessionOptions = internal.SessionOptions
)

Expand Down

0 comments on commit 0b9ff3d

Please sign in to comment.