diff --git a/conan/api/subapi/config.py b/conan/api/subapi/config.py
index f7342e475b2..01eae23361a 100644
--- a/conan/api/subapi/config.py
+++ b/conan/api/subapi/config.py
@@ -22,6 +22,7 @@
 from conan.internal.model.settings import Settings
 from conan.internal.hook_manager import HookManager
 from conan.internal.util.files import load, save, rmdir, remove
+from conan.internal.util.semaphore import interprocess_lock
 
 
 class ConfigAPI:
@@ -35,6 +36,7 @@ def __init__(self, conan_api):
     def home(self):
         return self.conan_api.cache_folder
 
+    @interprocess_lock()
     def install(self, path_or_url, verify_ssl, config_type=None, args=None,
                 source_folder=None, target_folder=None):
         # TODO: We probably want to split this into git-folder-http cases?
@@ -45,6 +47,7 @@ def install(self, path_or_url, verify_ssl, config_type=None, args=None,
                        source_folder=source_folder, target_folder=target_folder)
         self.conan_api.reinit()
 
+    @interprocess_lock()
     def install_pkg(self, ref, lockfile=None, force=False, remotes=None, profile=None):
         ConanOutput().warning("The 'conan config install-pkg' is experimental",
                               warn_tag="experimental")
@@ -205,6 +208,7 @@ def appending_recursive_dict_update(d, u):
 
         return Settings(settings)
 
+    @interprocess_lock()
     def clean(self):
         contents = os.listdir(self.home())
         packages_folder = self.global_conf.get("core.cache:storage_path") or os.path.join(self.home(), "p")
diff --git a/conan/internal/model/conf.py b/conan/internal/model/conf.py
index 4cb9d7c293a..08d0eee336b 100644
--- a/conan/internal/model/conf.py
+++ b/conan/internal/model/conf.py
@@ -30,6 +30,7 @@
     "core.download:retry_wait": "Seconds to wait between download attempts from Conan server",
     "core.download:download_cache": "Define path to a file download cache",
     "core.cache:storage_path": "Absolute path where the packages and database are stored",
+    "core.cache.parallel:timeout": "Time limit in seconds to wait for acquiring the Conan cache lock when running Conan processes in parallel",
     "core:update_policy": "(Legacy). If equal 'legacy' when multiple remotes, update based on order of remotes, only the timestamp of the first occurrence of each revision counts.",
     # Sources backup
     "core.sources:download_cache": "Folder to store the sources backup",
diff --git a/conan/internal/util/semaphore.py b/conan/internal/util/semaphore.py
new file mode 100644
index 00000000000..805907bed2e
--- /dev/null
+++ b/conan/internal/util/semaphore.py
@@ -0,0 +1,161 @@
+""" The semaphore module provides inter-process locking mechanisms to ensure Conan commands can
+    run concurrently without conflicts.
+
+    It uses the fasteners library to create and manage locks across multiple processes. Thus,
+    this module is a proxy, in case the project needs to use a different library in the future.
+
+    A timeout is defined to prevent deadlocks, and the lock files are stored in a temporary
+    directory inside the Conan cache folder. The timeout is configured in seconds through the
+    `core.cache.parallel:timeout` configuration in the global.conf file.
+    The default value is 300 seconds (5 minutes).
+
+    The fasteners locks provide context managers, but only the acquire() method accepts a timeout
+    parameter; that is why the sleep function is extended to inject the timeout check.
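+
+    A minimal usage sketch (illustrative only; `guarded_cache_operation` is a hypothetical
+    function, not an existing Conan API)::
+
+        from conan.internal.util.semaphore import interprocess_lock
+
+        @interprocess_lock()
+        def guarded_cache_operation():
+            ...  # exclusive access to the Conan cache while this function runs
+
+    If the default timeout is too short for a given setup, it can be raised in the global.conf
+    file of the Conan home, for example::
+
+        core.cache.parallel:timeout=600.0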
+""" +import functools +import os +import time +from datetime import datetime + +from conan.errors import ConanException +from conan.api.output import ConanOutput +from conan.internal.cache.cache import PkgCache + + +CONAN_SEMAPHORE_TIMEOUT = 300.0 # 5 minutes +CONAN_SEMAPHORE_LOCKFILE = "conan_semaphore.lock" + +def _acquire_timeout(cache_folder) -> float: + """ Get the timeout value for acquiring locks. + + Imported ConanAPI locally to avoid circular import issues. + + :param cache_folder: Path to the Conan cache folder + :return: Timeout value in seconds + """ + from conan.api.subapi.config import ConfigAPI + config = ConfigAPI.load_config(cache_folder) + timeout = config.get("core.cache.parallel:timeout", default=CONAN_SEMAPHORE_TIMEOUT, check_type=float) + return float(timeout) + +def _lockfile_path(conan_api) -> str: + """ Get the path to the interprocess lock file. + + :param conan_api: ConanAPI instance + :return: Path to the lock file in Conan cache temporary directory + """ + cache = PkgCache(conan_api.cache_folder, conan_api.config.global_conf) + return os.path.join(cache.temp_folder, CONAN_SEMAPHORE_LOCKFILE) + +def interprocess_lock(): + """ Decorator to acquire an interprocess lock for a function. + + This method uses the fasteners library to create an interprocess lock, and serves as a proxy + for the library. The lock is acquired using the InterProcessLock class, which allows multiple + processes to safely access shared resources. The lock is released automatically when the + decorated function completes. + + The timeout is injected in the sleep method of the InterProcessLock class, otherwise it would + require an extra code to manage acquire() and release() calls. + + The ConanAPI is imported locally to avoid circular import issues. + """ + def decorator(func): + pid = os.getpid() + @functools.wraps(func) + def wrapper(*args, **kwargs): + from conan.api.conan_api import ConanAPI + conan_api = ConanAPI() + acquire_timeout = _acquire_timeout(conan_api.cache_folder) + now = datetime.now() + def _sleep_timeout(interval: float) -> None: + time.sleep(interval) + if (datetime.now() - now).total_seconds() > acquire_timeout: + raise ConanException( + f"Conan could not acquire interprocess-lock within {acquire_timeout} seconds" + " during parallel process execution. Please, update the conf" + " 'core.cache.parallel:timeout' in conan.conf file to a higher value." + ) + + import fasteners + lock = fasteners.InterProcessLock(path=_lockfile_path(conan_api), + sleep_func=_sleep_timeout) + ConanOutput().debug(f"{datetime.now()} [{pid}]: Acquiring semaphore lock.") + with lock: + ConanOutput().debug(f"{datetime.now()} [{pid}]: Semaphore has been locked.") + return func(*args, **kwargs) + ConanOutput().debug(f"{datetime.now()} [{pid}]: Semaphore has been released.") + return wrapper + return decorator + + +def interprocess_write_lock(): + """ Decorator to acquire an interprocess write lock for a function. + + This method uses the fasteners library to create an interprocess write lock. + Unlike the interprocess_lock, it prevents other writers from acquiring the lock at same time, + but allows multiple readers to access the shared resource concurrently. 
+ """ + def decorator(func): + pid = os.getpid() + @functools.wraps(func) + def wrapper(*args, **kwargs): + from conan.api.conan_api import ConanAPI + conan_api = ConanAPI() + acquire_timeout = _acquire_timeout(conan_api.cache_folder) + now = datetime.now() + def _sleep_timeout(interval: float) -> None: + time.sleep(interval) + if (datetime.now() - now).total_seconds() > acquire_timeout: + raise ConanException( + f"Conan could not acquire interprocess-lock within {acquire_timeout} seconds" + " during parallel process execution. Please, update the conf" + " 'core.cache.parallel:timeout' in conan.conf file to a higher value." + ) + + import fasteners + lock = fasteners.InterProcessReaderWriterLock(path=_lockfile_path(conan_api), + sleep_func=_sleep_timeout) + ConanOutput().debug(f"{datetime.now()} [{pid}]: Acquiring write semaphore lock.") + with lock.write_lock(): + ConanOutput().debug(f"{datetime.now()} [{pid}]: Write semaphore has been locked.") + return func(*args, **kwargs) + ConanOutput().debug(f"{datetime.now()} [{pid}]: Write semaphore has been released.") + return wrapper + return decorator + + +def interprocess_read_lock(): + """ Decorator to acquire an interprocess read lock for a function. + + This method uses the fasteners library to create an interprocess read lock, and serves + as a proxy for the fasteners library. It should be used with interprocess_write_lock to allow + multiple processes to safely access shared resources. + """ + def decorator(func): + pid = os.getpid() + @functools.wraps(func) + def wrapper(*args, **kwargs): + from conan.api.conan_api import ConanAPI + conan_api = ConanAPI() + acquire_timeout = _acquire_timeout(conan_api.cache_folder) + now = datetime.now() + def _sleep_timeout(interval: float) -> None: + time.sleep(interval) + if (datetime.now() - now).total_seconds() > acquire_timeout: + raise ConanException( + f"Conan could not acquire interprocess-lock within {acquire_timeout} seconds" + " during parallel process execution. Please, update the conf" + " 'core.cache.parallel:timeout' in conan.conf file to a higher value." 
+                    )
+
+            import fasteners
+            lock = fasteners.InterProcessReaderWriterLock(path=_lockfile_path(conan_api),
+                                                          sleep_func=_sleep_timeout)
+            ConanOutput().debug(f"{datetime.now()} [{pid}]: Acquiring read semaphore lock.")
+            with lock.read_lock():
+                ConanOutput().debug(f"{datetime.now()} [{pid}]: Read semaphore has been locked.")
+                result = func(*args, **kwargs)
+            ConanOutput().debug(f"{datetime.now()} [{pid}]: Read semaphore has been released.")
+            return result
+        return wrapper
+    return decorator
diff --git a/conans/requirements.txt b/conans/requirements.txt
index d1b66f81ee0..6c04dbb76c0 100644
--- a/conans/requirements.txt
+++ b/conans/requirements.txt
@@ -3,7 +3,7 @@ urllib3>=1.26.6, <2.1
 colorama>=0.4.3, <0.5.0
 PyYAML>=6.0, <7.0
 patch-ng>=1.18.0, <1.19
-fasteners>=0.15
+fasteners>=0.19, <1.0
 distro>=1.4.0, <=1.8.0; platform_system == 'Linux' or platform_system == 'FreeBSD'
 Jinja2>=3.0, <4.0.0
 python-dateutil>=2.8.0, <3
diff --git a/test/functional/test_parallel_cache.py b/test/functional/test_parallel_cache.py
new file mode 100644
index 00000000000..1fdae24b812
--- /dev/null
+++ b/test/functional/test_parallel_cache.py
@@ -0,0 +1,87 @@
+import os
+import subprocess
+import re
+import multiprocessing as mp
+
+from conan.test.utils.test_files import temp_folder
+from conan.test.utils.tools import TestClient
+from conan.internal.util.semaphore import interprocess_read_lock, interprocess_write_lock
+
+
+def test_parallel_config_subprocess():
+    """Validate that subprocesses can run concurrently without issues.
+
+    This test starts 30 separate subprocesses, each running the `conan config install` command.
+    No command should fail, and the cache should be updated correctly.
+
+    We cannot use ThreadPoolExecutor or multiprocessing.Process here because they are not
+    compatible with fasteners in this scenario, which results in concurrency issues.
+    """
+    workers = 30
+    extra_folder = temp_folder(path_with_spaces=False)
+    cache_folder = temp_folder(path_with_spaces=False)
+    env = os.environ.copy()
+    env["CONAN_HOME"] = cache_folder
+
+    test_client = TestClient(cache_folder=cache_folder)
+    test_client.run("profile detect --force")
+    test_client.save({os.path.join(extra_folder, "profiles", "foobar"): "include(default)"})
+
+    processes = []
+    for _ in range(workers):
+        p = subprocess.Popen([
+            "conan", "config", "install", "-vvv", extra_folder, "--type=dir"
+        ], env=env, cwd=os.getcwd())
+        processes.append(p)
+    for p in processes:
+        p.wait()
+        assert p.returncode == 0, f"Process failed with exit code {p.returncode}"
+
+
+@interprocess_read_lock()
+def _reader(shared_file, idx):
+    with open(shared_file, "a") as f:
+        f.write(f"reader-{idx}-start\n")
+
+    with open(shared_file, "a") as f:
+        f.write(f"reader-{idx}-end\n")
+
+
+@interprocess_write_lock()
+def _writer(shared_file, idx):
+    with open(shared_file, "a") as f:
+        f.write(f"writer-{idx}-start\n")
+
+    with open(shared_file, "a") as f:
+        f.write(f"writer-{idx}-end\n")
+
+
+def test_readers_parallel_writers_exclusive():
+    """ Test that multiple readers can run concurrently while writers are exclusive.
+
+    This test uses the interprocess_read_lock and interprocess_write_lock decorators
+    to ensure that multiple readers can access the shared file simultaneously,
+    while writers have exclusive access.
+
+    The test creates a shared file and starts 5 reader processes and 2 writer processes.
+    The readers should be able to run concurrently, while a writer must wait until no reader
+    holds the lock before acquiring it. With 5 readers and 2 writers, each appending a start
+    and an end line, the shared file is expected to contain 14 well-formed lines once all
+    processes have finished.
+ """ + cache_folder = temp_folder(path_with_spaces=False) + shared_file = os.path.join(cache_folder, "shared_rwlock.lock") + with open(shared_file, "w") as fd: + fd.write("") + + tasks = [mp.Process(target=_reader, args=(shared_file, i,)) for i in range(5)] + tasks.extend([mp.Process(target=_writer, args=(shared_file, i,)) for i in range(2)]) + for task in tasks: + task.start() + for task in tasks: + task.join() + + with open(shared_file, "r") as fd: + lines = fd.readlines() + assert len(lines) == 14 + for line in lines: + # No concurrent writes + assert re.match(r"^(reader|writer)-[0-9]+-(start|end)\n$", line)