diff --git a/pool.go b/pool.go new file mode 100644 index 0000000..fc84c39 --- /dev/null +++ b/pool.go @@ -0,0 +1,41 @@ +package logger + +import ( + "bytes" + "sync" + "sync/atomic" +) + +var bpool = newBufferPool(500) + +type bufferPool struct { + p sync.Pool + count int64 + size int +} + +func newBufferPool(size int) *bufferPool { + p := &bufferPool{} + p.p.New = func() interface{} { return bytes.NewBuffer(nil) } + p.size = size + return p +} + +func putBuffer(b *bytes.Buffer) { + bpool.p.Put(b) + atomic.AddInt64(&bpool.count, -1) +} + +func getBuffer() *bytes.Buffer { + for { + c := atomic.LoadInt64(&bpool.count) + if c > int64(bpool.size) { + return nil + } + if atomic.CompareAndSwapInt64(&bpool.count, c, c+1) { + b := bpool.p.Get().(*bytes.Buffer) + b.Reset() + return b + } + } +} diff --git a/rotatelogger.go b/rotatelogger.go index 3921fcb..88ab66e 100644 --- a/rotatelogger.go +++ b/rotatelogger.go @@ -1,6 +1,7 @@ package logger import ( + "bytes" "compress/gzip" "errors" "fmt" @@ -10,12 +11,17 @@ import ( "runtime/debug" "sync" "time" - - "github.com/smallnest/ringbuffer" ) -// Placeholder is a placeholder object that can be used globally. -var Placeholder PlaceholderType +var ( + // Placeholder is a placeholder object that can be used globally. + Placeholder PlaceholderType + + // ErrClosedRollingFile is returned when the rolling file is closed. + ErrClosedRollingFile = errors.New("rolling file is closed") + + ErrBuffer = errors.New("buffer exceeds the limit") +) type ( // AnyType can be used to hold any type. @@ -25,10 +31,6 @@ type ( ) const ( - dateFormat = "20060102" - hourFormat = "2006010215" - fileTimeFormat = "20060102150405.000" - hoursPerDay = 24 defaultDirMode = 0o755 defaultFileMode = 0o600 gzipExt = ".gz" @@ -37,8 +39,8 @@ const ( hourRotationRule = "hour" dayRotationRule = "day" megaBytes = 1 << 20 - logPageCacheByteSize = 1 * 1024 * 1024 // 1MB - logPageCacheMinSize = 1024 // 1KB + logPageNumber = 2 + logPageCacheByteSize = 4096 // 4KB ) type ( @@ -48,10 +50,14 @@ type ( backup string fp *os.File - ringBuffer *ringbuffer.RingBuffer - done chan struct{} - rule RotateRule - compress bool + syncFlush chan struct{} + current *bytes.Buffer + fullBuffer chan *bytes.Buffer + + closed bool + done chan struct{} + rule RotateRule + compress bool // can't use threading.RoutineGroup because of cycle import waitGroup sync.WaitGroup closeOnce sync.Once @@ -68,7 +74,9 @@ func NewRotateLogger(filename string, rule RotateRule, compress bool) (*RotateLo rule: rule, compress: compress, done: make(chan struct{}), - ringBuffer: ringbuffer.New(logPageCacheByteSize), + syncFlush: make(chan struct{}), + fullBuffer: make(chan *bytes.Buffer, logPageNumber+1), + current: getBuffer(), } if err := l.initialize(); err != nil { return nil, err @@ -80,18 +88,21 @@ func NewRotateLogger(filename string, rule RotateRule, compress bool) (*RotateLo // flush flushes the buffer to the file. func (l *RotateLogger) flush() { - l.mu.Lock() - defer l.mu.Unlock() - if l.fp == nil || l.ringBuffer.Length() == 0 { - return + readyLen := len(l.fullBuffer) + for i := 0; i < readyLen; i++ { + buff := <-l.fullBuffer + l.writeBuffer(buff) + putBuffer(buff) + } + if l.current != nil { + l.writeBuffer(l.current) + putBuffer(l.current) } - readByte := l.ringBuffer.Bytes(nil) - l.ringBuffer.Reset() - if _, err := l.write(readByte); err != nil { - log.Printf("failed to write to file: %v", err) + l.current = nil + if l.fp != nil { + l.fp.Sync() } - return } func (l *RotateLogger) startWorker() { @@ -100,40 +111,85 @@ func (l *RotateLogger) startWorker() { go func() { defer l.waitGroup.Done() + defer func() { + l.flush() + if l.fp != nil { + l.fp.Close() + l.fp = nil + } + }() + t := time.NewTicker(time.Millisecond * 500) defer t.Stop() for { select { - case <-t.C: + case <-l.syncFlush: + l.mu.Lock() l.flush() + l.mu.Unlock() + l.syncFlush <- struct{}{} + case buff := <-l.fullBuffer: + l.writeBuffer(buff) + putBuffer(buff) + case <-t.C: + l.mu.Lock() + if len(l.fullBuffer) != 0 { + l.mu.Unlock() + continue + } + // 清空buffer + buff := l.current + if buff == nil { + l.mu.Unlock() + continue + } + l.current = nil + l.mu.Unlock() + + l.writeBuffer(buff) + putBuffer(buff) case <-l.done: - l.flush() return } } }() } -func (l *RotateLogger) Write(data []byte) (int, error) { +func (l *RotateLogger) Write(b []byte) (n int, err error) { + l.mu.Lock() + if l.closed { + l.mu.Unlock() + return 0, ErrClosedRollingFile + } + + // check if logger is closed select { case <-l.done: + l.mu.Unlock() return 0, fmt.Errorf("logger is closed") default: } - l.mu.Lock() - if l.ringBuffer.IsFull() || l.ringBuffer.Free() <= logPageCacheMinSize || l.ringBuffer.Length() >= logPageCacheByteSize { + // write to buffer + if l.current == nil { + l.current = getBuffer() + if l.current == nil { + l.mu.Unlock() + return 0, ErrBuffer + } + } + + // write to buffer + n, err = l.current.Write(b) + if l.current.Len() > logPageCacheByteSize { + buf := l.current + l.current = nil l.mu.Unlock() - l.flush() + l.fullBuffer <- buf } else { l.mu.Unlock() } - - l.mu.Lock() - size, err := l.ringBuffer.Write(data) - l.mu.Unlock() - - return size, err + return } func (l *RotateLogger) getBackupFilename() string { @@ -228,8 +284,8 @@ func (l *RotateLogger) rotate() error { return err } -func (l *RotateLogger) write(v []byte) (int, error) { - if l.rule.ShallRotate(l.currentSize + int64(len(v))) { +func (l *RotateLogger) writeBuffer(buff *bytes.Buffer) (int64, error) { + if l.rule.ShallRotate(l.currentSize + int64(buff.Len())) { if err := l.rotate(); err != nil { log.Println(err) } else { @@ -241,11 +297,11 @@ func (l *RotateLogger) write(v []byte) (int, error) { return 0, nil } - size, err := l.fp.Write(v) + size, err := buff.WriteTo(l.fp) if err != nil { return size, err } - l.currentSize += int64(size) + l.currentSize += size return size, nil } @@ -270,7 +326,15 @@ func (l *RotateLogger) close() (err error) { // Close closes l. func (l *RotateLogger) Close() (err error) { + l.mu.Lock() + if l.closed { + l.mu.Unlock() + return nil + } + l.mu.Unlock() + l.closeOnce.Do(func() { + l.closed = true close(l.done) l.waitGroup.Wait() err = l.close() @@ -280,9 +344,15 @@ func (l *RotateLogger) Close() (err error) { } func (l *RotateLogger) Sync() error { - if l.fp != nil { - return l.fp.Sync() + l.mu.Lock() + if l.closed { + l.mu.Unlock() + return ErrClosedRollingFile } + l.mu.Unlock() + + l.syncFlush <- struct{}{} + <-l.syncFlush return nil } @@ -296,18 +366,6 @@ func compressLogFile(file string) { } } -func getNowDate() string { - return time.Now().Format(dateFormat) -} - -func getNowHour() string { - return time.Now().Format(hourFormat) -} - -func getNowDateInRFC3339Format() string { - return time.Now().Format(fileTimeFormat) -} - func gzipFile(file string, fsys FileSystem) (err error) { in, err := fsys.Open(file) if err != nil {