Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import com.datastax.oss.driver.internal.core.session.PoolManager;
import com.datastax.oss.driver.internal.core.session.RequestProcessor;
import com.datastax.oss.driver.internal.core.session.RequestProcessorRegistry;
import com.datastax.oss.driver.internal.core.session.SessionRegistry;
import com.datastax.oss.driver.internal.core.ssl.JdkSslHandlerFactory;
import com.datastax.oss.driver.internal.core.ssl.SslHandlerFactory;
import com.datastax.oss.driver.internal.core.tracker.MultiplexingRequestTracker;
Expand Down Expand Up @@ -226,6 +227,7 @@ public class DefaultDriverContext implements InternalDriverContext {
private final LazyReference<List<LifecycleListener>> lifecycleListenersRef =
new LazyReference<>("lifecycleListeners", this::buildLifecycleListeners, cycleDetector);

private static SessionRegistry sessionRegistry;
private final DriverConfig config;
private final DriverConfigLoader configLoader;
private final ChannelPoolFactory channelPoolFactory = new ChannelPoolFactory();
Expand Down Expand Up @@ -335,6 +337,14 @@ public DefaultDriverContext(
.build());
}

public SessionRegistry getSessionRegistry() {
return sessionRegistry;
}

public static void setSessionRegistry(SessionRegistry registry) {
sessionRegistry = registry;
}

/**
* Builds a map of options to send in a Startup message.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.datastax.oss.driver.api.core.session.Request;
import com.datastax.oss.driver.api.core.type.reflect.GenericType;
import com.datastax.oss.driver.internal.core.channel.DriverChannel;
import com.datastax.oss.driver.internal.core.context.DefaultDriverContext;
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
import com.datastax.oss.driver.internal.core.context.LifecycleListener;
import com.datastax.oss.driver.internal.core.metadata.DefaultNode;
Expand Down Expand Up @@ -336,6 +337,10 @@ private class SingleThreaded {
private boolean forceCloseWasCalled;

private SingleThreaded(InternalDriverContext context, Set<EndPoint> contactPoints) {
if (context instanceof DefaultDriverContext) {
SessionRegistry sessionRegistry = ((DefaultDriverContext) context).getSessionRegistry();
if (sessionRegistry != null) sessionRegistry.registerSession(this);
}
this.context = context;
this.nodeStateManager = new NodeStateManager(context);
this.initialContactPoints = contactPoints;
Expand Down Expand Up @@ -656,6 +661,10 @@ private void warnIfFailed(CompletionStage<Void> stage) {
}

private void closePolicies() {
if (context instanceof DefaultDriverContext) {
SessionRegistry sessionRegistry = ((DefaultDriverContext) context).getSessionRegistry();
if (sessionRegistry != null) sessionRegistry.closeSession(this);
}
// This is a bit tricky: we might be closing the session because of an initialization error.
// This error might have been triggered by a policy failing to initialize. If we try to access
// the policy here to close it, it will fail again. So make sure we ignore that error and
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.datastax.oss.driver.internal.core.session;

import com.datastax.oss.driver.internal.core.context.DefaultDriverContext;

public abstract class SessionRegistry {
public SessionRegistry() {
DefaultDriverContext.setSessionRegistry(this);
}

public abstract void registerSession(Object session);

public abstract void closeSession(Object session);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package com.datastax.oss.driver.internal.osgi.support;

import com.datastax.oss.driver.api.testinfra.requirement.BackendRequirementRule;
import com.datastax.oss.driver.api.testinfra.session.SessionTracker;
import org.junit.AssumptionViolatedException;
import org.junit.runner.Description;
import org.junit.runner.notification.Failure;
Expand All @@ -26,7 +27,6 @@
import org.ops4j.pax.exam.junit.PaxExam;

public class CcmPaxExam extends PaxExam {

public CcmPaxExam(Class<?> klass) throws InitializationError {
super(klass);
}
Expand All @@ -36,7 +36,12 @@ public void run(RunNotifier notifier) {
Description description = getDescription();

if (BackendRequirementRule.meetsDescriptionRequirements(description)) {
super.run(notifier);
try {
SessionTracker.testStarted(description.getClassName(), description.getMethodName());
super.run(notifier);
} finally {
SessionTracker.testEnded(description.getClassName(), description.getMethodName());
}
} else {
// requirements not met, throw reasoning assumption to skip test
AssumptionViolatedException e =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
*/
package com.datastax.oss.driver.api.testinfra.ccm;

import com.datastax.oss.driver.api.testinfra.session.SessionTracker;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -39,6 +42,24 @@ public class CustomCcmRule extends BaseCcmRule {
super(ccmBridge);
}

@Override
public Statement apply(Statement base, Description description) {
final Statement statement = super.apply(base, description);
return new Statement() {
final Statement original = statement;

@Override
public void evaluate() throws Throwable {
try {
SessionTracker.testStarted(description.getClassName(), description.getMethodName());
original.evaluate();
} finally {
SessionTracker.testEnded(description.getClassName(), description.getMethodName());
}
}
};
}

@Override
protected void before() {
if (CURRENT.get() == null && CURRENT.compareAndSet(null, this)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package com.datastax.oss.driver.api.testinfra.session;

import com.datastax.oss.driver.internal.core.session.SessionRegistry;
import java.lang.ref.WeakReference;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;

public class SessionTracker {
static final TestSessionRegistry sessionRegistry = new TestSessionRegistry();

private static final Set<String> runningTests = new ConcurrentSkipListSet<>();

public static void testStarted(String className, String methodName) {
runningTests.add(String.format("%s.%s", className, methodName));
}

public static void testEnded(String className, String methodName) {
runningTests.remove(String.format("%s.%s", className, methodName));
if (runningTests.isEmpty()) {
List<TestSessionRegistry.SessionRecord> activeSessions =
sessionRegistry.getActiveSessionsAndForget();
if (!activeSessions.isEmpty()) {
throw new IllegalStateException(
String.format(
"There are active sessions, created in following tests: %s",
activeSessions.stream()
.flatMap(s -> s.sourceTests.stream())
.collect(Collectors.toList())));
}
}
}

private static class TestSessionRegistry extends SessionRegistry {
protected TestSessionRegistry() {
super();
}

public static class SessionRecord {
final WeakReference<Object> session;
final Set<String> sourceTests;

SessionRecord(WeakReference<Object> session, Set<String> sourceTests) {
this.session = session;
this.sourceTests = sourceTests;
}
}

private static final List<SessionRecord> sessions = new CopyOnWriteArrayList<>();

@Override
public void registerSession(Object session) {
sessions.add(
new SessionRecord(
new WeakReference<>(session), runningTests.stream().collect(Collectors.toSet())));
}

@Override
public void closeSession(Object session) {
sessions.removeIf(s -> s.session == session);
}

public List<SessionRecord> getActiveSessionsAndForget() {
// Purge known sessions
sessions.removeIf(ref -> ref.session.get() == null);
return sessions.stream()
.filter(ref -> ref.session.get() == null)
.collect(Collectors.toList());
}
}
}
Loading