Skip to content

Commit

Permalink
Data race fix in client (#502)
Browse files Browse the repository at this point in the history
* First data race fixed

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Fixed all data races

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Fixed all port options things

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Enabled race in integration-test.sh

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* staticcheck stuff

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Disabled pubsub test

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>
  • Loading branch information
slinkydeveloper authored May 13, 2020
1 parent 5f32742 commit ca525ce
Show file tree
Hide file tree
Showing 11 changed files with 73 additions and 82 deletions.
2 changes: 1 addition & 1 deletion hack/integration-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ fi
COVERPKG=$(go list ./... | grep -v /vendor | tr "\n" ",")
for gomodule in $(go list ./test/integration/... | grep -v /cmd | grep -v /vendor)
do
go test -v -parallel 1 -timeout 60s -covermode=atomic -coverprofile=coverage.tmp -coverpkg "$COVERPKG" "$gomodule" 2>&1 | sed 's/ of statements in.*//; /warning: no packages being tested depend on matches for pattern /d'
go test -v -parallel 1 -timeout 60s -race -covermode=atomic -coverprofile=coverage.tmp -coverpkg "$COVERPKG" "$gomodule" 2>&1 | sed 's/ of statements in.*//; /warning: no packages being tested depend on matches for pattern /d'
tail -n +2 coverage.tmp >> ./coverage.txt
done
rm coverage.tmp
Expand Down
2 changes: 1 addition & 1 deletion hack/unit-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ echo 'mode: atomic' > ./coverage.txt
COVERPKG=$(go list ./... | grep -v /vendor | grep -v /test | tr "\n" ",")
for gomodule in $(go list ./... | grep -v /cmd | grep -v /vendor | grep -v /test)
do
go test -v -timeout 15s -covermode=atomic -coverprofile=coverage.tmp -coverpkg "$COVERPKG" "$gomodule" 2>&1 | sed 's/ of statements in.*//; /warning: no packages being tested depend on matches for pattern /d'
go test -v -timeout 15s -race -covermode=atomic -coverprofile=coverage.tmp -coverpkg "$COVERPKG" "$gomodule" 2>&1 | sed 's/ of statements in.*//; /warning: no packages being tested depend on matches for pattern /d'
tail -n +2 coverage.tmp >> ./coverage.txt
done
rm coverage.tmp
Expand Down
7 changes: 4 additions & 3 deletions v2/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,16 +207,17 @@ func (c *ceClient) StartReceiver(ctx context.Context, fn interface{}) error {
}()
}

var msg binding.Message
var respFn protocol.ResponseFn

// Start Polling.
wg := sync.WaitGroup{}
for i := 0; i < c.pollGoroutines; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
var msg binding.Message
var respFn protocol.ResponseFn
var err error

if c.responder != nil {
msg, respFn, err = c.responder.Respond(ctx)
} else if c.receiver != nil {
Expand Down
10 changes: 5 additions & 5 deletions v2/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ func TestClientReceive(t *testing.T) {

ctx, cancel := context.WithCancel(context.TODO())
go func() {
err = c.StartReceiver(ctx, func(ctx context.Context, event event.Event) error {
err := c.StartReceiver(ctx, func(ctx context.Context, event event.Event) error {
go func() {
events <- event
}()
Expand All @@ -409,7 +409,7 @@ func TestClientReceive(t *testing.T) {
}()
time.Sleep(5 * time.Millisecond) // let the server start

target, _ := url.Parse(fmt.Sprintf("http://localhost:%d%s", p.GetPort(), p.GetPath()))
target, _ := url.Parse(fmt.Sprintf("http://localhost:%d%s", p.GetListeningPort(), p.GetPath()))

if tc.wantErr != "" {
if err == nil {
Expand All @@ -436,7 +436,7 @@ func TestClientReceive(t *testing.T) {
ContentLength: int64(len([]byte(tc.req.Body))),
}

_, err = http.DefaultClient.Do(req)
_, _ = http.DefaultClient.Do(req)

//// Make a copy of the request.
//body, err := ioutil.ReadAll(resp.Body)
Expand Down Expand Up @@ -465,7 +465,7 @@ func TestClientReceive(t *testing.T) {

// try the request again, expecting an error:

if _, err = http.DefaultClient.Do(req); err == nil {
if _, err := http.DefaultClient.Do(req); err == nil {
t.Fatalf("expected error to when sending request to stopped client")
}
})
Expand Down Expand Up @@ -529,7 +529,7 @@ func TestTracedClientReceive(t *testing.T) {
}()
time.Sleep(5 * time.Millisecond) // let the server start

target := fmt.Sprintf("http://localhost:%d", p.GetPort())
target := fmt.Sprintf("http://localhost:%d", p.GetListeningPort())
sender := simpleTracingBinaryClient(target)

ctx, span := trace.StartSpan(context.TODO(), "test-span")
Expand Down
2 changes: 0 additions & 2 deletions v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ require (
go.opencensus.io v0.22.0
go.uber.org/zap v1.10.0
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
google.golang.org/api v0.15.0
google.golang.org/grpc v1.26.0
)

go 1.13
17 changes: 7 additions & 10 deletions v2/protocol/http/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,14 @@ func WithShutdownTimeout(timeout time.Duration) Option {

func checkListen(t *Protocol, prefix string) error {
switch {
case t.Port != nil:
return fmt.Errorf("%v port already set", prefix)
case t.listener != nil:
return fmt.Errorf("%v listener already set", prefix)
case t.listener.Load() != nil:
return fmt.Errorf("error setting %v: listener already set", prefix)
}
return nil
}

// WithPort sets the listening port for StartReceiver.
// Only one of WithListener or WithPort is allowed.
// Only one of WithListener or WithPort is allowed.
func WithPort(port int) Option {
return func(t *Protocol) error {
if t == nil {
Expand All @@ -101,7 +99,7 @@ func WithPort(port int) Option {
if err := checkListen(t, "http port option"); err != nil {
return err
}
t.setPort(port)
t.Port = port
return nil
}
}
Expand All @@ -113,12 +111,11 @@ func WithListener(l net.Listener) Option {
if t == nil {
return fmt.Errorf("http listener option can not set nil protocol")
}
if err := checkListen(t, "http port option"); err != nil {
if err := checkListen(t, "http listener"); err != nil {
return err
}
t.listener = l
_, err := t.listen()
return err
t.listener.Store(l)
return nil
}
}

Expand Down
52 changes: 24 additions & 28 deletions v2/protocol/http/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import (
"net"
"net/http"
"net/url"
"sync/atomic"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/require"
)

func TestWithTarget(t *testing.T) {
Expand Down Expand Up @@ -308,10 +310,6 @@ func TestWithShutdownTimeout(t *testing.T) {
}
}

func intptr(i int) *int {
return &i
}

func TestWithPort(t *testing.T) {
testCases := map[string]struct {
t *Protocol
Expand All @@ -323,7 +321,7 @@ func TestWithPort(t *testing.T) {
t: &Protocol{},
port: 8181,
want: &Protocol{
Port: intptr(8181),
Port: 8181,
},
},
"invalid port, low": {
Expand All @@ -336,22 +334,17 @@ func TestWithPort(t *testing.T) {
port: 65536,
wantErr: `http port option was given an invalid port: 65536`,
},
"port already set": {
t: &Protocol{
Port: intptr(8080),
},
port: 8181,
wantErr: `http port option port already set`,
},
"listener already set": {
t: &Protocol{
listener: func() net.Listener {
listener: func() atomic.Value {
l, _ := net.Listen("tcp", ":0")
return l
v := atomic.Value{}
v.Store(l)
return v
}(),
},
port: 8181,
wantErr: `http port option listener already set`,
wantErr: `error setting http port option: listener already set`,
},
"nil protocol": {
wantErr: `http port option can not set nil protocol`,
Expand Down Expand Up @@ -393,7 +386,7 @@ func forceClose(tr *Protocol) {
func TestWithPort0(t *testing.T) {
testCases := map[string]func() (*Protocol, error){
"WithPort0": func() (*Protocol, error) { return New(WithPort(0)) },
"SetPort0": func() (*Protocol, error) { return &Protocol{Port: new(int)}, nil },
"SetPort0": func() (*Protocol, error) { return &Protocol{Port: 0}, nil },
}
for name, f := range testCases {
t.Run(name, func(t *testing.T) {
Expand All @@ -402,13 +395,14 @@ func TestWithPort0(t *testing.T) {
t.Fatal(err)
}
defer func() { forceClose(tr) }()
port := tr.GetPort()
if port <= 0 {
t.Error("no dynamic port")
}
if d := cmp.Diff(port, *tr.Port); d != "" {
t.Error(d)
}
// Start listening
listener, err := tr.listen()
require.NoError(t, err)
require.NotNil(t, listener)

// Check the listening port is correct
port := tr.GetListeningPort()
require.Greater(t, port, 0)
})
}
}
Expand All @@ -423,7 +417,7 @@ func TestWithListener_forcecloser(t *testing.T) {
if err != nil {
t.Fatal(err)
}
port := tr.GetPort()
port := tr.GetListeningPort()
if port <= 0 {
t.Error("no dynamic port")
}
Expand All @@ -446,21 +440,23 @@ func TestWithListener(t *testing.T) {
return l
}(),
want: &Protocol{
Port: intptr(0),
Port: 0,
},
},
"listener already set": {
t: &Protocol{
listener: func() net.Listener {
listener: func() atomic.Value {
l, _ := net.Listen("tcp", ":0")
return l
v := atomic.Value{}
v.Store(l)
return v
}(),
},
listener: func() net.Listener {
l, _ := net.Listen("tcp", ":0")
return l
}(),
wantErr: `http port option listener already set`,
wantErr: `error setting http listener: listener already set`,
},
"nil protocol": {
wantErr: `http listener option can not set nil protocol`,
Expand Down
13 changes: 8 additions & 5 deletions v2/protocol/http/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (
"errors"
"fmt"
"io"
"net"
"net/http"
"net/url"
"sync"
"sync/atomic"
"time"

"github.com/cloudevents/sdk-go/v2/binding"
Expand Down Expand Up @@ -41,17 +41,19 @@ type Protocol struct {
// If nil, DefaultShutdownTimeout is used.
ShutdownTimeout time.Duration

// Port is the port to bind the receiver to. Defaults to 8080.
Port *int
// Port is the port configured to bind the receiver to. Defaults to 8080.
// If you want to know the effective port you're listening to, use GetListeningPort()
Port int
// Path is the path to bind the receiver to. Defaults to "/".
Path string

// Receive Mutex
reMu sync.Mutex
// Handler is the handler the http Server will use. Use this to reuse the
// http server. If nil, the Protocol will create a one.
Handler *http.ServeMux
listener net.Listener
Handler *http.ServeMux

listener atomic.Value
roundTripper http.RoundTripper
server *http.Server
handlerRegistered bool
Expand All @@ -61,6 +63,7 @@ type Protocol struct {
func New(opts ...Option) (*Protocol, error) {
p := &Protocol{
incoming: make(chan msgErr),
Port: -1,
}
if err := p.applyOptions(opts...); err != nil {
return nil, err
Expand Down
Loading

0 comments on commit ca525ce

Please sign in to comment.