Skip to content

Commit

Permalink
fix(neuron): remove ping (#3321)
Browse files Browse the repository at this point in the history
Signed-off-by: Jiyong Huang <huangjy@emqx.io>
  • Loading branch information
ngjaying authored Oct 22, 2024
1 parent 4757a49 commit cc74d47
Show file tree
Hide file tree
Showing 6 changed files with 4 additions and 69 deletions.
4 changes: 2 additions & 2 deletions fvt/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ func (s *ConnectionTestSuite) TestSourcePing() {
props: map[string]any{
"url": "tcp://127.0.0.1:7081",
},
err: "{\"error\":1000,\"message\":\"not connected\"}\n",
err: "{\"error\":1000,\"message\":\"source neuron doesn't support ping connection\"}\n",
},
{
name: "file",
Expand Down Expand Up @@ -430,7 +430,7 @@ func (s *ConnectionTestSuite) TestSinkPing() {
props: map[string]any{
"url": "tcp://127.0.0.1:7081",
},
err: "{\"error\":1000,\"message\":\"not connected\"}\n",
err: "{\"error\":1000,\"message\":\"sink neuron doesn't support ping connection\"}\n",
},
{
name: "file",
Expand Down
39 changes: 0 additions & 39 deletions internal/io/neuron/connector.go

This file was deleted.

8 changes: 0 additions & 8 deletions internal/io/neuron/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/lf-edge/ekuiper/contract/v2/api"

"github.com/lf-edge/ekuiper/v2/internal/conf"
"github.com/lf-edge/ekuiper/v2/internal/pkg/util"
"github.com/lf-edge/ekuiper/v2/internal/topo/node/tracenode"
"github.com/lf-edge/ekuiper/v2/pkg/cast"
"github.com/lf-edge/ekuiper/v2/pkg/connection"
Expand Down Expand Up @@ -80,11 +79,6 @@ func (s *sink) Provision(_ api.StreamContext, props map[string]any) error {
return nil
}

func (s *sink) Ping(ctx api.StreamContext, props map[string]any) error {
props["protocol"] = PROTOCOL
return ping(ctx, props)
}

func (s *sink) Connect(ctx api.StreamContext, sc api.StatusChangeHandler) error {
ctx.GetLogger().Infof("Connecting to neuron")
cw, err := connection.FetchConnection(ctx, PROTOCOL+s.cc.Url, "nng", s.props, sc)
Expand Down Expand Up @@ -240,5 +234,3 @@ func extractSpanContextIntoData(ctx api.StreamContext, data any, sendBytes []byt
func GetSink() api.Sink {
return &sink{}
}

var _ util.PingableConn = &sink{}
10 changes: 2 additions & 8 deletions internal/io/neuron/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@ func TestSink(t *testing.T) {
defer server.Close()

s := GetSink().(api.TupleCollector)
ctx := mockContext.NewMockContext("t", "tt")
err := s.(*sink).Ping(ctx, map[string]any{"url": DefaultNeuronUrl})
assert.NoError(t, err)
data := []any{
&xsql.Tuple{
Message: map[string]any{
Expand Down Expand Up @@ -69,7 +66,7 @@ func TestSink(t *testing.T) {
},
},
}
err = mock.RunTupleSinkCollect(s, data, map[string]any{
err := mock.RunTupleSinkCollect(s, data, map[string]any{
"url": DefaultNeuronUrl,
"nodeName": "test1",
"groupName": "grp",
Expand Down Expand Up @@ -106,9 +103,6 @@ func TestSinkNoTags(t *testing.T) {
defer server.Close()

s := GetSink().(api.TupleCollector)
ctx := mockContext.NewMockContext("t", "tt")
err := s.(*sink).Ping(ctx, map[string]any{"url": DefaultNeuronUrl})
assert.NoError(t, err)
data := []any{
&xsql.Tuple{
Message: map[string]any{
Expand All @@ -122,7 +116,7 @@ func TestSinkNoTags(t *testing.T) {
},
},
}
err = mock.RunTupleSinkCollect(s, data, map[string]any{
err := mock.RunTupleSinkCollect(s, data, map[string]any{
"url": DefaultNeuronUrl,
"nodeName": "test1",
"groupName": "grp",
Expand Down
8 changes: 0 additions & 8 deletions internal/io/neuron/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/lf-edge/ekuiper/contract/v2/api"
"go.nanomsg.org/mangos/v3"

"github.com/lf-edge/ekuiper/v2/internal/pkg/util"
"github.com/lf-edge/ekuiper/v2/internal/topo/node/tracenode"
"github.com/lf-edge/ekuiper/v2/pkg/connection"
"github.com/lf-edge/ekuiper/v2/pkg/infra"
Expand Down Expand Up @@ -63,11 +62,6 @@ func (s *source) Provision(_ api.StreamContext, props map[string]any) error {
return nil
}

func (s *source) Ping(ctx api.StreamContext, props map[string]any) error {
props["protocol"] = PROTOCOL
return ping(ctx, props)
}

func (s *source) ConnId(props map[string]any) string {
var url string
u, ok := props["url"]
Expand Down Expand Up @@ -152,5 +146,3 @@ func extractTraceMeta(ctx api.StreamContext, data []byte) ([]byte, map[string]in
}
return rawData, meta
}

var _ util.PingableConn = &source{}
4 changes: 0 additions & 4 deletions internal/io/neuron/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@ func TestRun(t *testing.T) {
}, exp, func() {
// do nothing
})

ctx := mockContext.NewMockContext("t", "tt")
err := s.(*source).Ping(ctx, map[string]any{"url": DefaultNeuronUrl})
assert.NoError(t, err)
}

func TestProvision(t *testing.T) {
Expand Down

0 comments on commit cc74d47

Please sign in to comment.