Skip to content

Commit

Permalink
Fix seatunnel log conflicts (#3872)
Browse files Browse the repository at this point in the history
* seatunnel once

* seatunnel config

* seatunnel log
  • Loading branch information
dlimeng authored Nov 25, 2022
1 parent ee40ca1 commit 0bd23e5
Show file tree
Hide file tree
Showing 12 changed files with 29 additions and 34 deletions.
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ The following file are provided under the Apache 2.0 License.
linkis-web/src/common/i18n/zh.json
linkis-web/src/config.json
linkis-web/public/favicon.ico

linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/*

The files:
.mvn/wrapper/MavenWrapperDownloader.java
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.slf4j.LoggerFactory;

public class LinkisSeatunnelFlinkClient {
private static Logger logger = LoggerFactory.getLogger(LinkisSeatunnelSparkClient.class);
private static Logger logger = LoggerFactory.getLogger(LinkisSeatunnelFlinkClient.class);
private static Class<?> seatunnelEngineClass;
private static JarLoader jarLoader;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.slf4j.LoggerFactory;

public class LinkisSeatunnelFlinkSQLClient {
private static Logger logger = LoggerFactory.getLogger(LinkisSeatunnelSparkClient.class);
private static Logger logger = LoggerFactory.getLogger(LinkisSeatunnelFlinkSQLClient.class);
private static Class<?> seatunnelEngineClass;
private static JarLoader jarLoader;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ public static Path appRootDir() {
if (MODE.equals(Optional.of(DeployMode.CLIENT.getName()))) {
try {
String path = System.getProperty("SEATUNNEL_HOME") + "/seatunnel";
System.out.println("appRootDir:" + path);
path = new File(path).getPath();
return Paths.get(path);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.seatunnel.core.base.config;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.seatunnel.apis.base.env.RuntimeEnv;
import org.apache.seatunnel.common.config.ConfigRuntimeException;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
Expand All @@ -26,18 +28,14 @@

import java.nio.file.Path;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Used to build the {@link Config} from file.
*
* @param <ENVIRONMENT> environment type.
*/
public class ConfigBuilder<ENVIRONMENT extends RuntimeEnv> {

private static Logger LOGGER = LoggerFactory.getLogger(ConfigBuilder.class);

public static final Log LOGGER = LogFactory.getLog(ConfigBuilder.class.getName());
private static final String PLUGIN_NAME_KEY = "plugin_name";
private final Path configFile;
private final EngineType engine;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.seatunnel.core.base.config;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.seatunnel.apis.base.env.RuntimeEnv;
import org.apache.seatunnel.apis.base.plugin.Plugin;
import org.apache.seatunnel.common.config.Common;
Expand All @@ -40,17 +42,14 @@
import java.util.*;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Used to load the plugins.
*
* @param <ENVIRONMENT> environment
*/
public class PluginFactory<ENVIRONMENT extends RuntimeEnv> {

private static Logger LOGGER = LoggerFactory.getLogger(PluginFactory.class);
public static final Log LOGGER = LogFactory.getLog(PluginFactory.class.getName());
private final Config config;
private final EngineType engineType;
private static final Map<EngineType, Map<PluginType, Class<?>>> PLUGIN_BASE_CLASS_MAP;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import org.apache.linkis.engineconnplugin.seatunnel.util.SeatunnelUtils;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.core.base.Starter;
import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
Expand All @@ -27,15 +29,12 @@

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The SeaTunnel flink starter. This class is responsible for generate the final flink job execute
* command.
*/
public class FlinkStarter implements Starter {
private static Logger logger = LoggerFactory.getLogger(FlinkStarter.class);
public static final Log logger = LogFactory.getLog(FlinkStarter.class.getName());
private static final String APP_NAME = SeatunnelFlink.class.getName();
private static final String APP_JAR_NAME = "seatunnel-core-flink.jar";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.apache.linkis.engineconnplugin.seatunnel.util.SeatunnelUtils;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.seatunnel.apis.base.env.RuntimeEnv;
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.config.Common;
Expand All @@ -45,13 +47,12 @@

import com.beust.jcommander.JCommander;
import com.beust.jcommander.UnixStyleUsageFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static java.nio.file.FileVisitOption.FOLLOW_LINKS;

public class SparkStarter implements Starter {
private static Logger logger = LoggerFactory.getLogger(SparkStarter.class);
public static final Log logger = LogFactory.getLog(SparkStarter.class.getName());

private static final int USAGE_EXIT_CODE = 234;

private static final int PLUGIN_LIB_DIR_DEPTH = 3;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import org.apache.linkis.engineconnplugin.seatunnel.util.SeatunnelUtils;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.core.base.Starter;
import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
Expand All @@ -27,11 +29,8 @@

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkSqlStarter implements Starter {
private static Logger logger = LoggerFactory.getLogger(FlinkSqlStarter.class);
public static final Log logger = LogFactory.getLog(FlinkSqlStarter.class.getName());
private static final String APP_JAR_NAME = "seatunnel-core-flink-sql.jar";
private static final String CLASS_NAME = SeatunnelSql.class.getName();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ class SeatunnelFlinkOnceCodeExecutor(
new File(System.getenv(ENGINE_CONN_LOCAL_PATH_PWD_KEY.getValue) + "/seatunnel").toPath,
new File(SeatunnelEnvConfiguration.SEATUNNEL_HOME.getValue).toPath
)
info(s"Execute SeatunnelFlink Process end")
info(s"Execute SeatunnelFlink Process end args:${args.mkString(" ")}")
LinkisSeatunnelFlinkClient.main(args)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ import java.nio.file.Files
import java.util
import java.util.concurrent.{Future, TimeUnit}

import scala.language.postfixOps

class SeatunnelSparkOnceCodeExecutor(
override val id: Long,
override protected val seatunnelEngineConnContext: SeatunnelEngineConnContext
Expand Down Expand Up @@ -91,14 +93,13 @@ class SeatunnelSparkOnceCodeExecutor(
protected def runCode(code: String): Int = {
info("Execute SeatunnelSpark Process")
val masterKey = LINKIS_SPARK_MASTER.getValue
val configKey = LINKIS_SPARK_CONFIG.getValue
val deployModeKey = LINKIS_SPARK_DEPLOY_MODE.getValue

var args: Array[String] = Array.empty
if (params != null && StringUtils.isNotBlank(params.get(masterKey))) {
args = Array(
GET_LINKIS_SPARK_MASTER,
params.getOrDefault(masterKey, "yarn"),
params.getOrDefault(masterKey, "local[4]"),
GET_LINKIS_SPARK_DEPLOY_MODE,
params.getOrDefault(deployModeKey, "client"),
GET_LINKIS_SPARK_CONFIG,
Expand All @@ -113,7 +114,7 @@ class SeatunnelSparkOnceCodeExecutor(
new File(System.getenv(ENGINE_CONN_LOCAL_PATH_PWD_KEY.getValue) + "/seatunnel").toPath,
new File(SeatunnelEnvConfiguration.SEATUNNEL_HOME.getValue).toPath
)
info(s"Execute SeatunnelSpark Process end")
info(s"Execute SeatunnelSpark Process end args:${args.mkString(" ")}")
LinkisSeatunnelSparkClient.main(args)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,20 @@

package org.apache.linkis.engineconnplugin.seatunnel.util

import org.apache.linkis.common.utils.Logging
import org.apache.linkis.engineconn.acessible.executor.log.LogHelper
import org.apache.linkis.engineconn.common.conf.EngineConnConf.ENGINE_CONN_LOCAL_PATH_PWD_KEY
import org.apache.linkis.engineconnplugin.seatunnel.config.SeatunnelSparkEnvConfiguration

import org.apache.commons.io.IOUtils
import org.apache.commons.logging.{Log, LogFactory}

import java.io.{BufferedReader, File, InputStreamReader, PrintWriter}

object SeatunnelUtils extends Logging {
object SeatunnelUtils {
val LOGGER: Log = LogFactory.getLog(SeatunnelUtils.getClass)
private var process: Process = _

def localArray(code: String): Array[String] = {
Array(SeatunnelSparkEnvConfiguration.LINKIS_SPARK_CONFIG.getValue, generateExecFile(code))
Array(SeatunnelSparkEnvConfiguration.GET_LINKIS_SPARK_CONFIG, generateExecFile(code))
}

def generateExecFile(code: String): String = {
Expand All @@ -54,10 +54,9 @@ object SeatunnelUtils extends Logging {
while ({
line = bufferedReader.readLine(); line != null
}) {
LogHelper.logCache.cacheLog(line)
LOGGER.info(line)
}
val exitcode = process.waitFor()
logger.info("executeLine exitcode:" + exitcode)
exitcode
} finally {
IOUtils.closeQuietly(bufferedReader)
Expand Down

0 comments on commit 0bd23e5

Please sign in to comment.