Skip to content

Commit 4952a95

Browse files
committed
runtime: subscription engine refactor [1]
This change is the first in a series of refactors to the subscription engine. This work is in preparation for adding Actor Subscriptions as detailed [here](dapr/proposals#78). In order to implement this new subscription type, the engine needs a hierarchical reordering of the internal data structures in flow to make adding the new implementation sane and maintainable. The existing implementation on subscription message processing and routing to various sinks are desperate in the codebase and somewhat spaghetti. This first change introduces a new `Postman` interface that is used to send messages to subscription sinks- HTTP, gRPC or streaming channels. Actors would be another sink added in future. Shared types have been moved to a new `pkg/runtime/subscription/todo` package. Appreciate that there will be further changes made to this code path. Signed-off-by: joshvanl <[email protected]>
1 parent 7fe7262 commit 4952a95

File tree

21 files changed

+2295
-1928
lines changed

21 files changed

+2295
-1928
lines changed

pkg/runtime/processor/subscriber/subscriber.go

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@ import (
3232
"github.com/dapr/dapr/pkg/runtime/compstore"
3333
rtpubsub "github.com/dapr/dapr/pkg/runtime/pubsub"
3434
"github.com/dapr/dapr/pkg/runtime/subscription"
35+
"github.com/dapr/dapr/pkg/runtime/subscription/postman"
36+
postmangrpc "github.com/dapr/dapr/pkg/runtime/subscription/postman/grpc"
37+
"github.com/dapr/dapr/pkg/runtime/subscription/postman/http"
38+
"github.com/dapr/dapr/pkg/runtime/subscription/postman/streaming"
3539
"github.com/dapr/kit/logger"
3640
)
3741

@@ -492,24 +496,43 @@ func (s *Subscriber) initProgramaticSubscriptions(ctx context.Context) error {
492496
}
493497

494498
func (s *Subscriber) startSubscription(pubsub *rtpubsub.PubsubItem, comp *compstore.NamedSubscription, isStreamer bool) (*subscription.Subscription, error) {
499+
// TODO: @joshvanl
500+
var postman postman.Interface
495501
var streamer rtpubsub.AdapterStreamer
496502
if isStreamer {
497503
streamer = s.adapterStreamer
504+
postman = streaming.New(streaming.Options{
505+
Tracing: s.tracingSpec,
506+
Channel: s.adapterStreamer,
507+
})
508+
} else {
509+
if s.isHTTP {
510+
postman = http.New(http.Options{
511+
Channels: s.channels,
512+
Tracing: s.tracingSpec,
513+
Adapter: s.adapter,
514+
})
515+
} else {
516+
postman = postmangrpc.New(postmangrpc.Options{
517+
Channel: s.grpc,
518+
Tracing: s.tracingSpec,
519+
Adapter: s.adapter,
520+
})
521+
}
498522
}
499523
return subscription.New(subscription.Options{
500524
AppID: s.appID,
501525
Namespace: s.namespace,
502526
PubSubName: comp.PubsubName,
503527
Topic: comp.Topic,
504-
IsHTTP: s.isHTTP,
505528
PubSub: pubsub,
506529
Resiliency: s.resiliency,
507530
TraceSpec: s.tracingSpec,
508531
Route: comp.Subscription,
509-
Channels: s.channels,
510532
GRPC: s.grpc,
511533
Adapter: s.adapter,
512534
AdapterStreamer: streamer,
513535
ConnectionID: comp.ConnectionID,
536+
Postman: postman,
514537
})
515538
}

pkg/runtime/pubsub/adapter.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ type Adapter interface {
4545

4646
type AdapterStreamer interface {
4747
Subscribe(rtv1pb.Dapr_SubscribeTopicEventsAlpha1Server, *rtv1pb.SubscribeTopicEventsRequestInitialAlpha1, ConnectionID) error
48-
Publish(context.Context, *SubscribedMessage) error
48+
Publish(context.Context, *SubscribedMessage) (*rtv1pb.SubscribeTopicEventsRequestProcessedAlpha1, error)
4949
StreamerKey(pubsub, topic string) string
5050
Close(key string, connectionID ConnectionID)
5151
}

pkg/runtime/pubsub/streamer/streamer.go

Lines changed: 11 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -164,27 +164,28 @@ func (s *streamer) recvLoop(
164164
}
165165
}
166166

167-
func (s *streamer) Publish(ctx context.Context, msg *rtpubsub.SubscribedMessage) error {
167+
// TODO: @joshvanl: move diagnostics.
168+
func (s *streamer) Publish(ctx context.Context, msg *rtpubsub.SubscribedMessage) (*rtv1pb.SubscribeTopicEventsRequestProcessedAlpha1, error) {
168169
s.lock.RLock()
169170
key := s.StreamerKey(msg.PubSub, msg.Topic)
170171
connection, ok := s.subscribers[key][msg.SubscriberID]
171172
s.lock.RUnlock()
172173
if !ok {
173-
return fmt.Errorf("no streamer subscribed to pubsub %q topic %q", msg.PubSub, msg.Topic)
174+
return nil, fmt.Errorf("no streamer subscribed to pubsub %q topic %q", msg.PubSub, msg.Topic)
174175
}
175176

176177
if connection.closed.Load() {
177-
return errors.New("subscription is closed")
178+
return nil, errors.New("subscription is closed")
178179
}
179180

180181
envelope, span, err := rtpubsub.GRPCEnvelopeFromSubscriptionMessage(ctx, msg, log, s.tracingSpec)
181182
if err != nil {
182-
return err
183+
return nil, err
183184
}
184185

185186
ch, cleanup := connection.registerPublishResponse(envelope.GetId())
186187
if ch == nil {
187-
return fmt.Errorf("no client stream expecting publish response for id %s ConnectionID%d", envelope.GetId(), connection.connectionID)
188+
return nil, fmt.Errorf("no client stream expecting publish response for id %s ConnectionID%d", envelope.GetId(), connection.connectionID)
188189
}
189190
defer cleanup()
190191

@@ -209,35 +210,16 @@ func (s *streamer) Publish(ctx context.Context, msg *rtpubsub.SubscribedMessage)
209210
err = fmt.Errorf("error returned from app while processing pub/sub event %v: %w", msg.CloudEvent[contribpubsub.IDField], rterrors.NewRetriable(err))
210211
log.Debug(err)
211212
diag.DefaultComponentMonitoring.PubsubIngressEvent(ctx, msg.PubSub, strings.ToLower(string(contribpubsub.Retry)), "", msg.Topic, elapsed)
212-
return err
213+
return nil, err
213214
}
214215

215-
var resp *rtv1pb.SubscribeTopicEventsRequestProcessedAlpha1
216216
select {
217217
case <-ctx.Done():
218-
return ctx.Err()
218+
return nil, ctx.Err()
219219
case <-connection.closeCh:
220-
case resp = <-ch:
221-
}
222-
223-
switch resp.GetStatus().GetStatus() {
224-
case rtv1pb.TopicEventResponse_SUCCESS: //nolint:nosnakecase
225-
// on uninitialized status, this is the case it defaults to as an uninitialized status defaults to 0 which is
226-
// success from protobuf definition
227-
diag.DefaultComponentMonitoring.PubsubIngressEvent(ctx, msg.PubSub, strings.ToLower(string(contribpubsub.Success)), "", msg.Topic, elapsed)
228-
return nil
229-
case rtv1pb.TopicEventResponse_RETRY: //nolint:nosnakecase
230-
diag.DefaultComponentMonitoring.PubsubIngressEvent(ctx, msg.PubSub, strings.ToLower(string(contribpubsub.Retry)), "", msg.Topic, elapsed)
231-
// TODO: add retry error info
232-
return fmt.Errorf("RETRY status returned from app while processing pub/sub event %v: %w", msg.CloudEvent[contribpubsub.IDField], rterrors.NewRetriable(nil))
233-
case rtv1pb.TopicEventResponse_DROP: //nolint:nosnakecase
234-
log.Warnf("DROP status returned from app while processing pub/sub event %v", msg.CloudEvent[contribpubsub.IDField])
235-
diag.DefaultComponentMonitoring.PubsubIngressEvent(ctx, msg.PubSub, strings.ToLower(string(contribpubsub.Drop)), "", msg.Topic, elapsed)
236-
return rtpubsub.ErrMessageDropped
237-
default:
238-
// Consider unknown status field as error and retry
239-
diag.DefaultComponentMonitoring.PubsubIngressEvent(ctx, msg.PubSub, strings.ToLower(string(contribpubsub.Retry)), "", msg.Topic, elapsed)
240-
return fmt.Errorf("unknown status returned from app while processing pub/sub event %v, status: %v, err: %w", msg.CloudEvent[contribpubsub.IDField], resp.GetStatus(), rterrors.NewRetriable(nil))
220+
return nil, errors.New("stream closed")
221+
case resp := <-ch:
222+
return resp, nil
241223
}
242224
}
243225

pkg/runtime/subscription/bulkresiliency.go

Lines changed: 33 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -19,60 +19,59 @@ import (
1919

2020
contribpubsub "github.com/dapr/components-contrib/pubsub"
2121
"github.com/dapr/dapr/pkg/resiliency"
22+
"github.com/dapr/dapr/pkg/runtime/subscription/postman"
23+
"github.com/dapr/dapr/pkg/runtime/subscription/todo"
2224
"github.com/dapr/dapr/utils"
2325
)
2426

25-
type bulkSubscribeResiliencyRes struct {
26-
entries []contribpubsub.BulkSubscribeResponseEntry
27-
envelope map[string]interface{}
28-
}
29-
3027
// applyBulkSubscribeResiliency applies resiliency support to bulk subscribe. It tries to filter
3128
// out the messages that have been successfully processed and only retries the ones that have failed
32-
func (s *Subscription) applyBulkSubscribeResiliency(ctx context.Context, bulkSubCallData *bulkSubscribeCallData,
33-
psm bulkSubscribedMessage, deadLetterTopic string, path string, policyDef *resiliency.PolicyDefinition,
29+
func (s *Subscription) applyBulkSubscribeResiliency(ctx context.Context, bulkSubCallData *todo.BulkSubscribeCallData,
30+
psm todo.BulkSubscribedMessage, deadLetterTopic string, path string, policyDef *resiliency.PolicyDefinition,
3431
rawPayload bool, envelope map[string]interface{},
3532
) (*[]contribpubsub.BulkSubscribeResponseEntry, error) {
3633
bscData := *bulkSubCallData
3734
policyRunner := resiliency.NewRunnerWithOptions(
38-
ctx, policyDef, resiliency.RunnerOpts[*bulkSubscribeResiliencyRes]{
39-
Accumulator: func(bsrr *bulkSubscribeResiliencyRes) {
40-
for _, v := range bsrr.entries {
35+
ctx, policyDef, resiliency.RunnerOpts[*todo.BulkSubscribeResiliencyRes]{
36+
Accumulator: func(bsrr *todo.BulkSubscribeResiliencyRes) {
37+
for _, v := range bsrr.Entries {
4138
// add to main bulkResponses
42-
if index, ok := (*bscData.entryIdIndexMap)[v.EntryId]; ok {
43-
(*bscData.bulkResponses)[index].EntryId = v.EntryId
44-
(*bscData.bulkResponses)[index].Error = v.Error
39+
if index, ok := (*bscData.EntryIdIndexMap)[v.EntryId]; ok {
40+
(*bscData.BulkResponses)[index].EntryId = v.EntryId
41+
(*bscData.BulkResponses)[index].Error = v.Error
4542
}
4643
}
47-
filteredPubSubMsgs := utils.Filter(psm.pubSubMessages, func(ps message) bool {
48-
if index, ok := (*bscData.entryIdIndexMap)[ps.entry.EntryId]; ok {
49-
return (*bscData.bulkResponses)[index].Error != nil
44+
filteredPubSubMsgs := utils.Filter(psm.PubSubMessages, func(ps todo.Message) bool {
45+
if index, ok := (*bscData.EntryIdIndexMap)[ps.Entry.EntryId]; ok {
46+
return (*bscData.BulkResponses)[index].Error != nil
5047
}
5148
return false
5249
})
53-
psm.pubSubMessages = filteredPubSubMsgs
54-
psm.length = len(filteredPubSubMsgs)
50+
psm.PubSubMessages = filteredPubSubMsgs
51+
psm.Length = len(filteredPubSubMsgs)
5552
},
5653
})
57-
_, err := policyRunner(func(ctx context.Context) (*bulkSubscribeResiliencyRes, error) {
58-
var pErr error
59-
bsrr := &bulkSubscribeResiliencyRes{
60-
entries: make([]contribpubsub.BulkSubscribeResponseEntry, 0, len(psm.pubSubMessages)),
61-
envelope: maps.Clone(envelope),
54+
_, err := policyRunner(func(ctx context.Context) (*todo.BulkSubscribeResiliencyRes, error) {
55+
bsrr := &todo.BulkSubscribeResiliencyRes{
56+
Entries: make([]contribpubsub.BulkSubscribeResponseEntry, 0, len(psm.PubSubMessages)),
57+
Envelope: maps.Clone(envelope),
6258
}
63-
if s.isHTTP {
64-
pErr = s.publishBulkMessageHTTP(ctx, &bscData, &psm, bsrr, deadLetterTopic)
65-
} else {
66-
pErr = s.publishBulkMessageGRPC(ctx, &bscData, &psm, &bsrr.entries, rawPayload, deadLetterTopic)
67-
}
68-
return bsrr, pErr
59+
err := s.postman.DeliverBulk(ctx, &postman.DelivererBulkRequest{
60+
BulkSubCallData: &bscData,
61+
BulkSubMsg: &psm,
62+
BulkSubResiliencyRes: bsrr,
63+
BulkResponses: &bsrr.Entries,
64+
RawPayload: rawPayload,
65+
DeadLetterTopic: deadLetterTopic,
66+
})
67+
return bsrr, err
6968
})
7069
// setting error if any entry has not been yet touched - only use case that seems possible is of timeout
71-
for eId, ind := range *bscData.entryIdIndexMap { //nolint:stylecheck
72-
if (*bscData.bulkResponses)[ind].EntryId == "" {
73-
(*bscData.bulkResponses)[ind].EntryId = eId
74-
(*bscData.bulkResponses)[ind].Error = err
70+
for eId, ind := range *bscData.EntryIdIndexMap { //nolint:stylecheck
71+
if (*bscData.BulkResponses)[ind].EntryId == "" {
72+
(*bscData.BulkResponses)[ind].EntryId = eId
73+
(*bscData.BulkResponses)[ind].Error = err
7574
}
7675
}
77-
return bscData.bulkResponses, err
76+
return bscData.BulkResponses, err
7877
}

0 commit comments

Comments
 (0)