Skip to content

Commit 80860c2

Browse files
committedNov 17, 2020
bug fix and write speed optimization
1 parent 3fd9c0a commit 80860c2

File tree

8 files changed

+342
-98
lines changed

8 files changed

+342
-98
lines changed
 
39.9 KB
Loading

‎kuduwriter/doc/kuduwirter.md

+143
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
# datax-kudu-plugins
2+
datax kudu的writer插件
3+
4+
5+
6+
eg:
7+
8+
```json
9+
{
10+
"name": "kuduwriter",
11+
"parameter": {
12+
"kuduConfig": {
13+
"kudu.master_addresses": "***",
14+
"timeout": 60000,
15+
"sessionTimeout": 60000
16+
17+
},
18+
"table": "",
19+
"replicaCount": 3,
20+
"truncate": false,
21+
"writeMode": "upsert",
22+
"partition": {
23+
"range": {
24+
"column1": [
25+
{
26+
"lower": "2020-08-25",
27+
"upper": "2020-08-26"
28+
},
29+
{
30+
"lower": "2020-08-26",
31+
"upper": "2020-08-27"
32+
},
33+
{
34+
"lower": "2020-08-27",
35+
"upper": "2020-08-28"
36+
}
37+
]
38+
},
39+
"hash": {
40+
"column": [
41+
"column1"
42+
],
43+
"number": 3
44+
}
45+
},
46+
"column": [
47+
{
48+
"index": 0,
49+
"name": "c1",
50+
"type": "string",
51+
"primaryKey": true
52+
},
53+
{
54+
"index": 1,
55+
"name": "c2",
56+
"type": "string",
57+
"compress": "DEFAULT_COMPRESSION",
58+
"encoding": "AUTO_ENCODING",
59+
"comment": "注解xxxx"
60+
}
61+
],
62+
"batchSize": 1024,
63+
"bufferSize": 2048,
64+
"skipFail": false,
65+
"encoding": "UTF-8"
66+
}
67+
}
68+
```
69+
70+
必须参数:
71+
72+
```json
73+
"writer": {
74+
"name": "kuduwriter",
75+
"parameter": {
76+
"kuduConfig": {
77+
"kudu.master_addresses": "***"
78+
},
79+
"table": "***",
80+
"column": [
81+
{
82+
"name": "c1",
83+
"type": "string",
84+
"primaryKey": true
85+
},
86+
{
87+
"name": "c2",
88+
"type": "string",
89+
},
90+
{
91+
"name": "c3",
92+
"type": "string"
93+
},
94+
{
95+
"name": "c4",
96+
"type": "string"
97+
}
98+
]
99+
}
100+
}
101+
```
102+
103+
主键列请写到最前面
104+
105+
106+
107+
![image-20200901193148188](./image-20200901193148188.png)
108+
109+
##### 配置列表
110+
111+
| name | default | description | 是否必须 |
112+
| -------------- | ------------------- | ------------------------------------------------------------ | -------- |
113+
| kuduConfig | | kudu配置 (kudu.master_addresses等) ||
114+
| table | | 导入目标表名 ||
115+
| partition | | 分区 ||
116+
| column | |||
117+
| name | | 列名 ||
118+
| type | string | 列的类型,现支持INT, FLOAT, STRING, BIGINT, DOUBLE, BOOLEAN, LONG。 ||
119+
| index | 升序排列 | 列索引位置(要么全部列都写,要么都不写),如reader中取到的某一字段在第二位置(eg: name, id, age)但kudu目标表结构不同(eg:id,name, age),此时就需要将index赋值为(1,0,2),默认顺序(0,1,2) ||
120+
| primaryKey | false | 是否为主键(请将所有的主键列写在前面),不表明主键将不会检查过滤脏数据 ||
121+
| compress | DEFAULT_COMPRESSION | 压缩格式 ||
122+
| encoding | AUTO_ENCODING | 编码 ||
123+
| replicaCount | 3 | 保留副本个数 ||
124+
| hash | | hash分区 ||
125+
| number | 3 | hash分区个数 ||
126+
| range | | range分区 ||
127+
| lower | | range分区下限 (eg: sql建表:partition value='haha' 对应:“lower”:“haha”,“upper”:“haha\000”) ||
128+
| upper | | range分区上限(eg: sql建表:partition "10" <= VALUES < "20" 对应:“lower”:“10”,“upper”:“20”) ||
129+
| truncate | false | 是否清空表,本质上是删表重建 ||
130+
| writeMode | upsert | upsert,insert,update ||
131+
| batchSize | 512 | 每xx行数据flush一次结果(最好不要超过1024) ||
132+
| bufferSize | 3072 | 缓冲区大小 ||
133+
| skipFail | false | 是否跳过插入不成功的数据 ||
134+
| timeout | 60000 | client超时时间,如创建表,删除表操作的超时时间。单位:ms ||
135+
| sessionTimeout | 60000 | session超时时间 单位:ms ||
136+
137+
138+
139+
140+
141+
142+
143+

‎kuduwriter/src/main/assembly/package.xml

+4-4
Original file line numberDiff line numberDiff line change
@@ -14,21 +14,21 @@
1414
<include>plugin.json</include>
1515
<include>plugin_job_template.json</include>
1616
</includes>
17-
<outputDirectory>plugin/writer/kudu11xwriter</outputDirectory>
17+
<outputDirectory>plugin/writer/kuduwriter</outputDirectory>
1818
</fileSet>
1919
<fileSet>
2020
<directory>target/</directory>
2121
<includes>
22-
<include>kudu11xwriter-0.0.1-SNAPSHOT.jar</include>
22+
<include>kuduwriter-0.0.1-SNAPSHOT.jar</include>
2323
</includes>
24-
<outputDirectory>plugin/writer/kudu11xwriter</outputDirectory>
24+
<outputDirectory>plugin/writer/kuduwriter</outputDirectory>
2525
</fileSet>
2626
</fileSets>
2727

2828
<dependencySets>
2929
<dependencySet>
3030
<useProjectArtifact>false</useProjectArtifact>
31-
<outputDirectory>plugin/writer/kudu11xwriter/libs</outputDirectory>
31+
<outputDirectory>plugin/writer/kuduwriter/libs</outputDirectory>
3232
<scope>runtime</scope>
3333
</dependencySet>
3434
</dependencySets>

‎kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xHelper.java

+90-13
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,17 @@
1010
import org.apache.kudu.Schema;
1111
import org.apache.kudu.Type;
1212
import org.apache.kudu.client.*;
13+
import org.apache.kudu.shaded.org.checkerframework.checker.units.qual.K;
1314
import org.slf4j.Logger;
1415
import org.slf4j.LoggerFactory;
16+
import sun.rmi.runtime.Log;
1517

1618
import java.nio.charset.Charset;
1719
import java.util.*;
20+
import java.util.concurrent.SynchronousQueue;
21+
import java.util.concurrent.ThreadFactory;
22+
import java.util.concurrent.ThreadPoolExecutor;
23+
import java.util.concurrent.TimeUnit;
1824
import java.util.concurrent.atomic.AtomicInteger;
1925

2026
/**
@@ -47,10 +53,10 @@ public static KuduClient getKuduClient(String kuduConfig) {
4753
Map<String, Object> conf = Kudu11xHelper.getKuduConfiguration(kuduConfig);
4854
KuduClient kuduClient = null;
4955
try {
50-
String masterAddress = (String)conf.get(Key.KUDU_MASTER);
56+
String masterAddress = (String) conf.get(Key.KUDU_MASTER);
5157
kuduClient = new KuduClient.KuduClientBuilder(masterAddress)
5258
.defaultAdminOperationTimeoutMs((Long) conf.get(Key.KUDU_ADMIN_TIMEOUT))
53-
.defaultOperationTimeoutMs((Long)conf.get(Key.KUDU_SESSION_TIMEOUT))
59+
.defaultOperationTimeoutMs((Long) conf.get(Key.KUDU_SESSION_TIMEOUT))
5460
.build();
5561
} catch (Exception e) {
5662
throw DataXException.asDataXException(Kudu11xWriterErrorcode.GET_KUDU_CONNECTION_ERROR, e);
@@ -106,17 +112,17 @@ public static void createTable(Configuration configuration) {
106112
throw DataXException.asDataXException(Kudu11xWriterErrorcode.GREATE_KUDU_TABLE_ERROR, e);
107113
} finally {
108114
AtomicInteger i = new AtomicInteger(5);
109-
while (i.get()>0) {
115+
while (i.get() > 0) {
110116
try {
111-
if (kuduClient.isCreateTableDone(tableName)){
117+
if (kuduClient.isCreateTableDone(tableName)) {
112118
Kudu11xHelper.closeClient(kuduClient);
113-
LOG.info("Table "+ tableName +" is created!");
119+
LOG.info("Table " + tableName + " is created!");
114120
break;
115121
}
116122
i.decrementAndGet();
117123
LOG.error("timeout!");
118-
} catch (KuduException e) {
119-
LOG.info("Wait for the table to be created..... "+i);
124+
} catch (KuduException e) {
125+
LOG.info("Wait for the table to be created..... " + i);
120126
try {
121127
Thread.sleep(1000L);
122128
} catch (InterruptedException ex) {
@@ -135,6 +141,44 @@ public static void createTable(Configuration configuration) {
135141
}
136142
}
137143

144+
public static ThreadPoolExecutor createRowAddThreadPool(int coreSize) {
145+
return new ThreadPoolExecutor(coreSize,
146+
coreSize,
147+
60L,
148+
TimeUnit.SECONDS,
149+
new SynchronousQueue<Runnable>(),
150+
new ThreadFactory() {
151+
private final ThreadGroup group = System.getSecurityManager() == null ? Thread.currentThread().getThreadGroup() : System.getSecurityManager().getThreadGroup();
152+
private final AtomicInteger threadNumber = new AtomicInteger(1);
153+
154+
@Override
155+
public Thread newThread(Runnable r) {
156+
Thread t = new Thread(group, r,
157+
"pool-kudu_rows_add-thread-" + threadNumber.getAndIncrement(),
158+
0);
159+
if (t.isDaemon())
160+
t.setDaemon(false);
161+
if (t.getPriority() != Thread.NORM_PRIORITY)
162+
t.setPriority(Thread.NORM_PRIORITY);
163+
return t;
164+
}
165+
}, new ThreadPoolExecutor.CallerRunsPolicy());
166+
}
167+
168+
public static List<List<Configuration>> getColumnLists(List<Configuration> columns) {
169+
int quota = 8;
170+
int num = (columns.size() - 1) / quota + 1;
171+
int gap = columns.size() / num;
172+
List<List<Configuration>> columnLists = new ArrayList<>(num);
173+
for (int j = 0; j < num - 1; j++) {
174+
List<Configuration> destList = new ArrayList<>(columns.subList(j * gap, (j + 1) * gap));
175+
columnLists.add(destList);
176+
}
177+
List<Configuration> destList = new ArrayList<>(columns.subList(gap * (num - 1), columns.size()));
178+
columnLists.add(destList);
179+
return columnLists;
180+
}
181+
138182
public static boolean isTableExists(Configuration configuration) {
139183
String tableName = configuration.getString(Key.TABLE);
140184
String kuduConfig = configuration.getString(Key.KUDU_CONFIG);
@@ -154,7 +198,7 @@ public static void closeClient(KuduClient kuduClient) {
154198
kuduClient.close();
155199
}
156200
} catch (KuduException e) {
157-
LOG.warn("kudu client is not gracefully closed !");
201+
LOG.warn("The \"kudu client\" was not stopped gracefully. !");
158202

159203
}
160204

@@ -172,8 +216,8 @@ public static Schema getSchema(Configuration configuration) {
172216

173217
String type = "BIGINT".equals(column.getNecessaryValue(Key.TYPE, Kudu11xWriterErrorcode.REQUIRED_VALUE).toUpperCase()) ||
174218
"LONG".equals(column.getNecessaryValue(Key.TYPE, Kudu11xWriterErrorcode.REQUIRED_VALUE).toUpperCase()) ?
175-
"INT64" : "INT".equals(column.getNecessaryValue(Key.TYPE, Kudu11xWriterErrorcode.REQUIRED_VALUE).toUpperCase())?
176-
"INT32":column.getNecessaryValue(Key.TYPE, Kudu11xWriterErrorcode.REQUIRED_VALUE).toUpperCase();
219+
"INT64" : "INT".equals(column.getNecessaryValue(Key.TYPE, Kudu11xWriterErrorcode.REQUIRED_VALUE).toUpperCase()) ?
220+
"INT32" : column.getNecessaryValue(Key.TYPE, Kudu11xWriterErrorcode.REQUIRED_VALUE).toUpperCase();
177221
String name = column.getNecessaryValue(Key.NAME, Kudu11xWriterErrorcode.REQUIRED_VALUE);
178222
Boolean key = column.getBool(Key.PRIMARYKEY, false);
179223
String encoding = column.getString(Key.ENCODING, Constant.ENCODING).toUpperCase();
@@ -194,9 +238,9 @@ public static Schema getSchema(Configuration configuration) {
194238
return schema;
195239
}
196240

197-
public static Integer getPrimaryKeyIndexUntil(List<Configuration> columns){
241+
public static Integer getPrimaryKeyIndexUntil(List<Configuration> columns) {
198242
int i = 0;
199-
while ( i < columns.size() ) {
243+
while (i < columns.size()) {
200244
Configuration col = columns.get(i);
201245
if (!col.getBool(Key.PRIMARYKEY, false)) {
202246
break;
@@ -244,6 +288,7 @@ public static void setTablePartition(Configuration configuration,
244288
}
245289

246290
public static void validateParameter(Configuration configuration) {
291+
LOG.info("Start validating parameters!");
247292
configuration.getNecessaryValue(Key.KUDU_CONFIG, Kudu11xWriterErrorcode.REQUIRED_VALUE);
248293
configuration.getNecessaryValue(Key.TABLE, Kudu11xWriterErrorcode.REQUIRED_VALUE);
249294
String encoding = configuration.getString(Key.ENCODING, Constant.DEFAULT_ENCODING);
@@ -268,7 +313,39 @@ public static void validateParameter(Configuration configuration) {
268313

269314
Boolean isSkipFail = configuration.getBool(Key.SKIP_FAIL, false);
270315
configuration.set(Key.SKIP_FAIL, isSkipFail);
271-
LOG.info("==validate parameter complete!");
316+
List<Configuration> columns = configuration.getListConfiguration(Key.COLUMN);
317+
List<Configuration> goalColumns = new ArrayList<>();
318+
//column参数验证
319+
int indexFlag = 0;
320+
boolean primaryKey = true;
321+
int primaryKeyFlag = 0;
322+
for (int i = 0; i < columns.size(); i++) {
323+
Configuration col = columns.get(i);
324+
String index = col.getString(Key.INDEX);
325+
if (index == null) {
326+
index = String.valueOf(i);
327+
col.set(Key.INDEX, index);
328+
indexFlag++;
329+
}
330+
if(primaryKey != col.getBool(Key.PRIMARYKEY, false)){
331+
primaryKey = col.getBool(Key.PRIMARYKEY, false);
332+
primaryKeyFlag++;
333+
}
334+
goalColumns.add(col);
335+
}
336+
if (indexFlag != 0 && indexFlag != columns.size()) {
337+
throw DataXException.asDataXException(Kudu11xWriterErrorcode.ILLEGAL_VALUE,
338+
"\"index\" either has values for all of them, or all of them are null!");
339+
}
340+
if (primaryKeyFlag > 1){
341+
throw DataXException.asDataXException(Kudu11xWriterErrorcode.ILLEGAL_VALUE,
342+
"\"primaryKey\" must be written in the front!");
343+
}
344+
configuration.set(Key.COLUMN, goalColumns);
345+
// LOG.info("------------------------------------");
346+
// LOG.info(configuration.toString());
347+
// LOG.info("------------------------------------");
348+
LOG.info("validate parameter complete!");
272349
}
273350

274351
public static void truncateTable(Configuration configuration) {

‎kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xWriter.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public void prepare() {
3838

3939
@Override
4040
public List<Configuration> split(int i) {
41-
List<Configuration> splitResultConfigs = new ArrayList<Configuration>();
41+
List<Configuration> splitResultConfigs = new ArrayList<>();
4242
for (int j = 0; j < i; j++) {
4343
splitResultConfigs.add(config.clone());
4444
}
@@ -76,7 +76,7 @@ public void destroy() {
7676
kuduTaskProxy.session.close();
7777
}
7878
}catch (Exception e){
79-
LOG.warn("kudu session is not gracefully closed !");
79+
LOG.warn("The \"kudu session\" was not stopped gracefully !");
8080
}
8181
Kudu11xHelper.closeClient(kuduTaskProxy.kuduClient);
8282

‎kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/KuduWriterTask.java

+101-77
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,13 @@
1212
import org.slf4j.Logger;
1313
import org.slf4j.LoggerFactory;
1414

15+
import java.util.ArrayList;
16+
import java.util.Collections;
1517
import java.util.List;
16-
import java.util.concurrent.LinkedBlockingQueue;
17-
import java.util.concurrent.ThreadPoolExecutor;
18-
import java.util.concurrent.TimeUnit;
18+
import java.util.concurrent.*;
1919
import java.util.concurrent.atomic.AtomicInteger;
2020
import java.util.concurrent.atomic.AtomicLong;
21+
import java.util.concurrent.atomic.LongAdder;
2122

2223
/**
2324
* @author daizihao
@@ -26,28 +27,30 @@
2627
public class KuduWriterTask {
2728
private final static Logger LOG = LoggerFactory.getLogger(KuduWriterTask.class);
2829

29-
public List<Configuration> columns;
30-
public String encoding;
31-
public String insertMode;
32-
public Double batchSize;
33-
public long mutationBufferSpace;
34-
public Boolean isUpsert;
35-
public Boolean isSkipFail;
36-
30+
private List<Configuration> columns;
31+
private List<List<Configuration>> columnLists;
32+
private ThreadPoolExecutor pool;
33+
private String encoding;
34+
private Double batchSize;
35+
private Boolean isUpsert;
36+
private Boolean isSkipFail;
3737
public KuduClient kuduClient;
38-
public KuduTable table;
3938
public KuduSession session;
39+
private KuduTable table;
4040
private Integer primaryKeyIndexUntil;
4141

42+
private final Object lock = new Object();
4243

4344
public KuduWriterTask(Configuration configuration) {
44-
this.columns = configuration.getListConfiguration(Key.COLUMN);
45+
columns = configuration.getListConfiguration(Key.COLUMN);
46+
columnLists = Kudu11xHelper.getColumnLists(columns);
47+
pool = Kudu11xHelper.createRowAddThreadPool(columnLists.size());
48+
4549
this.encoding = configuration.getString(Key.ENCODING);
46-
this.insertMode = configuration.getString(Key.INSERT_MODE);
4750
this.batchSize = configuration.getDouble(Key.WRITE_BATCH_SIZE);
48-
this.mutationBufferSpace = configuration.getLong(Key.MUTATION_BUFFER_SPACE);
49-
this.isUpsert = !configuration.getString(Key.INSERT_MODE).equals("insert");
51+
this.isUpsert = !configuration.getString(Key.INSERT_MODE).equalsIgnoreCase("insert");
5052
this.isSkipFail = configuration.getBool(Key.SKIP_FAIL);
53+
long mutationBufferSpace = configuration.getLong(Key.MUTATION_BUFFER_SPACE);
5154

5255
this.kuduClient = Kudu11xHelper.getKuduClient(configuration.getString(Key.KUDU_CONFIG));
5356
this.table = Kudu11xHelper.getKuduTable(configuration, kuduClient);
@@ -59,9 +62,9 @@ public KuduWriterTask(Configuration configuration) {
5962
}
6063

6164
public void startWriter(RecordReceiver lineReceiver, TaskPluginCollector taskPluginCollector) {
62-
LOG.info("==kuduwriter began to write!");
65+
LOG.info("kuduwriter began to write!");
6366
Record record;
64-
AtomicLong counter = new AtomicLong(0L);
67+
LongAdder counter = new LongAdder();
6568
try {
6669
while ((record = lineReceiver.getFromReader()) != null) {
6770
if (record.getColumnNumber() != columns.size()) {
@@ -70,7 +73,7 @@ public void startWriter(RecordReceiver lineReceiver, TaskPluginCollector taskPlu
7073
boolean isDirtyRecord = false;
7174

7275

73-
for (int i = 0; i <= primaryKeyIndexUntil && !isDirtyRecord; i++) {
76+
for (int i = 0; i < primaryKeyIndexUntil && !isDirtyRecord; i++) {
7477
Column column = record.getColumn(i);
7578
isDirtyRecord = StringUtils.isBlank(column.asString());
7679
}
@@ -80,78 +83,102 @@ public void startWriter(RecordReceiver lineReceiver, TaskPluginCollector taskPlu
8083
continue;
8184
}
8285

86+
CountDownLatch countDownLatch = new CountDownLatch(columnLists.size());
8387
Upsert upsert = table.newUpsert();
8488
Insert insert = table.newInsert();
89+
PartialRow row;
90+
if (isUpsert) {
91+
//覆盖更新
92+
row = upsert.getRow();
93+
} else {
94+
//增量更新
95+
row = insert.getRow();
96+
}
8597

86-
for (int i = 0; i < columns.size(); i++) {
87-
PartialRow row;
88-
if (isUpsert) {
89-
//覆盖更新
90-
row = upsert.getRow();
91-
} else {
92-
//增量更新
93-
row = insert.getRow();
94-
}
95-
Configuration col = columns.get(i);
96-
String name = col.getString(Key.NAME);
97-
ColumnType type = ColumnType.getByTypeName(col.getString(Key.TYPE));
98-
Column column = record.getColumn(col.getInt(Key.INDEX, i));
99-
Object rawData = column.getRawData();
100-
if (rawData == null) {
101-
row.setNull(name);
102-
continue;
103-
}
104-
switch (type) {
105-
case INT:
106-
row.addInt(name, Integer.parseInt(rawData.toString()));
107-
break;
108-
case LONG:
109-
case BIGINT:
110-
row.addLong(name, Long.parseLong(rawData.toString()));
111-
break;
112-
case FLOAT:
113-
row.addFloat(name, Float.parseFloat(rawData.toString()));
114-
break;
115-
case DOUBLE:
116-
row.addDouble(name, Double.parseDouble(rawData.toString()));
117-
break;
118-
case BOOLEAN:
119-
row.addBoolean(name, Boolean.getBoolean(rawData.toString()));
120-
break;
121-
case STRING:
122-
default:
123-
row.addString(name, rawData.toString());
124-
}
98+
for (List<Configuration> columnList : columnLists) {
99+
Record finalRecord = record;
100+
pool.submit(()->{
101+
102+
for (Configuration col : columnList) {
103+
104+
String name = col.getString(Key.NAME);
105+
ColumnType type = ColumnType.getByTypeName(col.getString(Key.TYPE, "string"));
106+
Column column = finalRecord.getColumn(col.getInt(Key.INDEX));
107+
String rawData = column.asString();
108+
if (rawData == null) {
109+
synchronized (lock) {
110+
row.setNull(name);
111+
}
112+
continue;
113+
}
114+
switch (type) {
115+
case INT:
116+
synchronized (lock) {
117+
row.addInt(name, Integer.parseInt(rawData));
118+
}
119+
break;
120+
case LONG:
121+
case BIGINT:
122+
synchronized (lock) {
123+
row.addLong(name, Long.parseLong(rawData));
124+
}
125+
break;
126+
case FLOAT:
127+
synchronized (lock) {
128+
row.addFloat(name, Float.parseFloat(rawData));
129+
}
130+
break;
131+
case DOUBLE:
132+
synchronized (lock) {
133+
row.addDouble(name, Double.parseDouble(rawData));
134+
}
135+
break;
136+
case BOOLEAN:
137+
synchronized (lock) {
138+
row.addBoolean(name, Boolean.getBoolean(rawData));
139+
}
140+
break;
141+
case STRING:
142+
default:
143+
synchronized (lock) {
144+
row.addString(name, rawData);
145+
}
146+
}
147+
}
148+
countDownLatch.countDown();
149+
});
125150
}
151+
countDownLatch.await();
126152
try {
127-
RetryUtil.executeWithRetry(()->{
153+
RetryUtil.executeWithRetry(() -> {
128154
if (isUpsert) {
129155
//覆盖更新
130156
session.apply(upsert);
131157
} else {
132158
//增量更新
133159
session.apply(insert);
134160
}
135-
//提前写数据,阈值可自定义
136-
if (counter.incrementAndGet() > batchSize * 0.75) {
161+
//flush
162+
if (counter.longValue() > (batchSize * 0.8)) {
137163
session.flush();
138-
counter.set(0L);
164+
counter.reset();
139165
}
166+
counter.increment();
140167
return true;
141-
},5,1000L,true);
168+
}, 5, 500L, true);
142169

143170
} catch (Exception e) {
144-
LOG.error("Data write failed!", e);
171+
LOG.error("Record Write Failure!", e);
145172
if (isSkipFail) {
146-
LOG.warn("Because you have configured skipFail is true,this data will be skipped!");
173+
LOG.warn("Since you have configured \"skipFail\" to be true, this record will be skipped !");
147174
taskPluginCollector.collectDirtyRecord(record, e.getMessage());
148-
}else {
175+
} else {
149176
throw e;
150177
}
151178
}
152179
}
153180
} catch (Exception e) {
154-
LOG.error("write failed! the task will exit!");
181+
LOG.error("write failure! the task will exit!");
155182
throw DataXException.asDataXException(Kudu11xWriterErrorcode.PUT_KUDU_ERROR, e.getMessage());
156183
}
157184
AtomicInteger i = new AtomicInteger(10);
@@ -161,23 +188,20 @@ public void startWriter(RecordReceiver lineReceiver, TaskPluginCollector taskPlu
161188
session.flush();
162189
break;
163190
}
164-
Thread.sleep(1000L);
191+
Thread.sleep(20L);
165192
i.decrementAndGet();
166193
}
167194
} catch (Exception e) {
168-
LOG.info("Waiting for data to be inserted...... " + i + "s");
169-
try {
170-
Thread.sleep(1000L);
171-
} catch (InterruptedException ex) {
172-
ex.printStackTrace();
173-
}
174-
i.decrementAndGet();
195+
LOG.info("Waiting for data to be written to kudu...... " + i + "s");
196+
175197
} finally {
176198
try {
199+
pool.shutdown();
200+
//强制刷写
177201
session.flush();
178202
} catch (KuduException e) {
179-
LOG.error("==kuduwriter flush error! the results may not be complete!");
180-
e.printStackTrace();
203+
LOG.error("kuduwriter flush error! The results may be incomplete!");
204+
throw DataXException.asDataXException(Kudu11xWriterErrorcode.PUT_KUDU_ERROR, e.getMessage());
181205
}
182206
}
183207

‎kuduwriter/src/main/resources/plugin.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
2-
"name": "kudu11xwriter",
2+
"name": "kuduwriter",
33
"class": "com.q1.datax.plugin.writer.kudu11xwriter.Kudu11xWriter",
44
"description": "use put: prod. mechanism: use kudu java api put data.",
55
"developer": "com.q1.daizihao"

‎kuduwriter/src/main/resources/plugin_job_template.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
2-
"name": "kudu11xwriter",
2+
"name": "kuduwriter",
33
"parameter": {
44
"kuduConfig": {
55
"kudu.master_addresses": "***",

0 commit comments

Comments
 (0)
Please sign in to comment.