Skip to content

Commit

Permalink
fix and add stream log
Browse files Browse the repository at this point in the history
  • Loading branch information
wzshiming committed Nov 25, 2020
1 parent 06647ed commit f0ccf58
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 15 deletions.
35 changes: 35 additions & 0 deletions bind/bind.go
Original file line number Diff line number Diff line change
Expand Up @@ -2081,6 +2081,41 @@ func (m LogNetHTTPHandlerConfig) MarshalJSON() ([]byte, error) {
//
// ========= End log@net/http.Handler type =========

// ========= Begin log@stream.Handler type =========
//

const kindLogStreamHandlerConfig = `log@stream.Handler`

// LogStreamHandlerConfig log@stream.Handler
type LogStreamHandlerConfig struct {
Output IoWriter
Handler StreamHandler
}

func init() {
_ = provider.Register(
kindLogStreamHandlerConfig,
func(r *LogStreamHandlerConfig) StreamHandler { return r },
)
}

func (LogStreamHandlerConfig) isStreamHandler() {}
func (LogStreamHandlerConfig) isComponent() {}

// MarshalJSON returns m as the JSON encoding of m.
func (m LogStreamHandlerConfig) MarshalJSON() ([]byte, error) {
type t LogStreamHandlerConfig
data, err := json.Marshal(t(m))
if err != nil {
return nil, err
}
data = prepend(kindKey, kindLogStreamHandlerConfig, data)
return data, nil
}

//
// ========= End log@stream.Handler type =========

// ========= Begin merge@tls.TLS type =========
//

Expand Down
5 changes: 3 additions & 2 deletions components/service/packet/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,15 @@ func (s *Server) Close() error {
}

func (s *Server) ServePacket(ctx context.Context, pkt packet.Packet) {
log := logger.FromContext(ctx)
log = log.WithName("packet")
ctx = logger.WithContext(ctx, log)
s.handler.ServePacket(ctx, nopCloser{pkt})
err := pkt.Close()
if listener.IsClosedConnError(err) {
err = nil
}
if err != nil {
log := logger.FromContext(ctx)
log = log.WithName("packet")
log.Error(err, "close listen")
return
}
Expand Down
17 changes: 5 additions & 12 deletions components/service/stream/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package stream

import (
"context"
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -31,13 +30,13 @@ func NewServer(listenConfig stream.ListenConfig, handler stream.Handler, disconn

func (s *Server) Run(ctx context.Context) error {
log := logger.FromContext(ctx)
log = log.WithName("stream")
ctx = logger.WithContext(ctx, log)
listen, err := s.listenConfig.ListenStream(ctx)
if err != nil {
return err
}
s.listener = listen

var size uint64
for {
conn, err := s.listener.Accept()
if err != nil {
Expand All @@ -47,14 +46,6 @@ func (s *Server) Run(ctx context.Context) error {
log.Error(err, "listener accept")
continue
}
log := log.WithName(fmt.Sprintf("stream-%d", size))
size++
ctx = logger.WithContext(ctx, log)
log = log.WithValues(
"localAddress", conn.LocalAddr(),
"remoteAddress", conn.RemoteAddr(),
)
log.Info("New stream")
go s.ServeStream(ctx, conn)
}
}
Expand Down Expand Up @@ -94,7 +85,9 @@ func (s *Server) ServeStream(ctx context.Context, stm stream.Stream) {
err := stm.Close()
if err != nil {
addr := stm.LocalAddr()
logger.Log.V(-2).Info("Close %s://%s error: %s", addr.Network(), addr.String(), err)
logger.Log.Error(err, "Close",
"address", addr.String(),
)
return
}
}
Expand Down
25 changes: 25 additions & 0 deletions components/stream/handler/log/init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package log

import (
"github.com/pipeproxy/pipe/components/stream"

"github.com/pipeproxy/pipe/components/common/register"
"github.com/pipeproxy/pipe/components/stdio/output"
)

const (
name = "log"
)

func init() {
register.Register(name, NewLogWithConfig)
}

type Config struct {
Output output.Output
Handler stream.Handler
}

func NewLogWithConfig(conf *Config) stream.Handler {
return NewLog(conf.Handler)
}
32 changes: 32 additions & 0 deletions components/stream/handler/log/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package log

import (
"context"
"fmt"
"sync/atomic"

"github.com/pipeproxy/pipe/components/stream"
"github.com/wzshiming/logger"
)

type Log struct {
handler stream.Handler
size uint64
}

func NewLog(h stream.Handler) *Log {
return &Log{handler: h, size: 0}
}

func (l *Log) ServeStream(ctx context.Context, stm stream.Stream) {
log := logger.FromContext(ctx)
log = log.WithName(fmt.Sprintf("stream-%d", atomic.AddUint64(&l.size, 1)))
ctx = logger.WithContext(ctx, log)
log = log.WithValues(
"localAddress", stm.LocalAddr(),
"remoteAddress", stm.RemoteAddr(),
)
log.Info("Connect")
l.handler.ServeStream(ctx, stm)
log.Info("Disconnect")
}
1 change: 1 addition & 0 deletions init/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (
_ "github.com/pipeproxy/pipe/components/stream/handler/http1"
_ "github.com/pipeproxy/pipe/components/stream/handler/http2"
_ "github.com/pipeproxy/pipe/components/stream/handler/lb"
_ "github.com/pipeproxy/pipe/components/stream/handler/log"
_ "github.com/pipeproxy/pipe/components/stream/handler/multi"
_ "github.com/pipeproxy/pipe/components/stream/handler/mux"
_ "github.com/pipeproxy/pipe/components/stream/handler/mux/pattern"
Expand Down
2 changes: 1 addition & 1 deletion internal/log/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func NewFile(file string) (io.WriteCloser, error) {

func (l *fileLogger) Close() error {
old := l.file
logger.Log.V(1).Info("close log file: %s", l.path)
logger.Log.V(1).Info("close log file", "path", l.path)
return old.Close()
}

Expand Down

0 comments on commit f0ccf58

Please sign in to comment.