Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions perf/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,92 @@ func (pr *Reader) Read() (Record, error) {

var errMustBePaused = fmt.Errorf("perf ringbuffer: must have been paused before reading overwritable buffer")

// EpollWait wraps the epoll waiting logic, similar to __reader_epoll_wait / reader_epoll_wait.
func (pr *Reader) EpollWait(d time.Duration) (int, error) {
pr.pauseMu.Lock()
defer pr.pauseMu.Unlock()

pr.SetDeadline(time.Now().Add(d))

nfds, err := pr.poller.Wait(pr.epollEvents, pr.deadline)
if errors.Is(err, os.ErrDeadlineExceeded) || errors.Is(err, ErrFlushed) {
pr.pendingErr = err
return -1, nil
}

// Re-validate pr.paused since we dropped pauseMu.
if pr.overwritable && !pr.paused {
return -1, errMustBePaused
}

// Waking up userspace is expensive; make the most out of it by checking
// all rings.
for _, ring := range pr.rings {
ring.loadHead()
pr.epollRings = append(pr.epollRings, ring)
}

return nfds, err
}

// ReadAllRings iterates through all ready rings and reads events,
// similar to reader_event_read.
func (pr *Reader) ReadAllRings() ([]*Record, error) {
pr.mu.Lock()
defer pr.mu.Unlock()

pr.pauseMu.Lock()
defer pr.pauseMu.Unlock()

if pr.overwritable && !pr.paused {
return nil, errMustBePaused
}

if pr.rings == nil {
return nil, fmt.Errorf("perf ringbuffer: %w", ErrClosed)
}

if len(pr.epollRings) == 0 {
return nil, fmt.Errorf("no epoll rings to read")
}

var recBatch []*Record

// Iterate over all epoll-ready rings.
for len(pr.epollRings) > 0 {
// Process the last ring in the list.
ering := pr.epollRings[len(pr.epollRings)-1]

// Read records from this ring until no more data is available.
for {
rec := &Record{}
err := pr.readRecordFromRing(rec, ering)

if err == errEOR || errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
// The current ring has been fully consumed; move to the next ring.
break
}

if err != nil {
// Real error; return immediately.
return nil, fmt.Errorf("failed to read record from ring (cpu=%d): %w", ering.cpu, err)
}

if rec.LostSamples != 0 {
fmt.Printf("cpu=%d lost samples: %d\n", rec.CPU, rec.LostSamples)
continue
}

recBatch = append(recBatch, rec)
}

// Pop the processed ring.
pr.epollRings = pr.epollRings[:len(pr.epollRings)-1]
}

return recBatch, nil
}

// ReadInto is like [Reader.Read] except that it allows reusing Record and associated buffers.
func (pr *Reader) ReadInto(rec *Record) error {
pr.mu.Lock()
Expand Down