Skip to content

[POC] Using fasteners to control parallel execution of Conan #18253

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 14 commits into
base: develop2
Choose a base branch
from
Draft
4 changes: 4 additions & 0 deletions conan/api/subapi/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from conan.internal.model.settings import Settings
from conan.internal.hook_manager import HookManager
from conan.internal.util.files import load, save, rmdir, remove
from conan.internal.util.semaphore import interprocess_lock


class ConfigAPI:
Expand All @@ -35,6 +36,7 @@ def __init__(self, conan_api):
def home(self):
    """Return the path of the Conan home (cache) folder."""
    return self.conan_api.cache_folder

@interprocess_lock()
def install(self, path_or_url, verify_ssl, config_type=None, args=None,
source_folder=None, target_folder=None):
# TODO: We probably want to split this into git-folder-http cases?
Expand All @@ -45,6 +47,7 @@ def install(self, path_or_url, verify_ssl, config_type=None, args=None,
source_folder=source_folder, target_folder=target_folder)
self.conan_api.reinit()

@interprocess_lock()
def install_pkg(self, ref, lockfile=None, force=False, remotes=None, profile=None):
ConanOutput().warning("The 'conan config install-pkg' is experimental",
warn_tag="experimental")
Expand Down Expand Up @@ -205,6 +208,7 @@ def appending_recursive_dict_update(d, u):

return Settings(settings)

@interprocess_lock()
def clean(self):
contents = os.listdir(self.home())
packages_folder = self.global_conf.get("core.cache:storage_path") or os.path.join(self.home(), "p")
Expand Down
1 change: 1 addition & 0 deletions conan/internal/model/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
"core.download:retry_wait": "Seconds to wait between download attempts from Conan server",
"core.download:download_cache": "Define path to a file download cache",
"core.cache:storage_path": "Absolute path where the packages and database are stored",
"core.cache.parallel:timeout": "Time limit in seconds when waiting for acquiring the conan cache with parallel execution of conan",
"core:update_policy": "(Legacy). If equal 'legacy' when multiple remotes, update based on order of remotes, only the timestamp of the first occurrence of each revision counts.",
# Sources backup
"core.sources:download_cache": "Folder to store the sources backup",
Expand Down
161 changes: 161 additions & 0 deletions conan/internal/util/semaphore.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
""" The semaphore module provides inter-process locking mechanisms to ensure Conan commands can
run concurrently without conflicts.

It uses the fasteners library to create and manage locks across multiple processes. Thus, this
module acts as a proxy in case the project needs to switch to a different library in the future.

A timeout is defined to prevent deadlocks, and the lock files are stored in a temporary directory
inside the Conan cache folder. The timeout is configured in seconds through the
`core.cache.parallel:timeout` configuration in the conan.conf file.
The default value is 300 seconds (5 minutes).

The fasteners locks provide context managers, but only the acquire() method has a timeout
parameter; that is why we extend the sleep function to inject the timeout check.
"""
import functools
import os
import time
from datetime import datetime

from conan.errors import ConanException
from conan.api.output import ConanOutput
from conan.internal.cache.cache import PkgCache


CONAN_SEMAPHORE_TIMEOUT = 300.0  # Default lock-acquisition timeout: 5 minutes
CONAN_SEMAPHORE_LOCKFILE = "conan_semaphore.lock"  # Lock file name inside the cache temp folder

def _acquire_timeout(cache_folder) -> float:
    """ Get the timeout value (in seconds) for acquiring interprocess locks.

    Reads ``core.cache.parallel:timeout`` from the global configuration, falling back to
    ``CONAN_SEMAPHORE_TIMEOUT`` when it is not set.

    ConfigAPI is imported locally to avoid circular import issues.

    :param cache_folder: Path to the Conan cache folder
    :return: Timeout value in seconds
    """
    from conan.api.subapi.config import ConfigAPI
    config = ConfigAPI.load_config(cache_folder)
    timeout = config.get("core.cache.parallel:timeout", default=CONAN_SEMAPHORE_TIMEOUT,
                         check_type=float)
    return float(timeout)

def _lockfile_path(conan_api) -> str:
    """ Compute the filesystem path of the interprocess lock file.

    :param conan_api: ConanAPI instance
    :return: Path to the lock file in the Conan cache temporary directory
    """
    pkg_cache = PkgCache(conan_api.cache_folder, conan_api.config.global_conf)
    lock_path = os.path.join(pkg_cache.temp_folder, CONAN_SEMAPHORE_LOCKFILE)
    return lock_path

def interprocess_lock():
    """ Decorator to acquire an exclusive interprocess lock around a function.

    This method uses the fasteners library to create an interprocess lock, and serves as a proxy
    for the library. The lock is acquired using the InterProcessLock class, which allows multiple
    processes to safely access shared resources. The lock is released automatically when the
    decorated function completes.

    The timeout is injected in the sleep function of the InterProcessLock class, otherwise it
    would require extra code to manage acquire() and release() calls.

    The ConanAPI is imported locally to avoid circular import issues.

    :raises ConanException: when the lock cannot be acquired within the configured timeout
    """
    def decorator(func):
        pid = os.getpid()

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            from conan.api.conan_api import ConanAPI
            conan_api = ConanAPI()
            acquire_timeout = _acquire_timeout(conan_api.cache_folder)
            start = datetime.now()

            def _sleep_timeout(interval: float) -> None:
                # Injected into fasteners' acquire-retry loop: sleep the requested
                # interval, then abort once the configured timeout has elapsed.
                time.sleep(interval)
                if (datetime.now() - start).total_seconds() > acquire_timeout:
                    raise ConanException(
                        f"Conan could not acquire interprocess-lock within {acquire_timeout} seconds"
                        " during parallel process execution. Please, update the conf"
                        " 'core.cache.parallel:timeout' in conan.conf file to a higher value."
                    )

            import fasteners
            lock = fasteners.InterProcessLock(path=_lockfile_path(conan_api),
                                              sleep_func=_sleep_timeout)
            ConanOutput().debug(f"{datetime.now()} [{pid}]: Acquiring semaphore lock.")
            try:
                with lock:
                    ConanOutput().debug(f"{datetime.now()} [{pid}]: Semaphore has been locked.")
                    return func(*args, **kwargs)
            finally:
                # Bug fix: this log statement used to sit after the 'return' so it was
                # unreachable; 'finally' guarantees the release is always logged.
                ConanOutput().debug(f"{datetime.now()} [{pid}]: Semaphore has been released.")
        return wrapper
    return decorator


def interprocess_write_lock():
    """ Decorator to acquire an interprocess write lock for a function.

    This method uses the fasteners library to create an interprocess write lock.
    Unlike interprocess_lock, it is the writer side of a reader-writer lock: it excludes
    other writers and readers while held, but pairs with interprocess_read_lock to let
    multiple readers access the shared resource concurrently.

    :raises ConanException: when the lock cannot be acquired within the configured timeout
    """
    def decorator(func):
        pid = os.getpid()

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Imported locally to avoid circular import issues.
            from conan.api.conan_api import ConanAPI
            conan_api = ConanAPI()
            acquire_timeout = _acquire_timeout(conan_api.cache_folder)
            start = datetime.now()

            def _sleep_timeout(interval: float) -> None:
                # Injected into fasteners' acquire-retry loop: sleep the requested
                # interval, then abort once the configured timeout has elapsed.
                time.sleep(interval)
                if (datetime.now() - start).total_seconds() > acquire_timeout:
                    raise ConanException(
                        f"Conan could not acquire interprocess-lock within {acquire_timeout} seconds"
                        " during parallel process execution. Please, update the conf"
                        " 'core.cache.parallel:timeout' in conan.conf file to a higher value."
                    )

            import fasteners
            lock = fasteners.InterProcessReaderWriterLock(path=_lockfile_path(conan_api),
                                                          sleep_func=_sleep_timeout)
            ConanOutput().debug(f"{datetime.now()} [{pid}]: Acquiring write semaphore lock.")
            try:
                with lock.write_lock():
                    ConanOutput().debug(f"{datetime.now()} [{pid}]: Write semaphore has been locked.")
                    return func(*args, **kwargs)
            finally:
                # Bug fix: this log statement used to sit after the 'return' so it was
                # unreachable; 'finally' guarantees the release is always logged.
                ConanOutput().debug(f"{datetime.now()} [{pid}]: Write semaphore has been released.")
        return wrapper
    return decorator


def interprocess_read_lock():
    """ Decorator to acquire an interprocess read lock for a function.

    This method uses the fasteners library to create an interprocess read lock, and serves
    as a proxy for the fasteners library. It should be used with interprocess_write_lock to allow
    multiple reader processes to safely access shared resources concurrently while writers
    remain exclusive.

    :raises ConanException: when the lock cannot be acquired within the configured timeout
    """
    def decorator(func):
        pid = os.getpid()

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Imported locally to avoid circular import issues.
            from conan.api.conan_api import ConanAPI
            conan_api = ConanAPI()
            acquire_timeout = _acquire_timeout(conan_api.cache_folder)
            start = datetime.now()

            def _sleep_timeout(interval: float) -> None:
                # Injected into fasteners' acquire-retry loop: sleep the requested
                # interval, then abort once the configured timeout has elapsed.
                time.sleep(interval)
                if (datetime.now() - start).total_seconds() > acquire_timeout:
                    raise ConanException(
                        f"Conan could not acquire interprocess-lock within {acquire_timeout} seconds"
                        " during parallel process execution. Please, update the conf"
                        " 'core.cache.parallel:timeout' in conan.conf file to a higher value."
                    )

            import fasteners
            lock = fasteners.InterProcessReaderWriterLock(path=_lockfile_path(conan_api),
                                                          sleep_func=_sleep_timeout)
            ConanOutput().debug(f"{datetime.now()} [{pid}]: Acquiring read semaphore lock.")
            try:
                with lock.read_lock():
                    ConanOutput().debug(f"{datetime.now()} [{pid}]: Read semaphore has been locked.")
                    return func(*args, **kwargs)
            finally:
                # Bug fix: this log statement used to sit after the 'return' so it was
                # unreachable; 'finally' guarantees the release is always logged.
                ConanOutput().debug(f"{datetime.now()} [{pid}]: Read semaphore has been released.")
        return wrapper
    return decorator
2 changes: 1 addition & 1 deletion conans/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ urllib3>=1.26.6, <2.1
colorama>=0.4.3, <0.5.0
PyYAML>=6.0, <7.0
patch-ng>=1.18.0, <1.19
fasteners>=0.15
fasteners>=0.19, <1.0
distro>=1.4.0, <=1.8.0; platform_system == 'Linux' or platform_system == 'FreeBSD'
Jinja2>=3.0, <4.0.0
python-dateutil>=2.8.0, <3
87 changes: 87 additions & 0 deletions test/functional/test_parallel_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import os
import subprocess
import re
import multiprocessing as mp

from conan.test.utils.test_files import temp_folder
from conan.test.utils.tools import TestClient
from conan.internal.util.semaphore import interprocess_read_lock, interprocess_write_lock


def test_parallel_config_subprocess():
    """Validate that subprocesses can run concurrently without issues.

    This test starts 30 separate subprocesses, each running the `conan config install` command.
    No command should fail, and the cache should be updated correctly.

    We cannot use ThreadPoolExecutor nor multiprocessing.Process here because they are not
    compatible with fasteners, resulting in concurrency issues.
    """
    workers = 30
    config_folder = temp_folder(path_with_spaces=False)
    home_folder = temp_folder(path_with_spaces=False)
    env = dict(os.environ, CONAN_HOME=home_folder)

    client = TestClient(cache_folder=home_folder)
    client.run("profile detect --force")
    client.save({os.path.join(config_folder, "profiles", "foobar"): "include(default)"})

    command = ["conan", "config", "install", "-vvv", config_folder, "--type=dir"]
    running = [subprocess.Popen(command, env=env, cwd=os.getcwd())
               for _ in range(workers)]
    for proc in running:
        proc.wait()
        assert proc.returncode == 0, f"Process failed with exit code {proc.returncode}"


@interprocess_read_lock()
def _reader(shared_file, idx):
    """Append start/end markers to the shared file while holding the read lock."""
    for suffix in ("start", "end"):
        with open(shared_file, "a") as f:
            f.write(f"reader-{idx}-{suffix}\n")


@interprocess_write_lock()
def _writer(shared_file, idx):
    """Append start/end markers to the shared file while holding the write lock."""
    for suffix in ("start", "end"):
        with open(shared_file, "a") as f:
            f.write(f"writer-{idx}-{suffix}\n")


def test_readers_parallel_writers_exclusive():
    """ Test that multiple readers can run concurrently while writers are exclusive.

    This test uses the interprocess_read_lock and interprocess_write_lock decorators
    to ensure that multiple readers can access the shared file simultaneously,
    while writers have exclusive access.

    The test creates a shared file and starts 5 reader processes and 2 writer processes.
    The readers should be able to run concurrently, while the writers should wait for
    the readers to finish before acquiring the lock.
    """
    cache_folder = temp_folder(path_with_spaces=False)
    shared_file = os.path.join(cache_folder, "shared_rwlock.lock")
    with open(shared_file, "w") as fd:
        fd.write("")

    tasks = [mp.Process(target=_reader, args=(shared_file, i,)) for i in range(5)]
    tasks.extend([mp.Process(target=_writer, args=(shared_file, i,)) for i in range(2)])
    for task in tasks:
        task.start()
    for task in tasks:
        task.join()

    with open(shared_file, "r") as fd:
        lines = fd.readlines()
    # 5 readers * 2 markers + 2 writers * 2 markers = 14 lines in total
    assert len(lines) == 14
    for line in lines:
        # No concurrent (torn) writes: every line is a complete marker
        assert re.match(r"^(reader|writer)-[0-9]+-(start|end)\n$", line)
    # Bug fix: the docstring promises writer exclusivity but nothing checked it.
    # A writer holds the write lock for both of its appends, so its "start" marker
    # must be immediately followed by its matching "end" marker.
    for i, line in enumerate(lines):
        match = re.match(r"^writer-([0-9]+)-start\n$", line)
        if match:
            assert lines[i + 1] == f"writer-{match.group(1)}-end\n", \
                f"writer {match.group(1)} was interleaved with another process"
Loading