Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ Each composes dotcontext types. Mutators return deltas for replication.
| `dwflag/` | DWFlag | 2 source + 2 test |
| `mvregister/` | MVRegister | 2 source + 2 test |
| `rga/` | RGA, Node, Element | 2 source + 2 test |
| `gset/` | GSet | 2 source + 2 test |
| `lwweset/` | LWWESet | 2 source + 2 test |
| `replication/` | PeerTracker, GC, WriteDeltaBatch | 4 source + 4 test |
| `transport/` | Conn, Transport, Handler, PeerError | 3 source + 3 test |

Expand Down
2 changes: 2 additions & 0 deletions TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
- [x] `dwflag` — disable-wins flag (complement of ewflag)
- [x] `rwset` — remove-wins observed-remove set (dual of AWSet)
- [x] `rga` — replicated growable array (`DotFun[Node[E]]`, immutable elements, tombstone ordering)
- [x] `gset` — grow-only set (no causal context; merge = set union)
- [x] `lwweset` — LWW element set (per-element timestamp; add-wins on equal timestamps)

## Optimization

Expand Down
10 changes: 10 additions & 0 deletions gset/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// Package gset implements a grow-only set (GSet), a delta-state CRDT
// where elements can only be added, never removed. Merge is set union.
//
// GSet does not use causal dot stores — there are no concurrent
// add/remove conflicts to resolve. Merge correctness follows directly
// from set union being commutative, associative, and idempotent.
//
// Mutators return deltas suitable for replication. Merge incorporates
// a delta or full state from another replica.
package gset
52 changes: 52 additions & 0 deletions gset/gset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package gset

// GSet is a grow-only set. Elements can only be added, never removed.
//
// Unlike other CRDTs in this library, GSet requires no causal context:
// there are no remove operations, so no conflict between add and remove
// is possible. The semilattice join is plain set union.
type GSet[E comparable] struct {
elems map[E]struct{}
}

// New creates an empty GSet.
func New[E comparable]() *GSet[E] {
return &GSet[E]{elems: make(map[E]struct{})}
}

// Add inserts elem into the set and returns a delta for replication.
// If elem is already present, local state is unchanged and the returned
// delta is a singleton set containing elem.
func (s *GSet[E]) Add(elem E) *GSet[E] {
s.elems[elem] = struct{}{}
return &GSet[E]{elems: map[E]struct{}{elem: {}}}
}

// Has reports whether elem is in the set.
func (s *GSet[E]) Has(elem E) bool {
_, ok := s.elems[elem]
return ok
}

// Elements returns all elements currently in the set.
// The order is non-deterministic.
func (s *GSet[E]) Elements() []E {
elems := make([]E, 0, len(s.elems))
for e := range s.elems {
elems = append(elems, e)
}
return elems
}

// Len returns the number of elements in the set.
func (s *GSet[E]) Len() int {
return len(s.elems)
}

// Merge incorporates a delta or full state from another GSet.
// Merge is set union: every element in other is added to s.
func (s *GSet[E]) Merge(other *GSet[E]) {
for e := range other.elems {
s.elems[e] = struct{}{}
}
}
150 changes: 150 additions & 0 deletions gset/gset_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package gset

import (
"slices"
"testing"

qt "github.com/frankban/quicktest"
)

func TestBasicOps(t *testing.T) {
c := qt.New(t)

c.Run("NewEmpty", func(c *qt.C) {
s := New[string]()
c.Assert(s.Len(), qt.Equals, 0)
c.Assert(s.Has("x"), qt.IsFalse)
c.Assert(s.Elements(), qt.HasLen, 0)
})

c.Run("AddHas", func(c *qt.C) {
s := New[string]()
s.Add("x")
s.Add("y")

c.Assert(s.Has("x"), qt.IsTrue)
c.Assert(s.Has("y"), qt.IsTrue)
c.Assert(s.Has("z"), qt.IsFalse)
c.Assert(s.Len(), qt.Equals, 2)
})

c.Run("AddIdempotent", func(c *qt.C) {
s := New[string]()
s.Add("x")
s.Add("x")

c.Assert(s.Has("x"), qt.IsTrue)
c.Assert(s.Len(), qt.Equals, 1)
})

c.Run("Elements", func(c *qt.C) {
s := New[int]()
s.Add(3)
s.Add(1)
s.Add(2)

elems := s.Elements()
slices.Sort(elems)
c.Assert(elems, qt.DeepEquals, []int{1, 2, 3})
})
}

func TestDeltaReturn(t *testing.T) {
c := qt.New(t)

c.Run("AddReturnsSingleton", func(c *qt.C) {
s := New[string]()
delta := s.Add("x")

c.Assert(delta.Has("x"), qt.IsTrue)
c.Assert(delta.Len(), qt.Equals, 1)
})

c.Run("DeltaDoesNotContainOtherElements", func(c *qt.C) {
s := New[string]()
s.Add("x")
delta := s.Add("y")

c.Assert(delta.Has("y"), qt.IsTrue)
c.Assert(delta.Has("x"), qt.IsFalse)
c.Assert(delta.Len(), qt.Equals, 1)
})

c.Run("DeltaAlreadyPresentElement", func(c *qt.C) {
s := New[string]()
s.Add("x")
delta := s.Add("x") // already present

c.Assert(delta.Has("x"), qt.IsTrue)
c.Assert(delta.Len(), qt.Equals, 1)
c.Assert(s.Len(), qt.Equals, 1)
})
}

func TestMerge(t *testing.T) {
c := qt.New(t)

c.Run("UnionOfDisjointSets", func(c *qt.C) {
a := New[string]()
b := New[string]()
a.Add("x")
b.Add("y")

a.Merge(b)
c.Assert(a.Has("x"), qt.IsTrue)
c.Assert(a.Has("y"), qt.IsTrue)
c.Assert(a.Len(), qt.Equals, 2)
})

c.Run("UnionOfOverlappingSets", func(c *qt.C) {
a := New[string]()
b := New[string]()
a.Add("x")
a.Add("y")
b.Add("y")
b.Add("z")

a.Merge(b)
c.Assert(a.Len(), qt.Equals, 3)
})

c.Run("ConcurrentAdds", func(c *qt.C) {
a := New[string]()
b := New[string]()

da := a.Add("x")
db := b.Add("y")

a.Merge(db)
b.Merge(da)

c.Assert(a.Has("x"), qt.IsTrue)
c.Assert(a.Has("y"), qt.IsTrue)
c.Assert(b.Has("x"), qt.IsTrue)
c.Assert(b.Has("y"), qt.IsTrue)
})

c.Run("MergeIntoEmpty", func(c *qt.C) {
a := New[string]()
a.Add("x")
a.Add("y")

b := New[string]()
b.Merge(a)

c.Assert(b.Has("x"), qt.IsTrue)
c.Assert(b.Has("y"), qt.IsTrue)
c.Assert(b.Len(), qt.Equals, 2)
})

c.Run("MergeEmptyIntoPopulated", func(c *qt.C) {
a := New[string]()
a.Add("x")

empty := New[string]()
a.Merge(empty)

c.Assert(a.Has("x"), qt.IsTrue)
c.Assert(a.Len(), qt.Equals, 1)
})
}
29 changes: 29 additions & 0 deletions gset/shared_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package gset

import (
"slices"
"testing"

"github.com/aalpar/crdt/crdttest"
)

func TestSharedProperties(t *testing.T) {
crdttest.Harness[*GSet[string]]{
New: func(_ string) *GSet[string] { return New[string]() },
Merge: func(dst, src *GSet[string]) { dst.Merge(src) },
Equal: func(a, b *GSet[string]) bool {
ae := a.Elements()
be := b.Elements()
slices.Sort(ae)
slices.Sort(be)
return slices.Equal(ae, be)
},
Ops: []func(*GSet[string]) *GSet[string]{
func(s *GSet[string]) *GSet[string] { return s.Add("x") },
func(s *GSet[string]) *GSet[string] { return s.Add("y") },
func(s *GSet[string]) *GSet[string] { return s.Add("z") },
func(s *GSet[string]) *GSet[string] { return s.Add("w") },
func(s *GSet[string]) *GSet[string] { return s.Add("v") },
},
}.Run(t)
}
13 changes: 13 additions & 0 deletions lwweset/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Package lwweset implements a last-writer-wins element set (LWWESet),
// a delta-state CRDT where each element independently carries an add
// timestamp and a remove timestamp. An element is present when its
// add timestamp is greater than or equal to its remove timestamp.
//
// Unlike AWSet and RWSet, conflict resolution does not use causal dot
// stores — timestamps provide a total order that makes causal tracking
// redundant. The timestamp source (wall clock, logical clock, Lamport
// clock) is supplied by the caller.
//
// Mutators return deltas suitable for replication. Merge incorporates
// a delta or full state from another replica.
package lwweset
99 changes: 99 additions & 0 deletions lwweset/lwweset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package lwweset

// LWWESet is a last-writer-wins element set. Each element carries
// an add timestamp and a remove timestamp; an element is present
// when its add timestamp is greater than or equal to its remove
// timestamp (add-wins on equal timestamps).
//
// The timestamp source is controlled by the caller. Use a monotonic
// source (wall clock, Lamport clock, etc.) to ensure meaningful
// ordering. Identical timestamps on concurrent add and remove of the
// same element resolve in favor of add.
//
// Like GSet, LWWESet requires no causal dot stores — timestamps
// provide a total order that makes per-replica dot tracking redundant.
type LWWESet[E comparable] struct {
added map[E]int64
removed map[E]int64
}

// New creates an empty LWWESet.
func New[E comparable]() *LWWESet[E] {
return &LWWESet[E]{
added: make(map[E]int64),
removed: make(map[E]int64),
}
}

// Add inserts elem with the given timestamp and returns a delta for
// replication. If a higher add timestamp for elem already exists
// locally, local state is unchanged but the delta still carries ts.
func (s *LWWESet[E]) Add(elem E, ts int64) *LWWESet[E] {
if ts > s.added[elem] {
s.added[elem] = ts
}
return &LWWESet[E]{
added: map[E]int64{elem: ts},
removed: make(map[E]int64),
}
}

// Remove marks elem absent with the given timestamp and returns a
// delta for replication. An element is absent when its remove
// timestamp is strictly greater than its add timestamp.
func (s *LWWESet[E]) Remove(elem E, ts int64) *LWWESet[E] {
if ts > s.removed[elem] {
s.removed[elem] = ts
}
return &LWWESet[E]{
added: make(map[E]int64),
removed: map[E]int64{elem: ts},
}
}

// Has reports whether elem is currently in the set.
// An element is present iff it has been added (add timestamp > 0)
// and its add timestamp is greater than or equal to its remove timestamp.
func (s *LWWESet[E]) Has(elem E) bool {
a, ok := s.added[elem]
return ok && a >= s.removed[elem]
}

// Elements returns all elements currently in the set.
// The order is non-deterministic.
func (s *LWWESet[E]) Elements() []E {
elems := make([]E, 0)
for e, a := range s.added {
if a >= s.removed[e] {
elems = append(elems, e)
}
}
return elems
}

// Len returns the number of elements currently in the set.
func (s *LWWESet[E]) Len() int {
n := 0
for e, a := range s.added {
if a >= s.removed[e] {
n++
}
}
return n
}

// Merge incorporates a delta or full state from another LWWESet.
// For each element, Merge takes the maximum of the add timestamps
// and the maximum of the remove timestamps.
func (s *LWWESet[E]) Merge(other *LWWESet[E]) {
for e, ts := range other.added {
if ts > s.added[e] {
s.added[e] = ts
}
}
for e, ts := range other.removed {
if ts > s.removed[e] {
s.removed[e] = ts
}
}
}
Loading
Loading