Skip to content

Commit 49d1c36

Browse files
committed
Back to AsynchronousFileChannel for the time being
1 parent 6601659 commit 49d1c36

File tree

6 files changed

+821
-434
lines changed

6 files changed

+821
-434
lines changed
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package de.gesellix.docker.client.filesocket;
2+
3+
import java.io.IOException;
4+
import java.nio.ByteBuffer;
5+
import java.nio.channels.AsynchronousByteChannel;
6+
import java.nio.channels.AsynchronousCloseException;
7+
import java.nio.channels.AsynchronousFileChannel;
8+
import java.nio.channels.CompletionHandler;
9+
import java.util.concurrent.CompletableFuture;
10+
import java.util.concurrent.Future;
11+
12+
// Copy from https://github.com/docker-java/docker-java/blob/1da0e3ce211d963248b64484fc38101869a5a61e/docker-java-transport/src/main/java/com/github/dockerjava/transport/NamedPipeSocket.java#L88
13+
class AsynchronousFileByteChannel implements AsynchronousByteChannel {
14+
15+
private final AsynchronousFileChannel fileChannel;
16+
17+
AsynchronousFileByteChannel(AsynchronousFileChannel fileChannel) {
18+
this.fileChannel = fileChannel;
19+
}
20+
21+
@Override
22+
public <A> void read(ByteBuffer dst, A attachment, CompletionHandler<Integer, ? super A> handler) {
23+
fileChannel.read(dst, 0, attachment, new CompletionHandler<Integer, A>() {
24+
@Override
25+
public void completed(Integer read, A attachment) {
26+
handler.completed(read > 0 ? read : -1, attachment);
27+
}
28+
29+
@Override
30+
public void failed(Throwable exc, A attachment) {
31+
if (exc instanceof AsynchronousCloseException) {
32+
handler.completed(-1, attachment);
33+
return;
34+
}
35+
handler.failed(exc, attachment);
36+
}
37+
});
38+
}
39+
40+
@Override
41+
public Future<Integer> read(ByteBuffer dst) {
42+
CompletableFutureHandler future = new CompletableFutureHandler();
43+
fileChannel.read(dst, 0, null, future);
44+
return future;
45+
}
46+
47+
@Override
48+
public <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) {
49+
fileChannel.write(src, 0, attachment, handler);
50+
}
51+
52+
@Override
53+
public Future<Integer> write(ByteBuffer src) {
54+
return fileChannel.write(src, 0);
55+
}
56+
57+
@Override
58+
public void close() throws IOException {
59+
fileChannel.close();
60+
}
61+
62+
@Override
63+
public boolean isOpen() {
64+
return fileChannel.isOpen();
65+
}
66+
67+
private static class CompletableFutureHandler extends CompletableFuture<Integer> implements CompletionHandler<Integer, Object> {
68+
69+
@Override
70+
public void completed(Integer read, Object attachment) {
71+
complete(read > 0 ? read : -1);
72+
}
73+
74+
@Override
75+
public void failed(Throwable exc, Object attachment) {
76+
if (exc instanceof AsynchronousCloseException) {
77+
complete(-1);
78+
return;
79+
}
80+
completeExceptionally(exc);
81+
}
82+
}
83+
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
package de.gesellix.docker.client.filesocket;
2+
3+
import static com.sun.jna.platform.win32.WinBase.INVALID_HANDLE_VALUE;
4+
5+
import java.io.IOException;
6+
import java.io.InputStream;
7+
import java.io.OutputStream;
8+
import java.net.InetAddress;
9+
import java.net.InetSocketAddress;
10+
import java.net.SocketAddress;
11+
import java.util.concurrent.TimeUnit;
12+
13+
import org.slf4j.Logger;
14+
import org.slf4j.LoggerFactory;
15+
16+
import com.sun.jna.platform.win32.WinNT;
17+
18+
import okio.BufferedSink;
19+
import okio.BufferedSource;
20+
import okio.Okio;
21+
import okio.Timeout;
22+
23+
public class NamedPipeJniSocket extends FileSocket {
24+
25+
private static final Logger log = LoggerFactory.getLogger(NamedPipeJniSocket.class);
26+
27+
private WinNT.HANDLE handle;
28+
private boolean connected = false;
29+
private boolean closed = false;
30+
31+
private BufferedSource source;
32+
private BufferedSink sink;
33+
34+
private final Timeout ioTimeout = new Timeout().timeout(1000, TimeUnit.MILLISECONDS);
35+
36+
@Override
37+
public void connect(SocketAddress endpoint, int timeout) throws IOException {
38+
if (!(endpoint instanceof InetSocketAddress)) {
39+
throw new IllegalArgumentException("Expected endpoint to be a InetSocketAddress");
40+
}
41+
42+
InetSocketAddress inetSocketAddress = (InetSocketAddress) endpoint;
43+
InetAddress address = inetSocketAddress.getAddress();
44+
String socketPath = decodeHostname(address);
45+
connect(socketPath);
46+
}
47+
48+
void connect(String socketPath) {
49+
socketPath = socketPath.replace("/", "\\");
50+
log.debug("connect via '{}'...", socketPath);
51+
52+
handle = NamedPipeUtils.connect(socketPath, 10_000, 500, 50);
53+
54+
connected = true;
55+
source = Okio.buffer(new NamedPipeSource(handle, ioTimeout));
56+
sink = Okio.buffer(new NamedPipeSink(handle, ioTimeout));
57+
}
58+
59+
@Override
60+
public InputStream getInputStream() throws IOException {
61+
ensureOpen();
62+
return source.inputStream();
63+
}
64+
65+
@Override
66+
public OutputStream getOutputStream() throws IOException {
67+
ensureOpen();
68+
return sink.outputStream();
69+
}
70+
71+
@Override
72+
public synchronized void close() throws IOException {
73+
if (closed) {
74+
return;
75+
}
76+
log.debug("closing handle {}...", handle);
77+
try {
78+
if (handle != null && !INVALID_HANDLE_VALUE.equals(handle)) {
79+
// Cancel any pending read/write before closing to avoid CloseHandle() hang
80+
ExtendedKernel32.INSTANCE.CancelIoEx(handle, null);
81+
}
82+
83+
if (source != null) {
84+
log.debug("closing source {}...", source);
85+
source.close();
86+
}
87+
if (sink != null) {
88+
log.debug("closing sink {}...", sink);
89+
sink.close();
90+
}
91+
} finally {
92+
if (handle != null) {
93+
NamedPipeUtils.closeHandle(handle);
94+
}
95+
closed = true;
96+
connected = false;
97+
}
98+
}
99+
100+
@Override
101+
public boolean isConnected() {
102+
return connected;
103+
}
104+
105+
@Override
106+
public boolean isClosed() {
107+
return closed;
108+
}
109+
110+
private void ensureOpen() throws IOException {
111+
if (closed) {
112+
throw new IOException("NamedPipeSocket is closed");
113+
}
114+
if (!connected) {
115+
throw new IOException("NamedPipeSocket is not connected");
116+
}
117+
}
118+
}
Lines changed: 65 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,29 @@
11
package de.gesellix.docker.client.filesocket;
22

3-
import static com.sun.jna.platform.win32.WinBase.INVALID_HANDLE_VALUE;
3+
import org.slf4j.Logger;
4+
import org.slf4j.LoggerFactory;
45

56
import java.io.IOException;
67
import java.io.InputStream;
78
import java.io.OutputStream;
89
import java.net.InetAddress;
910
import java.net.InetSocketAddress;
1011
import java.net.SocketAddress;
11-
import java.util.concurrent.TimeUnit;
12-
13-
import org.slf4j.Logger;
14-
import org.slf4j.LoggerFactory;
15-
16-
import com.sun.jna.platform.win32.WinNT;
17-
18-
import okio.BufferedSink;
19-
import okio.BufferedSource;
20-
import okio.Okio;
21-
import okio.Timeout;
12+
import java.nio.channels.AsynchronousFileChannel;
13+
import java.nio.channels.Channels;
14+
import java.nio.file.FileSystemException;
15+
import java.nio.file.Paths;
16+
import java.nio.file.StandardOpenOption;
17+
import java.util.concurrent.atomic.AtomicBoolean;
2218

2319
public class NamedPipeSocket extends FileSocket {
2420

2521
private static final Logger log = LoggerFactory.getLogger(NamedPipeSocket.class);
2622

27-
private WinNT.HANDLE handle;
28-
private boolean connected = false;
29-
private boolean closed = false;
30-
31-
private BufferedSource source;
32-
private BufferedSink sink;
33-
34-
private final Timeout ioTimeout = new Timeout().timeout(1000, TimeUnit.MILLISECONDS);
23+
private AsynchronousFileByteChannel channel;
24+
private final AtomicBoolean closed = new AtomicBoolean(false);
25+
private InputStream inputStream;
26+
private OutputStream outputStream;
3527

3628
@Override
3729
public void connect(SocketAddress endpoint, int timeout) throws IOException {
@@ -42,77 +34,75 @@ public void connect(SocketAddress endpoint, int timeout) throws IOException {
4234
InetSocketAddress inetSocketAddress = (InetSocketAddress) endpoint;
4335
InetAddress address = inetSocketAddress.getAddress();
4436
String socketPath = decodeHostname(address);
45-
connect(socketPath);
46-
}
47-
48-
void connect(String socketPath) {
49-
socketPath = socketPath.replace("/", "\\");
5037
log.debug("connect via '{}'...", socketPath);
5138

52-
handle = NamedPipeUtils.connect(socketPath, 10_000, 500, 50);
53-
54-
connected = true;
55-
source = Okio.buffer(new NamedPipeSource(handle, ioTimeout));
56-
sink = Okio.buffer(new NamedPipeSink(handle, ioTimeout));
57-
}
58-
59-
@Override
60-
public InputStream getInputStream() throws IOException {
61-
ensureOpen();
62-
return source.inputStream();
63-
}
64-
65-
@Override
66-
public OutputStream getOutputStream() throws IOException {
67-
ensureOpen();
68-
return sink.outputStream();
39+
socketPath = socketPath.replace("/", "\\\\");
40+
41+
long startedAt = System.currentTimeMillis();
42+
timeout = Math.max(timeout, 10_000);
43+
while (true) {
44+
try {
45+
channel = new AsynchronousFileByteChannel(
46+
AsynchronousFileChannel.open(
47+
Paths.get(socketPath),
48+
StandardOpenOption.READ,
49+
StandardOpenOption.WRITE
50+
)
51+
);
52+
break;
53+
}
54+
catch (FileSystemException e) {
55+
if (System.currentTimeMillis() - startedAt >= timeout) {
56+
throw new RuntimeException(e);
57+
}
58+
else {
59+
// requires a bit more code and the net.java.dev.jna:jna dependency
60+
// Kernel32.INSTANCE.WaitNamedPipe(socketFileName, 100);
61+
try {
62+
Thread.sleep(100);
63+
}
64+
catch (InterruptedException ignored) {
65+
}
66+
}
67+
}
68+
}
6969
}
7070

7171
@Override
72-
public synchronized void close() throws IOException {
73-
if (closed) {
74-
return;
75-
}
76-
log.debug("closing handle {}...", handle);
77-
try {
78-
if (handle != null && !INVALID_HANDLE_VALUE.equals(handle)) {
79-
// Cancel any pending read/write before closing to avoid CloseHandle() hang
80-
ExtendedKernel32.INSTANCE.CancelIoEx(handle, null);
81-
}
82-
83-
if (source != null) {
84-
log.debug("closing source {}...", source);
85-
source.close();
86-
}
87-
if (sink != null) {
88-
log.debug("closing sink {}...", sink);
89-
sink.close();
90-
}
91-
} finally {
92-
if (handle != null) {
93-
NamedPipeUtils.closeHandle(handle);
94-
}
95-
closed = true;
96-
connected = false;
72+
public InputStream getInputStream() {
73+
if (inputStream == null) {
74+
this.inputStream = Channels.newInputStream(channel);
9775
}
76+
return inputStream;
9877
}
9978

10079
@Override
101-
public boolean isConnected() {
102-
return connected;
80+
public OutputStream getOutputStream() {
81+
if (outputStream == null) {
82+
this.outputStream = Channels.newOutputStream(channel);
83+
}
84+
return outputStream;
10385
}
10486

10587
@Override
10688
public boolean isClosed() {
107-
return closed;
89+
return closed.get();
10890
}
10991

110-
private void ensureOpen() throws IOException {
111-
if (closed) {
112-
throw new IOException("NamedPipeSocket is closed");
92+
@Override
93+
public void close() throws IOException {
94+
if (!closed.compareAndSet(false, true)) {
95+
// if compareAndSet() returns false closed was already true
96+
return;
97+
}
98+
if (channel != null) {
99+
channel.close();
100+
}
101+
if (inputStream != null) {
102+
inputStream.close();
113103
}
114-
if (!connected) {
115-
throw new IOException("NamedPipeSocket is not connected");
104+
if (outputStream != null) {
105+
outputStream.close();
116106
}
117107
}
118108
}

0 commit comments

Comments
 (0)