Skip to content

Commit 6157e87

Browse files
committed
SSH native implemntation; test rewrite.
1 parent bcf257a commit 6157e87

14 files changed

+726
-374
lines changed

README.md

+1-3
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,4 @@ Download artifacts directly from CircleCI: https://circleci.com/gh/bokysan/jdbc-
136136

137137
## TODO
138138

139-
- Make better use of connection pooling. If the system tries to open the same connection towards the same server, this implementation will reestablish
140-
another connection.
141-
- SSH connection should be persistent and kept alive only until until the last connection is closed. Currently their livespan is connected to the life of JVM.
139+
- Discover why Derby test for SSH native connection is failing.

pom.xml

+8-1
Original file line numberDiff line numberDiff line change
@@ -91,11 +91,18 @@
9191
<version>${slf4j.version}</version>
9292
</dependency>
9393

94-
<dependency>
94+
<!--dependency>
9595
<groupId>org.slf4j</groupId>
9696
<artifactId>slf4j-simple</artifactId>
9797
<version>${slf4j.version}</version>
9898
<scope>test</scope>
99+
</dependency-->
100+
101+
<dependency>
102+
<groupId>ch.qos.logback</groupId>
103+
<artifactId>logback-classic</artifactId>
104+
<version>${logback.version}</version>
105+
<scope>test</scope>
99106
</dependency>
100107

101108
<dependency>

src/main/java/com/cekrlic/jdbc/ssh/tunnel/AbstractSshJDriver.java

+54-7
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,19 @@
22

33
import org.slf4j.LoggerFactory;
44

5+
import java.io.IOException;
56
import java.sql.*;
67
import java.util.Enumeration;
8+
import java.util.Map;
79
import java.util.Properties;
10+
import java.util.concurrent.ConcurrentHashMap;
11+
import java.util.concurrent.atomic.AtomicReference;
812
import java.util.logging.Logger;
913

1014
public abstract class AbstractSshJDriver implements Driver {
1115
private static final org.slf4j.Logger log = LoggerFactory.getLogger(AbstractSshJDriver.class);
1216

13-
protected AbstractTunnel tunnel;
17+
static final Map<String, AtomicReference<AbstractTunnel>> tunnelList = new ConcurrentHashMap<>();
1418

1519
abstract String getDriverPrefix();
1620

@@ -19,11 +23,6 @@ public boolean acceptsURL(String url) throws SQLException {
1923
return (url != null && url.startsWith(getDriverPrefix()));
2024
}
2125

22-
@Override
23-
public Connection connect(String url, Properties info) throws SQLException {
24-
return null;
25-
}
26-
2726
private Driver findDriver(String url) throws SQLException {
2827
Driver realDriver = null;
2928

@@ -47,7 +46,7 @@ private Driver findDriver(String url) throws SQLException {
4746
return realDriver;
4847
}
4948

50-
protected Connection getRealConnection(Properties info, String originalUrl, String newHost, String newPort) throws SQLException {
49+
protected Connection getRealConnection(AbstractTunnel tunnel, Properties info, String originalUrl, String newHost, String newPort) throws SQLException {
5150
originalUrl = originalUrl.replaceAll("\\{\\{[hH][oO][sS][tT]\\}\\}", newHost);
5251
originalUrl = originalUrl.replaceAll("\\{\\{[pP][oO][rR][tT]\\}\\}", newPort);
5352

@@ -100,4 +99,52 @@ public boolean jdbcCompliant() {
10099
public Logger getParentLogger() throws SQLFeatureNotSupportedException {
101100
throw new SQLFeatureNotSupportedException("Feature not supported");
102101
}
102+
103+
@Override
104+
public Connection connect(String url, Properties info) throws SQLException {
105+
ConnectionData d = verifyConnection(url);
106+
// Not our connection URL, skip further integration
107+
if(d == null) {
108+
return null;
109+
}
110+
111+
AbstractTunnel tunnel = getTunnel(url, d);
112+
tunnel.ensureStarted();
113+
Connection c = getRealConnection(tunnel, info, d.getForwardingUrl(), tunnel.getLocalHost(), tunnel.getLocalPort());
114+
return new SshConnection(tunnel, c);
115+
}
116+
117+
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
118+
public AbstractTunnel getTunnel(String url, ConnectionData d) throws SQLException {
119+
AbstractTunnel tunnel;
120+
try {
121+
AtomicReference<AbstractTunnel> a = tunnelList.get(url);
122+
if(a == null) {
123+
tunnel = newTunnel(d);
124+
log.info("No tunnel for {}, created a new tunnel: {}", url, tunnel);
125+
a = new AtomicReference<>(tunnel);
126+
synchronized (a) {
127+
tunnelList.put(url, a);
128+
tunnel.start();
129+
}
130+
} else {
131+
synchronized (a) {
132+
tunnel = a.get();
133+
if(tunnel.isStopped()) {
134+
tunnel = newTunnel(d);
135+
log.info("Tunnel stopped for {}, created a new tunnel: {}", url, tunnel);
136+
a.set(tunnel);
137+
} else {
138+
log.debug("Reusing connection {}:{}", tunnel.getLocalHost(), tunnel.getLocalPort());
139+
}
140+
}
141+
}
142+
} catch (IOException e) {
143+
throw new SQLException(e);
144+
}
145+
return tunnel;
146+
}
147+
148+
protected abstract AbstractTunnel newTunnel(ConnectionData d) throws IOException, SQLException;
149+
103150
}

src/main/java/com/cekrlic/jdbc/ssh/tunnel/AbstractTunnel.java

+16-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ public abstract class AbstractTunnel {
2424
public static final String REMOTE = "remote";
2525
public static final String DRIVERS = "drivers";
2626

27+
protected AtomicInteger connectionCount = new AtomicInteger(0);
2728
protected AtomicInteger localPort = new AtomicInteger(20000 + new java.util.Random().nextInt(100));
2829
protected String localHost;
2930

@@ -132,5 +133,19 @@ private static boolean isPortOpen(String ip, int port) {
132133
/**
133134
* Stop the tunnel
134135
*/
135-
abstract void stop();
136+
abstract void stop(String reason);
137+
138+
abstract void ensureStarted() throws SQLException;
139+
140+
abstract boolean isStopped();
141+
142+
public void remove(SshConnection sshConnection) {
143+
if(connectionCount.decrementAndGet()==0) {
144+
this.stop("No connections remaining open.");
145+
}
146+
}
147+
148+
public void add(SshConnection sshConnection) {
149+
connectionCount.incrementAndGet();
150+
}
136151
}

0 commit comments

Comments
 (0)