Skip to content

Commit 482673a

Browse files
garyrussellartembilan
authored andcommitted
GH-3509: Fix Memory Leak with Intercepted TCP Conn
Resolves #3509 When a connection was intercepted, the `TcpSender.add...` was called with the interceptor, but the `removeDead...` was called with the actual connection, causing a memory leak. Always call the `TcpSender` with the outer-most interceptor. # Conflicts: # spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNetServerConnectionFactory.java # spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/TcpSendingMessageHandlerTests.java
1 parent 321f036 commit 482673a

File tree

8 files changed

+127
-26
lines changed

8 files changed

+127
-26
lines changed

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractConnectionFactory.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -615,12 +615,13 @@ protected TcpConnectionSupport wrapConnection(TcpConnectionSupport connectionArg
615615
if (this.senders.size() == 0) {
616616
connection.registerSender(wrapper);
617617
}
618+
connection.setWrapped(true);
618619
connection = wrapper;
619620
}
620621
return connection;
621622
}
622623
finally {
623-
this.addConnection(connection);
624+
addConnection(connection);
624625
}
625626
}
626627

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionInterceptorFactoryChain.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,6 +22,7 @@
2222

2323
/**
2424
* @author Gary Russell
25+
* @author Artem Bilan
2526
* @since 2.0
2627
*
2728
*/
@@ -38,4 +39,8 @@ public void setInterceptors(TcpConnectionInterceptorFactory[] interceptorFactori
3839
this.interceptorFactories = Arrays.copyOf(interceptorFactories, interceptorFactories.length);
3940
}
4041

42+
public void setInterceptor(TcpConnectionInterceptorFactory... interceptorFactories) {
43+
this.interceptorFactories = Arrays.copyOf(interceptorFactories, interceptorFactories.length);
44+
}
45+
4146
}

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionSupport.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2001-2020 the original author or authors.
2+
* Copyright 2001-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -96,6 +96,8 @@ public abstract class TcpConnectionSupport implements TcpConnection {
9696

9797
private boolean manualListenerRegistration;
9898

99+
private boolean wrapped;
100+
99101
/*
100102
* This boolean is to avoid looking for a temporary listener when not needed
101103
* to avoid a CPU cache flush. This does not have to be volatile because it
@@ -164,8 +166,10 @@ void setTestFailed(boolean testFailed) {
164166
*/
165167
@Override
166168
public void close() {
167-
for (TcpSender sender : this.senders) {
168-
sender.removeDeadConnection(this);
169+
if (!this.wrapped) {
170+
for (TcpSender sender : this.senders) {
171+
sender.removeDeadConnection(this);
172+
}
169173
}
170174
// close() may be called multiple times; only publish once
171175
if (!this.closePublished.getAndSet(true)) {
@@ -194,6 +198,9 @@ protected void closeConnection(boolean isException) {
194198
outerListener = nextListener;
195199
}
196200
outerListener.close();
201+
for (TcpSender sender : getSenders()) {
202+
sender.removeDeadConnection(outerListener);
203+
}
197204
if (isException) {
198205
// ensure physical close in case the interceptor did not close
199206
this.close();
@@ -264,6 +271,10 @@ public void setNeedsTest(boolean needsTest) {
264271
this.needsTest = needsTest;
265272
}
266273

274+
void setSenders(List<TcpSender> senders) {
275+
this.senders.addAll(senders);
276+
}
277+
267278
/**
268279
* Set the listener that will receive incoming Messages.
269280
* @param listener The listener.
@@ -401,6 +412,14 @@ public SocketInfo getSocketInfo() {
401412
return this.socketInfo;
402413
}
403414

415+
/**
416+
* Set to true if intercepted.
417+
* @param wrapped true if wrapped.
418+
*/
419+
public void setWrapped(boolean wrapped) {
420+
this.wrapped = wrapped;
421+
}
422+
404423
public String getConnectionFactoryName() {
405424
return this.connectionFactoryName;
406425
}

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNetClientConnectionFactory.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -56,7 +56,11 @@ protected TcpConnectionSupport buildNewConnection() {
5656
TcpConnectionSupport connection =
5757
this.tcpNetConnectionSupport.createNewConnection(socket, false, isLookupHost(),
5858
getApplicationEventPublisher(), getComponentName());
59-
connection = wrapConnection(connection);
59+
TcpConnectionSupport wrapped = wrapConnection(connection);
60+
if (wrapped.equals(connection)) {
61+
connection.setSenders(getSenders());
62+
connection = wrapped;
63+
}
6064
initializeConnection(connection, socket);
6165
this.getTaskExecutor().execute(connection);
6266
this.harvestClosedConnections();

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNioClientConnectionFactory.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -98,6 +98,9 @@ protected TcpConnectionSupport buildNewConnection() {
9898
((TcpNioSSLConnection) connection).setHandshakeTimeout(sslHandshakeTimeout);
9999
}
100100
TcpConnectionSupport wrappedConnection = wrapConnection(connection);
101+
if (!wrappedConnection.equals(connection)) {
102+
connection.setSenders(getSenders());
103+
}
101104
initializeConnection(wrappedConnection, socketChannel.socket());
102105
if (getSoTimeout() > 0) {
103106
connection.setLastRead(System.currentTimeMillis());

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNioServerConnectionFactory.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -277,6 +277,9 @@ private TcpNioConnection createTcpNioConnection(SocketChannel socketChannel) {
277277
isLookupHost(), getApplicationEventPublisher(), getComponentName());
278278
connection.setUsingDirectBuffers(this.usingDirectBuffers);
279279
TcpConnectionSupport wrappedConnection = wrapConnection(connection);
280+
if (!wrappedConnection.equals(connection)) {
281+
connection.setSenders(getSenders());
282+
}
280283
initializeConnection(wrappedConnection, socketChannel.socket());
281284
return connection;
282285
}
Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2013-2019 the original author or authors.
2+
* Copyright 2013-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,40 +16,34 @@
1616

1717
package org.springframework.integration.ip.tcp;
1818

19-
import org.springframework.context.ApplicationEvent;
2019
import org.springframework.context.ApplicationEventPublisher;
2120
import org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory;
2221
import org.springframework.integration.ip.tcp.connection.HelloWorldInterceptorFactory;
2322

2423
/**
2524
* @author Gary Russell
25+
* @author Mário Dias
26+
* @author Artem Bilan
27+
*
2628
* @since 3.0
2729
*
2830
*/
2931
public class AbstractTcpChannelAdapterTests {
3032

31-
private static final ApplicationEventPublisher NOOP_PUBLISHER = new ApplicationEventPublisher() {
32-
33-
@Override
34-
public void publishEvent(ApplicationEvent event) {
35-
}
36-
37-
@Override
38-
public void publishEvent(Object event) {
39-
40-
}
41-
42-
};
33+
private static final ApplicationEventPublisher NOOP_PUBLISHER = event -> { };
4334

4435
protected HelloWorldInterceptorFactory newInterceptorFactory() {
36+
return newInterceptorFactory(NOOP_PUBLISHER);
37+
}
38+
39+
protected HelloWorldInterceptorFactory newInterceptorFactory(ApplicationEventPublisher applicationEventPublisher) {
4540
HelloWorldInterceptorFactory factory = new HelloWorldInterceptorFactory();
46-
factory.setApplicationEventPublisher(NOOP_PUBLISHER);
41+
factory.setApplicationEventPublisher(applicationEventPublisher);
4742
return factory;
4843
}
4944

5045
protected void noopPublisher(AbstractConnectionFactory connectionFactory) {
5146
connectionFactory.setApplicationEventPublisher(NOOP_PUBLISHER);
5247
}
5348

54-
5549
}

spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/TcpSendingMessageHandlerTests.java

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.ArrayList;
3131
import java.util.HashSet;
3232
import java.util.List;
33+
import java.util.Map;
3334
import java.util.Set;
3435
import java.util.TreeSet;
3536
import java.util.concurrent.CountDownLatch;
@@ -39,6 +40,7 @@
3940
import java.util.concurrent.atomic.AtomicReference;
4041

4142
import javax.net.ServerSocketFactory;
43+
import javax.net.SocketFactory;
4244

4345
import org.apache.commons.logging.Log;
4446
import org.apache.commons.logging.LogFactory;
@@ -57,15 +59,21 @@
5759
import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory;
5860
import org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory;
5961
import org.springframework.integration.ip.tcp.connection.AbstractServerConnectionFactory;
62+
import org.springframework.integration.ip.tcp.connection.HelloWorldInterceptor;
63+
import org.springframework.integration.ip.tcp.connection.TcpConnection;
64+
import org.springframework.integration.ip.tcp.connection.TcpConnectionCloseEvent;
6065
import org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorFactory;
6166
import org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorFactoryChain;
67+
import org.springframework.integration.ip.tcp.connection.TcpConnectionOpenEvent;
6268
import org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory;
69+
import org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory;
6370
import org.springframework.integration.ip.tcp.connection.TcpNioClientConnectionFactory;
6471
import org.springframework.integration.ip.tcp.serializer.ByteArrayCrLfSerializer;
6572
import org.springframework.integration.ip.tcp.serializer.ByteArrayLengthHeaderSerializer;
6673
import org.springframework.integration.ip.tcp.serializer.ByteArrayStxEtxSerializer;
6774
import org.springframework.integration.ip.util.TestingUtilities;
6875
import org.springframework.integration.support.MessageBuilder;
76+
import org.springframework.integration.test.util.TestUtils;
6977
import org.springframework.messaging.Message;
7078
import org.springframework.messaging.MessageChannel;
7179
import org.springframework.messaging.MessagingException;
@@ -77,6 +85,7 @@
7785
/**
7886
* @author Gary Russell
7987
* @author Artem Bilan
88+
* @author Mário Dias
8089
*
8190
* @since 2.0
8291
*/
@@ -1191,4 +1200,67 @@ public void testConnectionException() throws Exception {
11911200
}
11921201
}
11931202

1203+
@SuppressWarnings("unchecked")
1204+
@Test
1205+
public void testInterceptedConnection() throws Exception {
1206+
final CountDownLatch latch = new CountDownLatch(1);
1207+
AbstractServerConnectionFactory scf = new TcpNetServerConnectionFactory(0);
1208+
ByteArrayCrLfSerializer serializer = new ByteArrayCrLfSerializer();
1209+
scf.setSerializer(serializer);
1210+
scf.setDeserializer(serializer);
1211+
TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
1212+
adapter.setConnectionFactory(scf);
1213+
TcpSendingMessageHandler handler = new TcpSendingMessageHandler();
1214+
handler.setConnectionFactory(scf);
1215+
final AtomicReference<TcpConnection> connection = new AtomicReference<>();
1216+
scf.setApplicationEventPublisher(event -> {
1217+
if (event instanceof TcpConnectionOpenEvent) {
1218+
connection.set(handler.getConnections()
1219+
.get(((TcpConnectionOpenEvent) event).getConnectionId()));
1220+
latch.countDown();
1221+
}
1222+
});
1223+
TcpConnectionInterceptorFactoryChain fc = new TcpConnectionInterceptorFactoryChain();
1224+
fc.setInterceptor(newInterceptorFactory(scf.getApplicationEventPublisher()));
1225+
scf.setInterceptorFactoryChain(fc);
1226+
scf.start();
1227+
TestingUtilities.waitListening(scf, null);
1228+
int port = scf.getPort();
1229+
Socket socket = SocketFactory.getDefault().createSocket("localhost", port);
1230+
socket.close();
1231+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
1232+
assertThat(connection.get()).isInstanceOf(HelloWorldInterceptor.class);
1233+
assertThat(TestUtils.getPropertyValue(handler, "connections", Map.class)).isEmpty();
1234+
scf.stop();
1235+
}
1236+
1237+
@Test
1238+
public void testInterceptedCleanup() throws Exception {
1239+
final CountDownLatch latch = new CountDownLatch(1);
1240+
AbstractServerConnectionFactory scf = new TcpNetServerConnectionFactory(0);
1241+
ByteArrayCrLfSerializer serializer = new ByteArrayCrLfSerializer();
1242+
scf.setSerializer(serializer);
1243+
scf.setDeserializer(serializer);
1244+
TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
1245+
adapter.setConnectionFactory(scf);
1246+
TcpSendingMessageHandler handler = new TcpSendingMessageHandler();
1247+
handler.setConnectionFactory(scf);
1248+
scf.setApplicationEventPublisher(event -> {
1249+
if (event instanceof TcpConnectionCloseEvent) {
1250+
latch.countDown();
1251+
}
1252+
});
1253+
TcpConnectionInterceptorFactoryChain fc = new TcpConnectionInterceptorFactoryChain();
1254+
fc.setInterceptor(newInterceptorFactory(scf.getApplicationEventPublisher()));
1255+
scf.setInterceptorFactoryChain(fc);
1256+
scf.start();
1257+
TestingUtilities.waitListening(scf, null);
1258+
int port = scf.getPort();
1259+
Socket socket = SocketFactory.getDefault().createSocket("localhost", port);
1260+
socket.close();
1261+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
1262+
assertThat(handler.getConnections().isEmpty()).isTrue();
1263+
scf.stop();
1264+
}
1265+
11941266
}

0 commit comments

Comments
 (0)