(feat) Ingestor shutdown via pod annotation #560
base: main
Conversation
ingestor/service.go
Outdated
@@ -343,6 +348,12 @@ func (s *Service) HandleTransfer(w http.ResponseWriter, r *http.Request) {
		}
	}()

	if !s.isOpen {
I'd use `DisableWrites()`, which will shut down the HTTP server altogether and close the `Store`. That will prevent new writes from all paths. `isOpen` actually creates a race condition because `HandleTransfer` reads it and `Close` writes it in separate goroutines. If you add a mutex to protect it, it creates lock contention and slows write throughput if not done carefully.
In the `DisableWrites()` function I only see a close on the store and the metrics service. How does it shut down the HTTP server?
My mistake. Looks like the HTTP shutdown is done in `main` just before `DisableWrites` is called. We could either refactor the Service to include the HTTP server so that `DisableWrites` could call it, or use the `isOpen` approach. I'd say use `isOpen`, but switch it to atomics instead of a member var with a mutex. I'd suggest going with the atomics for now (even though it's still racy), and then we can refactor the server/service separately and switch to that later.
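For reference, a minimal sketch of what the atomics variant could look like (the field and method names follow the discussion above, not the PR's actual code; the elided parts stand in for the existing Service logic, and `atomic.Bool` assumes Go 1.19+, otherwise `atomic.Int32` with 0/1 works the same way):

```go
package ingestor

import (
	"net/http"
	"sync/atomic"
)

// Sketch only: the write gate as an atomic flag instead of a mutex-guarded bool.
type Service struct {
	// ... existing fields ...
	isOpen atomic.Bool // true while the ingestor accepts new writes
}

// HandleTransfer reads the flag without taking a lock.
func (s *Service) HandleTransfer(w http.ResponseWriter, r *http.Request) {
	if !s.isOpen.Load() {
		http.Error(w, "ingestor is shutting down", http.StatusServiceUnavailable)
		return
	}
	// ... existing transfer handling ...
}

// Shutdown flips the flag so no new writes are accepted.
func (s *Service) Shutdown() error {
	s.isOpen.Store(false) // in-flight writes continue; new ones are rejected
	// ... drain in-flight writes, then close the store ...
	return nil
}
```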
Reverted to using annotation-based invocation on the ingestor pod. Hope this mitigates the denial-of-service concerns.
cmd/ingestor/main.go
Outdated
@@ -365,6 +365,7 @@ func realMain(ctx *cli.Context) error {
	mux := http.NewServeMux()
	mux.HandleFunc("/transfer", svc.HandleTransfer)
	mux.HandleFunc("/readyz", svc.HandleReady)
	mux.HandleFunc("/shutdown", svc.HandleShutdown)
We can't put this on the exposed API. You could run a denial of service attack with it.
No longer exposing an endpoint. Thanks
	//get ingestor pod in which this runner is running
	pod, err := r.k8sClient.CoreV1().Pods(namespace).Get(ctx, os.Getenv("HOSTNAME"), metav1.GetOptions{})
	if err != nil {
		logger.Errorf("failed to get pod annotations: %v", err)
Just return `fmt.Errorf("... %w", err)` since the caller logs the error already.
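i.e. something along these lines (the message text is just illustrative):

```go
pod, err := r.k8sClient.CoreV1().Pods(namespace).Get(ctx, os.Getenv("HOSTNAME"), metav1.GetOptions{})
if err != nil {
	// Wrap and return instead of logging here; the caller already logs it.
	return fmt.Errorf("failed to get pod %q in namespace %q: %w", os.Getenv("HOSTNAME"), namespace, err)
}
```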
	if _, ok := pod.Annotations[SHUTDOWN_REQUESTED]; ok {
		logger.Infof("shutting down the service")
		if err := r.httpServer.Close(); err != nil {
			logger.Errorf("failed to close http server: %v", err)
Return wrapped error
		}

		if err := r.service.Shutdown(); err != nil {
			logger.Errorf("failed to shutdown the service: %v", err)
Return wrapped error
logger.Errorf("failed to shutdown the service: %v", err) | ||
return err | ||
} | ||
logger.Infof("service shutdown completed") |
Capitalize `Service` in log messages to keep consistent with the rest of the code base.
		//set shutdown-completed annotation
		pod.Annotations[SHUTDOWN_COMPLETED] = "true"
		if _, err := r.k8sClient.CoreV1().Pods(namespace).Update(ctx, pod, metav1.UpdateOptions{}); err != nil {
			logger.Errorf("failed to set shutdown-completed annotation: %v", err)
Return the wrapped error

	//check if shutdown-completed annotation is set
	if _, ok := pod.Annotations[SHUTDOWN_COMPLETED]; ok {
		logger.Infof("shutdown already completed on the pod, skipping shutting down")
Capitalize `Shutdown`.

	//shutdown the service
	if _, ok := pod.Annotations[SHUTDOWN_REQUESTED]; ok {
		logger.Infof("shutting down the service")
Capitalize first word
		}
		logger.Infof("service shutdown completed")
		//set shutdown-completed annotation
		pod.Annotations[SHUTDOWN_COMPLETED] = "true"
I think this should only get set after all uploads have completed?
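For example (a sketch only; `UploadsDone` is a hypothetical helper that does not exist in the PR, standing in for however the service exposes "all pending uploads are finished"):

```go
if err := r.service.Shutdown(); err != nil {
	return fmt.Errorf("failed to shutdown the service: %w", err)
}
// Hypothetical: block until all pending segments have been uploaded.
if err := r.service.UploadsDone(ctx); err != nil {
	return fmt.Errorf("failed waiting for uploads to complete: %w", err)
}
// Only now mark the pod as done.
pod.Annotations[SHUTDOWN_COMPLETED] = "true"
if _, err := r.k8sClient.CoreV1().Pods(namespace).Update(ctx, pod, metav1.UpdateOptions{}); err != nil {
	return fmt.Errorf("failed to set shutdown-completed annotation: %w", err)
}
```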
	//shutdown the service
	if _, ok := pod.Annotations[SHUTDOWN_REQUESTED]; ok {
		logger.Infof("shutting down the service")
		if err := r.httpServer.Close(); err != nil {
What happens if a shutdown is requested and the pod just exits unexpectedly? How can we make this `SHUTDOWN_REQUESTED` phase idempotent and resumable?
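One option is to have every pass of the runner re-derive its state from the annotations, so a pod that crashed mid-shutdown simply resumes on the next pass. A sketch of that ordering (assuming `Close`/`Shutdown` are themselves safe to call again after a restart):

```go
// Sketch: order the checks so the whole phase can be re-run after an unexpected exit.
if _, done := pod.Annotations[SHUTDOWN_COMPLETED]; done {
	return nil // already finished; repeated runs are a no-op
}
if _, requested := pod.Annotations[SHUTDOWN_REQUESTED]; requested {
	if err := r.httpServer.Close(); err != nil {
		return fmt.Errorf("failed to close http server: %w", err)
	}
	if err := r.service.Shutdown(); err != nil {
		return fmt.Errorf("failed to shutdown the service: %w", err)
	}
	// Mark completion last, so a crash before this point just repeats the steps above.
	pod.Annotations[SHUTDOWN_COMPLETED] = "true"
	if _, err := r.k8sClient.CoreV1().Pods(namespace).Update(ctx, pod, metav1.UpdateOptions{}); err != nil {
		return fmt.Errorf("failed to set shutdown-completed annotation: %w", err)
	}
}
```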
		return err
	}

	if err := s.Close(); err != nil {
I don't think you want to `Close` here because then the pod will just restart and be able to take new writes. I think it would be better to have the instance stay in this disabled state and signal back that shutdown is complete here. The operator can then see that the pod has signaled shutdown is complete and can delete the pod / scale down the StatefulSet.
That actually reminds me that when we scale down, we have to do it in reverse order because of the way that StatefulSets work.
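A sketch of that alternative (annotation names follow the PR; the rest is illustrative, and the exact `DisableWrites` signature may differ):

```go
// Sketch: stay alive in a write-disabled state and signal completion via annotation;
// the operator then deletes the pod / scales the StatefulSet down (in reverse ordinal order).
r.service.DisableWrites() // reject new writes but keep the process running
pod.Annotations[SHUTDOWN_COMPLETED] = "true"
if _, err := r.k8sClient.CoreV1().Pods(namespace).Update(ctx, pod, metav1.UpdateOptions{}); err != nil {
	return fmt.Errorf("failed to set shutdown-completed annotation: %w", err)
}
// No Close() here: exiting would just let the restarted pod take new writes again.
```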
Adding a service shutdown endpoint to the ingestor service. It stops the service from accepting new requests by sending a 503 and waits until all writes have been completed. This can eventually be used by the ingestor scaler to scale down service pods safely.
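A rough sketch of that behavior, assuming in-flight transfers are tracked with a `sync.WaitGroup` (the field names are illustrative, not the PR's actual implementation):

```go
package ingestor

import (
	"net/http"
	"sync"
	"sync/atomic"
)

// Service sketch: an open/closed flag plus a WaitGroup to drain in-flight writes.
type Service struct {
	isOpen   atomic.Bool
	inFlight sync.WaitGroup
}

// HandleTransfer rejects new requests with 503 once shutdown has begun.
func (s *Service) HandleTransfer(w http.ResponseWriter, r *http.Request) {
	if !s.isOpen.Load() {
		http.Error(w, "ingestor is shutting down", http.StatusServiceUnavailable)
		return
	}
	s.inFlight.Add(1)
	defer s.inFlight.Done()
	// ... write the payload to the store ...
}

// Shutdown stops new writes and waits for in-flight writes to finish.
// Note the small window between Load and Add: as discussed in the review,
// this is still technically racy and can be tightened in a later refactor.
func (s *Service) Shutdown() error {
	s.isOpen.Store(false)
	s.inFlight.Wait()
	return nil
}
```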