Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ingest/processed/bytes metric #17581

Merged
merged 15 commits into from
Jan 24, 2025
Next Next commit
changes
neha-ellur committed Dec 17, 2024
commit 7c1e15b635a30902c4cd8871bf8d4b5824516d05
Original file line number Diff line number Diff line change
@@ -993,6 +993,7 @@ private TaskStatus generateAndPublishSegments(
emitMetric(toolbox.getEmitter(), "ingest/segments/count",
published.getSegments().size() + tombStones.size()
);
emitMetric(toolbox.getEmitter(), "ingest/processed/bytes", buildSegmentsMeters.getProcessedBytes());

log.debugSegments(published.getSegments(), "Published segments");

Original file line number Diff line number Diff line change
@@ -1633,6 +1633,14 @@ private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseab
);
buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForRunningTasks);

// Emit the processed bytes metric
try {
emitMetric(toolbox.getEmitter(), "ingest/processed/bytes", rowStatsForRunningTasks.getProcessedBytes());
}
catch (Exception e) {
LOG.warn(e, "Unable to emit processed bytes metric");
}

return createStatsAndErrorsReport(buildSegmentsRowStats.getTotals(), unparseableEvents);
}

Original file line number Diff line number Diff line change
@@ -78,6 +78,8 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.metadata.PendingSegmentRecord;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.ParseExceptionReport;
@@ -251,6 +253,7 @@ public enum Status
private volatile DateTime minMessageTime;
private volatile DateTime maxMessageTime;
private final ScheduledExecutorService rejectionPeriodUpdaterExec;
private final ServiceEmitter emitter;

public SeekableStreamIndexTaskRunner(
final SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType, RecordType> task,
@@ -272,6 +275,7 @@ public SeekableStreamIndexTaskRunner(
this.sequences = new CopyOnWriteArrayList<>();
this.ingestionState = IngestionState.NOT_STARTED;
this.lockGranularityToUse = lockGranularityToUse;
this.emitter = toolbox.getEmitter();

minMessageTime = ioConfig.getMinimumMessageTime().or(DateTimes.MIN);
maxMessageTime = ioConfig.getMaximumMessageTime().or(DateTimes.MAX);
@@ -657,6 +661,24 @@ public void run()
shouldProcess
);

long bytesProcessed = 0;
for (ByteEntity entity : record.getData()) {
bytesProcessed += entity.getBuffer().remaining();
}

// Emit the processed bytes metric
try {
emitter.emit(
ServiceMetricEvent.builder()
.setDimension("taskId", task.getId())
.setDimension("dataSource", task.getDataSource())
.setMetric("ingest/processed/bytes", bytesProcessed)
);
}
catch (Exception e) {
log.warn(e, "Unable to emit processed bytes metric");
}

if (shouldProcess) {
final List<InputRow> rows = parser.parse(record.getData(), isEndOfShard(record.getSequenceNumber()));
boolean isPersistRequired = false;