Skip to content

Commit

Permalink
feat(timeout): enable different timeout for last replica
Browse files Browse the repository at this point in the history
Longhorn 8711

Signed-off-by: Eric Weber <eric.weber@suse.com>
  • Loading branch information
ejweber committed Aug 21, 2024
1 parent 1e25c7c commit 6f050a1
Show file tree
Hide file tree
Showing 9 changed files with 199 additions and 51 deletions.
19 changes: 12 additions & 7 deletions app/cmd/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,13 @@ func startController(c *cli.Context) error {
}

timeout := c.Int64("engine-replica-timeout")
engineReplicaTimeout := time.Duration(timeout) * time.Second
engineReplicaTimeout = controller.DetermineEngineReplicaTimeout(engineReplicaTimeout)
iscsiTargetRequestTimeout := controller.DetermineIscsiTargetRequestTimeout(engineReplicaTimeout)
engineReplicaTimeoutShort := time.Duration(timeout) * time.Second
engineReplicaTimeoutShort = controller.DetermineEngineReplicaTimeout(engineReplicaTimeoutShort)
// At the conclusion of https://github.com/longhorn/longhorn/issues/8711 we should have a strategy for determining
// engineReplicaTimeoutLong. For now, we set it to engineReplicaTimeoutShort to maintain existing behavior and
// modify it here for testing.
engineReplicaTimeoutLong := engineReplicaTimeoutShort
iscsiTargetRequestTimeout := controller.DetermineIscsiTargetRequestTimeout(engineReplicaTimeoutLong)

snapshotMaxCount := c.Int("snapshot-max-count")
snapshotMaxSize := int64(0)
Expand Down Expand Up @@ -187,10 +191,11 @@ func startController(c *cli.Context) error {
}

logrus.Infof("Creating volume %v controller with iSCSI target request timeout %v and engine to replica(s) timeout %v",
volumeName, iscsiTargetRequestTimeout, engineReplicaTimeout)
control := controller.NewController(volumeName, dynamic.New(factories), frontend, isUpgrade, disableRevCounter, salvageRequested,
unmapMarkSnapChainRemoved, iscsiTargetRequestTimeout, engineReplicaTimeout, types.DataServerProtocol(dataServerProtocol),
fileSyncHTTPClientTimeout, snapshotMaxCount, snapshotMaxSize)
volumeName, iscsiTargetRequestTimeout, engineReplicaTimeoutShort)
control := controller.NewController(volumeName, dynamic.New(factories), frontend, isUpgrade, disableRevCounter,
salvageRequested, unmapMarkSnapChainRemoved, iscsiTargetRequestTimeout, engineReplicaTimeoutShort,
engineReplicaTimeoutLong, types.DataServerProtocol(dataServerProtocol), fileSyncHTTPClientTimeout,
snapshotMaxCount, snapshotMaxSize)

// need to wait for Shutdown() completion
control.ShutdownWG.Add(1)
Expand Down
6 changes: 3 additions & 3 deletions pkg/backend/dynamic/dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package dynamic
import (
"fmt"
"strings"
"time"

"github.com/longhorn/longhorn-engine/pkg/types"
)
Expand All @@ -18,12 +17,13 @@ func New(factories map[string]types.BackendFactory) types.BackendFactory {
}
}

func (d *Factory) Create(volumeName, address string, dataServerProtocol types.DataServerProtocol, engineToReplicaTimeout time.Duration) (types.Backend, error) {
func (d *Factory) Create(volumeName, address string, dataServerProtocol types.DataServerProtocol,
sharedTimeouts types.SharedTimeouts) (types.Backend, error) {
parts := strings.SplitN(address, "://", 2)

if len(parts) == 2 {
if factory, ok := d.factories[parts[0]]; ok {
return factory.Create(volumeName, parts[1], dataServerProtocol, engineToReplicaTimeout)
return factory.Create(volumeName, parts[1], dataServerProtocol, sharedTimeouts)
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/backend/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package file
import (
"fmt"
"os"
"time"

"github.com/sirupsen/logrus"

Expand Down Expand Up @@ -132,7 +131,8 @@ func (f *Wrapper) SetSnapshotMaxSize(size int64) error {
return nil
}

func (ff *Factory) Create(volumeName, address string, dataServerProtocol types.DataServerProtocol, engineToReplicaTimeout time.Duration) (types.Backend, error) {
func (ff *Factory) Create(volumeName, address string, dataServerProtocol types.DataServerProtocol,
sharedTimeouts types.SharedTimeouts) (types.Backend, error) {
logrus.Infof("Creating file: %s", address)
file, err := os.OpenFile(address, os.O_RDWR|os.O_CREATE, 0600)
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions pkg/backend/remote/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,8 @@ func (r *Remote) info() (*types.ReplicaInfo, error) {
return replicaClient.GetReplicaInfo(resp.Replica), nil
}

func (rf *Factory) Create(volumeName, address string, dataServerProtocol types.DataServerProtocol, engineToReplicaTimeout time.Duration) (types.Backend, error) {
func (rf *Factory) Create(volumeName, address string, dataServerProtocol types.DataServerProtocol,
sharedTimeouts types.SharedTimeouts) (types.Backend, error) {
logrus.Infof("Connecting to remote: %s (%v)", address, dataServerProtocol)

controlAddress, dataAddress, _, _, err := util.GetAddresses(volumeName, address, dataServerProtocol)
Expand Down Expand Up @@ -419,7 +420,7 @@ func (rf *Factory) Create(volumeName, address string, dataServerProtocol types.D
conns = append(conns, conn)
}

dataConnClient := dataconn.NewClient(conns, engineToReplicaTimeout)
dataConnClient := dataconn.NewClient(conns, sharedTimeouts)
r.ReaderWriterUnmapperAt = dataConnClient

if err := r.open(); err != nil {
Expand Down
14 changes: 8 additions & 6 deletions pkg/controller/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type Controller struct {
frontend types.Frontend
isUpgrade bool
iscsiTargetRequestTimeout time.Duration
engineReplicaTimeout time.Duration
sharedTimeouts *util.SharedTimeouts
DataServerProtocol types.DataServerProtocol

isExpanding bool
Expand Down Expand Up @@ -72,8 +72,10 @@ const (
lastModifyCheckPeriod = 5 * time.Second
)

func NewController(name string, factory types.BackendFactory, frontend types.Frontend, isUpgrade, disableRevCounter, salvageRequested, unmapMarkSnapChainRemoved bool,
iscsiTargetRequestTimeout, engineReplicaTimeout time.Duration, dataServerProtocol types.DataServerProtocol, fileSyncHTTPClientTimeout, snapshotMaxCount int, snapshotMaxSize int64) *Controller {
func NewController(name string, factory types.BackendFactory, frontend types.Frontend, isUpgrade, disableRevCounter,
salvageRequested, unmapMarkSnapChainRemoved bool, iscsiTargetRequestTimeout, engineReplicaTimeoutShort,
engineReplicaTimeoutLong time.Duration, dataServerProtocol types.DataServerProtocol, fileSyncHTTPClientTimeout,
snapshotMaxCount int, snapshotMaxSize int64) *Controller {
c := &Controller{
factory: factory,
VolumeName: name,
Expand All @@ -89,7 +91,7 @@ func NewController(name string, factory types.BackendFactory, frontend types.Fro
SnapshotMaxSize: snapshotMaxSize,

iscsiTargetRequestTimeout: iscsiTargetRequestTimeout,
engineReplicaTimeout: engineReplicaTimeout,
sharedTimeouts: util.NewSharedTimeouts(engineReplicaTimeoutShort, engineReplicaTimeoutLong),
DataServerProtocol: dataServerProtocol,

fileSyncHTTPClientTimeout: fileSyncHTTPClientTimeout,
Expand Down Expand Up @@ -172,7 +174,7 @@ func (c *Controller) addReplica(address string, snapshotRequired bool, mode type
return err
}

newBackend, err := c.factory.Create(c.VolumeName, address, c.DataServerProtocol, c.engineReplicaTimeout)
newBackend, err := c.factory.Create(c.VolumeName, address, c.DataServerProtocol, c.sharedTimeouts)
if err != nil {
return err
}
Expand Down Expand Up @@ -895,7 +897,7 @@ func (c *Controller) Start(volumeSize, volumeCurrentSize int64, addresses ...str
errorCodes := map[string]codes.Code{}
first := true
for _, address := range addresses {
newBackend, err := c.factory.Create(c.VolumeName, address, c.DataServerProtocol, c.engineReplicaTimeout)
newBackend, err := c.factory.Create(c.VolumeName, address, c.DataServerProtocol, c.sharedTimeouts)
if err != nil {
if strings.Contains(err.Error(), "rpc error: code = Unavailable") {
errorCodes[address] = codes.Unavailable
Expand Down
71 changes: 41 additions & 30 deletions pkg/dataconn/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/sirupsen/logrus"

"github.com/longhorn/longhorn-engine/pkg/types"
journal "github.com/longhorn/sparse-tools/stats"
)

Expand All @@ -18,32 +19,32 @@ var (

// Client replica client
type Client struct {
end chan struct{}
requests chan *Message
send chan *Message
responses chan *Message
seq uint32
messages map[uint32]*Message
wires []*Wire
peerAddr string
opTimeout time.Duration
end chan struct{}
requests chan *Message
send chan *Message
responses chan *Message
seq uint32
messages map[uint32]*Message
wires []*Wire
peerAddr string
sharedTimeouts types.SharedTimeouts
}

// NewClient replica client
func NewClient(conns []net.Conn, engineToReplicaTimeout time.Duration) *Client {
func NewClient(conns []net.Conn, sharedTimeouts types.SharedTimeouts) *Client {
var wires []*Wire
for _, conn := range conns {
wires = append(wires, NewWire(conn))
}
c := &Client{
wires: wires,
peerAddr: conns[0].RemoteAddr().String(),
end: make(chan struct{}, 1024),
requests: make(chan *Message, 1024),
send: make(chan *Message, 1024),
responses: make(chan *Message, 1024),
messages: map[uint32]*Message{},
opTimeout: engineToReplicaTimeout,
wires: wires,
peerAddr: conns[0].RemoteAddr().String(),
end: make(chan struct{}, 1024),
requests: make(chan *Message, 1024),
send: make(chan *Message, 1024),
responses: make(chan *Message, 1024),
messages: map[uint32]*Message{},
sharedTimeouts: sharedTimeouts,
}
go c.loop()
c.write()
Expand Down Expand Up @@ -128,7 +129,16 @@ func (c *Client) loop() {

var clientError error
var ioInflight int
var ioDeadline time.Time
var timeOfLastActivity time.Time

decremented := false
c.sharedTimeouts.Increment()
// Ensure we always decrement the sharedTimeouts counter regardless of how we leave this loop.
defer func() {
if !decremented {
c.sharedTimeouts.Decrement()
}
}()

// handleClientError cleans up all in flight messages
// also stores the error so that future requests/responses get errored immediately.
Expand All @@ -139,21 +149,25 @@ func (c *Client) loop() {
}

ioInflight = 0
ioDeadline = time.Time{}
timeOfLastActivity = time.Time{}
}

for {
select {
case <-c.end:
return
case <-ticker.C:
if ioDeadline.IsZero() || time.Now().Before(ioDeadline) {
if timeOfLastActivity.IsZero() || ioInflight == 0 {
continue
}

logrus.Errorf("R/W Timeout. No response received in %v", c.opTimeout)
handleClientError(ErrRWTimeout)
journal.PrintLimited(1000)
exceededTimeout := c.sharedTimeouts.CheckAndDecrement(time.Since(timeOfLastActivity))
if exceededTimeout > 0 {
decremented = true
logrus.Errorf("R/W Timeout. No response received in %v", exceededTimeout)
handleClientError(ErrRWTimeout)
journal.PrintLimited(1000)
}
case req := <-c.requests:
if clientError != nil {
c.replyError(req, clientError)
Expand All @@ -162,7 +176,8 @@ func (c *Client) loop() {

if req.Type == TypeRead || req.Type == TypeWrite || req.Type == TypeUnmap {
if ioInflight == 0 {
ioDeadline = time.Now().Add(c.opTimeout)
// If nothing is in-flight, we should get a fresh timeout.
timeOfLastActivity = time.Now()
}
ioInflight++
}
Expand All @@ -182,11 +197,7 @@ func (c *Client) loop() {

if req.Type == TypeRead || req.Type == TypeWrite || req.Type == TypeUnmap {
ioInflight--
if ioInflight > 0 {
ioDeadline = time.Now().Add(c.opTimeout)
} else if ioInflight == 0 {
ioDeadline = time.Time{}
}
timeOfLastActivity = time.Now()
}

if clientError != nil {
Expand Down
9 changes: 8 additions & 1 deletion pkg/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,14 @@ type Backend interface {
}

type BackendFactory interface {
Create(volumeName, address string, dataServerProtocol DataServerProtocol, engineReplicaTimeout time.Duration) (Backend, error)
Create(volumeName, address string, dataServerProtocol DataServerProtocol,
sharedTimeouts SharedTimeouts) (Backend, error)
}

type SharedTimeouts interface {
Increment()
Decrement()
CheckAndDecrement(duration time.Duration) time.Duration
}

type Controller interface {
Expand Down
63 changes: 63 additions & 0 deletions pkg/util/shared_timeouts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package util

import (
"sync"
"time"
)

// SharedTimeouts has the following use case:
// - Multiple goroutines may need to time out eventually.
// - Only the goroutines themselves know if the conditions for a timeout have been met.
// - It is fine for some of the goroutines to time out quickly.
// - The last goroutine should time out more slowly.
// SharedTimeouts implements the types.SharedTimeouts instead of directly defining the concrete type to avoid an import
// loop.
type SharedTimeouts struct {
mutex sync.RWMutex
longTimeout time.Duration
shortTimeout time.Duration
numConsumers int
}

func NewSharedTimeouts(shortTimeout, longTimeout time.Duration) *SharedTimeouts {
return &SharedTimeouts{
longTimeout: longTimeout,
shortTimeout: shortTimeout,
}
}

func (t *SharedTimeouts) Increment() {
t.mutex.Lock()
defer t.mutex.Unlock()
t.numConsumers++
}

func (t *SharedTimeouts) Decrement() {
t.mutex.Lock()
defer t.mutex.Unlock()
t.numConsumers--
}

// CheckAndDecrement checks if duration exceeds longTimeout or shortTimeout, returns the timeout exceeded (if
// applicable) and decrements numConsumers.
// - shortTimeout is only considered exceeded if there is still one other consumer to wait for longTimeout.
// - The caller MUST take whatever action is required for a timeout if a value > 0 is returned.
func (t *SharedTimeouts) CheckAndDecrement(duration time.Duration) time.Duration {
if duration > t.longTimeout {
t.mutex.Lock()
defer t.mutex.Unlock()
t.numConsumers--
return t.longTimeout
}

if duration > t.shortTimeout {
t.mutex.Lock()
defer t.mutex.Unlock()
if t.numConsumers > 1 {
t.numConsumers--
return t.shortTimeout
}
}

return 0
}
Loading

0 comments on commit 6f050a1

Please sign in to comment.