diff --git a/.gitignore b/.gitignore
index e611b44..31fa6a6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -12,4 +12,6 @@
hs_err_pid*
.idea/*
-*.iml
\ No newline at end of file
+*.iml
+target
+target/*
diff --git a/pom.xml b/pom.xml
index b8aa95a..c2ca6c2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -8,6 +8,11 @@
connect-mongodb
1.0
+
+ 0.10.0.0
+ 3.2.1
+
+
@@ -16,8 +21,8 @@
3.3
true
- 1.7
- 1.7
+ 1.8
+ 1.8
@@ -46,19 +51,25 @@
org.mongodb
mongodb-driver
- 3.2.1
+ ${mongodb.version}
org.apache.kafka
connect-api
- 0.9.0.0
- compile
+ ${kafka.version}
+ provided
+
+
+ org.apache.kafka
+ kafka-clients
+ ${kafka.version}
+ provided
org.slf4j
slf4j-api
1.7.6
- compile
+ provided
junit
diff --git a/src/main/java/org/apache/kafka/connect/mongodb/DatabaseReader.java b/src/main/java/org/apache/kafka/connect/mongodb/DatabaseReader.java
index ad4176c..9a24beb 100644
--- a/src/main/java/org/apache/kafka/connect/mongodb/DatabaseReader.java
+++ b/src/main/java/org/apache/kafka/connect/mongodb/DatabaseReader.java
@@ -2,6 +2,7 @@
import com.mongodb.CursorType;
import com.mongodb.MongoClient;
+import com.mongodb.ServerAddress;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
@@ -14,7 +15,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Arrays;
+import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.stream.Collectors;
/**
* Reads mutation from a mongodb database
@@ -23,8 +27,7 @@
*/
public class DatabaseReader implements Runnable {
Logger log = LoggerFactory.getLogger(DatabaseReader.class);
- private String host;
- private Integer port;
+ private String hosts;
private String db;
private String start;
@@ -33,9 +36,8 @@ public class DatabaseReader implements Runnable {
private MongoCollection oplog;
private Bson query;
- public DatabaseReader(String host, Integer port, String db, String start, ConcurrentLinkedQueue messages) {
- this.host = host;
- this.port = port;
+ public DatabaseReader(String hosts, String db, String start, ConcurrentLinkedQueue messages) {
+ this.hosts = hosts;
this.db = db;
this.start = start;
this.messages = messages;
@@ -81,7 +83,19 @@ private void init() {
* @return the oplog collection
*/
private MongoCollection readCollection() {
- MongoClient mongoClient = new MongoClient(host, port);
+ List addresses = Arrays.stream(hosts.split(",")).map(hostUrl -> {
+ try {
+ String[] hostAndPort = hostUrl.split(":");
+ String host = hostAndPort[0];
+ int port = Integer.parseInt(hostAndPort[1]);
+ return new ServerAddress(host, port);
+ } catch (ArrayIndexOutOfBoundsException aioobe) {
+ throw new ConnectException("hosts must be in host:port format");
+ } catch (NumberFormatException nfe) {
+ throw new ConnectException("port in the hosts field must be an integer");
+ }
+ }).collect(Collectors.toList());
+ MongoClient mongoClient = new MongoClient(addresses);
MongoDatabase db = mongoClient.getDatabase("local");
return db.getCollection("oplog.rs");
}
diff --git a/src/main/java/org/apache/kafka/connect/mongodb/MongodbReader.java b/src/main/java/org/apache/kafka/connect/mongodb/MongoDBReader.java
similarity index 72%
rename from src/main/java/org/apache/kafka/connect/mongodb/MongodbReader.java
rename to src/main/java/org/apache/kafka/connect/mongodb/MongoDBReader.java
index 46d728c..435474d 100644
--- a/src/main/java/org/apache/kafka/connect/mongodb/MongodbReader.java
+++ b/src/main/java/org/apache/kafka/connect/mongodb/MongoDBReader.java
@@ -15,19 +15,17 @@
*
* @author Andrea Patelli
*/
-public class MongodbReader {
- private static final Logger log = LoggerFactory.getLogger(MongodbReader.class);
+public class MongoDBReader {
+ private static final Logger log = LoggerFactory.getLogger(MongoDBReader.class);
protected ConcurrentLinkedQueue messages;
private List dbs;
- private String host;
- private Integer port;
+ private String hosts;
private Map