Skip to content

Commit 13d927b

Browse files
committed
Merge pull request #21 from qq254963746/develop
采用CAS 来减小 mysql queue 情况下 dead lock 的概率
2 parents 69d701e + b551fd4 commit 13d927b

File tree

3 files changed

+77
-37
lines changed

3 files changed

+77
-37
lines changed

job-example/src/main/java/com/lts/job/example/api/JobTrackerTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@ public class JobTrackerTest {
1212
public static void main(String[] args) {
1313

1414
// 1. 使用mongo做任务队列
15-
testMongoQueue();
15+
// testMongoQueue();
1616
// 2. 使用mysql做任务队列
17-
// testMysqlQueue();
17+
testMysqlQueue();
1818
}
1919

2020
/**

job-example/src/main/java/com/lts/job/example/support/TestJobRunner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public void run(Job job) throws Throwable {
2424
bizLogger.info("测试,业务日志啊啊啊啊啊");
2525

2626
try {
27-
Thread.sleep(1000L);
27+
Thread.sleep(100L);
2828
} catch (InterruptedException e) {
2929
e.printStackTrace();
3030
}

job-queue/job-queue-mysql/src/main/java/com/lts/job/queue/mysql/MysqlExecutableJobQueue.java

Lines changed: 74 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,8 @@
88
import com.lts.job.queue.domain.JobPo;
99
import com.lts.job.queue.exception.JobQueueException;
1010
import com.lts.job.queue.mysql.support.ResultSetHandlerHolder;
11-
import com.lts.job.store.jdbc.SqlExecutor;
1211

1312
import java.io.InputStream;
14-
import java.sql.Connection;
1513
import java.sql.SQLException;
1614
import java.util.List;
1715

@@ -62,41 +60,83 @@ public boolean add(JobPo jobPo) {
6260

6361
@Override
6462
public JobPo take(final String taskTrackerNodeGroup, final String taskTrackerIdentity) {
65-
return getSqlTemplate().executeInTransaction(new SqlExecutor<JobPo>() {
66-
@Override
67-
public JobPo run(Connection conn) throws SQLException {
68-
Long now = System.currentTimeMillis();
69-
// select for update
70-
String selectForUpdateSql = "SELECT *" +
71-
" FROM `{tableName}` " +
72-
" WHERE is_running = ? " +
73-
" AND `trigger_time` < ? " +
74-
" ORDER BY `trigger_time` ASC, `priority` ASC, `gmt_created` ASC " +
75-
" LIMIT 0, 1 FOR UPDATE";
76-
Object[] selectParams = new Object[]{false, now};
77-
JobPo jobPo = getSqlTemplate().query(conn, getRealSql(selectForUpdateSql, taskTrackerNodeGroup),
78-
ResultSetHandlerHolder.JOB_PO_RESULT_SET_HANDLER, selectParams
79-
);
80-
if (jobPo != null) {
81-
String updateSql = "UPDATE `{tableName}` SET " +
82-
"`is_running` = ?, " +
83-
"`task_tracker_identity` = ?, " +
84-
"`gmt_modified` = ?," +
85-
"`prev_exe_time` = ? " +
86-
" WHERE job_id = ?";
87-
Object[] params = new Object[]{
88-
true, taskTrackerIdentity, now, now, jobPo.getJobId()
89-
};
90-
getSqlTemplate().update(conn, getRealSql(updateSql, taskTrackerNodeGroup), params);
63+
/**
64+
* 这里从SELECT FOR UPDATE 优化为 CAS 乐观锁
65+
*/
66+
Long now = DateUtils.currentTimeMillis();
67+
String selectSql = "SELECT *" +
68+
" FROM `{tableName}` " +
69+
" WHERE is_running = ? " +
70+
" AND `trigger_time` < ? " +
71+
" ORDER BY `trigger_time` ASC, `priority` ASC, `gmt_created` ASC " +
72+
" LIMIT 0, 1";
73+
Object[] selectParams = new Object[]{false, now};
9174

92-
jobPo.setIsRunning(true);
93-
jobPo.setTaskTrackerIdentity(taskTrackerIdentity);
94-
jobPo.setGmtModified(now);
95-
jobPo.setPrevExeTime(now);
96-
}
75+
try {
76+
JobPo jobPo = getSqlTemplate().query(getRealSql(selectSql, taskTrackerNodeGroup),
77+
ResultSetHandlerHolder.JOB_PO_RESULT_SET_HANDLER, selectParams);
78+
if (jobPo == null) {
79+
return null;
80+
}
81+
82+
String updateSql = "UPDATE `{tableName}` SET " +
83+
"`is_running` = ?, " +
84+
"`task_tracker_identity` = ?, " +
85+
"`gmt_modified` = ?," +
86+
"`prev_exe_time` = ? " +
87+
" WHERE job_id = ? AND is_running = ?";
88+
Object[] params = new Object[]{
89+
true, taskTrackerIdentity, now, now, jobPo.getJobId(), false
90+
};
91+
// 返回影响的行数
92+
int affectedRow = getSqlTemplate().update(getRealSql(updateSql, taskTrackerNodeGroup), params);
93+
if (affectedRow == 0) {
94+
return take(taskTrackerNodeGroup, taskTrackerIdentity);
95+
} else {
96+
jobPo.setIsRunning(true);
97+
jobPo.setTaskTrackerIdentity(taskTrackerIdentity);
98+
jobPo.setGmtModified(now);
99+
jobPo.setPrevExeTime(now);
97100
return jobPo;
98101
}
99-
});
102+
} catch (SQLException e) {
103+
throw new JobQueueException(e);
104+
}
105+
// return getSqlTemplate().executeInTransaction(new SqlExecutor<JobPo>() {
106+
// @Override
107+
// public JobPo run(Connection conn) throws SQLException {
108+
// Long now = System.currentTimeMillis();
109+
// // select for update
110+
// String selectForUpdateSql = "SELECT *" +
111+
// " FROM `{tableName}` " +
112+
// " WHERE is_running = ? " +
113+
// " AND `trigger_time` < ? " +
114+
// " ORDER BY `trigger_time` ASC, `priority` ASC, `gmt_created` ASC " +
115+
// " LIMIT 0, 1 FOR UPDATE";
116+
// Object[] selectParams = new Object[]{false, now};
117+
// JobPo jobPo = getSqlTemplate().query(conn, getRealSql(selectForUpdateSql, taskTrackerNodeGroup),
118+
// ResultSetHandlerHolder.JOB_PO_RESULT_SET_HANDLER, selectParams
119+
// );
120+
// if (jobPo != null) {
121+
// String updateSql = "UPDATE `{tableName}` SET " +
122+
// "`is_running` = ?, " +
123+
// "`task_tracker_identity` = ?, " +
124+
// "`gmt_modified` = ?," +
125+
// "`prev_exe_time` = ? " +
126+
// " WHERE job_id = ?";
127+
// Object[] params = new Object[]{
128+
// true, taskTrackerIdentity, now, now, jobPo.getJobId()
129+
// };
130+
// getSqlTemplate().update(conn, getRealSql(updateSql, taskTrackerNodeGroup), params);
131+
//
132+
// jobPo.setIsRunning(true);
133+
// jobPo.setTaskTrackerIdentity(taskTrackerIdentity);
134+
// jobPo.setGmtModified(now);
135+
// jobPo.setPrevExeTime(now);
136+
// }
137+
// return jobPo;
138+
// }
139+
// });
100140
}
101141

102142
@Override

0 commit comments

Comments
 (0)