Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CDAP-20922] Reuse the Spark ClassLoader in app-fabric #15483

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import io.cdap.cdap.internal.app.runtime.ProgramClassLoader;

/**
* A provider for for program classloading creation.
* A provider for program classloading creation.
*/
public interface ProgramClassLoaderProvider {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,29 +24,22 @@
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* Provider for runtime system of programs.
*/
/** Provider for runtime system of programs. */
public interface ProgramRuntimeProvider {

/**
* Annotation for implementation to specify what are the supported {@link ProgramType}.
*/
/** Annotation for implementation to specify what are the supported {@link ProgramType}. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface SupportedProgramType {

/**
* Returns the list of supported {@link ProgramType}.
*/
/** Returns the list of supported {@link ProgramType}. */
ProgramType[] value();
}

/**
* The execution mode of the program runtime system.
*/
/** The execution mode of the program runtime system. */
enum Mode {
LOCAL, DISTRIBUTED
LOCAL,
DISTRIBUTED
}

/**
Expand All @@ -70,12 +63,15 @@ enum Mode {
boolean isSupported(ProgramType programType, CConfiguration cConf);

/**
* Creates a ClassLoader for the given program type. This is useful if you need the program class
* loader but do not need to run a program.
* Returns a ClassLoader for the given program type. This is useful if you only need the runtime
* classloader for the given program type, but not for program execution.
*
* @param cConf The configuration to use
* @param programType The type of program
* @param cConf The configuration to use
* @return a {@link ClassLoader} for the given program runner
* @throws UnsupportedOperationException if the given program type is not supported by this
* provider. Caller can use the {@link #isSupported(ProgramType, CConfiguration)} method to
* check.
*/
ClassLoader createProgramClassLoader(CConfiguration cConf, ProgramType programType);
ClassLoader getRuntimeClassLoader(ProgramType programType, CConfiguration cConf);
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import io.cdap.cdap.internal.app.runtime.ProgramRuntimeProviderLoader;
import io.cdap.cdap.proto.ProgramType;
import io.cdap.cdap.security.impersonation.EntityImpersonator;
import java.io.Closeable;
import java.io.File;
import java.util.Iterator;
import javax.annotation.Nullable;
Expand All @@ -48,21 +47,21 @@ final class ArtifactClassLoaderFactory {
private static final Logger LOG = LoggerFactory.getLogger(ArtifactClassLoaderFactory.class);

private final CConfiguration cConf;
@Nullable
private final ProgramRuntimeProviderLoader programRuntimeProviderLoader;
@Nullable private final ProgramRuntimeProviderLoader programRuntimeProviderLoader;
private final File tmpDir;

@VisibleForTesting
ArtifactClassLoaderFactory(CConfiguration cConf) {
this(cConf, null);
}

ArtifactClassLoaderFactory(CConfiguration cConf,
@Nullable ProgramRuntimeProviderLoader programRuntimeProviderLoader) {
ArtifactClassLoaderFactory(
CConfiguration cConf, @Nullable ProgramRuntimeProviderLoader programRuntimeProviderLoader) {
this.cConf = cConf;
this.programRuntimeProviderLoader = programRuntimeProviderLoader;
this.tmpDir = new File(cConf.get(Constants.CFG_LOCAL_DATA_DIR),
cConf.get(Constants.AppFabric.TEMP_DIR)).getAbsoluteFile();
this.tmpDir =
new File(cConf.get(Constants.CFG_LOCAL_DATA_DIR), cConf.get(Constants.AppFabric.TEMP_DIR))
.getAbsoluteFile();
}

/**
Expand All @@ -77,35 +76,30 @@ final class ArtifactClassLoaderFactory {
* {@link ClassLoader}, all temporary resources created for the classloader will be removed
*/
CloseableClassLoader createClassLoader(File unpackDir) {
ClassLoader sparkClassLoader = null;
ClassLoader parentClassLoader = null;
if (programRuntimeProviderLoader != null) {
try {
// Try to create a ProgramClassLoader from the Spark runtime system if it is available.
// It is needed because we don't know what program types that an artifact might have.
// TODO: CDAP-5613. We shouldn't always expose the Spark classes.
sparkClassLoader = programRuntimeProviderLoader.get(ProgramType.SPARK)
.createProgramClassLoader(cConf, ProgramType.SPARK);
parentClassLoader =
programRuntimeProviderLoader
.get(ProgramType.SPARK)
.getRuntimeClassLoader(ProgramType.SPARK, cConf);
} catch (Exception e) {
// If Spark is not supported, exception is expected. We'll use the default filter.
LOG.warn("Spark is not supported. Not using ProgramClassLoader from Spark");
LOG.trace("Failed to create spark program runner with error:", e);
}
}

ProgramClassLoader programClassLoader = null;
if (sparkClassLoader != null) {
programClassLoader = new ProgramClassLoader(cConf, unpackDir, sparkClassLoader);
} else {
programClassLoader = new ProgramClassLoader(cConf, unpackDir,
FilterClassLoader.create(getClass().getClassLoader()));
if (parentClassLoader == null) {
parentClassLoader = FilterClassLoader.create(getClass().getClassLoader());
}

final ClassLoader finalProgramClassLoader = programClassLoader;
return new CloseableClassLoader(programClassLoader, () -> {
if (finalProgramClassLoader instanceof Closeable) {
Closeables.closeQuietly((Closeable) finalProgramClassLoader);
}
});
ProgramClassLoader programClassLoader =
new ProgramClassLoader(cConf, unpackDir, parentClassLoader);
return new CloseableClassLoader(
programClassLoader, () -> Closeables.closeQuietly(programClassLoader));
}

/**
Expand All @@ -117,18 +111,22 @@ CloseableClassLoader createClassLoader(File unpackDir) {
* {@link ClassLoader}, all temporary resources created for the classloader will be removed
* @see #createClassLoader(File)
*/
CloseableClassLoader createClassLoader(Location artifactLocation,
EntityImpersonator entityImpersonator) {
CloseableClassLoader createClassLoader(
Location artifactLocation, EntityImpersonator entityImpersonator) {
try {
ClassLoaderFolder classLoaderFolder = entityImpersonator.impersonate(
() -> BundleJarUtil.prepareClassLoaderFolder(artifactLocation,
() -> DirUtils.createTempDir(tmpDir)));
ClassLoaderFolder classLoaderFolder =
entityImpersonator.impersonate(
() ->
BundleJarUtil.prepareClassLoaderFolder(
artifactLocation, () -> DirUtils.createTempDir(tmpDir)));

CloseableClassLoader classLoader = createClassLoader(classLoaderFolder.getDir());
return new CloseableClassLoader(classLoader, () -> {
Closeables.closeQuietly(classLoader);
Closeables.closeQuietly(classLoaderFolder);
});
return new CloseableClassLoader(
classLoader,
() -> {
Closeables.closeQuietly(classLoader);
Closeables.closeQuietly(classLoaderFolder);
});
} catch (Exception e) {
throw Throwables.propagate(e);
}
Expand All @@ -143,8 +141,8 @@ CloseableClassLoader createClassLoader(Location artifactLocation,
* {@link ClassLoader}, all temporary resources created for the classloader will be removed
* @see #createClassLoader(File)
*/
CloseableClassLoader createClassLoader(Iterator<Location> artifactLocations,
EntityImpersonator entityImpersonator) {
CloseableClassLoader createClassLoader(
Iterator<Location> artifactLocations, EntityImpersonator entityImpersonator) {
if (!artifactLocations.hasNext()) {
throw new IllegalArgumentException("Cannot create a classloader without an artifact.");
}
Expand All @@ -155,17 +153,20 @@ CloseableClassLoader createClassLoader(Iterator<Location> artifactLocations,
}

try {
ClassLoaderFolder classLoaderFolder = entityImpersonator.impersonate(
() -> BundleJarUtil.prepareClassLoaderFolder(artifactLocation,
() -> DirUtils.createTempDir(tmpDir)));

CloseableClassLoader parentClassLoader = createClassLoader(artifactLocations,
entityImpersonator);
return new CloseableClassLoader(new DirectoryClassLoader(classLoaderFolder.getDir(),
parentClassLoader, "lib"), () -> {
Closeables.closeQuietly(parentClassLoader);
Closeables.closeQuietly(classLoaderFolder);
});
ClassLoaderFolder classLoaderFolder =
entityImpersonator.impersonate(
() ->
BundleJarUtil.prepareClassLoaderFolder(
artifactLocation, () -> DirUtils.createTempDir(tmpDir)));

CloseableClassLoader parentClassLoader =
createClassLoader(artifactLocations, entityImpersonator);
return new CloseableClassLoader(
new DirectoryClassLoader(classLoaderFolder.getDir(), parentClassLoader, "lib"),
() -> {
Closeables.closeQuietly(parentClassLoader);
Closeables.closeQuietly(classLoaderFolder);
});
} catch (Exception e) {
throw Throwables.propagate(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@
import org.apache.twill.filesystem.LocationFactory;
import org.apache.twill.internal.utils.Instances;

/**
* Default implementation of {@link MasterEnvironmentRunnableContext}.
*/
/** Default implementation of {@link MasterEnvironmentRunnableContext}. */
public class DefaultMasterEnvironmentRunnableContext implements MasterEnvironmentRunnableContext {

private final LocationFactory locationFactory;
Expand All @@ -46,13 +44,14 @@

private ClassLoader extensionCombinedClassLoader;

public DefaultMasterEnvironmentRunnableContext(LocationFactory locationFactory,
public DefaultMasterEnvironmentRunnableContext(

Check warning on line 47 in cdap-app-fabric/src/main/java/io/cdap/cdap/master/environment/DefaultMasterEnvironmentRunnableContext.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.javadoc.MissingJavadocMethodCheck

Missing a Javadoc comment.
LocationFactory locationFactory,
RemoteClientFactory remoteClientFactory,
CConfiguration cConf) {
this.locationFactory = locationFactory;
this.remoteClient = remoteClientFactory.createRemoteClient(
Constants.Service.APP_FABRIC_HTTP,
new DefaultHttpRequestConfig(false), "");
this.remoteClient =
remoteClientFactory.createRemoteClient(
Constants.Service.APP_FABRIC_HTTP, new DefaultHttpRequestConfig(false), "");
this.cConf = cConf;
this.programRuntimeProviderLoader = new ProgramRuntimeProviderLoader(cConf);
this.extensionCombinedClassLoader = null;
Expand All @@ -63,9 +62,7 @@
return locationFactory;
}

/**
* Opens a {@link HttpURLConnection} for the given resource path.
*/
/** Opens a {@link HttpURLConnection} for the given resource path. */
@Override
public HttpURLConnection openHttpURLConnection(String resource) throws IOException {
return remoteClient.openConnection(resource);
Expand All @@ -79,20 +76,22 @@
} catch (ClassNotFoundException e) {
// Try loading the class from the runtime extensions.
if (extensionCombinedClassLoader == null) {
Map<ProgramType, ProgramRuntimeProvider> classLoaderProviderMap = programRuntimeProviderLoader.getAll();
extensionCombinedClassLoader = new CombineClassLoader(getClass().getClassLoader(),
classLoaderProviderMap.entrySet().stream()
.map(entry -> entry.getValue()
.createProgramClassLoader(cConf, entry.getKey()))
.collect(Collectors.toList())
.toArray(new ClassLoader[0]));
Map<ProgramType, ProgramRuntimeProvider> classLoaderProviderMap =
programRuntimeProviderLoader.getAll();
extensionCombinedClassLoader =
new CombineClassLoader(
getClass().getClassLoader(),
classLoaderProviderMap.entrySet().stream()
.map(entry -> entry.getValue().getRuntimeClassLoader(entry.getKey(), cConf))
.collect(Collectors.toList()));
}
try {
runnableClass = extensionCombinedClassLoader.loadClass(className);
} catch (ClassNotFoundException cnfe) {
throw new RuntimeException(
String.format("Failed to load twill runnable class from runtime extension '%s'",
className), cnfe);
String.format(
"Failed to load twill runnable class from runtime extension '%s'", className),
cnfe);
}
}
if (!TwillRunnable.class.isAssignableFrom(runnableClass)) {
Expand Down
Loading
Loading