[GOBBLIN-2170] Define Gobblin-on-Temporal dynamic ScalingDirectives with parser and FsScalingDirectiveSource #4068

Merged
merged 11 commits into from
Nov 19, 2024
misc javadoc updates
phet committed Nov 8, 2024

commit 6997cae9d0fa21c3bf15a73db813ac5aacb8d5af
Original file line number Diff line number Diff line change
@@ -38,8 +38,8 @@
* A {@link ScalingDirectiveSource} that reads {@link ScalingDirective}s from a {@link FileSystem} directory, where each directive is the name
* of a single file inside the directory. Directives too long for one filename path component MUST use the
* {@link ScalingDirectiveParser#OVERLAY_DEFINITION_PLACEHOLDER} syntax and write their {@link ProfileDerivation} overlay as the file's data/content.
* Within-length scaling directives are no-data, zero-length files. When backed by HDFS, reading such zero-length scaling directive files is a
* NameNode-only operation, while their metadata-only nature additionally conserves NN object count/quota.
* Within-length scaling directives are no-data, zero-length files. When backed by HDFS, reading such zero-length scaling directive filenames is a
* NameNode-only operation, with their metadata-only nature conserving NN object count/quota.
*/
@Slf4j
public class FsScalingDirectiveSource implements ScalingDirectiveSource {
@@ -61,18 +61,20 @@ public FsScalingDirectiveSource(FileSystem fileSystem, String directivesDirPath,
* Ignore invalid directives, and, when `optErrorsDirPath` was provided to the ctor, acknowledge each by moving it to a separate "errors" directory.
* Regardless, always swallow {@link ScalingDirectiveParser.InvalidSyntaxException}.
*
* Like un-parseable directives, so too are out-of-order directives invalid. This prevents late/out-of-order insertion and/or edits to the directives
* Like un-parseable directives, also invalid are out-of-order directives. This blocks late/out-of-order insertion and/or edits to the directives
* stream. Each directive contains its own {@link ScalingDirective#getTimestampEpochMillis()} stated in its filename. Later-modtime directives are
* rejected when directive-timestamp-order does not match {@link FileStatus} modtime order. In the case of a modtime tie, the directive with the
* alphabetically-later filename is rejected.
*
* NOTE: This returns ALL known directives, even those already returned by a prior invocation.
* ATTENTION: This returns ALL known directives, even those already returned by a prior invocation. When the underlying directory is unchanged
* before the next invocation, the result will be equal elements in the same order.
*
* @throws IOException when unable to read the directory (or file data, in the case of an overlay definition placeholder)
*/
@Override
public List<ScalingDirective> getScalingDirectives() throws IOException {
List<Map.Entry<ScalingDirective, FileStatus>> directiveWithFileStatus = new ArrayList<>();
// TODO: add caching by dir modtime to avoid re-listing the same, unchanged contents, while also avoiding repetitive parsing
// to begin, just parse w/o worrying about ordering... that comes next
for (FileStatus fileStatus : fileSystem.listStatus(dirPath)) {
if (!fileStatus.isFile()) {
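The out-of-order rule described in the javadoc above can be illustrated with a small standalone sketch. It is not the code from this PR: `DirectiveFile`, `OrderingSketch`, and `keepInOrder` are hypothetical names, and the sketch assumes one plausible reading of the rule, namely that files are walked in modtime order (ties broken by filename) and a directive is kept only when its stated timestamp strictly exceeds every timestamp kept before it.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical stand-in for a parsed directive plus the modtime of the file that named it. */
final class DirectiveFile {
  final String fileName;
  final long directiveTimestampMillis; // the TIMESTAMP stated in the filename
  final long modTimeMillis;            // the file's modification time

  DirectiveFile(String fileName, long directiveTimestampMillis, long modTimeMillis) {
    this.fileName = fileName;
    this.directiveTimestampMillis = directiveTimestampMillis;
    this.modTimeMillis = modTimeMillis;
  }
}

final class OrderingSketch {
  /** Returns the candidates whose directive timestamps agree with modtime order; the rest are dropped. */
  static List<DirectiveFile> keepInOrder(List<DirectiveFile> candidates) {
    List<DirectiveFile> byModTime = new ArrayList<>(candidates);
    // earliest modtime first; a modtime tie is broken by filename,
    // so the alphabetically-later name is examined last
    byModTime.sort(Comparator.comparingLong((DirectiveFile d) -> d.modTimeMillis)
        .thenComparing((DirectiveFile d) -> d.fileName));

    List<DirectiveFile> accepted = new ArrayList<>();
    long latestAccepted = Long.MIN_VALUE;
    for (DirectiveFile d : byModTime) {
      // ASSUMPTION: "in order" means strictly later than every directive accepted so far
      if (d.directiveTimestampMillis > latestAccepted) {
        accepted.add(d);
        latestAccepted = d.directiveTimestampMillis;
      }
    }
    return accepted;
  }
}
```

Under these assumptions, a file written later (modtime 20) but carrying an earlier directive timestamp (100) than an already-accepted directive (300) is dropped, which is how late insertion into the directive stream would be blocked.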
Original file line number Diff line number Diff line change
@@ -35,27 +35,31 @@
* Parser for {@link ScalingDirective} syntax of the form:
* TIMESTAMP '.' PROFILE_NAME '=' SET_POINT [ ( ',' | ';' ) PROFILE_NAME ( '+(' KV_PAIR (<SEP> KV_PAIR)* ')' | '-(' KEY (<SEP> KEY)* ')' ) ]
* where:
* only ( TIMESTAMP '.' PROFILE_NAME '=' SET_POINT ) are non-optional
* only ( TIMESTAMP '.' PROFILE_NAME '=' SET_POINT ) are non-optional. An optional trailing definition for that profile may name the
* "basis" profile to derive from through an "adding" or "removing" overlay.
*
* <SEP> is either ',' or ';' (whichever did follow SET_POINT)
* <SEP> is either ',' or ';' (whichever did follow SET_POINT); choose which to minimize escaping (a KV_PAIR's VALUE, by URL-encoding).
*
* TIMESTAMP is millis-since-epoch
* TIMESTAMP is millis-since-epoch.
*
* PROFILE_NAME is a simple [a-zA-Z0-9_]+ identifier familiar from many programming languages. The special name "baseline()" is reserved
* for the baseline profile, which may alternatively be spelled as the empty identifier ("").
*
* SET_POINT must be a non-negative integer ('0' indicates no instances desired).
*
* The first form introduced by '+' is an "adding" (upsert) overlay; the second form with '-' is a "removing" overlay.
* When an overlay is present, the form introduced by '+' is an "adding" (upsert) overlay and the form prefixed by '-' is a "removing" overlay.
* @see ProfileOverlay for {@link ProfileOverlay.Adding} and {@link ProfileOverlay.Removing} semantics.
*
* KV_PAIR (for "adding") is an '='-delimited (KEY '=' VALUE), where VALUE may use URL-encoding to escape characters.
*
* KEY (for "removing" and in the "adding" KV_PAIR) is a '.'-separated sequence of alphanumeric identifier segments, as in a {@link java.util.Properties}
* KEY (for "removing"; also in the "adding" KV_PAIR) is a '.'-separated sequence of alphanumeric identifier segments, as in a {@link java.util.Properties}
* or {@link com.typesafe.config.Config} name.
*
* Whitespace may appear around any tokens, though not within a KEY or a VALUE. Comments are unsupported.
* Whitespace may appear around any tokens, though not within a KEY or a VALUE.
*
* As an alternative to inlining a lengthy adding or removing overlay definition, {@link #OVERLAY_DEFINITION_PLACEHOLDER} may stand in to indicate that
* Comments are unsupported.
*
* As an alternative to inlining a lengthy "adding" or "removing" overlay definition, {@link #OVERLAY_DEFINITION_PLACEHOLDER} may stand in to indicate that
* the definition itself will be supplied separately. To supply it, call {@link OverlayPlaceholderNeedsDefinition#retryParsingWithDefinition(String)} on
* the same UNCHECKED exception (originally thrown by {@link #parse(String)}).
*
@@ -82,8 +86,9 @@
* 1728439210.new_profile=16,bar+(a.b.c=7,l.m=sixteen)
* 1728439223.new_profile=16;bar+(a.b.c=7;l.m=sixteen)
*
* - similar derivation, but demonstrating URL-encoding (to preserve ',' and literal space in the value):
* - similar derivation, but demonstrating URL-encoding, to preserve ',' and/or literal space in the value (equiv. forms):
* 1728460832.new_profile=16,bar+(a.b.c=7,l.m=sixteen%2C%20again)
* 1728460832.new_profile=16;bar+(a.b.c=7;l.m=sixteen,%20again)
*
* - define a new profile, `other_profile`, with a set point of 9 by deriving via "removing" overlay from the existing profile, `my_profile` (equiv. forms):
* 1728436436.other_profile=9,my_profile-(x,y.z)
@@ -150,7 +155,7 @@ public ScalingDirective retryParsingWithDefinition(String overlayDefinition) thr
}
}

/** encapsulate the intricacies of splicing `overlayDefinition` into `directiveWithPlaceholder`, after attending to the necessary URL-encoding */
/** encapsulate the intricacies of splicing `overlayDefinition` into `directiveWithPlaceholder`, while performing the necessary URL-encoding */
@VisibleForTesting
protected static String definePlaceholder(String directiveWithPlaceholder, String overlaySep, boolean isAdding, String overlayDefinition) {
// use care to selectively `urlEncode` parts (but NOT the entire string), to avoid disrupting syntactic chars, like [,;=]
@@ -190,7 +195,7 @@ protected static String definePlaceholder(String directiveWithPlaceholder, Strin
/**
* Parse `directive` into a {@link ScalingDirective} or throw {@link InvalidSyntaxException}
*
* When an overlay definition was not inlined because {@link #OVERLAY_DEFINITION_PLACEHOLDER} was used instead, throw the UNCHECKED exception
* When an overlay definition is not inlined and {@link #OVERLAY_DEFINITION_PLACEHOLDER} is used instead, throw the UNCHECKED exception
* {@link OverlayPlaceholderNeedsDefinition}, which facilitates a subsequent attempt to supply the overlay definition and
* {@link OverlayPlaceholderNeedsDefinition#retryParsingWithDefinition(String)} (a form of the Proxy pattern).
*/
@@ -250,7 +255,7 @@ public ScalingDirective parse(String directive) throws InvalidSyntaxException {
*
* NOTE: regardless of its length or content, the result inlines the entire overlay def, with {@link #OVERLAY_DEFINITION_PLACEHOLDER} NEVER used
*
* @see #parse(String), the inverse operation (approximately - modulo {@link #OVERLAY_DEFINITION_PLACEHOLDER}, noted above)
* @see #parse(String), the (approximate) inverse operation (modulo {@link #OVERLAY_DEFINITION_PLACEHOLDER}, noted above)
*/
public static String asString(ScalingDirective directive) {
StringBuilder sb = new StringBuilder();
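To make the grammar in the parser javadoc concrete, here is a minimal sketch covering only the non-optional part, TIMESTAMP '.' PROFILE_NAME '=' SET_POINT, plus the URL-decoding of a VALUE. It is not the `ScalingDirectiveParser` from this PR: `SimpleDirective`, `MandatoryPartSketch`, and `decodeValue` are hypothetical names, overlays are deliberately not handled, and the use of `java.net.URLDecoder` for VALUE escaping is an assumption.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical holder for the three mandatory fields of a directive. */
final class SimpleDirective {
  final long timestampEpochMillis;
  final String profileName; // "" or "baseline()" would denote the baseline profile
  final int setPoint;

  SimpleDirective(long timestampEpochMillis, String profileName, int setPoint) {
    this.timestampEpochMillis = timestampEpochMillis;
    this.profileName = profileName;
    this.setPoint = setPoint;
  }
}

final class MandatoryPartSketch {
  // TIMESTAMP '.' PROFILE_NAME '=' SET_POINT, allowing whitespace around tokens;
  // PROFILE_NAME is "baseline()", a [a-zA-Z0-9_]+ identifier, or empty
  private static final Pattern MANDATORY = Pattern.compile(
      "\\s*(\\d+)\\s*\\.\\s*(baseline\\(\\)|[a-zA-Z0-9_]*)\\s*=\\s*(\\d+)\\s*");

  /** Parses a directive that has no overlay; anything else is rejected. */
  static SimpleDirective parse(String directive) {
    Matcher m = MANDATORY.matcher(directive);
    if (!m.matches()) {
      throw new IllegalArgumentException("unsupported by this sketch: " + directive);
    }
    return new SimpleDirective(Long.parseLong(m.group(1)), m.group(2), Integer.parseInt(m.group(3)));
  }

  /** ASSUMPTION: VALUE escaping behaves like standard URL-decoding (note: URLDecoder also maps '+' to a space). */
  static String decodeValue(String value) {
    try {
      return URLDecoder.decode(value, "UTF-8");
    } catch (UnsupportedEncodingException e) {
      throw new IllegalStateException(e); // UTF-8 is always available
    }
  }
}
```

With this sketch, `decodeValue("sixteen%2C%20again")` and `decodeValue("sixteen,%20again")` both yield `sixteen, again`, which is why the two URL-encoded example directives above are equivalent: with ';' as the separator, the ',' inside the value no longer needs escaping.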
Original file line number Diff line number Diff line change
@@ -114,10 +114,11 @@ public synchronized void revise(ScalingDirective directive) throws IllegalRevisi
}
}
// TODO - make idempotent, since any retry attempt following failure between `addProfile` and `reviseStaffing` would thereafter fail with
// `IllegalRevisionException.Redefinition`, despite us wishing to adjust the set point for that new profile just defined...
// how to ensure the profile def is the same / unchanged? (e.g. compare full profile `Config` equality)?
// `IllegalRevisionException.Redefinition`, despite us wishing to adjust the set point for that new profile defined just before the failure.
// - how to ensure the profile def is the same / unchanged? (e.g. compare full profile `Config` equality)?
// NOTE: the current outcome would be a profile defined in `WorkforceProfiles` with no set point in `WorkforceStaffing`. fortunately,
// that would NOT lead to `calcStaffingDeltas` throwing {@link WorkforceProfiles.UnknownProfileException}!
// that would NOT lead to `calcStaffingDeltas` throwing {@link WorkforceProfiles.UnknownProfileException}! The out-of-band (manual)
// workaround/repair would be revision by another, later directive that provides the set point for that profile (WITHOUT providing the definition)

this.staffing.reviseStaffing(name, directive.getSetPoint(), directive.getTimestampEpochMillis());
this.lastRevisionEpochMillis = directive.getTimestampEpochMillis();
Original file line number Diff line number Diff line change
@@ -30,7 +30,7 @@
@ThreadSafe
public class WorkforceProfiles implements Function<String, Optional<WorkerProfile>> {

/** Indicates `profileName` NOT found */
/** Indicates {@link #getProfileName()} NOT found */
public static class UnknownProfileException extends RuntimeException {
@Getter private final String profileName;

Original file line number Diff line number Diff line change
@@ -31,9 +31,14 @@


/**
* Collection to map {@link WorkerProfile} names, each to a given set point. It might be "managed" by a {@link WorkforcePlan}, to reflect
* desired staffing, or else "unmanaged", where it might represent the current, actual per-worker scaling level. Those two might then be compared via
* {@link WorkforcePlan#calcStaffingDeltas(WorkforceStaffing)} to calculate {@link StaffingDeltas} against the "managed" workforce plan.
* Collection to map {@link WorkerProfile} names, each to a given set point.
*
* An instance might be "managed" by a {@link WorkforcePlan}, to reflect desired staffing, or else "unmanaged", where it might represent the
* current, actual per-worker scaling level. Those two could be compared via {@link #calcDeltas(WorkforceStaffing, WorkforceProfiles)}, to
* calculate the {@link StaffingDeltas} between the two (i.e. between the staffing for the "managed" workforce plan of record vs. the independently
* maintained, "unmanaged" staffing levels).
*
* TIP: for encapsulation simplicity, invoke the "managed" form through {@link WorkforcePlan#calcStaffingDeltas(WorkforceStaffing)}
*/
@ThreadSafe
public class WorkforceStaffing {
@@ -42,8 +47,8 @@ public class WorkforceStaffing {
public static final long UNKNOWN_PROVENANCE_EPOCH_MILLIS = -1L;

/**
* internal rep. for a set point, with associated provenance timestamp, that will be returned by {@link #calcDeltas(WorkforceStaffing, WorkforceProfiles)},
* to inform debugging
* internal rep. for a set point, with associated provenance timestamp, to inform debugging, when returned by
* {@link #calcDeltas(WorkforceStaffing, WorkforceProfiles)}
*/
@Data
private static class SetPoint {
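The comparison of "managed" against "unmanaged" staffing described in the class javadoc above can be pictured with plain maps. This is only a sketch: the real `calcDeltas` also takes a `WorkforceProfiles` and returns `StaffingDeltas`, whose structure is not shown in this diff. `StaffingDeltaSketch` is a hypothetical name, and treating a profile missing from either side as having a set point of 0 is an assumption.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

final class StaffingDeltaSketch {
  /**
   * Returns, per profile name, (desired set point) minus (actual set point), omitting profiles with no difference.
   * ASSUMPTION: a profile absent from one side counts as a set point of 0 there.
   */
  static Map<String, Integer> calcDeltas(Map<String, Integer> desired, Map<String, Integer> actual) {
    Set<String> names = new TreeSet<>(desired.keySet());
    names.addAll(actual.keySet());

    Map<String, Integer> deltas = new TreeMap<>();
    for (String name : names) {
      int delta = desired.getOrDefault(name, 0) - actual.getOrDefault(name, 0);
      if (delta != 0) {
        deltas.put(name, delta); // positive: scale up; negative: scale down
      }
    }
    return deltas;
  }
}
```

For a desired staffing of `{a=5, b=2}` against an actual staffing of `{a=3, b=2, c=1}`, the sketch reports `{a=2, c=-1}`: two more `a` workers are wanted, the one `c` worker is surplus, and `b` already matches.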