Skip to content

Commit

Permalink
Merge pull request #723 from synnaxlabs/eislam/sy-964-server-panic-ni…
Browse files Browse the repository at this point in the history
…-disconnect

sy-964 server panic ni disconnect
  • Loading branch information
Lham42 authored Jul 15, 2024
2 parents 86ec58a + 957bcf5 commit 0d374b8
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 4 deletions.
5 changes: 5 additions & 0 deletions cesium/internal/core/frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func (f Frame) SquashSameKeyData(key ChannelKey) (data []byte) {
return
}

// Len is meant for testing use only.
func (f Frame) Len() int64 {
f.assertEven("Len")
if len(f.Series) == 0 {
Expand All @@ -106,6 +107,10 @@ func (f Frame) Len() int64 {
return f.Series[0].Len()
}

func (f Frame) Empty() bool {
return len(f.Series) == 0
}

func (f Frame) assertEven(method string) {
if !f.Even() {
panic("[telem] - cannot call " + method + " on uneven frame")
Expand Down
5 changes: 4 additions & 1 deletion cesium/writer_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func (w *idxWriter) Write(fr Frame) (Frame, error) {
key := fr.Keys[i]
uWriter, ok := w.internal[key]

if !ok {
if !ok || series.Len() == 0 {
continue
}

Expand All @@ -370,6 +370,9 @@ func (w *idxWriter) Write(fr Frame) (Frame, error) {
}

func (w *idxWriter) Commit(ctx context.Context) (telem.TimeStamp, error) {
if w.sampleCount == 0 {
return w.start, nil
}
end, err := w.resolveCommitEnd(ctx)
if err != nil {
return 0, err
Expand Down
32 changes: 32 additions & 0 deletions cesium/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,38 @@ var _ = Describe("Writer Behavior", func() {
Expect(f.Series[2].Data).To(Equal(telem.NewSeriesV[int64](100, 105, 110, 115, 120, 125, 130, 135, 140, 145).Data))
Expect(f.Series[3].Data).To(Equal(telem.NewSeriesV[int64](100, 105, 110, 115, 120, 125, 130, 135, 140, 145).Data))
})
It("Should not write an empty frame", func() {

var (
rate1 = GenerateChannelKey()
rate2 = GenerateChannelKey()
)
By("Creating a channel")
Expect(db.CreateChannel(
ctx,
cesium.Channel{Key: rate1, Rate: 2 * telem.Hz, DataType: telem.Int64T},
cesium.Channel{Key: rate2, Rate: 2 * telem.Hz, DataType: telem.Int64T},
)).To(Succeed())
w := MustSucceed(db.OpenWriter(ctx, cesium.WriterConfig{
Channels: []cesium.ChannelKey{rate1, rate2},
Start: 10 * telem.SecondTS,
}))

By("Writing data")
ok := w.Write(cesium.NewFrame(
[]cesium.ChannelKey{rate1, rate2},
[]telem.Series{
{DataType: "int64"},
{DataType: "int64"},
},
))
Expect(ok).To(BeTrue())
end, ok := w.Commit()
Expect(ok).To(BeTrue())
Expect(end).To(Equal(10 * telem.SecondTS))

Expect(w.Close()).To(Succeed())
})
})
Describe("Auto-commit", func() {
Describe("Indexed channels", func() {
Expand Down
6 changes: 3 additions & 3 deletions x/go/telem/series.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (s Series) Split() [][]byte {
return o
}

func ValueAt[T types.Numeric](a Series, i int64) T {
b := a.Data[i*int64(a.DataType.Density()) : (i+1)*int64(a.DataType.Density())]
return UnmarshalF[T](a.DataType)(b)
func ValueAt[T types.Numeric](s Series, i int64) T {
b := s.Data[i*int64(s.DataType.Density()) : (i+1)*int64(s.DataType.Density())]
return UnmarshalF[T](s.DataType)(b)
}

0 comments on commit 0d374b8

Please sign in to comment.