Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,8 @@ def break_snapmirror(self, source_share_obj, dest_share_obj, mount=True,
1. Quiesce any ongoing snapmirror transfers
2. Wait until snapmirror finishes transfers and enters quiesced state
3. Break snapmirror
4. Mount the destination volume so it is exported as a share
4. Wait for volume to transition from DP to RW
5. Mount the destination volume so it is exported as a share
"""
dest_volume_name, dest_vserver, dest_backend = (
self.get_backend_info_for_share(dest_share_obj))
Expand All @@ -434,7 +435,41 @@ def break_snapmirror(self, source_share_obj, dest_share_obj, mount=True,
dest_vserver,
dest_volume_name)

# 3. Mount the destination volume and create a junction path
# 3. Wait for volume to become RW after break
config = get_backend_configuration(dest_backend)
timeout = config.netapp_snapmirror_quiesce_timeout
retries = int(timeout / 5) or 1

@utils.retry(retry_param=exception.ReplicationException,
interval=5,
retries=retries,
backoff_rate=1)
def wait_for_rw_volume():
volume = dest_client.get_volume(dest_volume_name)
volume_type = volume.get('type')
if volume_type != 'rw':
msg_args = {
'volume': dest_volume_name,
'type': volume_type,
}
msg = ('Volume %(volume)s is still type %(type)s, '
'waiting for RW after snapmirror break.') % msg_args
LOG.debug(msg)
raise exception.ReplicationException(reason=msg)

try:
wait_for_rw_volume()
except exception.ReplicationException:
msg_args = {
'volume': dest_volume_name,
'timeout': timeout,
}
msg = _('Volume %(volume)s did not become RW within %(timeout)s '
'seconds after snapmirror break.') % msg_args
LOG.exception(msg)
raise exception.NetAppException(message=msg)

# 4. Mount the destination volume and create a junction path
if mount:
dest_client.mount_volume(dest_volume_name)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3475,7 +3475,12 @@ def _convert_destination_replica_to_independent(
LOG.warning(msg)
pass
# 2. Break SnapMirror
# For readable replicas, the volume is already mounted (with junction
# path) so we don't need to mount again after break. DR replicas are
# not mounted and need mounting after break.
is_readable = self._is_readable_replica(replica)
dm_session.break_snapmirror(orig_active_replica, replica,
mount=not is_readable,
quiesce_wait_time=quiesce_wait_time)

# 3. Setup access rules
Expand Down
37 changes: 29 additions & 8 deletions manila/share/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3028,18 +3028,11 @@ def _share_replica_update(self, context, share_replica_id, share_id=None):
# or if they are the 'active' instance.
if (share_replica['status'] in constants.TRANSITIONAL_STATUSES
or share_replica['status'] == constants.STATUS_ERROR_DELETING
or share_replica['status'] == constants.STATUS_REPLICATION_CHANGE
or share_replica['replica_state'] ==
constants.REPLICA_STATE_ACTIVE):
return

share_server = self._get_share_server(context, share_replica)

access_rules = self.db.share_access_get_all_for_share(
context, share_replica['share_id'])

LOG.debug("Updating status of share share_replica %s: ",
share_replica['id'])

# _get_share_instance_dict will fetch share server
replica_list = (
self.db.share_replicas_get_all_by_share(
Expand All @@ -3064,6 +3057,34 @@ def _share_replica_update(self, context, share_replica_id, share_id=None):
# non-active replica
return

# If both the active replica and the target replica are in error
# status, skip the update. This likely indicates a failed promotion
# or other critical operation that requires manual intervention.
# Attempting to restore replication automatically could recreate
# the relationship in the wrong direction.
if (_active_replica['status'] == constants.STATUS_ERROR and
share_replica['status'] == constants.STATUS_ERROR):
msg_args = {
'replica_id': share_replica['id'],
'active_id': _active_replica['id'],
'share_id': share_replica['share_id'],
}
msg = ("Skipping replica update for replica %(replica_id)s. "
"Both the active replica %(active_id)s and this replica "
"are in error status for share %(share_id)s. This may "
"indicate a failed promotion that requires manual "
"intervention.") % msg_args
LOG.warning(msg)
return

share_server = self._get_share_server(context, share_replica)

access_rules = self.db.share_access_get_all_for_share(
context, share_replica['share_id'])

LOG.debug("Updating status of share share_replica %s: ",
share_replica['id'])

# Get snapshots for the share.
share_snapshots = self.db.share_snapshot_get_all_for_share(
context, share_replica['share_id'])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,7 @@ def test_delete_snapmirror_source_unreachable(self):

def test_break_snapmirror(self):
self.mock_object(self.dm_session, 'quiesce_then_abort')
self.mock_dest_client.get_volume.return_value = {'type': 'rw'}

self.dm_session.break_snapmirror(self.fake_src_share,
self.fake_dest_share)
Expand All @@ -510,11 +511,15 @@ def test_break_snapmirror(self):
self.fake_src_share, self.fake_dest_share,
quiesce_wait_time=None)

self.mock_dest_client.get_volume.assert_called_once_with(
self.fake_dest_vol_name)

self.mock_dest_client.mount_volume.assert_called_once_with(
self.fake_dest_vol_name)

def test_break_snapmirror_no_mount(self):
self.mock_object(self.dm_session, 'quiesce_then_abort')
self.mock_dest_client.get_volume.return_value = {'type': 'rw'}

self.dm_session.break_snapmirror(self.fake_src_share,
self.fake_dest_share,
Expand All @@ -528,10 +533,44 @@ def test_break_snapmirror_no_mount(self):
self.fake_src_share, self.fake_dest_share,
quiesce_wait_time=None)

self.mock_dest_client.get_volume.assert_called_once_with(
self.fake_dest_vol_name)

self.assertFalse(self.mock_dest_client.mount_volume.called)

def test_break_snapmirror_wait_for_quiesced(self):
self.mock_object(self.dm_session, 'quiesce_then_abort')
self.mock_dest_client.get_volume.return_value = {'type': 'rw'}

self.dm_session.break_snapmirror(self.fake_src_share,
self.fake_dest_share)

self.dm_session.quiesce_then_abort.assert_called_once_with(
self.fake_src_share, self.fake_dest_share,
quiesce_wait_time=None)

self.mock_dest_client.break_snapmirror_vol.assert_called_once_with(
self.source_vserver, self.fake_src_vol_name,
self.dest_vserver, self.fake_dest_vol_name)

self.mock_dest_client.get_volume.assert_called_once_with(
self.fake_dest_vol_name)

self.mock_dest_client.mount_volume.assert_called_once_with(
self.fake_dest_vol_name)

def test_break_snapmirror_wait_for_rw(self):
self.mock_object(self.dm_session, 'quiesce_then_abort')
self.mock_object(time, 'sleep')
# First call returns DP, second call returns RW
self.mock_dest_client.get_volume.side_effect = [
{'type': 'dp'},
{'type': 'rw'},
]
mock_backend_config = na_fakes.create_configuration()
mock_backend_config.netapp_snapmirror_quiesce_timeout = 10
self.mock_object(data_motion, 'get_backend_configuration',
mock.Mock(return_value=mock_backend_config))

self.dm_session.break_snapmirror(self.fake_src_share,
self.fake_dest_share)
Expand All @@ -544,9 +583,42 @@ def test_break_snapmirror_wait_for_quiesced(self):
self.source_vserver, self.fake_src_vol_name,
self.dest_vserver, self.fake_dest_vol_name)

# Should be called twice - first returns dp, second returns rw
self.assertEqual(2, self.mock_dest_client.get_volume.call_count)
time.sleep.assert_called_with(5)

self.mock_dest_client.mount_volume.assert_called_once_with(
self.fake_dest_vol_name)

def test_break_snapmirror_timeout_waiting_for_rw(self):
self.mock_object(self.dm_session, 'quiesce_then_abort')
self.mock_object(time, 'sleep')
# Always return DP - never becomes RW
self.mock_dest_client.get_volume.return_value = {'type': 'dp'}
mock_backend_config = na_fakes.create_configuration()
mock_backend_config.netapp_snapmirror_quiesce_timeout = 10
self.mock_object(data_motion, 'get_backend_configuration',
mock.Mock(return_value=mock_backend_config))

self.assertRaises(exception.NetAppException,
self.dm_session.break_snapmirror,
self.fake_src_share,
self.fake_dest_share)

self.dm_session.quiesce_then_abort.assert_called_once_with(
self.fake_src_share, self.fake_dest_share,
quiesce_wait_time=None)

self.mock_dest_client.break_snapmirror_vol.assert_called_once_with(
self.source_vserver, self.fake_src_vol_name,
self.dest_vserver, self.fake_dest_vol_name)

# Should retry based on timeout/interval (10s / 5s = 2 total attempts)
self.assertEqual(2, self.mock_dest_client.get_volume.call_count)

# Should NOT mount if volume never became RW
self.assertFalse(self.mock_dest_client.mount_volume.called)

@ddt.data(None, 2, 30)
def test_quiesce_then_abort_wait_time(self, wait_time):
self.mock_object(time, 'sleep')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4921,6 +4921,9 @@ def test_convert_destination_replica_to_independent(self):
mock.Mock(return_value=mock.Mock()))
self.mock_object(self.library, '_create_export',
mock.Mock(return_value='fake_export_location'))
self.mock_object(self.library,
'_is_readable_replica',
mock.Mock(return_value=False))

replica = self.library._convert_destination_replica_to_independent(
None, self.mock_dm_session, self.fake_replica,
Expand All @@ -4930,7 +4933,7 @@ def test_convert_destination_replica_to_independent(self):
self.fake_replica, self.fake_replica_2)
self.mock_dm_session.break_snapmirror.assert_called_once_with(
self.fake_replica, self.fake_replica_2,
quiesce_wait_time=None)
mount=True, quiesce_wait_time=None)

self.assertEqual('fake_export_location',
replica['export_locations'])
Expand All @@ -4947,6 +4950,9 @@ def test_convert_destination_replica_to_independent_update_failed(self):
mock.Mock(return_value=mock.Mock()))
self.mock_object(self.library, '_create_export',
mock.Mock(return_value='fake_export_location'))
self.mock_object(self.library,
'_is_readable_replica',
mock.Mock(return_value=False))
self.mock_object(
self.mock_dm_session, 'update_snapmirror',
mock.Mock(side_effect=exception.StorageCommunicationException))
Expand All @@ -4959,7 +4965,7 @@ def test_convert_destination_replica_to_independent_update_failed(self):
self.fake_replica, self.fake_replica_2)
self.mock_dm_session.break_snapmirror.assert_called_once_with(
self.fake_replica, self.fake_replica_2,
quiesce_wait_time=None)
mount=True, quiesce_wait_time=None)

self.assertEqual('fake_export_location',
replica['export_locations'])
Expand Down Expand Up @@ -5039,6 +5045,9 @@ def test_convert_destination_replica_to_independent_with_access_rules(
mock.Mock(return_value=fake_helper))
self.mock_object(self.library, '_create_export',
mock.Mock(return_value='fake_export_location'))
self.mock_object(self.library,
'_is_readable_replica',
mock.Mock(return_value=False))

replica = self.library._convert_destination_replica_to_independent(
None, self.mock_dm_session, self.fake_replica,
Expand All @@ -5048,7 +5057,7 @@ def test_convert_destination_replica_to_independent_with_access_rules(
self.fake_replica, self.fake_replica_2)
self.mock_dm_session.break_snapmirror.assert_called_once_with(
self.fake_replica, self.fake_replica_2,
quiesce_wait_time=None)
mount=True, quiesce_wait_time=None)

self.assertEqual('fake_export_location',
replica['export_locations'])
Expand All @@ -5075,6 +5084,9 @@ def test_convert_destination_replica_to_independent_failed_access_rules(
mock.Mock(return_value=fake_helper))
self.mock_object(self.library, '_create_export',
mock.Mock(return_value='fake_export_location'))
self.mock_object(self.library,
'_is_readable_replica',
mock.Mock(return_value=False))

replica = self.library._convert_destination_replica_to_independent(
None, self.mock_dm_session, self.fake_replica,
Expand All @@ -5084,7 +5096,7 @@ def test_convert_destination_replica_to_independent_failed_access_rules(
self.fake_replica, self.fake_replica_2)
self.mock_dm_session.break_snapmirror.assert_called_once_with(
self.fake_replica, self.fake_replica_2,
quiesce_wait_time=None)
mount=True, quiesce_wait_time=None)

fake_helper.assert_has_calls([
mock.call.set_client(mock.ANY),
Expand Down
32 changes: 31 additions & 1 deletion manila/tests/share/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1861,6 +1861,34 @@ def test__share_replica_update_driver_exception(self, replica_state):
resource_id=replica['id'],
exception=mock.ANY)

def test__share_replica_update_both_replicas_in_error(self):
"""Test that update is skipped when both replicas are in error."""
mock_warning_log = self.mock_object(manager.LOG, 'warning')
replica = fake_replica(
replica_state=constants.REPLICA_STATE_OUT_OF_SYNC,
status=constants.STATUS_ERROR)
active_replica = fake_replica(
id='fake2',
replica_state=constants.REPLICA_STATE_ACTIVE,
status=constants.STATUS_ERROR)
self.mock_object(db, 'share_replicas_get_all_by_share',
mock.Mock(return_value=[replica, active_replica]))
self.mock_object(self.share_manager.db, 'share_replica_get',
mock.Mock(return_value=replica))
mock_driver_call = self.mock_object(
self.share_manager.driver, 'update_replica_state')

self.share_manager._share_replica_update(
self.context, replica['id'], share_id=replica['share_id'])

# Driver should not be called
self.assertFalse(mock_driver_call.called)
# Warning should be logged
self.assertEqual(1, mock_warning_log.call_count)
warning_msg = mock_warning_log.call_args[0][0]
self.assertIn('Both the active replica', warning_msg)
self.assertIn('are in error status', warning_msg)

def test__share_replica_update_driver_exception_ignored(self):
mock_debug_log = self.mock_object(manager.LOG, 'debug')
replica = fake_replica(replica_state=constants.STATUS_ERROR)
Expand Down Expand Up @@ -1907,7 +1935,9 @@ def test__share_replica_update_driver_exception_ignored(self):
{'status': constants.STATUS_EXTENDING,
'replica_state': constants.REPLICA_STATE_IN_SYNC, },
{'status': constants.STATUS_SHRINKING,
'replica_state': constants.REPLICA_STATE_IN_SYNC, })
'replica_state': constants.REPLICA_STATE_IN_SYNC, },
{'status': constants.STATUS_REPLICATION_CHANGE,
'replica_state': constants.REPLICA_STATE_OUT_OF_SYNC, })
def test__share_replica_update_unqualified_replica(self, state):
mock_debug_log = self.mock_object(manager.LOG, 'debug')
mock_warning_log = self.mock_object(manager.LOG, 'warning')
Expand Down