
Commit cb4842f

Author: cuidapeng
Message: first commit
0 parents · commit cb4842f

8 files changed: +142 -0 lines changed

.dockerignore

Lines changed: 2 additions & 0 deletions

```
.idea
.target
```

.gitignore

Lines changed: 4 additions & 0 deletions

```
.idea
src
project

```
Dockerfile

Lines changed: 4 additions & 0 deletions

```dockerfile
FROM gettyimages/spark:2.3.0-hadoop-2.8
RUN cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
ADD jars /usr/spark-2.3.0/jars
ADD target/scala-2.11/ /usr/spark-2.3.0/
```

README.md

Lines changed: 51 additions & 0 deletions

### Use Spark SQL to load/store offsets with MySQL

Less complicated than a hand-rolled implementation of the SQL operations.

#### Offset store

```sql
mysql> select * from kfk_offset where datetime>'2019-01-09' and topic='task-response' and `group`='extract';
+----+---------------+---------+------+-----------+---------+---------+-------+---------------------+
| id | topic         | group   | step | partition | from    | until   | count | datetime            |
+----+---------------+---------+------+-----------+---------+---------+-------+---------------------+
|  1 | task-response | extract |    1 |         0 | 1959008 | 1995008 | 36000 | 2019-01-09 00:01:19 |
|  2 | task-response | extract |    1 |         1 | 1897546 | 1933546 | 36000 | 2019-01-09 00:01:19 |
|  0 | task-response | extract |    1 |         2 | 1876072 | 1912072 | 36000 | 2019-01-09 00:01:19 |
|  5 | task-response | extract |    2 |         0 | 1995008 | 2031008 | 36000 | 2019-01-09 00:05:05 |
|  7 | task-response | extract |    2 |         1 | 1933546 | 1969546 | 36000 | 2019-01-09 00:05:05 |
|  6 | task-response | extract |    2 |         2 | 1912072 | 1948072 | 36000 | 2019-01-09 00:05:05 |
+----+---------------+---------+------+-----------+---------+---------+-------+---------------------+
```

Each batch writes one row per partition at an incremented `step`; note how each partition's `until` at step 1 becomes its `from` at step 2.
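As a rough illustration of the load side, here is a minimal Scala sketch of reading these rows back and seeding a direct stream with them. It is not this repo's `com.github.cclient.spark.Stream` code: the JDBC URL, `ssc`, and `kafkaParams` are assumptions, and it uses plain JDBC against the `kfk_offset` table from doc/kfk_offset.sql.

```scala
import java.sql.DriverManager
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

// Hypothetical connection string; adjust to your environment.
val jdbcUrl = "jdbc:mysql://mysql:3306/offsets?user=spark&password=secret"

// Load each partition's latest stored `until` offset for a topic/group,
// i.e. the rows at the highest `step`.
def loadOffsets(topic: String, group: String): Map[TopicPartition, Long] = {
  val conn = DriverManager.getConnection(jdbcUrl)
  try {
    val ps = conn.prepareStatement(
      """SELECT `partition`, `until` FROM kfk_offset
        |WHERE `topic` = ? AND `group` = ?
        |  AND `step` = (SELECT MAX(`step`) FROM kfk_offset
        |                WHERE `topic` = ? AND `group` = ?)""".stripMargin)
    Seq(1, 3).foreach(ps.setString(_, topic))
    Seq(2, 4).foreach(ps.setString(_, group))
    val rs = ps.executeQuery()
    val acc = scala.collection.mutable.Map.empty[TopicPartition, Long]
    while (rs.next()) acc += new TopicPartition(topic, rs.getInt(1)) -> rs.getLong(2)
    acc.toMap
  } finally conn.close()
}

// Seed the direct stream so a restart (or a rollback that deleted later
// steps) resumes exactly where the table says. `ssc` is an existing
// StreamingContext and `kafkaParams` the usual consumer config (assumed).
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](
    Seq("task-response"), kafkaParams, loadOffsets("task-response", "extract")))
```

Seeding `Subscribe` with an offset map is standard spark-streaming-kafka-0-10 usage; the only project-specific piece is where the map comes from.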
For my use case (extracting DOM/JSON from crawler responses), I need to roll back to the problem datetime and re-consume the records after it.

### Rollback

1. Kill the Spark consumer process.

2. Pinpoint the problem datetime, then delete the offset rows by datetime or step:

   ```sql
   delete from kfk_offset where `step`>1 and `topic`='task-response' and `group`='extract';
   ```

3. Restart the Spark consumer process. It resumes from the highest `step` left in the table, so consumption rewinds to the end of the last kept batch.
### Use

#### Develop

Copy the source code into your project, or copy spark-streaming-kafka-offset-mysql_2.11-0.1.jar into {project}/lib/.

#### Deploy

Run `sbt package`, then copy spark-streaming-kafka-offset-mysql_2.11-0.1.jar into $SPARK_HOME/jars/.

### Other

Upload the jar to a Maven repository to consume it as a regular dependency.
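Once published, consuming it would be a single sbt line. The coordinates below are hypothetical, guessed from the artifact name and the `com.github.cclient` package in deployment.yml; the README does not specify a group id.

```scala
// Hypothetical coordinates; not yet published to any public repository.
libraryDependencies += "com.github.cclient" %% "spark-streaming-kafka-offset-mysql" % "0.1"
```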

build.sbt

Lines changed: 17 additions & 0 deletions

```scala
name := "spark-streaming-kafka-offset-mysql"

version := "0.1"

scalaVersion := "2.11.8"

resolvers += "Local Maven Repository" at "file://" + Path.userHome.absolutePath + "/.m2/repository"

libraryDependencies ++= Seq(
  "org.scalactic" %% "scalactic" % "3.0.4" % "test",
  "org.scalatest" %% "scalatest" % "3.0.4" % "test",
  "mysql" % "mysql-connector-java" % "5.1.44",
  "org.apache.spark" % "spark-core_2.11" % "2.3.0",
  "org.apache.spark" % "spark-sql_2.11" % "2.3.0",
  "org.apache.spark" % "spark-streaming_2.11" % "2.3.0",
  "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.3.0"
)
```

build.sh

Lines changed: 4 additions & 0 deletions

```sh
#!/bin/sh
sbt package
docker build -t cuidapeng/spark-offset:$1 ./
docker push cuidapeng/spark-offset:$1
```

deployment.yml

Lines changed: 47 additions & 0 deletions

```yaml
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: spark-offset
  namespace: default
  labels:
    app: spark-offset
spec:
  replicas: 1
  selector:
    matchLabels:
      app: spark-offset
  template:
    metadata:
      labels:
        app: spark-offset
    spec:
      containers:
        - name: spark-offset
          imagePullPolicy: Always
          image: cuidapeng/spark-offset:v0.1
          command:
            - spark-submit
          args:
            - --driver-java-options
            - "-Duser.timezone=Asia/Shanghai"
            - --repositories
            - http://maven.aliyun.com/nexus/content/groups/public/
            - --packages
            - mysql:mysql-connector-java:5.1.27,org.apache.spark:spark-streaming_2.11:2.3.0,org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.0
            - --executor-memory
            - 2G
            - --driver-memory
            - 2G
            - --conf
            - "spark.streaming.kafka.maxRatePerPartition=100"
            - --class
            - com.github.cclient.spark.Stream
            - spark-streaming-kafka-offset-mysql_2.11-0.1.jar
            - prod
            - task-response
            - extract
          resources:
            requests:
              memory: "2Gi"
            limits:
              memory: "3Gi"
```

doc/kfk_offset.sql

Lines changed: 13 additions & 0 deletions

```sql
CREATE TABLE `kfk_offset` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `topic` varchar(45) NOT NULL,
  `group` varchar(45) NOT NULL,
  `step` int(11) NOT NULL DEFAULT '0',
  `partition` int(11) NOT NULL,
  `from` bigint(10) NOT NULL,
  `until` bigint(10) NOT NULL,
  `count` bigint(10) NOT NULL DEFAULT '0',
  `datetime` datetime NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `unique` (`topic`,`group`,`step`,`partition`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
```
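For the write side, here is a matching hedged sketch that persists one row per partition after each batch. The `step` counter semantics, JDBC URL, and `stream` (the DStream from the README sketch) are assumptions based on this schema and the README's sample output, not the repository's actual implementation.

```scala
import java.sql.DriverManager
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}

val jdbcUrl = "jdbc:mysql://mysql:3306/offsets?user=spark&password=secret" // hypothetical

// Insert one kfk_offset row per partition for this batch; the unique key
// (topic, group, step, partition) rejects accidental double-writes.
def storeOffsets(topic: String, group: String, step: Int,
                 ranges: Array[OffsetRange]): Unit = {
  val conn = DriverManager.getConnection(jdbcUrl)
  try {
    val ps = conn.prepareStatement(
      """INSERT INTO kfk_offset
        |  (`topic`, `group`, `step`, `partition`, `from`, `until`, `count`, `datetime`)
        |VALUES (?, ?, ?, ?, ?, ?, ?, NOW())""".stripMargin)
    ranges.foreach { r =>
      ps.setString(1, topic); ps.setString(2, group)
      ps.setInt(3, step); ps.setInt(4, r.partition)
      ps.setLong(5, r.fromOffset); ps.setLong(6, r.untilOffset)
      ps.setLong(7, r.count) // untilOffset - fromOffset
      ps.addBatch()
    }
    ps.executeBatch()
  } finally conn.close()
}

// Typical wiring: read the ranges before any transformation, process,
// then record the new step. foreachRDD runs on the driver, so plain
// JDBC and the mutable counter are safe here.
var step = 1 // a real run would resume from MAX(`step`) + 1 after restart (assumption)
stream.foreachRDD { rdd =>
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... extract / process the batch here ...
  storeOffsets("task-response", "extract", step, ranges)
  step += 1
}
```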
