Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

validate if the implementation apply for the case #14

Open
github-actions bot opened this issue Mar 23, 2022 · 0 comments
Open

validate if the implementation apply for the case #14

github-actions bot opened this issue Mar 23, 2022 · 0 comments
Labels

Comments

@github-actions
Copy link

validate if the implementation apply for the case

final Optional fileSystem = FileSystems.getFileSystem(ObjectFileSource.this.inputUrl);

if (fileSystem.isPresent()) {

final int KiB = 1024;

final int MiB = 1024 * KiB;

try (LimitedInputStream lis = new LimitedInputStream(fileSystem.get().open(

ObjectFileSource.this.inputUrl), 1 * MiB)) {

final BufferedReader bufferedReader = new BufferedReader(

new InputStreamReader(lis, ObjectFileSource.this.encoding)

);

char[] cbuf = new char[1024];

int numReadChars, numLineFeeds = 0;

while ((numReadChars = bufferedReader.read(cbuf)) != -1) {

for (int i = 0; i < numReadChars; i++) {

if (cbuf[i] == '\n') {

numLineFeeds++;

}

}

}

ObjectFileSource.this.logger.warn("Could not find any newline character in {}.", ObjectFileSource.this.inputUrl);

return OptionalDouble.empty();

}

return OptionalDouble.of((double) lis.getNumReadBytes() / numLineFeeds);

} catch (IOException e) {

ObjectFileSource.this.logger.error("Could not estimate bytes per line of an input file.", e);

}

}

https://github.com/databloom-ai/incubator-wayang/blob/9cd36ffc61387bf546ee402c98051028dc0b005f/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ObjectFileSource.java#L102

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.wayang.basic.operators;

import java.util.Optional;
import java.util.OptionalDouble;
import java.util.OptionalLong;
import org.apache.commons.lang3.Validate;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.wayang.commons.util.profiledb.model.measurement.TimeMeasurement;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimate;
import org.apache.wayang.core.plan.wayangplan.UnarySource;
import org.apache.wayang.core.types.DataSetType;
import org.apache.wayang.core.util.fs.FileSystems;

/**
 * This source reads a text file and outputs the lines as data units.
 */
public class ObjectFileSource<T> extends UnarySource<T> {

    private final Logger logger = LogManager.getLogger(this.getClass());

    private final String inputUrl;

    private final Class<T> tClass;

    public ObjectFileSource(String inputUrl, DataSetType<T> type) {
        super(type);
        this.inputUrl = inputUrl;
        this.tClass = type.getDataUnitType().getTypeClass();
    }

    public ObjectFileSource(String inputUrl, Class<T> tClass) {
        super(DataSetType.createDefault(tClass));
        this.inputUrl = inputUrl;
        this.tClass = tClass;
    }

    /**
     * Copies an instance (exclusive of broadcasts).
     *
     * @param that that should be copied
     */
    public ObjectFileSource(ObjectFileSource that) {
        super(that);
        this.inputUrl = that.getInputUrl();
        this.tClass = that.getTypeClass();
    }

    public String getInputUrl() {
        return this.inputUrl;
    }

    public Class<T> getTypeClass(){
        return this.tClass;
    }

    @Override
    public Optional<org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator> createCardinalityEstimator(
            final int outputIndex,
            final Configuration configuration) {
        Validate.inclusiveBetween(0, this.getNumOutputs() - 1, outputIndex);
        return Optional.of(new ObjectFileSource.CardinalityEstimator());
    }


    /**
     * Custom {@link org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator} for {@link FlatMapOperator}s.
     */
    protected class CardinalityEstimator implements org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator {

        public final CardinalityEstimate FALLBACK_ESTIMATE = new CardinalityEstimate(1000L, 100000000L, 0.7);

        public static final double CORRECTNESS_PROBABILITY = 0.95d;

        /**
         * We expect selectivities to be correct within a factor of {@value #EXPECTED_ESTIMATE_DEVIATION}.
         */
        public static final double EXPECTED_ESTIMATE_DEVIATION = 0.05;

        @Override
        public CardinalityEstimate estimate(OptimizationContext optimizationContext, CardinalityEstimate... inputEstimates) {
            //TODO validate if the implementation apply for the case
            Validate.isTrue(ObjectFileSource.this.getNumInputs() == inputEstimates.length);

            // see Job for StopWatch measurements
            final TimeMeasurement timeMeasurement = optimizationContext.getJob().getStopWatch().start(
                    "Optimization", "Cardinality&Load Estimation", "Push Estimation", "Estimate source cardinalities"
            );

            // Query the job cache first to see if there is already an estimate.
            String jobCacheKey = String.format("%s.estimate(%s)", this.getClass().getCanonicalName(), ObjectFileSource.this.inputUrl);
            CardinalityEstimate cardinalityEstimate = optimizationContext.queryJobCache(jobCacheKey, CardinalityEstimate.class);
            if (cardinalityEstimate != null) return  cardinalityEstimate;

            // Otherwise calculate the cardinality.
            // First, inspect the size of the file and its line sizes.
            OptionalLong fileSize = FileSystems.getFileSize(ObjectFileSource.this.inputUrl);
            if (!fileSize.isPresent()) {
                ObjectFileSource.this.logger.warn("Could not determine size of {}... deliver fallback estimate.",
                        ObjectFileSource.this.inputUrl);
                timeMeasurement.stop();
                return this.FALLBACK_ESTIMATE;

            } else if (fileSize.getAsLong() == 0L) {
                timeMeasurement.stop();
                return new CardinalityEstimate(0L, 0L, 1d);
            }

            OptionalDouble bytesPerLine = this.estimateBytesPerLine();
            if (!bytesPerLine.isPresent()) {
                ObjectFileSource.this.logger.warn("Could not determine average line size of {}... deliver fallback estimate.",
                        ObjectFileSource.this.inputUrl);
                timeMeasurement.stop();
                return this.FALLBACK_ESTIMATE;
            }

            // Extrapolate a cardinality estimate for the complete file.
            double numEstimatedLines = fileSize.getAsLong() / bytesPerLine.getAsDouble();
            double expectedDeviation = numEstimatedLines * EXPECTED_ESTIMATE_DEVIATION;
            cardinalityEstimate = new CardinalityEstimate(
                    (long) (numEstimatedLines - expectedDeviation),
                    (long) (numEstimatedLines + expectedDeviation),
                    CORRECTNESS_PROBABILITY
            );

            // Cache the result, so that it will not be recalculated again.
            optimizationContext.putIntoJobCache(jobCacheKey, cardinalityEstimate);

            timeMeasurement.stop();
            return cardinalityEstimate;
        }

        /**
         * Estimate the number of bytes that are in each line of a given file.
         *
         * @return the average number of bytes per line if it could be determined
         */
        private OptionalDouble estimateBytesPerLine() {
            //TODO validate if the implementation apply for the case
//            final Optional<FileSystem> fileSystem = FileSystems.getFileSystem(ObjectFileSource.this.inputUrl);
//            if (fileSystem.isPresent()) {
//
//                // Construct a limited reader for the first x KiB of the file.
//                final int KiB = 1024;
//                final int MiB = 1024 * KiB;
//                try (LimitedInputStream lis = new LimitedInputStream(fileSystem.get().open(
//                    ObjectFileSource.this.inputUrl), 1 * MiB)) {
//                    final BufferedReader bufferedReader = new BufferedReader(
//                            new InputStreamReader(lis, ObjectFileSource.this.encoding)
//                    );
//
//                    // Read as much as possible.
//                    char[] cbuf = new char[1024];
//                    int numReadChars, numLineFeeds = 0;
//                    while ((numReadChars = bufferedReader.read(cbuf)) != -1) {
//                        for (int i = 0; i < numReadChars; i++) {
//                            if (cbuf[i] == '\n') {
//                                numLineFeeds++;
//                            }
//                        }
//                    }
//
//                    if (numLineFeeds == 0) {
//                        ObjectFileSource.this.logger.warn("Could not find any newline character in {}.", ObjectFileSource.this.inputUrl);
//                        return OptionalDouble.empty();
//                    }
//                    return OptionalDouble.of((double) lis.getNumReadBytes() / numLineFeeds);
//                } catch (IOException e) {
//                    ObjectFileSource.this.logger.error("Could not estimate bytes per line of an input file.", e);
//                }
//            }

            return OptionalDouble.empty();
        }
    }

}

3207bb63994dfa13a7081193b1cb9541ad2d28de

@github-actions github-actions bot added the todo label Mar 23, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

0 participants