Remove cpuTimeAcc from DataSource#createSegmentMapFunction's signature #17623

Open · wants to merge 7 commits into master
@@ -58,7 +58,6 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Queue;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
@@ -163,7 +162,7 @@ public ProcessorsAndChannels<Object, Long> makeProcessors(
 
     if (segmentMapFnProcessor == null) {
       final Function<SegmentReference, SegmentReference> segmentMapFn =
-          query.getDataSource().createSegmentMapFunction(query, new AtomicLong());
+          query.getDataSource().createSegmentMapFunction(query);
       processorManager = processorManagerFn.apply(ImmutableList.of(segmentMapFn));
     } else {
       processorManager = new ChainedProcessorManager<>(ProcessorManagers.of(() -> segmentMapFnProcessor), processorManagerFn);
@@ -193,7 +193,7 @@ private void addFrame(final int channelNumber, final Frame frame)
 
   private Function<SegmentReference, SegmentReference> createSegmentMapFunction()
   {
-    return inlineChannelData(query.getDataSource()).createSegmentMapFunction(query, new AtomicLong());
+    return inlineChannelData(query.getDataSource()).createSegmentMapFunction(query);
   }
 
   DataSource inlineChannelData(final DataSource originalDataSource)
@@ -32,7 +32,6 @@
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 
 /**
@@ -97,7 +96,7 @@ public boolean isConcrete()
   }
 
   @Override
-  public Function<SegmentReference, SegmentReference> createSegmentMapFunction(Query query, AtomicLong cpuTimeAcc)
+  public Function<SegmentReference, SegmentReference> createSegmentMapFunction(Query query)
   {
     return Function.identity();
   }
@@ -64,7 +64,7 @@ public List<WritableFrameChannel> outputChannels()
   @Override
   public ReturnOrAwait<Function<SegmentReference, SegmentReference>> runIncrementally(final IntSet readableInputs)
   {
-    return ReturnOrAwait.returnObject(query.getDataSource().createSegmentMapFunction(query, new AtomicLong()));
+    return ReturnOrAwait.returnObject(query.getDataSource().createSegmentMapFunction(query));
   }
 
   @Override
@@ -30,7 +30,6 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -113,7 +112,7 @@ public interface DataSource
   * @param cpuTimeAcc the cpu time accumulator
   * @return the segment function
   */
-  Function<SegmentReference, SegmentReference> createSegmentMapFunction(Query query, AtomicLong cpuTimeAcc);
+  Function<SegmentReference, SegmentReference> createSegmentMapFunction(Query query);
 
  /**
   * Returns an updated datasource based on the specified new source.
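
The interface change above is the one every other hunk in this PR follows from. For call sites the migration is mechanical: drop the accumulator argument. A minimal sketch of a hypothetical caller (the class and method names here are illustrative, not part of this diff):

import org.apache.druid.query.Query;
import org.apache.druid.segment.SegmentReference;

import java.util.function.Function;

class SegmentMapFnMigrationExample
{
  // Hypothetical helper, for illustration only.
  static Function<SegmentReference, SegmentReference> buildMapFn(Query<?> query)
  {
    // Before this PR, call sites passed a throwaway accumulator:
    //   query.getDataSource().createSegmentMapFunction(query, new AtomicLong());
    // After this PR, the accumulator argument is simply dropped:
    return query.getDataSource().createSegmentMapFunction(query);
  }
}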
@@ -27,13 +27,12 @@
 import org.apache.druid.query.planning.DataSourceAnalysis;
 import org.apache.druid.segment.FilteredSegment;
 import org.apache.druid.segment.SegmentReference;
-import org.apache.druid.utils.JvmUtils;
 
 import javax.annotation.Nullable;
 
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 
 /**
@@ -122,19 +121,10 @@ public boolean isConcrete()
   }
 
   @Override
-  public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
-      Query query,
-      AtomicLong cpuTimeAccumulator
-  )
+  public Function<SegmentReference, SegmentReference> createSegmentMapFunction(Query query)
   {
-    final Function<SegmentReference, SegmentReference> segmentMapFn = base.createSegmentMapFunction(
-        query,
-        cpuTimeAccumulator
-    );
-    return JvmUtils.safeAccumulateThreadCpuTime(
-        cpuTimeAccumulator,
-        () -> baseSegment -> new FilteredSegment(segmentMapFn.apply(baseSegment), filter)
-    );
+    final Function<SegmentReference, SegmentReference> segmentMapFn = base.createSegmentMapFunction(query);
+    return baseSegment -> new FilteredSegment(segmentMapFn.apply(baseSegment), filter);
   }
 
   @Override
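
This hunk also drops the JvmUtils.safeAccumulateThreadCpuTime wrapper, so building the filtered segment map function is no longer charged to a CPU-time accumulator. For readers unfamiliar with that utility, here is a rough sketch of the pattern being removed, using only standard JMX thread timing; this is an illustration, not Druid's actual implementation:

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;

import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

final class CpuTimeAccumulationSketch
{
  // Rough sketch of the removed pattern: run a supplier, measure the thread CPU
  // time it consumed via JMX, and add that to an accumulator.
  static <T> T accumulateThreadCpuTime(final AtomicLong accumulator, final Supplier<T> supplier)
  {
    final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
    if (!threadMXBean.isCurrentThreadCpuTimeSupported()) {
      // The "safe" part: fall back to running the supplier unmeasured.
      return supplier.get();
    }
    final long startNs = threadMXBean.getCurrentThreadCpuTime();
    try {
      return supplier.get();
    }
    finally {
      accumulator.addAndGet(threadMXBean.getCurrentThreadCpuTime() - startNs);
    }
  }
}

Note that what was being timed here was only the construction of a lambda, which is trivially cheap; the per-segment work happens later, when the returned function is applied. That presumably is why the accounting can be dropped without losing a meaningful metric.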
@@ -37,7 +37,6 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -171,7 +170,7 @@ public boolean isConcrete()
   }
 
   @Override
-  public Function<SegmentReference, SegmentReference> createSegmentMapFunction(Query query, AtomicLong cpuTimeAcc)
+  public Function<SegmentReference, SegmentReference> createSegmentMapFunction(Query query)
   {
     return Function.identity();
   }
@@ -39,7 +39,6 @@
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -243,10 +242,7 @@ public boolean isConcrete()
   }
 
   @Override
-  public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
-      Query query,
-      AtomicLong cpuTimeAcc
-  )
+  public Function<SegmentReference, SegmentReference> createSegmentMapFunction(Query query)
   {
     return Function.identity();
   }
processing/src/main/java/org/apache/druid/query/JoinDataSource.java (150 changes: 70 additions & 80 deletions)
@@ -54,9 +54,9 @@
 import org.apache.druid.segment.join.filter.JoinFilterPreAnalysisKey;
 import org.apache.druid.segment.join.filter.JoinableClauses;
 import org.apache.druid.segment.join.filter.rewrite.JoinFilterRewriteConfig;
-import org.apache.druid.utils.JvmUtils;
 
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -65,7 +65,6 @@
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -301,14 +300,12 @@ public Set<String> getVirtualColumnCandidates()
 
   @Override
   public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
-      Query query,
-      AtomicLong cpuTimeAccumulator
+      Query query
   )
   {
     return createSegmentMapFunctionInternal(
         analysis.getJoinBaseTableFilter().map(Filters::toFilter).orElse(null),
         analysis.getPreJoinableClauses(),
-        cpuTimeAccumulator,
        analysis.getBaseQuery().orElse(query)
     );
   }
@@ -444,86 +441,79 @@ private DataSourceAnalysis getAnalysisForDataSource()
   private Function<SegmentReference, SegmentReference> createSegmentMapFunctionInternal(
       @Nullable final Filter baseFilter,
       final List<PreJoinableClause> clauses,
-      final AtomicLong cpuTimeAccumulator,
       final Query<?> query
   )
   {
     // compute column correlations here and RHS correlated values
-    return JvmUtils.safeAccumulateThreadCpuTime(
-        cpuTimeAccumulator,
-        () -> {
-          if (clauses.isEmpty()) {
-            return Function.identity();
-          } else {
-            final JoinableClauses joinableClauses = JoinableClauses.createClauses(
-                clauses,
-                joinableFactoryWrapper.getJoinableFactory()
-            );
-            final JoinFilterRewriteConfig filterRewriteConfig = JoinFilterRewriteConfig.forQuery(query);
-
-            // Pick off any join clauses that can be converted into filters.
-            final Set<String> requiredColumns = query.getRequiredColumns();
-            final Filter baseFilterToUse;
-            final List<JoinableClause> clausesToUse;
-
-            if (requiredColumns != null && filterRewriteConfig.isEnableRewriteJoinToFilter()) {
-              final Pair<List<Filter>, List<JoinableClause>> conversionResult = JoinableFactoryWrapper.convertJoinsToFilters(
-                  joinableClauses.getJoinableClauses(),
-                  requiredColumns,
-                  Ints.checkedCast(Math.min(filterRewriteConfig.getFilterRewriteMaxSize(), Integer.MAX_VALUE))
-              );
-
-              baseFilterToUse =
-                  Filters.maybeAnd(
-                      Lists.newArrayList(
-                          Iterables.concat(
-                              Collections.singleton(baseFilter),
-                              conversionResult.lhs
-                          )
-                      )
-                  ).orElse(null);
-              clausesToUse = conversionResult.rhs;
-            } else {
-              baseFilterToUse = baseFilter;
-              clausesToUse = joinableClauses.getJoinableClauses();
-            }
-
-            // Analyze remaining join clauses to see if filters on them can be pushed down.
-            final JoinFilterPreAnalysis joinFilterPreAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
-                new JoinFilterPreAnalysisKey(
-                    filterRewriteConfig,
-                    clausesToUse,
-                    query.getVirtualColumns(),
-                    Filters.maybeAnd(Arrays.asList(baseFilterToUse, Filters.toFilter(query.getFilter())))
-                           .orElse(null)
-                )
-            );
-            final Function<SegmentReference, SegmentReference> baseMapFn;
-            // A join data source is not concrete
-            // And isConcrete() of an unnest datasource delegates to its base
-            // Hence, in the case of a Join -> Unnest -> Join
-            // if we just use isConcrete on the left
-            // the segment map function for the unnest would never get called
-            // This calls us to delegate to the segmentMapFunction of the left
-            // only when it is not a JoinDataSource
-            if (left instanceof JoinDataSource) {
-              baseMapFn = Function.identity();
-            } else {
-              baseMapFn = left.createSegmentMapFunction(
-                  query,
-                  cpuTimeAccumulator
-              );
-            }
-            return baseSegment ->
-                new HashJoinSegment(
-                    baseMapFn.apply(baseSegment),
-                    baseFilterToUse,
-                    GuavaUtils.firstNonNull(clausesToUse, ImmutableList.of()),
-                    joinFilterPreAnalysis
-                );
-          }
-        }
-    );
+    if (clauses.isEmpty()) {
+      return Function.identity();
+    } else {
+      final JoinableClauses joinableClauses = JoinableClauses.createClauses(
+          clauses,
+          joinableFactoryWrapper.getJoinableFactory()
+      );
+      final JoinFilterRewriteConfig filterRewriteConfig = JoinFilterRewriteConfig.forQuery(query);
+
+      // Pick off any join clauses that can be converted into filters.
+      final Set<String> requiredColumns = query.getRequiredColumns();
+      final Filter baseFilterToUse;
+      final List<JoinableClause> clausesToUse;
+
+      if (requiredColumns != null && filterRewriteConfig.isEnableRewriteJoinToFilter()) {
+        final Pair<List<Filter>, List<JoinableClause>> conversionResult = JoinableFactoryWrapper.convertJoinsToFilters(
+            joinableClauses.getJoinableClauses(),
+            requiredColumns,
+            Ints.checkedCast(Math.min(filterRewriteConfig.getFilterRewriteMaxSize(), Integer.MAX_VALUE))
+        );
+
+        baseFilterToUse =
+            Filters.maybeAnd(
+                Lists.newArrayList(
+                    Iterables.concat(
+                        Collections.singleton(baseFilter),
+                        conversionResult.lhs
+                    )
+                )
+            ).orElse(null);
+        clausesToUse = conversionResult.rhs;
+      } else {
+        baseFilterToUse = baseFilter;
+        clausesToUse = joinableClauses.getJoinableClauses();
+      }
+
+      // Analyze remaining join clauses to see if filters on them can be pushed down.
+      final JoinFilterPreAnalysis joinFilterPreAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
+          new JoinFilterPreAnalysisKey(
+              filterRewriteConfig,
+              clausesToUse,
+              query.getVirtualColumns(),
+              Filters.maybeAnd(Arrays.asList(baseFilterToUse, Filters.toFilter(query.getFilter())))
+                     .orElse(null)
+          )
+      );
+      final Function<SegmentReference, SegmentReference> baseMapFn;
+      // A join data source is not concrete
+      // And isConcrete() of an unnest datasource delegates to its base
+      // Hence, in the case of a Join -> Unnest -> Join
+      // if we just use isConcrete on the left
+      // the segment map function for the unnest would never get called
+      // This calls us to delegate to the segmentMapFunction of the left
+      // only when it is not a JoinDataSource
+      if (left instanceof JoinDataSource) {
+        baseMapFn = Function.identity();
+      } else {
+        baseMapFn = left.createSegmentMapFunction(
+            query
+        );
+      }
+      return baseSegment ->
+          new HashJoinSegment(
+              baseMapFn.apply(baseSegment),
+              baseFilterToUse,
+              GuavaUtils.firstNonNull(clausesToUse, ImmutableList.of()),
+              joinFilterPreAnalysis
+          );
+    }
   }
 
   /**
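
With the wrapper gone, createSegmentMapFunctionInternal is pure function construction: the clause analysis and filter rewrites run once, up front, and the per-segment cost is confined to applying the returned function. A minimal usage sketch (class and method names are hypothetical):

import org.apache.druid.query.Query;
import org.apache.druid.segment.SegmentReference;

import java.util.function.Function;

final class JoinMapFnUsageSketch
{
  // Hypothetical usage, for illustration only: building the function does the
  // expensive analysis once; applying it wraps each base segment, e.g. in a
  // HashJoinSegment for a join datasource, or returns it unchanged for the
  // identity cases above.
  static SegmentReference map(final Query<?> query, final SegmentReference baseSegment)
  {
    final Function<SegmentReference, SegmentReference> mapFn =
        query.getDataSource().createSegmentMapFunction(query);
    return mapFn.apply(baseSegment);
  }
}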
@@ -30,7 +30,6 @@
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 
 /**
@@ -101,10 +100,7 @@ public boolean isConcrete()
   }
 
   @Override
-  public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
-      Query query,
-      AtomicLong cpuTime
-  )
+  public Function<SegmentReference, SegmentReference> createSegmentMapFunction(Query query)
   {
     return Function.identity();
   }
@@ -32,7 +32,6 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 
 @JsonTypeName("query")
@@ -110,12 +109,11 @@ public boolean isConcrete()
 
   @Override
   public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
-      Query query,
-      AtomicLong cpuTime
+      Query query
   )
   {
     final Query<?> subQuery = this.getQuery();
-    return subQuery.getDataSource().createSegmentMapFunction(subQuery, cpuTime);
+    return subQuery.getDataSource().createSegmentMapFunction(subQuery);
   }
 
   @Override
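
The query datasource keeps its delegation shape: it forwards to the datasource of its wrapped query, so nested subqueries resolve recursively until a leaf datasource supplies the mapping function. A small sketch of that recursion, with illustrative names:

import org.apache.druid.query.Query;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.segment.SegmentReference;

import java.util.function.Function;

final class NestedDataSourceSketch
{
  // Illustration of the delegation above: a QueryDataSource hands off to the
  // datasource of its wrapped query, so nesting resolves recursively until a
  // leaf datasource (table, inline, lookup, join, ...) builds the function.
  static Function<SegmentReference, SegmentReference> resolve(final QueryDataSource dataSource)
  {
    final Query<?> subQuery = dataSource.getQuery();
    return subQuery.getDataSource().createSegmentMapFunction(subQuery);
  }
}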