Skip to content

Commit

Permalink
Add (most) javadoc
Browse files Browse the repository at this point in the history
  • Loading branch information
phet committed Nov 6, 2024
1 parent b27cb67 commit 617f776
Show file tree
Hide file tree
Showing 16 changed files with 336 additions and 181 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.gobblin.temporal.dynamic;

import com.google.common.base.Charsets;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
Expand All @@ -26,6 +25,7 @@
import java.util.Map;
import java.util.Optional;

import com.google.common.base.Charsets;
import lombok.extern.slf4j.Slf4j;

import org.apache.commons.io.IOUtils;
Expand All @@ -34,24 +34,46 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;

/**
* A {@link ScalingDirectiveSource} that reads {@link ScalingDirective}s from a {@link FileSystem} directory, where each directive is the name
* of a single file inside the directory. Directives too long for one filename path component MUST use the
* {@link ScalingDirectiveParser#OVERLAY_DEFINITION_PLACEHOLDER} syntax and write their {@link ProfileDerivation} overlay as the file's data/content.
* Within-length scaling directives are no-data, zero-length files. When backed by HDFS, reading such zero-length scaling directive files is a
* NameNode-only operation, while their metadata-only nature additionally conserves NN object count/quota.
*/
@Slf4j
public class FsScalingDirectiveSource implements ScalingDirectiveSource {
private final FileSystem fileSystem;
private final Path dirPath;
private final Optional<Path> optErrorsPath;
private final ScalingDirectiveParser parser = new ScalingDirectiveParser();

public FsScalingDirectiveSource(FileSystem fileSystem, String directivesDirPath, Optional<String> optErrorDirPath) {
/** Read from `directivesDirPath` of `fileSystem`, and optionally move invalid/rejected directives to `optErrorsDirPath` */
public FsScalingDirectiveSource(FileSystem fileSystem, String directivesDirPath, Optional<String> optErrorsDirPath) {
this.fileSystem = fileSystem;
this.dirPath = new Path(directivesDirPath);
this.optErrorsPath = optErrorDirPath.map(Path::new);
this.optErrorsPath = optErrorsDirPath.map(Path::new);
}

// TODO: describe purpose of constraint (to preclude late insertion/edits of the directives stream) -
// verify and only return directives whose stated (in filename) timestamp order matches `FileStatus` modtime order
/**
* Read every scaling directive currently in the directory, returning only the valid ones.
*
* Ignore invalid directives, and, when `optErrorsDirPath` was provided to the ctor, acknowledge each by moving it to a separate "errors" directory.
* Regardless, always swallow {@link ScalingDirectiveParser.InvalidSyntaxException}.
*
* Like un-parseable directives, so too are out-of-order directives invalid. This prevents late/out-of-order insertion and/or edits to the directives
* stream. Each directive contains its own {@link ScalingDirective#getTimestampEpochMillis()} stated in its filename. Later-modtime directives are
* rejected when directive-timestamp-order does not match {@link FileStatus} modtime order. In the case of a modtime tie, the directive with the
* alphabetically-later filename is rejected.
*
* NOTE: This returns ALL known directives, even those already returned by a prior invocation.
*
* @return all valid (parseable, in-order) scaling directives currently in the directory, ordered by ascending modtime
* @throws IOException when unable to read the directory (or file data, in the case of an overlay definition placeholder)
*/
@Override
public List<ScalingDirective> getScalingDirectives() throws IOException {
List<Map.Entry<ScalingDirective, FileStatus>> directiveWithFileStatus = new ArrayList<>();
// to begin, just parse w/o worrying about ordering... that comes next
for (FileStatus fileStatus : fileSystem.listStatus(dirPath)) {
if (!fileStatus.isFile()) {
log.warn("Ignoring non-file object: " + fileStatus);
Expand All @@ -60,7 +82,7 @@ public List<ScalingDirective> getScalingDirectives() throws IOException {
String fileName = fileStatus.getPath().getName();
try {
try {
directiveWithFileStatus.add(new ImmutablePair<>(parseScalingDirective(fileName), fileStatus));
directiveWithFileStatus.add(new ImmutablePair<>(parser.parse(fileName), fileStatus));
} catch (ScalingDirectiveParser.OverlayPlaceholderNeedsDefinition needsDefinition) {
// directive used placeholder syntax to indicate the overlay definition resides inside its file... so open the file to load that def
log.info("Loading overlay definition for directive {{" + fileName + "}} from: " + fileStatus);
Expand All @@ -74,15 +96,15 @@ public List<ScalingDirective> getScalingDirectives() throws IOException {
}
}

// verify and only return directives whose ordering of stated (in filename) timestamp matches `FileStatus` modtime order
// verify ordering: only return directives whose stated timestamp ordering (of filename prefix) matches `FileStatus` modtime order
List<ScalingDirective> directives = new ArrayList<>();
// NOTE: for deterministic total-ordering, sort by path, rather than by timestamp, in case of modtime tie (given only secs granularity)
// NOTE: for deterministic total-ordering, sort by path, rather than by timestamp, in case of modtime tie (given only millisecs granularity)
directiveWithFileStatus.sort(Comparator.comparing(p -> p.getValue().getPath()));
long latestValidModTime = -1;
for (Map.Entry<ScalingDirective, FileStatus> entry : directiveWithFileStatus) {
long thisModTime = entry.getValue().getModificationTime();
if (thisModTime < latestValidModTime) { // do NOT reject equal (non-increasing) modtime, given granularity of epoch seconds
log.warn("Ignoring out-of-order scaling directive " + entry.getKey() + " since FS modTime " + thisModTime + " precedes last observed "
if (thisModTime <= latestValidModTime) { // when equal (non-increasing) modtime: reject alphabetically-later filename (path)
log.warn("Ignoring out-of-order scaling directive " + entry.getKey() + " since FS modTime " + thisModTime + " NOT later than last observed "
+ latestValidModTime + ": " + entry.getValue());
optAcknowledgeError(entry.getValue(), "out-of-order");
} else {
Expand All @@ -93,32 +115,32 @@ public List<ScalingDirective> getScalingDirectives() throws IOException {
return directives;
}

// ack error by moving the bad/non-directive to a separate errors dir
protected void optAcknowledgeError(FileStatus fileStatus, String desc) {
/** "acknowledge" the rejection of an invalid directive by moving it to a separate "errors" dir (when `optErrorsDirPath` was given to the ctor) */
protected void optAcknowledgeError(FileStatus invalidDirectiveFileStatus, String desc) {
this.optErrorsPath.ifPresent(errorsPath ->
moveToErrors(fileStatus, errorsPath, desc)
moveDirectiveToDir(invalidDirectiveFileStatus, errorsPath, desc)
);
}

// move broken/ignored directives into a separate directory, as an observability-enhancing ack of its rejection
protected void moveToErrors(FileStatus badDirectiveStatus, Path errorsPath, String desc) {
Path badDirectivePath = badDirectiveStatus.getPath();
/**
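* Move the directive file identified by `invalidDirectiveFileStatus` into the designated `destDirPath`, keeping its filename; `desc` names the
* reason for moving (e.g. the error) and is used only in log messages. This promotes observability by acknowledging invalid, rejected directives.
* A failed move is logged as a warning and the file is left in place.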
*/
protected void moveDirectiveToDir(FileStatus invalidDirectiveFileStatus, Path destDirPath, String desc) {
Path invalidDirectivePath = invalidDirectiveFileStatus.getPath();
try {
if (!this.fileSystem.rename(badDirectivePath, new Path(errorsPath, badDirectivePath.getName()))) {
if (!this.fileSystem.rename(invalidDirectivePath, new Path(destDirPath, invalidDirectivePath.getName()))) {
throw new RuntimeException(); // unclear how to obtain more info about such a failure
}
} catch (IOException e) {
log.warn("Failed to move " + desc + " directive {{" + badDirectiveStatus.getPath() + "}} to '" + errorsPath + "'... leaving in place", e);
log.warn("Failed to move " + desc + " directive {{" + invalidDirectiveFileStatus.getPath() + "}} to '" + destDirPath + "'... leaving in place", e);
} catch (RuntimeException e) {
log.warn("Failed to move " + desc + " directive {{" + badDirectiveStatus.getPath() + "}} to '" + errorsPath + "' [unknown reason]... leaving in place");
log.warn("Failed to move " + desc + " directive {{" + invalidDirectiveFileStatus.getPath() + "}} to '" + destDirPath
+ "' [unknown reason]... leaving in place", e);
}
}

private ScalingDirective parseScalingDirective(String fileName)
throws ScalingDirectiveParser.InvalidSyntaxException, ScalingDirectiveParser.OverlayPlaceholderNeedsDefinition {
return parser.parse(fileName);
}

/** @return all contents of `path` as a single (UTF-8) `String` */
protected String slurpFileAsString(Path path) throws IOException {
try (InputStream is = this.fileSystem.open(path)) {
return IOUtils.toString(is, Charsets.UTF_8);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,14 @@
import lombok.Getter;


/**
* Defines a new {@link WorkerProfile} by evolving from another profile, the basis. Such evolution creates a new immutable profile through
* {@link ProfileOverlay}, which either adds or removes properties from the basis profile's definition. That basis profile must already exist.
*/
@Data
public class ProfileDerivation {

/** Flags when the basis profile was NOT found */
public static class UnknownBasisException extends Exception {
@Getter
private final String name;
Expand All @@ -39,6 +45,7 @@ public UnknownBasisException(String basisName) {
private final String basisProfileName;
private final ProfileOverlay overlay;

/** @return a new profile definition through evolution from the basis profile, which is to be obtained via `basisResolver` */
public Config formulateConfig(Function<String, Optional<WorkerProfile>> basisResolver) throws UnknownBasisException {
Optional<WorkerProfile> optProfile = basisResolver.apply(basisProfileName);
if (!optProfile.isPresent()) {
Expand All @@ -48,6 +55,7 @@ public Config formulateConfig(Function<String, Optional<WorkerProfile>> basisRes
}
}

/** @return the canonical display name of {@link #getBasisProfileName()} for tracing/debugging */
public String renderName() {
return WorkforceProfiles.renderName(this.basisProfileName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.gobblin.temporal.dynamic;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -27,27 +28,37 @@
import com.typesafe.config.Config;
import com.typesafe.config.ConfigValueFactory;
import lombok.Data;
import lombok.RequiredArgsConstructor;


/** Alt. forms of profile overlay to evolve one profile {@link Config} into another. Two overlays may be combined hierarchically into a new overlay. */
public interface ProfileOverlay {

/** @return a new, evolved {@link Config}, by application of this overlay */
Config applyOverlay(Config config);

/** @return a new overlay, by combining this overlay *over* another */
ProfileOverlay over(ProfileOverlay other);


/** A key-value pair/duple */
@Data
class KVPair {
private final String key;
private final String value;
}


/** An overlay to evolve any profile by adding key-value pairs */
@Data
// TODO: variadic ctor/factory
@RequiredArgsConstructor // explicit, due to second, variadic ctor
class Adding implements ProfileOverlay {
private final List<KVPair> additionPairs;

public Adding(KVPair... kvPairs) {
this(Arrays.asList(kvPairs));
}

@Override
public Config applyOverlay(Config config) {
return additionPairs.stream().sequential().reduce(config,
Expand All @@ -70,18 +81,23 @@ public ProfileOverlay over(ProfileOverlay other) {
} else if (other instanceof Combo) {
Combo otherCombo = (Combo) other;
return Combo.normalize((Adding) this.over(otherCombo.getAdding()), otherCombo.getRemoving());
} else {
} else { // should NEVER happen!
throw new IllegalArgumentException("unknown derived class of type '" + other.getClass().getName() + "': " + other);
}
}
}


/** An overlay to evolve any profile by removing named keys */
@Data
// TODO: variadic ctor/factory
@RequiredArgsConstructor // explicit, due to second, variadic ctor
class Removing implements ProfileOverlay {
private final List<String> removalKeys;

public Removing(String... keys) {
this(Arrays.asList(keys));
}

@Override
public Config applyOverlay(Config config) {
return removalKeys.stream().sequential().reduce(config,
Expand All @@ -103,33 +119,25 @@ public ProfileOverlay over(ProfileOverlay other) {
} else if (other instanceof Combo) {
Combo otherCombo = (Combo) other;
return Combo.normalize(otherCombo.getAdding(), (Removing) this.over(otherCombo.getRemoving()));
} else {
} else { // should NEVER happen!
throw new IllegalArgumentException("unknown derived class of type '" + other.getClass().getName() + "': " + other);
}
}
}


/** An overlay to evolve any profile by adding key-value pairs while also removing named keys */
@Data
class Combo implements ProfileOverlay {
private final Adding adding;
private final Removing removing;

// merely restrict access modifier from `public` to `protected`, as not meant to be instantiated outside this enclosing interface
/** restricted-access ctor: instead use {@link Combo#normalize(Adding, Removing)} */
private Combo(Adding adding, Removing removing) {
this.adding = adding;
this.removing = removing;
}

protected static Combo normalize(Adding toAdd, Removing toRemove) {
// pre-remove any in `toAdd` that are also in `toRemove`... yet still maintain them in `toRemove`, in case the eventual `Config` "basis" also has any
Set<String> removeKeysLookup = toRemove.getRemovalKeys().stream().collect(Collectors.toSet());
List<KVPair> unmatchedAdditionPairs = toAdd.getAdditionPairs().stream().sequential().filter(additionPair ->
!removeKeysLookup.contains(additionPair.getKey())
).collect(Collectors.toList());
return new Combo(new Adding(unmatchedAdditionPairs), new Removing(new ArrayList<>(removeKeysLookup)));
}

@Override
public Config applyOverlay(Config config) {
return adding.applyOverlay(removing.applyOverlay(config));
Expand All @@ -144,9 +152,19 @@ public ProfileOverlay over(ProfileOverlay other) {
} else if (other instanceof Combo) {
Combo otherCombo = (Combo) other;
return Combo.normalize((Adding) this.adding.over(otherCombo.getAdding()), (Removing) this.removing.over(otherCombo.getRemoving()));
} else {
} else { // should NEVER happen!
throw new IllegalArgumentException("unknown derived class of type '" + other.getClass().getName() + "': " + other);
}
}

/** @return a `Combo` overlay, by combining an `Adding` overlay with a `Removing` overlay */
protected static Combo normalize(Adding toAdd, Removing toRemove) {
// pre-remove any in `toAdd` that are also in `toRemove`... yet still maintain all in `toRemove`, in case also in the eventual `Config` "basis"
Set<String> removeKeysLookup = toRemove.getRemovalKeys().stream().collect(Collectors.toSet());
List<KVPair> unmatchedAdditionPairs = toAdd.getAdditionPairs().stream().sequential().filter(additionPair ->
!removeKeysLookup.contains(additionPair.getKey())
).collect(Collectors.toList());
return new Combo(new Adding(unmatchedAdditionPairs), new Removing(new ArrayList<>(removeKeysLookup)));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
import lombok.RequiredArgsConstructor;


/**
* Core abstraction to model scaling adjustment, which originates at a given moment in time. A directive provides a set point for a given worker profile.
* The set point is the number of instances presently desired for that profile. When naming a heretofore unknown worker profile, the directive MUST also
* define that new profile through a {@link ProfileDerivation} that references a known profile. Known worker profiles MUST NOT be redefined.
*/
@Data
@RequiredArgsConstructor
public class ScalingDirective {
Expand All @@ -30,6 +35,7 @@ public class ScalingDirective {
private final long timestampEpochMillis;
private final Optional<ProfileDerivation> optDerivedFrom;

/** Create a set-point-only directive (for a known profile, with no {@link ProfileDerivation}) */
public ScalingDirective(String profileName, int setPoint, long timestampEpochMillis) {
this(profileName, setPoint, timestampEpochMillis, Optional.empty());
}
Expand All @@ -38,6 +44,7 @@ public ScalingDirective(String profileName, int setPoint, long timestampEpochMil
this(profileName, setPoint, timestampEpochMillis, Optional.of(new ProfileDerivation(basisProfileName, overlay)));
}

/** @return the canonical display name (of {@link #getProfileName()}) for tracing/debugging */
public String renderName() {
return WorkforceProfiles.renderName(this.profileName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
import java.util.List;


/** An opaque source of {@link org.apache.gobblin.temporal.dynamic.ScalingDirective}s */
public interface ScalingDirectiveSource extends Cloneable {
// TODO - document! (impl may choose to give only newer directives, not previously returned... or to return them all)
/** @return {@link ScalingDirective}s - an implementation may choose to return all known directives, or only those newer than any it previously returned */
List<ScalingDirective> getScalingDirectives() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,22 @@
import lombok.Data;


/** Staffing set point {@link ProfileDelta}s for multiple {@link WorkerProfile}s */
@Data
public class StaffingDeltas {
/**
* Difference for a {@link WorkerProfile}'s staffing set point (e.g. between desired and current levels). Positive `delta` reflects increase,
* while negative, a decrease.
*/
@Data
public static class ProfileDelta {
private final WorkerProfile profile;
private final int delta;
private final long setPointProvenanceEpochMillis;

public boolean isUnchanged() {
return delta == 0;
/** @return whether {@link #getDelta()} is non-zero */
public boolean isChange() {
return delta != 0;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import lombok.Data;


/** A named worker {@link Config} */
@Data
public class WorkerProfile {
private final String name;
Expand Down
Loading

0 comments on commit 617f776

Please sign in to comment.