Skip to content

Commit 050a2b0

Browse files
committed
Merge pull request #40 from qq254963746/develop
Develop
2 parents 3bfc6c6 + 218ac0c commit 050a2b0

File tree

34 files changed

+410
-247
lines changed

34 files changed

+410
-247
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ LTS 轻量级分布式任务调度框架(Light Task Schedule)
5454
###LTS Admin
5555
![Aaron Swartz](https://raw.githubusercontent.com/qq254963746/light-task-schedule/master/doc/LTS_Admin.png)
5656
###调用示例
57-
下面提供的是最简单的配置方式。更多配置请查看 [lts-example](https://github.com/qq254963746/light-task-schedule/tree/master/lts-example/src/main/java/com/lts/job/example/api) 模块下的 API 调用方式例子.
57+
下面提供的是最简单的配置方式。更多配置请查看 [lts-example](https://github.com/qq254963746/light-task-schedule/tree/master/lts-example/src/main/java/com/lts/example/api) 模块下的 API 调用方式例子.
5858

5959
####JobTracker 端
6060
```java
@@ -85,7 +85,7 @@ LTS 轻量级分布式任务调度框架(Light Task Schedule)
8585
public void run(Job job) throws Throwable {
8686
System.out.println("我要执行"+ job);
8787
System.out.println(job.getParam("shopId"));
88-
// TODO 用户自己的业务逻辑
88+
// TODO 用户自己的业务逻辑, 应该保证幂等
8989
try {
9090
Thread.sleep(5*1000L);
9191
} catch (InterruptedException e) {

lts-admin/src/test/java/com/lts/biz/logger/mysql/MysqlJobLoggerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import static org.junit.Assert.*;
1515

1616
/**
17-
* Created by hugui on 6/12/15.
17+
* @author Robert HG ([email protected]) on 6/12/15.
1818
*/
1919
public class MysqlJobLoggerTest {
2020

lts-core/src/main/java/com/lts/core/domain/Job.java

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
package com.lts.core.domain;
22

33

4-
import com.lts.core.exception.JobSubmitException;
5-
import com.lts.core.support.CronExpression;
64
import com.lts.core.commons.utils.JSONUtils;
75
import com.lts.core.commons.utils.StringUtils;
6+
import com.lts.core.exception.JobSubmitException;
7+
import com.lts.core.support.CronExpression;
88
import com.lts.remoting.annotation.NotNull;
99

1010
import java.util.Date;
@@ -16,7 +16,6 @@
1616
*/
1717
public class Job {
1818

19-
protected String jobId;
2019
@NotNull
2120
protected String taskId;
2221
/**
@@ -85,14 +84,6 @@ public void setNeedFeedback(boolean needFeedback) {
8584
this.needFeedback = needFeedback;
8685
}
8786

88-
public String getJobId() {
89-
return jobId;
90-
}
91-
92-
public void setJobId(String jobId) {
93-
this.jobId = jobId;
94-
}
95-
9687
public Map<String, String> getExtParams() {
9788
return extParams;
9889
}

lts-core/src/main/java/com/lts/core/domain/JobResult.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
import com.lts.core.commons.utils.JSONUtils;
44

55
/**
6-
* @author Robert HG ([email protected]) on 8/19/14.
7-
* 任务执行结果
6+
* @author Robert HG ([email protected]) on 6/13/15.
7+
* 发送给客户端的 任务执行结果
88
*/
99
public class JobResult {
1010

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package com.lts.core.domain;
2+
3+
/**
4+
* @author Robert HG ([email protected]) on 6/13/15.
5+
*/
6+
public class JobWrapper {
7+
8+
private String jobId;
9+
10+
private Job job;
11+
12+
public JobWrapper(String jobId, Job job) {
13+
this.jobId = jobId;
14+
this.job = job;
15+
}
16+
17+
public JobWrapper() {
18+
}
19+
20+
public String getJobId() {
21+
return jobId;
22+
}
23+
24+
public void setJobId(String jobId) {
25+
this.jobId = jobId;
26+
}
27+
28+
public Job getJob() {
29+
return job;
30+
}
31+
32+
public void setJob(Job job) {
33+
this.job = job;
34+
}
35+
36+
@Override
37+
public String toString() {
38+
return "JobWrapper{" +
39+
"jobId='" + jobId + '\'' +
40+
", job=" + job +
41+
'}';
42+
}
43+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package com.lts.core.domain;
2+
3+
import com.lts.core.commons.utils.JSONUtils;
4+
5+
/**
6+
* @author Robert HG ([email protected]) on 8/19/14.
7+
* TaskTracker 任务执行结果
8+
*/
9+
public class TaskTrackerJobResult {
10+
11+
private JobWrapper jobWrapper;
12+
13+
// 执行成功还是失败
14+
private boolean success;
15+
16+
private String msg;
17+
// 任务完成时间
18+
private Long time;
19+
20+
public JobWrapper getJobWrapper() {
21+
return jobWrapper;
22+
}
23+
24+
public void setJobWrapper(JobWrapper jobWrapper) {
25+
this.jobWrapper = jobWrapper;
26+
}
27+
28+
public boolean isSuccess() {
29+
return success;
30+
}
31+
32+
public void setSuccess(boolean success) {
33+
this.success = success;
34+
}
35+
36+
public String getMsg() {
37+
return msg;
38+
}
39+
40+
public void setMsg(String msg) {
41+
this.msg = msg;
42+
}
43+
44+
public Long getTime() {
45+
return time;
46+
}
47+
48+
public void setTime(Long time) {
49+
this.time = time;
50+
}
51+
52+
@Override
53+
public String toString() {
54+
return JSONUtils.toJSONString(this);
55+
}
56+
}

lts-core/src/main/java/com/lts/core/failstore/berkeleydb/BerkeleydbFailStore.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package com.lts.core.failstore.berkeleydb;
22

3-
import com.lts.core.cluster.Config;
43
import com.lts.core.commons.file.FileLock;
54
import com.lts.core.commons.file.FileUtils;
65
import com.lts.core.commons.utils.CollectionUtils;

lts-core/src/main/java/com/lts/core/failstore/leveldb/LeveldbFailStore.java

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,11 @@
77
import com.lts.core.failstore.FailStore;
88
import com.lts.core.failstore.FailStoreException;
99
import org.fusesource.leveldbjni.JniDBFactory;
10-
import org.iq80.leveldb.DB;
11-
import org.iq80.leveldb.DBIterator;
12-
import org.iq80.leveldb.Options;
10+
import org.iq80.leveldb.*;
1311

1412
import java.io.File;
1513
import java.io.IOException;
14+
import java.io.UnsupportedEncodingException;
1615
import java.lang.reflect.Type;
1716
import java.util.ArrayList;
1817
import java.util.List;
@@ -39,13 +38,23 @@ public LeveldbFailStore(String failStorePath) {
3938
failStorePath = failStorePath + "/leveldb/";
4039
dbPath = FileUtils.createDirIfNotExist(failStorePath);
4140
options = new Options();
41+
options.createIfMissing(true);
42+
// options.compressionType(CompressionType.NONE);
43+
options.cacheSize(100 * 1024 * 1024); // 100M
44+
// options.logger(new Logger() {
45+
// @Override
46+
// public void log(String message) {
47+
// System.out.println(message);
48+
// }
49+
// });
4250
lock = new FileLock(failStorePath + "___db.lock");
4351
}
4452

4553
@Override
4654
public void open() throws FailStoreException {
4755
try {
4856
lock.tryLock();
57+
JniDBFactory.factory.repair(dbPath, options);
4958
db = JniDBFactory.factory.open(dbPath, options);
5059
} catch (IOException e) {
5160
throw new FailStoreException(e);
@@ -79,16 +88,33 @@ public void delete(List<String> keys) throws FailStoreException {
7988
if (keys == null || keys.size() == 0) {
8089
return;
8190
}
82-
for (String key : keys) {
83-
delete(key);
91+
WriteBatch batch = db.createWriteBatch();
92+
try {
93+
94+
for (String key : keys) {
95+
batch.delete(key.getBytes("UTF-8"));
96+
}
97+
db.write(batch);
98+
} catch (UnsupportedEncodingException e) {
99+
throw new FailStoreException(e);
100+
} finally {
101+
try {
102+
batch.close();
103+
} catch (IOException e) {
104+
throw new FailStoreException(e);
105+
}
84106
}
85107
}
86108

87109
@Override
88110
public <T> List<KVPair<String, T>> fetchTop(int size, Type type) throws FailStoreException {
111+
Snapshot snapshot = db.getSnapshot();
112+
DBIterator iterator = null;
89113
try {
90114
List<KVPair<String, T>> list = new ArrayList<KVPair<String, T>>(size);
91-
DBIterator iterator = db.iterator();
115+
ReadOptions options=new ReadOptions();
116+
options.snapshot(snapshot);
117+
iterator = db.iterator(options);
92118
for (iterator.seekToFirst(); iterator.hasNext(); iterator.next()) {
93119
Map.Entry<byte[], byte[]> entry = iterator.peekNext();
94120
String key = new String(entry.getKey(), "UTF-8");
@@ -102,6 +128,19 @@ public <T> List<KVPair<String, T>> fetchTop(int size, Type type) throws FailStor
102128
return list;
103129
} catch (Exception e) {
104130
throw new FailStoreException(e);
131+
} finally {
132+
if (iterator != null) {
133+
try {
134+
iterator.close();
135+
} catch (IOException e) {
136+
throw new FailStoreException(e);
137+
}
138+
}
139+
try {
140+
snapshot.close();
141+
} catch (IOException e) {
142+
throw new FailStoreException(e);
143+
}
105144
}
106145
}
107146

lts-core/src/main/java/com/lts/core/failstore/rocksdb/RocksdbFailStore.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ public <T> List<KVPair<String, T>> fetchTop(int size, Type type) throws FailStor
103103
try {
104104
List<KVPair<String, T>> list = new ArrayList<KVPair<String, T>>(size);
105105
iterator = db.newIterator();
106-
for (iterator.seekToLast(); iterator.isValid(); iterator.prev()) {
106+
for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
107107
iterator.status();
108108
String key = new String(iterator.key(), "UTF-8");
109109
T value = JSONUtils.parse(new String(iterator.value(), "UTF-8"), type);
Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package com.lts.core.protocol.command;
22

3-
import com.lts.core.domain.Job;
3+
import com.lts.core.domain.JobWrapper;
44
import com.lts.remoting.annotation.NotNull;
55

66
/**
@@ -9,13 +9,13 @@
99
public class JobPushRequest extends AbstractCommandBody{
1010

1111
@NotNull
12-
private Job job;
12+
private JobWrapper jobWrapper;
1313

14-
public Job getJob() {
15-
return job;
14+
public JobWrapper getJobWrapper() {
15+
return jobWrapper;
1616
}
1717

18-
public void setJob(Job job) {
19-
this.job = job;
18+
public void setJobWrapper(JobWrapper jobWrapper) {
19+
this.jobWrapper = jobWrapper;
2020
}
2121
}

0 commit comments

Comments
 (0)