Skip to content

[core] Row lineage core support. #5935

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Jul 28, 2025
Merged

Conversation

leaves12138
Copy link
Contributor

@leaves12138 leaves12138 commented Jul 22, 2025

Purpose

For paimon append table (bucket = -1), support new mod, row with unique row id.

This pull request introduce a new kind of table, which based on unaware bucket append table. It writes external two column in file (_row_id BIGINT, _snapshot_version BIGINT), it follows the following rules:

  • If the file was firstly written. The _row_id column and _snapshot_version column is set to null. It will be allocated by committer.
  • If one row moved from one file to another for any reason, it should copy the _row_id value from read, to the target file.
  • If one row moved from one file to another for any reason, it changed , than _snapshot_version column should be set to null, otherwise, copied from the source.
  • The read from file which enable row_lineage, will set _row_id column and _snapshot_version column from file or metadata. (If the file target column is null, then fetch it from metadata)
  • Only FileKind equals APPEND (not COMPACT) file, will be assign firstRowId, the firstRowId is a long, which stored in DataFileMeta, the _row_id for the first time is set to firstRowId + offset in file.
image image

RELATED:
https://cwiki.apache.org/confluence/pages/resumedraft.action?draftId=373886632&draftShareId=6a8aca3b-913f-4123-83e4-9106f2825736&

Tests

API and Format

Documentation

@leaves12138 leaves12138 changed the title [WIP] [core] Row lineage core support. [core] Row lineage core support. Jul 22, 2025
@@ -1904,6 +1904,11 @@ public InlineElement getDescription() {
+ "respectively. When not configured, it will automatically determine the algorithm based on the number of columns "
+ "in 'sink.clustering.by-columns'. 'order' is used for 1 column, 'zorder' for less than 5 columns, "
+ "and 'hilbert' for 5 or more columns.");
public static final ConfigOption<Boolean> APPEND_ROW_LINEAGE_ENABLED =
key("append.row.lineage.enabled")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

row-tracking.enabled

You should check the table should be append table in SchemaValidation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

row-tracking.enabled

You should check the table should be append table in SchemaValidation.

OK

public static final DataField ROW_ID =
new DataField(Integer.MAX_VALUE - 5, "_row_id", DataTypes.BIGINT());

public static final DataField SNAPSHOT_VERSION =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer using SEQUENCE_NUMBER.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, then we should make SEQUENCE_NUMBER nullable

@@ -87,6 +87,12 @@ public class SpecialFields {
new DataField(
Integer.MAX_VALUE - 4, "rowkind", new VarCharType(VarCharType.MAX_LENGTH));

public static final DataField ROW_ID =
new DataField(Integer.MAX_VALUE - 5, "_row_id", DataTypes.BIGINT());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_ROW_ID

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

@@ -52,6 +54,10 @@ public class AppendCompactTask {
public AppendCompactTask(BinaryRow partition, List<DataFileMeta> files) {
Preconditions.checkArgument(files != null);
this.partition = partition;
files.sort(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why add this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to keep the row id in one file in order, this will may make "add new column function" more convenient.

@leaves12138 leaves12138 requested a review from JingsongLi July 24, 2025 03:07
new DataField(17, "_EXTERNAL_PATH", newStringType(true))));
new DataField(17, "_EXTERNAL_PATH", newStringType(true)),
new DataField(18, "_ROW_ID", new BigIntType(true)),
new DataField(19, "_SEQUENCE_NUMBER", new BigIntType(true))));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use min max sequence number.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

@@ -83,7 +83,9 @@ public class DataFileMeta {
16,
"_VALUE_STATS_COLS",
DataTypes.ARRAY(DataTypes.STRING().notNull())),
new DataField(17, "_EXTERNAL_PATH", newStringType(true))));
new DataField(17, "_EXTERNAL_PATH", newStringType(true)),
new DataField(18, "_ROW_ID", new BigIntType(true)),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_FIRST_ROW_ID

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

@@ -74,7 +74,7 @@ public class SpecialFields {
public static final int KEY_FIELD_ID_START = SYSTEM_FIELD_ID_START;

public static final DataField SEQUENCE_NUMBER =
new DataField(Integer.MAX_VALUE - 1, "_SEQUENCE_NUMBER", DataTypes.BIGINT().notNull());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just add nullable to append table.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

private final long minSequenceNumber;
private final long maxSequenceNumber;
// As for row-lineage table, this will be reassigned while committing
private long minSequenceNumber;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should use copy instead making it variable.

@@ -74,6 +78,13 @@ public CommitMessage doCompact(FileStoreTable table, BaseAppendFileStoreWrite wr
Preconditions.checkArgument(
dvEnabled || compactBefore.size() > 1,
"AppendOnlyCompactionTask need more than one file input.");
if (table.coreOptions().rowTrackingEnabled()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this? Let's finish compact in next PR.

@@ -124,6 +126,8 @@ public class DataFileMeta {
/** external path of file, if it is null, it is in the default warehouse path. */
private final @Nullable String externalPath;

private @Nullable Long rowIdStart;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_FIRST_ROW_ID or rowIdStart?


/** An counter that sums up {@code long} values. */
public class LongCounter implements Serializable {
public class LongCounter implements SequenceNumberCounter {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need to change this class. This class just have 50 lines, we don't need to reuse codes for it.

import org.apache.paimon.data.InternalRow;

/** Sequence number counter only generate min sequence. */
public class MinSequenceCounter implements SequenceNumberCounter {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove it, it just for compact.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

// assign row id for new files
long start = startRowId;
for (ManifestEntry entry : deltaFiles) {
if (entry.file().fileSource().orElse(FileSource.COMPACT).equals(FileSource.APPEND)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assert must have fileSource.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

import org.apache.paimon.types.RowKind;

/** Row with row lineage inject in. */
public class PartialMappingRow implements InternalRow {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RowWithLineage

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

@@ -81,6 +102,29 @@ public FileRecordIterator<InternalRow> readBatch() throws IOException {
final ProjectedRow projectedRow = ProjectedRow.from(indexMapping);
iterator = iterator.transform(projectedRow::replaceRow);
}

if (rowLineageEnabled && !metaColumnIndex.isEmpty()) {
GenericRow genericRow = new GenericRow(metaColumnIndex.size());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lineageRow

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK


@Override
public boolean isNullAt(int pos) {
for (int i = 0; i < mappings.length; i++) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this loop.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

/** A {@link Table} for reading row id of table. */
public class RowLineageTable implements DataTable, ReadonlyTable {

public static final String WITH_METADATA = "row_lineage";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ROW_LINEAGE

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

@leaves12138 leaves12138 requested a review from JingsongLi July 28, 2025 05:19
@@ -1911,6 +1911,11 @@ public InlineElement getDescription() {
+ "respectively. When not configured, it will automatically determine the algorithm based on the number of columns "
+ "in 'sink.clustering.by-columns'. 'order' is used for 1 column, 'zorder' for less than 5 columns, "
+ "and 'hilbert' for 5 or more columns.");
public static final ConfigOption<Boolean> ROW_TRACKING_ENABLED =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keep a line above

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

@@ -124,6 +126,8 @@ public class DataFileMeta {
/** external path of file, if it is null, it is in the default warehouse path. */
private final @Nullable String externalPath;

private @Nullable Long firstRowId;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final

this.firstRowId = firstRowId;
}

public void setFirstRowId(@Nullable Long firstRowId) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assignFirstRowId

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

firstRowId);
}

public DataFileMeta copyWithMaxSequenceNumber(long maxSequenceNumber) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assignSequenceNumber(min, max)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

if (rowLineageEnabled && !metaColumnIndex.isEmpty()) {
GenericRow lineageRow = new GenericRow(metaColumnIndex.size());

int[] fallbackToMetaRowLineageMappings = new int[tableRowType.getFieldCount()];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fallbackToLineageMappings

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

boolean rowLineageEnabled,
@Nullable Long firstRowId,
long snapshotId,
Map<String, Integer> metaColumnIndex)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

systemFields

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

private final boolean rowLineageEnabled;
@Nullable private final Long firstRowId;
private final long snapshotId;
private final Map<String, Integer> metaColumnIndex;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

systemFields

private final FileRecordReader<InternalRow> reader;
@Nullable private final int[] indexMapping;
@Nullable private final PartitionInfo partitionInfo;
@Nullable private final CastFieldGetter[] castMapping;
private final boolean rowLineageEnabled;
@Nullable private final Long firstRowId;
private final long snapshotId;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maxSequenceNumber

@@ -76,7 +76,8 @@ public void testCompatibilityToV4CommitV7() throws IOException {
new byte[] {1, 2, 4},
FileSource.COMPACT,
Arrays.asList("field1", "field2", "field3"),
"hdfs://localhost:9000/path/to/file");
"hdfs://localhost:9000/path/to/file",
null);
List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);

LinkedHashMap<String, DeletionVectorMeta> dvMetas = new LinkedHashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

@@ -426,7 +427,10 @@ private static FunctionWithIOException<DataInputView, DataFileMeta> getFileMetaS
} else if (version == 3 || version == 4) {
DataFileMeta10LegacySerializer serializer = new DataFileMeta10LegacySerializer();
return serializer::deserialize;
} else if (version >= 5) {
} else if (version == 5 || version == 6) {
DataFileMeta12LegacySerializer serializer = new DataFileMeta12LegacySerializer();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add test too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

@leaves12138 leaves12138 requested a review from JingsongLi July 28, 2025 08:54
@@ -136,7 +140,8 @@ public static DataFileMeta forAppend(
@Nullable byte[] embeddedIndex,
@Nullable FileSource fileSource,
@Nullable List<String> valueStatsCols,
@Nullable String externalPath) {
@Nullable String externalPath,
@Nullable Long rowIdStart) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

firstRowId

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

@@ -165,8 +173,13 @@ public FormatReaderMapping build(

// extract the whole data fields in logic.
List<DataField> allDataFields = fieldsExtractor.apply(dataSchema);
if (CoreOptions.fromMap(dataSchema.options()).rowTrackingEnabled()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pass rowTrackingEnabled to this class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

Copy link
Contributor

@JingsongLi JingsongLi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

@JingsongLi JingsongLi merged commit 020ed14 into apache:master Jul 28, 2025
22 of 23 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants