diff --git a/src/libstore/filetransfer.cc b/src/libstore/filetransfer.cc index a162df1ad3b..117b2c3ea7a 100644 --- a/src/libstore/filetransfer.cc +++ b/src/libstore/filetransfer.cc @@ -594,10 +594,24 @@ struct curlFileTransfer : public FileTransfer } }; - bool quit = false; std:: priority_queue, std::vector>, EmbargoComparator> incoming; + private: + bool quitting = false; + public: + void quit() + { + quitting = true; + /* We wil not be processing any more incomming requests */ + while (!incoming.empty()) + incoming.pop(); + } + + bool isQuitting() + { + return quitting; + } }; Sync state_; @@ -611,11 +625,8 @@ struct curlFileTransfer : public FileTransfer std::thread workerThread; - curlFileTransfer() - : mt19937(rd()) + void setup() { - static std::once_flag globalInit; - std::call_once(globalInit, curl_global_init, CURL_GLOBAL_ALL); curlm = curl_multi_init(); @@ -626,17 +637,26 @@ struct curlFileTransfer : public FileTransfer curl_multi_setopt(curlm, CURLMOPT_MAX_TOTAL_CONNECTIONS, fileTransferSettings.httpConnections.get()); #endif + workerThread = std::thread([&]() { workerThreadEntry(); }); + } + + curlFileTransfer() + : mt19937(rd()) + { + static std::once_flag globalInit; + std::call_once(globalInit, curl_global_init, CURL_GLOBAL_ALL); + #ifndef _WIN32 // TODO need graceful async exit support on Windows? wakeupPipe.create(); fcntl(wakeupPipe.readSide.get(), F_SETFL, O_NONBLOCK); #endif - workerThread = std::thread([&]() { workerThreadEntry(); }); + setup(); } - ~curlFileTransfer() + void tearDown(Sync::WriteLock & state) { - stopWorkerThread(); + stopWorkerThread(state); workerThread.join(); @@ -644,13 +664,27 @@ struct curlFileTransfer : public FileTransfer curl_multi_cleanup(curlm); } - void stopWorkerThread() + ~curlFileTransfer() + { + auto state(state_.lock()); + tearDown(state); + } + + void restart(Sync::WriteLock state) + { + // Same as destructor + tearDown(state); + + // Fresh state, but reuse global mutex + *state = State{}; + // Same as constructor + setup(); + } + + void stopWorkerThread(Sync::WriteLock & state) { /* Signal the worker thread to exit. */ - { - auto state(state_.lock()); - state->quit = true; - } + state->quit(); #ifndef _WIN32 // TODO need graceful async exit support on Windows? writeFull(wakeupPipe.writeSide.get(), " ", false); #endif @@ -660,7 +694,10 @@ struct curlFileTransfer : public FileTransfer { /* Cause this thread to be notified on SIGINT. */ #ifndef _WIN32 // TODO need graceful async exit support on Windows? - auto callback = createInterruptCallback([&]() { stopWorkerThread(); }); + auto callback = createInterruptCallback([&]() { + auto state(state_.lock()); + stopWorkerThread(state); + }); #endif #ifdef __linux__ @@ -750,7 +787,7 @@ struct curlFileTransfer : public FileTransfer break; } } - quit = state->quit; + quit = state->isQuitting(); } for (auto & item : incoming) { @@ -767,18 +804,20 @@ struct curlFileTransfer : public FileTransfer void workerThreadEntry() { + // Unwinding or because someone called `quit`. + bool normalExit = true; try { workerThreadMain(); } catch (nix::Interrupted & e) { + normalExit = false; } catch (std::exception & e) { printError("unexpected error in download thread: %s", e.what()); + normalExit = false; } - { + if (!normalExit) { auto state(state_.lock()); - while (!state->incoming.empty()) - state->incoming.pop(); - state->quit = true; + state->quit(); } } @@ -789,7 +828,7 @@ struct curlFileTransfer : public FileTransfer { auto state(state_.lock()); - if (state->quit) + if (state->isQuitting()) throw nix::Error("cannot enqueue download request because the download thread is shutting down"); state->incoming.push(item); } @@ -845,8 +884,8 @@ ref getFileTransfer() { static ref fileTransfer = makeCurlFileTransfer(); - if (fileTransfer->state_.lock()->quit) - fileTransfer = makeCurlFileTransfer(); + if (auto state(fileTransfer->state_.lock()); state->isQuitting()) + fileTransfer->restart(std::move(state)); return fileTransfer; }