From 6f050a10bb84a6c6d9fc6c0194633f31a1eef66d Mon Sep 17 00:00:00 2001 From: Eric Weber Date: Mon, 5 Aug 2024 21:19:26 +0000 Subject: [PATCH 1/3] feat(timeout): enable different timeout for last replica Longhorn 8711 Signed-off-by: Eric Weber --- app/cmd/controller.go | 19 +++++---- pkg/backend/dynamic/dynamic.go | 6 +-- pkg/backend/file/file.go | 4 +- pkg/backend/remote/remote.go | 5 ++- pkg/controller/control.go | 14 ++++--- pkg/dataconn/client.go | 71 ++++++++++++++++++++-------------- pkg/types/types.go | 9 ++++- pkg/util/shared_timeouts.go | 63 ++++++++++++++++++++++++++++++ pkg/util/util_test.go | 59 ++++++++++++++++++++++++++++ 9 files changed, 199 insertions(+), 51 deletions(-) create mode 100644 pkg/util/shared_timeouts.go diff --git a/app/cmd/controller.go b/app/cmd/controller.go index a23386616..aab765b50 100644 --- a/app/cmd/controller.go +++ b/app/cmd/controller.go @@ -143,9 +143,13 @@ func startController(c *cli.Context) error { } timeout := c.Int64("engine-replica-timeout") - engineReplicaTimeout := time.Duration(timeout) * time.Second - engineReplicaTimeout = controller.DetermineEngineReplicaTimeout(engineReplicaTimeout) - iscsiTargetRequestTimeout := controller.DetermineIscsiTargetRequestTimeout(engineReplicaTimeout) + engineReplicaTimeoutShort := time.Duration(timeout) * time.Second + engineReplicaTimeoutShort = controller.DetermineEngineReplicaTimeout(engineReplicaTimeoutShort) + // At the conclusion of https://github.com/longhorn/longhorn/issues/8711 we should have a strategy for determining + // engineReplicaTimeoutLong. For now, we set it to engineReplicaTimeoutShort to maintain existing behavior and + // modify it here for testing. + engineReplicaTimeoutLong := engineReplicaTimeoutShort + iscsiTargetRequestTimeout := controller.DetermineIscsiTargetRequestTimeout(engineReplicaTimeoutLong) snapshotMaxCount := c.Int("snapshot-max-count") snapshotMaxSize := int64(0) @@ -187,10 +191,11 @@ func startController(c *cli.Context) error { } logrus.Infof("Creating volume %v controller with iSCSI target request timeout %v and engine to replica(s) timeout %v", - volumeName, iscsiTargetRequestTimeout, engineReplicaTimeout) - control := controller.NewController(volumeName, dynamic.New(factories), frontend, isUpgrade, disableRevCounter, salvageRequested, - unmapMarkSnapChainRemoved, iscsiTargetRequestTimeout, engineReplicaTimeout, types.DataServerProtocol(dataServerProtocol), - fileSyncHTTPClientTimeout, snapshotMaxCount, snapshotMaxSize) + volumeName, iscsiTargetRequestTimeout, engineReplicaTimeoutShort) + control := controller.NewController(volumeName, dynamic.New(factories), frontend, isUpgrade, disableRevCounter, + salvageRequested, unmapMarkSnapChainRemoved, iscsiTargetRequestTimeout, engineReplicaTimeoutShort, + engineReplicaTimeoutLong, types.DataServerProtocol(dataServerProtocol), fileSyncHTTPClientTimeout, + snapshotMaxCount, snapshotMaxSize) // need to wait for Shutdown() completion control.ShutdownWG.Add(1) diff --git a/pkg/backend/dynamic/dynamic.go b/pkg/backend/dynamic/dynamic.go index 32d3027ba..1f8f9d47a 100644 --- a/pkg/backend/dynamic/dynamic.go +++ b/pkg/backend/dynamic/dynamic.go @@ -3,7 +3,6 @@ package dynamic import ( "fmt" "strings" - "time" "github.com/longhorn/longhorn-engine/pkg/types" ) @@ -18,12 +17,13 @@ func New(factories map[string]types.BackendFactory) types.BackendFactory { } } -func (d *Factory) Create(volumeName, address string, dataServerProtocol types.DataServerProtocol, engineToReplicaTimeout time.Duration) (types.Backend, error) { +func (d *Factory) Create(volumeName, address string, dataServerProtocol types.DataServerProtocol, + sharedTimeouts types.SharedTimeouts) (types.Backend, error) { parts := strings.SplitN(address, "://", 2) if len(parts) == 2 { if factory, ok := d.factories[parts[0]]; ok { - return factory.Create(volumeName, parts[1], dataServerProtocol, engineToReplicaTimeout) + return factory.Create(volumeName, parts[1], dataServerProtocol, sharedTimeouts) } } diff --git a/pkg/backend/file/file.go b/pkg/backend/file/file.go index abf7cc30c..d25ecca3c 100644 --- a/pkg/backend/file/file.go +++ b/pkg/backend/file/file.go @@ -3,7 +3,6 @@ package file import ( "fmt" "os" - "time" "github.com/sirupsen/logrus" @@ -132,7 +131,8 @@ func (f *Wrapper) SetSnapshotMaxSize(size int64) error { return nil } -func (ff *Factory) Create(volumeName, address string, dataServerProtocol types.DataServerProtocol, engineToReplicaTimeout time.Duration) (types.Backend, error) { +func (ff *Factory) Create(volumeName, address string, dataServerProtocol types.DataServerProtocol, + sharedTimeouts types.SharedTimeouts) (types.Backend, error) { logrus.Infof("Creating file: %s", address) file, err := os.OpenFile(address, os.O_RDWR|os.O_CREATE, 0600) if err != nil { diff --git a/pkg/backend/remote/remote.go b/pkg/backend/remote/remote.go index f67f73de1..e39a6f28e 100644 --- a/pkg/backend/remote/remote.go +++ b/pkg/backend/remote/remote.go @@ -383,7 +383,8 @@ func (r *Remote) info() (*types.ReplicaInfo, error) { return replicaClient.GetReplicaInfo(resp.Replica), nil } -func (rf *Factory) Create(volumeName, address string, dataServerProtocol types.DataServerProtocol, engineToReplicaTimeout time.Duration) (types.Backend, error) { +func (rf *Factory) Create(volumeName, address string, dataServerProtocol types.DataServerProtocol, + sharedTimeouts types.SharedTimeouts) (types.Backend, error) { logrus.Infof("Connecting to remote: %s (%v)", address, dataServerProtocol) controlAddress, dataAddress, _, _, err := util.GetAddresses(volumeName, address, dataServerProtocol) @@ -419,7 +420,7 @@ func (rf *Factory) Create(volumeName, address string, dataServerProtocol types.D conns = append(conns, conn) } - dataConnClient := dataconn.NewClient(conns, engineToReplicaTimeout) + dataConnClient := dataconn.NewClient(conns, sharedTimeouts) r.ReaderWriterUnmapperAt = dataConnClient if err := r.open(); err != nil { diff --git a/pkg/controller/control.go b/pkg/controller/control.go index ec759bf8b..3ea9da0b0 100644 --- a/pkg/controller/control.go +++ b/pkg/controller/control.go @@ -36,7 +36,7 @@ type Controller struct { frontend types.Frontend isUpgrade bool iscsiTargetRequestTimeout time.Duration - engineReplicaTimeout time.Duration + sharedTimeouts *util.SharedTimeouts DataServerProtocol types.DataServerProtocol isExpanding bool @@ -72,8 +72,10 @@ const ( lastModifyCheckPeriod = 5 * time.Second ) -func NewController(name string, factory types.BackendFactory, frontend types.Frontend, isUpgrade, disableRevCounter, salvageRequested, unmapMarkSnapChainRemoved bool, - iscsiTargetRequestTimeout, engineReplicaTimeout time.Duration, dataServerProtocol types.DataServerProtocol, fileSyncHTTPClientTimeout, snapshotMaxCount int, snapshotMaxSize int64) *Controller { +func NewController(name string, factory types.BackendFactory, frontend types.Frontend, isUpgrade, disableRevCounter, + salvageRequested, unmapMarkSnapChainRemoved bool, iscsiTargetRequestTimeout, engineReplicaTimeoutShort, + engineReplicaTimeoutLong time.Duration, dataServerProtocol types.DataServerProtocol, fileSyncHTTPClientTimeout, + snapshotMaxCount int, snapshotMaxSize int64) *Controller { c := &Controller{ factory: factory, VolumeName: name, @@ -89,7 +91,7 @@ func NewController(name string, factory types.BackendFactory, frontend types.Fro SnapshotMaxSize: snapshotMaxSize, iscsiTargetRequestTimeout: iscsiTargetRequestTimeout, - engineReplicaTimeout: engineReplicaTimeout, + sharedTimeouts: util.NewSharedTimeouts(engineReplicaTimeoutShort, engineReplicaTimeoutLong), DataServerProtocol: dataServerProtocol, fileSyncHTTPClientTimeout: fileSyncHTTPClientTimeout, @@ -172,7 +174,7 @@ func (c *Controller) addReplica(address string, snapshotRequired bool, mode type return err } - newBackend, err := c.factory.Create(c.VolumeName, address, c.DataServerProtocol, c.engineReplicaTimeout) + newBackend, err := c.factory.Create(c.VolumeName, address, c.DataServerProtocol, c.sharedTimeouts) if err != nil { return err } @@ -895,7 +897,7 @@ func (c *Controller) Start(volumeSize, volumeCurrentSize int64, addresses ...str errorCodes := map[string]codes.Code{} first := true for _, address := range addresses { - newBackend, err := c.factory.Create(c.VolumeName, address, c.DataServerProtocol, c.engineReplicaTimeout) + newBackend, err := c.factory.Create(c.VolumeName, address, c.DataServerProtocol, c.sharedTimeouts) if err != nil { if strings.Contains(err.Error(), "rpc error: code = Unavailable") { errorCodes[address] = codes.Unavailable diff --git a/pkg/dataconn/client.go b/pkg/dataconn/client.go index dcfc02885..5aa58d5f7 100644 --- a/pkg/dataconn/client.go +++ b/pkg/dataconn/client.go @@ -8,6 +8,7 @@ import ( "github.com/sirupsen/logrus" + "github.com/longhorn/longhorn-engine/pkg/types" journal "github.com/longhorn/sparse-tools/stats" ) @@ -18,32 +19,32 @@ var ( // Client replica client type Client struct { - end chan struct{} - requests chan *Message - send chan *Message - responses chan *Message - seq uint32 - messages map[uint32]*Message - wires []*Wire - peerAddr string - opTimeout time.Duration + end chan struct{} + requests chan *Message + send chan *Message + responses chan *Message + seq uint32 + messages map[uint32]*Message + wires []*Wire + peerAddr string + sharedTimeouts types.SharedTimeouts } // NewClient replica client -func NewClient(conns []net.Conn, engineToReplicaTimeout time.Duration) *Client { +func NewClient(conns []net.Conn, sharedTimeouts types.SharedTimeouts) *Client { var wires []*Wire for _, conn := range conns { wires = append(wires, NewWire(conn)) } c := &Client{ - wires: wires, - peerAddr: conns[0].RemoteAddr().String(), - end: make(chan struct{}, 1024), - requests: make(chan *Message, 1024), - send: make(chan *Message, 1024), - responses: make(chan *Message, 1024), - messages: map[uint32]*Message{}, - opTimeout: engineToReplicaTimeout, + wires: wires, + peerAddr: conns[0].RemoteAddr().String(), + end: make(chan struct{}, 1024), + requests: make(chan *Message, 1024), + send: make(chan *Message, 1024), + responses: make(chan *Message, 1024), + messages: map[uint32]*Message{}, + sharedTimeouts: sharedTimeouts, } go c.loop() c.write() @@ -128,7 +129,16 @@ func (c *Client) loop() { var clientError error var ioInflight int - var ioDeadline time.Time + var timeOfLastActivity time.Time + + decremented := false + c.sharedTimeouts.Increment() + // Ensure we always decrement the sharedTimeouts counter regardless of how we leave this loop. + defer func() { + if !decremented { + c.sharedTimeouts.Decrement() + } + }() // handleClientError cleans up all in flight messages // also stores the error so that future requests/responses get errored immediately. @@ -139,7 +149,7 @@ func (c *Client) loop() { } ioInflight = 0 - ioDeadline = time.Time{} + timeOfLastActivity = time.Time{} } for { @@ -147,13 +157,17 @@ func (c *Client) loop() { case <-c.end: return case <-ticker.C: - if ioDeadline.IsZero() || time.Now().Before(ioDeadline) { + if timeOfLastActivity.IsZero() || ioInflight == 0 { continue } - logrus.Errorf("R/W Timeout. No response received in %v", c.opTimeout) - handleClientError(ErrRWTimeout) - journal.PrintLimited(1000) + exceededTimeout := c.sharedTimeouts.CheckAndDecrement(time.Since(timeOfLastActivity)) + if exceededTimeout > 0 { + decremented = true + logrus.Errorf("R/W Timeout. No response received in %v", exceededTimeout) + handleClientError(ErrRWTimeout) + journal.PrintLimited(1000) + } case req := <-c.requests: if clientError != nil { c.replyError(req, clientError) @@ -162,7 +176,8 @@ func (c *Client) loop() { if req.Type == TypeRead || req.Type == TypeWrite || req.Type == TypeUnmap { if ioInflight == 0 { - ioDeadline = time.Now().Add(c.opTimeout) + // If nothing is in-flight, we should get a fresh timeout. + timeOfLastActivity = time.Now() } ioInflight++ } @@ -182,11 +197,7 @@ func (c *Client) loop() { if req.Type == TypeRead || req.Type == TypeWrite || req.Type == TypeUnmap { ioInflight-- - if ioInflight > 0 { - ioDeadline = time.Now().Add(c.opTimeout) - } else if ioInflight == 0 { - ioDeadline = time.Time{} - } + timeOfLastActivity = time.Now() } if clientError != nil { diff --git a/pkg/types/types.go b/pkg/types/types.go index 3e02bbb7e..83197a71e 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -109,7 +109,14 @@ type Backend interface { } type BackendFactory interface { - Create(volumeName, address string, dataServerProtocol DataServerProtocol, engineReplicaTimeout time.Duration) (Backend, error) + Create(volumeName, address string, dataServerProtocol DataServerProtocol, + sharedTimeouts SharedTimeouts) (Backend, error) +} + +type SharedTimeouts interface { + Increment() + Decrement() + CheckAndDecrement(duration time.Duration) time.Duration } type Controller interface { diff --git a/pkg/util/shared_timeouts.go b/pkg/util/shared_timeouts.go new file mode 100644 index 000000000..c609d1cec --- /dev/null +++ b/pkg/util/shared_timeouts.go @@ -0,0 +1,63 @@ +package util + +import ( + "sync" + "time" +) + +// SharedTimeouts has the following use case: +// - Multiple goroutines may need to time out eventually. +// - Only the goroutines themselves know if the conditions for a timeout have been met. +// - It is fine for some of the goroutines to time out quickly. +// - The last goroutine should time out more slowly. +// SharedTimeouts implements the types.SharedTimeouts instead of directly defining the concrete type to avoid an import +// loop. +type SharedTimeouts struct { + mutex sync.RWMutex + longTimeout time.Duration + shortTimeout time.Duration + numConsumers int +} + +func NewSharedTimeouts(shortTimeout, longTimeout time.Duration) *SharedTimeouts { + return &SharedTimeouts{ + longTimeout: longTimeout, + shortTimeout: shortTimeout, + } +} + +func (t *SharedTimeouts) Increment() { + t.mutex.Lock() + defer t.mutex.Unlock() + t.numConsumers++ +} + +func (t *SharedTimeouts) Decrement() { + t.mutex.Lock() + defer t.mutex.Unlock() + t.numConsumers-- +} + +// CheckAndDecrement checks if duration exceeds longTimeout or shortTimeout, returns the timeout exceeded (if +// applicable) and decrements numConsumers. +// - shortTimeout is only considered exceeded if there is still one other consumer to wait for longTimeout. +// - The caller MUST take whatever action is required for a timeout if a value > 0 is returned. +func (t *SharedTimeouts) CheckAndDecrement(duration time.Duration) time.Duration { + if duration > t.longTimeout { + t.mutex.Lock() + defer t.mutex.Unlock() + t.numConsumers-- + return t.longTimeout + } + + if duration > t.shortTimeout { + t.mutex.Lock() + defer t.mutex.Unlock() + if t.numConsumers > 1 { + t.numConsumers-- + return t.shortTimeout + } + } + + return 0 +} diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go index 459dab9a7..c3f3659be 100644 --- a/pkg/util/util_test.go +++ b/pkg/util/util_test.go @@ -1,9 +1,13 @@ package util import ( + "fmt" "os" "path/filepath" + "sync" + "sync/atomic" "testing" + "time" . "gopkg.in/check.v1" ) @@ -97,3 +101,58 @@ func (s *TestSuite) TestResolveFilepathSubdirectory(c *C) { _, err = ResolveBackingFilepath(dirpath) c.Assert(err, ErrorMatches, ".*found a subdirectory") } + +func (s *TestSuite) TestSharedTimeouts(c *C) { + shortTimeout := 8 * time.Second + longTimeout := 16 * time.Second + sharedTimeouts := NewSharedTimeouts(shortTimeout, longTimeout) + + // Increment the SharedTimeouts in multiple goroutines. + wg := new(sync.WaitGroup) + for i := 0; i < 3; i++ { + wg.Add(1) + go func() { + sharedTimeouts.Increment() + wg.Done() + }() + } + wg.Wait() + c.Assert(sharedTimeouts.numConsumers, Equals, 3) + + // CheckAndDecrement with a duration smaller than shortTimeout. + exceededTimeout := sharedTimeouts.CheckAndDecrement(4 * time.Second) + c.Assert(exceededTimeout, Equals, time.Duration(0)) + c.Assert(sharedTimeouts.numConsumers, Equals, 3) + + // Decrement the SharedTimeouts. + sharedTimeouts.Decrement() + c.Assert(sharedTimeouts.numConsumers, Equals, 2) + + // Simultaneously CheckAndDecrement a duration larger than shortTimeout but smaller than longTimeout. One goroutine + // "exceeds" the timeout. The other does not. + numExceeded := &atomic.Int64{} + timeoutExceeded := &atomic.Int64{} + wg = new(sync.WaitGroup) + for i := 0; i < 2; i++ { + wg.Add(1) + go func() { + exceededTimeout := sharedTimeouts.CheckAndDecrement(12 * time.Second) + fmt.Println(exceededTimeout) + if exceededTimeout > time.Duration(0) { + fmt.Println(exceededTimeout) + numExceeded.Add(1) + timeoutExceeded.Store(int64(exceededTimeout)) + } + wg.Done() + }() + } + wg.Wait() + c.Assert(int(numExceeded.Load()), Equals, 1) + c.Assert(time.Duration(timeoutExceeded.Load()), Equals, shortTimeout) + c.Assert(sharedTimeouts.numConsumers, Equals, 1) + + // CheckAndDecrement with a duration larger than longTimeout. + exceededTimeout = sharedTimeouts.CheckAndDecrement(20 * time.Second) + c.Assert(exceededTimeout, Equals, longTimeout) + c.Assert(sharedTimeouts.numConsumers, Equals, 0) +} From 109aaf7f2a1df5a0532c5f9900abcb95e5fbcb24 Mon Sep 17 00:00:00 2001 From: Eric Weber Date: Thu, 8 Aug 2024 14:59:26 +0000 Subject: [PATCH 2/3] fix(datconn): don't orphan client goroutine when remote is closed Longhorn 8711 Signed-off-by: Eric Weber --- pkg/backend/remote/remote.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/backend/remote/remote.go b/pkg/backend/remote/remote.go index e39a6f28e..a594c94b5 100644 --- a/pkg/backend/remote/remote.go +++ b/pkg/backend/remote/remote.go @@ -51,6 +51,12 @@ type Remote struct { func (r *Remote) Close() error { logrus.Infof("Closing: %s", r.name) + + // Close the dataconn client to avoid orphaning goroutines. + if dataconnClient, ok := r.ReaderWriterUnmapperAt.(*dataconn.Client); ok { + dataconnClient.Close() + } + conn, err := grpc.NewClient(r.replicaServiceURL, grpc.WithTransportCredentials(insecure.NewCredentials()), interceptor.WithIdentityValidationClientInterceptor(r.volumeName, "")) if err != nil { From 0e9215bfe9f7cca711c54c754b5287c72b6e5d11 Mon Sep 17 00:00:00 2001 From: Eric Weber Date: Wed, 21 Aug 2024 19:07:06 +0000 Subject: [PATCH 3/3] feat(timeout): make engineReplicaTimeoutLong double engineReplicaTimeoutShort Longhorn 8711 Signed-off-by: Eric Weber --- app/cmd/controller.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/app/cmd/controller.go b/app/cmd/controller.go index aab765b50..4bf0784a7 100644 --- a/app/cmd/controller.go +++ b/app/cmd/controller.go @@ -145,10 +145,10 @@ func startController(c *cli.Context) error { timeout := c.Int64("engine-replica-timeout") engineReplicaTimeoutShort := time.Duration(timeout) * time.Second engineReplicaTimeoutShort = controller.DetermineEngineReplicaTimeout(engineReplicaTimeoutShort) - // At the conclusion of https://github.com/longhorn/longhorn/issues/8711 we should have a strategy for determining - // engineReplicaTimeoutLong. For now, we set it to engineReplicaTimeoutShort to maintain existing behavior and - // modify it here for testing. - engineReplicaTimeoutLong := engineReplicaTimeoutShort + // In https://github.com/longhorn/longhorn/issues/8711 we decided to allow the last replica twice as long as the + // others before a timeout. We can optionally adjust this strategy (e.g. to a fixed sixty seconds or some + // configurable value) in the future. + engineReplicaTimeoutLong := 2 * engineReplicaTimeoutShort iscsiTargetRequestTimeout := controller.DetermineIscsiTargetRequestTimeout(engineReplicaTimeoutLong) snapshotMaxCount := c.Int("snapshot-max-count")