diff --git a/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/ORListener.java b/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/ORListener.java index 32109ea0..f5990bab 100644 --- a/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/ORListener.java +++ b/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/ORListener.java @@ -389,7 +389,8 @@ private void startSource(String newTableName, long newTableId) if (null == srcId) { - throw new DatabusRuntimeException("Could not find a matching logical source for table Uri (" + _currTableName + ")" ); + _log.info("Could not find a matching logical source for table Uri (" + _currTableName + ")"); + return; } assert(_transaction != null); _perSourceTransaction = new PerSourceTransaction(srcId); @@ -403,14 +404,7 @@ private void startSource(String newTableName, long newTableId) private void endSource() { - if (_perSourceTransaction != null) - { _perSourceTransaction = null; - } - else - { - throw new DatabusRuntimeException("_perSourceTransaction should not be null in endSource()"); - } } private void deleteRows(DeleteRowsEventV2 dre) @@ -467,6 +461,11 @@ private void frameAvroRecord(BinlogEventV4Header bh, List rl, final DbusOpc final long scn = scn(_currFileNum, (int)bh.getPosition()); final boolean isReplicated = false; VersionedSchema vs = _schemaRegistryService.fetchLatestVersionedSchemaBySourceName(_tableUriToSrcNameMap.get(_currTableName)); + + if(vs == null) { + // There's nothing to do for this table, we're not listening to it + return; + } Schema schema = vs.getSchema(); if ( _log.isDebugEnabled()) diff --git a/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/OpenReplicatorEventProducer.java b/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/OpenReplicatorEventProducer.java index 91946d7c..d6696dbf 100644 --- a/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/OpenReplicatorEventProducer.java +++ b/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/OpenReplicatorEventProducer.java @@ -21,6 +21,10 @@ import java.lang.management.ManagementFactory; import java.net.URI; import java.net.URISyntaxException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -297,11 +301,60 @@ public synchronized void start(long sinceSCN) } } } + + // If we get to here and the SCN is still 0 or -1 + // go get the current position from the DB + if(sinceSCNToUse <= 0) { + sinceSCNToUse = getCurrentMaxSCN(); + } _producerThread = new EventProducerThread(_physicalSourceName, sinceSCNToUse); _producerThread.start(); } - public class EventProducerThread extends DatabusThreadBase implements TransactionProcessor + /** + * Gets the current Max SCN from the DB. Used when there isn't one specified + * @return + */ + private long getCurrentMaxSCN() { + Connection connection = null; + try { + Class.forName("com.mysql.jdbc.Driver").newInstance(); + connection = DriverManager.getConnection("jdbc:mysql://"+ _or.getHost() + ":" + _or.getPort() + "/?" + + "user="+ _or.getUser() + "&password=" + _or.getPassword()); + ResultSet rs = connection.prepareStatement("show master status").executeQuery(); + String filename = null; + int position = 4; + if(rs.next()) + { + filename = rs.getString(1); + position = rs.getInt(2); + } + rs.close(); + int fileId = Integer.parseInt(filename.split("\\.")[1]); + return ORListener.scn(fileId, position); + + } + catch (Exception e) + { + _log.error("Unable to load mysql driver to get SCN:" + e.getMessage(), e); + return 0; + } + finally + { + if (connection != null) + { + try { + connection.close(); + } + catch (SQLException e) + { + _log.error("Unable to close mysql connection" + e.getMessage(), e); + } + } + } +} + +public class EventProducerThread extends DatabusThreadBase implements TransactionProcessor { // The scn with which the event buffer is started private final AtomicLong _startPrevScn = new AtomicLong(-1); diff --git a/subprojects.gradle b/subprojects.gradle index a919b467..3fc7c10d 100644 --- a/subprojects.gradle +++ b/subprojects.gradle @@ -9,7 +9,7 @@ File getEnvironmentScript() apply from: environmentScript -project.version = "2.0.0" +project.version = "2.0.2-sovrn" @@ -53,7 +53,7 @@ ext.externalDependency = [ if (isDefaultEnvironment) { externalDependency['mysqlConnectorJava'] = 'mysql:mysql-connector-java:5.1.14' externalDependency['helixCore'] = 'org.apache.helix:helix-core:0.6.2-incubating' - externalDependency['or'] = 'com.google:open-replicator:1.0.5' + externalDependency['or'] = 'net.sovrn:open-replicator:1.0.6' externalDependency['log4j'] = 'org.slf4j:slf4j-log4j12:1.6.1' }