From e9dc699d09fb92fae11cb5edeb76ae3e5e06b695 Mon Sep 17 00:00:00 2001 From: zhangkewei Date: Thu, 8 Nov 2018 10:48:17 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E4=BB=BB=E5=8A=A1=E5=81=9C?= =?UTF-8?q?=E6=AD=A2=E6=97=B6=E6=95=B0=E6=8D=AE=E5=BA=93=E8=BF=9E=E6=8E=A5?= =?UTF-8?q?=E5=8F=AF=E8=83=BD=E5=85=88=E4=BA=8E=E4=B8=9A=E5=8A=A1=E9=80=BB?= =?UTF-8?q?=E8=BE=91=E6=B8=85=E7=90=86=E7=9A=84BUG?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 10 +- README_EN.md | 8 +- .../common/client/impl/CanalClient.java | 2 +- .../common/client/impl/KafkaClient.java | 58 ++++--- .../config/source/KafkaProduceConfig.java | 8 +- .../exception/TaskStopTriggerException.java | 2 +- .../common/client/impl/KafkaClientTest.java | 4 +- .../manager/ManagerBootApplication.java | 156 ++++++++---------- .../resources/application-sample.properties | 55 ------ .../src/main/resources/application.properties | 28 ++++ .../porter/boot/config/NodeConfig.java | 2 +- .../resources/application-sample.properties | 26 --- .../src/main/resources/application.properties | 46 +++++- .../tasks/sample/task.sample.properties | 59 +++---- .../porter/core/loader/DataLoader.java | 2 +- .../porter/core/task/AbstractStageJob.java | 13 +- .../plugin/loader/jdbc/BaseJdbcLoader.java | 14 ++ .../plugin/loader/jdbc/JdbcBatchLoader.java | 2 +- .../loader/jdbc/JdbcPrintSqlLoader.java | 2 +- .../plugin/loader/jdbc/JdbcSingleLoader.java | 2 +- .../porter/task/extract/ExtractJob.java | 2 +- .../middleware/porter/task/load/LoadJob.java | 6 +- .../porter/task/select/SelectJob.java | 2 +- .../porter/task/transform/TransformJob.java | 6 +- 24 files changed, 249 insertions(+), 266 deletions(-) delete mode 100644 manager/manager-boot/src/main/resources/application-sample.properties create mode 100644 manager/manager-boot/src/main/resources/application.properties delete mode 100644 porter/porter-boot/src/main/resources/application-sample.properties diff --git a/README.md b/README.md index 11d407cb..fc457b82 100644 --- a/README.md +++ b/README.md @@ -33,7 +33,7 @@ Porter始于2017年,提供数据同步功能,但并不仅仅局限于数据 ### 从源码编译 ``` git clone https://github.com/sxfad/porter.git -cd vbill-proter +cd proter git checkout 版本 gradle build 从build/distributions目录查找安装包 @@ -44,11 +44,11 @@ gradle build [配置文档](https://github.com/sxfad/porter/blob/master/doc/profiles.md) ``` - node.id=节点编号,在集群中唯一 + porter.id=节点编号,在集群中唯一 #集群配置 - node.cluster.strategy=ZOOKEEPER - node.cluster.client.url=127.0.0.1:2181 - node.cluster.client.sessionTimeout=100000 + porter.cluster.strategy=ZOOKEEPER + porter.cluster.client.url=127.0.0.1:2181 + porter.cluster.client.sessionTimeout=100000 ``` diff --git a/README_EN.md b/README_EN.md index 9e48ce78..7da4bb11 100644 --- a/README_EN.md +++ b/README_EN.md @@ -44,11 +44,11 @@ Find the installation package from the build/distributions list [configuration document](https://github.com/sxfad/porter/blob/master/doc/profiles.md) ``` - node.id=unique id + porter.id=unique id #cluser - node.cluster.strategy=ZOOKEEPER - node.cluster.client.url=127.0.0.1:2181 - node.cluster.client.sessionTimeout=100000 + porter.cluster.strategy=ZOOKEEPER + porter.cluster.client.url=127.0.0.1:2181 + porter.cluster.client.sessionTimeout=100000 ``` ### Run diff --git a/common/src/main/java/cn/vbill/middleware/porter/common/client/impl/CanalClient.java b/common/src/main/java/cn/vbill/middleware/porter/common/client/impl/CanalClient.java index 2775f5d1..fcbcd1d5 100644 --- a/common/src/main/java/cn/vbill/middleware/porter/common/client/impl/CanalClient.java +++ b/common/src/main/java/cn/vbill/middleware/porter/common/client/impl/CanalClient.java @@ -161,7 +161,7 @@ public CanalInstance generate(String destination) { @Override public void sendAlarm(String destination, String msg) { //过滤密码 - msg = StringUtils.trimToEmpty(msg).replaceAll("password=[^,]*," , ""); + msg = StringUtils.trimToEmpty(msg).replaceAll("password=[^,]*,", ""); //master连接不上 if (msg.contains("CanalParseException: java.io.IOException") || msg.contains("java.io.IOException: Received error packet: errno")) { diff --git a/common/src/main/java/cn/vbill/middleware/porter/common/client/impl/KafkaClient.java b/common/src/main/java/cn/vbill/middleware/porter/common/client/impl/KafkaClient.java index e3dc863e..6c9dac17 100644 --- a/common/src/main/java/cn/vbill/middleware/porter/common/client/impl/KafkaClient.java +++ b/common/src/main/java/cn/vbill/middleware/porter/common/client/impl/KafkaClient.java @@ -119,39 +119,43 @@ public void initializePosition(String taskId, String swimlaneId, String position } @Override - public List fetch(FetchCallback callback) { - List msgs = new ArrayList<>(); - if (isStarted()) { - ConsumerRecords results = null; - synchronized (consumer) { - try { - //提交kafka消费进度 - commitLazyPosition(); - results = consumer.poll(pollTimeOut); - } catch (WakeupException e) { - LOGGER.info("trigger kafka consumer WakeupException:{}", getClientInfo()); - consumer.unsubscribe(); - consumer.close(); - consumer = null; - canFetch = new CountDownLatch(1); - } - } - if (null != results && !results.isEmpty()) { - Iterator> it = results.iterator(); - while (it.hasNext()) { + public List fetch(FetchCallback callback) throws InterruptedException { + try { + List msgs = new ArrayList<>(); + if (isStarted()) { + ConsumerRecords results = null; + synchronized (consumer) { try { - ConsumerRecord record = it.next(); - F f = callback.accept(record); - if (null != f) { - msgs.add(f); + //提交kafka消费进度 + commitLazyPosition(); + results = consumer.poll(pollTimeOut); + } catch (WakeupException e) { + LOGGER.info("trigger kafka consumer WakeupException:{}", getClientInfo()); + consumer.unsubscribe(); + consumer.close(); + consumer = null; + canFetch = new CountDownLatch(1); + } + } + if (null != results && !results.isEmpty()) { + Iterator> it = results.iterator(); + while (it.hasNext()) { + try { + ConsumerRecord record = it.next(); + F f = callback.accept(record); + if (null != f) { + msgs.add(f); + } + } catch (Exception e) { + e.printStackTrace(); } - } catch (Exception e) { - e.printStackTrace(); } } } + return msgs; + } catch (org.apache.kafka.common.errors.InterruptException e) { + throw new InterruptedException(e.getMessage()); } - return msgs; } @Override diff --git a/common/src/main/java/cn/vbill/middleware/porter/common/config/source/KafkaProduceConfig.java b/common/src/main/java/cn/vbill/middleware/porter/common/config/source/KafkaProduceConfig.java index e6847280..2b38dd09 100644 --- a/common/src/main/java/cn/vbill/middleware/porter/common/config/source/KafkaProduceConfig.java +++ b/common/src/main/java/cn/vbill/middleware/porter/common/config/source/KafkaProduceConfig.java @@ -32,10 +32,10 @@ * @review: zhangkewei[zhang_kw@suixingpay.com]/2018年02月02日 14:24 */ public class KafkaProduceConfig extends SourceConfig { - @Setter @Getter private String servers; //服务器列表 - @Setter @Getter private String group;//自动生成 - @Setter @Getter private String topic;//主题 只能一个 - @Setter @Getter private boolean transaction = true;//不输入 + @Setter @Getter private String servers; //服务器列表 + @Setter @Getter private String group; //自动生成 + @Setter @Getter private String topic; //主题 只能一个 + @Setter @Getter private boolean transaction = true; //不输入 @Setter @Getter private boolean oggJson = true; //是否格式化为ogg json格式 //分片字段名 //schema.表名->字段名1,字段名2 diff --git a/common/src/main/java/cn/vbill/middleware/porter/common/exception/TaskStopTriggerException.java b/common/src/main/java/cn/vbill/middleware/porter/common/exception/TaskStopTriggerException.java index ff2ebfcd..2e9b5210 100644 --- a/common/src/main/java/cn/vbill/middleware/porter/common/exception/TaskStopTriggerException.java +++ b/common/src/main/java/cn/vbill/middleware/porter/common/exception/TaskStopTriggerException.java @@ -69,7 +69,7 @@ public static boolean isMatch(Throwable cause, String sqlType) { } if (cause instanceof CannotGetJdbcConnectionException || cause instanceof UncategorizedSQLException || cause instanceof MySQLSyntaxErrorException || cause instanceof BadSqlGrammarException - || cause instanceof DataIntegrityViolationException) { + || cause instanceof DataIntegrityViolationException || cause instanceof CannotCreateTransactionException) { return true; } diff --git a/common/src/test/java/cn/vbill/middleware/common/client/impl/KafkaClientTest.java b/common/src/test/java/cn/vbill/middleware/common/client/impl/KafkaClientTest.java index 8d1754fc..d8f288e8 100644 --- a/common/src/test/java/cn/vbill/middleware/common/client/impl/KafkaClientTest.java +++ b/common/src/test/java/cn/vbill/middleware/common/client/impl/KafkaClientTest.java @@ -46,7 +46,7 @@ public static void initSource() throws Exception { @Test @Ignore - public void fetch() { + public void fetch() throws InterruptedException { KAFKA_CLIENT.fetch(new ConsumeClient.FetchCallback() { @Override public F accept(O o) { @@ -56,4 +56,4 @@ public F accept(O o) { } }); } -} +} \ No newline at end of file diff --git a/manager/manager-boot/src/main/java/cn/vbill/middleware/porter/manager/ManagerBootApplication.java b/manager/manager-boot/src/main/java/cn/vbill/middleware/porter/manager/ManagerBootApplication.java index 1f246dab..37b6b3fc 100644 --- a/manager/manager-boot/src/main/java/cn/vbill/middleware/porter/manager/ManagerBootApplication.java +++ b/manager/manager-boot/src/main/java/cn/vbill/middleware/porter/manager/ManagerBootApplication.java @@ -1,91 +1,65 @@ -/* - * Copyright ©2018 vbill.cn. - *

- * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - *

- */ - -package cn.vbill.middleware.porter.manager; - -import cn.vbill.middleware.porter.manager.config.ManagerConfig; -import cn.vbill.middleware.porter.common.cluster.ClusterProviderProxy; -import org.mybatis.spring.annotation.MapperScan; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.boot.Banner; -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.web.servlet.ServletComponentScan; -import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.scheduling.annotation.EnableScheduling; -import org.springframework.transaction.annotation.EnableTransactionManagement; - -/** - * @author: zhangkewei[zhang_kw@suixingpay.com] - * @date: 2017年12月15日 14:09 - * @version: V1.0 - * @review: zhangkewei[zhang_kw@suixingpay.com]/2017年12月15日 14:09 - */ -@EnableScheduling -@EnableTransactionManagement -@ServletComponentScan -@SpringBootApplication(scanBasePackages = { "cn.vbill" }) -@MapperScan("cn.vbill.middleware.porter.manager.core.mapper") -public class ManagerBootApplication { - - private static final Logger LOGGER = LoggerFactory.getLogger(ManagerBootApplication.class); - - public static void main(String[] args) throws Exception { - SpringApplication app = new SpringApplication(ManagerBootApplication.class); - app.setBannerMode(Banner.Mode.OFF); - ConfigurableApplicationContext context = app.run(args); - LOGGER.info(print()); - LOGGER.info("ManagerApplication is success!"); - // 注入spring工具类 - ManagerContext.INSTANCE.setApplicationContext(context); - // 获取配置 - ManagerConfig config = context.getBean(ManagerConfig.class); - try { - ClusterProviderProxy.INSTANCE.initialize(config.getCluster()); - } catch (Exception e) { - ClusterProviderProxy.INSTANCE.stop(); - LOGGER.error("集群模块初始化失败, 数据同步管理后台退出!error:" + e.getMessage()); - throw new RuntimeException("集群模块初始化失败, 数据同步管理后台退出!error:" + e.getMessage()); - } - } - - private static String print() { - StringBuffer sb = new StringBuffer(); - sb.append(" \n"); - sb.append(" _ooOoo_\n"); - sb.append(" o8888888o\n"); - sb.append(" 88\" . \"88\n"); - sb.append(" (| -_- |)\n"); - sb.append(" O\\ = /O\n"); - sb.append(" ____/`---'\\____\n"); - sb.append(" .' \\\\| |// `.\n"); - sb.append(" / \\\\||| : |||// \\ \n"); - sb.append(" / _||||| -:- |||||- \\ \n"); - sb.append(" | | \\\\\\ - /// | |\n"); - sb.append(" | \\_| ''\\---/'' | |\n"); - sb.append(" \\ .-\\__ `-` ___/-. /\n"); - sb.append(" ___`. .' /--.--\\ `. . __\n"); - sb.append(" .\"\" '< `.___\\_<|>_/___.' >'\"\".\n"); - sb.append(" | | : `- \\`.;`\\ _ /`;.`/ - ` : | |\n"); - sb.append(" \\ \\ `-. \\_ __\\ /__ _/ .-` / /\n"); - sb.append("======`-.____`-.___\\_____/___.-`____.-'======\n"); - sb.append(" `=---='\n"); - sb.append("...................................................\n"); - return sb.toString(); - } -} +/* + * Copyright ©2018 vbill.cn. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *

+ */ + +package cn.vbill.middleware.porter.manager; + +import cn.vbill.middleware.porter.manager.config.ManagerConfig; +import cn.vbill.middleware.porter.common.cluster.ClusterProviderProxy; +import org.mybatis.spring.annotation.MapperScan; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.Banner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.web.servlet.ServletComponentScan; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.transaction.annotation.EnableTransactionManagement; + +/** + * @author: zhangkewei[zhang_kw@suixingpay.com] + * @date: 2017年12月15日 14:09 + * @version: V1.0 + * @review: zhangkewei[zhang_kw@suixingpay.com]/2017年12月15日 14:09 + */ +@EnableScheduling +@EnableTransactionManagement +@ServletComponentScan +@SpringBootApplication(scanBasePackages = { "cn.vbill" }) +@MapperScan("cn.vbill.middleware.porter.manager.core.mapper") +public class ManagerBootApplication { + + private static final Logger LOGGER = LoggerFactory.getLogger(ManagerBootApplication.class); + + public static void main(String[] args) throws Exception { + SpringApplication app = new SpringApplication(ManagerBootApplication.class); + app.setBannerMode(Banner.Mode.OFF); + ConfigurableApplicationContext context = app.run(args); + LOGGER.info("ManagerApplication is success!"); + // 注入spring工具类 + ManagerContext.INSTANCE.setApplicationContext(context); + // 获取配置 + ManagerConfig config = context.getBean(ManagerConfig.class); + try { + ClusterProviderProxy.INSTANCE.initialize(config.getCluster()); + } catch (Exception e) { + ClusterProviderProxy.INSTANCE.stop(); + LOGGER.error("集群模块初始化失败, 数据同步管理后台退出!error:" + e.getMessage()); + throw new RuntimeException("集群模块初始化失败, 数据同步管理后台退出!error:" + e.getMessage()); + } + } +} \ No newline at end of file diff --git a/manager/manager-boot/src/main/resources/application-sample.properties b/manager/manager-boot/src/main/resources/application-sample.properties deleted file mode 100644 index 326e9eef..00000000 --- a/manager/manager-boot/src/main/resources/application-sample.properties +++ /dev/null @@ -1,55 +0,0 @@ -logging.level.root=INFO - -#集群配置 -manager.cluster.strategy=ZOOKEEPER -manager.cluster.client.url=127.0.0.1:2181 -manager.cluster.client.sessionTimeout=100000 - -#数据库连接信息 -spring.datasource.druid.url=jdbc:mysql://127.0.0.1:3306/ds_data?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf-8&useSSL=false&allowMultiQueries=true -spring.datasource.druid.username=test -spring.datasource.druid.password=111111 - -#属性类型是字符串,通过别名的方式配置扩展插件常用的插件有:监控统计用的filter:stat 日志用的filter:log4j 防御sql注入的filter:wall -spring.datasource.druid.filters=stat -#初始化时建立物理连接的个数。初始化发生在显示调用init方法,或者第一次getConnection时 -spring.datasource.druid.initialSize=5 -#最小连接池数量 -spring.datasource.druid.minIdle=5 -#最大连接池数量 -spring.datasource.druid.maxActive=10 -#有两个含义:1) Destroy线程会检测连接的间隔时间 2) testWhileIdle的判断依据,详细看testWhileIdle属性的说明 -timeBetweenEvictionRunsMillis=20000 -#配置一个连接在池中最小生存的时间,单位是毫秒 -minEvictableIdleTimeMillis=300000 -#用来检测连接是否有效的sql,要求是一个查询语句 如果validationQuery为null,testOnBorrow、testOnReturn、testWhileIdle都不会其作用。 -spring.datasource.druid.validationQuery=SELECT '1' FROM DUAL -#建议配置为true,不影响性能,并且保证安全性。申请连接的时候检测,如果空闲时间大于timeBetweenEvictionRunsMillis 执行validationQuery检测连接是否有效 -spring.datasource.druid.testWhileIdle=true -#申请连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能。 -spring.datasource.druid.testOnBorrow=false -#归还连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能 -spring.datasource.druid.testOnReturn=false -#要启用PSCache,必须配置大于0,当大于0时,poolPreparedStatements自动触发修改为true。 在Druid中,不会存在Oracle下PSCache占用内存过多的问题 可以把这个数值配置大一些,比如说100 -spring.datasource.druid.maxOpenPreparedStatements=100 -#关闭 abanded 连接时输出错误日志 -spring.datasource.druid.logAbandoned=true - -#mybatis配置 -mybatis.typeAliasesPackage=cn.vbill.middleware.porter.manager.core.entity -mybatis.mapperLocations=classpath:mapper/*.xml - -#sql日志级别 -logging.level.cn.vbill.middleware.porter.manager.core.mapper=debug - -#端口 -server.port=8081 - -#项目前缀路径 -server.context-path=/api - -# mysql 时区 -spring.jackson.time-zone=GMT+8 - -#log -#logging.file=${app.home}/logs/manager-boot.log \ No newline at end of file diff --git a/manager/manager-boot/src/main/resources/application.properties b/manager/manager-boot/src/main/resources/application.properties new file mode 100644 index 00000000..514577fc --- /dev/null +++ b/manager/manager-boot/src/main/resources/application.properties @@ -0,0 +1,28 @@ +logging.level.root=INFO +#端口 +server.port=8081 +#项目前缀路径 +server.context-path=/api + +#集群配置 +manager.cluster.strategy=ZOOKEEPER +manager.cluster.client.url=127.0.0.1:2181 +manager.cluster.client.sessionTimeout=100000 + +#数据库连接信息 +spring.datasource.druid.url=jdbc:mysql://127.0.0.1:3306/ds_data?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf-8&useSSL=false&allowMultiQueries=true +spring.datasource.druid.username=test +spring.datasource.druid.password=111111 + +#mybatis配置 +mybatis.typeAliasesPackage=cn.vbill.middleware.porter.manager.core.entity +mybatis.mapperLocations=classpath:mapper/*.xml + +#sql日志级别 +logging.level.cn.vbill.middleware.porter.manager.core.mapper=debug + +# mysql 时区 +spring.jackson.time-zone=GMT+8 + +#log +logging.file=${app.home}/logs/manager-boot.log \ No newline at end of file diff --git a/porter/porter-boot/src/main/java/cn/vbill/middleware/porter/boot/config/NodeConfig.java b/porter/porter-boot/src/main/java/cn/vbill/middleware/porter/boot/config/NodeConfig.java index 64b1d786..cbe67dd8 100644 --- a/porter/porter-boot/src/main/java/cn/vbill/middleware/porter/boot/config/NodeConfig.java +++ b/porter/porter-boot/src/main/java/cn/vbill/middleware/porter/boot/config/NodeConfig.java @@ -35,7 +35,7 @@ * @version: V1.0 * @review: zhangkewei[zhang_kw@suixingpay.com]/2017年12月19日 10:14 */ -@ConfigurationProperties(prefix = "node") +@ConfigurationProperties(prefix = "porter") @Setter @Getter @Component public class NodeConfig { diff --git a/porter/porter-boot/src/main/resources/application-sample.properties b/porter/porter-boot/src/main/resources/application-sample.properties deleted file mode 100644 index 2d283c89..00000000 --- a/porter/porter-boot/src/main/resources/application-sample.properties +++ /dev/null @@ -1,26 +0,0 @@ -logging.level.root=info - -#告警配置 -node.alert.strategy=EMAIL -node.alert.client.host=smtphm.qq.com -node.alert.client.username=test@qq.com -node.alert.client.password=123456 -node.alert.client.smtpAuth=true -node.alert.client.smtpStarttlsEnable=true -node.alert.client.smtpStarttlsRequired=false - -node.alert.receiver[0].realName=样板告警人 -node.alert.receiver[0].email=test@qq.com -node.alert.receiver[0].phone=13800138000 - -#节点描述 -node.id=2019 - -#统计信息 -node.statistic.upload=false - -#集群配置 -node.cluster.strategy=ZOOKEEPER -node.cluster.client.url=127.0.0.1:2181 -node.cluster.client.sessionTimeout=100000 - diff --git a/porter/porter-boot/src/main/resources/application.properties b/porter/porter-boot/src/main/resources/application.properties index 6a543fc2..36286822 100644 --- a/porter/porter-boot/src/main/resources/application.properties +++ b/porter/porter-boot/src/main/resources/application.properties @@ -1,10 +1,38 @@ +#加载tasks/sample 文件目录下的任务配置文件 spring.profiles.active = sample -#logging.level.org.apache.kafka=INFO -#logging.level.org.apache.zookeeper=INFO -#logging.level.org.apache.commons.beanutils=INFO -#logging.level.org.springframework=INFO -#logging.level.org.springframework.jdbc.support.SQLErrorCodeSQLExceptionTranslator=DEBUG - -#logging.file=${app.home}/logs/data-node.log -server.port=8888 -#server.context-path=/api +#节点描述 +porter.id=1 + +#统计信息 +porter.statistic.upload=false + +#集群配置 +porter.cluster.strategy=ZOOKEEPER +porter.cluster.client.url=127.0.0.1:2181 +porter.cluster.client.sessionTimeout=100000 + + + +#告警配置 +porter.alert.strategy=EMAIL +porter.alert.client.host=smtphm.qq.com +porter.alert.client.username=test@qq.com +porter.alert.client.password=123456 +porter.alert.client.smtpAuth=true +porter.alert.client.smtpStarttlsEnable=true +porter.alert.client.smtpStarttlsRequired=false + +#porter.alert.receiver[0].realName=样板告警人 +#porter.alert.receiver[0].email=test@qq.com +#porter.alert.receiver[0].phone=13800138000 + + + +#状态检查web接口 +#server.port=8080 +server.context-path=/api + +logging.level.root=info +logging.level.org.springframework.jdbc.support.SQLErrorCodeSQLExceptionTranslator=DEBUG +logging.file=${app.home}/logs/data-node.log + diff --git a/porter/porter-boot/src/main/resources/tasks/sample/task.sample.properties b/porter/porter-boot/src/main/resources/tasks/sample/task.sample.properties index 3e9fe8b1..ef693fe8 100644 --- a/porter/porter-boot/src/main/resources/tasks/sample/task.sample.properties +++ b/porter/porter-boot/src/main/resources/tasks/sample/task.sample.properties @@ -1,39 +1,42 @@ #任务配置 +#配置映射类:TaskConfig #一期从配置文件读取,后期从管理中心刷新 -node.task[0].taskId=1 +#porter.task[0].taskId=1 -node.task[0].receiver[0].realName=真实姓名 -node.task[0].receiver[0].email=123456@qq.com -node.task[0].receiver[0].phone=13800138000 +#porter.task[0].receiver[0].realName=真实姓名 +#porter.task[0].receiver[0].email=123456@qq.com +#porter.task[0].receiver[0].phone=13800138000 +#源端配置 +#porter.task[0].consumer.consumerName=CanalFetch +#porter.task[0].consumer.converter=canalRow +#porter.task[0].consumer.source.sourceType=CANAL +#porter.task[0].consumer.source.slaveId=2018 +#porter.task[0].consumer.source.address=127.0.0.1:3306 +#porter.task[0].consumer.source.database=test_from +#porter.task[0].consumer.source.username=canal +#porter.task[0].consumer.source.password=123456 +#porter.task[0].consumer.source.filter=test_from\.(t1|b_1) +#porter.task[0].consumer.includes=test_from.t1,test_from.b_1 +#porter.task[0].consumer.metaSource.sourceName=testFrom -node.task[0].consumer.consumerName=CanalFetch -node.task[0].consumer.converter=canalRow -node.task[0].consumer.source.sourceType=CANAL -node.task[0].consumer.source.slaveId=2018 -node.task[0].consumer.source.address=127.0.0.1:3306 -node.task[0].consumer.source.database=test_from -node.task[0].consumer.source.username=canal -node.task[0].consumer.source.password=123456 -node.task[0].consumer.source.filter=test_from\.(t1|b_1) -node.task[0].consumer.includes=test_from.t1,test_from.b_1 -node.task[0].consumer.metaSource.sourceName=testFrom +#自定义过滤插件配置 +#porter.task[0].consumer.eventProcessor.className=com.suixingpay.test +#porter.task[0].consumer.eventProcessor.content=~/A.class -#node.task[0].consumer.eventProcessor.className=com.suixingpay.test -#node.task[0].consumer.eventProcessor.content=~/A.class +#目标端配置 +#porter.task[0].loader.loaderName=JdbcBatch +#porter.task[0].loader.source.dbType=ORACLE +#porter.task[0].loader.source.url=jdbc:oracle:thin:@127.0.0.1:1521:xe +#porter.task[0].loader.source.userName=oracle +#porter.task[0].loader.source.password=123456 -node.task[0].loader.loaderName=JdbcBatch -node.task[0].loader.source.sourceName=testTo -#node.task[0].loader.source.dbType=ORACLE -#node.task[0].loader.source.url=jdbc:oracle:thin:@127.0.0.1:1521:xe -#node.task[0].loader.source.userName=oracle -#node.task[0].loader.source.password=123456 - -node.task[0].mapper[0].schema=test_from,test_to -node.task[0].mapper[0].table=t1,t2 -#node.task[0].mapper[0].updateDate=updated_time,updated_time -#node.task[0].mapper[0].column.a=b +#字段映射配置 +#porter.task[0].mapper[0].auto=false +#porter.task[0].mapper[0].schema=test_from,test_to +#porter.task[0].mapper[0].table=t1,t2 +#porter.task[0].mapper[0].column.a=b diff --git a/porter/porter-core/src/main/java/cn/vbill/middleware/porter/core/loader/DataLoader.java b/porter/porter-core/src/main/java/cn/vbill/middleware/porter/core/loader/DataLoader.java index c9dbbb35..c893f43a 100644 --- a/porter/porter-core/src/main/java/cn/vbill/middleware/porter/core/loader/DataLoader.java +++ b/porter/porter-core/src/main/java/cn/vbill/middleware/porter/core/loader/DataLoader.java @@ -69,7 +69,7 @@ default boolean canStart() { * @param bucket * @return key : true 会提交同步点, false不会提交同步点 ; value : 影响行数 */ - Pair> load(ETLBucket bucket) throws TaskStopTriggerException; + Pair> load(ETLBucket bucket) throws TaskStopTriggerException, InterruptedException; void setLoadClient(LoadClient c); void setMetaQueryClient(MetaQueryClient c); diff --git a/porter/porter-core/src/main/java/cn/vbill/middleware/porter/core/task/AbstractStageJob.java b/porter/porter-core/src/main/java/cn/vbill/middleware/porter/core/task/AbstractStageJob.java index 22fb71ce..c52539ca 100644 --- a/porter/porter-core/src/main/java/cn/vbill/middleware/porter/core/task/AbstractStageJob.java +++ b/porter/porter-core/src/main/java/cn/vbill/middleware/porter/core/task/AbstractStageJob.java @@ -21,8 +21,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -35,7 +37,7 @@ public abstract class AbstractStageJob implements StageJob { protected final Logger LOGGER = LoggerFactory.getLogger(this.getClass()); protected static final int LOGIC_THREAD_SIZE = 5; - private AtomicBoolean stat = new AtomicBoolean(false); + private final AtomicBoolean stat = new AtomicBoolean(false); private final Thread loopService; private final ThreadFactory threadFactory; //任务退出信号量,为了保证优雅关机时内存中的数据处理完毕 @@ -44,6 +46,7 @@ public abstract class AbstractStageJob implements StageJob { //管道无数据线程等待间隙 private static final long DEFAULT_THREAD_WAIT_SPAN = 2000; private final long threadWaitSpan; + private final CountDownLatch stopJobLatch = new CountDownLatch(1); public AbstractStageJob(String baseThreadName) { this(baseThreadName, DEFAULT_THREAD_WAIT_SPAN); @@ -86,6 +89,7 @@ public void stop() { LOGGER.debug("源队列为空,发送线程中断信号"); } loopService.interrupt(); + stopJobLatch.await(10, TimeUnit.SECONDS); } catch (Throwable e) { } finally { try { @@ -103,7 +107,7 @@ private class LoopService implements Runnable { @Override public void run() { //如果线程没有中断信号并且服务可用,持续执行 - while (!Thread.currentThread().isInterrupted() && stat.get()) { + while (!Thread.currentThread().isInterrupted() && getWorkingStat()) { try { stopSignal.acquire(); LOGGER.debug("源队列为空,线程恢复执行."); @@ -113,6 +117,8 @@ public void run() { LOGGER.debug("源队列为空,线程进入等待."); Thread.sleep(threadWaitSpan); } catch (InterruptedException e) { + LOGGER.info("停止任务线程LoopService逻辑"); + stopJobLatch.countDown(); //如果线程有中断信号,退出线程 break; } @@ -123,4 +129,7 @@ protected ThreadFactory getThreadFactory() { return threadFactory; } + public final boolean getWorkingStat() { + return stat.get(); + } } diff --git a/porter/porter-plugin/jdbc-loader/src/main/java/cn/vbill/middleware/porter/plugin/loader/jdbc/BaseJdbcLoader.java b/porter/porter-plugin/jdbc-loader/src/main/java/cn/vbill/middleware/porter/plugin/loader/jdbc/BaseJdbcLoader.java index 149bd878..a04b27a5 100644 --- a/porter/porter-plugin/jdbc-loader/src/main/java/cn/vbill/middleware/porter/plugin/loader/jdbc/BaseJdbcLoader.java +++ b/porter/porter-plugin/jdbc-loader/src/main/java/cn/vbill/middleware/porter/plugin/loader/jdbc/BaseJdbcLoader.java @@ -19,6 +19,8 @@ import cn.vbill.middleware.porter.common.exception.TaskDataException; import cn.vbill.middleware.porter.common.exception.TaskStopTriggerException; +import cn.vbill.middleware.porter.core.event.etl.ETLBucket; +import cn.vbill.middleware.porter.core.loader.SubmitStatObject; import com.alibaba.fastjson.JSONObject; import cn.vbill.middleware.porter.common.client.impl.JDBCClient; import cn.vbill.middleware.porter.common.db.SqlTemplate; @@ -243,4 +245,16 @@ protected static Map getNewColumns(ETLRow row) { k -> new LinkedHashMap()); } } + + @Override + public Pair> load(ETLBucket bucket) throws TaskStopTriggerException, InterruptedException { + try { + return doLoad(bucket); + } catch (TaskStopTriggerException e) { + if (e.getMessage().contains("interrupt") && e.getMessage().contains("CannotCreateTransactionException")) throw new InterruptedException(e.getMessage()); + throw e; + } + } + + public abstract Pair> doLoad(ETLBucket bucket) throws TaskStopTriggerException; } diff --git a/porter/porter-plugin/jdbc-loader/src/main/java/cn/vbill/middleware/porter/plugin/loader/jdbc/JdbcBatchLoader.java b/porter/porter-plugin/jdbc-loader/src/main/java/cn/vbill/middleware/porter/plugin/loader/jdbc/JdbcBatchLoader.java index df08dbfc..48886a34 100644 --- a/porter/porter-plugin/jdbc-loader/src/main/java/cn/vbill/middleware/porter/plugin/loader/jdbc/JdbcBatchLoader.java +++ b/porter/porter-plugin/jdbc-loader/src/main/java/cn/vbill/middleware/porter/plugin/loader/jdbc/JdbcBatchLoader.java @@ -42,7 +42,7 @@ protected String getPluginName() { } @Override - public Pair> load(ETLBucket bucket) throws TaskStopTriggerException { + public Pair> doLoad(ETLBucket bucket) throws TaskStopTriggerException { LOGGER.info("start loading bucket:{},size:{}", bucket.getSequence(), bucket.getRows().size()); List affectRow = new ArrayList<>(); for (List rows : bucket.getBatchRows()) { diff --git a/porter/porter-plugin/jdbc-loader/src/main/java/cn/vbill/middleware/porter/plugin/loader/jdbc/JdbcPrintSqlLoader.java b/porter/porter-plugin/jdbc-loader/src/main/java/cn/vbill/middleware/porter/plugin/loader/jdbc/JdbcPrintSqlLoader.java index 4c12e1b2..044ab6d9 100644 --- a/porter/porter-plugin/jdbc-loader/src/main/java/cn/vbill/middleware/porter/plugin/loader/jdbc/JdbcPrintSqlLoader.java +++ b/porter/porter-plugin/jdbc-loader/src/main/java/cn/vbill/middleware/porter/plugin/loader/jdbc/JdbcPrintSqlLoader.java @@ -44,7 +44,7 @@ protected String getPluginName() { } @Override - public Pair> load(ETLBucket bucket) throws TaskStopTriggerException { + public Pair> doLoad(ETLBucket bucket) throws TaskStopTriggerException { LOGGER.info("start loading bucket:{},size:{}", bucket.getSequence(), bucket.getRows().size()); List affectRow = new ArrayList<>(); for (ETLRow row : bucket.getRows()) { diff --git a/porter/porter-plugin/jdbc-loader/src/main/java/cn/vbill/middleware/porter/plugin/loader/jdbc/JdbcSingleLoader.java b/porter/porter-plugin/jdbc-loader/src/main/java/cn/vbill/middleware/porter/plugin/loader/jdbc/JdbcSingleLoader.java index 29ff0e8a..5b47eee0 100644 --- a/porter/porter-plugin/jdbc-loader/src/main/java/cn/vbill/middleware/porter/plugin/loader/jdbc/JdbcSingleLoader.java +++ b/porter/porter-plugin/jdbc-loader/src/main/java/cn/vbill/middleware/porter/plugin/loader/jdbc/JdbcSingleLoader.java @@ -42,7 +42,7 @@ protected String getPluginName() { } @Override - public Pair> load(ETLBucket bucket) throws TaskStopTriggerException { + public Pair> doLoad(ETLBucket bucket) throws TaskStopTriggerException { LOGGER.info("start loading bucket:{},size:{}", bucket.getSequence(), bucket.getRows().size()); List affectRow = new ArrayList<>(); for (ETLRow row : bucket.getRows()) { diff --git a/porter/porter-task/src/main/java/cn/vbill/middleware/porter/task/extract/ExtractJob.java b/porter/porter-task/src/main/java/cn/vbill/middleware/porter/task/extract/ExtractJob.java index 3447561e..580af83b 100644 --- a/porter/porter-task/src/main/java/cn/vbill/middleware/porter/task/extract/ExtractJob.java +++ b/porter/porter-task/src/main/java/cn/vbill/middleware/porter/task/extract/ExtractJob.java @@ -112,7 +112,7 @@ protected void loopLogic() throws InterruptedException { "extract MessageEvent error" + e.getMessage()); LOGGER.error("extract MessageEvent error!", e); } - } while (null != events); + } while (null != events && getWorkingStat()); } @Override diff --git a/porter/porter-task/src/main/java/cn/vbill/middleware/porter/task/load/LoadJob.java b/porter/porter-task/src/main/java/cn/vbill/middleware/porter/task/load/LoadJob.java index a8b97187..c98da7ce 100644 --- a/porter/porter-task/src/main/java/cn/vbill/middleware/porter/task/load/LoadJob.java +++ b/porter/porter-task/src/main/java/cn/vbill/middleware/porter/task/load/LoadJob.java @@ -93,7 +93,7 @@ protected void doStart() throws Exception { } @Override - protected void loopLogic() { + protected void loopLogic() throws InterruptedException { //只要队列有消息,持续读取 ETLBucket bucket = null; do { @@ -135,6 +135,8 @@ protected void loopLogic() { loadResult.getRight().clear(); bucket.markUnUsed(); } + } catch (InterruptedException e) { + throw e; } catch (TaskStopTriggerException stopException) { LOGGER.error("Load ETLRow error", stopException); stopException.printStackTrace(); @@ -149,7 +151,7 @@ protected void loopLogic() { "Load ETLRow error" + e.getMessage()); LOGGER.error("Load ETLRow error!", e); } - } while (null != bucket && !work.triggerStopped()); //数据不为空并且当前任务没有触发停止告警 + } while (null != bucket && !work.triggerStopped() && getWorkingStat()); //数据不为空并且当前任务没有触发停止告警 } @Override public ETLBucket output() throws Exception { diff --git a/porter/porter-task/src/main/java/cn/vbill/middleware/porter/task/select/SelectJob.java b/porter/porter-task/src/main/java/cn/vbill/middleware/porter/task/select/SelectJob.java index 062b4a9a..291f60d0 100644 --- a/porter/porter-task/src/main/java/cn/vbill/middleware/porter/task/select/SelectJob.java +++ b/porter/porter-task/src/main/java/cn/vbill/middleware/porter/task/select/SelectJob.java @@ -103,7 +103,7 @@ protected void loopLogic() throws InterruptedException { NodeLog.upload(NodeLog.LogType.TASK_LOG, work.getTaskId(), consumer.getSwimlaneId(), "fetch MessageEvent error" + e.getMessage()); LOGGER.error("fetch MessageEvent error!", e); } - } while (null != events && !events.isEmpty()); + } while (null != events && !events.isEmpty() && getWorkingStat()); try { //退出轮训循环,判断累计查不到数据时间,按照配置发送邮件告警 diff --git a/porter/porter-task/src/main/java/cn/vbill/middleware/porter/task/transform/TransformJob.java b/porter/porter-task/src/main/java/cn/vbill/middleware/porter/task/transform/TransformJob.java index 7dfd1517..26f0d738 100644 --- a/porter/porter-task/src/main/java/cn/vbill/middleware/porter/task/transform/TransformJob.java +++ b/porter/porter-task/src/main/java/cn/vbill/middleware/porter/task/transform/TransformJob.java @@ -74,7 +74,7 @@ protected void doStart() { } @Override - protected void loopLogic() { + protected void loopLogic() throws InterruptedException { //只要队列有消息,持续读取 ETLBucket bucket = null; do { @@ -100,10 +100,12 @@ protected void loopLogic() { carrier.push(inThreadBucket.getSequence(), result); carrier.printState(); } + } catch (InterruptedException e) { + throw e; } catch (Throwable e) { LOGGER.error("transform ETLBucket error!", e); } - } while (null != bucket); + } while (null != bucket && getWorkingStat()); } @Override