From f0ccf587f9cbd73adf1d10ef3a55325050cc0fa1 Mon Sep 17 00:00:00 2001 From: wzshiming Date: Wed, 25 Nov 2020 19:11:14 +0800 Subject: [PATCH] fix and add stream log --- bind/bind.go | 35 +++++++++++++++++++++++++++ components/service/packet/server.go | 5 ++-- components/service/stream/server.go | 17 ++++--------- components/stream/handler/log/init.go | 25 +++++++++++++++++++ components/stream/handler/log/log.go | 32 ++++++++++++++++++++++++ init/init.go | 1 + internal/log/file.go | 2 +- 7 files changed, 102 insertions(+), 15 deletions(-) create mode 100644 components/stream/handler/log/init.go create mode 100644 components/stream/handler/log/log.go diff --git a/bind/bind.go b/bind/bind.go index 31f9d19..3552c3b 100644 --- a/bind/bind.go +++ b/bind/bind.go @@ -2081,6 +2081,41 @@ func (m LogNetHTTPHandlerConfig) MarshalJSON() ([]byte, error) { // // ========= End log@net/http.Handler type ========= +// ========= Begin log@stream.Handler type ========= +// + +const kindLogStreamHandlerConfig = `log@stream.Handler` + +// LogStreamHandlerConfig log@stream.Handler +type LogStreamHandlerConfig struct { + Output IoWriter + Handler StreamHandler +} + +func init() { + _ = provider.Register( + kindLogStreamHandlerConfig, + func(r *LogStreamHandlerConfig) StreamHandler { return r }, + ) +} + +func (LogStreamHandlerConfig) isStreamHandler() {} +func (LogStreamHandlerConfig) isComponent() {} + +// MarshalJSON returns m as the JSON encoding of m. +func (m LogStreamHandlerConfig) MarshalJSON() ([]byte, error) { + type t LogStreamHandlerConfig + data, err := json.Marshal(t(m)) + if err != nil { + return nil, err + } + data = prepend(kindKey, kindLogStreamHandlerConfig, data) + return data, nil +} + +// +// ========= End log@stream.Handler type ========= + // ========= Begin merge@tls.TLS type ========= // diff --git a/components/service/packet/server.go b/components/service/packet/server.go index e977454..3c0921a 100644 --- a/components/service/packet/server.go +++ b/components/service/packet/server.go @@ -47,14 +47,15 @@ func (s *Server) Close() error { } func (s *Server) ServePacket(ctx context.Context, pkt packet.Packet) { + log := logger.FromContext(ctx) + log = log.WithName("packet") + ctx = logger.WithContext(ctx, log) s.handler.ServePacket(ctx, nopCloser{pkt}) err := pkt.Close() if listener.IsClosedConnError(err) { err = nil } if err != nil { - log := logger.FromContext(ctx) - log = log.WithName("packet") log.Error(err, "close listen") return } diff --git a/components/service/stream/server.go b/components/service/stream/server.go index 54eba90..9df8dfe 100644 --- a/components/service/stream/server.go +++ b/components/service/stream/server.go @@ -2,7 +2,6 @@ package stream import ( "context" - "fmt" "sync" "time" @@ -31,13 +30,13 @@ func NewServer(listenConfig stream.ListenConfig, handler stream.Handler, disconn func (s *Server) Run(ctx context.Context) error { log := logger.FromContext(ctx) + log = log.WithName("stream") + ctx = logger.WithContext(ctx, log) listen, err := s.listenConfig.ListenStream(ctx) if err != nil { return err } s.listener = listen - - var size uint64 for { conn, err := s.listener.Accept() if err != nil { @@ -47,14 +46,6 @@ func (s *Server) Run(ctx context.Context) error { log.Error(err, "listener accept") continue } - log := log.WithName(fmt.Sprintf("stream-%d", size)) - size++ - ctx = logger.WithContext(ctx, log) - log = log.WithValues( - "localAddress", conn.LocalAddr(), - "remoteAddress", conn.RemoteAddr(), - ) - log.Info("New stream") go s.ServeStream(ctx, conn) } } @@ -94,7 +85,9 @@ func (s *Server) ServeStream(ctx context.Context, stm stream.Stream) { err := stm.Close() if err != nil { addr := stm.LocalAddr() - logger.Log.V(-2).Info("Close %s://%s error: %s", addr.Network(), addr.String(), err) + logger.Log.Error(err, "Close", + "address", addr.String(), + ) return } } diff --git a/components/stream/handler/log/init.go b/components/stream/handler/log/init.go new file mode 100644 index 0000000..8085739 --- /dev/null +++ b/components/stream/handler/log/init.go @@ -0,0 +1,25 @@ +package log + +import ( + "github.com/pipeproxy/pipe/components/stream" + + "github.com/pipeproxy/pipe/components/common/register" + "github.com/pipeproxy/pipe/components/stdio/output" +) + +const ( + name = "log" +) + +func init() { + register.Register(name, NewLogWithConfig) +} + +type Config struct { + Output output.Output + Handler stream.Handler +} + +func NewLogWithConfig(conf *Config) stream.Handler { + return NewLog(conf.Handler) +} diff --git a/components/stream/handler/log/log.go b/components/stream/handler/log/log.go new file mode 100644 index 0000000..9dfd9b6 --- /dev/null +++ b/components/stream/handler/log/log.go @@ -0,0 +1,32 @@ +package log + +import ( + "context" + "fmt" + "sync/atomic" + + "github.com/pipeproxy/pipe/components/stream" + "github.com/wzshiming/logger" +) + +type Log struct { + handler stream.Handler + size uint64 +} + +func NewLog(h stream.Handler) *Log { + return &Log{handler: h, size: 0} +} + +func (l *Log) ServeStream(ctx context.Context, stm stream.Stream) { + log := logger.FromContext(ctx) + log = log.WithName(fmt.Sprintf("stream-%d", atomic.AddUint64(&l.size, 1))) + ctx = logger.WithContext(ctx, log) + log = log.WithValues( + "localAddress", stm.LocalAddr(), + "remoteAddress", stm.RemoteAddr(), + ) + log.Info("Connect") + l.handler.ServeStream(ctx, stm) + log.Info("Disconnect") +} diff --git a/init/init.go b/init/init.go index ad56189..d46f370 100644 --- a/init/init.go +++ b/init/init.go @@ -55,6 +55,7 @@ import ( _ "github.com/pipeproxy/pipe/components/stream/handler/http1" _ "github.com/pipeproxy/pipe/components/stream/handler/http2" _ "github.com/pipeproxy/pipe/components/stream/handler/lb" + _ "github.com/pipeproxy/pipe/components/stream/handler/log" _ "github.com/pipeproxy/pipe/components/stream/handler/multi" _ "github.com/pipeproxy/pipe/components/stream/handler/mux" _ "github.com/pipeproxy/pipe/components/stream/handler/mux/pattern" diff --git a/internal/log/file.go b/internal/log/file.go index 678df83..8d5d1d5 100644 --- a/internal/log/file.go +++ b/internal/log/file.go @@ -56,7 +56,7 @@ func NewFile(file string) (io.WriteCloser, error) { func (l *fileLogger) Close() error { old := l.file - logger.Log.V(1).Info("close log file: %s", l.path) + logger.Log.V(1).Info("close log file", "path", l.path) return old.Close() }