Skip to content

Refactor #943

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 32 additions & 12 deletions src/main/java/com/corundumstudio/socketio/SocketIOServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -143,39 +143,59 @@ public void start() {
* @return void
*/
public Future<Void> startAsync() {
log.info("Session store / pubsub factory used: {}", configCopy.getStoreFactory());
logServerInfo();

initGroups();
ServerBootstrap serverBootstrap = setupServerBootstrap();

pipelineFactory.start(configCopy, namespacesHub);
InetSocketAddress addr = createInetSocketAddress();

return bindServerBootstrap(serverBootstrap, addr);
}

private void logServerInfo() {
log.info("Session store / pubsub factory used: {}", configCopy.getStoreFactory());
}

private ServerBootstrap setupServerBootstrap() {
ServerBootstrap b = new ServerBootstrap();
Class<? extends ServerChannel> channelClass = NioServerSocketChannel.class;
if (configCopy.isUseLinuxNativeEpoll()) {
channelClass = EpollServerSocketChannel.class;
}

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(channelClass)
.childHandler(pipelineFactory);
.channel(channelClass)
.childHandler(pipelineFactory);
applyConnectionOptions(b);
return b;
}

private InetSocketAddress createInetSocketAddress() {
InetSocketAddress addr = new InetSocketAddress(configCopy.getPort());
if (configCopy.getHostname() != null) {
addr = new InetSocketAddress(configCopy.getHostname(), configCopy.getPort());
}
return addr;
}

return b.bind(addr).addListener(new FutureListener<Void>() {
private Future<Void> bindServerBootstrap(ServerBootstrap serverBootstrap, InetSocketAddress addr) {
return serverBootstrap.bind(addr).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (future.isSuccess()) {
log.info("SocketIO server started at port: {}", configCopy.getPort());
} else {
log.error("SocketIO server start failed at port: {}!", configCopy.getPort());
}
handleBindResult(future);
}
});
}

private void handleBindResult(Future<Void> future) {
if (future.isSuccess()) {
log.info("SocketIO server started at port: {}", configCopy.getPort());
} else {
log.error("SocketIO server start failed at port: {}!", configCopy.getPort());
}
}


protected void applyConnectionOptions(ServerBootstrap bootstrap) {
SocketConfig config = configCopy.getSocketConfig();
bootstrap.childOption(ChannelOption.TCP_NODELAY, config.isTcpNoDelay());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,45 +229,60 @@ private void handleWebsocket(final OutPacketMessage msg, ChannelHandlerContext c
break;
}

ByteBuf out = encoder.allocateBuffer(ctx.alloc());
encoder.encodePacket(packet, out, ctx.alloc(), true);
handlePacket(msg, ctx, packet, writeFutureList);
handleAttachments(msg, ctx, packet, writeFutureList);
}
}

if (log.isTraceEnabled()) {
log.trace("Out message: {} sessionId: {}", out.toString(CharsetUtil.UTF_8), msg.getSessionId());
}
if (out.isReadable() && out.readableBytes() > configuration.getMaxFramePayloadLength()) {
ByteBuf dstStart = out.readSlice(FRAME_BUFFER_SIZE);
dstStart.retain();
WebSocketFrame start = new TextWebSocketFrame(false, 0, dstStart);
ctx.channel().write(start);
while (out.isReadable()) {
int re = Math.min(out.readableBytes(), FRAME_BUFFER_SIZE);
ByteBuf dst = out.readSlice(re);
dst.retain();
WebSocketFrame res = new ContinuationWebSocketFrame(!out.isReadable(), 0, dst);
ctx.channel().write(res);
}
out.release();
ctx.channel().flush();
} else if (out.isReadable()){
WebSocketFrame res = new TextWebSocketFrame(out);
ctx.channel().writeAndFlush(res);
} else {
out.release();
}
private void handlePacket(final OutPacketMessage msg, ChannelHandlerContext ctx, Packet packet, ChannelFutureList writeFutureList) throws IOException {
ByteBuf out = encoder.allocateBuffer(ctx.alloc());
encoder.encodePacket(packet, out, ctx.alloc(), true);

for (ByteBuf buf : packet.getAttachments()) {
ByteBuf outBuf = encoder.allocateBuffer(ctx.alloc());
outBuf.writeByte(4);
outBuf.writeBytes(buf);
if (log.isTraceEnabled()) {
log.trace("Out attachment: {} sessionId: {}", ByteBufUtil.hexDump(outBuf), msg.getSessionId());
}
writeFutureList.add(ctx.channel().writeAndFlush(new BinaryWebSocketFrame(outBuf)));
if (log.isTraceEnabled()) {
log.trace("Out message: {} sessionId: {}", out.toString(CharsetUtil.UTF_8), msg.getSessionId());
}

if (out.isReadable() && out.readableBytes() > configuration.getMaxFramePayloadLength()) {
handleLargePayload(ctx, out);
} else if (out.isReadable()) {
WebSocketFrame res = new TextWebSocketFrame(out);
ctx.channel().writeAndFlush(res);
} else {
out.release();
}
}

private void handleLargePayload(ChannelHandlerContext ctx, ByteBuf out) {
ByteBuf dstStart = out.readSlice(FRAME_BUFFER_SIZE);
dstStart.retain();
WebSocketFrame start = new TextWebSocketFrame(false, 0, dstStart);
ctx.channel().write(start);

while (out.isReadable()) {
int re = Math.min(out.readableBytes(), FRAME_BUFFER_SIZE);
ByteBuf dst = out.readSlice(re);
dst.retain();
WebSocketFrame res = new ContinuationWebSocketFrame(!out.isReadable(), 0, dst);
ctx.channel().write(res);
}

out.release();
ctx.channel().flush();
}

private void handleAttachments(final OutPacketMessage msg, ChannelHandlerContext ctx, Packet packet, ChannelFutureList writeFutureList) {
for (ByteBuf buf : packet.getAttachments()) {
ByteBuf outBuf = encoder.allocateBuffer(ctx.alloc());
outBuf.writeByte(4);
outBuf.writeBytes(buf);
if (log.isTraceEnabled()) {
log.trace("Out attachment: {} sessionId: {}", ByteBufUtil.hexDump(outBuf), msg.getSessionId());
}
writeFutureList.add(ctx.channel().writeAndFlush(new BinaryWebSocketFrame(outBuf)));
}
}


private void handleHTTP(OutPacketMessage msg, ChannelHandlerContext ctx, ChannelPromise promise) throws IOException {
Channel channel = ctx.channel();
Attribute<Boolean> attr = channel.attr(WRITE_ONCE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,25 @@ public ByteBuf allocateBuffer(ByteBufAllocator allocator) {

return allocator.heapBuffer();
}

/*alterdo */
public void encodeJsonP(Integer jsonpIndex, Queue<Packet> packets, ByteBuf out, ByteBufAllocator allocator, int limit) throws IOException {
boolean jsonpMode = jsonpIndex != null;

ByteBuf buf = buildPacketsBuffer(packets, allocator, limit);

if (jsonpMode) {
writeJsonPHeader(out, jsonpIndex);
}

processUtf8(buf, out, jsonpMode);
buf.release();

if (jsonpMode) {
writeJsonPEnd(out);
}
}

private ByteBuf buildPacketsBuffer(Queue<Packet> packets, ByteBufAllocator allocator, int limit) throws IOException {
ByteBuf buf = allocateBuffer(allocator);

int i = 0;
Expand All @@ -69,41 +84,52 @@ public void encodeJsonP(Integer jsonpIndex, Queue<Packet> packets, ByteBuf out,
break;
}

ByteBuf packetBuf = allocateBuffer(allocator);
encodePacket(packet, packetBuf, allocator, true);

int packetSize = packetBuf.writerIndex();
buf.writeBytes(toChars(packetSize));
buf.writeBytes(B64_DELIMITER);
buf.writeBytes(packetBuf);

ByteBuf packetBuf = buildPacketBuffer(packet, allocator);
appendPacketToBuffer(packetBuf, buf);
packetBuf.release();

i++;
appendAttachmentsToBuffer(packet, buf);

for (ByteBuf attachment : packet.getAttachments()) {
ByteBuf encodedBuf = Base64.encode(attachment, Base64Dialect.URL_SAFE);
buf.writeBytes(toChars(encodedBuf.readableBytes() + 2));
buf.writeBytes(B64_DELIMITER);
buf.writeBytes(BINARY_HEADER);
buf.writeBytes(encodedBuf);
}
i++;
}

if (jsonpMode) {
out.writeBytes(JSONP_HEAD);
out.writeBytes(toChars(jsonpIndex));
out.writeBytes(JSONP_START);
}
return buf;
}

processUtf8(buf, out, jsonpMode);
buf.release();
private ByteBuf buildPacketBuffer(Packet packet, ByteBufAllocator allocator) throws IOException {
ByteBuf packetBuf = allocateBuffer(allocator);
encodePacket(packet, packetBuf, allocator, true);
return packetBuf;
}

if (jsonpMode) {
out.writeBytes(JSONP_END);
private void appendPacketToBuffer(ByteBuf packetBuf, ByteBuf buf) {
int packetSize = packetBuf.writerIndex();
buf.writeBytes(toChars(packetSize));
buf.writeBytes(B64_DELIMITER);
buf.writeBytes(packetBuf);
}

private void appendAttachmentsToBuffer(Packet packet, ByteBuf buf) throws IOException {
for (ByteBuf attachment : packet.getAttachments()) {
ByteBuf encodedBuf = Base64.encode(attachment, Base64Dialect.URL_SAFE);
buf.writeBytes(toChars(encodedBuf.readableBytes() + 2));
buf.writeBytes(B64_DELIMITER);
buf.writeBytes(BINARY_HEADER);
buf.writeBytes(encodedBuf);
}
}

private void writeJsonPHeader(ByteBuf out, Integer jsonpIndex) {
out.writeBytes(JSONP_HEAD);
out.writeBytes(toChars(jsonpIndex));
out.writeBytes(JSONP_START);
}

private void writeJsonPEnd(ByteBuf out) {
out.writeBytes(JSONP_END);
}


private void processUtf8(ByteBuf in, ByteBuf out, boolean jsonpMode) {
while (in.isReadable()) {
short value = (short) (in.readByte() & 0xFF);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,53 +66,69 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
List<String> transport = queryDecoder.parameters().get("transport");

if (transport != null && NAME.equals(transport.get(0))) {
List<String> sid = queryDecoder.parameters().get("sid");
List<String> j = queryDecoder.parameters().get("j");
List<String> b64 = queryDecoder.parameters().get("b64");

String origin = req.headers().get(HttpHeaderNames.ORIGIN);
ctx.channel().attr(EncoderHandler.ORIGIN).set(origin);

String userAgent = req.headers().get(HttpHeaderNames.USER_AGENT);
ctx.channel().attr(EncoderHandler.USER_AGENT).set(userAgent);

if (j != null && j.get(0) != null) {
Integer index = Integer.valueOf(j.get(0));
ctx.channel().attr(EncoderHandler.JSONP_INDEX).set(index);
}
if (b64 != null && b64.get(0) != null) {
String flag = b64.get(0);
if ("true".equals(flag)) {
flag = "1";
} else if ("false".equals(flag)) {
flag = "0";
}
Integer enable = Integer.valueOf(flag);
ctx.channel().attr(EncoderHandler.B64).set(enable == 1);
}

try {
if (sid != null && sid.get(0) != null) {
final UUID sessionId = UUID.fromString(sid.get(0));
handleMessage(req, sessionId, queryDecoder, ctx);
} else {
// first connection
ClientHead client = ctx.channel().attr(ClientHead.CLIENT).get();
if (client != null) {
handleMessage(req, client.getSessionId(), queryDecoder, ctx);
}
}
} finally {
req.release();
}
handleTransportParameters(ctx, req, queryDecoder);
return;
}
}
ctx.fireChannelRead(msg);
}

private void handleMessage(FullHttpRequest req, UUID sessionId, QueryStringDecoder queryDecoder, ChannelHandlerContext ctx)
throws IOException {
private void handleTransportParameters(ChannelHandlerContext ctx, FullHttpRequest req, QueryStringDecoder queryDecoder) {
List<String> sid = queryDecoder.parameters().get("sid");
List<String> j = queryDecoder.parameters().get("j");
List<String> b64 = queryDecoder.parameters().get("b64");

String origin = req.headers().get(HttpHeaderNames.ORIGIN);
ctx.channel().attr(EncoderHandler.ORIGIN).set(origin);

String userAgent = req.headers().get(HttpHeaderNames.USER_AGENT);
ctx.channel().attr(EncoderHandler.USER_AGENT).set(userAgent);

handleJParameter(ctx, j);
handleB64Parameter(ctx, b64);

try {
handleSidParameter(ctx, req, sid, queryDecoder);
} finally {
req.release();
}
}

private void handleJParameter(ChannelHandlerContext ctx, List<String> j) {
if (j != null && j.get(0) != null) {
Integer index = Integer.valueOf(j.get(0));
ctx.channel().attr(EncoderHandler.JSONP_INDEX).set(index);
}
}

private void handleB64Parameter(ChannelHandlerContext ctx, List<String> b64) {
if (b64 != null && b64.get(0) != null) {
String flag = b64.get(0);
if ("true".equals(flag)) {
flag = "1";
} else if ("false".equals(flag)) {
flag = "0";
}
Integer enable = Integer.valueOf(flag);
ctx.channel().attr(EncoderHandler.B64).set(enable == 1);
}
}

private void handleSidParameter(ChannelHandlerContext ctx, FullHttpRequest req, List<String> sid, QueryStringDecoder queryDecoder) {
if (sid != null && sid.get(0) != null) {
final UUID sessionId = UUID.fromString(sid.get(0));
handleMessage(req, sessionId, queryDecoder, ctx);
} else {
// first connection
ClientHead client = ctx.channel().attr(ClientHead.CLIENT).get();
if (client != null) {
handleMessage(req, client.getSessionId(), queryDecoder, ctx);
}
}
}

private void handleMessage(FullHttpRequest req, UUID sessionId, QueryStringDecoder queryDecoder, ChannelHandlerContext ctx) {
try {
String origin = req.headers().get(HttpHeaderNames.ORIGIN);
if (queryDecoder.parameters().containsKey("disconnect")) {
ClientHead client = clientsBox.get(sessionId);
Expand All @@ -128,6 +144,10 @@ private void handleMessage(FullHttpRequest req, UUID sessionId, QueryStringDecod
log.error("Wrong {} method invocation for {}", req.method(), sessionId);
sendError(ctx);
}
} catch (IOException e) {
log.error("Exception handling message for session {}: {}", sessionId, e.getMessage(), e);
sendError(ctx);
}
}

private void onOptions(UUID sessionId, ChannelHandlerContext ctx, String origin) {
Expand Down