Skip to content

Commit a6d9841

Browse files
author
Cody Lee
authored
Merge pull request #18 from platinummonkey/dynamic-partitions
Dynamic Partition strategy support
2 parents c712e58 + 360766d commit a6d9841

File tree

4 files changed

+128
-0
lines changed

4 files changed

+128
-0
lines changed

strategy/lookup_partition.go

+26
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,32 @@ func NewLookupPartitionStrategyWithMetricRegistry(
167167
return strategy, nil
168168
}
169169

170+
// AddPartition will dynamically add a partition
171+
// will return false if this partition is already defined, otherwise true if successfully added
172+
func (s *LookupPartitionStrategy) AddPartition(name string, partition *LookupPartition) bool {
173+
s.mu.Lock()
174+
defer s.mu.Unlock()
175+
_, ok := s.partitions[name]
176+
if ok {
177+
return false
178+
}
179+
s.partitions[name] = partition
180+
return true
181+
}
182+
183+
// RemovePartition will remove a given partition dynamically
184+
// will return the busy count from that partition, along with true if the partition was found, otherwise false.
185+
func (s *LookupPartitionStrategy) RemovePartition(name string) (int, bool) {
186+
s.mu.Lock()
187+
defer s.mu.Unlock()
188+
partition, ok := s.partitions[name]
189+
if !ok {
190+
return 0, false
191+
}
192+
delete(s.partitions, name)
193+
return partition.BusyCount(), true
194+
}
195+
170196
// TryAcquire a token from a partition
171197
func (s *LookupPartitionStrategy) TryAcquire(ctx context.Context) (token core.StrategyToken, ok bool) {
172198
s.mu.Lock()

strategy/lookup_partition_test.go

+31
Original file line numberDiff line numberDiff line change
@@ -312,4 +312,35 @@ func TestLookupPartitionStrategy(t *testing.T) {
312312
asrt.Equal(1, busyCount)
313313
asrt.Equal(1, strategy.BusyCount())
314314
})
315+
316+
t.Run("AddRemoveDynamically", func(t2 *testing.T) {
317+
t2.Parallel()
318+
asrt := assert.New(t2)
319+
testPartitions := makeTestLookupPartitions()
320+
strategy, err := NewLookupPartitionStrategyWithMetricRegistry(
321+
testPartitions,
322+
nil,
323+
1,
324+
core.EmptyMetricRegistryInstance,
325+
)
326+
asrt.NoError(err, "failed to create strategy")
327+
asrt.NotNil(strategy)
328+
329+
// add a partition
330+
testPartition := NewLookupPartitionWithMetricRegistry(
331+
"test1",
332+
0.7,
333+
1,
334+
core.EmptyMetricRegistryInstance,
335+
)
336+
strategy.AddPartition(testPartition.Name(), testPartition)
337+
binLimit, err := strategy.BinLimit("test1")
338+
asrt.NoError(err)
339+
asrt.Equal(1, binLimit)
340+
341+
// remove a partition
342+
strategy.RemovePartition("test1")
343+
binLimit, err = strategy.BinLimit("test1")
344+
asrt.Error(err)
345+
})
315346
}

strategy/predicate_partition.go

+37
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,43 @@ func NewPredicatePartitionStrategyWithMetricRegistry(
157157
return strategy, nil
158158
}
159159

160+
// AddPartition will dynamically add a partition
161+
// will return false if this partition is already defined, otherwise true if successfully added
162+
func (s *PredicatePartitionStrategy) AddPartition(partition *PredicatePartition) bool {
163+
s.mu.Lock()
164+
defer s.mu.Unlock()
165+
exists := false
166+
for _, p := range s.partitions {
167+
if p == partition {
168+
exists = true
169+
break
170+
}
171+
}
172+
if exists {
173+
return false
174+
}
175+
s.partitions = append(s.partitions, partition)
176+
return true
177+
}
178+
179+
// RemovePartitionsMatching will remove partitions dynamically
180+
// will return the removed matching partitions, and true if there are at least 1 removed partition
181+
func (s *PredicatePartitionStrategy) RemovePartitionsMatching(matcher context.Context) ([]*PredicatePartition, bool) {
182+
s.mu.Lock()
183+
defer s.mu.Unlock()
184+
kept := make([]*PredicatePartition, 0)
185+
removed := make([]*PredicatePartition, 0)
186+
for _, p := range s.partitions {
187+
if p.predicate(matcher) {
188+
removed = append(removed, p)
189+
} else {
190+
kept = append(kept, p)
191+
}
192+
}
193+
s.partitions = kept
194+
return removed, len(removed) > 0
195+
}
196+
160197
// TryAcquire a token from a partition
161198
func (s *PredicatePartitionStrategy) TryAcquire(ctx context.Context) (core.StrategyToken, bool) {
162199
s.mu.Lock()

strategy/predicate_partition_test.go

+34
Original file line numberDiff line numberDiff line change
@@ -310,4 +310,38 @@ func TestPredicatePartition(t *testing.T) {
310310
asrt.Equal(1, busyCount)
311311
asrt.Equal(1, strategy.BusyCount())
312312
})
313+
314+
t.Run("AddRemoveDynamically", func(t2 *testing.T) {
315+
t2.Parallel()
316+
asrt := assert.New(t2)
317+
318+
testPartitions := makeTestPartitions()
319+
strategy, err := NewPredicatePartitionStrategyWithMetricRegistry(
320+
testPartitions,
321+
1,
322+
core.EmptyMetricRegistryInstance)
323+
asrt.NoError(err, "failed to create strategy")
324+
asrt.NotNil(strategy)
325+
strategy.SetLimit(10)
326+
327+
// add a partition
328+
testPartition := NewPredicatePartitionWithMetricRegistry(
329+
"test1",
330+
0.7,
331+
matchers.StringPredicateMatcher("test1", false),
332+
core.EmptyMetricRegistryInstance,
333+
)
334+
strategy.AddPartition(testPartition)
335+
ctxTest := context.WithValue(context.Background(), matchers.StringPredicateContextKey, "test1")
336+
token, ok := strategy.TryAcquire(ctxTest)
337+
asrt.True(ok)
338+
asrt.NotNil(token)
339+
340+
// remove a partition
341+
strategy.RemovePartitionsMatching(ctxTest)
342+
// we get the default token now
343+
token, ok = strategy.TryAcquire(ctxTest)
344+
asrt.False(ok)
345+
asrt.False(token.IsAcquired())
346+
})
313347
}

0 commit comments

Comments
 (0)