|
# Copyright 2025 OpenStack Foundation |
|
# |
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may |
|
# not use this file except in compliance with the License. You may obtain |
|
# a copy of the License at |
|
# |
|
# http://www.apache.org/licenses/LICENSE-2.0 |
|
# |
|
# Unless required by applicable law or agreed to in writing, software |
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
|
# License for the specific language governing permissions and limitations |
|
# under the License. |
|
|
|
"""Volume driver that wraps LVM and injects configurable delays. |
|
|
|
This driver is intended for testing graceful shutdown behavior with |
|
long-running operations on a devstack environment. It wraps the real |
|
LVMVolumeDriver and adds a configurable sleep before selected driver |
|
operations, simulating slow storage backend behavior. |
|
|
|
The delay runs in a real native OS thread so that it cannot be |
|
interrupted by eventlet greenthread kills (which happen during |
|
SIGTERM/graceful shutdown). This accurately simulates real-world |
|
storage backend operations (e.g. SAN provisioning API calls) that |
|
are not interruptible by local process signals. |
|
|
|
Deployment:: |
|
|
|
scp fake_slow_driver.py \ |
|
waboring@devstack.hemna.com:/opt/stack/cinder/cinder/tests/ |
|
|
|
Usage in cinder.conf:: |
|
|
|
[DEFAULT] |
|
enabled_backends = slow-lvm |
|
|
|
[slow-lvm] |
|
volume_driver = cinder.tests.fake_slow_driver.FakeSlowVolumeDriver |
|
volume_backend_name = slow-lvm |
|
volume_group = stack-volumes-lvmdriver-1 |
|
target_helper = lioadm |
|
slow_driver_delay = 60 |
|
slow_driver_operations = create_volume |
|
|
|
Then restart cinder-volume:: |
|
|
|
sudo systemctl restart devstack@c-vol |
|
""" |
|
|
|
import concurrent.futures |
|
import threading |
|
import time |
|
|
|
from oslo_config import cfg |
|
from oslo_log import log as logging |
|
|
|
from cinder.volume.drivers import lvm |
|
|
|
LOG = logging.getLogger(__name__) |
|
|
|
slow_driver_opts = [ |
|
cfg.IntOpt( |
|
'slow_driver_delay', |
|
default=30, |
|
min=0, |
|
help='Number of seconds to sleep during each configured ' |
|
'driver operation. Set to 0 to disable delays. ' |
|
'This is used for testing graceful shutdown behavior ' |
|
'with long-running operations.', |
|
), |
|
cfg.ListOpt( |
|
'slow_driver_operations', |
|
default=['create_volume'], |
|
help='List of driver operations to inject delays into. ' |
|
'Supported operations: create_volume, delete_volume, ' |
|
'create_snapshot, delete_snapshot, extend_volume, ' |
|
'create_cloned_volume, create_volume_from_snapshot, ' |
|
'copy_volume_to_image, create_group, delete_group, ' |
|
'create_group_snapshot, delete_group_snapshot.', |
|
), |
|
] |
|
|
|
CONF = cfg.CONF |
|
|
|
# Use a single-thread executor for running delays in real native |
|
# threads. This ensures the sleep cannot be interrupted by eventlet |
|
# greenthread kills (e.g. during SIGTERM), accurately simulating |
|
# a real storage backend operation that is not interruptible by |
|
# local process signals. |
|
_delay_executor = concurrent.futures.ThreadPoolExecutor( |
|
max_workers=4, |
|
thread_name_prefix="slow-driver-delay", |
|
) |
|
|
|
|
|
def _native_sleep(seconds): |
|
"""Sleep in a real OS thread, immune to eventlet greenthread kills. |
|
|
|
Under eventlet monkey-patching, time.sleep becomes |
|
eventlet.greenthread.sleep which can be interrupted when a |
|
greenthread is killed (e.g. during SIGTERM shutdown). |
|
|
|
By running this in a ThreadPoolExecutor, the actual sleep happens |
|
in a native OS thread that eventlet cannot kill. The calling |
|
greenthread blocks on future.result() but the underlying sleep |
|
is uninterruptible. |
|
""" |
|
future = _delay_executor.submit(time.sleep, seconds) |
|
# Block until the sleep completes. If the greenthread is killed |
|
# while we wait, the native thread sleep still runs to completion |
|
# in the background. |
|
try: |
|
future.result(timeout=seconds + 30) |
|
except concurrent.futures.TimeoutError: |
|
LOG.warning("[SlowDriver] Delay timed out after %d+30 seconds", |
|
seconds) |
|
|
|
|
|
class FakeSlowVolumeDriver(lvm.LVMVolumeDriver): |
|
"""LVM driver wrapper that injects configurable delays. |
|
|
|
Wraps the real LVMVolumeDriver and sleeps for a configurable |
|
number of seconds before selected operations. This is useful |
|
for testing graceful shutdown, timeout behavior, and |
|
long-running operation handling. |
|
|
|
The delay is injected BEFORE the real driver method is called, |
|
so the operation itself still executes normally against real |
|
LVM volumes. |
|
|
|
The delay runs in a native OS thread (not an eventlet greenthread) |
|
so it cannot be interrupted by SIGTERM signal handling or |
|
greenthread kills. This accurately models real storage backends |
|
where operations like LUN provisioning cannot be cancelled once |
|
started. |
|
""" |
|
|
|
VERSION = '1.0.0' |
|
CI_WIKI_NAME = "Cinder_Jenkins" |
|
|
|
# Tracks active operations for test observability |
|
_active_operations = {} |
|
_active_operations_lock = threading.Lock() |
|
|
|
SUPPORTED_SLOW_OPERATIONS = frozenset([ |
|
'create_volume', |
|
'delete_volume', |
|
'create_snapshot', |
|
'delete_snapshot', |
|
'extend_volume', |
|
'create_cloned_volume', |
|
'create_volume_from_snapshot', |
|
'copy_volume_to_image', |
|
'create_group', |
|
'delete_group', |
|
'create_group_snapshot', |
|
'delete_group_snapshot', |
|
]) |
|
|
|
def __init__(self, *args, **kwargs): |
|
super(FakeSlowVolumeDriver, self).__init__(*args, **kwargs) |
|
self.configuration.append_config_values(slow_driver_opts) |
|
self._delay = self.configuration.slow_driver_delay |
|
self._slow_ops = set(self.configuration.slow_driver_operations) |
|
|
|
unsupported = self._slow_ops - self.SUPPORTED_SLOW_OPERATIONS |
|
if unsupported: |
|
LOG.warning("[SlowDriver] Unsupported operations configured " |
|
"for delay (ignored): %s", unsupported) |
|
self._slow_ops -= unsupported |
|
|
|
LOG.info("[SlowDriver] Initialized with delay=%ds for " |
|
"operations: %s", self._delay, self._slow_ops) |
|
|
|
def _inject_delay(self, operation, resource_id=None): |
|
"""Sleep for the configured delay if this operation is slowed. |
|
|
|
The sleep runs in a native OS thread via ThreadPoolExecutor, |
|
so it is immune to eventlet greenthread kills during SIGTERM. |
|
This accurately simulates a real storage backend operation |
|
that cannot be interrupted by local signals. |
|
|
|
Returns True if a delay was injected, False otherwise. |
|
""" |
|
if operation not in self._slow_ops or self._delay <= 0: |
|
return False |
|
|
|
with self._active_operations_lock: |
|
self._active_operations[operation] = { |
|
'resource_id': resource_id, |
|
'start_time': time.time(), |
|
'delay': self._delay, |
|
} |
|
|
|
LOG.info("[SlowDriver] Sleeping %d seconds for %s " |
|
"operation (resource: %s) [native thread]...", |
|
self._delay, operation, resource_id) |
|
_native_sleep(self._delay) |
|
LOG.info("[SlowDriver] Delay complete for %s " |
|
"operation (resource: %s)", |
|
operation, resource_id) |
|
|
|
with self._active_operations_lock: |
|
self._active_operations.pop(operation, None) |
|
|
|
return True |
|
|
|
@staticmethod |
|
def _get_vol_id(volume): |
|
"""Safely extract volume id for logging.""" |
|
if hasattr(volume, 'id'): |
|
return volume.id |
|
if isinstance(volume, dict): |
|
return volume.get('id', 'unknown') |
|
return 'unknown' |
|
|
|
@staticmethod |
|
def _get_snap_id(snapshot): |
|
"""Safely extract snapshot id for logging.""" |
|
if hasattr(snapshot, 'id'): |
|
return snapshot.id |
|
if isinstance(snapshot, dict): |
|
return snapshot.get('id', 'unknown') |
|
return 'unknown' |
|
|
|
# -- Wrapped operations -- |
|
|
|
def create_volume(self, volume): |
|
self._inject_delay('create_volume', self._get_vol_id(volume)) |
|
return super(FakeSlowVolumeDriver, self).create_volume(volume) |
|
|
|
def delete_volume(self, volume): |
|
self._inject_delay('delete_volume', self._get_vol_id(volume)) |
|
return super(FakeSlowVolumeDriver, self).delete_volume(volume) |
|
|
|
def create_snapshot(self, snapshot): |
|
self._inject_delay('create_snapshot', self._get_snap_id(snapshot)) |
|
return super(FakeSlowVolumeDriver, self).create_snapshot(snapshot) |
|
|
|
def delete_snapshot(self, snapshot): |
|
self._inject_delay('delete_snapshot', self._get_snap_id(snapshot)) |
|
return super(FakeSlowVolumeDriver, self).delete_snapshot(snapshot) |
|
|
|
def extend_volume(self, volume, new_size): |
|
self._inject_delay('extend_volume', self._get_vol_id(volume)) |
|
return super(FakeSlowVolumeDriver, self).extend_volume( |
|
volume, new_size) |
|
|
|
def create_cloned_volume(self, volume, src_vref): |
|
self._inject_delay('create_cloned_volume', self._get_vol_id(volume)) |
|
return super(FakeSlowVolumeDriver, self).create_cloned_volume( |
|
volume, src_vref) |
|
|
|
def create_volume_from_snapshot(self, volume, snapshot): |
|
self._inject_delay('create_volume_from_snapshot', |
|
self._get_vol_id(volume)) |
|
return super( |
|
FakeSlowVolumeDriver, self |
|
).create_volume_from_snapshot(volume, snapshot) |
|
|
|
def copy_volume_to_image(self, context, volume, image_service, |
|
image_meta): |
|
self._inject_delay('copy_volume_to_image', self._get_vol_id(volume)) |
|
return super(FakeSlowVolumeDriver, self).copy_volume_to_image( |
|
context, volume, image_service, image_meta) |
|
|
|
# -- Group operations -- |
|
|
|
@staticmethod |
|
def _get_group_id(group): |
|
"""Safely extract group id for logging.""" |
|
if hasattr(group, 'id'): |
|
return group.id |
|
if isinstance(group, dict): |
|
return group.get('id', 'unknown') |
|
return 'unknown' |
|
|
|
def create_group(self, context, group): |
|
"""Create a group with optional delay. |
|
|
|
Injects a delay then returns 'available' status. This overrides |
|
the NotImplementedError from the base class so the manager does |
|
not fall through to the generic (instant) path. |
|
""" |
|
self._inject_delay('create_group', self._get_group_id(group)) |
|
return {'status': 'available'} |
|
|
|
def delete_group(self, context, group, volumes): |
|
"""Delete a group with optional delay.""" |
|
self._inject_delay('delete_group', self._get_group_id(group)) |
|
# Delete each volume in the group |
|
volume_model_updates = [] |
|
for volume in volumes: |
|
volume_model_update = {'id': volume.id} |
|
try: |
|
self.delete_volume(volume) |
|
volume_model_update['status'] = 'deleted' |
|
except Exception: |
|
volume_model_update['status'] = 'error' |
|
volume_model_updates.append(volume_model_update) |
|
return {'status': 'deleted'}, volume_model_updates |
|
|
|
def create_group_snapshot(self, context, group_snapshot, snapshots): |
|
"""Create a group snapshot with optional delay.""" |
|
self._inject_delay('create_group_snapshot', |
|
self._get_group_id(group_snapshot)) |
|
snapshot_model_updates = [] |
|
for snapshot in snapshots: |
|
snapshot_model_update = {'id': snapshot.id} |
|
try: |
|
self.create_snapshot(snapshot) |
|
snapshot_model_update['status'] = 'available' |
|
except Exception: |
|
snapshot_model_update['status'] = 'error' |
|
snapshot_model_updates.append(snapshot_model_update) |
|
return {'status': 'available'}, snapshot_model_updates |
|
|
|
def delete_group_snapshot(self, context, group_snapshot, snapshots): |
|
"""Delete a group snapshot with optional delay.""" |
|
self._inject_delay('delete_group_snapshot', |
|
self._get_group_id(group_snapshot)) |
|
snapshot_model_updates = [] |
|
for snapshot in snapshots: |
|
snapshot_model_update = {'id': snapshot.id} |
|
try: |
|
self.delete_snapshot(snapshot) |
|
snapshot_model_update['status'] = 'deleted' |
|
except Exception: |
|
snapshot_model_update['status'] = 'error' |
|
snapshot_model_updates.append(snapshot_model_update) |
|
return {'status': 'deleted'}, snapshot_model_updates |