Skip to content
Open
18 changes: 17 additions & 1 deletion src/lib/libpipefs.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,19 @@ addToLibrary({
// able to read from the read end after write end is closed.
refcnt : 2,
timestamp: new Date(),
readableHandlers: [],
registerReadableHanlders: (notifyCallback) => {
if (notifyCallback == null) return;
notifyCallback.registerCleanupFunc(() => {
const i = pipe.readableHandlers.indexOf(notifyCallback);
if (i !== -1) pipe.readableHandlers.splice(i, 1);
});
pipe.readableHandlers.push(notifyCallback);
},
notifyReadableHanders: () => {
pipe.readableHandlers.forEach(cb => cb({{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}}));
pipe.readableHandlers = [];
}
};

pipe.buckets.push({
Expand Down Expand Up @@ -80,7 +93,7 @@ addToLibrary({
blocks: 0,
};
},
poll(stream) {
poll(stream, timeout, notifyCallback) {
var pipe = stream.node.pipe;

if ((stream.flags & {{{ cDefs.O_ACCMODE }}}) === {{{ cDefs.O_WRONLY }}}) {
Expand All @@ -92,6 +105,7 @@ addToLibrary({
}
}

pipe.registerReadableHanlders(notifyCallback);
return 0;
},
dup(stream) {
Expand Down Expand Up @@ -204,6 +218,7 @@ addToLibrary({
if (freeBytesInCurrBuffer >= dataLen) {
currBucket.buffer.set(data, currBucket.offset);
currBucket.offset += dataLen;
pipe.notifyReadableHanders();
return dataLen;
} else if (freeBytesInCurrBuffer > 0) {
currBucket.buffer.set(data.subarray(0, freeBytesInCurrBuffer), currBucket.offset);
Expand Down Expand Up @@ -235,6 +250,7 @@ addToLibrary({
newBucket.buffer.set(data);
}

pipe.notifyReadableHanders();
return dataLen;
},
close(stream) {
Expand Down
56 changes: 55 additions & 1 deletion src/lib/libpthread.js
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ var LibraryPThread = {
'exit',
#if PTHREADS_DEBUG || ASSERTIONS
'$ptrToString',
#endif
#if PROXY_TO_PTHREAD
'$addThreadToActiveSelectCallbacks',
'$removeThreadFromActiveSelectCallbacks',
'$activeSelectCallbacks',
#endif
],
$PThread: {
Expand Down Expand Up @@ -577,6 +582,9 @@ var LibraryPThread = {
#endif
#if ASSERTIONS
assert(worker);
#endif
#if PROXY_TO_PTHREAD
removeThreadFromActiveSelectCallbacks(pthread_ptr);
#endif
PThread.returnWorkerToPool(worker);
},
Expand Down Expand Up @@ -621,6 +629,49 @@ var LibraryPThread = {
$registerTLSInit: (tlsInitFunc) => PThread.tlsInitFunctions.push(tlsInitFunc),
#endif

#if PROXY_TO_PTHREAD
// On the main worker, activeSelectCallbacks records the set of callbacks
// that are allowed to update the shared region. Any callback not in this
// set (i.e. when !isActiveSelectCallback) must not update the region.
//
// Each select syscall invocation must call deactivateSelectCallbacks to
// reset this set, ensuring that callbacks from previous invocations don't
// affect the current one.
//
// If a callback executes after the thread worker has already returned (due
// to a timeout, a readiness notification or other exceptional conditions)
// but before the next deactivation, it may still update the shared region.
// However the thread worker will not read that value and just ignore it.
//
// activeSelectCallbacks records multiple callback lists one per thread
// worker so that each worker can manage its own set of active callbacks
// independently.
$activeSelectCallbacks: {},
$addThreadToActiveSelectCallbacks__deps: ['malloc'],
$addThreadToActiveSelectCallbacks: (pthread_ptr) => {
activeSelectCallbacks[pthread_ptr] = {
buf: _malloc(8),
callbacks: [],
};
},
$removeThreadFromActiveSelectCallbacks: (pthread_ptr) => {
delete activeSelectCallbacks[pthread_ptr];
},
$getActiveSelectCallbacks: (pthread_ptr) => {
return activeSelectCallbacks[pthread_ptr];
},
$deactivateSelectCallbacks: (pthread_ptr) => {
activeSelectCallbacks[pthread_ptr].callbacks = [];
},
$activateSelectCallback: (pthread_ptr, cb) => {
activeSelectCallbacks[pthread_ptr].callbacks.push(cb);
},
$isActiveSelectCallback: (pthread_ptr, cb) => {
return (activeSelectCallbacks[pthread_ptr] != null) &&
(activeSelectCallbacks[pthread_ptr].callbacks.indexOf(cb) != -1);
},
#endif

$spawnThread: (threadParams) => {
#if ASSERTIONS
assert(!ENVIRONMENT_IS_PTHREAD, 'Internal Error! spawnThread() can only ever be called from main application thread!');
Expand Down Expand Up @@ -648,6 +699,9 @@ var LibraryPThread = {
arg: threadParams.arg,
pthread_ptr: threadParams.pthread_ptr,
};
#if PROXY_TO_PTHREAD
addThreadToActiveSelectCallbacks(threadParams.pthread_ptr);
#endif
#if OFFSCREENCANVAS_SUPPORT
// Note that we do not need to quote these names because they are only used
// in this file, and not from the external worker.js.
Expand Down Expand Up @@ -692,7 +746,7 @@ var LibraryPThread = {
$pthreadCreateProxied__deps: ['__pthread_create_js'],
$pthreadCreateProxied: (pthread_ptr, attr, startRoutine, arg) => ___pthread_create_js(pthread_ptr, attr, startRoutine, arg),

#if OFFSCREENCANVAS_SUPPORT
#if OFFSCREENCANVAS_SUPPORT || PROXY_TO_PTHREAD
// ASan wraps the emscripten_builtin_pthread_create call in
// __lsan::ScopedInterceptorDisabler. Unfortunately, that only disables it on
// the thread that made the call. __pthread_create_js gets proxied to the
Expand Down
203 changes: 151 additions & 52 deletions src/lib/libsyscall.js
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,71 @@ var SyscallsLibrary = {
#endif
return ret;
},

getTimeoutInMillis(timeout) {
// select(2) is declared to accept "struct timeval { time_t tv_sec; suseconds_t tv_usec; }".
// However, musl passes the two values to the syscall as an array of long values.
// Note that sizeof(time_t) != sizeof(long) in wasm32. The former is 8, while the latter is 4.
// This means using "C_STRUCTS.timeval\.tv_usec" leads to a wrong offset.
// So, instead, we use POINTER_SIZE.
var tv_sec = ({{{ makeGetValue('timeout', 0, 'i32') }}}),
tv_usec = ({{{ makeGetValue('timeout', POINTER_SIZE, 'i32') }}});
return (tv_sec + tv_usec / 1000000) * 1000;
},

parseSelectFDSet(readfds, writefds, exceptfds) {
var total = 0;

var srcReadLow = (readfds ? {{{ makeGetValue('readfds', 0, 'i32') }}} : 0),
srcReadHigh = (readfds ? {{{ makeGetValue('readfds', 4, 'i32') }}} : 0);
var srcWriteLow = (writefds ? {{{ makeGetValue('writefds', 0, 'i32') }}} : 0),
srcWriteHigh = (writefds ? {{{ makeGetValue('writefds', 4, 'i32') }}} : 0);
var srcExceptLow = (exceptfds ? {{{ makeGetValue('exceptfds', 0, 'i32') }}} : 0),
srcExceptHigh = (exceptfds ? {{{ makeGetValue('exceptfds', 4, 'i32') }}} : 0);

var dstReadLow = 0,
dstReadHigh = 0;
var dstWriteLow = 0,
dstWriteHigh = 0;
var dstExceptLow = 0,
dstExceptHigh = 0;

var check = (fd, low, high, val) => fd < 32 ? (low & val) : (high & val);

return {
getTotal: () => total,
setFlags: (fd, flags) => {
var mask = 1 << (fd % 32);

if ((flags & {{{ cDefs.POLLIN }}}) && check(fd, srcReadLow, srcReadHigh, mask)) {
fd < 32 ? (dstReadLow = dstReadLow | mask) : (dstReadHigh = dstReadHigh | mask);
total++;
}
if ((flags & {{{ cDefs.POLLOUT }}}) && check(fd, srcWriteLow, srcWriteHigh, mask)) {
fd < 32 ? (dstWriteLow = dstWriteLow | mask) : (dstWriteHigh = dstWriteHigh | mask);
total++;
}
if ((flags & {{{ cDefs.POLLPRI }}}) && check(fd, srcExceptLow, srcExceptHigh, mask)) {
fd < 32 ? (dstExceptLow = dstExceptLow | mask) : (dstExceptHigh = dstExceptHigh | mask);
total++;
}
},
commit: () => {
if (readfds) {
{{{ makeSetValue('readfds', '0', 'dstReadLow', 'i32') }}};
{{{ makeSetValue('readfds', '4', 'dstReadHigh', 'i32') }}};
}
if (writefds) {
{{{ makeSetValue('writefds', '0', 'dstWriteLow', 'i32') }}};
{{{ makeSetValue('writefds', '4', 'dstWriteHigh', 'i32') }}};
}
if (exceptfds) {
{{{ makeSetValue('exceptfds', '0', 'dstExceptLow', 'i32') }}};
{{{ makeSetValue('exceptfds', '4', 'dstExceptHigh', 'i32') }}};
}
}
};
},
},

$syscallGetVarargI__internal: true,
Expand Down Expand Up @@ -542,31 +607,46 @@ var SyscallsLibrary = {
FS.chdir(stream.path);
return 0;
},
__syscall__newselect__deps: ['$newselectInner','malloc','free'],
__syscall__newselect__proxy: 'none',
__syscall__newselect: (nfds, readfds, writefds, exceptfds, timeout) => {
#if PROXY_TO_PTHREAD
var waitPtr = _malloc(8);
var result = newselectInner(nfds, readfds, writefds, exceptfds, timeout, waitPtr);
if ((result != 0) || ((timeout) && (SYSCALLS.getTimeoutInMillis(timeout) == 0))) {
_free(waitPtr);
return result;
}
var fdRegion = {{{ makeGetValue('waitPtr', 0, '*') }}};
Atomics.wait(HEAP32 , fdRegion >> 2, -1);
var fd = Atomics.load(HEAP32 , fdRegion >> 2);
var flags = Atomics.load(HEAP32 , fdRegion >> 2 + 1);
_free(waitPtr);
if (fd < 0) return 0;
var fdSet = SYSCALLS.parseSelectFDSet(readfds, writefds, exceptfds);
fdSet.setFlags(fd, flags);
fdSet.commit();
return fdSet.getTotal();
#else
return newselectInner(nfds, readfds, writefds, exceptfds, timeout, -1);
#endif
},
#if PROXY_TO_PTHREAD
$newselectInner__deps: ['$PThread', '$deactivateSelectCallbacks', '$getActiveSelectCallbacks', '$activateSelectCallback', '$isActiveSelectCallback'],
#endif
$newselectInner__proxy: 'sync',
$newselectInner: (nfds, readfds, writefds, exceptfds, timeout, waitPtr) => {
// readfds are supported,
// writefds checks socket open status
// exceptfds are supported, although on web, such exceptional conditions never arise in web sockets
// and so the exceptfds list will always return empty.
// timeout is supported, although on SOCKFS and PIPEFS these are ignored and always treated as 0 - fully async
// timeout is supported, although on SOCKFS these are ignored and always treated as 0 - fully async
// and PIPEFS supports timeout only when PROXY_TO_PTHREAD is enabled.
#if ASSERTIONS
assert(nfds <= 64, 'nfds must be less than or equal to 64'); // fd sets have 64 bits // TODO: this could be 1024 based on current musl headers
#endif

var total = 0;

var srcReadLow = (readfds ? {{{ makeGetValue('readfds', 0, 'i32') }}} : 0),
srcReadHigh = (readfds ? {{{ makeGetValue('readfds', 4, 'i32') }}} : 0);
var srcWriteLow = (writefds ? {{{ makeGetValue('writefds', 0, 'i32') }}} : 0),
srcWriteHigh = (writefds ? {{{ makeGetValue('writefds', 4, 'i32') }}} : 0);
var srcExceptLow = (exceptfds ? {{{ makeGetValue('exceptfds', 0, 'i32') }}} : 0),
srcExceptHigh = (exceptfds ? {{{ makeGetValue('exceptfds', 4, 'i32') }}} : 0);

var dstReadLow = 0,
dstReadHigh = 0;
var dstWriteLow = 0,
dstWriteHigh = 0;
var dstExceptLow = 0,
dstExceptHigh = 0;
var fdSet = SYSCALLS.parseSelectFDSet(readfds, writefds, exceptfds);

var allLow = (readfds ? {{{ makeGetValue('readfds', 0, 'i32') }}} : 0) |
(writefds ? {{{ makeGetValue('writefds', 0, 'i32') }}} : 0) |
Expand All @@ -577,6 +657,44 @@ var SyscallsLibrary = {

var check = (fd, low, high, val) => fd < 32 ? (low & val) : (high & val);

var timeoutInMillis = -1;
if (timeout) {
timeoutInMillis = SYSCALLS.getTimeoutInMillis(timeout);
}

#if PROXY_TO_PTHREAD
const pthread_ptr = PThread.currentProxiedOperationCallerThread;
deactivateSelectCallbacks(pthread_ptr); // deactivate all old callbacks
var makeNotifyCallback = (fd) => null;
var cleanupFuncs = [];
if (timeoutInMillis != 0) {
var info = getActiveSelectCallbacks(pthread_ptr);
{{{ makeSetValue('waitPtr', 0, 'info.buf', '*') }}};
Atomics.store(HEAP32, info.buf >> 2, -1); // Initialize the shared region
makeNotifyCallback = (fd) => {
var cb = (flags) => {
if (!isActiveSelectCallback(pthread_ptr, cb)) {
return; // This callback is no longer active.
}
deactivateSelectCallbacks(pthread_ptr); // Only the first event is notified.
cleanupFuncs.forEach(cb => cb());
Atomics.store(HEAP32, info.buf >> 2 + 1, flags);
Atomics.store(HEAP32, info.buf >> 2, fd);
Atomics.notify(HEAP32, info.buf >> 2);
}
cb.registerCleanupFunc = (f) => {
if (f != null) cleanupFuncs.push(f);
}
activateSelectCallback(pthread_ptr, cb);
return cb;
}
if (timeoutInMillis > 0) {
var cb = makeNotifyCallback(-2);
setTimeout(() => cb(0), timeoutInMillis);
}
}
#endif

for (var fd = 0; fd < nfds; fd++) {
var mask = 1 << (fd % 32);
if (!(check(fd, allLow, allHigh, mask))) {
Expand All @@ -588,48 +706,29 @@ var SyscallsLibrary = {
var flags = SYSCALLS.DEFAULT_POLLMASK;

if (stream.stream_ops.poll) {
var timeoutInMillis = -1;
if (timeout) {
// select(2) is declared to accept "struct timeval { time_t tv_sec; suseconds_t tv_usec; }".
// However, musl passes the two values to the syscall as an array of long values.
// Note that sizeof(time_t) != sizeof(long) in wasm32. The former is 8, while the latter is 4.
// This means using "C_STRUCTS.timeval.tv_usec" leads to a wrong offset.
// So, instead, we use POINTER_SIZE.
var tv_sec = (readfds ? {{{ makeGetValue('timeout', 0, 'i32') }}} : 0),
tv_usec = (readfds ? {{{ makeGetValue('timeout', POINTER_SIZE, 'i32') }}} : 0);
timeoutInMillis = (tv_sec + tv_usec / 1000000) * 1000;
}
flags = stream.stream_ops.poll(stream, timeoutInMillis);
#if PROXY_TO_PTHREAD
flags = stream.stream_ops.poll(stream, timeoutInMillis, makeNotifyCallback(fd));
#else
flags = stream.stream_ops.poll(stream, ((timeoutInMillis < 0) || readfds) ? timeoutInMillis : 0);
#endif
}

if ((flags & {{{ cDefs.POLLIN }}}) && check(fd, srcReadLow, srcReadHigh, mask)) {
fd < 32 ? (dstReadLow = dstReadLow | mask) : (dstReadHigh = dstReadHigh | mask);
total++;
}
if ((flags & {{{ cDefs.POLLOUT }}}) && check(fd, srcWriteLow, srcWriteHigh, mask)) {
fd < 32 ? (dstWriteLow = dstWriteLow | mask) : (dstWriteHigh = dstWriteHigh | mask);
total++;
}
if ((flags & {{{ cDefs.POLLPRI }}}) && check(fd, srcExceptLow, srcExceptHigh, mask)) {
fd < 32 ? (dstExceptLow = dstExceptLow | mask) : (dstExceptHigh = dstExceptHigh | mask);
total++;
}
fdSet.setFlags(fd, flags);
}

if (readfds) {
{{{ makeSetValue('readfds', '0', 'dstReadLow', 'i32') }}};
{{{ makeSetValue('readfds', '4', 'dstReadHigh', 'i32') }}};
}
if (writefds) {
{{{ makeSetValue('writefds', '0', 'dstWriteLow', 'i32') }}};
{{{ makeSetValue('writefds', '4', 'dstWriteHigh', 'i32') }}};
}
if (exceptfds) {
{{{ makeSetValue('exceptfds', '0', 'dstExceptLow', 'i32') }}};
{{{ makeSetValue('exceptfds', '4', 'dstExceptHigh', 'i32') }}};

#if PROXY_TO_PTHREAD
if ((fdSet.getTotal() > 0) || (timeoutInMillis == 0) ) {
fdSet.commit(fd, flags);
// No wait will happen in the caller. Deactivate all callbacks.
deactivateSelectCallbacks(pthread_ptr);
cleanupFuncs.forEach(f => f());
}
#else
fdSet.commit(fd, flags);
#endif

return total;
return fdSet.getTotal();
},
_msync_js__i53abi: true,
_msync_js: (addr, len, prot, flags, fd, offset) => {
Expand Down
Loading