From cea1d9dc64099e9b68cf5fdd6ea2df90477e18fc Mon Sep 17 00:00:00 2001 From: tshinde-splunk <42387242+tshinde-splunk@users.noreply.github.com> Date: Wed, 7 Dec 2022 10:59:12 -0800 Subject: [PATCH] Change Attach signature to return ConnContext func (#153) Resolves https://github.com/open-telemetry/opamp-go/issues/152 Make sure the library-provided ConnContext function is used by the caller when they setup the HTTP Server to Attach() to, so that the context correctly stores a references to the connection. This is an internal requirement for plain http server implementation. --- server/server.go | 16 +++++--- server/serverimpl.go | 6 +-- server/serverimpl_test.go | 84 ++++++++++++++++++++++++++++++++++++++- 3 files changed, 96 insertions(+), 10 deletions(-) diff --git a/server/server.go b/server/server.go index d28f9b67..44033047 100644 --- a/server/server.go +++ b/server/server.go @@ -3,6 +3,7 @@ package server import ( "context" "crypto/tls" + "net" "net/http" "github.com/open-telemetry/opamp-go/server/types" @@ -34,18 +35,21 @@ type StartSettings struct { type HTTPHandlerFunc func(http.ResponseWriter, *http.Request) +type ConnContext func(ctx context.Context, c net.Conn) context.Context + type OpAMPServer interface { // Attach prepares the OpAMP Server to begin handling requests from an existing - // http.Server. The returned HTTPHandlerFunc should be added as a handler to the - // desired http.Server by the caller and the http.Server should be started by - // the caller after that. + // http.Server. The returned HTTPHandlerFunc and ConnContext should be added as a + // handler and ConnContext respectively to the desired http.Server by the caller + // and the http.Server should be started by the caller after that. The ConnContext + // is only used for plain http connections. // For example: - // handler, _ := Server.Attach() + // handler, connContext, _ := Server.Attach() // mux := http.NewServeMux() // mux.HandleFunc("/opamp", handler) - // httpSrv := &http.Server{Handler:mux,Addr:"127.0.0.1:4320"} + // httpSrv := &http.Server{Handler:mux,Addr:"127.0.0.1:4320", ConnContext: connContext} // httpSrv.ListenAndServe() - Attach(settings Settings) (HTTPHandlerFunc, error) + Attach(settings Settings) (HTTPHandlerFunc, ConnContext, error) // Start an OpAMP Server and begin accepting connections. Starts its own http.Server // using provided settings. This should block until the http.Server is ready to diff --git a/server/serverimpl.go b/server/serverimpl.go index 88264064..9758acda 100644 --- a/server/serverimpl.go +++ b/server/serverimpl.go @@ -50,12 +50,12 @@ func New(logger types.Logger) *server { return &server{logger: logger} } -func (s *server) Attach(settings Settings) (HTTPHandlerFunc, error) { +func (s *server) Attach(settings Settings) (HTTPHandlerFunc, ConnContext, error) { s.settings = settings s.wsUpgrader = websocket.Upgrader{ EnableCompression: settings.EnableCompression, } - return s.httpHandler, nil + return s.httpHandler, contextWithConn, nil } func (s *server) Start(settings StartSettings) error { @@ -63,7 +63,7 @@ func (s *server) Start(settings StartSettings) error { return errAlreadyStarted } - _, err := s.Attach(settings.Settings) + _, _, err := s.Attach(settings.Settings) if err != nil { return err } diff --git a/server/serverimpl_test.go b/server/serverimpl_test.go index 3b0749fb..129518e6 100644 --- a/server/serverimpl_test.go +++ b/server/serverimpl_test.go @@ -422,7 +422,7 @@ func TestServerAttachAcceptConnection(t *testing.T) { settings := Settings{Callbacks: callbacks} srv := New(&sharedinternal.NopLogger{}) require.NotNil(t, srv) - handlerFunc, err := srv.Attach(settings) + handlerFunc, _, err := srv.Attach(settings) require.NoError(t, err) // Create an HTTP Server and make it handle OpAMP connections. @@ -447,6 +447,88 @@ func TestServerAttachAcceptConnection(t *testing.T) { eventually(t, func() bool { return atomic.LoadInt32(&connectionCloseCalled) == 1 }) } +func TestServerAttachSendMessagePlainHTTP(t *testing.T) { + connectedCalled := int32(0) + connectionCloseCalled := int32(0) + var rcvMsg atomic.Value + + var srvConn types.Connection + callbacks := CallbacksStruct{ + OnConnectingFunc: func(request *http.Request) types.ConnectionResponse { + return types.ConnectionResponse{Accept: true} + }, + OnConnectedFunc: func(conn types.Connection) { + atomic.StoreInt32(&connectedCalled, 1) + srvConn = conn + }, + OnMessageFunc: func(conn types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + // Remember received message. + rcvMsg.Store(message) + + // Send a response. + response := protobufs.ServerToAgent{ + InstanceUid: message.InstanceUid, + Capabilities: uint64(protobufs.ServerCapabilities_ServerCapabilities_AcceptsStatus), + } + return &response + }, + OnConnectionCloseFunc: func(conn types.Connection) { + atomic.StoreInt32(&connectionCloseCalled, 1) + assert.EqualValues(t, srvConn, conn) + }, + } + + // Prepare to attach OpAMP Server to an HTTP Server created separately. + settings := Settings{Callbacks: callbacks} + srv := New(&sharedinternal.NopLogger{}) + require.NotNil(t, srv) + handlerFunc, ContextWithConn, err := srv.Attach(settings) + require.NoError(t, err) + + // Create an HTTP Server and make it handle OpAMP connections. + mux := http.NewServeMux() + path := "/opamppath" + mux.HandleFunc(path, handlerFunc) + hs := httptest.NewUnstartedServer(mux) + hs.Config.ConnContext = ContextWithConn + hs.Start() + defer hs.Close() + + // Send a message to the Server. + sendMsg := protobufs.AgentToServer{ + InstanceUid: "12345678", + } + b, err := proto.Marshal(&sendMsg) + require.NoError(t, err) + resp, err := http.Post("http://"+hs.Listener.Addr().String()+path, contentTypeProtobuf, bytes.NewReader(b)) + require.NoError(t, err) + + // Wait until Server receives the message. + eventually(t, func() bool { return rcvMsg.Load() != nil }) + assert.True(t, atomic.LoadInt32(&connectedCalled) == 1) + + // Verify the received message is what was sent. + assert.True(t, proto.Equal(rcvMsg.Load().(proto.Message), &sendMsg)) + + // Read Server's response. + b, err = io.ReadAll(resp.Body) + require.NoError(t, err) + + assert.EqualValues(t, http.StatusOK, resp.StatusCode) + assert.EqualValues(t, contentTypeProtobuf, resp.Header.Get(headerContentType)) + + // Decode the response. + var response protobufs.ServerToAgent + err = proto.Unmarshal(b, &response) + require.NoError(t, err) + + // Verify the response. + assert.EqualValues(t, sendMsg.InstanceUid, response.InstanceUid) + assert.EqualValues(t, protobufs.ServerCapabilities_ServerCapabilities_AcceptsStatus, response.Capabilities) + + eventually(t, func() bool { return atomic.LoadInt32(&connectionCloseCalled) == 1 }) +} + func TestServerHonoursClientRequestContentEncoding(t *testing.T) { hc := http.Client{}