diff --git a/CLAUDE.md b/CLAUDE.md index 2b71984..80745cd 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -91,6 +91,8 @@ Each composes dotcontext types. Mutators return deltas for replication. | `dwflag/` | DWFlag | 2 source + 2 test | | `mvregister/` | MVRegister | 2 source + 2 test | | `rga/` | RGA, Node, Element | 2 source + 2 test | +| `gset/` | GSet | 2 source + 2 test | +| `lwweset/` | LWWESet | 2 source + 2 test | | `replication/` | PeerTracker, GC, WriteDeltaBatch | 4 source + 4 test | | `transport/` | Conn, Transport, Handler, PeerError | 3 source + 3 test | diff --git a/TODO.md b/TODO.md index 6cfe7b5..a3929b4 100644 --- a/TODO.md +++ b/TODO.md @@ -28,6 +28,8 @@ - [x] `dwflag` — disable-wins flag (complement of ewflag) - [x] `rwset` — remove-wins observed-remove set (dual of AWSet) - [x] `rga` — replicated growable array (`DotFun[Node[E]]`, immutable elements, tombstone ordering) +- [x] `gset` — grow-only set (no causal context; merge = set union) +- [x] `lwweset` — LWW element set (per-element timestamp; add-wins on equal timestamps) ## Optimization diff --git a/gset/doc.go b/gset/doc.go new file mode 100644 index 0000000..319a198 --- /dev/null +++ b/gset/doc.go @@ -0,0 +1,10 @@ +// Package gset implements a grow-only set (GSet), a delta-state CRDT +// where elements can only be added, never removed. Merge is set union. +// +// GSet does not use causal dot stores — there are no concurrent +// add/remove conflicts to resolve. Merge correctness follows directly +// from set union being commutative, associative, and idempotent. +// +// Mutators return deltas suitable for replication. Merge incorporates +// a delta or full state from another replica. +package gset diff --git a/gset/gset.go b/gset/gset.go new file mode 100644 index 0000000..eeb9dab --- /dev/null +++ b/gset/gset.go @@ -0,0 +1,52 @@ +package gset + +// GSet is a grow-only set. Elements can only be added, never removed. 
//
// Unlike other CRDTs in this library, GSet requires no causal context:
// there are no remove operations, so no conflict between add and remove
// is possible. The semilattice join is plain set union.
type GSet[E comparable] struct {
	// elems stores the members as map keys; the empty-struct values carry
	// no data. It is nil on a GSet that was not built with New, in which
	// case Add allocates it on first use.
	elems map[E]struct{}
}

// New creates an empty GSet.
func New[E comparable]() *GSet[E] {
	return &GSet[E]{elems: make(map[E]struct{})}
}

// Add inserts elem into the set and returns a delta for replication.
// If elem is already present, local state is unchanged and the returned
// delta is a singleton set containing elem.
//
// Add allocates the backing map on first use, so it does not panic when
// called on a GSet that was not created with New (for example a
// zero-value GSet).
func (s *GSet[E]) Add(elem E) *GSet[E] {
	if s.elems == nil {
		// Assigning into a nil map panics; initialise lazily instead.
		s.elems = make(map[E]struct{})
	}
	s.elems[elem] = struct{}{}

	// The delta is always a fresh one-element set, independent of s.
	return &GSet[E]{elems: map[E]struct{}{elem: {}}}
}

// Has reports whether elem is in the set.
func (s *GSet[E]) Has(elem E) bool {
	_, ok := s.elems[elem]
	return ok
}

// Elements returns all elements currently in the set as a newly
// allocated slice. The order is non-deterministic because it follows
// map iteration order.
func (s *GSet[E]) Elements() []E {
	elems := make([]E, 0, len(s.elems))
	for e := range s.elems {
		elems = append(elems, e)
	}
	return elems
}

// Len returns the number of elements in the set.
func (s *GSet[E]) Len() int {
	return len(s.elems)
}

// Merge incorporates a delta or full state from another GSet.
// Merge is set union: every element in other is added to s.
+func (s *GSet[E]) Merge(other *GSet[E]) { + for e := range other.elems { + s.elems[e] = struct{}{} + } +} diff --git a/gset/gset_test.go b/gset/gset_test.go new file mode 100644 index 0000000..29c1473 --- /dev/null +++ b/gset/gset_test.go @@ -0,0 +1,150 @@ +package gset + +import ( + "slices" + "testing" + + qt "github.com/frankban/quicktest" +) + +func TestBasicOps(t *testing.T) { + c := qt.New(t) + + c.Run("NewEmpty", func(c *qt.C) { + s := New[string]() + c.Assert(s.Len(), qt.Equals, 0) + c.Assert(s.Has("x"), qt.IsFalse) + c.Assert(s.Elements(), qt.HasLen, 0) + }) + + c.Run("AddHas", func(c *qt.C) { + s := New[string]() + s.Add("x") + s.Add("y") + + c.Assert(s.Has("x"), qt.IsTrue) + c.Assert(s.Has("y"), qt.IsTrue) + c.Assert(s.Has("z"), qt.IsFalse) + c.Assert(s.Len(), qt.Equals, 2) + }) + + c.Run("AddIdempotent", func(c *qt.C) { + s := New[string]() + s.Add("x") + s.Add("x") + + c.Assert(s.Has("x"), qt.IsTrue) + c.Assert(s.Len(), qt.Equals, 1) + }) + + c.Run("Elements", func(c *qt.C) { + s := New[int]() + s.Add(3) + s.Add(1) + s.Add(2) + + elems := s.Elements() + slices.Sort(elems) + c.Assert(elems, qt.DeepEquals, []int{1, 2, 3}) + }) +} + +func TestDeltaReturn(t *testing.T) { + c := qt.New(t) + + c.Run("AddReturnsSingleton", func(c *qt.C) { + s := New[string]() + delta := s.Add("x") + + c.Assert(delta.Has("x"), qt.IsTrue) + c.Assert(delta.Len(), qt.Equals, 1) + }) + + c.Run("DeltaDoesNotContainOtherElements", func(c *qt.C) { + s := New[string]() + s.Add("x") + delta := s.Add("y") + + c.Assert(delta.Has("y"), qt.IsTrue) + c.Assert(delta.Has("x"), qt.IsFalse) + c.Assert(delta.Len(), qt.Equals, 1) + }) + + c.Run("DeltaAlreadyPresentElement", func(c *qt.C) { + s := New[string]() + s.Add("x") + delta := s.Add("x") // already present + + c.Assert(delta.Has("x"), qt.IsTrue) + c.Assert(delta.Len(), qt.Equals, 1) + c.Assert(s.Len(), qt.Equals, 1) + }) +} + +func TestMerge(t *testing.T) { + c := qt.New(t) + + c.Run("UnionOfDisjointSets", func(c *qt.C) { + a := 
New[string]() + b := New[string]() + a.Add("x") + b.Add("y") + + a.Merge(b) + c.Assert(a.Has("x"), qt.IsTrue) + c.Assert(a.Has("y"), qt.IsTrue) + c.Assert(a.Len(), qt.Equals, 2) + }) + + c.Run("UnionOfOverlappingSets", func(c *qt.C) { + a := New[string]() + b := New[string]() + a.Add("x") + a.Add("y") + b.Add("y") + b.Add("z") + + a.Merge(b) + c.Assert(a.Len(), qt.Equals, 3) + }) + + c.Run("ConcurrentAdds", func(c *qt.C) { + a := New[string]() + b := New[string]() + + da := a.Add("x") + db := b.Add("y") + + a.Merge(db) + b.Merge(da) + + c.Assert(a.Has("x"), qt.IsTrue) + c.Assert(a.Has("y"), qt.IsTrue) + c.Assert(b.Has("x"), qt.IsTrue) + c.Assert(b.Has("y"), qt.IsTrue) + }) + + c.Run("MergeIntoEmpty", func(c *qt.C) { + a := New[string]() + a.Add("x") + a.Add("y") + + b := New[string]() + b.Merge(a) + + c.Assert(b.Has("x"), qt.IsTrue) + c.Assert(b.Has("y"), qt.IsTrue) + c.Assert(b.Len(), qt.Equals, 2) + }) + + c.Run("MergeEmptyIntoPopulated", func(c *qt.C) { + a := New[string]() + a.Add("x") + + empty := New[string]() + a.Merge(empty) + + c.Assert(a.Has("x"), qt.IsTrue) + c.Assert(a.Len(), qt.Equals, 1) + }) +} diff --git a/gset/shared_test.go b/gset/shared_test.go new file mode 100644 index 0000000..05c56c0 --- /dev/null +++ b/gset/shared_test.go @@ -0,0 +1,29 @@ +package gset + +import ( + "slices" + "testing" + + "github.com/aalpar/crdt/crdttest" +) + +func TestSharedProperties(t *testing.T) { + crdttest.Harness[*GSet[string]]{ + New: func(_ string) *GSet[string] { return New[string]() }, + Merge: func(dst, src *GSet[string]) { dst.Merge(src) }, + Equal: func(a, b *GSet[string]) bool { + ae := a.Elements() + be := b.Elements() + slices.Sort(ae) + slices.Sort(be) + return slices.Equal(ae, be) + }, + Ops: []func(*GSet[string]) *GSet[string]{ + func(s *GSet[string]) *GSet[string] { return s.Add("x") }, + func(s *GSet[string]) *GSet[string] { return s.Add("y") }, + func(s *GSet[string]) *GSet[string] { return s.Add("z") }, + func(s *GSet[string]) *GSet[string] { 
return s.Add("w") }, + func(s *GSet[string]) *GSet[string] { return s.Add("v") }, + }, + }.Run(t) +} diff --git a/lwweset/doc.go b/lwweset/doc.go new file mode 100644 index 0000000..2e717b5 --- /dev/null +++ b/lwweset/doc.go @@ -0,0 +1,13 @@ +// Package lwweset implements a last-writer-wins element set (LWWESet), +// a delta-state CRDT where each element independently carries an add +// timestamp and a remove timestamp. An element is present when its +// add timestamp is greater than or equal to its remove timestamp. +// +// Unlike AWSet and RWSet, conflict resolution does not use causal dot +// stores — timestamps provide a total order that makes causal tracking +// redundant. The timestamp source (wall clock, logical clock, Lamport +// clock) is supplied by the caller. +// +// Mutators return deltas suitable for replication. Merge incorporates +// a delta or full state from another replica. +package lwweset diff --git a/lwweset/lwweset.go b/lwweset/lwweset.go new file mode 100644 index 0000000..901e9cd --- /dev/null +++ b/lwweset/lwweset.go @@ -0,0 +1,99 @@ +package lwweset + +// LWWESet is a last-writer-wins element set. Each element carries +// an add timestamp and a remove timestamp; an element is present +// when its add timestamp is greater than or equal to its remove +// timestamp (add-wins on equal timestamps). +// +// The timestamp source is controlled by the caller. Use a monotonic +// source (wall clock, Lamport clock, etc.) to ensure meaningful +// ordering. Identical timestamps on concurrent add and remove of the +// same element resolve in favor of add. +// +// Like GSet, LWWESet requires no causal dot stores — timestamps +// provide a total order that makes per-replica dot tracking redundant. +type LWWESet[E comparable] struct { + added map[E]int64 + removed map[E]int64 +} + +// New creates an empty LWWESet. 
+func New[E comparable]() *LWWESet[E] { + return &LWWESet[E]{ + added: make(map[E]int64), + removed: make(map[E]int64), + } +} + +// Add inserts elem with the given timestamp and returns a delta for +// replication. If a higher add timestamp for elem already exists +// locally, local state is unchanged but the delta still carries ts. +func (s *LWWESet[E]) Add(elem E, ts int64) *LWWESet[E] { + if ts > s.added[elem] { + s.added[elem] = ts + } + return &LWWESet[E]{ + added: map[E]int64{elem: ts}, + removed: make(map[E]int64), + } +} + +// Remove marks elem absent with the given timestamp and returns a +// delta for replication. An element is absent when its remove +// timestamp is strictly greater than its add timestamp. +func (s *LWWESet[E]) Remove(elem E, ts int64) *LWWESet[E] { + if ts > s.removed[elem] { + s.removed[elem] = ts + } + return &LWWESet[E]{ + added: make(map[E]int64), + removed: map[E]int64{elem: ts}, + } +} + +// Has reports whether elem is currently in the set. +// An element is present iff it has been added (add timestamp > 0) +// and its add timestamp is greater than or equal to its remove timestamp. +func (s *LWWESet[E]) Has(elem E) bool { + a, ok := s.added[elem] + return ok && a >= s.removed[elem] +} + +// Elements returns all elements currently in the set. +// The order is non-deterministic. +func (s *LWWESet[E]) Elements() []E { + elems := make([]E, 0) + for e, a := range s.added { + if a >= s.removed[e] { + elems = append(elems, e) + } + } + return elems +} + +// Len returns the number of elements currently in the set. +func (s *LWWESet[E]) Len() int { + n := 0 + for e, a := range s.added { + if a >= s.removed[e] { + n++ + } + } + return n +} + +// Merge incorporates a delta or full state from another LWWESet. +// For each element, Merge takes the maximum of the add timestamps +// and the maximum of the remove timestamps. 
+func (s *LWWESet[E]) Merge(other *LWWESet[E]) { + for e, ts := range other.added { + if ts > s.added[e] { + s.added[e] = ts + } + } + for e, ts := range other.removed { + if ts > s.removed[e] { + s.removed[e] = ts + } + } +} diff --git a/lwweset/lwweset_test.go b/lwweset/lwweset_test.go new file mode 100644 index 0000000..786647f --- /dev/null +++ b/lwweset/lwweset_test.go @@ -0,0 +1,205 @@ +package lwweset + +import ( + "slices" + "testing" + + qt "github.com/frankban/quicktest" +) + +func TestBasicOps(t *testing.T) { + c := qt.New(t) + + c.Run("NewEmpty", func(c *qt.C) { + s := New[string]() + c.Assert(s.Len(), qt.Equals, 0) + c.Assert(s.Has("x"), qt.IsFalse) + c.Assert(s.Elements(), qt.HasLen, 0) + }) + + c.Run("AddHas", func(c *qt.C) { + s := New[string]() + s.Add("x", 1) + s.Add("y", 2) + + c.Assert(s.Has("x"), qt.IsTrue) + c.Assert(s.Has("y"), qt.IsTrue) + c.Assert(s.Has("z"), qt.IsFalse) + c.Assert(s.Len(), qt.Equals, 2) + }) + + c.Run("RemoveAbsent", func(c *qt.C) { + s := New[string]() + s.Remove("ghost", 1) // never added + c.Assert(s.Has("ghost"), qt.IsFalse) + c.Assert(s.Len(), qt.Equals, 0) + }) + + c.Run("AddThenRemove", func(c *qt.C) { + s := New[string]() + s.Add("x", 1) + s.Remove("x", 2) + + c.Assert(s.Has("x"), qt.IsFalse) + c.Assert(s.Len(), qt.Equals, 0) + }) + + c.Run("RemoveThenReadd", func(c *qt.C) { + s := New[string]() + s.Add("x", 1) + s.Remove("x", 2) + s.Add("x", 3) + + c.Assert(s.Has("x"), qt.IsTrue) + c.Assert(s.Len(), qt.Equals, 1) + }) + + c.Run("Elements", func(c *qt.C) { + s := New[int]() + s.Add(3, 1) + s.Add(1, 2) + s.Add(2, 3) + + elems := s.Elements() + slices.Sort(elems) + c.Assert(elems, qt.DeepEquals, []int{1, 2, 3}) + }) +} + +func TestTimestampResolution(t *testing.T) { + c := qt.New(t) + + c.Run("HigherAddWins", func(c *qt.C) { + s := New[string]() + s.Add("x", 5) + s.Remove("x", 3) // remove ts < add ts + + c.Assert(s.Has("x"), qt.IsTrue) + }) + + c.Run("HigherRemoveWins", func(c *qt.C) { + s := New[string]() + 
s.Add("x", 3) + s.Remove("x", 5) // remove ts > add ts + + c.Assert(s.Has("x"), qt.IsFalse) + }) + + c.Run("EqualTimestampAddWins", func(c *qt.C) { + s := New[string]() + s.Add("x", 5) + s.Remove("x", 5) // same timestamp: add wins + + c.Assert(s.Has("x"), qt.IsTrue) + }) + + c.Run("LaterAddOverridesRemove", func(c *qt.C) { + s := New[string]() + s.Remove("x", 10) // remove arrives before add + s.Add("x", 15) // later add wins + + c.Assert(s.Has("x"), qt.IsTrue) + }) +} + +func TestDeltaReturn(t *testing.T) { + c := qt.New(t) + + c.Run("AddDeltaContainsOnlyAddedElem", func(c *qt.C) { + s := New[string]() + s.Add("x", 1) + delta := s.Add("y", 2) + + c.Assert(delta.Has("y"), qt.IsTrue) + c.Assert(delta.Has("x"), qt.IsFalse) + c.Assert(delta.Len(), qt.Equals, 1) + }) + + c.Run("RemoveDeltaDoesNotAddElement", func(c *qt.C) { + s := New[string]() + s.Add("x", 1) + delta := s.Remove("x", 2) + + c.Assert(delta.Has("x"), qt.IsFalse) + c.Assert(delta.Len(), qt.Equals, 0) + }) +} + +func TestMerge(t *testing.T) { + c := qt.New(t) + + c.Run("MergeAdds", func(c *qt.C) { + a := New[string]() + b := New[string]() + + da := a.Add("x", 1) + db := b.Add("y", 2) + + a.Merge(db) + b.Merge(da) + + c.Assert(a.Has("x"), qt.IsTrue) + c.Assert(a.Has("y"), qt.IsTrue) + c.Assert(b.Has("x"), qt.IsTrue) + c.Assert(b.Has("y"), qt.IsTrue) + }) + + c.Run("ConcurrentAddRemoveLaterAddWins", func(c *qt.C) { + a := New[string]() + b := New[string]() + + // a adds x at ts=5, b removes x at ts=3 (concurrent, different replicas) + da := a.Add("x", 5) + db := b.Remove("x", 3) + + a.Merge(db) + b.Merge(da) + + c.Assert(a.Has("x"), qt.IsTrue) + c.Assert(b.Has("x"), qt.IsTrue) + }) + + c.Run("ConcurrentAddRemoveLaterRemoveWins", func(c *qt.C) { + a := New[string]() + b := New[string]() + + // a adds x at ts=3, b removes x at ts=5 (concurrent, different replicas) + da := a.Add("x", 3) + db := b.Remove("x", 5) + + a.Merge(db) + b.Merge(da) + + c.Assert(a.Has("x"), qt.IsFalse) + c.Assert(b.Has("x"), 
qt.IsFalse) + }) + + c.Run("RemoveSeenBeforeAdd", func(c *qt.C) { + // b gets the remove delta before it has seen the add + a := New[string]() + b := New[string]() + + a.Add("x", 5) + rmDelta := a.Remove("x", 7) + + b.Merge(rmDelta) // b sees remove first + c.Assert(b.Has("x"), qt.IsFalse) + + addDelta := New[string]() + addDelta.added["x"] = 5 // synthesize the add delta + b.Merge(addDelta) // b now gets the add (ts=5 < rm ts=7) + + c.Assert(b.Has("x"), qt.IsFalse) // remove still wins + }) + + c.Run("MaxTimestampWinsOnMerge", func(c *qt.C) { + a := New[string]() + b := New[string]() + + a.Add("x", 3) + b.Add("x", 7) // b has a later add + + a.Merge(b) + c.Assert(a.added["x"], qt.Equals, int64(7)) + }) +} diff --git a/lwweset/shared_test.go b/lwweset/shared_test.go new file mode 100644 index 0000000..52d169e --- /dev/null +++ b/lwweset/shared_test.go @@ -0,0 +1,29 @@ +package lwweset + +import ( + "slices" + "testing" + + "github.com/aalpar/crdt/crdttest" +) + +func TestSharedProperties(t *testing.T) { + crdttest.Harness[*LWWESet[string]]{ + New: func(_ string) *LWWESet[string] { return New[string]() }, + Merge: func(dst, src *LWWESet[string]) { dst.Merge(src) }, + Equal: func(a, b *LWWESet[string]) bool { + ae := a.Elements() + be := b.Elements() + slices.Sort(ae) + slices.Sort(be) + return slices.Equal(ae, be) + }, + Ops: []func(*LWWESet[string]) *LWWESet[string]{ + func(s *LWWESet[string]) *LWWESet[string] { return s.Add("x", 1) }, + func(s *LWWESet[string]) *LWWESet[string] { return s.Add("y", 2) }, + func(s *LWWESet[string]) *LWWESet[string] { return s.Add("z", 3) }, + func(s *LWWESet[string]) *LWWESet[string] { return s.Add("w", 4) }, + func(s *LWWESet[string]) *LWWESet[string] { return s.Add("v", 5) }, + }, + }.Run(t) +}