Skip to content

Commit

Permalink
3e
Browse files Browse the repository at this point in the history
  • Loading branch information
Teiva Harsanyi committed Feb 27, 2023
1 parent 09750f6 commit f39e397
Show file tree
Hide file tree
Showing 379 changed files with 199,717 additions and 43 deletions.
2 changes: 1 addition & 1 deletion challenge-3d-broadcast/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

func init() {
f, err := os.OpenFile("/tmp/log", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
f, err := os.OpenFile("/tmp/maelstrom.log", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
panic(err)
}
Expand Down
42 changes: 0 additions & 42 deletions challenge-3d-broadcast/main_test.go

This file was deleted.

11 changes: 11 additions & 0 deletions challenge-3e-broadcast/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
module github.com/teivah/gossip-glomers/challenge-3e-broadcast

go 1.20

require (
github.com/emirpasic/gods v1.18.1
github.com/jepsen-io/maelstrom/demo/go v0.0.0-20230113211434-22f433519054
github.com/sirupsen/logrus v1.9.0
)

require golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect
19 changes: 19 additions & 0 deletions challenge-3e-broadcast/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ=
github.com/jepsen-io/maelstrom/demo/go v0.0.0-20230113211434-22f433519054 h1:NF9yM6z/+jj+LpIsv9dBzF3o4IOgVjqg6hkpuxy55mc=
github.com/jepsen-io/maelstrom/demo/go v0.0.0-20230113211434-22f433519054/go.mod h1:i6aVIs5AIOOaQF1lAisBm7DDeWM1Iopf+26UxjagsCU=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0=
github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
262 changes: 262 additions & 0 deletions challenge-3e-broadcast/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
package main

import (
"context"
"encoding/json"
"fmt"
"os"
"strconv"
"sync"
"time"

"github.com/emirpasic/gods/trees/btree"
maelstrom "github.com/jepsen-io/maelstrom/demo/go"
log "github.com/sirupsen/logrus"
)

const (
batchFrequency = 500 * time.Millisecond
maxRetry = 100
)

func init() {
f, err := os.OpenFile("/tmp/maelstrom.log", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
panic(err)
}
log.SetOutput(f)
}

func main() {
n := maelstrom.NewNode()
s := &server{n: n, ids: make(map[int]struct{}), broadcasts: make(map[string][]int)}

n.Handle("init", s.initHandler)
n.Handle("broadcast", s.broadcastHandler)
n.Handle("read", s.readHandler)
n.Handle("topology", s.topologyHandler)

go func() {
for {
select {
case <-time.After(batchFrequency):
s.batchRPC()
}
}
}()

if err := n.Run(); err != nil {
log.Fatal(err)
}
}

type server struct {
n *maelstrom.Node
nodeID string
id int

idsMu sync.RWMutex
ids map[int]struct{}

nodesMu sync.RWMutex
tree *btree.Tree

broadcastsMu sync.Mutex
broadcasts map[string][]int
}

func (s *server) initHandler(_ maelstrom.Message) error {
s.nodeID = s.n.ID()
v, err := id(s.nodeID)
if err != nil {
return err
}
s.id = v
return nil
}

func (s *server) broadcastHandler(msg maelstrom.Message) error {
var body map[string]any
if err := json.Unmarshal(msg.Body, &body); err != nil {
return err
}

go func() {
_ = s.n.Reply(msg, map[string]any{
"type": "broadcast_ok",
})
}()

if _, contains := body["message"]; contains {
message := int(body["message"].(float64))
s.idsMu.Lock()
if _, exists := s.ids[message]; exists {
s.idsMu.Unlock()
return nil
}
s.ids[message] = struct{}{}
s.idsMu.Unlock()
return s.broadcast(msg.Src, body)
}

// Batch message
values := body["messages"].([]any)
messages := make([]int, 0, len(values))
s.idsMu.Lock()
for _, v := range values {
message := int(v.(float64))
if _, exists := s.ids[message]; exists {
continue
}
s.ids[message] = struct{}{}
messages = append(messages, message)
}
s.idsMu.Unlock()
return s.batchBroadcast(msg.Src, messages)
}

func (s *server) broadcast(src string, body map[string]any) error {
s.nodesMu.RLock()
n := s.tree.GetNode(s.id)
defer s.nodesMu.RUnlock()

var neighbors []string

if n.Parent != nil {
neighbors = append(neighbors, n.Parent.Entries[0].Value.(string))
}

for _, children := range n.Children {
for _, entry := range children.Entries {
neighbors = append(neighbors, entry.Value.(string))
}
}

message := int(body["message"].(float64))

s.broadcastsMu.Lock()
defer s.broadcastsMu.Unlock()
for _, dst := range neighbors {
if dst == src || dst == s.nodeID {
continue
}

s.broadcasts[dst] = append(s.broadcasts[dst], message)
}
return nil
}

func (s *server) batchBroadcast(src string, messages []int) error {
s.nodesMu.RLock()
n := s.tree.GetNode(s.id)
defer s.nodesMu.RUnlock()

var neighbors []string

if n.Parent != nil {
neighbors = append(neighbors, n.Parent.Entries[0].Value.(string))
}

for _, children := range n.Children {
for _, entry := range children.Entries {
neighbors = append(neighbors, entry.Value.(string))
}
}

s.broadcastsMu.Lock()
defer s.broadcastsMu.Unlock()
for _, dst := range neighbors {
if dst == src || dst == s.nodeID {
continue
}

for _, message := range messages {
s.broadcasts[dst] = append(s.broadcasts[dst], message)
}
}
return nil
}

func (s *server) batchRPC() {
s.broadcastsMu.Lock()
defer s.broadcastsMu.Unlock()

wg := sync.WaitGroup{}
for dst, messages := range s.broadcasts {
dst := dst
messages := messages
go func() {
if err := s.rpcWithRetry(dst, map[string]any{
"type": "broadcast",
"messages": messages,
}, maxRetry); err != nil {
log.Error(err)
}
}()
}
s.broadcasts = make(map[string][]int)
wg.Wait()
}

func (s *server) rpcWithRetry(dst string, body map[string]any, retry int) error {
var err error
for i := 0; i < retry; i++ {
if err = s.rpc(dst, body); err != nil {
// Sleep and retry
time.Sleep(100 * time.Duration(i) * time.Millisecond)
continue
}
return nil
}
return err
}

func (s *server) rpc(dst string, body map[string]any) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
_, err := s.n.SyncRPC(ctx, dst, body)
return err
}

func (s *server) readHandler(msg maelstrom.Message) error {
ids := s.getAllIDs()

return s.n.Reply(msg, map[string]any{
"type": "read_ok",
"messages": ids,
})
}

func (s *server) getAllIDs() []int {
s.idsMu.RLock()
ids := make([]int, 0, len(s.ids))
for id := range s.ids {
ids = append(ids, id)
}
s.idsMu.RUnlock()

return ids
}

func (s *server) topologyHandler(msg maelstrom.Message) error {
tree := btree.NewWithIntComparator(len(s.n.NodeIDs()))
for i := 0; i < len(s.n.NodeIDs()); i++ {
tree.Put(i, fmt.Sprintf("n%d", i))
}

s.nodesMu.Lock()
s.tree = tree
s.nodesMu.Unlock()

return s.n.Reply(msg, map[string]any{
"type": "topology_ok",
})
}

func id(s string) (int, error) {
i, err := strconv.Atoi(s[1:])
if err != nil {
return 0, err
}
return i, nil
}
9 changes: 9 additions & 0 deletions challenge-3e-broadcast/test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#!/bin/bash

cwd=$(pwd)
rm bin
go build -o bin
cd $MAELSTROM_PATH
./maelstrom test -w broadcast --bin $cwd/bin --node-count 25 --time-limit 20 --rate 100 --latency 100
#./maelstrom test -w broadcast --bin $cwd/bin --node-count 25 --time-limit 20 --rate 100 --latency 100 --nemesis partition
cd $cwd
41 changes: 41 additions & 0 deletions challenge-3e-broadcast/vendor/github.com/emirpasic/gods/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit f39e397

Please sign in to comment.