-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathPubSubEmulator_test.go
107 lines (97 loc) · 3.22 KB
/
PubSubEmulator_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package main
import (
"context"
"fmt"
"math/rand"
"os"
"testing"
"githb.com/lwahlmeier/go-pubsub-emulator/internal/fspubsub"
"githb.com/lwahlmeier/go-pubsub-emulator/internal/mempubsub"
"github.com/stretchr/testify/assert"
"google.golang.org/genproto/googleapis/pubsub/v1"
)
// Shared fixtures for the emulator tests.
var (
	// BASES lists the backend identifiers the tests iterate over; each entry
	// is passed to CreateBase. Trim it to []string{"fs"} to debug the
	// filesystem backend on its own.
	BASES = []string{"mem", "fs"}

	// letterRunes is the alphabet makeString draws its characters from.
	letterRunes = []rune("1234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
)
// CreateBase builds a PubSubEmulator on top of the requested backend.
//
// When base is "mem" the emulator uses mempubsub.NewMemBase. Any other value
// selects the filesystem backend (fspubsub.NewFSBase) rooted in a freshly
// created temporary directory. That directory is not removed by this helper.
//
// Setup failures panic with context instead of being discarded, so a broken
// environment is reported where it happens rather than surfacing later as an
// emulator with an unusable backend.
func CreateBase(base string) *PubSubEmulator {
	if base == "mem" {
		return &PubSubEmulator{baseBackend: mempubsub.NewMemBase()}
	}
	// os.MkdirTemp creates the directory itself, so no follow-up MkdirAll is
	// required.
	dir, err := os.MkdirTemp("", "go-test")
	if err != nil {
		panic(fmt.Sprintf("creating temp dir for fs backend: %v", err))
	}
	fsb, err := fspubsub.NewFSBase(dir)
	if err != nil {
		panic(fmt.Sprintf("creating fs backend in %q: %v", dir, err))
	}
	return &PubSubEmulator{baseBackend: fsb}
}
// makeString returns a string of n runes, each picked independently from
// letterRunes using the math/rand package-level generator.
func makeString(n int) string {
	out := make([]rune, n)
	alphabetSize := len(letterRunes)
	for idx := 0; idx < n; idx++ {
		pick := rand.Intn(alphabetSize)
		out[idx] = letterRunes[pick]
	}
	return string(out)
}
// TestBasicTopic runs a basic topic/subscription round trip against every
// backend named in BASES. For each backend it:
//
//  1. creates count topics in a randomly named project and reads each back,
//  2. checks that ListTopics reports exactly count topics for the project,
//  3. creates count subscriptions per topic,
//  4. publishes one "TEST" message to every topic, and
//  5. pulls and acknowledges that message on every subscription.
func TestBasicTopic(t *testing.T) {
	const count = 10
	ctx := context.TODO()

	for _, bs := range BASES {
		// One subtest per backend so a failure names the backend that caused it.
		t.Run(bs, func(t *testing.T) {
			pse := CreateBase(bs)
			projectName := makeString(30)

			topicNames := make([]string, count)
			topicPubSubs := make([]*pubsub.Topic, count)
			for i := range topicNames {
				topicNames[i] = makeString(30)
				topicPubSubs[i] = &pubsub.Topic{Name: fmt.Sprintf("projects/%s/topics/%s", projectName, topicNames[i])}
			}

			for _, topic := range topicPubSubs {
				rpst, err := pse.CreateTopic(ctx, topic)
				assert.NoError(t, err)
				assert.NotNil(t, rpst)
				assert.Equal(t, topic, rpst)
			}

			for _, topic := range topicPubSubs {
				rpst, err := pse.GetTopic(ctx, &pubsub.GetTopicRequest{Topic: topic.Name})
				assert.NoError(t, err)
				assert.NotNil(t, rpst)
				assert.Equal(t, topic, rpst)
			}

			tList, err := pse.ListTopics(ctx, &pubsub.ListTopicsRequest{Project: fmt.Sprintf("projects/%s", projectName)})
			assert.NoError(t, err)
			// Only dereference the response once it is known to be non-nil;
			// otherwise a failed call would panic instead of failing the test.
			if assert.NotNil(t, tList) {
				assert.Equal(t, count, len(tList.Topics))
			}

			// count subscriptions per topic, kept in a flat slice so they are
			// created and pulled in a stable order.
			subscriptions := make([]*pubsub.Subscription, 0, count*count)
			for _, tn := range topicNames {
				for i := 0; i < count; i++ {
					subscriptions = append(subscriptions, &pubsub.Subscription{
						Name:  fmt.Sprintf("projects/%s/subscriptions/%s", projectName, makeString(30)),
						Topic: fmt.Sprintf("projects/%s/topics/%s", projectName, tn),
					})
				}
			}

			for _, sub := range subscriptions {
				rs, err := pse.CreateSubscription(ctx, sub)
				assert.NoError(t, err)
				assert.NotNil(t, rs)
				assert.Equal(t, sub, rs)
			}

			for _, topic := range topicPubSubs {
				// A publish failure is reported here rather than showing up
				// later as a confusing pull failure.
				_, err := pse.Publish(ctx, &pubsub.PublishRequest{
					Topic:    topic.Name,
					Messages: []*pubsub.PubsubMessage{{Data: []byte("TEST")}},
				})
				assert.NoError(t, err)
			}

			for _, sub := range subscriptions {
				pr, err := pse.Pull(ctx, &pubsub.PullRequest{Subscription: sub.Name, MaxMessages: 1})
				assert.NoError(t, err)
				// Guard before indexing: an empty or nil response must fail
				// the test, not panic the whole test binary.
				if !assert.NotNil(t, pr) || !assert.NotEmpty(t, pr.ReceivedMessages) {
					continue
				}
				msg := pr.ReceivedMessages[0]
				if assert.NotNil(t, msg.Message) {
					assert.Equal(t, "TEST", string(msg.Message.Data))
				}
				_, err = pse.Acknowledge(ctx, &pubsub.AcknowledgeRequest{Subscription: sub.Name, AckIds: []string{msg.AckId}})
				assert.NoError(t, err)
			}
		})
	}
}