You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */packageorg.apache.wayang.basic.operators;
importjava.util.Optional;
importjava.util.OptionalDouble;
importjava.util.OptionalLong;
importorg.apache.commons.lang3.Validate;
importorg.apache.logging.log4j.LogManager;
importorg.apache.logging.log4j.Logger;
importorg.apache.wayang.commons.util.profiledb.model.measurement.TimeMeasurement;
importorg.apache.wayang.core.api.Configuration;
importorg.apache.wayang.core.optimizer.OptimizationContext;
importorg.apache.wayang.core.optimizer.cardinality.CardinalityEstimate;
importorg.apache.wayang.core.plan.wayangplan.UnarySource;
importorg.apache.wayang.core.types.DataSetType;
importorg.apache.wayang.core.util.fs.FileSystems;
/** * This source reads a text file and outputs the lines as data units. */publicclassObjectFileSource<T> extendsUnarySource<T> {
privatefinalLoggerlogger = LogManager.getLogger(this.getClass());
privatefinalStringinputUrl;
privatefinalClass<T> tClass;
publicObjectFileSource(StringinputUrl, DataSetType<T> type) {
super(type);
this.inputUrl = inputUrl;
this.tClass = type.getDataUnitType().getTypeClass();
}
publicObjectFileSource(StringinputUrl, Class<T> tClass) {
super(DataSetType.createDefault(tClass));
this.inputUrl = inputUrl;
this.tClass = tClass;
}
/** * Copies an instance (exclusive of broadcasts). * * @param that that should be copied */publicObjectFileSource(ObjectFileSourcethat) {
super(that);
this.inputUrl = that.getInputUrl();
this.tClass = that.getTypeClass();
}
publicStringgetInputUrl() {
returnthis.inputUrl;
}
publicClass<T> getTypeClass(){
returnthis.tClass;
}
@OverridepublicOptional<org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator> createCardinalityEstimator(
finalintoutputIndex,
finalConfigurationconfiguration) {
Validate.inclusiveBetween(0, this.getNumOutputs() - 1, outputIndex);
returnOptional.of(newObjectFileSource.CardinalityEstimator());
}
/** * Custom {@link org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator} for {@link FlatMapOperator}s. */protectedclassCardinalityEstimatorimplementsorg.apache.wayang.core.optimizer.cardinality.CardinalityEstimator {
publicfinalCardinalityEstimateFALLBACK_ESTIMATE = newCardinalityEstimate(1000L, 100000000L, 0.7);
publicstaticfinaldoubleCORRECTNESS_PROBABILITY = 0.95d;
/** * We expect selectivities to be correct within a factor of {@value #EXPECTED_ESTIMATE_DEVIATION}. */publicstaticfinaldoubleEXPECTED_ESTIMATE_DEVIATION = 0.05;
@OverridepublicCardinalityEstimateestimate(OptimizationContextoptimizationContext, CardinalityEstimate... inputEstimates) {
//TODO validate if the implementation apply for the caseValidate.isTrue(ObjectFileSource.this.getNumInputs() == inputEstimates.length);
// see Job for StopWatch measurementsfinalTimeMeasurementtimeMeasurement = optimizationContext.getJob().getStopWatch().start(
"Optimization", "Cardinality&Load Estimation", "Push Estimation", "Estimate source cardinalities"
);
// Query the job cache first to see if there is already an estimate.StringjobCacheKey = String.format("%s.estimate(%s)", this.getClass().getCanonicalName(), ObjectFileSource.this.inputUrl);
CardinalityEstimatecardinalityEstimate = optimizationContext.queryJobCache(jobCacheKey, CardinalityEstimate.class);
if (cardinalityEstimate != null) returncardinalityEstimate;
// Otherwise calculate the cardinality.// First, inspect the size of the file and its line sizes.OptionalLongfileSize = FileSystems.getFileSize(ObjectFileSource.this.inputUrl);
if (!fileSize.isPresent()) {
ObjectFileSource.this.logger.warn("Could not determine size of {}... deliver fallback estimate.",
ObjectFileSource.this.inputUrl);
timeMeasurement.stop();
returnthis.FALLBACK_ESTIMATE;
} elseif (fileSize.getAsLong() == 0L) {
timeMeasurement.stop();
returnnewCardinalityEstimate(0L, 0L, 1d);
}
OptionalDoublebytesPerLine = this.estimateBytesPerLine();
if (!bytesPerLine.isPresent()) {
ObjectFileSource.this.logger.warn("Could not determine average line size of {}... deliver fallback estimate.",
ObjectFileSource.this.inputUrl);
timeMeasurement.stop();
returnthis.FALLBACK_ESTIMATE;
}
// Extrapolate a cardinality estimate for the complete file.doublenumEstimatedLines = fileSize.getAsLong() / bytesPerLine.getAsDouble();
doubleexpectedDeviation = numEstimatedLines * EXPECTED_ESTIMATE_DEVIATION;
cardinalityEstimate = newCardinalityEstimate(
(long) (numEstimatedLines - expectedDeviation),
(long) (numEstimatedLines + expectedDeviation),
CORRECTNESS_PROBABILITY
);
// Cache the result, so that it will not be recalculated again.optimizationContext.putIntoJobCache(jobCacheKey, cardinalityEstimate);
timeMeasurement.stop();
returncardinalityEstimate;
}
/** * Estimate the number of bytes that are in each line of a given file. * * @return the average number of bytes per line if it could be determined */privateOptionalDoubleestimateBytesPerLine() {
//TODO validate if the implementation apply for the case// final Optional<FileSystem> fileSystem = FileSystems.getFileSystem(ObjectFileSource.this.inputUrl);// if (fileSystem.isPresent()) {//// // Construct a limited reader for the first x KiB of the file.// final int KiB = 1024;// final int MiB = 1024 * KiB;// try (LimitedInputStream lis = new LimitedInputStream(fileSystem.get().open(// ObjectFileSource.this.inputUrl), 1 * MiB)) {// final BufferedReader bufferedReader = new BufferedReader(// new InputStreamReader(lis, ObjectFileSource.this.encoding)// );//// // Read as much as possible.// char[] cbuf = new char[1024];// int numReadChars, numLineFeeds = 0;// while ((numReadChars = bufferedReader.read(cbuf)) != -1) {// for (int i = 0; i < numReadChars; i++) {// if (cbuf[i] == '\n') {// numLineFeeds++;// }// }// }//// if (numLineFeeds == 0) {// ObjectFileSource.this.logger.warn("Could not find any newline character in {}.", ObjectFileSource.this.inputUrl);// return OptionalDouble.empty();// }// return OptionalDouble.of((double) lis.getNumReadBytes() / numLineFeeds);// } catch (IOException e) {// ObjectFileSource.this.logger.error("Could not estimate bytes per line of an input file.", e);// }// }returnOptionalDouble.empty();
}
}
}
aeeaf4382233888144a6aa0922cc5643e9028166
The text was updated successfully, but these errors were encountered:
Validate whether the implementation applies to this case.
First, inspect the size of the file and its line sizes.
https://github.com/databloom-ai/incubator-wayang/blob/9cd36ffc61387bf546ee402c98051028dc0b005f/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ObjectFileSource.java#L102
aeeaf4382233888144a6aa0922cc5643e9028166
The text was updated successfully, but these errors were encountered: