From 332debb87fc50183563e504c99a42ad156033a7c Mon Sep 17 00:00:00 2001 From: Jakub Blach Date: Tue, 2 Sep 2025 19:08:43 +0200 Subject: [PATCH] Revert "fix: recover Continue process after unexpected exceptions (#7463)" This reverts commit 35f2d9de0bb9841118f453d9e16f83440cf07026. --- .../ContinuePluginStartupActivity.kt | 3 +- .../continue/CoreMessenger.kt | 35 +++++---- .../continue/CoreMessengerManager.kt | 33 ++++++++ .../continue/process/BackoffCalculator.kt | 25 ------ .../continue/process/ContinueBinaryProcess.kt | 45 ++++++++--- .../process/ContinueProcessHandler.kt | 76 +++++++++---------- .../proxy/ProxyPoolingActivity.kt | 2 +- .../services/ContinuePluginService.kt | 24 ++++-- .../unit/BackoffCalculatorTest.kt | 20 ----- 9 files changed, 140 insertions(+), 123 deletions(-) create mode 100644 extensions/intellij/src/main/kotlin/com/github/continuedev/continueintellijextension/continue/CoreMessengerManager.kt delete mode 100644 extensions/intellij/src/main/kotlin/com/github/continuedev/continueintellijextension/continue/process/BackoffCalculator.kt delete mode 100644 extensions/intellij/src/test/kotlin/com/github/continuedev/continueintellijextension/unit/BackoffCalculatorTest.kt diff --git a/extensions/intellij/src/main/kotlin/com/github/continuedev/continueintellijextension/activities/ContinuePluginStartupActivity.kt b/extensions/intellij/src/main/kotlin/com/github/continuedev/continueintellijextension/activities/ContinuePluginStartupActivity.kt index 0fab04b30dc..61cda0c427d 100644 --- a/extensions/intellij/src/main/kotlin/com/github/continuedev/continueintellijextension/activities/ContinuePluginStartupActivity.kt +++ b/extensions/intellij/src/main/kotlin/com/github/continuedev/continueintellijextension/activities/ContinuePluginStartupActivity.kt @@ -266,7 +266,8 @@ class ContinuePluginStartupActivity : StartupActivity, DumbAware { project.service() ) - continuePluginService.coreMessenger = CoreMessenger(project, ideProtocolClient, coroutineScope) + val coreMessengerManager = CoreMessengerManager(project, ideProtocolClient, coroutineScope) + continuePluginService.coreMessengerManager = coreMessengerManager } } } \ No newline at end of file diff --git a/extensions/intellij/src/main/kotlin/com/github/continuedev/continueintellijextension/continue/CoreMessenger.kt b/extensions/intellij/src/main/kotlin/com/github/continuedev/continueintellijextension/continue/CoreMessenger.kt index 22a059f9d1a..01affb7dbc1 100644 --- a/extensions/intellij/src/main/kotlin/com/github/continuedev/continueintellijextension/continue/CoreMessenger.kt +++ b/extensions/intellij/src/main/kotlin/com/github/continuedev/continueintellijextension/continue/CoreMessenger.kt @@ -5,9 +5,11 @@ import com.github.continuedev.continueintellijextension.constants.MessageTypes import com.github.continuedev.continueintellijextension.`continue`.process.ContinueBinaryProcess import com.github.continuedev.continueintellijextension.`continue`.process.ContinueProcessHandler import com.github.continuedev.continueintellijextension.`continue`.process.ContinueSocketProcess +import com.github.continuedev.continueintellijextension.services.ContinuePluginService import com.github.continuedev.continueintellijextension.utils.uuid import com.google.gson.Gson import com.google.gson.JsonSyntaxException +import com.intellij.openapi.components.service import com.intellij.openapi.diagnostic.Logger import com.intellij.openapi.project.Project import kotlinx.coroutines.CoroutineScope @@ -15,14 +17,13 @@ import kotlinx.coroutines.CoroutineScope class CoreMessenger( private val project: Project, private val ideProtocolClient: IdeProtocolClient, - - // todo: this scope is public only to cancel the coroutines on dispose in ContinuePluginService - // todo: scope is created in ContinuePluginStartupActivity; refactor this/clean this up/encapsulate - val coroutineScope: CoroutineScope + val coroutineScope: CoroutineScope, + private val onUnexpectedExit: () -> Unit ) { private val gson = Gson() private val responseListeners = mutableMapOf Unit>() - private val process = startContinueProcess() + private var process = startContinueProcess() + private val log = Logger.getInstance(CoreMessenger::class.java.simpleName) fun request(messageType: String, data: Any?, messageId: String?, onResponse: (Any?) -> Unit) { val id = messageId ?: uuid() @@ -33,15 +34,14 @@ class CoreMessenger( private fun startContinueProcess(): ContinueProcessHandler { val isTcp = System.getenv("USE_TCP")?.toBoolean() ?: false - return ContinueProcessHandler(coroutineScope, ::readProcessMessage) { - if (isTcp) - ContinueSocketProcess() - else - ContinueBinaryProcess() - } + val process = if (isTcp) + ContinueSocketProcess() + else + ContinueBinaryProcess(onUnexpectedExit) + return ContinueProcessHandler(coroutineScope, process, ::handleMessage) } - private fun readProcessMessage(json: String) { + private fun handleMessage(json: String) { val responseMap = tryToParse(json) ?: return val messageId = responseMap["messageId"].toString() val messageType = responseMap["messageType"].toString() @@ -76,16 +76,19 @@ class CoreMessenger( try { gson.fromJson(json, Map::class.java) } catch (_: JsonSyntaxException) { - LOG.warn("Invalid message JSON: $json") // example: NODE_ENV undefined + log.warn("Invalid message JSON: $json") // example: NODE_ENV undefined null } fun restart() { + log.warn("Restarting Continue process") responseListeners.clear() - process.restart() + process.close() + process = startContinueProcess() } - private companion object { - private val LOG = Logger.getInstance(CoreMessenger::class.java.simpleName) + fun close() { + log.warn("Closing Continue process") + process.close() } } \ No newline at end of file diff --git a/extensions/intellij/src/main/kotlin/com/github/continuedev/continueintellijextension/continue/CoreMessengerManager.kt b/extensions/intellij/src/main/kotlin/com/github/continuedev/continueintellijextension/continue/CoreMessengerManager.kt new file mode 100644 index 00000000000..4acfb461d95 --- /dev/null +++ b/extensions/intellij/src/main/kotlin/com/github/continuedev/continueintellijextension/continue/CoreMessengerManager.kt @@ -0,0 +1,33 @@ +package com.github.continuedev.continueintellijextension.`continue` + +import com.github.continuedev.continueintellijextension.error.ContinuePostHogService +import com.intellij.openapi.components.service +import com.intellij.openapi.diagnostic.Logger +import com.intellij.openapi.project.Project +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlin.time.Duration.Companion.seconds + +class CoreMessengerManager( + project: Project, + ideProtocolClient: IdeProtocolClient, + private val coroutineScope: CoroutineScope +) { + val coreMessenger = CoreMessenger(project, ideProtocolClient, coroutineScope, ::onUnexpectedExit) + private var backoffIntervalSeconds = 1 + private val log = Logger.getInstance(CoreMessengerManager::class.java) + + private fun onUnexpectedExit() { + coroutineScope.launch { + try { + delay(backoffIntervalSeconds.seconds) + backoffIntervalSeconds *= 2 + log.warn("Continue process terminated externally, retrying in $backoffIntervalSeconds seconds") + coreMessenger.restart() + } catch (e: Exception) { + service().capture("jetbrains_core_start_error", mapOf("error" to e)) + } + } + } +} diff --git a/extensions/intellij/src/main/kotlin/com/github/continuedev/continueintellijextension/continue/process/BackoffCalculator.kt b/extensions/intellij/src/main/kotlin/com/github/continuedev/continueintellijextension/continue/process/BackoffCalculator.kt deleted file mode 100644 index b9967c64448..00000000000 --- a/extensions/intellij/src/main/kotlin/com/github/continuedev/continueintellijextension/continue/process/BackoffCalculator.kt +++ /dev/null @@ -1,25 +0,0 @@ -package com.github.continuedev.continueintellijextension.`continue`.process - -import kotlin.time.Duration -import kotlin.time.Duration.Companion.seconds - -/** - * Starts at [initialDuration] and doubles on each call to [nextDuration] up to [maxDuration]. - */ -class BackoffCalculator( - private val initialDuration: Duration = 1.seconds, - private val maxDuration: Duration = 30.seconds -) { - private var currentTime: Duration = initialDuration - - init { - require(initialDuration > 0.seconds) - } - - fun nextDuration(): Duration { - val delay = currentTime - val next = currentTime * 2.0 - currentTime = if (next <= maxDuration) next else maxDuration - return delay - } -} diff --git a/extensions/intellij/src/main/kotlin/com/github/continuedev/continueintellijextension/continue/process/ContinueBinaryProcess.kt b/extensions/intellij/src/main/kotlin/com/github/continuedev/continueintellijextension/continue/process/ContinueBinaryProcess.kt index 9733d33126d..7e675bfb1a3 100644 --- a/extensions/intellij/src/main/kotlin/com/github/continuedev/continueintellijextension/continue/process/ContinueBinaryProcess.kt +++ b/extensions/intellij/src/main/kotlin/com/github/continuedev/continueintellijextension/continue/process/ContinueBinaryProcess.kt @@ -1,9 +1,12 @@ package com.github.continuedev.continueintellijextension.`continue`.process +import com.github.continuedev.continueintellijextension.error.ContinuePostHogService +import com.github.continuedev.continueintellijextension.error.ContinueSentryService import com.github.continuedev.continueintellijextension.proxy.ProxySettings import com.github.continuedev.continueintellijextension.utils.OS import com.github.continuedev.continueintellijextension.utils.getContinueBinaryPath import com.github.continuedev.continueintellijextension.utils.getOS +import com.intellij.openapi.components.service import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.runBlocking import java.io.File @@ -13,27 +16,47 @@ import java.nio.file.Files import java.nio.file.Paths import java.nio.file.attribute.PosixFilePermission -class ContinueBinaryProcess() : ContinueProcess { +class ContinueBinaryProcess( + private val onUnexpectedExit: () -> Unit +) : ContinueProcess { - private val process = start() + private val process = startBinaryProcess() override val input: InputStream = process.inputStream override val output: OutputStream = process.outputStream override fun close() = process.destroy() - private companion object { - - private fun start(): Process { - val path = getContinueBinaryPath() + private fun startBinaryProcess(): Process { + val path = getContinueBinaryPath() + runBlocking(Dispatchers.IO) { setPermissions() - val builder = ProcessBuilder(path) - builder.environment() += ProxySettings.getSettings().toContinueEnvVars() - return builder - .directory(File(path).parentFile) - .start() } + val builder = ProcessBuilder(path) + builder.environment() += ProxySettings.getSettings().toContinueEnvVars() + return builder + .directory(File(path).parentFile) + .start() + .apply { onExit().thenRun(onUnexpectedExit).thenRun(::reportErrorTelemetry) } + } + + private fun reportErrorTelemetry() { + var err = process.errorStream?.bufferedReader()?.readText()?.trim() + if (err != null) { + // There are often "⚡️Done in Xms" messages, and we want everything after the last one + val delimiter = "⚡ Done in" + val doneIndex = err.lastIndexOf(delimiter) + if (doneIndex != -1) { + err = err.substring(doneIndex + delimiter.length) + } + } + service().reportMessage("Core process exited with output: $err") + service().capture("jetbrains_core_exit", mapOf("error" to err)) + } + + private companion object { + private fun setPermissions() { val os = getOS() when (os) { diff --git a/extensions/intellij/src/main/kotlin/com/github/continuedev/continueintellijextension/continue/process/ContinueProcessHandler.kt b/extensions/intellij/src/main/kotlin/com/github/continuedev/continueintellijextension/continue/process/ContinueProcessHandler.kt index d18c9677bc8..04e4d3d2e74 100644 --- a/extensions/intellij/src/main/kotlin/com/github/continuedev/continueintellijextension/continue/process/ContinueProcessHandler.kt +++ b/extensions/intellij/src/main/kotlin/com/github/continuedev/continueintellijextension/continue/process/ContinueProcessHandler.kt @@ -1,62 +1,49 @@ package com.github.continuedev.continueintellijextension.`continue`.process -import com.github.continuedev.continueintellijextension.error.ContinuePostHogService import com.github.continuedev.continueintellijextension.error.ContinueSentryService import com.intellij.openapi.components.service import com.intellij.openapi.diagnostic.Logger import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel import java.io.BufferedReader +import java.io.IOException import java.io.InputStreamReader import java.io.OutputStreamWriter class ContinueProcessHandler( - private val parentScope: CoroutineScope, - private val readMessage: (String) -> (Unit), - private val createProcess: () -> ContinueProcess + parentScope: CoroutineScope, + private val process: ContinueProcess, + handleMessage: (String) -> (Unit) ) { + private val innerJob = Job() + private val scope = CoroutineScope(parentScope.coroutineContext + innerJob) private val pendingWrites = Channel(Channel.UNLIMITED) - private val backoff = BackoffCalculator() - private var processScope: CoroutineScope? = null - private var process: ContinueProcess? = null + private val writer = OutputStreamWriter(process.output) + private val reader = BufferedReader(InputStreamReader(process.input)) + private val log = Logger.getInstance(ContinueProcessHandler::class.java) init { - restart() - } - - fun restart() { - LOG.warn("Starting Continue process") - processScope?.cancel() - process?.close() - - val handler = CoroutineExceptionHandler { _, e -> - service().report(e) - service().capture("jetbrains_core_exit", mapOf("error" to e)) - - val backoffDuration = backoff.nextDuration() - LOG.warn("Process failed! Restarting in $backoffDuration") - parentScope.launch { - delay(backoffDuration) - restart() - } - } - - val job = SupervisorJob(parentScope.coroutineContext.job) - processScope = CoroutineScope(parentScope.coroutineContext + job + handler) - process = createProcess() - - val reader = BufferedReader(InputStreamReader(process!!.input)) - val writer = OutputStreamWriter(process!!.output) - - processScope!!.launch(Dispatchers.IO) { - while (isActive) { - val line = reader.readLine() - if (line != null && line.isNotEmpty()) - readMessage(line) + scope.launch(Dispatchers.IO) { + try { + while (isActive) { + val line = reader.readLine() + if (line != null && line.isNotEmpty()) { + try { + log.debug("Handle: $line") + handleMessage(line) + } catch (e: Exception) { + service().report(e, "Error handling message: $line") + } + } else + delay(100) + } + } catch (e: IOException) { + service().report(e) } } - processScope!!.launch(Dispatchers.IO) { + scope.launch(Dispatchers.IO) { for (message in pendingWrites) { + log.debug("Write: $message") writer.write(message) writer.write("\r\n") writer.flush() @@ -67,7 +54,12 @@ class ContinueProcessHandler( fun write(message: String) = pendingWrites.trySend(message) - private companion object { - private val LOG = Logger.getInstance(ContinueProcessHandler::class.java.simpleName) + fun close() { + innerJob.cancel() + scope.launch(Dispatchers.IO) { + reader.close() + writer.close() + process.close() + } } } diff --git a/extensions/intellij/src/main/kotlin/com/github/continuedev/continueintellijextension/proxy/ProxyPoolingActivity.kt b/extensions/intellij/src/main/kotlin/com/github/continuedev/continueintellijextension/proxy/ProxyPoolingActivity.kt index 460d7994d15..0d88d0bdd69 100644 --- a/extensions/intellij/src/main/kotlin/com/github/continuedev/continueintellijextension/proxy/ProxyPoolingActivity.kt +++ b/extensions/intellij/src/main/kotlin/com/github/continuedev/continueintellijextension/proxy/ProxyPoolingActivity.kt @@ -28,7 +28,7 @@ class ProxyPoolingActivity : StartupActivity { private fun onSettingsChanged(project: Project) { log.warn("Proxy settings changed, restarting") - project.service().coreMessenger?.restart() + project.service().coreMessengerManager?.coreMessenger?.restart() } } diff --git a/extensions/intellij/src/main/kotlin/com/github/continuedev/continueintellijextension/services/ContinuePluginService.kt b/extensions/intellij/src/main/kotlin/com/github/continuedev/continueintellijextension/services/ContinuePluginService.kt index bf84b7ccbaf..daaecb1233e 100644 --- a/extensions/intellij/src/main/kotlin/com/github/continuedev/continueintellijextension/services/ContinuePluginService.kt +++ b/extensions/intellij/src/main/kotlin/com/github/continuedev/continueintellijextension/services/ContinuePluginService.kt @@ -1,28 +1,34 @@ package com.github.continuedev.continueintellijextension.services import com.github.continuedev.continueintellijextension.`continue`.CoreMessenger +import com.github.continuedev.continueintellijextension.`continue`.CoreMessengerManager import com.github.continuedev.continueintellijextension.`continue`.DiffManager import com.github.continuedev.continueintellijextension.`continue`.IdeProtocolClient import com.github.continuedev.continueintellijextension.listeners.ActiveHandlerManager import com.github.continuedev.continueintellijextension.listeners.DocumentChangeTracker +import com.github.continuedev.continueintellijextension.toolWindow.ContinuePluginToolWindowFactory import com.github.continuedev.continueintellijextension.utils.uuid -import com.intellij.openapi.Disposable import com.intellij.openapi.components.Service import com.intellij.openapi.components.service import com.intellij.openapi.editor.EditorFactory import com.intellij.openapi.project.DumbAware import com.intellij.openapi.project.Project import com.intellij.openapi.util.Disposer +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.cancel import kotlin.properties.Delegates @Service(Service.Level.PROJECT) -class ContinuePluginService(private val project: Project) : Disposable, DumbAware { +class ContinuePluginService(private val project: Project) : DumbAware { + private val coroutineScope = CoroutineScope(Dispatchers.Main) var listener: (() -> Unit)? = null var ideProtocolClient: IdeProtocolClient? by Delegates.observable(null) { _, _, _ -> synchronized(this) { listener?.also { listener = null }?.invoke() } } - var coreMessenger: CoreMessenger? = null + var coreMessengerManager: CoreMessengerManager? = null + val coreMessenger: CoreMessenger? + get() = coreMessengerManager?.coreMessenger var workspacePaths: Array? = null var windowId: String = uuid() var diffManager: DiffManager? = null @@ -51,6 +57,14 @@ class ContinuePluginService(private val project: Project) : Disposable, DumbAwar } } + fun dispose() { + coroutineScope.cancel() + coreMessenger?.coroutineScope?.let { + it.cancel() + coreMessenger?.close() + } + } + /** * Add a listener for protocolClient initialization. * Currently, only one needs to be processed. If there are more than one, @@ -69,8 +83,4 @@ class ContinuePluginService(private val project: Project) : Disposable, DumbAwar listener() } } - - override fun dispose() { - coreMessenger?.coroutineScope?.cancel() - } } \ No newline at end of file diff --git a/extensions/intellij/src/test/kotlin/com/github/continuedev/continueintellijextension/unit/BackoffCalculatorTest.kt b/extensions/intellij/src/test/kotlin/com/github/continuedev/continueintellijextension/unit/BackoffCalculatorTest.kt deleted file mode 100644 index 12eec6c4feb..00000000000 --- a/extensions/intellij/src/test/kotlin/com/github/continuedev/continueintellijextension/unit/BackoffCalculatorTest.kt +++ /dev/null @@ -1,20 +0,0 @@ -package com.github.continuedev.continueintellijextension.unit - -import com.github.continuedev.continueintellijextension.`continue`.process.BackoffCalculator -import junit.framework.TestCase -import kotlin.time.Duration.Companion.seconds - -class BackoffCalculatorTest : TestCase() { - - fun `test backoff`() { - val backoff = BackoffCalculator(initialDuration = 1.seconds, maxDuration = 30.seconds) - assertEquals(1.seconds, backoff.nextDuration()) - assertEquals(2.seconds, backoff.nextDuration()) - assertEquals(4.seconds, backoff.nextDuration()) - assertEquals(8.seconds, backoff.nextDuration()) - assertEquals(16.seconds, backoff.nextDuration()) - assertEquals(30.seconds, backoff.nextDuration()) - assertEquals(30.seconds, backoff.nextDuration()) - } - -} \ No newline at end of file