Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 61 additions & 22 deletions src/libstore/filetransfer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -594,10 +594,24 @@ struct curlFileTransfer : public FileTransfer
}
};

bool quit = false;
std::
priority_queue<std::shared_ptr<TransferItem>, std::vector<std::shared_ptr<TransferItem>>, EmbargoComparator>
incoming;
private:
bool quitting = false;
public:
void quit()
{
quitting = true;
/* We wil not be processing any more incomming requests */
while (!incoming.empty())
incoming.pop();
}

bool isQuitting()
{
return quitting;
}
};

Sync<State> state_;
Expand All @@ -611,11 +625,8 @@ struct curlFileTransfer : public FileTransfer

std::thread workerThread;

curlFileTransfer()
: mt19937(rd())
void setup()
{
static std::once_flag globalInit;
std::call_once(globalInit, curl_global_init, CURL_GLOBAL_ALL);

curlm = curl_multi_init();

Expand All @@ -626,31 +637,54 @@ struct curlFileTransfer : public FileTransfer
curl_multi_setopt(curlm, CURLMOPT_MAX_TOTAL_CONNECTIONS, fileTransferSettings.httpConnections.get());
#endif

workerThread = std::thread([&]() { workerThreadEntry(); });
}

curlFileTransfer()
: mt19937(rd())
{
static std::once_flag globalInit;
std::call_once(globalInit, curl_global_init, CURL_GLOBAL_ALL);

#ifndef _WIN32 // TODO need graceful async exit support on Windows?
wakeupPipe.create();
fcntl(wakeupPipe.readSide.get(), F_SETFL, O_NONBLOCK);
#endif

workerThread = std::thread([&]() { workerThreadEntry(); });
setup();
}

~curlFileTransfer()
void tearDown(Sync<State>::WriteLock & state)
{
stopWorkerThread();
stopWorkerThread(state);

workerThread.join();

if (curlm)
curl_multi_cleanup(curlm);
}

void stopWorkerThread()
~curlFileTransfer()
{
auto state(state_.lock());
tearDown(state);
}

void restart(Sync<State>::WriteLock state)
{
// Same as destructor
tearDown(state);

// Fresh state, but reuse global mutex
*state = State{};
// Same as constructor
setup();
}

void stopWorkerThread(Sync<State>::WriteLock & state)
{
/* Signal the worker thread to exit. */
{
auto state(state_.lock());
state->quit = true;
}
state->quit();
#ifndef _WIN32 // TODO need graceful async exit support on Windows?
writeFull(wakeupPipe.writeSide.get(), " ", false);
#endif
Expand All @@ -660,7 +694,10 @@ struct curlFileTransfer : public FileTransfer
{
/* Cause this thread to be notified on SIGINT. */
#ifndef _WIN32 // TODO need graceful async exit support on Windows?
auto callback = createInterruptCallback([&]() { stopWorkerThread(); });
auto callback = createInterruptCallback([&]() {
auto state(state_.lock());
stopWorkerThread(state);
});
#endif

#ifdef __linux__
Expand Down Expand Up @@ -750,7 +787,7 @@ struct curlFileTransfer : public FileTransfer
break;
}
}
quit = state->quit;
quit = state->isQuitting();
}

for (auto & item : incoming) {
Expand All @@ -767,18 +804,20 @@ struct curlFileTransfer : public FileTransfer

void workerThreadEntry()
{
// Unwinding or because someone called `quit`.
bool normalExit = true;
try {
workerThreadMain();
} catch (nix::Interrupted & e) {
normalExit = false;
} catch (std::exception & e) {
printError("unexpected error in download thread: %s", e.what());
normalExit = false;
}

{
if (!normalExit) {
auto state(state_.lock());
while (!state->incoming.empty())
state->incoming.pop();
state->quit = true;
state->quit();
}
}

Expand All @@ -789,7 +828,7 @@ struct curlFileTransfer : public FileTransfer

{
auto state(state_.lock());
if (state->quit)
if (state->isQuitting())
throw nix::Error("cannot enqueue download request because the download thread is shutting down");
state->incoming.push(item);
}
Expand Down Expand Up @@ -845,8 +884,8 @@ ref<FileTransfer> getFileTransfer()
{
static ref<curlFileTransfer> fileTransfer = makeCurlFileTransfer();

if (fileTransfer->state_.lock()->quit)
fileTransfer = makeCurlFileTransfer();
if (auto state(fileTransfer->state_.lock()); state->isQuitting())
fileTransfer->restart(std::move(state));

return fileTransfer;
}
Expand Down
Loading