Skip to content

Commit 3af66ae

Browse files
committed
WIP Use Streams
1 parent ecb5e10 commit 3af66ae

File tree

3 files changed

+150
-31
lines changed

3 files changed

+150
-31
lines changed

core/src/saros/negotiation/ArchiveIncomingProjectNegotiation.java

+57-21
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
11
package saros.negotiation;
22

3+
import java.io.DataInputStream;
34
import java.io.File;
5+
import java.io.FileOutputStream;
46
import java.io.IOException;
7+
import java.io.OutputStream;
58
import java.util.HashMap;
69
import java.util.List;
710
import java.util.Map;
811
import java.util.Map.Entry;
12+
import org.apache.commons.io.IOUtils;
913
import org.apache.log4j.Logger;
10-
import org.jivesoftware.smack.XMPPException;
11-
import org.jivesoftware.smackx.filetransfer.IncomingFileTransfer;
14+
import saros.SarosPluginContext;
1215
import saros.exceptions.LocalCancellationException;
1316
import saros.exceptions.SarosCancellationException;
1417
import saros.filesystem.IChecksumCache;
@@ -18,11 +21,14 @@
1821
import saros.monitoring.IProgressMonitor;
1922
import saros.monitoring.SubProgressMonitor;
2023
import saros.negotiation.NegotiationTools.CancelOption;
24+
import saros.net.IConnectionManager;
2125
import saros.net.IReceiver;
26+
import saros.net.IStreamConnection;
2227
import saros.net.ITransmitter;
2328
import saros.net.xmpp.JID;
2429
import saros.net.xmpp.XMPPConnectionService;
2530
import saros.observables.FileReplacementInProgressObservable;
31+
import saros.repackaged.picocontainer.annotations.Inject;
2632
import saros.session.ISarosSession;
2733
import saros.session.ISarosSessionManager;
2834
import saros.util.CoreUtils;
@@ -35,6 +41,9 @@ public class ArchiveIncomingProjectNegotiation extends AbstractIncomingProjectNe
3541

3642
private static final Logger LOG = Logger.getLogger(ArchiveIncomingProjectNegotiation.class);
3743

44+
// TODO move to factory
45+
@Inject private IConnectionManager connectionManager;
46+
3847
public ArchiveIncomingProjectNegotiation(
3948
final JID peer, //
4049
final String negotiationID, //
@@ -60,6 +69,9 @@ public ArchiveIncomingProjectNegotiation(
6069
connectionService,
6170
transmitter,
6271
receiver);
72+
73+
// FIXME remove
74+
SarosPluginContext.initComponent(this);
6375
}
6476

6577
@Override
@@ -73,22 +85,20 @@ protected void transfer(
7385

7486
// the host do not send an archive if we do not need any files
7587
if (filesMissing) {
76-
receiveAndUnpackArchive(projectMapping, transferListener, monitor);
88+
receiveAndUnpackArchive(projectMapping, monitor);
7789
}
7890
}
7991

8092
/** Receives the archive with all missing files and unpacks it. */
8193
private void receiveAndUnpackArchive(
82-
final Map<String, IProject> localProjectMapping,
83-
final TransferListener archiveTransferListener,
84-
final IProgressMonitor monitor)
94+
final Map<String, IProject> localProjectMapping, final IProgressMonitor monitor)
8595
throws IOException, SarosCancellationException {
8696

8797
// waiting for the big archive to come in
8898

8999
monitor.beginTask(null, 100);
90100

91-
File archiveFile = receiveArchive(archiveTransferListener, new SubProgressMonitor(monitor, 50));
101+
File archiveFile = receiveArchive(new SubProgressMonitor(monitor, 50));
92102

93103
/*
94104
* FIXME at this point it makes no sense to report the cancellation to
@@ -146,37 +156,63 @@ private void unpackArchive(
146156
// TODO: now add the checksums into the cache
147157
}
148158

149-
private File receiveArchive(TransferListener archiveTransferListener, IProgressMonitor monitor)
159+
private File receiveArchive(IProgressMonitor monitor)
150160
throws IOException, SarosCancellationException {
151161

152162
monitor.beginTask("Receiving archive file...", 100);
153-
LOG.debug("waiting for incoming archive stream request");
154-
155-
monitor.subTask("Host is compressing project files. Waiting for the archive file...");
156-
157-
awaitTransferRequest();
163+
LOG.debug("connecting to " + getPeer() + " to receive archive file");
158164

159-
monitor.subTask("Receiving archive file...");
165+
monitor.subTask("Connecting to " + getPeer().getName() + "...");
160166

161-
LOG.debug(this + " : receiving archive");
162-
163-
IncomingFileTransfer transfer = archiveTransferListener.getRequest().accept();
167+
IStreamConnection connection =
168+
connectionManager.connectStream(TRANSFER_ID_PREFIX + getID(), getPeer());
164169

165170
File archiveFile = File.createTempFile("saros_archive_" + System.currentTimeMillis(), null);
166171

172+
OutputStream out = null;
173+
167174
boolean transferFailed = true;
168175

169176
try {
170-
transfer.recieveFile(archiveFile);
171177

172-
monitorFileTransfer(transfer, monitor);
178+
out = new FileOutputStream(archiveFile);
179+
180+
connection.setReadTimeout(60 * 60 * 1000);
181+
monitor.subTask("Host is compressing project files. Waiting for the archive file...");
182+
183+
DataInputStream dis = new DataInputStream(connection.getInputStream());
184+
185+
long remainingDataSize = dis.readLong();
186+
187+
monitor.subTask("Receiving archive file...");
188+
189+
LOG.debug(this + " : receiving archive");
190+
191+
final byte buffer[] = new byte[BUFFER_SIZE];
192+
193+
while (remainingDataSize > 0) {
194+
int read = dis.read(buffer);
195+
196+
if (read == -1) break;
197+
198+
out.write(buffer, 0, read);
199+
remainingDataSize -= read;
200+
201+
checkCancellation(CancelOption.NOTIFY_PEER);
202+
}
203+
204+
if (remainingDataSize > 0)
205+
localCancel(
206+
"The receiving of the archive file was not successful.", CancelOption.NOTIFY_PEER);
207+
173208
transferFailed = false;
174-
} catch (XMPPException e) {
175-
throw new IOException(e.getMessage(), e.getCause());
176209
} finally {
177210
if (transferFailed && !archiveFile.delete()) {
178211
LOG.warn("Could not clean up archive file " + archiveFile.getAbsolutePath());
179212
}
213+
214+
IOUtils.closeQuietly(out);
215+
connection.close();
180216
}
181217

182218
monitor.done();

core/src/saros/negotiation/ArchiveOutgoingProjectNegotiation.java

+91-10
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
package saros.negotiation;
22

3+
import java.io.DataOutputStream;
34
import java.io.File;
5+
import java.io.FileInputStream;
46
import java.io.IOException;
7+
import java.io.InputStream;
58
import java.util.ArrayList;
69
import java.util.List;
10+
import org.apache.commons.io.IOUtils;
711
import org.apache.log4j.Logger;
8-
import org.jivesoftware.smack.XMPPException;
9-
import org.jivesoftware.smackx.filetransfer.OutgoingFileTransfer;
12+
import saros.SarosPluginContext;
1013
import saros.editor.IEditorManager;
1114
import saros.exceptions.LocalCancellationException;
1215
import saros.exceptions.OperationCanceledException;
@@ -18,10 +21,14 @@
1821
import saros.filesystem.IWorkspace;
1922
import saros.monitoring.IProgressMonitor;
2023
import saros.negotiation.NegotiationTools.CancelOption;
24+
import saros.net.IConnectionManager;
2125
import saros.net.IReceiver;
26+
import saros.net.IStreamConnection;
27+
import saros.net.IStreamConnectionListener;
2228
import saros.net.ITransmitter;
2329
import saros.net.xmpp.JID;
2430
import saros.net.xmpp.XMPPConnectionService;
31+
import saros.repackaged.picocontainer.annotations.Inject;
2532
import saros.session.ISarosSession;
2633
import saros.session.ISarosSessionManager;
2734
import saros.session.User;
@@ -36,6 +43,33 @@ public class ArchiveOutgoingProjectNegotiation extends AbstractOutgoingProjectNe
3643
private static final Logger LOG = Logger.getLogger(ArchiveOutgoingProjectNegotiation.class);
3744
private File zipArchive = null;
3845

46+
// TODO move to factory
47+
@Inject private IConnectionManager connectionManager;
48+
49+
private IStreamConnection connection;
50+
51+
private boolean awaitConnection = true;
52+
53+
private final IStreamConnectionListener streamConnectionListener =
54+
new IStreamConnectionListener() {
55+
56+
@Override
57+
public boolean streamConnectionEstablished(String id, IStreamConnection connection) {
58+
59+
synchronized (ArchiveOutgoingProjectNegotiation.this) {
60+
if (!awaitConnection) return false;
61+
62+
if (!(TRANSFER_ID_PREFIX + getID()).equals(id)) return false;
63+
64+
ArchiveOutgoingProjectNegotiation.this.connection = connection;
65+
66+
ArchiveOutgoingProjectNegotiation.this.notifyAll();
67+
}
68+
69+
return true;
70+
}
71+
};
72+
3973
public ArchiveOutgoingProjectNegotiation( //
4074
final JID peer, //
4175
final ProjectSharingData projects, //
@@ -59,13 +93,18 @@ public ArchiveOutgoingProjectNegotiation( //
5993
connectionService,
6094
transmitter,
6195
receiver);
96+
97+
// FIXME remove
98+
SarosPluginContext.initComponent(this);
6299
}
63100

64101
@Override
65102
protected void setup(IProgressMonitor monitor) throws IOException {
66103
if (fileTransferManager == null)
67104
// FIXME: the logic will try to send this to the remote contact
68105
throw new IOException("not connected to a XMPP server");
106+
107+
connectionManager.addStreamConnectionListener(streamConnectionListener);
69108
}
70109

71110
@Override
@@ -113,6 +152,7 @@ protected void transfer(IProgressMonitor monitor, List<FileList> fileLists)
113152
protected void cleanup(IProgressMonitor monitor) {
114153
if (zipArchive != null && !zipArchive.delete())
115154
LOG.warn("could not delete archive file: " + zipArchive.getAbsolutePath());
155+
connectionManager.addStreamConnectionListener(streamConnectionListener);
116156
super.cleanup(monitor);
117157
}
118158

@@ -200,19 +240,60 @@ private void sendArchive(
200240
File archive, JID remoteContact, String transferID, IProgressMonitor monitor)
201241
throws SarosCancellationException, IOException {
202242

203-
LOG.debug(this + " : sending archive");
243+
LOG.debug(this + " : waiting for remote connection");
204244
monitor.beginTask("Sending archive file...", 100);
205245

206-
assert fileTransferManager != null;
246+
final long timeout = 60 * 1000;
247+
248+
long currentTime = System.currentTimeMillis();
249+
250+
synchronized (this) {
251+
while (System.currentTimeMillis() - currentTime < timeout) {
252+
if (connection != null) break;
253+
254+
if (monitor.isCanceled()) {
255+
awaitConnection = false;
256+
checkCancellation(CancelOption.NOTIFY_PEER);
257+
}
258+
259+
try {
260+
wait(1000);
261+
} catch (InterruptedException e) {
262+
awaitConnection = false;
263+
Thread.currentThread().interrupt();
264+
this.localCancel("Negotiation got internally interrupted.", CancelOption.NOTIFY_PEER);
265+
break;
266+
}
267+
}
268+
269+
awaitConnection = false;
270+
}
271+
272+
assert connection != null;
273+
274+
DataOutputStream out = null;
275+
InputStream in = null;
207276

208277
try {
209-
OutgoingFileTransfer transfer =
210-
fileTransferManager.createOutgoingFileTransfer(remoteContact.toString());
211278

212-
transfer.sendFile(archive, transferID);
213-
monitorFileTransfer(transfer, monitor);
214-
} catch (XMPPException e) {
215-
throw new IOException(e.getMessage(), e);
279+
in = new FileInputStream(archive);
280+
281+
long fileSize = archive.length();
282+
283+
out = new DataOutputStream(connection.getOutputStream());
284+
285+
out.writeLong(fileSize);
286+
final byte buffer[] = new byte[BUFFER_SIZE];
287+
288+
int read = 0;
289+
290+
while ((read = in.read(buffer)) != -1) {
291+
out.write(buffer, 0, read);
292+
checkCancellation(CancelOption.NOTIFY_PEER);
293+
}
294+
} finally {
295+
connection.close();
296+
IOUtils.closeQuietly(in);
216297
}
217298

218299
monitor.done();

core/src/saros/negotiation/ProjectNegotiation.java

+2
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ public abstract class ProjectNegotiation extends Negotiation {
6161
*/
6262
protected FileTransferManager fileTransferManager;
6363

64+
protected static final int BUFFER_SIZE = 32 * 1024;
65+
6466
public ProjectNegotiation(
6567
final String id,
6668
final JID peer,

0 commit comments

Comments
 (0)