Skip to content

Commit 524b27c

Browse files
committed
message
1 parent 05dc3b2 commit 524b27c

9 files changed

+180
-0
lines changed

.idea/.gitignore

+8
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/modules.xml

+8
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/sugar.iml

+9
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/vcs.xml

+6
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

concurrency.go

+12
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,15 @@ func Async[A any](f func() A) chan A {
3131

3232
return ch
3333
}
34+
35+
// FanIn ...
36+
func FanIn(in ...chan any) <-chan any {
37+
out := make(chan any)
38+
for i := range in {
39+
tmp := in[i]
40+
go func() {
41+
out <- tmp
42+
}()
43+
}
44+
return out
45+
}

go.sum

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
golang.org/x/exp v0.0.0-20220713135740-79cabaa25d75 h1:x03zeu7B2B11ySp+daztnwM5oBJ/8wGUSqrwcw9L0RA=
2+
golang.org/x/exp v0.0.0-20220713135740-79cabaa25d75/go.mod h1:Kr81I6Kryrl9sr8s2FK3vxD90NdsKWRuOIl2O4CvYbA=

message.go

+92
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package sugar
2+
3+
import (
4+
"errors"
5+
"time"
6+
)
7+
8+
type Message struct {
9+
// Contents
10+
}
11+
12+
type Subscription struct {
13+
ch chan Message
14+
15+
Inbox chan Message
16+
}
17+
18+
func (s *Subscription) Publish(msg Message) error {
19+
if _, ok := <-s.ch; !ok {
20+
return errors.New("Topic has been closed")
21+
}
22+
23+
s.ch <- msg
24+
25+
return nil
26+
}
27+
28+
type Topic struct {
29+
Subscribers []Session
30+
MessageHistory []Message
31+
ch <-chan Message
32+
}
33+
34+
func (t *Topic) Subscribe(uid uint64, name string) (Subscription, error) {
35+
// Get session and create one if it's the first
36+
s := Session{
37+
User: User{
38+
ID: uid,
39+
Name: name,
40+
},
41+
Timestamp: time.Now(),
42+
}
43+
44+
// Add session to the Topic & MessageHistory
45+
t.Subscribers = append(t.Subscribers, s)
46+
// Create a subscription
47+
return Subscription{}, nil
48+
}
49+
50+
func (t *Topic) Unsubscribe(Subscription) error {
51+
// Implementation
52+
return nil
53+
}
54+
55+
func (t *Topic) Delete() error {
56+
// Implementation
57+
return nil
58+
}
59+
60+
type User struct {
61+
ID uint64
62+
Name string
63+
}
64+
65+
type Session struct {
66+
User User
67+
Timestamp time.Time
68+
}
69+
70+
var (
71+
gSubscription Subscription
72+
gTopic Topic
73+
c = make(chan Message)
74+
)
75+
76+
func work() {
77+
78+
gSubscription.ch = c
79+
gSubscription.Publish(Message{})
80+
81+
}
82+
83+
func hub() {
84+
for {
85+
select {
86+
case data := <-c:
87+
gTopic.MessageHistory = append(gTopic.MessageHistory, data)
88+
89+
}
90+
}
91+
92+
}

ring_buffer.go

+21
Original file line numberDiff line numberDiff line change
@@ -1 +1,22 @@
11
package sugar
2+
3+
type RingBuffer struct {
4+
inputChannel <-chan any
5+
outputChannel chan any
6+
}
7+
8+
func NewRingBuffer(inputChannel <-chan any, outputChannel chan any) *RingBuffer {
9+
return &RingBuffer{inputChannel, outputChannel}
10+
}
11+
12+
func (r *RingBuffer) Run() {
13+
for v := range r.inputChannel {
14+
select {
15+
case r.outputChannel <- v:
16+
default:
17+
<-r.outputChannel
18+
r.outputChannel <- v
19+
}
20+
}
21+
close(r.outputChannel)
22+
}

ring_buffer_test.go

+22
Original file line numberDiff line numberDiff line change
@@ -1 +1,23 @@
11
package sugar
2+
3+
import (
4+
"fmt"
5+
"testing"
6+
)
7+
8+
func TestRingBuffer(t *testing.T) {
9+
in := make(chan any)
10+
out := make(chan any, 5)
11+
rb := NewRingBuffer(in, out)
12+
go rb.Run()
13+
14+
for i := 0; i < 10; i++ {
15+
in <- i
16+
}
17+
18+
close(in)
19+
20+
for res := range out {
21+
fmt.Println(res)
22+
}
23+
}

0 commit comments

Comments
 (0)