Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,8 @@ class ContinuePluginStartupActivity : StartupActivity, DumbAware {
project.service<ContinuePluginDisposable>()
)

continuePluginService.coreMessenger = CoreMessenger(project, ideProtocolClient, coroutineScope)
val coreMessengerManager = CoreMessengerManager(project, ideProtocolClient, coroutineScope)
continuePluginService.coreMessengerManager = coreMessengerManager
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,25 @@ import com.github.continuedev.continueintellijextension.constants.MessageTypes
import com.github.continuedev.continueintellijextension.`continue`.process.ContinueBinaryProcess
import com.github.continuedev.continueintellijextension.`continue`.process.ContinueProcessHandler
import com.github.continuedev.continueintellijextension.`continue`.process.ContinueSocketProcess
import com.github.continuedev.continueintellijextension.services.ContinuePluginService
import com.github.continuedev.continueintellijextension.utils.uuid
import com.google.gson.Gson
import com.google.gson.JsonSyntaxException
import com.intellij.openapi.components.service
import com.intellij.openapi.diagnostic.Logger
import com.intellij.openapi.project.Project
import kotlinx.coroutines.CoroutineScope

class CoreMessenger(
private val project: Project,
private val ideProtocolClient: IdeProtocolClient,

// todo: this scope is public only to cancel the coroutines on dispose in ContinuePluginService
// todo: scope is created in ContinuePluginStartupActivity; refactor this/clean this up/encapsulate
val coroutineScope: CoroutineScope
val coroutineScope: CoroutineScope,
private val onUnexpectedExit: () -> Unit
) {
private val gson = Gson()
private val responseListeners = mutableMapOf<String, (Any?) -> Unit>()
private val process = startContinueProcess()
private var process = startContinueProcess()
private val log = Logger.getInstance(CoreMessenger::class.java.simpleName)

fun request(messageType: String, data: Any?, messageId: String?, onResponse: (Any?) -> Unit) {
val id = messageId ?: uuid()
Expand All @@ -33,15 +34,14 @@ class CoreMessenger(

private fun startContinueProcess(): ContinueProcessHandler {
val isTcp = System.getenv("USE_TCP")?.toBoolean() ?: false
return ContinueProcessHandler(coroutineScope, ::readProcessMessage) {
if (isTcp)
ContinueSocketProcess()
else
ContinueBinaryProcess()
}
val process = if (isTcp)
ContinueSocketProcess()
else
ContinueBinaryProcess(onUnexpectedExit)
return ContinueProcessHandler(coroutineScope, process, ::handleMessage)
}

private fun readProcessMessage(json: String) {
private fun handleMessage(json: String) {
val responseMap = tryToParse(json) ?: return
val messageId = responseMap["messageId"].toString()
val messageType = responseMap["messageType"].toString()
Expand Down Expand Up @@ -76,16 +76,19 @@ class CoreMessenger(
try {
gson.fromJson(json, Map::class.java)
} catch (_: JsonSyntaxException) {
LOG.warn("Invalid message JSON: $json") // example: NODE_ENV undefined
log.warn("Invalid message JSON: $json") // example: NODE_ENV undefined
null
}

fun restart() {
log.warn("Restarting Continue process")
responseListeners.clear()
process.restart()
process.close()
process = startContinueProcess()
}

private companion object {
private val LOG = Logger.getInstance(CoreMessenger::class.java.simpleName)
fun close() {
log.warn("Closing Continue process")
process.close()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.github.continuedev.continueintellijextension.`continue`

import com.github.continuedev.continueintellijextension.error.ContinuePostHogService
import com.intellij.openapi.components.service
import com.intellij.openapi.diagnostic.Logger
import com.intellij.openapi.project.Project
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlin.time.Duration.Companion.seconds

class CoreMessengerManager(
project: Project,
ideProtocolClient: IdeProtocolClient,
private val coroutineScope: CoroutineScope
) {
val coreMessenger = CoreMessenger(project, ideProtocolClient, coroutineScope, ::onUnexpectedExit)
private var backoffIntervalSeconds = 1
private val log = Logger.getInstance(CoreMessengerManager::class.java)

private fun onUnexpectedExit() {
coroutineScope.launch {
try {
delay(backoffIntervalSeconds.seconds)
backoffIntervalSeconds *= 2
log.warn("Continue process terminated externally, retrying in $backoffIntervalSeconds seconds")
coreMessenger.restart()
} catch (e: Exception) {
service<ContinuePostHogService>().capture("jetbrains_core_start_error", mapOf("error" to e))
}
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package com.github.continuedev.continueintellijextension.`continue`.process

import com.github.continuedev.continueintellijextension.error.ContinuePostHogService
import com.github.continuedev.continueintellijextension.error.ContinueSentryService
import com.github.continuedev.continueintellijextension.proxy.ProxySettings
import com.github.continuedev.continueintellijextension.utils.OS
import com.github.continuedev.continueintellijextension.utils.getContinueBinaryPath
import com.github.continuedev.continueintellijextension.utils.getOS
import com.intellij.openapi.components.service
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import java.io.File
Expand All @@ -13,27 +16,47 @@ import java.nio.file.Files
import java.nio.file.Paths
import java.nio.file.attribute.PosixFilePermission

class ContinueBinaryProcess() : ContinueProcess {
class ContinueBinaryProcess(
private val onUnexpectedExit: () -> Unit
) : ContinueProcess {

private val process = start()
private val process = startBinaryProcess()
override val input: InputStream = process.inputStream
override val output: OutputStream = process.outputStream

override fun close() =
process.destroy()

private companion object {

private fun start(): Process {
val path = getContinueBinaryPath()
private fun startBinaryProcess(): Process {
val path = getContinueBinaryPath()
runBlocking(Dispatchers.IO) {
setPermissions()
val builder = ProcessBuilder(path)
builder.environment() += ProxySettings.getSettings().toContinueEnvVars()
return builder
.directory(File(path).parentFile)
.start()
}

val builder = ProcessBuilder(path)
builder.environment() += ProxySettings.getSettings().toContinueEnvVars()
return builder
.directory(File(path).parentFile)
.start()
.apply { onExit().thenRun(onUnexpectedExit).thenRun(::reportErrorTelemetry) }
}

private fun reportErrorTelemetry() {
var err = process.errorStream?.bufferedReader()?.readText()?.trim()
if (err != null) {
// There are often "⚡️Done in Xms" messages, and we want everything after the last one
val delimiter = "⚡ Done in"
val doneIndex = err.lastIndexOf(delimiter)
if (doneIndex != -1) {
err = err.substring(doneIndex + delimiter.length)
}
}
service<ContinueSentryService>().reportMessage("Core process exited with output: $err")
service<ContinuePostHogService>().capture("jetbrains_core_exit", mapOf("error" to err))
}

private companion object {

private fun setPermissions() {
val os = getOS()
when (os) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,62 +1,49 @@
package com.github.continuedev.continueintellijextension.`continue`.process

import com.github.continuedev.continueintellijextension.error.ContinuePostHogService
import com.github.continuedev.continueintellijextension.error.ContinueSentryService
import com.intellij.openapi.components.service
import com.intellij.openapi.diagnostic.Logger
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import java.io.BufferedReader
import java.io.IOException
import java.io.InputStreamReader
import java.io.OutputStreamWriter

class ContinueProcessHandler(
private val parentScope: CoroutineScope,
private val readMessage: (String) -> (Unit),
private val createProcess: () -> ContinueProcess
parentScope: CoroutineScope,
private val process: ContinueProcess,
handleMessage: (String) -> (Unit)
) {
private val innerJob = Job()
private val scope = CoroutineScope(parentScope.coroutineContext + innerJob)
private val pendingWrites = Channel<String>(Channel.UNLIMITED)
private val backoff = BackoffCalculator()
private var processScope: CoroutineScope? = null
private var process: ContinueProcess? = null
private val writer = OutputStreamWriter(process.output)
private val reader = BufferedReader(InputStreamReader(process.input))
private val log = Logger.getInstance(ContinueProcessHandler::class.java)

init {
restart()
}

fun restart() {
LOG.warn("Starting Continue process")
processScope?.cancel()
process?.close()

val handler = CoroutineExceptionHandler { _, e ->
service<ContinueSentryService>().report(e)
service<ContinuePostHogService>().capture("jetbrains_core_exit", mapOf("error" to e))

val backoffDuration = backoff.nextDuration()
LOG.warn("Process failed! Restarting in $backoffDuration")
parentScope.launch {
delay(backoffDuration)
restart()
}
}

val job = SupervisorJob(parentScope.coroutineContext.job)
processScope = CoroutineScope(parentScope.coroutineContext + job + handler)
process = createProcess()

val reader = BufferedReader(InputStreamReader(process!!.input))
val writer = OutputStreamWriter(process!!.output)

processScope!!.launch(Dispatchers.IO) {
while (isActive) {
val line = reader.readLine()
if (line != null && line.isNotEmpty())
readMessage(line)
scope.launch(Dispatchers.IO) {
try {
while (isActive) {
val line = reader.readLine()
if (line != null && line.isNotEmpty()) {
try {
log.debug("Handle: $line")
handleMessage(line)
} catch (e: Exception) {
service<ContinueSentryService>().report(e, "Error handling message: $line")
}
} else
delay(100)
}
} catch (e: IOException) {
service<ContinueSentryService>().report(e)
}
}
processScope!!.launch(Dispatchers.IO) {
scope.launch(Dispatchers.IO) {
for (message in pendingWrites) {
log.debug("Write: $message")
writer.write(message)
writer.write("\r\n")
writer.flush()
Expand All @@ -67,7 +54,12 @@ class ContinueProcessHandler(
fun write(message: String) =
pendingWrites.trySend(message)

private companion object {
private val LOG = Logger.getInstance(ContinueProcessHandler::class.java.simpleName)
fun close() {
innerJob.cancel()
scope.launch(Dispatchers.IO) {
reader.close()
writer.close()
process.close()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class ProxyPoolingActivity : StartupActivity {

private fun onSettingsChanged(project: Project) {
log.warn("Proxy settings changed, restarting")
project.service<ContinuePluginService>().coreMessenger?.restart()
project.service<ContinuePluginService>().coreMessengerManager?.coreMessenger?.restart()
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,28 +1,34 @@
package com.github.continuedev.continueintellijextension.services

import com.github.continuedev.continueintellijextension.`continue`.CoreMessenger
import com.github.continuedev.continueintellijextension.`continue`.CoreMessengerManager
import com.github.continuedev.continueintellijextension.`continue`.DiffManager
import com.github.continuedev.continueintellijextension.`continue`.IdeProtocolClient
import com.github.continuedev.continueintellijextension.listeners.ActiveHandlerManager
import com.github.continuedev.continueintellijextension.listeners.DocumentChangeTracker
import com.github.continuedev.continueintellijextension.toolWindow.ContinuePluginToolWindowFactory
import com.github.continuedev.continueintellijextension.utils.uuid
import com.intellij.openapi.Disposable
import com.intellij.openapi.components.Service
import com.intellij.openapi.components.service
import com.intellij.openapi.editor.EditorFactory
import com.intellij.openapi.project.DumbAware
import com.intellij.openapi.project.Project
import com.intellij.openapi.util.Disposer
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancel
import kotlin.properties.Delegates

@Service(Service.Level.PROJECT)
class ContinuePluginService(private val project: Project) : Disposable, DumbAware {
class ContinuePluginService(private val project: Project) : DumbAware {
private val coroutineScope = CoroutineScope(Dispatchers.Main)
var listener: (() -> Unit)? = null
var ideProtocolClient: IdeProtocolClient? by Delegates.observable(null) { _, _, _ ->
synchronized(this) { listener?.also { listener = null }?.invoke() }
}
var coreMessenger: CoreMessenger? = null
var coreMessengerManager: CoreMessengerManager? = null
val coreMessenger: CoreMessenger?
get() = coreMessengerManager?.coreMessenger
var workspacePaths: Array<String>? = null
var windowId: String = uuid()
var diffManager: DiffManager? = null
Expand Down Expand Up @@ -51,6 +57,14 @@ class ContinuePluginService(private val project: Project) : Disposable, DumbAwar
}
}

fun dispose() {
coroutineScope.cancel()
coreMessenger?.coroutineScope?.let {
it.cancel()
coreMessenger?.close()
}
}

/**
* Add a listener for protocolClient initialization.
* Currently, only one needs to be processed. If there are more than one,
Expand All @@ -69,8 +83,4 @@ class ContinuePluginService(private val project: Project) : Disposable, DumbAwar
listener()
}
}

override fun dispose() {
coreMessenger?.coroutineScope?.cancel()
}
}
Loading
Loading