Skip to content

Commit 235fa11

Browse files
committed
protect against data loss by not destroying pundat when btrdb is unreachable
1 parent 0a48eb1 commit 235fa11

File tree

5 files changed

+36
-21
lines changed

5 files changed

+36
-21
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
APP?=pundat
2-
RELEASE?=0.3.3
2+
RELEASE?=0.3.4
33
COMMIT?=$(shell git rev-parse --short HEAD)
44
PROJECT?=github.com/gtfierro/pundat
55
PERSISTDIR?=/etc/pundat

archiver/archiver.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ func (a *Archiver) Serve() {
130130
}()
131131
for _, namespace := range a.config.BOSSWAVE.ListenNS {
132132
go a.vm.subscribeNamespace(ctx, namespace)
133-
time.Sleep(10 * time.Second)
133+
time.Sleep(2 * time.Second)
134134
}
135135

136136
<-a.stop

archiver/btrdb.v4.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313
"gopkg.in/btrdb.v4"
1414
)
1515

16-
var timeout = time.Second * 20
16+
var timeout = time.Second * 60
1717

1818
var errStreamNotExist = errors.New("Stream does not exist")
1919

@@ -36,7 +36,8 @@ func newBTrDBv4(c *btrdbv4Config) *btrdbv4Iface {
3636
log.Noticef("Connecting to BtrDBv4 at addresses %v...", b.addresses)
3737
conn, err := btrdb.Connect(context.Background(), b.addresses...)
3838
if err != nil {
39-
log.Fatalf("Could not connect to btrdbv4: %v", err)
39+
log.Warningf("Could not connect to btrdbv4: %v", err)
40+
return nil
4041
}
4142
b.conn = conn
4243
log.Notice("Connected to BtrDB!")

archiver/stream.go

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -80,20 +80,20 @@ func (s *Stream) initialize(timeseriesStore TimeseriesStore, metadataStore Metad
8080
ts := s.timeseries[msg.URI]
8181
s.RUnlock()
8282

83-
ts.Lock()
83+
commitme := ts.Copy()
8484
// if no readings, then we give up
85-
if len(ts.Records) == 0 {
86-
ts.Unlock()
85+
if len(commitme.Records) == 0 {
8786
continue
8887
}
8988
// now we can assume the stream exists and can write to it
90-
if err := timeseriesStore.AddReadings(ts); err != nil {
91-
log.Error(errors.Wrap(err, "Could not write timeseries reading (probably deadline exceeded)"), len(ts.Records))
92-
ts.Unlock()
89+
if err := timeseriesStore.AddReadings(commitme); err != nil {
90+
log.Error(errors.Wrap(err, "Could not write timeseries reading (probably deadline exceeded)"), len(commitme.Records))
9391
continue
9492
}
95-
ts.Records = []*common.TimeseriesReading{}
93+
ts.Lock()
94+
ts.Records = ts.Records[len(commitme.Records):]
9695
ts.Unlock()
96+
9797
s.Lock()
9898
s.timeseries[msg.URI] = ts
9999
s.Unlock()
@@ -219,16 +219,17 @@ func (s *Stream) start(timeseriesStore TimeseriesStore, metadataStore MetadataSt
219219
ts.Records = append(ts.Records, &common.TimeseriesReading{Time: timestamp, Value: value_f64})
220220
ts.Unlock()
221221
}
222-
ts.Lock()
223-
if len(ts.Records) > commitCount {
224-
// now we can assume the stream exists and can write to it
225-
if err := timeseriesStore.AddReadings(ts); err != nil {
226-
log.Error(errors.Wrapf(err, "Could not write timeseries reading %+v", ts))
227-
} else {
228-
ts.Records = []*common.TimeseriesReading{}
229-
}
230-
}
231-
ts.Unlock()
222+
//ts.Lock()
223+
//if len(ts.Records) > commitCount {
224+
// // now we can assume the stream exists and can write to it
225+
// if err := timeseriesStore.AddReadings(ts); err != nil {
226+
// //TODO: when server is degraded, need to reconnect?
227+
// log.Error(errors.Wrapf(err, "Could not write timeseries reading %+v", ts))
228+
// } else {
229+
// ts.Records = []*common.TimeseriesReading{}
230+
// }
231+
//}
232+
//ts.Unlock()
232233
s.Lock()
233234
s.timeseries[msg.URI] = ts
234235
s.Unlock()

common/timeseries.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,19 @@ type Timeseries struct {
4444
UUID UUID
4545
}
4646

47+
func (ts Timeseries) Copy() Timeseries {
48+
ts.Lock()
49+
newts := Timeseries{
50+
Generation: ts.Generation,
51+
SrcURI: ts.SrcURI,
52+
UUID: ts.UUID,
53+
Records: make([]*TimeseriesReading, len(ts.Records)),
54+
}
55+
copy(newts.Records, ts.Records)
56+
ts.Unlock()
57+
return newts
58+
}
59+
4760
// sort by timestamp
4861
func (ts Timeseries) Len() int {
4962
return len(ts.Records)

0 commit comments

Comments
 (0)