diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs
index d80e22f1b..73541b0a4 100644
--- a/native/core/src/execution/datafusion/planner.rs
+++ b/native/core/src/execution/datafusion/planner.rs
@@ -98,8 +98,8 @@ use datafusion_comet_proto::{
     spark_partitioning::{partitioning::PartitioningStruct, Partitioning as SparkPartitioning},
 };
 use datafusion_comet_spark_expr::{
-    Cast, CreateNamedStruct, DateTruncExpr, GetArrayStructFields, GetStructField, HourExpr, IfExpr,
-    ListExtract, MinuteExpr, RLike, SecondExpr, TimestampTruncExpr, ToJson,
+    ArrayInsert, Cast, CreateNamedStruct, DateTruncExpr, GetArrayStructFields, GetStructField,
+    HourExpr, IfExpr, ListExtract, MinuteExpr, RLike, SecondExpr, TimestampTruncExpr, ToJson,
 };
 use datafusion_common::scalar::ScalarStructBuilder;
 use datafusion_common::{
@@ -720,6 +720,22 @@ impl PhysicalPlanner {
                 )?;
                 Ok(Arc::new(case_expr))
             }
+            ExprStruct::ArrayInsert(expr) => {
+                let src_array_expr = self.create_expr(
+                    expr.src_array_expr.as_ref().unwrap(),
+                    Arc::clone(&input_schema),
+                )?;
+                let pos_expr =
+                    self.create_expr(expr.pos_expr.as_ref().unwrap(), Arc::clone(&input_schema))?;
+                let item_expr =
+                    self.create_expr(expr.item_expr.as_ref().unwrap(), Arc::clone(&input_schema))?;
+                Ok(Arc::new(ArrayInsert::new(
+                    src_array_expr,
+                    pos_expr,
+                    item_expr,
+                    expr.legacy_negative_index,
+                )))
+            }
             expr => Err(ExecutionError::GeneralError(format!(
                 "Not implemented: {:?}",
                 expr
diff --git a/native/proto/src/proto/expr.proto b/native/proto/src/proto/expr.proto
index 220f5e521..7a8ea78d5 100644
--- a/native/proto/src/proto/expr.proto
+++ b/native/proto/src/proto/expr.proto
@@ -83,6 +83,7 @@ message Expr {
     ListExtract list_extract = 56;
     GetArrayStructFields get_array_struct_fields = 57;
     BinaryExpr array_append = 58;
+    ArrayInsert array_insert = 59;
   }
 }
@@ -403,6 +404,14 @@ enum NullOrdering {
   NullsLast = 1;
 }
+// Array functions
+message ArrayInsert {
+  Expr src_array_expr = 1;
+  Expr pos_expr = 2;
+  Expr item_expr = 3;
+  bool legacy_negative_index = 4;
+}
+
 message DataType {
   enum DataTypeId {
     BOOL = 0;
diff --git a/native/spark-expr/src/lib.rs b/native/spark-expr/src/lib.rs
index 614b48f2b..3ec2e886b 100644
--- a/native/spark-expr/src/lib.rs
+++ b/native/spark-expr/src/lib.rs
@@ -37,7 +37,7 @@ pub mod utils;
 pub use cast::{spark_cast, Cast};
 pub use error::{SparkError, SparkResult};
 pub use if_expr::IfExpr;
-pub use list::{GetArrayStructFields, ListExtract};
+pub use list::{ArrayInsert, GetArrayStructFields, ListExtract};
 pub use regexp::RLike;
 pub use structs::{CreateNamedStruct, GetStructField};
 pub use temporal::{DateTruncExpr, HourExpr, MinuteExpr, SecondExpr, TimestampTruncExpr};
diff --git a/native/spark-expr/src/list.rs b/native/spark-expr/src/list.rs
index a376198db..7dc17b568 100644
--- a/native/spark-expr/src/list.rs
+++ b/native/spark-expr/src/list.rs
@@ -15,9 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
-use arrow::{array::MutableArrayData, datatypes::ArrowNativeType, record_batch::RecordBatch};
-use arrow_array::{Array, GenericListArray, Int32Array, OffsetSizeTrait, StructArray};
-use arrow_schema::{DataType, FieldRef, Schema};
+use arrow::{
+    array::{as_primitive_array, Capacities, MutableArrayData},
+    buffer::{NullBuffer, OffsetBuffer},
+    datatypes::ArrowNativeType,
+    record_batch::RecordBatch,
+};
+use arrow_array::{
+    make_array, Array, ArrayRef, GenericListArray, Int32Array, OffsetSizeTrait, StructArray,
+};
+use arrow_schema::{DataType, Field, FieldRef, Schema};
 use datafusion::logical_expr::ColumnarValue;
 use datafusion::physical_expr_common::physical_expr::down_cast_any_ref;
 use datafusion_common::{
@@ -27,10 +34,16 @@ use datafusion_common::{
 use datafusion_physical_expr::PhysicalExpr;
 use std::{
     any::Any,
-    fmt::{Display, Formatter},
+    fmt::{Debug, Display, Formatter},
     hash::{Hash, Hasher},
     sync::Arc,
 };
+
+// 2147483632 == java.lang.Integer.MAX_VALUE - 15
+// It is the value of ByteArrayUtils.MAX_ROUNDED_ARRAY_LENGTH
+// https://github.com/apache/spark/blob/master/common/utils/src/main/java/org/apache/spark/unsafe/array/ByteArrayUtils.java
+const MAX_ROUNDED_ARRAY_LENGTH: usize = 2147483632;
+
 #[derive(Debug, Hash)]
 pub struct ListExtract {
     child: Arc<dyn PhysicalExpr>,
@@ -413,14 +426,297 @@ impl PartialEq<dyn Any> for GetArrayStructFields {
     }
 }
 
+#[derive(Debug, Hash)]
+pub struct ArrayInsert {
+    src_array_expr: Arc<dyn PhysicalExpr>,
+    pos_expr: Arc<dyn PhysicalExpr>,
+    item_expr: Arc<dyn PhysicalExpr>,
+    legacy_negative_index: bool,
+}
+
+impl ArrayInsert {
+    pub fn new(
+        src_array_expr: Arc<dyn PhysicalExpr>,
+        pos_expr: Arc<dyn PhysicalExpr>,
+        item_expr: Arc<dyn PhysicalExpr>,
+        legacy_negative_index: bool,
+    ) -> Self {
+        Self {
+            src_array_expr,
+            pos_expr,
+            item_expr,
+            legacy_negative_index,
+        }
+    }
+
+    pub fn array_type(&self, data_type: &DataType) -> DataFusionResult<DataType> {
+        match data_type {
+            DataType::List(field) => Ok(DataType::List(Arc::clone(field))),
+            DataType::LargeList(field) => Ok(DataType::LargeList(Arc::clone(field))),
+            data_type => Err(DataFusionError::Internal(format!(
+                "Unexpected src array type in ArrayInsert: {:?}",
+                data_type
+            ))),
+        }
+    }
+}
+
+impl PhysicalExpr for ArrayInsert {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn data_type(&self, input_schema: &Schema) -> DataFusionResult<DataType> {
+        self.array_type(&self.src_array_expr.data_type(input_schema)?)
+    }
+
+    fn nullable(&self, input_schema: &Schema) -> DataFusionResult<bool> {
+        self.src_array_expr.nullable(input_schema)
+    }
+
+    fn evaluate(&self, batch: &RecordBatch) -> DataFusionResult<ColumnarValue> {
+        let pos_value = self
+            .pos_expr
+            .evaluate(batch)?
+            .into_array(batch.num_rows())?;
+
+        // Spark supports only IntegerType (Int32):
+        // https://github.com/apache/spark/blob/branch-3.5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala#L4737
+        if !matches!(pos_value.data_type(), DataType::Int32) {
+            return Err(DataFusionError::Internal(format!(
+                "Unexpected index data type in ArrayInsert: {:?}, expected type is Int32",
+                pos_value.data_type()
+            )));
+        }
+
+        // Check that the src array is actually an array and get its value type
+        let src_value = self
+            .src_array_expr
+            .evaluate(batch)?
+            .into_array(batch.num_rows())?;
+
+        let src_element_type = match self.array_type(src_value.data_type())? {
+            DataType::List(field) => &field.data_type().clone(),
+            DataType::LargeList(field) => &field.data_type().clone(),
+            _ => unreachable!(),
+        };
+
+        // Check that the inserted value has the same type as the array elements
+        let item_value = self
+            .item_expr
+            .evaluate(batch)?
+            .into_array(batch.num_rows())?;
+        if item_value.data_type() != src_element_type {
+            return Err(DataFusionError::Internal(format!(
+                "Type mismatch in ArrayInsert: array type is {:?} but item type is {:?}",
+                src_element_type,
+                item_value.data_type()
+            )));
+        }
+
+        match src_value.data_type() {
+            DataType::List(_) => {
+                let list_array = as_list_array(&src_value)?;
+                array_insert(
+                    list_array,
+                    &item_value,
+                    &pos_value,
+                    self.legacy_negative_index,
+                )
+            }
+            DataType::LargeList(_) => {
+                let list_array = as_large_list_array(&src_value)?;
+                array_insert(
+                    list_array,
+                    &item_value,
+                    &pos_value,
+                    self.legacy_negative_index,
+                )
+            }
+            _ => unreachable!(), // This case is checked already
+        }
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+        vec![&self.src_array_expr, &self.pos_expr, &self.item_expr]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn PhysicalExpr>>,
+    ) -> DataFusionResult<Arc<dyn PhysicalExpr>> {
+        match children.len() {
+            3 => Ok(Arc::new(ArrayInsert::new(
+                Arc::clone(&children[0]),
+                Arc::clone(&children[1]),
+                Arc::clone(&children[2]),
+                self.legacy_negative_index,
+            ))),
+            _ => internal_err!("ArrayInsert should have exactly three children"),
+        }
+    }
+
+    fn dyn_hash(&self, _state: &mut dyn Hasher) {
+        let mut s = _state;
+        self.src_array_expr.hash(&mut s);
+        self.pos_expr.hash(&mut s);
+        self.item_expr.hash(&mut s);
+        self.legacy_negative_index.hash(&mut s);
+        self.hash(&mut s);
+    }
+}
+
+fn array_insert<O: OffsetSizeTrait>(
+    list_array: &GenericListArray<O>,
+    items_array: &ArrayRef,
+    pos_array: &ArrayRef,
+    legacy_mode: bool,
+) -> DataFusionResult<ColumnarValue> {
+    // This code is based on the implementation of array_append from Apache DataFusion:
+    // https://github.com/apache/datafusion/blob/main/datafusion/functions-nested/src/concat.rs#L513
+    //
+    // It is also based on the implementation of array_insert from Apache Spark:
+    // https://github.com/apache/spark/blob/branch-3.5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala#L4713
+
+    let values = list_array.values();
+    let offsets = list_array.offsets();
+    let values_data = values.to_data();
+    let item_data = items_array.to_data();
+    let new_capacity = Capacities::Array(values_data.len() + item_data.len());
+
+    let mut mutable_values =
+        MutableArrayData::with_capacities(vec![&values_data, &item_data], true, new_capacity);
+
+    let mut new_offsets = vec![O::usize_as(0)];
+    let mut new_nulls = Vec::<bool>::with_capacity(list_array.len());
+
+    let pos_data: &Int32Array = as_primitive_array(&pos_array); // Spark supports only i32 for positions
+
+    for (row_index, offset_window) in offsets.windows(2).enumerate() {
+        let pos = pos_data.values()[row_index];
+        let start = offset_window[0].as_usize();
+        let end = offset_window[1].as_usize();
+        let is_item_null = items_array.is_null(row_index);
+
+        if list_array.is_null(row_index) {
+            // In Spark, if the array value is NULL then nothing happens
+            mutable_values.extend_nulls(1);
+            new_offsets.push(new_offsets[row_index] + O::one());
+            new_nulls.push(false);
+            continue;
+        }
+
+        if pos == 0 {
+            return Err(DataFusionError::Internal(
+                "Position for array_insert should be greater than or less than zero".to_string(),
+            ));
+        }
+
+        if (pos > 0) || ((-pos).as_usize() < (end - start + 1)) {
+            let corrected_pos = if pos > 0 {
+                (pos - 1).as_usize()
+            } else {
+                end - start - (-pos).as_usize() + if legacy_mode { 0 } else { 1 }
+            };
+            let new_array_len = std::cmp::max(end - start + 1, corrected_pos);
+            if new_array_len > MAX_ROUNDED_ARRAY_LENGTH {
+                return Err(DataFusionError::Internal(format!(
Err(DataFusionError::Internal(format!( + "Max array length in Spark is {:?}, but got {:?}", + MAX_ROUNDED_ARRAY_LENGTH, new_array_len + ))); + } + + if (start + corrected_pos) <= end { + mutable_values.extend(0, start, start + corrected_pos); + mutable_values.extend(1, row_index, row_index + 1); + mutable_values.extend(0, start + corrected_pos, end); + new_offsets.push(new_offsets[row_index] + O::usize_as(new_array_len)); + } else { + mutable_values.extend(0, start, end); + mutable_values.extend_nulls(new_array_len - (end - start)); + mutable_values.extend(1, row_index, row_index + 1); + // In that case spark actualy makes array longer than expected; + // For example, if pos is equal to 5, len is eq to 3, than resulted len will be 5 + new_offsets.push(new_offsets[row_index] + O::usize_as(new_array_len) + O::one()); + } + } else { + // This comment is takes from the Apache Spark source code as is: + // special case- if the new position is negative but larger than the current array size + // place the new item at start of array, place the current array contents at the end + // and fill the newly created array elements inbetween with a null + let base_offset = if legacy_mode { 1 } else { 0 }; + let new_array_len = (-pos + base_offset).as_usize(); + if new_array_len > MAX_ROUNDED_ARRAY_LENGTH { + return Err(DataFusionError::Internal(format!( + "Max array length in Spark is {:?}, but got {:?}", + MAX_ROUNDED_ARRAY_LENGTH, new_array_len + ))); + } + mutable_values.extend(1, row_index, row_index + 1); + mutable_values.extend_nulls(new_array_len - (end - start + 1)); + mutable_values.extend(0, start, end); + new_offsets.push(new_offsets[row_index] + O::usize_as(new_array_len)); + } + if is_item_null { + if (start == end) || (values.is_null(row_index)) { + new_nulls.push(false) + } else { + new_nulls.push(true) + } + } else { + new_nulls.push(true) + } + } + + let data = make_array(mutable_values.freeze()); + let data_type = match list_array.data_type() { + DataType::List(field) => field.data_type(), + DataType::LargeList(field) => field.data_type(), + _ => unreachable!(), + }; + let new_array = GenericListArray::::try_new( + Arc::new(Field::new("item", data_type.clone(), true)), + OffsetBuffer::new(new_offsets.into()), + data, + Some(NullBuffer::new(new_nulls.into())), + )?; + + Ok(ColumnarValue::Array(Arc::new(new_array))) +} + +impl Display for ArrayInsert { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "ArrayInsert [array: {:?}, pos: {:?}, item: {:?}]", + self.src_array_expr, self.pos_expr, self.item_expr + ) + } +} + +impl PartialEq for ArrayInsert { + fn eq(&self, other: &dyn Any) -> bool { + down_cast_any_ref(other) + .downcast_ref::() + .map(|x| { + self.src_array_expr.eq(&x.src_array_expr) + && self.pos_expr.eq(&x.pos_expr) + && self.item_expr.eq(&x.item_expr) + && self.legacy_negative_index.eq(&x.legacy_negative_index) + }) + .unwrap_or(false) + } +} + #[cfg(test)] mod test { - use crate::list::{list_extract, zero_based_index}; + use crate::list::{array_insert, list_extract, zero_based_index}; use arrow::datatypes::Int32Type; - use arrow_array::{Array, Int32Array, ListArray}; + use arrow_array::{Array, ArrayRef, Int32Array, ListArray}; use datafusion_common::{Result, ScalarValue}; use datafusion_expr::ColumnarValue; + use std::sync::Arc; #[test] fn test_list_extract_default_value() -> Result<()> { @@ -458,4 +754,120 @@ mod test { ); Ok(()) } + + #[test] + fn test_array_insert() -> Result<()> { + // Test inserting an item into a list array + // Inputs and 
+        let list = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
+            Some(vec![Some(1), Some(2), Some(3)]),
+            Some(vec![Some(4), Some(5)]),
+            Some(vec![None]),
+            Some(vec![Some(1), Some(2), Some(3)]),
+            Some(vec![Some(1), Some(2), Some(3)]),
+            None,
+        ]);
+
+        let positions = Int32Array::from(vec![2, 1, 1, 5, 6, 1]);
+        let items = Int32Array::from(vec![
+            Some(10),
+            Some(20),
+            Some(30),
+            Some(100),
+            Some(100),
+            Some(40),
+        ]);
+
+        let ColumnarValue::Array(result) = array_insert(
+            &list,
+            &(Arc::new(items) as ArrayRef),
+            &(Arc::new(positions) as ArrayRef),
+            false,
+        )?
+        else {
+            unreachable!()
+        };
+
+        let expected = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
+            Some(vec![Some(1), Some(10), Some(2), Some(3)]),
+            Some(vec![Some(20), Some(4), Some(5)]),
+            Some(vec![Some(30), None]),
+            Some(vec![Some(1), Some(2), Some(3), None, Some(100)]),
+            Some(vec![Some(1), Some(2), Some(3), None, None, Some(100)]),
+            None,
+        ]);
+
+        assert_eq!(&result.to_data(), &expected.to_data());
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_array_insert_negative_index() -> Result<()> {
+        // Test insertion with a negative index
+        // Inputs and expected values are taken from Spark's results
+        let list = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
+            Some(vec![Some(1), Some(2), Some(3)]),
+            Some(vec![Some(4), Some(5)]),
+            Some(vec![Some(1)]),
+            None,
+        ]);
+
+        let positions = Int32Array::from(vec![-2, -1, -3, -1]);
+        let items = Int32Array::from(vec![Some(10), Some(20), Some(100), Some(30)]);
+
+        let ColumnarValue::Array(result) = array_insert(
+            &list,
+            &(Arc::new(items) as ArrayRef),
+            &(Arc::new(positions) as ArrayRef),
+            false,
+        )?
+        else {
+            unreachable!()
+        };
+
+        let expected = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
+            Some(vec![Some(1), Some(2), Some(10), Some(3)]),
+            Some(vec![Some(4), Some(5), Some(20)]),
+            Some(vec![Some(100), None, Some(1)]),
+            None,
+        ]);
+
+        assert_eq!(&result.to_data(), &expected.to_data());
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_array_insert_legacy_mode() -> Result<()> {
+        // Test the so-called "legacy" mode existing in Spark
+        let list = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
+            Some(vec![Some(1), Some(2), Some(3)]),
+            Some(vec![Some(4), Some(5)]),
+            None,
+        ]);
+
+        let positions = Int32Array::from(vec![-1, -1, -1]);
+        let items = Int32Array::from(vec![Some(10), Some(20), Some(30)]);
+
+        let ColumnarValue::Array(result) = array_insert(
+            &list,
+            &(Arc::new(items) as ArrayRef),
+            &(Arc::new(positions) as ArrayRef),
+            true,
+        )?
+        else {
+            unreachable!()
+        };
+
+        let expected = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
+            Some(vec![Some(1), Some(2), Some(10), Some(3)]),
+            Some(vec![Some(4), Some(20), Some(5)]),
+            None,
+        ]);
+
+        assert_eq!(&result.to_data(), &expected.to_data());
+
+        Ok(())
+    }
 }
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 4a130ad0d..f7d5fc91a 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -2192,6 +2192,35 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
           None
         }
 
+      case expr if expr.prettyName == "array_insert" =>
+        val srcExprProto = exprToProto(expr.children(0), inputs, binding)
+        val posExprProto = exprToProto(expr.children(1), inputs, binding)
+        val itemExprProto = exprToProto(expr.children(2), inputs, binding)
+        val legacyNegativeIndex =
+          SQLConf.get.getConfString("spark.sql.legacy.negativeIndexInArrayInsert").toBoolean
+        if (srcExprProto.isDefined && posExprProto.isDefined && itemExprProto.isDefined) {
+          val arrayInsertBuilder = ExprOuterClass.ArrayInsert
+            .newBuilder()
+            .setSrcArrayExpr(srcExprProto.get)
+            .setPosExpr(posExprProto.get)
+            .setItemExpr(itemExprProto.get)
+            .setLegacyNegativeIndex(legacyNegativeIndex)
+
+          Some(
+            ExprOuterClass.Expr
+              .newBuilder()
+              .setArrayInsert(arrayInsertBuilder)
+              .build())
+        } else {
+          withInfo(
+            expr,
+            "unsupported arguments for ArrayInsert",
+            expr.children(0),
+            expr.children(1),
+            expr.children(2))
+          None
+        }
+
       case ElementAt(child, ordinal, defaultValue, failOnError)
           if child.dataType.isInstanceOf[ArrayType] =>
         val childExpr = exprToProto(child, inputs, binding)
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index 6a106fda7..35f374bf0 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.SESSION_LOCAL_TIMEZONE
 import org.apache.spark.sql.types.{Decimal, DecimalType}
 
-import org.apache.comet.CometSparkSessionExtensions.{isSpark33Plus, isSpark34Plus, isSpark40Plus}
+import org.apache.comet.CometSparkSessionExtensions.{isSpark33Plus, isSpark34Plus, isSpark35Plus, isSpark40Plus}
 
 class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   import testImplicits._
@@ -2310,8 +2310,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   }
 
   test("array_append") {
-    // array append has been added in Spark 3.4 and in Spark 4.0 it gets written to ArrayInsert
-    assume(isSpark34Plus && !isSpark40Plus)
+    assume(isSpark34Plus)
     Seq(true, false).foreach { dictionaryEnabled =>
       withTempDir { dir =>
         val path = new Path(dir.toURI.toString, "test.parquet")
@@ -2331,4 +2330,64 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
       }
     }
   }
+
+  test("array_prepend") {
+    assume(isSpark35Plus) // in Spark 3.5 array_prepend is implemented via array_insert
+    Seq(true, false).foreach { dictionaryEnabled =>
+      withTempDir { dir =>
+        val path = new Path(dir.toURI.toString, "test.parquet")
+        makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000)
+        spark.read.parquet(path.toString).createOrReplaceTempView("t1");
checkSparkAnswerAndOperator(spark.sql("Select array_prepend(array(_1),false) from t1")) + checkSparkAnswerAndOperator( + spark.sql("SELECT array_prepend(array(_2, _3, _4), 4) FROM t1")) + checkSparkAnswerAndOperator( + spark.sql("SELECT array_prepend(array(_2, _3, _4), null) FROM t1")); + checkSparkAnswerAndOperator( + spark.sql("SELECT array_prepend(array(_6, _7), CAST(6.5 AS DOUBLE)) FROM t1")); + checkSparkAnswerAndOperator(spark.sql("SELECT array_prepend(array(_8), 'test') FROM t1")); + checkSparkAnswerAndOperator(spark.sql("SELECT array_prepend(array(_19), _19) FROM t1")); + checkSparkAnswerAndOperator( + spark.sql("SELECT array_prepend((CASE WHEN _2 =_3 THEN array(_4) END), _4) FROM t1")); + } + } + } + + test("ArrayInsert") { + assume(isSpark34Plus) + Seq(true, false).foreach(dictionaryEnabled => + withTempDir { dir => + val path = new Path(dir.toURI.toString, "test.parquet") + makeParquetFileAllTypes(path, dictionaryEnabled, 10000) + val df = spark.read + .parquet(path.toString) + .withColumn("arr", array(col("_4"), lit(null), col("_4"))) + .withColumn("arrInsertResult", expr("array_insert(arr, 1, 1)")) + .withColumn("arrInsertNegativeIndexResult", expr("array_insert(arr, -1, 1)")) + .withColumn("arrPosGreaterThanSize", expr("array_insert(arr, 8, 1)")) + .withColumn("arrNegPosGreaterThanSize", expr("array_insert(arr, -8, 1)")) + .withColumn("arrInsertNone", expr("array_insert(arr, 1, null)")) + checkSparkAnswerAndOperator(df.select("arrInsertResult")) + checkSparkAnswerAndOperator(df.select("arrInsertNegativeIndexResult")) + checkSparkAnswerAndOperator(df.select("arrPosGreaterThanSize")) + checkSparkAnswerAndOperator(df.select("arrNegPosGreaterThanSize")) + checkSparkAnswerAndOperator(df.select("arrInsertNone")) + }) + } + + test("ArrayInsertUnsupportedArgs") { + // This test checks that the else branch in ArrayInsert + // mapping to the comet is valid and fallback to spark is working fine. + assume(isSpark34Plus) + withTempDir { dir => + val path = new Path(dir.toURI.toString, "test.parquet") + makeParquetFileAllTypes(path, dictionaryEnabled = false, 10000) + val df = spark.read + .parquet(path.toString) + .withColumn("arr", array(col("_4"), lit(null), col("_4"))) + .withColumn("idx", udf((_: Int) => 1).apply(col("_4"))) + .withColumn("arrUnsupportedArgs", expr("array_insert(arr, idx, 1)")) + checkSparkAnswer(df.select("arrUnsupportedArgs")) + } + } }