-
Notifications
You must be signed in to change notification settings - Fork 1
/
recorder.go
129 lines (106 loc) · 2.83 KB
/
recorder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
// Copyright 2018 Dan Jacques. All rights reserved.
// Use of this source code is governed under the MIT License
// that can be found in the LICENSE file.
package replay
import (
"sync"
"time"
"github.com/danjacques/gopushpixels/device"
"github.com/danjacques/gopushpixels/protocol"
"github.com/danjacques/gopushpixels/replay/streamfile"
"github.com/pkg/errors"
)
// RecorderStatus is a point-in-time snapshot of a Recorder's state, as
// returned by Recorder.Status.
type RecorderStatus struct {
	// Name is the path reported by the active stream writer.
	Name string

	// Error is the receive error currently held by the Recorder, or nil if
	// there is none.
	Error error

	// Events and Bytes are the event and byte counts reported by the active
	// stream writer.
	Events, Bytes int64

	// Duration is the duration reported by the active stream writer.
	Duration time.Duration
}
// A Recorder handles the recording of packets to a stream writer.
//
// Each method locks mu before it reads or writes the other fields.
type Recorder struct {
// mu is held by every Recorder method while it touches the fields below.
mu sync.Mutex
// sw is the currently-active stream writer. It is nil when no recording is
// in progress: Start sets it and Stop clears it.
sw *streamfile.EventStreamWriter
// stopped is true if we've been stopped.
//
// NOTE(review): no method visible in this file reads or writes this field;
// confirm whether it is still needed.
stopped bool
// recvErr is the most recent unexpected error returned while writing a
// packet. Stop reports it (when Close itself succeeds) and then clears it.
recvErr error
}
// Start begins recording to sw.
//
// Recording stays active until Stop is called. The Recorder takes ownership
// of sw and closes it in Stop.
//
// Start panics with "already started" if a stream writer is already active.
func (r *Recorder) Start(sw *streamfile.EventStreamWriter) {
	r.mu.Lock()
	defer r.mu.Unlock()

	if active := r.sw; active != nil {
		panic("already started")
	}

	// Install the writer and mark one more recording as in progress.
	r.sw = sw
	recorderRecordingGauge.Inc()
}
// Stop ends the current recording: it closes the active stream writer and
// resets the Recorder so that Start may be called again.
//
// If no recording is active, Stop does nothing and returns nil. Otherwise it
// returns the error from closing the writer; if Close succeeded, it returns
// the receive error stored by RecordPacket (which may be nil).
func (r *Recorder) Stop() error {
	r.mu.Lock()
	defer r.mu.Unlock()

	writer := r.sw
	if writer == nil {
		return nil
	}

	// Finalize the recorded file before dropping our reference to it.
	closeErr := writer.Close()

	// Remember the stored receive error, then reset our state.
	pending := r.recvErr
	r.sw, r.recvErr = nil, nil
	recorderRecordingGauge.Dec()

	// A Close failure takes precedence over an earlier receive error.
	if closeErr != nil {
		return closeErr
	}
	return pending
}
// Status reports a snapshot of the recording in progress.
//
// It returns nil when the Recorder is not currently recording.
func (r *Recorder) Status() *RecorderStatus {
	r.mu.Lock()
	defer r.mu.Unlock()

	writer := r.sw
	if writer == nil {
		return nil
	}

	// Populate the snapshot from the writer's counters and our stored error.
	snapshot := new(RecorderStatus)
	snapshot.Name = writer.Path()
	snapshot.Error = r.recvErr
	snapshot.Events = writer.NumEvents()
	snapshot.Bytes = writer.NumBytes()
	snapshot.Duration = writer.Duration()
	return snapshot
}
// RecordPacket writes pkt, received from device d, to the active recording.
//
// Every call increments the recorder event counter. If no recording is active
// the packet is dropped and nil is returned. Otherwise the result of the
// write is returned: nil on success, or the write error. Errors whose cause
// is streamfile.ErrEncodingNotSupported are counted under the "encoding"
// label and returned without being stored; any other error is counted under
// the "unknown" label and stored so that Status and Stop can report it.
func (r *Recorder) RecordPacket(d device.D, pkt *protocol.Packet) error {
	recorderEvents.Inc()

	r.mu.Lock()
	defer r.mu.Unlock()

	// Not recording (never started, or already stopped): nothing to do.
	writer := r.sw
	if writer == nil {
		return nil
	}

	err := writer.WritePacket(d, pkt)
	cause := errors.Cause(err)

	// Write succeeded!
	if cause == nil {
		return nil
	}

	// The packet contained an unsupported data type. Count it and hand the
	// error back, but do not store it as the Recorder's receive error.
	if cause == streamfile.ErrEncodingNotSupported {
		recorderErrors.WithLabelValues("encoding").Inc()
		return err
	}

	// Anything else is unexpected: count it and store it for Status and Stop.
	recorderErrors.WithLabelValues("unknown").Inc()
	r.recvErr = err
	return err
}