Skip to content

Commit

Permalink
Subscriptions: bi-directional subscription streaming- subscribe on no…
Browse files Browse the repository at this point in the history
… healthz. (dapr#7757)

* Subscriptions: bi-directional subscription & publish streaming.

Adds SubscribeTopicEvents proto API which dynamically subscribes to
pubsub topics based on dapr/proposals#52.

This is a basic gRPC implementation of the API whereby, like
Subscription hot-reloading today, subscribing to a topic will reload
_every_ active subscription for the current daprd. In a future PR,
reloading of Subscriptions will be granular to the specific pubsub
topic.

Stream subscriptions are also only active once daprd declares the
application as both present and ready. Dynamic stream subscriptions
should be active both whether a app is running or not, as well as
whether it is ready or not. This will be addressed in a future PR.

Signed-off-by: joshvanl <me@joshvanl.dev>

* Subscriptions: bi-directional subscription streaming- subscribe on no
healthz.

Refactors pubsub machinery to allow for gRPC bi-directional subcription
streaming when there is no application, or the application in unhealhty.

dapr/proposals#52

Signed-off-by: joshvanl <me@joshvanl.dev>

* Fix unit tests

Signed-off-by: joshvanl <me@joshvanl.dev>

* Fix subscription allowed

Signed-off-by: joshvanl <me@joshvanl.dev>

* Adds review comments

Signed-off-by: joshvanl <me@joshvanl.dev>

---------

Signed-off-by: joshvanl <me@joshvanl.dev>
Co-authored-by: Dapr Bot <56698301+dapr-bot@users.noreply.github.com>
Co-authored-by: Yaron Schneider <schneider.yaron@live.com>
Signed-off-by: Jake Engelberg <jake@diagrid.io>
  • Loading branch information
3 people authored and jake-engelberg committed Sep 20, 2024
1 parent 001018c commit 674a6f4
Show file tree
Hide file tree
Showing 44 changed files with 3,755 additions and 3,529 deletions.
15 changes: 11 additions & 4 deletions pkg/api/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
"github.com/dapr/dapr/pkg/encryption"
"github.com/dapr/dapr/pkg/messages"
invokev1 "github.com/dapr/dapr/pkg/messaging/v1"
"github.com/dapr/dapr/pkg/outbox"
commonv1pb "github.com/dapr/dapr/pkg/proto/common/v1"
internalv1pb "github.com/dapr/dapr/pkg/proto/internals/v1"
runtimev1pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
Expand Down Expand Up @@ -88,6 +89,8 @@ type api struct {
directMessaging invokev1.DirectMessaging
channels *channels.Channels
pubsubAdapter runtimePubsub.Adapter
pubsubAdapterStreamer runtimePubsub.AdapterStreamer
outbox outbox.Outbox
sendToOutputBindingFn func(ctx context.Context, name string, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error)
tracingSpec config.TracingSpec
accessControlList *config.AccessControlList
Expand All @@ -102,7 +105,9 @@ type APIOpts struct {
Universal *universal.Universal
Logger logger.Logger
Channels *channels.Channels
PubsubAdapter runtimePubsub.Adapter
PubSubAdapter runtimePubsub.Adapter
PubSubAdapterStreamer runtimePubsub.AdapterStreamer
Outbox outbox.Outbox
DirectMessaging invokev1.DirectMessaging
SendToOutputBindingFn func(ctx context.Context, name string, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error)
TracingSpec config.TracingSpec
Expand All @@ -118,7 +123,9 @@ func NewAPI(opts APIOpts) API {
logger: opts.Logger,
directMessaging: opts.DirectMessaging,
channels: opts.Channels,
pubsubAdapter: opts.PubsubAdapter,
pubsubAdapter: opts.PubSubAdapter,
pubsubAdapterStreamer: opts.PubSubAdapterStreamer,
outbox: opts.Outbox,
sendToOutputBindingFn: opts.SendToOutputBindingFn,
tracingSpec: opts.TracingSpec,
accessControlList: opts.AccessControlList,
Expand Down Expand Up @@ -961,11 +968,11 @@ func (a *api) ExecuteStateTransaction(ctx context.Context, in *runtimev1pb.Execu
}
}

outboxEnabled := a.pubsubAdapter.Outbox().Enabled(in.GetStoreName())
outboxEnabled := a.outbox.Enabled(in.GetStoreName())
if outboxEnabled {
span := diagUtils.SpanFromContext(ctx)
corID, traceState := diag.TraceIDAndStateFromSpan(span)
ops, err := a.pubsubAdapter.Outbox().PublishInternal(ctx, in.GetStoreName(), operations, a.Universal.AppID(), corID, traceState)
ops, err := a.outbox.PublishInternal(ctx, in.GetStoreName(), operations, a.Universal.AppID(), corID, traceState)
if err != nil {
nerr := apierrors.PubSubOutbox(a.AppID(), err)
apiServerLogger.Debug(nerr)
Expand Down
27 changes: 14 additions & 13 deletions pkg/api/grpc/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ import (
"github.com/dapr/dapr/pkg/expr"
"github.com/dapr/dapr/pkg/messages"
invokev1 "github.com/dapr/dapr/pkg/messaging/v1"
outboxfake "github.com/dapr/dapr/pkg/outbox/fake"
commonv1pb "github.com/dapr/dapr/pkg/proto/common/v1"
internalv1pb "github.com/dapr/dapr/pkg/proto/internals/v1"
runtimev1pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
Expand Down Expand Up @@ -2379,7 +2380,7 @@ func TestPublishTopic(t *testing.T) {

mock := daprt.MockPubSub{}
mock.On("Features").Return([]pubsub.Feature{})
srv.Universal.CompStore().AddPubSub("pubsub", compstore.PubsubItem{Component: &mock})
srv.Universal.CompStore().AddPubSub("pubsub", &runtimePubsub.PubsubItem{Component: &mock})

server, lis := startTestServerAPI(srv)
defer server.Stop()
Expand Down Expand Up @@ -2575,7 +2576,7 @@ func TestBulkPublish(t *testing.T) {

mock := daprt.MockPubSub{}
mock.On("Features").Return([]pubsub.Feature{})
fakeAPI.Universal.CompStore().AddPubSub("pubsub", compstore.PubsubItem{Component: &mock})
fakeAPI.Universal.CompStore().AddPubSub("pubsub", &runtimePubsub.PubsubItem{Component: &mock})

server, lis := startDaprAPIServer(fakeAPI, "")
defer server.Stop()
Expand Down Expand Up @@ -2753,6 +2754,7 @@ func TestExecuteStateTransaction(t *testing.T) {
Resiliency: resiliency.New(nil),
}),
pubsubAdapter: &daprt.MockPubSubAdapter{},
outbox: outboxfake.New(),
}
server, lis := startDaprAPIServer(fakeAPI, "")
defer server.Stop()
Expand Down Expand Up @@ -3308,6 +3310,7 @@ func TestStateAPIWithResiliency(t *testing.T) {
Resiliency: res,
}),
pubsubAdapter: &daprt.MockPubSubAdapter{},
outbox: outboxfake.New(),
}
server, lis := startDaprAPIServer(fakeAPI, "")
defer server.Stop()
Expand Down Expand Up @@ -4149,17 +4152,15 @@ func TestMetadata(t *testing.T) {
},
}))
require.NoError(t, compStore.CommitPendingComponent())
compStore.SetSubscriptions([]runtimePubsub.Subscription{
{
PubsubName: "test",
Topic: "topic",
DeadLetterTopic: "dead",
Metadata: map[string]string{},
Rules: []*runtimePubsub.Rule{
{
Match: &expr.Expr{},
Path: "path",
},
compStore.SetProgramaticSubscriptions(runtimePubsub.Subscription{
PubsubName: "test",
Topic: "topic",
DeadLetterTopic: "dead",
Metadata: map[string]string{},
Rules: []*runtimePubsub.Rule{
{
Match: &expr.Expr{},
Path: "path",
},
},
})
Expand Down
69 changes: 63 additions & 6 deletions pkg/api/grpc/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ limitations under the License.
package grpc

import (
"errors"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

subapi "github.com/dapr/dapr/pkg/apis/subscriptions/v2alpha1"
runtimev1pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
)

Expand All @@ -24,19 +29,71 @@ func (a *api) SubscribeTopicEventsAlpha1(stream runtimev1pb.Dapr_SubscribeTopicE
errCh := make(chan error, 2)
subDone := make(chan struct{})
a.wg.Add(2)

go func() {
errCh <- a.processor.PubSub().Streamer().Subscribe(stream)
close(subDone)
a.wg.Done()
}()
go func() {
defer a.wg.Done()

select {
case <-a.closeCh:
case <-subDone:
}
errCh <- nil
a.wg.Done()
}()

go func() {
defer a.wg.Done()
errCh <- a.streamSubscribe(stream, errCh, subDone)
}()

return <-errCh
}

func (a *api) streamSubscribe(stream runtimev1pb.Dapr_SubscribeTopicEventsAlpha1Server, errCh chan error, subDone chan struct{}) error {
defer close(subDone)

ireq, err := stream.Recv()
if err != nil {
return err
}

req := ireq.GetInitialRequest()

if req == nil {
return errors.New("initial request is required")
}

if len(req.GetPubsubName()) == 0 {
return errors.New("pubsubName is required")
}

if len(req.GetTopic()) == 0 {
return errors.New("topic is required")
}

// TODO: @joshvanl: handle duplicate key names.
key := a.pubsubAdapterStreamer.StreamerKey(req.GetPubsubName(), req.GetTopic())
a.Universal.CompStore().AddStreamSubscription(&subapi.Subscription{
ObjectMeta: metav1.ObjectMeta{Name: key},
Spec: subapi.SubscriptionSpec{
Pubsubname: req.GetPubsubName(),
Topic: req.GetTopic(),
Metadata: req.GetMetadata(),
DeadLetterTopic: req.GetDeadLetterTopic(),
Routes: subapi.Routes{Default: "/"},
},
})

if err := a.processor.Subscriber().ReloadPubSub(req.GetPubsubName()); err != nil {
a.Universal.CompStore().DeleteStreamSubscription(key)
return err
}

defer func() {
a.Universal.CompStore().DeleteStreamSubscription(key)
if err := a.processor.Subscriber().ReloadPubSub(req.GetPubsubName()); err != nil {
a.logger.Errorf("Error reloading subscriptions after gRPC Subscribe shutdown: %s", err)
}
}()

return a.pubsubAdapterStreamer.Subscribe(stream, req)
}
12 changes: 8 additions & 4 deletions pkg/api/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/dapr/dapr/pkg/encryption"
"github.com/dapr/dapr/pkg/messages"
invokev1 "github.com/dapr/dapr/pkg/messaging/v1"
"github.com/dapr/dapr/pkg/outbox"
runtimev1pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
"github.com/dapr/dapr/pkg/resiliency"
"github.com/dapr/dapr/pkg/runtime/channels"
Expand All @@ -70,6 +71,7 @@ type api struct {
directMessaging invokev1.DirectMessaging
channels *channels.Channels
pubsubAdapter runtimePubsub.Adapter
outbox outbox.Outbox
sendToOutputBindingFn func(ctx context.Context, name string, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error)
readyStatus bool
outboundReadyStatus bool
Expand Down Expand Up @@ -112,7 +114,8 @@ type APIOpts struct {
Universal *universal.Universal
Channels *channels.Channels
DirectMessaging invokev1.DirectMessaging
PubsubAdapter runtimePubsub.Adapter
PubSubAdapter runtimePubsub.Adapter
Outbox outbox.Outbox
SendToOutputBindingFn func(ctx context.Context, name string, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error)
TracingSpec config.TracingSpec
MetricSpec *config.MetricSpec
Expand All @@ -125,7 +128,8 @@ func NewAPI(opts APIOpts) API {
universal: opts.Universal,
channels: opts.Channels,
directMessaging: opts.DirectMessaging,
pubsubAdapter: opts.PubsubAdapter,
pubsubAdapter: opts.PubSubAdapter,
outbox: opts.Outbox,
sendToOutputBindingFn: opts.SendToOutputBindingFn,
tracingSpec: opts.TracingSpec,
metricSpec: opts.MetricSpec,
Expand Down Expand Up @@ -1577,11 +1581,11 @@ func (a *api) onPostStateTransaction(reqCtx *fasthttp.RequestCtx) {
}
}

outboxEnabled := a.pubsubAdapter.Outbox().Enabled(storeName)
outboxEnabled := a.outbox.Enabled(storeName)
if outboxEnabled {
span := diagUtils.SpanFromContext(reqCtx)
corID, traceState := diag.TraceIDAndStateFromSpan(span)
ops, err := a.pubsubAdapter.Outbox().PublishInternal(reqCtx, storeName, operations, a.universal.AppID(), corID, traceState)
ops, err := a.outbox.PublishInternal(reqCtx, storeName, operations, a.universal.AppID(), corID, traceState)
if err != nil {
nerr := apierrors.PubSubOutbox(a.universal.AppID(), err)
universalFastHTTPErrorResponder(reqCtx, nerr)
Expand Down
39 changes: 20 additions & 19 deletions pkg/api/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import (
invokev1 "github.com/dapr/dapr/pkg/messaging/v1"
"github.com/dapr/dapr/pkg/middleware"
middlewarehttp "github.com/dapr/dapr/pkg/middleware/http"
outboxfake "github.com/dapr/dapr/pkg/outbox/fake"
internalsv1pb "github.com/dapr/dapr/pkg/proto/internals/v1"
"github.com/dapr/dapr/pkg/resiliency"
"github.com/dapr/dapr/pkg/runtime/channels"
Expand Down Expand Up @@ -168,10 +169,10 @@ func TestPubSubEndpoints(t *testing.T) {

mock := daprt.MockPubSub{}
mock.On("Features").Return([]pubsub.Feature{})
testAPI.universal.CompStore().AddPubSub("pubsubname", compstore.PubsubItem{Component: &mock})
testAPI.universal.CompStore().AddPubSub("errorpubsub", compstore.PubsubItem{Component: &mock})
testAPI.universal.CompStore().AddPubSub("errnotfound", compstore.PubsubItem{Component: &mock})
testAPI.universal.CompStore().AddPubSub("errnotallowed", compstore.PubsubItem{Component: &mock})
testAPI.universal.CompStore().AddPubSub("pubsubname", &runtimePubsub.PubsubItem{Component: &mock})
testAPI.universal.CompStore().AddPubSub("errorpubsub", &runtimePubsub.PubsubItem{Component: &mock})
testAPI.universal.CompStore().AddPubSub("errnotfound", &runtimePubsub.PubsubItem{Component: &mock})
testAPI.universal.CompStore().AddPubSub("errnotallowed", &runtimePubsub.PubsubItem{Component: &mock})

fakeServer.StartServer(testAPI.constructPubSubEndpoints(), nil)

Expand Down Expand Up @@ -347,10 +348,10 @@ func TestBulkPubSubEndpoints(t *testing.T) {

mock := daprt.MockPubSub{}
mock.On("Features").Return([]pubsub.Feature{})
testAPI.universal.CompStore().AddPubSub("pubsubname", compstore.PubsubItem{Component: &mock})
testAPI.universal.CompStore().AddPubSub("errorpubsub", compstore.PubsubItem{Component: &mock})
testAPI.universal.CompStore().AddPubSub("errnotfound", compstore.PubsubItem{Component: &mock})
testAPI.universal.CompStore().AddPubSub("errnotallowed", compstore.PubsubItem{Component: &mock})
testAPI.universal.CompStore().AddPubSub("pubsubname", &runtimePubsub.PubsubItem{Component: &mock})
testAPI.universal.CompStore().AddPubSub("errorpubsub", &runtimePubsub.PubsubItem{Component: &mock})
testAPI.universal.CompStore().AddPubSub("errnotfound", &runtimePubsub.PubsubItem{Component: &mock})
testAPI.universal.CompStore().AddPubSub("errnotallowed", &runtimePubsub.PubsubItem{Component: &mock})

fakeServer.StartServer(testAPI.constructPubSubEndpoints(), nil)

Expand Down Expand Up @@ -1746,17 +1747,15 @@ func TestV1MetadataEndpoint(t *testing.T) {
},
}))
require.NoError(t, compStore.CommitPendingComponent())
compStore.SetSubscriptions([]runtimePubsub.Subscription{
{
PubsubName: "test",
Topic: "topic",
DeadLetterTopic: "dead",
Metadata: map[string]string{},
Rules: []*runtimePubsub.Rule{
{
Match: &expr.Expr{},
Path: "path",
},
compStore.SetProgramaticSubscriptions(runtimePubsub.Subscription{
PubsubName: "test",
Topic: "topic",
DeadLetterTopic: "dead",
Metadata: map[string]string{},
Rules: []*runtimePubsub.Rule{
{
Match: &expr.Expr{},
Path: "path",
},
},
})
Expand Down Expand Up @@ -3235,6 +3234,7 @@ func TestV1StateEndpoints(t *testing.T) {
Resiliency: rc,
}),
pubsubAdapter: &daprt.MockPubSubAdapter{},
outbox: outboxfake.New(),
}
fakeServer.StartServer(testAPI.constructStateEndpoints(), nil)

Expand Down Expand Up @@ -4393,6 +4393,7 @@ func TestV1TransactionEndpoints(t *testing.T) {
Resiliency: resiliency.New(nil),
}),
pubsubAdapter: &daprt.MockPubSubAdapter{},
outbox: outboxfake.New(),
}
fakeServer.StartServer(testAPI.constructStateEndpoints(), nil)
fakeBodyObject := map[string]interface{}{"data": "fakeData"}
Expand Down
20 changes: 9 additions & 11 deletions pkg/api/universal/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,15 @@ func TestGetMetadata(t *testing.T) {
compStore := compstore.New()
require.NoError(t, compStore.AddPendingComponentForCommit(fakeComponent))
require.NoError(t, compStore.CommitPendingComponent())
compStore.SetSubscriptions([]runtimePubsub.Subscription{
{
PubsubName: "test",
Topic: "topic",
DeadLetterTopic: "dead",
Metadata: map[string]string{},
Rules: []*runtimePubsub.Rule{
{
Match: &expr.Expr{},
Path: "path",
},
compStore.SetProgramaticSubscriptions(runtimePubsub.Subscription{
PubsubName: "test",
Topic: "topic",
DeadLetterTopic: "dead",
Metadata: map[string]string{},
Rules: []*runtimePubsub.Rule{
{
Match: &expr.Expr{},
Path: "path",
},
},
})
Expand Down
Loading

0 comments on commit 674a6f4

Please sign in to comment.