Skip to content

Commit

Permalink
Enable spotless for enrich gradle project in 7 dot 5 branch. (#48977)
Browse files Browse the repository at this point in the history
Backport of #48908

The enrich project doesn't have much history as all the other gradle projects,
so it makes sense to enable spotless for this gradle project.

Also:
* restructure xcontent mapping code to be more readable with new code style.
* Applied codestyle after code style update.
  • Loading branch information
martijnvg authored Nov 13, 2019
1 parent 3067147 commit f6a60f7
Show file tree
Hide file tree
Showing 44 changed files with 2,077 additions and 1,220 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ subprojects {
// is greater than the number of unformatted projects, this can be
// switched to an exclude list, and eventualy removed completely.
def projectPathsToFormat = [
// ':build-tools'
':x-pack:plugin:enrich'
]

if (projectPathsToFormat.contains(project.path)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,31 @@ public abstract class AbstractEnrichProcessor extends AbstractProcessor {
protected final String matchField;
protected final int maxMatches;

protected AbstractEnrichProcessor(String tag, Client client, String policyName, String field, String targetField,
boolean ignoreMissing, boolean overrideEnabled, String matchField, int maxMatches) {
protected AbstractEnrichProcessor(
String tag,
Client client,
String policyName,
String field,
String targetField,
boolean ignoreMissing,
boolean overrideEnabled,
String matchField,
int maxMatches
) {
this(tag, createSearchRunner(client), policyName, field, targetField, ignoreMissing, overrideEnabled, matchField, maxMatches);
}

protected AbstractEnrichProcessor(String tag,
BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner,
String policyName, String field, String targetField, boolean ignoreMissing, boolean overrideEnabled,
String matchField, int maxMatches) {
protected AbstractEnrichProcessor(
String tag,
BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner,
String policyName,
String field,
String targetField,
boolean ignoreMissing,
boolean overrideEnabled,
String matchField,
int maxMatches
) {
super(tag);
this.policyName = policyName;
this.searchRunner = searchRunner;
Expand Down Expand Up @@ -155,13 +171,11 @@ int getMaxMatches() {

private static BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> createSearchRunner(Client client) {
return (req, handler) -> {
client.execute(EnrichCoordinatorProxyAction.INSTANCE, req, ActionListener.wrap(
resp -> {
handler.accept(resp, null);
},
e -> {
handler.accept(null, e);
}));
client.execute(
EnrichCoordinatorProxyAction.INSTANCE,
req,
ActionListener.wrap(resp -> { handler.accept(resp, null); }, e -> { handler.accept(null, e); })
);
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
EnrichMetadata that = (EnrichMetadata) o;
return policies.equals(that.policies);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,33 +70,57 @@

public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin {

static final Setting<Integer> ENRICH_FETCH_SIZE_SETTING =
Setting.intSetting("enrich.fetch_size", 10000, 1, 1000000, Setting.Property.NodeScope);
static final Setting<Integer> ENRICH_FETCH_SIZE_SETTING = Setting.intSetting(
"enrich.fetch_size",
10000,
1,
1000000,
Setting.Property.NodeScope
);

static final Setting<Integer> ENRICH_MAX_CONCURRENT_POLICY_EXECUTIONS =
Setting.intSetting("enrich.max_concurrent_policy_executions", 50, 1, Setting.Property.NodeScope);
static final Setting<Integer> ENRICH_MAX_CONCURRENT_POLICY_EXECUTIONS = Setting.intSetting(
"enrich.max_concurrent_policy_executions",
50,
1,
Setting.Property.NodeScope
);

static final Setting<TimeValue> ENRICH_CLEANUP_PERIOD =
Setting.timeSetting("enrich.cleanup_period", new TimeValue(15, TimeUnit.MINUTES), Setting.Property.NodeScope);
static final Setting<TimeValue> ENRICH_CLEANUP_PERIOD = Setting.timeSetting(
"enrich.cleanup_period",
new TimeValue(15, TimeUnit.MINUTES),
Setting.Property.NodeScope
);

public static final Setting<Integer> COORDINATOR_PROXY_MAX_CONCURRENT_REQUESTS =
Setting.intSetting("enrich.coordinator_proxy.max_concurrent_requests", 8, 1, 10000, Setting.Property.NodeScope);
public static final Setting<Integer> COORDINATOR_PROXY_MAX_CONCURRENT_REQUESTS = Setting.intSetting(
"enrich.coordinator_proxy.max_concurrent_requests",
8,
1,
10000,
Setting.Property.NodeScope
);

public static final Setting<Integer> COORDINATOR_PROXY_MAX_LOOKUPS_PER_REQUEST =
Setting.intSetting("enrich.coordinator_proxy.max_lookups_per_request", 128, 1, 10000, Setting.Property.NodeScope);
public static final Setting<Integer> COORDINATOR_PROXY_MAX_LOOKUPS_PER_REQUEST = Setting.intSetting(
"enrich.coordinator_proxy.max_lookups_per_request",
128,
1,
10000,
Setting.Property.NodeScope
);

static final Setting<Integer> ENRICH_MAX_FORCE_MERGE_ATTEMPTS =
Setting.intSetting("enrich.max_force_merge_attempts", 3, 1, 10, Setting.Property.NodeScope);
static final Setting<Integer> ENRICH_MAX_FORCE_MERGE_ATTEMPTS = Setting.intSetting(
"enrich.max_force_merge_attempts",
3,
1,
10,
Setting.Property.NodeScope
);

private static final String QUEUE_CAPACITY_SETTING_NAME = "enrich.coordinator_proxy.queue_capacity";
public static final Setting<Integer> COORDINATOR_PROXY_QUEUE_CAPACITY = new Setting<>(QUEUE_CAPACITY_SETTING_NAME,
settings -> {
int maxConcurrentRequests = COORDINATOR_PROXY_MAX_CONCURRENT_REQUESTS.get(settings);
int maxLookupsPerRequest = COORDINATOR_PROXY_MAX_LOOKUPS_PER_REQUEST.get(settings);
return String.valueOf(maxConcurrentRequests * maxLookupsPerRequest);
},
val -> Setting.parseInt(val, 1, Integer.MAX_VALUE, QUEUE_CAPACITY_SETTING_NAME),
Setting.Property.NodeScope);
public static final Setting<Integer> COORDINATOR_PROXY_QUEUE_CAPACITY = new Setting<>(QUEUE_CAPACITY_SETTING_NAME, settings -> {
int maxConcurrentRequests = COORDINATOR_PROXY_MAX_CONCURRENT_REQUESTS.get(settings);
int maxLookupsPerRequest = COORDINATOR_PROXY_MAX_LOOKUPS_PER_REQUEST.get(settings);
return String.valueOf(maxConcurrentRequests * maxLookupsPerRequest);
}, val -> Setting.parseInt(val, 1, Integer.MAX_VALUE, QUEUE_CAPACITY_SETTING_NAME), Setting.Property.NodeScope);

private final Settings settings;
private final Boolean enabled;
Expand All @@ -119,7 +143,9 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
return Collections.singletonMap(EnrichProcessorFactory.TYPE, factory);
}

protected XPackLicenseState getLicenseState() { return XPackPlugin.getSharedLicenseState(); }
protected XPackLicenseState getLicenseState() {
return XPackPlugin.getSharedLicenseState();
}

public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
if (enabled == false) {
Expand All @@ -138,10 +164,15 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
);
}

public List<RestHandler> getRestHandlers(Settings settings, RestController restController, ClusterSettings clusterSettings,
IndexScopedSettings indexScopedSettings, SettingsFilter settingsFilter,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster) {
public List<RestHandler> getRestHandlers(
Settings settings,
RestController restController,
ClusterSettings clusterSettings,
IndexScopedSettings indexScopedSettings,
SettingsFilter settingsFilter,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster
) {
if (enabled == false) {
return emptyList();
}
Expand All @@ -156,17 +187,29 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
}

@Override
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
NamedXContentRegistry xContentRegistry, Environment environment,
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
public Collection<Object> createComponents(
Client client,
ClusterService clusterService,
ThreadPool threadPool,
ResourceWatcherService resourceWatcherService,
ScriptService scriptService,
NamedXContentRegistry xContentRegistry,
Environment environment,
NodeEnvironment nodeEnvironment,
NamedWriteableRegistry namedWriteableRegistry
) {
if (enabled == false || transportClientMode) {
return emptyList();
}

EnrichPolicyLocks enrichPolicyLocks = new EnrichPolicyLocks();
EnrichPolicyMaintenanceService enrichPolicyMaintenanceService = new EnrichPolicyMaintenanceService(settings, client,
clusterService, threadPool, enrichPolicyLocks);
EnrichPolicyMaintenanceService enrichPolicyMaintenanceService = new EnrichPolicyMaintenanceService(
settings,
client,
clusterService,
threadPool,
enrichPolicyLocks
);
enrichPolicyMaintenanceService.initialize();
return Arrays.asList(
enrichPolicyLocks,
Expand All @@ -188,8 +231,11 @@ public Collection<Module> createGuiceModules() {
public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
return Arrays.asList(
new NamedWriteableRegistry.Entry(MetaData.Custom.class, EnrichMetadata.TYPE, EnrichMetadata::new),
new NamedWriteableRegistry.Entry(NamedDiff.class, EnrichMetadata.TYPE,
in -> EnrichMetadata.readDiffFrom(MetaData.Custom.class, EnrichMetadata.TYPE, in))
new NamedWriteableRegistry.Entry(
NamedDiff.class,
EnrichMetadata.TYPE,
in -> EnrichMetadata.readDiffFrom(MetaData.Custom.class, EnrichMetadata.TYPE, in)
)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,16 @@ public class EnrichPolicyExecutor {
private final int maxForceMergeAttempts;
private final Semaphore policyExecutionPermits;

public EnrichPolicyExecutor(Settings settings,
ClusterService clusterService,
Client client,
TaskManager taskManager,
ThreadPool threadPool,
IndexNameExpressionResolver indexNameExpressionResolver,
EnrichPolicyLocks policyLocks,
LongSupplier nowSupplier) {
public EnrichPolicyExecutor(
Settings settings,
ClusterService clusterService,
Client client,
TaskManager taskManager,
ThreadPool threadPool,
IndexNameExpressionResolver indexNameExpressionResolver,
EnrichPolicyLocks policyLocks,
LongSupplier nowSupplier
) {
this.clusterService = clusterService;
this.client = client;
this.taskManager = taskManager;
Expand All @@ -69,8 +71,14 @@ private void tryLockingPolicy(String policyName) {
if (policyExecutionPermits.tryAcquire() == false) {
// Release policy lock, and throw a different exception
policyLocks.releasePolicy(policyName);
throw new EsRejectedExecutionException("Policy execution failed. Policy execution for [" + policyName + "] would exceed " +
"maximum concurrent policy executions [" + maximumConcurrentPolicyExecutions + "]");
throw new EsRejectedExecutionException(
"Policy execution failed. Policy execution for ["
+ policyName
+ "] would exceed "
+ "maximum concurrent policy executions ["
+ maximumConcurrentPolicyExecutions
+ "]"
);
}
}

Expand All @@ -88,8 +96,12 @@ private class PolicyCompletionListener implements ActionListener<ExecuteEnrichPo
private final BiConsumer<Task, ExecuteEnrichPolicyStatus> onResponse;
private final BiConsumer<Task, Exception> onFailure;

PolicyCompletionListener(String policyName, ExecuteEnrichPolicyTask task,
BiConsumer<Task, ExecuteEnrichPolicyStatus> onResponse, BiConsumer<Task, Exception> onFailure) {
PolicyCompletionListener(
String policyName,
ExecuteEnrichPolicyTask task,
BiConsumer<Task, ExecuteEnrichPolicyStatus> onResponse,
BiConsumer<Task, Exception> onFailure
) {
this.policyName = policyName;
this.task = task;
this.onResponse = onResponse;
Expand Down Expand Up @@ -120,10 +132,24 @@ public void onFailure(Exception e) {
}
}

protected Runnable createPolicyRunner(String policyName, EnrichPolicy policy, ExecuteEnrichPolicyTask task,
ActionListener<ExecuteEnrichPolicyStatus> listener) {
return new EnrichPolicyRunner(policyName, policy, task, listener, clusterService, client, indexNameExpressionResolver, nowSupplier,
fetchSize, maxForceMergeAttempts);
protected Runnable createPolicyRunner(
String policyName,
EnrichPolicy policy,
ExecuteEnrichPolicyTask task,
ActionListener<ExecuteEnrichPolicyStatus> listener
) {
return new EnrichPolicyRunner(
policyName,
policy,
task,
listener,
clusterService,
client,
indexNameExpressionResolver,
nowSupplier,
fetchSize,
maxForceMergeAttempts
);
}

private EnrichPolicy getPolicy(ExecuteEnrichPolicyAction.Request request) {
Expand All @@ -143,18 +169,28 @@ public Task runPolicy(ExecuteEnrichPolicyAction.Request request, TaskListener<Ex
return runPolicy(request, getPolicy(request), listener);
}

public Task runPolicy(ExecuteEnrichPolicyAction.Request request, EnrichPolicy policy,
ActionListener<ExecuteEnrichPolicyStatus> listener) {
public Task runPolicy(
ExecuteEnrichPolicyAction.Request request,
EnrichPolicy policy,
ActionListener<ExecuteEnrichPolicyStatus> listener
) {
return runPolicy(request, policy, (t, r) -> listener.onResponse(r), (t, e) -> listener.onFailure(e));
}

public Task runPolicy(ExecuteEnrichPolicyAction.Request request, EnrichPolicy policy,
TaskListener<ExecuteEnrichPolicyStatus> listener) {
public Task runPolicy(
ExecuteEnrichPolicyAction.Request request,
EnrichPolicy policy,
TaskListener<ExecuteEnrichPolicyStatus> listener
) {
return runPolicy(request, policy, listener::onResponse, listener::onFailure);
}

private Task runPolicy(ExecuteEnrichPolicyAction.Request request, EnrichPolicy policy,
BiConsumer<Task, ExecuteEnrichPolicyStatus> onResponse, BiConsumer<Task, Exception> onFailure) {
private Task runPolicy(
ExecuteEnrichPolicyAction.Request request,
EnrichPolicy policy,
BiConsumer<Task, ExecuteEnrichPolicyStatus> onResponse,
BiConsumer<Task, Exception> onFailure
) {
tryLockingPolicy(request.getName());
try {
return runPolicyTask(request, policy, onResponse, onFailure);
Expand All @@ -165,8 +201,12 @@ private Task runPolicy(ExecuteEnrichPolicyAction.Request request, EnrichPolicy p
}
}

private Task runPolicyTask(final ExecuteEnrichPolicyAction.Request request, EnrichPolicy policy,
BiConsumer<Task, ExecuteEnrichPolicyStatus> onResponse, BiConsumer<Task, Exception> onFailure) {
private Task runPolicyTask(
final ExecuteEnrichPolicyAction.Request request,
EnrichPolicy policy,
BiConsumer<Task, ExecuteEnrichPolicyStatus> onResponse,
BiConsumer<Task, Exception> onFailure
) {
Task asyncTask = taskManager.register("enrich", TASK_ACTION, new TaskAwareRequest() {
@Override
public void setParentTask(TaskId taskId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,9 @@ public void lockPolicy(String policyName) {
Semaphore runLock = policyLocks.computeIfAbsent(policyName, (name) -> new Semaphore(1));
boolean acquired = runLock.tryAcquire();
if (acquired == false) {
throw new EsRejectedExecutionException("Could not obtain lock because policy execution for [" + policyName +
"] is already in progress.");
throw new EsRejectedExecutionException(
"Could not obtain lock because policy execution for [" + policyName + "] is already in progress."
);
}
policyRunCounter.incrementAndGet();
} finally {
Expand Down Expand Up @@ -105,8 +106,7 @@ public EnrichPolicyExecutionState captureExecutionState() {
*/
boolean isSameState(EnrichPolicyExecutionState previousState) {
EnrichPolicyExecutionState currentState = captureExecutionState();
return currentState.anyPolicyInFlight == previousState.anyPolicyInFlight &&
currentState.executions == previousState.executions;
return currentState.anyPolicyInFlight == previousState.anyPolicyInFlight && currentState.executions == previousState.executions;
}

/**
Expand Down
Loading

0 comments on commit f6a60f7

Please sign in to comment.