From ecb5e10e3d251d1fea8a6d4bd7b4c3cacc25495b Mon Sep 17 00:00:00 2001 From: Stefan Rossbach Date: Sun, 18 Aug 2019 03:03:14 +0200 Subject: [PATCH 1/2] [INTERNAL][CORE] Prepares Stream access This patch introduces an new class / interface which is just used for the ITransmitter and IReceiver. This is necessary to distinguish between plan stream connections and internal stuff. --- core/src/saros/net/IConnectionManager.java | 21 +- core/src/saros/net/IStreamConnection.java | 17 + .../saros/net/IStreamConnectionListener.java | 6 + .../net/internal/BinaryChannelConnection.java | 54 ++-- .../saros/net/internal/ConnectionPool.java | 18 +- .../net/internal/DataTransferManager.java | 294 +++++++++++------- .../net/internal/DefaultStreamConnection.java | 86 +++++ .../net/internal/IByteStreamConnection.java | 42 --- .../IByteStreamConnectionListener.java | 27 -- core/src/saros/net/internal/IConnection.java | 16 + .../internal/IConnectionClosedCallback.java | 6 + .../saros/net/internal/IPacketConnection.java | 22 ++ .../internal/IPacketConnectionListener.java | 13 + core/src/saros/net/internal/XMPPReceiver.java | 9 +- .../saros/net/internal/XMPPTransmitter.java | 12 +- core/src/saros/net/stream/ByteStream.java | 8 + .../saros/net/stream/IBBStreamService.java | 54 +--- core/src/saros/net/stream/IStreamService.java | 14 +- .../net/stream/IStreamServiceListener.java | 7 + .../saros/net/stream/Socks5StreamService.java | 95 ++---- core/src/saros/net/stream/TCPByteStream.java | 24 +- core/src/saros/net/stream/TCPTransport.java | 41 +-- .../net/stream/XMPPByteStreamAdapter.java | 36 ++- .../internal/BinaryChannelConnectionTest.java | 104 +++---- .../net/internal/ConnectionPoolTest.java | 12 +- .../net/internal/DataTransferManagerTest.java | 253 +++++++++------ .../impl/NetworkManipulatorImpl.java | 9 +- 27 files changed, 752 insertions(+), 548 deletions(-) create mode 100644 core/src/saros/net/IStreamConnection.java create mode 100644 core/src/saros/net/IStreamConnectionListener.java create mode 100644 core/src/saros/net/internal/DefaultStreamConnection.java delete mode 100644 core/src/saros/net/internal/IByteStreamConnection.java delete mode 100644 core/src/saros/net/internal/IByteStreamConnectionListener.java create mode 100644 core/src/saros/net/internal/IConnection.java create mode 100644 core/src/saros/net/internal/IConnectionClosedCallback.java create mode 100644 core/src/saros/net/internal/IPacketConnection.java create mode 100644 core/src/saros/net/internal/IPacketConnectionListener.java create mode 100644 core/src/saros/net/stream/IStreamServiceListener.java diff --git a/core/src/saros/net/IConnectionManager.java b/core/src/saros/net/IConnectionManager.java index bdf4539eb1..310c88666c 100644 --- a/core/src/saros/net/IConnectionManager.java +++ b/core/src/saros/net/IConnectionManager.java @@ -1,7 +1,7 @@ package saros.net; import java.io.IOException; -import saros.net.internal.IByteStreamConnection; +import saros.net.internal.IConnection; import saros.net.stream.StreamMode; import saros.net.xmpp.JID; @@ -21,24 +21,29 @@ public interface IConnectionManager { */ public void setServices(int serviceMask); + public void addStreamConnectionListener(final IStreamConnectionListener listener); + + public void removeStreamConnectionListener(final IStreamConnectionListener listener); + + public IStreamConnection connectStream(String id, Object address) throws IOException; /** @deprecated */ @Deprecated - public IByteStreamConnection connect(JID peer) throws IOException; + public IConnection connect(Object address) throws IOException; - public IByteStreamConnection connect(String connectionID, JID peer) throws IOException; + public IConnection connect(String connectionID, Object address) throws IOException; /** - * @deprecated Disconnects {@link IByteStreamConnection} with the specified peer - * @param peer {@link JID} of the peer to disconnect the {@link IByteStreamConnection} + * @deprecated Disconnects with the specified address + * @param address */ @Deprecated - public boolean closeConnection(JID peer); + public boolean closeConnection(Object address); - public boolean closeConnection(String connectionIdentifier, JID peer); + public boolean closeConnection(String connectionIdentifier, Object address); /** @deprecated */ @Deprecated public StreamMode getTransferMode(JID jid); - public StreamMode getTransferMode(String connectionID, JID jid); + public StreamMode getTransferMode(String connectionID, Object address); } diff --git a/core/src/saros/net/IStreamConnection.java b/core/src/saros/net/IStreamConnection.java new file mode 100644 index 0000000000..523a03ffeb --- /dev/null +++ b/core/src/saros/net/IStreamConnection.java @@ -0,0 +1,17 @@ +package saros.net; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import saros.net.internal.IConnection; + +public interface IStreamConnection extends IConnection { + + public InputStream getInputStream() throws IOException; + + public OutputStream getOutputStream() throws IOException; + + public int getReadTimeout() throws IOException; + + public void setReadTimeout(int timeout) throws IOException; +} diff --git a/core/src/saros/net/IStreamConnectionListener.java b/core/src/saros/net/IStreamConnectionListener.java new file mode 100644 index 0000000000..b648ce54a6 --- /dev/null +++ b/core/src/saros/net/IStreamConnectionListener.java @@ -0,0 +1,6 @@ +package saros.net; + +public interface IStreamConnectionListener { + + public boolean streamConnectionEstablished(String id, IStreamConnection connection); +} diff --git a/core/src/saros/net/internal/BinaryChannelConnection.java b/core/src/saros/net/internal/BinaryChannelConnection.java index 8603893b8e..41df7f2467 100644 --- a/core/src/saros/net/internal/BinaryChannelConnection.java +++ b/core/src/saros/net/internal/BinaryChannelConnection.java @@ -30,7 +30,7 @@ * @author coezbek * @author srossbach */ -public class BinaryChannelConnection implements IByteStreamConnection { +public class BinaryChannelConnection implements IPacketConnection { private static final Logger LOG = Logger.getLogger(BinaryChannelConnection.class); @@ -49,7 +49,6 @@ private static class Opcode { /** Max size of data chunks */ private static final int CHUNKSIZE = 32 * 1024 - 1; - private IByteStreamConnectionListener listener; private ReceiverThread receiveThread; private final JID remoteAddress; @@ -57,6 +56,8 @@ private static class Opcode { private final String connectionID; + private final IConnectionClosedCallback callback; + private IDPool idPool = new IDPool(); private boolean connected; @@ -64,6 +65,7 @@ private static class Opcode { private Map pendingFragmentedPackets = new HashMap(); + private Map pendingXMPPExtensions = new HashMap(); @@ -111,24 +113,16 @@ public void run() { private IBinaryXMPPExtensionReceiver receiver; - public BinaryChannelConnection( - JID localAddress, - JID remoteAddress, - String connectionID, - ByteStream stream, - StreamMode mode, - IByteStreamConnectionListener listener) - throws IOException { - this.listener = listener; - this.localAddress = localAddress; - this.remoteAddress = remoteAddress; - this.connectionID = connectionID; - this.stream = stream; - this.stream.setReadTimeout(0); // keep connection alive - this.mode = mode; + public BinaryChannelConnection(ByteStream stream, IConnectionClosedCallback callback) { + this.callback = callback; + // FIXME + this.localAddress = (JID) stream.getLocalAddress(); + // FIXME + this.remoteAddress = (JID) stream.getRemoteAddress(); - outputStream = new DataOutputStream(new BufferedOutputStream(stream.getOutputStream())); - inputStream = new DataInputStream(new BufferedInputStream(stream.getInputStream())); + this.connectionID = stream.getId(); + this.mode = stream.getMode(); + this.stream = stream; } @Override @@ -138,14 +132,17 @@ public void setBinaryXMPPExtensionReceiver(IBinaryXMPPExtensionReceiver receiver this.receiver = receiver; } - @Override - public synchronized void initialize() { + public synchronized void initialize() throws IOException { if (initialized) return; /* * it is ok to start the receiver a bit later because the data will be * already buffered by SMACK or the OS */ + stream.setReadTimeout(0); // keep connection alive + outputStream = new DataOutputStream(new BufferedOutputStream(stream.getOutputStream())); + inputStream = new DataInputStream(new BufferedInputStream(stream.getInputStream())); + receiveThread = new ReceiverThread(); receiveThread.setName("BinaryChannel-" + remoteAddress.getName()); receiveThread.start(); @@ -154,13 +151,13 @@ public synchronized void initialize() { } @Override - public String getConnectionID() { - return connectionID; + public Object getLocalAddress() { + return stream.getRemoteAddress(); } @Override - public synchronized boolean isConnected() { - return connected; + public String getId() { + return connectionID; } @Override @@ -192,10 +189,9 @@ public void close() { } } - listener.connectionClosed(connectionID, this); + if (callback != null) callback.connectionClosed(this); } - @Override public StreamMode getMode() { return mode; } @@ -436,6 +432,10 @@ private BinaryXMPPExtension readNextXMPPExtension() throws IOException { throw new InterruptedIOException("interrupted while reading stream data"); } + private synchronized boolean isConnected() { + return connected; + } + private synchronized void sendData(int fragmentId, byte[] data, int offset, int length) throws IOException { diff --git a/core/src/saros/net/internal/ConnectionPool.java b/core/src/saros/net/internal/ConnectionPool.java index 5fe31da615..28d76b649a 100644 --- a/core/src/saros/net/internal/ConnectionPool.java +++ b/core/src/saros/net/internal/ConnectionPool.java @@ -15,8 +15,7 @@ final class ConnectionPool { private boolean isOpen; - private final Map pool = - new HashMap(); + private final Map pool = new HashMap<>(); /** * Opens the connection pool. After the connection pool is opened connections can be added, @@ -31,19 +30,19 @@ public synchronized void open() { */ public void close() { - final Map currentPoolCopy; + final Map currentPoolCopy; synchronized (this) { if (!isOpen) return; isOpen = false; - currentPoolCopy = new HashMap(pool); + currentPoolCopy = new HashMap<>(pool); pool.clear(); } - for (Entry entry : currentPoolCopy.entrySet()) { + for (Entry entry : currentPoolCopy.entrySet()) { final String id = entry.getKey(); - final IByteStreamConnection connection = entry.getValue(); + final IConnection connection = entry.getValue(); connection.close(); LOG.debug("closed connection [id=" + id + "]: " + connection); @@ -57,7 +56,7 @@ public void close() { * @return the connection associated with the id or null if no such connection exists * or the pool is closed */ - public synchronized IByteStreamConnection get(final String id) { + public synchronized IConnection get(final String id) { return pool.get(id); } @@ -70,8 +69,7 @@ public synchronized IByteStreamConnection get(final String id) { * that was already added with the given id or null if no connection was added * with the given id */ - public synchronized IByteStreamConnection add( - final String id, final IByteStreamConnection connection) { + public synchronized IConnection add(final String id, final IConnection connection) { if (!isOpen) return connection; @@ -85,7 +83,7 @@ public synchronized IByteStreamConnection add( * @return the connection associated with the id or null if no such connection exists * or the pool is closed */ - public synchronized IByteStreamConnection remove(final String id) { + public synchronized IConnection remove(final String id) { if (!isOpen) return null; return pool.remove(id); diff --git a/core/src/saros/net/internal/DataTransferManager.java b/core/src/saros/net/internal/DataTransferManager.java index 9b115ec4f2..ca285dbecd 100644 --- a/core/src/saros/net/internal/DataTransferManager.java +++ b/core/src/saros/net/internal/DataTransferManager.java @@ -6,6 +6,7 @@ import java.util.Arrays; import java.util.HashSet; import java.util.List; +import java.util.Objects; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; @@ -18,7 +19,11 @@ import saros.context.IContextKeyBindings.Socks5StreamService; import saros.net.ConnectionState; import saros.net.IConnectionManager; +import saros.net.IStreamConnection; +import saros.net.IStreamConnectionListener; +import saros.net.stream.ByteStream; import saros.net.stream.IStreamService; +import saros.net.stream.IStreamServiceListener; import saros.net.stream.StreamMode; import saros.net.xmpp.IConnectionListener; import saros.net.xmpp.JID; @@ -28,17 +33,16 @@ /** * This class is responsible for handling all transfers of binary data. It maintains a map of * established connections and tries to reuse them. - * - * @author srossbach - * @author coezbek - * @author jurke */ @Component(module = "net") public class DataTransferManager implements IConnectionListener, IConnectionManager { private static final Logger LOG = Logger.getLogger(DataTransferManager.class); - private static final String DEFAULT_CONNECTION_ID = "default"; + // package private for testing purposes + static final String DEFAULT_CONNECTION_ID = "default"; + + private static final String STREAM_SUFFIX = "-stream"; private static final String IN = "in"; @@ -62,78 +66,33 @@ public class DataTransferManager implements IConnectionListener, IConnectionMana private final List streamServices = new CopyOnWriteArrayList(); - private final CopyOnWriteArrayList connectionListeners = + private final CopyOnWriteArrayList packetConnectionListeners = + new CopyOnWriteArrayList<>(); + + private final CopyOnWriteArrayList streamConnectionListeners = new CopyOnWriteArrayList<>(); - private final IByteStreamConnectionListener byteStreamConnectionListener = - new IByteStreamConnectionListener() { + private final IStreamServiceListener streamServiceListener = + new IStreamServiceListener() { @Override - public void connectionChanged( - final String connectionId, - final IByteStreamConnection connection, - final boolean incomingRequest) { - - // FIXME init first, than add to pool and finally start the receiver - // thread ! - - final String id = - toConnectionIDToken( - connectionId, incomingRequest ? IN : OUT, connection.getRemoteAddress()); - - /// TODO we currently have to announce not initialized connections otherwise the IReceiver - // will miss updates - - notfiyconnectionChanged(id, connection, incomingRequest); - - LOG.debug( - "bytestream connection changed " - + connection - + ", inc=" - + incomingRequest - + ", pool id=" - + id - + "]"); - - /* - * this may return the current connection if the pool is closed so - * close it anyway - */ - final IByteStreamConnection current = connectionPool.add(id, connection); - - if (current != null) { - current.close(); - if (current == connection) { - LOG.warn( - "closed connection [pool id=" - + id - + "]: " - + current - + " , no connections are currently allowed"); - - return; + public void connectionEstablished(final ByteStream byteStream) { + + try { + if (byteStream.getId().endsWith(STREAM_SUFFIX)) { + createAndAnnounceStreamConnection(byteStream, true); } else { - LOG.warn( - "existing connection [pool id=" - + id - + "] " - + current - + " was replaced with connection " - + connection); + createAndAnnouncePacketConnection(byteStream, true); } + } catch (IOException e) { + LOG.error("failed to accept byte stream: " + byteStream, e); } - - connection.initialize(); - } - - @Override - public void connectionClosed( - final String connectionId, final IByteStreamConnection connection) { - closeConnection(connectionId, connection.getRemoteAddress()); - notfiyConnectionClosed(connectionId, connection); } }; + private final IConnectionClosedCallback connectionClosedCallback = + (connection) -> closeConnection(connection.getId(), connection.getRemoteAddress()); + public DataTransferManager( XMPPConnectionService connectionService, @Nullable @Socks5StreamService IStreamService mainService, @@ -149,42 +108,50 @@ public DataTransferManager( /** @deprecated */ @Override @Deprecated - public IByteStreamConnection connect(JID peer) throws IOException { - return connect(DEFAULT_CONNECTION_ID, peer); + public IConnection connect(Object address) throws IOException { + return connect(DEFAULT_CONNECTION_ID, address); + } + + @Override + public IStreamConnection connectStream(final String id, Object address) throws IOException { + Objects.requireNonNull(id, "id is null"); + Objects.requireNonNull(address, "address is null"); + + final JID jid = (JID) address; + + if (jid.isBareJID()) throw new IllegalStateException("cannot connect to a bare JID: " + jid); + + return (IStreamConnection) connectInternal(id + STREAM_SUFFIX, jid); } @Override - public IByteStreamConnection connect(String connectionID, JID peer) throws IOException { + public IConnection connect(String connectionID, Object address) throws IOException { if (connectionID == null) throw new NullPointerException("connectionID is null"); - if (peer == null) throw new NullPointerException("peer is null"); + if (address == null) throw new NullPointerException("peer is null"); - return connectInternal(connectionID, peer); + return connectInternal(connectionID, (JID) address); } - public IByteStreamConnection getConnection(final String connectionId, final JID peer) { - return getCurrentConnection(connectionId, peer); + public IConnection getConnection(final String connectionId, final Object address) { + return getCurrentConnection(connectionId, address); } - /** - * @deprecated Disconnects {@link IByteStreamConnection} with the specified peer - * @param peer {@link JID} of the peer to disconnect the {@link IByteStreamConnection} - */ @Override @Deprecated - public boolean closeConnection(JID peer) { - return closeConnection(DEFAULT_CONNECTION_ID, peer); + public boolean closeConnection(Object address) { + return closeConnection(DEFAULT_CONNECTION_ID, address); } @Override - public boolean closeConnection(String connectionIdentifier, JID peer) { + public boolean closeConnection(String connectionIdentifier, Object address) { - final String outID = toConnectionIDToken(connectionIdentifier, OUT, peer); + final String outID = toConnectionIDToken(connectionIdentifier, OUT, address); - final String inID = toConnectionIDToken(connectionIdentifier, IN, peer); + final String inID = toConnectionIDToken(connectionIdentifier, IN, address); - final IByteStreamConnection out = connectionPool.remove(outID); - final IByteStreamConnection in = connectionPool.remove(inID); + final IConnection out = connectionPool.remove(outID); + final IConnection in = connectionPool.remove(inID); boolean closed = false; @@ -221,22 +188,32 @@ public StreamMode getTransferMode(JID jid) { } @Override - public StreamMode getTransferMode(String connectionID, JID jid) { - IByteStreamConnection connection = getCurrentConnection(connectionID, jid); + public StreamMode getTransferMode(String connectionID, Object address) { + IConnection connection = getCurrentConnection(connectionID, address); return connection == null ? StreamMode.NONE : connection.getMode(); } - public void addConnectionListener(final IByteStreamConnectionListener listener) { - connectionListeners.addIfAbsent(listener); + public void addPacketConnectionListener(final IPacketConnectionListener listener) { + packetConnectionListeners.addIfAbsent(listener); + } + + public void removePacketConnectionListener(final IPacketConnectionListener listener) { + packetConnectionListeners.remove(listener); + } + + @Override + public void addStreamConnectionListener(final IStreamConnectionListener listener) { + streamConnectionListeners.addIfAbsent(listener); } - public void removeConnectionListener(final IByteStreamConnectionListener listener) { - connectionListeners.remove(listener); + @Override + public void removeStreamConnectionListener(final IStreamConnectionListener listener) { + streamConnectionListeners.remove(listener); } - private IByteStreamConnection connectInternal(String connectionID, JID peer) throws IOException { + private IConnection connectInternal(String connectionID, JID peer) throws IOException { - IByteStreamConnection connection = null; + IConnection connection = null; final String connectionIDToken = toConnectionIDToken(connectionID, OUT, peer); @@ -265,6 +242,8 @@ private IByteStreamConnection connectInternal(String connectionID, JID peer) thr final ArrayList currentStreamServices = new ArrayList(streamServices); + ByteStream byteStream = null; + for (IStreamService streamService : currentStreamServices) { LOG.info( "establishing connection to " @@ -274,7 +253,7 @@ private IByteStreamConnection connectInternal(String connectionID, JID peer) thr + " using stream service " + streamService); try { - connection = streamService.connect(connectionID, peer); + byteStream = streamService.connect(connectionID, peer); break; } catch (IOException e) { LOG.warn("failed to connect to " + peer + " using stream service: " + streamService, e); @@ -298,10 +277,10 @@ private IByteStreamConnection connectInternal(String connectionID, JID peer) thr } } - if (connection != null) { - byteStreamConnectionListener.connectionChanged(connectionID, connection, false); - - return connection; + if (byteStream != null) { + if (byteStream.getId().endsWith(STREAM_SUFFIX)) + return createAndAnnounceStreamConnection(byteStream, false); + else return createAndAnnouncePacketConnection(byteStream, false); } throw new IOException( @@ -347,7 +326,7 @@ private void prepareConnection(final Connection connection) { connectionPool.open(); for (IStreamService streamService : streamServices) - streamService.initialize(xmppConnection, byteStreamConnectionListener); + streamService.initialize(xmppConnection, streamServiceListener); } private void disposeConnection() { @@ -385,50 +364,129 @@ public void connectionStateChanged(Connection connection, ConnectionState newSta * * @param connectionID identifier for the connection to retrieve or null to retrieve * the default one - * @param jid JID of the remote side + * @param address address of the remote side * @return the connection to the remote side or null if no connection exists */ - private IByteStreamConnection getCurrentConnection(String connectionID, JID jid) { + private IConnection getCurrentConnection(String connectionID, Object address) { - IByteStreamConnection connection; + IConnection connection; - connection = connectionPool.get(toConnectionIDToken(connectionID, OUT, jid)); + connection = connectionPool.get(toConnectionIDToken(connectionID, OUT, address)); if (connection != null) return connection; - return connectionPool.get(toConnectionIDToken(connectionID, IN, jid)); + return connectionPool.get(toConnectionIDToken(connectionID, IN, address)); } - private static String toConnectionIDToken(String connectionIdentifier, String mode, JID jid) { + private static String toConnectionIDToken( + String connectionIdentifier, String mode, Object address) { if (connectionIdentifier == null) connectionIdentifier = DEFAULT_CONNECTION_ID; - return connectionIdentifier + ":" + mode + ":" + jid.toString(); + return connectionIdentifier + ":" + mode + ":" + address; } - private void notfiyConnectionClosed( - final String connectionId, final IByteStreamConnection connection) { + private IConnection createAndAnnouncePacketConnection( + final ByteStream byteStream, final boolean isIncoming) throws IOException { + final BinaryChannelConnection connection = + new BinaryChannelConnection(byteStream, connectionClosedCallback); + + // FIXME init first, than add to pool and finally start the receiver + // thread ! + + addConnectionToPool(connection, isIncoming); - for (final IByteStreamConnectionListener listener : connectionListeners) { + for (final IPacketConnectionListener listener : packetConnectionListeners) { try { - listener.connectionClosed(connectionId, connection); + listener.connectionEstablished(connection); } catch (RuntimeException e) { - LOG.error("invoking connectionClosed() on listener: " + listener + " failed", e); + LOG.error("invoking connectionEstablished() on listener: " + listener + " failed", e); } } + + try { + connection.initialize(); + } catch (IOException e) { + LOG.error("failed to initialize connection [inc=" + isIncoming + "] : " + connection); + connection.close(); + connectionPool.remove(connection.getId()); + + throw e; + } + + return connection; } - private void notfiyconnectionChanged( - final String connectionId, - final IByteStreamConnection connection, - final boolean incomingRequest) { + private IConnection createAndAnnounceStreamConnection( + final ByteStream byteStream, final boolean isIncoming) throws IOException { + + final DefaultStreamConnection connection = + new DefaultStreamConnection(byteStream, connectionClosedCallback); + + addConnectionToPool(connection, isIncoming); + + if (!isIncoming) return connection; + + boolean accepted = false; - for (final IByteStreamConnectionListener listener : connectionListeners) { + final String connectionId = + connection.getId().substring(0, connection.getId().length() - STREAM_SUFFIX.length()); + + for (final IStreamConnectionListener listener : streamConnectionListeners) { try { - listener.connectionChanged(connectionId, connection, incomingRequest); + accepted |= listener.streamConnectionEstablished(connectionId, connection); } catch (RuntimeException e) { - LOG.error("invoking connectionChanged() on listener: " + listener + " failed", e); + LOG.error("invoking streamConnectionEstablished() on listener: " + listener + " failed", e); } } + + if (!accepted) closeConnection(connection.getId(), connection.getRemoteAddress()); + + throw new IOException("no listener accepted the connection: " + connection); + } + + private IConnection addConnectionToPool(final IConnection connection, final boolean isIncoming) + throws IOException { + + final String id = + toConnectionIDToken( + connection.getId(), isIncoming ? IN : OUT, connection.getRemoteAddress()); + + LOG.debug( + "bytestream connection changed " + + connection + + ", inc=" + + isIncoming + + ", pool id=" + + id + + "]"); + + /* + * this may return the current connection if the pool is closed so + * close it anyway + */ + final IConnection current = connectionPool.add(id, connection); + + if (current != null) { + current.close(); + if (current == connection) { + throw new IOException( + "closed connection [pool id=" + + id + + "]: " + + current + + " , no connections are currently allowed"); + } else { + LOG.warn( + "existing connection [pool id=" + + id + + "] " + + current + + " was replaced with connection " + + connection); + } + } + + return connection; } } diff --git a/core/src/saros/net/internal/DefaultStreamConnection.java b/core/src/saros/net/internal/DefaultStreamConnection.java new file mode 100644 index 0000000000..8200ac1822 --- /dev/null +++ b/core/src/saros/net/internal/DefaultStreamConnection.java @@ -0,0 +1,86 @@ +package saros.net.internal; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import org.apache.log4j.Logger; +import saros.net.IStreamConnection; +import saros.net.stream.ByteStream; +import saros.net.stream.StreamMode; + +public class DefaultStreamConnection implements IStreamConnection { + + private static final Logger log = Logger.getLogger(DefaultStreamConnection.class); + + private final ByteStream byteStream; + private final IConnectionClosedCallback callback; + + private boolean isClosed; + + DefaultStreamConnection(final ByteStream byteStream, final IConnectionClosedCallback callback) { + this.byteStream = byteStream; + this.callback = callback; + } + + @Override + public Object getLocalAddress() { + return byteStream.getLocalAddress(); + } + + @Override + public Object getRemoteAddress() { + return byteStream.getRemoteAddress(); + } + + @Override + public StreamMode getMode() { + return byteStream.getMode(); + } + + @Override + public String getId() { + return byteStream.getId(); + } + + @Override + public void close() { + synchronized (this) { + if (isClosed) return; + + isClosed = true; + } + + try { + byteStream.close(); + } catch (IOException e) { + log.error("failed to close connection: " + this, e); + } finally { + if (callback != null) callback.connectionClosed(this); + } + } + + @Override + public InputStream getInputStream() throws IOException { + return byteStream.getInputStream(); + } + + @Override + public OutputStream getOutputStream() throws IOException { + return byteStream.getOutputStream(); + } + + @Override + public int getReadTimeout() throws IOException { + return byteStream.getReadTimeout(); + } + + @Override + public void setReadTimeout(int timeout) throws IOException { + byteStream.setReadTimeout(timeout); + } + + @Override + protected void finalize() { + close(); + } +} diff --git a/core/src/saros/net/internal/IByteStreamConnection.java b/core/src/saros/net/internal/IByteStreamConnection.java deleted file mode 100644 index 67b57ea6fe..0000000000 --- a/core/src/saros/net/internal/IByteStreamConnection.java +++ /dev/null @@ -1,42 +0,0 @@ -package saros.net.internal; - -import java.io.IOException; -import saros.net.stream.StreamMode; -import saros.net.xmpp.JID; - -/** A IByteStreamConnection is responsible for sending data to a particular user */ -public interface IByteStreamConnection { - - public JID getRemoteAddress(); - - public void close(); - - public boolean isConnected(); - - /** - * Initializes the byte stream connection. After the initialization is performed the byte stream - * connection must be able to send and receive data. - */ - public void initialize(); - - /** - * If this call returns the data has been send successfully, otherwise an IOException is thrown - * with the reason why the transfer failed. - * - * @param data The data to be sent. - * @throws IOException if the send failed - * @blocking Send the given data as a blocking operation. - */ - public void send(TransferDescription data, byte[] content) throws IOException; - - /** - * Returns the connection id of this connection. - * - * @return the connection id or null if the connection has no id - */ - public String getConnectionID(); - - public StreamMode getMode(); - - public void setBinaryXMPPExtensionReceiver(IBinaryXMPPExtensionReceiver receiver); -} diff --git a/core/src/saros/net/internal/IByteStreamConnectionListener.java b/core/src/saros/net/internal/IByteStreamConnectionListener.java deleted file mode 100644 index 01bed89a58..0000000000 --- a/core/src/saros/net/internal/IByteStreamConnectionListener.java +++ /dev/null @@ -1,27 +0,0 @@ -package saros.net.internal; - -/** - * Listener interface used by IStreamService and IBytestreamConnection to notify about established - * or changed connections and incoming XMPP extensions. - */ -public interface IByteStreamConnectionListener { - - public default void connectionClosed(String connectionID, IByteStreamConnection connection) { - // NOP; - } - - /** - * Gets called when a connection change is detected. The {@linkplain IByteStreamConnection - * connection} must be initialized first by calling {@link IByteStreamConnection#initialize()} to - * be able to receive and send data. - * - * @param connectionID the id of the connection - * @param connection - * @param incomingRequest true if the connection was a result of a remote connect - * request, false if the connect request was initiated on the local side - */ - public default void connectionChanged( - String connectionID, IByteStreamConnection connection, boolean incomingRequest) { - // NOP; - } -} diff --git a/core/src/saros/net/internal/IConnection.java b/core/src/saros/net/internal/IConnection.java new file mode 100644 index 0000000000..ed940b172d --- /dev/null +++ b/core/src/saros/net/internal/IConnection.java @@ -0,0 +1,16 @@ +package saros.net.internal; + +import saros.net.stream.StreamMode; + +public interface IConnection { + + public Object getLocalAddress(); + + public Object getRemoteAddress(); + + public StreamMode getMode(); + + public String getId(); + + public void close(); +} diff --git a/core/src/saros/net/internal/IConnectionClosedCallback.java b/core/src/saros/net/internal/IConnectionClosedCallback.java new file mode 100644 index 0000000000..6511e7f08c --- /dev/null +++ b/core/src/saros/net/internal/IConnectionClosedCallback.java @@ -0,0 +1,6 @@ +package saros.net.internal; + +public interface IConnectionClosedCallback { + + public void connectionClosed(IConnection connection); +} diff --git a/core/src/saros/net/internal/IPacketConnection.java b/core/src/saros/net/internal/IPacketConnection.java new file mode 100644 index 0000000000..b404485d3a --- /dev/null +++ b/core/src/saros/net/internal/IPacketConnection.java @@ -0,0 +1,22 @@ +package saros.net.internal; + +import java.io.IOException; +import saros.net.IReceiver; +import saros.net.ITransmitter; + +/** + * A packet connection is internally used as an abstraction layer to coordinate the transmission of + * and receiving of packets between the {@link IReceiver} and {@link ITransmitter}. + */ +public interface IPacketConnection extends IConnection { + + /** Sends the given data along with the given description. */ + public void send(TransferDescription description, byte[] data) throws IOException; + + /** + * Sets the receiver for incoming {@link BinaryXMPPExtension packet extensions}. + * + * @param receiver + */ + public void setBinaryXMPPExtensionReceiver(IBinaryXMPPExtensionReceiver receiver); +} diff --git a/core/src/saros/net/internal/IPacketConnectionListener.java b/core/src/saros/net/internal/IPacketConnectionListener.java new file mode 100644 index 0000000000..f14ad4130b --- /dev/null +++ b/core/src/saros/net/internal/IPacketConnectionListener.java @@ -0,0 +1,13 @@ +package saros.net.internal; + +/** Listener interface for {@link IPacketConnection packet connections}. */ +public interface IPacketConnectionListener { + + /** + * Gets called when a new packet connection was established. The connection is not + * initialized at this point. + * + * @param connection + */ + public void connectionEstablished(IPacketConnection connection); +} diff --git a/core/src/saros/net/internal/XMPPReceiver.java b/core/src/saros/net/internal/XMPPReceiver.java index f755857063..b07d052b8f 100644 --- a/core/src/saros/net/internal/XMPPReceiver.java +++ b/core/src/saros/net/internal/XMPPReceiver.java @@ -89,13 +89,10 @@ public XMPPReceiver( this.parser = new MXParser(); connectionService.addListener(connectionListener); - dataTransferManager.addConnectionListener( - new IByteStreamConnectionListener() { + dataTransferManager.addPacketConnectionListener( + new IPacketConnectionListener() { @Override - public void connectionChanged( - final String connectionId, - final IByteStreamConnection connection, - final boolean incomingRequest) { + public void connectionEstablished(final IPacketConnection connection) { connection.setBinaryXMPPExtensionReceiver(XMPPReceiver.this); } }); diff --git a/core/src/saros/net/internal/XMPPTransmitter.java b/core/src/saros/net/internal/XMPPTransmitter.java index 1fcfcd5c40..2581fd0fb3 100644 --- a/core/src/saros/net/internal/XMPPTransmitter.java +++ b/core/src/saros/net/internal/XMPPTransmitter.java @@ -83,7 +83,7 @@ public void send(String connectionID, JID recipient, PacketExtension extension) if (currentLocalJid == null) throw new IOException("not connected to a XMPP server"); - IByteStreamConnection connection = dataManager.getConnection(connectionID, recipient); + IConnection connection = dataManager.getConnection(connectionID, recipient); if (connectionID != null && connection == null) throw new IOException( @@ -91,6 +91,10 @@ public void send(String connectionID, JID recipient, PacketExtension extension) if (connection == null) connection = dataManager.connect(recipient); + if (!(connection instanceof IPacketConnection)) + throw new IOException( + "connection id '" + connectionID + "' references not a packet connection"); + /* * The TransferDescription can be created out of the session, the name * and namespace of the packet extension and standard values and thus @@ -109,7 +113,7 @@ public void send(String connectionID, JID recipient, PacketExtension extension) transferDescription.setCompressContent(true); } - sendPacketExtension(connection, transferDescription, data); + sendPacketExtension((IPacketConnection) connection, transferDescription, data); } @Override @@ -189,12 +193,12 @@ private synchronized boolean isConnectionInvalid() { } private void sendPacketExtension( - final IByteStreamConnection connection, final TransferDescription description, byte[] payload) + final IPacketConnection connection, final TransferDescription description, byte[] payload) throws IOException { boolean sendPacket = true; - final String connectionId = connection.getConnectionID(); + final String connectionId = connection.getId(); for (IPacketInterceptor packetInterceptor : packetInterceptors) sendPacket &= packetInterceptor.sendPacket(connectionId, description, payload); diff --git a/core/src/saros/net/stream/ByteStream.java b/core/src/saros/net/stream/ByteStream.java index 1c1923c402..efee80be09 100644 --- a/core/src/saros/net/stream/ByteStream.java +++ b/core/src/saros/net/stream/ByteStream.java @@ -16,4 +16,12 @@ public interface ByteStream { public int getReadTimeout() throws IOException; public void setReadTimeout(int timeout) throws IOException; + + public Object getLocalAddress(); + + public Object getRemoteAddress(); + + public StreamMode getMode(); + + public String getId(); } diff --git a/core/src/saros/net/stream/IBBStreamService.java b/core/src/saros/net/stream/IBBStreamService.java index 80f014e1d2..083bef7277 100644 --- a/core/src/saros/net/stream/IBBStreamService.java +++ b/core/src/saros/net/stream/IBBStreamService.java @@ -9,9 +9,6 @@ import org.jivesoftware.smackx.bytestreams.BytestreamRequest; import org.jivesoftware.smackx.bytestreams.BytestreamSession; import org.jivesoftware.smackx.bytestreams.ibb.InBandBytestreamManager; -import saros.net.internal.BinaryChannelConnection; -import saros.net.internal.IByteStreamConnection; -import saros.net.internal.IByteStreamConnectionListener; import saros.net.xmpp.JID; /** @@ -24,7 +21,8 @@ public class IBBStreamService implements IStreamService, BytestreamListener { private static final Logger LOG = Logger.getLogger(IBBStreamService.class); private volatile InBandBytestreamManager manager; - private volatile IByteStreamConnectionListener connectionListener; + private volatile IStreamServiceListener connectionListener; + private JID localAddress; public IBBStreamService() { @@ -32,7 +30,7 @@ public IBBStreamService() { } @Override - public IByteStreamConnection connect(String connectionID, JID remoteAddress) + public ByteStream connect(String connectionID, JID remoteAddress) throws IOException, InterruptedException { if (connectionID == null) throw new NullPointerException("connectionID is null"); @@ -49,7 +47,7 @@ public IByteStreamConnection connect(String connectionID, JID remoteAddress) LOG.debug("establishing IBB bytestream to: " + remoteAddress); final BytestreamManager currentManager = manager; - final IByteStreamConnectionListener currentConnectionListener = connectionListener; + final IStreamServiceListener currentConnectionListener = connectionListener; if (currentManager == null || currentConnectionListener == null) throw new IOException(this + " is not initialized"); @@ -62,18 +60,12 @@ public IByteStreamConnection connect(String connectionID, JID remoteAddress) throw new IOException(e); } - return new BinaryChannelConnection( - localAddress, - remoteAddress, - connectionID, - new XMPPByteStreamAdapter(session), - StreamMode.IBB, - currentConnectionListener); + return new XMPPByteStreamAdapter( + localAddress, remoteAddress, session, connectionID, StreamMode.IBB); } @Override - public synchronized void initialize( - Connection connection, IByteStreamConnectionListener listener) { + public synchronized void initialize(Connection connection, IStreamServiceListener listener) { localAddress = new JID(connection.getUser()); connectionListener = listener; manager = InBandBytestreamManager.getByteStreamManager(connection); @@ -96,11 +88,12 @@ public void incomingBytestreamRequest(BytestreamRequest request) { LOG.debug("accepting IBB bytestream from: " + request.getFrom()); - final IByteStreamConnectionListener currentConnectionListener = connectionListener; + final IStreamServiceListener currentConnectionListener = connectionListener; if (currentConnectionListener == null) { LOG.warn(this + " is not initialized, rejecting connection..."); request.reject(); + return; } final BytestreamSession session; @@ -120,28 +113,15 @@ public void incomingBytestreamRequest(BytestreamRequest request) { return; } - final IByteStreamConnection connection; - - try { - connection = - new BinaryChannelConnection( - localAddress, - new JID(request.getFrom()), - request.getSessionID(), - new XMPPByteStreamAdapter(session), - StreamMode.IBB, - connectionListener); - } catch (IOException e) { - LOG.error("failed to initialize connection for IBB stream", e); - try { - session.close(); - } catch (IOException ignore) { - LOG.error(ignore); - } - return; - } + final ByteStream byteStream = + new XMPPByteStreamAdapter( + localAddress, + new JID(request.getFrom()), + session, + request.getSessionID(), + StreamMode.IBB); - connectionListener.connectionChanged(request.getSessionID(), connection, true); + currentConnectionListener.connectionEstablished(byteStream); } // ***************** BytestreamListener interface impl end diff --git a/core/src/saros/net/stream/IStreamService.java b/core/src/saros/net/stream/IStreamService.java index f9635230fb..d16db99d2d 100644 --- a/core/src/saros/net/stream/IStreamService.java +++ b/core/src/saros/net/stream/IStreamService.java @@ -2,13 +2,17 @@ import java.io.IOException; import org.jivesoftware.smack.Connection; -import saros.net.internal.IByteStreamConnection; -import saros.net.internal.IByteStreamConnectionListener; import saros.net.xmpp.JID; /** * This interface is used to define various services (probably only XEP 65 SOCKS5, XEP 47 in-band * bytestreams) that offer the possibility to establish network connections/sessions. + * + *

Implementations notes: You can expect that {@link #initialize} and {@link #uninitialize} are + * not called concurrently. You must expect that both methods are called concurrently while calling + * {@link #connect}. + * + *

It is safe to return valid {@link ByteStream streams} during unitialization. */ public interface IStreamService { @@ -16,7 +20,7 @@ public interface IStreamService { public static final char SESSION_ID_DELIMITER = ':'; /** - * Establishes a {@link IByteStreamConnection connection} to the given JID. + * Establishes a {@link ByteStream connection} to the given JID. * * @param connectionID an ID used to identify this stream(session) * @param remoteAddress a resource qualified JID to connect to @@ -26,7 +30,7 @@ public interface IStreamService { * @throws IOException if no connection could be established * @throws InterruptedException if the stream establishment was interrupted */ - public IByteStreamConnection connect(String connectionID, JID remoteAddress) + public ByteStream connect(String connectionID, JID remoteAddress) throws IOException, InterruptedException; /** @@ -36,7 +40,7 @@ public IByteStreamConnection connect(String connectionID, JID remoteAddress) * @param connection * @param listener */ - public void initialize(Connection connection, IByteStreamConnectionListener listener); + public void initialize(Connection connection, IStreamServiceListener listener); /** * Un-initializes the service. After un-initialization the service is not able to establish diff --git a/core/src/saros/net/stream/IStreamServiceListener.java b/core/src/saros/net/stream/IStreamServiceListener.java new file mode 100644 index 0000000000..be00f4013d --- /dev/null +++ b/core/src/saros/net/stream/IStreamServiceListener.java @@ -0,0 +1,7 @@ +package saros.net.stream; + +public interface IStreamServiceListener { + + /** Gets called when an incoming connection request was successfully performed. */ + public void connectionEstablished(ByteStream byteStream); +} diff --git a/core/src/saros/net/stream/Socks5StreamService.java b/core/src/saros/net/stream/Socks5StreamService.java index cdd80ca908..312bdeca0b 100644 --- a/core/src/saros/net/stream/Socks5StreamService.java +++ b/core/src/saros/net/stream/Socks5StreamService.java @@ -29,9 +29,6 @@ import org.jivesoftware.smackx.bytestreams.socks5.Socks5BytestreamRequest; import org.jivesoftware.smackx.bytestreams.socks5.Socks5BytestreamSession; import org.jivesoftware.smackx.bytestreams.socks5.Socks5Proxy; -import saros.net.internal.BinaryChannelConnection; -import saros.net.internal.IByteStreamConnection; -import saros.net.internal.IByteStreamConnectionListener; import saros.net.util.NetworkingUtils; import saros.net.xmpp.JID; import saros.util.NamedThreadFactory; @@ -104,7 +101,7 @@ public class Socks5StreamService implements IStreamService, BytestreamListener { private ExecutorService executorService; private volatile Socks5BytestreamManager socks5Manager; - private volatile IByteStreamConnectionListener connectionListener; + private volatile IStreamServiceListener connectionListener; private JID localAddress; @@ -360,13 +357,13 @@ private void handleResponse(BytestreamRequest request) * @throws InterruptedException * @throws IOException */ - private IByteStreamConnection acceptNewRequest(BytestreamRequest request) + private ByteStream acceptNewRequest(BytestreamRequest request) throws XMPPException, IOException, InterruptedException { String peer = request.getFrom(); LOG.debug(prefix() + "receiving request from " + peer + ", " + verboseLocalProxyInfo()); - IByteStreamConnectionListener listener = connectionListener; + IStreamServiceListener listener = connectionListener; if (listener == null) throw new IOException(this + " transport is not initialized"); @@ -397,13 +394,9 @@ private IByteStreamConnection acceptNewRequest(BytestreamRequest request) waitToCloseResponse(responseFuture); configureSocks5Socket(inSession); - return new BinaryChannelConnection( - localAddress, - new JID(peer), - connectionIdentifier, - new XMPPByteStreamAdapter(inSession), - StreamMode.SOCKS5_DIRECT, - listener); + return new XMPPByteStreamAdapter( + localAddress, new JID(peer), inSession, connectionIdentifier, StreamMode.SOCKS5_DIRECT); + } else { LOG.debug(prefix() + "incoming connection is mediated."); } @@ -426,17 +419,13 @@ private IByteStreamConnection acceptNewRequest(BytestreamRequest request) closeQuietly(inSession); configureSocks5Socket(outSession); - return new BinaryChannelConnection( + return new XMPPByteStreamAdapter( localAddress, new JID(peer), + outSession, connectionIdentifier, - new XMPPByteStreamAdapter(outSession), - StreamMode.SOCKS5_DIRECT, - listener); + StreamMode.SOCKS5_DIRECT); } - - } catch (IOException e) { - LOG.error(prefix() + "Socket crashed while initiating sending session (for wrapping)", e); } catch (ExecutionException e) { LOG.error("An error occurred while establishing a response connection ", e.getCause()); } @@ -447,13 +436,8 @@ private IByteStreamConnection acceptNewRequest(BytestreamRequest request) BytestreamSession session = testAndGetMediatedBidirectionalBytestream(inSession, outSession, true); - return new BinaryChannelConnection( - localAddress, - new JID(peer), - connectionIdentifier, - new XMPPByteStreamAdapter(session), - StreamMode.SOCKS5_MEDIATED, - listener); + return new XMPPByteStreamAdapter( + localAddress, new JID(peer), session, connectionIdentifier, StreamMode.SOCKS5_MEDIATED); } /** @@ -462,7 +446,7 @@ private IByteStreamConnection acceptNewRequest(BytestreamRequest request) * *

see handleResponse() and acceptNewRequest() */ - private IByteStreamConnection acceptRequest(BytestreamRequest request) + private ByteStream acceptRequest(BytestreamRequest request) throws XMPPException, IOException, InterruptedException { ((Socks5BytestreamRequest) request).setTotalConnectTimeout(TOTAL_CONNECT_TIMEOUT); @@ -478,11 +462,11 @@ private IByteStreamConnection acceptRequest(BytestreamRequest request) /** * Tries to establish a connection to peer and waits for peer to connect. See handleResponse(). */ - private IByteStreamConnection establishBinaryChannel(String connectionIdentifier, String peer) + private ByteStream establishBinaryChannel(String connectionIdentifier, String peer) throws XMPPException, IOException, InterruptedException { Socks5BytestreamManager manager = socks5Manager; - IByteStreamConnectionListener listener = connectionListener; + IStreamServiceListener listener = connectionListener; if (manager == null || listener == null) throw new IOException(this + " transport is not initialized"); @@ -508,13 +492,12 @@ private IByteStreamConnection establishBinaryChannel(String connectionIdentifier if (outSession.isDirect()) { configureSocks5Socket(outSession); - return new BinaryChannelConnection( + return new XMPPByteStreamAdapter( localAddress, new JID(peer), + outSession, connectionIdentifier, - new XMPPByteStreamAdapter(outSession), - StreamMode.SOCKS5_DIRECT, - listener); + StreamMode.SOCKS5_DIRECT); } LOG.debug( @@ -584,13 +567,12 @@ private IByteStreamConnection establishBinaryChannel(String connectionIdentifier closeQuietly(outSession); configureSocks5Socket(inSession); - return new BinaryChannelConnection( + return new XMPPByteStreamAdapter( localAddress, new JID(peer), + inSession, connectionIdentifier, - new XMPPByteStreamAdapter(inSession), - StreamMode.SOCKS5_DIRECT, - listener); + StreamMode.SOCKS5_DIRECT); } } catch (TimeoutException e) { @@ -606,13 +588,8 @@ private IByteStreamConnection establishBinaryChannel(String connectionIdentifier BytestreamSession session = testAndGetMediatedBidirectionalBytestream(inSession, outSession, false); - return new BinaryChannelConnection( - localAddress, - new JID(peer), - connectionIdentifier, - new XMPPByteStreamAdapter(session), - StreamMode.SOCKS5_MEDIATED, - listener); + return new XMPPByteStreamAdapter( + localAddress, new JID(peer), session, connectionIdentifier, StreamMode.SOCKS5_MEDIATED); } finally { runningRemoteConnects.remove(sessionID); @@ -640,7 +617,7 @@ private BytestreamSession establishResponseSession(String sessionID, String peer // *********** IStreamService impl start @Override - public IByteStreamConnection connect(String connectionID, JID remoteAddress) + public ByteStream connect(String connectionID, JID remoteAddress) throws IOException, InterruptedException { if (connectionID == null) throw new NullPointerException("connectionID is null"); @@ -664,7 +641,7 @@ public IByteStreamConnection connect(String connectionID, JID remoteAddress) } @Override - public void initialize(Connection connection, IByteStreamConnectionListener listener) { + public void initialize(Connection connection, IStreamServiceListener listener) { synchronized (this) { localAddress = new JID(connection.getUser()); @@ -707,26 +684,20 @@ public void incomingBytestreamRequest(BytestreamRequest request) { + this + "]"); - try { + IStreamServiceListener listener = connectionListener; - IByteStreamConnection connection = acceptRequest(request); + if (listener == null) { + request.reject(); + return; + } - if (connection == null) return; + try { - IByteStreamConnectionListener listener = connectionListener; + ByteStream byteStream = acceptRequest(request); - if (listener == null) { - LOG.warn( - "closing bytestream connection " - + connection - + " because transport " - + this - + " was uninitilized during connection establishment"); - connection.close(); - return; - } + if (byteStream == null) return; - listener.connectionChanged(connection.getConnectionID(), connection, true); + listener.connectionEstablished(byteStream); } catch (InterruptedException e) { /* diff --git a/core/src/saros/net/stream/TCPByteStream.java b/core/src/saros/net/stream/TCPByteStream.java index 263e74047f..a1b1792421 100644 --- a/core/src/saros/net/stream/TCPByteStream.java +++ b/core/src/saros/net/stream/TCPByteStream.java @@ -8,11 +8,13 @@ public class TCPByteStream implements ByteStream { private final Socket socket; + private final String id; - public TCPByteStream(final Socket socket) { + public TCPByteStream(final Socket socket, final String id) { if (socket == null) throw new NullPointerException("socket is null"); this.socket = socket; + this.id = id; } @Override @@ -39,4 +41,24 @@ public int getReadTimeout() throws IOException { public void setReadTimeout(int timeout) throws IOException { socket.setSoTimeout(timeout); } + + @Override + public Object getLocalAddress() { + return socket.getLocalAddress(); + } + + @Override + public Object getRemoteAddress() { + return socket.getInetAddress(); + } + + @Override + public StreamMode getMode() { + return StreamMode.TCP; + } + + @Override + public String getId() { + return id; + } } diff --git a/core/src/saros/net/stream/TCPTransport.java b/core/src/saros/net/stream/TCPTransport.java index 01771d2d46..63b9c13184 100644 --- a/core/src/saros/net/stream/TCPTransport.java +++ b/core/src/saros/net/stream/TCPTransport.java @@ -1,59 +1,28 @@ package saros.net.stream; import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.Proxy; -import java.net.Socket; import org.jivesoftware.smack.Connection; -import saros.net.internal.BinaryChannelConnection; -import saros.net.internal.IByteStreamConnection; -import saros.net.internal.IByteStreamConnectionListener; import saros.net.xmpp.JID; // TODO rewrite IStreamService interface public class TCPTransport implements IStreamService { - private volatile IByteStreamConnectionListener currentListener; - @Override - public IByteStreamConnection connect(String connectionID, JID peer) + public ByteStream connect(String connectionID, JID peer) throws IOException, InterruptedException { - if (true) throw new RuntimeException("NYI"); - - // TODO this should be configurable; - - final Socket socket = new Socket(Proxy.NO_PROXY); - - final InetSocketAddress address = new InetSocketAddress("localhost", 4711); - - socket.connect(address, 30000); - socket.setTcpNoDelay(true); - - final IByteStreamConnectionListener listener = currentListener; - - if (listener == null) { - socket.close(); - throw new IOException(this + " transport is not initialized"); - } - - final IByteStreamConnection connection = - new BinaryChannelConnection( - null, peer, connectionID, new TCPByteStream(socket), StreamMode.TCP, listener); - - return connection; + throw new RuntimeException("NYI"); } @Override - public void initialize(Connection connection, IByteStreamConnectionListener listener) { - - currentListener = listener; + public void initialize(Connection connection, IStreamServiceListener listener) { + throw new RuntimeException("NYI"); } @Override public void uninitialize() { - currentListener = null; + throw new RuntimeException("NYI"); } @Override diff --git a/core/src/saros/net/stream/XMPPByteStreamAdapter.java b/core/src/saros/net/stream/XMPPByteStreamAdapter.java index 9f42a62d6e..b31cf7c433 100644 --- a/core/src/saros/net/stream/XMPPByteStreamAdapter.java +++ b/core/src/saros/net/stream/XMPPByteStreamAdapter.java @@ -4,15 +4,29 @@ import java.io.InputStream; import java.io.OutputStream; import org.jivesoftware.smackx.bytestreams.BytestreamSession; +import saros.net.xmpp.JID; public class XMPPByteStreamAdapter implements ByteStream { private final BytestreamSession delegate; + private final JID local; + private final JID remote; + private final String id; + private final StreamMode mode; - public XMPPByteStreamAdapter(final BytestreamSession session) { + public XMPPByteStreamAdapter( + final JID local, + final JID remote, + final BytestreamSession session, + final String id, + final StreamMode mode) { if (session == null) throw new NullPointerException("session is null"); delegate = session; + this.local = local; + this.remote = remote; + this.id = id; + this.mode = mode; } @Override @@ -39,4 +53,24 @@ public int getReadTimeout() throws IOException { public void setReadTimeout(int timeout) throws IOException { delegate.setReadTimeout(timeout); } + + @Override + public Object getLocalAddress() { + return local; + } + + @Override + public Object getRemoteAddress() { + return remote; + } + + @Override + public StreamMode getMode() { + return mode == null ? StreamMode.NONE : mode; + } + + @Override + public String getId() { + return id; + } } diff --git a/core/test/junit/saros/net/internal/BinaryChannelConnectionTest.java b/core/test/junit/saros/net/internal/BinaryChannelConnectionTest.java index 092d541208..0adef80dcc 100644 --- a/core/test/junit/saros/net/internal/BinaryChannelConnectionTest.java +++ b/core/test/junit/saros/net/internal/BinaryChannelConnectionTest.java @@ -30,9 +30,24 @@ private static class PipedBytestreamSession implements ByteStream { private InputStream in; private OutputStream out; - public PipedBytestreamSession(PipedInputStream in, PipedOutputStream out) { + private JID local; + private JID remote; + private String id; + private StreamMode mode; + + public PipedBytestreamSession( + PipedInputStream in, + PipedOutputStream out, + JID local, + JID remote, + String id, + StreamMode mode) { this.in = in; this.out = out; + this.local = local; + this.remote = remote; + this.id = id; + this.mode = mode; } @Override @@ -60,20 +75,25 @@ public int getReadTimeout() throws IOException { public void setReadTimeout(int timeout) throws IOException { // NOP } - } - private static class StreamConnectionListener implements IByteStreamConnectionListener { + @Override + public Object getLocalAddress() { + return local; + } @Override - public void connectionClosed(String connectionIdentifier, IByteStreamConnection connection) { - // NOP + public Object getRemoteAddress() { + return remote; + } + @Override + public StreamMode getMode() { + return mode; } @Override - public void connectionChanged( - String connectionIdentifier, IByteStreamConnection connection, boolean incomingRequest) { - // NOP + public String getId() { + return id; } } @@ -95,8 +115,12 @@ public void setUp() throws IOException { aliceOut.connect(bobIn); aliceIn.connect(bobOut); - aliceStream = new PipedBytestreamSession(aliceIn, aliceOut); - bobStream = new PipedBytestreamSession(bobIn, bobOut); + aliceStream = + new PipedBytestreamSession( + aliceIn, aliceOut, aliceJID, bobJID, "junit", StreamMode.SOCKS5_DIRECT); + bobStream = + new PipedBytestreamSession( + bobIn, bobOut, bobJID, aliceJID, "junit", StreamMode.SOCKS5_DIRECT); } private volatile byte[] receivedBytes; @@ -108,23 +132,9 @@ public void testCacheUpdates() throws Exception { final CountDownLatch received = new CountDownLatch(2); - BinaryChannelConnection alice = - new BinaryChannelConnection( - aliceJID, - bobJID, - "junit", - aliceStream, - StreamMode.SOCKS5_DIRECT, - new StreamConnectionListener()); - - BinaryChannelConnection bob = - new BinaryChannelConnection( - bobJID, - aliceJID, - "junit", - bobStream, - StreamMode.SOCKS5_DIRECT, - new StreamConnectionListener()); + BinaryChannelConnection alice = new BinaryChannelConnection(aliceStream, null); + + BinaryChannelConnection bob = new BinaryChannelConnection(bobStream, null); bob.setBinaryXMPPExtensionReceiver( (e) -> { @@ -184,23 +194,9 @@ public void testFragmentationOnLargeDataToBeSend() throws Exception { final CountDownLatch received = new CountDownLatch(1); - BinaryChannelConnection alice = - new BinaryChannelConnection( - aliceJID, - bobJID, - "junit", - aliceStream, - StreamMode.SOCKS5_DIRECT, - new StreamConnectionListener()); - - BinaryChannelConnection bob = - new BinaryChannelConnection( - bobJID, - aliceJID, - "junit", - bobStream, - StreamMode.SOCKS5_DIRECT, - new StreamConnectionListener()); + BinaryChannelConnection alice = new BinaryChannelConnection(aliceStream, null); + + BinaryChannelConnection bob = new BinaryChannelConnection(bobStream, null); bob.setBinaryXMPPExtensionReceiver( (e) -> { @@ -249,23 +245,9 @@ public void testFragmentationCleanup() throws Exception { final CountDownLatch received = new CountDownLatch((int) packetsToSend); - BinaryChannelConnection alice = - new BinaryChannelConnection( - aliceJID, - bobJID, - "junit", - aliceStream, - StreamMode.SOCKS5_DIRECT, - new StreamConnectionListener()); - - BinaryChannelConnection bob = - new BinaryChannelConnection( - bobJID, - aliceJID, - "junit", - bobStream, - StreamMode.SOCKS5_DIRECT, - new StreamConnectionListener()); + BinaryChannelConnection alice = new BinaryChannelConnection(aliceStream, null); + + BinaryChannelConnection bob = new BinaryChannelConnection(bobStream, null); bob.setBinaryXMPPExtensionReceiver( (e) -> { diff --git a/core/test/junit/saros/net/internal/ConnectionPoolTest.java b/core/test/junit/saros/net/internal/ConnectionPoolTest.java index 09386af909..06aeeae533 100644 --- a/core/test/junit/saros/net/internal/ConnectionPoolTest.java +++ b/core/test/junit/saros/net/internal/ConnectionPoolTest.java @@ -19,7 +19,7 @@ public void setUp() { @Test public void testAddAndRemoveOnClosedPool() { - final IByteStreamConnection connection = EasyMock.createNiceMock(IByteStreamConnection.class); + final IConnection connection = EasyMock.createNiceMock(IConnection.class); EasyMock.replay(connection); @@ -31,9 +31,9 @@ public void testAddAndRemoveOnClosedPool() { @Test public void testAddAndRemoveOnOpenedPool() { - final IByteStreamConnection connection0 = EasyMock.createNiceMock(IByteStreamConnection.class); + final IConnection connection0 = EasyMock.createNiceMock(IConnection.class); - final IByteStreamConnection connection1 = EasyMock.createNiceMock(IByteStreamConnection.class); + final IConnection connection1 = EasyMock.createNiceMock(IConnection.class); EasyMock.replay(connection0, connection1); @@ -55,7 +55,7 @@ public void testAddAndRemoveOnOpenedPool() { @Test public void testGet() { - final IByteStreamConnection connection = EasyMock.createNiceMock(IByteStreamConnection.class); + final IConnection connection = EasyMock.createNiceMock(IConnection.class); EasyMock.replay(connection); @@ -76,9 +76,9 @@ public void testGet() { @Test public void testPoolClose() { - final IByteStreamConnection connection0 = EasyMock.createNiceMock(IByteStreamConnection.class); + final IConnection connection0 = EasyMock.createNiceMock(IConnection.class); - final IByteStreamConnection connection1 = EasyMock.createNiceMock(IByteStreamConnection.class); + final IConnection connection1 = EasyMock.createNiceMock(IConnection.class); connection0.close(); EasyMock.expectLastCall().once(); diff --git a/core/test/junit/saros/net/internal/DataTransferManagerTest.java b/core/test/junit/saros/net/internal/DataTransferManagerTest.java index c2e092bf07..24ab73ffff 100644 --- a/core/test/junit/saros/net/internal/DataTransferManagerTest.java +++ b/core/test/junit/saros/net/internal/DataTransferManagerTest.java @@ -5,9 +5,13 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -17,11 +21,14 @@ import org.easymock.Capture; import org.easymock.EasyMock; import org.jivesoftware.smack.Connection; +import org.junit.After; import org.junit.Before; import org.junit.Test; import saros.net.ConnectionState; import saros.net.IConnectionManager; +import saros.net.stream.ByteStream; import saros.net.stream.IStreamService; +import saros.net.stream.IStreamServiceListener; import saros.net.stream.StreamMode; import saros.net.xmpp.IConnectionListener; import saros.net.xmpp.JID; @@ -32,39 +39,36 @@ public class DataTransferManagerTest { private static class Transport implements IStreamService { - private List establishedConnections = new ArrayList(); + private List establishedStreams = new ArrayList(); - private IByteStreamConnectionListener listener; + private IStreamServiceListener listener; private StreamMode mode; - private String connectionID; - public Transport(StreamMode mode) { this.mode = mode; } @Override - public synchronized IByteStreamConnection connect(String connectionIdentifier, JID peer) + public synchronized ByteStream connect(String connectionIdentifier, JID peer) throws IOException, InterruptedException { - connectionID = connectionIdentifier; - - ChannelConnection connection = new ChannelConnection(peer, mode, listener); + ByteStream byteStream = new DummyByteStream(peer, connectionIdentifier, mode); - establishedConnections.add(connection); - return connection; + establishedStreams.add(byteStream); + return byteStream; } - public synchronized void announceIncomingRequest(JID peer) { - ChannelConnection connection = new ChannelConnection(peer, mode, listener); + public synchronized void announceIncomingRequest(String connectionIdentifier, JID peer) { + + ByteStream byteStream = new DummyByteStream(peer, connectionIdentifier, mode); - establishedConnections.add(connection); - listener.connectionChanged(connectionID, connection, true); + establishedStreams.add(byteStream); + listener.connectionEstablished(byteStream); } @Override - public void initialize(Connection connection, IByteStreamConnectionListener listener) { + public void initialize(Connection connection, IStreamServiceListener listener) { this.listener = listener; } @@ -73,8 +77,16 @@ public void uninitialize() { this.listener = null; } - public synchronized List getEstablishedConnections() { - return establishedConnections; + public synchronized List getEstablishedStreams() { + return establishedStreams; + } + + public synchronized void close() { + for (ByteStream byteStream : establishedStreams) + try { + byteStream.close(); + } catch (IOException e) { // ignore} + } } } @@ -100,7 +112,7 @@ public BlockableTransport( } @Override - public IByteStreamConnection connect(String connectionIdentifier, JID peer) + public ByteStream connect(String connectionIdentifier, JID peer) throws IOException, InterruptedException { if (jidsToIgnore.contains(peer)) return super.connect(connectionIdentifier, peer); @@ -113,69 +125,102 @@ public IByteStreamConnection connect(String connectionIdentifier, JID peer) acknowledge.countDown(); proceed.await(); - IByteStreamConnection connection = super.connect(connectionIdentifier, peer); + ByteStream byteStream = super.connect(connectionIdentifier, peer); isConnecting = false; - return connection; + return byteStream; } } - private static class ChannelConnection implements IByteStreamConnection { + private static class DummyByteStream implements ByteStream { - private JID to; - private StreamMode mode; - private IByteStreamConnectionListener listener; - private volatile boolean closed; - private volatile int sendPackets; + private final JID local = new JID("example.org"); + private final JID remote; + private final String id; + private final StreamMode mode; - public ChannelConnection(JID to, StreamMode mode, IByteStreamConnectionListener listener) { - this.to = to; + private final ByteArrayOutputStream out = new ByteArrayOutputStream(); + + private volatile boolean isClosed; + + private InputStream blockingInputStream = + new InputStream() { + + @Override + public int read() throws IOException { + while (!isClosed) { + synchronized (this) { + try { + wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return -1; + } + } + } + return -1; + } + }; + + private DummyByteStream(final JID remote, final String id, StreamMode mode) { + this.remote = remote; + this.id = id; this.mode = mode; - this.listener = listener; } @Override - public JID getRemoteAddress() { - return to; + public InputStream getInputStream() throws IOException { + return blockingInputStream; } @Override - public void close() { - closed = true; - listener.connectionClosed(/* FIMXE */ null, this); + public OutputStream getOutputStream() throws IOException { + return out; + } + + public boolean isClosed() { + return isClosed; } @Override - public boolean isConnected() { - return !closed; + public void close() throws IOException { + synchronized (blockingInputStream) { + isClosed = true; + blockingInputStream.notifyAll(); + } } @Override - public void send(TransferDescription data, byte[] content) throws IOException { - sendPackets++; + public int getReadTimeout() throws IOException { + return 0; } @Override - public StreamMode getMode() { - return mode; + public void setReadTimeout(int timeout) throws IOException { + // NOP } - public int getSendPacketsCount() { - return sendPackets; + @Override + public Object getLocalAddress() { + return local; } @Override - public String getConnectionID() { - return null; + public Object getRemoteAddress() { + return remote; } @Override - public void initialize() { - // NOP + public StreamMode getMode() { + return mode; } @Override - public void setBinaryXMPPExtensionReceiver(IBinaryXMPPExtensionReceiver receiver) { - // NOP + public String getId() { + return id; + } + + public boolean wasUsed() { + return out.size() != 0; } } @@ -185,6 +230,10 @@ public void setBinaryXMPPExtensionReceiver(IBinaryXMPPExtensionReceiver receiver private Connection connectionMock; + private Transport mainTransport; + + private Transport fallbackTransport; + { connectionMock = EasyMock.createMock(Connection.class); EasyMock.expect(connectionMock.getUser()).andReturn("local@host").anyTimes(); @@ -207,6 +256,15 @@ public void setUp() { connectionServiceStub = createConnectionsServiceMock(connectionListener); } + @After + public void tearDown() { + if (mainTransport != null) mainTransport.close(); + + if (fallbackTransport != null) fallbackTransport.close(); + + mainTransport = fallbackTransport = null; + } + @Test(expected = NullPointerException.class) public void testEstablishConnectionWithNullPeer() throws Exception { @@ -240,8 +298,8 @@ public void testEstablishConnectionWithNoTransports() throws Exception { @Test public void testEstablishConnectionWithMainAndFallbackTransport() throws Exception { - IStreamService mainTransport = new Transport(StreamMode.SOCKS5_DIRECT); - IStreamService fallbackTransport = new Transport(StreamMode.IBB); + mainTransport = new Transport(StreamMode.SOCKS5_DIRECT); + fallbackTransport = new Transport(StreamMode.IBB); IConnectionManager dtm = new DataTransferManager(connectionServiceStub, mainTransport, fallbackTransport); @@ -256,16 +314,20 @@ public void testEstablishConnectionWithMainAndFallbackTransport() throws Excepti public void testEstablishConnectionWithMainAndFallbackTransportAndUsingFallback() throws Exception { - IStreamService mainTransport = EasyMock.createMock(IStreamService.class); + mainTransport = EasyMock.createMock(Transport.class); - IStreamService fallbackTransport = new Transport(StreamMode.IBB); + fallbackTransport = new Transport(StreamMode.IBB); EasyMock.expect(mainTransport.connect(EasyMock.isA(String.class), EasyMock.isA(JID.class))) .andThrow(new IOException()) .anyTimes(); + mainTransport.close(); + + EasyMock.expectLastCall().anyTimes(); + mainTransport.initialize( - EasyMock.isA(Connection.class), EasyMock.isA(IByteStreamConnectionListener.class)); + EasyMock.isA(Connection.class), EasyMock.isA(IStreamServiceListener.class)); EasyMock.expectLastCall().once(); @@ -287,8 +349,8 @@ public void testEstablishConnectionWithMainAndFallbackTransportAndUsingFallback( @Test public void testForceIBBOnly() throws Exception { - IStreamService mainTransport = new Transport(StreamMode.SOCKS5_DIRECT); - IStreamService fallbackTransport = new Transport(StreamMode.IBB); + mainTransport = new Transport(StreamMode.SOCKS5_DIRECT); + fallbackTransport = new Transport(StreamMode.IBB); DataTransferManager dtm = new DataTransferManager(connectionServiceStub, mainTransport, fallbackTransport); @@ -308,7 +370,7 @@ public void testForceIBBOnly() throws Exception { @Test public void testConnectionCaching() throws Exception { - Transport mainTransport = new Transport(StreamMode.SOCKS5_DIRECT); + mainTransport = new Transport(StreamMode.SOCKS5_DIRECT); IConnectionManager dtm = new DataTransferManager(connectionServiceStub, mainTransport, null); @@ -318,17 +380,17 @@ public void testConnectionCaching() throws Exception { dtm.connect(new JID("foo@bar.de")); dtm.connect(new JID("foo@bar.com")); - assertEquals("connection caching failed", 2, mainTransport.getEstablishedConnections().size()); + assertEquals("connection caching failed", 2, mainTransport.getEstablishedStreams().size()); assertNotSame( "connection caching failed", - mainTransport.getEstablishedConnections().get(0), - mainTransport.getEstablishedConnections().get(1)); + mainTransport.getEstablishedStreams().get(0), + mainTransport.getEstablishedStreams().get(1)); } @Test public void testGetTransferMode() throws Exception { - IStreamService mainTransport = new Transport(StreamMode.SOCKS5_DIRECT); + mainTransport = new Transport(StreamMode.SOCKS5_DIRECT); IConnectionManager dtm = new DataTransferManager(connectionServiceStub, mainTransport, null); @@ -348,7 +410,7 @@ public void testGetTransferMode() throws Exception { } public void testGetConnectionOnInvalidConnectionIdentifierWithNoConnection() throws Exception { - IStreamService mainTransport = new Transport(StreamMode.SOCKS5_DIRECT); + mainTransport = new Transport(StreamMode.SOCKS5_DIRECT); DataTransferManager dtm = new DataTransferManager(connectionServiceStub, mainTransport, null); @@ -360,7 +422,7 @@ public void testGetConnectionOnInvalidConnectionIdentifierWithNoConnection() thr } public void testGetConnectionOnInvalidConnectionIdentifier() throws Exception { - IStreamService mainTransport = new Transport(StreamMode.SOCKS5_DIRECT); + mainTransport = new Transport(StreamMode.SOCKS5_DIRECT); DataTransferManager dtm = new DataTransferManager(connectionServiceStub, mainTransport, null); @@ -372,7 +434,7 @@ public void testGetConnectionOnInvalidConnectionIdentifier() throws Exception { @Test public void testGetConnectionOnValidConnectionIdentifier() throws Exception { - Transport mainTransport = new Transport(StreamMode.SOCKS5_DIRECT); + mainTransport = new Transport(StreamMode.SOCKS5_DIRECT); DataTransferManager dtm = new DataTransferManager(connectionServiceStub, mainTransport, null); @@ -393,11 +455,11 @@ public void testConcurrentConnections() throws Exception { nonBlockingConnects.add(new JID("foo@bar.example")); - BlockableTransport mainTransport = + mainTransport = new BlockableTransport( nonBlockingConnects, StreamMode.SOCKS5_DIRECT, connectAcknowledge, connectProceed); - Transport fallbackTransport = new Transport(StreamMode.IBB); + fallbackTransport = new Transport(StreamMode.IBB); final IConnectionManager dtm = new DataTransferManager(connectionServiceStub, mainTransport, fallbackTransport); @@ -480,24 +542,25 @@ public void run() throws Exception { assertEquals( "connection caching failed during multiple connection requests", 2, - mainTransport.getEstablishedConnections().size()); + mainTransport.getEstablishedStreams().size()); } @Test public void connectWithRemoteSideConnectedFirst() throws Exception { - Transport mainTransport = new Transport(StreamMode.SOCKS5_DIRECT); + mainTransport = new Transport(StreamMode.SOCKS5_DIRECT); IConnectionManager dtm = new DataTransferManager(connectionServiceStub, mainTransport, null); connectionListener.getValue().connectionStateChanged(connectionMock, ConnectionState.CONNECTED); - mainTransport.announceIncomingRequest(new JID("fallback@emergency")); + mainTransport.announceIncomingRequest( + DataTransferManager.DEFAULT_CONNECTION_ID, new JID("fallback@emergency")); dtm.connect(new JID("fallback@emergency")); assertEquals( "established an outgoing connection also the remote side is already connected to the local side", 1, - mainTransport.getEstablishedConnections().size()); + mainTransport.getEstablishedStreams().size()); } @Test(timeout = 30000) @@ -506,11 +569,11 @@ public void connectToRemoteSideWhileRemoteIsConnectingToLocalSide() throws Excep final CountDownLatch connectAcknowledge = new CountDownLatch(1); final CountDownLatch connectProceed = new CountDownLatch(1); - BlockableTransport mainTransport = + mainTransport = new BlockableTransport( new HashSet(), StreamMode.SOCKS5_DIRECT, connectAcknowledge, connectProceed); - Transport fallbackTransport = new Transport(StreamMode.IBB); + fallbackTransport = new Transport(StreamMode.IBB); final DataTransferManager dtm = new DataTransferManager(connectionServiceStub, mainTransport, fallbackTransport); @@ -533,7 +596,8 @@ public void run() throws Exception { fail("transport connect method was not called"); } - fallbackTransport.announceIncomingRequest(new JID("foo@bar.com")); + fallbackTransport.announceIncomingRequest( + DataTransferManager.DEFAULT_CONNECTION_ID, new JID("foo@bar.com")); connectProceed.countDown(); connectThread0.join(10000); @@ -542,46 +606,48 @@ public void run() throws Exception { TransferDescription description = TransferDescription.newDescription(); description.setRecipient(new JID("foo@bar.com")); + description.setNamespace("http://example,org"); + description.setElementName("dummy"); - dtm.getConnection(null, new JID("foo@bar.com")).send(description, new byte[0]); + ((IPacketConnection) dtm.getConnection(null, new JID("foo@bar.com"))) + .send(description, new byte[1]); - assertEquals( - "wrong connection was chosen", - 1, - mainTransport.getEstablishedConnections().get(0).getSendPacketsCount()); + assertTrue( + "wrong transport was chosen", + ((DummyByteStream) mainTransport.getEstablishedStreams().get(0)).wasUsed()); - assertEquals( - "wrong connection was chosen", - 0, - fallbackTransport.getEstablishedConnections().get(0).getSendPacketsCount()); + assertFalse( + "wrong transport was chosen", + ((DummyByteStream) fallbackTransport.getEstablishedStreams().get(0)).wasUsed()); } @Test public void testConnectionClosureOnManualClose() throws Exception { - Transport mainTransport = new Transport(StreamMode.SOCKS5_DIRECT); + mainTransport = new Transport(StreamMode.SOCKS5_DIRECT); IConnectionManager dtm = new DataTransferManager(connectionServiceStub, mainTransport, null); connectionListener.getValue().connectionStateChanged(connectionMock, ConnectionState.CONNECTED); dtm.connect(new JID("fallback@emergency")); - mainTransport.announceIncomingRequest(new JID("fallback@emergency")); + mainTransport.announceIncomingRequest( + DataTransferManager.DEFAULT_CONNECTION_ID, new JID("fallback@emergency")); dtm.closeConnection(new JID("fallback@emergency")); - assertFalse( + assertTrue( "outgoing connection was not closed", - mainTransport.getEstablishedConnections().get(0).isConnected()); + ((DummyByteStream) mainTransport.getEstablishedStreams().get(0)).isClosed()); - assertFalse( + assertTrue( "incoming connection was not closed", - mainTransport.getEstablishedConnections().get(1).isConnected()); + ((DummyByteStream) mainTransport.getEstablishedStreams().get(1)).isClosed()); assertEquals(StreamMode.NONE, dtm.getTransferMode(new JID("fallback@emergency"))); } @Test - public void testConnectionClosureOnDisconnect() throws Exception { + public void testConnectionCloseOnDisconnect() throws Exception { Transport mainTransport = new Transport(StreamMode.SOCKS5_DIRECT); IConnectionManager dtm = new DataTransferManager(connectionServiceStub, mainTransport, null); @@ -589,19 +655,20 @@ public void testConnectionClosureOnDisconnect() throws Exception { connectionListener.getValue().connectionStateChanged(connectionMock, ConnectionState.CONNECTED); dtm.connect(new JID("fallback@emergency")); - mainTransport.announceIncomingRequest(new JID("fallback@emergency")); + mainTransport.announceIncomingRequest( + DataTransferManager.DEFAULT_CONNECTION_ID, new JID("fallback@emergency")); connectionListener .getValue() .connectionStateChanged(connectionMock, ConnectionState.NOT_CONNECTED); - assertFalse( + assertTrue( "outgoing connection was not closed", - mainTransport.getEstablishedConnections().get(0).isConnected()); + ((DummyByteStream) mainTransport.getEstablishedStreams().get(0)).isClosed()); - assertFalse( + assertTrue( "incoming connection was not closed", - mainTransport.getEstablishedConnections().get(1).isConnected()); + ((DummyByteStream) mainTransport.getEstablishedStreams().get(1)).isClosed()); assertEquals(StreamMode.NONE, dtm.getTransferMode(new JID("fallback@emergency"))); } diff --git a/stf/src/saros/stf/server/rmi/controlbot/manipulation/impl/NetworkManipulatorImpl.java b/stf/src/saros/stf/server/rmi/controlbot/manipulation/impl/NetworkManipulatorImpl.java index 5af9612d99..47af56aa1e 100644 --- a/stf/src/saros/stf/server/rmi/controlbot/manipulation/impl/NetworkManipulatorImpl.java +++ b/stf/src/saros/stf/server/rmi/controlbot/manipulation/impl/NetworkManipulatorImpl.java @@ -16,7 +16,8 @@ import saros.monitoring.IProgressMonitor; import saros.net.IPacketInterceptor; import saros.net.internal.BinaryXMPPExtension; -import saros.net.internal.IByteStreamConnection; +import saros.net.internal.IConnection; +import saros.net.internal.IPacketConnection; import saros.net.internal.TransferDescription; import saros.net.internal.XMPPReceiver; import saros.net.xmpp.JID; @@ -319,10 +320,10 @@ public void unblockOutgoingSessionPackets(JID jid) throws RemoteException { return; } - IByteStreamConnection connection = + IConnection connection = getDataTransferManager().getConnection(ISarosSession.SESSION_CONNECTION_ID, jid); - while (!pendingOutgoingPackets.isEmpty()) { + while (!pendingOutgoingPackets.isEmpty() && connection instanceof IPacketConnection) { try { OutgoingPacketHolder holder = pendingOutgoingPackets.remove(); @@ -332,7 +333,7 @@ public void unblockOutgoingSessionPackets(JID jid) throws RemoteException { + ", payload length: " + holder.payload.length); - connection.send(holder.description, holder.payload); + ((IPacketConnection) connection).send(holder.description, holder.payload); } catch (Exception e) { LOG.error(e.getMessage(), e); } From 3af66aebe81d05bfafd44220dee2010c9d18d64d Mon Sep 17 00:00:00 2001 From: Stefan Rossbach Date: Tue, 20 Aug 2019 17:40:40 +0200 Subject: [PATCH 2/2] WIP Use Streams --- .../ArchiveIncomingProjectNegotiation.java | 78 ++++++++++---- .../ArchiveOutgoingProjectNegotiation.java | 101 ++++++++++++++++-- .../saros/negotiation/ProjectNegotiation.java | 2 + 3 files changed, 150 insertions(+), 31 deletions(-) diff --git a/core/src/saros/negotiation/ArchiveIncomingProjectNegotiation.java b/core/src/saros/negotiation/ArchiveIncomingProjectNegotiation.java index 0cea71a9a1..98102d5da8 100644 --- a/core/src/saros/negotiation/ArchiveIncomingProjectNegotiation.java +++ b/core/src/saros/negotiation/ArchiveIncomingProjectNegotiation.java @@ -1,14 +1,17 @@ package saros.negotiation; +import java.io.DataInputStream; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import org.apache.commons.io.IOUtils; import org.apache.log4j.Logger; -import org.jivesoftware.smack.XMPPException; -import org.jivesoftware.smackx.filetransfer.IncomingFileTransfer; +import saros.SarosPluginContext; import saros.exceptions.LocalCancellationException; import saros.exceptions.SarosCancellationException; import saros.filesystem.IChecksumCache; @@ -18,11 +21,14 @@ import saros.monitoring.IProgressMonitor; import saros.monitoring.SubProgressMonitor; import saros.negotiation.NegotiationTools.CancelOption; +import saros.net.IConnectionManager; import saros.net.IReceiver; +import saros.net.IStreamConnection; import saros.net.ITransmitter; import saros.net.xmpp.JID; import saros.net.xmpp.XMPPConnectionService; import saros.observables.FileReplacementInProgressObservable; +import saros.repackaged.picocontainer.annotations.Inject; import saros.session.ISarosSession; import saros.session.ISarosSessionManager; import saros.util.CoreUtils; @@ -35,6 +41,9 @@ public class ArchiveIncomingProjectNegotiation extends AbstractIncomingProjectNe private static final Logger LOG = Logger.getLogger(ArchiveIncomingProjectNegotiation.class); + // TODO move to factory + @Inject private IConnectionManager connectionManager; + public ArchiveIncomingProjectNegotiation( final JID peer, // final String negotiationID, // @@ -60,6 +69,9 @@ public ArchiveIncomingProjectNegotiation( connectionService, transmitter, receiver); + + // FIXME remove + SarosPluginContext.initComponent(this); } @Override @@ -73,22 +85,20 @@ protected void transfer( // the host do not send an archive if we do not need any files if (filesMissing) { - receiveAndUnpackArchive(projectMapping, transferListener, monitor); + receiveAndUnpackArchive(projectMapping, monitor); } } /** Receives the archive with all missing files and unpacks it. */ private void receiveAndUnpackArchive( - final Map localProjectMapping, - final TransferListener archiveTransferListener, - final IProgressMonitor monitor) + final Map localProjectMapping, final IProgressMonitor monitor) throws IOException, SarosCancellationException { // waiting for the big archive to come in monitor.beginTask(null, 100); - File archiveFile = receiveArchive(archiveTransferListener, new SubProgressMonitor(monitor, 50)); + File archiveFile = receiveArchive(new SubProgressMonitor(monitor, 50)); /* * FIXME at this point it makes no sense to report the cancellation to @@ -146,37 +156,63 @@ private void unpackArchive( // TODO: now add the checksums into the cache } - private File receiveArchive(TransferListener archiveTransferListener, IProgressMonitor monitor) + private File receiveArchive(IProgressMonitor monitor) throws IOException, SarosCancellationException { monitor.beginTask("Receiving archive file...", 100); - LOG.debug("waiting for incoming archive stream request"); - - monitor.subTask("Host is compressing project files. Waiting for the archive file..."); - - awaitTransferRequest(); + LOG.debug("connecting to " + getPeer() + " to receive archive file"); - monitor.subTask("Receiving archive file..."); + monitor.subTask("Connecting to " + getPeer().getName() + "..."); - LOG.debug(this + " : receiving archive"); - - IncomingFileTransfer transfer = archiveTransferListener.getRequest().accept(); + IStreamConnection connection = + connectionManager.connectStream(TRANSFER_ID_PREFIX + getID(), getPeer()); File archiveFile = File.createTempFile("saros_archive_" + System.currentTimeMillis(), null); + OutputStream out = null; + boolean transferFailed = true; try { - transfer.recieveFile(archiveFile); - monitorFileTransfer(transfer, monitor); + out = new FileOutputStream(archiveFile); + + connection.setReadTimeout(60 * 60 * 1000); + monitor.subTask("Host is compressing project files. Waiting for the archive file..."); + + DataInputStream dis = new DataInputStream(connection.getInputStream()); + + long remainingDataSize = dis.readLong(); + + monitor.subTask("Receiving archive file..."); + + LOG.debug(this + " : receiving archive"); + + final byte buffer[] = new byte[BUFFER_SIZE]; + + while (remainingDataSize > 0) { + int read = dis.read(buffer); + + if (read == -1) break; + + out.write(buffer, 0, read); + remainingDataSize -= read; + + checkCancellation(CancelOption.NOTIFY_PEER); + } + + if (remainingDataSize > 0) + localCancel( + "The receiving of the archive file was not successful.", CancelOption.NOTIFY_PEER); + transferFailed = false; - } catch (XMPPException e) { - throw new IOException(e.getMessage(), e.getCause()); } finally { if (transferFailed && !archiveFile.delete()) { LOG.warn("Could not clean up archive file " + archiveFile.getAbsolutePath()); } + + IOUtils.closeQuietly(out); + connection.close(); } monitor.done(); diff --git a/core/src/saros/negotiation/ArchiveOutgoingProjectNegotiation.java b/core/src/saros/negotiation/ArchiveOutgoingProjectNegotiation.java index a10fc5d0b1..6da525aba6 100644 --- a/core/src/saros/negotiation/ArchiveOutgoingProjectNegotiation.java +++ b/core/src/saros/negotiation/ArchiveOutgoingProjectNegotiation.java @@ -1,12 +1,15 @@ package saros.negotiation; +import java.io.DataOutputStream; import java.io.File; +import java.io.FileInputStream; import java.io.IOException; +import java.io.InputStream; import java.util.ArrayList; import java.util.List; +import org.apache.commons.io.IOUtils; import org.apache.log4j.Logger; -import org.jivesoftware.smack.XMPPException; -import org.jivesoftware.smackx.filetransfer.OutgoingFileTransfer; +import saros.SarosPluginContext; import saros.editor.IEditorManager; import saros.exceptions.LocalCancellationException; import saros.exceptions.OperationCanceledException; @@ -18,10 +21,14 @@ import saros.filesystem.IWorkspace; import saros.monitoring.IProgressMonitor; import saros.negotiation.NegotiationTools.CancelOption; +import saros.net.IConnectionManager; import saros.net.IReceiver; +import saros.net.IStreamConnection; +import saros.net.IStreamConnectionListener; import saros.net.ITransmitter; import saros.net.xmpp.JID; import saros.net.xmpp.XMPPConnectionService; +import saros.repackaged.picocontainer.annotations.Inject; import saros.session.ISarosSession; import saros.session.ISarosSessionManager; import saros.session.User; @@ -36,6 +43,33 @@ public class ArchiveOutgoingProjectNegotiation extends AbstractOutgoingProjectNe private static final Logger LOG = Logger.getLogger(ArchiveOutgoingProjectNegotiation.class); private File zipArchive = null; + // TODO move to factory + @Inject private IConnectionManager connectionManager; + + private IStreamConnection connection; + + private boolean awaitConnection = true; + + private final IStreamConnectionListener streamConnectionListener = + new IStreamConnectionListener() { + + @Override + public boolean streamConnectionEstablished(String id, IStreamConnection connection) { + + synchronized (ArchiveOutgoingProjectNegotiation.this) { + if (!awaitConnection) return false; + + if (!(TRANSFER_ID_PREFIX + getID()).equals(id)) return false; + + ArchiveOutgoingProjectNegotiation.this.connection = connection; + + ArchiveOutgoingProjectNegotiation.this.notifyAll(); + } + + return true; + } + }; + public ArchiveOutgoingProjectNegotiation( // final JID peer, // final ProjectSharingData projects, // @@ -59,6 +93,9 @@ public ArchiveOutgoingProjectNegotiation( // connectionService, transmitter, receiver); + + // FIXME remove + SarosPluginContext.initComponent(this); } @Override @@ -66,6 +103,8 @@ protected void setup(IProgressMonitor monitor) throws IOException { if (fileTransferManager == null) // FIXME: the logic will try to send this to the remote contact throw new IOException("not connected to a XMPP server"); + + connectionManager.addStreamConnectionListener(streamConnectionListener); } @Override @@ -113,6 +152,7 @@ protected void transfer(IProgressMonitor monitor, List fileLists) protected void cleanup(IProgressMonitor monitor) { if (zipArchive != null && !zipArchive.delete()) LOG.warn("could not delete archive file: " + zipArchive.getAbsolutePath()); + connectionManager.addStreamConnectionListener(streamConnectionListener); super.cleanup(monitor); } @@ -200,19 +240,60 @@ private void sendArchive( File archive, JID remoteContact, String transferID, IProgressMonitor monitor) throws SarosCancellationException, IOException { - LOG.debug(this + " : sending archive"); + LOG.debug(this + " : waiting for remote connection"); monitor.beginTask("Sending archive file...", 100); - assert fileTransferManager != null; + final long timeout = 60 * 1000; + + long currentTime = System.currentTimeMillis(); + + synchronized (this) { + while (System.currentTimeMillis() - currentTime < timeout) { + if (connection != null) break; + + if (monitor.isCanceled()) { + awaitConnection = false; + checkCancellation(CancelOption.NOTIFY_PEER); + } + + try { + wait(1000); + } catch (InterruptedException e) { + awaitConnection = false; + Thread.currentThread().interrupt(); + this.localCancel("Negotiation got internally interrupted.", CancelOption.NOTIFY_PEER); + break; + } + } + + awaitConnection = false; + } + + assert connection != null; + + DataOutputStream out = null; + InputStream in = null; try { - OutgoingFileTransfer transfer = - fileTransferManager.createOutgoingFileTransfer(remoteContact.toString()); - transfer.sendFile(archive, transferID); - monitorFileTransfer(transfer, monitor); - } catch (XMPPException e) { - throw new IOException(e.getMessage(), e); + in = new FileInputStream(archive); + + long fileSize = archive.length(); + + out = new DataOutputStream(connection.getOutputStream()); + + out.writeLong(fileSize); + final byte buffer[] = new byte[BUFFER_SIZE]; + + int read = 0; + + while ((read = in.read(buffer)) != -1) { + out.write(buffer, 0, read); + checkCancellation(CancelOption.NOTIFY_PEER); + } + } finally { + connection.close(); + IOUtils.closeQuietly(in); } monitor.done(); diff --git a/core/src/saros/negotiation/ProjectNegotiation.java b/core/src/saros/negotiation/ProjectNegotiation.java index c213d4a357..e3bf858a17 100644 --- a/core/src/saros/negotiation/ProjectNegotiation.java +++ b/core/src/saros/negotiation/ProjectNegotiation.java @@ -61,6 +61,8 @@ public abstract class ProjectNegotiation extends Negotiation { */ protected FileTransferManager fileTransferManager; + protected static final int BUFFER_SIZE = 32 * 1024; + public ProjectNegotiation( final String id, final JID peer,