Skip to content

Commit 5428c1b

Browse files
committed
Reformat with scalafmt 3.9.0
Executed command: scalafmt --non-interactive
1 parent 559a239 commit 5428c1b

File tree

7 files changed

+50
-33
lines changed

7 files changed

+50
-33
lines changed

modules/scala-api/src/main/scala-2/org/apache/flinkx/api/LowPrioImplicits.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ private[api] trait LowPrioImplicits {
4545
}
4646
}
4747

48-
def split[T: ClassTag : TypeTag](ctx: SealedTrait[TypeInformation, T]): TypeInformation[T] = {
48+
def split[T: ClassTag: TypeTag](ctx: SealedTrait[TypeInformation, T]): TypeInformation[T] = {
4949
val cacheKey = typeName[T]
5050
cache.get(cacheKey) match {
5151
case Some(cached) => cached.asInstanceOf[TypeInformation[T]]

modules/scala-api/src/main/scala-3/org/apache/flinkx/api/TypeTagMacro.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,20 @@ object TypeTagMacro:
77
def gen[A: Type](using q: Quotes): Expr[TypeTag[A]] =
88
import q.reflect.*
99

10-
val A = TypeRepr.of[A]
11-
val symA = A.typeSymbol
12-
val flagsA = symA.flags
13-
val isModuleExpr = Expr(flagsA.is(Flags.Module))
10+
val A = TypeRepr.of[A]
11+
val symA = A.typeSymbol
12+
val flagsA = symA.flags
13+
val isModuleExpr = Expr(flagsA.is(Flags.Module))
1414
val isCachableExpr = Expr(A match {
1515
// this type is not cachable if one of its type args is abstract
1616
case a: AppliedType => !a.args.exists { t => t.typeSymbol.isAbstractType }
1717
case _ => true
1818
})
19-
val toStringExpr = Expr(A.show)
19+
val toStringExpr = Expr(A.show)
2020

2121
'{
2222
new TypeTag[A]:
23-
override lazy val isModule: Boolean = ${ isModuleExpr }
23+
override lazy val isModule: Boolean = ${ isModuleExpr }
2424
override lazy val isCachable: Boolean = ${ isCachableExpr }
25-
override lazy val toString: String = ${ toStringExpr }
25+
override lazy val toString: String = ${ toStringExpr }
2626
}

modules/scala-api/src/main/scala/org/apache/flinkx/api/StreamExecutionEnvironment.scala

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -931,7 +931,7 @@ object StreamExecutionEnvironment {
931931
jarFiles: String*
932932
): StreamExecutionEnvironment = {
933933
val extraConfig = copyWithExtra(new Configuration)
934-
val javaEnv = JavaEnv.createRemoteEnvironment(host, port, extraConfig, jarFiles: _*)
934+
val javaEnv = JavaEnv.createRemoteEnvironment(host, port, extraConfig, jarFiles: _*)
935935
javaEnv.setParallelism(parallelism)
936936
new StreamExecutionEnvironment(javaEnv)
937937
}
@@ -957,26 +957,38 @@ object StreamExecutionEnvironment {
957957
jarFiles: String*
958958
): StreamExecutionEnvironment = {
959959
val extraConfig = copyWithExtra(config)
960-
val javaEnv = JavaEnv.createRemoteEnvironment(host, port, extraConfig, jarFiles: _*)
960+
val javaEnv = JavaEnv.createRemoteEnvironment(host, port, extraConfig, jarFiles: _*)
961961
new StreamExecutionEnvironment(javaEnv)
962962
}
963963

964-
/**
965-
* Copy input config and add extra configuration:
966-
* - register type info factories to fail-fast on Scala type resolution with Class
967-
* @param config input configuration
968-
* @return a copy of input config with extra configuration
969-
*/
964+
/** Copy input config and add extra configuration:
965+
* - register type info factories to fail-fast on Scala type resolution with Class
966+
* @param config
967+
* input configuration
968+
* @return
969+
* a copy of input config with extra configuration
970+
*/
970971
private def copyWithExtra(config: Configuration): Configuration = {
971972
if (!isFailFastOnScalaTypeResolutionWithClassConfigured && !isFailFastOnScalaTypeResolutionWithClassDisabled) {
972973
isFailFastOnScalaTypeResolutionWithClassConfigured = true
973-
val serializationOption = ConfigOptions.key("pipeline.serialization-config").stringType().asList().noDefaultValue()
974+
val serializationOption =
975+
ConfigOptions.key("pipeline.serialization-config").stringType().asList().noDefaultValue()
974976
val serializationConfig = config.getOptional(serializationOption).orElse(new util.ArrayList[String])
975-
serializationConfig.add("scala.Product: {type: typeinfo, class: org.apache.flinkx.api.typeinfo.FailFastTypeInfoFactory}")
976-
serializationConfig.add("scala.Option: {type: typeinfo, class: org.apache.flinkx.api.typeinfo.FailFastTypeInfoFactory}")
977-
serializationConfig.add("scala.util.Either: {type: typeinfo, class: org.apache.flinkx.api.typeinfo.FailFastTypeInfoFactory}")
978-
serializationConfig.add("scala.Array: {type: typeinfo, class: org.apache.flinkx.api.typeinfo.FailFastTypeInfoFactory}")
979-
serializationConfig.add("scala.collection.Iterable: {type: typeinfo, class: org.apache.flinkx.api.typeinfo.FailFastTypeInfoFactory}")
977+
serializationConfig.add(
978+
"scala.Product: {type: typeinfo, class: org.apache.flinkx.api.typeinfo.FailFastTypeInfoFactory}"
979+
)
980+
serializationConfig.add(
981+
"scala.Option: {type: typeinfo, class: org.apache.flinkx.api.typeinfo.FailFastTypeInfoFactory}"
982+
)
983+
serializationConfig.add(
984+
"scala.util.Either: {type: typeinfo, class: org.apache.flinkx.api.typeinfo.FailFastTypeInfoFactory}"
985+
)
986+
serializationConfig.add(
987+
"scala.Array: {type: typeinfo, class: org.apache.flinkx.api.typeinfo.FailFastTypeInfoFactory}"
988+
)
989+
serializationConfig.add(
990+
"scala.collection.Iterable: {type: typeinfo, class: org.apache.flinkx.api.typeinfo.FailFastTypeInfoFactory}"
991+
)
980992
new Configuration(config).set(serializationOption, serializationConfig)
981993
} else {
982994
config

modules/scala-api/src/main/scala/org/apache/flinkx/api/typeinfo/FailFastTypeInfoFactory.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,10 @@ class FailFastTypeInfoFactory extends TypeInfoFactory[Nothing] {
1212

1313
override def createTypeInfo(t: Type, params: util.Map[String, TypeInformation[_]]): TypeInformation[Nothing] =
1414
throw new FlinkRuntimeException(
15-
s"""You are using a 'Class' to resolve '${formatType(t, params)}' Scala type. flink-scala-api has no control over this kind of type resolution which may lead to silently fallback to generic Kryo serializers.
15+
s"""You are using a 'Class' to resolve '${formatType(
16+
t,
17+
params
18+
)}' Scala type. flink-scala-api has no control over this kind of type resolution which may lead to silently fallback to generic Kryo serializers.
1619
|Use type information instead: import 'org.apache.flinkx.api.serializers._' to make implicitly available in the scope required 'TypeInformation' to resolve Scala types.
1720
|To disable this check, set 'DISABLE_FAIL_FAST_ON_SCALA_TYPE_RESOLUTION_WITH_CLASS' environment variable to 'true'.""".stripMargin
1821
)

modules/scala-api/src/test/scala-2/org/apache/flinkx/api/GenericCaseClassScala2Test.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,17 @@ class GenericCaseClassScala2Test extends AnyFlatSpec with should.Matchers {
2222
aClass: Class[A]
2323
): Unit = {
2424
// cacheKey=org.apache.flinkx.api.GenericCaseClassTest.Cat => OK
25-
val catInfo: TypeInformation[Cat] = implicitly[TypeInformation[Cat]]
25+
val catInfo: TypeInformation[Cat] = implicitly[TypeInformation[Cat]]
2626
// cacheKey=org.apache.flinkx.api.GenericCaseClassTest.Dog => OK
27-
val dogInfo: TypeInformation[Dog] = implicitly[TypeInformation[Dog]]
27+
val dogInfo: TypeInformation[Dog] = implicitly[TypeInformation[Dog]]
2828
// cacheKey=org.apache.flinkx.api.GenericCaseClassTest.Cat or Dog => OK
29-
val aInfo: TypeInformation[A] = implicitly[TypeInformation[A]]
29+
val aInfo: TypeInformation[A] = implicitly[TypeInformation[A]]
3030
// cacheKey=org.apache.flinkx.api.GenericCaseClassTest.Basket[org.apache.flinkx.api.GenericCaseClassTest.Cat] => OK
3131
val catBasketInfo: TypeInformation[Basket[Cat]] = implicitly[TypeInformation[Basket[Cat]]]
3232
// cacheKey=org.apache.flinkx.api.GenericCaseClassTest.Basket[org.apache.flinkx.api.GenericCaseClassTest.Dog] => OK
3333
val dogBasketInfo: TypeInformation[Basket[Dog]] = implicitly[TypeInformation[Basket[Dog]]]
3434
// cacheKey=org.apache.flinkx.api.GenericCaseClassTest.Basket[org.apache.flinkx.api.GenericCaseClassTest.Cat] or Dog => OK
35-
val aBasketInfo: TypeInformation[Basket[A]] = implicitly[TypeInformation[Basket[A]]]
35+
val aBasketInfo: TypeInformation[Basket[A]] = implicitly[TypeInformation[Basket[A]]]
3636

3737
if (classOf[Cat].isAssignableFrom(aClass)) {
3838
aInfo should be theSameInstanceAs catInfo

modules/scala-api/src/test/scala-3/org/apache/flinkx/api/GenericCaseClassScala3Test.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,17 @@ class GenericCaseClassScala3Test extends AnyFlatSpec with should.Matchers {
1919
aClass: Class[A]
2020
): Unit = {
2121
// cacheKey=org.apache.flinkx.api.GenericCaseClassTest.Cat => OK
22-
val catInfo: TypeInformation[Cat] = deriveTypeInformation
22+
val catInfo: TypeInformation[Cat] = deriveTypeInformation
2323
// cacheKey=org.apache.flinkx.api.GenericCaseClassTest.Dog => OK
24-
val dogInfo: TypeInformation[Dog] = deriveTypeInformation
24+
val dogInfo: TypeInformation[Dog] = deriveTypeInformation
2525
// cacheKey=org.apache.flinkx.api.GenericCaseClassTest.Cat or Dog => OK
26-
val aInfo: TypeInformation[A] = implicitly[TypeInformation[A]]
26+
val aInfo: TypeInformation[A] = implicitly[TypeInformation[A]]
2727
// cacheKey=org.apache.flinkx.api.GenericCaseClassTest.Basket[org.apache.flinkx.api.GenericCaseClassTest.Cat] => OK
2828
val catBasketInfo: TypeInformation[Basket[Cat]] = deriveTypeInformation
2929
// cacheKey=org.apache.flinkx.api.GenericCaseClassTest.Basket[org.apache.flinkx.api.GenericCaseClassTest.Dog] => OK
3030
val dogBasketInfo: TypeInformation[Basket[Dog]] = deriveTypeInformation
3131
// cacheKey=org.apache.flinkx.api.GenericCaseClassTest.Basket[A] => Basket[A] is not cachable
32-
val aBasketInfo: TypeInformation[Basket[A]] = deriveTypeInformation
32+
val aBasketInfo: TypeInformation[Basket[A]] = deriveTypeInformation
3333

3434
if (classOf[Cat].isAssignableFrom(aClass)) {
3535
aInfo should be theSameInstanceAs catInfo

modules/scala-api/src/test/scala/org/apache/flinkx/api/StreamExecutionEnvironmentTest.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,13 @@ class StreamExecutionEnvironmentTest extends AnyFlatSpec with Matchers with Inte
3838
Try(Class.forName("org.apache.flink.configuration.PipelineOptions").getField("SERIALIZATION_CONFIG")) match {
3939
case Failure(_) => // Before Flink 1.19: no fail-fast, exception happens at execution
4040
implicit val typeInfo: TypeInformation[Option[Int]] = TypeInformation.of(classOf[Option[Int]])
41-
val stream = env.fromElements(Some(1), None, Some(100))
41+
val stream = env.fromElements(Some(1), None, Some(100))
4242
val exception = intercept[UnsupportedOperationException] {
4343
stream.executeAndCollect(3)
4444
}
45-
exception.getMessage should startWith("Generic types have been disabled in the ExecutionConfig and type scala.Option is treated as a generic type.")
45+
exception.getMessage should startWith(
46+
"Generic types have been disabled in the ExecutionConfig and type scala.Option is treated as a generic type."
47+
)
4648

4749
case Success(_) => // From Flink 1.19: fail-fast at Scala type resolution
4850
val exception = intercept[FlinkRuntimeException] {

0 commit comments

Comments
 (0)