
Commit 319096c

Merge pull request #1 from linkedin/master

pull newest

2 parents b775aeb + d11e934

File tree

26 files changed: +1138 -246 lines (only 7 of the changed files appear in this excerpt)

README.md

+1 -1

@@ -3,7 +3,7 @@
 
 [![Join the chat at https://gitter.im/linkedin/databus](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/linkedin/databus?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
 
-In Internet architectures, data systems are typically categorized into source-of-truth systems that serve as primary stores for the user-generated writes, and derived data stores or indexes which serve reads and other complex queries. The data in these secondary stores is often derived from the primary data through custom transformations, sometimes involving complex processing driven by business logic. Similarly data in caching tiers is derived from reads against the primary data store, but needs to get invalidated or refreshed when the primary data gets mutated. A fundamental requirement emerging from these kinds of data architectures is the need to reliably capture, flow and process primary data changes.
+In Internet architectures, data systems are typically categorized into source-of-truth systems that serve as primary stores for the user-generated writes, and derived data stores or indexes which serve reads and other complex queries. The data in these secondary stores is often derived from the primary data through custom transformations, sometimes involving complex processing driven by business logic. Similarly, data in caching tiers is derived from reads against the primary data store, but needs to get invalidated or refreshed when the primary data gets mutated. A fundamental requirement emerging from these kinds of data architectures is the need to reliably capture, flow and process primary data changes.
 
 We have built Databus, a source-agnostic distributed change data capture system, which is an integral part of LinkedIn's data processing pipeline. The Databus transport layer provides latencies in the low milliseconds and handles throughput of thousands of events per second per server while supporting infinite look back capabilities and rich subscription functionality.
 

databus-client/databus-client-http/src/main/java/com/linkedin/databus/client/netty/NettyHttpDatabusRelayConnection.java

+20 -3

@@ -23,11 +23,13 @@
 import java.io.InputStream;
 import java.nio.channels.Channels;
 import java.nio.channels.ClosedChannelException;
+import java.nio.charset.Charset;
 import java.util.Formatter;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.log4j.Logger;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.type.TypeReference;

@@ -51,6 +53,7 @@
 import com.linkedin.databus.core.DbusPrettyLogUtils;
 import com.linkedin.databus.core.async.ActorMessageQueue;
 import com.linkedin.databus.core.data_model.PhysicalPartition;
+import com.linkedin.databus.core.util.CompressUtil;
 import com.linkedin.databus.core.util.IdNamePair;
 import com.linkedin.databus.core.util.Range;
 import com.linkedin.databus2.core.container.DatabusHttpHeaders;

@@ -67,6 +70,7 @@ public class NettyHttpDatabusRelayConnection
 {
   public static final String MODULE = NettyHttpDatabusRelayConnection.class.getName();
   public static final Logger LOG = Logger.getLogger(MODULE);
+  public static final boolean needCompress = true;
 
   private static enum State
   {

@@ -239,7 +243,7 @@ private String createRegisterUrl()
       uriString.append("&sources=")
                .append(_sourcesSubsList);
     }
-
+    uriString.append("&").append(DatabusHttpHeaders.PROTOCOL_COMPRESS_PARAM).append("=").append(needCompress);
     final String url = uriString.toString();
     return url;
   }

@@ -710,6 +714,19 @@ public void finishResponse() throws Exception
     else
     {
       InputStream bodyStream = Channels.newInputStream(_decorated);
+      String bodyStr = IOUtils.toString(bodyStream, Charset.defaultCharset().name());
+      IOUtils.closeQuietly(bodyStream);
+      if (NettyHttpDatabusRelayConnection.needCompress)
+      {
+        try
+        {
+          bodyStr = CompressUtil.uncompress(bodyStr);
+        }
+        catch (Exception e) // decompression fails when the stream was not compressed
+        {
+        }
+      }
+
       ObjectMapper mapper = new ObjectMapper();
       int registerResponseVersion = 3; // either 2 or 3 would suffice here; we care only about 4
 

@@ -734,7 +751,7 @@ public void finishResponse() throws Exception
       if (registerResponseVersion == 4) // DDSDBUS-2009
       {
         HashMap<String, List<Object>> responseMap =
-            mapper.readValue(bodyStream, new TypeReference<HashMap<String, List<Object>>>() {});
+            mapper.readValue(bodyStr, new TypeReference<HashMap<String, List<Object>>>() {});
 
         // Look for mandatory SOURCE_SCHEMAS_KEY.
         Map<Long, List<RegisterResponseEntry>> sourcesSchemasMap = RegisterResponseEntry.createFromResponse(responseMap,

@@ -760,7 +777,7 @@ public void finishResponse() throws Exception
       else // version 2 or 3
       {
         List<RegisterResponseEntry> schemasList =
-            mapper.readValue(bodyStream, new TypeReference<List<RegisterResponseEntry>>() {});
+            mapper.readValue(bodyStr, new TypeReference<List<RegisterResponseEntry>>() {});
 
         Map<Long, List<RegisterResponseEntry>> sourcesSchemasMap = RegisterResponseEntry.convertSchemaListToMap(schemasList);
 
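
With needCompress hard-coded to true, every /register request now advertises compression. An illustrative request line (host and the other parameter values are hypothetical):

  GET /register?sources=13,14&protocolVersion=4&compress=true

Because the client cannot know whether the relay honored the parameter, finishResponse() reads the whole body into a string, optimistically runs it through CompressUtil.uncompress(), and silently keeps the raw body when decompression fails (e.g., against an older relay that ignores the parameter).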

databus-core/databus-core-container/src/main/java/com/linkedin/databus2/core/container/DatabusHttpHeaders.java

+1

@@ -51,6 +51,7 @@ public class DatabusHttpHeaders
 
   /** protocol version param name for /register request */
   public static final String PROTOCOL_VERSION_PARAM = "protocolVersion";
+  public static final String PROTOCOL_COMPRESS_PARAM = "compress";
 
   /** max event version - max DbusEvent version client can understand */
   public static final String MAX_EVENT_VERSION = "maxev";

databus-core/databus-core-impl/build.gradle

+2

@@ -16,6 +16,8 @@ dependencies {
   compile externalDependency.json
   compile externalDependency.log4j
   compile externalDependency.netty
+  compile externalDependency.c3p0
+  compile externalDependency.guava
 
   testCompile externalDependency.testng
   testCompile externalDependency.easymock
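
These two dependencies back the new code in this commit: guava supplies the BaseEncoding used by CompressUtil below, and c3p0 most likely provides connection pooling for the new MySQL-backed SCN handler (MysqlMaxSCNHandler's source is not part of this excerpt, so that use is inferred).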
databus-core/databus-core-impl/src/main/java/com/linkedin/databus/core/util/CompressUtil.java (new file; path inferred from the package declaration)

+37

@@ -0,0 +1,37 @@
+package com.linkedin.databus.core.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import com.google.common.io.BaseEncoding;
+
+public class CompressUtil
+{
+  public static String compress(String str) throws IOException
+  {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    GZIPOutputStream gzip = new GZIPOutputStream(out);
+    gzip.write(str.getBytes(Charset.defaultCharset()));
+    gzip.close();
+    return BaseEncoding.base64().encode(out.toByteArray());
+  }
+
+  public static String uncompress(String str) throws IOException
+  {
+    byte[] encodeByteArr = BaseEncoding.base64().decode(str);
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    ByteArrayInputStream in = new ByteArrayInputStream(encodeByteArr);
+    GZIPInputStream gunzip = new GZIPInputStream(in);
+    byte[] buffer = new byte[256];
+    int n;
+    while ((n = gunzip.read(buffer)) >= 0)
+    {
+      out.write(buffer, 0, n);
+    }
+    return out.toString(Charset.defaultCharset().name());
+  }
+}
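
A minimal round-trip sketch (not part of the commit; assumes a context that handles IOException) showing the intended contract: compress() gzips the string and base64-encodes the bytes so the result can travel as a plain-text HTTP body, and uncompress() reverses both steps.

  String original = "{\"schemas\":[]}";            // hypothetical payload
  String wire = CompressUtil.compress(original);   // base64(gzip(original))
  assert CompressUtil.uncompress(wire).equals(original);

One caveat worth noting: both methods use Charset.defaultCharset(), so relay and client must agree on the platform default encoding; pinning an explicit charset such as UTF-8 would make the round trip safe across differently configured JVMs.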

databus-core/databus-core-impl/src/main/java/com/linkedin/databus2/core/seq/MaxSCNReaderWriterConfig.java

+11 -1

@@ -27,13 +27,23 @@ public class MaxSCNReaderWriterConfig implements ConfigBuilder<MaxSCNReaderWrite
 
   private String _type;
   private FileMaxSCNHandler.Config _file;
+  private MysqlMaxSCNHandler.Config _mysql;
   private MaxSCNReaderWriter _existing;
 
   public MaxSCNReaderWriterConfig()
   {
     _type = MaxSCNReaderWriterStaticConfig.Type.FILE.toString();
     _existing = null;
     _file = new FileMaxSCNHandler.Config();
+    _mysql = new MysqlMaxSCNHandler.Config();
+  }
+
+  public MysqlMaxSCNHandler.Config getMysql() {
+    return _mysql;
+  }
+
+  public void setMysql(MysqlMaxSCNHandler.Config _mysql) {
+    this._mysql = _mysql;
   }
 
   public String getType()

@@ -84,7 +94,7 @@ public MaxSCNReaderWriterStaticConfig build() throws InvalidConfigException
       throw new InvalidConfigException("No existing max scn reader/writer specified ");
     }
 
-    return new MaxSCNReaderWriterStaticConfig(handlerType, _file.build(), _existing);
+    return new MaxSCNReaderWriterStaticConfig(handlerType, _file.build(), _mysql.build(), _existing);
   }
 
 }

databus-core/databus-core-impl/src/main/java/com/linkedin/databus2/core/seq/MaxSCNReaderWriterStaticConfig.java

+21 -3

@@ -19,9 +19,8 @@
  */
 
 
-import org.apache.log4j.Logger;
-
 import com.linkedin.databus2.core.seq.FileMaxSCNHandler.StaticConfig;
+import org.apache.log4j.Logger;
 
 /**
  * Static configuration for the SCN reader/writer

@@ -46,21 +45,25 @@ public enum Type
     DISABLED,
     FILE,
     EXISTING,
-    IN_MEMORY
+    IN_MEMORY,
+    MYSQL
   }
 
   private final Type _type;
   private final FileMaxSCNHandler.StaticConfig _file;
+  private final MysqlMaxSCNHandler.StaticConfig _mysql;
   private final MaxSCNReaderWriter _existing;
 
   public MaxSCNReaderWriterStaticConfig(Type type,
                                         StaticConfig file,
+                                        MysqlMaxSCNHandler.StaticConfig mysql,
                                         MaxSCNReaderWriter existing)
   {
     super();
     _type = type;
     _file = file;
     _existing = existing;
+    _mysql = mysql;
   }
 
   /** Type of the MaxSCN handler */

@@ -135,6 +138,21 @@ public SequenceNumberHandlerFactory createFactory()
         break;
       case IN_MEMORY: result = new InMemorySequenceNumberHandlerFactory(-1); break;
       case DISABLED: result = null; break;
+      case MYSQL: {
+        MysqlMaxSCNHandler.Config configBuilder = new MysqlMaxSCNHandler.Config();
+        configBuilder.setJdbcUrl(_mysql.getJdbcUrl());
+        configBuilder.setScnTable(_mysql.getScnTable());
+        configBuilder.setDriverClass(_mysql.getDriverClass());
+        configBuilder.setDbPassword(_mysql.getDbPassword());
+        configBuilder.setDbUser(_mysql.getDbUser());
+        configBuilder.setFlushItvl(_mysql.getFlushItvl());
+        configBuilder.setInitVal(_mysql.getInitVal());
+        configBuilder.setUpsertSCNQuery(_mysql.getUpsertSCNQuery());
+        configBuilder.setGetSCNQuery(_mysql.getGetSCNQuery());
+        configBuilder.setScnColumnName(_mysql.getScnColumnName());
+
+        result = new MysqlMaxSCNHandlerFactory(configBuilder);
+      } break;
       default: throw new RuntimeException("unknown scn reader/writer type: " + _type.toString());
     }
 