diff --git a/src/main/java/com/corundumstudio/socketio/BroadcastOperations.java b/src/main/java/com/corundumstudio/socketio/BroadcastOperations.java index 7866d991..bd9468ab 100644 --- a/src/main/java/com/corundumstudio/socketio/BroadcastOperations.java +++ b/src/main/java/com/corundumstudio/socketio/BroadcastOperations.java @@ -1,12 +1,12 @@ /** * Copyright (c) 2012-2019 Nikita Koksharov - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -21,12 +21,23 @@ /** * broadcast interface - * */ public interface BroadcastOperations extends ClientOperations { Collection getClients(); + /** + * {@link Packet#attachments} needs to be filled when sending byte[]. + * Using {@link io.netty.buffer.Unpooled#wrappedBuffer(byte[])} to + * fill byte[] into {@link Packet#attachments} is the recommended way. + * Before using {@link Packet#addAttachment(io.netty.buffer.ByteBuf)}, + * be sure to initialize the number of attachments with + * {@link Packet#initAttachments(int)})} + * + * @param packet + * @param ackCallback + * @param + */ void send(Packet packet, BroadcastAckCallback ackCallback); void sendEvent(String name, SocketIOClient excludedClient, Object... data); diff --git a/src/main/java/com/corundumstudio/socketio/ClientOperations.java b/src/main/java/com/corundumstudio/socketio/ClientOperations.java index dc576608..79a9bb4b 100644 --- a/src/main/java/com/corundumstudio/socketio/ClientOperations.java +++ b/src/main/java/com/corundumstudio/socketio/ClientOperations.java @@ -27,6 +27,13 @@ public interface ClientOperations { * Send custom packet. * But {@link ClientOperations#sendEvent} method * usage is enough for most cases. + * If the Packet is sent by BroadcastOperations, + * {@link Packet#attachments} needs to be filled when sending byte[]. + * Using {@link io.netty.buffer.Unpooled#wrappedBuffer(byte[])} to + * fill byte[] into {@link Packet#attachments} is the recommended way. + * Before using {@link Packet#addAttachment(io.netty.buffer.ByteBuf)}, + * be sure to initialize the number of attachments with + * {@link Packet#initAttachments(int)})} * * @param packet - packet to send */ diff --git a/src/main/java/com/corundumstudio/socketio/SingleRoomBroadcastOperations.java b/src/main/java/com/corundumstudio/socketio/SingleRoomBroadcastOperations.java index 8957c5df..6d5f6c17 100644 --- a/src/main/java/com/corundumstudio/socketio/SingleRoomBroadcastOperations.java +++ b/src/main/java/com/corundumstudio/socketio/SingleRoomBroadcastOperations.java @@ -1,12 +1,12 @@ /** * Copyright (c) 2012-2019 Nikita Koksharov - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -22,9 +22,14 @@ import com.corundumstudio.socketio.store.StoreFactory; import com.corundumstudio.socketio.store.pubsub.DispatchMessage; import com.corundumstudio.socketio.store.pubsub.PubSubType; +import io.netty.buffer.Unpooled; +import org.springframework.lang.NonNull; +import org.springframework.util.CollectionUtils; import java.util.Arrays; import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; /** * Author: liangjiaqi @@ -80,12 +85,15 @@ public void disconnect() { } @Override - public void sendEvent(String name, SocketIOClient excludedClient, Object... data) { + public void sendEvent(String name, SocketIOClient excludedClient, @NonNull Object... data) { Packet packet = new Packet(PacketType.MESSAGE, EngineIOVersion.UNKNOWN); packet.setSubType(PacketType.EVENT); packet.setName(name); packet.setData(Arrays.asList(data)); + // handle byte[] data + handleBytes(packet, data); + for (SocketIOClient client : clients) { packet.setEngineIOVersion(client.getEngineIOVersion()); if (client.getSessionId().equals(excludedClient.getSessionId())) { @@ -97,14 +105,31 @@ public void sendEvent(String name, SocketIOClient excludedClient, Object... data } @Override - public void sendEvent(String name, Object... data) { + public void sendEvent(String name, @NonNull Object... data) { Packet packet = new Packet(PacketType.MESSAGE, EngineIOVersion.UNKNOWN); packet.setSubType(PacketType.EVENT); packet.setName(name); packet.setData(Arrays.asList(data)); + + // handle byte[] data + handleBytes(packet, data); + send(packet); } + private static void handleBytes(Packet packet, Object[] data) { + List bytes = Arrays.stream(data) + .filter(o -> o instanceof byte[]) + .map(b -> (byte[]) b) + .filter(b -> b.length > 0) + .collect(Collectors.toList()); + + if (!CollectionUtils.isEmpty(bytes)) { + packet.initAttachments(bytes.size()); + bytes.stream().peek(b -> packet.addAttachment(Unpooled.wrappedBuffer(b))); + } + } + @Override public void sendEvent(String name, Object data, BroadcastAckCallback ackCallback) { for (SocketIOClient client : clients) { diff --git a/src/main/java/com/corundumstudio/socketio/handler/EncoderHandler.java b/src/main/java/com/corundumstudio/socketio/handler/EncoderHandler.java index b26ad866..5a1322e0 100644 --- a/src/main/java/com/corundumstudio/socketio/handler/EncoderHandler.java +++ b/src/main/java/com/corundumstudio/socketio/handler/EncoderHandler.java @@ -259,7 +259,7 @@ private void handleWebsocket(final OutPacketMessage msg, ChannelHandlerContext c for (ByteBuf buf : packet.getAttachments()) { ByteBuf outBuf = encoder.allocateBuffer(ctx.alloc()); outBuf.writeByte(4); - outBuf.writeBytes(buf); + outBuf.writeBytes(buf, 0, buf.readableBytes()); if (log.isTraceEnabled()) { log.trace("Out attachment: {} sessionId: {}", ByteBufUtil.hexDump(outBuf), msg.getSessionId()); } diff --git a/src/main/java/com/corundumstudio/socketio/protocol/Packet.java b/src/main/java/com/corundumstudio/socketio/protocol/Packet.java index dfc8a994..1609f5a9 100644 --- a/src/main/java/com/corundumstudio/socketio/protocol/Packet.java +++ b/src/main/java/com/corundumstudio/socketio/protocol/Packet.java @@ -15,6 +15,7 @@ */ package com.corundumstudio.socketio.protocol; +import com.corundumstudio.socketio.namespace.Namespace; import io.netty.buffer.ByteBuf; import java.io.Serializable; @@ -22,8 +23,6 @@ import java.util.Collections; import java.util.List; -import com.corundumstudio.socketio.namespace.Namespace; - public class Packet implements Serializable { private static final long serialVersionUID = 4560159536486711426L; @@ -71,9 +70,9 @@ public void setData(Object data) { /** * Get packet data - * + * * @param the type data - * + * *

      * @return json object for PacketType.JSON type
      * message for PacketType.MESSAGE type
@@ -145,6 +144,16 @@ public void initAttachments(int attachmentsCount) {
         this.attachmentsCount = attachmentsCount;
         this.attachments = new ArrayList(attachmentsCount);
     }
+
+    /**
+     *
+     * It needs to be called when transferring the byte[].
+     * Recommended Use{@link io.netty.buffer.Unpooled#wrappedBuffer(byte[])}.
+     * Before using {@link #addAttachment(ByteBuf)},
+     * be sure to initialize the number of attachments with {@link #initAttachments(int)})}
+     *
+     * @param attachment
+     */
     public void addAttachment(ByteBuf attachment) {
         if (this.attachments.size() < attachmentsCount) {
             this.attachments.add(attachment);
diff --git a/src/main/java/com/corundumstudio/socketio/protocol/PacketEncoder.java b/src/main/java/com/corundumstudio/socketio/protocol/PacketEncoder.java
index 73a3fa15..ce40a686 100644
--- a/src/main/java/com/corundumstudio/socketio/protocol/PacketEncoder.java
+++ b/src/main/java/com/corundumstudio/socketio/protocol/PacketEncoder.java
@@ -15,7 +15,7 @@
  */
 package com.corundumstudio.socketio.protocol;
 
-import com.corundumstudio.socketio.handler.ClientHead;
+import com.corundumstudio.socketio.Configuration;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.ByteBufOutputStream;
@@ -29,8 +29,6 @@
 import java.util.List;
 import java.util.Queue;
 
-import com.corundumstudio.socketio.Configuration;
-
 public class PacketEncoder {
 
     private static final byte[] BINARY_HEADER = "b4".getBytes(CharsetUtil.UTF_8);
@@ -254,18 +252,20 @@ public void encodePacket(Packet packet, ByteBuf buffer, ByteBufAllocator allocat
 
                     ByteBuf encBuf = null;
 
-                    if (packet.getSubType() == PacketType.ERROR) {
+                    PacketType subType = packet.getSubType();
+                    if (subType == PacketType.ERROR) {
                         encBuf = allocateBuffer(allocator);
 
                         ByteBufOutputStream out = new ByteBufOutputStream(encBuf);
                         jsonSupport.writeValue(out, packet.getData());
                     }
 
-                    if (packet.getSubType() == PacketType.EVENT
-                            || packet.getSubType() == PacketType.ACK) {
+                    PacketType tmpSubType = subType;
+                    if (subType == PacketType.EVENT
+                            || subType == PacketType.ACK) {
 
                         List values = new ArrayList();
-                        if (packet.getSubType() == PacketType.EVENT) {
+                        if (subType == PacketType.EVENT) {
                             values.add(packet.getName());
                         }
 
@@ -277,17 +277,22 @@ public void encodePacket(Packet packet, ByteBuf buffer, ByteBufAllocator allocat
                         jsonSupport.writeValue(out, values);
 
                         if (!jsonSupport.getArrays().isEmpty()) {
-                            packet.initAttachments(jsonSupport.getArrays().size());
-                            for (byte[] array : jsonSupport.getArrays()) {
-                                packet.addAttachment(Unpooled.wrappedBuffer(array));
+                            // If the Packet is sent by BroadcastOperations,
+                            // there is a problem of concurrent initialization for the same Packet.
+                            // Please initAttachment when creating the Packet to avoid this problem.
+                            if (!packet.hasAttachments()) {
+                                packet.initAttachments(jsonSupport.getArrays().size());
+                                for (byte[] array : jsonSupport.getArrays()) {
+                                    packet.addAttachment(Unpooled.wrappedBuffer(array));
+                                }
                             }
-                            packet.setSubType(packet.getSubType() == PacketType.ACK
+                            tmpSubType = (subType == PacketType.ACK
                                     ? PacketType.BINARY_ACK : PacketType.BINARY_EVENT);
                         }
                     }
 
-                    byte subType = toChar(packet.getSubType().getValue());
-                    buf.writeByte(subType);
+                    byte subTypeByte = toChar(tmpSubType.getValue());
+                    buf.writeByte(subTypeByte);
 
                     if (packet.hasAttachments()) {
                         byte[] ackId = toChars(packet.getAttachments().size());
@@ -295,7 +300,7 @@ public void encodePacket(Packet packet, ByteBuf buffer, ByteBufAllocator allocat
                         buf.writeByte('-');
                     }
 
-                    if (packet.getSubType() == PacketType.CONNECT) {
+                    if (subType == PacketType.CONNECT) {
                         if (!packet.getNsp().isEmpty()) {
                             buf.writeBytes(packet.getNsp().getBytes(CharsetUtil.UTF_8));
                         }