diff --git a/clang/include/clang/Driver/Options.td b/clang/include/clang/Driver/Options.td index 0ffd8c40da7da..771b336b5585f 100644 --- a/clang/include/clang/Driver/Options.td +++ b/clang/include/clang/Driver/Options.td @@ -1243,8 +1243,9 @@ def offload_compression_level_EQ : Joined<["--"], "offload-compression-level=">, HelpText<"Compression level for offload device binaries (HIP only)">; def offload_jobs_EQ : Joined<["--"], "offload-jobs=">, - HelpText<"Specify the number of threads to use for device offloading tasks" - " during compilation.">; + HelpText<"Specify the number of threads to use for device offloading tasks " + "during compilation. Can be a positive integer or the string " + "'jobserver' to use the make-style jobserver from the environment.">; defm offload_via_llvm : BoolFOption<"offload-via-llvm", LangOpts<"OffloadViaLLVM">, DefaultFalse, diff --git a/clang/lib/Driver/ToolChains/Clang.cpp b/clang/lib/Driver/ToolChains/Clang.cpp index 2bb42a319eccf..a2a206eec700f 100644 --- a/clang/lib/Driver/ToolChains/Clang.cpp +++ b/clang/lib/Driver/ToolChains/Clang.cpp @@ -9293,14 +9293,20 @@ void LinkerWrapper::ConstructJob(Compilation &C, const JobAction &JA, addOffloadCompressArgs(Args, CmdArgs); if (Arg *A = Args.getLastArg(options::OPT_offload_jobs_EQ)) { - int NumThreads; - if (StringRef(A->getValue()).getAsInteger(10, NumThreads) || - NumThreads <= 0) - C.getDriver().Diag(diag::err_drv_invalid_int_value) - << A->getAsString(Args) << A->getValue(); - else - CmdArgs.push_back( - Args.MakeArgString("--wrapper-jobs=" + Twine(NumThreads))); + StringRef Val = A->getValue(); + + if (Val.equals_insensitive("jobserver")) + CmdArgs.push_back(Args.MakeArgString("--wrapper-jobs=jobserver")); + else { + int NumThreads; + if (Val.getAsInteger(10, NumThreads) || NumThreads <= 0) { + C.getDriver().Diag(diag::err_drv_invalid_int_value) + << A->getAsString(Args) << Val; + } else { + CmdArgs.push_back( + Args.MakeArgString("--wrapper-jobs=" + Twine(NumThreads))); + } + } } 
const char *Exec = diff --git a/clang/test/Driver/hip-options.hip b/clang/test/Driver/hip-options.hip index a07dca3638565..1f2b1b4858b02 100644 --- a/clang/test/Driver/hip-options.hip +++ b/clang/test/Driver/hip-options.hip @@ -259,3 +259,9 @@ // RUN: --offload-arch=gfx1100 --offload-new-driver --offload-jobs=0x4 %s 2>&1 | \ // RUN: FileCheck -check-prefix=INVJOBS %s // INVJOBS: clang: error: invalid integral value '0x4' in '--offload-jobs=0x4' + +// RUN: %clang -### -Werror --target=x86_64-unknown-linux-gnu -nogpuinc -nogpulib \ +// RUN: --offload-arch=gfx1100 --offload-new-driver --offload-jobs=jobserver %s 2>&1 | \ +// RUN: FileCheck -check-prefix=JOBSV %s +// JOBSV: clang-linker-wrapper{{.*}} "--wrapper-jobs=jobserver" + diff --git a/clang/test/Driver/linker-wrapper.c b/clang/test/Driver/linker-wrapper.c index 80b1a5745a123..de14f8cd29a13 100644 --- a/clang/test/Driver/linker-wrapper.c +++ b/clang/test/Driver/linker-wrapper.c @@ -114,6 +114,8 @@ __attribute__((visibility("protected"), used)) int x; // RUN: -fembed-offload-object=%t.out // RUN: clang-linker-wrapper --dry-run --host-triple=x86_64-unknown-linux-gnu --wrapper-jobs=4 \ // RUN: --linker-path=/usr/bin/ld %t.o -o a.out 2>&1 | FileCheck %s --check-prefix=CUDA-PAR +// RUN: clang-linker-wrapper --dry-run --host-triple=x86_64-unknown-linux-gnu --wrapper-jobs=jobserver \ +// RUN: --linker-path=/usr/bin/ld %t.o -o a.out 2>&1 | FileCheck %s --check-prefix=CUDA-PAR // CUDA-PAR: fatbinary{{.*}}-64 --create {{.*}}.fatbin diff --git a/clang/tools/clang-linker-wrapper/ClangLinkerWrapper.cpp b/clang/tools/clang-linker-wrapper/ClangLinkerWrapper.cpp index 0f1fa8b329fd6..fffbe054c1ca1 100644 --- a/clang/tools/clang-linker-wrapper/ClangLinkerWrapper.cpp +++ b/clang/tools/clang-linker-wrapper/ClangLinkerWrapper.cpp @@ -1420,12 +1420,18 @@ int main(int Argc, char **Argv) { parallel::strategy = hardware_concurrency(1); if (auto *Arg = Args.getLastArg(OPT_wrapper_jobs)) { - unsigned Threads = 0; - if 
(!llvm::to_integer(Arg->getValue(), Threads) || Threads == 0) - reportError(createStringError("%s: expected a positive integer, got '%s'", - Arg->getSpelling().data(), - Arg->getValue())); - parallel::strategy = hardware_concurrency(Threads); + StringRef Val = Arg->getValue(); + if (Val.equals_insensitive("jobserver")) + parallel::strategy = jobserver_concurrency(); + else { + unsigned Threads = 0; + if (!llvm::to_integer(Val, Threads) || Threads == 0) { + reportError(createStringError( + "%s: expected a positive integer or 'jobserver', got '%s'", + Arg->getSpelling().data(), Val.data())); + } else + parallel::strategy = hardware_concurrency(Threads); + } } if (Args.hasArg(OPT_wrapper_time_trace_eq)) { diff --git a/clang/tools/clang-linker-wrapper/LinkerWrapperOpts.td b/clang/tools/clang-linker-wrapper/LinkerWrapperOpts.td index 17fb9db35fe39..7b3f632e25a6a 100644 --- a/clang/tools/clang-linker-wrapper/LinkerWrapperOpts.td +++ b/clang/tools/clang-linker-wrapper/LinkerWrapperOpts.td @@ -53,7 +53,8 @@ def wrapper_time_trace_granularity : Joined<["--"], "wrapper-time-trace-granular def wrapper_jobs : Joined<["--"], "wrapper-jobs=">, Flags<[WrapperOnlyOption]>, MetaVarName<"">, - HelpText<"Sets the number of parallel jobs to use for device linking">; + HelpText<"Sets the number of parallel jobs for device linking. Can be a " + "positive integer or 'jobserver'.">; def override_image : Joined<["--"], "override-image=">, Flags<[WrapperOnlyOption]>, MetaVarName<"">, diff --git a/llvm/include/llvm/Support/Jobserver.h b/llvm/include/llvm/Support/Jobserver.h new file mode 100644 index 0000000000000..0ddcc9bc7e472 --- /dev/null +++ b/llvm/include/llvm/Support/Jobserver.h @@ -0,0 +1,141 @@ +//===- llvm/Support/Jobserver.h - Jobserver Client --------------*- C++ -*-===// +// +// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. +// See https://llvm.org/LICENSE.txt for license information. 
+// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception +// +//===----------------------------------------------------------------------===// +// +// This file defines a client for the GNU Make jobserver protocol. This allows +// LLVM tools to coordinate parallel execution with a parent `make` process. +// +// The jobserver protocol is a mechanism for GNU Make to share its pool of +// available "job slots" with the subprocesses it invokes. This is particularly +// useful for tools that can perform parallel operations themselves (e.g., a +// multi-threaded linker or compiler). By participating in this protocol, a +// tool can ensure the total number of concurrent jobs does not exceed the +// limit specified by the user (e.g., `make -j8`). +// +// How it works: +// +// 1. Establishment: +// A child process discovers the jobserver by inspecting the `MAKEFLAGS` +// environment variable. If a jobserver is active, this variable will +// contain a `--jobserver-auth=<value>` argument. The format of `<value>` +// determines how to communicate with the server. +// +// 2. The Implicit Slot: +// Every command invoked by `make` is granted one "implicit" job slot. This +// means a tool can always perform at least one unit of work without needing +// to communicate with the jobserver. This implicit slot should NEVER be +// released back to the jobserver. +// +// 3. Acquiring and Releasing Slots: +// On POSIX systems, the jobserver is implemented as a pipe. The +// `--jobserver-auth` value specifies either a path to a named pipe +// (`fifo:PATH`) or a pair of file descriptors (`R,W`). The pipe is +// pre-loaded with single-character tokens, one for each available job slot. +// +// - To acquire an additional slot, a client reads a single-character token +// from the pipe. +// - To release a slot, the client must write the *exact same* character +// token back to the pipe. 
+// +// It is critical that a client releases all acquired slots before it exits, +// even in cases of error, to avoid deadlocking the build. +// +// Example: +// A multi-threaded linker invoked by `make -j8` wants to use multiple +// threads. It first checks for the jobserver. It knows it has one implicit +// slot, so it can use one thread. It then tries to acquire 7 more slots by +// reading 7 tokens from the jobserver pipe. If it only receives 3 tokens, +// it knows it can use a total of 1 (implicit) + 3 (acquired) = 4 threads. +// Before exiting, it must write the 3 tokens it read back to the pipe. +// +//===----------------------------------------------------------------------===// + +#ifndef LLVM_SUPPORT_JOBSERVER_H +#define LLVM_SUPPORT_JOBSERVER_H + +#include "llvm/ADT/StringRef.h" +#include +#include + +namespace llvm { + +/// A JobSlot represents a single job slot that can be acquired from or released +/// to a jobserver pool. This class is move-only. +class JobSlot { +public: + /// Default constructor creates an invalid instance. + JobSlot() = default; + + // Move operations are allowed. + JobSlot(JobSlot &&Other) noexcept : Value(Other.Value) { Other.Value = -1; } + JobSlot &operator=(JobSlot &&Other) noexcept { + if (this != &Other) { + this->Value = Other.Value; + Other.Value = -1; + } + return *this; + } + + // Copy operations are disallowed. + JobSlot(const JobSlot &) = delete; + JobSlot &operator=(const JobSlot &) = delete; + + /// Returns true if this instance is valid (either implicit or explicit). + bool isValid() const { return Value >= 0; } + + /// Returns true if this instance represents the implicit job slot. 
+ bool isImplicit() const { return Value == kImplicitValue; } + + static JobSlot createExplicit(uint8_t V) { + return JobSlot(static_cast(V)); + } + + static JobSlot createImplicit() { return JobSlot(kImplicitValue); } + + uint8_t getExplicitValue() const; + bool isExplicit() const { return isValid() && !isImplicit(); } + +private: + friend class JobserverClient; + friend class JobserverClientImpl; + + JobSlot(int16_t V) : Value(V) {} + + static constexpr int16_t kImplicitValue = 256; + int16_t Value = -1; +}; + +/// The public interface for a jobserver client. +/// This client is a lazy-initialized singleton that is created on first use. +class JobserverClient { +public: + virtual ~JobserverClient(); + + /// Tries to acquire a job slot from the pool. On failure (e.g., if the pool + /// is empty), this returns an invalid JobSlot instance. The first successful + /// call will always return the implicit slot. + virtual JobSlot tryAcquire() = 0; + + /// Releases a job slot back to the pool. + virtual void release(JobSlot Slot) = 0; + + /// Returns the number of job slots available, as determined on first use. + /// This value is cached. Returns 0 if no jobserver is active. + virtual unsigned getNumJobs() const = 0; + + /// Returns the singleton instance of the JobserverClient. + /// The instance is created on the first call to this function. + /// Returns a nullptr if no jobserver is configured or an error occurs. + static JobserverClient *getInstance(); + + /// Resets the singleton instance. For testing purposes only. 
+ static void resetForTesting(); +}; + +} // end namespace llvm + +#endif // LLVM_SUPPORT_JOBSERVER_H diff --git a/llvm/include/llvm/Support/ThreadPool.h b/llvm/include/llvm/Support/ThreadPool.h index 9272760fc140a..3f0013700392d 100644 --- a/llvm/include/llvm/Support/ThreadPool.h +++ b/llvm/include/llvm/Support/ThreadPool.h @@ -16,6 +16,7 @@ #include "llvm/ADT/DenseMap.h" #include "llvm/Config/llvm-config.h" #include "llvm/Support/Compiler.h" +#include "llvm/Support/Jobserver.h" #include "llvm/Support/RWMutex.h" #include "llvm/Support/Threading.h" #include "llvm/Support/thread.h" @@ -184,6 +185,7 @@ class LLVM_ABI StdThreadPool : public ThreadPoolInterface { void grow(int requested); void processTasks(ThreadPoolTaskGroup *WaitingForGroup); + void processTasksWithJobserver(); /// Threads in flight std::vector Threads; @@ -212,6 +214,8 @@ class LLVM_ABI StdThreadPool : public ThreadPoolInterface { /// Maximum number of threads to potentially grow this pool to. const unsigned MaxThreadCount; + + JobserverClient *TheJobserver = nullptr; }; #endif // LLVM_ENABLE_THREADS diff --git a/llvm/include/llvm/Support/Threading.h b/llvm/include/llvm/Support/Threading.h index d3fe0a57ee44e..88846807f111a 100644 --- a/llvm/include/llvm/Support/Threading.h +++ b/llvm/include/llvm/Support/Threading.h @@ -142,6 +142,11 @@ constexpr bool llvm_is_multithreaded() { return LLVM_ENABLE_THREADS; } /// the thread shall remain on the actual CPU socket. LLVM_ABI std::optional compute_cpu_socket(unsigned ThreadPoolNum) const; + + /// If true, the thread pool will attempt to coordinate with a GNU Make + /// jobserver, acquiring a job slot before processing a task. If no + /// jobserver is found in the environment, this is ignored. + bool UseJobserver = false; }; /// Build a strategy from a number of threads as a string provided in \p Num. 
@@ -210,6 +215,19 @@ constexpr bool llvm_is_multithreaded() { return LLVM_ENABLE_THREADS; } return S; } + /// Returns a thread strategy that attempts to coordinate with a GNU Make + /// jobserver. The number of active threads will be limited by the number of + /// available job slots. If no jobserver is detected in the environment, this + /// strategy falls back to the default hardware_concurrency() behavior. + inline ThreadPoolStrategy jobserver_concurrency() { + ThreadPoolStrategy S; + S.UseJobserver = true; + // We can still request all threads be created, as they will simply + // block waiting for a job slot if the jobserver is the limiting factor. + S.ThreadsRequested = 0; // 0 means 'use all available' + return S; + } + /// Return the current thread id, as used in various OS system calls. /// Note that not all platforms guarantee that the value returned will be /// unique across the entire system, so portable code should not assume diff --git a/llvm/lib/Support/CMakeLists.txt b/llvm/lib/Support/CMakeLists.txt index 45d961e994a1a..daacab128b498 100644 --- a/llvm/lib/Support/CMakeLists.txt +++ b/llvm/lib/Support/CMakeLists.txt @@ -205,6 +205,7 @@ add_llvm_component_library(LLVMSupport InstructionCost.cpp IntEqClasses.cpp IntervalMap.cpp + Jobserver.cpp JSON.cpp KnownBits.cpp KnownFPClass.cpp diff --git a/llvm/lib/Support/Jobserver.cpp b/llvm/lib/Support/Jobserver.cpp new file mode 100644 index 0000000000000..a0a3eeb64be28 --- /dev/null +++ b/llvm/lib/Support/Jobserver.cpp @@ -0,0 +1,257 @@ +//===- llvm/Support/Jobserver.cpp - Jobserver Client Implementation -------===// +// +// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. +// See https://llvm.org/LICENSE.txt for license information. 
+// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception +// +//===----------------------------------------------------------------------===// + +#include "llvm/Support/Jobserver.h" +#include "llvm/ADT/SmallVector.h" +#include "llvm/ADT/Statistic.h" +#include "llvm/Config/llvm-config.h" +#include "llvm/Support/Debug.h" +#include "llvm/Support/Error.h" +#include "llvm/Support/raw_ostream.h" + +#include +#include +#include +#include + +#define DEBUG_TYPE "jobserver" + +using namespace llvm; + +namespace { +struct JobserverConfig { + enum Mode { + None, + PosixFifo, + PosixPipe, + Win32Semaphore, + }; + Mode TheMode = None; + std::string Path; + int ReadFD = -1; + int WriteFD = -1; +}; +} // namespace + +namespace { +Expected parseNativeMakeFlags(StringRef MakeFlags); +} // namespace + +class JobserverClientImpl : public JobserverClient { + bool IsInitialized = false; + std::atomic HasImplicitSlot{true}; + unsigned NumJobs = 0; + +public: + JobserverClientImpl(const JobserverConfig &Config); + ~JobserverClientImpl() override; + + JobSlot tryAcquire() override; + void release(JobSlot Slot) override; + unsigned getNumJobs() const override { return NumJobs; } + + bool isValid() const { return IsInitialized; } + +private: +#if defined(LLVM_ON_UNIX) + int ReadFD = -1; + int WriteFD = -1; + std::string FifoPath; +#elif defined(_WIN32) + void *Semaphore = nullptr; +#endif +}; + +// Include the platform-specific parts of the class. +#if defined(LLVM_ON_UNIX) +#include "Unix/Jobserver.inc" +#elif defined(_WIN32) +#include "Windows/Jobserver.inc" +#endif + +JobserverClient::~JobserverClient() = default; + +uint8_t JobSlot::getExplicitValue() const { + assert(isExplicit() && "Cannot get value of implicit or invalid slot"); + return static_cast(Value); +} + +static std::once_flag GJobserverOnceFlag; +static std::unique_ptr GJobserver; + +/// This is the main entry point for acquiring a jobserver client. 
It uses a +/// std::call_once to ensure the singleton `GJobserver` instance is created +/// safely in a multi-threaded environment. On first call, it reads the +/// `MAKEFLAGS` environment variable, parses it, and attempts to construct and +/// initialize a `JobserverClientImpl`. If successful, the global instance is +/// stored in `GJobserver`. Subsequent calls will return the existing instance. +JobserverClient *JobserverClient::getInstance() { + std::call_once(GJobserverOnceFlag, []() { + LLVM_DEBUG( + dbgs() + << "JobserverClient::getInstance() called for the first time.\n"); + const char *MakeFlagsEnv = getenv("MAKEFLAGS"); + if (!MakeFlagsEnv) { + errs() << "Warning: failed to create jobserver client due to MAKEFLAGS " + "environment variable not found\n"; + return; + } + + LLVM_DEBUG(dbgs() << "Found MAKEFLAGS = \"" << MakeFlagsEnv << "\"\n"); + + auto ConfigOrErr = parseNativeMakeFlags(MakeFlagsEnv); + if (Error Err = ConfigOrErr.takeError()) { + errs() << "Warning: failed to create jobserver client due to invalid " + "MAKEFLAGS environment variable: " + << toString(std::move(Err)) << "\n"; + return; + } + + JobserverConfig Config = *ConfigOrErr; + if (Config.TheMode == JobserverConfig::None) { + errs() << "Warning: failed to create jobserver client due to jobserver " + "mode missing in MAKEFLAGS environment variable\n"; + return; + } + + if (Config.TheMode == JobserverConfig::PosixPipe) { +#if defined(LLVM_ON_UNIX) + if (!areFdsValid(Config.ReadFD, Config.WriteFD)) { + errs() << "Warning: failed to create jobserver client due to invalid " + "Pipe FDs in MAKEFLAGS environment variable\n"; + return; + } +#endif + } + + auto Client = std::make_unique(Config); + if (Client->isValid()) { + LLVM_DEBUG(dbgs() << "Jobserver client created successfully!\n"); + GJobserver = std::move(Client); + } else + errs() << "Warning: jobserver client initialization failed.\n"; + }); + return GJobserver.get(); +} + +/// For testing purposes only. 
This function resets the singleton instance by +/// destroying the existing client and re-initializing the `std::once_flag`. +/// This allows tests to simulate the first-time initialization of the +/// jobserver client multiple times. +void JobserverClient::resetForTesting() { + GJobserver.reset(); + // Re-construct the std::once_flag in place to reset the singleton state. + new (&GJobserverOnceFlag) std::once_flag(); +} + +namespace { +/// A helper function that checks if `Input` starts with `Prefix`. +/// If it does, it removes the prefix from `Input`, assigns the remainder to +/// `Value`, and returns true. Otherwise, it returns false. +bool getPrefixedValue(StringRef Input, StringRef Prefix, StringRef &Value) { + if (Input.consume_front(Prefix)) { + Value = Input; + return true; + } + return false; +} + +/// A helper function to parse a string in the format "R,W" where R and W are +/// non-negative integers representing file descriptors. It populates the +/// `ReadFD` and `WriteFD` output parameters. Returns true on success. +bool getFileDescriptorPair(StringRef Input, int &ReadFD, int &WriteFD) { + if (sscanf(Input.str().c_str(), "%d,%d", &ReadFD, &WriteFD) != 2) + return false; + return ReadFD >= 0 && WriteFD >= 0; +} + +/// Parses the `MAKEFLAGS` environment variable string to find jobserver +/// arguments. It splits the string into space-separated arguments and searches +/// for `--jobserver-auth` or `--jobserver-fds`. Based on the value of these +/// arguments, it determines the jobserver mode (Pipe, FIFO, or Semaphore) and +/// connection details (file descriptors or path). +Expected parseNativeMakeFlags(StringRef MakeFlags) { + JobserverConfig Config; + if (MakeFlags.empty()) + return Config; + + // Split the MAKEFLAGS string into arguments. 
+ SmallVector Args; + StringRef S = MakeFlags; + while (!S.empty()) { + size_t Start = S.find_first_not_of(" \t"); + if (Start == StringRef::npos) + break; + S = S.substr(Start); + size_t End = S.find_first_of(" \t"); + if (End == StringRef::npos) { + Args.push_back(S); + break; + } + Args.push_back(S.substr(0, End)); + S = S.substr(End); + } + + // If '-n' (dry-run) is present as a legacy flag (not starting with '-'), + // disable the jobserver. + if (!Args.empty() && !Args[0].starts_with("-") && Args[0].contains('n')) + return Config; + + // Iterate through arguments to find jobserver flags. + // Note that make may pass multiple --jobserver-auth flags; the last one wins. + for (StringRef Arg : Args) { + StringRef Value; + if (getPrefixedValue(Arg, "--jobserver-auth=", Value)) { + int R, W; + // Try to parse as a file descriptor pair first. + if (getFileDescriptorPair(Value, R, W)) { + Config.TheMode = JobserverConfig::PosixPipe; + Config.ReadFD = R; + Config.WriteFD = W; + } else { + StringRef FifoPath; + // If not FDs, try to parse as a named pipe (fifo). + if (getPrefixedValue(Value, "fifo:", FifoPath)) { + Config.TheMode = JobserverConfig::PosixFifo; + Config.Path = FifoPath.str(); + } else { + // Otherwise, assume it's a Windows semaphore. + Config.TheMode = JobserverConfig::Win32Semaphore; + Config.Path = Value.str(); + } + } + } else if (getPrefixedValue(Arg, "--jobserver-fds=", Value)) { + // This is an alternative, older syntax for the pipe-based server. + int R, W; + if (getFileDescriptorPair(Value, R, W)) { + Config.TheMode = JobserverConfig::PosixPipe; + Config.ReadFD = R; + Config.WriteFD = W; + } else { + return createStringError(inconvertibleErrorCode(), + "Invalid file descriptor pair in MAKEFLAGS"); + } + } + } + +// Perform platform-specific validation. 
+#ifdef _WIN32 + if (Config.TheMode == JobserverConfig::PosixFifo || + Config.TheMode == JobserverConfig::PosixPipe) + return createStringError( + inconvertibleErrorCode(), + "FIFO/Pipe-based jobserver is not supported on Windows"); +#else + if (Config.TheMode == JobserverConfig::Win32Semaphore) + return createStringError( + inconvertibleErrorCode(), + "Semaphore-based jobserver is not supported on this platform"); +#endif + return Config; +} +} // namespace diff --git a/llvm/lib/Support/Parallel.cpp b/llvm/lib/Support/Parallel.cpp index 2ba02b73dd8f1..fb127b37edf04 100644 --- a/llvm/lib/Support/Parallel.cpp +++ b/llvm/lib/Support/Parallel.cpp @@ -7,12 +7,16 @@ //===----------------------------------------------------------------------===// #include "llvm/Support/Parallel.h" +#include "llvm/ADT/ScopeExit.h" #include "llvm/Config/llvm-config.h" +#include "llvm/Support/Jobserver.h" #include "llvm/Support/ManagedStatic.h" #include "llvm/Support/Threading.h" #include #include +#include +#include #include #include @@ -49,6 +53,9 @@ class Executor { class ThreadPoolExecutor : public Executor { public: explicit ThreadPoolExecutor(ThreadPoolStrategy S) { + if (S.UseJobserver) + TheJobserver = JobserverClient::getInstance(); + ThreadCount = S.compute_thread_count(); // Spawn all but one of the threads in another thread as spawning threads // can take a while. @@ -69,6 +76,10 @@ class ThreadPoolExecutor : public Executor { }); } + // To make sure the thread pool executor can only be created with a parallel + // strategy. 
+ ThreadPoolExecutor() = delete; + void stop() { { std::lock_guard Lock(Mutex); @@ -119,7 +130,25 @@ class ThreadPoolExecutor : public Executor { auto Task = std::move(WorkStack.back()); WorkStack.pop_back(); Lock.unlock(); - Task(); + + if (TheJobserver) { + JobSlot Slot = TheJobserver->tryAcquire(); + if (Slot.isValid()) { + auto Releaser = + make_scope_exit([&] { TheJobserver->release(std::move(Slot)); }); + Task(); + } else { + // The task could not be run because no job slot was + // available. Re-queue the task so that another thread can try + // to run it later. + std::lock_guard RequeueLock(Mutex); + WorkStack.push_back(std::move(Task)); + Cond.notify_one(); + // Yield to give another thread a chance to release a token. + std::this_thread::yield(); + } + } else + Task(); } } @@ -130,9 +159,20 @@ class ThreadPoolExecutor : public Executor { std::promise ThreadsCreated; std::vector Threads; unsigned ThreadCount; + + JobserverClient *TheJobserver = nullptr; }; -Executor *Executor::getDefaultExecutor() { +// A global raw pointer to the executor. Lifetime is managed by the +// objects created within createExecutor(). +static Executor *TheExec = nullptr; +static std::once_flag Flag; + +// This function will be called exactly once to create the executor. +// It contains the necessary platform-specific logic. Since functions +// called by std::call_once cannot return a value, we have to set the +// executor as a global variable. +void createExecutor() { #ifdef _WIN32 // The ManagedStatic enables the ThreadPoolExecutor to be stopped via // llvm_shutdown() which allows a "clean" fast exit, e.g. via _exit(). This @@ -156,16 +196,22 @@ Executor *Executor::getDefaultExecutor() { ThreadPoolExecutor::Deleter> ManagedExec; static std::unique_ptr Exec(&(*ManagedExec)); - return Exec.get(); + TheExec = Exec.get(); #else // ManagedStatic is not desired on other platforms. When `Exec` is destroyed // by llvm_shutdown(), worker threads will clean up and invoke TLS // destructors. 
This can lead to race conditions if other threads attempt to // access TLS objects that have already been destroyed. static ThreadPoolExecutor Exec(strategy); - return &Exec; + TheExec = &Exec; #endif } + +Executor *Executor::getDefaultExecutor() { + // Use std::call_once to lazily and safely initialize the executor. + std::call_once(Flag, createExecutor); + return TheExec; +} } // namespace } // namespace detail diff --git a/llvm/lib/Support/ThreadPool.cpp b/llvm/lib/Support/ThreadPool.cpp index c304f0f45360b..19d92273b3fe2 100644 --- a/llvm/lib/Support/ThreadPool.cpp +++ b/llvm/lib/Support/ThreadPool.cpp @@ -14,6 +14,7 @@ #include "llvm/Config/llvm-config.h" +#include "llvm/ADT/ScopeExit.h" #include "llvm/Support/FormatVariadic.h" #include "llvm/Support/Threading.h" #include "llvm/Support/raw_ostream.h" @@ -33,7 +34,10 @@ ThreadPoolInterface::~ThreadPoolInterface() = default; #if LLVM_ENABLE_THREADS StdThreadPool::StdThreadPool(ThreadPoolStrategy S) - : Strategy(S), MaxThreadCount(S.compute_thread_count()) {} + : Strategy(S), MaxThreadCount(S.compute_thread_count()) { + if (Strategy.UseJobserver) + TheJobserver = JobserverClient::getInstance(); +} void StdThreadPool::grow(int requested) { llvm::sys::ScopedWriter LockGuard(ThreadsLock); @@ -45,7 +49,10 @@ void StdThreadPool::grow(int requested) { Threads.emplace_back([this, ThreadID] { set_thread_name(formatv("llvm-worker-{0}", ThreadID)); Strategy.apply_thread_strategy(ThreadID); - processTasks(nullptr); + if (TheJobserver) + processTasksWithJobserver(); + else + processTasks(nullptr); }); } } @@ -79,7 +86,7 @@ void StdThreadPool::processTasks(ThreadPoolTaskGroup *WaitingForGroup) { // Yeah, we have a task, grab it and release the lock on the queue // We first need to signal that we are active before popping the queue - // in order for wait() to properly detect that even if the queue is + // in order for wait() to properly detect that even if the queue is // empty, there is still a task in flight. 
++ActiveThreads; Task = std::move(Tasks.front().first); @@ -133,6 +140,85 @@ void StdThreadPool::processTasks(ThreadPoolTaskGroup *WaitingForGroup) { } } +/// Main loop for worker threads when using a jobserver. +/// This function uses a two-level queue; it first acquires a job slot from the +/// external jobserver, then retrieves a task from the internal queue. +/// This allows the thread pool to cooperate with build systems like `make -j`. +void StdThreadPool::processTasksWithJobserver() { + while (true) { + // Acquire a job slot from the external jobserver. + // This polls for a slot and yields the thread to avoid a high-CPU wait. + JobSlot Slot; + while (true) { + // Return if the thread pool is shutting down. + { + std::unique_lock LockGuard(QueueLock); + if (!EnableFlag) + return; + } + Slot = TheJobserver->tryAcquire(); + if (Slot.isValid()) + break; // Successfully acquired a slot. + + // If the jobserver is busy, yield to let other threads run. + std::this_thread::yield(); + } + + // `make_scope_exit` guarantees the job slot is released, even if the + // task throws or we exit early. This prevents deadlocking the build. + auto SlotReleaser = + make_scope_exit([&] { TheJobserver->release(std::move(Slot)); }); + + // With an external job slot secured, get a task from the internal queue. + std::function Task; + ThreadPoolTaskGroup *GroupOfTask = nullptr; + + { + std::unique_lock LockGuard(QueueLock); + + // Wait until a task is available or the pool is shutting down. + QueueCondition.wait(LockGuard, + [&] { return !EnableFlag || !Tasks.empty(); }); + + // If shutting down and the queue is empty, the thread can terminate. + if (!EnableFlag && Tasks.empty()) + return; + + // Handle spurious wakeups by re-checking if the queue has tasks. + if (Tasks.empty()) + continue; + + // A task is available. Mark it as active before releasing the lock + // to prevent race conditions with `wait()`. 
+ ++ActiveThreads; + Task = std::move(Tasks.front().first); + GroupOfTask = Tasks.front().second; + if (GroupOfTask != nullptr) + ++ActiveGroups[GroupOfTask]; + Tasks.pop_front(); + } // The queue lock is released. + + // Run the task. The job slot remains acquired during execution. + Task(); + + // The task has finished. Update the active count and notify any waiters. + { + std::lock_guard LockGuard(QueueLock); + --ActiveThreads; + if (GroupOfTask != nullptr) { + auto A = ActiveGroups.find(GroupOfTask); + if (--(A->second) == 0) + ActiveGroups.erase(A); + } + // If all tasks are complete, notify any waiting threads. + if (workCompletedUnlocked(nullptr)) + CompletionCondition.notify_all(); + } + + // The SlotReleaser is destroyed here, returning the token to the jobserver. + // The loop then repeats to process the next job. + } +} bool StdThreadPool::workCompletedUnlocked(ThreadPoolTaskGroup *Group) const { if (Group == nullptr) return !ActiveThreads && Tasks.empty(); diff --git a/llvm/lib/Support/Threading.cpp b/llvm/lib/Support/Threading.cpp index 693de0e6400fb..9da357a7ebb91 100644 --- a/llvm/lib/Support/Threading.cpp +++ b/llvm/lib/Support/Threading.cpp @@ -14,6 +14,7 @@ #include "llvm/Support/Threading.h" #include "llvm/Config/config.h" #include "llvm/Config/llvm-config.h" +#include "llvm/Support/Jobserver.h" #include #include @@ -51,6 +52,10 @@ int llvm::get_physical_cores() { return -1; } static int computeHostNumHardwareThreads(); unsigned llvm::ThreadPoolStrategy::compute_thread_count() const { + if (UseJobserver) + if (auto JS = JobserverClient::getInstance()) + return JS->getNumJobs(); + int MaxThreadCount = UseHyperThreads ? 
computeHostNumHardwareThreads() : get_physical_cores(); if (MaxThreadCount <= 0) diff --git a/llvm/lib/Support/Unix/Jobserver.inc b/llvm/lib/Support/Unix/Jobserver.inc new file mode 100644 index 0000000000000..48599cf1126c7 --- /dev/null +++ b/llvm/lib/Support/Unix/Jobserver.inc @@ -0,0 +1,195 @@ +//===- llvm/Support/Unix/Jobserver.inc - Unix Jobserver Impl ----*- C++ -*-===// +// +// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. +// See https://llvm.org/LICENSE.txt for license information. +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception +// +//===----------------------------------------------------------------------===// +// +// This file implements the UNIX-specific parts of the JobserverClient class. +// +//===----------------------------------------------------------------------===// + +#include +#include +#include +#include +#include +#include +#include + +namespace { +/// Returns true if the given file descriptor is a FIFO (named pipe). +bool isFifo(int FD) { + struct stat StatBuf; + if (::fstat(FD, &StatBuf) != 0) + return false; + return S_ISFIFO(StatBuf.st_mode); +} + +/// Returns true if the given file descriptors are valid. +bool areFdsValid(int ReadFD, int WriteFD) { + if (ReadFD == -1 || WriteFD == -1) + return false; + // Check if the file descriptors are actually valid by checking their flags. + return ::fcntl(ReadFD, F_GETFD) != -1 && ::fcntl(WriteFD, F_GETFD) != -1; +} +} // namespace + +/// The constructor sets up the client based on the provided configuration. +/// For pipe-based jobservers, it duplicates the inherited file descriptors, +/// sets them to close-on-exec, and makes the read descriptor non-blocking. +/// For FIFO-based jobservers, it opens the named pipe. After setup, it drains +/// all available tokens from the jobserver to determine the total number of +/// available jobs (`NumJobs`), then immediately releases them back. 
+JobserverClientImpl::JobserverClientImpl(const JobserverConfig &Config) { + switch (Config.TheMode) { + case JobserverConfig::PosixPipe: { + // Duplicate the read and write file descriptors. + int NewReadFD = ::dup(Config.ReadFD); + if (NewReadFD < 0) + return; + int NewWriteFD = ::dup(Config.WriteFD); + if (NewWriteFD < 0) { + ::close(NewReadFD); + return; + } + // Set the new descriptors to be closed automatically on exec(). + if (::fcntl(NewReadFD, F_SETFD, FD_CLOEXEC) == -1 || + ::fcntl(NewWriteFD, F_SETFD, FD_CLOEXEC) == -1) { + ::close(NewReadFD); + ::close(NewWriteFD); + return; + } + // Set the read descriptor to non-blocking. + int flags = ::fcntl(NewReadFD, F_GETFL, 0); + if (flags == -1 || ::fcntl(NewReadFD, F_SETFL, flags | O_NONBLOCK) == -1) { + ::close(NewReadFD); + ::close(NewWriteFD); + return; + } + ReadFD = NewReadFD; + WriteFD = NewWriteFD; + break; + } + case JobserverConfig::PosixFifo: + // Open the FIFO for reading. It must be non-blocking and close-on-exec. + ReadFD = ::open(Config.Path.c_str(), O_RDONLY | O_NONBLOCK | O_CLOEXEC); + if (ReadFD < 0 || !isFifo(ReadFD)) { + if (ReadFD >= 0) + ::close(ReadFD); + ReadFD = -1; + return; + } + FifoPath = Config.Path; + // The write FD is opened on-demand in release(). + WriteFD = -1; + break; + default: + return; + } + + IsInitialized = true; + // Determine the total number of jobs by acquiring all available slots and + // then immediately releasing them. + SmallVector Slots; + while (true) { + auto S = tryAcquire(); + if (!S.isValid()) + break; + Slots.push_back(std::move(S)); + } + NumJobs = Slots.size(); + assert(NumJobs >= 1 && "Invalid number of jobs"); + for (auto &S : Slots) + release(std::move(S)); +} + +/// The destructor closes any open file descriptors. +JobserverClientImpl::~JobserverClientImpl() { + if (ReadFD >= 0) + ::close(ReadFD); + if (WriteFD >= 0) + ::close(WriteFD); +} + +/// Tries to acquire a job slot. 
The first call to this function will always +/// successfully acquire the single "implicit" slot that is granted to every +/// process started by `make`. Subsequent calls attempt to read a one-byte +/// token from the jobserver's read pipe. A successful read grants one +/// explicit job slot. The read is non-blocking; if no token is available, +/// it fails and returns an invalid JobSlot. +JobSlot JobserverClientImpl::tryAcquire() { + if (!IsInitialized) + return JobSlot(); + + // The first acquisition is always for the implicit slot. + if (HasImplicitSlot.exchange(false, std::memory_order_acquire)) { + LLVM_DEBUG(dbgs() << "Acquired implicit job slot.\n"); + return JobSlot::createImplicit(); + } + + char Token; + ssize_t Ret; + LLVM_DEBUG(dbgs() << "Attempting to read token from FD " << ReadFD << ".\n"); + // Loop to retry on EINTR (interrupted system call). + do { + Ret = ::read(ReadFD, &Token, 1); + } while (Ret < 0 && errno == EINTR); + + if (Ret == 1) { + LLVM_DEBUG(dbgs() << "Acquired explicit token '" << Token << "'.\n"); + return JobSlot::createExplicit(static_cast(Token)); + } + + LLVM_DEBUG(dbgs() << "Failed to acquire job slot, read returned " << Ret + << ".\n"); + return JobSlot(); +} + +/// Releases a job slot back to the pool. If the slot is implicit, it simply +/// resets a flag. If the slot is explicit, it writes the character token +/// associated with the slot back into the jobserver's write pipe. For FIFO +/// jobservers, this may require opening the FIFO for writing if it hasn't +/// been already. +void JobserverClientImpl::release(JobSlot Slot) { + if (!Slot.isValid()) + return; + + // Releasing the implicit slot just makes it available for the next acquire. 
+ if (Slot.isImplicit()) { + LLVM_DEBUG(dbgs() << "Released implicit job slot.\n"); + [[maybe_unused]] bool was_already_released = + HasImplicitSlot.exchange(true, std::memory_order_release); + assert(!was_already_released && "Implicit slot released twice"); + return; + } + + uint8_t Token = Slot.getExplicitValue(); + LLVM_DEBUG(dbgs() << "Releasing explicit token '" << (char)Token + << "' to FD " << WriteFD << ".\n"); + + // For FIFO-based jobservers, the write FD might not be open yet. + // Open it on the first release. + if (WriteFD < 0) { + LLVM_DEBUG(dbgs() << "WriteFD is invalid, opening FIFO: " << FifoPath + << "\n"); + WriteFD = ::open(FifoPath.c_str(), O_WRONLY | O_CLOEXEC); + if (WriteFD < 0) { + LLVM_DEBUG(dbgs() << "Failed to open FIFO for writing.\n"); + return; + } + LLVM_DEBUG(dbgs() << "Opened FIFO as new WriteFD: " << WriteFD << "\n"); + } + + ssize_t Written; + // Loop to retry on EINTR (interrupted system call). + do { + Written = ::write(WriteFD, &Token, 1); + } while (Written < 0 && errno == EINTR); + + if (Written <= 0) { + LLVM_DEBUG(dbgs() << "Failed to write token to pipe, write returned " + << Written << "\n"); + } +} diff --git a/llvm/lib/Support/Windows/Jobserver.inc b/llvm/lib/Support/Windows/Jobserver.inc new file mode 100644 index 0000000000000..f7fd6cf89c83f --- /dev/null +++ b/llvm/lib/Support/Windows/Jobserver.inc @@ -0,0 +1,75 @@ +//==- llvm/Support/Windows/Jobserver.inc - Windows Jobserver Impl -*- C++ -*-=// +// +// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. +// See https://llvm.org/LICENSE.txt for license information. +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception +// +//===----------------------------------------------------------------------===// +// +// This file implements the Windows-specific parts of the JobserverClient class. +// On Windows, the jobserver is implemented using a named semaphore. 
+// +//===----------------------------------------------------------------------===// + +#include "llvm/Support/Windows/WindowsSupport.h" +#include +#include + +/// The constructor for the Windows jobserver client. It attempts to open a +/// handle to an existing named semaphore, the name of which is provided by +/// GNU make in the --jobserver-auth argument. If the semaphore is opened +/// successfully, the client is marked as initialized. +JobserverClientImpl::JobserverClientImpl(const JobserverConfig &Config) { + Semaphore = (void *)::OpenSemaphoreA(SEMAPHORE_MODIFY_STATE | SYNCHRONIZE, + FALSE, Config.Path.c_str()); + if (Semaphore != nullptr) + IsInitialized = true; +} + +/// The destructor closes the handle to the semaphore, releasing the resource. +JobserverClientImpl::~JobserverClientImpl() { + if (Semaphore != nullptr) + ::CloseHandle((HANDLE)Semaphore); +} + +/// Tries to acquire a job slot. The first call always returns the implicit +/// slot. Subsequent calls use a non-blocking wait on the semaphore +/// (`WaitForSingleObject` with a timeout of 0). If the wait succeeds, the +/// semaphore's count is decremented, and an explicit job slot is acquired. +/// If the wait times out, it means no slots are available, and an invalid +/// slot is returned. +JobSlot JobserverClientImpl::tryAcquire() { + if (!IsInitialized) return JobSlot(); + + // First, grant the implicit slot. + if (HasImplicitSlot.exchange(false, std::memory_order_acquire)) { + return JobSlot::createImplicit(); + } + + // Try to acquire a slot from the semaphore without blocking. + if (::WaitForSingleObject((HANDLE)Semaphore, 0) == WAIT_OBJECT_0) { + // The explicit token value is arbitrary on Windows, as the semaphore + // count is the real resource. + return JobSlot::createExplicit(1); + } + + return JobSlot(); // Invalid slot +} + +/// Releases a job slot back to the pool. If the slot is implicit, it simply +/// resets a flag. 
For an explicit slot, it increments the semaphore's count +/// by one using `ReleaseSemaphore`, making the slot available to other +/// processes. +void JobserverClientImpl::release(JobSlot Slot) { + if (!IsInitialized || !Slot.isValid()) return; + + if (Slot.isImplicit()) { + [[maybe_unused]] bool was_already_released = + HasImplicitSlot.exchange(true, std::memory_order_release); + assert(!was_already_released && "Implicit slot released twice"); + return; + } + + // Release the slot by incrementing the semaphore count. + (void)::ReleaseSemaphore((HANDLE)Semaphore, 1, NULL); +} diff --git a/llvm/unittests/Support/CMakeLists.txt b/llvm/unittests/Support/CMakeLists.txt index d048e871fd0fb..9ff4620001643 100644 --- a/llvm/unittests/Support/CMakeLists.txt +++ b/llvm/unittests/Support/CMakeLists.txt @@ -51,6 +51,7 @@ add_llvm_unittest(SupportTests IndexedAccessorTest.cpp InstructionCostTest.cpp InterleavedRangeTest.cpp + JobserverTest.cpp JSONTest.cpp KnownBitsTest.cpp LEB128Test.cpp diff --git a/llvm/unittests/Support/JobserverTest.cpp b/llvm/unittests/Support/JobserverTest.cpp new file mode 100644 index 0000000000000..b4922a5f378fa --- /dev/null +++ b/llvm/unittests/Support/JobserverTest.cpp @@ -0,0 +1,436 @@ +//===- llvm/unittest/Support/JobserverTest.cpp ----------------------------===// +// +// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. +// See https://llvm.org/LICENSE.txt for license information. +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception +// +//===----------------------------------------------------------------------===// +/// +/// \file +/// Jobserver.h unit tests. 
+/// +//===----------------------------------------------------------------------===// + +#include "llvm/Support/Jobserver.h" +#include "llvm/Config/llvm-config.h" +#include "llvm/Support/Debug.h" +#include "llvm/Support/Parallel.h" +#include "llvm/Support/ThreadPool.h" +#include "llvm/Support/raw_ostream.h" +#include "gtest/gtest.h" +#include +#include +#include + +#if defined(LLVM_ON_UNIX) +#include "llvm/ADT/SmallString.h" +#include "llvm/Support/FileSystem.h" +#include +#include +#include +#include +#include +#include +#include +#elif defined(_WIN32) +#include +#endif + +#define DEBUG_TYPE "jobserver-test" + +using namespace llvm; + +namespace { + +// RAII helper to set an environment variable for the duration of a test. +class ScopedEnvironment { + std::string Name; + std::string OldValue; + bool HadOldValue; + +public: + ScopedEnvironment(const char *Name, const char *Value) : Name(Name) { +#if defined(_WIN32) + char *Old = nullptr; + size_t OldLen; + errno_t err = _dupenv_s(&Old, &OldLen, Name); + if (err == 0 && Old != nullptr) { + HadOldValue = true; + OldValue = Old; + free(Old); + } else { + HadOldValue = false; + } + _putenv_s(Name, Value); +#else + const char *Old = getenv(Name); + if (Old) { + HadOldValue = true; + OldValue = Old; + } else { + HadOldValue = false; + } + setenv(Name, Value, 1); +#endif + } + + ~ScopedEnvironment() { +#if defined(_WIN32) + if (HadOldValue) + _putenv_s(Name.c_str(), OldValue.c_str()); + else + // On Windows, setting an environment variable to an empty string + // unsets it, making getenv() return NULL. + _putenv_s(Name.c_str(), ""); +#else + if (HadOldValue) + setenv(Name.c_str(), OldValue.c_str(), 1); + else + unsetenv(Name.c_str()); +#endif + } +}; + +TEST(Jobserver, Slot) { + // Default constructor creates an invalid slot. + JobSlot S1; + EXPECT_FALSE(S1.isValid()); + EXPECT_FALSE(S1.isImplicit()); + + // Create an implicit slot. 
+ JobSlot S2 = JobSlot::createImplicit(); + EXPECT_TRUE(S2.isValid()); + EXPECT_TRUE(S2.isImplicit()); + + // Create an explicit slot. + JobSlot S3 = JobSlot::createExplicit(42); + EXPECT_TRUE(S3.isValid()); + EXPECT_FALSE(S3.isImplicit()); + + // Test move construction. + JobSlot S4 = std::move(S2); + EXPECT_TRUE(S4.isValid()); + EXPECT_TRUE(S4.isImplicit()); + EXPECT_FALSE(S2.isValid()); // S2 is now invalid. + + // Test move assignment. + S1 = std::move(S3); + EXPECT_TRUE(S1.isValid()); + EXPECT_FALSE(S1.isImplicit()); + EXPECT_FALSE(S3.isValid()); // S3 is now invalid. +} + +// Test fixture for parsing tests to ensure the singleton state is +// reset between each test case. +class JobserverParsingTest : public ::testing::Test { +protected: + void TearDown() override { JobserverClient::resetForTesting(); } +}; + +TEST_F(JobserverParsingTest, NoMakeflags) { + // No MAKEFLAGS, should be null. + ScopedEnvironment Env("MAKEFLAGS", ""); + // On Unix, setting an env var to "" makes getenv() return an empty + // string, not NULL. We must call unsetenv() to test the case where + // the variable is truly not present. +#if !defined(_WIN32) + unsetenv("MAKEFLAGS"); +#endif + EXPECT_EQ(JobserverClient::getInstance(), nullptr); +} + +TEST_F(JobserverParsingTest, EmptyMakeflags) { + // Empty MAKEFLAGS, should be null. + ScopedEnvironment Env("MAKEFLAGS", ""); + EXPECT_EQ(JobserverClient::getInstance(), nullptr); +} + +TEST_F(JobserverParsingTest, DryRunFlag) { + // Dry-run flag 'n', should be null. + ScopedEnvironment Env("MAKEFLAGS", "n -j --jobserver-auth=fifo:/tmp/foo"); + EXPECT_EQ(JobserverClient::getInstance(), nullptr); +} + +// Separate fixture for non-threaded client tests. +class JobserverClientTest : public JobserverParsingTest {}; + +#if defined(LLVM_ON_UNIX) +// RAII helper to create and clean up a temporary FIFO file. 
+class ScopedFifo { + SmallString<128> Path; + bool IsValid = false; + +public: + ScopedFifo() { + // To get a unique, non-colliding name for a FIFO, we use the + // createTemporaryFile function to reserve a name in the filesystem. + std::error_code EC = + sys::fs::createTemporaryFile("jobserver-test", "fifo", Path); + if (EC) + return; + // Then we immediately remove the regular file it created, but keep the + // unique path. + sys::fs::remove(Path); + // Finally, we create the FIFO at that safe, unique path. + if (mkfifo(Path.c_str(), 0600) != 0) + return; + IsValid = true; + } + + ~ScopedFifo() { + if (IsValid) + sys::fs::remove(Path); + } + + const char *c_str() const { return Path.data(); } + bool isValid() const { return IsValid; } +}; + +TEST_F(JobserverClientTest, UnixClientFifo) { + // This test covers basic FIFO client creation and behavior with an empty + // FIFO. No job tokens are available. + ScopedFifo F; + ASSERT_TRUE(F.isValid()); + + std::string Makeflags = "--jobserver-auth=fifo:"; + Makeflags += F.c_str(); + ScopedEnvironment Env("MAKEFLAGS", Makeflags.c_str()); + + JobserverClient *Client = JobserverClient::getInstance(); + ASSERT_NE(Client, nullptr); + + // Get the implicit token. + JobSlot S1 = Client->tryAcquire(); + EXPECT_TRUE(S1.isValid()); + EXPECT_TRUE(S1.isImplicit()); + + // FIFO is empty, next acquire fails. + JobSlot S2 = Client->tryAcquire(); + EXPECT_FALSE(S2.isValid()); + + // Release does not write to the pipe for the implicit token. + Client->release(std::move(S1)); + + // Re-acquire the implicit token. + S1 = Client->tryAcquire(); + EXPECT_TRUE(S1.isValid()); +} + +#if LLVM_ENABLE_THREADS +// Test fixture for tests that use the jobserver strategy. It creates a +// temporary FIFO, sets MAKEFLAGS, and provides a helper to pre-load the FIFO +// with job tokens, simulating `make -jN`. 
+class JobserverStrategyTest : public JobserverParsingTest { +protected: + std::unique_ptr TheFifo; + std::thread MakeThread; + std::atomic StopMakeThread{false}; + + void SetUp() override { + TheFifo = std::make_unique(); + ASSERT_TRUE(TheFifo->isValid()); + + std::string MakeFlags = "--jobserver-auth=fifo:"; + MakeFlags += TheFifo->c_str(); + setenv("MAKEFLAGS", MakeFlags.c_str(), 1); + } + + void TearDown() override { + if (MakeThread.joinable()) { + StopMakeThread = true; + MakeThread.join(); + } + unsetenv("MAKEFLAGS"); + TheFifo.reset(); + JobserverClient::resetForTesting(); + } + + // Starts a background thread that emulates `make`. It populates the FIFO + // with initial tokens and then recycles tokens released by clients. + void startMakeProxy(int NumInitialJobs) { + MakeThread = std::thread([this, NumInitialJobs]() { + LLVM_DEBUG(dbgs() << "[MakeProxy] Thread started.\n"); + // Open the FIFO for reading and writing. This call does not block. + int RWFd = open(TheFifo->c_str(), O_RDWR); + LLVM_DEBUG(dbgs() << "[MakeProxy] Opened FIFO " << TheFifo->c_str() + << " with O_RDWR, FD=" << RWFd << "\n"); + if (RWFd == -1) { + LLVM_DEBUG( + dbgs() + << "[MakeProxy] ERROR: Failed to open FIFO with O_RDWR. Errno: " + << errno << "\n"); + return; + } + + // Populate with initial jobs. + LLVM_DEBUG(dbgs() << "[MakeProxy] Writing " << NumInitialJobs + << " initial tokens.\n"); + for (int i = 0; i < NumInitialJobs; ++i) { + if (write(RWFd, "+", 1) != 1) { + LLVM_DEBUG(dbgs() + << "[MakeProxy] ERROR: Failed to write initial token " << i + << ".\n"); + close(RWFd); + return; + } + } + LLVM_DEBUG(dbgs() << "[MakeProxy] Finished writing initial tokens.\n"); + + // Make the read non-blocking so we can periodically check StopMakeThread. 
+ int flags = fcntl(RWFd, F_GETFL, 0); + fcntl(RWFd, F_SETFL, flags | O_NONBLOCK); + + while (!StopMakeThread) { + char Token; + ssize_t Ret = read(RWFd, &Token, 1); + if (Ret == 1) { + LLVM_DEBUG(dbgs() << "[MakeProxy] Read token '" << Token + << "' to recycle.\n"); + // A client released a token, 'make' makes it available again. + std::this_thread::sleep_for(std::chrono::microseconds(100)); + ssize_t WRet; + do { + WRet = write(RWFd, &Token, 1); + } while (WRet < 0 && errno == EINTR); + if (WRet <= 0) { + LLVM_DEBUG( + dbgs() + << "[MakeProxy] ERROR: Failed to write recycled token.\n"); + break; // Error, stop the proxy. + } + LLVM_DEBUG(dbgs() + << "[MakeProxy] Wrote token '" << Token << "' back.\n"); + } else if (Ret < 0 && errno != EAGAIN && errno != EWOULDBLOCK) { + LLVM_DEBUG(dbgs() << "[MakeProxy] ERROR: Read failed with errno " + << errno << ".\n"); + break; // Error, stop the proxy. + } + // Yield to prevent this thread from busy-waiting. + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + LLVM_DEBUG(dbgs() << "[MakeProxy] Thread stopping.\n"); + close(RWFd); + }); + + // Give the proxy thread a moment to start and populate the FIFO. + // This is a simple way to avoid a race condition where the client starts + // before the initial tokens are in the pipe. + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } +}; + +TEST_F(JobserverStrategyTest, ThreadPoolConcurrencyIsLimited) { + // This test simulates `make -j3`. We will have 1 implicit job slot and + // we will add 2 explicit job tokens to the FIFO, for a total of 3. + const int NumExplicitJobs = 2; + const int ConcurrencyLimit = NumExplicitJobs + 1; // +1 for the implicit slot + const int NumTasks = 8; // More tasks than available slots. + + LLVM_DEBUG(dbgs() << "Calling startMakeProxy with " << NumExplicitJobs + << " jobs.\n"); + startMakeProxy(NumExplicitJobs); + LLVM_DEBUG(dbgs() << "MakeProxy is running.\n"); + + // Create the thread pool. 
Its constructor will call jobserver_concurrency() + // and create a client that reads from our pre-loaded FIFO. + StdThreadPool Pool(jobserver_concurrency()); + + std::atomic ActiveTasks{0}; + std::atomic MaxActiveTasks{0}; + std::atomic CompletedTasks{0}; + std::mutex M; + std::condition_variable CV; + + // Dispatch more tasks than there are job slots. The pool should block + // and only run up to `ConcurrencyLimit` tasks at once. + for (int i = 0; i < NumTasks; ++i) { + Pool.async([&, i] { + // Track the number of concurrently running tasks. + int CurrentActive = ++ActiveTasks; + LLVM_DEBUG(dbgs() << "Task " << i << ": Active tasks: " << CurrentActive + << "\n"); + int OldMax = MaxActiveTasks.load(); + while (CurrentActive > OldMax) + MaxActiveTasks.compare_exchange_weak(OldMax, CurrentActive); + + std::this_thread::sleep_for(std::chrono::milliseconds(25)); + + --ActiveTasks; + if (++CompletedTasks == NumTasks) { + std::lock_guard Lock(M); + CV.notify_one(); + } + }); + } + + // Wait for all tasks to complete. + std::unique_lock Lock(M); + CV.wait(Lock, [&] { return CompletedTasks == NumTasks; }); + + LLVM_DEBUG(dbgs() << "Test finished. Max active tasks was " << MaxActiveTasks + << ".\n"); + // The key assertion: the maximum number of concurrent tasks should + // not have exceeded the limit imposed by the jobserver. + EXPECT_LE(MaxActiveTasks, ConcurrencyLimit); + EXPECT_EQ(CompletedTasks, NumTasks); +} + +TEST_F(JobserverStrategyTest, ParallelForIsLimited) { + // This test verifies that llvm::parallelFor respects the jobserver limit. + const int NumExplicitJobs = 3; + const int ConcurrencyLimit = NumExplicitJobs + 1; // +1 implicit + const int NumTasks = 20; + + LLVM_DEBUG(dbgs() << "Calling startMakeProxy with " << NumExplicitJobs + << " jobs.\n"); + startMakeProxy(NumExplicitJobs); + LLVM_DEBUG(dbgs() << "MakeProxy is running.\n"); + + // Set the global strategy. parallelFor will use this. 
+ parallel::strategy = jobserver_concurrency(); + + std::atomic ActiveTasks{0}; + std::atomic MaxActiveTasks{0}; + + parallelFor(0, NumTasks, [&](int i) { + int CurrentActive = ++ActiveTasks; + LLVM_DEBUG(dbgs() << "Task " << i << ": Active tasks: " << CurrentActive + << "\n"); + int OldMax = MaxActiveTasks.load(); + while (CurrentActive > OldMax) + MaxActiveTasks.compare_exchange_weak(OldMax, CurrentActive); + + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + --ActiveTasks; + }); + + LLVM_DEBUG(dbgs() << "ParallelFor finished. Max active tasks was " + << MaxActiveTasks << ".\n"); + EXPECT_LE(MaxActiveTasks, ConcurrencyLimit); +} + +TEST_F(JobserverStrategyTest, ParallelSortIsLimited) { + // This test serves as an integration test to ensure parallelSort completes + // correctly when running under the jobserver strategy. It doesn't directly + // measure concurrency but verifies correctness. + const int NumExplicitJobs = 3; + startMakeProxy(NumExplicitJobs); + + parallel::strategy = jobserver_concurrency(); + + std::vector V(1024); + // Fill with random data + std::mt19937 randEngine; + std::uniform_int_distribution dist; + for (int &i : V) + i = dist(randEngine); + + parallelSort(V.begin(), V.end()); + ASSERT_TRUE(llvm::is_sorted(V)); +} + +#endif // LLVM_ENABLE_THREADS + +#endif // defined(LLVM_ON_UNIX) + +} // end anonymous namespace