Skip to content

Commit

Permalink
5b
Browse files Browse the repository at this point in the history
  • Loading branch information
Teiva Harsanyi committed Feb 27, 2023
1 parent e9796b8 commit 427146e
Show file tree
Hide file tree
Showing 364 changed files with 197,473 additions and 2 deletions.
2 changes: 1 addition & 1 deletion challenge-4-grow-only-counter/go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module github.com/teivah/gossip-glomers/challenge-1-echo
module github.com/teivah/gossip-glomers/challenge-4-grow-only-counter

go 1.20

Expand Down
2 changes: 1 addition & 1 deletion challenge-5a-kafka-log/go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module github.com/teivah/gossip-glomers/challenge-1-echo
module github.com/teivah/gossip-glomers/challenge-5a-kafka-log

go 1.20

Expand Down
10 changes: 10 additions & 0 deletions challenge-5b-kafka-log/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
module github.com/teivah/gossip-glomers/challenge-5b-kafka-log

go 1.20

require (
github.com/jepsen-io/maelstrom/demo/go v0.0.0-20230113211434-22f433519054
github.com/sirupsen/logrus v1.9.0
)

require golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect
17 changes: 17 additions & 0 deletions challenge-5b-kafka-log/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/jepsen-io/maelstrom/demo/go v0.0.0-20230113211434-22f433519054 h1:NF9yM6z/+jj+LpIsv9dBzF3o4IOgVjqg6hkpuxy55mc=
github.com/jepsen-io/maelstrom/demo/go v0.0.0-20230113211434-22f433519054/go.mod h1:i6aVIs5AIOOaQF1lAisBm7DDeWM1Iopf+26UxjagsCU=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0=
github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
205 changes: 205 additions & 0 deletions challenge-5b-kafka-log/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
package main

import (
"context"
"encoding/json"
"fmt"
"os"
"strconv"

maelstrom "github.com/jepsen-io/maelstrom/demo/go"
log "github.com/sirupsen/logrus"
)

const (
prefixCommit = "commit_"
prefixLatest = "latest_"
prefixEntry = "entry_"
)

func init() {
f, err := os.OpenFile("/tmp/maelstrom.log", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
panic(err)
}
log.SetOutput(f)
}

func main() {
n := maelstrom.NewNode()
kv := maelstrom.NewLinKV(n)
s := &server{
n: n,
kv: kv,
}

n.Handle("init", wrap(s.initHandler))
n.Handle("send", wrap(s.sendHandler))
n.Handle("poll", wrap(s.pollHandler))
n.Handle("commit_offsets", wrap(s.commitHandler))
n.Handle("list_committed_offsets", wrap(s.listHandler))

if err := n.Run(); err != nil {
log.Fatal(err)
}
}

func wrap(f func(msg maelstrom.Message) error) func(msg maelstrom.Message) error {
return func(msg maelstrom.Message) error {
if err := f(msg); err != nil {
log.Error(err)
return err
}
return nil
}
}

type server struct {
n *maelstrom.Node
kv *maelstrom.KV
nodeID string
id int
}

func (s *server) initHandler(_ maelstrom.Message) error {
s.nodeID = s.n.ID()
id, err := strconv.Atoi(s.nodeID[1:])
if err != nil {
return err
}
s.id = id
return nil
}

func (s *server) sendHandler(msg maelstrom.Message) error {
var body map[string]any
if err := json.Unmarshal(msg.Body, &body); err != nil {
return err
}

key := body["key"].(string)
message := int(body["msg"].(float64))

keyLatest := fmt.Sprintf("%s%s", prefixLatest, key)
offset, err := s.kv.ReadInt(context.Background(), keyLatest)
if err != nil {
rpcErr := err.(*maelstrom.RPCError)
if rpcErr.Code == maelstrom.KeyDoesNotExist {
offset = 0

for ; ; offset++ {
if err := s.kv.CompareAndSwap(context.Background(),
keyLatest, offset-1, offset, true); err != nil {
continue
}
break
}
} else {
return err
}
} else {
offset++

for ; ; offset++ {
if err := s.kv.CompareAndSwap(context.Background(),
keyLatest, offset-1, offset, true); err != nil {
continue
}
break
}
}

if err := s.kv.Write(context.Background(), fmt.Sprintf("%s%s_%d", prefixEntry, key, offset), message); err != nil {
return err
}

return s.n.Reply(msg, map[string]any{
"type": "send_ok",
"offset": offset,
})
}

type offsetReq struct {
Offsets map[string]int `json:"offsets"`
}

func (s *server) pollHandler(msg maelstrom.Message) error {
var req offsetReq
if err := json.Unmarshal(msg.Body, &req); err != nil {
return err
}

res := make(map[string][][2]int)
for key, startingOffset := range req.Offsets {
for offset := startingOffset; ; offset++ {
message, exists, err := s.getValue(key, offset)
if err != nil {
return err
}
if !exists {
break
}
res[key] = append(res[key], [2]int{offset, message})
}
}

return s.n.Reply(msg, map[string]any{
"type": "poll_ok",
"msgs": res,
})
}

func (s *server) getValue(key string, offset int) (int, bool, error) {
v, err := s.kv.ReadInt(context.Background(), fmt.Sprintf("%s%s_%d", prefixEntry, key, offset))
if err != nil {
rpcErr := err.(*maelstrom.RPCError)
if rpcErr.Code == maelstrom.KeyDoesNotExist {
return 0, false, nil
}
return 0, false, err
}
return v, true, nil
}

func (s *server) commitHandler(msg maelstrom.Message) error {
var req offsetReq
if err := json.Unmarshal(msg.Body, &req); err != nil {
return err
}

for key, offset := range req.Offsets {
if err := s.kv.Write(context.Background(), fmt.Sprintf("%s%s", prefixCommit, key), offset); err != nil {
return err
}
}

return s.n.Reply(msg, map[string]any{
"type": "commit_offsets_ok",
})
}

func (s *server) listHandler(msg maelstrom.Message) error {
var body map[string]any
if err := json.Unmarshal(msg.Body, &body); err != nil {
return err
}

res := make(map[string]int)

keys := body["keys"].([]any)
for _, key := range keys {
k := key.(string)

v, err := s.kv.ReadInt(context.Background(), fmt.Sprintf("%s%s", prefixCommit, k))
if err != nil {
v = 0
}

res[k] = v
}

return s.n.Reply(msg, map[string]any{
"type": "list_committed_offsets_ok",
"offsets": res,
})
}
7 changes: 7 additions & 0 deletions challenge-5b-kafka-log/test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#!/bin/bash

cwd=$(pwd)
go build -o bin
cd $MAELSTROM_PATH
./maelstrom test -w kafka --bin $cwd/bin --node-count 1 --concurrency 2n --time-limit 20 --rate 1000
cd $cwd
Loading

0 comments on commit 427146e

Please sign in to comment.