python runner takes an optional outputstream for redirecting stderr/stdout (#701)

Co-authored-by: helenyugithub <[email protected]>
helenyugithub and helenyuyu authored Jul 15, 2020
1 parent 597d5a2 commit a995f2a
Showing 3 changed files with 21 additions and 6 deletions.
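
The change threads an optional java.io.OutputStream through CondaRunner.run and its PythonRunner/RRunner implementations, so a caller can capture the launched Python or R process's stdout/stderr instead of having it forwarded to System.out. A minimal usage sketch against the new signature; the object name, the runner arguments, and capturing into a ByteArrayOutputStream are illustrative assumptions, not part of this commit:

import java.io.ByteArrayOutputStream

import org.apache.spark.deploy.PythonRunner

object CaptureOutputExample {
  def main(cliArgs: Array[String]): Unit = {
    // Hypothetical runner arguments: the Python file to run, then the py-files list.
    val runnerArgs = Array("/tmp/job.py", "")

    // Buffer that receives the child process's output instead of System.out.
    val captured = new ByteArrayOutputStream()

    // No conda environment; redirect the Python process's output into the buffer.
    PythonRunner.run(runnerArgs, maybeConda = None, maybeOutputStream = Some(captured))

    println(s"python output:\n${captured.toString("UTF-8")}")
  }
}

Passing None for the stream keeps the previous behaviour, since both runners fall back to System.out.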
8 changes: 6 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/CondaRunner.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.deploy
 
+import java.io.OutputStream
 import java.util.concurrent.atomic.AtomicReference
 
 import org.apache.spark.SparkConf
@@ -33,10 +34,13 @@ import org.apache.spark.util.Utils
 abstract class CondaRunner extends Logging {
   final def main(args: Array[String]): Unit = {
     val sparkConf = new SparkConf()
-    run(args, CondaRunner.setupCondaEnvironmentAutomatically(sparkConf))
+    run(args, CondaRunner.setupCondaEnvironmentAutomatically(sparkConf), None)
   }
 
-  def run(args: Array[String], maybeConda: Option[CondaEnvironment]): Unit
+  def run(
+      args: Array[String],
+      maybeConda: Option[CondaEnvironment],
+      maybeOutputStream: Option[OutputStream]): Unit
 }
 
 object CondaRunner {
10 changes: 8 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.deploy
 
 import java.io.File
+import java.io.OutputStream
 import java.net.{InetAddress, URI}
 import java.nio.file.Files
 
@@ -40,7 +41,10 @@ import org.apache.spark.util.Utils
  */
 object PythonRunner extends CondaRunner with Logging {
 
-  override def run(args: Array[String], maybeConda: Option[CondaEnvironment]): Unit = {
+  override def run(
+      args: Array[String],
+      maybeConda: Option[CondaEnvironment],
+      maybeOutputStream: Option[OutputStream]): Unit = {
     val pythonFile = args(0)
     val pyFiles = args(1)
     val otherArgs = args.slice(2, args.length)
@@ -116,7 +120,9 @@ object PythonRunner extends CondaRunner with Logging {
     try {
       val process = builder.start()
 
-      new RedirectThread(process.getInputStream, System.out, "redirect output").start()
+      new RedirectThread(
+        process.getInputStream, maybeOutputStream.getOrElse(System.out), "redirect output")
+        .start()
 
       val exitCode = process.waitFor()
       if (exitCode != 0) {
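
PythonRunner here, and RRunner below, now choose the destination once, via maybeOutputStream.getOrElse(System.out), and pump the child process's output into it on a background thread. A standalone sketch of that fallback pattern, using a plain Thread in place of Spark's internal RedirectThread helper (the object and method names are hypothetical):

import java.io.{InputStream, OutputStream}

object RedirectSketch {
  // Copy everything from "in" into the caller-supplied stream, or into System.out
  // when none is given, on a daemon thread; returns the already-started thread.
  def redirect(in: InputStream, maybeOutputStream: Option[OutputStream]): Thread = {
    val out = maybeOutputStream.getOrElse(System.out)
    val thread = new Thread("redirect output") {
      override def run(): Unit = {
        val buffer = new Array[Byte](8192)
        Iterator.continually(in.read(buffer)).takeWhile(_ != -1).foreach { n =>
          out.write(buffer, 0, n)
          out.flush()
        }
      }
    }
    thread.setDaemon(true)
    thread.start()
    thread
  }
}

A caller would hand it process.getInputStream together with the optional stream, mirroring the RedirectThread calls in the diff.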
9 changes: 7 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/RRunner.scala
@@ -38,7 +38,10 @@ import org.apache.spark.util.RedirectThread
  * subprocess and then has it connect back to the JVM to access system properties etc.
  */
 object RRunner extends CondaRunner with Logging {
-  override def run(args: Array[String], maybeConda: Option[CondaEnvironment]): Unit = {
+  override def run(
+      args: Array[String],
+      maybeConda: Option[CondaEnvironment],
+      maybeOutputStream: Option[OutputStream]): Unit = {
     val rFile = PythonRunner.formatPath(args(0))
 
     val otherArgs = args.slice(1, args.length)
@@ -117,7 +120,9 @@ object RRunner extends CondaRunner with Logging {
       builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
       val process = builder.start()
 
-      new RedirectThread(process.getInputStream, System.out, "redirect R output").start()
+      new RedirectThread(
+        process.getInputStream, maybeOutputStream.getOrElse(System.out), "redirect R output")
+        .start()
 
       process.waitFor()
     } finally {
