@@ -32,14 +32,15 @@ import org.apache.flink.streaming.api.datastream.DataStreamSink
32
32
import org .apache .flink .streaming .api .functions .sink .{SinkFunction , TwoPhaseCommitSinkFunction }
33
33
import org .apache .flink .streaming .api .scala .DataStream
34
34
import org .apache .flink .streaming .connectors .redis .common .config .{FlinkJedisConfigBase , FlinkJedisPoolConfig , FlinkJedisSentinelConfig }
35
- import org .apache .flink .streaming .connectors .redis .common .container .{RedisContainer => RContainer }
35
+ import org .apache .flink .streaming .connectors .redis .common .container .{RedisContainer => BahirRedisContainer }
36
36
import org .apache .flink .streaming .connectors .redis .common .mapper .RedisCommand ._
37
- import org .apache .flink .streaming .connectors .redis .common .mapper .{RedisCommand , RedisCommandDescription , RedisMapper => RMapper }
38
- import org .apache .flink .streaming .connectors .redis .{RedisSink => RSink }
37
+ import org .apache .flink .streaming .connectors .redis .common .mapper .{RedisCommand , RedisCommandDescription , RedisMapper => BahirRedisMapper }
38
+ import org .apache .flink .streaming .connectors .redis .{RedisSink => BahirRedisSink }
39
39
import redis .clients .jedis .exceptions .JedisException
40
40
import redis .clients .jedis .{Jedis , JedisPool , JedisSentinelPool }
41
41
42
42
import java .io .IOException
43
+ import java .lang .reflect .Field
43
44
import java .util
44
45
import java .util .Properties
45
46
import scala .annotation .meta .param
@@ -65,67 +66,90 @@ class RedisSink(@(transient@param) ctx: StreamingContext,
65
66
66
67
val enableCheckpoint : Boolean = ctx.parameter.toMap.getOrElse(KEY_FLINK_CHECKPOINTS_ENABLE , " false" ).toBoolean
67
68
68
- val cpMode : CheckpointingMode = Try (CheckpointingMode .valueOf(ctx.parameter.toMap.get(KEY_FLINK_CHECKPOINTS_MODE ))).getOrElse(CheckpointingMode .AT_LEAST_ONCE )
69
+ val cpMode : CheckpointingMode = Try (
70
+ CheckpointingMode .valueOf(ctx.parameter.toMap.get(KEY_FLINK_CHECKPOINTS_MODE ))
71
+ ).getOrElse(CheckpointingMode .AT_LEAST_ONCE )
72
+
69
73
70
74
lazy val config : FlinkJedisConfigBase = {
71
75
val map : util.Map [String , String ] = ctx.parameter.toMap
72
76
val redisConf = ConfigUtils .getConf(map, REDIS_PREFIX )
73
77
val connectType : String = Try (redisConf.remove(REDIS_CONNECT_TYPE ).toString).getOrElse(DEFAULT_REDIS_CONNECT_TYPE )
74
78
Utils .copyProperties(property, redisConf)
79
+
75
80
val host : String = redisConf.remove(KEY_HOST ) match {
76
81
case null => throw new IllegalArgumentException (" redis host must not null" )
77
82
case hostStr => hostStr.toString
78
83
}
84
+
79
85
val port : Int = redisConf.remove(KEY_PORT ) match {
80
86
case null => 6379
81
87
case portStr => portStr.toString.toInt
82
88
}
89
+
90
+ def setFieldValue (field : Field , targetObject : Any , value : String ): Unit = {
91
+ field.setAccessible(true )
92
+ field.getType.getSimpleName match {
93
+ case " String" => field.set(targetObject, value)
94
+ case " int" | " Integer" => field.set(targetObject, value.toInt)
95
+ case " long" | " Long" => field.set(targetObject, value.toLong)
96
+ case " boolean" | " Boolean" => field.set(targetObject, value.toBoolean)
97
+ case _ =>
98
+ }
99
+ }
100
+
83
101
connectType match {
84
- case " sentinel" => {
102
+
103
+ case " sentinel" =>
85
104
val sentinels : Set [String ] = host.split(SIGN_COMMA ).map(x => {
86
- if (! x.contains(SIGN_COLON )) {
105
+ if (x.contains(SIGN_COLON )) x; else {
87
106
throw new IllegalArgumentException (s " redis sentinel host invalid { $x} must match host:port " )
88
107
}
89
- x
90
108
}).toSet
91
- val builder : FlinkJedisSentinelConfig . Builder = new FlinkJedisSentinelConfig .Builder ().setSentinels(sentinels)
109
+ val builder = new FlinkJedisSentinelConfig .Builder ().setSentinels(sentinels)
92
110
redisConf.foreach(x => {
93
- val field = Try (Option (builder.getClass.getDeclaredField(x._1))).getOrElse(None ) match {
94
- case Some (x) => x
95
- case None => throw new IllegalArgumentException (s " redis config error,property: ${
96
- x._1
97
- } invalid,init FlinkJedisSentinelConfig err " )
98
- }
99
- field.setAccessible(true )
100
- field.getType.getSimpleName match {
101
- case " String" => field.set(builder, x._2)
102
- case " int" | " Integer" => field.set(builder, x._2.toInt)
103
- case " long" | " Long" => field.set(builder, x._2.toLong)
104
- case " boolean" | " Boolean" => field.set(builder, x._2.toBoolean)
105
- case _ =>
111
+ val field = Try (builder.getClass.getDeclaredField(x._1)).getOrElse {
112
+ throw new IllegalArgumentException (
113
+ s """
114
+ |redis config error,property: ${x._1} invalid,init FlinkJedisSentinelConfig error, property options:
115
+ |<String masterName>,
116
+ |<Set<String> sentinels>,
117
+ |<int connectionTimeout>,
118
+ |<int soTimeout>,
119
+ |<String password>,
120
+ |<int database>,
121
+ |<int maxTotal>,
122
+ |<int maxIdle>,
123
+ |<int minIdle>
124
+ | """ .stripMargin)
106
125
}
126
+ setFieldValue(field, builder, x._2)
107
127
})
108
128
builder.build()
109
- }
110
- case DEFAULT_REDIS_CONNECT_TYPE => {
129
+
130
+ case DEFAULT_REDIS_CONNECT_TYPE =>
111
131
val builder : FlinkJedisPoolConfig .Builder = new FlinkJedisPoolConfig .Builder ().setHost(host).setPort(port)
112
132
redisConf.foreach(x => {
113
- val field = Try (Option (builder.getClass.getDeclaredField(x._1))).getOrElse(None ) match {
114
- case Some (x) => x
115
- case None => throw new IllegalArgumentException (s " redis config error,property: ${x._1} invalid,init FlinkJedisPoolConfig err " )
116
- }
117
- field.setAccessible(true )
118
- field.getType.getSimpleName match {
119
- case " String" => field.set(builder, x._2)
120
- case " int" | " Integer" => field.set(builder, x._2.toInt)
121
- case " long" | " Long" => field.set(builder, x._2.toLong)
122
- case " boolean" | " Boolean" => field.set(builder, x._2.toBoolean)
123
- case _ =>
133
+ val field = Try (builder.getClass.getDeclaredField(x._1)).getOrElse {
134
+ throw new IllegalArgumentException (
135
+ s """
136
+ |redis config error,property: ${x._1} invalid,init FlinkJedisPoolConfig error,property options:
137
+ |<String host>,
138
+ |<int port>,
139
+ |<int timeout>,
140
+ |<int database>,
141
+ |<String password>,
142
+ |<int maxTotal>,
143
+ |<int maxIdle>,
144
+ |<int minIdle>
145
+ | """ .stripMargin)
124
146
}
147
+ setFieldValue(field, builder, x._2)
125
148
})
149
+
126
150
builder.build()
127
- }
128
- case _ => throw throw new IllegalArgumentException (s " redis connectType must be jedispool |sentinel|cluster $connectType" )
151
+
152
+ case _ => throw throw new IllegalArgumentException (s " redis connectType must be jedisPool |sentinel|cluster $connectType" )
129
153
}
130
154
}
131
155
@@ -142,7 +166,7 @@ class RedisSink(@(transient@param) ctx: StreamingContext,
142
166
}
143
167
144
168
145
- class RedisSinkFunction [T ](jedisConfig : FlinkJedisConfigBase , mapper : RedisMapper [T ], ttl : Int ) extends RSink [T ](jedisConfig, mapper) with Logger {
169
+ class RedisSinkFunction [T ](jedisConfig : FlinkJedisConfigBase , mapper : RedisMapper [T ], ttl : Int ) extends BahirRedisSink [T ](jedisConfig, mapper) with Logger {
146
170
147
171
private [this ] var redisContainer : RedisContainer = _
148
172
@@ -235,31 +259,29 @@ object RedisContainer extends Logger {
235
259
genericObjectPoolConfig.setMaxTotal(jedisConfig.getMaxTotal)
236
260
genericObjectPoolConfig.setMinIdle(jedisConfig.getMinIdle)
237
261
try {
238
- var redisContaineInnerr : RContainer = null
239
- if (jedisConfig.isInstanceOf [FlinkJedisPoolConfig ]) {
240
- val jedisPoolConfig = jedisConfig.asInstanceOf [FlinkJedisPoolConfig ]
241
- val jedisPool = new JedisPool (
242
- genericObjectPoolConfig,
243
- jedisPoolConfig.getHost,
244
- jedisPoolConfig.getPort,
245
- jedisPoolConfig.getConnectionTimeout,
246
- jedisPoolConfig.getPassword,
247
- jedisPoolConfig.getDatabase
248
- )
249
- redisContaineInnerr = new RContainer (jedisPool)
250
- }
251
- else {
252
- val jedissentinelconfig : FlinkJedisSentinelConfig = jedisConfig.asInstanceOf [FlinkJedisSentinelConfig ]
253
- val jedisSentinelPool : JedisSentinelPool = new JedisSentinelPool (jedissentinelconfig.getMasterName(),
254
- jedissentinelconfig.getSentinels(),
255
- genericObjectPoolConfig,
256
- jedissentinelconfig.getSoTimeout(),
257
- jedissentinelconfig.getPassword(),
258
- jedissentinelconfig.getDatabase()
259
- )
260
- redisContaineInnerr = new RContainer (jedisSentinelPool)
262
+ val bahirRedisContainer = jedisConfig match {
263
+ case jedisPoolConfig : FlinkJedisPoolConfig =>
264
+ val jedisPool = new JedisPool (
265
+ genericObjectPoolConfig,
266
+ jedisPoolConfig.getHost,
267
+ jedisPoolConfig.getPort,
268
+ jedisPoolConfig.getConnectionTimeout,
269
+ jedisPoolConfig.getPassword,
270
+ jedisPoolConfig.getDatabase
271
+ )
272
+ new BahirRedisContainer (jedisPool)
273
+ case _ =>
274
+ val jedisSentinelConfig = jedisConfig.asInstanceOf [FlinkJedisSentinelConfig ]
275
+ val jedisSentinelPool = new JedisSentinelPool (jedisSentinelConfig.getMasterName,
276
+ jedisSentinelConfig.getSentinels,
277
+ genericObjectPoolConfig,
278
+ jedisSentinelConfig.getSoTimeout,
279
+ jedisSentinelConfig.getPassword,
280
+ jedisSentinelConfig.getDatabase
281
+ )
282
+ new BahirRedisContainer (jedisSentinelPool)
261
283
}
262
- val redisContainer = new RedisContainer (redisContaineInnerr )
284
+ val redisContainer = new RedisContainer (bahirRedisContainer )
263
285
redisContainer.open()
264
286
redisContainer
265
287
} catch {
@@ -271,16 +293,16 @@ object RedisContainer extends Logger {
271
293
272
294
}
273
295
274
- class RedisContainer (reContainer : RContainer ) {
296
+ class RedisContainer (container : BahirRedisContainer ) {
275
297
276
298
def open (): Unit = {
277
- reContainer .open()
299
+ container .open()
278
300
}
279
301
280
302
lazy val jedis : Jedis = {
281
- val method = reContainer .getClass.getDeclaredMethod(" getInstance" )
303
+ val method = container .getClass.getDeclaredMethod(" getInstance" )
282
304
method.setAccessible(true )
283
- method.invoke(reContainer ).asInstanceOf [Jedis ]
305
+ method.invoke(container ).asInstanceOf [Jedis ]
284
306
}
285
307
286
308
def invoke [T ](mapper : RedisMapper [T ], input : T , transaction : Option [redis.clients.jedis.Transaction ]): Unit = {
@@ -289,39 +311,39 @@ class RedisContainer(reContainer: RContainer) {
289
311
mapper.getCommandDescription.getCommand match {
290
312
case RPUSH => transaction match {
291
313
case Some (t) => t.rpush(key, value)
292
- case _ => this .reContainer .rpush(key, value)
314
+ case _ => this .container .rpush(key, value)
293
315
}
294
316
case LPUSH => transaction match {
295
317
case Some (t) => t.lpush(key, value)
296
- case _ => this .reContainer .lpush(key, value)
318
+ case _ => this .container .lpush(key, value)
297
319
}
298
320
case SADD => transaction match {
299
321
case Some (t) => t.sadd(key, value)
300
- case _ => this .reContainer .sadd(key, value)
322
+ case _ => this .container .sadd(key, value)
301
323
}
302
324
case SET => transaction match {
303
325
case Some (t) => t.set(key, value)
304
- case _ => this .reContainer .set(key, value)
326
+ case _ => this .container .set(key, value)
305
327
}
306
328
case PFADD => transaction match {
307
329
case Some (t) => t.pfadd(key, value)
308
- case _ => this .reContainer .pfadd(key, value)
330
+ case _ => this .container .pfadd(key, value)
309
331
}
310
332
case PUBLISH => transaction match {
311
333
case Some (t) => t.publish(key, value)
312
- case _ => this .reContainer .publish(key, value)
334
+ case _ => this .container .publish(key, value)
313
335
}
314
336
case ZADD => transaction match {
315
337
case Some (t) => t.zadd(mapper.getCommandDescription.getAdditionalKey, value.toDouble, key)
316
- case _ => this .reContainer .zadd(mapper.getCommandDescription.getAdditionalKey, value, key)
338
+ case _ => this .container .zadd(mapper.getCommandDescription.getAdditionalKey, value, key)
317
339
}
318
340
case ZREM => transaction match {
319
341
case Some (t) => t.zrem(mapper.getCommandDescription.getAdditionalKey, key)
320
- case _ => this .reContainer .zrem(mapper.getCommandDescription.getAdditionalKey, key)
342
+ case _ => this .container .zrem(mapper.getCommandDescription.getAdditionalKey, key)
321
343
}
322
344
case HSET => transaction match {
323
345
case Some (t) => t.hset(mapper.getCommandDescription.getAdditionalKey, key, value)
324
- case _ => this .reContainer .hset(mapper.getCommandDescription.getAdditionalKey, key, value)
346
+ case _ => this .container .hset(mapper.getCommandDescription.getAdditionalKey, key, value)
325
347
}
326
348
case other => throw new IllegalArgumentException (" [StreamX] RedisSink:Cannot process such data type: " + other)
327
349
}
@@ -335,13 +357,13 @@ class RedisContainer(reContainer: RContainer) {
335
357
}
336
358
337
359
def close (): Unit = {
338
- reContainer .close()
360
+ container .close()
339
361
}
340
362
341
363
}
342
364
343
365
344
- case class RedisMapper [T ](cmd : RedisCommand , additionalKey : String , key : T => String , value : T => String ) extends RMapper [T ] {
366
+ case class RedisMapper [T ](cmd : RedisCommand , additionalKey : String , key : T => String , value : T => String ) extends BahirRedisMapper [T ] {
345
367
346
368
override def getCommandDescription : RedisCommandDescription = new RedisCommandDescription (cmd, additionalKey)
347
369
0 commit comments