Skip to content

Commit

Permalink
Subscriptions: bi-directional subscription streaming- subscribe on no
Browse files Browse the repository at this point in the history
healthz.

Refactors pubsub machinery to allow for gRPC bi-directional subcription
streaming when there is no application, or the application in unhealhty.

dapr/proposals#52

Signed-off-by: joshvanl <me@joshvanl.dev>
  • Loading branch information
JoshVanL committed May 28, 2024
1 parent 0e304b5 commit 9ce6a81
Show file tree
Hide file tree
Showing 44 changed files with 5,109 additions and 4,680 deletions.
15 changes: 11 additions & 4 deletions pkg/api/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
"github.com/dapr/dapr/pkg/encryption"
"github.com/dapr/dapr/pkg/messages"
invokev1 "github.com/dapr/dapr/pkg/messaging/v1"
"github.com/dapr/dapr/pkg/outbox"
commonv1pb "github.com/dapr/dapr/pkg/proto/common/v1"
internalv1pb "github.com/dapr/dapr/pkg/proto/internals/v1"
runtimev1pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
Expand Down Expand Up @@ -88,6 +89,8 @@ type api struct {
directMessaging invokev1.DirectMessaging
channels *channels.Channels
pubsubAdapter runtimePubsub.Adapter
pubsubAdapterStreamer runtimePubsub.AdapterStreamer
outbox outbox.Outbox
sendToOutputBindingFn func(ctx context.Context, name string, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error)
tracingSpec config.TracingSpec
accessControlList *config.AccessControlList
Expand All @@ -102,7 +105,9 @@ type APIOpts struct {
Universal *universal.Universal
Logger logger.Logger
Channels *channels.Channels
PubsubAdapter runtimePubsub.Adapter
PubSubAdapter runtimePubsub.Adapter
PubSubAdapterStreamer runtimePubsub.AdapterStreamer
Outbox outbox.Outbox
DirectMessaging invokev1.DirectMessaging
SendToOutputBindingFn func(ctx context.Context, name string, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error)
TracingSpec config.TracingSpec
Expand All @@ -118,7 +123,9 @@ func NewAPI(opts APIOpts) API {
logger: opts.Logger,
directMessaging: opts.DirectMessaging,
channels: opts.Channels,
pubsubAdapter: opts.PubsubAdapter,
pubsubAdapter: opts.PubSubAdapter,
pubsubAdapterStreamer: opts.PubSubAdapterStreamer,
outbox: opts.Outbox,
sendToOutputBindingFn: opts.SendToOutputBindingFn,
tracingSpec: opts.TracingSpec,
accessControlList: opts.AccessControlList,
Expand Down Expand Up @@ -961,11 +968,11 @@ func (a *api) ExecuteStateTransaction(ctx context.Context, in *runtimev1pb.Execu
}
}

outboxEnabled := a.pubsubAdapter.Outbox().Enabled(in.GetStoreName())
outboxEnabled := a.outbox.Enabled(in.GetStoreName())
if outboxEnabled {
span := diagUtils.SpanFromContext(ctx)
corID, traceState := diag.TraceIDAndStateFromSpan(span)
ops, err := a.pubsubAdapter.Outbox().PublishInternal(ctx, in.GetStoreName(), operations, a.Universal.AppID(), corID, traceState)
ops, err := a.outbox.PublishInternal(ctx, in.GetStoreName(), operations, a.Universal.AppID(), corID, traceState)
if err != nil {
nerr := apierrors.PubSubOutbox(a.AppID(), err)
apiServerLogger.Debug(nerr)
Expand Down
27 changes: 14 additions & 13 deletions pkg/api/grpc/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ import (
"github.com/dapr/dapr/pkg/expr"
"github.com/dapr/dapr/pkg/messages"
invokev1 "github.com/dapr/dapr/pkg/messaging/v1"
outboxfake "github.com/dapr/dapr/pkg/outbox/fake"
commonv1pb "github.com/dapr/dapr/pkg/proto/common/v1"
internalv1pb "github.com/dapr/dapr/pkg/proto/internals/v1"
runtimev1pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
Expand Down Expand Up @@ -2379,7 +2380,7 @@ func TestPublishTopic(t *testing.T) {

mock := daprt.MockPubSub{}
mock.On("Features").Return([]pubsub.Feature{})
srv.Universal.CompStore().AddPubSub("pubsub", compstore.PubsubItem{Component: &mock})
srv.Universal.CompStore().AddPubSub("pubsub", &runtimePubsub.PubsubItem{Component: &mock})

server, lis := startTestServerAPI(srv)
defer server.Stop()
Expand Down Expand Up @@ -2575,7 +2576,7 @@ func TestBulkPublish(t *testing.T) {

mock := daprt.MockPubSub{}
mock.On("Features").Return([]pubsub.Feature{})
fakeAPI.Universal.CompStore().AddPubSub("pubsub", compstore.PubsubItem{Component: &mock})
fakeAPI.Universal.CompStore().AddPubSub("pubsub", &runtimePubsub.PubsubItem{Component: &mock})

server, lis := startDaprAPIServer(fakeAPI, "")
defer server.Stop()
Expand Down Expand Up @@ -2753,6 +2754,7 @@ func TestExecuteStateTransaction(t *testing.T) {
Resiliency: resiliency.New(nil),
}),
pubsubAdapter: &daprt.MockPubSubAdapter{},
outbox: outboxfake.New(),
}
server, lis := startDaprAPIServer(fakeAPI, "")
defer server.Stop()
Expand Down Expand Up @@ -3308,6 +3310,7 @@ func TestStateAPIWithResiliency(t *testing.T) {
Resiliency: res,
}),
pubsubAdapter: &daprt.MockPubSubAdapter{},
outbox: outboxfake.New(),
}
server, lis := startDaprAPIServer(fakeAPI, "")
defer server.Stop()
Expand Down Expand Up @@ -4149,17 +4152,15 @@ func TestMetadata(t *testing.T) {
},
}))
require.NoError(t, compStore.CommitPendingComponent())
compStore.SetSubscriptions([]runtimePubsub.Subscription{
{
PubsubName: "test",
Topic: "topic",
DeadLetterTopic: "dead",
Metadata: map[string]string{},
Rules: []*runtimePubsub.Rule{
{
Match: &expr.Expr{},
Path: "path",
},
compStore.SetProgramaticSubscriptions(runtimePubsub.Subscription{
PubsubName: "test",
Topic: "topic",
DeadLetterTopic: "dead",
Metadata: map[string]string{},
Rules: []*runtimePubsub.Rule{
{
Match: &expr.Expr{},
Path: "path",
},
},
})
Expand Down
69 changes: 63 additions & 6 deletions pkg/api/grpc/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ limitations under the License.
package grpc

import (
"errors"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

subapi "github.com/dapr/dapr/pkg/apis/subscriptions/v2alpha1"
runtimev1pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
)

Expand All @@ -24,19 +29,71 @@ func (a *api) SubscribeTopicEventsAlpha1(stream runtimev1pb.Dapr_SubscribeTopicE
errCh := make(chan error, 2)
subDone := make(chan struct{})
a.wg.Add(2)

go func() {
errCh <- a.processor.PubSub().Streamer().Subscribe(stream)
close(subDone)
a.wg.Done()
}()
go func() {
defer a.wg.Done()

select {
case <-a.closeCh:
case <-subDone:
}
errCh <- nil
a.wg.Done()
}()

go func() {
defer a.wg.Done()
errCh <- a.streamSubscribe(stream, errCh, subDone)
}()

return <-errCh
}

func (a *api) streamSubscribe(stream runtimev1pb.Dapr_SubscribeTopicEventsServer, errCh chan error, subDone chan struct{}) error {
defer close(subDone)

ireq, err := stream.Recv()
if err != nil {
return err
}

req := ireq.GetInitialRequest()

if req == nil {
return errors.New("initial request is required")
}

if len(req.GetPubsubName()) == 0 {
return errors.New("pubsubName name is required")
}

if len(req.GetTopic()) == 0 {
return errors.New("topic is required")
}

// TODO: @joshvanl: handle duplicate key names.
key := a.pubsubAdapterStreamer.StreamerKey(req.GetPubsubName(), req.GetTopic())
a.Universal.CompStore().AddStreamSubscription(&subapi.Subscription{
ObjectMeta: metav1.ObjectMeta{Name: key},
Spec: subapi.SubscriptionSpec{
Pubsubname: req.GetPubsubName(),
Topic: req.GetTopic(),
Metadata: req.GetMetadata(),
DeadLetterTopic: req.GetDeadLetterTopic(),
Routes: subapi.Routes{Default: "/"},
},
})

if err := a.processor.Subscriber().ReloadPubSub(req.GetPubsubName()); err != nil {
a.Universal.CompStore().DeleteStreamSubscription(key)
return err
}

defer func() {
a.Universal.CompStore().DeleteStreamSubscription(key)
if err := a.processor.Subscriber().ReloadPubSub(req.GetPubsubName()); err != nil {
a.logger.Errorf("Error reloading subscriptions after gRPC Subscribe shutdown: %s", err)
}
}()

return a.pubsubAdapterStreamer.Subscribe(stream, req)
}
12 changes: 8 additions & 4 deletions pkg/api/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/dapr/dapr/pkg/encryption"
"github.com/dapr/dapr/pkg/messages"
invokev1 "github.com/dapr/dapr/pkg/messaging/v1"
"github.com/dapr/dapr/pkg/outbox"
runtimev1pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
"github.com/dapr/dapr/pkg/resiliency"
"github.com/dapr/dapr/pkg/runtime/channels"
Expand All @@ -70,6 +71,7 @@ type api struct {
directMessaging invokev1.DirectMessaging
channels *channels.Channels
pubsubAdapter runtimePubsub.Adapter
outbox outbox.Outbox
sendToOutputBindingFn func(ctx context.Context, name string, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error)
readyStatus bool
outboundReadyStatus bool
Expand Down Expand Up @@ -111,7 +113,8 @@ type APIOpts struct {
Universal *universal.Universal
Channels *channels.Channels
DirectMessaging invokev1.DirectMessaging
PubsubAdapter runtimePubsub.Adapter
PubSubAdapter runtimePubsub.Adapter
Outbox outbox.Outbox
SendToOutputBindingFn func(ctx context.Context, name string, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error)
TracingSpec config.TracingSpec
MaxRequestBodySize int64 // In bytes
Expand All @@ -123,7 +126,8 @@ func NewAPI(opts APIOpts) API {
universal: opts.Universal,
channels: opts.Channels,
directMessaging: opts.DirectMessaging,
pubsubAdapter: opts.PubsubAdapter,
pubsubAdapter: opts.PubSubAdapter,
outbox: opts.Outbox,
sendToOutputBindingFn: opts.SendToOutputBindingFn,
tracingSpec: opts.TracingSpec,
maxRequestBodySize: opts.MaxRequestBodySize,
Expand Down Expand Up @@ -1574,11 +1578,11 @@ func (a *api) onPostStateTransaction(reqCtx *fasthttp.RequestCtx) {
}
}

outboxEnabled := a.pubsubAdapter.Outbox().Enabled(storeName)
outboxEnabled := a.outbox.Enabled(storeName)
if outboxEnabled {
span := diagUtils.SpanFromContext(reqCtx)
corID, traceState := diag.TraceIDAndStateFromSpan(span)
ops, err := a.pubsubAdapter.Outbox().PublishInternal(reqCtx, storeName, operations, a.universal.AppID(), corID, traceState)
ops, err := a.outbox.PublishInternal(reqCtx, storeName, operations, a.universal.AppID(), corID, traceState)
if err != nil {
nerr := apierrors.PubSubOutbox(a.universal.AppID(), err)
universalFastHTTPErrorResponder(reqCtx, nerr)
Expand Down
39 changes: 20 additions & 19 deletions pkg/api/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import (
invokev1 "github.com/dapr/dapr/pkg/messaging/v1"
"github.com/dapr/dapr/pkg/middleware"
middlewarehttp "github.com/dapr/dapr/pkg/middleware/http"
outboxfake "github.com/dapr/dapr/pkg/outbox/fake"
internalsv1pb "github.com/dapr/dapr/pkg/proto/internals/v1"
"github.com/dapr/dapr/pkg/resiliency"
"github.com/dapr/dapr/pkg/runtime/channels"
Expand Down Expand Up @@ -168,10 +169,10 @@ func TestPubSubEndpoints(t *testing.T) {

mock := daprt.MockPubSub{}
mock.On("Features").Return([]pubsub.Feature{})
testAPI.universal.CompStore().AddPubSub("pubsubname", compstore.PubsubItem{Component: &mock})
testAPI.universal.CompStore().AddPubSub("errorpubsub", compstore.PubsubItem{Component: &mock})
testAPI.universal.CompStore().AddPubSub("errnotfound", compstore.PubsubItem{Component: &mock})
testAPI.universal.CompStore().AddPubSub("errnotallowed", compstore.PubsubItem{Component: &mock})
testAPI.universal.CompStore().AddPubSub("pubsubname", &runtimePubsub.PubsubItem{Component: &mock})
testAPI.universal.CompStore().AddPubSub("errorpubsub", &runtimePubsub.PubsubItem{Component: &mock})
testAPI.universal.CompStore().AddPubSub("errnotfound", &runtimePubsub.PubsubItem{Component: &mock})
testAPI.universal.CompStore().AddPubSub("errnotallowed", &runtimePubsub.PubsubItem{Component: &mock})

fakeServer.StartServer(testAPI.constructPubSubEndpoints(), nil)

Expand Down Expand Up @@ -347,10 +348,10 @@ func TestBulkPubSubEndpoints(t *testing.T) {

mock := daprt.MockPubSub{}
mock.On("Features").Return([]pubsub.Feature{})
testAPI.universal.CompStore().AddPubSub("pubsubname", compstore.PubsubItem{Component: &mock})
testAPI.universal.CompStore().AddPubSub("errorpubsub", compstore.PubsubItem{Component: &mock})
testAPI.universal.CompStore().AddPubSub("errnotfound", compstore.PubsubItem{Component: &mock})
testAPI.universal.CompStore().AddPubSub("errnotallowed", compstore.PubsubItem{Component: &mock})
testAPI.universal.CompStore().AddPubSub("pubsubname", &runtimePubsub.PubsubItem{Component: &mock})
testAPI.universal.CompStore().AddPubSub("errorpubsub", &runtimePubsub.PubsubItem{Component: &mock})
testAPI.universal.CompStore().AddPubSub("errnotfound", &runtimePubsub.PubsubItem{Component: &mock})
testAPI.universal.CompStore().AddPubSub("errnotallowed", &runtimePubsub.PubsubItem{Component: &mock})

fakeServer.StartServer(testAPI.constructPubSubEndpoints(), nil)

Expand Down Expand Up @@ -1746,17 +1747,15 @@ func TestV1MetadataEndpoint(t *testing.T) {
},
}))
require.NoError(t, compStore.CommitPendingComponent())
compStore.SetSubscriptions([]runtimePubsub.Subscription{
{
PubsubName: "test",
Topic: "topic",
DeadLetterTopic: "dead",
Metadata: map[string]string{},
Rules: []*runtimePubsub.Rule{
{
Match: &expr.Expr{},
Path: "path",
},
compStore.SetProgramaticSubscriptions(runtimePubsub.Subscription{
PubsubName: "test",
Topic: "topic",
DeadLetterTopic: "dead",
Metadata: map[string]string{},
Rules: []*runtimePubsub.Rule{
{
Match: &expr.Expr{},
Path: "path",
},
},
})
Expand Down Expand Up @@ -3235,6 +3234,7 @@ func TestV1StateEndpoints(t *testing.T) {
Resiliency: rc,
}),
pubsubAdapter: &daprt.MockPubSubAdapter{},
outbox: outboxfake.New(),
}
fakeServer.StartServer(testAPI.constructStateEndpoints(), nil)

Expand Down Expand Up @@ -4393,6 +4393,7 @@ func TestV1TransactionEndpoints(t *testing.T) {
Resiliency: resiliency.New(nil),
}),
pubsubAdapter: &daprt.MockPubSubAdapter{},
outbox: outboxfake.New(),
}
fakeServer.StartServer(testAPI.constructStateEndpoints(), nil)
fakeBodyObject := map[string]interface{}{"data": "fakeData"}
Expand Down
20 changes: 9 additions & 11 deletions pkg/api/universal/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,15 @@ func TestGetMetadata(t *testing.T) {
compStore := compstore.New()
require.NoError(t, compStore.AddPendingComponentForCommit(fakeComponent))
require.NoError(t, compStore.CommitPendingComponent())
compStore.SetSubscriptions([]runtimePubsub.Subscription{
{
PubsubName: "test",
Topic: "topic",
DeadLetterTopic: "dead",
Metadata: map[string]string{},
Rules: []*runtimePubsub.Rule{
{
Match: &expr.Expr{},
Path: "path",
},
compStore.SetProgramaticSubscriptions(runtimePubsub.Subscription{
PubsubName: "test",
Topic: "topic",
DeadLetterTopic: "dead",
Metadata: map[string]string{},
Rules: []*runtimePubsub.Rule{
{
Match: &expr.Expr{},
Path: "path",
},
},
})
Expand Down
Loading

0 comments on commit 9ce6a81

Please sign in to comment.