Commit 8850a63

adapter: use persist schema evolution for builtin migration

This commit starts using persist schema evolution for builtin item migration. For each builtin storage collection to migrate, we first attempt a schema evolution. That attempt may fail if persist considers the new schema incompatible with the old one, in which case we fall back to the existing shard-replacement approach.

1 parent 95e2633 commit 8850a63
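In outline, the new control flow is: try in-place schema evolution for every migrated builtin storage collection, then hand whatever persist rejected to the shard-replacement fallback. Below is a minimal, self-contained Rust sketch of that shape; every name in it is a hypothetical stand-in for the real functions in the diff below (`try_evolve_persist_schemas`, `migrate_builtin_collections_incompatible`), not a Materialize API.

    // Toy model of the two-phase migration flow; only the control-flow shape
    // mirrors the commit, all names are illustrative stand-ins.
    #[derive(Debug, Clone, Copy)]
    struct CollectionId(u64);

    enum EvolveOutcome {
        Evolved,      // persist accepted the new schema in place
        Incompatible, // persist rejected it; shard replacement is required
    }

    // Stand-in for the per-collection schema evolution attempt.
    fn try_evolve(_id: CollectionId, compatible: bool) -> EvolveOutcome {
        if compatible {
            EvolveOutcome::Evolved
        } else {
            EvolveOutcome::Incompatible
        }
    }

    // Stand-in for the shard-replacement fallback. The real code runs this even
    // for an empty list, because it also cleans up leftovers of past migrations.
    fn replace_shards(ids: &[CollectionId]) {
        for id in ids {
            println!("replacing shard for {id:?}");
        }
    }

    fn migrate(collections: Vec<(CollectionId, bool)>) {
        // First pass: attempt in-place evolution, keeping only the failures.
        let incompatible: Vec<CollectionId> = collections
            .into_iter()
            .filter_map(|(id, compatible)| match try_evolve(id, compatible) {
                EvolveOutcome::Evolved => None,
                EvolveOutcome::Incompatible => Some(id),
            })
            .collect();
        // Second pass: shard replacement for whatever could not be evolved.
        replace_shards(&incompatible);
    }

    fn main() {
        migrate(vec![(CollectionId(1), true), (CollectionId(2), false)]);
    }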

1 file changed: src/adapter/src/catalog/open/builtin_item_migration.rs (+193 −108)
@@ -28,16 +28,18 @@ use mz_ore::{halt, soft_assert_or_log, soft_panic_or_log};
 use mz_persist_client::cfg::USE_CRITICAL_SINCE_CATALOG;
 use mz_persist_client::critical::SinceHandle;
 use mz_persist_client::read::ReadHandle;
+use mz_persist_client::schema::CaESchema;
 use mz_persist_client::write::WriteHandle;
 use mz_persist_client::{Diagnostics, PersistClient};
 use mz_persist_types::ShardId;
-use mz_persist_types::codec_impls::ShardIdSchema;
+use mz_persist_types::codec_impls::{ShardIdSchema, UnitSchema};
 use mz_repr::{CatalogItemId, GlobalId, Timestamp};
 use mz_sql::catalog::CatalogItem as _;
 use mz_storage_client::controller::StorageTxn;
 use mz_storage_types::StorageDiff;
+use mz_storage_types::sources::SourceData;
 use timely::progress::{Antichain, Timestamp as TimelyTimestamp};
-use tracing::{debug, error};
+use tracing::{debug, error, info};
 
 use crate::catalog::open::builtin_item_migration::persist_schema::{TableKey, TableKeySchema};
 use crate::catalog::state::LocalExpressionCache;
@@ -54,6 +56,15 @@ pub(crate) struct BuiltinItemMigrationResult {
 }
 
 /// Perform migrations for any builtin items that may have changed between versions.
+///
+/// We only need to do anything for items that have an associated storage collection. Others
+/// (views, indexes) don't have any durable state that requires migration.
+///
+/// We have the ability to handle some backward-compatible schema changes through persist schema
+/// evolution, and we do so when possible. For changes that schema evolution doesn't support, we
+/// instead "migrate" the affected storage collections by creating new persist shards with the new
+/// schemas and dropping the old ones. See [`migrate_builtin_collections_incompatible`] for
+/// details.
 pub(crate) async fn migrate_builtin_items(
     state: &mut CatalogState,
     txn: &mut Transaction<'_>,
@@ -64,27 +75,177 @@ pub(crate) async fn migrate_builtin_items(
         read_only,
     }: BuiltinItemMigrationConfig,
 ) -> Result<BuiltinItemMigrationResult, Error> {
-    migrate_builtin_items_0dt(
+    assert_eq!(
+        read_only,
+        txn.is_savepoint(),
+        "txn must be in savepoint mode when read_only is true, and in writable mode otherwise",
+    );
+
+    update_catalog_fingerprints(state, txn, &migrated_builtins)?;
+
+    // Collect GlobalIds of storage collections we need to migrate.
+    let collections_to_migrate: Vec<_> = migrated_builtins
+        .into_iter()
+        .filter_map(|id| {
+            use CatalogItem::*;
+            match &state.get_entry(&id).item() {
+                Table(table) => Some(table.global_ids().into_element()),
+                Source(source) => Some(source.global_id()),
+                MaterializedView(mv) => Some(mv.global_id()),
+                ContinualTask(ct) => Some(ct.global_id()),
+                Log(_) | Sink(_) | View(_) | Index(_) | Type(_) | Func(_) | Secret(_)
+                | Connection(_) => None,
+            }
+        })
+        .collect();
+
+    // Attempt to perform schema evolution.
+    let collections_to_migrate =
+        try_evolve_persist_schemas(state, txn, collections_to_migrate, &persist_client).await?;
+
+    // For collections whose schemas we couldn't evolve, perform the replacement process.
+    // Note that we need to invoke this process even if `collections_to_migrate` is empty because
+    // it also cleans up any leftovers of previous migrations from the migration shard.
+    migrate_builtin_collections_incompatible(
         state,
         txn,
         local_expr_cache,
         persist_client,
-        migrated_builtins,
+        collections_to_migrate,
         read_only,
     )
     .await
 }
 
-/// An implementation of builtin item migrations that is compatible with zero down-time upgrades.
-/// The issue with the legacy approach is that it mints new global IDs for each migrated item and
-/// its descendents, without durably writing those IDs down in the catalog. As a result, the
-/// previous Materialize version, which is still running, may allocate the same global IDs. This
-/// would cause confusion for the current version when it's promoted to the leader because its
-/// definition of global IDs will no longer be valid. At best, the current version would have to
-/// rehydrate all objects that depend on migrated items. At worst, it would panic.
+/// Update the durably stored fingerprints of `migrated_builtins`.
+fn update_catalog_fingerprints(
+    state: &mut CatalogState,
+    txn: &mut Transaction<'_>,
+    migrated_builtins: &[CatalogItemId],
+) -> Result<(), Error> {
+    let id_fingerprint_map: BTreeMap<_, _> = BUILTINS::iter(&state.config().builtins_cfg)
+        .map(|builtin| {
+            let id = state.resolve_builtin_object(builtin);
+            let fingerprint = builtin.fingerprint();
+            (id, fingerprint)
+        })
+        .collect();
+    let mut migrated_system_object_mappings = BTreeMap::new();
+    for item_id in migrated_builtins {
+        let fingerprint = id_fingerprint_map
+            .get(item_id)
+            .expect("missing fingerprint");
+        let entry = state.get_entry(item_id);
+        let schema_name = state
+            .get_schema(
+                &entry.name().qualifiers.database_spec,
+                &entry.name().qualifiers.schema_spec,
+                entry.conn_id().unwrap_or(&SYSTEM_CONN_ID),
+            )
+            .name
+            .schema
+            .as_str();
+        // Builtin Items can only be referenced by a single GlobalId.
+        let global_id = state.get_entry(item_id).global_ids().into_element();
+
+        migrated_system_object_mappings.insert(
+            *item_id,
+            SystemObjectMapping {
+                description: SystemObjectDescription {
+                    schema_name: schema_name.to_string(),
+                    object_type: entry.item_type(),
+                    object_name: entry.name().item.clone(),
+                },
+                unique_identifier: SystemObjectUniqueIdentifier {
+                    catalog_id: *item_id,
+                    global_id,
+                    fingerprint: fingerprint.clone(),
+                },
+            },
+        );
+    }
+    txn.update_system_object_mappings(migrated_system_object_mappings)?;
+
+    Ok(())
+}
+
+/// Attempt to migrate the given builtin collections using persist schema evolution.
+///
+/// Returns the IDs of collections for which schema evolution did not succeed.
+async fn try_evolve_persist_schemas(
+    state: &mut CatalogState,
+    txn: &mut Transaction<'_>,
+    migrated_storage_collections: Vec<GlobalId>,
+    persist_client: &PersistClient,
+) -> Result<Vec<GlobalId>, Error> {
+    let collection_metadata = txn.get_collection_metadata();
+
+    let mut failed = Vec::new();
+    for id in migrated_storage_collections {
+        let Some(&shard_id) = collection_metadata.get(&id) else {
+            return Err(Error::new(ErrorKind::Internal(format!(
+                "builtin migration: missing metadata for builtin collection {id}"
+            ))));
+        };
+
+        let diagnostics = Diagnostics {
+            shard_name: id.to_string(),
+            handle_purpose: "migrate builtin schema".to_string(),
+        };
+        let Some((old_schema_id, old_schema, _)) = persist_client
+            .latest_schema::<SourceData, (), Timestamp, StorageDiff>(shard_id, diagnostics.clone())
+            .await
+            .expect("invalid usage")
+        else {
+            return Err(Error::new(ErrorKind::Internal(format!(
+                "builtin migration: missing old schema for builtin collection {id}"
+            ))));
+        };
+
+        let entry = state.get_entry_by_global_id(&id);
+        let Some(new_schema) = entry.desc_opt_latest() else {
+            return Err(Error::new(ErrorKind::Internal(format!(
+                "builtin migration: missing new schema for builtin collection {id}"
+            ))));
+        };
+
+        info!(%id, ?old_schema, ?new_schema, "attempting builtin schema evolution");
+
+        let result = persist_client
+            .compare_and_evolve_schema::<SourceData, (), Timestamp, StorageDiff>(
+                shard_id,
+                old_schema_id,
+                &new_schema,
+                &UnitSchema,
+                diagnostics,
+            )
+            .await
+            .expect("invalid usage");
+
+        match result {
+            CaESchema::Ok(_) => {
+                info!("builtin schema evolution succeeded");
+            }
+            CaESchema::Incompatible => {
+                info!("builtin schema evolution failed");
+                failed.push(id);
+            }
+            CaESchema::ExpectedMismatch { schema_id, .. } => {
+                return Err(Error::new(ErrorKind::Internal(format!(
+                    "builtin migration: unexpected schema mismatch ({} != {})",
+                    schema_id, old_schema_id,
+                ))));
+            }
+        }
+    }
+
+    Ok(failed)
+}
+
+/// Migrate builtin collections that are not supported by persist schema evolution.
 ///
 /// The high level description of this approach is that we create new shards for each migrated
-/// builtin table with the new table schema, without changing the global ID. Dependent objects are
+/// builtin collection with the new schema, without changing the global ID. Dependent objects are
 /// not modified but now read from the new shards.
 ///
 /// A detailed description of this approach follows. It's important that all of these steps are
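A note on the hunk above: `compare_and_evolve_schema` is a compare-and-set style API. The caller passes the schema id it last observed (`old_schema_id` from `latest_schema`), and `ExpectedMismatch` reports that another writer evolved the schema in between. The migration treats that case as an internal error, since nothing else should touch builtin shards during startup; a caller that could legitimately race would re-fetch and retry instead, roughly as in this self-contained sketch (all types here are toy stand-ins, not the persist API):

    // Toy compare-and-set loop over an in-memory "schema registry".
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    struct SchemaId(u64);

    enum CaEOutcome {
        Ok(SchemaId),                           // evolution applied
        Incompatible,                           // new schema rejected outright
        ExpectedMismatch { current: SchemaId }, // lost a race with another writer
    }

    struct Registry {
        current: SchemaId,
    }

    impl Registry {
        fn evolve(&mut self, expected: SchemaId, next: SchemaId) -> CaEOutcome {
            if self.current != expected {
                CaEOutcome::ExpectedMismatch { current: self.current }
            } else {
                self.current = next;
                CaEOutcome::Ok(next)
            }
        }
    }

    // Retry on mismatch by adopting the observed schema id; give up only on a
    // genuine incompatibility.
    fn evolve_with_retry(registry: &mut Registry, next: SchemaId) -> Result<SchemaId, ()> {
        let mut expected = registry.current;
        loop {
            match registry.evolve(expected, next) {
                CaEOutcome::Ok(id) => return Ok(id),
                CaEOutcome::Incompatible => return Err(()),
                CaEOutcome::ExpectedMismatch { current } => expected = current,
            }
        }
    }

    fn main() {
        let mut registry = Registry { current: SchemaId(1) };
        assert_eq!(evolve_with_retry(&mut registry, SchemaId(2)), Ok(SchemaId(2)));
    }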
@@ -93,21 +254,20 @@ pub(crate) async fn migrate_builtin_items(
 /// 1. Each environment has a dedicated persist shard, called the migration shard, that allows
 ///    environments to durably write down metadata while in read-only mode. The shard is a
 ///    mapping of `(GlobalId, build_version)` to `ShardId`.
-/// 2. Collect the `GlobalId` of all migrated tables for the current build version.
-/// 3. Read in the current contents of the migration shard.
-/// 4. Collect all the `ShardId`s from the migration shard that are not at the current
-///    `build_version` or are not in the set of migrated tables.
+/// 2. Read in the current contents of the migration shard.
+/// 3. Collect all the `ShardId`s from the migration shard that are not at the current
+///    `build_version` or are not in the set of migrated collections.
 ///    a. If they ARE NOT mapped to a `GlobalId` in the storage metadata then they are shards
 ///       from an incomplete migration. Finalize them and remove them from the migration shard.
 ///       Note: care must be taken to not remove the shard from the migration shard until we are
 ///       sure that they will be finalized, otherwise the shard will leak.
 ///    b. If they ARE mapped to a `GlobalId` in the storage metadata then they are shards from a
-///       complete migration. Remove them from the migration shard.
-/// 5. Collect all the `GlobalId`s of tables that are migrated, but not in the migration shard
-///    for the current build version. Generate new `ShardId`s and add them to the migration
-///    shard.
-/// 6. At this point the migration shard should only logically contain a mapping of migrated
-///    table `GlobalId`s to new `ShardId`s for the current build version. For each of these
+///       complete migration. Remove them from the migration shard.
+/// 4. Collect all the `GlobalId`s of collections that are migrated, but not in the migration
+///    shard for the current build version. Generate new `ShardId`s and add them to the
+///    migration shard.
+/// 5. At this point the migration shard should only logically contain a mapping of migrated
+///    collection `GlobalId`s to new `ShardId`s for the current build version. For each of these
 ///    `GlobalId`s such that the `ShardId` isn't already in the storage metadata:
 ///    a. Remove the current `GlobalId` to `ShardId` mapping from the storage metadata.
 ///    b. Finalize the removed `ShardId`s.
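Steps 3a and 3b in the list above turn on a single question for each stale migration-shard entry: does the storage metadata still reference its `ShardId`? If yes, the migration completed and only the mapping needs to be removed; if no, the shard must be finalized first, or it leaks. A small self-contained model of that decision, using toy types in place of the real catalog structures:

    use std::collections::BTreeMap;

    type RawGlobalId = u64;
    type ShardId = &'static str;

    #[derive(Debug, PartialEq)]
    enum Cleanup {
        // 3a: incomplete migration; finalize the shard, then drop the mapping.
        FinalizeAndRemove,
        // 3b: complete migration; the shard is live, just drop the mapping.
        RemoveOnly,
    }

    fn classify(stale_shard: ShardId, storage_metadata: &BTreeMap<RawGlobalId, ShardId>) -> Cleanup {
        if storage_metadata.values().any(|s| *s == stale_shard) {
            Cleanup::RemoveOnly
        } else {
            Cleanup::FinalizeAndRemove
        }
    }

    fn main() {
        let metadata = BTreeMap::from([(1, "shard-live")]);
        assert_eq!(classify("shard-live", &metadata), Cleanup::RemoveOnly);
        assert_eq!(classify("shard-orphan", &metadata), Cleanup::FinalizeAndRemove);
    }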
@@ -119,7 +279,6 @@ pub(crate) async fn migrate_builtin_items(
 ///
 /// Since the new shards are created in read-only mode, they will be left empty and all dependent
 /// items will fail to hydrate.
-/// TODO(jkosh44) Back-fill these tables in read-only mode so they can properly hydrate.
 ///
 /// While in read-only mode we write the migration changes to `txn`, which will update the
 /// in-memory catalog, which will cause the new shards to be created in storage. However, we don't
@@ -129,65 +288,25 @@ async fn migrate_builtin_items_0dt(
 /// directly to the migration shard, regardless of read-only mode. So we have to be careful not to
 /// remove anything from the migration shard until we're sure that its results have been made
 /// durable elsewhere.
-async fn migrate_builtin_items_0dt(
+async fn migrate_builtin_collections_incompatible(
     state: &mut CatalogState,
     txn: &mut Transaction<'_>,
     local_expr_cache: &mut LocalExpressionCache,
     persist_client: PersistClient,
-    migrated_builtins: Vec<CatalogItemId>,
+    migrated_storage_collections: Vec<GlobalId>,
     read_only: bool,
 ) -> Result<BuiltinItemMigrationResult, Error> {
-    assert_eq!(
-        read_only,
-        txn.is_savepoint(),
-        "txn must be in savepoint mode when read_only is true, and in writable mode when read_only is false"
-    );
-
     let build_version = state.config.build_info.semver_version();
 
-    // 0. Update durably stored fingerprints.
-    let id_fingerprint_map: BTreeMap<_, _> = BUILTINS::iter(&state.config().builtins_cfg)
-        .map(|builtin| {
-            let id = state.resolve_builtin_object(builtin);
-            let fingerprint = builtin.fingerprint();
-            (id, fingerprint)
+    // The migration shard only stores raw GlobalIds, so it's more convenient to keep the list of
+    // migrated collections in that form.
+    let migrated_storage_collections: Vec<_> = migrated_storage_collections
+        .into_iter()
+        .map(|gid| match gid {
+            GlobalId::System(raw) => raw,
+            _ => panic!("builtins must have system IDs"),
         })
         .collect();
-    let mut migrated_system_object_mappings = BTreeMap::new();
-    for item_id in &migrated_builtins {
-        let fingerprint = id_fingerprint_map
-            .get(item_id)
-            .expect("missing fingerprint");
-        let entry = state.get_entry(item_id);
-        let schema_name = state
-            .get_schema(
-                &entry.name().qualifiers.database_spec,
-                &entry.name().qualifiers.schema_spec,
-                entry.conn_id().unwrap_or(&SYSTEM_CONN_ID),
-            )
-            .name
-            .schema
-            .as_str();
-        // Builtin Items can only be referenced by a single GlobalId.
-        let global_id = state.get_entry(item_id).global_ids().into_element();
-
-        migrated_system_object_mappings.insert(
-            *item_id,
-            SystemObjectMapping {
-                description: SystemObjectDescription {
-                    schema_name: schema_name.to_string(),
-                    object_type: entry.item_type(),
-                    object_name: entry.name().item.clone(),
-                },
-                unique_identifier: SystemObjectUniqueIdentifier {
-                    catalog_id: *item_id,
-                    global_id,
-                    fingerprint: fingerprint.clone(),
-                },
-            },
-        );
-    }
-    txn.update_system_object_mappings(migrated_system_object_mappings)?;
 
     // 1. Open migration shard.
     let organization_id = state.config.environment_id.organization_id();
@@ -238,38 +357,7 @@ async fn migrate_builtin_items_0dt(
         debug!("migration shard already initialized: {e:?}");
     }
 
-    // 2. Get the `GlobalId` of all migrated storage collections.
-    let migrated_storage_collections: BTreeSet<_> = migrated_builtins
-        .into_iter()
-        .filter_map(|item_id| {
-            let gid = match state.get_entry(&item_id).item() {
-                CatalogItem::Table(table) => {
-                    let mut ids: Vec<_> = table.global_ids().collect();
-                    assert_eq!(ids.len(), 1, "{ids:?}");
-                    ids.pop().expect("checked length")
-                }
-                CatalogItem::Source(source) => source.global_id(),
-                CatalogItem::MaterializedView(mv) => mv.global_id(),
-                CatalogItem::ContinualTask(ct) => ct.global_id(),
-                CatalogItem::Log(_)
-                | CatalogItem::Sink(_)
-                | CatalogItem::View(_)
-                | CatalogItem::Index(_)
-                | CatalogItem::Type(_)
-                | CatalogItem::Func(_)
-                | CatalogItem::Secret(_)
-                | CatalogItem::Connection(_) => return None,
-            };
-            let GlobalId::System(raw_gid) = gid else {
-                unreachable!(
-                    "builtin objects must have system ID, found: {item_id:?} with {gid:?}"
-                );
-            };
-            Some(raw_gid)
-        })
-        .collect();
-
-    // 3. Read in the current contents of the migration shard.
+    // 2. Read in the current contents of the migration shard.
     // We intentionally fetch the upper AFTER opening the read handle to address races between
     // the upper and since moving forward in some other process.
     let upper = fetch_upper(&mut write_handle).await;
@@ -305,10 +393,7 @@ async fn migrate_builtin_items_0dt(
     // 4. Clean up contents of migration shard.
     let mut migrated_shard_updates: Vec<((TableKey, ShardId), Timestamp, StorageDiff)> = Vec::new();
    let mut migration_shards_to_finalize = BTreeSet::new();
-    let storage_collection_metadata = {
-        let txn: &mut dyn StorageTxn<Timestamp> = txn;
-        txn.get_collection_metadata()
-    };
+    let storage_collection_metadata = txn.get_collection_metadata();
     for (table_key, shard_id) in global_id_shards.clone() {
         if table_key.build_version > build_version {
             halt!(
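Finally, a word on what "compatible" means: this commit defers that judgment entirely to persist's `compare_and_evolve_schema`, and the diff does not spell the rules out. As a rough illustration only, evolution of this kind typically accepts additive changes such as appending nullable columns and rejects lossy ones. A toy checker under that assumption (not persist's actual logic):

    // Illustrative compatibility rule; the real decision is persist's, not this.
    #[derive(Debug, Clone, PartialEq, Eq)]
    struct Column {
        name: String,
        typ: String,
        nullable: bool,
    }

    fn backward_compatible(old: &[Column], new: &[Column]) -> bool {
        // Every existing column must survive unchanged and in order...
        if new.len() < old.len() || new[..old.len()] != *old {
            return false;
        }
        // ...and appended columns must be nullable, so rows written under the old
        // schema can be read under the new one with NULLs filled in.
        new[old.len()..].iter().all(|c| c.nullable)
    }

    fn main() {
        let col = |name: &str, nullable| Column {
            name: name.into(),
            typ: "text".into(),
            nullable,
        };
        let old = vec![col("a", false)];
        assert!(backward_compatible(&old, &[col("a", false), col("b", true)]));
        assert!(!backward_compatible(&old, &[col("a", false), col("b", false)]));
    }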
