diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 00000000..a62d4f1c Binary files /dev/null and b/.DS_Store differ diff --git a/go.mod b/go.mod index 5668f8a6..72d93f0d 100644 --- a/go.mod +++ b/go.mod @@ -38,7 +38,7 @@ require ( github.com/stretchr/testify v1.8.1 github.com/tetratelabs/wazero v1.0.0-pre.4 github.com/thejerf/suture/v4 v4.0.2 - github.com/wetware/casm v0.0.0-20230103165725-c29acf614f84 + github.com/wetware/casm v0.0.0-20230111190411-32f57045e0d4 golang.org/x/sync v0.1.0 gopkg.in/alexcesaro/statsd.v2 v2.0.0 ) diff --git a/go.sum b/go.sum index 04cff7f2..84ac062a 100644 --- a/go.sum +++ b/go.sum @@ -466,8 +466,8 @@ github.com/urfave/cli/v2 v2.23.7/go.mod h1:GHupkWPMM0M/sj1a2b4wUrWBPzazNrIjouW6f github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU= github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM= github.com/warpfork/go-wish v0.0.0-20200122115046-b9ea61034e4a h1:G++j5e0OC488te356JvdhaM8YS6nMsjLAYF7JxCv07w= -github.com/wetware/casm v0.0.0-20230103165725-c29acf614f84 h1:++13uCsaHApqYq4llKyKNYQM6qtjrTAYqLlvGNpWrfs= -github.com/wetware/casm v0.0.0-20230103165725-c29acf614f84/go.mod h1:hNSpIZKBzkQtZBw3D17iznaMb+7dA52pbfYscE4T1Qc= +github.com/wetware/casm v0.0.0-20230111190411-32f57045e0d4 h1:vJA/XG+wrvMqT6Swy+0LogetHXMxRyuPTYzN/QOdpNE= +github.com/wetware/casm v0.0.0-20230111190411-32f57045e0d4/go.mod h1:QAPoA9c6Sv7akZEGAby3mmCGKpBucwAyTjHgJ4tWj7U= github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 h1:EKhdznlJHPMoKr0XTrX+IlJs1LH3lyx2nfr1dOlZ79k= github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1/go.mod h1:8UvriyWtv5Q5EOgjHaSseUEdkQfvwFv1I/In/O2M9gc= github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee h1:lYbXeSvJi5zk5GLKVuid9TVjS9a0OmLIDKTfoZBL6Ow= diff --git a/pkg/csp/chan.go b/pkg/csp/chan.go index 2648931e..fd7c1b66 100644 --- a/pkg/csp/chan.go +++ b/pkg/csp/chan.go @@ -7,8 +7,6 @@ import ( "errors" "capnproto.org/go/capnp/v3" - "capnproto.org/go/capnp/v3/exp/clock" - "capnproto.org/go/capnp/v3/flowcontrol/bbr" casm "github.com/wetware/casm/pkg" "github.com/wetware/casm/pkg/util/stream" "github.com/wetware/ww/internal/api/channel" @@ -209,7 +207,7 @@ func (s Sender) Send(ctx context.Context, v Value) (casm.Future, capnp.ReleaseFu // for a given sender. func (s Sender) NewStream(ctx context.Context) SendStream { sender := channel.Sender(s) - sender.SetFlowLimiter(bbr.NewLimiter(clock.System)) + // TODO: use BBR once scheduler bug is fixed return SendStream{ ctx: ctx, diff --git a/pkg/pubsub/topic.go b/pkg/pubsub/topic.go index 7697e1ec..b18b9608 100644 --- a/pkg/pubsub/topic.go +++ b/pkg/pubsub/topic.go @@ -4,8 +4,6 @@ import ( "context" "capnproto.org/go/capnp/v3" - "capnproto.org/go/capnp/v3/exp/clock" - "capnproto.org/go/capnp/v3/flowcontrol/bbr" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/lthibault/log" @@ -57,7 +55,7 @@ func (t Topic) Publish(ctx context.Context, b []byte) error { // through a flow-controlled channel. This will override the existing // FlowLimiter. func (t Topic) NewStream(ctx context.Context) Stream { - api.Topic(t).SetFlowLimiter(bbr.NewLimiter(clock.System)) + // TODO: use BBR once scheduler bug is fixed cherr := make(chan error, 1) done := make(chan struct{}) @@ -235,7 +233,7 @@ func (t topicServer) Subscribe(ctx context.Context, call MethodSubscribe) error defer sub.Cancel() sender := call.Args().Chan() - sender.SetFlowLimiter(bbr.NewLimiter(clock.System)) + // TODO: use BBR once scheduler bug is fixed t.log.Debug("registered subscription handler") defer t.log.Debug("unregistered subscription handler")