Skip to content

Commit bf93cf9

Browse files
authored
feat: better epoll detection, allow to disable epoll (#984)
Cosmo PR: wundergraph/cosmo#1381. Tested manually on Docker with limited host capabilities.
1 parent 9d45b54 commit bf93cf9

14 files changed

+257
-160
lines changed

v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1754,6 +1754,8 @@ func (s *SubscriptionSource) AsyncStart(ctx *resolve.Context, id uint64, input [
17541754
return s.client.SubscribeAsync(ctx, id, options, updater)
17551755
}
17561756

1757+
// AsyncStop stops the subscription with the given id. AsyncStop is only effective when netPoll is enabled
1758+
// because without netPoll we manage the lifecycle of the connection in the subscription client.
17571759
func (s *SubscriptionSource) AsyncStop(id uint64) {
17581760
s.client.Unsubscribe(id)
17591761
}

v2/pkg/engine/datasource/graphql_datasource/graphql_subscription_client.go

Lines changed: 92 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -23,26 +23,26 @@ import (
2323
"github.com/gobwas/ws/wsutil"
2424
"github.com/jensneuse/abstractlogger"
2525
"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve"
26-
"github.com/wundergraph/graphql-go-tools/v2/pkg/epoller"
26+
"github.com/wundergraph/graphql-go-tools/v2/pkg/netpoll"
2727
"go.uber.org/atomic"
2828
)
2929

3030
const ackWaitTimeout = 30 * time.Second
3131

32-
type epollState struct {
32+
type netPollState struct {
3333
// connections is a map of fd -> connection to keep track of all active connections
3434
connections map[int]*connection
3535
hasConnections atomic.Bool
3636
// triggers is a map of subscription id -> fd to easily look up the connection for a subscription id
3737
triggers map[uint64]int
3838

39-
// clientUnsubscribe is a channel to signal to the epoll run loop that a client needs to be unsubscribed
39+
// clientUnsubscribe is a channel to signal to the netPoll run loop that a client needs to be unsubscribed
4040
clientUnsubscribe chan uint64
41-
// addConn is a channel to signal to the epoll run loop that a new connection needs to be added
41+
// addConn is a channel to signal to the netPoll run loop that a new connection needs to be added
4242
addConn chan *connection
43-
// waitForEventsTicker is the ticker for the epoll run loop
43+
// waitForEventsTicker is the ticker for the netPoll run loop
4444
// it is used to prevent busy waiting and to limit the CPU usage
45-
// instead of polling the epoll instance all the time, we wait until the next tick to throttle the epoll loop
45+
// instead of polling the netPoll instance all the time, we wait until the next tick to throttle the netPoll loop
4646
waitForEventsTicker *time.Ticker
4747

4848
// waitForEventsTick is the channel to receive the tick from the waitForEventsTicker
@@ -65,9 +65,9 @@ type subscriptionClient struct {
6565

6666
readTimeout time.Duration
6767

68-
epoll epoller.Poller
69-
epollConfig EpollConfiguration
70-
epollState *epollState
68+
netPoll netpoll.Poller
69+
netPollConfig NetPollConfiguration
70+
netPollState *netPollState
7171
}
7272

7373
func (c *subscriptionClient) SubscribeAsync(ctx *resolve.Context, id uint64, options GraphQLSubscriptionOptions, updater resolve.SubscriptionUpdater) error {
@@ -85,12 +85,12 @@ func (c *subscriptionClient) SubscribeAsync(ctx *resolve.Context, id uint64, opt
8585
}
8686

8787
func (c *subscriptionClient) Unsubscribe(id uint64) {
88-
// if we don't have epoll, we don't have a channel consumer of the clientUnsubscribe channel
88+
// if we don't have netPoll, we don't have a channel consumer of the clientUnsubscribe channel
8989
// we have to return to prevent a deadlock
90-
if c.epoll == nil {
90+
if c.netPoll == nil {
9191
return
9292
}
93-
c.epollState.clientUnsubscribe <- id
93+
c.netPollState.clientUnsubscribe <- id
9494
}
9595

9696
type InvalidWsSubprotocolError struct {
@@ -121,21 +121,23 @@ func WithReadTimeout(timeout time.Duration) Options {
121121
}
122122
}
123123

124-
type EpollConfiguration struct {
125-
// Disable can be set to true to disable epoll
126-
Disable bool
127-
// BufferSize defines the size of the buffer for the epoll loop
124+
type NetPollConfiguration struct {
125+
// Enable can be set to true to enable netPoll
126+
Enable bool
127+
// BufferSize defines the size of the buffer for the netPoll loop
128128
BufferSize int
129-
// WaitForNumEvents defines how many events are waited for in the epoll loop before TickInterval cancels the wait
129+
// WaitForNumEvents defines how many events are waited for in the netPoll loop before TickInterval cancels the wait
130130
WaitForNumEvents int
131131
// MaxEventWorkers defines the parallelism of how many connections can be handled at the same time
132132
// The higher the number, the more CPU is used.
133133
MaxEventWorkers int
134-
// TickInterval defines the time between each epoll loop when WaitForNumEvents is not reached
134+
// TickInterval defines the time between each netPoll loop when WaitForNumEvents is not reached
135135
TickInterval time.Duration
136136
}
137137

138-
func (e *EpollConfiguration) ApplyDefaults() {
138+
func (e *NetPollConfiguration) ApplyDefaults() {
139+
e.Enable = true
140+
139141
if e.BufferSize == 0 {
140142
e.BufferSize = 1024
141143
}
@@ -150,17 +152,17 @@ func (e *EpollConfiguration) ApplyDefaults() {
150152
}
151153
}
152154

153-
func WithEpollConfiguration(config EpollConfiguration) Options {
155+
func WithNetPollConfiguration(config NetPollConfiguration) Options {
154156
return func(options *opts) {
155-
options.epollConfiguration = config
157+
options.netPollConfiguration = config
156158
}
157159
}
158160

159161
type opts struct {
160162
readTimeout time.Duration
161163
log abstractlogger.Logger
162164
onWsConnectionInitCallback *OnWsConnectionInitCallback
163-
epollConfiguration EpollConfiguration
165+
netPollConfiguration NetPollConfiguration
164166
}
165167

166168
// GraphQLSubscriptionClientFactory abstracts the way of creating a new GraphQLSubscriptionClient.
@@ -185,10 +187,13 @@ func NewGraphQLSubscriptionClient(httpClient, streamingClient *http.Client, engi
185187
readTimeout: time.Millisecond * 100,
186188
log: abstractlogger.NoopLogger,
187189
}
190+
191+
op.netPollConfiguration.ApplyDefaults()
192+
188193
for _, option := range options {
189194
option(op)
190195
}
191-
op.epollConfiguration.ApplyDefaults()
196+
192197
client := &subscriptionClient{
193198
httpClient: httpClient,
194199
streamingClient: streamingClient,
@@ -201,27 +206,27 @@ func NewGraphQLSubscriptionClient(httpClient, streamingClient *http.Client, engi
201206
},
202207
},
203208
onWsConnectionInitCallback: op.onWsConnectionInitCallback,
204-
epollConfig: op.epollConfiguration,
209+
netPollConfig: op.netPollConfiguration,
205210
}
206-
if !op.epollConfiguration.Disable {
207-
client.epollState = &epollState{
211+
if op.netPollConfiguration.Enable {
212+
client.netPollState = &netPollState{
208213
connections: make(map[int]*connection),
209214
triggers: make(map[uint64]int),
210-
clientUnsubscribe: make(chan uint64, op.epollConfiguration.BufferSize),
211-
addConn: make(chan *connection, op.epollConfiguration.BufferSize),
215+
clientUnsubscribe: make(chan uint64, op.netPollConfiguration.BufferSize),
216+
addConn: make(chan *connection, op.netPollConfiguration.BufferSize),
212217
// this is not needed, but we want to make it explicit that we're starting with nil as the tick channel
213-
// reading from nil channels blocks forever, which allows us to prevent the epoll loop from starting
218+
// reading from nil channels blocks forever, which allows us to prevent the netPoll loop from starting
214219
// once we add the first connection, we start the ticker and set the tick channel
215220
// after the last connection is removed, we set the tick channel to nil again
216221
// this way we can start and stop the netPoll loop dynamically
217222
waitForEventsTick: nil,
218223
}
219224

220-
// ignore error is ok, it means that epoll is not supported, which is handled gracefully by the client
221-
epoll, _ := epoller.NewPoller(op.epollConfiguration.BufferSize, op.epollConfiguration.TickInterval)
222-
if epoll != nil {
223-
client.epoll = epoll
224-
go client.runEpoll(engineCtx)
225+
// ignoring the error is ok: it means that netPoll is not supported, which is handled gracefully by the client
226+
poller, _ := netpoll.NewPoller(op.netPollConfiguration.BufferSize, op.netPollConfiguration.TickInterval)
227+
if poller != nil {
228+
client.netPoll = poller
229+
go client.runNetPoll(engineCtx)
225230
}
226231
}
227232
return client
@@ -317,7 +322,7 @@ func (c *subscriptionClient) asyncSubscribeWS(requestContext, engineContext cont
317322
return err
318323
}
319324

320-
if c.epoll == nil {
325+
if c.netPoll == nil {
321326
go func() {
322327
err := conn.handler.StartBlocking()
323328
if err != nil && !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
@@ -332,10 +337,10 @@ func (c *subscriptionClient) asyncSubscribeWS(requestContext, engineContext cont
332337
return err
333338
}
334339

335-
fd := epoller.SocketFD(conn.conn)
340+
fd := netpoll.SocketFD(conn.conn)
336341
conn.id, conn.fd = id, fd
337-
// submit the connection to the epoll run loop
338-
c.epollState.addConn <- conn
342+
// submit the connection to the netPoll run loop
343+
c.netPollState.addConn <- conn
339344
return nil
340345
}
341346

@@ -614,20 +619,20 @@ type connResult struct {
614619
shouldClose bool
615620
}
616621

617-
func (c *subscriptionClient) runEpoll(ctx context.Context) {
622+
func (c *subscriptionClient) runNetPoll(ctx context.Context) {
618623
defer c.close()
619624
done := ctx.Done()
620625
// both handleConnCh and connResults are buffered channels with a size of WaitForNumEvents
621626
// this is important because we submit all events before we start processing them
622627
// and we start evaluating the results only after all events have been submitted
623628
// this would not be possible with unbuffered channels
624-
handleConnCh := make(chan *connection, c.epollConfig.WaitForNumEvents)
625-
connResults := make(chan connResult, c.epollConfig.WaitForNumEvents)
629+
handleConnCh := make(chan *connection, c.netPollConfig.WaitForNumEvents)
630+
connResults := make(chan connResult, c.netPollConfig.WaitForNumEvents)
626631

627632
// Start workers to handle connection events
628633
// MaxEventWorkers defines the parallelism of how many connections can be handled at the same time
629634
// This is the critical number on how much CPU is used
630-
for i := 0; i < c.epollConfig.MaxEventWorkers; i++ {
635+
for i := 0; i < c.netPollConfig.MaxEventWorkers; i++ {
631636
go func() {
632637
for {
633638
select {
@@ -641,34 +646,34 @@ func (c *subscriptionClient) runEpoll(ctx context.Context) {
641646
}()
642647
}
643648

644-
// This is the main epoll run loop
649+
// This is the main netPoll run loop
645650
// It's a single threaded event loop that reacts to several events, such as added connections, clients unsubscribing, etc.
646651
for {
647652
select {
648-
// if the engine context is done, we close the epoll loop
653+
// if the engine context is done, we close the netPoll loop
649654
case <-done:
650655
return
651-
case conn := <-c.epollState.addConn:
656+
case conn := <-c.netPollState.addConn:
652657
c.handleAddConn(conn)
653-
case id := <-c.epollState.clientUnsubscribe:
658+
case id := <-c.netPollState.clientUnsubscribe:
654659
c.handleClientUnsubscribe(id)
655-
// while len(c.connections) == 0, this channel is nil, so we will never try to wait for epoll events
660+
// while len(c.connections) == 0, this channel is nil, so we will never try to wait for netPoll events
656661
// this is important to prevent busy waiting
657662
// once we add the first connection, we start the ticker and set the tick channel
658-
// the ticker ensures that we don't poll the epoll instance all the time,
663+
// the ticker ensures that we don't poll the netPoll instance all the time,
659664
// but at most every TickInterval
660-
case <-c.epollState.waitForEventsTick:
661-
events, err := c.epoll.Wait(c.epollConfig.WaitForNumEvents)
665+
case <-c.netPollState.waitForEventsTick:
666+
events, err := c.netPoll.Wait(c.netPollConfig.WaitForNumEvents)
662667
if err != nil {
663-
c.log.Error("epoll.Wait", abstractlogger.Error(err))
668+
c.log.Error("netPoll.Wait", abstractlogger.Error(err))
664669
continue
665670
}
666671

667672
waitForEvents := len(events)
668673

669674
for i := range events {
670-
fd := epoller.SocketFD(events[i])
671-
conn, ok := c.epollState.connections[fd]
675+
fd := netpoll.SocketFD(events[i])
676+
conn, ok := c.netPollState.connections[fd]
672677
if !ok {
673678
// Should never happen
674679
panic(fmt.Sprintf("connection with fd %d not found", fd))
@@ -696,9 +701,9 @@ func (c *subscriptionClient) runEpoll(ctx context.Context) {
696701
}
697702
// we decrease the number of events we're waiting for to eventually break the loop
698703
waitForEvents--
699-
case conn := <-c.epollState.addConn:
704+
case conn := <-c.netPollState.addConn:
700705
c.handleAddConn(conn)
701-
case id := <-c.epollState.clientUnsubscribe:
706+
case id := <-c.netPollState.clientUnsubscribe:
702707
c.handleClientUnsubscribe(id)
703708
case <-done:
704709
return
@@ -709,74 +714,74 @@ func (c *subscriptionClient) runEpoll(ctx context.Context) {
709714
}
710715

711716
func (c *subscriptionClient) close() {
712-
defer c.log.Debug("subscriptionClient.close", abstractlogger.String("reason", "epoll closed by context"))
713-
if c.epollState.waitForEventsTicker != nil {
714-
c.epollState.waitForEventsTicker.Stop()
717+
defer c.log.Debug("subscriptionClient.close", abstractlogger.String("reason", "netPoll closed by context"))
718+
if c.netPollState.waitForEventsTicker != nil {
719+
c.netPollState.waitForEventsTicker.Stop()
715720
}
716-
for _, conn := range c.epollState.connections {
717-
_ = c.epoll.Remove(conn.conn)
721+
for _, conn := range c.netPollState.connections {
722+
_ = c.netPoll.Remove(conn.conn)
718723
conn.handler.ServerClose()
719724
}
720-
if c.epoll != nil {
721-
err := c.epoll.Close(false)
725+
if c.netPoll != nil {
726+
err := c.netPoll.Close(false)
722727
if err != nil {
723728
c.log.Error("subscriptionClient.close", abstractlogger.Error(err))
724729
}
725730
}
726731
}
727732

728733
func (c *subscriptionClient) handleAddConn(conn *connection) {
729-
if err := c.epoll.Add(conn.conn); err != nil {
734+
if err := c.netPoll.Add(conn.conn); err != nil {
730735
c.log.Error("subscriptionClient.handleAddConn", abstractlogger.Error(err))
731736
conn.handler.ServerClose()
732737
return
733738
}
734-
c.epollState.connections[conn.fd] = conn
735-
c.epollState.triggers[conn.id] = conn.fd
739+
c.netPollState.connections[conn.fd] = conn
740+
c.netPollState.triggers[conn.id] = conn.fd
736741
// when we previously had 0 connections, we will have 1 connection now
737-
// this means we need to start the ticker so that we get epoll events
738-
if len(c.epollState.connections) == 1 {
739-
c.epollState.waitForEventsTicker = time.NewTicker(c.epollConfig.TickInterval)
740-
c.epollState.waitForEventsTick = c.epollState.waitForEventsTicker.C
741-
c.epollState.hasConnections.Store(true)
742+
// this means we need to start the ticker so that we get netPoll events
743+
if len(c.netPollState.connections) == 1 {
744+
c.netPollState.waitForEventsTicker = time.NewTicker(c.netPollConfig.TickInterval)
745+
c.netPollState.waitForEventsTick = c.netPollState.waitForEventsTicker.C
746+
c.netPollState.hasConnections.Store(true)
742747
}
743748
}
744749

745750
func (c *subscriptionClient) handleClientUnsubscribe(id uint64) {
746-
fd, ok := c.epollState.triggers[id]
751+
fd, ok := c.netPollState.triggers[id]
747752
if !ok {
748753
return
749754
}
750-
delete(c.epollState.triggers, id)
751-
conn, ok := c.epollState.connections[fd]
755+
delete(c.netPollState.triggers, id)
756+
conn, ok := c.netPollState.connections[fd]
752757
if !ok {
753758
return
754759
}
755-
delete(c.epollState.connections, fd)
756-
_ = c.epoll.Remove(conn.conn)
760+
delete(c.netPollState.connections, fd)
761+
_ = c.netPoll.Remove(conn.conn)
757762
conn.handler.ClientClose()
758763
// if we have no connections left, we stop the ticker
759-
if len(c.epollState.connections) == 0 {
760-
c.epollState.waitForEventsTicker.Stop()
761-
c.epollState.waitForEventsTick = nil
762-
c.epollState.hasConnections.Store(false)
764+
if len(c.netPollState.connections) == 0 {
765+
c.netPollState.waitForEventsTicker.Stop()
766+
c.netPollState.waitForEventsTick = nil
767+
c.netPollState.hasConnections.Store(false)
763768
}
764769
}
765770

766771
func (c *subscriptionClient) handleServerUnsubscribe(fd int) {
767-
conn, ok := c.epollState.connections[fd]
772+
conn, ok := c.netPollState.connections[fd]
768773
if !ok {
769774
return
770775
}
771-
delete(c.epollState.connections, fd)
772-
delete(c.epollState.triggers, conn.id)
773-
_ = c.epoll.Remove(conn.conn)
776+
delete(c.netPollState.connections, fd)
777+
delete(c.netPollState.triggers, conn.id)
778+
_ = c.netPoll.Remove(conn.conn)
774779
conn.handler.ServerClose()
775780
// if we have no connections left, we stop the ticker
776-
if len(c.epollState.connections) == 0 {
777-
c.epollState.waitForEventsTicker.Stop()
778-
c.epollState.waitForEventsTick = nil
779-
c.epollState.hasConnections.Store(false)
781+
if len(c.netPollState.connections) == 0 {
782+
c.netPollState.waitForEventsTicker.Stop()
783+
c.netPollState.waitForEventsTick = nil
784+
c.netPollState.hasConnections.Store(false)
780785
}
781786
}
782787

0 commit comments

Comments
 (0)