Skip to content

Commit

Permalink
Merge pull request #837 from synnaxlabs/sy-1245-fix-race-condition-in…
Browse files Browse the repository at this point in the history
…-auto-controller

Sy 1245 fix race condition in auto controller
  • Loading branch information
Lham42 committed Sep 26, 2024
2 parents 37e0fd9 + d7e5039 commit 890b5c6
Show file tree
Hide file tree
Showing 26 changed files with 91 additions and 33 deletions.
2 changes: 1 addition & 1 deletion alamos/py/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "alamos"
version = "0.29.0"
version = "0.30.0"
description = ""
authors = ["Emiliano Bonilla <ebonilla@synnaxlabs.com>"]
readme = "README.md"
Expand Down
2 changes: 1 addition & 1 deletion alamos/ts/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@synnaxlabs/alamos",
"version": "0.29.0",
"version": "0.30.0",
"type": "module",
"description": "Distributed instrumentation for Synnax",
"repository": "https://github.com/synnaxlabs/synnax/tree/main/freighter/ts",
Expand Down
4 changes: 3 additions & 1 deletion client/cpp/framer/streamer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ std::pair<Streamer, freighter::Error> FrameClient::openStreamer(
auto req = api::v1::FrameStreamerRequest();
config.toProto(req);
auto exc2 = s->send(req);
return {Streamer(std::move(s)), exc2};
if (exc2) return {Streamer(std::move(s)), exc2};
auto [_, resExc] = s->receive();
return {Streamer(std::move(s)), resExc};
}

Streamer::Streamer(std::unique_ptr<StreamerStream> s) : stream(std::move(s)) {
Expand Down
2 changes: 1 addition & 1 deletion client/py/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "synnax"
version = "0.29.0"
version = "0.30.0"
description = "Synnax Client Library"
keywords = ["Synnax", "Synnax Python Client"]
authors = ["emiliano bonilla <emilbon99@gmail.com>"]
Expand Down
21 changes: 16 additions & 5 deletions client/py/synnax/control/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from __future__ import annotations

from collections.abc import Callable
from threading import Event
from threading import Event, Lock
from typing import Any, Protocol, overload
from asyncio import create_task, Future

Expand Down Expand Up @@ -281,13 +281,13 @@ def _internal_wait_until(
raise ValueError("First argument to wait_until must be a callable.")
processor = WaitUntil(cond, reverse)
try:
self._receiver.processors.add(processor)
self._receiver.add_processor(processor)
timeout_seconds = (
TimeSpan.from_seconds(timeout).seconds if timeout else None
)
ok = processor.event.wait(timeout=timeout_seconds)
finally:
self._receiver.processors.remove(processor)
self._receiver.remove_processor(processor)
if processor.exc:
raise processor.exc
return ok
Expand Down Expand Up @@ -458,6 +458,7 @@ class _Receiver(AsyncThread):
client: framer.Client
streamer: framer.AsyncStreamer
processors: set[Processor]
processor_lock: Lock
retriever: ChannelRetriever
controller: Controller
startup_ack: Event
Expand All @@ -475,12 +476,22 @@ def __init__(
self.client = client
self.state = dict()
self.controller = controller
self.processor_lock = Lock()
self.startup_ack = Event()
self.processors = set()

def add_processor(self, processor: Processor):
with self.processor_lock:
self.processors.add(processor)

def remove_processor(self, processor: Processor):
with self.processor_lock:
self.processors.remove(processor)

def _process(self):
for p in self.processors:
p.process(self.controller)
with self.processor_lock:
for p in self.processors:
p.process(self.controller)

async def _listen_for_close(self):
await self.shutdown_future
Expand Down
7 changes: 7 additions & 0 deletions client/py/synnax/framer/streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ def __init__(

def __open(self):
self._stream.send(_Request(keys=self._adapter.keys))
_, exc = self._stream.receive()
if exc is not None:
raise exc


@overload
def read(self, timeout: float | int | TimeSpan) -> Frame | None:
Expand Down Expand Up @@ -201,6 +205,9 @@ def __init__(
async def _open(self):
self._stream = await self._client.stream(_ENDPOINT, _Request, _Response)
await self._stream.send(_Request(keys=self._adapter.keys))
_, exc = await self._stream.receive()
if exc is not None:
raise exc

@property
def received(self) -> bool:
Expand Down
1 change: 0 additions & 1 deletion client/py/tests/test_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,6 @@ def test_controller_channel_not_found(self, client: sy.Synnax):
v = auto[press_en.key]
assert v is None

@pytest.mark.focus
def test_controller_set_authority_mechanisms(self, client: sy.Synnax):
press_end_cmd_time, press_en_cmd, press_en, daq_time = create_valve_set(client)

Expand Down
8 changes: 8 additions & 0 deletions client/py/tests/test_framer.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,14 @@ def test_open_streamer_no_channels(self, client: sy.Synnax):
with client.open_streamer([]):
pass


@pytest.mark.focus
def test_open_streamer_channel_not_found(self, client: sy.Synnax):
"""Should throw an exception when a streamer is opened with an unknown channel"""
with pytest.raises(sy.NotFoundError):
with client.open_streamer([123]):
pass

def test_update_channels(self, channel: sy.Channel, client: sy.Synnax):
"""Should update the list of channels to stream"""
with client.open_streamer([]) as s:
Expand Down
1 change: 0 additions & 1 deletion client/py/tests/test_timing.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ def test_sleep(self):

assert sum(accumulated_precise) < sum(accumulated_standard)

@pytest.mark.focus
def test_sleep_rate(self):
"""Should sleep correctly based on a rate argument
"""
Expand Down
2 changes: 1 addition & 1 deletion client/ts/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@synnaxlabs/client",
"version": "0.29.0",
"version": "0.30.0",
"description": "The Synnax Client Library",
"keywords": [
"synnax",
Expand Down
3 changes: 3 additions & 0 deletions client/ts/src/framer/streamer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,7 @@ describe("Streamer", () => {
it("should not throw an error when the streamer is opened with zero channels", async () => {
await expect(client.openStreamer([])).resolves.not.toThrow();
});
it("should throw an error when the streamer is opened with a channel that does not exist", async () => {
await expect(client.openStreamer([5678])).rejects.toThrow("not found");
});
});
2 changes: 2 additions & 0 deletions client/ts/src/framer/streamer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ export class Streamer implements AsyncIterator<Frame>, AsyncIterable<Frame> {
const stream = await client.stream(ENDPOINT, reqZ, resZ);
const streamer = new Streamer(stream, adapter);
stream.send({ keys: adapter.keys });
const [, err] = await stream.receive();
if (err != null) throw err;
return streamer;
}

Expand Down
2 changes: 1 addition & 1 deletion console/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@synnaxlabs/console",
"private": true,
"version": "0.29.1",
"version": "0.30.0",
"type": "module",
"scripts": {
"dev": "tauri dev",
Expand Down
2 changes: 1 addition & 1 deletion drift/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@synnaxlabs/drift",
"version": "0.29.0",
"version": "0.30.0",
"description": "State synchronization and Redux state synchronization for Tauri Apps",
"repository": "https://github.com/synnaxlabs/synnax/tree/main/drift",
"type": "module",
Expand Down
2 changes: 1 addition & 1 deletion freighter/py/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "synnax-freighter"
version = "0.29.0"
version = "0.30.0"
description = ""
authors = ["emiliano bonilla <emilbon99@gmail.com>"]
packages = [
Expand Down
2 changes: 1 addition & 1 deletion freighter/ts/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@synnaxlabs/freighter",
"version": "0.29.0",
"version": "0.30.0",
"type": "module",
"description": "a modular transport abstraction",
"repository": "https://github.com/synnaxlabs/synnax/tree/main/freighter/ts",
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"version": "0.29.0",
"version": "0.30.0",
"private": true,
"scripts": {
"build": "npx turbo build --cache-dir=./.turbo-cache",
Expand Down
2 changes: 1 addition & 1 deletion pluto/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@synnaxlabs/pluto",
"version": "0.29.0",
"version": "0.30.0",
"type": "module",
"scripts": {
"build": "tsc --noEmit && vite build",
Expand Down
5 changes: 4 additions & 1 deletion synnax/pkg/api/framer.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,10 @@ func (s *FrameService) openStreamer(ctx context.Context, stream StreamerStream)
return nil, err
}
reader, err := s.Internal.NewStreamer(ctx, framer.StreamerConfig{Keys: req.Keys})
return reader, err
if err != nil {
return nil, err
}
return reader, stream.Send(framer.StreamerResponse{})
}

type FrameWriterConfig struct {
Expand Down
15 changes: 10 additions & 5 deletions synnax/pkg/distribution/framer/relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package relay

import (
"fmt"
"github.com/synnaxlabs/synnax/pkg/distribution/channel"
"time"

"github.com/synnaxlabs/alamos"
Expand All @@ -27,10 +28,11 @@ import (

type Config struct {
alamos.Instrumentation
Transport Transport
HostResolver core.HostResolver
TS *ts.DB
FreeWrites confluence.Outlet[Response]
Transport Transport
HostResolver core.HostResolver
TS *ts.DB
FreeWrites confluence.Outlet[Response]
ChannelReader channel.Readable
}

var (
Expand All @@ -45,6 +47,7 @@ func (c Config) Override(other Config) Config {
c.HostResolver = override.Nil(c.HostResolver, other.HostResolver)
c.TS = override.Nil(c.TS, other.TS)
c.FreeWrites = override.Nil(c.FreeWrites, other.FreeWrites)
c.ChannelReader = override.Nil(c.ChannelReader, other.ChannelReader)
return c
}

Expand All @@ -55,10 +58,12 @@ func (c Config) Validate() error {
validate.NotNil(v, "HostProvider", c.HostResolver)
validate.NotNil(v, "TS", c.TS)
validate.NotNil(v, "FreeWrites", c.FreeWrites)
validate.NotNil(v, "ChannelReader", c.ChannelReader)
return v.Error()
}

type Relay struct {
cfg Config
ins alamos.Instrumentation
delta *confluence.DynamicDeltaMultiplier[Response]
demands confluence.Inlet[demand]
Expand All @@ -75,7 +80,7 @@ func Open(configs ...Config) (*Relay, error) {
return nil, err
}

r := &Relay{ins: cfg.Instrumentation}
r := &Relay{cfg: cfg, ins: cfg.Instrumentation}

tpr := newTapper(cfg)
demands := confluence.NewStream[demand](defaultBuffer)
Expand Down
1 change: 1 addition & 0 deletions synnax/pkg/distribution/framer/relay/relay_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func provision(n int) (*mock.CoreBuilder, map[core.NodeKey]serviceContainer) {
TS: c.Storage.TS,
Transport: relayNet.New(c.Config.AdvertiseAddress),
HostResolver: c.Cluster,
ChannelReader: container.channel,
FreeWrites: freeWrites,
}))
container.writer = MustSucceed(writer.OpenService(writer.ServiceConfig{
Expand Down
14 changes: 14 additions & 0 deletions synnax/pkg/distribution/framer/relay/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/synnaxlabs/x/confluence"
"github.com/synnaxlabs/x/errors"
xio "github.com/synnaxlabs/x/io"
"github.com/synnaxlabs/x/query"
"github.com/synnaxlabs/x/signal"
"github.com/synnaxlabs/x/telem"
. "github.com/synnaxlabs/x/testutil"
Expand Down Expand Up @@ -99,6 +100,19 @@ var _ = Describe("Relay", func() {
})
}
})
Describe("Errors", func() {
It("Should raise an error if a channel is not found", func() {
builder, services := provision(1)
defer func() {
Expect(builder.Close()).To(Succeed())
}()
svc := services[1]
_, err := svc.relay.NewStreamer(context.TODO(), relay.StreamerConfig{
Keys: []channel.Key{12345},
})
Expect(err).To(HaveOccurredAs(query.NotFound))
})
})
})

func newChannelSet() []channel.Channel {
Expand Down
17 changes: 10 additions & 7 deletions synnax/pkg/distribution/framer/relay/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package relay

import (
"context"

"github.com/samber/lo"
"github.com/synnaxlabs/synnax/pkg/distribution/channel"
"github.com/synnaxlabs/x/address"
Expand All @@ -31,13 +32,15 @@ type streamer struct {

type StreamerConfig = Request

func (r *Relay) NewStreamer(_ context.Context, cfg StreamerConfig) (Streamer, error) {
return &streamer{
keys: lo.Uniq(cfg.Keys),
addr: address.Rand(),
demands: r.demands,
relay: r,
}, nil
func (r *Relay) NewStreamer(ctx context.Context, cfg StreamerConfig) (Streamer, error) {
keys := lo.Uniq(cfg.Keys)
// Check that all keys exist.
if err := r.cfg.ChannelReader.
NewRetrieve().
WhereKeys(keys...).Exec(ctx, nil); err != nil {
return nil, err
}
return &streamer{keys: keys, addr: address.Rand(), demands: r.demands, relay: r}, nil
}

// Flow implements confluence.Flow.
Expand Down
1 change: 1 addition & 0 deletions synnax/pkg/distribution/framer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ func Open(configs ...Config) (*Service, error) {
freeWrites := confluence.NewStream[relay.Response](25)
s.Relay, err = relay.Open(relay.Config{
Instrumentation: cfg.Instrumentation.Child("Relay"),
ChannelReader: cfg.ChannelReader,
TS: cfg.TS,
HostResolver: cfg.HostResolver,
Transport: cfg.Transport.Relay(),
Expand Down
2 changes: 1 addition & 1 deletion synnax/pkg/version/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.29.0
0.30.0
2 changes: 1 addition & 1 deletion x/ts/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@synnaxlabs/x",
"version": "0.29.0",
"version": "0.30.0",
"type": "module",
"description": "Common Utilities for Synnax Labs",
"repository": "https://github.com/synnaxlabs/synnax/tree/main/x/go",
Expand Down

0 comments on commit 890b5c6

Please sign in to comment.