fix: handling consistent cancellation across stream and locks (#699)
fenos authored Jan 18, 2025
1 parent b1c07bc commit 42c6267
Showing 8 changed files with 188 additions and 36 deletions.
6 changes: 6 additions & 0 deletions .changeset/calm-adults-roll.md
@@ -0,0 +1,6 @@
---
"@tus/server": patch
"@tus/utils": patch
---

Consistent cancellation across streams and locks, fixing an issue where the lock on a file was never released when the request ended prematurely.
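
For custom locker authors, the interface change in this commit is that `Lock.lock()` now receives the request's stop signal before the release callback (see the MemoryLocker diff below). The following is a minimal sketch of a locker adapted to the new signature; the `SimpleLocker`/`SimpleLock` names and the naive acquisition logic are illustrative only, and it assumes `Lock`, `Locker`, and `RequestRelease` are exported from `@tus/utils` as in this repository.

import type {Lock, Locker, RequestRelease} from '@tus/utils'

// Illustrative locker adapted to the new two-argument lock() signature.
class SimpleLock implements Lock {
  constructor(
    private id: string,
    private locks: Map<string, RequestRelease>
  ) {}

  async lock(stopSignal: AbortSignal, requestRelease: RequestRelease): Promise<void> {
    // Give up right away if the request was already cancelled.
    if (stopSignal.aborted) {
      throw new Error('lock acquisition aborted')
    }
    // Naive acquisition: fail immediately if the resource is already held.
    if (this.locks.has(this.id)) {
      throw new Error(`resource ${this.id} is already locked`)
    }
    this.locks.set(this.id, requestRelease)
  }

  async unlock(): Promise<void> {
    this.locks.delete(this.id)
  }
}

class SimpleLocker implements Locker {
  private locks = new Map<string, RequestRelease>()

  newLock(id: string): Lock {
    return new SimpleLock(id, this.locks)
  }
}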
34 changes: 19 additions & 15 deletions packages/server/src/handlers/BaseHandler.ts
@@ -1,6 +1,6 @@
import EventEmitter from 'node:events'
import stream from 'node:stream/promises'
import {addAbortSignal, PassThrough} from 'node:stream'
import {PassThrough, Readable} from 'node:stream'
import type http from 'node:http'

import type {ServerOptions} from '../types'
@@ -121,15 +121,15 @@ export class BaseHandler extends EventEmitter {

const lock = locker.newLock(id)

await lock.lock(() => {
await lock.lock(context.signal, () => {
context.cancel()
})

return lock
}

protected writeToStore(
req: http.IncomingMessage,
data: Readable,
upload: Upload,
maxFileSize: number,
context: CancellationContext
@@ -145,16 +145,25 @@
// Create a PassThrough stream as a proxy to manage the request stream.
// This allows for aborting the write process without affecting the incoming request stream.
const proxy = new PassThrough()
addAbortSignal(context.signal, proxy)

// gracefully terminate the proxy stream when the request is aborted
const onAbort = () => {
data.unpipe(proxy)

if (!proxy.closed) {
proxy.end()
}
}
context.signal.addEventListener('abort', onAbort, {once: true})

proxy.on('error', (err) => {
req.unpipe(proxy)
data.unpipe(proxy)
reject(err.name === 'AbortError' ? ERRORS.ABORTED : err)
})

const postReceive = throttle(
(offset: number) => {
this.emit(EVENTS.POST_RECEIVE_V2, req, {...upload, offset})
this.emit(EVENTS.POST_RECEIVE_V2, data, {...upload, offset})
},
this.options.postReceiveInterval,
{leading: false}
@@ -166,23 +175,18 @@
postReceive(tempOffset)
})

req.on('error', () => {
if (!proxy.closed) {
// we end the stream gracefully here so that we can upload the remaining bytes to the store
// as an incompletePart
proxy.end()
}
})

// Pipe the request stream through the proxy. We use the proxy instead of the request stream directly
// to ensure that errors in the pipeline do not cause the request stream to be destroyed,
// which would result in a socket hangup error for the client.
stream
.pipeline(req.pipe(proxy), new StreamLimiter(maxFileSize), async (stream) => {
.pipeline(data.pipe(proxy), new StreamLimiter(maxFileSize), async (stream) => {
return this.store.write(stream as StreamLimiter, upload.id, upload.offset)
})
.then(resolve)
.catch(reject)
.finally(() => {
context.signal.removeEventListener('abort', onAbort)
})
})
}
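
The net effect of the BaseHandler change above: instead of `addAbortSignal`, which destroys the proxy (and therefore the pipeline) when the signal fires, an explicit 'abort' listener unpipes the source and ends the proxy gracefully, so already-buffered bytes can still reach the store. Below is a standalone sketch of that pattern, with hypothetical names (`writeWithGracefulAbort`, `write`) standing in for the handler's internals.

import {PassThrough, Readable} from 'node:stream'

// Sketch of the graceful-abort pattern: end the proxy instead of destroying it.
async function writeWithGracefulAbort(
  data: Readable,
  signal: AbortSignal,
  write: (body: Readable) => Promise<number>
): Promise<number> {
  const proxy = new PassThrough()

  // On abort, detach the source and end the proxy so buffered bytes can
  // still be flushed to the destination instead of being discarded.
  const onAbort = () => {
    data.unpipe(proxy)
    if (!proxy.closed) {
      proxy.end()
    }
  }
  signal.addEventListener('abort', onAbort, {once: true})

  try {
    return await write(data.pipe(proxy))
  } finally {
    signal.removeEventListener('abort', onAbort)
  }
}

By contrast, `addAbortSignal(signal, stream)` destroys the stream with an `AbortError` on abort, which is what previously dropped in-flight bytes and pushed cleanup onto the error path.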

15 changes: 9 additions & 6 deletions packages/server/src/lockers/MemoryLocker.ts
@@ -49,11 +49,14 @@ class MemoryLock implements Lock {
private timeout: number = 1000 * 30
) {}

async lock(requestRelease: RequestRelease): Promise<void> {
async lock(stopSignal: AbortSignal, requestRelease: RequestRelease): Promise<void> {
const abortController = new AbortController()

const abortSignal = AbortSignal.any([stopSignal, abortController.signal])

const lock = await Promise.race([
this.waitTimeout(abortController.signal),
this.acquireLock(this.id, requestRelease, abortController.signal),
this.waitTimeout(abortSignal),
this.acquireLock(this.id, requestRelease, abortSignal),
])

abortController.abort()
@@ -68,12 +71,12 @@
requestRelease: RequestRelease,
signal: AbortSignal
): Promise<boolean> {
const lock = this.locker.locks.get(id)

if (signal.aborted) {
return false
return typeof lock !== 'undefined'
}

const lock = this.locker.locks.get(id)

if (!lock) {
const lock = {
requestRelease,
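
MemoryLocker's acquisition now races against a combined signal: `AbortSignal.any([stopSignal, abortController.signal])` aborts as soon as either the request is cancelled or the locker's own controller fires. A small self-contained sketch of that combination follows (it requires a runtime with `AbortSignal.any`, i.e. Node 20+; the retry helper is illustrative, not the locker's actual code).

// Illustrative retry loop that stops when either the caller's signal
// or a local timeout aborts. Not the MemoryLocker implementation.
async function acquireWithRetry(
  tryAcquire: () => boolean,
  stopSignal: AbortSignal,
  timeoutMs = 30_000
): Promise<boolean> {
  const local = new AbortController()
  const timer = setTimeout(() => local.abort(), timeoutMs)
  // AbortSignal.any aborts as soon as any of its input signals abort.
  const combined = AbortSignal.any([stopSignal, local.signal])

  try {
    while (!combined.aborted) {
      if (tryAcquire()) return true
      // Small back-off between attempts.
      await new Promise((resolve) => setTimeout(resolve, 10))
    }
    return false
  } finally {
    clearTimeout(timer)
    local.abort()
  }
}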
16 changes: 16 additions & 0 deletions packages/server/src/server.ts
@@ -151,6 +151,11 @@ export class Server extends EventEmitter {
): Promise<http.ServerResponse | stream.Writable | void> {
const context = this.createContext(req)

// Once the request is closed, we abort the context to clean up the underlying resources
req.on('close', () => {
context.abort()
})

log(`[TusServer] handle: ${req.method} ${req.url}`)
// Allow overriding the HTTP method. The reason for this is
// that some libraries/environments to not support PATCH and
@@ -289,6 +294,17 @@

res.writeHead(status, headers)
res.write(body)

// Abort the context once the response is sent.
// Useful for clean-up when the server uses keep-alive
if (!isAborted) {
res.on('finish', () => {
if (!req.closed) {
context.abort()
}
})
}

return res.end()
}
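
Together, the two server.ts additions tie the cancellation context to the full request/response lifecycle: abort on the request's 'close' event for premature disconnects, and abort on the response's 'finish' event so keep-alive connections do not leave the context (and its lock) dangling after a response completes. A minimal sketch of that wiring in a plain Node server follows; `createContext` here is a hypothetical stand-in for the server's context factory.

import http from 'node:http'

// Hypothetical stand-in for the server's per-request cancellation context.
function createContext() {
  const controller = new AbortController()
  return {signal: controller.signal, abort: () => controller.abort()}
}

http
  .createServer((req, res) => {
    const context = createContext()

    // Abort when the client disconnects before the request is finished.
    req.on('close', () => context.abort())

    // With keep-alive the socket stays open between requests, so also
    // abort once this response has been fully sent.
    res.on('finish', () => {
      if (!req.closed) {
        context.abort()
      }
    })

    res.end('ok')
  })
  .listen(0)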

51 changes: 45 additions & 6 deletions packages/server/test/Locker.test.ts
@@ -6,19 +6,20 @@ describe('MemoryLocker', () => {
it('will acquire a lock by notifying another to release it', async () => {
const locker = new MemoryLocker()
const lockId = 'upload-id-1'
const abortController = new AbortController()

const cancel = sinon.spy()
const cancel2 = sinon.spy()

const lock1 = locker.newLock(lockId)
const lock2 = locker.newLock(lockId)

await lock1.lock(async () => {
await lock1.lock(abortController.signal, async () => {
await lock1.unlock()
cancel()
})

await lock2.lock(async () => {
await lock2.lock(abortController.signal, async () => {
cancel2()
})

@@ -32,19 +33,21 @@
const locker = new MemoryLocker({
acquireLockTimeout: 500,
})
const abortController = new AbortController()

const lockId = 'upload-id-1'
const lock = locker.newLock(lockId)

const cancel = sinon.spy()

await lock.lock(async () => {
await lock.lock(abortController.signal, async () => {
cancel()
// We note that the function has been called, but do not
// release the lock
})

try {
await lock.lock(async () => {
await lock.lock(abortController.signal, async () => {
throw new Error('panic should not be called')
})
} catch (e) {
@@ -57,18 +60,20 @@
it('request lock and unlock', async () => {
const locker = new MemoryLocker()
const lockId = 'upload-id-1'
const abortController = new AbortController()

const lock = locker.newLock(lockId)
const lock2 = locker.newLock(lockId)

const cancel = sinon.spy()
await lock.lock(() => {
await lock.lock(abortController.signal, () => {
cancel()
setTimeout(async () => {
await lock.unlock()
}, 50)
})

await lock2.lock(() => {
await lock2.lock(abortController.signal, () => {
throw new Error('should not be called')
})

@@ -79,4 +84,38 @@
`requestRelease called more times than expected - ${cancel.callCount}`
)
})

it('will stop trying to acquire the lock if the abort signal is aborted', async () => {
const locker = new MemoryLocker()
const lockId = 'upload-id-1'
const abortController = new AbortController()

const cancel = sinon.spy()
const cancel2 = sinon.spy()

const lock1 = locker.newLock(lockId)
const lock2 = locker.newLock(lockId)

await lock1.lock(abortController.signal, async () => {
// do not unlock when requested
cancel()
})

// Abort signal is aborted after lock2 tries to acquire the lock
setTimeout(() => {
abortController.abort()
}, 100)

try {
await lock2.lock(abortController.signal, async () => {
cancel2()
})
assert(false, 'lock2 should not have been acquired')
} catch (e) {
assert(e === ERRORS.ERR_LOCK_TIMEOUT, `error returned is not correct ${e}`)
}

assert(cancel.callCount > 1, `expected cancel to be called more than once, got ${cancel.callCount}`)
assert(cancel2.callCount === 0, `expected cancel2 not to be called, got ${cancel2.callCount}`)
})
})
67 changes: 66 additions & 1 deletion packages/server/test/PatchHandler.test.ts
@@ -12,7 +12,7 @@ import {EventEmitter} from 'node:events'
import {addPipableStreamBody} from './utils'
import {MemoryLocker} from '../src'
import streamP from 'node:stream/promises'
import stream from 'node:stream'
import stream, {PassThrough} from 'node:stream'

describe('PatchHandler', () => {
const path = '/test/output'
@@ -245,4 +245,69 @@
assert.equal(context.signal.aborted, true)
}
})

it('should gracefully terminate request stream when context is cancelled', async () => {
handler = new PatchHandler(store, {path, locker: new MemoryLocker()})

const bodyStream = new PassThrough() // the request body is written into this stream below
const req = addPipableStreamBody(
httpMocks.createRequest({
method: 'PATCH',
url: `${path}/1234`,
body: bodyStream,
})
)

const abortController = new AbortController()
context = {
cancel: () => abortController.abort(),
abort: () => abortController.abort(),
signal: abortController.signal,
}

const res = httpMocks.createResponse({req})
req.headers = {
'upload-offset': '0',
'content-type': 'application/offset+octet-stream',
}
req.url = `${path}/file`

let accumulatedBuffer: Buffer = Buffer.alloc(0)

store.getUpload.resolves(new Upload({id: '1234', offset: 0}))
store.write.callsFake(async (readable: http.IncomingMessage | stream.Readable) => {
const writeStream = new stream.PassThrough()
const chunks: Buffer[] = []

writeStream.on('data', (chunk) => {
chunks.push(chunk) // Accumulate chunks in the outer buffer
})

await streamP.pipeline(readable, writeStream)

accumulatedBuffer = Buffer.concat([accumulatedBuffer, ...chunks])

return writeStream.readableLength
})
store.declareUploadLength.resolves()

await new Promise((resolve, reject) => {
handler.send(req, res, context).then(resolve).catch(reject)

// sends the first 20kb
bodyStream.write(Buffer.alloc(1024 * 20))

// write 15kb
bodyStream.write(Buffer.alloc(1024 * 15))

// simulate that the request was cancelled
setTimeout(() => {
context.abort()
}, 200)
})

// We expect that all the data was written to the store, 35kb
assert.equal(accumulatedBuffer.byteLength, 35 * 1024)
bodyStream.end()
})
})
33 changes: 26 additions & 7 deletions packages/server/test/utils.ts
@@ -1,5 +1,5 @@
import type httpMocks from 'node-mocks-http'
import stream from 'node:stream'
import stream, {Readable, Transform, TransformCallback} from 'node:stream'
import type http from 'node:http'

export function addPipableStreamBody<
@@ -8,21 +8,40 @@ export function addPipableStreamBody<
// Create a Readable stream that simulates the request body
const bodyStream = new stream.Duplex({
read() {
this.push(
mockRequest.body instanceof Buffer
? mockRequest.body
: JSON.stringify(mockRequest.body)
)
this.push(null)
// This function is intentionally left empty since the data flow
// is controlled by event listeners registered outside of this method.
},
})

// Handle cases where the body is a Readable stream
if (mockRequest.body instanceof Readable) {
// Pipe the mockRequest.body to the bodyStream
mockRequest.body.on('data', (chunk) => {
bodyStream.push(chunk) // Push the chunk to the bodyStream
})

mockRequest.body.on('end', () => {
bodyStream.push(null) // Signal the end of the stream
})
} else {
// Handle cases where the body is not a stream (e.g., Buffer or plain object)
const bodyBuffer =
mockRequest.body instanceof Buffer
? mockRequest.body
: Buffer.from(JSON.stringify(mockRequest.body))

// Push the bodyBuffer and signal the end of the stream
bodyStream.push(bodyBuffer)
bodyStream.push(null)
}

// Add the pipe method to the mockRequest
// @ts-ignore
mockRequest.pipe = (dest: stream.Writable) => bodyStream.pipe(dest)

// Add the unpipe method to the mockRequest
// @ts-ignore
mockRequest.unpipe = (dest: stream.Writable) => bodyStream.unpipe(dest)

return mockRequest
}