Skip to content

Commit

Permalink
Finish the JDBCAutoScalerStateStore
Browse files Browse the repository at this point in the history
  • Loading branch information
1996fanrui committed Dec 29, 2023
1 parent 3a82f04 commit 55f327e
Showing 1 changed file with 184 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,46 @@
package org.apache.flink.autoscaler.jdbc.state;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.autoscaler.ScalingSummary;
import org.apache.flink.autoscaler.ScalingTracking;
import org.apache.flink.autoscaler.jdbc.JobKeySerializer;
import org.apache.flink.autoscaler.metrics.CollectedMetrics;
import org.apache.flink.autoscaler.state.AutoScalerStateStore;
import org.apache.flink.autoscaler.utils.AutoScalerSerDeModule;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.runtime.jobgraph.JobVertexID;

import com.fasterxml.jackson.core.JacksonException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
import java.time.Instant;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

import static org.apache.flink.autoscaler.jdbc.state.StateType.COLLECTED_METRICS;
import static org.apache.flink.autoscaler.jdbc.state.StateType.PARALLELISM_OVERRIDES;
import static org.apache.flink.autoscaler.jdbc.state.StateType.SCALING_HISTORY;
import static org.apache.flink.autoscaler.jdbc.state.StateType.SCALING_TRACKING;

/** An AutoscalerStateStore which persists its state in JDBC related database. */
@Experimental
Expand All @@ -63,64 +83,206 @@ public JDBCAutoScalerStateStore(JobKeySerializer<KEY> jobKeySerializer) throws S
public void storeScalingHistory(
JobAutoScalerContext<KEY> jobContext,
Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory)
throws Exception {}
throws Exception {
jdbcStore.putSerializedState(
getSerializeKey(jobContext),
SCALING_HISTORY,
serializeScalingHistory(scalingHistory));
}

@Nonnull
@Override
public Map<JobVertexID, SortedMap<Instant, ScalingSummary>> getScalingHistory(
JobAutoScalerContext<KEY> jobContext) throws Exception {
return null;
JobAutoScalerContext<KEY> jobContext) {
Optional<String> serializedScalingHistory =
jdbcStore.getSerializedState(getSerializeKey(jobContext), SCALING_HISTORY);
if (serializedScalingHistory.isEmpty()) {
return new HashMap<>();
}
try {
return deserializeScalingHistory(serializedScalingHistory.get());
} catch (JacksonException e) {
LOG.error(
"Could not deserialize scaling history, possibly the format changed. Discarding...",
e);
jdbcStore.removeSerializedState(getSerializeKey(jobContext), SCALING_HISTORY);
return new HashMap<>();
}
}

/**
 * Removes the stored scaling history for the given job, if any.
 *
 * @param jobContext context identifying the job
 */
@Override
public void removeScalingHistory(JobAutoScalerContext<KEY> jobContext) {
    jdbcStore.removeSerializedState(getSerializeKey(jobContext), SCALING_HISTORY);
}

@Override
public ScalingTracking getScalingTracking(JobAutoScalerContext<KEY> jobContext)
throws Exception {
return null;
public void storeScalingTracking(
JobAutoScalerContext<KEY> jobContext, ScalingTracking scalingTrack) throws Exception {
jdbcStore.putSerializedState(
getSerializeKey(jobContext),
SCALING_TRACKING,
serializeScalingTracking(scalingTrack));
}

/**
 * Loads the scaling tracking record for the given job.
 *
 * <p>Returns a fresh, empty {@link ScalingTracking} when nothing is stored. If the stored
 * state cannot be deserialized, the corrupt state is removed and an empty record is
 * returned.
 *
 * @param jobContext context identifying the job
 * @return the stored tracking record, never {@code null}
 */
@Override
public ScalingTracking getScalingTracking(JobAutoScalerContext<KEY> jobContext) {
    // Renamed from "serializedRescalingHistory": this holds scaling *tracking* state.
    Optional<String> serializedScalingTracking =
            jdbcStore.getSerializedState(getSerializeKey(jobContext), SCALING_TRACKING);
    if (serializedScalingTracking.isEmpty()) {
        return new ScalingTracking();
    }
    try {
        return deserializeScalingTracking(serializedScalingTracking.get());
    } catch (JacksonException e) {
        LOG.error(
                "Could not deserialize rescaling history, possibly the format changed. Discarding...",
                e);
        // Drop the unreadable state so subsequent reads do not keep failing.
        jdbcStore.removeSerializedState(getSerializeKey(jobContext), SCALING_TRACKING);
        return new ScalingTracking();
    }
}

@Override
public void storeCollectedMetrics(
JobAutoScalerContext<KEY> jobContext, SortedMap<Instant, CollectedMetrics> metrics)
throws Exception {}
throws Exception {
jdbcStore.putSerializedState(
getSerializeKey(jobContext), COLLECTED_METRICS, serializeEvaluatedMetrics(metrics));
}

@Nonnull
@Override
public SortedMap<Instant, CollectedMetrics> getCollectedMetrics(
JobAutoScalerContext<KEY> jobContext) throws Exception {
return null;
JobAutoScalerContext<KEY> jobContext) {
Optional<String> serializedEvaluatedMetricsOpt =
jdbcStore.getSerializedState(getSerializeKey(jobContext), COLLECTED_METRICS);
if (serializedEvaluatedMetricsOpt.isEmpty()) {
return new TreeMap<>();
}
try {
return deserializeEvaluatedMetrics(serializedEvaluatedMetricsOpt.get());
} catch (JacksonException e) {
LOG.error(
"Could not deserialize metric history, possibly the format changed. Discarding...",
e);
jdbcStore.removeSerializedState(getSerializeKey(jobContext), COLLECTED_METRICS);
return new TreeMap<>();
}
}

/**
 * Removes the stored collected metrics for the given job, if any.
 *
 * @param jobContext context identifying the job
 */
@Override
public void removeCollectedMetrics(JobAutoScalerContext<KEY> jobContext) {
    jdbcStore.removeSerializedState(getSerializeKey(jobContext), COLLECTED_METRICS);
}

@Override
public void storeParallelismOverrides(
JobAutoScalerContext<KEY> jobContext, Map<String, String> parallelismOverrides)
throws Exception {}
JobAutoScalerContext<KEY> jobContext, Map<String, String> parallelismOverrides) {
jdbcStore.putSerializedState(
getSerializeKey(jobContext),
PARALLELISM_OVERRIDES,
serializeParallelismOverrides(parallelismOverrides));
}

@Nonnull
@Override
public Map<String, String> getParallelismOverrides(JobAutoScalerContext<KEY> jobContext)
throws Exception {
return null;
public Map<String, String> getParallelismOverrides(JobAutoScalerContext<KEY> jobContext) {
return jdbcStore
.getSerializedState(getSerializeKey(jobContext), PARALLELISM_OVERRIDES)
.map(JDBCAutoScalerStateStore::deserializeParallelismOverrides)
.orElse(new HashMap<>());
}

/**
 * Removes the stored parallelism overrides for the given job, if any.
 *
 * @param jobContext context identifying the job
 */
@Override
public void removeParallelismOverrides(JobAutoScalerContext<KEY> jobContext) {
    jdbcStore.removeSerializedState(getSerializeKey(jobContext), PARALLELISM_OVERRIDES);
}

/**
 * Removes all stored state (every state type) for the given job.
 *
 * @param jobContext context identifying the job
 */
@Override
public void clearAll(JobAutoScalerContext<KEY> jobContext) {
    jdbcStore.clearAll(getSerializeKey(jobContext));
}

/**
 * Flushes any pending state changes for the given job to the database.
 *
 * @param jobContext context identifying the job
 * @throws Exception if the underlying JDBC flush fails
 */
@Override
public void flush(JobAutoScalerContext<KEY> jobContext) throws Exception {
    jdbcStore.flush(getSerializeKey(jobContext));
}

/**
 * Drops any cached information for the given job key; does not touch persisted state.
 *
 * @param jobKey key of the job to evict from the cache
 */
@Override
public void removeInfoFromCache(KEY jobKey) {
    jdbcStore.removeInfoFromCache(getSerializeKey(jobKey));
}

// Convenience overload: derives the serialized key from the context's job key.
private String getSerializeKey(JobAutoScalerContext<KEY> jobContext) {
return getSerializeKey(jobContext.getJobKey());
}

// Serializes the job key into the string form used as the primary lookup key in the
// JDBC store; delegates to the pluggable JobKeySerializer supplied at construction.
private String getSerializeKey(KEY jobKey) {
return jobKeySerializer.serialize(jobKey);
}

// The serialization and deserialization are same with KubernetesAutoScalerStateStore
/** Serializes the scaling history to a compressed, Base64-encoded YAML string. */
protected static String serializeScalingHistory(
        Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory) throws Exception {
    final String yaml = YAML_MAPPER.writeValueAsString(scalingHistory);
    return compress(yaml);
}

/** Inverse of {@code serializeScalingHistory}: decompresses and parses the YAML payload. */
private static Map<JobVertexID, SortedMap<Instant, ScalingSummary>> deserializeScalingHistory(
        String scalingHistory) throws JacksonException {
    final String yaml = decompress(scalingHistory);
    return YAML_MAPPER.readValue(yaml, new TypeReference<>() {});
}

/** Serializes the scaling tracking record to a compressed, Base64-encoded YAML string. */
protected static String serializeScalingTracking(ScalingTracking scalingTracking)
        throws Exception {
    final String yaml = YAML_MAPPER.writeValueAsString(scalingTracking);
    return compress(yaml);
}

/** Inverse of {@code serializeScalingTracking}: decompresses and parses the YAML payload. */
private static ScalingTracking deserializeScalingTracking(String scalingTracking)
        throws JacksonException {
    final String yaml = decompress(scalingTracking);
    return YAML_MAPPER.readValue(yaml, new TypeReference<>() {});
}

/** Serializes the collected metric history to a compressed, Base64-encoded YAML string. */
@VisibleForTesting
protected static String serializeEvaluatedMetrics(
        SortedMap<Instant, CollectedMetrics> evaluatedMetrics) throws Exception {
    final String yaml = YAML_MAPPER.writeValueAsString(evaluatedMetrics);
    return compress(yaml);
}

/** Inverse of {@code serializeEvaluatedMetrics}: decompresses and parses the YAML payload. */
private static SortedMap<Instant, CollectedMetrics> deserializeEvaluatedMetrics(
        String evaluatedMetrics) throws JacksonException {
    final String yaml = decompress(evaluatedMetrics);
    return YAML_MAPPER.readValue(yaml, new TypeReference<>() {});
}

// Overrides are stored as Flink's configuration string form (key:value pairs), not YAML,
// via ConfigurationUtils — matching the Kubernetes state store implementation.
private static String serializeParallelismOverrides(Map<String, String> overrides) {
return ConfigurationUtils.convertValue(overrides, String.class);
}

// Parses the configuration-string form back into a map. The raw Map.class is an
// unchecked conversion, but is kept to mirror the Kubernetes state store exactly.
private static Map<String, String> deserializeParallelismOverrides(String overrides) {
return ConfigurationUtils.convertValue(overrides, Map.class);
}

/**
 * GZIP-compresses the UTF-8 bytes of the given string and returns the result
 * Base64-encoded, so it can be stored in a plain text column.
 */
private static String compress(String original) throws IOException {
    final var buffer = new ByteArrayOutputStream();
    // try-with-resources guarantees the GZIP trailer is written via close().
    try (var gzipOut = new GZIPOutputStream(buffer)) {
        gzipOut.write(original.getBytes(StandardCharsets.UTF_8));
    }
    final byte[] compressedBytes = buffer.toByteArray();
    return Base64.getEncoder().encodeToString(compressedBytes);
}

/**
 * Inverse of {@link #compress}: Base64-decodes and gunzips the given string.
 *
 * <p>If the input is not valid compressed data it is returned unchanged, so state written
 * before compression was introduced can still be read (migration fallback).
 *
 * @param compressed Base64-encoded GZIP data, a legacy plain string, or {@code null}
 * @return the decompressed string, the input itself on decode failure, or {@code null}
 */
private static String decompress(String compressed) {
    if (compressed == null) {
        return null;
    }
    try {
        byte[] bytes = Base64.getDecoder().decode(compressed);
        try (var zi = new GZIPInputStream(new ByteArrayInputStream(bytes))) {
            // readAllBytes (Java 9+) replaces the commons-io IOUtils.toString call;
            // same result, one less third-party dependency on this path.
            return new String(zi.readAllBytes(), StandardCharsets.UTF_8);
        }
    } catch (Exception e) {
        // Log the cause so unexpected corruption is diagnosable, not just migrations.
        LOG.warn("Error while decompressing scaling data, treating as uncompressed", e);
        // Fall back to non-compressed for migration
        return compressed;
    }
}
}

0 comments on commit 55f327e

Please sign in to comment.