Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sy 1245 fix race condition in auto controller #837

Merged
merged 10 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion client/cpp/framer/streamer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ std::pair<Streamer, freighter::Error> FrameClient::openStreamer(
auto req = api::v1::FrameStreamerRequest();
config.toProto(req);
auto exc2 = s->send(req);
return {Streamer(std::move(s)), exc2};
if (exc2) return {Streamer(std::move(s)), exc2};
Lham42 marked this conversation as resolved.
Show resolved Hide resolved
auto [_, resExc] = s->receive();
return {Streamer(std::move(s)), resExc};
}

Streamer::Streamer(std::unique_ptr<StreamerStream> s) : stream(std::move(s)) {
Expand Down
2 changes: 1 addition & 1 deletion client/py/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "synnax"
version = "0.29.0"
version = "0.29.1"
description = "Synnax Client Library"
keywords = ["Synnax", "Synnax Python Client"]
authors = ["emiliano bonilla <emilbon99@gmail.com>"]
Expand Down
21 changes: 16 additions & 5 deletions client/py/synnax/control/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from __future__ import annotations

from collections.abc import Callable
from threading import Event
from threading import Event, Lock
from typing import Any, Protocol, overload
from asyncio import create_task, Future

Expand Down Expand Up @@ -281,13 +281,13 @@ def _internal_wait_until(
raise ValueError("First argument to wait_until must be a callable.")
processor = WaitUntil(cond, reverse)
try:
self._receiver.processors.add(processor)
self._receiver.add_processor(processor)
timeout_seconds = (
TimeSpan.from_seconds(timeout).seconds if timeout else None
)
ok = processor.event.wait(timeout=timeout_seconds)
finally:
self._receiver.processors.remove(processor)
self._receiver.remove_processor(processor)
if processor.exc:
raise processor.exc
return ok
Expand Down Expand Up @@ -458,6 +458,7 @@ class _Receiver(AsyncThread):
client: framer.Client
streamer: framer.AsyncStreamer
processors: set[Processor]
processor_lock: Lock
retriever: ChannelRetriever
controller: Controller
startup_ack: Event
Expand All @@ -475,12 +476,22 @@ def __init__(
self.client = client
self.state = dict()
self.controller = controller
self.processor_lock = Lock()
self.startup_ack = Event()
self.processors = set()

def add_processor(self, processor: Processor):
with self.processor_lock:
self.processors.add(processor)

def remove_processor(self, processor: Processor):
with self.processor_lock:
self.processors.remove(processor)

def _process(self):
for p in self.processors:
p.process(self.controller)
with self.processor_lock:
for p in self.processors:
p.process(self.controller)

async def _listen_for_close(self):
await self.shutdown_future
Expand Down
7 changes: 7 additions & 0 deletions client/py/synnax/framer/streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ def __init__(

def __open(self):
self._stream.send(_Request(keys=self._adapter.keys))
_, exc = self._stream.receive()
if exc is not None:
raise exc


@overload
def read(self, timeout: float | int | TimeSpan) -> Frame | None:
Expand Down Expand Up @@ -201,6 +205,9 @@ def __init__(
async def _open(self):
self._stream = await self._client.stream(_ENDPOINT, _Request, _Response)
await self._stream.send(_Request(keys=self._adapter.keys))
_, exc = await self._stream.receive()
if exc is not None:
raise exc

@property
def received(self) -> bool:
Expand Down
1 change: 0 additions & 1 deletion client/py/tests/test_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,6 @@ def test_controller_channel_not_found(self, client: sy.Synnax):
v = auto[press_en.key]
assert v is None

@pytest.mark.focus
def test_controller_set_authority_mechanisms(self, client: sy.Synnax):
press_end_cmd_time, press_en_cmd, press_en, daq_time = create_valve_set(client)

Expand Down
8 changes: 8 additions & 0 deletions client/py/tests/test_framer.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,14 @@ def test_open_streamer_no_channels(self, client: sy.Synnax):
with client.open_streamer([]):
pass


@pytest.mark.focus
def test_open_streamer_channel_not_found(self, client: sy.Synnax):
"""Should throw an exception when a streamer is opened with an unknown channel"""
with pytest.raises(sy.NotFoundError):
with client.open_streamer([123]):
pass

def test_update_channels(self, channel: sy.Channel, client: sy.Synnax):
"""Should update the list of channels to stream"""
with client.open_streamer([]) as s:
Expand Down
1 change: 0 additions & 1 deletion client/py/tests/test_timing.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ def test_sleep(self):

assert sum(accumulated_precise) < sum(accumulated_standard)

@pytest.mark.focus
def test_sleep_rate(self):
"""Should sleep correctly based on a rate argument
"""
Expand Down
2 changes: 1 addition & 1 deletion client/ts/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@synnaxlabs/client",
"version": "0.29.0",
"version": "0.29.1",
"description": "The Synnax Client Library",
"keywords": [
"synnax",
Expand Down
3 changes: 3 additions & 0 deletions client/ts/src/framer/streamer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,7 @@ describe("Streamer", () => {
it("should not throw an error when the streamer is opened with zero channels", async () => {
await expect(client.openStreamer([])).resolves.not.toThrow();
});
it("should throw an error when the streamer is opened with a channel that does not exist", async () => {
await expect(client.openStreamer([5678])).rejects.toThrow("not found");
});
});
2 changes: 2 additions & 0 deletions client/ts/src/framer/streamer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ export class Streamer implements AsyncIterator<Frame>, AsyncIterable<Frame> {
const stream = await client.stream(ENDPOINT, reqZ, resZ);
const streamer = new Streamer(stream, adapter);
stream.send({ keys: adapter.keys });
const [, err] = await stream.receive();
if (err != null) throw err;
return streamer;
}

Expand Down
2 changes: 1 addition & 1 deletion console/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@synnaxlabs/console",
"private": true,
"version": "0.29.1",
"version": "0.29.2",
"type": "module",
"scripts": {
"dev": "tauri dev",
Expand Down
5 changes: 4 additions & 1 deletion synnax/pkg/api/framer.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,10 @@ func (s *FrameService) openStreamer(ctx context.Context, stream StreamerStream)
return nil, err
}
reader, err := s.Internal.NewStreamer(ctx, framer.StreamerConfig{Keys: req.Keys})
return reader, err
if err != nil {
return nil, err
}
return reader, stream.Send(framer.StreamerResponse{})
}

type FrameWriterConfig struct {
Expand Down
15 changes: 10 additions & 5 deletions synnax/pkg/distribution/framer/relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package relay

import (
"fmt"
"github.com/synnaxlabs/synnax/pkg/distribution/channel"
"time"

"github.com/synnaxlabs/alamos"
Expand All @@ -27,10 +28,11 @@ import (

type Config struct {
alamos.Instrumentation
Transport Transport
HostResolver core.HostResolver
TS *ts.DB
FreeWrites confluence.Outlet[Response]
Transport Transport
HostResolver core.HostResolver
TS *ts.DB
FreeWrites confluence.Outlet[Response]
ChannelReader channel.Readable
}

var (
Expand All @@ -45,6 +47,7 @@ func (c Config) Override(other Config) Config {
c.HostResolver = override.Nil(c.HostResolver, other.HostResolver)
c.TS = override.Nil(c.TS, other.TS)
c.FreeWrites = override.Nil(c.FreeWrites, other.FreeWrites)
c.ChannelReader = override.Nil(c.ChannelReader, other.ChannelReader)
return c
}

Expand All @@ -55,10 +58,12 @@ func (c Config) Validate() error {
validate.NotNil(v, "HostProvider", c.HostResolver)
validate.NotNil(v, "TS", c.TS)
validate.NotNil(v, "FreeWrites", c.FreeWrites)
validate.NotNil(v, "ChannelReader", c.ChannelReader)
return v.Error()
}

type Relay struct {
cfg Config
ins alamos.Instrumentation
delta *confluence.DynamicDeltaMultiplier[Response]
demands confluence.Inlet[demand]
Expand All @@ -75,7 +80,7 @@ func Open(configs ...Config) (*Relay, error) {
return nil, err
}

r := &Relay{ins: cfg.Instrumentation}
r := &Relay{cfg: cfg, ins: cfg.Instrumentation}

tpr := newTapper(cfg)
demands := confluence.NewStream[demand](defaultBuffer)
Expand Down
1 change: 1 addition & 0 deletions synnax/pkg/distribution/framer/relay/relay_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func provision(n int) (*mock.CoreBuilder, map[core.NodeKey]serviceContainer) {
TS: c.Storage.TS,
Transport: relayNet.New(c.Config.AdvertiseAddress),
HostResolver: c.Cluster,
ChannelReader: container.channel,
FreeWrites: freeWrites,
}))
container.writer = MustSucceed(writer.OpenService(writer.ServiceConfig{
Expand Down
14 changes: 7 additions & 7 deletions synnax/pkg/distribution/framer/relay/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@ type streamer struct {

type StreamerConfig = Request

func (r *Relay) NewStreamer(_ context.Context, cfg StreamerConfig) (Streamer, error) {
return &streamer{
keys: lo.Uniq(cfg.Keys),
addr: address.Rand(),
demands: r.demands,
relay: r,
}, nil
func (r *Relay) NewStreamer(ctx context.Context, cfg StreamerConfig) (Streamer, error) {
keys := lo.Uniq(cfg.Keys)
// Check that all keys exist.
if err := r.cfg.ChannelReader.NewRetrieve().WhereKeys(keys...).Exec(ctx, nil); err != nil {
Lham42 marked this conversation as resolved.
Show resolved Hide resolved
return nil, err
}
return &streamer{keys: keys, addr: address.Rand(), demands: r.demands, relay: r}, nil
}

// Flow implements confluence.Flow.
Expand Down
1 change: 1 addition & 0 deletions synnax/pkg/distribution/framer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ func Open(configs ...Config) (*Service, error) {
freeWrites := confluence.NewStream[relay.Response](25)
s.Relay, err = relay.Open(relay.Config{
Instrumentation: cfg.Instrumentation.Child("Relay"),
ChannelReader: cfg.ChannelReader,
TS: cfg.TS,
HostResolver: cfg.HostResolver,
Transport: cfg.Transport.Relay(),
Expand Down
2 changes: 1 addition & 1 deletion synnax/pkg/version/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.29.0
0.29.1
Loading