|
16 | 16 | */ |
17 | 17 | package org.apache.activemq.artemis.core.remoting.impl.netty; |
18 | 18 |
|
19 | | -import static org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.NETTY_HTTP_HEADER_PREFIX; |
20 | | - |
21 | 19 | import javax.net.ssl.SNIHostName; |
22 | 20 | import javax.net.ssl.SSLContext; |
23 | 21 | import javax.net.ssl.SSLEngine; |
24 | 22 | import javax.net.ssl.SSLParameters; |
25 | 23 | import java.io.IOException; |
| 24 | +import java.lang.invoke.MethodHandles; |
26 | 25 | import java.net.ConnectException; |
27 | 26 | import java.net.InetAddress; |
28 | 27 | import java.net.InetSocketAddress; |
|
63 | 62 | import io.netty.channel.ChannelPipeline; |
64 | 63 | import io.netty.channel.ChannelPromise; |
65 | 64 | import io.netty.channel.EventLoopGroup; |
| 65 | +import io.netty.channel.MultiThreadIoEventLoopGroup; |
66 | 66 | import io.netty.channel.SimpleChannelInboundHandler; |
67 | 67 | import io.netty.channel.WriteBufferWaterMark; |
68 | | -import io.netty.channel.epoll.EpollEventLoopGroup; |
| 68 | +import io.netty.channel.epoll.EpollIoHandler; |
69 | 69 | import io.netty.channel.epoll.EpollSocketChannel; |
70 | 70 | import io.netty.channel.group.ChannelGroup; |
71 | 71 | import io.netty.channel.group.DefaultChannelGroup; |
72 | | -import io.netty.channel.kqueue.KQueueEventLoopGroup; |
| 72 | +import io.netty.channel.kqueue.KQueueIoHandler; |
73 | 73 | import io.netty.channel.kqueue.KQueueSocketChannel; |
74 | | -import io.netty.channel.nio.NioEventLoopGroup; |
| 74 | +import io.netty.channel.nio.NioIoHandler; |
75 | 75 | import io.netty.channel.socket.nio.NioSocketChannel; |
76 | 76 | import io.netty.handler.codec.base64.Base64; |
77 | 77 | import io.netty.handler.codec.http.DefaultFullHttpRequest; |
|
92 | 92 | import io.netty.handler.codec.http.LastHttpContent; |
93 | 93 | import io.netty.handler.codec.http.cookie.ClientCookieDecoder; |
94 | 94 | import io.netty.handler.codec.http.cookie.Cookie; |
95 | | -import io.netty.handler.ssl.SslContext; |
96 | 95 | import io.netty.handler.codec.socksx.SocksVersion; |
97 | 96 | import io.netty.handler.proxy.ProxyHandler; |
98 | 97 | import io.netty.handler.proxy.Socks4ProxyHandler; |
99 | 98 | import io.netty.handler.proxy.Socks5ProxyHandler; |
| 99 | +import io.netty.handler.ssl.SslContext; |
100 | 100 | import io.netty.handler.ssl.SslHandler; |
101 | 101 | import io.netty.resolver.NoopAddressResolverGroup; |
102 | 102 | import io.netty.util.AttributeKey; |
|
128 | 128 | import org.apache.activemq.artemis.utils.PasswordMaskingUtil; |
129 | 129 | import org.slf4j.Logger; |
130 | 130 | import org.slf4j.LoggerFactory; |
131 | | -import java.lang.invoke.MethodHandles; |
132 | 131 |
|
| 132 | +import static org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.NETTY_HTTP_HEADER_PREFIX; |
133 | 133 | import static org.apache.activemq.artemis.utils.Base64.encodeBytes; |
134 | 134 |
|
135 | 135 | public class NettyConnector extends AbstractConnector { |
@@ -537,29 +537,29 @@ public synchronized void start() { |
537 | 537 |
|
538 | 538 | if (useEpoll && CheckDependencies.isEpollAvailable()) { |
539 | 539 | if (useGlobalWorkerPool) { |
540 | | - group = SharedEventLoopGroup.getInstance((threadFactory -> new EpollEventLoopGroup(remotingThreads, threadFactory))); |
| 540 | + group = SharedEventLoopGroup.getInstance((threadFactory -> new MultiThreadIoEventLoopGroup(remotingThreads, threadFactory, EpollIoHandler.newFactory()))); |
541 | 541 | } else { |
542 | | - group = new EpollEventLoopGroup(remotingThreads); |
| 542 | + group = new MultiThreadIoEventLoopGroup(remotingThreads, EpollIoHandler.newFactory()); |
543 | 543 | } |
544 | 544 | connectorType = EPOLL_CONNECTOR_TYPE; |
545 | 545 | channelClazz = EpollSocketChannel.class; |
546 | 546 | logger.debug("Connector {} using native epoll", this); |
547 | 547 | } else if (useKQueue && CheckDependencies.isKQueueAvailable()) { |
548 | 548 | if (useGlobalWorkerPool) { |
549 | | - group = SharedEventLoopGroup.getInstance((threadFactory -> new KQueueEventLoopGroup(remotingThreads, threadFactory))); |
| 549 | + group = SharedEventLoopGroup.getInstance((threadFactory -> new MultiThreadIoEventLoopGroup(remotingThreads, threadFactory, KQueueIoHandler.newFactory()))); |
550 | 550 | } else { |
551 | | - group = new KQueueEventLoopGroup(remotingThreads); |
| 551 | + group = new MultiThreadIoEventLoopGroup(remotingThreads, KQueueIoHandler.newFactory()); |
552 | 552 | } |
553 | 553 | connectorType = KQUEUE_CONNECTOR_TYPE; |
554 | 554 | channelClazz = KQueueSocketChannel.class; |
555 | 555 | logger.debug("Connector {} using native kqueue", this); |
556 | 556 | } else { |
557 | 557 | if (useGlobalWorkerPool) { |
558 | 558 | channelClazz = NioSocketChannel.class; |
559 | | - group = SharedEventLoopGroup.getInstance((threadFactory -> new NioEventLoopGroup(remotingThreads, threadFactory))); |
| 559 | + group = SharedEventLoopGroup.getInstance((threadFactory -> new MultiThreadIoEventLoopGroup(remotingThreads, threadFactory, NioIoHandler.newFactory()))); |
560 | 560 | } else { |
561 | 561 | channelClazz = NioSocketChannel.class; |
562 | | - group = new NioEventLoopGroup(remotingThreads); |
| 562 | + group = new MultiThreadIoEventLoopGroup(remotingThreads, NioIoHandler.newFactory()); |
563 | 563 | } |
564 | 564 | connectorType = NIO_CONNECTOR_TYPE; |
565 | 565 | channelClazz = NioSocketChannel.class; |
|
0 commit comments