diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index 536ef5789f69..61b25a719d69 100644
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -885,6 +885,42 @@ language governing permissions and limitations under the License. -->
2.7.0-SNAPSHOT
nar
+
+ org.apache.nifi
+ nifi-iceberg-aws-nar
+ 2.7.0-SNAPSHOT
+ nar
+
+
+ org.apache.nifi
+ nifi-iceberg-parquet-writer-nar
+ 2.7.0-SNAPSHOT
+ nar
+
+
+ org.apache.nifi
+ nifi-iceberg-processors-nar
+ 2.7.0-SNAPSHOT
+ nar
+
+
+ org.apache.nifi
+ nifi-iceberg-rest-catalog-nar
+ 2.7.0-SNAPSHOT
+ nar
+
+
+ org.apache.nifi
+ nifi-iceberg-service-api-nar
+ 2.7.0-SNAPSHOT
+ nar
+
+
+ org.apache.nifi
+ nifi-iceberg-shared-nar
+ 2.7.0-SNAPSHOT
+ nar
+
org.aspectj
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-aws-nar/pom.xml b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-aws-nar/pom.xml
new file mode 100644
index 000000000000..f7d6c5cb2d33
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-aws-nar/pom.xml
@@ -0,0 +1,74 @@
+
+
+
+ 4.0.0
+
+
+ org.apache.nifi
+ nifi-iceberg-bundle
+ 2.7.0-SNAPSHOT
+
+
+ nifi-iceberg-aws-nar
+ nar
+
+
+
+ org.apache.nifi
+ nifi-iceberg-aws
+ ${project.version}
+
+
+ org.apache.nifi
+ nifi-iceberg-service-api-nar
+ ${project.version}
+ nar
+
+
+
+
+
+
+
+ org.apache.iceberg
+ iceberg-api
+ provided
+
+
+ org.apache.iceberg
+ iceberg-core
+ provided
+
+
+ org.apache.iceberg
+ iceberg-common
+ provided
+
+
+ org.apache.iceberg
+ iceberg-bundled-guava
+ provided
+
+
+ org.apache.avro
+ avro
+ provided
+
+
+
+
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-aws-nar/src/main/resources/META-INF/LICENSE b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-aws-nar/src/main/resources/META-INF/LICENSE
new file mode 100644
index 000000000000..44893cdb29d5
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-aws-nar/src/main/resources/META-INF/LICENSE
@@ -0,0 +1,209 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+APACHE NIFI SUBCOMPONENTS:
+
+The Apache NiFi project contains subcomponents with separate copyright
+notices and license terms. Your use of the source code for the these
+subcomponents is subject to the terms and conditions of the following
+licenses.
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-aws-nar/src/main/resources/META-INF/NOTICE b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-aws-nar/src/main/resources/META-INF/NOTICE
new file mode 100644
index 000000000000..9df15e1fbd3d
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-aws-nar/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,29 @@
+nifi-iceberg-aws-nar
+Copyright 2014-2025 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+===========================================
+Apache Software License v2
+===========================================
+
+The following binary components are provided under the Apache Software License v2
+
+ (ASLv2) AWS SDK for Java 2.0
+ The following NOTICE information applies:
+ Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+
+ This product includes software developed by
+ Amazon Technologies, Inc (http://www.amazon.com/).
+
+ **********************
+ THIRD PARTY COMPONENTS
+ **********************
+ This software includes third party software subject to the following copyrights:
+ - XML parsing and utility functions from JetS3t - Copyright 2006-2009 James Murty.
+ - PKCS#1 PEM encoded private key parsing and utility functions from oauth.googlecode.com - Copyright 1998-2010 AOL Inc.
+ - Apache Commons Lang - https://github.com/apache/commons-lang
+ - Netty Reactive Streams - https://github.com/playframework/netty-reactive-streams
+ - Jackson-core - https://github.com/FasterXML/jackson-core
+ - Jackson-dataformat-cbor - https://github.com/FasterXML/jackson-dataformats-binary
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-aws/pom.xml b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-aws/pom.xml
new file mode 100644
index 000000000000..5ac4d5c4bbc9
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-aws/pom.xml
@@ -0,0 +1,122 @@
+
+
+
+ 4.0.0
+
+
+ org.apache.nifi
+ nifi-iceberg-bundle
+ 2.7.0-SNAPSHOT
+
+
+ nifi-iceberg-aws
+ jar
+
+
+
+
+ software.amazon.awssdk
+ bom
+ ${software.amazon.awssdk.version}
+ import
+ pom
+
+
+
+
+
+
+ org.apache.nifi
+ nifi-iceberg-service-api
+ ${project.version}
+ provided
+
+
+
+ org.apache.iceberg
+ iceberg-api
+ provided
+
+
+ org.apache.iceberg
+ iceberg-core
+ provided
+
+
+
+ org.apache.iceberg
+ iceberg-aws
+
+
+
+ software.amazon.awssdk
+ url-connection-client
+
+
+ software.amazon.awssdk
+ s3
+
+
+
+ software.amazon.awssdk
+ apache-client
+
+
+ software.amazon.awssdk
+ netty-nio-client
+
+
+
+
+ software.amazon.awssdk
+ sts
+
+
+
+ software.amazon.awssdk
+ apache-client
+
+
+ software.amazon.awssdk
+ netty-nio-client
+
+
+
+
+ software.amazon.awssdk
+ kms
+
+
+
+ software.amazon.awssdk
+ apache-client
+
+
+ software.amazon.awssdk
+ netty-nio-client
+
+
+
+
+
+ org.apache.nifi
+ nifi-mock
+ test
+
+
+
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-aws/src/main/java/org/apache/nifi/services/iceberg/aws/AuthenticationStrategy.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-aws/src/main/java/org/apache/nifi/services/iceberg/aws/AuthenticationStrategy.java
new file mode 100644
index 000000000000..3bc71a16aa24
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-aws/src/main/java/org/apache/nifi/services/iceberg/aws/AuthenticationStrategy.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.aws;
+
+import org.apache.nifi.components.DescribedValue;
+
+/**
+ * Enumeration of supported Authentication Types for S3 access
+ */
+public enum AuthenticationStrategy implements DescribedValue {
+ BASIC_CREDENTIALS("Basic Credentials", "Authentication using static Access Key ID and Secret Key"),
+
+ SESSION_CREDENTIALS("Session Credentials", "Authentication using static Access Key ID and Secret Key with Session Token"),
+
+ VENDED_CREDENTIALS("Vended Credentials", "Authentication using credentials supplied from Iceberg Catalog");
+
+ private final String displayName;
+
+ private final String description;
+
+ AuthenticationStrategy(final String displayName, final String description) {
+ this.displayName = displayName;
+ this.description = description;
+ }
+
+ @Override
+ public String getValue() {
+ return name();
+ }
+
+ @Override
+ public String getDisplayName() {
+ return displayName;
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
+}
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-aws/src/main/java/org/apache/nifi/services/iceberg/aws/S3IcebergFileIOProvider.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-aws/src/main/java/org/apache/nifi/services/iceberg/aws/S3IcebergFileIOProvider.java
new file mode 100644
index 000000000000..6581993c228a
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-aws/src/main/java/org/apache/nifi/services/iceberg/aws/S3IcebergFileIOProvider.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.aws;
+
+import org.apache.iceberg.aws.AwsClientProperties;
+import org.apache.iceberg.aws.HttpClientProperties;
+import org.apache.iceberg.aws.s3.S3FileIO;
+import org.apache.iceberg.aws.s3.S3FileIOProperties;
+import org.apache.iceberg.io.FileIO;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.services.iceberg.IcebergFileIOProvider;
+import org.apache.nifi.services.iceberg.ProviderContext;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+@Tags({"s3", "iceberg", "aws"})
+@CapabilityDescription("Provides S3 file input and output support for Apache Iceberg tables")
+public class S3IcebergFileIOProvider extends AbstractControllerService implements IcebergFileIOProvider {
+
+ static final PropertyDescriptor AUTHENTICATION_STRATEGY = new PropertyDescriptor.Builder()
+ .name("Authentication Strategy")
+ .description("Strategy for authenticating with S3 storage services")
+ .required(true)
+ .allowableValues(AuthenticationStrategy.class)
+ .defaultValue(AuthenticationStrategy.VENDED_CREDENTIALS)
+ .build();
+
+ static final PropertyDescriptor ACCESS_KEY_ID = new PropertyDescriptor.Builder()
+ .name("Access Key ID")
+ .description("Access Key ID for static credential authentication to S3 storage services")
+ .required(true)
+ .sensitive(true)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .dependsOn(
+ AUTHENTICATION_STRATEGY,
+ AuthenticationStrategy.BASIC_CREDENTIALS,
+ AuthenticationStrategy.SESSION_CREDENTIALS
+ )
+ .build();
+
+ static final PropertyDescriptor SECRET_ACCESS_KEY = new PropertyDescriptor.Builder()
+ .name("Secret Access Key")
+ .description("Secret Access Key for static credential authentication to S3 storage services")
+ .required(true)
+ .sensitive(true)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .dependsOn(
+ AUTHENTICATION_STRATEGY,
+ AuthenticationStrategy.BASIC_CREDENTIALS,
+ AuthenticationStrategy.SESSION_CREDENTIALS
+ )
+ .build();
+
+ static final PropertyDescriptor SESSION_TOKEN = new PropertyDescriptor.Builder()
+ .name("Session Token")
+ .description("Session Token for session-based credential authentication to S3 storage services")
+ .required(true)
+ .sensitive(true)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .dependsOn(
+ AUTHENTICATION_STRATEGY,
+ AuthenticationStrategy.SESSION_CREDENTIALS
+ )
+ .build();
+
+ static final PropertyDescriptor CLIENT_REGION = new PropertyDescriptor.Builder()
+ .name("Client Region")
+ .description("Region identifier for Amazon Web Services client access")
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .dependsOn(
+ AUTHENTICATION_STRATEGY,
+ AuthenticationStrategy.BASIC_CREDENTIALS,
+ AuthenticationStrategy.SESSION_CREDENTIALS
+ )
+ .build();
+
+ private static final List PROPERTY_DESCRIPTORS = List.of(
+ AUTHENTICATION_STRATEGY,
+ ACCESS_KEY_ID,
+ SECRET_ACCESS_KEY,
+ SESSION_TOKEN,
+ CLIENT_REGION
+ );
+
+ private final Map standardProperties = new ConcurrentHashMap<>();
+
+ @Override
+ protected List getSupportedPropertyDescriptors() {
+ return PROPERTY_DESCRIPTORS;
+ }
+
+ @OnEnabled
+ public void onEnabled(final ConfigurationContext context) {
+ final Map configuredProperties = getConfiguredProperties(context);
+ standardProperties.putAll(configuredProperties);
+ }
+
+ @OnDisabled
+ public void onDisabled() {
+ standardProperties.clear();
+ }
+
+ @Override
+ public FileIO getFileIO(final ProviderContext providerContext) {
+ Objects.requireNonNull(providerContext, "Provider Context required");
+ final Map contextProperties = providerContext.getProperties();
+ Objects.requireNonNull(contextProperties, "Context properties required");
+
+ final Map mergedProperties = new HashMap<>(standardProperties);
+ mergedProperties.putAll(contextProperties);
+ final S3FileIO fileIO = new S3FileIO();
+ fileIO.initialize(mergedProperties);
+ return fileIO;
+ }
+
+ private Map getConfiguredProperties(final ConfigurationContext context) {
+ final Map contextProperties = new HashMap<>();
+ final AuthenticationStrategy authenticationStrategy = context.getProperty(AUTHENTICATION_STRATEGY).asAllowableValue(AuthenticationStrategy.class);
+ if (AuthenticationStrategy.BASIC_CREDENTIALS == authenticationStrategy || AuthenticationStrategy.SESSION_CREDENTIALS == authenticationStrategy) {
+ final String clientRegion = context.getProperty(CLIENT_REGION).evaluateAttributeExpressions().getValue();
+ contextProperties.put(AwsClientProperties.CLIENT_REGION, clientRegion);
+
+ final String accessKeyId = context.getProperty(ACCESS_KEY_ID).getValue();
+ final String secretAccessKey = context.getProperty(SECRET_ACCESS_KEY).getValue();
+ contextProperties.put(S3FileIOProperties.ACCESS_KEY_ID, accessKeyId);
+ contextProperties.put(S3FileIOProperties.SECRET_ACCESS_KEY, secretAccessKey);
+
+ if (AuthenticationStrategy.SESSION_CREDENTIALS == authenticationStrategy) {
+ final String sessionToken = context.getProperty(SESSION_TOKEN).getValue();
+ contextProperties.put(S3FileIOProperties.SESSION_TOKEN, sessionToken);
+ }
+ }
+
+ // HttpURLConnection Client Type avoids additional dependencies
+ contextProperties.put(HttpClientProperties.CLIENT_TYPE, HttpClientProperties.CLIENT_TYPE_URLCONNECTION);
+
+ // Write Checksum Verification enabled
+ contextProperties.put(S3FileIOProperties.CHECKSUM_ENABLED, Boolean.TRUE.toString());
+
+ final S3FileIOProperties ioProperties = new S3FileIOProperties(contextProperties);
+ return ioProperties.properties();
+ }
+}
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-aws/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-aws/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
new file mode 100644
index 000000000000..93f815bfc458
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-aws/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.services.iceberg.aws.S3IcebergFileIOProvider
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-aws/src/test/java/org/apache/nifi/services/iceberg/aws/S3IcebergFileIOProviderTest.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-aws/src/test/java/org/apache/nifi/services/iceberg/aws/S3IcebergFileIOProviderTest.java
new file mode 100644
index 000000000000..04c3a0eae90a
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-aws/src/test/java/org/apache/nifi/services/iceberg/aws/S3IcebergFileIOProviderTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.aws;
+
+import org.apache.iceberg.aws.AwsClientProperties;
+import org.apache.iceberg.aws.s3.S3FileIO;
+import org.apache.iceberg.aws.s3.S3FileIOProperties;
+import org.apache.iceberg.io.FileIO;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.services.iceberg.ProviderContext;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+class S3IcebergFileIOProviderTest {
+ private static final String SERVICE_ID = S3IcebergFileIOProvider.class.getSimpleName();
+
+ private static final String CLIENT_REGION = "us-east-1";
+
+ private static final String ACCESS_KEY_ID = "AccessKeyID";
+
+ private static final String SECRET_ACCESS_KEY = "SecretAccessKey";
+
+ private static final String SESSION_TOKEN = "SessionToken";
+
+ private TestRunner runner;
+
+ private S3IcebergFileIOProvider provider;
+
+ @BeforeEach
+ void setProvider() throws InitializationException {
+ provider = new S3IcebergFileIOProvider();
+ runner = TestRunners.newTestRunner(NoOpProcessor.class);
+ runner.addControllerService(SERVICE_ID, provider);
+ }
+
+ @AfterEach
+ void disableProvider() {
+ runner.disableControllerService(provider);
+ }
+
+ @Test
+ void testGetFileIO() {
+ runner.enableControllerService(provider);
+
+ final Map properties = Map.of();
+ final ProviderContext providerContext = () -> properties;
+
+ try (FileIO fileIO = provider.getFileIO(providerContext)) {
+ assertFileIOConfigured(fileIO);
+ }
+ }
+
+ @Test
+ void testGetFileIOSessionCredentials() {
+ runner.setProperty(provider, S3IcebergFileIOProvider.AUTHENTICATION_STRATEGY, AuthenticationStrategy.SESSION_CREDENTIALS.getValue());
+ runner.setProperty(provider, S3IcebergFileIOProvider.ACCESS_KEY_ID, ACCESS_KEY_ID);
+ runner.setProperty(provider, S3IcebergFileIOProvider.SECRET_ACCESS_KEY, SECRET_ACCESS_KEY);
+ runner.setProperty(provider, S3IcebergFileIOProvider.SESSION_TOKEN, SESSION_TOKEN);
+ runner.setProperty(provider, S3IcebergFileIOProvider.CLIENT_REGION, CLIENT_REGION);
+
+ runner.enableControllerService(provider);
+
+ final Map properties = Map.of();
+ final ProviderContext providerContext = () -> properties;
+
+ try (FileIO fileIO = provider.getFileIO(providerContext)) {
+ assertFileIOConfigured(fileIO);
+
+ final Map configuredProperties = fileIO.properties();
+ assertEquals(ACCESS_KEY_ID, configuredProperties.get(S3FileIOProperties.ACCESS_KEY_ID));
+ assertEquals(SECRET_ACCESS_KEY, configuredProperties.get(S3FileIOProperties.SECRET_ACCESS_KEY));
+ assertEquals(SESSION_TOKEN, configuredProperties.get(S3FileIOProperties.SESSION_TOKEN));
+ assertEquals(CLIENT_REGION, configuredProperties.get(AwsClientProperties.CLIENT_REGION));
+ }
+ }
+
+ private void assertFileIOConfigured(final FileIO fileIO) {
+ assertNotNull(fileIO);
+ assertInstanceOf(S3FileIO.class, fileIO);
+ final Map configuredProperties = fileIO.properties();
+ assertFalse(configuredProperties.isEmpty());
+ }
+}
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer-nar/pom.xml b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer-nar/pom.xml
new file mode 100644
index 000000000000..1078785d2c00
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer-nar/pom.xml
@@ -0,0 +1,74 @@
+
+
+
+ 4.0.0
+
+
+ org.apache.nifi
+ nifi-iceberg-bundle
+ 2.7.0-SNAPSHOT
+
+
+ nifi-iceberg-parquet-writer-nar
+ nar
+
+
+
+ org.apache.nifi
+ nifi-iceberg-parquet-writer
+ ${project.version}
+
+
+ org.apache.nifi
+ nifi-iceberg-service-api-nar
+ ${project.version}
+ nar
+
+
+
+
+
+
+
+ org.apache.iceberg
+ iceberg-api
+ provided
+
+
+ org.apache.iceberg
+ iceberg-common
+ provided
+
+
+ org.apache.iceberg
+ iceberg-core
+ provided
+
+
+ org.apache.iceberg
+ iceberg-bundled-guava
+ provided
+
+
+ org.apache.avro
+ avro
+ provided
+
+
+
+
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer-nar/src/main/resources/META-INF/LICENSE b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer-nar/src/main/resources/META-INF/LICENSE
new file mode 100644
index 000000000000..44893cdb29d5
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer-nar/src/main/resources/META-INF/LICENSE
@@ -0,0 +1,209 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+APACHE NIFI SUBCOMPONENTS:
+
+The Apache NiFi project contains subcomponents with separate copyright
+notices and license terms. Your use of the source code for the these
+subcomponents is subject to the terms and conditions of the following
+licenses.
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer-nar/src/main/resources/META-INF/NOTICE b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer-nar/src/main/resources/META-INF/NOTICE
new file mode 100644
index 000000000000..9b406a7e15a8
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer-nar/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,94 @@
+nifi-iceberg-parquet-writer-nar
+Copyright 2014-2025 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+===========================================
+Apache Software License v2
+===========================================
+
+The following binary components are provided under the Apache Software License v2
+
+ (ASLv2) Apache Commons Collections
+ The following NOTICE information applies:
+ Apache Commons Collections
+ Copyright 2001-2025 The Apache Software Foundation
+
+ (ASLv2) Apache Commons Pool
+ The following NOTICE information applies:
+ Apache Commons Pool
+ Copyright 2002-2019 The Apache Software Foundation
+
+ (ASLv2) Apache Parquet
+ The following NOTICE information applies:
+ Apache Parquet MR (Incubating)
+ Copyright 2014 The Apache Software Foundation
+
+ This product includes software developed at
+ The Apache Software Foundation (http://www.apache.org/).
+
+ (ASLv2) Guava: Google Core Libraries For Java
+ The following NOTICE information applies:
+ Guava: Google Core Libraries For Java
+ Copyright (C) 2017 The Guava Authors
+
+ (ASLv2) Jackson JSON processor
+ The following NOTICE information applies:
+ # Jackson JSON processor
+
+ Jackson is a high-performance, Free/Open Source JSON processing library.
+ It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has
+ been in development since 2007.
+ It is currently developed by a community of developers, as well as supported
+ commercially by FasterXML.com.
+
+ ## Licensing
+
+ Jackson core and extension components may licensed under different licenses.
+ To find the details that apply to this artifact see the accompanying LICENSE file.
+ For more information, including possible other licensing options, contact
+ FasterXML.com (http://fasterxml.com).
+
+ ## Credits
+
+ A list of contributors may be found from CREDITS file, which is included
+ in some artifacts (usually source distributions); but is always available
+ from the source code management (SCM) system project uses.
+
+ (ASLv2) Snappy Java
+ The following NOTICE information applies:
+ This product includes software developed by Google
+ Snappy: http://code.google.com/p/snappy/ (New BSD License)
+
+ This product includes software developed by Apache
+ PureJavaCrc32C from apache-hadoop-common http://hadoop.apache.org/
+ (Apache 2.0 license)
+
+ This library containd statically linked libstdc++. This inclusion is allowed by
+ "GCC Runtime Library Exception"
+ http://gcc.gnu.org/onlinedocs/libstdc++/manual/license.html
+
+ (ASLv2) Woodstox Core ASL
+ The following NOTICE information applies:
+ This product currently only contains code developed by authors
+ of specific components, as identified by the source code files.
+
+ Since product implements StAX API, it has dependencies to StAX API
+ classes.
+
+************************
+Common Development and Distribution License 1.1
+************************
+
+The following binary components are provided under the Common Development and Distribution License 1.1. See project link for details.
+
+ (CDDL 1.1) (GPL2 w/ CPE) javax.annotation API (javax.annotation:javax.annotation-api:jar:1.3.2 - http://jcp.org/en/jsr/detail?id=250)
+
+************************
+Eclipse Public License 2.0
+************************
+
+The following binary components are provided under the Eclipse Public License 2.0.
+
+ (EPL 2.0) JTS Core (org.locationtech.jts:jts-core)
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/pom.xml b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/pom.xml
new file mode 100644
index 000000000000..df29780ad767
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/pom.xml
@@ -0,0 +1,102 @@
+
+
+
+ 4.0.0
+
+
+ org.apache.nifi
+ nifi-iceberg-bundle
+ 2.7.0-SNAPSHOT
+
+
+ nifi-iceberg-parquet-writer
+ jar
+
+
+
+ org.apache.nifi
+ nifi-iceberg-service-api
+ ${project.version}
+ provided
+
+
+ org.apache.iceberg
+ iceberg-api
+ provided
+
+
+
+ org.apache.iceberg
+ iceberg-core
+ provided
+
+
+
+ org.apache.iceberg
+ iceberg-parquet
+
+
+
+ org.apache.parquet
+ parquet-avro
+ 1.16.0
+
+
+
+ org.apache.avro
+ avro
+ provided
+
+
+
+ org.apache.hadoop
+ hadoop-common
+ 3.4.2
+
+
+ *
+ *
+
+
+
+
+
+ com.fasterxml.woodstox
+ woodstox-core
+ 7.1.1
+ runtime
+
+
+ org.apache.hadoop.thirdparty
+ hadoop-shaded-guava
+ 1.4.0
+ runtime
+
+
+ org.apache.commons
+ commons-collections4
+ 4.5.0
+
+
+
+ org.apache.nifi
+ nifi-mock
+ test
+
+
+
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/main/java/org/apache/nifi/services/iceberg/parquet/ParquetIcebergWriter.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/main/java/org/apache/nifi/services/iceberg/parquet/ParquetIcebergWriter.java
new file mode 100644
index 000000000000..5194e6e37057
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/main/java/org/apache/nifi/services/iceberg/parquet/ParquetIcebergWriter.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.parquet;
+
+import org.apache.iceberg.TableProperties;
+import org.apache.nifi.services.iceberg.IcebergRowWriter;
+import org.apache.nifi.services.iceberg.IcebergWriter;
+import org.apache.nifi.services.iceberg.parquet.io.ParquetFileAppenderFactory;
+import org.apache.nifi.services.iceberg.parquet.io.ParquetIcebergRowWriter;
+import org.apache.nifi.services.iceberg.parquet.io.ParquetPartitionedWriter;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.UnpartitionedWriter;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.controller.AbstractControllerService;
+
+import java.util.Objects;
+
+@Tags({"parquet", "iceberg", "record"})
+@CapabilityDescription("Provides record serialization for Apache Iceberg using Apache Parquet formatting")
+public class ParquetIcebergWriter extends AbstractControllerService implements IcebergWriter {
+
+ private static final FileFormat FILE_FORMAT = FileFormat.PARQUET;
+
+ @Override
+ public IcebergRowWriter getRowWriter(final Table table) {
+ Objects.requireNonNull(table, "Table required");
+ final TaskWriter taskWriter = getTaskWriter(table);
+ return new ParquetIcebergRowWriter(taskWriter);
+ }
+
+ private TaskWriter getTaskWriter(final Table table) {
+ final PartitionSpec spec = table.spec();
+ final Schema schema = table.schema();
+ final MetricsConfig metricsConfig = MetricsConfig.forTable(table);
+ final FileAppenderFactory appenderFactory = new ParquetFileAppenderFactory(schema, spec, metricsConfig);
+
+ final int partitionId = spec.specId();
+ final long taskId = System.currentTimeMillis();
+ final OutputFileFactory outputFileFactory = OutputFileFactory.builderFor(table, partitionId, taskId).format(FILE_FORMAT).build();
+ final FileIO io = table.io();
+
+ final long writeTargetFileSize;
+ final String targetFileSize = table.properties().get(TableProperties.WRITE_TARGET_FILE_SIZE_BYTES);
+ if (targetFileSize == null || targetFileSize.isEmpty()) {
+ writeTargetFileSize = TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
+ } else {
+ writeTargetFileSize = Long.parseLong(targetFileSize);
+ }
+
+ final TaskWriter taskWriter;
+ if (spec.isUnpartitioned()) {
+ taskWriter = new UnpartitionedWriter<>(spec, FILE_FORMAT, appenderFactory, outputFileFactory, io, writeTargetFileSize);
+ } else {
+ taskWriter = new ParquetPartitionedWriter(spec, appenderFactory, outputFileFactory, io, writeTargetFileSize, schema);
+ }
+ return taskWriter;
+ }
+}
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/main/java/org/apache/nifi/services/iceberg/parquet/io/ParquetFileAppenderFactory.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/main/java/org/apache/nifi/services/iceberg/parquet/io/ParquetFileAppenderFactory.java
new file mode 100644
index 000000000000..3db0d852cf02
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/main/java/org/apache/nifi/services/iceberg/parquet/io/ParquetFileAppenderFactory.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.parquet.io;
+
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.parquet.Parquet;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Objects;
+
+/**
+ * Parquet implementation of Iceberg File Appender Factory instead of GenericAppenderFactory from iceberg-data library
+ */
+public class ParquetFileAppenderFactory implements FileAppenderFactory {
+ private final Schema schema;
+
+ private final PartitionSpec spec;
+
+ private final MetricsConfig metricsConfig;
+
+ public ParquetFileAppenderFactory(final Schema schema, final PartitionSpec spec, final MetricsConfig metricsConfig) {
+ this.schema = Objects.requireNonNull(schema, "Schema required");
+ this.spec = Objects.requireNonNull(spec, "Partition Spec required");
+ this.metricsConfig = Objects.requireNonNull(metricsConfig, "Metrics Configuration required");
+ }
+
+ @Override
+ public FileAppender newAppender(final OutputFile outputFile, final FileFormat fileFormat) {
+ try {
+ return Parquet.write(outputFile)
+ .schema(schema)
+ .createWriterFunc(GenericParquetWriter::create)
+ .metricsConfig(metricsConfig)
+ .overwrite()
+ .build();
+ } catch (final IOException e) {
+ throw new UncheckedIOException("Parquet Appender build failed", e);
+ }
+ }
+
+ @Override
+ public DataWriter newDataWriter(final EncryptedOutputFile encryptedOutputFile, final FileFormat fileFormat, final StructLike partition) {
+ final FileAppender appender = newAppender(encryptedOutputFile, fileFormat);
+ final String location = encryptedOutputFile.encryptingOutputFile().location();
+ final EncryptionKeyMetadata keyMetadata = encryptedOutputFile.keyMetadata();
+ return new DataWriter<>(appender, fileFormat, location, spec, partition, keyMetadata);
+ }
+
+ @Override
+ public EqualityDeleteWriter newEqDeleteWriter(final EncryptedOutputFile encryptedOutputFile, final FileFormat fileFormat, final StructLike partition) {
+ throw new UnsupportedOperationException("Equality Delete Writer not supported");
+ }
+
+ @Override
+ public PositionDeleteWriter newPosDeleteWriter(final EncryptedOutputFile encryptedOutputFile, final FileFormat fileFormat, final StructLike partition) {
+ throw new UnsupportedOperationException("Position Delete Writer not supported");
+ }
+}
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/main/java/org/apache/nifi/services/iceberg/parquet/io/ParquetIcebergRowWriter.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/main/java/org/apache/nifi/services/iceberg/parquet/io/ParquetIcebergRowWriter.java
new file mode 100644
index 000000000000..37ac0bbf7693
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/main/java/org/apache/nifi/services/iceberg/parquet/io/ParquetIcebergRowWriter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.parquet.io;
+
+import org.apache.nifi.services.iceberg.IcebergRowWriter;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.TaskWriter;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * Standard implementation of Iceberg Row Writer wrapping an Iceberg Task Writer for abstracted access to iceberg-io modules
+ */
+public class ParquetIcebergRowWriter implements IcebergRowWriter {
+ private final TaskWriter writer;
+
+ public ParquetIcebergRowWriter(final TaskWriter writer) {
+ this.writer = Objects.requireNonNull(writer, "Writer required");
+ }
+
+ @Override
+ public void write(final Record row) throws IOException {
+ writer.write(row);
+ }
+
+ @Override
+ public void abort() throws IOException {
+ writer.abort();
+ }
+
+ @Override
+ public DataFile[] dataFiles() throws IOException {
+ return writer.dataFiles();
+ }
+
+ @Override
+ public void close() throws IOException {
+ writer.close();
+ }
+}
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/main/java/org/apache/nifi/services/iceberg/parquet/io/ParquetPartitionedWriter.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/main/java/org/apache/nifi/services/iceberg/parquet/io/ParquetPartitionedWriter.java
new file mode 100644
index 000000000000..e5bc9a186578
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/main/java/org/apache/nifi/services/iceberg/parquet/io/ParquetPartitionedWriter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.parquet.io;
+
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.PartitionedFanoutWriter;
+
+/**
+ * Parquet implementation of Partition Writer with Partition Key derived from configured Schema definition
+ */
+public class ParquetPartitionedWriter extends PartitionedFanoutWriter {
+
+ private final PartitionKey partitionKey;
+
+ public ParquetPartitionedWriter(
+ final PartitionSpec spec,
+ final FileAppenderFactory appenderFactory,
+ final OutputFileFactory fileFactory,
+ final FileIO io,
+ final long targetFileSize,
+ final Schema schema
+ ) {
+ super(spec, FileFormat.PARQUET, appenderFactory, fileFactory, io, targetFileSize);
+ this.partitionKey = new PartitionKey(spec, schema);
+ }
+
+ @Override
+ protected PartitionKey partition(final Record record) {
+ partitionKey.partition(record);
+ return partitionKey;
+ }
+}
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
new file mode 100644
index 000000000000..b4d53435d2cd
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.services.iceberg.parquet.ParquetIcebergWriter
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/test/java/org/apache/nifi/services/iceberg/parquet/ParquetIcebergWriterTest.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/test/java/org/apache/nifi/services/iceberg/parquet/ParquetIcebergWriterTest.java
new file mode 100644
index 000000000000..b41ae024811d
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/test/java/org/apache/nifi/services/iceberg/parquet/ParquetIcebergWriterTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.parquet;
+
+import org.apache.nifi.services.iceberg.IcebergRowWriter;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.inmemory.InMemoryOutputFile;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.types.Types;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class ParquetIcebergWriterTest {
+ private static final String SERVICE_ID = ParquetIcebergWriter.class.getSimpleName();
+
+ private static final String LOCATION = "iceberg://output.parquet";
+
+ private static final int FIRST_FIELD_ID = 0;
+
+ private static final String FIRST_FIELD_NAME = "label";
+
+ private static final String FIRST_FIELD_VALUE = "value";
+
+ private ParquetIcebergWriter parquetIcebergWriter;
+
+ private TestRunner runner;
+
+ @Mock
+ private Table table;
+
+ @Mock
+ private PartitionSpec spec;
+
+ @Mock
+ private FileIO io;
+
+ @Mock
+ private LocationProvider locationProvider;
+
+ @Mock
+ private EncryptionManager encryptionManager;
+
+ @Mock
+ private EncryptedOutputFile encryptedOutputFile;
+
+ @BeforeEach
+ void setRunner() throws InitializationException {
+ runner = TestRunners.newTestRunner(NoOpProcessor.class);
+
+ parquetIcebergWriter = new ParquetIcebergWriter();
+ runner.addControllerService(SERVICE_ID, parquetIcebergWriter);
+ }
+
+ @Test
+ void testEnabledDisabled() {
+ runner.enableControllerService(parquetIcebergWriter);
+
+ runner.disableControllerService(parquetIcebergWriter);
+ }
+
+ @Test
+ void testGetRowWriter() {
+ runner.enableControllerService(parquetIcebergWriter);
+
+ final Schema schema = getSchema();
+ final InMemoryOutputFile outputFile = new InMemoryOutputFile();
+ setTable(schema, outputFile);
+ when(spec.isUnpartitioned()).thenReturn(true);
+
+ final IcebergRowWriter rowWriter = parquetIcebergWriter.getRowWriter(table);
+
+ assertNotNull(rowWriter);
+ }
+
+ @Test
+ void testWriteDataFiles() throws IOException {
+ runner.enableControllerService(parquetIcebergWriter);
+
+ final Schema schema = getSchema();
+ final InMemoryOutputFile outputFile = new InMemoryOutputFile();
+ setTable(schema, outputFile);
+ when(spec.isUnpartitioned()).thenReturn(true);
+
+ final IcebergRowWriter rowWriter = parquetIcebergWriter.getRowWriter(table);
+
+ final GenericRecord row = GenericRecord.create(schema);
+ row.setField(FIRST_FIELD_NAME, FIRST_FIELD_VALUE);
+ rowWriter.write(row);
+
+ final DataFile[] dataFiles = rowWriter.dataFiles();
+ assertNotNull(dataFiles);
+ assertEquals(1, dataFiles.length);
+
+ final DataFile dataFile = dataFiles[0];
+ assertNotNull(dataFile);
+ assertEquals(FileFormat.PARQUET, dataFile.format());
+ assertEquals(1, dataFile.recordCount());
+
+ final byte[] serialized = outputFile.toByteArray();
+ assertEquals(serialized.length, dataFile.fileSizeInBytes());
+ }
+
+ private Schema getSchema() {
+ final Types.NestedField nestedField = Types.NestedField.required(FIRST_FIELD_ID, FIRST_FIELD_NAME, Types.StringType.get());
+ return new Schema(nestedField);
+ }
+
+ private void setTable(final Schema schema, final OutputFile outputFile) {
+ when(table.schema()).thenReturn(schema);
+ when(table.spec()).thenReturn(spec);
+ when(table.io()).thenReturn(io);
+ when(table.locationProvider()).thenReturn(locationProvider);
+ when(table.encryption()).thenReturn(encryptionManager);
+ when(locationProvider.newDataLocation(anyString())).thenReturn(LOCATION);
+ when(io.newOutputFile(eq(LOCATION))).thenReturn(outputFile);
+ when(encryptionManager.encrypt(eq(outputFile))).thenReturn(encryptedOutputFile);
+ when(encryptedOutputFile.encryptingOutputFile()).thenReturn(outputFile);
+ }
+}
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors-nar/pom.xml b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors-nar/pom.xml
new file mode 100644
index 000000000000..e7b07120dd45
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors-nar/pom.xml
@@ -0,0 +1,43 @@
+
+
+
+ 4.0.0
+
+
+ org.apache.nifi
+ nifi-iceberg-bundle
+ 2.7.0-SNAPSHOT
+
+
+ nifi-iceberg-processors-nar
+ nar
+
+
+
+ org.apache.nifi
+ nifi-iceberg-processors
+ ${project.version}
+
+
+ org.apache.nifi
+ nifi-iceberg-service-api-nar
+ ${project.version}
+ nar
+
+
+
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors-nar/src/main/resources/META-INF/LICENSE b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors-nar/src/main/resources/META-INF/LICENSE
new file mode 100644
index 000000000000..44893cdb29d5
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors-nar/src/main/resources/META-INF/LICENSE
@@ -0,0 +1,209 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+APACHE NIFI SUBCOMPONENTS:
+
+The Apache NiFi project contains subcomponents with separate copyright
+notices and license terms. Your use of the source code for the these
+subcomponents is subject to the terms and conditions of the following
+licenses.
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/pom.xml b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/pom.xml
new file mode 100644
index 000000000000..87dea9b6c678
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/pom.xml
@@ -0,0 +1,59 @@
+
+
+
+ 4.0.0
+
+
+ org.apache.nifi
+ nifi-iceberg-bundle
+ 2.7.0-SNAPSHOT
+
+
+ nifi-iceberg-processors
+ jar
+
+
+
+ org.apache.nifi
+ nifi-record
+
+
+ org.apache.nifi
+ nifi-record-serialization-service-api
+
+
+
+ org.apache.nifi
+ nifi-iceberg-service-api
+ ${project.version}
+ provided
+
+
+ org.apache.iceberg
+ iceberg-api
+ provided
+
+
+
+
+ org.apache.nifi
+ nifi-mock
+ test
+
+
+
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIcebergRecord.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIcebergRecord.java
new file mode 100644
index 000000000000..a86b9bb1c1cf
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIcebergRecord.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.types.Types;
+import org.apache.nifi.processors.iceberg.record.DelegatedRecord;
+import org.apache.nifi.services.iceberg.IcebergCatalog;
+import org.apache.nifi.services.iceberg.IcebergRowWriter;
+import org.apache.nifi.services.iceberg.IcebergWriter;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.time.Clock;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+@Tags({"iceberg", "analytics", "polaris", "s3"})
+@CapabilityDescription("Store records in Iceberg Table using configurable Catalog for managing namespaces and tables.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public class PutIcebergRecord extends AbstractProcessor {
+
+ static final PropertyDescriptor ICEBERG_CATALOG = new PropertyDescriptor.Builder()
+ .name("Iceberg Catalog")
+ .description("Provider Service for Iceberg Catalog")
+ .required(true)
+ .identifiesControllerService(IcebergCatalog.class)
+ .build();
+
+ static final PropertyDescriptor ICEBERG_WRITER = new PropertyDescriptor.Builder()
+ .name("Iceberg Writer")
+ .description("Provider Service for Iceberg Row Writers responsible for producing formatted Iceberg Data Files")
+ .required(true)
+ .identifiesControllerService(IcebergWriter.class)
+ .build();
+
+ static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+ .name("Record Reader")
+ .description("Record Reader for incoming FlowFiles")
+ .required(true)
+ .identifiesControllerService(RecordReaderFactory.class)
+ .build();
+
+ static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder()
+ .name("Namespace")
+ .description("Iceberg Namespace containing Tables")
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .build();
+
+ static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+ .name("Table Name")
+ .description("Iceberg Table Name")
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .build();
+
+ static final Relationship SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("FlowFiles transferred to Iceberg")
+ .build();
+
+ static final Relationship FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("FlowFiles not transferred to Iceberg")
+ .build();
+
+ static final String RECORDS_PROCESSED_COUNTER = "Records Processed";
+
+ static final String DATA_FILES_PROCESSED_COUNTER = "Data Files Processed";
+
+ private static final List properties = List.of(
+ ICEBERG_CATALOG,
+ ICEBERG_WRITER,
+ RECORD_READER,
+ NAMESPACE,
+ TABLE_NAME
+ );
+
+ private static final Set relationships = Set.of(SUCCESS, FAILURE);
+
+ private static final long MAXIMUM_BYTES = 536870912;
+
+ private final Clock clock = Clock.systemDefaultZone();
+
+ private volatile Catalog catalog;
+
+ private volatile IcebergWriter icebergWriter;
+
+ @Override
+ protected List getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @Override
+ public Set getRelationships() {
+ return relationships;
+ }
+
+ @OnScheduled
+ public void onScheduled(final ProcessContext context) {
+ final IcebergCatalog icebergCatalog = context.getProperty(ICEBERG_CATALOG).asControllerService(IcebergCatalog.class);
+ catalog = icebergCatalog.getCatalog();
+ icebergWriter = context.getProperty(ICEBERG_WRITER).asControllerService(IcebergWriter.class);
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ final TableIdentifierFlowFileFilter flowFileFilter = new TableIdentifierFlowFileFilter(context, MAXIMUM_BYTES);
+ final List flowFiles = session.get(flowFileFilter);
+ if (flowFiles.isEmpty()) {
+ return;
+ }
+
+ final TableIdentifier tableIdentifier = flowFileFilter.getTableIdentifier();
+ final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+ processFlowFiles(session, flowFiles, tableIdentifier, recordReaderFactory);
+ }
+
+ private void processFlowFiles(final ProcessSession session, final List flowFiles, final TableIdentifier tableIdentifier, final RecordReaderFactory recordReaderFactory) {
+ final long started = clock.millis();
+
+ final AtomicReference relationship = new AtomicReference<>(SUCCESS);
+
+ final Table table = getTable(tableIdentifier);
+ final Schema schema = table.schema();
+ final Types.StructType struct = schema.asStruct();
+ final IcebergRowWriter rowWriter = icebergWriter.getRowWriter(table);
+
+ for (final FlowFile flowFile : flowFiles) {
+ try (
+ InputStream inputStream = session.read(flowFile);
+ RecordReader recordReader = recordReaderFactory.createRecordReader(flowFile, inputStream, getLogger())
+ ) {
+ final AtomicLong recordsProcessed = new AtomicLong();
+ try {
+ writeRecords(recordReader, rowWriter, struct, recordsProcessed);
+ session.adjustCounter(RECORDS_PROCESSED_COUNTER, recordsProcessed.get(), false);
+ } catch (final Exception e) {
+ getLogger().error("Write Rows to Table [{}] failed {}", tableIdentifier, flowFile, e);
+ abortWriter(rowWriter, tableIdentifier);
+ relationship.set(FAILURE);
+ }
+ } catch (final Exception e) {
+ getLogger().error("Processing Records for Table [{}] failed {}", tableIdentifier, flowFile, e);
+ abortWriter(rowWriter, tableIdentifier);
+ relationship.set(FAILURE);
+ }
+ }
+
+ if (SUCCESS.equals(relationship.get())) {
+ try {
+ final DataFile[] dataFiles = rowWriter.dataFiles();
+ appendDataFiles(table, dataFiles);
+ session.adjustCounter(DATA_FILES_PROCESSED_COUNTER, dataFiles.length, false);
+ } catch (final Exception e) {
+ getLogger().error("Appending Data Files to Table [{}] failed", tableIdentifier, e);
+ relationship.set(FAILURE);
+ }
+ }
+
+ try {
+ rowWriter.close();
+ } catch (final IOException e) {
+ getLogger().warn("Failed to close Row Writer for Table [{}]", tableIdentifier, e);
+ }
+
+ if (SUCCESS.equals(relationship.get())) {
+ final long elapsed = clock.millis() - started;
+ final String transitUri = table.location();
+ for (final FlowFile flowFile : flowFiles) {
+ session.getProvenanceReporter().send(flowFile, transitUri, elapsed);
+ }
+ }
+
+ session.transfer(flowFiles, relationship.get());
+ }
+
+ private Table getTable(final TableIdentifier tableIdentifier) {
+ final Table table;
+
+ if (catalog.tableExists(tableIdentifier)) {
+ table = catalog.loadTable(tableIdentifier);
+ } else {
+ throw new IllegalStateException("Table [%s] not found in Catalog".formatted(tableIdentifier));
+ }
+
+ return table;
+ }
+
+ private void writeRecords(
+ final RecordReader recordReader,
+ final IcebergRowWriter rowWriter,
+ final Types.StructType struct,
+ final AtomicLong recordsProcessed
+ ) throws IOException, MalformedRecordException {
+ org.apache.nifi.serialization.record.Record inputRecord = recordReader.nextRecord();
+ while (inputRecord != null) {
+ final DelegatedRecord delegatedRecord = new DelegatedRecord(inputRecord, struct);
+ // Write Records to storage based on Iceberg Table configuration
+ rowWriter.write(delegatedRecord);
+ inputRecord = recordReader.nextRecord();
+ recordsProcessed.incrementAndGet();
+ }
+ }
+
+ private void appendDataFiles(final Table table, final DataFile[] dataFiles) {
+ final AppendFiles appendFiles = table.newAppend();
+ for (final DataFile dataFile : dataFiles) {
+ appendFiles.appendFile(dataFile);
+ }
+
+ appendFiles.commit();
+ }
+
+ private void abortWriter(final IcebergRowWriter rowWriter, final TableIdentifier tableIdentifier) {
+ try {
+ rowWriter.abort();
+ } catch (final IOException e) {
+ getLogger().warn("Abort Writing to Table [{}] failed", tableIdentifier, e);
+ }
+ }
+}
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/TableIdentifierFlowFileFilter.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/TableIdentifierFlowFileFilter.java
new file mode 100644
index 000000000000..8416acb96a93
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/TableIdentifierFlowFileFilter.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.FlowFileFilter;
+import org.apache.nifi.processor.ProcessContext;
+
+/**
+ * Table Identifier FlowFile Filter returns matches based on common Table Identifier
+ */
+class TableIdentifierFlowFileFilter implements FlowFileFilter {
+ // Maximum number of FlowFiles accepted
+ private static final int MAXIMUM_FLOW_FILES = 1000;
+
+ private final ProcessContext context;
+
+ private int flowFilesAccepted;
+
+ private long flowFileBytesAccepted;
+
+ private final long maximumBytes;
+
+ private TableIdentifier tableIdentifier;
+
+ TableIdentifierFlowFileFilter(final ProcessContext context, final long maximumBytes) {
+ this.context = context;
+ this.maximumBytes = maximumBytes;
+ }
+
+ @Override
+ public FlowFileFilterResult filter(final FlowFile flowFile) {
+ final TableIdentifier flowFileTableIdentifier = getFlowFileTableIdentifier(flowFile);
+ if (tableIdentifier == null) {
+ tableIdentifier = flowFileTableIdentifier;
+ }
+
+ final FlowFileFilterResult filterResult;
+ if (tableIdentifier.equals(flowFileTableIdentifier)) {
+ final long flowFileSize = flowFile.getSize();
+ if (flowFileSize >= maximumBytes) {
+ // Accept one FlowFile when larger than maximum number of bytes
+ filterResult = FlowFileFilterResult.ACCEPT_AND_TERMINATE;
+ } else {
+ flowFilesAccepted++;
+ flowFileBytesAccepted += flowFileSize;
+
+ if (flowFileBytesAccepted >= maximumBytes) {
+ // Reject FlowFile and terminate filtering when exceeding maximum number of bytes
+ filterResult = FlowFileFilterResult.REJECT_AND_TERMINATE;
+ } else if (flowFilesAccepted == MAXIMUM_FLOW_FILES) {
+ // Accept FlowFile and terminate filtering when reaching maximum number of FlowFiles
+ filterResult = FlowFileFilterResult.ACCEPT_AND_TERMINATE;
+ } else {
+ filterResult = FlowFileFilterResult.ACCEPT_AND_CONTINUE;
+ }
+ }
+ } else {
+ filterResult = FlowFileFilterResult.REJECT_AND_CONTINUE;
+ }
+
+ return filterResult;
+ }
+
+ TableIdentifier getTableIdentifier() {
+ return tableIdentifier;
+ }
+
+ private TableIdentifier getFlowFileTableIdentifier(final FlowFile flowFile) {
+ final String namespace = context.getProperty(PutIcebergRecord.NAMESPACE).evaluateAttributeExpressions(flowFile).getValue();
+ final String tableName = context.getProperty(PutIcebergRecord.TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+ final Namespace icebergNamespace = Namespace.of(namespace);
+ return TableIdentifier.of(icebergNamespace, tableName);
+ }
+}
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/record/DelegatedRecord.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/record/DelegatedRecord.java
new file mode 100644
index 000000000000..c9ab79c42873
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/record/DelegatedRecord.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg.record;
+
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.types.Types;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.RecordField;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Standard implementation of Iceberg Record wrapping NiFi Record
+ */
+public class DelegatedRecord implements Record {
+ private final org.apache.nifi.serialization.record.Record record;
+
+ private final Types.StructType struct;
+
+ public DelegatedRecord(
+ final org.apache.nifi.serialization.record.Record record,
+ final Types.StructType struct
+ ) {
+ this.record = Objects.requireNonNull(record);
+ this.struct = Objects.requireNonNull(struct);
+ }
+
+ @Override
+ public Types.StructType struct() {
+ return struct;
+ }
+
+ /**
+ * Get Field value for field name from supporting Record
+ *
+ * @param fieldName Field Name for value requested
+ * @return Field Value or null when not found
+ */
+ @Override
+ public Object getField(final String fieldName) {
+ return record.getValue(fieldName);
+ }
+
+ /**
+ * Set Field value for field name in supporting Record
+ *
+ * @param fieldName Field Name to be added
+ * @param fieldValue Field Value to be added
+ */
+ @Override
+ public void setField(final String fieldName, final Object fieldValue) {
+ record.setValue(fieldName, fieldValue);
+ }
+
+ /**
+ * Get Field value for specified position from supporting Record
+ *
+ * @param position Field position
+ * @return Field value or null when not found
+ */
+ @Override
+ public Object get(final int position) {
+ final RecordField recordField = record.getSchema().getField(position);
+ return record.getValue(recordField);
+ }
+
+ /**
+ * Create and return a copy of the Record
+ *
+ * @return Copy of the Record
+ */
+ @Override
+ public Record copy() {
+ return copy(Collections.emptyMap());
+ }
+
+ /**
+ * Create and return a copy of the Record
+ *
+ * @param overrides Fields and values to override in the copied Record
+ * @return Copy of the Record
+ */
+ @Override
+ public Record copy(final Map overrides) {
+ final Map values = record.toMap();
+ values.putAll(overrides);
+ final MapRecord mapRecord = new MapRecord(record.getSchema(), values);
+ return new DelegatedRecord(mapRecord, struct);
+ }
+
+ /**
+ * Get count of fields in the supporting Record
+ *
+ * @return Count of fields
+ */
+ @Override
+ public int size() {
+ return record.getSchema().getFieldCount();
+ }
+
+ /**
+ * Get Field value for specified position cast to specified value class
+ *
+ * @param position Field position
+ * @param valueClass Field value class
+ * @return Field value or null when not found
+ * @param Field Value Type
+ */
+ @Override
+ public T get(final int position, final Class valueClass) {
+ final Object value = get(position);
+ if (value == null || valueClass.isInstance(value)) {
+ return valueClass.cast(value);
+ }
+ throw new IllegalStateException(String.format("Field [%d] value not an instance of [%s]", position, valueClass));
+ }
+
+ /**
+ * Set Field value for specified position
+ *
+ * @param position Field position
+ * @param value Field value
+ * @param Field Value Type
+ */
+ @Override
+ public void set(final int position, final T value) {
+ final RecordField recordField = record.getSchema().getField(position);
+ record.setValue(recordField, value);
+ }
+
+ @Override
+ public boolean equals(final Object other) {
+ final boolean equals;
+
+ if (other instanceof DelegatedRecord otherRecord) {
+ equals = record.equals(otherRecord.record);
+ } else {
+ equals = false;
+ }
+
+ return equals;
+ }
+
+ @Override
+ public int hashCode() {
+ return record.hashCode();
+ }
+}
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 000000000000..c04c849c404e
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.processors.iceberg.PutIcebergRecord
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/PutIcebergRecordTest.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/PutIcebergRecordTest.java
new file mode 100644
index 000000000000..9a8c0a066b92
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/PutIcebergRecordTest.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.types.Types;
+import org.apache.nifi.services.iceberg.IcebergCatalog;
+import org.apache.nifi.services.iceberg.IcebergRowWriter;
+import org.apache.nifi.services.iceberg.IcebergWriter;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class PutIcebergRecordTest {
+ private static final String ICEBERG_CATALOG_ID = IcebergCatalog.class.getName();
+
+ private static final String ICEBERG_WRITER_ID = IcebergWriter.class.getName();
+
+ private static final String RECORD_READER_ID = RecordReaderFactory.class.getName();
+
+ private static final String NAMESPACE = PutIcebergRecord.class.getSimpleName();
+
+ private static final String TABLE_NAME = TableIdentifier.class.getSimpleName();
+
+ private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(NAMESPACE, TABLE_NAME);
+
+ private static final byte[] EMPTY = new byte[0];
+
+ private static final String FIELD_NAME = "standardField";
+
+ private static final String FIELD_VALUE = String.class.getName();
+
+ private static final RecordSchema RECORD_SCHEMA = new SimpleRecordSchema(List.of(
+ new RecordField(FIELD_NAME, RecordFieldType.STRING.getDataType())
+ ));
+
+ private static final Schema TABLE_SCHEMA = new Schema(List.of(
+ Types.NestedField.required(1, FIELD_NAME, Types.StringType.get())
+ ));
+
+ private static final Record STANDARD_RECORD = new MapRecord(RECORD_SCHEMA, Map.of(FIELD_NAME, FIELD_VALUE));
+
+ @Mock
+ private IcebergCatalog icebergCatalog;
+
+ @Mock
+ private Catalog catalog;
+
+ @Mock
+ private Table table;
+
+ @Mock
+ private AppendFiles appendFiles;
+
+ @Mock
+ private IcebergWriter icebergWriter;
+
+ @Mock
+ private IcebergRowWriter icebergRowWriter;
+
+ @Mock
+ private DataFile dataFile;
+
+ @Mock
+ private RecordReaderFactory recordReaderFactory;
+
+ @Mock
+ private RecordReader recordReader;
+
+ private TestRunner runner;
+
+ @BeforeEach
+ void setRunner() throws InitializationException {
+ runner = TestRunners.newTestRunner(PutIcebergRecord.class);
+
+ setProperties();
+ }
+
+ @Test
+ void testRunNoFlowFiles() {
+ runner.run();
+ }
+
+ @Test
+ void testRunNoDataFiles() throws Exception {
+ runner.enqueue(EMPTY);
+
+ setLoadTable();
+ setRecordReader();
+ setWriter();
+
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(PutIcebergRecord.SUCCESS);
+ }
+
+ @Test
+ void testRunWriteFailed() throws Exception {
+ runner.enqueue(EMPTY);
+
+ setLoadTable();
+ setRecordReader();
+
+ when(recordReader.nextRecord()).thenReturn(STANDARD_RECORD).thenReturn(null);
+ when(icebergWriter.getRowWriter(any(Table.class))).thenReturn(icebergRowWriter);
+ doThrow(new IOException()).when(icebergRowWriter).write(any());
+
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(PutIcebergRecord.FAILURE);
+ }
+
+ @Test
+ void testRunSuccess() throws Exception {
+ runner.enqueue(EMPTY);
+
+ setLoadTable();
+ setRecordReader();
+ setWriter();
+
+ when(recordReader.nextRecord()).thenReturn(STANDARD_RECORD, STANDARD_RECORD, null);
+
+ final DataFile[] dataFiles = new DataFile[]{dataFile};
+ when(icebergRowWriter.dataFiles()).thenReturn(dataFiles);
+
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(PutIcebergRecord.SUCCESS);
+
+ final Long recordsProcessed = runner.getCounterValue(PutIcebergRecord.RECORDS_PROCESSED_COUNTER);
+ assertEquals(2, recordsProcessed);
+
+ final Long dataFilesProcessed = runner.getCounterValue(PutIcebergRecord.DATA_FILES_PROCESSED_COUNTER);
+ assertEquals(dataFiles.length, dataFilesProcessed);
+ }
+
+ private void setLoadTable() {
+ when(catalog.tableExists(eq(TABLE_IDENTIFIER))).thenReturn(true);
+ when(catalog.loadTable(eq(TABLE_IDENTIFIER))).thenReturn(table);
+ when(table.schema()).thenReturn(TABLE_SCHEMA);
+ }
+
+ private void setRecordReader() throws Exception {
+ when(recordReaderFactory.createRecordReader(any(FlowFile.class), any(), any())).thenReturn(recordReader);
+ }
+
+ private void setWriter() throws Exception {
+ when(icebergWriter.getRowWriter(any(Table.class))).thenReturn(icebergRowWriter);
+ when(icebergRowWriter.dataFiles()).thenReturn(new DataFile[0]);
+ when(table.newAppend()).thenReturn(appendFiles);
+ when(table.location()).thenReturn(TABLE_IDENTIFIER.toString());
+ }
+
+ private void setProperties() throws InitializationException {
+ when(icebergCatalog.getCatalog()).thenReturn(catalog);
+
+ when(icebergCatalog.getIdentifier()).thenReturn(ICEBERG_CATALOG_ID);
+ runner.addControllerService(ICEBERG_CATALOG_ID, icebergCatalog);
+ runner.enableControllerService(icebergCatalog);
+ runner.setProperty(PutIcebergRecord.ICEBERG_CATALOG, ICEBERG_CATALOG_ID);
+
+ when(icebergWriter.getIdentifier()).thenReturn(ICEBERG_WRITER_ID);
+ runner.addControllerService(ICEBERG_WRITER_ID, icebergWriter);
+ runner.enableControllerService(icebergWriter);
+ runner.setProperty(PutIcebergRecord.ICEBERG_WRITER, ICEBERG_WRITER_ID);
+
+ when(recordReaderFactory.getIdentifier()).thenReturn(RECORD_READER_ID);
+ runner.addControllerService(RECORD_READER_ID, recordReaderFactory);
+ runner.enableControllerService(recordReaderFactory);
+ runner.setProperty(PutIcebergRecord.RECORD_READER, RECORD_READER_ID);
+
+ runner.setProperty(PutIcebergRecord.NAMESPACE, NAMESPACE);
+ runner.setProperty(PutIcebergRecord.TABLE_NAME, TABLE_NAME);
+ }
+}
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog-nar/pom.xml b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog-nar/pom.xml
new file mode 100644
index 000000000000..3f5769db74e2
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog-nar/pom.xml
@@ -0,0 +1,43 @@
+
+
+
+ 4.0.0
+
+
+ org.apache.nifi
+ nifi-iceberg-bundle
+ 2.7.0-SNAPSHOT
+
+
+ nifi-iceberg-rest-catalog-nar
+ nar
+
+
+
+ org.apache.nifi
+ nifi-iceberg-rest-catalog
+ ${project.version}
+
+
+ org.apache.nifi
+ nifi-iceberg-service-api-nar
+ ${project.version}
+ nar
+
+
+
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog-nar/src/main/resources/META-INF/LICENSE b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog-nar/src/main/resources/META-INF/LICENSE
new file mode 100644
index 000000000000..44893cdb29d5
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog-nar/src/main/resources/META-INF/LICENSE
@@ -0,0 +1,209 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+APACHE NIFI SUBCOMPONENTS:
+
+The Apache NiFi project contains subcomponents with separate copyright
+notices and license terms. Your use of the source code for the these
+subcomponents is subject to the terms and conditions of the following
+licenses.
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog/pom.xml b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog/pom.xml
new file mode 100644
index 000000000000..c8337512b8eb
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog/pom.xml
@@ -0,0 +1,56 @@
+
+
+
+ 4.0.0
+
+
+ org.apache.nifi
+ nifi-iceberg-bundle
+ 2.7.0-SNAPSHOT
+
+
+ nifi-iceberg-rest-catalog
+ jar
+
+
+
+ org.apache.nifi
+ nifi-iceberg-service-api
+ ${project.version}
+ provided
+
+
+
+ org.apache.iceberg
+ iceberg-api
+ provided
+
+
+ org.apache.iceberg
+ iceberg-core
+ provided
+
+
+
+
+ org.apache.nifi
+ nifi-mock
+ test
+
+
+
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog/src/main/java/org/apache/nifi/services/iceberg/catalog/AuthenticationStrategy.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog/src/main/java/org/apache/nifi/services/iceberg/catalog/AuthenticationStrategy.java
new file mode 100644
index 000000000000..e019213c85b6
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog/src/main/java/org/apache/nifi/services/iceberg/catalog/AuthenticationStrategy.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.catalog;
+
+import org.apache.nifi.components.DescribedValue;
+
+/**
+ * Enumeration of supported Authentication Types for REST Catalog access
+ */
+enum AuthenticationStrategy implements DescribedValue {
+ BEARER("Bearer Authentication", "Authentication using configured Bearer Token"),
+
+ OAUTH2("OAuth 2.0", "Authentication using OAuth 2.0 with selected flows");
+
+ private final String displayName;
+
+ private final String description;
+
+ AuthenticationStrategy(final String displayName, final String description) {
+ this.displayName = displayName;
+ this.description = description;
+ }
+
+ @Override
+ public String getValue() {
+ return name();
+ }
+
+ @Override
+ public String getDisplayName() {
+ return displayName;
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
+}
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog/src/main/java/org/apache/nifi/services/iceberg/catalog/AuthorizationGrantType.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog/src/main/java/org/apache/nifi/services/iceberg/catalog/AuthorizationGrantType.java
new file mode 100644
index 000000000000..ce772fe747da
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog/src/main/java/org/apache/nifi/services/iceberg/catalog/AuthorizationGrantType.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.catalog;
+
+import org.apache.nifi.components.DescribedValue;
+
+/**
+ * Enumeration of supported OAuth 2.0 Authorization Grant Types
+ */
+enum AuthorizationGrantType implements DescribedValue {
+ CLIENT_CREDENTIALS("Client Credentials", "Client Credentials consisting of Client ID and Client Secret");
+
+ private final String displayName;
+
+ private final String description;
+
+ AuthorizationGrantType(final String displayName, final String description) {
+ this.displayName = displayName;
+ this.description = description;
+ }
+
+ @Override
+ public String getValue() {
+ return name();
+ }
+
+ @Override
+ public String getDisplayName() {
+ return displayName;
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
+}
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog/src/main/java/org/apache/nifi/services/iceberg/catalog/CredentialsRefreshRESTClient.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog/src/main/java/org/apache/nifi/services/iceberg/catalog/CredentialsRefreshRESTClient.java
new file mode 100644
index 000000000000..a98b5bf7c92f
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog/src/main/java/org/apache/nifi/services/iceberg/catalog/CredentialsRefreshRESTClient.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.catalog;
+
+import org.apache.iceberg.rest.RESTClient;
+import org.apache.iceberg.rest.RESTRequest;
+import org.apache.iceberg.rest.RESTResponse;
+import org.apache.iceberg.rest.auth.AuthSession;
+import org.apache.iceberg.rest.responses.ErrorResponse;
+import org.apache.nifi.logging.ComponentLog;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+
+/**
+ * Custom implementation of REST Client supporting Token Refresh using Client Credentials instead of Token Exchange from RFC 8693
+ */
+class CredentialsRefreshRESTClient implements RESTClient {
+ protected static final String CLIENT_ID_PARAMETER = "client_id";
+
+ protected static final String CLIENT_SECRET_PARAMETER = "client_secret";
+
+ protected static final String GRANT_TYPE_PARAMETER = "grant_type";
+
+ protected static final String CLIENT_CREDENTIALS_GRANT_TYPE = "client_credentials";
+
+ protected static final String TOKEN_EXCHANGE_GRANT_TYPE = "urn:ietf:params:oauth:grant-type:token-exchange";
+
+ private static final String SUBJECT_TOKEN_PARAMETER = "subject_token";
+
+ private static final String SUBJECT_TOKEN_TYPE_PARAMETER = "subject_token_type";
+
+ private static final Set TOKEN_EXCHANGE_PARAMETERS = Set.of(GRANT_TYPE_PARAMETER, SUBJECT_TOKEN_PARAMETER, SUBJECT_TOKEN_TYPE_PARAMETER);
+
+ private final ComponentLog log;
+
+ private final RESTClient restClient;
+
+ private final String clientId;
+
+ private final String clientSecret;
+
+ public CredentialsRefreshRESTClient(final ComponentLog log, final RESTClient restClient, final String clientId, final String clientSecret) {
+ this.log = Objects.requireNonNull(log, "Component Log required");
+ this.restClient = Objects.requireNonNull(restClient, "REST Client required");
+ this.clientId = Objects.requireNonNull(clientId, "Client ID required");
+ this.clientSecret = Objects.requireNonNull(clientSecret, "Client Secret required");
+ }
+
+ @Override
+ public void head(final String path, final Map headers, final Consumer errorHandler) {
+ restClient.head(path, headers, errorHandler);
+ }
+
+ @Override
+ public T delete(final String path, final Class responseType, final Map headers, final Consumer errorHandler) {
+ return restClient.delete(path, responseType, headers, errorHandler);
+ }
+
+ @Override
+ public T get(final String path, final Map queryParams, final Class responseType, final Map headers,
+ final Consumer errorHandler) {
+ return restClient.get(path, queryParams, responseType, headers, errorHandler);
+ }
+
+ @Override
+ public T post(final String path, final RESTRequest body, final Class responseType, final Map headers, final Consumer errorHandler) {
+ return restClient.post(path, body, responseType, headers, errorHandler);
+ }
+
+ @Override
+ public T postForm(final String path, final Map formData, final Class responseType, final Map headers,
+ final Consumer errorHandler) {
+ final Map postFormData;
+
+ if (isTokenExchangeFormData(formData)) {
+ // Replace Token Exchange parameters with Client Credentials parameters for requests from OAuth2Util.tokenExchangeRequest()
+ postFormData = getClientCredentialsFormData(formData);
+ log.debug("Token Refresh with Client Credentials requested");
+ } else {
+ postFormData = formData;
+ }
+
+ return restClient.postForm(path, postFormData, responseType, headers, errorHandler);
+ }
+
+ @Override
+ public void close() throws IOException {
+ restClient.close();
+ }
+
+ /**
+ * Build REST Client with authentication session using current configuration
+ *
+ * @param session Authentication Session
+ * @return REST Client
+ */
+ @Override
+ public RESTClient withAuthSession(final AuthSession session) {
+ final RESTClient sessionRestClient = restClient.withAuthSession(session);
+ return new CredentialsRefreshRESTClient(log, sessionRestClient, clientId, clientSecret);
+ }
+
+ private boolean isTokenExchangeFormData(final Map formData) {
+ final boolean tokenExchangeFound;
+
+ final String grantType = formData.get(GRANT_TYPE_PARAMETER);
+ if (grantType == null) {
+ tokenExchangeFound = false;
+ } else {
+ tokenExchangeFound = TOKEN_EXCHANGE_GRANT_TYPE.contentEquals(grantType);
+ }
+
+ return tokenExchangeFound;
+ }
+
+ private Map getClientCredentialsFormData(final Map formData) {
+ final Map clientCredentialsFormData = new HashMap<>();
+
+ formData.keySet()
+ .stream()
+ .filter(Predicate.not(TOKEN_EXCHANGE_PARAMETERS::contains))
+ .forEach(formDataKey -> {
+ final String formDataValue = formData.get(formDataKey);
+ clientCredentialsFormData.put(formDataKey, formDataValue);
+ });
+
+ clientCredentialsFormData.put(GRANT_TYPE_PARAMETER, CLIENT_CREDENTIALS_GRANT_TYPE);
+ clientCredentialsFormData.put(CLIENT_ID_PARAMETER, clientId);
+ clientCredentialsFormData.put(CLIENT_SECRET_PARAMETER, clientSecret);
+
+ return clientCredentialsFormData;
+ }
+}
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog/src/main/java/org/apache/nifi/services/iceberg/catalog/RESTIcebergCatalog.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog/src/main/java/org/apache/nifi/services/iceberg/catalog/RESTIcebergCatalog.java
new file mode 100644
index 000000000000..1b3910b838c1
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-rest-catalog/src/main/java/org/apache/nifi/services/iceberg/catalog/RESTIcebergCatalog.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.catalog;
+
+import org.apache.iceberg.rest.RESTSessionCatalog;
+import org.apache.iceberg.rest.auth.AuthProperties;
+import org.apache.nifi.services.iceberg.IcebergCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.SessionCatalog;
+import org.apache.iceberg.metrics.LoggingMetricsReporter;
+import org.apache.iceberg.rest.HTTPClient;
+import org.apache.iceberg.rest.RESTClient;
+import org.apache.iceberg.rest.auth.AuthSession;
+import org.apache.iceberg.rest.auth.OAuth2Properties;
+import org.apache.nifi.annotation.behavior.SupportsSensitiveDynamicProperties;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.VerifiableControllerService;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.services.iceberg.IcebergFileIOProvider;
+import org.apache.nifi.services.iceberg.ProviderContext;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+import static org.apache.nifi.components.ConfigVerificationResult.Outcome.FAILED;
+import static org.apache.nifi.components.ConfigVerificationResult.Outcome.SUCCESSFUL;
+
+@SupportsSensitiveDynamicProperties
+@Tags({"iceberg", "catalog", "polaris"})
+@CapabilityDescription("Provides Apache Iceberg integration with REST Catalogs such as Apache Polaris")
+public class RESTIcebergCatalog extends AbstractControllerService implements IcebergCatalog, VerifiableControllerService {
+ static final PropertyDescriptor CATALOG_URI = new PropertyDescriptor.Builder()
+ .name("Catalog URI")
+ .description("Apache Iceberg Catalog REST URI")
+ .required(true)
+ .addValidator(StandardValidators.URL_VALIDATOR)
+ .build();
+
+ static final PropertyDescriptor FILE_IO_PROVIDER = new PropertyDescriptor.Builder()
+ .name("File IO Provider")
+ .description("Provider for Iceberg File Input and Output operations")
+ .required(true)
+ .identifiesControllerService(IcebergFileIOProvider.class)
+ .build();
+
+ static final PropertyDescriptor AUTHENTICATION_STRATEGY = new PropertyDescriptor.Builder()
+ .name("Authentication Strategy")
+ .description("Strategy for authenticating with the Apache Iceberg Catalog over HTTP")
+ .required(true)
+ .allowableValues(AuthenticationStrategy.class)
+ .defaultValue(AuthenticationStrategy.OAUTH2)
+ .build();
+
+ static final PropertyDescriptor BEARER_TOKEN = new PropertyDescriptor.Builder()
+ .name("Bearer Token")
+ .description("Bearer Token for authentication to Apache Iceberg Catalog")
+ .required(true)
+ .sensitive(true)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .dependsOn(AUTHENTICATION_STRATEGY, AuthenticationStrategy.BEARER)
+ .build();
+
+ static final PropertyDescriptor AUTHORIZATION_SERVER_URI = new PropertyDescriptor.Builder()
+ .name("Authorization Server URI")
+ .description("Authorization Server URI supporting OAuth 2")
+ .required(true)
+ .addValidator(StandardValidators.URL_VALIDATOR)
+ .dependsOn(AUTHENTICATION_STRATEGY, AuthenticationStrategy.OAUTH2)
+ .build();
+
+ static final PropertyDescriptor AUTHORIZATION_GRANT_TYPE = new PropertyDescriptor.Builder()
+ .name("Authorization Grant Type")
+ .description("OAuth 2.0 Authorization Grant Type for obtaining Access Tokens")
+ .required(true)
+ .allowableValues(AuthorizationGrantType.class)
+ .defaultValue(AuthorizationGrantType.CLIENT_CREDENTIALS)
+ .dependsOn(AUTHENTICATION_STRATEGY, AuthenticationStrategy.OAUTH2)
+ .build();
+
+ static final PropertyDescriptor CLIENT_ID = new PropertyDescriptor.Builder()
+ .name("Client ID")
+ .description("Client ID for OAuth 2 Client Credentials")
+ .required(true)
+ .sensitive(true)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .dependsOn(AUTHORIZATION_GRANT_TYPE, AuthorizationGrantType.CLIENT_CREDENTIALS)
+ .build();
+
+ static final PropertyDescriptor CLIENT_SECRET = new PropertyDescriptor.Builder()
+ .name("Client Secret")
+ .description("Client Secret for OAuth 2 Client Credentials")
+ .required(true)
+ .sensitive(true)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .dependsOn(AUTHORIZATION_GRANT_TYPE, AuthorizationGrantType.CLIENT_CREDENTIALS)
+ .build();
+
+ static final PropertyDescriptor ACCESS_TOKEN_SCOPES = new PropertyDescriptor.Builder()
+ .name("Access Token Scopes")
+ .description("Comma-separated list of one or more OAuth 2 scopes requested for Access Tokens")
+ .required(true)
+ .defaultValue(OAuth2Properties.CATALOG_SCOPE)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .dependsOn(AUTHENTICATION_STRATEGY, AuthenticationStrategy.OAUTH2)
+ .build();
+
+ static final PropertyDescriptor WAREHOUSE_LOCATION = new PropertyDescriptor.Builder()
+ .name("Warehouse Location")
+ .description("Apache Iceberg Catalog Warehouse location or identifier")
+ .required(false)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .build();
+
+ private static final List PROPERTY_DESCRIPTORS = List.of(
+ CATALOG_URI,
+ FILE_IO_PROVIDER,
+ AUTHENTICATION_STRATEGY,
+ BEARER_TOKEN,
+ AUTHORIZATION_SERVER_URI,
+ AUTHORIZATION_GRANT_TYPE,
+ CLIENT_ID,
+ CLIENT_SECRET,
+ ACCESS_TOKEN_SCOPES,
+ WAREHOUSE_LOCATION
+ );
+
+ private static final String CONFIGURATION_STEP = "Catalog Configuration";
+
+ private static final String INITIALIZED_STATUS = "Initialized";
+
+ private static final String CLIENT_CREDENTIALS_FORMAT = "%s:%s";
+
+ private static final String SPACE_SEPARATOR = " ";
+
+ private RESTSessionCatalog sessionCatalog;
+
+ @Override
+ public List getSupportedPropertyDescriptors() {
+ return PROPERTY_DESCRIPTORS;
+ }
+
+ @Override
+ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyName) {
+ return new PropertyDescriptor.Builder()
+ .name(propertyName)
+ .dynamic(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+ }
+
+ @OnEnabled
+ public void onEnabled(final ConfigurationContext context) {
+ sessionCatalog = getInitializedCatalog(context);
+ }
+
+ @OnDisabled
+ public void onDisabled() {
+ if (sessionCatalog != null) {
+ try {
+ sessionCatalog.close();
+ } catch (final IOException e) {
+ getLogger().warn("Close Catalog failed", e);
+ }
+ }
+ }
+
+ @Override
+ public List verify(final ConfigurationContext context, final ComponentLog componentLog, final Map attributes) {
+ final List results = new ArrayList<>();
+
+ try {
+ final RESTSessionCatalog initializedCatalog = getInitializedCatalog(context);
+ final String name = initializedCatalog.name();
+ componentLog.info("REST Catalog Initialized [{}]", name);
+
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName(CONFIGURATION_STEP)
+ .outcome(SUCCESSFUL)
+ .explanation(INITIALIZED_STATUS)
+ .build()
+ );
+ } catch (final Exception e) {
+ componentLog.warn("Catalog Configuration failed", e);
+ final String explanation = getExplanation(e);
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName(CONFIGURATION_STEP)
+ .outcome(FAILED)
+ .explanation(explanation)
+ .build()
+ );
+ }
+
+ return results;
+ }
+
+ @Override
+ public Catalog getCatalog() {
+ final SessionCatalog.SessionContext sessionContext = SessionCatalog.SessionContext.createEmpty();
+ return sessionCatalog.asCatalog(sessionContext);
+ }
+
+ private RESTSessionCatalog getInitializedCatalog(final ConfigurationContext context) {
+ final Map properties = new HashMap<>();
+
+ // Set default implementations for Metrics
+ properties.put(CatalogProperties.METRICS_REPORTER_IMPL, LoggingMetricsReporter.class.getName());
+
+ final Map dynamicProperties = getDynamicProperties(context);
+ properties.putAll(dynamicProperties);
+
+ final String catalogUri = context.getProperty(CATALOG_URI).getValue();
+ properties.put(CatalogProperties.URI, catalogUri);
+
+ final PropertyValue warehouseLocationProperty = context.getProperty(WAREHOUSE_LOCATION);
+ if (warehouseLocationProperty.isSet()) {
+ final String warehouseLocation = warehouseLocationProperty.getValue();
+ properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation);
+ }
+
+ final AuthenticationStrategy authenticationStrategy = context.getProperty(AUTHENTICATION_STRATEGY).asAllowableValue(AuthenticationStrategy.class);
+ final Function