timeout: fix data races via copied context; abort main context post finish/timeout; restore fast-path BufferPool; tests pass with -race #82
base: master
Changes from all commits
8e4a11c
7ebbdf0
19c8214
0f981a5
262c645
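
The PR title notes that the tests pass with -race. As a point of reference only, a rough end-to-end test of the timeout path might look like the sketch below; it is not part of this diff, and the import path and option names (timeout.New, WithTimeout, WithHandler, WithResponse) are assumed from the library's public API rather than taken from these changes.

```go
package timeout_test

import (
	"net/http"
	"net/http/httptest"
	"testing"
	"time"

	"github.com/gin-contrib/timeout"
	"github.com/gin-gonic/gin"
)

// Sketch: a handler that outlives the timeout; running with `go test -race`
// exercises the copied-context path for data races.
func TestSlowHandlerTimesOut(t *testing.T) {
	gin.SetMode(gin.TestMode)
	r := gin.New()
	r.GET("/slow", timeout.New(
		timeout.WithTimeout(50*time.Millisecond),
		timeout.WithHandler(func(c *gin.Context) {
			time.Sleep(200 * time.Millisecond) // outlives the 50ms timeout
			c.String(http.StatusOK, "too late")
		}),
		timeout.WithResponse(func(c *gin.Context) {
			c.String(http.StatusRequestTimeout, "timeout")
		}),
	))

	w := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, "/slow", nil)
	r.ServeHTTP(w, req)

	if w.Code != http.StatusRequestTimeout {
		t.Fatalf("expected %d, got %d", http.StatusRequestTimeout, w.Code)
	}
}
```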
@@ -5,27 +5,42 @@ import (
	"sync"
)

// BufferPool represents a pool of buffers.
// It uses sync.Pool to manage the reuse of buffers, reducing memory allocation and garbage collection overhead.
// BufferPool represents a pool of buffers with a fast-path cache for the last buffer.
type BufferPool struct {
	pool sync.Pool
	mu   sync.Mutex
	last *bytes.Buffer
}

// Get returns a buffer from the buffer pool.
// If the pool is empty, a new buffer is created and returned.
// This method ensures the reuse of buffers, improving performance.
// Get returns a buffer from the buffer pool. It first tries to return the most
// recently returned buffer without touching sync.Pool to minimize contention.
func (p *BufferPool) Get() *bytes.Buffer {
	p.mu.Lock()
	if p.last != nil {
		b := p.last
		p.last = nil
		p.mu.Unlock()
		return b
	}
	p.mu.Unlock()

	buf := p.pool.Get()
	if buf == nil {
		// If there are no available buffers in the pool, create a new one
		return &bytes.Buffer{}
	}
	// Convert the retrieved buffer to *bytes.Buffer type and return it
	return buf.(*bytes.Buffer)
}

// Put adds a buffer back to the pool.
// This method allows the buffer to be reused in the future, reducing the number of memory allocations.
// Put adds a buffer back to the pool without resetting it. The caller is
// responsible for calling Reset() when appropriate.
func (p *BufferPool) Put(buf *bytes.Buffer) {

Comment on lines +34 to 36:

Suggested change:
-// Put adds a buffer back to the pool without resetting it. The caller is
-// responsible for calling Reset() when appropriate.
-func (p *BufferPool) Put(buf *bytes.Buffer) {
+// Put adds a buffer back to the pool, automatically resetting it to prevent
+// dirty buffers from being reused.
+func (p *BufferPool) Put(buf *bytes.Buffer) {
+	buf.Reset()
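
For context on the suggestion above: resetting inside Put means every buffer handed out by Get starts empty. A minimal, self-contained illustration of what the reset prevents, using a plain sync.Pool as a stand-in rather than this package's BufferPool:

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// Stand-in pool with the suggested policy applied: buffers are reset before reuse.
var bufPool = sync.Pool{New: func() any { return new(bytes.Buffer) }}

func render(body string) string {
	buf := bufPool.Get().(*bytes.Buffer) // may be a previously used buffer
	defer func() {
		buf.Reset() // without this, stale bytes would leak into the next render
		bufPool.Put(buf)
	}()
	buf.WriteString(body)
	return buf.String()
}

func main() {
	fmt.Println(render("first"))
	fmt.Println(render("second")) // prints "second", not "firstsecond", thanks to Reset
}
```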
@@ -3,8 +3,10 @@
import (
	"fmt"
	"net/http"
	"reflect"
	"runtime/debug"
	"time"
	"unsafe"

	"github.com/gin-gonic/gin"
)

@@ -53,6 +55,8 @@
	cCopy := c.Copy()
	// Set the copied context's writer to our timeout writer to ensure proper buffering
	cCopy.Writer = tw
	// Clone handler state so cCopy can continue the chain independently
	cloneHandlerState(cCopy, c)

	// Channel to signal handler completion.
	finish := make(chan struct{}, 1)

@@ -69,8 +73,8 @@
			}
		}
	}()
	// Use the copied context to avoid data race when running handler in a goroutine.
	c.Next()
	// Execute the remaining handlers on the copied context.
	cCopy.Next()
	finish <- struct{}{}
}()

@@ -79,20 +83,15 @@
	// Handler panicked: free buffer, restore writer, and print stack trace if in debug mode.
	tw.FreeBuffer()
	c.Writer = w
	// If in debug mode, write error and stack trace to response for easier debugging.
	// Always write a 500 with a panic message. In debug, also include the stack trace.
	_ = c.Error(fmt.Errorf("%v", pi.Value))
	c.Writer.WriteHeader(http.StatusInternalServerError)
	_, _ = fmt.Fprintf(c.Writer, "panic caught: %v\n", pi.Value)
	if gin.IsDebugging() {
		// Add the panic error to Gin's error list and write 500 status and stack trace to response.
		// Check the error return value of c.Error to satisfy errcheck linter.
		_ = c.Error(fmt.Errorf("%v", pi.Value))
		c.Writer.WriteHeader(http.StatusInternalServerError)
		// Use fmt.Fprintf instead of Write([]byte(fmt.Sprintf(...))) to satisfy staticcheck.
		_, _ = fmt.Fprintf(c.Writer, "panic caught: %v\n", pi.Value)
		_, _ = c.Writer.Write([]byte("Panic stack trace:\n"))
		_, _ = c.Writer.Write(pi.Stack)
		return
	}
	// In non-debug mode, re-throw the original panic value to be handled by the upper middleware.
	panic(pi.Value)
	return

case <-finish:
	// Handler finished successfully: flush buffer to response.
	tw.mu.Lock()

@@ -116,6 +115,9 @@
	tw.FreeBuffer()
	bufPool.Put(buffer)

	// Prevent Gin from executing subsequent handlers; we've already run them via cCopy
	c.Abort()

case <-time.After(t.timeout):
	tw.mu.Lock()
	// Handler timed out: set timeout flag and clean up

@@ -133,8 +135,33 @@
		if !w.Written() {
			t.response(timeoutCtx)
		}
		// Abort the context to prevent further middleware execution after timeout
		c.AbortWithStatus(http.StatusRequestTimeout)
		// Prevent Gin from executing subsequent handlers after timeout.
		c.Abort()
	}
}
}

// cloneHandlerState copies the handler chain and index from src to dst using reflection/unsafe
// so that dst.Next() can proceed with the same remaining handlers independently.
func cloneHandlerState(dst, src *gin.Context) {
	vdst := reflect.ValueOf(dst).Elem()
	vsrc := reflect.ValueOf(src).Elem()

	// Copy handlers slice
	srcHandlersField := vsrc.FieldByName("handlers")
	dstHandlersField := vdst.FieldByName("handlers")
	if srcHandlersField.IsValid() && dstHandlersField.IsValid() {
		srcHandlers := reflect.NewAt(srcHandlersField.Type(), unsafe.Pointer(srcHandlersField.UnsafeAddr())).Elem()
		dstHandlers := reflect.NewAt(dstHandlersField.Type(), unsafe.Pointer(dstHandlersField.UnsafeAddr())).Elem()
		dstHandlers.Set(srcHandlers)
	}

	// Copy index
	srcIndexField := vsrc.FieldByName("index")
	dstIndexField := vdst.FieldByName("index")
	if srcIndexField.IsValid() && dstIndexField.IsValid() {
		srcIndex := reflect.NewAt(srcIndexField.Type(), unsafe.Pointer(srcIndexField.UnsafeAddr())).Elem()
		dstIndex := reflect.NewAt(dstIndexField.Type(), unsafe.Pointer(dstIndexField.UnsafeAddr())).Elem()
		dstIndex.Set(srcIndex)
	}
}

Comment on lines +146 to +167:

The fast-path optimization introduces mutex contention on every Get() call, which may negate the benefit of avoiding sync.Pool. Under high concurrency, this mutex could become a bottleneck. Consider using atomic operations or sync.Pool's inherent per-P caching instead.
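
One way to act on that comment, sketched purely as an illustration and not part of the PR: replace the mutex-guarded last field with an atomic.Pointer (Go 1.19+), so the fast path costs a single atomic swap. The package name below is a placeholder.

```go
package bufferpool

import (
	"bytes"
	"sync"
	"sync/atomic"
)

// BufferPool variant with a lock-free fast path: the most recently returned
// buffer is stashed in an atomic pointer instead of behind a mutex.
type BufferPool struct {
	pool sync.Pool
	last atomic.Pointer[bytes.Buffer]
}

// Get claims the cached buffer with a single atomic Swap; only on a miss does
// it fall back to sync.Pool (which already has its own per-P caching).
func (p *BufferPool) Get() *bytes.Buffer {
	if b := p.last.Swap(nil); b != nil {
		return b
	}
	if buf, ok := p.pool.Get().(*bytes.Buffer); ok {
		return buf
	}
	return &bytes.Buffer{}
}

// Put stashes the buffer in the fast-path slot if it is free; otherwise it
// goes back to sync.Pool. Whether to Reset here follows the same policy
// debated in the suggestion above.
func (p *BufferPool) Put(buf *bytes.Buffer) {
	if p.last.CompareAndSwap(nil, buf) {
		return
	}
	p.pool.Put(buf)
}
```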