Skip to content

Commit d96ed9f

Browse files
authored
zioGH-8076 ConcurrentWeakHashSet (zio#8093)
* zioGH-8076 Initialize ConcurrentWeakHashSet * zioGH-8093 Add ConcurrentWeakHashSet#remove * zioGH-8093 Implement `Iterable` by ConcurrentWeakHashSet * zioGH-8093 Init benchmarks * zioGH-8093 Reimplement `ConcurrentWeakHashSet#remove` * zioGH-8093 Benchmark `ConcurrentWeakHashSet#iterate` * zioGH-8093 Add `ConcurrentWeakHashSet#clear` * zioGH-8093 Add `ConcurrentWeakHashSet#contains` * zioGH-8093 Add scaladocs * zioGH-8093 fmt * zioGH-8093 Simplify tests & make sure that all used references are properly allocated * zioGH-8093 CR v1 * zioGH-8093 CR v2 * zioGH-8093 Replace dynamic consumer-like UpdateOperation with constant operation request * zioGH-8093 Rename `Ref` to `RefNode` to highlight its purpose in the impl * zioGH-8093 Fix invalid behavior of `ConcurrentWeakHashSet#remove` for chained elements with the same index + tests * zioGH-8093 Don't create empty Set instance if ref queue is empty * zioGH-8093 Support Scala 2.12 * zioGH-8093 Add `contains` benchmark * zioGH-8093 Replace `mutable.Set` used in restructure with mutable hash * zioGH-8093 fmt * zioGH-8093 Hash -1 is valid hash, so we can't really use it as indicator * zioGH-8093 Revert `Test / fork := false` * zioGH-8093 Remove `_` from number litterals to make Scala 2.12 happy * zioGH-8093 Fix Scala 3.x compilation error with ConcurrentWeakHashSetSpec & use `apply` instead of `new` * zioGH-8093 Revert change of `PlatformSpecific.newConcurrentWeakSet` * zioGH-8093 Move JVM based test to ConcurrentWeakHashSetSpecJVM to satisfy JS/Native compilation targets * zioGH-8093 Final cleanup * zioGH-8093 fmt
1 parent be51e70 commit d96ed9f

File tree

7 files changed

+1039
-1
lines changed

7 files changed

+1039
-1
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,278 @@
1+
package zio.internal
2+
3+
import com.github.ghik.silencer.silent
4+
import org.openjdk.jmh.annotations._
5+
import org.openjdk.jmh.infra.Blackhole
6+
import org.springframework.util.{ConcurrentReferenceHashMap => SpringConcurrentReferenceHashMap}
7+
8+
import java.util
9+
import java.util.Collections
10+
import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit}
11+
import java.util.concurrent.atomic.AtomicInteger
12+
13+
@State(Scope.Benchmark)
14+
private[this] class AddContext extends BaseContext {
15+
private var idx: AtomicInteger = _
16+
private var refs: ConcurrentLinkedQueue[TestKey] = _
17+
18+
@Setup(Level.Iteration)
19+
def setup(): Unit = {
20+
this.idx = new AtomicInteger(-1)
21+
this.refs = new ConcurrentLinkedQueue[TestKey]()
22+
this.javaSetInitializer = { _ => createJavaSet() }
23+
this.springMapInitializer = { _ => createSpringMap() }
24+
this.zioSetInitializer = { _ => createZioSet() }
25+
this.setupBase()
26+
}
27+
28+
def createCachedKey(): TestKey = {
29+
val key = TestKey(this.idx.incrementAndGet())
30+
this.refs.add(key)
31+
key
32+
}
33+
}
34+
35+
@OutputTimeUnit(TimeUnit.MILLISECONDS)
36+
@Warmup(iterations = 2, time = 2)
37+
@Measurement(iterations = 2, time = 2)
38+
@Fork(1)
39+
private[this] class ConcurrentWeakHashSetAddBenchmark {
40+
41+
@Benchmark
42+
def javaAddSerial(ctx: AddContext, blackhole: Blackhole): Unit =
43+
blackhole.consume(ctx.javaSet.add(ctx.createCachedKey()))
44+
45+
@Threads(6)
46+
@Benchmark
47+
def javaAddConcurrent(ctx: AddContext, blackhole: Blackhole): Unit =
48+
blackhole.consume(ctx.javaSet.add(ctx.createCachedKey()))
49+
50+
@Benchmark
51+
def springAddSerial(ctx: AddContext, blackhole: Blackhole): Unit =
52+
blackhole.consume(ctx.springMap.put(ctx.createCachedKey(), true))
53+
54+
@Threads(6)
55+
@Benchmark
56+
def springAddConcurrent(ctx: AddContext, blackhole: Blackhole): Unit =
57+
blackhole.consume(ctx.springMap.put(ctx.createCachedKey(), true))
58+
59+
@Benchmark
60+
def zioAddSerial(ctx: AddContext, blackhole: Blackhole): Unit =
61+
blackhole.consume(ctx.zioSet.add(ctx.createCachedKey()))
62+
63+
@Threads(6)
64+
@Benchmark
65+
def zioAddConcurrent(ctx: AddContext, blackhole: Blackhole): Unit =
66+
blackhole.consume(ctx.zioSet.add(ctx.createCachedKey()))
67+
68+
}
69+
70+
@State(Scope.Benchmark)
71+
private[this] class RemoveContext extends BaseContext {
72+
private val sampleSize = 100000
73+
private val values: Array[TestKey] = (0 to this.sampleSize).map(TestKey).toArray
74+
private val idx: AtomicInteger = new AtomicInteger(this.sampleSize + 1)
75+
76+
@Setup(Level.Iteration)
77+
def setup(): Unit = {
78+
this.javaSetInitializer = { _ => createJavaSet(this.values) }
79+
this.springMapInitializer = { _ => createSpringMap(this.values) }
80+
this.zioSetInitializer = { _ => createZioSet(this.values) }
81+
this.setupBase()
82+
}
83+
84+
def appendNewKeyToJavaSet(): TestKey = {
85+
val key = TestKey(this.idx.incrementAndGet())
86+
this.javaSet.add(key)
87+
key
88+
}
89+
90+
def appendNewKeyToSpringMap(): TestKey = {
91+
val key = TestKey(this.idx.incrementAndGet())
92+
this.springMap.put(key, true)
93+
key
94+
}
95+
96+
def appendNewKeyToZioSet(): TestKey = {
97+
val key = TestKey(this.idx.incrementAndGet())
98+
this.zioSet.add(key)
99+
key
100+
}
101+
}
102+
103+
@OutputTimeUnit(TimeUnit.MILLISECONDS)
104+
@Warmup(iterations = 2, time = 2)
105+
@Measurement(iterations = 2, time = 2)
106+
@Fork(1)
107+
private[this] class ConcurrentWeakHashSetRemoveBenchmark {
108+
109+
@Benchmark
110+
def javaRemoveSerial(ctx: RemoveContext, blackhole: Blackhole): Unit =
111+
blackhole.consume(ctx.javaSet.remove(ctx.appendNewKeyToJavaSet()))
112+
113+
@Threads(6)
114+
@Benchmark
115+
def javaRemoveConcurrent(ctx: RemoveContext, blackhole: Blackhole): Unit =
116+
blackhole.consume(ctx.javaSet.remove(ctx.appendNewKeyToJavaSet()))
117+
118+
@Benchmark
119+
def springRemoveSerial(ctx: RemoveContext, blackhole: Blackhole): Unit =
120+
blackhole.consume(ctx.springMap.remove(ctx.appendNewKeyToSpringMap()))
121+
122+
@Threads(6)
123+
@Benchmark
124+
def springRemoveConcurrent(ctx: RemoveContext, blackhole: Blackhole): Unit =
125+
blackhole.consume(ctx.springMap.remove(ctx.appendNewKeyToSpringMap()))
126+
127+
@Benchmark
128+
def zioRemoveSerial(ctx: RemoveContext, blackhole: Blackhole): Unit =
129+
blackhole.consume(ctx.zioSet.remove(ctx.appendNewKeyToZioSet()))
130+
131+
@Threads(6)
132+
@Benchmark
133+
def zioRemoveConcurrent(ctx: RemoveContext, blackhole: Blackhole): Unit =
134+
blackhole.consume(ctx.zioSet.remove(ctx.appendNewKeyToZioSet()))
135+
136+
}
137+
138+
@State(Scope.Benchmark)
139+
private[this] class IterateContext extends BaseContext {
140+
private val sampleSize = 1000
141+
private val values: Array[TestKey] = (0 to this.sampleSize).map(TestKey).toArray
142+
143+
@Setup(Level.Iteration)
144+
def setup(): Unit = {
145+
this.javaSetInitializer = { _ => createJavaSet(this.values) }
146+
this.springMapInitializer = { _ => createSpringMap(this.values) }
147+
this.zioSetInitializer = { _ => createZioSet(this.values) }
148+
this.setupBase()
149+
}
150+
}
151+
152+
@OutputTimeUnit(TimeUnit.MILLISECONDS)
153+
@Warmup(iterations = 2, time = 2)
154+
@Measurement(iterations = 2, time = 2)
155+
@Fork(1)
156+
private[this] class ConcurrentWeakHashSetIterateBenchmark {
157+
158+
@Benchmark
159+
def javaIterateSerial(ctx: IterateContext, blackhole: Blackhole): Unit =
160+
ctx.javaSet.forEach(element => blackhole.consume(element))
161+
162+
@Threads(6)
163+
@Benchmark
164+
def javaIterateConcurrent(ctx: IterateContext, blackhole: Blackhole): Unit =
165+
ctx.javaSet.forEach(element => blackhole.consume(element))
166+
167+
@Benchmark
168+
def springIterateSerial(ctx: IterateContext, blackhole: Blackhole): Unit =
169+
ctx.springMap.forEach((key, _) => blackhole.consume(key))
170+
171+
@Threads(6)
172+
@Benchmark
173+
def springIterateConcurrent(ctx: IterateContext, blackhole: Blackhole): Unit =
174+
ctx.springMap.forEach((key, _) => blackhole.consume(key))
175+
176+
@Benchmark
177+
def zioIterateSerial(ctx: IterateContext, blackhole: Blackhole): Unit =
178+
ctx.zioSet.foreach(element => blackhole.consume(element))
179+
180+
@Threads(6)
181+
@Benchmark
182+
def zioIterateConcurrent(ctx: IterateContext, blackhole: Blackhole): Unit =
183+
ctx.zioSet.foreach(element => blackhole.consume(element))
184+
185+
}
186+
187+
@State(Scope.Benchmark)
188+
private[this] class ContainsContext extends BaseContext {
189+
private val sampleSize = 100000
190+
private val values: Array[TestKey] = (0 to this.sampleSize).map(TestKey).toArray
191+
192+
@Setup(Level.Iteration)
193+
def setup(): Unit = {
194+
this.javaSetInitializer = { _ => createJavaSet(this.values) }
195+
this.springMapInitializer = { _ => createSpringMap(this.values) }
196+
this.zioSetInitializer = { _ => createZioSet(this.values) }
197+
this.setupBase()
198+
}
199+
}
200+
201+
@OutputTimeUnit(TimeUnit.MILLISECONDS)
202+
@Warmup(iterations = 2, time = 2)
203+
@Measurement(iterations = 2, time = 2)
204+
@Fork(1)
205+
private[this] class ConcurrentWeakHashSetContainsBenchmark {
206+
207+
@Benchmark
208+
def javaContainsSerial(ctx: IterateContext, blackhole: Blackhole): Unit =
209+
blackhole.consume(ctx.javaSet.contains(TestKey(50000)))
210+
211+
@Threads(6)
212+
@Benchmark
213+
def javaContainsConcurrent(ctx: IterateContext, blackhole: Blackhole): Unit =
214+
blackhole.consume(ctx.javaSet.contains(TestKey(50000)))
215+
216+
@Benchmark
217+
def springContainsSerial(ctx: IterateContext, blackhole: Blackhole): Unit =
218+
blackhole.consume(ctx.springMap.containsKey(TestKey(50000)))
219+
220+
@Threads(6)
221+
@Benchmark
222+
def springContainsConcurrent(ctx: IterateContext, blackhole: Blackhole): Unit =
223+
blackhole.consume(ctx.springMap.containsKey(TestKey(50000)))
224+
225+
@Benchmark
226+
def zioContainsSerial(ctx: IterateContext, blackhole: Blackhole): Unit =
227+
blackhole.consume(ctx.zioSet.contains(TestKey(50000)))
228+
229+
@Threads(6)
230+
@Benchmark
231+
def zioContainsConcurrent(ctx: IterateContext, blackhole: Blackhole): Unit =
232+
blackhole.consume(ctx.zioSet.contains(TestKey(50000)))
233+
234+
}
235+
236+
private[this] case class TestKey(name: Int)
237+
238+
private[this] class BaseContext {
239+
240+
protected var javaSetInitializer: Unit => util.Set[TestKey] = _
241+
var javaSet: util.Set[TestKey] = _
242+
243+
protected var springMapInitializer: Unit => SpringConcurrentReferenceHashMap[TestKey, Boolean] = _
244+
var springMap: SpringConcurrentReferenceHashMap[TestKey, Boolean] = _
245+
246+
protected var zioSetInitializer: Unit => ConcurrentWeakHashSet[TestKey] = _
247+
var zioSet: ConcurrentWeakHashSet[TestKey] = _
248+
249+
protected def setupBase(): Unit = {
250+
this.javaSet = this.javaSetInitializer(())
251+
this.springMap = this.springMapInitializer(())
252+
this.zioSet = this.zioSetInitializer(())
253+
}
254+
255+
protected def createJavaSet(values: Array[TestKey] = new Array[TestKey](0)): util.Set[TestKey] = {
256+
import scala.jdk.CollectionConverters._
257+
val set = Collections.synchronizedSet(Collections.newSetFromMap(new util.WeakHashMap[TestKey, java.lang.Boolean]()))
258+
set.addAll(values.toSet.asJava): @silent("JavaConverters")
259+
set
260+
}
261+
262+
protected def createSpringMap(
263+
values: Array[TestKey] = new Array(0)
264+
): SpringConcurrentReferenceHashMap[TestKey, Boolean] = {
265+
import scala.jdk.CollectionConverters._
266+
val map =
267+
new SpringConcurrentReferenceHashMap[TestKey, Boolean](16, SpringConcurrentReferenceHashMap.ReferenceType.WEAK)
268+
map.putAll(values.map((_, true)).toMap.asJava): @silent("JavaConverters")
269+
map
270+
}
271+
272+
protected def createZioSet(values: Array[TestKey] = new Array(0)): ConcurrentWeakHashSet[TestKey] = {
273+
val set = ConcurrentWeakHashSet[TestKey]()
274+
set.addAll(values)
275+
set
276+
}
277+
278+
}

build.sbt

+2-1
Original file line numberDiff line numberDiff line change
@@ -688,7 +688,8 @@ lazy val benchmarks = project.module
688688
"org.typelevel" %% "cats-effect-std" % catsEffectVersion,
689689
"org.scalacheck" %% "scalacheck" % "1.17.0",
690690
"qa.hedgehog" %% "hedgehog-core" % "0.10.1",
691-
"com.github.japgolly.nyaya" %% "nyaya-gen" % "0.10.0"
691+
"com.github.japgolly.nyaya" %% "nyaya-gen" % "0.10.0",
692+
"org.springframework" % "spring-core" % "6.0.9"
692693
),
693694
unusedCompileDependenciesFilter -= libraryDependencies.value
694695
.map(moduleid =>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package zio.internal
2+
3+
import zio.test._
4+
import zio.ZIOBaseSpec
5+
import zio.test.Assertion.equalTo
6+
7+
object ConcurrentWeakHashSetSpecJVM extends ZIOBaseSpec {
8+
9+
final case class Wrapper[A](value: A) {
10+
override def toString: String = value.toString
11+
}
12+
13+
def spec = suite("ConcurrentWeakHashSetSpec")(
14+
test("check if set is thread-safe with truly concurrent race condition between add & remove") {
15+
import java.util.concurrent.{ConcurrentLinkedQueue, Executors, TimeUnit, CountDownLatch}
16+
17+
val sampleSize = 1000000
18+
val executorService = Executors.newFixedThreadPool(2)
19+
val refs = new ConcurrentLinkedQueue[Wrapper[Int]]()
20+
val set = ConcurrentWeakHashSet[Wrapper[Int]]()
21+
val lock = new CountDownLatch(1)
22+
23+
executorService.submit(new Runnable {
24+
override def run(): Unit =
25+
(0 to sampleSize).foreach { idx =>
26+
val element = Wrapper(idx)
27+
refs.offer(element)
28+
set.add(element)
29+
}
30+
})
31+
32+
// remove half of the elements with even indices
33+
// it'll iterate multiple times over the set (often locking segments),
34+
// because removing is faster than adding
35+
executorService.submit(new Runnable {
36+
override def run(): Unit = {
37+
var removedCount = 0
38+
while (removedCount < (sampleSize / 2) + 1) {
39+
for (idx <- 0 to sampleSize if idx % 2 == 0) {
40+
if (set.remove(Wrapper(idx))) removedCount += 1
41+
}
42+
}
43+
lock.countDown()
44+
}
45+
})
46+
47+
lock.await(30, TimeUnit.SECONDS) // await for the removal to finish
48+
assert(set.size())(equalTo((sampleSize / 2) + 1))
49+
50+
// make sure only odd elements are left from full range
51+
val expected = (0 to sampleSize).filter(_ % 2 == 1).map(Wrapper(_)).toList
52+
val actual = set.iterator.toList.sortBy(_.value)
53+
assert(actual)(equalTo(expected))
54+
}
55+
)
56+
57+
}

0 commit comments

Comments
 (0)