Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-2170] Define Gobblin-on-Temporal dynamic ScalingDirectives with parser and FsScalingDirectiveSource #4068

Merged
merged 11 commits into from
Nov 19, 2024
Prev Previous commit
Next Next commit
Add (HDFS) FsScalingDirectiveSource with support for overlay placeh…
…older defs
phet committed Nov 6, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit b27cb67403eddbe9480358cf760733ad93c48df2
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.dynamic;

import com.google.common.base.Charsets;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import lombok.extern.slf4j.Slf4j;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;

@Slf4j
public class FsScalingDirectiveSource implements ScalingDirectiveSource {
private final FileSystem fileSystem;
private final Path dirPath;
private final Optional<Path> optErrorsPath;
private final ScalingDirectiveParser parser = new ScalingDirectiveParser();

public FsScalingDirectiveSource(FileSystem fileSystem, String directivesDirPath, Optional<String> optErrorDirPath) {
this.fileSystem = fileSystem;
this.dirPath = new Path(directivesDirPath);
this.optErrorsPath = optErrorDirPath.map(Path::new);
}

// TODO: describe purpose of constraint (to preclude late insertion/edits of the directives stream) -
// verify and only return directives whose stated (in filename) timestamp order matches `FileStatus` modtime order
@Override
public List<ScalingDirective> getScalingDirectives() throws IOException {
List<Map.Entry<ScalingDirective, FileStatus>> directiveWithFileStatus = new ArrayList<>();
for (FileStatus fileStatus : fileSystem.listStatus(dirPath)) {
if (!fileStatus.isFile()) {
log.warn("Ignoring non-file object: " + fileStatus);
optAcknowledgeError(fileStatus, "non-file (not an actual)");
} else {
String fileName = fileStatus.getPath().getName();
try {
try {
directiveWithFileStatus.add(new ImmutablePair<>(parseScalingDirective(fileName), fileStatus));
} catch (ScalingDirectiveParser.OverlayPlaceholderNeedsDefinition needsDefinition) {
// directive used placeholder syntax to indicate the overlay definition resides inside its file... so open the file to load that def
log.info("Loading overlay definition for directive {{" + fileName + "}} from: " + fileStatus);
String overlayDef = slurpFileAsString(fileStatus.getPath());
directiveWithFileStatus.add(new ImmutablePair<>(needsDefinition.retryParsingWithDefinition(overlayDef), fileStatus));
}
} catch (ScalingDirectiveParser.InvalidSyntaxException e) {
log.warn("Ignoring unparseable scaling directive {{" + fileName + "}}: " + fileStatus + " - " + e.getClass().getName() + ": " + e.getMessage());
optAcknowledgeError(fileStatus, "unparseable");
}
}
}

// verify and only return directives whose ordering of stated (in filename) timestamp matches `FileStatus` modtime order
List<ScalingDirective> directives = new ArrayList<>();
// NOTE: for deterministic total-ordering, sort by path, rather than by timestamp, in case of modtime tie (given only secs granularity)
directiveWithFileStatus.sort(Comparator.comparing(p -> p.getValue().getPath()));
long latestValidModTime = -1;
for (Map.Entry<ScalingDirective, FileStatus> entry : directiveWithFileStatus) {
long thisModTime = entry.getValue().getModificationTime();
if (thisModTime < latestValidModTime) { // do NOT reject equal (non-increasing) modtime, given granularity of epoch seconds
log.warn("Ignoring out-of-order scaling directive " + entry.getKey() + " since FS modTime " + thisModTime + " precedes last observed "
+ latestValidModTime + ": " + entry.getValue());
optAcknowledgeError(entry.getValue(), "out-of-order");
} else {
directives.add(entry.getKey());
latestValidModTime = thisModTime;
}
}
return directives;
}

// ack error by moving the bad/non-directive to a separate errors dir
protected void optAcknowledgeError(FileStatus fileStatus, String desc) {
this.optErrorsPath.ifPresent(errorsPath ->
moveToErrors(fileStatus, errorsPath, desc)
);
}

// move broken/ignored directives into a separate directory, as an observability-enhancing ack of its rejection
protected void moveToErrors(FileStatus badDirectiveStatus, Path errorsPath, String desc) {
Path badDirectivePath = badDirectiveStatus.getPath();
try {
if (!this.fileSystem.rename(badDirectivePath, new Path(errorsPath, badDirectivePath.getName()))) {
throw new RuntimeException(); // unclear how to obtain more info about such a failure
}
} catch (IOException e) {
log.warn("Failed to move " + desc + " directive {{" + badDirectiveStatus.getPath() + "}} to '" + errorsPath + "'... leaving in place", e);
} catch (RuntimeException e) {
log.warn("Failed to move " + desc + " directive {{" + badDirectiveStatus.getPath() + "}} to '" + errorsPath + "' [unknown reason]... leaving in place");
}
}

private ScalingDirective parseScalingDirective(String fileName)
throws ScalingDirectiveParser.InvalidSyntaxException, ScalingDirectiveParser.OverlayPlaceholderNeedsDefinition {
return parser.parse(fileName);
}

protected String slurpFileAsString(Path path) throws IOException {
try (InputStream is = this.fileSystem.open(path)) {
return IOUtils.toString(is, Charsets.UTF_8);
}
}
}
Original file line number Diff line number Diff line change
@@ -19,10 +19,12 @@

import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;

@@ -53,61 +55,120 @@
*/
@Slf4j
public class ScalingDirectiveParser {
public static class MalformedDirectiveException extends IllegalArgumentException {
public static class InvalidSyntaxException extends Exception {
private final String directive;
public MalformedDirectiveException(String directive, String desc) {
public InvalidSyntaxException(String directive, String desc) {
super("error: " + desc + ", in ==>" + directive + "<==");
this.directive = directive;
}
}

private static final String DIRECTIVE_REGEX = "(?x) \\s* (\\d+) \\s* \\. \\s* (\\w* | baseline\\(\\)) \\s* = \\s* (\\d+) "
+ "(?: \\s* ([;,]) \\s* (\\w* | baseline\\(\\)) \\s* (?: (\\+ \\s* \\( \\s* ([^)]*?) \\s* \\) ) | (- \\s* \\( \\s* ([^)]*?) \\s* \\) ) ) )? \\s*";
public static class OverlayPlaceholderNeedsDefinition extends RuntimeException {
private final String directive;
private final String overlaySep;
private final boolean isAdding;
// ATTENTION: explicitly managed, rather than making this a non-static inner class so `definePlaceholder` may be `static` for testing, while avoiding:
// Static declarations in inner classes are not supported at language level '8'
private final ScalingDirectiveParser parser;

private OverlayPlaceholderNeedsDefinition(String directive, String overlaySep, boolean isAdding, ScalingDirectiveParser enclosing) {
super("overlay placeholder, in ==>" + directive + "<==");
this.directive = directive;
this.overlaySep = overlaySep;
this.isAdding = isAdding;
this.parser = enclosing;
}

// doesn't allow recursive placeholding...
public ScalingDirective retryParsingWithDefinition(String overlayDefinition) throws InvalidSyntaxException {
try {
return this.parser.parse(definePlaceholder(this.directive, this.overlaySep, this.isAdding, overlayDefinition));
} catch (OverlayPlaceholderNeedsDefinition e) {
throw new InvalidSyntaxException(this.directive, "overlay placeholder definition must not be itself another placeholder");
}
}

protected static String definePlaceholder(String directiveWithPlaceholder, String overlaySep, boolean isAdding, String overlayDefinition) {
// use care to selectively `urlEncode` parts (but NOT the entire string), to avoid disrupting syntactic chars, like [,;=]
String urlEncodedOverlayDef = Arrays.stream(overlayDefinition.split("\\s*" + overlaySep + "\\s*", -1)) // (neg. limit to disallow trailing empty strings)
.map(kvPair -> {
String[] kv = kvPair.split("\\s*=\\s*", 2);
if (isAdding && kv.length > 1) {
return kv[0] + '=' + urlEncode(kv[1]);
} else {
return kvPair;
}
}).collect(Collectors.joining(overlaySep));

// correct any double-encoding of '%', in case it arrived url-encoded
return directiveWithPlaceholder.replace(OVERLAY_DEFINITION_PLACEHOLDER, urlEncodedOverlayDef.replace("%25", "%"));
}
}


// TODO: also support non-inline overlay definitions - "(...)"
// consider an additional trailing "|" (or "," / ";") syntax when the additional props are only needed post-launch
// since we're primarily designed for HDFS file names, in addition, a max identifier length (to fit within HDFS path segment limit == 255)
// org.apache.hadoop.hdfs.protocol.FSLimitException$PathComponentTooLongException: \
// The maximum path component name limit of ... in directory ... is exceeded: limit=255 length=256
// 200 = 255 [limit] - 16 [digit timestamp] - 1 ['.'] - 1 ['='] - 1 [',' / ';'] - 6 ['+(...)' / '-(...)'] - 30 [reserved... for future]
// current millis-precision epoch timestamp requires 10 chars, but we reserve 16 for future-proofing to nanos-precision
// hence, neither (of the max two) profile identifiers may exceed 100 chars.
// TODO: syntax to indicate removing an attr during an addition
private static final String DIRECTIVE_REGEX = "(?x) (?s) ^ \\s* (\\d+) \\s* \\. \\s* (\\w* | baseline\\(\\)) \\s* = \\s* (\\d+) "
+ "(?: \\s* ([;,]) \\s* (\\w* | baseline\\(\\)) \\s* (?: (\\+ \\s* \\( \\s* ([^)]*?) \\s* \\) ) | (- \\s* \\( \\s* ([^)]*?) \\s* \\) ) ) )? \\s* $";

public static final int MAX_PROFILE_IDENTIFIER_LENGTH = 100;
public static final String URL_ENCODING_CHARSET = "UTF-8";
public static final String OVERLAY_DEFINITION_PLACEHOLDER = "...";

private static final String KEY_REGEX = "(\\w+(?:\\.\\w+)*)";
private static final String KEY_VALUE_REGEX = KEY_REGEX + "\\s*=\\s*(.*)";
private static final String URL_ENCODING_CHARSET = "UTF-8";
private static final Pattern directivePattern = Pattern.compile(DIRECTIVE_REGEX);
private static final Pattern keyPattern = Pattern.compile(KEY_REGEX);
private static final Pattern keyValuePattern = Pattern.compile(KEY_VALUE_REGEX);

private static final String BASELINE_ID = "baseline()";

public ScalingDirective parse(String directive) {
public ScalingDirective parse(String directive) throws InvalidSyntaxException {
Matcher parsed = directivePattern.matcher(directive);
if (parsed.matches()) {
long timestamp = Long.parseLong(parsed.group(1));
String profileId = parsed.group(2);
String profileName = identifyProfileName(profileId);
String profileName = identifyProfileName(profileId, directive);
int setpoint = Integer.parseInt(parsed.group(3));
Optional<ProfileDerivation> optDerivedFrom = Optional.empty();
String overlayIntroSep = parsed.group(4);
if (overlayIntroSep != null) {
String basisProfileName = identifyProfileName(parsed.group(5));
String basisProfileName = identifyProfileName(parsed.group(5), directive);
if (parsed.group(6) != null) { // '+' == adding
List<ProfileOverlay.KVPair> additions = new ArrayList<>();
String additionsStr = parsed.group(7);
if (!additionsStr.equals("")) {
if (additionsStr.equals(OVERLAY_DEFINITION_PLACEHOLDER)) {
throw new OverlayPlaceholderNeedsDefinition(directive, overlayIntroSep, true, this);
} else if (!additionsStr.equals("")) {
for (String addStr : additionsStr.split("\\s*" + overlayIntroSep + "\\s*", -1)) { // (negative limit to disallow trailing empty strings)
Matcher keyValueParsed = keyValuePattern.matcher(addStr);
if (keyValueParsed.matches()) {
additions.add(new ProfileOverlay.KVPair(keyValueParsed.group(1), urlDecode(directive, keyValueParsed.group(2))));
} else {
throw new MalformedDirectiveException(directive, "unable to parse key-value pair - {{" + addStr + "}}");
throw new InvalidSyntaxException(directive, "unable to parse key-value pair - {{" + addStr + "}}");
}
}
}
optDerivedFrom = Optional.of(new ProfileDerivation(basisProfileName, new ProfileOverlay.Adding(additions)));
} else { // '-' == removing
List<String> removalKeys = new ArrayList<>();
String removalsStr = parsed.group(9);
if (!removalsStr.equals("")) {
if (removalsStr.equals(OVERLAY_DEFINITION_PLACEHOLDER)) {
throw new OverlayPlaceholderNeedsDefinition(directive, overlayIntroSep, false, this);
} else if (!removalsStr.equals("")) {
for (String removeStr : removalsStr.split("\\s*" + overlayIntroSep + "\\s*", -1)) { // (negative limit to disallow trailing empty strings)
Matcher keyParsed = keyPattern.matcher(removeStr);
if (keyParsed.matches()) {
removalKeys.add(keyParsed.group(1));
} else {
throw new MalformedDirectiveException(directive, "unable to parse key - {{" + removeStr + "}}");
throw new InvalidSyntaxException(directive, "unable to parse key - {{" + removeStr + "}}");
}
}
}
@@ -116,7 +177,7 @@ public ScalingDirective parse(String directive) {
}
return new ScalingDirective(profileName, setpoint, timestamp, optDerivedFrom);
} else {
throw new MalformedDirectiveException(directive, "invalid syntax");
throw new InvalidSyntaxException(directive, "invalid syntax");
}
}

@@ -149,15 +210,18 @@ public static String asString(ScalingDirective directive) {
return sb.toString();
}

private static String identifyProfileName(String profileId) {
private static String identifyProfileName(String profileId, String directive) throws InvalidSyntaxException {
if (profileId.length() > MAX_PROFILE_IDENTIFIER_LENGTH) {
throw new InvalidSyntaxException(directive, "profile ID exceeds length limit (of " + MAX_PROFILE_IDENTIFIER_LENGTH + "): '" + profileId + "'");
}
return profileId.equals(BASELINE_ID) ? WorkforceProfiles.BASELINE_NAME : profileId;
}

private static String urlDecode(String directive, String s) {
private static String urlDecode(String directive, String s) throws InvalidSyntaxException {
try {
return java.net.URLDecoder.decode(s, URL_ENCODING_CHARSET);
} catch (java.io.UnsupportedEncodingException e) {
throw new MalformedDirectiveException(directive, "unable to URL-decode - {{" + s + "}}");
throw new InvalidSyntaxException(directive, "unable to URL-decode - {{" + s + "}}");
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.dynamic;

import java.io.IOException;
import java.util.List;


public interface ScalingDirectiveSource extends Cloneable {
// TODO - document! (impl may choose to give only newer directives, not previously returned... or to return them all)
List<ScalingDirective> getScalingDirectives() throws IOException;
}
Loading