[SPARK-51097][SS] Adding state store instance metrics for last uploaded snapshot version in RocksDB

### What changes were proposed in this pull request?

SPARK-51097

This PR sets up instance-specific metrics (`SnapshotLastUploaded.partition_<partition id>_<state store name>`, to be precise) on the executor side and publishes them through StreamingQueryProgress.

### Why are the changes needed?

There is currently a lack of observability into state store-specific maintenance information, notably the version of the last uploaded snapshot. This limits the ability to identify performance degradations caused by maintenance tasks, among other issues described in SPARK-51097.

### Does this PR introduce _any_ user-facing change?

Yes. Some new metrics are displayed in StreamingQueryProgress:
```
Streaming query made progress: {
  ...
  "stateOperators" : [ {
    ...
    "customMetrics" : {
      ...
      "SnapshotLastUploaded.partition_0_default" : 2,
      "SnapshotLastUploaded.partition_12_default" : 10,
      "SnapshotLastUploaded.partition_8_default" : 10,
      ...
    }
  } ],
  "sources" : ...,
  "sink" : ...
}
```
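Once a query is running, these values can be read off the progress object like any other custom metric. A minimal sketch, assuming a running `StreamingQuery` handle named `query` (the variable name is illustrative):
```scala
// Sketch: print the snapshot-upload instance metrics from the latest progress.
import scala.jdk.CollectionConverters._

val progress = query.lastProgress
progress.stateOperators.foreach { op =>
  op.customMetrics.asScala
    .filter { case (name, _) => name.startsWith("SnapshotLastUploaded.") }
    .foreach { case (name, version) => println(s"$name -> $version") }
}
```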
To reduce noise in query progress messages, not every state store instance's metrics are published. The upper limit is configured via `STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT`, which by default reports 5 instance metrics per stateful operator.
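Since the limit is an internal SQL conf (see the `SQLConf` hunk below), it can be overridden per session when more instances need to be reported. An illustrative example:
```scala
// Illustrative only: raise the per-operator report limit from 5 to 10.
spark.conf.set(
  "spark.sql.streaming.stateStore.numStateStoreInstanceMetricsToReport", "10")
```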

### How was this patch tested?

Four new tests are added in `RocksDBStateStoreIntegrationSuite`.

The first two tests execute a deduplication streaming query and verify that metrics are properly filtered and updated in the StreamingQueryProgress logs, each using a different StateStore provider that skips maintenance tasks for specific partitions.

The other two tests execute a join streaming query, which uses four state stores per partition instead of one, and verify that metrics are properly collected and filtered as well.
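As a rough sketch of the kind of assertion such a test can make (not the actual suite code), the number of reported instance metrics must never exceed the configured limit:
```scala
// Hypothetical assertion, assuming a captured `progress: StreamingQueryProgress`.
import scala.jdk.CollectionConverters._

val reported = progress.stateOperators.head.customMetrics.asScala.keys
  .count(_.startsWith("SnapshotLastUploaded."))
assert(reported <= spark.sessionState.conf.numStateStoreInstanceMetricsToReport)
```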

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #49816 from zecookiez/SPARK-51097.

Lead-authored-by: Zeyu Chen <[email protected]>
Co-authored-by: Zeyu Chen <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
2 people authored and HeartSaVioR committed Feb 20, 2025
1 parent bbb9c2c commit da1854e
Showing 9 changed files with 473 additions and 20 deletions.
@@ -2217,6 +2217,19 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT =
buildConf("spark.sql.streaming.stateStore.numStateStoreInstanceMetricsToReport")
.internal()
.doc(
"Number of state store instance metrics included in streaming query progress messages " +
"per stateful operator. Instance metrics are selected based on metric-specific ordering " +
"to minimize noise in the progress report."
)
.version("4.0.0")
.intConf
.checkValue(k => k >= 0, "Must be greater than or equal to 0")
.createWithDefault(5)

val STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT =
buildConf("spark.sql.streaming.stateStore.minDeltasForSnapshot")
.internal()
@@ -5727,6 +5740,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf {

def numStateStoreMaintenanceThreads: Int = getConf(NUM_STATE_STORE_MAINTENANCE_THREADS)

def numStateStoreInstanceMetricsToReport: Int =
getConf(STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT)

def stateStoreMinDeltasForSnapshot: Int = getConf(STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT)

def stateStoreFormatValidationEnabled: Boolean = getConf(STATE_STORE_FORMAT_VALIDATION_ENABLED)
@@ -224,7 +224,7 @@ case class StreamingSymmetricHashJoinExec(

override def shortName: String = "symmetricHashJoin"

private val stateStoreNames =
override val stateStoreNames: Seq[String] =
SymmetricHashJoinStateManager.allStateStoreNames(LeftSide, RightSide)

override def operatorStateMetadata(
@@ -527,9 +527,8 @@ case class StreamingSymmetricHashJoinExec(
(leftSideJoiner.numUpdatedStateRows + rightSideJoiner.numUpdatedStateRows)
numTotalStateRows += combinedMetrics.numKeys
stateMemory += combinedMetrics.memoryUsedBytes
combinedMetrics.customMetrics.foreach { case (metric, value) =>
longMetric(metric.name) += value
}
setStoreCustomMetrics(combinedMetrics.customMetrics)
setStoreInstanceMetrics(combinedMetrics.instanceMetrics)
}

val stateStoreNames = SymmetricHashJoinStateManager.allStateStoreNames(LeftSide, RightSide);
@@ -22,7 +22,7 @@ import java.util.Locale
import java.util.Set
import java.util.UUID
import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, TimeUnit}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicLong}
import javax.annotation.concurrent.GuardedBy

import scala.collection.{mutable, Map}
@@ -147,6 +147,10 @@ class RocksDB(
private val enableChangelogCheckpointing: Boolean = conf.enableChangelogCheckpointing
@volatile protected var loadedVersion: Long = -1L // -1 = nothing valid is loaded

// Can be updated by whichever thread uploaded a snapshot, which could be either task,
// maintenance, or both. -1 represents no version has ever been uploaded.
protected val lastUploadedSnapshotVersion: AtomicLong = new AtomicLong(-1L)

// variables to manage checkpoint ID. Once a checkpointing finishes, it needs to return
// `lastCommittedStateStoreCkptId` as the committed checkpointID, as well as
// `lastCommitBasedStateStoreCkptId` as the checkpointID of the previous version that it is based on.
@@ -1293,6 +1297,7 @@ class RocksDB(
bytesCopied = fileManagerMetrics.bytesCopied,
filesCopied = fileManagerMetrics.filesCopied,
filesReused = fileManagerMetrics.filesReused,
lastUploadedSnapshotVersion = lastUploadedSnapshotVersion.get(),
zipFileBytesUncompressed = fileManagerMetrics.zipFileBytesUncompressed,
nativeOpsMetrics = nativeOpsMetrics)
}
@@ -1461,6 +1466,7 @@ class RocksDB(
log"with uniqueId: ${MDC(LogKeys.UUID, snapshot.uniqueId)} " +
log"time taken: ${MDC(LogKeys.TIME_UNITS, uploadTime)} ms. " +
log"Current lineage: ${MDC(LogKeys.LINEAGE, lineageManager)}")
lastUploadedSnapshotVersion.set(snapshot.version)
} finally {
snapshot.close()
}
@@ -1912,7 +1918,8 @@ case class RocksDBMetrics(
bytesCopied: Long,
filesReused: Long,
zipFileBytesUncompressed: Option[Long],
nativeOpsMetrics: Map[String, Long]) {
nativeOpsMetrics: Map[String, Long],
lastUploadedSnapshotVersion: Long) {
def json: String = Serialization.write(this)(RocksDBMetrics.format)
}

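The comment on `lastUploadedSnapshotVersion` above matters: either the task thread or the background maintenance thread can finish a snapshot upload, so the version is published through an `AtomicLong` rather than a plain `var`. A minimal standalone sketch of the pattern (illustrative, not the actual `RocksDB` class):
```scala
import java.util.concurrent.atomic.AtomicLong

// Sketch of the publication pattern: -1 mirrors the "never uploaded" sentinel.
class SnapshotUploadTracker {
  private val lastUploaded = new AtomicLong(-1L)

  // Called by whichever thread (task or maintenance) completes an upload.
  def recordUpload(version: Long): Unit = lastUploaded.set(version)

  // Read when assembling the metrics report.
  def latest: Long = lastUploaded.get()
}
```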
@@ -316,14 +316,20 @@ private[sql] class RocksDBStateStoreProvider
) ++ rocksDBMetrics.zipFileBytesUncompressed.map(bytes =>
Map(CUSTOM_METRIC_ZIP_FILE_BYTES_UNCOMPRESSED -> bytes)).getOrElse(Map())

val stateStoreInstanceMetrics = Map[StateStoreInstanceMetric, Long](
CUSTOM_INSTANCE_METRIC_SNAPSHOT_LAST_UPLOADED
.withNewId(id.partitionId, id.storeName) -> rocksDBMetrics.lastUploadedSnapshotVersion
)

StateStoreMetrics(
rocksDBMetrics.numUncommittedKeys,
rocksDBMetrics.totalMemUsageBytes,
stateStoreCustomMetrics)
stateStoreCustomMetrics,
stateStoreInstanceMetrics)
} else {
logInfo(log"Failed to collect metrics for store_id=${MDC(STATE_STORE_ID, id)} " +
log"and version=${MDC(VERSION_NUM, version)}")
StateStoreMetrics(0, 0, Map.empty)
StateStoreMetrics(0, 0, Map.empty, Map.empty)
}
}

@@ -497,6 +503,8 @@ private[sql] class RocksDBStateStoreProvider

override def supportedCustomMetrics: Seq[StateStoreCustomMetric] = ALL_CUSTOM_METRICS

override def supportedInstanceMetrics: Seq[StateStoreInstanceMetric] = ALL_INSTANCE_METRICS

private[state] def latestVersion: Long = rocksDB.getLatestVersion()

/** Internal fields and methods */
@@ -888,6 +896,10 @@ object RocksDBStateStoreProvider {
CUSTOM_METRIC_COMPACT_WRITTEN_BYTES, CUSTOM_METRIC_FLUSH_WRITTEN_BYTES,
CUSTOM_METRIC_PINNED_BLOCKS_MEM_USAGE, CUSTOM_METRIC_NUM_INTERNAL_COL_FAMILIES_KEYS,
CUSTOM_METRIC_NUM_EXTERNAL_COL_FAMILIES, CUSTOM_METRIC_NUM_INTERNAL_COL_FAMILIES)

val CUSTOM_INSTANCE_METRIC_SNAPSHOT_LAST_UPLOADED = StateStoreSnapshotLastUploadInstanceMetric()

val ALL_INSTANCE_METRICS = Seq(CUSTOM_INSTANCE_METRIC_SNAPSHOT_LAST_UPLOADED)
}

/** [[StateStoreChangeDataReader]] implementation for [[RocksDBStateStoreProvider]] */
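To make the naming concrete: the provider holds a single `CUSTOM_INSTANCE_METRIC_SNAPSHOT_LAST_UPLOADED` template and stamps it with a partition id and store name via `withNewId`, which produces the keys seen in the progress JSON earlier. A small sketch of the expected shape, based on the `name` format in the `StateStoreInstanceMetric` trait below:
```scala
// Sketch: parameterizing the shared metric template per store instance.
val metric = StateStoreSnapshotLastUploadInstanceMetric()
  .withNewId(partitionId = 0, storeName = "default")
assert(metric.name == "SnapshotLastUploaded.partition_0_default")
```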
@@ -249,12 +249,17 @@ class WrappedReadStateStore(store: StateStore) extends ReadStateStore {
* @param memoryUsedBytes Memory used by the state store
* @param customMetrics Custom implementation-specific metrics
* The metrics reported through this must have the same `name` as those
* reported by `StateStoreProvider.customMetrics`.
* reported by `StateStoreProvider.supportedCustomMetrics`.
* @param instanceMetrics Custom implementation-specific metrics that are specific to state stores
* The metrics reported through this must have the same `name` as those
* reported by `StateStoreProvider.supportedInstanceMetrics`,
* including partition id and store name.
*/
case class StateStoreMetrics(
numKeys: Long,
memoryUsedBytes: Long,
customMetrics: Map[StateStoreCustomMetric, Long])
customMetrics: Map[StateStoreCustomMetric, Long],
instanceMetrics: Map[StateStoreInstanceMetric, Long] = Map.empty)

/**
* State store checkpoint information, used to pass checkpointing information from executors
@@ -284,7 +289,8 @@ object StateStoreMetrics {
StateStoreMetrics(
allMetrics.map(_.numKeys).sum,
allMetrics.map(_.memoryUsedBytes).sum,
combinedCustomMetrics)
combinedCustomMetrics,
allMetrics.flatMap(_.instanceMetrics).toMap)
}
}

@@ -321,6 +327,86 @@ case class StateStoreCustomTimingMetric(name: String, desc: String) extends Stat
SQLMetrics.createTimingMetric(sparkContext, desc)
}

trait StateStoreInstanceMetric {
def metricPrefix: String
def descPrefix: String
def partitionId: Option[Int]
def storeName: String
def initValue: Long

def createSQLMetric(sparkContext: SparkContext): SQLMetric

/**
* Defines how instance metrics are selected for progress reporting.
* Metrics are sorted by value using this ordering, and only the first N metrics are displayed.
* For example, the highest N metrics by value should use Ordering.Long.reverse.
*/
def ordering: Ordering[Long]

/** Whether to skip reporting this instance metric if it is unchanged from its initial value */
def ignoreIfUnchanged: Boolean

/**
* Defines how to merge metric values from different executors for the same state store
* instance in situations like speculative execution or provider unloading. In most cases,
* the original metric value is at its initial value.
*/
def combine(originalMetric: SQLMetric, value: Long): Long

def name: String = {
assert(partitionId.isDefined, "Partition ID must be defined for instance metric name")
s"$metricPrefix.partition_${partitionId.get}_$storeName"
}

def desc: String = {
assert(partitionId.isDefined, "Partition ID must be defined for instance metric description")
s"$descPrefix (partitionId = ${partitionId.get}, storeName = $storeName)"
}

def withNewId(partitionId: Int, storeName: String): StateStoreInstanceMetric
}

case class StateStoreSnapshotLastUploadInstanceMetric(
partitionId: Option[Int] = None,
storeName: String = StateStoreId.DEFAULT_STORE_NAME)
extends StateStoreInstanceMetric {

override def metricPrefix: String = "SnapshotLastUploaded"

override def descPrefix: String = {
"The last uploaded version of the snapshot for a specific state store instance"
}

override def initValue: Long = -1L

override def createSQLMetric(sparkContext: SparkContext): SQLMetric = {
SQLMetrics.createSizeMetric(sparkContext, desc, initValue)
}

override def ordering: Ordering[Long] = Ordering.Long

override def ignoreIfUnchanged: Boolean = false

override def combine(originalMetric: SQLMetric, value: Long): Long = {
// Check for cases where the initial value is less than 0, forcing metric.value to
// convert it to 0. Since the last uploaded snapshot version can have an initial
// value of -1, we need special handling to avoid turning the -1 into a 0.
if (originalMetric.isZero) {
value
} else {
// Use max to grab the most recent snapshot version across all executors
// of the same store instance
Math.max(originalMetric.value, value)
}
}

override def withNewId(
partitionId: Int,
storeName: String): StateStoreSnapshotLastUploadInstanceMetric = {
copy(partitionId = Some(partitionId), storeName = storeName)
}
}

sealed trait KeyStateEncoderSpec {
def keySchema: StructType
def jsonValue: JValue
@@ -495,9 +581,16 @@ trait StateStoreProvider {
/**
* Optional custom metrics that the implementation may want to report.
* @note The StateStore objects created by this provider must report the same custom metrics
* (specifically, same names) through `StateStore.metrics`.
* (specifically, same names) through `StateStore.metrics.customMetrics`.
*/
def supportedCustomMetrics: Seq[StateStoreCustomMetric] = Nil

/**
* Optional custom state store instance metrics that the implementation may want to report.
* @note The StateStore objects created by this provider must report the same instance metrics
* (specifically, same names) through `StateStore.metrics.instanceMetrics`.
*/
def supportedInstanceMetrics: Seq[StateStoreInstanceMetric] = Seq.empty
}

object StateStoreProvider {
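The `ordering` contract above is what drives the filtering: metric values are sorted with the metric's own ordering and only the first N survive, so the ascending `Ordering.Long` used for snapshot versions surfaces the most-lagging partitions first. A self-contained sketch of that selection over made-up values:
```scala
// Sketch: ascending sort keeps the lowest (most lagging) uploaded versions.
val reportLimit = 5 // default of numStateStoreInstanceMetricsToReport
val versionsByPartition = Map(0 -> 2L, 3 -> 10L, 8 -> 10L, 12 -> 4L, 17 -> 15L, 21 -> 9L)
val reported = versionsByPartition.toSeq
  .sortBy { case (_, version) => version }(Ordering.Long)
  .take(reportLimit)
// reported holds the 5 partitions with the lowest last-uploaded versions
```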
@@ -447,7 +447,9 @@ class SymmetricHashJoinStateManager(
keyToNumValuesMetrics.memoryUsedBytes + keyWithIndexToValueMetrics.memoryUsedBytes,
keyWithIndexToValueMetrics.customMetrics.map {
case (metric, value) => (metric.withNewDesc(desc = newDesc(metric.desc)), value)
}
},
// We want to collect instance metrics from both state stores
keyWithIndexToValueMetrics.instanceMetrics ++ keyToNumValuesMetrics.instanceMetrics
)
}

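A note on the `++` merge above: instance metric keys embed the store name, so the maps coming from `keyWithIndexToValue` and `keyToNumValues` can never collide, and a plain map concatenation keeps both. Illustrative, with made-up key strings:
```scala
// Sketch: distinct store names in the keys mean ++ drops nothing.
val left = Map("SnapshotLastUploaded.partition_0_left-keyWithIndexToValue" -> 7L)
val right = Map("SnapshotLastUploaded.partition_0_left-keyToNumValues" -> 7L)
val merged = left ++ right // two entries, nothing overwritten
```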