Skip to content

Commit 99f6412

Browse files
authored
[Feat] comes with maven, which simplifies CICD's dependence on Maven (apache#688)
* [Feat] comes with maven, which simplifies CICD's dependence on Maven
1 parent bbcdcff commit 99f6412

File tree

2 files changed

+12
-24
lines changed
  • streamx-common/src/main/scala/com/streamxhub/streamx/common/util
  • streamx-spark/streamx-spark-core/src/main/scala/com/streamxhub/streamx/spark/core/support/kafka/offset

2 files changed

+12
-24
lines changed

streamx-common/src/main/scala/com/streamxhub/streamx/common/util/CommandUtils.scala

+11-24
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
package com.streamxhub.streamx.common.util
2020

2121
import java.io._
22-
import java.lang.{Iterable => JavaIter}
22+
import java.lang.{Iterable => JavaIterable}
2323
import java.util.Scanner
2424
import java.util.function.Consumer
2525
import scala.collection.JavaConversions._
@@ -30,24 +30,15 @@ object CommandUtils extends Logger {
3030

3131
def execute(command: String): String = {
3232
val buffer = new StringBuffer()
33-
Try {
34-
val process = new ProcessBuilder(List(command)).redirectErrorStream(true).start
35-
val reader = new InputStreamReader(process.getInputStream)
36-
val scanner = new Scanner(reader)
37-
while (scanner.hasNextLine) {
38-
buffer.append(scanner.nextLine()).append("\n")
33+
this.execute(List(command), new Consumer[String] {
34+
override def accept(line: String): Unit = {
35+
buffer.append(line).append("\n")
3936
}
40-
processClose(process)
41-
scanner.close()
42-
reader.close()
43-
} match {
44-
case Success(_) =>
45-
case Failure(e) => throw e
46-
}
37+
})
4738
buffer.toString
4839
}
4940

50-
def execute(commands: JavaIter[String], consumer: Consumer[String]): Unit = {
41+
def execute(commands: JavaIterable[String], consumer: Consumer[String]): Unit = {
5142
Try {
5243
require(commands != null && commands.nonEmpty, "[StreamX] CommandUtils.execute: commands must not be null.")
5344
logDebug(s"Command execute:\n${commands.mkString("\n")} ")
@@ -63,7 +54,11 @@ object CommandUtils extends Logger {
6354
while (scanner.hasNextLine) {
6455
consumer.accept(scanner.nextLine)
6556
}
66-
processClose(process)
57+
process.waitFor()
58+
process.getErrorStream.close()
59+
process.getInputStream.close()
60+
process.getOutputStream.close()
61+
process.destroy()
6762
scanner.close()
6863
out.close()
6964
} match {
@@ -72,12 +67,4 @@ object CommandUtils extends Logger {
7267
}
7368
}
7469

75-
def processClose(process: Process): Unit = {
76-
process.waitFor()
77-
process.getErrorStream.close()
78-
process.getInputStream.close()
79-
process.getOutputStream.close()
80-
process.destroy()
81-
}
82-
8370
}

streamx-spark/streamx-spark-core/src/main/scala/com/streamxhub/streamx/spark/core/support/kafka/offset/Offset.scala

+1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import org.apache.spark.{SparkConf, SparkException}
2828

2929
import java.util.Properties
3030
import scala.collection.mutable
31+
import scala.language.implicitConversions
3132
import scala.util.Try
3233

3334

0 commit comments

Comments
 (0)