Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pre-terminate hook #1248

Merged
merged 4 commits into from
Mar 3, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/_advanced-topics/hooks.md
Original file line number Diff line number Diff line change
@@ -33,6 +33,7 @@ The table below provides an overview of all available hooks.
| post-receive | No | regularly while data is being transmitted. | logging upload progress, stopping running uploads | Yes |
| pre-finish | Yes | after all upload data has been received but before a response is sent. | sending custom data when an upload is finished | No |
| post-finish | No | after all upload data has been received and after a response is sent. | post-processing of upload, logging of upload end | Yes |
| pre-terminate | Yes | before an upload will be terminated. | checking if an upload should be deleted | No |
| post-terminate | No | after an upload has been terminated. | clean up of allocated resources | Yes |

Users should be aware of following things:
@@ -161,6 +162,13 @@ Below you can find an annotated, JSON-ish encoded example of a hook response:
// to the client.
"RejectUpload": false,

// RejectTermination will cause upload terminations via DELETE requests to be rejected,
// allowing the hook to control whether associated resources are deleted.
// This value is only respected for pre-terminate hooks. For other hooks,
// it is ignored. Use the HTTPResponse field to send details about the rejection
// to the client.
"RejectTermination": false,

// ChangeFileInfo can be set to change selected properties of an upload before
// it has been created.
// Changes are applied on a per-property basis, meaning that specifying just
7 changes: 7 additions & 0 deletions pkg/handler/config.go
Original file line number Diff line number Diff line change
@@ -75,6 +75,13 @@ type Config struct {
// If the error is non-nil, the error will be forwarded to the client. Furthermore,
// HTTPResponse will be ignored and the error value can contain values for the HTTP response.
PreFinishResponseCallback func(hook HookEvent) (HTTPResponse, error)
// PreUploadTerminateCallback will be invoked on DELETE requests before an upload is terminated,
// giving the application the opportunity to reject the termination. For example, to ensure resources
// used by other services are not deleted.
// If the callback returns no error, optional values from HTTPResponse will be contained in the HTTP response.
// If the error is non-nil, the error will be forwarded to the client. Furthermore,
// HTTPResponse will be ignored and the error value can contain values for the HTTP response.
PreUploadTerminateCallback func(hook HookEvent) (HTTPResponse, error)
// GracefulRequestCompletionTimeout is the timeout for operations to complete after an HTTP
// request has ended (successfully or by error). For example, if an HTTP request is interrupted,
// instead of stopping immediately, the handler and data store will be given some additional
68 changes: 68 additions & 0 deletions pkg/handler/terminate_test.go
Original file line number Diff line number Diff line change
@@ -54,9 +54,14 @@ func TestTerminate(t *testing.T) {
composer.UseTerminater(store)
composer.UseLocker(locker)

preTerminateCalled := false
handler, _ := NewHandler(Config{
StoreComposer: composer,
NotifyTerminatedUploads: true,
PreUploadTerminateCallback: func(hook HookEvent) (HTTPResponse, error) {
preTerminateCalled = true
return HTTPResponse{}, nil
},
})

c := make(chan HookEvent, 1)
@@ -81,6 +86,69 @@ func TestTerminate(t *testing.T) {
req := event.HTTPRequest
a.Equal("DELETE", req.Method)
a.Equal("foo", req.URI)

a.True(preTerminateCalled)
})

SubTest(t, "RejectTermination", func(t *testing.T, store *MockFullDataStore, _ *StoreComposer) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
locker := NewMockFullLocker(ctrl)
lock := NewMockFullLock(ctrl)
upload := NewMockFullUpload(ctrl)

gomock.InOrder(
locker.EXPECT().NewLock("foo").Return(lock, nil),
lock.EXPECT().Lock(gomock.Any(), gomock.Any()).Return(nil),
store.EXPECT().GetUpload(gomock.Any(), "foo").Return(upload, nil),
upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{
ID: "foo",
Size: 10,
}, nil),
lock.EXPECT().Unlock().Return(nil),
)

composer := NewStoreComposer()
composer.UseCore(store)
composer.UseTerminater(store)
composer.UseLocker(locker)

a := assert.New(t)

handler, _ := NewHandler(Config{
StoreComposer: composer,
NotifyTerminatedUploads: true,
PreUploadTerminateCallback: func(hook HookEvent) (HTTPResponse, error) {
a.Equal("foo", hook.Upload.ID)
a.Equal(int64(10), hook.Upload.Size)

req := hook.HTTPRequest
a.Equal("DELETE", req.Method)
a.Equal("foo", req.URI)

return HTTPResponse{}, ErrUploadTerminationRejected
},
})

c := make(chan HookEvent, 1)
handler.TerminatedUploads = c

(&httpTest{
Method: "DELETE",
URL: "foo",
ReqHeader: map[string]string{
"Tus-Resumable": "1.0.0",
},
Code: http.StatusBadRequest,
ResBody: "ERR_UPLOAD_TERMINATION_REJECTED: upload termination has been rejected by server\n",
}).Run(handler, t)

select {
case <-c:
a.Fail("Expected termination to be rejected")
default:
// Expected no event
}
})

SubTest(t, "NotProvided", func(t *testing.T, store *MockFullDataStore, _ *StoreComposer) {
20 changes: 16 additions & 4 deletions pkg/handler/unrouted_handler.go
Original file line number Diff line number Diff line change
@@ -60,6 +60,7 @@ var (
ErrInvalidUploadDeferLength = NewError("ERR_INVALID_UPLOAD_LENGTH_DEFER", "invalid Upload-Defer-Length header", http.StatusBadRequest)
ErrUploadStoppedByServer = NewError("ERR_UPLOAD_STOPPED", "upload has been stopped by server", http.StatusBadRequest)
ErrUploadRejectedByServer = NewError("ERR_UPLOAD_REJECTED", "upload creation has been rejected by server", http.StatusBadRequest)
ErrUploadTerminationRejected = NewError("ERR_UPLOAD_TERMINATION_REJECTED", "upload termination has been rejected by server", http.StatusBadRequest)
ErrUploadInterrupted = NewError("ERR_UPLOAD_INTERRUPTED", "upload has been interrupted by another request for this upload resource", http.StatusBadRequest)
ErrServerShutdown = NewError("ERR_SERVER_SHUTDOWN", "request has been interrupted because the server is shutting down", http.StatusServiceUnavailable)
ErrOriginNotAllowed = NewError("ERR_ORIGIN_NOT_ALLOWED", "request origin is not allowed", http.StatusForbidden)
@@ -1203,23 +1204,34 @@ func (handler *UnroutedHandler) DelFile(w http.ResponseWriter, r *http.Request)
}

var info FileInfo
if handler.config.NotifyTerminatedUploads {
if handler.config.NotifyTerminatedUploads || handler.config.PreUploadTerminateCallback != nil {
info, err = upload.GetInfo(c)
if err != nil {
handler.sendError(c, err)
return
}
}

resp := HTTPResponse{
StatusCode: http.StatusNoContent,
}

if handler.config.PreUploadTerminateCallback != nil {
resp2, err := handler.config.PreUploadTerminateCallback(newHookEvent(c, info))
if err != nil {
handler.sendError(c, err)
return
}
resp = resp.MergeWith(resp2)
}

err = handler.terminateUpload(c, upload, info)
if err != nil {
handler.sendError(c, err)
return
}

handler.sendResp(c, HTTPResponse{
StatusCode: http.StatusNoContent,
})
handler.sendResp(c, resp)
}

// terminateUpload passes a given upload to the DataStore's Terminater,
37 changes: 36 additions & 1 deletion pkg/hooks/hooks.go
Original file line number Diff line number Diff line change
@@ -68,6 +68,12 @@ type HookResponse struct {
// to the client.
RejectUpload bool

// RejectTermination will cause the termination of the upload to be rejected, keeping the upload.
// This value is only respected for pre-terminate hooks. For other hooks,
// it is ignored. Use the HTTPResponse field to send details about the rejection
// to the client.
RejectTermination bool

// ChangeFileInfo can be set to change selected properties of an upload before
// it has been created. See the handler.FileInfoChanges type for more details.
// Changes are applied on a per-property basis, meaning that specifying just
@@ -91,10 +97,11 @@ const (
HookPostCreate HookType = "post-create"
HookPreCreate HookType = "pre-create"
HookPreFinish HookType = "pre-finish"
HookPreTerminate HookType = "pre-terminate"
)

// AvailableHooks is a slice of all hooks that are implemented by tusd.
var AvailableHooks []HookType = []HookType{HookPreCreate, HookPostCreate, HookPostReceive, HookPostTerminate, HookPostFinish, HookPreFinish}
var AvailableHooks []HookType = []HookType{HookPreCreate, HookPostCreate, HookPostReceive, HookPreTerminate, HookPostTerminate, HookPostFinish, HookPreFinish}

func preCreateCallback(event handler.HookEvent, hookHandler HookHandler) (handler.HTTPResponse, handler.FileInfoChanges, error) {
ok, hookRes, err := invokeHookSync(HookPreCreate, event, hookHandler)
@@ -128,6 +135,26 @@ func preFinishCallback(event handler.HookEvent, hookHandler HookHandler) (handle
return httpRes, nil
}

func preTerminateCallback(event handler.HookEvent, hookHandler HookHandler) (handler.HTTPResponse, error) {
ok, hookRes, err := invokeHookSync(HookPreTerminate, event, hookHandler)
if !ok || err != nil {
return handler.HTTPResponse{}, err
}

httpRes := hookRes.HTTPResponse

// If the hook response includes the instruction to reject the termination, reuse the error code
// and message from ErrUploadTerminationRejected, but also include custom HTTP response values.
if hookRes.RejectTermination {
err := handler.ErrUploadTerminationRejected
err.HTTPResponse = err.HTTPResponse.MergeWith(httpRes)

return handler.HTTPResponse{}, err
}

return httpRes, nil
}

func postReceiveCallback(event handler.HookEvent, hookHandler HookHandler) {
ok, hookRes, _ := invokeHookSync(HookPostReceive, event, hookHandler)
// invokeHookSync already logs the error, if any occurs. So by checking `ok`, we can ensure
@@ -166,12 +193,14 @@ func SetupHookMetrics() {
MetricsHookErrorsTotal.WithLabelValues(string(HookPostCreate)).Add(0)
MetricsHookErrorsTotal.WithLabelValues(string(HookPreCreate)).Add(0)
MetricsHookErrorsTotal.WithLabelValues(string(HookPreFinish)).Add(0)
MetricsHookErrorsTotal.WithLabelValues(string(HookPreTerminate)).Add(0)
MetricsHookInvocationsTotal.WithLabelValues(string(HookPostFinish)).Add(0)
MetricsHookInvocationsTotal.WithLabelValues(string(HookPostTerminate)).Add(0)
MetricsHookInvocationsTotal.WithLabelValues(string(HookPostReceive)).Add(0)
MetricsHookInvocationsTotal.WithLabelValues(string(HookPostCreate)).Add(0)
MetricsHookInvocationsTotal.WithLabelValues(string(HookPreCreate)).Add(0)
MetricsHookInvocationsTotal.WithLabelValues(string(HookPreFinish)).Add(0)
MetricsHookInvocationsTotal.WithLabelValues(string(HookPreTerminate)).Add(0)
}

func invokeHookAsync(typ HookType, event handler.HookEvent, hookHandler HookHandler) {
@@ -248,6 +277,12 @@ func NewHandlerWithHooks(config *handler.Config, hookHandler HookHandler, enable
}
}

if slices.Contains(enabledHooks, HookPreTerminate) {
config.PreUploadTerminateCallback = func(event handler.HookEvent) (handler.HTTPResponse, error) {
return preTerminateCallback(event, hookHandler)
}
}

// Create handler
handler, err := handler.NewHandler(*config)
if err != nil {
35 changes: 34 additions & 1 deletion pkg/hooks/hooks_test.go
Original file line number Diff line number Diff line change
@@ -89,6 +89,19 @@ func TestNewHandlerWithHooks(t *testing.T) {
Type: HookPreFinish,
Event: event,
}).Return(HookResponse{}, error),
hookHandler.EXPECT().InvokeHook(HookRequest{
Type: HookPreTerminate,
Event: event,
}).Return(HookResponse{
HTTPResponse: response,
}, nil),
hookHandler.EXPECT().InvokeHook(HookRequest{
Type: HookPreTerminate,
Event: event,
}).Return(HookResponse{
HTTPResponse: response,
RejectTermination: true,
}, nil),
)

// The hooks are executed asynchronously, so we don't know their execution order.
@@ -112,7 +125,7 @@ func TestNewHandlerWithHooks(t *testing.T) {
Event: event,
})

uploadHandler, err := NewHandlerWithHooks(&config, hookHandler, []HookType{HookPreCreate, HookPostCreate, HookPostReceive, HookPostTerminate, HookPostFinish, HookPreFinish})
uploadHandler, err := NewHandlerWithHooks(&config, hookHandler, []HookType{HookPreCreate, HookPostCreate, HookPostReceive, HookPostTerminate, HookPostFinish, HookPreFinish, HookPreTerminate})
a.NoError(err)

// Successful pre-create hook
@@ -148,6 +161,26 @@ func TestNewHandlerWithHooks(t *testing.T) {
a.Equal(error, err)
a.Equal(handler.HTTPResponse{}, resp_got)

// Successful pre-terminate hook
resp_got, err = config.PreUploadTerminateCallback(event)
a.NoError(err)
a.Equal(response, resp_got)

// Pre-terminate hook with rejection
resp_got, err = config.PreUploadTerminateCallback(event)
a.Equal(handler.Error{
ErrorCode: handler.ErrUploadTerminationRejected.ErrorCode,
Message: handler.ErrUploadTerminationRejected.Message,
HTTPResponse: handler.HTTPResponse{
StatusCode: 200,
Body: "foobar",
Header: handler.HTTPHeader{
"X-Hello": "here",
"Content-Type": "text/plain; charset=utf-8",
},
},
}, err)

// Successful post-* hooks
uploadHandler.CreatedUploads <- event
uploadHandler.UploadProgress <- event