Skip to content

Commit

Permalink
修复任务停止时数据库连接可能先于业务逻辑清理的BUG
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangkewei committed Nov 8, 2018
1 parent 499e187 commit e9dc699
Show file tree
Hide file tree
Showing 24 changed files with 249 additions and 266 deletions.
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ Porter始于2017年,提供数据同步功能,但并不仅仅局限于数据
### 从源码编译
```
git clone https://github.com/sxfad/porter.git
cd vbill-proter
cd proter
git checkout 版本
gradle build
从build/distributions目录查找安装包
Expand All @@ -44,11 +44,11 @@ gradle build
[配置文档](https://github.com/sxfad/porter/blob/master/doc/profiles.md)

```
node.id=节点编号,在集群中唯一
porter.id=节点编号,在集群中唯一
#集群配置
node.cluster.strategy=ZOOKEEPER
node.cluster.client.url=127.0.0.1:2181
node.cluster.client.sessionTimeout=100000
porter.cluster.strategy=ZOOKEEPER
porter.cluster.client.url=127.0.0.1:2181
porter.cluster.client.sessionTimeout=100000
```


Expand Down
8 changes: 4 additions & 4 deletions README_EN.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ Find the installation package from the build/distributions list
[configuration document](https://github.com/sxfad/porter/blob/master/doc/profiles.md)

```
node.id=unique id
porter.id=unique id
#cluser
node.cluster.strategy=ZOOKEEPER
node.cluster.client.url=127.0.0.1:2181
node.cluster.client.sessionTimeout=100000
porter.cluster.strategy=ZOOKEEPER
porter.cluster.client.url=127.0.0.1:2181
porter.cluster.client.sessionTimeout=100000
```

### Run
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public CanalInstance generate(String destination) {
@Override
public void sendAlarm(String destination, String msg) {
//过滤密码
msg = StringUtils.trimToEmpty(msg).replaceAll("password=[^,]*," , "");
msg = StringUtils.trimToEmpty(msg).replaceAll("password=[^,]*,", "");
//master连接不上
if (msg.contains("CanalParseException: java.io.IOException")
|| msg.contains("java.io.IOException: Received error packet: errno")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,39 +119,43 @@ public void initializePosition(String taskId, String swimlaneId, String position
}

@Override
public <F, O> List<F> fetch(FetchCallback<F, O> callback) {
List<F> msgs = new ArrayList<>();
if (isStarted()) {
ConsumerRecords<String, String> results = null;
synchronized (consumer) {
try {
//提交kafka消费进度
commitLazyPosition();
results = consumer.poll(pollTimeOut);
} catch (WakeupException e) {
LOGGER.info("trigger kafka consumer WakeupException:{}", getClientInfo());
consumer.unsubscribe();
consumer.close();
consumer = null;
canFetch = new CountDownLatch(1);
}
}
if (null != results && !results.isEmpty()) {
Iterator<ConsumerRecord<String, String>> it = results.iterator();
while (it.hasNext()) {
public <F, O> List<F> fetch(FetchCallback<F, O> callback) throws InterruptedException {
try {
List<F> msgs = new ArrayList<>();
if (isStarted()) {
ConsumerRecords<String, String> results = null;
synchronized (consumer) {
try {
ConsumerRecord<String, String> record = it.next();
F f = callback.accept(record);
if (null != f) {
msgs.add(f);
//提交kafka消费进度
commitLazyPosition();
results = consumer.poll(pollTimeOut);
} catch (WakeupException e) {
LOGGER.info("trigger kafka consumer WakeupException:{}", getClientInfo());
consumer.unsubscribe();
consumer.close();
consumer = null;
canFetch = new CountDownLatch(1);
}
}
if (null != results && !results.isEmpty()) {
Iterator<ConsumerRecord<String, String>> it = results.iterator();
while (it.hasNext()) {
try {
ConsumerRecord<String, String> record = it.next();
F f = callback.accept(record);
if (null != f) {
msgs.add(f);
}
} catch (Exception e) {
e.printStackTrace();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
return msgs;
} catch (org.apache.kafka.common.errors.InterruptException e) {
throw new InterruptedException(e.getMessage());
}
return msgs;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@
* @review: zhangkewei[[email protected]]/2018年02月02日 14:24
*/
public class KafkaProduceConfig extends SourceConfig {
@Setter @Getter private String servers; //服务器列表
@Setter @Getter private String group;//自动生成
@Setter @Getter private String topic;//主题 只能一个
@Setter @Getter private boolean transaction = true;//不输入
@Setter @Getter private String servers; //服务器列表
@Setter @Getter private String group; //自动生成
@Setter @Getter private String topic; //主题 只能一个
@Setter @Getter private boolean transaction = true; //不输入
@Setter @Getter private boolean oggJson = true; //是否格式化为ogg json格式
//分片字段名
//schema.表名->字段名1,字段名2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public static boolean isMatch(Throwable cause, String sqlType) {
}
if (cause instanceof CannotGetJdbcConnectionException || cause instanceof UncategorizedSQLException
|| cause instanceof MySQLSyntaxErrorException || cause instanceof BadSqlGrammarException
|| cause instanceof DataIntegrityViolationException) {
|| cause instanceof DataIntegrityViolationException || cause instanceof CannotCreateTransactionException) {
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public static void initSource() throws Exception {

@Test
@Ignore
public void fetch() {
public void fetch() throws InterruptedException {
KAFKA_CLIENT.fetch(new ConsumeClient.FetchCallback<Object, Object>() {
@Override
public <F, O> F accept(O o) {
Expand All @@ -56,4 +56,4 @@ public <F, O> F accept(O o) {
}
});
}
}
}
Original file line number Diff line number Diff line change
@@ -1,91 +1,65 @@
/*
* Copyright ©2018 vbill.cn.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/

package cn.vbill.middleware.porter.manager;

import cn.vbill.middleware.porter.manager.config.ManagerConfig;
import cn.vbill.middleware.porter.common.cluster.ClusterProviderProxy;
import org.mybatis.spring.annotation.MapperScan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.Banner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.ServletComponentScan;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.transaction.annotation.EnableTransactionManagement;

/**
* @author: zhangkewei[[email protected]]
* @date: 2017年12月15日 14:09
* @version: V1.0
* @review: zhangkewei[[email protected]]/2017年12月15日 14:09
*/
@EnableScheduling
@EnableTransactionManagement
@ServletComponentScan
@SpringBootApplication(scanBasePackages = { "cn.vbill" })
@MapperScan("cn.vbill.middleware.porter.manager.core.mapper")
public class ManagerBootApplication {

private static final Logger LOGGER = LoggerFactory.getLogger(ManagerBootApplication.class);

public static void main(String[] args) throws Exception {
SpringApplication app = new SpringApplication(ManagerBootApplication.class);
app.setBannerMode(Banner.Mode.OFF);
ConfigurableApplicationContext context = app.run(args);
LOGGER.info(print());
LOGGER.info("ManagerApplication is success!");
// 注入spring工具类
ManagerContext.INSTANCE.setApplicationContext(context);
// 获取配置
ManagerConfig config = context.getBean(ManagerConfig.class);
try {
ClusterProviderProxy.INSTANCE.initialize(config.getCluster());
} catch (Exception e) {
ClusterProviderProxy.INSTANCE.stop();
LOGGER.error("集群模块初始化失败, 数据同步管理后台退出!error:" + e.getMessage());
throw new RuntimeException("集群模块初始化失败, 数据同步管理后台退出!error:" + e.getMessage());
}
}

private static String print() {
StringBuffer sb = new StringBuffer();
sb.append(" \n");
sb.append(" _ooOoo_\n");
sb.append(" o8888888o\n");
sb.append(" 88\" . \"88\n");
sb.append(" (| -_- |)\n");
sb.append(" O\\ = /O\n");
sb.append(" ____/`---'\\____\n");
sb.append(" .' \\\\| |// `.\n");
sb.append(" / \\\\||| : |||// \\ \n");
sb.append(" / _||||| -:- |||||- \\ \n");
sb.append(" | | \\\\\\ - /// | |\n");
sb.append(" | \\_| ''\\---/'' | |\n");
sb.append(" \\ .-\\__ `-` ___/-. /\n");
sb.append(" ___`. .' /--.--\\ `. . __\n");
sb.append(" .\"\" '< `.___\\_<|>_/___.' >'\"\".\n");
sb.append(" | | : `- \\`.;`\\ _ /`;.`/ - ` : | |\n");
sb.append(" \\ \\ `-. \\_ __\\ /__ _/ .-` / /\n");
sb.append("======`-.____`-.___\\_____/___.-`____.-'======\n");
sb.append(" `=---='\n");
sb.append("...................................................\n");
return sb.toString();
}
}
/*
* Copyright ©2018 vbill.cn.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/

package cn.vbill.middleware.porter.manager;

import cn.vbill.middleware.porter.manager.config.ManagerConfig;
import cn.vbill.middleware.porter.common.cluster.ClusterProviderProxy;
import org.mybatis.spring.annotation.MapperScan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.Banner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.ServletComponentScan;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.transaction.annotation.EnableTransactionManagement;

/**
* @author: zhangkewei[[email protected]]
* @date: 2017年12月15日 14:09
* @version: V1.0
* @review: zhangkewei[[email protected]]/2017年12月15日 14:09
*/
@EnableScheduling
@EnableTransactionManagement
@ServletComponentScan
@SpringBootApplication(scanBasePackages = { "cn.vbill" })
@MapperScan("cn.vbill.middleware.porter.manager.core.mapper")
public class ManagerBootApplication {

private static final Logger LOGGER = LoggerFactory.getLogger(ManagerBootApplication.class);

public static void main(String[] args) throws Exception {
SpringApplication app = new SpringApplication(ManagerBootApplication.class);
app.setBannerMode(Banner.Mode.OFF);
ConfigurableApplicationContext context = app.run(args);
LOGGER.info("ManagerApplication is success!");
// 注入spring工具类
ManagerContext.INSTANCE.setApplicationContext(context);
// 获取配置
ManagerConfig config = context.getBean(ManagerConfig.class);
try {
ClusterProviderProxy.INSTANCE.initialize(config.getCluster());
} catch (Exception e) {
ClusterProviderProxy.INSTANCE.stop();
LOGGER.error("集群模块初始化失败, 数据同步管理后台退出!error:" + e.getMessage());
throw new RuntimeException("集群模块初始化失败, 数据同步管理后台退出!error:" + e.getMessage());
}
}
}

This file was deleted.

Loading

0 comments on commit e9dc699

Please sign in to comment.