Skip to content

Commit 35f2d9d

Browse files
authored
fix: recover Continue process after unexpected exceptions (#7463)
Solves CON-3587 Solves #7018 Solves #7121 Solves #5505 Solves #3981, and possibly other Stream closed-like errors ## Main change Now, if some exception bubbles up at any stage (whether it's read, write, anything else within `processScope`), we catch it via `CoroutineExceptionHandler` and we restart the process + back-off. Type of exception is not important / now it's more like black-box approach. This simplifies things quite a bit, since we now have only single piece of code for process recovery. ## Detailed changes * The process now recovers from exceptions globally, without specifying the type of exception (more of a black-box approach). * Remove `CoreMessengerManager`, since process recovery is now implemented in `ContinueProcessHandler`. * Add a back-off utility for calculating delays between process recovery retries (just a cleanup). * Restore `ContinuePluginService.dispose()` (because `Disposable` was accidentally removed during the last changes in the Next Edit - that's not critical, but it was causing exceptions when closing the application/during plugin disposal). * Remove the artificial delay between reading messages (from now on, blocking happens directly on `readLine`).
1 parent 3928a43 commit 35f2d9d

File tree

9 files changed

+123
-140
lines changed

9 files changed

+123
-140
lines changed

extensions/intellij/src/main/kotlin/com/github/continuedev/continueintellijextension/activities/ContinuePluginStartupActivity.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -266,8 +266,7 @@ class ContinuePluginStartupActivity : StartupActivity, DumbAware {
266266
project.service<ContinuePluginDisposable>()
267267
)
268268

269-
val coreMessengerManager = CoreMessengerManager(project, ideProtocolClient, coroutineScope)
270-
continuePluginService.coreMessengerManager = coreMessengerManager
269+
continuePluginService.coreMessenger = CoreMessenger(project, ideProtocolClient, coroutineScope)
271270
}
272271
}
273272
}

extensions/intellij/src/main/kotlin/com/github/continuedev/continueintellijextension/continue/CoreMessenger.kt

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,25 +5,24 @@ import com.github.continuedev.continueintellijextension.constants.MessageTypes
55
import com.github.continuedev.continueintellijextension.`continue`.process.ContinueBinaryProcess
66
import com.github.continuedev.continueintellijextension.`continue`.process.ContinueProcessHandler
77
import com.github.continuedev.continueintellijextension.`continue`.process.ContinueSocketProcess
8-
import com.github.continuedev.continueintellijextension.services.ContinuePluginService
98
import com.github.continuedev.continueintellijextension.utils.uuid
109
import com.google.gson.Gson
1110
import com.google.gson.JsonSyntaxException
12-
import com.intellij.openapi.components.service
1311
import com.intellij.openapi.diagnostic.Logger
1412
import com.intellij.openapi.project.Project
1513
import kotlinx.coroutines.CoroutineScope
1614

1715
class CoreMessenger(
1816
private val project: Project,
1917
private val ideProtocolClient: IdeProtocolClient,
20-
val coroutineScope: CoroutineScope,
21-
private val onUnexpectedExit: () -> Unit
18+
19+
// todo: this scope is public only to cancel the coroutines on dispose in ContinuePluginService
20+
// todo: scope is created in ContinuePluginStartupActivity; refactor this/clean this up/encapsulate
21+
val coroutineScope: CoroutineScope
2222
) {
2323
private val gson = Gson()
2424
private val responseListeners = mutableMapOf<String, (Any?) -> Unit>()
25-
private var process = startContinueProcess()
26-
private val log = Logger.getInstance(CoreMessenger::class.java.simpleName)
25+
private val process = startContinueProcess()
2726

2827
fun request(messageType: String, data: Any?, messageId: String?, onResponse: (Any?) -> Unit) {
2928
val id = messageId ?: uuid()
@@ -34,14 +33,15 @@ class CoreMessenger(
3433

3534
private fun startContinueProcess(): ContinueProcessHandler {
3635
val isTcp = System.getenv("USE_TCP")?.toBoolean() ?: false
37-
val process = if (isTcp)
38-
ContinueSocketProcess()
39-
else
40-
ContinueBinaryProcess(onUnexpectedExit)
41-
return ContinueProcessHandler(coroutineScope, process, ::handleMessage)
36+
return ContinueProcessHandler(coroutineScope, ::readProcessMessage) {
37+
if (isTcp)
38+
ContinueSocketProcess()
39+
else
40+
ContinueBinaryProcess()
41+
}
4242
}
4343

44-
private fun handleMessage(json: String) {
44+
private fun readProcessMessage(json: String) {
4545
val responseMap = tryToParse(json) ?: return
4646
val messageId = responseMap["messageId"].toString()
4747
val messageType = responseMap["messageType"].toString()
@@ -76,19 +76,16 @@ class CoreMessenger(
7676
try {
7777
gson.fromJson(json, Map::class.java)
7878
} catch (_: JsonSyntaxException) {
79-
log.warn("Invalid message JSON: $json") // example: NODE_ENV undefined
79+
LOG.warn("Invalid message JSON: $json") // example: NODE_ENV undefined
8080
null
8181
}
8282

8383
fun restart() {
84-
log.warn("Restarting Continue process")
8584
responseListeners.clear()
86-
process.close()
87-
process = startContinueProcess()
85+
process.restart()
8886
}
8987

90-
fun close() {
91-
log.warn("Closing Continue process")
92-
process.close()
88+
private companion object {
89+
private val LOG = Logger.getInstance(CoreMessenger::class.java.simpleName)
9390
}
9491
}

extensions/intellij/src/main/kotlin/com/github/continuedev/continueintellijextension/continue/CoreMessengerManager.kt

Lines changed: 0 additions & 33 deletions
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package com.github.continuedev.continueintellijextension.`continue`.process
2+
3+
import kotlin.time.Duration
4+
import kotlin.time.Duration.Companion.seconds
5+
6+
/**
7+
* Starts at [initialDuration] and doubles on each call to [nextDuration] up to [maxDuration].
8+
*/
9+
class BackoffCalculator(
10+
private val initialDuration: Duration = 1.seconds,
11+
private val maxDuration: Duration = 30.seconds
12+
) {
13+
private var currentTime: Duration = initialDuration
14+
15+
init {
16+
require(initialDuration > 0.seconds)
17+
}
18+
19+
fun nextDuration(): Duration {
20+
val delay = currentTime
21+
val next = currentTime * 2.0
22+
currentTime = if (next <= maxDuration) next else maxDuration
23+
return delay
24+
}
25+
}

extensions/intellij/src/main/kotlin/com/github/continuedev/continueintellijextension/continue/process/ContinueBinaryProcess.kt

Lines changed: 11 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,9 @@
11
package com.github.continuedev.continueintellijextension.`continue`.process
22

3-
import com.github.continuedev.continueintellijextension.error.ContinuePostHogService
4-
import com.github.continuedev.continueintellijextension.error.ContinueSentryService
53
import com.github.continuedev.continueintellijextension.proxy.ProxySettings
64
import com.github.continuedev.continueintellijextension.utils.OS
75
import com.github.continuedev.continueintellijextension.utils.getContinueBinaryPath
86
import com.github.continuedev.continueintellijextension.utils.getOS
9-
import com.intellij.openapi.components.service
107
import kotlinx.coroutines.Dispatchers
118
import kotlinx.coroutines.runBlocking
129
import java.io.File
@@ -16,46 +13,26 @@ import java.nio.file.Files
1613
import java.nio.file.Paths
1714
import java.nio.file.attribute.PosixFilePermission
1815

19-
class ContinueBinaryProcess(
20-
private val onUnexpectedExit: () -> Unit
21-
) : ContinueProcess {
16+
class ContinueBinaryProcess() : ContinueProcess {
2217

23-
private val process = startBinaryProcess()
18+
private val process = start()
2419
override val input: InputStream = process.inputStream
2520
override val output: OutputStream = process.outputStream
2621

2722
override fun close() =
2823
process.destroy()
2924

30-
private fun startBinaryProcess(): Process {
31-
val path = getContinueBinaryPath()
32-
runBlocking(Dispatchers.IO) {
33-
setPermissions()
34-
}
35-
36-
val builder = ProcessBuilder(path)
37-
builder.environment() += ProxySettings.getSettings().toContinueEnvVars()
38-
return builder
39-
.directory(File(path).parentFile)
40-
.start()
41-
.apply { onExit().thenRun(onUnexpectedExit).thenRun(::reportErrorTelemetry) }
42-
}
25+
private companion object {
4326

44-
private fun reportErrorTelemetry() {
45-
var err = process.errorStream?.bufferedReader()?.readText()?.trim()
46-
if (err != null) {
47-
// There are often "⚡️Done in Xms" messages, and we want everything after the last one
48-
val delimiter = "⚡ Done in"
49-
val doneIndex = err.lastIndexOf(delimiter)
50-
if (doneIndex != -1) {
51-
err = err.substring(doneIndex + delimiter.length)
52-
}
27+
private fun start(): Process {
28+
val path = getContinueBinaryPath()
29+
setPermissions()
30+
val builder = ProcessBuilder(path)
31+
builder.environment() += ProxySettings.getSettings().toContinueEnvVars()
32+
return builder
33+
.directory(File(path).parentFile)
34+
.start()
5335
}
54-
service<ContinueSentryService>().reportMessage("Core process exited with output: $err")
55-
service<ContinuePostHogService>().capture("jetbrains_core_exit", mapOf("error" to err))
56-
}
57-
58-
private companion object {
5936

6037
private fun setPermissions() {
6138
val os = getOS()
Original file line numberDiff line numberDiff line change
@@ -1,49 +1,62 @@
11
package com.github.continuedev.continueintellijextension.`continue`.process
22

3+
import com.github.continuedev.continueintellijextension.error.ContinuePostHogService
34
import com.github.continuedev.continueintellijextension.error.ContinueSentryService
45
import com.intellij.openapi.components.service
56
import com.intellij.openapi.diagnostic.Logger
67
import kotlinx.coroutines.*
78
import kotlinx.coroutines.channels.Channel
89
import java.io.BufferedReader
9-
import java.io.IOException
1010
import java.io.InputStreamReader
1111
import java.io.OutputStreamWriter
1212

1313
class ContinueProcessHandler(
14-
parentScope: CoroutineScope,
15-
private val process: ContinueProcess,
16-
handleMessage: (String) -> (Unit)
14+
private val parentScope: CoroutineScope,
15+
private val readMessage: (String) -> (Unit),
16+
private val createProcess: () -> ContinueProcess
1717
) {
18-
private val innerJob = Job()
19-
private val scope = CoroutineScope(parentScope.coroutineContext + innerJob)
2018
private val pendingWrites = Channel<String>(Channel.UNLIMITED)
21-
private val writer = OutputStreamWriter(process.output)
22-
private val reader = BufferedReader(InputStreamReader(process.input))
23-
private val log = Logger.getInstance(ContinueProcessHandler::class.java)
19+
private val backoff = BackoffCalculator()
20+
private var processScope: CoroutineScope? = null
21+
private var process: ContinueProcess? = null
2422

2523
init {
26-
scope.launch(Dispatchers.IO) {
27-
try {
28-
while (isActive) {
29-
val line = reader.readLine()
30-
if (line != null && line.isNotEmpty()) {
31-
try {
32-
log.debug("Handle: $line")
33-
handleMessage(line)
34-
} catch (e: Exception) {
35-
service<ContinueSentryService>().report(e, "Error handling message: $line")
36-
}
37-
} else
38-
delay(100)
39-
}
40-
} catch (e: IOException) {
41-
service<ContinueSentryService>().report(e)
24+
restart()
25+
}
26+
27+
fun restart() {
28+
LOG.warn("Starting Continue process")
29+
processScope?.cancel()
30+
process?.close()
31+
32+
val handler = CoroutineExceptionHandler { _, e ->
33+
service<ContinueSentryService>().report(e)
34+
service<ContinuePostHogService>().capture("jetbrains_core_exit", mapOf("error" to e))
35+
36+
val backoffDuration = backoff.nextDuration()
37+
LOG.warn("Process failed! Restarting in $backoffDuration")
38+
parentScope.launch {
39+
delay(backoffDuration)
40+
restart()
41+
}
42+
}
43+
44+
val job = SupervisorJob(parentScope.coroutineContext.job)
45+
processScope = CoroutineScope(parentScope.coroutineContext + job + handler)
46+
process = createProcess()
47+
48+
val reader = BufferedReader(InputStreamReader(process!!.input))
49+
val writer = OutputStreamWriter(process!!.output)
50+
51+
processScope!!.launch(Dispatchers.IO) {
52+
while (isActive) {
53+
val line = reader.readLine()
54+
if (line != null && line.isNotEmpty())
55+
readMessage(line)
4256
}
4357
}
44-
scope.launch(Dispatchers.IO) {
58+
processScope!!.launch(Dispatchers.IO) {
4559
for (message in pendingWrites) {
46-
log.debug("Write: $message")
4760
writer.write(message)
4861
writer.write("\r\n")
4962
writer.flush()
@@ -54,12 +67,7 @@ class ContinueProcessHandler(
5467
fun write(message: String) =
5568
pendingWrites.trySend(message)
5669

57-
fun close() {
58-
innerJob.cancel()
59-
scope.launch(Dispatchers.IO) {
60-
reader.close()
61-
writer.close()
62-
process.close()
63-
}
70+
private companion object {
71+
private val LOG = Logger.getInstance(ContinueProcessHandler::class.java.simpleName)
6472
}
6573
}

extensions/intellij/src/main/kotlin/com/github/continuedev/continueintellijextension/proxy/ProxyPoolingActivity.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class ProxyPoolingActivity : StartupActivity {
2828

2929
private fun onSettingsChanged(project: Project) {
3030
log.warn("Proxy settings changed, restarting")
31-
project.service<ContinuePluginService>().coreMessengerManager?.coreMessenger?.restart()
31+
project.service<ContinuePluginService>().coreMessenger?.restart()
3232
}
3333
}
3434

extensions/intellij/src/main/kotlin/com/github/continuedev/continueintellijextension/services/ContinuePluginService.kt

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,28 @@
11
package com.github.continuedev.continueintellijextension.services
22

33
import com.github.continuedev.continueintellijextension.`continue`.CoreMessenger
4-
import com.github.continuedev.continueintellijextension.`continue`.CoreMessengerManager
54
import com.github.continuedev.continueintellijextension.`continue`.DiffManager
65
import com.github.continuedev.continueintellijextension.`continue`.IdeProtocolClient
76
import com.github.continuedev.continueintellijextension.listeners.ActiveHandlerManager
87
import com.github.continuedev.continueintellijextension.listeners.DocumentChangeTracker
9-
import com.github.continuedev.continueintellijextension.toolWindow.ContinuePluginToolWindowFactory
108
import com.github.continuedev.continueintellijextension.utils.uuid
9+
import com.intellij.openapi.Disposable
1110
import com.intellij.openapi.components.Service
1211
import com.intellij.openapi.components.service
1312
import com.intellij.openapi.editor.EditorFactory
1413
import com.intellij.openapi.project.DumbAware
1514
import com.intellij.openapi.project.Project
1615
import com.intellij.openapi.util.Disposer
17-
import kotlinx.coroutines.CoroutineScope
18-
import kotlinx.coroutines.Dispatchers
1916
import kotlinx.coroutines.cancel
2017
import kotlin.properties.Delegates
2118

2219
@Service(Service.Level.PROJECT)
23-
class ContinuePluginService(private val project: Project) : DumbAware {
24-
private val coroutineScope = CoroutineScope(Dispatchers.Main)
20+
class ContinuePluginService(private val project: Project) : Disposable, DumbAware {
2521
var listener: (() -> Unit)? = null
2622
var ideProtocolClient: IdeProtocolClient? by Delegates.observable(null) { _, _, _ ->
2723
synchronized(this) { listener?.also { listener = null }?.invoke() }
2824
}
29-
var coreMessengerManager: CoreMessengerManager? = null
30-
val coreMessenger: CoreMessenger?
31-
get() = coreMessengerManager?.coreMessenger
25+
var coreMessenger: CoreMessenger? = null
3226
var workspacePaths: Array<String>? = null
3327
var windowId: String = uuid()
3428
var diffManager: DiffManager? = null
@@ -57,14 +51,6 @@ class ContinuePluginService(private val project: Project) : DumbAware {
5751
}
5852
}
5953

60-
fun dispose() {
61-
coroutineScope.cancel()
62-
coreMessenger?.coroutineScope?.let {
63-
it.cancel()
64-
coreMessenger?.close()
65-
}
66-
}
67-
6854
/**
6955
* Add a listener for protocolClient initialization.
7056
* Currently, only one needs to be processed. If there are more than one,
@@ -83,4 +69,8 @@ class ContinuePluginService(private val project: Project) : DumbAware {
8369
listener()
8470
}
8571
}
72+
73+
override fun dispose() {
74+
coreMessenger?.coroutineScope?.cancel()
75+
}
8676
}

0 commit comments

Comments
 (0)