diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 5c491c0780d2..0d1fd527086e 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -86,6 +86,7 @@ import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.msq.counters.ChannelCounters; import org.apache.druid.msq.counters.CounterSnapshots; import org.apache.druid.msq.counters.CounterSnapshotsTree; import org.apache.druid.msq.indexing.InputChannelFactory; @@ -162,6 +163,7 @@ import org.apache.druid.msq.querykit.scan.ScanQueryKit; import org.apache.druid.msq.shuffle.input.DurableStorageInputChannelFactory; import org.apache.druid.msq.shuffle.input.WorkerInputChannelFactory; +import org.apache.druid.msq.statistics.ClusterByStatisticsCollector; import org.apache.druid.msq.statistics.PartialKeyStatisticsInformation; import org.apache.druid.msq.util.IntervalUtils; import org.apache.druid.msq.util.MSQFutureUtils; @@ -514,7 +516,7 @@ private MSQTaskReportPayload runInternal(final QueryListener queryListener, fina stagesReport = null; } - return new MSQTaskReportPayload( + final MSQTaskReportPayload msqTaskReportPayload = new MSQTaskReportPayload( makeStatusReport( taskStateForReport, errorForReport, @@ -529,6 +531,44 @@ private MSQTaskReportPayload runInternal(final QueryListener queryListener, fina countersSnapshot, null ); + // Emit summary metrics + emitSummaryMetrics(msqTaskReportPayload, querySpec); + return msqTaskReportPayload; + } + + private void emitSummaryMetrics(final MSQTaskReportPayload msqTaskReportPayload, final MSQSpec querySpec) + { + final Set stagesToInclude = new HashSet<>(); + final MSQStagesReport stagesReport = msqTaskReportPayload.getStages(); + if (stagesReport != null) { + for (MSQStagesReport.Stage stage : stagesReport.getStages()) { + boolean hasParentStage = stage.getStageDefinition().getInputSpecs().stream() + .anyMatch(stageInput -> stageInput instanceof StageInputSpec); + if (!hasParentStage) { + stagesToInclude.add(stage.getStageNumber()); + } + } + } + long totalProcessedBytes = 0; + + if (msqTaskReportPayload.getCounters() != null) { + totalProcessedBytes = msqTaskReportPayload.getCounters() + .copyMap() + .entrySet() + .stream() + .filter(entry -> stagesReport == null || stagesToInclude.contains(entry.getKey())) + .flatMap(counterSnapshotsMap -> counterSnapshotsMap.getValue().values().stream()) + .flatMap(counterSnapshots -> counterSnapshots.getMap().entrySet().stream()) + .filter(entry -> entry.getKey().startsWith("input")) + .mapToLong(entry -> { + ChannelCounters.Snapshot snapshot = (ChannelCounters.Snapshot) entry.getValue(); + return snapshot.getBytes() == null ? 0L : Arrays.stream(snapshot.getBytes()).sum(); + }) + .sum(); + } + + log.debug("Processed bytes[%d] for query[%s].", totalProcessedBytes, querySpec.getQuery()); + context.emitMetric("ingest/input/bytes", totalProcessedBytes); } /** @@ -2418,7 +2458,7 @@ private void startTaskLauncher() } /** - * Enqueues the fetching {@link org.apache.druid.msq.statistics.ClusterByStatisticsCollector} + * Enqueues the fetching {@link ClusterByStatisticsCollector} * from each worker via {@link WorkerSketchFetcher} */ private void fetchStatsFromWorkers() diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index 03d50c43f06d..594816974929 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -64,6 +64,7 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.rpc.HttpResponseException; import org.apache.druid.rpc.indexing.OverlordClient; import org.apache.druid.segment.SegmentSchemaMapping; @@ -1279,11 +1280,29 @@ private void updateAndWriteCompletionReports(TaskStatus status) private void writeCompletionReports() { + emitCompletionMetrics(); if (!isCompactionTask) { toolbox.getTaskReportFileWriter().write(getId(), completionReports); } } + private void emitCompletionMetrics() + { + final Map rowStats = getTaskCompletionRowStats(); + if (rowStats == null) { + return; + } + + final Number totalProcessedBytes = (Number) rowStats.get("processedBytes"); + if (totalProcessedBytes == null) { + return; + } + + final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder(); + IndexTaskUtils.setTaskDimensions(metricBuilder, this); + toolbox.getEmitter().emit(metricBuilder.setMetric("ingest/input/bytes", totalProcessedBytes)); + } + private static IndexTuningConfig convertToIndexTuningConfig(ParallelIndexTuningConfig tuningConfig) { return new IndexTuningConfig(