Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[🍒][PLUGIN-1856] Error management for Wrangler plugin #732

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 88 additions & 29 deletions wrangler-transform/src/main/java/io/cdap/wrangler/Wrangler.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.api.metrics.Metrics;
import io.cdap.cdap.api.plugin.PluginConfig;
import io.cdap.cdap.api.plugin.PluginProperties;
Expand Down Expand Up @@ -53,6 +56,7 @@
import io.cdap.wrangler.api.EntityCountMetric;
import io.cdap.wrangler.api.ErrorRecord;
import io.cdap.wrangler.api.ExecutorContext;
import io.cdap.wrangler.api.RecipeException;
import io.cdap.wrangler.api.RecipeParser;
import io.cdap.wrangler.api.RecipePipeline;
import io.cdap.wrangler.api.RecipeSymbol;
Expand Down Expand Up @@ -243,9 +247,17 @@ public void configurePipeline(PipelineConfigurer configurer) {
}
}
} catch (CompileException e) {
collector.addFailure("Compilation error occurred : " + e.getMessage(), null);
collector.addFailure(
String.format("Compilation error occurred, %s: %s ", e.getClass().getName(),
e.getMessage()), null);
} catch (DirectiveParseException e) {
collector.addFailure(e.getMessage(), null);
collector.addFailure(
String.format("Error parsing directive, %s: %s", e.getClass().getName(),
e.getMessage()), null);
} catch (DirectiveLoadException e) {
collector.addFailure(
String.format("Error loading directive, %s: %s", e.getClass().getName(),
e.getMessage()), null);
}

// Based on the configuration create output schema.
Expand All @@ -254,18 +266,22 @@ public void configurePipeline(PipelineConfigurer configurer) {
oSchema = Schema.parseJson(config.schema);
}
} catch (IOException e) {
collector.addFailure("Invalid output schema.", null)
.withConfigProperty(Config.NAME_SCHEMA).withStacktrace(e.getStackTrace());
collector.addFailure(
String.format("Invalid output schema %s: %s", e.getClass().getName(), e.getMessage()),
null).withConfigProperty(Config.NAME_SCHEMA).withStacktrace(e.getStackTrace());
}

// Check if jexl pre-condition is not null or empty and if so compile expression.
if (!config.containsMacro(Config.NAME_PRECONDITION) && !config.containsMacro(Config.NAME_PRECONDITION_LANGUAGE)) {
if (!config.containsMacro(Config.NAME_PRECONDITION) && !config.containsMacro(
Config.NAME_PRECONDITION_LANGUAGE)) {
if (PRECONDITION_LANGUAGE_JEXL.equalsIgnoreCase(config.getPreconditionLanguage())
&& checkPreconditionNotEmpty(false)) {
try {
new Precondition(config.getPreconditionJEXL());
} catch (PreconditionException e) {
collector.addFailure(e.getMessage(), null).withConfigProperty(Config.NAME_PRECONDITION);
collector.addFailure(String.format("Error compiling precondition expression, %s: %s",
e.getClass().getName(), e.getMessage()), null)
.withConfigProperty(Config.NAME_PRECONDITION);
}
}
}
Expand All @@ -274,11 +290,10 @@ && checkPreconditionNotEmpty(false)) {
if (oSchema != null) {
configurer.getStageConfigurer().setOutputSchema(oSchema);
}

} catch (Exception e) {
LOG.error(e.getMessage());
collector.addFailure("Error occurred : " + e.getMessage(), null).withStacktrace(e.getStackTrace());
}
collector.getOrThrowException();
}

/**
Expand Down Expand Up @@ -319,7 +334,14 @@ public void prepareRun(StageSubmitterContext context) throws Exception {
// Parse the recipe and extract all the instances of directives
// to be processed for extracting lineage.
RecipeParser recipe = getRecipeParser(context);
List<Directive> directives = recipe.parse();
List<Directive> directives;
try {
directives = recipe.parse();
} catch (Exception e) {
String errorReason = "Unable to parse recipe and extract all instances of directives.";
throw WranglerErrorUtil.getProgramFailureExceptionDetailsFromChain(e, errorReason, null,
ErrorType.USER);
}
emitDirectiveMetrics(directives, context.getMetrics());

LineageOperations lineageOperations = new LineageOperations(input, output, directives);
Expand All @@ -344,11 +366,13 @@ public void initialize(TransformContext context) throws Exception {
// Based on the configuration create output schema.
try {
oSchema = Schema.parseJson(config.schema);
} catch (IOException e) {
throw new IllegalArgumentException(
String.format("Stage:%s - Format of output schema specified is invalid. Please check the format.",
context.getStageName()), e
);
} catch (Exception e) {
String errorReason = "Invalid output schema format.";
String errorMessage = String.format(
"Format of output schema specified is invalid. Please check the format. %s: %s",
e.getClass().getName(), e.getMessage());
throw WranglerErrorUtil.getProgramFailureExceptionDetailsFromChain(e, errorReason,
errorMessage, ErrorType.USER);
}

// Check if jexl pre-condition is not null or empty and if so compile expression.
Expand All @@ -357,8 +381,10 @@ public void initialize(TransformContext context) throws Exception {
&& checkPreconditionNotEmpty(false)) {
try {
condition = new Precondition(config.getPreconditionJEXL());
} catch (PreconditionException e) {
throw new IllegalArgumentException(e.getMessage(), e);
} catch (Exception e) {
String errorReason = "Failed to evaluate precondition due to an invalid JEXL expression.";
throw WranglerErrorUtil.getProgramFailureExceptionDetailsFromChain(e, errorReason, null,
ErrorType.USER);
}
}
}
Expand All @@ -367,7 +393,12 @@ && checkPreconditionNotEmpty(false)) {
// Create the pipeline executor with context being set.
pipeline = new RecipePipelineExecutor(recipe, ctx);
} catch (Exception e) {
throw new Exception(String.format("Stage:%s - %s", getContext().getStageName(), e.getMessage()), e);
String errorReason = "Unable to compile the recipe and execute directives.";
String errorMessage = String.format(
"Error compiling the recipe and executing directives. " + "%s: %s",
e.getClass().getName(), e.getMessage());
throw WranglerErrorUtil.getProgramFailureExceptionDetailsFromChain(e, errorReason,
errorMessage, ErrorType.USER);
}

String defaultStrategy = context.getArguments().get(ERROR_STRATEGY_DEFAULT);
Expand Down Expand Up @@ -437,8 +468,11 @@ && checkPreconditionNotEmpty(false)) {
}
if (WRANGLER_FAIL_PIPELINE_FOR_ERROR.isEnabled(getContext())
&& onErrorStrategy.equalsIgnoreCase(ON_ERROR_FAIL_PIPELINE)) {
throw new Exception(
String.format("Errors in Wrangler Transformation - %s", errorMessages));
String errorReason = String.format("Errors in Wrangler Transformation - %s",
errorMessages);
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, errorReason,
ErrorType.UNKNOWN, false, null);
}
}
} catch (Exception e) {
Expand All @@ -457,8 +491,12 @@ && checkPreconditionNotEmpty(false)) {
getContext().getStageName(), e.getMessage()),
"value", String.valueOf(errorCounter)
));
throw new Exception(String.format("Stage:%s - Failing pipeline due to error : %s",
getContext().getStageName(), e.getMessage()), e);
String errorReason = "Error occurred while processing input data, possibly due to invalid "
+ "transformation or schema mismatch.";
String errorMessage = String.format("Pipeline failed at stage:%s, %s: %s",
getContext().getStageName(), e.getClass().getName(), e.getMessage());
throw WranglerErrorUtil.getProgramFailureExceptionDetailsFromChain(e, errorReason,
errorMessage, ErrorType.UNKNOWN);
}
// If it's 'skip-on-error' we continue processing and don't emit any error records.
return;
Expand Down Expand Up @@ -553,18 +591,29 @@ private boolean checkPreconditionNotEmpty(Boolean isConditionSQL) {
* @throws DirectiveLoadException
* @throws DirectiveParseException
*/
private RecipeParser getRecipeParser(StageContext context)
throws DirectiveLoadException, DirectiveParseException {
private RecipeParser getRecipeParser(StageContext context) {

registry = new CompositeDirectiveRegistry(SystemDirectiveRegistry.INSTANCE, new UserDirectiveRegistry(context));
registry.reload(context.getNamespace());
try {
registry.reload(context.getNamespace());
} catch (Exception e) {
String errorReason = "Unable to load directive from the artifacts.";
throw WranglerErrorUtil.getProgramFailureExceptionDetailsFromChain(e, errorReason, null,
ErrorType.USER);
}

String directives = config.getDirectives();
if (config.getUDDs() != null && !config.getUDDs().trim().isEmpty()) {
directives = String.format("#pragma load-directives %s;%s", config.getUDDs(), config.getDirectives());
}

return new GrammarBasedParser(context.getNamespace(), new MigrateToV2(directives).migrate(), registry);
try {
return new GrammarBasedParser(context.getNamespace(), new MigrateToV2(directives).migrate(), registry);
} catch (Exception e) {
String errorReason = "Unable to parse the directives.";
throw WranglerErrorUtil.getProgramFailureExceptionDetailsFromChain(e, errorReason, null,
ErrorType.USER);
}
}

@Override
Expand All @@ -573,7 +622,10 @@ public Relation transform(RelationalTranformContext relationalTranformContext, R
&& checkPreconditionNotEmpty(true)) {

if (!Feature.WRANGLER_PRECONDITION_SQL.isEnabled(relationalTranformContext)) {
throw new RuntimeException("SQL Precondition feature is not available");
String errorReason = "SQL Precondition feature is not available";
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, errorReason,
ErrorType.SYSTEM, false, null);
}

Optional<ExpressionFactory<String>> expressionFactory = getExpressionFactory(relationalTranformContext);
Expand All @@ -598,11 +650,18 @@ private Optional<ExpressionFactory<String>> getExpressionFactory(RelationalTranf
* @param directives a list of Wrangler directives
* @param metrics CDAP {@link Metrics} object using which metrics can be emitted
*/
private void emitDirectiveMetrics(List<Directive> directives, Metrics metrics) throws DirectiveLoadException {
private void emitDirectiveMetrics(List<Directive> directives, Metrics metrics) {
for (Directive directive : directives) {
// skip emitting metrics if the directive is not system directive
if (registry.get(Contexts.SYSTEM, directive.define().getDirectiveName()) == null) {
continue;
try {
if (registry.get(Contexts.SYSTEM, directive.define().getDirectiveName()) == null) {
continue;
}
} catch (Exception e) {
String errorReason = String.format("Unable to load directive %s",
directive.define().getDirectiveName());
throw WranglerErrorUtil.getProgramFailureExceptionDetailsFromChain(e, errorReason, null,
ErrorType.USER);
}
List<EntityCountMetric> countMetrics = new ArrayList<>();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Copyright © 2025 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.wrangler;

import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.api.exception.ProgramFailureException;
import io.cdap.wrangler.api.DirectiveExecutionException;
import io.cdap.wrangler.api.DirectiveLoadException;
import io.cdap.wrangler.api.DirectiveNotFoundException;
import io.cdap.wrangler.api.DirectiveParseException;
import io.cdap.wrangler.api.RecipeException;
import io.cdap.wrangler.expression.ELException;
import io.cdap.wrangler.utils.RecordConvertorException;
import java.util.List;
import java.util.Map;

/**
 * Utility for converting exceptions caught in the Wrangler plugin into
 * {@link ProgramFailureException}s that carry an error category, reason, message and type.
 */
public final class WranglerErrorUtil {

  /**
   * Exception class names that end the causal-chain search as soon as they are seen, mapped to
   * the error sub-category reported for them.
   */
  private static final Map<String, String> TERMINAL_EXCEPTIONS = ImmutableMap.<String, String>builder()
      .put(DirectiveParseException.class.getName(), "Parsing-Directive")
      .put(PreconditionException.class.getName(), "Precondition")
      .put(DirectiveExecutionException.class.getName(), "Executing-Directive")
      .put(DirectiveLoadException.class.getName(), "Loading-Directive")
      .put(DirectiveNotFoundException.class.getName(), "Directive-Not-Found")
      .put(RecordConvertorException.class.getName(), "Record-Conversion")
      .put(ELException.class.getName(), "ExpressionLanguage-Parsing").build();

  /**
   * Exception class names that are remembered as a fallback while the search continues for a
   * terminal exception further down the causal chain, mapped to their error sub-category.
   */
  private static final Map<String, String> NON_TERMINAL_EXCEPTIONS = ImmutableMap.<String, String>builder()
      .put(RecipeException.class.getName(), "Executing-Recipe").build();

  /**
   * Private constructor to prevent instantiation of this utility class.
   * <p>
   * This class is designed to contain only static utility methods for handling exceptions and
   * should not be instantiated. Any attempt to create an instance of this class will result in an
   * {@link IllegalStateException}.
   */
  private WranglerErrorUtil() {
    throw new IllegalStateException("Utility class");
  }

  /**
   * Traverses the causal chain of the given Throwable, outermost first, and builds the
   * {@link ProgramFailureException} to throw for it.
   * <ul>
   *   <li>If the chain already contains a {@link ProgramFailureException}, that instance is
   *   returned unchanged so the failure is not wrapped a second time.</li>
   *   <li>If a terminal exception is found, a ProgramFailureException wrapping it is returned,
   *   using that exception's own message and its mapped sub-category.</li>
   *   <li>Otherwise, if a non-terminal exception was seen, the last one seen is wrapped the same
   *   way.</li>
   *   <li>Otherwise a generic ProgramFailureException wrapping {@code e} is returned.</li>
   * </ul>
   * The result is never {@code null}, so callers can safely write
   * {@code throw getProgramFailureExceptionDetailsFromChain(...)}.
   *
   * @param e the Throwable to analyze
   * @param errorReason the error reason to tell the cause of error
   * @param errorMessage error message used only for the generic fallback, i.e. when neither a
   *     terminal nor a non-terminal exception is found in the chain
   * @param errorType the error type used only for the generic fallback; classified exceptions
   *     are always reported as {@link ErrorType#USER}
   * @return a ProgramFailureException with specific or generic error details
   */
  public static ProgramFailureException getProgramFailureExceptionDetailsFromChain(Throwable e,
      String errorReason, String errorMessage, ErrorType errorType) {
    List<Throwable> causalChain = Throwables.getCausalChain(e);
    Throwable nonTerminalException = null;
    for (Throwable t : causalChain) {
      if (t instanceof ProgramFailureException) {
        // Already classified: hand back the existing failure instead of wrapping it again.
        // Returning null here would make every `throw <result>` call site raise a bare
        // NullPointerException and lose the original error details.
        return (ProgramFailureException) t;
      }
      if (NON_TERMINAL_EXCEPTIONS.containsKey(t.getClass().getName())) {
        nonTerminalException = t; // Store non-terminal exception as fallback
        continue;
      }
      String errorSubCategory = TERMINAL_EXCEPTIONS.get(t.getClass().getName());
      if (errorSubCategory != null) {
        return getProgramFailureException(t, errorReason, errorSubCategory);
      }
    }

    if (nonTerminalException != null) {
      return getProgramFailureException(nonTerminalException, errorReason,
          NON_TERMINAL_EXCEPTIONS.get(nonTerminalException.getClass().getName()));
    }

    // Nothing in the chain is specifically classified: report a generic plugin failure.
    return ErrorUtils.getProgramFailureException(
        new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, errorMessage,
        errorType, false, e);
  }

  /**
   * Constructs a ProgramFailureException using the provided exception details. The exception's
   * own message becomes the error message and the error type is always {@link ErrorType#USER}.
   *
   * @param exception the exception to wrap; also set as the cause
   * @param errorReason the error reason to tell the cause of error
   * @param errorSubCategory specific subcategory of the error
   * @return a new ProgramFailureException with the extracted details
   */
  private static ProgramFailureException getProgramFailureException(Throwable exception,
      String errorReason, String errorSubCategory) {
    String errorMessage = exception.getMessage();
    return ErrorUtils.getProgramFailureException(
        new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN, errorSubCategory), errorReason,
        errorMessage, ErrorType.USER, false, exception);
  }
}
Loading