Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support a different timeout for the last replica (backport #1176) #1198

Merged
merged 3 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions app/cmd/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,13 @@ func startController(c *cli.Context) error {
}

timeout := c.Int64("engine-replica-timeout")
engineReplicaTimeout := time.Duration(timeout) * time.Second
engineReplicaTimeout = controller.DetermineEngineReplicaTimeout(engineReplicaTimeout)
iscsiTargetRequestTimeout := controller.DetermineIscsiTargetRequestTimeout(engineReplicaTimeout)
engineReplicaTimeoutShort := time.Duration(timeout) * time.Second
engineReplicaTimeoutShort = controller.DetermineEngineReplicaTimeout(engineReplicaTimeoutShort)
// In https://github.com/longhorn/longhorn/issues/8711 we decided to allow the last replica twice as long as the
// others before a timeout. We can optionally adjust this strategy (e.g. to a fixed sixty seconds or some
// configurable value) in the future.
engineReplicaTimeoutLong := 2 * engineReplicaTimeoutShort
iscsiTargetRequestTimeout := controller.DetermineIscsiTargetRequestTimeout(engineReplicaTimeoutLong)

snapshotMaxCount := c.Int("snapshot-max-count")
snapshotMaxSize := int64(0)
Expand Down Expand Up @@ -187,10 +191,11 @@ func startController(c *cli.Context) error {
}

logrus.Infof("Creating volume %v controller with iSCSI target request timeout %v and engine to replica(s) timeout %v",
volumeName, iscsiTargetRequestTimeout, engineReplicaTimeout)
control := controller.NewController(volumeName, dynamic.New(factories), frontend, isUpgrade, disableRevCounter, salvageRequested,
unmapMarkSnapChainRemoved, iscsiTargetRequestTimeout, engineReplicaTimeout, types.DataServerProtocol(dataServerProtocol),
fileSyncHTTPClientTimeout, snapshotMaxCount, snapshotMaxSize)
volumeName, iscsiTargetRequestTimeout, engineReplicaTimeoutShort)
control := controller.NewController(volumeName, dynamic.New(factories), frontend, isUpgrade, disableRevCounter,
salvageRequested, unmapMarkSnapChainRemoved, iscsiTargetRequestTimeout, engineReplicaTimeoutShort,
engineReplicaTimeoutLong, types.DataServerProtocol(dataServerProtocol), fileSyncHTTPClientTimeout,
snapshotMaxCount, snapshotMaxSize)

// need to wait for Shutdown() completion
control.ShutdownWG.Add(1)
Expand Down
6 changes: 3 additions & 3 deletions pkg/backend/dynamic/dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package dynamic
import (
"fmt"
"strings"
"time"

"github.com/longhorn/longhorn-engine/pkg/types"
)
Expand All @@ -18,12 +17,13 @@ func New(factories map[string]types.BackendFactory) types.BackendFactory {
}
}

func (d *Factory) Create(volumeName, address string, dataServerProtocol types.DataServerProtocol, engineToReplicaTimeout time.Duration) (types.Backend, error) {
func (d *Factory) Create(volumeName, address string, dataServerProtocol types.DataServerProtocol,
sharedTimeouts types.SharedTimeouts) (types.Backend, error) {
parts := strings.SplitN(address, "://", 2)

if len(parts) == 2 {
if factory, ok := d.factories[parts[0]]; ok {
return factory.Create(volumeName, parts[1], dataServerProtocol, engineToReplicaTimeout)
return factory.Create(volumeName, parts[1], dataServerProtocol, sharedTimeouts)
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/backend/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package file
import (
"fmt"
"os"
"time"

"github.com/sirupsen/logrus"

Expand Down Expand Up @@ -132,7 +131,8 @@ func (f *Wrapper) SetSnapshotMaxSize(size int64) error {
return nil
}

func (ff *Factory) Create(volumeName, address string, dataServerProtocol types.DataServerProtocol, engineToReplicaTimeout time.Duration) (types.Backend, error) {
func (ff *Factory) Create(volumeName, address string, dataServerProtocol types.DataServerProtocol,
sharedTimeouts types.SharedTimeouts) (types.Backend, error) {
logrus.Infof("Creating file: %s", address)
file, err := os.OpenFile(address, os.O_RDWR|os.O_CREATE, 0600)
if err != nil {
Expand Down
11 changes: 9 additions & 2 deletions pkg/backend/remote/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ type Remote struct {

func (r *Remote) Close() error {
logrus.Infof("Closing: %s", r.name)

// Close the dataconn client to avoid orphaning goroutines.
if dataconnClient, ok := r.ReaderWriterUnmapperAt.(*dataconn.Client); ok {
dataconnClient.Close()
}

conn, err := grpc.NewClient(r.replicaServiceURL, grpc.WithTransportCredentials(insecure.NewCredentials()),
interceptor.WithIdentityValidationClientInterceptor(r.volumeName, ""))
if err != nil {
Expand Down Expand Up @@ -380,7 +386,8 @@ func (r *Remote) info() (*types.ReplicaInfo, error) {
return replicaClient.GetReplicaInfo(resp.Replica), nil
}

func (rf *Factory) Create(volumeName, address string, dataServerProtocol types.DataServerProtocol, engineToReplicaTimeout time.Duration) (types.Backend, error) {
func (rf *Factory) Create(volumeName, address string, dataServerProtocol types.DataServerProtocol,
sharedTimeouts types.SharedTimeouts) (types.Backend, error) {
logrus.Infof("Connecting to remote: %s (%v)", address, dataServerProtocol)

controlAddress, dataAddress, _, _, err := util.GetAddresses(volumeName, address, dataServerProtocol)
Expand Down Expand Up @@ -416,7 +423,7 @@ func (rf *Factory) Create(volumeName, address string, dataServerProtocol types.D
conns = append(conns, conn)
}

dataConnClient := dataconn.NewClient(conns, engineToReplicaTimeout)
dataConnClient := dataconn.NewClient(conns, sharedTimeouts)
r.ReaderWriterUnmapperAt = dataConnClient

if err := r.open(); err != nil {
Expand Down
14 changes: 8 additions & 6 deletions pkg/controller/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type Controller struct {
frontend types.Frontend
isUpgrade bool
iscsiTargetRequestTimeout time.Duration
engineReplicaTimeout time.Duration
sharedTimeouts *util.SharedTimeouts
DataServerProtocol types.DataServerProtocol

isExpanding bool
Expand Down Expand Up @@ -72,8 +72,10 @@ const (
lastModifyCheckPeriod = 5 * time.Second
)

func NewController(name string, factory types.BackendFactory, frontend types.Frontend, isUpgrade, disableRevCounter, salvageRequested, unmapMarkSnapChainRemoved bool,
iscsiTargetRequestTimeout, engineReplicaTimeout time.Duration, dataServerProtocol types.DataServerProtocol, fileSyncHTTPClientTimeout, snapshotMaxCount int, snapshotMaxSize int64) *Controller {
func NewController(name string, factory types.BackendFactory, frontend types.Frontend, isUpgrade, disableRevCounter,
salvageRequested, unmapMarkSnapChainRemoved bool, iscsiTargetRequestTimeout, engineReplicaTimeoutShort,
engineReplicaTimeoutLong time.Duration, dataServerProtocol types.DataServerProtocol, fileSyncHTTPClientTimeout,
snapshotMaxCount int, snapshotMaxSize int64) *Controller {
c := &Controller{
factory: factory,
VolumeName: name,
Expand All @@ -89,7 +91,7 @@ func NewController(name string, factory types.BackendFactory, frontend types.Fro
SnapshotMaxSize: snapshotMaxSize,

iscsiTargetRequestTimeout: iscsiTargetRequestTimeout,
engineReplicaTimeout: engineReplicaTimeout,
sharedTimeouts: util.NewSharedTimeouts(engineReplicaTimeoutShort, engineReplicaTimeoutLong),
DataServerProtocol: dataServerProtocol,

fileSyncHTTPClientTimeout: fileSyncHTTPClientTimeout,
Expand Down Expand Up @@ -172,7 +174,7 @@ func (c *Controller) addReplica(address string, snapshotRequired bool, mode type
return err
}

newBackend, err := c.factory.Create(c.VolumeName, address, c.DataServerProtocol, c.engineReplicaTimeout)
newBackend, err := c.factory.Create(c.VolumeName, address, c.DataServerProtocol, c.sharedTimeouts)
if err != nil {
return err
}
Expand Down Expand Up @@ -895,7 +897,7 @@ func (c *Controller) Start(volumeSize, volumeCurrentSize int64, addresses ...str
errorCodes := map[string]codes.Code{}
first := true
for _, address := range addresses {
newBackend, err := c.factory.Create(c.VolumeName, address, c.DataServerProtocol, c.engineReplicaTimeout)
newBackend, err := c.factory.Create(c.VolumeName, address, c.DataServerProtocol, c.sharedTimeouts)
if err != nil {
if strings.Contains(err.Error(), "rpc error: code = Unavailable") {
errorCodes[address] = codes.Unavailable
Expand Down
71 changes: 41 additions & 30 deletions pkg/dataconn/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/sirupsen/logrus"

"github.com/longhorn/longhorn-engine/pkg/types"
journal "github.com/longhorn/sparse-tools/stats"
)

Expand All @@ -18,32 +19,32 @@ var (

// Client replica client
type Client struct {
end chan struct{}
requests chan *Message
send chan *Message
responses chan *Message
seq uint32
messages map[uint32]*Message
wires []*Wire
peerAddr string
opTimeout time.Duration
end chan struct{}
requests chan *Message
send chan *Message
responses chan *Message
seq uint32
messages map[uint32]*Message
wires []*Wire
peerAddr string
sharedTimeouts types.SharedTimeouts
}

// NewClient replica client
func NewClient(conns []net.Conn, engineToReplicaTimeout time.Duration) *Client {
func NewClient(conns []net.Conn, sharedTimeouts types.SharedTimeouts) *Client {
var wires []*Wire
for _, conn := range conns {
wires = append(wires, NewWire(conn))
}
c := &Client{
wires: wires,
peerAddr: conns[0].RemoteAddr().String(),
end: make(chan struct{}, 1024),
requests: make(chan *Message, 1024),
send: make(chan *Message, 1024),
responses: make(chan *Message, 1024),
messages: map[uint32]*Message{},
opTimeout: engineToReplicaTimeout,
wires: wires,
peerAddr: conns[0].RemoteAddr().String(),
end: make(chan struct{}, 1024),
requests: make(chan *Message, 1024),
send: make(chan *Message, 1024),
responses: make(chan *Message, 1024),
messages: map[uint32]*Message{},
sharedTimeouts: sharedTimeouts,
}
go c.loop()
c.write()
Expand Down Expand Up @@ -128,7 +129,16 @@ func (c *Client) loop() {

var clientError error
var ioInflight int
var ioDeadline time.Time
var timeOfLastActivity time.Time

decremented := false
c.sharedTimeouts.Increment()
// Ensure we always decrement the sharedTimeouts counter regardless of how we leave this loop.
defer func() {
if !decremented {
c.sharedTimeouts.Decrement()
}
}()

// handleClientError cleans up all in flight messages
// also stores the error so that future requests/responses get errored immediately.
Expand All @@ -139,21 +149,25 @@ func (c *Client) loop() {
}

ioInflight = 0
ioDeadline = time.Time{}
timeOfLastActivity = time.Time{}
}

for {
select {
case <-c.end:
return
case <-ticker.C:
if ioDeadline.IsZero() || time.Now().Before(ioDeadline) {
if timeOfLastActivity.IsZero() || ioInflight == 0 {
continue
}

logrus.Errorf("R/W Timeout. No response received in %v", c.opTimeout)
handleClientError(ErrRWTimeout)
journal.PrintLimited(1000)
exceededTimeout := c.sharedTimeouts.CheckAndDecrement(time.Since(timeOfLastActivity))
if exceededTimeout > 0 {
decremented = true
logrus.Errorf("R/W Timeout. No response received in %v", exceededTimeout)
handleClientError(ErrRWTimeout)
journal.PrintLimited(1000)
}
case req := <-c.requests:
if clientError != nil {
c.replyError(req, clientError)
Expand All @@ -162,7 +176,8 @@ func (c *Client) loop() {

if req.Type == TypeRead || req.Type == TypeWrite || req.Type == TypeUnmap {
if ioInflight == 0 {
ioDeadline = time.Now().Add(c.opTimeout)
// If nothing is in-flight, we should get a fresh timeout.
timeOfLastActivity = time.Now()
}
ioInflight++
}
Expand All @@ -182,11 +197,7 @@ func (c *Client) loop() {

if req.Type == TypeRead || req.Type == TypeWrite || req.Type == TypeUnmap {
ioInflight--
if ioInflight > 0 {
ioDeadline = time.Now().Add(c.opTimeout)
} else if ioInflight == 0 {
ioDeadline = time.Time{}
}
timeOfLastActivity = time.Now()
}

if clientError != nil {
Expand Down
9 changes: 8 additions & 1 deletion pkg/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,14 @@ type Backend interface {
}

type BackendFactory interface {
Create(volumeName, address string, dataServerProtocol DataServerProtocol, engineReplicaTimeout time.Duration) (Backend, error)
Create(volumeName, address string, dataServerProtocol DataServerProtocol,
sharedTimeouts SharedTimeouts) (Backend, error)
}

type SharedTimeouts interface {
Increment()
Decrement()
CheckAndDecrement(duration time.Duration) time.Duration
}

type Controller interface {
Expand Down
63 changes: 63 additions & 0 deletions pkg/util/shared_timeouts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package util

import (
"sync"
"time"
)

// SharedTimeouts has the following use case:
// - Multiple goroutines may need to time out eventually.
// - Only the goroutines themselves know if the conditions for a timeout have been met.
// - It is fine for some of the goroutines to time out quickly.
// - The last goroutine should time out more slowly.
// SharedTimeouts implements the types.SharedTimeouts instead of directly defining the concrete type to avoid an import
// loop.
type SharedTimeouts struct {
mutex sync.RWMutex
longTimeout time.Duration
shortTimeout time.Duration
numConsumers int
}

func NewSharedTimeouts(shortTimeout, longTimeout time.Duration) *SharedTimeouts {
return &SharedTimeouts{
longTimeout: longTimeout,
shortTimeout: shortTimeout,
}
}

func (t *SharedTimeouts) Increment() {
t.mutex.Lock()
defer t.mutex.Unlock()
t.numConsumers++
}

func (t *SharedTimeouts) Decrement() {
t.mutex.Lock()
defer t.mutex.Unlock()
t.numConsumers--
}

// CheckAndDecrement checks if duration exceeds longTimeout or shortTimeout, returns the timeout exceeded (if
// applicable) and decrements numConsumers.
// - shortTimeout is only considered exceeded if there is still one other consumer to wait for longTimeout.
// - The caller MUST take whatever action is required for a timeout if a value > 0 is returned.
func (t *SharedTimeouts) CheckAndDecrement(duration time.Duration) time.Duration {
if duration > t.longTimeout {
t.mutex.Lock()
defer t.mutex.Unlock()
t.numConsumers--
return t.longTimeout
}

if duration > t.shortTimeout {
t.mutex.Lock()
defer t.mutex.Unlock()
if t.numConsumers > 1 {
t.numConsumers--
return t.shortTimeout
}
}

return 0
}
Loading