Skip to content

Commit 27bb9cc

Browse files
fix: ensure background job will not be cancalled if throw exception (#432)
Affirm reported when exception be thrown within background job polling job, we will no longer fetch DCS This pr added a health check for the above situation
1 parent 273ab9a commit 27bb9cc

File tree

2 files changed

+64
-29
lines changed

2 files changed

+64
-29
lines changed

src/main/kotlin/com/statsig/sdk/SpecUpdater.kt

Lines changed: 63 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,13 @@ import com.google.gson.Gson
44
import com.google.gson.JsonSyntaxException
55
import com.google.gson.reflect.TypeToken
66
import com.statsig.sdk.network.StatsigTransport
7-
import kotlinx.coroutines.CoroutineScope
8-
import kotlinx.coroutines.Job
9-
import kotlinx.coroutines.delay
7+
import kotlinx.coroutines.*
108
import kotlinx.coroutines.flow.Flow
119
import kotlinx.coroutines.flow.filterNotNull
1210
import kotlinx.coroutines.flow.flow
1311
import kotlinx.coroutines.flow.map
14-
import kotlinx.coroutines.launch
12+
13+
private const val HEALTH_CHECK_INTERVAL_MS = 60_000L
1514

1615
internal class SpecUpdater(
1716
private var transport: StatsigTransport,
@@ -25,6 +24,7 @@ internal class SpecUpdater(
2524
) {
2625
var lastUpdateTime: Long = 0
2726

27+
private val monitorScope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
2828
private var configSpecCallback: suspend (config: APIDownloadedConfigs, source: DataSource) -> Unit = { _, _ -> }
2929
private var idListCallback: suspend (config: Map<String, IDList>) -> Unit = { }
3030
private var backgroundDownloadConfigs: Job? = null
@@ -56,36 +56,28 @@ internal class SpecUpdater(
5656
}
5757

5858
fun startListening() {
59-
if (backgroundDownloadConfigs == null) {
60-
logger.debug("[StatsigSpecUpdater] Initializing new background polling job")
61-
val flow = if (transport.downloadConfigSpecWorker.isPullWorker) {
62-
logger.debug("[StatsigSpecUpdater] Using pull worker for config specs syncing")
63-
pollForConfigSpecs()
64-
} else {
65-
logger.debug("[StatsigSpecUpdater] Using streaming for config specs syncing.")
66-
transport.configSpecsFlow().map(::parseConfigSpecs).map { Pair(it.first, DataSource.NETWORK) }
67-
}
59+
startBackgroundDcsPolling()
60+
startBackgroundIDListPolling()
61+
startPeriodicHealthCheck()
62+
}
6863

69-
backgroundDownloadConfigs = statsigScope.launch {
70-
flow.collect { response ->
71-
val spec = response.first
72-
spec?.let {
73-
configSpecCallback(spec, response.second)
74-
}
64+
private fun startPeriodicHealthCheck() {
65+
monitorScope.launch {
66+
while (true) {
67+
delay(HEALTH_CHECK_INTERVAL_MS) // Check every 60 seconds by default
68+
69+
if (backgroundDownloadConfigs?.isActive != true) {
70+
logger.debug("[StatsigPeriodicHealthCheck] Background polling is inactive. Restarting...")
71+
startBackgroundDcsPolling()
7572
}
76-
}
77-
}
78-
if (backgroundDownloadIDLists == null) {
79-
val idListFlow = if (transport.getIDListsWorker.isPullWorker) {
80-
pollForIDLists()
81-
} else transport.idListsFlow().map(::parseIDLists).filterNotNull()
8273

83-
backgroundDownloadIDLists = statsigScope.launch {
84-
idListFlow.collect { idListCallback(it) }
74+
if (backgroundDownloadIDLists?.isActive != true) {
75+
logger.debug("[StatsigPeriodicHealthCheck] ID list polling is inactive. Restarting...")
76+
startBackgroundIDListPolling()
77+
}
8578
}
8679
}
8780
}
88-
8981
fun getInitializeOrder(): List<DataSource> {
9082
val optionsOrder = options.initializeSources
9183
if (optionsOrder != null) return optionsOrder
@@ -149,6 +141,49 @@ internal class SpecUpdater(
149141
return parseConfigsFromNetwork(this.transport.downloadConfigSpecs(this.lastUpdateTime))
150142
}
151143

144+
/**
145+
* Starts background polling for config specs.
146+
* If already running, it will not create a duplicate.
147+
*/
148+
private fun startBackgroundDcsPolling() {
149+
if (backgroundDownloadConfigs?.isActive == true) {
150+
return
151+
}
152+
153+
logger.debug("[StatsigSpecUpdater] Initializing new background polling job")
154+
155+
val flow = if (transport.downloadConfigSpecWorker.isPullWorker) {
156+
logger.debug("[StatsigSpecUpdater] Using pull worker for config specs syncing")
157+
pollForConfigSpecs()
158+
} else {
159+
logger.debug("[StatsigSpecUpdater] Using streaming for config specs syncing.")
160+
transport.configSpecsFlow().map(::parseConfigSpecs).map { Pair(it.first, DataSource.NETWORK) }
161+
}
162+
163+
backgroundDownloadConfigs = statsigScope.launch {
164+
flow.collect { response ->
165+
val spec = response.first
166+
spec?.let {
167+
configSpecCallback(spec, response.second)
168+
}
169+
}
170+
}
171+
}
172+
173+
private fun startBackgroundIDListPolling() {
174+
if (backgroundDownloadIDLists?.isActive == true) {
175+
return
176+
}
177+
178+
val idListFlow = if (transport.getIDListsWorker.isPullWorker) {
179+
pollForIDLists()
180+
} else transport.idListsFlow().map(::parseIDLists).filterNotNull()
181+
182+
backgroundDownloadIDLists = statsigScope.launch {
183+
idListFlow.collect { idListCallback(it) }
184+
}
185+
}
186+
152187
private fun parseConfigSpecs(specs: String?): Pair<APIDownloadedConfigs?, FailureDetails?> {
153188
if (specs.isNullOrEmpty()) {
154189
return Pair(null, FailureDetails(FailureReason.EMPTY_SPEC))

src/test/java/com/statsig/sdk/LocalFileDataStoreTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class LocalFileDataStoreTest {
3030
downloadConfigSpecsResponse = StatsigE2ETest::class.java.getResource("/download_config_specs.json")?.readText() ?: ""
3131

3232
mockServer = MockWebServer()
33-
mockServer.start(8899)
33+
mockServer.start(9988)
3434
mockServer.apply {
3535
dispatcher = object : Dispatcher() {
3636
@Throws(InterruptedException::class)

0 commit comments

Comments
 (0)