Skip to content

Commit 218ac0c

Browse files
author
胡贵
committed
performance sth
1 parent 7bb95cd commit 218ac0c

File tree

30 files changed

+362
-234
lines changed

30 files changed

+362
-234
lines changed

lts-admin/src/test/java/com/lts/biz/logger/mysql/MysqlJobLoggerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import static org.junit.Assert.*;
1515

1616
/**
17-
* Created by hugui on 6/12/15.
17+
* @author Robert HG ([email protected]) on 6/12/15.
1818
*/
1919
public class MysqlJobLoggerTest {
2020

lts-core/src/main/java/com/lts/core/domain/Job.java

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
package com.lts.core.domain;
22

33

4-
import com.lts.core.exception.JobSubmitException;
5-
import com.lts.core.support.CronExpression;
64
import com.lts.core.commons.utils.JSONUtils;
75
import com.lts.core.commons.utils.StringUtils;
6+
import com.lts.core.exception.JobSubmitException;
7+
import com.lts.core.support.CronExpression;
88
import com.lts.remoting.annotation.NotNull;
99

1010
import java.util.Date;
@@ -16,7 +16,6 @@
1616
*/
1717
public class Job {
1818

19-
protected String jobId;
2019
@NotNull
2120
protected String taskId;
2221
/**
@@ -85,14 +84,6 @@ public void setNeedFeedback(boolean needFeedback) {
8584
this.needFeedback = needFeedback;
8685
}
8786

88-
public String getJobId() {
89-
return jobId;
90-
}
91-
92-
public void setJobId(String jobId) {
93-
this.jobId = jobId;
94-
}
95-
9687
public Map<String, String> getExtParams() {
9788
return extParams;
9889
}

lts-core/src/main/java/com/lts/core/domain/JobResult.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
import com.lts.core.commons.utils.JSONUtils;
44

55
/**
6-
* @author Robert HG ([email protected]) on 8/19/14.
7-
* 任务执行结果
6+
* @author Robert HG ([email protected]) on 6/13/15.
7+
* 发送给客户端的 任务执行结果
88
*/
99
public class JobResult {
1010

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package com.lts.core.domain;
2+
3+
/**
4+
* @author Robert HG ([email protected]) on 6/13/15.
5+
*/
6+
public class JobWrapper {
7+
8+
private String jobId;
9+
10+
private Job job;
11+
12+
public JobWrapper(String jobId, Job job) {
13+
this.jobId = jobId;
14+
this.job = job;
15+
}
16+
17+
public JobWrapper() {
18+
}
19+
20+
public String getJobId() {
21+
return jobId;
22+
}
23+
24+
public void setJobId(String jobId) {
25+
this.jobId = jobId;
26+
}
27+
28+
public Job getJob() {
29+
return job;
30+
}
31+
32+
public void setJob(Job job) {
33+
this.job = job;
34+
}
35+
36+
@Override
37+
public String toString() {
38+
return "JobWrapper{" +
39+
"jobId='" + jobId + '\'' +
40+
", job=" + job +
41+
'}';
42+
}
43+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package com.lts.core.domain;
2+
3+
import com.lts.core.commons.utils.JSONUtils;
4+
5+
/**
6+
* @author Robert HG ([email protected]) on 8/19/14.
7+
* TaskTracker 任务执行结果
8+
*/
9+
public class TaskTrackerJobResult {
10+
11+
private JobWrapper jobWrapper;
12+
13+
// 执行成功还是失败
14+
private boolean success;
15+
16+
private String msg;
17+
// 任务完成时间
18+
private Long time;
19+
20+
public JobWrapper getJobWrapper() {
21+
return jobWrapper;
22+
}
23+
24+
public void setJobWrapper(JobWrapper jobWrapper) {
25+
this.jobWrapper = jobWrapper;
26+
}
27+
28+
public boolean isSuccess() {
29+
return success;
30+
}
31+
32+
public void setSuccess(boolean success) {
33+
this.success = success;
34+
}
35+
36+
public String getMsg() {
37+
return msg;
38+
}
39+
40+
public void setMsg(String msg) {
41+
this.msg = msg;
42+
}
43+
44+
public Long getTime() {
45+
return time;
46+
}
47+
48+
public void setTime(Long time) {
49+
this.time = time;
50+
}
51+
52+
@Override
53+
public String toString() {
54+
return JSONUtils.toJSONString(this);
55+
}
56+
}

lts-core/src/main/java/com/lts/core/failstore/berkeleydb/BerkeleydbFailStore.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package com.lts.core.failstore.berkeleydb;
22

3-
import com.lts.core.cluster.Config;
43
import com.lts.core.commons.file.FileLock;
54
import com.lts.core.commons.file.FileUtils;
65
import com.lts.core.commons.utils.CollectionUtils;

lts-core/src/main/java/com/lts/core/failstore/leveldb/LeveldbFailStore.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ public LeveldbFailStore(String failStorePath) {
3939
dbPath = FileUtils.createDirIfNotExist(failStorePath);
4040
options = new Options();
4141
options.createIfMissing(true);
42+
// options.compressionType(CompressionType.NONE);
4243
options.cacheSize(100 * 1024 * 1024); // 100M
4344
// options.logger(new Logger() {
4445
// @Override
@@ -107,10 +108,13 @@ public void delete(List<String> keys) throws FailStoreException {
107108

108109
@Override
109110
public <T> List<KVPair<String, T>> fetchTop(int size, Type type) throws FailStoreException {
111+
Snapshot snapshot = db.getSnapshot();
110112
DBIterator iterator = null;
111113
try {
112114
List<KVPair<String, T>> list = new ArrayList<KVPair<String, T>>(size);
113-
iterator = db.iterator();
115+
ReadOptions options=new ReadOptions();
116+
options.snapshot(snapshot);
117+
iterator = db.iterator(options);
114118
for (iterator.seekToFirst(); iterator.hasNext(); iterator.next()) {
115119
Map.Entry<byte[], byte[]> entry = iterator.peekNext();
116120
String key = new String(entry.getKey(), "UTF-8");
@@ -132,6 +136,11 @@ public <T> List<KVPair<String, T>> fetchTop(int size, Type type) throws FailStor
132136
throw new FailStoreException(e);
133137
}
134138
}
139+
try {
140+
snapshot.close();
141+
} catch (IOException e) {
142+
throw new FailStoreException(e);
143+
}
135144
}
136145
}
137146

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package com.lts.core.protocol.command;
22

3-
import com.lts.core.domain.Job;
3+
import com.lts.core.domain.JobWrapper;
44
import com.lts.remoting.annotation.NotNull;
55

66
/**
@@ -9,13 +9,13 @@
99
public class JobPushRequest extends AbstractCommandBody{
1010

1111
@NotNull
12-
private Job job;
12+
private JobWrapper jobWrapper;
1313

14-
public Job getJob() {
15-
return job;
14+
public JobWrapper getJobWrapper() {
15+
return jobWrapper;
1616
}
1717

18-
public void setJob(Job job) {
19-
this.job = job;
18+
public void setJobWrapper(JobWrapper jobWrapper) {
19+
this.jobWrapper = jobWrapper;
2020
}
2121
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package com.lts.core.protocol.command;
2+
3+
import com.lts.core.domain.TaskTrackerJobResult;
4+
import com.lts.remoting.annotation.NotNull;
5+
6+
import java.util.ArrayList;
7+
import java.util.List;
8+
9+
/**
10+
* @author Robert HG ([email protected]) on 8/16/14.
11+
* TaskTracker Job finished request command body
12+
*/
13+
public class TtJobFinishedRequest extends AbstractCommandBody {
14+
/**
15+
* 是否接受新任务
16+
*/
17+
private boolean receiveNewJob = false;
18+
19+
@NotNull
20+
private List<TaskTrackerJobResult> taskTrackerJobResults;
21+
22+
// 是否是重发(重发是批量发)
23+
private boolean reSend = false;
24+
25+
public boolean isReSend() {
26+
return reSend;
27+
}
28+
29+
public void setReSend(boolean reSend) {
30+
this.reSend = reSend;
31+
}
32+
33+
public boolean isReceiveNewJob() {
34+
return receiveNewJob;
35+
}
36+
37+
public void setReceiveNewJob(boolean receiveNewJob) {
38+
this.receiveNewJob = receiveNewJob;
39+
}
40+
41+
public List<TaskTrackerJobResult> getTaskTrackerJobResults() {
42+
return taskTrackerJobResults;
43+
}
44+
45+
public void setTaskTrackerJobResults(List<TaskTrackerJobResult> taskTrackerJobResults) {
46+
this.taskTrackerJobResults = taskTrackerJobResults;
47+
}
48+
49+
public void addJobResult(TaskTrackerJobResult taskTrackerJobResult) {
50+
if (taskTrackerJobResults == null) {
51+
taskTrackerJobResults = new ArrayList<TaskTrackerJobResult>();
52+
}
53+
taskTrackerJobResults.add(taskTrackerJobResult);
54+
}
55+
}

lts-core/src/main/java/com/lts/core/support/RetryScheduler.java

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -93,9 +93,6 @@ public void stop() {
9393
*/
9494
private class CheckRunner implements Runnable {
9595

96-
// 一次最多提交maxSentSize个, 保证文件所也能被其他线程拿到
97-
private int maxSentSize = 100;
98-
9996
@Override
10097
public void run() {
10198
try {
@@ -104,8 +101,6 @@ public void run() {
104101
return;
105102
}
106103

107-
int sentSize = 0;
108-
109104
List<KVPair<String, T>> kvPairs = null;
110105
do {
111106
try {
@@ -124,20 +119,11 @@ public void run() {
124119
values.add(kvPair.getValue());
125120
}
126121
if (retry(values)) {
127-
LOGGER.info("{} local files send success, {}", name, JSONUtils.toJSONString(values));
122+
LOGGER.info("{} local files send success, size: {}, {}", name, values.size(), JSONUtils.toJSONString(values));
128123
failStore.delete(keys);
129124
} else {
130125
break;
131126
}
132-
sentSize += kvPairs.size();
133-
if (sentSize >= maxSentSize) {
134-
// 一次最多提交maxSentSize个, 保证文件所也能被其他线程拿到
135-
try {
136-
Thread.sleep(1000L);
137-
} catch (InterruptedException e1) {
138-
LOGGER.warn(e1.getMessage(), e1);
139-
}
140-
}
141127
} finally {
142128
failStore.close();
143129
}
@@ -155,6 +141,7 @@ public void inSchedule(String key, T value) {
155141
try {
156142
failStore.open();
157143
failStore.put(key, value);
144+
LOGGER.info("{} local files save success, {}", name, JSONUtils.toJSONString(value));
158145
} finally {
159146
failStore.close();
160147
}

0 commit comments

Comments
 (0)