Skip to content

Commit 7992d06

Browse files
authored
NIFI-15019 Standardized property names in GCP, Geohash, Graph, and Groovy bundles (#10397)
Signed-off-by: David Handermann <[email protected]>
1 parent de33614 commit 7992d06

File tree

32 files changed

+362
-281
lines changed

32 files changed

+362
-281
lines changed

nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,18 +52,16 @@ public abstract class AbstractGCPProcessor<
5252
private static final String OBSOLETE_PROXY_USERNAME = "gcp-proxy-user-name";
5353
private static final String OBSOLETE_PROXY_PASSWORD = "gcp-proxy-user-password";
5454

55-
public static final PropertyDescriptor PROJECT_ID = new PropertyDescriptor
56-
.Builder().name("gcp-project-id")
57-
.displayName("Project ID")
55+
public static final PropertyDescriptor PROJECT_ID = new PropertyDescriptor.Builder()
56+
.name("Project ID")
5857
.description("Google Cloud Project ID")
5958
.required(false)
6059
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
6160
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
6261
.build();
6362

64-
public static final PropertyDescriptor RETRY_COUNT = new PropertyDescriptor
65-
.Builder().name("gcp-retry-count")
66-
.displayName("Number of retries")
63+
public static final PropertyDescriptor RETRY_COUNT = new PropertyDescriptor.Builder()
64+
.name("Number of Retries")
6765
.description("How many retry attempts should be made before routing to the failure relationship.")
6866
.defaultValue("6")
6967
.required(true)
@@ -89,6 +87,8 @@ protected CloudService getCloudService() {
8987
@Override
9088
public void migrateProperties(final PropertyConfiguration config) {
9189
ProxyServiceMigration.migrateProxyProperties(config, PROXY_CONFIGURATION_SERVICE, OBSOLETE_PROXY_HOST, OBSOLETE_PROXY_PORT, OBSOLETE_PROXY_USERNAME, OBSOLETE_PROXY_PASSWORD);
90+
config.renameProperty("gcp-project-id", PROJECT_ID.getName());
91+
config.renameProperty("gcp-retry-count", RETRY_COUNT.getName());
9292
}
9393

9494
/**

nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java

Lines changed: 16 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.nifi.components.ValidationResult;
3030
import org.apache.nifi.expression.ExpressionLanguageScope;
3131
import org.apache.nifi.logging.ComponentLog;
32+
import org.apache.nifi.migration.PropertyConfiguration;
3233
import org.apache.nifi.processor.ProcessContext;
3334
import org.apache.nifi.processor.Relationship;
3435
import org.apache.nifi.processor.VerifiableProcessor;
@@ -52,9 +53,9 @@
5253
*/
5354
public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<BigQuery, BigQueryOptions> implements VerifiableProcessor {
5455

55-
static final int BUFFER_SIZE = 65536;
56-
5756
private static final List<String> REQUIRED_PERMISSIONS = Collections.singletonList("bigquery.tables.updateData");
57+
private static final String DATASET_ATTR = "Dataset";
58+
private static final String TABLE_NAME_ATTR = "Table Name";
5859

5960
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
6061
.description("FlowFiles are routed to this relationship after a successful Google BigQuery operation.")
@@ -69,40 +70,35 @@ public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<Big
6970
);
7071

7172
public static final PropertyDescriptor DATASET = new PropertyDescriptor.Builder()
72-
.name(BigQueryAttributes.DATASET_ATTR)
73-
.displayName("Dataset")
74-
.description(BigQueryAttributes.DATASET_DESC)
73+
.name(DATASET_ATTR)
74+
.description("BigQuery dataset name (Note - The dataset must exist in GCP)")
7575
.required(true)
76-
.defaultValue("${" + BigQueryAttributes.DATASET_ATTR + "}")
76+
.defaultValue("${" + DATASET_ATTR + "}")
7777
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
7878
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
7979
.build();
8080

8181
public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
82-
.name(BigQueryAttributes.TABLE_NAME_ATTR)
83-
.displayName("Table Name")
84-
.description(BigQueryAttributes.TABLE_NAME_DESC)
82+
.name(TABLE_NAME_ATTR)
83+
.description("BigQuery table name")
8584
.required(true)
86-
.defaultValue("${" + BigQueryAttributes.TABLE_NAME_ATTR + "}")
85+
.defaultValue("${" + TABLE_NAME_ATTR + "}")
8786
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
8887
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
8988
.build();
9089

91-
public static final PropertyDescriptor IGNORE_UNKNOWN = new PropertyDescriptor.Builder()
92-
.name(BigQueryAttributes.IGNORE_UNKNOWN_ATTR)
93-
.displayName("Ignore Unknown Values")
94-
.description(BigQueryAttributes.IGNORE_UNKNOWN_DESC)
95-
.required(true)
96-
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
97-
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
98-
.defaultValue("false")
99-
.build();
100-
10190
@Override
10291
public Set<Relationship> getRelationships() {
10392
return RELATIONSHIPS;
10493
}
10594

95+
@Override
96+
public void migrateProperties(PropertyConfiguration config) {
97+
super.migrateProperties(config);
98+
config.renameProperty("bq.dataset", DATASET.getName());
99+
config.renameProperty("bq.table.name", TABLE_NAME.getName());
100+
}
101+
106102
@Override
107103
protected GoogleCredentials getGoogleCredentials(ProcessContext context) {
108104
return super.getGoogleCredentials(context).createScoped(GOOGLE_CLOUD_BIGQUERY_SCOPE);

nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java

Lines changed: 0 additions & 50 deletions
This file was deleted.

nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import org.apache.nifi.components.PropertyDescriptor;
6060
import org.apache.nifi.expression.ExpressionLanguageScope;
6161
import org.apache.nifi.flowfile.FlowFile;
62+
import org.apache.nifi.migration.PropertyConfiguration;
6263
import org.apache.nifi.processor.ProcessContext;
6364
import org.apache.nifi.processor.ProcessSession;
6465
import org.apache.nifi.processor.exception.ProcessException;
@@ -96,19 +97,19 @@
9697
"are skipped. Exactly once delivery semantics are achieved via stream offsets.")
9798
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
9899
@WritesAttributes({
99-
@WritesAttribute(attribute = BigQueryAttributes.JOB_NB_RECORDS_ATTR, description = BigQueryAttributes.JOB_NB_RECORDS_DESC)
100+
@WritesAttribute(attribute = PutBigQuery.JOB_NB_RECORDS_ATTR, description = "Number of records successfully inserted")
100101
})
101102
public class PutBigQuery extends AbstractBigQueryProcessor {
102103

103104
static final String STREAM = "STREAM";
104105
static final String BATCH = "BATCH";
105106
static final AllowableValue STREAM_TYPE = new AllowableValue(STREAM, STREAM, "Use streaming record handling strategy");
106107
static final AllowableValue BATCH_TYPE = new AllowableValue(BATCH, BATCH, "Use batching record handling strategy");
108+
static final String JOB_NB_RECORDS_ATTR = "bq.records.count";
107109

108-
private static final String APPEND_RECORD_COUNT_NAME = "bq.append.record.count";
109110
private static final String APPEND_RECORD_COUNT_DESC = "The number of records to be appended to the write stream at once. Applicable for both batch and stream types";
110-
private static final String TRANSFER_TYPE_NAME = "bq.transfer.type";
111111
private static final String TRANSFER_TYPE_DESC = "Defines the preferred transfer type streaming or batching";
112+
private static final String SKIP_INVALID_ROWS_ATTR = "Skip Invalid Rows";
112113

113114
private static final List<Status.Code> RETRYABLE_ERROR_CODES = Arrays.asList(Status.Code.INTERNAL, Status.Code.ABORTED, Status.Code.CANCELLED);
114115

@@ -128,8 +129,7 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
128129
.build();
129130

130131
public static final PropertyDescriptor BIGQUERY_API_ENDPOINT = new PropertyDescriptor.Builder()
131-
.name("bigquery-api-endpoint")
132-
.displayName("BigQuery API Endpoint")
132+
.name("BigQuery API Endpoint")
133133
.description("Can be used to override the default BigQuery endpoint. Default is "
134134
+ BigQueryWriteStubSettings.getDefaultEndpoint() + ". "
135135
+ "Format must be hostname:port.")
@@ -140,35 +140,32 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
140140
.build();
141141

142142
static final PropertyDescriptor TRANSFER_TYPE = new PropertyDescriptor.Builder()
143-
.name(TRANSFER_TYPE_NAME)
144-
.displayName("Transfer Type")
143+
.name("Transfer Type")
145144
.description(TRANSFER_TYPE_DESC)
146145
.required(true)
147146
.defaultValue(STREAM_TYPE.getValue())
148147
.allowableValues(STREAM_TYPE, BATCH_TYPE)
149148
.build();
150149

151150
static final PropertyDescriptor APPEND_RECORD_COUNT = new PropertyDescriptor.Builder()
152-
.name(APPEND_RECORD_COUNT_NAME)
153-
.displayName("Append Record Count")
151+
.name("Append Record Count")
154152
.description(APPEND_RECORD_COUNT_DESC)
155153
.required(true)
156154
.defaultValue("20")
157155
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
158156
.build();
159157

160158
public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
161-
.name(BigQueryAttributes.RECORD_READER_ATTR)
162-
.displayName("Record Reader")
163-
.description(BigQueryAttributes.RECORD_READER_DESC)
159+
.name("Record Reader")
160+
.description("Specifies the Controller Service to use for parsing incoming data.")
164161
.identifiesControllerService(RecordReaderFactory.class)
165162
.required(true)
166163
.build();
167164

168165
public static final PropertyDescriptor SKIP_INVALID_ROWS = new PropertyDescriptor.Builder()
169-
.name(BigQueryAttributes.SKIP_INVALID_ROWS_ATTR)
170-
.displayName("Skip Invalid Rows")
171-
.description(BigQueryAttributes.SKIP_INVALID_ROWS_DESC)
166+
.name(SKIP_INVALID_ROWS_ATTR)
167+
.description("Sets whether to insert all valid rows of a request, even if invalid "
168+
+ "rows exist. If not set the entire insert request will fail if it contains an invalid row.")
172169
.required(true)
173170
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
174171
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
@@ -246,14 +243,24 @@ public void onTrigger(ProcessContext context, ProcessSession session) {
246243
RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
247244
recordNumWritten = writeRecordsToStream(reader, protoDescriptor, skipInvalidRows, tableSchema);
248245
}
249-
flowFile = session.putAttribute(flowFile, BigQueryAttributes.JOB_NB_RECORDS_ATTR, Integer.toString(recordNumWritten));
246+
flowFile = session.putAttribute(flowFile, JOB_NB_RECORDS_ATTR, Integer.toString(recordNumWritten));
250247
} catch (Exception e) {
251248
error.set(e);
252249
} finally {
253250
finishProcessing(session, flowFile, streamWriter, writeStream.getName(), tableName.toString());
254251
}
255252
}
256253

254+
@Override
255+
public void migrateProperties(PropertyConfiguration config) {
256+
super.migrateProperties(config);
257+
config.renameProperty("bigquery-api-endpoint", BIGQUERY_API_ENDPOINT.getName());
258+
config.renameProperty("bq.transfer.type", TRANSFER_TYPE.getName());
259+
config.renameProperty("bq.append.record.count", APPEND_RECORD_COUNT.getName());
260+
config.renameProperty("bq.record.reader", RECORD_READER.getName());
261+
config.renameProperty("bq.skip.invalid.rows", SKIP_INVALID_ROWS.getName());
262+
}
263+
257264
private int writeRecordsToStream(RecordReader reader, Descriptors.Descriptor descriptor, boolean skipInvalidRows, TableSchema tableSchema) throws Exception {
258265
Record currentRecord;
259266
int offset = 0;
@@ -321,7 +328,7 @@ private void finishProcessing(ProcessSession session, FlowFile flowFile, StreamW
321328
// Verify that no error occurred in the stream.
322329
if (error.get() != null) {
323330
getLogger().error("Stream processing failed", error.get());
324-
flowFile = session.putAttribute(flowFile, BigQueryAttributes.JOB_NB_RECORDS_ATTR, isBatch() ? "0" : String.valueOf(appendSuccessCount.get() * recordBatchCount));
331+
flowFile = session.putAttribute(flowFile, JOB_NB_RECORDS_ATTR, isBatch() ? "0" : String.valueOf(appendSuccessCount.get() * recordBatchCount));
325332
session.penalize(flowFile);
326333
session.transfer(flowFile, REL_FAILURE);
327334
error.set(null); // set error to null for next execution

nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/credentials/factory/CredentialPropertyDescriptors.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,7 @@ private CredentialPropertyDescriptors() { }
3838
* </a>
3939
*/
4040
public static final PropertyDescriptor USE_APPLICATION_DEFAULT_CREDENTIALS = new PropertyDescriptor.Builder()
41-
.name("application-default-credentials")
42-
.displayName("Use Application Default Credentials")
41+
.name("Use Application Default Credentials")
4342
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
4443
.required(false)
4544
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
@@ -53,8 +52,7 @@ private CredentialPropertyDescriptors() { }
5352
.build();
5453

5554
public static final PropertyDescriptor USE_COMPUTE_ENGINE_CREDENTIALS = new PropertyDescriptor.Builder()
56-
.name("compute-engine-credentials")
57-
.displayName("Use Compute Engine Credentials")
55+
.name("Use Compute Engine Credentials")
5856
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
5957
.required(false)
6058
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
@@ -73,17 +71,15 @@ private CredentialPropertyDescriptors() { }
7371
* </a>
7472
*/
7573
public static final PropertyDescriptor SERVICE_ACCOUNT_JSON_FILE = new PropertyDescriptor.Builder()
76-
.name("service-account-json-file")
77-
.displayName("Service Account JSON File")
74+
.name("Service Account JSON File")
7875
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
7976
.required(false)
8077
.identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE)
8178
.description("Path to a file containing a Service Account key file in JSON format.")
8279
.build();
8380

8481
public static final PropertyDescriptor SERVICE_ACCOUNT_JSON = new PropertyDescriptor.Builder()
85-
.name("service-account-json")
86-
.displayName("Service Account JSON")
82+
.name("Service Account JSON")
8783
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
8884
.required(false)
8985
.addValidator(JsonValidator.INSTANCE)

nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/credentials/service/GCPCredentialsControllerService.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.nifi.controller.VerifiableControllerService;
3535
import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
3636
import org.apache.nifi.logging.ComponentLog;
37+
import org.apache.nifi.migration.PropertyConfiguration;
3738
import org.apache.nifi.processor.exception.ProcessException;
3839
import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
3940
import org.apache.nifi.processors.gcp.credentials.factory.CredentialsFactory;
@@ -133,6 +134,14 @@ public void onConfigured(final ConfigurationContext context) throws Initializati
133134
}
134135
}
135136

137+
@Override
138+
public void migrateProperties(PropertyConfiguration config) {
139+
config.renameProperty("application-default-credentials", USE_APPLICATION_DEFAULT_CREDENTIALS.getName());
140+
config.renameProperty("compute-engine-credentials", USE_COMPUTE_ENGINE_CREDENTIALS.getName());
141+
config.renameProperty("service-account-json-file", SERVICE_ACCOUNT_JSON_FILE.getName());
142+
config.renameProperty("service-account-json", SERVICE_ACCOUNT_JSON.getName());
143+
}
144+
136145
private GoogleCredentials getGoogleCredentials(final ConfigurationContext context) throws IOException {
137146
final ProxyConfiguration proxyConfiguration = ProxyConfiguration.getConfiguration(context);
138147
final HttpTransportFactory transportFactory = new ProxyAwareTransportFactory(proxyConfiguration);

nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDrive.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.apache.nifi.expression.ExpressionLanguageScope;
5050
import org.apache.nifi.flowfile.FlowFile;
5151
import org.apache.nifi.flowfile.attributes.CoreAttributes;
52+
import org.apache.nifi.migration.PropertyConfiguration;
5253
import org.apache.nifi.processor.AbstractProcessor;
5354
import org.apache.nifi.processor.ProcessContext;
5455
import org.apache.nifi.processor.ProcessSession;
@@ -207,8 +208,7 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr
207208

208209

209210
public static final PropertyDescriptor FILE_ID = new PropertyDescriptor.Builder()
210-
.name("drive-file-id")
211-
.displayName("File ID")
211+
.name("File ID")
212212
.description("The Drive ID of the File to fetch. Please see Additional Details for information on how to obtain the Drive ID.")
213213
.required(true)
214214
.defaultValue("${drive.id}")
@@ -363,6 +363,14 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
363363
}
364364
}
365365

366+
@Override
367+
public void migrateProperties(PropertyConfiguration config) {
368+
config.renameProperty("drive-file-id", FILE_ID.getName());
369+
config.renameProperty(OLD_CONNECT_TIMEOUT_PROPERTY_NAME, CONNECT_TIMEOUT.getName());
370+
config.renameProperty(OLD_READ_TIMEOUT_PROPERTY_NAME, READ_TIMEOUT.getName());
371+
config.renameProperty(GoogleUtils.OLD_GCP_CREDENTIALS_PROVIDER_SERVICE_PROPERTY_NAME, GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE.getName());
372+
}
373+
366374
private String getExportType(final String mimeType, final ProcessContext context) {
367375
if (mimeType == null) {
368376
return null;

0 commit comments

Comments
 (0)