Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ebonilla/sy 749 opc ua read task cleanup and tests #640

Merged
merged 41 commits into from
Jun 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
abca46a
[cesium] - added test case for ErrOnUnauthorized
LeonLiur May 28, 2024
6dffeca
[synnax] - added service support for ErrOnUnauthorized
LeonLiur May 28, 2024
5fac362
[client] - Added tests and support for ErrOnUnauthorized
LeonLiur May 28, 2024
2f6096f
[ops] - Bumped VERSION
LeonLiur May 28, 2024
8e6fdb2
[client] - added error on authorized parameter to writer config
Lham42 Jun 1, 2024
2447422
Merge branch 'main' of https://github.com/synnaxlabs/synnax into 585-…
emilbon99 Jun 3, 2024
a878a78
[driver/opc] - frame refactoring
emilbon99 Jun 3, 2024
845ed3d
Merge branch 'synnax-21-rc' of https://github.com/synnaxlabs/synnax i…
emilbon99 Jun 5, 2024
7b3bd54
[client/cpp] - implement test cases for error on unauthorized
emilbon99 Jun 5, 2024
af15bae
[client/cpp] - added writer config defaults
emilbon99 Jun 5, 2024
bb11bf9
[pipeline] - refactoring and test cases for accurate behaviro
emilbon99 Jun 6, 2024
3b4a2d4
[driver] - made some touchups to the OPC read task
emilbon99 Jun 6, 2024
93543bb
[driver] - implemented basic data saving
emilbon99 Jun 6, 2024
ddca8e3
[cesium] - fixed a number of race conditions
emilbon99 Jun 6, 2024
c42852c
[cesium] - removed unsafe property access from commit
emilbon99 Jun 6, 2024
1136f44
[cesium] - added error checkon fs rename during channel delete
emilbon99 Jun 6, 2024
ff6558e
[cesium] - fixed typo
emilbon99 Jun 6, 2024
a69d39a
[framer/api] - added documentation to err on unauthorized
emilbon99 Jun 6, 2024
2798298
[cesium] - removed mutex lock from directory removal on cahnnel deletion
emilbon99 Jun 6, 2024
7aa4e05
Merge branch 'synnax-21-rc' of https://github.com/synnaxlabs/synnax i…
emilbon99 Jun 6, 2024
c536471
[cesium] - minor touchups related to control and mutex locking
emilbon99 Jun 6, 2024
1f1c0fd
Merge branch '585-sy-693-support-for-cesium-writer-err-on-unauthorize…
emilbon99 Jun 6, 2024
6ebdf91
Merge branch 'synnax-21-rc' of https://github.com/synnaxlabs/synnax i…
emilbon99 Jun 6, 2024
1f5c005
[driver] - refactoring and documentation
emilbon99 Jun 7, 2024
0f00f60
Merge branch 'synnax-21-rc' of https://github.com/synnaxlabs/synnax i…
emilbon99 Jun 12, 2024
5d305da
[driver] - tuning
emilbon99 Jun 12, 2024
b859f99
Merge branch 'rc' of https://github.com/synnaxlabs/synnax into ebonil…
emilbon99 Jun 14, 2024
95d8623
[client/cpp] - fixed issues with series copy constructor by removing it
emilbon99 Jun 14, 2024
17ca2f8
[driver] - more minor touch ups
emilbon99 Jun 14, 2024
c3a292a
[client/py] - black reformatting
emilbon99 Jun 14, 2024
4dcdf7f
[state/tracker] - fixed issue with test cases
emilbon99 Jun 14, 2024
fe6a4e8
Merge branch 'rc' of https://github.com/synnaxlabs/synnax into ebonil…
emilbon99 Jun 14, 2024
71aca51
[cesium] - resolved issues with virtual writer test cases
emilbon99 Jun 14, 2024
06dd7fc
[driver] - resolved build issues on ubuntu
emilbon99 Jun 14, 2024
36a2c4c
[driver] - resolved build issues on ubuntu
emilbon99 Jun 14, 2024
c918cca
[driver] - resolved build issues on ubuntu
emilbon99 Jun 14, 2024
d561ba9
[client/cpp] - minor refactoring and import sorting
emilbon99 Jun 14, 2024
8985eb0
[driver] - fixed client tests and updated loop tests in driver
emilbon99 Jun 14, 2024
6d26177
[ops] - added client cpp tests to CI
emilbon99 Jun 14, 2024
6efc309
Merge branch 'rc' of https://github.com/synnaxlabs/synnax into ebonil…
emilbon99 Jun 14, 2024
aedfedd
[client/cpp] - updated chrono include
emilbon99 Jun 14, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion .github/workflows/test.client.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ jobs:
with:
filters: |
changed:
- "client/py/**"
- "client/**"
- "freighter/py/**"
- "synnax/**"
- "cesium/**"
Expand Down Expand Up @@ -194,3 +194,37 @@ jobs:
password: ${{ secrets.GITHUB_TOKEN }}
ports:
- 9090:9090

test-cpp:
needs: [setup, build-server]
if: needs.setup.outputs.changed == 'true'
runs-on: ubuntu-latest
steps:
- name: Checkout Repository
uses: actions/checkout@v3

- name: Setup Bazel
uses: bazel-contrib/setup-bazel@0.8.1
with:
bazelisk-cache: true
disk-cache: ${{ github.workflow }}
repository-cache: true

- name: Test
run: bazel test //client/...
services:
synnax:
image: ghcr.io/synnaxlabs/synnax:${{ github.sha }}
env:
SYNNAX_LISTEN: localhost:9090
SYNNAX_VERBOSE: true
SYNNAX_INSECURE: true
SYNNAX_MEM: true
SYNNAX_LICENSE_KEY: ${{ secrets.SYNNAX_LICENSE_KEY }}

credentials:
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
ports:
- 9090:9090

16 changes: 10 additions & 6 deletions cesium/internal/unary/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@
// closed stores whether the writer is closed. Operations like Write and Commit do not
// succeed on closed writers.
closed bool
// virtualAlignment tracks the alignment of the writer when persist is off.
virtualAlignment uint32
}

func (db *DB) OpenWriter(ctx context.Context, cfgs ...WriterConfig) (w *Writer, transfer controller.Transfer, err error) {
Expand All @@ -139,11 +141,13 @@
if err != nil {
return nil, transfer, err
}
w = &Writer{WriterConfig: cfg,
w = &Writer{
WriterConfig: cfg,
Channel: db.Channel,
idx: db.index(),
decrementCounter: func() { db.entityCount.add(-1) },
wrapError: db.wrapError,
virtualAlignment: 0,
}
gateCfg := controller.GateConfig{
TimeRange: cfg.controlTimeRange(),
Expand Down Expand Up @@ -206,23 +210,23 @@
if err = w.Channel.ValidateSeries(series); err != nil {
return 0, w.wrapError(err)
}
// ok signifies whether w is allowed to write.
dw, err := w.control.Authorize()
if err != nil {
return 0, err
return 0, w.wrapError(err)
}
a = telem.NewAlignmentPair(math.MaxUint32, uint32(w.len(dw.Writer)))
if w.Channel.IsIndex {
w.updateHwm(series)
}
if *w.Persist {
a = telem.NewAlignmentPair(math.MaxUint32, uint32(w.len(dw.Writer)))
_, err = dw.Write(series.Data)
} else {
a = telem.NewAlignmentPair(math.MaxUint32, w.virtualAlignment)
w.virtualAlignment += uint32(series.Len())

Check warning on line 225 in cesium/internal/unary/writer.go

View check run for this annotation

Codecov / codecov/patch

cesium/internal/unary/writer.go#L224-L225

Added lines #L224 - L225 were not covered by tests
}
return a, w.wrapError(err)
}

func (w *Writer) SetPersist(persist bool) { w.Persist = config.Bool(persist) }

func (w *Writer) SetAuthority(a control.Authority) controller.Transfer {
return w.control.SetAuthority(a)
}
Expand Down
37 changes: 35 additions & 2 deletions cesium/internal/virtual/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@
"github.com/synnaxlabs/cesium/internal/meta"
"github.com/synnaxlabs/cesium/internal/version"
"github.com/synnaxlabs/x/binary"
"github.com/synnaxlabs/x/config"
"github.com/synnaxlabs/x/control"
"github.com/synnaxlabs/x/errors"
xfs "github.com/synnaxlabs/x/io/fs"
"github.com/synnaxlabs/x/override"
"github.com/synnaxlabs/x/telem"
"github.com/synnaxlabs/x/validate"
"sync"
"sync/atomic"
)
Expand Down Expand Up @@ -58,8 +62,37 @@
Channel core.Channel
}

func Open(cfg Config) (db *DB, err error) {
c, err := controller.New[*controlEntity](controller.Config{Concurrency: cfg.Channel.Concurrency, Instrumentation: cfg.Instrumentation})
var (
_ config.Config[Config] = Config{}
DefaultConfig = Config{}
)

// Validate implements config.Config.
func (cfg Config) Validate() error {
v := validate.New("cesium.virtual")
validate.NotNil(v, "FS", cfg.FS)
return v.Error()
}

// Override implements config.Config.
func (cfg Config) Override(other Config) Config {
cfg.FS = override.Nil(cfg.FS, other.FS)
if cfg.Channel.Key == 0 {
cfg.Channel = other.Channel
}
cfg.Instrumentation = override.Zero(cfg.Instrumentation, other.Instrumentation)
return cfg
}

func Open(configs ...Config) (db *DB, err error) {
cfg, err := config.New(DefaultConfig, configs...)
if err != nil {
return nil, err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wrap

}

Check warning on line 91 in cesium/internal/virtual/db.go

View check run for this annotation

Codecov / codecov/patch

cesium/internal/virtual/db.go#L90-L91

Added lines #L90 - L91 were not covered by tests
c, err := controller.New[*controlEntity](controller.Config{
Concurrency: control.Shared,
Instrumentation: cfg.Instrumentation,
})
if err != nil {
return nil, err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wrap

}
Expand Down
4 changes: 3 additions & 1 deletion cesium/internal/virtual/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/synnaxlabs/cesium/internal/virtual"
"github.com/synnaxlabs/x/config"
"github.com/synnaxlabs/x/control"
xfs "github.com/synnaxlabs/x/io/fs"
"github.com/synnaxlabs/x/telem"
. "github.com/synnaxlabs/x/testutil"
)
Expand All @@ -20,6 +21,7 @@ var _ = Describe("Write", func() {
DataType: telem.TimeStampT,
Virtual: true,
},
FS: xfs.NewMem(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why only mem FS?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FS implementation is still checked for in the config but does not apply to any test cases. Feel free to adjust as you build out future test suites

}))
})
AfterEach(func() {
Expand All @@ -37,7 +39,7 @@ var _ = Describe("Write", func() {
Expect(t.Occurred()).To(BeTrue())
w2, t, err := db.OpenWriter(ctx, virtual.WriterConfig{
Start: 10 * telem.SecondTS,
Authority: control.Absolute,
Authority: control.Absolute - 1,
Subject: control.Subject{Key: "bar"},
ErrOnUnauthorized: config.True(),
})
Expand Down
5 changes: 1 addition & 4 deletions cesium/internal/virtual/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,7 @@ type Writer struct {
WriterConfig
}

func (db *DB) OpenWriter(
_ context.Context,
cfgs ...WriterConfig,
) (w *Writer, transfer controller.Transfer, err error) {
func (db *DB) OpenWriter(_ context.Context, cfgs ...WriterConfig) (w *Writer, transfer controller.Transfer, err error) {
if db.closed.Load() {
err = dbClosed
return nil, transfer, db.wrapError(err)
Expand Down
19 changes: 0 additions & 19 deletions cesium/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,25 +91,6 @@ func (w *Writer) Error() error {
return errors.New(unexpectedSteamClosure)
}

func (w *Writer) SetMode(mode WriterMode) bool {
if w.closed || w.hasAccumulatedErr {
return false
}
select {
case <-w.responses.Outlet():
w.hasAccumulatedErr = true
return false
case w.requests.Inlet() <- WriterRequest{Command: WriterSetMode, Config: WriterConfig{Mode: mode}}:
}
for res := range w.responses.Outlet() {
if res.Command == WriterSetMode {
return res.Ack
}
}
w.logger.DPanic(unexpectedSteamClosure)
return false
}

func (w *Writer) Close() (err error) {
if w.closed {
return nil
Expand Down
15 changes: 0 additions & 15 deletions cesium/writer_behavior_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1035,21 +1035,6 @@ var _ = Describe("Writer Behavior", func() {
Expect(frame.Series).To(HaveLen(0))
tsFrame := MustSucceed(db.Read(ctx, telem.TimeRangeMax, basic1Index))
Expect(tsFrame.Series).To(HaveLen(0))

By("Using SetMode to change the mode to persist")
Expect(w.SetMode(cesium.WriterPersistStream)).To(BeTrue())
ok = w.Write(cesium.NewFrame(
[]cesium.ChannelKey{basic1Index, basic1},
[]telem.Series{
telem.NewSecondsTSV(10, 11, 12, 13),
telem.NewSeriesV[int64](1, 2, 3, 4),
}),
)
Expect(ok).To(BeTrue())
end, ok = w.Commit()
Expect(ok).To(BeTrue())
Expect(end).To(Equal(13*telem.SecondTS + 1))

Expect(w.Close()).To(Succeed())
})
})
Expand Down
24 changes: 2 additions & 22 deletions cesium/writer_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

import (
"context"
"fmt"
"github.com/synnaxlabs/cesium/internal/controller"
"github.com/synnaxlabs/cesium/internal/core"
"github.com/synnaxlabs/cesium/internal/index"
Expand All @@ -37,9 +36,6 @@
WriterError
// WriterSetAuthority represents a call to Writer.SetAuthority.
WriterSetAuthority
// WriterSetMode sets the operating WriterMode for the Writer. See the WriterMode
// documentation for more.
WriterSetMode
)

// WriterRequest is a request containing an arrow.Record to write to the DB.
Expand Down Expand Up @@ -126,8 +122,8 @@
}

func (w *streamWriter) process(ctx context.Context, req WriterRequest) {
if req.Command < WriterWrite || req.Command > WriterSetMode {
panic(fmt.Sprintf("invalid command %v", req.Command))
if req.Command < WriterWrite || req.Command > WriterSetAuthority {
return

Check warning on line 126 in cesium/writer_stream.go

View check run for this annotation

Codecov / codecov/patch

cesium/writer_stream.go#L126

Added line #L126 was not covered by tests
}
if req.Command == WriterError {
w.seqNum++
Expand All @@ -141,12 +137,6 @@
w.sendRes(req, true, nil, 0)
return
}
if req.Command == WriterSetMode {
w.seqNum++
w.setMode(req.Config)
w.sendRes(req, true, nil, 0)
return
}
if w.err != nil {
w.seqNum++
w.sendRes(req, false, nil, 0)
Expand Down Expand Up @@ -216,16 +206,6 @@
}
}

func (w *streamWriter) setMode(cfg WriterConfig) {
persist := cfg.Mode < WriterStreamOnly
for _, idx := range w.internal {
for _, chW := range idx.internal {
chW.SetPersist(persist)
}
}
w.Mode = cfg.Mode
}

func (w *streamWriter) sendRes(req WriterRequest, ack bool, err error, end telem.TimeStamp) {
w.Out.Inlet() <- WriterResponse{
Command: req.Command,
Expand Down
6 changes: 3 additions & 3 deletions client/cpp/.vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"files.associations": {
"memory": "cpp"
}
"files.associations": {
"memory": "cpp"
}
}
3 changes: 2 additions & 1 deletion client/cpp/auth/auth.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@
#pragma once

#include <string>

#include "client/cpp/errors/errors.h"
#include "freighter/cpp/freighter.h"
#include "synnax/pkg/api/grpc/v1/synnax/pkg/api/grpc/v1/auth.pb.h"
#include "client/cpp/errors/errors.h"

/// Auth meta data key. NOTE: This must be lowercase, GRPC will panic on capitalized
/// or uppercase keys.
Expand Down
65 changes: 5 additions & 60 deletions client/cpp/auth/auth_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
// License, use of this software will be governed by the Apache License, Version 2.0,
// included in the file licenses/APL.txt.

#include <gtest/gtest.h>
#include <memory>
#include "client/cpp/auth/auth.h"
#include "synnax/pkg/api/grpc/v1/synnax/pkg/api/grpc/v1/auth.pb.h"

#include <memory>
#include <gtest/gtest.h>

#include "freighter/cpp/mock/mock.h"
#include "synnax/pkg/api/grpc/v1/synnax/pkg/api/grpc/v1/auth.pb.h"

/// @brief it should correctly authenticate with a Synnax cluster.
TEST(TestAuth, testLoginHappyPath) {
Expand Down Expand Up @@ -54,61 +56,4 @@ TEST(TestAuth, testLoginInvalidCredentials) {
auto [r, err] = mock_client.send("", v);
EXPECT_TRUE(err) << err.message();
EXPECT_TRUE(err.matches(synnax::INVALID_CREDENTIALS));
}

/// @brief it should retry authentication if the authentication token is invalid.
TEST(TestAuth, testLoginRetry) {
auto res = api::v1::LoginResponse();
res.set_token("abc");
auto mock_login_client = std::make_unique<MockUnaryClient<
api::v1::LoginRequest,
api::v1::LoginResponse
>>(
std::vector<api::v1::LoginResponse>{res, res},
std::vector<freighter::Error>{freighter::NIL, freighter::NIL}
);
auto mw = std::make_shared<AuthMiddleware>(
std::move(mock_login_client),
"synnax",
"seldon",
3
);
auto mock_client = MockUnaryClient<int, int>{
{1, 1},
{freighter::Error(synnax::INVALID_TOKEN, ""), freighter::NIL}
};
mock_client.use(mw);
auto v = 1;
auto [r, err] = mock_client.send("", v);
EXPECT_FALSE(err) << err.message();
EXPECT_TRUE(err.matches(freighter::NIL));
}

/// @brief it should return an invalid token error if the maximum number of retries is exceeded.
TEST(TestAuth, testExceedMaxRetries) {
auto res = api::v1::LoginResponse();
res.set_token("abc");
auto mock_login_client = std::make_unique<MockUnaryClient<
api::v1::LoginRequest,
api::v1::LoginResponse
>>(
std::vector<api::v1::LoginResponse>{res, res, res},
std::vector<freighter::Error>{freighter::NIL, freighter::NIL, freighter::NIL}
);
auto mw = std::make_shared<AuthMiddleware>(
std::move(mock_login_client),
"synnax",
"seldon",
2
);
auto invalid_token = freighter::Error(synnax::INVALID_TOKEN, "");
auto mock_client = MockUnaryClient<int, int>{
{1, 1, 1},
{invalid_token, invalid_token, invalid_token}
};
mock_client.use(mw);
auto v = 1;
auto [r, err] = mock_client.send("", v);
EXPECT_TRUE(err) << err.message();
EXPECT_TRUE(err.matches(synnax::INVALID_TOKEN));
}
Loading
Loading