From 8a7cecd678eb2d43fc7415817519ec40743e44eb Mon Sep 17 00:00:00 2001 From: psainics Date: Fri, 31 Jan 2025 17:20:37 +0530 Subject: [PATCH] Error management for Wrangler Transform plugin --- .../main/java/io/cdap/wrangler/Wrangler.java | 117 ++++++++++++----- .../io/cdap/wrangler/WranglerErrorUtil.java | 118 ++++++++++++++++++ 2 files changed, 206 insertions(+), 29 deletions(-) create mode 100644 wrangler-transform/src/main/java/io/cdap/wrangler/WranglerErrorUtil.java diff --git a/wrangler-transform/src/main/java/io/cdap/wrangler/Wrangler.java b/wrangler-transform/src/main/java/io/cdap/wrangler/Wrangler.java index d5e57ae69..d4af8590e 100644 --- a/wrangler-transform/src/main/java/io/cdap/wrangler/Wrangler.java +++ b/wrangler-transform/src/main/java/io/cdap/wrangler/Wrangler.java @@ -24,6 +24,9 @@ import io.cdap.cdap.api.annotation.Plugin; import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; import io.cdap.cdap.api.metrics.Metrics; import io.cdap.cdap.api.plugin.PluginConfig; import io.cdap.cdap.api.plugin.PluginProperties; @@ -53,6 +56,7 @@ import io.cdap.wrangler.api.EntityCountMetric; import io.cdap.wrangler.api.ErrorRecord; import io.cdap.wrangler.api.ExecutorContext; +import io.cdap.wrangler.api.RecipeException; import io.cdap.wrangler.api.RecipeParser; import io.cdap.wrangler.api.RecipePipeline; import io.cdap.wrangler.api.RecipeSymbol; @@ -243,9 +247,17 @@ public void configurePipeline(PipelineConfigurer configurer) { } } } catch (CompileException e) { - collector.addFailure("Compilation error occurred : " + e.getMessage(), null); + collector.addFailure( + String.format("Compilation error occurred, %s: %s ", e.getClass().getName(), + e.getMessage()), null); } catch (DirectiveParseException e) { - collector.addFailure(e.getMessage(), null); + collector.addFailure( + String.format("Error parsing directive, %s: %s", e.getClass().getName(), + e.getMessage()), null); + } catch (DirectiveLoadException e) { + collector.addFailure( + String.format("Error loading directive, %s: %s", e.getClass().getName(), + e.getMessage()), null); } // Based on the configuration create output schema. @@ -254,18 +266,22 @@ public void configurePipeline(PipelineConfigurer configurer) { oSchema = Schema.parseJson(config.schema); } } catch (IOException e) { - collector.addFailure("Invalid output schema.", null) - .withConfigProperty(Config.NAME_SCHEMA).withStacktrace(e.getStackTrace()); + collector.addFailure( + String.format("Invalid output schema %s: %s", e.getClass().getName(), e.getMessage()), + null).withConfigProperty(Config.NAME_SCHEMA).withStacktrace(e.getStackTrace()); } // Check if jexl pre-condition is not null or empty and if so compile expression. - if (!config.containsMacro(Config.NAME_PRECONDITION) && !config.containsMacro(Config.NAME_PRECONDITION_LANGUAGE)) { + if (!config.containsMacro(Config.NAME_PRECONDITION) && !config.containsMacro( + Config.NAME_PRECONDITION_LANGUAGE)) { if (PRECONDITION_LANGUAGE_JEXL.equalsIgnoreCase(config.getPreconditionLanguage()) && checkPreconditionNotEmpty(false)) { try { new Precondition(config.getPreconditionJEXL()); } catch (PreconditionException e) { - collector.addFailure(e.getMessage(), null).withConfigProperty(Config.NAME_PRECONDITION); + collector.addFailure(String.format("Error compiling precondition expression, %s: %s", + e.getClass().getName(), e.getMessage()), null) + .withConfigProperty(Config.NAME_PRECONDITION); } } } @@ -274,11 +290,10 @@ && checkPreconditionNotEmpty(false)) { if (oSchema != null) { configurer.getStageConfigurer().setOutputSchema(oSchema); } - } catch (Exception e) { - LOG.error(e.getMessage()); collector.addFailure("Error occurred : " + e.getMessage(), null).withStacktrace(e.getStackTrace()); } + collector.getOrThrowException(); } /** @@ -319,7 +334,14 @@ public void prepareRun(StageSubmitterContext context) throws Exception { // Parse the recipe and extract all the instances of directives // to be processed for extracting lineage. RecipeParser recipe = getRecipeParser(context); - List directives = recipe.parse(); + List directives; + try { + directives = recipe.parse(); + } catch (Exception e) { + String errorReason = "Unable to parse recipe and extract all instances of directives."; + throw WranglerErrorUtil.getProgramFailureExceptionDetailsFromChain(e, errorReason, null, + ErrorType.USER); + } emitDirectiveMetrics(directives, context.getMetrics()); LineageOperations lineageOperations = new LineageOperations(input, output, directives); @@ -344,11 +366,13 @@ public void initialize(TransformContext context) throws Exception { // Based on the configuration create output schema. try { oSchema = Schema.parseJson(config.schema); - } catch (IOException e) { - throw new IllegalArgumentException( - String.format("Stage:%s - Format of output schema specified is invalid. Please check the format.", - context.getStageName()), e - ); + } catch (Exception e) { + String errorReason = "Invalid output schema format."; + String errorMessage = String.format( + "Format of output schema specified is invalid. Please check the format. %s: %s", + e.getClass().getName(), e.getMessage()); + throw WranglerErrorUtil.getProgramFailureExceptionDetailsFromChain(e, errorReason, + errorMessage, ErrorType.USER); } // Check if jexl pre-condition is not null or empty and if so compile expression. @@ -357,8 +381,10 @@ public void initialize(TransformContext context) throws Exception { && checkPreconditionNotEmpty(false)) { try { condition = new Precondition(config.getPreconditionJEXL()); - } catch (PreconditionException e) { - throw new IllegalArgumentException(e.getMessage(), e); + } catch (Exception e) { + String errorReason = "Failed to evaluate precondition due to an invalid JEXL expression."; + throw WranglerErrorUtil.getProgramFailureExceptionDetailsFromChain(e, errorReason, null, + ErrorType.USER); } } } @@ -367,7 +393,12 @@ && checkPreconditionNotEmpty(false)) { // Create the pipeline executor with context being set. pipeline = new RecipePipelineExecutor(recipe, ctx); } catch (Exception e) { - throw new Exception(String.format("Stage:%s - %s", getContext().getStageName(), e.getMessage()), e); + String errorReason = "Unable to compile the recipe and execute directives."; + String errorMessage = String.format( + "Error compiling the recipe and executing directives. " + "%s: %s", + e.getClass().getName(), e.getMessage()); + throw WranglerErrorUtil.getProgramFailureExceptionDetailsFromChain(e, errorReason, + errorMessage, ErrorType.USER); } String defaultStrategy = context.getArguments().get(ERROR_STRATEGY_DEFAULT); @@ -437,8 +468,11 @@ && checkPreconditionNotEmpty(false)) { } if (WRANGLER_FAIL_PIPELINE_FOR_ERROR.isEnabled(getContext()) && onErrorStrategy.equalsIgnoreCase(ON_ERROR_FAIL_PIPELINE)) { - throw new Exception( - String.format("Errors in Wrangler Transformation - %s", errorMessages)); + String errorReason = String.format("Errors in Wrangler Transformation - %s", + errorMessages); + throw ErrorUtils.getProgramFailureException( + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, errorReason, + ErrorType.UNKNOWN, false, null); } } } catch (Exception e) { @@ -457,8 +491,12 @@ && checkPreconditionNotEmpty(false)) { getContext().getStageName(), e.getMessage()), "value", String.valueOf(errorCounter) )); - throw new Exception(String.format("Stage:%s - Failing pipeline due to error : %s", - getContext().getStageName(), e.getMessage()), e); + String errorReason = "Error occurred while processing input data, possibly due to invalid " + + "transformation or schema mismatch."; + String errorMessage = String.format("Pipeline failed at stage:%s, %s: %s", + getContext().getStageName(), e.getClass().getName(), e.getMessage()); + throw WranglerErrorUtil.getProgramFailureExceptionDetailsFromChain(e, errorReason, + errorMessage, ErrorType.UNKNOWN); } // If it's 'skip-on-error' we continue processing and don't emit any error records. return; @@ -553,18 +591,29 @@ private boolean checkPreconditionNotEmpty(Boolean isConditionSQL) { * @throws DirectiveLoadException * @throws DirectiveParseException */ - private RecipeParser getRecipeParser(StageContext context) - throws DirectiveLoadException, DirectiveParseException { + private RecipeParser getRecipeParser(StageContext context) { registry = new CompositeDirectiveRegistry(SystemDirectiveRegistry.INSTANCE, new UserDirectiveRegistry(context)); - registry.reload(context.getNamespace()); + try { + registry.reload(context.getNamespace()); + } catch (Exception e) { + String errorReason = "Unable to load directive from the artifacts."; + throw WranglerErrorUtil.getProgramFailureExceptionDetailsFromChain(e, errorReason, null, + ErrorType.USER); + } String directives = config.getDirectives(); if (config.getUDDs() != null && !config.getUDDs().trim().isEmpty()) { directives = String.format("#pragma load-directives %s;%s", config.getUDDs(), config.getDirectives()); } - return new GrammarBasedParser(context.getNamespace(), new MigrateToV2(directives).migrate(), registry); + try { + return new GrammarBasedParser(context.getNamespace(), new MigrateToV2(directives).migrate(), registry); + } catch (Exception e) { + String errorReason = "Unable to parse the directives."; + throw WranglerErrorUtil.getProgramFailureExceptionDetailsFromChain(e, errorReason, null, + ErrorType.USER); + } } @Override @@ -573,7 +622,10 @@ public Relation transform(RelationalTranformContext relationalTranformContext, R && checkPreconditionNotEmpty(true)) { if (!Feature.WRANGLER_PRECONDITION_SQL.isEnabled(relationalTranformContext)) { - throw new RuntimeException("SQL Precondition feature is not available"); + String errorReason = "SQL Precondition feature is not available"; + throw ErrorUtils.getProgramFailureException( + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, errorReason, + ErrorType.SYSTEM, false, null); } Optional> expressionFactory = getExpressionFactory(relationalTranformContext); @@ -598,11 +650,18 @@ private Optional> getExpressionFactory(RelationalTranf * @param directives a list of Wrangler directives * @param metrics CDAP {@link Metrics} object using which metrics can be emitted */ - private void emitDirectiveMetrics(List directives, Metrics metrics) throws DirectiveLoadException { + private void emitDirectiveMetrics(List directives, Metrics metrics) { for (Directive directive : directives) { // skip emitting metrics if the directive is not system directive - if (registry.get(Contexts.SYSTEM, directive.define().getDirectiveName()) == null) { - continue; + try { + if (registry.get(Contexts.SYSTEM, directive.define().getDirectiveName()) == null) { + continue; + } + } catch (Exception e) { + String errorReason = String.format("Unable to load directive %s", + directive.define().getDirectiveName()); + throw WranglerErrorUtil.getProgramFailureExceptionDetailsFromChain(e, errorReason, null, + ErrorType.USER); } List countMetrics = new ArrayList<>(); diff --git a/wrangler-transform/src/main/java/io/cdap/wrangler/WranglerErrorUtil.java b/wrangler-transform/src/main/java/io/cdap/wrangler/WranglerErrorUtil.java new file mode 100644 index 000000000..a80309f85 --- /dev/null +++ b/wrangler-transform/src/main/java/io/cdap/wrangler/WranglerErrorUtil.java @@ -0,0 +1,118 @@ +/* + * Copyright © 2025 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.wrangler; + +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; +import io.cdap.cdap.api.exception.ProgramFailureException; +import io.cdap.wrangler.api.DirectiveExecutionException; +import io.cdap.wrangler.api.DirectiveLoadException; +import io.cdap.wrangler.api.DirectiveNotFoundException; +import io.cdap.wrangler.api.DirectiveParseException; +import io.cdap.wrangler.api.RecipeException; +import io.cdap.wrangler.expression.ELException; +import io.cdap.wrangler.utils.RecordConvertorException; +import java.util.List; +import java.util.Map; + +/** + * Error util file to handle exceptions caught in Wrangler plugin + */ +public final class WranglerErrorUtil { + + + private static final Map TERMINAL_EXCEPTIONS = ImmutableMap.builder() + .put(DirectiveParseException.class.getName(), "Parsing-Directive") + .put(PreconditionException.class.getName(), "Precondition") + .put(DirectiveExecutionException.class.getName(), "Executing-Directive") + .put(DirectiveLoadException.class.getName(), "Loading-Directive") + .put(DirectiveNotFoundException.class.getName(), "Directive-Not-Found") + .put(RecordConvertorException.class.getName(), "Record-Conversion") + .put(ELException.class.getName(), "ExpressionLanguage-Parsing").build(); + + private static final Map NON_TERMINAL_EXCEPTIONS = ImmutableMap.builder() + .put(RecipeException.class.getName(), "Executing-Recipe").build(); + + /** + * Private constructor to prevent instantiation of this utility class. + *

+ * This class is designed to contain only static utility methods for handling exceptions and + * should not be instantiated. Any attempt to create an instance of this class will result in an + * {@link IllegalStateException}. + */ + private WranglerErrorUtil() { + throw new IllegalStateException("Utility class"); + } + + /** + * Traverses the causal chain of the given Throwable to find specific exceptions. If a terminal + * exception is found, it returns a corresponding ProgramFailureException. If a non-terminal + * exception is found, it is stored as a fallback. Otherwise, a generic ProgramFailureException is + * returned. + * + * @param e the Throwable to analyze + * @param errorReason the error reason to tell the cause of error + * @param errorMessage default error message if no terminal exception is found + * @param errorType the error type to categorize the failure + * @return a ProgramFailureException with specific or generic error details + */ + public static ProgramFailureException getProgramFailureExceptionDetailsFromChain(Throwable e, + String errorReason, String errorMessage, ErrorType errorType) { + List causalChain = Throwables.getCausalChain(e); + Throwable nonTerminalException = null; + for (Throwable t : causalChain) { + if (t instanceof ProgramFailureException) { + return null; // Avoid multiple wrap + } + if (NON_TERMINAL_EXCEPTIONS.containsKey(t.getClass().getName())) { + nonTerminalException = t; // Store non-terminal exception as fallback + continue; + } + String errorSubCategory = TERMINAL_EXCEPTIONS.get(t.getClass().getName()); + if (errorSubCategory != null) { + return getProgramFailureException(t, errorReason, errorSubCategory); + } + } + + if (nonTerminalException != null) { + return getProgramFailureException(nonTerminalException, errorReason, + NON_TERMINAL_EXCEPTIONS.get(nonTerminalException.getClass().getName())); + } + + return ErrorUtils.getProgramFailureException( + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, errorMessage, + errorType, false, e); + } + + /** + * Constructs a ProgramFailureException using the provided exception details. + * + * @param exception the exception to wrap + * @param errorSubCategory specific subcategory of the error + * @return a new ProgramFailureException with the extracted details + */ + private static ProgramFailureException getProgramFailureException(Throwable exception, + String errorReason, String errorSubCategory) { + String errorMessage = exception.getMessage(); + return ErrorUtils.getProgramFailureException( + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN, errorSubCategory), errorReason, + errorMessage, ErrorType.USER, false, exception); + } +}