Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Access] Fix access connection tests for latest version of testing library #6917

Merged
merged 2 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 20 additions & 47 deletions engine/access/rpc/connection/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestProxyAccessAPI(t *testing.T) {
metrics := metrics.NewNoopCollector()

// create a collection node
cn := new(collectionNode)
cn := newCollectionNode(t)
cn.start(t)
defer cn.stop(t)

Expand Down Expand Up @@ -75,15 +75,15 @@ func TestProxyAccessAPI(t *testing.T) {
// make the call to the collection node
resp, err := client.Ping(ctx, req)
assert.NoError(t, err)
assert.Equal(t, resp, expected)
assert.IsType(t, expected, resp)
}

func TestProxyExecutionAPI(t *testing.T) {
logger := unittest.Logger()
metrics := metrics.NewNoopCollector()

// create an execution node
en := new(executionNode)
en := newExecutionNode(t)
en.start(t)
defer en.stop(t)

Expand Down Expand Up @@ -124,15 +124,15 @@ func TestProxyExecutionAPI(t *testing.T) {
// make the call to the execution node
resp, err := client.Ping(ctx, req)
assert.NoError(t, err)
assert.Equal(t, resp, expected)
assert.IsType(t, expected, resp)
}

func TestProxyAccessAPIConnectionReuse(t *testing.T) {
logger := unittest.Logger()
metrics := metrics.NewNoopCollector()

// create a collection node
cn := new(collectionNode)
cn := newCollectionNode(t)
cn.start(t)
defer cn.stop(t)

Expand Down Expand Up @@ -186,15 +186,15 @@ func TestProxyAccessAPIConnectionReuse(t *testing.T) {
ctx := context.Background()
resp, err := accessAPIClient.Ping(ctx, req)
assert.NoError(t, err)
assert.Equal(t, resp, expected)
assert.IsType(t, expected, resp)
}

func TestProxyExecutionAPIConnectionReuse(t *testing.T) {
logger := unittest.Logger()
metrics := metrics.NewNoopCollector()

// create an execution node
en := new(executionNode)
en := newExecutionNode(t)
en.start(t)
defer en.stop(t)

Expand Down Expand Up @@ -248,7 +248,7 @@ func TestProxyExecutionAPIConnectionReuse(t *testing.T) {
ctx := context.Background()
resp, err := executionAPIClient.Ping(ctx, req)
assert.NoError(t, err)
assert.Equal(t, resp, expected)
assert.IsType(t, expected, resp)
}

// TestExecutionNodeClientTimeout tests that the execution API client times out after the timeout duration
Expand All @@ -259,7 +259,7 @@ func TestExecutionNodeClientTimeout(t *testing.T) {
timeout := 10 * time.Millisecond

// create an execution node
en := new(executionNode)
en := newExecutionNode(t)
en.start(t)
defer en.stop(t)

Expand Down Expand Up @@ -316,7 +316,7 @@ func TestCollectionNodeClientTimeout(t *testing.T) {
timeout := 10 * time.Millisecond

// create a collection node
cn := new(collectionNode)
cn := newCollectionNode(t)
cn.start(t)
defer cn.stop(t)

Expand Down Expand Up @@ -371,31 +371,14 @@ func TestConnectionPoolFull(t *testing.T) {
metrics := metrics.NewNoopCollector()

// create a collection node
cn1, cn2, cn3 := new(collectionNode), new(collectionNode), new(collectionNode)
cn1, cn2, cn3 := newCollectionNode(t), newCollectionNode(t), newCollectionNode(t)
cn1.start(t)
cn2.start(t)
cn3.start(t)
defer cn1.stop(t)
defer cn2.stop(t)
defer cn3.stop(t)

expected := &access.PingResponse{}
cn1.handler.
On("Ping",
testifymock.Anything,
testifymock.AnythingOfType("*access.PingRequest")).
Return(expected, nil)
cn2.handler.
On("Ping",
testifymock.Anything,
testifymock.AnythingOfType("*access.PingRequest")).
Return(expected, nil)
cn3.handler.
On("Ping",
testifymock.Anything,
testifymock.AnythingOfType("*access.PingRequest")).
Return(expected, nil)

// create the factory
connectionFactory := new(ConnectionFactoryImpl)
// set the collection grpc port
Expand Down Expand Up @@ -467,7 +450,7 @@ func TestConnectionPoolStale(t *testing.T) {
metrics := metrics.NewNoopCollector()

// create a collection node
cn := new(collectionNode)
cn := newCollectionNode(t)
cn.start(t)
defer cn.stop(t)

Expand Down Expand Up @@ -534,7 +517,7 @@ func TestConnectionPoolStale(t *testing.T) {
ctx = context.Background()
resp, err := accessAPIClient.Ping(ctx, req)
assert.NoError(t, err)
assert.Equal(t, resp, expected)
assert.IsType(t, expected, resp)
}

// TestExecutionNodeClientClosedGracefully tests the scenario where the execution node client is closed gracefully.
Expand All @@ -550,7 +533,7 @@ func TestExecutionNodeClientClosedGracefully(t *testing.T) {

// Add createExecNode function to recreate it each time for rapid test
createExecNode := func() (*executionNode, func()) {
en := new(executionNode)
en := newExecutionNode(t)
en.start(t)
return en, func() {
en.stop(t)
Expand Down Expand Up @@ -650,7 +633,7 @@ func TestEvictingCacheClients(t *testing.T) {
metrics := metrics.NewNoopCollector()

// Create a new collection node for testing
cn := new(collectionNode)
cn := newCollectionNode(t)
cn.start(t)
defer cn.stop(t)

Expand All @@ -674,10 +657,6 @@ func TestEvictingCacheClients(t *testing.T) {
func(context.Context, *access.PingRequest) error { return nil },
)

netReq := &access.GetNetworkParametersRequest{}
netResp := &access.GetNetworkParametersResponse{}
cn.handler.On("GetNetworkParameters", testifymock.Anything, netReq).Return(netResp, nil)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this was not used and isn't needed for the test


// Create the connection factory
connectionFactory := new(ConnectionFactoryImpl)
// Set the gRPC port
Expand Down Expand Up @@ -740,7 +719,7 @@ func TestEvictingCacheClients(t *testing.T) {
}, 100*time.Millisecond, 10*time.Millisecond, "client timed out closing connection")

// Call a gRPC method on the client, requests should be blocked since the connection is invalidated
resp, err := client.GetNetworkParameters(ctx, netReq)
resp, err := client.GetNetworkParameters(ctx, &access.GetNetworkParametersRequest{})
assert.Equal(t, status.Errorf(codes.Unavailable, "the connection to %s was closed", clientAddress), err)
assert.Nil(t, resp)

Expand All @@ -749,9 +728,7 @@ func TestEvictingCacheClients(t *testing.T) {

// Call a gRPC method on the client
_, err = client.Ping(ctx, pingReq)
// Check that Ping was called
cn.handler.AssertCalled(t, "Ping", testifymock.Anything, pingReq)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these asserts are called automatically now for the collection and execution mock clients

assert.NoError(t, err)
require.NoError(t, err)

// Wait for the client connection to change state from "Ready" to "Shutdown" as connection was closed.
require.Eventually(t, func() bool {
Expand All @@ -770,7 +747,7 @@ func TestConcurrentConnections(t *testing.T) {

// Add createExecNode function to recreate it each time for rapid test
createExecNode := func() (*executionNode, func()) {
en := new(executionNode)
en := newExecutionNode(t)
en.start(t)
return en, func() {
en.stop(t)
Expand Down Expand Up @@ -886,7 +863,7 @@ func TestCircuitBreakerExecutionNode(t *testing.T) {
circuitBreakerRestoreTimeout := 1500 * time.Millisecond

// Create an execution node for testing.
en := new(executionNode)
en := newExecutionNode(t)
en.start(t)
defer en.stop(t)

Expand Down Expand Up @@ -934,8 +911,6 @@ func TestCircuitBreakerExecutionNode(t *testing.T) {

// Make the call to the execution node.
_, err = client.Ping(ctx, req)
en.handler.AssertCalled(t, "Ping", testifymock.Anything, req)

return time.Since(start), err
}

Expand Down Expand Up @@ -1005,7 +980,7 @@ func TestCircuitBreakerCollectionNode(t *testing.T) {
circuitBreakerRestoreTimeout := 1500 * time.Millisecond

// Create a collection node for testing.
cn := new(collectionNode)
cn := newCollectionNode(t)
cn.start(t)
defer cn.stop(t)

Expand Down Expand Up @@ -1053,8 +1028,6 @@ func TestCircuitBreakerCollectionNode(t *testing.T) {

// Make the call to the collection node.
_, err = client.Ping(ctx, req)
cn.handler.AssertCalled(t, "Ping", testifymock.Anything, req)

return time.Since(start), err
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func BenchmarkWithDeflateCompression(b *testing.B) {
// runBenchmark is a helper function that performs the benchmarking for different compressors.
func runBenchmark(b *testing.B, compressorName string) {
// create an execution node
en := new(executionNode)
en := newExecutionNode(b)
en.start(b)
defer en.stop(b)

Expand Down
38 changes: 27 additions & 11 deletions engine/access/rpc/connection/node_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,19 @@ type executionNode struct {
handler *mock.ExecutionAPIServer
}

func newExecutionNode(tb testing.TB) *executionNode {
return &executionNode{
handler: mock.NewExecutionAPIServer(tb),
}
}

func (en *executionNode) start(tb testing.TB) {
if en.handler == nil {
tb.Fatalf("executionNode must be initialized using newExecutionNode")
}

en.setupNode(tb)
handler := new(mock.ExecutionAPIServer)
execution.RegisterExecutionAPIServer(en.server, handler)
en.handler = handler
execution.RegisterExecutionAPIServer(en.server, en.handler)
en.node.start(tb)
}

Expand All @@ -81,14 +89,22 @@ type collectionNode struct {
handler *mock.AccessAPIServer
}

func (cn *collectionNode) start(t *testing.T) {
cn.setupNode(t)
handler := new(mock.AccessAPIServer)
access.RegisterAccessAPIServer(cn.server, handler)
cn.handler = handler
cn.node.start(t)
func newCollectionNode(tb testing.TB) *collectionNode {
return &collectionNode{
handler: mock.NewAccessAPIServer(tb),
}
}

func (cn *collectionNode) start(tb testing.TB) {
if cn.handler == nil {
tb.Fatalf("collectionNode must be initialized using newCollectionNode")
}

cn.setupNode(tb)
access.RegisterAccessAPIServer(cn.server, cn.handler)
cn.node.start(tb)
}

func (cn *collectionNode) stop(t *testing.T) {
cn.node.stop(t)
func (cn *collectionNode) stop(tb testing.TB) {
cn.node.stop(tb)
}
Loading