
Add source table support for SQL server #32987


Open · wants to merge 11 commits into main
Conversation

martykulma (Contributor)
Adds support for CREATE TABLE .. FROM SOURCE for SQL Server.

Motivation

Implements https://github.com/MaterializeInc/database-issues/issues/9202

Checklist

  • This PR has adequate test coverage / QA involvement has been duly considered. (trigger-ci for additional test/nightly runs)
  • This PR has an associated up-to-date design doc, is a design doc (template), or is sufficiently small to not require a design.
  • If this PR evolves an existing $T ⇔ Proto$T mapping (possibly in a backwards-incompatible way), then it is tagged with a T-proto label.
  • If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label (example).
  • If this PR includes major user-facing behavior changes, I have pinged the relevant PM to schedule a changelog post.

@martykulma martykulma marked this pull request as ready for review July 11, 2025 18:52
@martykulma martykulma requested review from a team as code owners July 11, 2025 18:52
@martykulma martykulma requested review from aljoscha and ptravers July 11, 2025 18:52
@martykulma martykulma marked this pull request as draft July 11, 2025 20:25
@martykulma martykulma marked this pull request as ready for review July 12, 2025 00:58
@petrosagg (Contributor) left a comment:

It all looks great and most of my comments are nits. The only thing that I'm worried about is the selection of the snapshot LSN.

)
.await?;

// There should be exactly one source_export returned for this statement
Contributor:

You can use source_exports.into_element() to extract the first element and additionally assert that there isn't a second one.
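For readers unfamiliar with the idiom: into_element comes from Materialize's mz_ore collection extensions; this stand-alone sketch only assumes it extracts the sole element of a collection and panics if the length isn't exactly one.

```rust
// Stand-alone stand-in for mz_ore's CollectionExt::into_element (assumed
// behavior): return the sole element, panicking on any other length. This
// replaces the manual "take the first element, then assert the rest is
// empty" idiom discussed above.
fn into_element<T>(mut v: Vec<T>) -> T {
    assert_eq!(v.len(), 1, "expected exactly one element, got {}", v.len());
    v.pop().unwrap()
}

fn main() {
    // Exactly one source export is expected for this statement.
    let source_exports = vec!["export_a"];
    let export = into_element(source_exports);
    println!("{export}");
}
```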

Contributor:

Just saw that the idiom you used was prior art in the other cases. We could fix those too since we're here.

Contributor Author:

fixed!

external_reference: _,
} = sql_server::generate_source_export_statement_values(&scx, purified_export)?;

if let Some(text_cols_option) = with_options
Contributor:

nit: if you have if let Some(..) = opt_value and you also include an else branch, it's more idiomatic to write a match statement with Some(..) and None arms.
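A minimal illustration of the two shapes, using a hypothetical helper around the text_cols option; the match form makes both cases equally visible.

```rust
// Hypothetical helper illustrating the nit above: when both the Some and
// None cases carry logic, a match reads better than if let ... else.
fn describe(text_cols_option: &Option<Vec<String>>) -> String {
    match text_cols_option {
        Some(cols) => format!("text columns: {}", cols.join(", ")),
        None => "no text columns".to_string(),
    }
}

fn main() {
    println!("{}", describe(&Some(vec!["col_a".into()])));
    println!("{}", describe(&None));
}
```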

Contributor Author:

This was also carrying forward existing patterns. I updated the lot.

let mut cdc_handle = client
.cdc(capture_instances.keys().cloned())
.max_lsn_wait(SNAPSHOT_MAX_LSN_WAIT.get(config.config.config_set()));

// Snapshot any instances that require it.
// The LSN returned here will be the max LSN for any capture instance on the SQL server.
let snapshot_lsn = {
Contributor:

If I'm reading the code correctly here we are computing a snapshot LSN that is common among all exports, and that snapshot LSN depends on the max LSN that we get from each capture instance, and capture instances are created on demand as the user requests more and more exports from the available ones.

If we put all these together I think we can end up in a situation where one replica that only knows about export foo computes some snapshot_lsn1 but another replica, that say hears about two exports, foo and bar computes a higher snapshot_lsn2 > snapshot_lsn1 due to the capture instance of bar.

Is this reasoning correct? If so, I think there is a possibility of double-ingesting the snapshot.
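A toy illustration of the scenario described above (the LSN values are hypothetical): if the snapshot LSN is the max over the capture instances a replica knows about, replicas with different views compute different snapshot LSNs.

```rust
// The snapshot LSN is computed as the max LSN across all known capture
// instances. Replicas that know about different sets of exports can
// therefore disagree on it.
fn snapshot_lsn(capture_instance_max_lsns: &[u64]) -> Option<u64> {
    capture_instance_max_lsns.iter().copied().max()
}

fn main() {
    let replica1 = snapshot_lsn(&[100]);      // knows only export `foo`
    let replica2 = snapshot_lsn(&[100, 250]); // knows `foo` and `bar`
    // replica2's snapshot LSN is higher, so data for `foo` in the range
    // (100, 250] could be ingested both via snapshot and via replication.
    println!("{replica1:?} vs {replica2:?}");
}
```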

Contributor Author:

Great catch! I'll have to capture the LSN earlier on. I think the MySQL source does this as well; I should be able to follow the same pattern.

Contributor Author:

Now captures the LSN at purification and passes it down for the resume_lsn computation.

tracing::warn!(%config.id, "timely-{} no resume_lsn, waiting.", config.worker_id);
std::future::pending::<()>().await;
}
let resume_lsn = resume_lsn.unwrap();
Contributor:

nit: having this be a let resume_lsn = match resume_lsn {...} is more idiomatic than checking is_none() and then unwrapping.

Contributor Author:

Since we should never continue beyond that point, I opted for let .. else { ...; unreachable!() }; - but am happy to change it to match with None => std::future::pending::<Lsn>().await if you prefer.
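A stand-alone sketch of the let .. else shape discussed here; the else arm must diverge, and in this sketch a panic! stands in for the operator's await-forever (or unreachable!()) so the example terminates.

```rust
// let-else: bind the Some value or diverge. In the real operator the else
// arm awaits std::future::pending() forever; panicking here is only a
// stand-in so this sketch is runnable.
fn require_resume_lsn(resume_lsn: Option<u64>) -> u64 {
    let Some(lsn) = resume_lsn else {
        panic!("no resume_lsn; the operator would wait here");
    };
    lsn
}

fn main() {
    println!("resume_lsn = {}", require_resume_lsn(Some(42)));
}
```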

@@ -221,22 +222,44 @@ pub(crate) fn render<G: Scope<Timestamp = Lsn>>(

// Start replicating from the LSN __after__ we took the snapshot.
let replication_start_lsn = snapshot_lsn.increment();
let mut rewinds:BTreeMap<_,_> = export_ids_to_snapshot
Contributor:

Suggested change
let mut rewinds:BTreeMap<_,_> = export_ids_to_snapshot
let mut rewinds: BTreeMap<_, _> = export_ids_to_snapshot

rustfmt gives up in this file :(

Contributor Author:

yeah it does 😂

@@ -349,7 +396,7 @@ type StackedAsyncOutputHandle<T, D> = AsyncOutputHandle<
/// Helper method to return a "definite" error upstream.
async fn return_definite_error(
err: DefiniteError,
outputs: &[usize],
outputs: impl Iterator<Item = &u64>,
Contributor:

This function probably prefers plain u64, since it unconditionally dereferences. You can add a .copied() to the .iter() calls in the flat_map above to get a u64 iterator from a &u64 iterator.
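A small sketch of the suggested change on hypothetical data: .copied() converts an Iterator over &u64 into one over u64, so the consuming function can take values directly.

```rust
// Iterator::copied() turns Iterator<Item = &u64> into Iterator<Item = u64>,
// avoiding the need for the downstream function to accept references and
// dereference them.
fn flatten_outputs(outputs_per_export: &[Vec<u64>]) -> Vec<u64> {
    outputs_per_export
        .iter()
        .flat_map(|outputs| outputs.iter().copied())
        .collect()
}

fn main() {
    let outputs_per_export = vec![vec![0, 1], vec![2]];
    println!("{:?}", flatten_outputs(&outputs_per_export));
}
```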

Contributor Author:

done

@martykulma martykulma requested a review from petrosagg July 16, 2025 16:05
@martykulma (Contributor Author):

> It all looks great and most of my comments are nits. The only thing that I'm worried about is the selection of the snapshot LSN.

Thanks for the review @petrosagg! I've split review updates into separate commits. You can ignore the last couple commits, which were me rushing to fix typos and then trying to get CI to work. The most meaningful change is in the initial_lsn commit.

I ran into errors where I was setting things up too quickly and there was no max LSN yet, and tests consistently required sleeps to avoid this. So I opted to include a short retry window in purification to get the LSN for a better experience, and that added a little bit of refactoring to centralize retries.
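A stand-alone sketch of the bounded-retry idiom described above; try_get_max_lsn is a hypothetical stand-in for the real client call, and the 50 ms poll interval is illustrative.

```rust
use std::time::{Duration, Instant};

// Poll for a max LSN within a bounded window instead of failing on the
// first miss, giving the SQL Server CDC job (which runs every ~5 seconds
// by default) a chance to populate it. `try_get_max_lsn` is a hypothetical
// stand-in for the real client call.
fn get_max_lsn_retry(
    deadline: Duration,
    mut try_get_max_lsn: impl FnMut() -> Option<u64>,
) -> Option<u64> {
    let start = Instant::now();
    loop {
        if let Some(lsn) = try_get_max_lsn() {
            return Some(lsn);
        }
        if start.elapsed() >= deadline {
            return None;
        }
        std::thread::sleep(Duration::from_millis(50));
    }
}

fn main() {
    // Simulate the CDC job making the max LSN available after a few polls.
    let mut polls = 0;
    let lsn = get_max_lsn_retry(Duration::from_secs(5), || {
        polls += 1;
        if polls >= 3 { Some(1000) } else { None }
    });
    println!("{lsn:?}");
}
```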

Comment on lines +1003 to +1008
// If CDC is enabled for a table, there is a period where the max LSN will not be
// available. Rather than return an error to the user, we retry for 5 seconds to
// allow the CDC job a chance to run. By default, the job runs every 5 seconds.
let initial_lsn =
mz_sql_server_util::inspect::get_max_lsn_retry(&mut client, Duration::from_secs(5))
.await?;
Contributor:

nit: can we make the retry configurable? I guess it's pretty unlikely that someone will reconfigure the timing on the capture instance.

Comment on lines +1622 to +1627
// If CDC is enabled for a table, there is a period where the max LSN will not be
// available. Rather than return an error to the user, we retry for 5 seconds to
// allow the CDC job a chance to run. By default, the job runs every 5 seconds.
let initial_lsn =
mz_sql_server_util::inspect::get_max_lsn_retry(&mut client, Duration::from_secs(5))
.await?;
Contributor:

nit: at a minimum it would be nice to get the value we retry for into a shared constant so it's less of a magic number.

3 participants