Skip to content

Commit

Permalink
No more CAS
Browse files Browse the repository at this point in the history
  • Loading branch information
Teiva Harsanyi committed Feb 27, 2023
1 parent 560f259 commit 0f4a6d6
Showing 1 changed file with 22 additions and 39 deletions.
61 changes: 22 additions & 39 deletions challenge-5c-kafka-log/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,44 +121,34 @@ func (s *server) sendHandler(msg maelstrom.Message) error {
offset++
}

for ; ; offset++ {
if err := s.kv.CompareAndSwap(context.Background(),
keyLatest, offset-1, offset, true); err != nil {
log.Error("cas 2")
continue
}
break
if err := s.kv.Write(context.Background(), keyLatest, offset); err != nil {
return err
}

repeat := -1
for {
repeat++
keyEntry := fmt.Sprintf("%s%s", prefixEntry, key)
v, err := s.kv.Read(context.Background(), keyEntry)
if err != nil {
rpcErr := err.(*maelstrom.RPCError)
if rpcErr.Code == maelstrom.KeyDoesNotExist {
v = ""
} else {
return err
}
}

logs, err := toLogEntries(v.(string))
if err != nil {
keyEntry := fmt.Sprintf("%s%s", prefixEntry, key)
v, err := s.kv.Read(context.Background(), keyEntry)
if err != nil {
rpcErr := err.(*maelstrom.RPCError)
if rpcErr.Code == maelstrom.KeyDoesNotExist {
v = ""
} else {
return err
}
}

logs.entries = append(logs.entries, entry{
offset: offset,
message: message,
})
logs, err := toLogEntries(v.(string))
if err != nil {
return err
}

err = s.kv.CompareAndSwap(context.Background(), keyEntry, v, logs.String(), true)
if err != nil {
continue
}
break
logs.entries = append(logs.entries, entry{
offset: offset,
message: message,
})

err = s.kv.Write(context.Background(), keyEntry, logs.String())
if err != nil {
return err
}

return s.n.Reply(msg, map[string]any{
Expand Down Expand Up @@ -232,13 +222,6 @@ func (s *server) pollHandler(msg maelstrom.Message) error {
return err
}

//for i := 0; i < len(logs.entries); i++ {
// e := logs.entries[i]
// if e.offset >= startingOffset {
// res[key] = append(res[key], [2]int{e.offset, e.message})
// }
//}

i := findOffset(logs, startingOffset)
for ; i < len(logs.entries); i++ {
e := logs.entries[i]
Expand Down

0 comments on commit 0f4a6d6

Please sign in to comment.