From ec20a4f9e545cba25355ac4388c1e6d420319837 Mon Sep 17 00:00:00 2001 From: kk Date: Wed, 14 May 2025 16:55:55 +0800 Subject: [PATCH 1/4] Provide a method that can handle messages --- example/1.6/cs/central_system_sim.go | 5 +++++ ocpp1.6/central_system.go | 5 +++++ ocpp1.6/v16.go | 5 ++++- ocppj/server.go | 23 ++++++++++++++++++++++- 4 files changed, 36 insertions(+), 2 deletions(-) diff --git a/example/1.6/cs/central_system_sim.go b/example/1.6/cs/central_system_sim.go index 8eeee105..38c423dc 100644 --- a/example/1.6/cs/central_system_sim.go +++ b/example/1.6/cs/central_system_sim.go @@ -3,6 +3,7 @@ package main import ( "crypto/tls" "crypto/x509" + "fmt" "os" "strconv" "time" @@ -231,6 +232,10 @@ func main() { log.WithField("client", chargePoint.ID()).Info("charge point disconnected") delete(handler.chargePoints, chargePoint.ID()) }) + // set message hook + centralSystem.SetMessageHooks(func(direction, chargePointID, msgType string, payload []byte) { + fmt.Printf("direction:%s \n cpid:%s \n msgType: %s \n payload: %s\n", direction, chargePointID, msgType, string(payload)) + }) ocppj.SetLogger(log.WithField("logger", "ocppj")) ws.SetLogger(log.WithField("logger", "websocket")) // Run central system diff --git a/ocpp1.6/central_system.go b/ocpp1.6/central_system.go index 6dbfac4a..7f6f83e9 100644 --- a/ocpp1.6/central_system.go +++ b/ocpp1.6/central_system.go @@ -724,3 +724,8 @@ func (cs *centralSystem) handleCanceledRequest(chargePointID string, request ocp cs.error(err) } } + +// SetMessageHooks sets the hooks for logging incoming and outgoing messages. +func (cs *centralSystem) SetMessageHooks(message func(direction, clientId, messageType string, payload []byte)) { + cs.server.SetMessageHooks(message) +} diff --git a/ocpp1.6/v16.go b/ocpp1.6/v16.go index 4fada30f..7209872d 100644 --- a/ocpp1.6/v16.go +++ b/ocpp1.6/v16.go @@ -325,6 +325,9 @@ type CentralSystem interface { Stop() // Errors returns a channel for error messages. If it doesn't exist it es created. Errors() <-chan error + + // SetMessageHooks sets a function that will be called whenever a message is received or sent. + SetMessageHooks(logger func(direction, chargePointID, messageType string, payload []byte)) } // Creates a new OCPP 1.6 central system. @@ -345,7 +348,7 @@ func NewCentralSystem(endpoint *ocppj.Server, server ws.Server) CentralSystem { } server.AddSupportedSubprotocol(types.V16Subprotocol) if endpoint == nil { - endpoint = ocppj.NewServer(server, nil, nil, + endpoint = ocppj.NewServer(server, nil, nil, nil, core.Profile, localauth.Profile, firmware.Profile, diff --git a/ocppj/server.go b/ocppj/server.go index 2b0365da..f6081b8f 100644 --- a/ocppj/server.go +++ b/ocppj/server.go @@ -24,6 +24,7 @@ type Server struct { invalidMessageHook InvalidMessageHook dispatcher ServerDispatcher RequestState ServerState + MessageHooks MessageHooks } type ClientHandler func(client ws.Channel) @@ -31,6 +32,7 @@ type RequestHandler func(client ws.Channel, request ocpp.Request, requestId stri type ResponseHandler func(client ws.Channel, response ocpp.Response, requestId string) type ErrorHandler func(client ws.Channel, err *ocpp.Error, details interface{}) type InvalidMessageHook func(client ws.Channel, err *ocpp.Error, rawJson string, parsedFields []interface{}) *ocpp.Error +type MessageHooks func(direction string, chargePointID string, messageType string, payload []byte) // Creates a new Server endpoint. // Requires a a websocket server. Optionally a structure for queueing/dispatching requests, @@ -41,7 +43,7 @@ type InvalidMessageHook func(client ws.Channel, err *ocpp.Error, rawJson string, // s := ocppj.NewServer(ws.NewServer(), nil, nil) // // The dispatcher's associated ClientState will be set during initialization. -func NewServer(wsServer ws.Server, dispatcher ServerDispatcher, stateHandler ServerState, profiles ...*ocpp.Profile) *Server { +func NewServer(wsServer ws.Server, dispatcher ServerDispatcher, stateHandler ServerState, msgHooks MessageHooks, profiles ...*ocpp.Profile) *Server { if dispatcher == nil { dispatcher = NewDefaultServerDispatcher(NewFIFOQueueMap(0)) } @@ -64,6 +66,7 @@ func NewServer(wsServer ws.Server, dispatcher ServerDispatcher, stateHandler Ser for _, profile := range profiles { s.AddProfile(profile) } + s.MessageHooks = msgHooks return &s } @@ -173,6 +176,9 @@ func (s *Server) SendRequest(clientID string, request ocpp.Request) error { log.Errorf("error dispatching request [%s, %s] to %s: %v", call.UniqueId, call.Action, clientID, err) return err } + if s.MessageHooks != nil { + s.MessageHooks("in", clientID, "request", jsonMessage) + } log.Debugf("enqueued CALL [%s, %s] for %s", call.UniqueId, call.Action, clientID) return nil } @@ -200,6 +206,10 @@ func (s *Server) SendResponse(clientID string, requestId string, response ocpp.R log.Errorf("error sending response [%s] to %s: %v", callResult.GetUniqueId(), clientID, err) return ocpp.NewError(GenericError, err.Error(), requestId) } + + if s.MessageHooks != nil { + s.MessageHooks("out", clientID, "response", jsonMessage) + } log.Debugf("sent CALL RESULT [%s] for %s", callResult.GetUniqueId(), clientID) log.Debugf("sent JSON message to %s: %s", clientID, string(jsonMessage)) return nil @@ -226,6 +236,9 @@ func (s *Server) SendError(clientID string, requestId string, errorCode ocpp.Err log.Errorf("error sending response error [%s] to %s: %v", callError.UniqueId, clientID, err) return ocpp.NewError(GenericError, err.Error(), requestId) } + if s.MessageHooks != nil { + s.MessageHooks("out", clientID, "error", jsonMessage) + } log.Debugf("sent CALL ERROR [%s] for %s", callError.UniqueId, clientID) log.Debugf("sent JSON message to %s: %s", clientID, string(jsonMessage)) return nil @@ -237,6 +250,9 @@ func (s *Server) ocppMessageHandler(wsChannel ws.Channel, data []byte) error { log.Error(err) return err } + if s.MessageHooks != nil { + s.MessageHooks("in", wsChannel.ID(), "request", data) + } log.Debugf("received JSON message from %s: %s", wsChannel.ID(), string(data)) // Get pending requests for client pending := s.RequestState.GetClientState(wsChannel.ID()) @@ -337,3 +353,8 @@ func (s *Server) onClientDisconnected(ws ws.Channel) { s.disconnectedClientHandler(ws) } } + +// SetMessageHooks sets the message hooks for the server. +func (s *Server) SetMessageHooks(msg MessageHooks) { + s.MessageHooks = msg +} From d3872bc50e7aac0105d3abb026ec14e410976f8e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BE=99=E8=BF=9B?= <8009010+kinrosslong@user.noreply.gitee.com> Date: Thu, 26 Jun 2025 09:43:13 +0800 Subject: [PATCH 2/4] 1.The fmt.Printf was modified to logger 2.It was modified to include it as a parameter in unnecessary constructors as well --- example/1.6/cs/central_system_sim.go | 3 +-- ocpp1.6/v16.go | 2 +- ocppj/server.go | 4 ++-- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/example/1.6/cs/central_system_sim.go b/example/1.6/cs/central_system_sim.go index 38c423dc..b0826fd6 100644 --- a/example/1.6/cs/central_system_sim.go +++ b/example/1.6/cs/central_system_sim.go @@ -3,7 +3,6 @@ package main import ( "crypto/tls" "crypto/x509" - "fmt" "os" "strconv" "time" @@ -234,7 +233,7 @@ func main() { }) // set message hook centralSystem.SetMessageHooks(func(direction, chargePointID, msgType string, payload []byte) { - fmt.Printf("direction:%s \n cpid:%s \n msgType: %s \n payload: %s\n", direction, chargePointID, msgType, string(payload)) + log.Infof("direction:%s \n cpid:%s \n msgType: %s \n payload: %s\n", direction, chargePointID, msgType, string(payload)) }) ocppj.SetLogger(log.WithField("logger", "ocppj")) ws.SetLogger(log.WithField("logger", "websocket")) diff --git a/ocpp1.6/v16.go b/ocpp1.6/v16.go index 7209872d..b2f46fca 100644 --- a/ocpp1.6/v16.go +++ b/ocpp1.6/v16.go @@ -348,7 +348,7 @@ func NewCentralSystem(endpoint *ocppj.Server, server ws.Server) CentralSystem { } server.AddSupportedSubprotocol(types.V16Subprotocol) if endpoint == nil { - endpoint = ocppj.NewServer(server, nil, nil, nil, + endpoint = ocppj.NewServer(server, nil, nil, core.Profile, localauth.Profile, firmware.Profile, diff --git a/ocppj/server.go b/ocppj/server.go index f6081b8f..7589e3b1 100644 --- a/ocppj/server.go +++ b/ocppj/server.go @@ -43,7 +43,7 @@ type MessageHooks func(direction string, chargePointID string, messageType strin // s := ocppj.NewServer(ws.NewServer(), nil, nil) // // The dispatcher's associated ClientState will be set during initialization. -func NewServer(wsServer ws.Server, dispatcher ServerDispatcher, stateHandler ServerState, msgHooks MessageHooks, profiles ...*ocpp.Profile) *Server { +func NewServer(wsServer ws.Server, dispatcher ServerDispatcher, stateHandler ServerState, profiles ...*ocpp.Profile) *Server { if dispatcher == nil { dispatcher = NewDefaultServerDispatcher(NewFIFOQueueMap(0)) } @@ -66,7 +66,7 @@ func NewServer(wsServer ws.Server, dispatcher ServerDispatcher, stateHandler Ser for _, profile := range profiles { s.AddProfile(profile) } - s.MessageHooks = msgHooks + //s.MessageHooks = msgHooks return &s } From d5899faf42c3d61763b50198666e7cbd371ced9e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BE=99=E8=BF=9B?= <8009010+kinrosslong@user.noreply.gitee.com> Date: Thu, 26 Jun 2025 10:10:43 +0800 Subject: [PATCH 3/4] set up ocpp2.0.1 csms set message hook supplement --- example/2.0.1/csms/csms_sim.go | 4 ++++ ocpp2.0.1/csms.go | 5 +++++ ocpp2.0.1/v2.go | 3 +++ 3 files changed, 12 insertions(+) diff --git a/example/2.0.1/csms/csms_sim.go b/example/2.0.1/csms/csms_sim.go index 57b88ec9..d3526b43 100644 --- a/example/2.0.1/csms/csms_sim.go +++ b/example/2.0.1/csms/csms_sim.go @@ -288,6 +288,10 @@ func main() { log.WithField("client", chargingStation.ID()).Info("charging station disconnected") delete(handler.chargingStations, chargingStation.ID()) }) + // set message hook + csms.SetMessageHooks(func(direction, chargePointID, msgType string, payload []byte) { + log.Infof("direction:%s \n cpid:%s \n msgType: %s \n payload: %s\n", direction, chargePointID, msgType, string(payload)) + }) ocppj.SetLogger(log) // Run CSMS log.Infof("starting CSMS on port %v", listenPort) diff --git a/ocpp2.0.1/csms.go b/ocpp2.0.1/csms.go index 6d7b1684..db503c35 100644 --- a/ocpp2.0.1/csms.go +++ b/ocpp2.0.1/csms.go @@ -1047,3 +1047,8 @@ func (cs *csms) handleCanceledRequest(chargePointID string, request ocpp.Request cs.error(err) } } + +// SetMessageHooks sets the hooks for logging incoming and outgoing messages. +func (cs *csms) SetMessageHooks(message func(direction, clientId, messageType string, payload []byte)) { + cs.server.SetMessageHooks(message) +} diff --git a/ocpp2.0.1/v2.go b/ocpp2.0.1/v2.go index 3bc38ac3..bc55e053 100644 --- a/ocpp2.0.1/v2.go +++ b/ocpp2.0.1/v2.go @@ -401,6 +401,9 @@ type CSMS interface { Stop() // Errors returns a channel for error messages. If it doesn't exist it es created. Errors() <-chan error + + // SetMessageHooks sets a function that will be called whenever a message is received or sent. + SetMessageHooks(logger func(direction, chargePointID, messageType string, payload []byte)) } // Creates a new OCPP 2.0 CSMS. From 331500fb4b1a84606f10ef57aec26114a5f1b1f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BE=99=E8=BF=9B?= <8009010+kinrosslong@user.noreply.gitee.com> Date: Thu, 23 Oct 2025 11:07:34 +0800 Subject: [PATCH 4/4] Delete unnecessary comments and perform a nil check on the parameter --- ocpp1.6/central_system.go | 4 +++- ocpp2.0.1/csms.go | 4 +++- ocppj/server.go | 1 - 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/ocpp1.6/central_system.go b/ocpp1.6/central_system.go index 7f6f83e9..72bb5c3c 100644 --- a/ocpp1.6/central_system.go +++ b/ocpp1.6/central_system.go @@ -727,5 +727,7 @@ func (cs *centralSystem) handleCanceledRequest(chargePointID string, request ocp // SetMessageHooks sets the hooks for logging incoming and outgoing messages. func (cs *centralSystem) SetMessageHooks(message func(direction, clientId, messageType string, payload []byte)) { - cs.server.SetMessageHooks(message) + if message != nil { + cs.server.SetMessageHooks(message) + } } diff --git a/ocpp2.0.1/csms.go b/ocpp2.0.1/csms.go index db503c35..2924c084 100644 --- a/ocpp2.0.1/csms.go +++ b/ocpp2.0.1/csms.go @@ -1050,5 +1050,7 @@ func (cs *csms) handleCanceledRequest(chargePointID string, request ocpp.Request // SetMessageHooks sets the hooks for logging incoming and outgoing messages. func (cs *csms) SetMessageHooks(message func(direction, clientId, messageType string, payload []byte)) { - cs.server.SetMessageHooks(message) + if message != nil { + cs.server.SetMessageHooks(message) + } } diff --git a/ocppj/server.go b/ocppj/server.go index 7589e3b1..befb4f04 100644 --- a/ocppj/server.go +++ b/ocppj/server.go @@ -66,7 +66,6 @@ func NewServer(wsServer ws.Server, dispatcher ServerDispatcher, stateHandler Ser for _, profile := range profiles { s.AddProfile(profile) } - //s.MessageHooks = msgHooks return &s }