diff --git a/csp/impl/struct.py b/csp/impl/struct.py index aaf2bcd29..fcf5e0a1f 100644 --- a/csp/impl/struct.py +++ b/csp/impl/struct.py @@ -105,18 +105,12 @@ def collectts(cls, **kwargs): return csp.struct_collectts(cls, kwargs) - @classmethod - def _postprocess_dict_to_python(cls, d): - return d - @classmethod def _obj_to_python(cls, obj): if isinstance(obj, Struct): - return obj._postprocess_dict_to_python( - {k: cls._obj_to_python(getattr(obj, k)) for k in obj.__full_metadata_typed__ if hasattr(obj, k)} - ) + return {k: cls._obj_to_python(getattr(obj, k)) for k in obj.__full_metadata_typed__ if hasattr(obj, k)} elif isinstance(obj, dict): - return {k: cls._obj_to_python(v) for k, v in obj.items()} + return type(obj)({k: cls._obj_to_python(v) for k, v in obj.items()}) # type() for derived dict types elif ( isinstance(obj, (list, tuple, set)) or type(obj).__name__ == "FastList" ): # hack for FastList that is not a list @@ -126,10 +120,6 @@ def _obj_to_python(cls, obj): else: return obj - @classmethod - def _preprocess_dict_from_python(cls, d): - return d - @classmethod def _obj_from_python(cls, json, obj_type): obj_type = ContainerTypeNormalizer.normalize_type(obj_type) @@ -154,7 +144,6 @@ def _obj_from_python(cls, json, obj_type): elif issubclass(obj_type, Struct): if not isinstance(json, dict): raise TypeError("Representation of struct as json is expected to be of dict type") - json = obj_type._preprocess_dict_from_python(json) res = obj_type() for k, v in json.items(): expected_type = obj_type.__full_metadata_typed__.get(k, None) diff --git a/csp/tests/impl/utils/__init__.py b/csp/tests/impl/utils/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/csp/tests/impl/utils/test_lock_file.py b/csp/tests/impl/utils/test_lock_file.py deleted file mode 100644 index f30174432..000000000 --- a/csp/tests/impl/utils/test_lock_file.py +++ /dev/null @@ -1,138 +0,0 @@ -import os -import sys -import tempfile -import threading -import time -import unittest - -from csp.utils.lock_file import LockFile - - -class _SimpleOneEventCondVar: - def __init__(self): - self._lock = threading.Lock() - self._cond_var = threading.Condition(lock=self._lock) - self._event_idx = 0 - - @property - def lock(self): - return self._lock - - def notify(self, lock=True): - if lock: - self._lock.acquire() - try: - self._event_idx += 1 - self._cond_var.notify_all() - return self._event_idx - finally: - if lock: - self._lock.release() - - def wait(self, event_value, lock=True): - if lock: - self._lock.acquire() - try: - while self._event_idx != event_value: - self._cond_var.wait() - finally: - if lock: - self._lock.release() - - -@unittest.skipIf(sys.platform == "win32", "Not supported on windows") -class TestLockFile(unittest.TestCase): - def test_multiple_threads(self): - for shared in [True, False]: - for lock_top_level in [True, False]: - fd, f_path = tempfile.mkstemp() - os.close(fd) - - def lock_multiple(cond_var: _SimpleOneEventCondVar): - # Wait until the parent obtained the lock - cond_var.wait(1) - file_lock = LockFile(f_path, timeout_seconds=0, shared=shared) - if lock_top_level: - # Notify the parent that the thread started - cond_var.notify() - file_lock.lock() - with file_lock: - if not lock_top_level: - # Notify the parent that the thread started - cond_var.notify() - - with file_lock: - if shared: - with LockFile(f_path, timeout_seconds=0, shared=shared): - pass - - if not lock_top_level: - cond_var.wait(3) - - if lock_top_level: - cond_var.wait(3) - file_lock.unlock() - - cond_var = 
_SimpleOneEventCondVar() - t = threading.Thread(target=lock_multiple, kwargs={"cond_var": cond_var}) - - t.start() - - with cond_var.lock: - # Notify the thread parent has the lock - cond_var.notify(lock=False) - - # Wait until we are certain that the file is locked - cond_var.wait(2, lock=False) - - if shared: - with LockFile(f_path, timeout_seconds=0, shared=shared): - pass - else: - with self.assertRaises(BlockingIOError): - with LockFile(f_path, timeout_seconds=0, shared=shared): - pass - - cond_var.notify() - # This can fail on a really busy machine but we want to make sure that timeout works - try: - with LockFile(f_path, timeout_seconds=5, retry_period_seconds=0.01, shared=shared): - pass - except BlockingIOError: - # VERY rare, should happen only if machine is busy, give another 30 seconds - with LockFile(f_path, timeout_seconds=30, retry_period_seconds=0.1, shared=shared): - pass - - t.join() - - def test_fork(self): - fd, f_path = tempfile.mkstemp() - os.close(fd) - for shared in [True, False]: - - def fork_and_lock(lock_parent=True): - pid = os.fork() - if pid == 0: - try: - time.sleep(0.05) - with LockFile(f_path, timeout_seconds=0, shared=shared): - pass - os._exit(0) - except BaseException: - os._exit(1) - else: - if lock_parent: - with LockFile(f_path, timeout_seconds=0, shared=shared): - _, res = os.waitpid(pid, 0) - self.assertTrue((res == 0) == shared) - else: - _, res = os.waitpid(pid, 0) - self.assertTrue((res == 0) == shared) - - fork_and_lock() - with LockFile(f_path, timeout_seconds=0, shared=shared): - fork_and_lock(lock_parent=False) - - -if __name__ == "__main__": - unittest.main() diff --git a/csp/utils/file_permissions.py b/csp/utils/file_permissions.py deleted file mode 100644 index 513da3797..000000000 --- a/csp/utils/file_permissions.py +++ /dev/null @@ -1,134 +0,0 @@ -try: - import grp - import os - import pwd - import random - import time -except ImportError: - import sys - - if sys.platform != "win32": - raise - -from csp.impl.struct import Struct - - -class RWXPermissions: - READ = 0x4 - WRITE = 0x2 - EXECUTE = 0x1 - - -class FilePermissions(Struct): - owner_user: str - owner_group: str - user_permissions: int - group_permissions: int - others_permissions: int - - def get_folder_permissions(self): - res = self.copy() - if getattr(res, "user_permissions"): - res.user_permissions |= RWXPermissions.EXECUTE - if getattr(res, "group_permissions"): - res.group_permissions |= RWXPermissions.EXECUTE - if getattr(res, "others_permissions"): - res.others_permissions |= RWXPermissions.EXECUTE - return res - - @classmethod - def _postprocess_dict_to_python(cls, d): - for k in ["user_permissions", "group_permissions", "others_permission"]: - if k in d: - d[k] = hex(d[k]) - return d - - @classmethod - def _preprocess_dict_from_python(cls, d): - for k in ["user_permissions", "group_permissions", "others_permission"]: - if k in d: - d[k] = int(d[k], 16) - return d - - -def _get_uid(user_name): - # There are some random sporadic errors in getpwnam, so we need to retry - for i in range(5): - try: - return pwd.getpwnam(user_name).pw_uid - except KeyError: - time.sleep(random.random()) - return pwd.getpwnam(user_name).pw_uid - - -def _get_gid(group_name): - # There are some random sporadic errors in getgrnam, so we need to retry - for i in range(5): - try: - return grp.getgrnam(group_name).gr_gid - except KeyError: - time.sleep(random.random()) - return grp.getgrnam(group_name).gr_gid - - -def apply_file_permissions(file_path, file_permissions: FilePermissions): - file_stat 
= os.stat(file_path) - - old_perm = file_stat.st_mode & 0xFFF - new_perm = old_perm - - if hasattr(file_permissions, "user_permissions"): - new_perm = new_perm & 0x77 | (file_permissions.user_permissions << 6) - if hasattr(file_permissions, "group_permissions"): - new_perm = new_perm & 0x1C7 | (file_permissions.group_permissions << 3) - if hasattr(file_permissions, "others_permissions"): - new_perm = new_perm & 0x1F8 | (file_permissions.others_permissions) - if old_perm != new_perm: - try: - os.chmod(file_path, new_perm) - except PermissionError: - pass - - if hasattr(file_permissions, "owner_user") or hasattr(file_permissions, "owner_group"): - uid, gid = file_stat.st_uid, file_stat.st_gid - if hasattr(file_permissions, "owner_user"): - uid = _get_uid(file_permissions.owner_user) - if hasattr(file_permissions, "owner_group"): - gid = _get_gid(file_permissions.owner_group) - try: - os.chown(file_path, uid=uid, gid=gid) - except PermissionError: - pass - - -def ensure_file_exists_with_permissions(file_path: str, file_permissions: FilePermissions): - if not os.path.exists(file_path): - with open(file_path, "a+"): - pass - - apply_file_permissions(file_path=file_path, file_permissions=file_permissions) - - -def create_folder_with_permissions(folder_path: str, file_permissions: FilePermissions): - """ - :param folder_path: The path of the folder to create - :param file_permissions: The permissions of the files in the folder (the folder permissions are derived from it) - """ - if os.path.exists(folder_path): - return - - cur_path = os.path.abspath(os.sep) - path_parts = os.path.normpath(os.path.abspath(folder_path)).split(os.sep) - folder_permissions = file_permissions.get_folder_permissions() - for sub_folder in path_parts: - if not sub_folder: - continue - cur_path = os.path.join(cur_path, sub_folder) - if os.path.exists(cur_path): - continue - try: - os.mkdir(cur_path) - except FileExistsError: - pass - - apply_file_permissions(cur_path, folder_permissions) diff --git a/csp/utils/lock_file.py b/csp/utils/lock_file.py deleted file mode 100644 index aa90221a1..000000000 --- a/csp/utils/lock_file.py +++ /dev/null @@ -1,344 +0,0 @@ -import datetime -import os -import threading -import time -from typing import Dict, Optional - -from csp.impl.struct import Struct -from csp.utils.file_permissions import FilePermissions, ensure_file_exists_with_permissions -from csp.utils.rm_utils import rm_file_or_folder - -try: - import fcntl -except ImportError: - import sys - - if sys.platform != "win32": - raise - - -class _FileLockRecord(Struct): - open_file: object - ref_count: int - shared: bool - thread_lock: object - - -class _LockWrapper: - """A simple wrapper. That ensures at __exit__ that lock is released. 
- - The point of this wrapper supports the following use case that raw lock doesn't: - - with _LockWrapper(lock) as lock_wrapper: - lock_wrapper.release() - """ - - def __init__(self, lock): - self._lock = lock - self._locked = False - - def acquire(self): - if self._locked: - raise RuntimeError("Trying to lock wrapper twice") - self._lock.acquire(blocking=True) - self._locked = True - - def release(self): - if self._locked: - self._locked = False - self._lock.release() - - def __enter__(self): - self.acquire() - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.release() - - -class _TimeoutMonitor: - def __init__(self, timeout_seconds): - self._timeout_seconds = timeout_seconds - self._init_time = datetime.datetime.now() - self._timed_out = None - - def timed_out(self): - # We want the first call to always return False. - if self._timed_out is None: - self._timed_out = False - elif not self._timed_out: - self._timed_out = (datetime.datetime.now() - self._init_time).total_seconds() > self._timeout_seconds - return self._timed_out - - def __bool__(self): - return not self._timed_out - - -class LockFile: - # We need to maintain it for reentrance of file lock - _LOCKS_DICT_PID = None - _LOCKS_DICT: Dict[str, _FileLockRecord] = {} - _LOCKS_DICT_GUARD = threading.Lock() - DEFAULT_RETRY_PERIOD_SECONDS = 1 - DEFAULT_TIMEOUT_SECONDS = 3600 - _CSP_LOCK_FILE_NAME = ".csp_lock" - - def __init__( - self, - file_path, - timeout_seconds=None, - retry_period_seconds=None, - shared: bool = False, - file_permissions: Optional[FilePermissions] = None, - ): - self._file_path = os.path.realpath(file_path) - self._fd = None - self._timeout_seconds = timeout_seconds if timeout_seconds is not None else self.DEFAULT_TIMEOUT_SECONDS - self._retry_period_seconds = ( - retry_period_seconds if retry_period_seconds is not None else self.DEFAULT_RETRY_PERIOD_SECONDS - ) - self._shared = shared - self._file_permissions = file_permissions - self._cur_object_lock_count = 0 - self._lock_record: Optional[_FileLockRecord] = None - - @property - def file_path(self): - return self._file_path - - @classmethod - def csp_lock_for_folder(cls, folder_path, shared=True): - return LockFile(os.path.join(folder_path, cls._CSP_LOCK_FILE_NAME), shared=shared) - - def __del__(self): - while self._cur_object_lock_count > 0: - self.unlock() - - @classmethod - def _clean_and_get_locks_dict(cls): - assert (not cls._LOCKS_DICT) or (cls._LOCKS_DICT_PID is not None) - if cls._LOCKS_DICT_PID: - cur_pid = os.getpid() - if cur_pid != cls._LOCKS_DICT_PID: - for record in cls._LOCKS_DICT.values(): - record.open_file.close() - cls._LOCKS_DICT.clear() - cls._LOCKS_DICT_PID = cur_pid - else: - cls._LOCKS_DICT_PID = os.getpid() - return cls._LOCKS_DICT - - def lock(self): - if self._lock_record: - with self._lock_record.thread_lock: - self._cur_object_lock_count += 1 - self._lock_record.ref_count += 1 - return - - timeout_monitor = _TimeoutMonitor(self._timeout_seconds) - - with _LockWrapper(self._LOCKS_DICT_GUARD) as locks_dict_guard: - locks_dict = self._clean_and_get_locks_dict() - existing_lock_record = locks_dict.get(self._file_path) - - while self._lock_record is None: - timed_out = timeout_monitor.timed_out() - if existing_lock_record is not None and (not self._shared or not existing_lock_record.shared): - if timed_out: - if not self._shared or not existing_lock_record.shared: - if self._shared: - existing_lock_str, cur_lock_str = "exclusive", "shared" - else: - existing_lock_str, cur_lock_str = "shared", "exclusive" - raise 
BlockingIOError( - f"Unable to obtain {cur_lock_str} lock {self._file_path} within given time another {existing_lock_str} exists" - ) - elif existing_lock_record: - # There is an existing record that matches the shared lock - with existing_lock_record.thread_lock: - self._lock_record = existing_lock_record - self._lock_record.ref_count += 1 - self._cur_object_lock_count += 1 - - return - else: - lock_file = self._try_open_and_lock_file(self._file_path, self._shared, self._file_permissions) - if lock_file is not None: - self._lock_record = _FileLockRecord( - open_file=lock_file, - ref_count=1, - shared=self._shared, - thread_lock=threading.RLock(), - ) - with self._lock_record.thread_lock: - locks_dict[self._file_path] = self._lock_record - self._cur_object_lock_count = 1 - assert self._lock_record.ref_count == self._cur_object_lock_count - return - elif timed_out: - raise BlockingIOError( - f"Failed to obtain lock {self._file_path} withing given timedout period of {self._timeout_seconds} seconds" - ) - # we failed to obtain the lock, continue trying - locks_dict_guard.release() - time.sleep(self._retry_period_seconds) - locks_dict_guard.acquire() - existing_lock_record = locks_dict.get(self._file_path) - - def delete_file(self): - if not self._lock_record: - raise RuntimeError(f"Trying to delete unlocked file {self.file_path}") - rm_file_or_folder(self._file_path, is_file=True) - - def unlock(self): - if self._lock_record is None: - return - with self._lock_record.thread_lock: - self._cur_object_lock_count -= 1 - self._lock_record.ref_count -= 1 - if self._cur_object_lock_count > 0: - return - - if self._lock_record.ref_count > 0: - self._lock_record = None - return - # It seems like we have to clean up but there might be some race condition and other thread obtained the lock - # So we need to check again under both _LOCKS_DICT_GUARD and self._lock_record.thread_lock. We need to lock them in - # order to avoid dead locks, that's why we can't do it in the block above - with self._LOCKS_DICT_GUARD: - with self._lock_record.thread_lock: - if self._lock_record.ref_count == 0: - # We need to decrease ref count and make it negative for deallocated object, otherwise it's - # possible that 2 threads will end up here in rare race case: - # T1: acquire lock and release, decrease ref count to 0 but not get into this block yet - # T2: locate the same object which is still in dict, acquire and release, decrease ref count to 0 again - # Now both threads will see ref count of 0 and will want to cleanup, will let to do that only for the first - self._lock_record.ref_count -= 1 - self._LOCKS_DICT.pop(self._file_path) - self._lock_record.open_file.close() - self._lock_record = None - - def __enter__(self): - self.lock() - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.unlock() - - @classmethod - def _try_open_and_lock_file(cls, file_name, shared, file_permissions): - folder = os.path.dirname(file_name) - if not os.path.exists(folder): - os.makedirs(folder, exist_ok=True) - - num_retries = 10 - # Since we might be removing lock files while they're locked, it's possible that the file disappears on us. 
So we might need - # to do a few retries here - while True: - try: - if file_permissions is not None: - ensure_file_exists_with_permissions(file_name, file_permissions) - - fd = open(file_name, "a+") - try: - exclusive_flag = fcntl.LOCK_SH if shared else fcntl.LOCK_EX - fcntl.lockf(fd, exclusive_flag | fcntl.LOCK_NB) - orig_fd_stat = os.fstat(fd.fileno()) - cur_fd_stat = os.stat(file_name) - - # The condition orig_fd_stat.st_ino != cur_fd_stat.st_ino is for nfs. On nfs removed file might not be removed - # it will be renamed instead. In this case we might still have locked non existent file. After we locked, we - # need to check that the id of the file on disk is the same is the id of the file that we locked. - # Note that on regular file system we also need this since file might have been erased and then recreated. We could just - # check orig_fd_stat.st_nlink but this wouldn't work on nfs. The current condition should work for both. - if orig_fd_stat.st_ino != cur_fd_stat.st_ino: - # The file was deleted - fd.close() - return None - return fd - except BlockingIOError: - fd.close() - return None - except Exception: - fd.close() - raise - except FileNotFoundError: - if num_retries >= 0: - num_retries -= 1 - else: - raise - - -class MultipleFilesLock: - def __init__(self, file_paths_or_locks, shared: bool = False, remove_on_unlock=True): - self._file_paths_or_locks = file_paths_or_locks - self._shared = shared - self._locks = None - self._remove_on_unlock = remove_on_unlock - - def lock(self): - """ - :return: True if the locks were succesfully locked, False otherwise - """ - if self._locks: - raise RuntimeError("Trying to lock MultipleFilesLock more than once") - - locks = [] - try: - for f in self._file_paths_or_locks: - if isinstance(f, LockFile): - lock = f - else: - lock = LockFile( - file_path=f, - timeout_seconds=0, - retry_period_seconds=0, - shared=self._shared, - ) - lock.lock() - locks.append(lock) - except BlockingIOError: - for lock in locks: - lock.unlock() - return False - self._locks = locks - return True - - def unlock(self): - locks = self._locks - self._locks = None - if locks: - for lock in locks: - if self._remove_on_unlock: - lock.delete_file() - lock.unlock() - - def __enter__(self): - self.lock() - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.unlock() - - -class SimpleOneEventCondVar: - def __init__(self): - self._lock = threading.RLock() - self._cond_var = threading.Condition(lock=self._lock) - self._event_happened = False - - @property - def lock(self): - return self._lock - - def notify(self): - with self._lock: - assert not self._event_happened - self._event_happened = True - self._cond_var.notify_all() - - def wait(self): - with self._lock: - while not self._event_happened: - self._cond_var.wait() - self._event_happened = False
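Note on the struct.py hunk above: the `_postprocess_dict_to_python` / `_preprocess_dict_from_python` hooks are dropped (the only overrides touched by this patch live in the deleted `FilePermissions` struct), and the dict branch of `_obj_to_python` now rebuilds mappings with `type(obj)(...)` so that `dict` subclasses keep their type when converted. The snippet below is a minimal standalone sketch of that behaviour, not the csp code path; `MyDict` and `obj_to_python` are illustrative names only.

# Minimal sketch (illustrative, not the csp implementation): why the new dict
# branch uses type(obj)(...) rather than building a plain dict.

class MyDict(dict):
    """Hypothetical dict subclass standing in for a user-defined mapping type."""


def obj_to_python(obj):
    # Mirrors the dict branch of Struct._obj_to_python after this change:
    # rebuild the mapping with its own type so derived dict types survive.
    if isinstance(obj, dict):
        return type(obj)({k: obj_to_python(v) for k, v in obj.items()})
    return obj


nested = MyDict({"user_permissions": 0x6})
converted = obj_to_python(nested)
assert type(converted) is MyDict      # before this change, a plain dict came back
assert converted == {"user_permissions": 6}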