@@ -260,14 +260,7 @@ pub(crate) fn render<G: Scope<Timestamp = MzOffset>>(
             reader_table_info.len()
         );
 
-        // Nothing needs to be snapshot.
-        if all_outputs.is_empty() {
-            trace!(%id, "no exports to snapshot");
-            // Note we do not emit a `ProgressStatisticsUpdate::Snapshot` update here,
-            // as we do not want to attempt to override the current value with 0. We
-            // just leave it null.
-            return Ok(());
-        }
+
 
         let connection_config = connection
             .connection
@@ -277,44 +270,58 @@ pub(crate) fn render<G: Scope<Timestamp = MzOffset>>(
                 InTask::Yes,
             )
             .await?;
-        let task_name = format!("timely-{worker_id} PG snapshotter");
 
-        let client = if is_snapshot_leader {
+
+        // The snapshot operator is responsible for creating the replication slot(s).
+        // The first slot is the permanent slot that will be used for reading the replication
+        // stream. A temporary slot is created further on to capture table snapshots.
+        let replication_client = if is_snapshot_leader {
             let client = connection_config
                 .connect_replication(&config.config.connection_context.ssh_tunnel_manager)
                 .await?;
-
-            // Attempt to export the snapshot by creating the main replication slot. If that
-            // succeeds then there is no need for creating additional temporary slots.
             let main_slot = &connection.publication_details.slot;
-            let snapshot_info = match export_snapshot(&client, main_slot, false).await {
-                Ok(info) => info,
-                Err(TransientError::ReplicationSlotAlreadyExists) => {
-                    let tmp_slot = format!(
-                        "mzsnapshot_{}",
-                        uuid::Uuid::new_v4()).replace('-', ""
-                    );
-                    export_snapshot(&client, &tmp_slot, true).await?
-                }
-                Err(err) => return Err(err),
-            };
-            trace!(
-                %id,
-                "timely-{worker_id} exporting snapshot info {snapshot_info:?}");
-            snapshot_handle.give(&snapshot_cap_set[0], snapshot_info);
 
-            client
+            tracing::info!(%id, "ensuring replication slot {main_slot} exists");
+            super::ensure_replication_slot(&client, main_slot).await?;
+            Some(client)
         } else {
-            // Only the snapshot leader needs a replication connection.
-            connection_config
-                .connect(
-                    &task_name,
-                    &config.config.connection_context.ssh_tunnel_manager,
-                )
-                .await?
+            None
         };
         *slot_ready_cap_set = CapabilitySet::new();
 
+        // Nothing needs to be snapshot.
+        if all_outputs.is_empty() {
+            trace!(%id, "no exports to snapshot");
+            // Note we do not emit a `ProgressStatisticsUpdate::Snapshot` update here,
+            // as we do not want to attempt to override the current value with 0. We
+            // just leave it null.
+            return Ok(());
+        }
+
+        // The replication client is only set if this worker is the snapshot leader.
+        let client = match replication_client {
+            Some(client) => {
+                let tmp_slot = format!("mzsnapshot_{}", uuid::Uuid::new_v4()).replace('-', "");
+                let snapshot_info = export_snapshot(&client, &tmp_slot, true).await?;
+                trace!(
+                    %id,
+                    "timely-{worker_id} exporting snapshot info {snapshot_info:?}");
+                snapshot_handle.give(&snapshot_cap_set[0], snapshot_info);
+
+                client
+            }
+            None => {
+                // Only the snapshot leader needs a replication connection.
+                let task_name = format!("timely-{worker_id} PG snapshotter");
+                connection_config
+                    .connect(
+                        &task_name,
+                        &config.config.connection_context.ssh_tunnel_manager,
+                    )
+                    .await?
+            }
+        };
+
         // Configure statement_timeout based on param. We want to be able to
         // override the server value here in case it's set too low,
         // respective to the size of the data we need to copy.
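
The hunk calls `super::ensure_replication_slot` but does not show its body. Below is a minimal sketch of what an idempotent slot-creation helper can look like, assuming a tokio-postgres connection opened with `replication=database` and `pgoutput` as the output plugin; the real helper in the parent module may differ.

```rust
use tokio_postgres::{error::SqlState, Client};

/// Create the named logical replication slot if it does not already exist.
/// A sketch only, not the actual helper from the parent module.
async fn ensure_replication_slot(client: &Client, slot: &str) -> Result<(), tokio_postgres::Error> {
    // CREATE_REPLICATION_SLOT is a replication-protocol command, so it must
    // run on a connection opened with `replication=database`.
    // NOEXPORT_SNAPSHOT skips snapshot export here; snapshots are captured
    // later through a separate temporary slot.
    let query = format!("CREATE_REPLICATION_SLOT {slot} LOGICAL \"pgoutput\" NOEXPORT_SNAPSHOT");
    match client.simple_query(&query).await {
        Ok(_) => Ok(()),
        // An already-existing slot is fine: that is what makes the call
        // idempotent across restarts.
        Err(err) if err.code() == Some(&SqlState::DUPLICATE_OBJECT) => Ok(()),
        Err(err) => Err(err),
    }
}
```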
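`export_snapshot(&client, &tmp_slot, true)` pins the consistent point that the leader then broadcasts through `snapshot_handle`. A rough sketch of the underlying mechanism, assuming `CREATE_REPLICATION_SLOT ... USE_SNAPSHOT` semantics and a `SnapshotInfo` type invented here purely for illustration:

```rust
use tokio_postgres::{Client, SimpleQueryMessage};

/// Hypothetical stand-in for the real snapshot metadata type.
#[derive(Debug)]
pub struct SnapshotInfo {
    /// LSN at which the exported snapshot is consistent.
    pub consistent_point: String,
}

/// Sketch of exporting a snapshot through a (temporary) replication slot.
pub async fn export_snapshot(
    client: &Client,
    slot: &str,
    temporary: bool,
) -> Result<SnapshotInfo, tokio_postgres::Error> {
    // The snapshot associated with CREATE_REPLICATION_SLOT ... USE_SNAPSHOT
    // is only visible inside the transaction that created the slot, so the
    // same connection must run the subsequent COPY of the tables.
    client
        .simple_query("BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ;")
        .await?;
    let temporary = if temporary { "TEMPORARY" } else { "" };
    let query =
        format!("CREATE_REPLICATION_SLOT {slot} {temporary} LOGICAL \"pgoutput\" USE_SNAPSHOT");
    let rows = client.simple_query(&query).await?;
    // The command reports the slot's consistent point, i.e. the LSN from
    // which the replication stream can resume without missing updates.
    for msg in rows {
        if let SimpleQueryMessage::Row(row) = msg {
            if let Some(lsn) = row.get("consistent_point") {
                return Ok(SnapshotInfo {
                    consistent_point: lsn.to_string(),
                });
            }
        }
    }
    unreachable!("CREATE_REPLICATION_SLOT returns a consistent_point row");
}
```

Non-leader workers never take this path: per the diff they receive the snapshot info through the broadcast `snapshot_handle` output and open a plain (non-replication) connection for the copy phase.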