Skip to content

Commit edd358d

Browse files
committed
[AMQ-9394] Tech Preview: Virtual Thread support
1 parent 51553dd commit edd358d

File tree

16 files changed

+865
-7
lines changed

16 files changed

+865
-7
lines changed

activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.apache.activemq.ConfigurationException;
5151
import org.apache.activemq.Service;
5252
import org.apache.activemq.advisory.AdvisoryBroker;
53+
import org.apache.activemq.annotation.Experimental;
5354
import org.apache.activemq.broker.cluster.ConnectionSplitBroker;
5455
import org.apache.activemq.broker.jmx.AnnotatedMBean;
5556
import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
@@ -223,6 +224,7 @@ public class BrokerService implements Service {
223224
private boolean monitorConnectionSplits = false;
224225
private int taskRunnerPriority = Thread.NORM_PRIORITY;
225226
private boolean dedicatedTaskRunner;
227+
private boolean virtualThreadTaskRunner;
226228
private boolean cacheTempDestinations = false;// useful for failover
227229
private int timeBeforePurgeTempDestinations = 5000;
228230
private final List<Runnable> shutdownHooks = new ArrayList<>();
@@ -1269,7 +1271,7 @@ public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) throws
12691271
public TaskRunnerFactory getTaskRunnerFactory() {
12701272
if (this.taskRunnerFactory == null) {
12711273
this.taskRunnerFactory = new TaskRunnerFactory("ActiveMQ BrokerService["+getBrokerName()+"] Task", getTaskRunnerPriority(), true, 1000,
1272-
isDedicatedTaskRunner());
1274+
isDedicatedTaskRunner(), isVirtualThreadTaskRunner());
12731275
this.taskRunnerFactory.setThreadClassLoader(this.getClass().getClassLoader());
12741276
}
12751277
return this.taskRunnerFactory;
@@ -1280,9 +1282,10 @@ public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
12801282
}
12811283

12821284
public TaskRunnerFactory getPersistenceTaskRunnerFactory() {
1285+
// [AMQ-9394] TODO: Should we have a separate config flag for virtualThread for persistence task runner?
12831286
if (taskRunnerFactory == null) {
12841287
persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", persistenceThreadPriority,
1285-
true, 1000, isDedicatedTaskRunner());
1288+
true, 1000, isDedicatedTaskRunner(), isVirtualThreadTaskRunner());
12861289
}
12871290
return persistenceTaskRunnerFactory;
12881291
}
@@ -1891,6 +1894,15 @@ public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) {
18911894
this.dedicatedTaskRunner = dedicatedTaskRunner;
18921895
}
18931896

1897+
public boolean isVirtualThreadTaskRunner() {
1898+
return virtualThreadTaskRunner;
1899+
}
1900+
1901+
@Experimental("Tech Preview for Virtaul Thread support")
1902+
public void setVirtualThreadTaskRunner(boolean virtualThreadTaskRunner) {
1903+
this.virtualThreadTaskRunner = virtualThreadTaskRunner;
1904+
}
1905+
18941906
public boolean isCacheTempDestinations() {
18951907
return cacheTempDestinations;
18961908
}

activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -547,6 +547,16 @@ public boolean isSlave() {
547547
return brokerService.isSlave();
548548
}
549549

550+
@Override
551+
public boolean isDedicatedTaskRunner() {
552+
return brokerService.isDedicatedTaskRunner();
553+
}
554+
555+
@Override
556+
public boolean isVirtualThreadTaskRunner() {
557+
return brokerService.isVirtualThreadTaskRunner();
558+
}
559+
550560
private ManagedRegionBroker safeGetBroker() {
551561
if (broker == null) {
552562
throw new IllegalStateException("Broker is not yet started.");

activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,4 +353,11 @@ public interface BrokerViewMBean extends Service {
353353

354354
@MBeanInfo(value="The total number of times that the max number of uncommitted count has been exceeded across all destinations")
355355
long getTotalMaxUncommittedExceededCount();
356+
357+
@MBeanInfo("Dedicated Task Runner enabled.")
358+
boolean isDedicatedTaskRunner();
359+
360+
@MBeanInfo("Virtual Thread Task Runner enabled.")
361+
boolean isVirtualThreadTaskRunner();
362+
356363
}

activemq-client-jdk21-test/pom.xml

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
Licensed to the Apache Software Foundation (ASF) under one or more
4+
contributor license agreements. See the NOTICE file distributed with
5+
this work for additional information regarding copyright ownership.
6+
The ASF licenses this file to You under the Apache License, Version 2.0
7+
(the "License"); you may not use this file except in compliance with
8+
the License. You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
-->
18+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
19+
<modelVersion>4.0.0</modelVersion>
20+
<parent>
21+
<groupId>org.apache.activemq</groupId>
22+
<artifactId>activemq-parent</artifactId>
23+
<version>6.2.0-SNAPSHOT</version>
24+
</parent>
25+
<artifactId>activemq-client-jdk21-test</artifactId>
26+
<packaging>jar</packaging>
27+
<name>ActiveMQ :: Client JDK 21 Test</name>
28+
<description>Test module for activemq-client-jdk21 with tech preview support for Virtual Threads</description>
29+
<properties>
30+
<maven.compiler.release>21</maven.compiler.release>
31+
<maven.compiler.source>21</maven.compiler.source>
32+
<maven.compiler.target>21</maven.compiler.target>
33+
<maven.javadoc.skip>true</maven.javadoc.skip>
34+
</properties>
35+
<dependencies>
36+
<dependency>
37+
<groupId>org.apache.activemq</groupId>
38+
<artifactId>activemq-client-jdk21</artifactId>
39+
<version>${project.version}</version>
40+
<exclusions>
41+
<exclusion>
42+
<groupId>org.apache.activemq</groupId>
43+
<artifactId>activemq-client</artifactId>
44+
</exclusion>
45+
</exclusions>
46+
</dependency>
47+
<dependency>
48+
<groupId>junit</groupId>
49+
<artifactId>junit</artifactId>
50+
<scope>test</scope>
51+
</dependency>
52+
<dependency>
53+
<groupId>org.apache.activemq</groupId>
54+
<artifactId>activemq-broker</artifactId>
55+
<scope>test</scope>
56+
<classifier>tests</classifier>
57+
<version>${project.version}</version>
58+
<exclusions>
59+
<exclusion>
60+
<groupId>org.apache.activemq</groupId>
61+
<artifactId>activemq-client</artifactId>
62+
</exclusion>
63+
</exclusions>
64+
</dependency>
65+
<dependency>
66+
<groupId>org.apache.activemq</groupId>
67+
<artifactId>activemq-unit-tests</artifactId>
68+
<scope>test</scope>
69+
<classifier>tests</classifier>
70+
<version>${project.version}</version>
71+
<exclusions>
72+
<exclusion>
73+
<groupId>org.apache.activemq</groupId>
74+
<artifactId>activemq-client</artifactId>
75+
</exclusion>
76+
</exclusions>
77+
</dependency>
78+
</dependencies>
79+
<profiles>
80+
<profile>
81+
<id>jdk-17-skip</id>
82+
<activation>
83+
<jdk>[17,21)</jdk>
84+
</activation>
85+
<properties>
86+
<!-- maven.bundle.skip>true</maven.bundle.skip -->
87+
<maven.main.skip>true</maven.main.skip>
88+
<maven.test.skip>true</maven.test.skip>
89+
</properties>
90+
</profile>
91+
<profile>
92+
<id>jdk-21-plus</id>
93+
<activation>
94+
<jdk>[21,)</jdk>
95+
</activation>
96+
</profile>
97+
</profiles>
98+
</project>
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.activemq.broker;
18+
19+
import junit.framework.Test;
20+
21+
public class VirtualThreadTaskRunnerBrokerTest extends BrokerTest {
22+
23+
protected BrokerService createBroker() throws Exception {
24+
BrokerService broker = super.createBroker();
25+
broker.setVirtualThreadTaskRunner(true);
26+
return broker;
27+
}
28+
29+
public static Test suite() {
30+
return suite(VirtualThreadTaskRunnerBrokerTest.class);
31+
}
32+
33+
public static void main(String[] args) {
34+
junit.textui.TestRunner.run(suite());
35+
}
36+
37+
}

0 commit comments

Comments
 (0)