diff --git a/alamos/py/pyproject.toml b/alamos/py/pyproject.toml index e6c2ba1af..1b2a45a70 100644 --- a/alamos/py/pyproject.toml +++ b/alamos/py/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "alamos" -version = "0.29.0" +version = "0.30.0" description = "" authors = ["Emiliano Bonilla "] readme = "README.md" diff --git a/alamos/ts/package.json b/alamos/ts/package.json index c69521908..e8622d27d 100644 --- a/alamos/ts/package.json +++ b/alamos/ts/package.json @@ -1,6 +1,6 @@ { "name": "@synnaxlabs/alamos", - "version": "0.29.0", + "version": "0.30.0", "type": "module", "description": "Distributed instrumentation for Synnax", "repository": "https://github.com/synnaxlabs/synnax/tree/main/freighter/ts", diff --git a/client/cpp/framer/streamer.cpp b/client/cpp/framer/streamer.cpp index 8e812e431..7e0cc0ca4 100644 --- a/client/cpp/framer/streamer.cpp +++ b/client/cpp/framer/streamer.cpp @@ -28,7 +28,9 @@ std::pair FrameClient::openStreamer( auto req = api::v1::FrameStreamerRequest(); config.toProto(req); auto exc2 = s->send(req); - return {Streamer(std::move(s)), exc2}; + if (exc2) return {Streamer(std::move(s)), exc2}; + auto [_, resExc] = s->receive(); + return {Streamer(std::move(s)), resExc}; } Streamer::Streamer(std::unique_ptr s) : stream(std::move(s)) { diff --git a/client/py/pyproject.toml b/client/py/pyproject.toml index 5ae03d973..28c0f3e4d 100644 --- a/client/py/pyproject.toml +++ b/client/py/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "synnax" -version = "0.29.0" +version = "0.30.0" description = "Synnax Client Library" keywords = ["Synnax", "Synnax Python Client"] authors = ["emiliano bonilla "] diff --git a/client/py/synnax/control/controller.py b/client/py/synnax/control/controller.py index 0a135ca50..34c6edec2 100644 --- a/client/py/synnax/control/controller.py +++ b/client/py/synnax/control/controller.py @@ -10,7 +10,7 @@ from __future__ import annotations from collections.abc import Callable -from threading import Event +from threading import Event, Lock from typing import Any, Protocol, overload from asyncio import create_task, Future @@ -281,13 +281,13 @@ def _internal_wait_until( raise ValueError("First argument to wait_until must be a callable.") processor = WaitUntil(cond, reverse) try: - self._receiver.processors.add(processor) + self._receiver.add_processor(processor) timeout_seconds = ( TimeSpan.from_seconds(timeout).seconds if timeout else None ) ok = processor.event.wait(timeout=timeout_seconds) finally: - self._receiver.processors.remove(processor) + self._receiver.remove_processor(processor) if processor.exc: raise processor.exc return ok @@ -458,6 +458,7 @@ class _Receiver(AsyncThread): client: framer.Client streamer: framer.AsyncStreamer processors: set[Processor] + processor_lock: Lock retriever: ChannelRetriever controller: Controller startup_ack: Event @@ -475,12 +476,22 @@ def __init__( self.client = client self.state = dict() self.controller = controller + self.processor_lock = Lock() self.startup_ack = Event() self.processors = set() + def add_processor(self, processor: Processor): + with self.processor_lock: + self.processors.add(processor) + + def remove_processor(self, processor: Processor): + with self.processor_lock: + self.processors.remove(processor) + def _process(self): - for p in self.processors: - p.process(self.controller) + with self.processor_lock: + for p in self.processors: + p.process(self.controller) async def _listen_for_close(self): await self.shutdown_future diff --git a/client/py/synnax/framer/streamer.py b/client/py/synnax/framer/streamer.py index af9b6b7c2..33d3b735d 100644 --- a/client/py/synnax/framer/streamer.py +++ b/client/py/synnax/framer/streamer.py @@ -70,6 +70,10 @@ def __init__( def __open(self): self._stream.send(_Request(keys=self._adapter.keys)) + _, exc = self._stream.receive() + if exc is not None: + raise exc + @overload def read(self, timeout: float | int | TimeSpan) -> Frame | None: @@ -201,6 +205,9 @@ def __init__( async def _open(self): self._stream = await self._client.stream(_ENDPOINT, _Request, _Response) await self._stream.send(_Request(keys=self._adapter.keys)) + _, exc = await self._stream.receive() + if exc is not None: + raise exc @property def received(self) -> bool: diff --git a/client/py/tests/test_control.py b/client/py/tests/test_control.py index f8465c4c5..ec5d8d9a3 100644 --- a/client/py/tests/test_control.py +++ b/client/py/tests/test_control.py @@ -365,7 +365,6 @@ def test_controller_channel_not_found(self, client: sy.Synnax): v = auto[press_en.key] assert v is None - @pytest.mark.focus def test_controller_set_authority_mechanisms(self, client: sy.Synnax): press_end_cmd_time, press_en_cmd, press_en, daq_time = create_valve_set(client) diff --git a/client/py/tests/test_framer.py b/client/py/tests/test_framer.py index 6ca3ebcc5..e643c6270 100644 --- a/client/py/tests/test_framer.py +++ b/client/py/tests/test_framer.py @@ -403,6 +403,14 @@ def test_open_streamer_no_channels(self, client: sy.Synnax): with client.open_streamer([]): pass + + @pytest.mark.focus + def test_open_streamer_channel_not_found(self, client: sy.Synnax): + """Should throw an exception when a streamer is opened with an unknown channel""" + with pytest.raises(sy.NotFoundError): + with client.open_streamer([123]): + pass + def test_update_channels(self, channel: sy.Channel, client: sy.Synnax): """Should update the list of channels to stream""" with client.open_streamer([]) as s: diff --git a/client/py/tests/test_timing.py b/client/py/tests/test_timing.py index 67781052f..fdf1fef25 100644 --- a/client/py/tests/test_timing.py +++ b/client/py/tests/test_timing.py @@ -39,7 +39,6 @@ def test_sleep(self): assert sum(accumulated_precise) < sum(accumulated_standard) - @pytest.mark.focus def test_sleep_rate(self): """Should sleep correctly based on a rate argument """ diff --git a/client/ts/package.json b/client/ts/package.json index 1171033f1..39d50805e 100644 --- a/client/ts/package.json +++ b/client/ts/package.json @@ -1,6 +1,6 @@ { "name": "@synnaxlabs/client", - "version": "0.29.0", + "version": "0.30.0", "description": "The Synnax Client Library", "keywords": [ "synnax", diff --git a/client/ts/src/framer/streamer.spec.ts b/client/ts/src/framer/streamer.spec.ts index d062629c4..420ac92db 100644 --- a/client/ts/src/framer/streamer.spec.ts +++ b/client/ts/src/framer/streamer.spec.ts @@ -47,4 +47,7 @@ describe("Streamer", () => { it("should not throw an error when the streamer is opened with zero channels", async () => { await expect(client.openStreamer([])).resolves.not.toThrow(); }); + it("should throw an error when the streamer is opened with a channel that does not exist", async () => { + await expect(client.openStreamer([5678])).rejects.toThrow("not found"); + }); }); diff --git a/client/ts/src/framer/streamer.ts b/client/ts/src/framer/streamer.ts index 69df5c136..88f02eeb0 100644 --- a/client/ts/src/framer/streamer.ts +++ b/client/ts/src/framer/streamer.ts @@ -55,6 +55,8 @@ export class Streamer implements AsyncIterator, AsyncIterable { const stream = await client.stream(ENDPOINT, reqZ, resZ); const streamer = new Streamer(stream, adapter); stream.send({ keys: adapter.keys }); + const [, err] = await stream.receive(); + if (err != null) throw err; return streamer; } diff --git a/console/package.json b/console/package.json index 406f81f4d..df135fc68 100644 --- a/console/package.json +++ b/console/package.json @@ -1,7 +1,7 @@ { "name": "@synnaxlabs/console", "private": true, - "version": "0.29.1", + "version": "0.30.0", "type": "module", "scripts": { "dev": "tauri dev", diff --git a/drift/package.json b/drift/package.json index 92b62e83c..f09c321cd 100644 --- a/drift/package.json +++ b/drift/package.json @@ -1,6 +1,6 @@ { "name": "@synnaxlabs/drift", - "version": "0.29.0", + "version": "0.30.0", "description": "State synchronization and Redux state synchronization for Tauri Apps", "repository": "https://github.com/synnaxlabs/synnax/tree/main/drift", "type": "module", diff --git a/freighter/py/pyproject.toml b/freighter/py/pyproject.toml index 450c679c1..152aa9c71 100644 --- a/freighter/py/pyproject.toml +++ b/freighter/py/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "synnax-freighter" -version = "0.29.0" +version = "0.30.0" description = "" authors = ["emiliano bonilla "] packages = [ diff --git a/freighter/ts/package.json b/freighter/ts/package.json index 0ca5e37d3..fc2012c4d 100644 --- a/freighter/ts/package.json +++ b/freighter/ts/package.json @@ -1,6 +1,6 @@ { "name": "@synnaxlabs/freighter", - "version": "0.29.0", + "version": "0.30.0", "type": "module", "description": "a modular transport abstraction", "repository": "https://github.com/synnaxlabs/synnax/tree/main/freighter/ts", diff --git a/package.json b/package.json index a80df35a6..30a96b628 100644 --- a/package.json +++ b/package.json @@ -1,5 +1,5 @@ { - "version": "0.29.0", + "version": "0.30.0", "private": true, "scripts": { "build": "npx turbo build --cache-dir=./.turbo-cache", diff --git a/pluto/package.json b/pluto/package.json index 99e0485a8..5a628fa69 100644 --- a/pluto/package.json +++ b/pluto/package.json @@ -1,6 +1,6 @@ { "name": "@synnaxlabs/pluto", - "version": "0.29.0", + "version": "0.30.0", "type": "module", "scripts": { "build": "tsc --noEmit && vite build", diff --git a/synnax/pkg/api/framer.go b/synnax/pkg/api/framer.go index 1c8e47954..4289cb980 100644 --- a/synnax/pkg/api/framer.go +++ b/synnax/pkg/api/framer.go @@ -178,7 +178,10 @@ func (s *FrameService) openStreamer(ctx context.Context, stream StreamerStream) return nil, err } reader, err := s.Internal.NewStreamer(ctx, framer.StreamerConfig{Keys: req.Keys}) - return reader, err + if err != nil { + return nil, err + } + return reader, stream.Send(framer.StreamerResponse{}) } type FrameWriterConfig struct { diff --git a/synnax/pkg/distribution/framer/relay/relay.go b/synnax/pkg/distribution/framer/relay/relay.go index 2b5fc61ac..d7a9da3f7 100644 --- a/synnax/pkg/distribution/framer/relay/relay.go +++ b/synnax/pkg/distribution/framer/relay/relay.go @@ -11,6 +11,7 @@ package relay import ( "fmt" + "github.com/synnaxlabs/synnax/pkg/distribution/channel" "time" "github.com/synnaxlabs/alamos" @@ -27,10 +28,11 @@ import ( type Config struct { alamos.Instrumentation - Transport Transport - HostResolver core.HostResolver - TS *ts.DB - FreeWrites confluence.Outlet[Response] + Transport Transport + HostResolver core.HostResolver + TS *ts.DB + FreeWrites confluence.Outlet[Response] + ChannelReader channel.Readable } var ( @@ -45,6 +47,7 @@ func (c Config) Override(other Config) Config { c.HostResolver = override.Nil(c.HostResolver, other.HostResolver) c.TS = override.Nil(c.TS, other.TS) c.FreeWrites = override.Nil(c.FreeWrites, other.FreeWrites) + c.ChannelReader = override.Nil(c.ChannelReader, other.ChannelReader) return c } @@ -55,10 +58,12 @@ func (c Config) Validate() error { validate.NotNil(v, "HostProvider", c.HostResolver) validate.NotNil(v, "TS", c.TS) validate.NotNil(v, "FreeWrites", c.FreeWrites) + validate.NotNil(v, "ChannelReader", c.ChannelReader) return v.Error() } type Relay struct { + cfg Config ins alamos.Instrumentation delta *confluence.DynamicDeltaMultiplier[Response] demands confluence.Inlet[demand] @@ -75,7 +80,7 @@ func Open(configs ...Config) (*Relay, error) { return nil, err } - r := &Relay{ins: cfg.Instrumentation} + r := &Relay{cfg: cfg, ins: cfg.Instrumentation} tpr := newTapper(cfg) demands := confluence.NewStream[demand](defaultBuffer) diff --git a/synnax/pkg/distribution/framer/relay/relay_suite_test.go b/synnax/pkg/distribution/framer/relay/relay_suite_test.go index b13848ba8..192123510 100644 --- a/synnax/pkg/distribution/framer/relay/relay_suite_test.go +++ b/synnax/pkg/distribution/framer/relay/relay_suite_test.go @@ -75,6 +75,7 @@ func provision(n int) (*mock.CoreBuilder, map[core.NodeKey]serviceContainer) { TS: c.Storage.TS, Transport: relayNet.New(c.Config.AdvertiseAddress), HostResolver: c.Cluster, + ChannelReader: container.channel, FreeWrites: freeWrites, })) container.writer = MustSucceed(writer.OpenService(writer.ServiceConfig{ diff --git a/synnax/pkg/distribution/framer/relay/relay_test.go b/synnax/pkg/distribution/framer/relay/relay_test.go index 3ccfc4096..47ea95acf 100644 --- a/synnax/pkg/distribution/framer/relay/relay_test.go +++ b/synnax/pkg/distribution/framer/relay/relay_test.go @@ -24,6 +24,7 @@ import ( "github.com/synnaxlabs/x/confluence" "github.com/synnaxlabs/x/errors" xio "github.com/synnaxlabs/x/io" + "github.com/synnaxlabs/x/query" "github.com/synnaxlabs/x/signal" "github.com/synnaxlabs/x/telem" . "github.com/synnaxlabs/x/testutil" @@ -99,6 +100,19 @@ var _ = Describe("Relay", func() { }) } }) + Describe("Errors", func() { + It("Should raise an error if a channel is not found", func() { + builder, services := provision(1) + defer func() { + Expect(builder.Close()).To(Succeed()) + }() + svc := services[1] + _, err := svc.relay.NewStreamer(context.TODO(), relay.StreamerConfig{ + Keys: []channel.Key{12345}, + }) + Expect(err).To(HaveOccurredAs(query.NotFound)) + }) + }) }) func newChannelSet() []channel.Channel { diff --git a/synnax/pkg/distribution/framer/relay/streamer.go b/synnax/pkg/distribution/framer/relay/streamer.go index a7fbf2707..575811f8d 100644 --- a/synnax/pkg/distribution/framer/relay/streamer.go +++ b/synnax/pkg/distribution/framer/relay/streamer.go @@ -11,6 +11,7 @@ package relay import ( "context" + "github.com/samber/lo" "github.com/synnaxlabs/synnax/pkg/distribution/channel" "github.com/synnaxlabs/x/address" @@ -31,13 +32,15 @@ type streamer struct { type StreamerConfig = Request -func (r *Relay) NewStreamer(_ context.Context, cfg StreamerConfig) (Streamer, error) { - return &streamer{ - keys: lo.Uniq(cfg.Keys), - addr: address.Rand(), - demands: r.demands, - relay: r, - }, nil +func (r *Relay) NewStreamer(ctx context.Context, cfg StreamerConfig) (Streamer, error) { + keys := lo.Uniq(cfg.Keys) + // Check that all keys exist. + if err := r.cfg.ChannelReader. + NewRetrieve(). + WhereKeys(keys...).Exec(ctx, nil); err != nil { + return nil, err + } + return &streamer{keys: keys, addr: address.Rand(), demands: r.demands, relay: r}, nil } // Flow implements confluence.Flow. diff --git a/synnax/pkg/distribution/framer/service.go b/synnax/pkg/distribution/framer/service.go index a1427b061..cb14d0266 100644 --- a/synnax/pkg/distribution/framer/service.go +++ b/synnax/pkg/distribution/framer/service.go @@ -116,6 +116,7 @@ func Open(configs ...Config) (*Service, error) { freeWrites := confluence.NewStream[relay.Response](25) s.Relay, err = relay.Open(relay.Config{ Instrumentation: cfg.Instrumentation.Child("Relay"), + ChannelReader: cfg.ChannelReader, TS: cfg.TS, HostResolver: cfg.HostResolver, Transport: cfg.Transport.Relay(), diff --git a/synnax/pkg/version/VERSION b/synnax/pkg/version/VERSION index ae6dd4e20..c25c8e5b7 100644 --- a/synnax/pkg/version/VERSION +++ b/synnax/pkg/version/VERSION @@ -1 +1 @@ -0.29.0 +0.30.0 diff --git a/x/ts/package.json b/x/ts/package.json index 667edb987..43d491a20 100644 --- a/x/ts/package.json +++ b/x/ts/package.json @@ -1,6 +1,6 @@ { "name": "@synnaxlabs/x", - "version": "0.29.0", + "version": "0.30.0", "type": "module", "description": "Common Utilities for Synnax Labs", "repository": "https://github.com/synnaxlabs/synnax/tree/main/x/go",