Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support retry commit for iceberg sink #20433

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 80 additions & 31 deletions src/connector/src/sink/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_s

pub const ICEBERG_SINK: &str = "iceberg";

fn default_commit_retry_num() -> u32 {
8
}

#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
pub struct IcebergConfig {
Expand Down Expand Up @@ -116,6 +120,9 @@ pub struct IcebergConfig {

#[serde(default, deserialize_with = "deserialize_bool_from_string")]
pub create_table_if_not_exists: bool,

#[serde(default = "default_commit_retry_num")]
pub commit_retry_num: u32,
}

impl IcebergConfig {
Expand Down Expand Up @@ -434,6 +441,7 @@ impl Sink for IcebergSink {
catalog,
table,
commit_notifier: commit_tx,
commit_retry_num: self.config.commit_retry_num,
_compact_task_guard: finish_tx,
})
}
Expand Down Expand Up @@ -1195,9 +1203,38 @@ pub struct IcebergSinkCommitter {
catalog: Arc<dyn Catalog>,
table: Table,
commit_notifier: Option<mpsc::UnboundedSender<()>>,
commit_retry_num: u32,
_compact_task_guard: Option<oneshot::Sender<()>>,
}

impl IcebergSinkCommitter {
// Reload table and guarantee current schema_id and partition_spec_id matches
// given `schema_id` and `partition_spec_id`
async fn reload_table(&mut self, schema_id: i32, partition_spec_id: i32) -> Result<()> {
self.table = self
.catalog
.clone()
.load_table(self.table.identifier())
.await
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
if self.table.metadata().current_schema_id() != schema_id {
return Err(SinkError::Iceberg(anyhow!(
"Schema evolution not supported, expect schema id {}, but got {}",
schema_id,
self.table.metadata().current_schema_id()
)));
}
if self.table.metadata().default_partition_spec_id() != partition_spec_id {
return Err(SinkError::Iceberg(anyhow!(
"Partition evolution not supported, expect partition spec id {}, but got {}",
partition_spec_id,
self.table.metadata().default_partition_spec_id()
)));
}
Ok(())
}
}

#[async_trait::async_trait]
impl SinkCommitCoordinator for IcebergSinkCommitter {
async fn init(&mut self) -> Result<()> {
Expand Down Expand Up @@ -1230,31 +1267,26 @@ impl SinkCommitCoordinator for IcebergSinkCommitter {
)));
}

let expect_schema_id = write_results[0].schema_id;
let expect_partition_spec_id = write_results[0].partition_spec_id;

// Load the latest table to avoid concurrent modification with the best effort.
self.table = self
.catalog
.clone()
.load_table(self.table.identifier())
.await
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
let Some(schema) = self
.table
.metadata()
.schema_by_id(write_results[0].schema_id)
else {
self.reload_table(expect_schema_id, expect_partition_spec_id)
.await?;
let Some(schema) = self.table.metadata().schema_by_id(expect_schema_id) else {
return Err(SinkError::Iceberg(anyhow!(
"Can't find schema by id {}",
write_results[0].schema_id
expect_schema_id
)));
};
let Some(partition_spec) = self
.table
.metadata()
.partition_spec_by_id(write_results[0].partition_spec_id)
.partition_spec_by_id(expect_partition_spec_id)
else {
return Err(SinkError::Iceberg(anyhow!(
"Can't find partition spec by id {}",
write_results[0].partition_spec_id
expect_partition_spec_id
)));
};
let partition_type = partition_spec
Expand All @@ -1273,29 +1305,45 @@ impl SinkCommitCoordinator for IcebergSinkCommitter {
})
.collect::<Result<Vec<DataFile>>>()?;

let txn = Transaction::new(&self.table);
let mut append_action = txn
.fast_append(None, vec![])
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
append_action
.add_data_files(data_files)
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
let tx = append_action.apply().await.map_err(|err| {
tracing::error!(error = %err.as_report(), "Failed to commit iceberg table");
SinkError::Iceberg(anyhow!(err))
})?;
let table = tx
.commit_dyn(self.catalog.as_ref())
.await
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
self.table = table;
let mut success_commit = false;
for t in 0..self.commit_retry_num {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we introduce Exponential Backoff And Jitter here when retries?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can consider using tokio_retry::Retry

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point!

Copy link
Contributor Author

@ZENOTME ZENOTME Feb 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about using https://github.com/Xuanwo/backon. Seems tokio_retry is not update for a long time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. LGTM to use backon

let txn = Transaction::new(&self.table);
let mut append_action = txn
.fast_append(None, vec![])
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
append_action
.add_data_files(data_files.clone())
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
let tx = append_action.apply().await.map_err(|err| {
tracing::error!(error = %err.as_report(), "Failed to commit iceberg table");
SinkError::Iceberg(anyhow!(err))
})?;
match tx.commit_dyn(self.catalog.as_ref()).await {
Ok(table) => {
self.table = table;
success_commit = true;
break;
}
Err(err) => {
self.reload_table(expect_schema_id, expect_partition_spec_id)
.await?;
tracing::error!(error = %err.as_report(), "Failed to commit iceberg table {} time", t);
}
}
}

if !success_commit {
return Err(SinkError::Iceberg(anyhow!(
"Failed to commit iceberg table in epoch {}",
epoch
)));
}

if let Some(commit_notifier) = &mut self.commit_notifier {
if commit_notifier.send(()).is_err() {
warn!("failed to notify commit");
}
}

tracing::info!("Succeeded to commit to iceberg table in epoch {epoch}.");
Ok(())
}
Expand Down Expand Up @@ -1455,6 +1503,7 @@ mod test {
.collect(),
commit_checkpoint_interval: DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE,
create_table_if_not_exists: false,
commit_retry_num: 8,
};

assert_eq!(iceberg_config, expected_iceberg_config);
Expand Down
Loading