@@ -50,7 +50,7 @@ class ShardManager(
         ManagerMetrics.podHealthChecked.tagged("pod_address", podAddress.toString).increment *>
           eventsHub.publish(ShardingEvent.PodHealthChecked(podAddress)) *>
           ZIO.unlessZIO(healthApi.isAlive(podAddress))(
-            ZIO.logWarning(s"$podAddress is not alive, unregistering") *> unregister(podAddress)
+            ZIO.logWarning(s"Pod $podAddress is not alive, unregistering") *> unregister(podAddress)
           )
       }
       .unit
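For reference, ZIO.unlessZIO runs its effect only when the effectful condition evaluates to false; that is what drives the unregistration in the hunk above. A minimal runnable sketch of the same guard, where PodAddress, isAlive and unregister are simplified stand-ins for illustration, not the real shardcake types:

import zio._

object UnlessZIODemo extends ZIOAppDefault {
  // Hypothetical stand-ins for PodsHealth.isAlive and ShardManager.unregister
  final case class PodAddress(host: String, port: Int) {
    override def toString: String = s"$host:$port"
  }

  def isAlive(pod: PodAddress): UIO[Boolean] = ZIO.succeed(false) // pretend the pod is down
  def unregister(pod: PodAddress): UIO[Unit] = ZIO.logInfo(s"Unregistered $pod")

  val run =
    ZIO.foreachDiscard(List(PodAddress("localhost", 54321))) { pod =>
      // The inner effect runs only because isAlive returned false
      ZIO.unlessZIO(isAlive(pod))(
        ZIO.logWarning(s"Pod $pod is not alive, unregistering") *> unregister(pod)
      )
    }
}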
@@ -196,52 +196,96 @@ object ShardManager {
    * A layer that starts the Shard Manager process
    */
   val live: ZLayer[PodsHealth with Pods with Storage with ManagerConfig, Throwable, ShardManager] =
-    ZLayer {
+    ZLayer.scoped {
       for {
-        config          <- ZIO.service[ManagerConfig]
-        stateRepository <- ZIO.service[Storage]
-        healthApi       <- ZIO.service[PodsHealth]
-        podApi          <- ZIO.service[Pods]
-        pods            <- stateRepository.getPods
-        assignments     <- stateRepository.getAssignments
+        config          <- ZIO.service[ManagerConfig]
+        stateRepository <- ZIO.service[Storage]
+        healthApi       <- ZIO.service[PodsHealth]
+        podApi          <- ZIO.service[Pods]
+        pods            <- stateRepository.getPods
+        assignments     <- stateRepository.getAssignments
         // remove unhealthy pods on startup
-        filteredPods <-
-          ZIO.filterPar(pods.toList) { case (podAddress, _) => healthApi.isAlive(podAddress) }.map(_.toMap)
-        filteredAssignments = assignments.collect {
-          case assignment @ (_, Some(pod)) if filteredPods.contains(pod) => assignment
-        }
-        cdt         <- ZIO.succeed(OffsetDateTime.now())
-        initialState = ShardManagerState(
-          filteredPods.map { case (k, v) => k -> PodWithMetadata(v, cdt) },
-          (1 to config.numberOfShards).map(_ -> None).toMap ++ filteredAssignments
-        )
-        _ <- ManagerMetrics.pods.incrementBy(initialState.pods.size)
-        _ <- ZIO.foreachDiscard(initialState.shards) { case (_, podAddressOpt) =>
-               podAddressOpt match {
-                 case Some(podAddress) =>
-                   ManagerMetrics.assignedShards.tagged("pod_address", podAddress.toString).increment
-                 case None =>
-                   ManagerMetrics.unassignedShards.increment
-               }
-             }
-        state              <- Ref.Synchronized.make(initialState)
-        rebalanceSemaphore <- Semaphore.make(1)
-        eventsHub          <- Hub.unbounded[ShardingEvent]
-        shardManager =
+        failedFilteredPods <-
+          ZIO.partitionPar(pods) { addrPod =>
+            ZIO.ifZIO(healthApi.isAlive(addrPod._1))(ZIO.succeed(addrPod), ZIO.fail(addrPod._2))
+          }
+        (failedPods, filtered) = failedFilteredPods
+        _ <- ZIO.when(failedPods.nonEmpty)(
+               ZIO.logInfo(s"Ignoring pods that are no longer alive ${failedPods.mkString("[", ", ", "]")}")
+             )
+        filteredPods = filtered.toMap
+        failedFilteredAssignments = partitionMap(assignments) {
+          case assignment @ (_, Some(address)) if filteredPods.contains(address) =>
+            Right(assignment)
+          case assignment => Left(assignment)
+        }
+        (failed, filteredAssignments) = failedFilteredAssignments
+        failedAssignments = failed.collect { case (shard, Some(addr)) => shard -> addr }
+        _ <- ZIO.when(failedAssignments.nonEmpty)(
+               ZIO.logWarning(
+                 s"Ignoring assignments for pods that are no longer alive ${failedAssignments.mkString("[", ", ", "]")}"
+               )
+             )
+        cdt         <- ZIO.succeed(OffsetDateTime.now())
+        initialState = ShardManagerState(
+          filteredPods.map { case (k, v) => k -> PodWithMetadata(v, cdt) },
+          (1 to config.numberOfShards).map(_ -> None).toMap ++ filteredAssignments
+        )
+        _ <-
+          ZIO.logInfo(
+            s"Recovered pods ${filteredPods
+              .mkString("[", ", ", "]")} and assignments ${filteredAssignments.view.flatMap(_._2).mkString("[", ", ", "]")}"
+          )
+        _ <- ManagerMetrics.pods.incrementBy(initialState.pods.size)
+        _ <- ZIO.foreachDiscard(initialState.shards) { case (_, podAddressOpt) =>
+               podAddressOpt match {
+                 case Some(podAddress) =>
+                   ManagerMetrics.assignedShards.tagged("pod_address", podAddress.toString).increment
+                 case None =>
+                   ManagerMetrics.unassignedShards.increment
+               }
+             }
+        state              <- Ref.Synchronized.make(initialState)
+        rebalanceSemaphore <- Semaphore.make(1)
+        eventsHub          <- Hub.unbounded[ShardingEvent]
+        shardManager =
           new ShardManager(state, rebalanceSemaphore, eventsHub, healthApi, podApi, stateRepository, config)
-        _ <- shardManager.persistPods.forkDaemon
+        _ <- ZIO.addFinalizer {
+               shardManager.persistAssignments.catchAllCause(cause =>
+                 ZIO.logWarningCause("Failed to persist assignments on shutdown", cause)
+               ) *>
+                 shardManager.persistPods.catchAllCause(cause =>
+                   ZIO.logWarningCause("Failed to persist pods on shutdown", cause)
+                 )
+             }
+        _ <- shardManager.persistPods.forkDaemon
         // rebalance immediately if there are unassigned shards
-        _ <- shardManager.rebalance(rebalanceImmediately = initialState.unassignedShards.nonEmpty).forkDaemon
+        _ <- shardManager.rebalance(rebalanceImmediately = initialState.unassignedShards.nonEmpty).forkDaemon
         // start a regular rebalance at the given interval
-        _ <- shardManager
-               .rebalance(rebalanceImmediately = false)
-               .repeat(Schedule.spaced(config.rebalanceInterval))
-               .forkDaemon
-        _ <- shardManager.getShardingEvents.mapZIO(event => ZIO.logInfo(event.toString)).runDrain.forkDaemon
-        _ <- ZIO.logInfo("Shard Manager loaded")
+        _ <- shardManager
+               .rebalance(rebalanceImmediately = false)
+               .repeat(Schedule.spaced(config.rebalanceInterval))
+               .forkDaemon
+        _ <- shardManager.getShardingEvents.mapZIO(event => ZIO.logInfo(event.toString)).runDrain.forkDaemon
+        _ <- ZIO.logInfo("Shard Manager loaded")
       } yield shardManager
     }
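The move from ZLayer { ... } to ZLayer.scoped { ... } is what gives the new ZIO.addFinalizer call teeth: finalizers registered while the layer is built run when the layer's scope closes, so assignments and pods can be persisted on shutdown. A minimal sketch of that pattern, with a made-up DummyService and persist effect standing in for the real ShardManager and its persist* methods:

import zio._

object ScopedFinalizerDemo extends ZIOAppDefault {
  final case class DummyService(name: String)

  // Made-up stand-in for persistAssignments / persistPods
  def persist(what: String): UIO[Unit] = ZIO.logInfo(s"Persisted $what")

  val live: ZLayer[Any, Nothing, DummyService] =
    ZLayer.scoped {
      for {
        // Runs when the scope backing the layer closes (e.g. at app shutdown)
        _ <- ZIO.addFinalizer(
               persist("assignments").catchAllCause(cause =>
                 ZIO.logWarningCause("Failed to persist on shutdown", cause)
               )
             )
        _ <- ZIO.logInfo("Layer constructed")
      } yield DummyService("shard-manager")
    }

  // Building and then releasing the layer triggers the finalizer
  val run = ZIO.scoped(live.build *> ZIO.logInfo("Using the service"))
}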

+  // reimplement Map.partitionMap because it does not exist in 2.12
+  private def partitionMap[K, V, VL <: V, VR <: V](map: Map[K, V])(partition: ((K, V)) => Either[(K, VL), (K, VR)]) = {
+    val left  = Map.newBuilder[K, VL]
+    val right = Map.newBuilder[K, VR]
+
+    map.iterator.foreach { kv =>
+      partition(kv) match {
+        case Left(kvl)  => left += kvl
+        case Right(kvr) => right += kvr
+      }
+    }
+
+    (left.result(), right.result())
+  }
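A quick usage sketch for this helper, as if written inside the same object (the helper is private); the shard numbers and pod names are made up:

// Split assignments into entries whose pod is no longer alive (Left)
// and entries pointing at a live pod (Right), as the layer above does
val assignments: Map[Int, Option[String]] =
  Map(1 -> Some("pod-a"), 2 -> Some("pod-b"), 3 -> None)
val livePods = Set("pod-a")

val (dead, alive) = partitionMap(assignments) {
  case assignment @ (_, Some(addr)) if livePods.contains(addr) => Right(assignment)
  case assignment                                              => Left(assignment)
}
// dead == Map(2 -> Some("pod-b"), 3 -> None); alive == Map(1 -> Some("pod-a"))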
+
   implicit def listOrder[A](implicit ev: Ordering[A]): Ordering[List[A]] = (xs: List[A], ys: List[A]) => {
     @tailrec def loop(xs: List[A], ys: List[A]): Int =
       xs match {
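Earlier in the hunk, the startup filtering switched from ZIO.filterPar to ZIO.partitionPar, which runs the liveness check over all pods in parallel and returns failures and successes as two separate collections instead of failing fast. A minimal sketch of that behaviour with a toy liveness predicate (even ports count as alive; the values are made up):

import zio._

object PartitionParDemo extends ZIOAppDefault {
  // Toy liveness check, an assumption for illustration only
  def isAlive(port: Int): UIO[Boolean] = ZIO.succeed(port % 2 == 0)

  val run =
    for {
      // partitionPar itself never fails: it collects (failures, successes)
      result        <- ZIO.partitionPar(List(8080, 8081, 8082)) { port =>
                         ZIO.ifZIO(isAlive(port))(ZIO.succeed(port), ZIO.fail(port))
                       }
      (dead, alive)  = result
      // e.g. dead contains 8081; alive contains 8080 and 8082
      _             <- ZIO.logInfo(s"dead = $dead, alive = $alive")
    } yield ()
}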