diff --git a/fvt/conn_test.go b/fvt/conn_test.go index bcabba880..7d2bd9230 100644 --- a/fvt/conn_test.go +++ b/fvt/conn_test.go @@ -286,7 +286,7 @@ func (s *ConnectionTestSuite) TestSourcePing() { props: map[string]any{ "url": "tcp://127.0.0.1:7081", }, - err: "{\"error\":1000,\"message\":\"not connected\"}\n", + err: "{\"error\":1000,\"message\":\"source neuron doesn't support ping connection\"}\n", }, { name: "file", @@ -430,7 +430,7 @@ func (s *ConnectionTestSuite) TestSinkPing() { props: map[string]any{ "url": "tcp://127.0.0.1:7081", }, - err: "{\"error\":1000,\"message\":\"not connected\"}\n", + err: "{\"error\":1000,\"message\":\"sink neuron doesn't support ping connection\"}\n", }, { name: "file", diff --git a/internal/io/neuron/connector.go b/internal/io/neuron/connector.go deleted file mode 100644 index a3b37fed2..000000000 --- a/internal/io/neuron/connector.go +++ /dev/null @@ -1,39 +0,0 @@ -// Copyright 2024 EMQ Technologies Co., Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package neuron - -import ( - "time" - - "github.com/lf-edge/ekuiper/contract/v2/api" - - "github.com/lf-edge/ekuiper/v2/pkg/nng" -) - -func ping(ctx api.StreamContext, props map[string]any) error { - props["protocol"] = PROTOCOL - cli := nng.CreateConnection(ctx) - err := cli.Provision(ctx, "test", props) - if err != nil { - return err - } - err = cli.Dial(ctx) - if err != nil { - return err - } - defer cli.Close(ctx) - time.Sleep(1000 * time.Millisecond) - return cli.Ping(ctx) -} diff --git a/internal/io/neuron/sink.go b/internal/io/neuron/sink.go index 294e5229a..f4e4adcc8 100644 --- a/internal/io/neuron/sink.go +++ b/internal/io/neuron/sink.go @@ -22,7 +22,6 @@ import ( "github.com/lf-edge/ekuiper/contract/v2/api" "github.com/lf-edge/ekuiper/v2/internal/conf" - "github.com/lf-edge/ekuiper/v2/internal/pkg/util" "github.com/lf-edge/ekuiper/v2/internal/topo/node/tracenode" "github.com/lf-edge/ekuiper/v2/pkg/cast" "github.com/lf-edge/ekuiper/v2/pkg/connection" @@ -80,11 +79,6 @@ func (s *sink) Provision(_ api.StreamContext, props map[string]any) error { return nil } -func (s *sink) Ping(ctx api.StreamContext, props map[string]any) error { - props["protocol"] = PROTOCOL - return ping(ctx, props) -} - func (s *sink) Connect(ctx api.StreamContext, sc api.StatusChangeHandler) error { ctx.GetLogger().Infof("Connecting to neuron") cw, err := connection.FetchConnection(ctx, PROTOCOL+s.cc.Url, "nng", s.props, sc) @@ -240,5 +234,3 @@ func extractSpanContextIntoData(ctx api.StreamContext, data any, sendBytes []byt func GetSink() api.Sink { return &sink{} } - -var _ util.PingableConn = &sink{} diff --git a/internal/io/neuron/sink_test.go b/internal/io/neuron/sink_test.go index 9cc39285d..440a7a94d 100644 --- a/internal/io/neuron/sink_test.go +++ b/internal/io/neuron/sink_test.go @@ -35,9 +35,6 @@ func TestSink(t *testing.T) { defer server.Close() s := GetSink().(api.TupleCollector) - ctx := mockContext.NewMockContext("t", "tt") - err := s.(*sink).Ping(ctx, map[string]any{"url": DefaultNeuronUrl}) - assert.NoError(t, err) data := []any{ &xsql.Tuple{ Message: map[string]any{ @@ -69,7 +66,7 @@ func TestSink(t *testing.T) { }, }, } - err = mock.RunTupleSinkCollect(s, data, map[string]any{ + err := mock.RunTupleSinkCollect(s, data, map[string]any{ "url": DefaultNeuronUrl, "nodeName": "test1", "groupName": "grp", @@ -106,9 +103,6 @@ func TestSinkNoTags(t *testing.T) { defer server.Close() s := GetSink().(api.TupleCollector) - ctx := mockContext.NewMockContext("t", "tt") - err := s.(*sink).Ping(ctx, map[string]any{"url": DefaultNeuronUrl}) - assert.NoError(t, err) data := []any{ &xsql.Tuple{ Message: map[string]any{ @@ -122,7 +116,7 @@ func TestSinkNoTags(t *testing.T) { }, }, } - err = mock.RunTupleSinkCollect(s, data, map[string]any{ + err := mock.RunTupleSinkCollect(s, data, map[string]any{ "url": DefaultNeuronUrl, "nodeName": "test1", "groupName": "grp", diff --git a/internal/io/neuron/source.go b/internal/io/neuron/source.go index 49c354402..59c0d9c4f 100644 --- a/internal/io/neuron/source.go +++ b/internal/io/neuron/source.go @@ -23,7 +23,6 @@ import ( "github.com/lf-edge/ekuiper/contract/v2/api" "go.nanomsg.org/mangos/v3" - "github.com/lf-edge/ekuiper/v2/internal/pkg/util" "github.com/lf-edge/ekuiper/v2/internal/topo/node/tracenode" "github.com/lf-edge/ekuiper/v2/pkg/connection" "github.com/lf-edge/ekuiper/v2/pkg/infra" @@ -63,11 +62,6 @@ func (s *source) Provision(_ api.StreamContext, props map[string]any) error { return nil } -func (s *source) Ping(ctx api.StreamContext, props map[string]any) error { - props["protocol"] = PROTOCOL - return ping(ctx, props) -} - func (s *source) ConnId(props map[string]any) string { var url string u, ok := props["url"] @@ -152,5 +146,3 @@ func extractTraceMeta(ctx api.StreamContext, data []byte) ([]byte, map[string]in } return rawData, meta } - -var _ util.PingableConn = &source{} diff --git a/internal/io/neuron/source_test.go b/internal/io/neuron/source_test.go index 0ffe06cc5..3c71317da 100644 --- a/internal/io/neuron/source_test.go +++ b/internal/io/neuron/source_test.go @@ -43,10 +43,6 @@ func TestRun(t *testing.T) { }, exp, func() { // do nothing }) - - ctx := mockContext.NewMockContext("t", "tt") - err := s.(*source).Ping(ctx, map[string]any{"url": DefaultNeuronUrl}) - assert.NoError(t, err) } func TestProvision(t *testing.T) {