diff --git a/examples/routine_leak/example1_no_async_routine_manager/without_async_routine_manager.go b/examples/routine_leak/example1_no_async_routine_manager/without_async_routine_manager.go new file mode 100644 index 0000000..6c41327 --- /dev/null +++ b/examples/routine_leak/example1_no_async_routine_manager/without_async_routine_manager.go @@ -0,0 +1,57 @@ +package main + +import ( + "context" + "fmt" + "github.com/openshift-online/async-routine/opid" + "log/slog" + "os" + "runtime/pprof" + "time" +) + +func main() { + slog.Info("Program started") + + for i := 0; i < 10; i++ { + foo(opid.NewContext()) + } + + // Wait enough time to have some routine started + time.Sleep(4 * time.Second) + + pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) + fmt.Scanln() +} + +func foo(ctx context.Context) { + slog.Info("foo() started", + "opid", opid.FromContext(ctx)) + bar(ctx) + slog.Info("foo() ended", + "opid", opid.FromContext(ctx)) +} + +func bar(ctx context.Context) { + slog.Info("bar() started", + "opid", opid.FromContext(ctx)) + go parentGoroutine(ctx) + slog.Info("bar() ended", + "opid", opid.FromContext(ctx)) +} + +func parentGoroutine(ctx context.Context) { + slog.Info("parentGoroutine() started", + "opid", opid.FromContext(ctx)) + + go stuckInSelect() + time.Sleep(500 * time.Millisecond) + slog.Info("parentGoroutine() ended", + "opid", opid.FromContext(ctx)) +} + +func stuckInSelect() { + slog.Info("stuckInSelect() started") + select {} + slog.Info("stuckInSelect() ended") +} diff --git a/examples/routine_leak/example1_with_async_routine_manager/with_async_routine_manager.go b/examples/routine_leak/example1_with_async_routine_manager/with_async_routine_manager.go new file mode 100644 index 0000000..a01c23c --- /dev/null +++ b/examples/routine_leak/example1_with_async_routine_manager/with_async_routine_manager.go @@ -0,0 +1,107 @@ +package main + +import ( + "context" + "github.com/openshift-online/async-routine" + "github.com/openshift-online/async-routine/opid" + "log/slog" + "time" +) + +var _ async.RoutinesObserver = (*exampleRoutineObserver)(nil) + +func main() { + slog.Info("Program started") + + // Setup the AsyncRoutineManager + async.Manager( + // Take a snapshot every 2 seconds + async.WithSnapshottingInterval(2 * time.Second)). + // Start the routine monitor + Monitor().Start() + + // Add our custom observer to the list of routine observers + _ = async.Manager().AddObserver(&exampleRoutineObserver{}) + + for i := 0; i < 10; i++ { + foo(opid.NewContext()) + } + + // Wait enough time to have some routine started + time.Sleep(4 * time.Second) +} + +func foo(ctx context.Context) { + slog.Info("foo() started", + "opid", opid.FromContext(ctx)) + bar(ctx) + slog.Info("foo() ended", + "opid", opid.FromContext(ctx)) +} + +func bar(ctx context.Context) { + slog.Info("bar() started", + "opid", opid.FromContext(ctx)) + async.NewAsyncRoutine("parent go routine", ctx, + func() { + parentGoroutine(ctx) + }). + Timebox(2 * time.Second). + Run() + slog.Info("bar() ended", + "opid", opid.FromContext(ctx)) +} + +func parentGoroutine(ctx context.Context) { + slog.Info("parentGoroutine() started", + "opid", opid.FromContext(ctx)) + + async.NewAsyncRoutine("stuck in select", ctx, stuckInSelect). + Timebox(2 * time.Second). + Run() + time.Sleep(500 * time.Millisecond) + + slog.Info("parentGoroutine() ended", + "opid", opid.FromContext(ctx)) +} + +func stuckInSelect() { + slog.Info("parentGoroutine() started") + select {} + slog.Info("parentGoroutine() ended") +} + +type exampleRoutineObserver struct{} + +func (e exampleRoutineObserver) RoutineStarted(routine async.AsyncRoutine) { + slog.Info("Routine started", + "name", routine.Name(), + "opid", routine.OpId(), + "parent-opid", routine.OriginatorOpId(), + ) +} + +func (e exampleRoutineObserver) RoutineFinished(routine async.AsyncRoutine) { + slog.Info("Routine finished", + "name", routine.Name(), + "opid", routine.OpId(), + "parent-opid", routine.OriginatorOpId(), + ) +} + +func (e exampleRoutineObserver) RoutineExceededTimebox(routine async.AsyncRoutine) { + slog.Warn("Routine exceeded timebox", + "name", routine.Name(), + "opid", routine.OpId(), + "parent-opid", routine.OriginatorOpId(), + "startedAt", routine.StartedAt(), + ) +} + +func (e exampleRoutineObserver) RunningRoutineCount(count int) { + // nothing to do in this example +} + +func (e exampleRoutineObserver) RunningRoutineByNameCount(name string, count int) { + // nothing to do in this example +} diff --git a/examples/routine_leak/example2/README.md b/examples/routine_leak/example2/README.md new file mode 100644 index 0000000..fd5880a --- /dev/null +++ b/examples/routine_leak/example2/README.md @@ -0,0 +1,13 @@ +# Routine Leak Detection with AsyncRoutineManager +This demonstration illustrates how the `AsyncRoutineManager` help identifying routine leaks and pinpoint their source. + +Each folder, named step1 to stepN, adds a progressive integration of the `AsyncRoutineManager`, starting from the +naked code of `step1` to the full integration in the last step. + +The application we are going to use for this demonstration is very simple: + +1. The `main` function repeatedly starts cycles where it processes multiple websites. + For each cycle, randomly selects 10 website URLs from a predefined list. + For each selected URL, asynchronously invokes the `getWebsiteResponseSize` +2. The `getWebsiteResponseSize` asynchronously calls the `getResponseSize` and do some fun stuff with the result +3. The `getResponseSize` contacts the site and returns the response size diff --git a/examples/routine_leak/example2/data/data.go b/examples/routine_leak/example2/data/data.go new file mode 100644 index 0000000..00b6135 --- /dev/null +++ b/examples/routine_leak/example2/data/data.go @@ -0,0 +1,69 @@ +package data + +var Websites = []string{ + "https://google.com", + "https://facebook.com", + "https://youtube.com", + "https://amazon.com", + "https://wikipedia.com", + "https://instagram.com", + "https://linkedin.com", + "https://reddit.com", + "https://ebay.com", + "https://microsoft.com", + "https://apple.com", + "https://walmart.com", + "https://espn-bad.com", + "https://bbc.com", + "https://cnn.com", + "https://foxnews.com", + "https://nytimes.com", + "https://forbes.com", + "https://cnbc.com", + "https://theguardian.com", + "https://nbcnews.com", + "https://abc.com", + "https://time.com", + "https://nationalgeographic.com", + "https://wired-bad.com", + "https://techcrunch.com", + "https://engadget.com", + "https://gizmodo.com", + "https://mashable.com", + "https://stackoverflow.com", + "https://github.com", + "https://medium-bad.com", + "https://dropbox.com", + "https://box.com", + "https://slack.com", + "https://zoom.com", + "https://stripe.com", + "https://airbnb.com", + "https://booking.com", + "https://hotels-bad.com", + "https://kayak.com", + "https://monster.com", + "https://edx.com", + "https://duolingo-bad.com", + "https://britannica-bad.com", + "https://dictionary.com", + "https://merriam-webster.com", + "https://thesaurus.com", + "https://weather-bad.com", + "https://trulia-bad.com", + "https://epicurious.com", + "https://tasteofhome.com", + "https://healthline.com", + "https://webmd.com", + "https://mayoclinic-bad.com", + "https://everydayhealth.com", + "https://livestrong.com", + "https://fool-bad.com", + "https://seekingalpha.com", + "https://nerdwallet.com", + "https://bankrate.com", + "https://creditkarma.com", + "https://mint.com", + "https://fidelity.com", + "https://vanguard.com", +} diff --git a/examples/routine_leak/example2/step1/README.md b/examples/routine_leak/example2/step1/README.md new file mode 100644 index 0000000..1063728 --- /dev/null +++ b/examples/routine_leak/example2/step1/README.md @@ -0,0 +1,24 @@ +# STEP1 - the leaking code + +In this step, we will use the native `go` keyword to start go routines. +The code is very simple: + +1. The `main` function repeatedly starts cycles where it processes multiple websites. + For each cycle, randomly selects 10 website URLs from a predefined list. + For each selected URL, asynchronously invokes the `getWebsiteResponseSize` +2. The `getWebsiteResponseSize` asynchronously calls the `getResponseSize` and do some fun stuff with the result +3. The `getResponseSize` contacts the site and returns the response size + +Running the application we can see how the total number of goroutine keeps increasing: +``` +goroutine profile: total 18 +14 @ 0x102d71768 0x102d361a8 0x102d70a10 0x102db7a38 0x102db830c 0x102db82fd 0x102e451d8 0x102e4d974 0x102e87430 0x102dcd820 0x102e87610 0x102e84dac 0x102e8ad44 0x102e8ad45 0x102eb83b4 0x102d9d940 0x102ee5b18 0x102ee5aed 0x102ee6148 0x102ef6b50 0x102ef60c8 0x102d79aa4 +# 0x102d70a0f internal/poll.runtime_pollWait+0x9f +... +... +goroutine profile: total 33 +26 @ 0x102d71768 0x102d361a8 0x102d70a10 0x102db7a38 0x102db830c 0x102db82fd 0x102e451d8 0x102e4d974 0x102e87430 0x102dcd820 0x102e87610 0x102e84dac 0x102e8ad44 0x102e8ad45 0x102eb83b4 0x102d9d940 0x102ee5b18 0x102ee5aed 0x102ee6148 0x102ef6b50 0x102ef60c8 0x102d79aa4 +# 0x102d70a0f internal/poll.runtime_pollWait+0x9f +``` + +Understanding which routine we are leaking is not immediate. diff --git a/examples/routine_leak/example2/step1/app_leaking_routine.go b/examples/routine_leak/example2/step1/app_leaking_routine.go new file mode 100644 index 0000000..54adaca --- /dev/null +++ b/examples/routine_leak/example2/step1/app_leaking_routine.go @@ -0,0 +1,71 @@ +package main + +import ( + "fmt" + "github.com/openshift-online/async-routine/examples/routine_leak/example2/data" + "io" + "math/rand" + "net/http" + "os" + "runtime/pprof" + "strconv" + "time" +) + +func main() { + for { + for i := 0; i < 10; i++ { + url := data.Websites[rand.Intn(len(data.Websites))] + go doJob(url) + time.Sleep(500 * time.Millisecond) + } + _ = pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) + } +} + +func doJob(url string) { + resultChan := make(chan int64) + go getResponseSize(url, resultChan) + size := <-resultChan + // do something fun with the size - here we just avoid the compilation error + size = size +} + +// getResponseSize fetches the given URL and sends the response size (in bytes) to the provided channel. +// Returns an error if the site does not exist or the request fails. +func getResponseSize(url string, ch chan<- int64) error { + // Perform the HTTP request + res, err := http.Get(url) + if err != nil { + return fmt.Errorf("site unreachable: %w", err) + } + defer res.Body.Close() + + // Handle HTTP errors + if res.StatusCode != http.StatusOK { + switch res.StatusCode { + case http.StatusNotFound: + return fmt.Errorf("site does not exist (404 Not Found)") + default: + return fmt.Errorf("invalid HTTP response: %d %s", res.StatusCode, http.StatusText(res.StatusCode)) + } + } + + // Try to use Content-Length header if available + if contentLength := res.Header.Get("Content-Length"); contentLength != "" { + size, err := strconv.ParseInt(contentLength, 10, 64) + if err == nil && size > 0 { + ch <- size + return nil + } + } + + // Calculate the size by reading the response body + body, err := io.ReadAll(res.Body) + if err != nil { + return fmt.Errorf("failed to read response body: %w", err) + } + + ch <- int64(len(body)) + return nil +} diff --git a/examples/routine_leak/example2/step2/README.md b/examples/routine_leak/example2/step2/README.md new file mode 100644 index 0000000..e121b4b --- /dev/null +++ b/examples/routine_leak/example2/step2/README.md @@ -0,0 +1,7 @@ +# STEP2 - integrating the AsyncRoutineManager + +In this step, we integrate the `AsyncRoutineManager` to handle routine management. +In fact, incorporating the `AsyncRoutineManager` is almost as simple as replacing the `go` keyword with a call +to the `NewAsyncRoutine` function. +Running this code produces the same behavior as in STEP 1, but now we have established the foundation to +fully leverage the capabilities of the `AsyncRoutineManager`. \ No newline at end of file diff --git a/examples/routine_leak/example2/step2/app_leaking_routine.go b/examples/routine_leak/example2/step2/app_leaking_routine.go new file mode 100644 index 0000000..95f7a6a --- /dev/null +++ b/examples/routine_leak/example2/step2/app_leaking_routine.go @@ -0,0 +1,84 @@ +package main + +import ( + "context" + "fmt" + "github.com/openshift-online/async-routine" + "github.com/openshift-online/async-routine/examples/routine_leak/example2/data" + "io" + "math/rand" + "net/http" + "os" + "runtime/pprof" + "strconv" + "time" +) + +func main() { + for { + for i := 0; i < 10; i++ { + url := data.Websites[rand.Intn(len(data.Websites))] + async.NewAsyncRoutine( + "do-job", + context.Background(), + func() { + doJob(url) + }). + Run() + time.Sleep(500 * time.Millisecond) + } + _ = pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) + } +} + +func doJob(url string) { + resultChan := make(chan int64) + async.NewAsyncRoutine( + "get-website-size", + context.Background(), + func() { + getResponseSize(url, resultChan) + }).Run() + size := <-resultChan + // do something fun with the size - here we just avoid the compilation error + size = size +} + +// GetResponseSize fetches the given URL and sends the response size (in bytes) to the provided channel. +// Returns an error if the site does not exist or the request fails. +func getResponseSize(url string, ch chan<- int64) error { + // Perform the HTTP request + res, err := http.Get(url) + if err != nil { + return fmt.Errorf("site unreachable: %w", err) + } + defer res.Body.Close() + + // Handle HTTP errors + if res.StatusCode != http.StatusOK { + switch res.StatusCode { + case http.StatusNotFound: + return fmt.Errorf("site does not exist (404 Not Found)") + default: + return fmt.Errorf("invalid HTTP response: %d %s", res.StatusCode, http.StatusText(res.StatusCode)) + } + } + + // Try to use Content-Length header if available + if contentLength := res.Header.Get("Content-Length"); contentLength != "" { + size, err := strconv.ParseInt(contentLength, 10, 64) + if err == nil && size > 0 { + ch <- size + return nil + } + } + + // Calculate the size by reading the response body + body, err := io.ReadAll(res.Body) + if err != nil { + return fmt.Errorf("failed to read response body: %w", err) + } + + ch <- int64(len(body)) + return nil +} diff --git a/examples/routine_leak/example2/step3/README.md b/examples/routine_leak/example2/step3/README.md new file mode 100644 index 0000000..1af3858 --- /dev/null +++ b/examples/routine_leak/example2/step3/README.md @@ -0,0 +1,54 @@ +# STEP3 - Replacing the pproof dump with a Routine Observer + +In this example, we implement an observer that prints the number of running routine: +```go + +type leakingRoutineObserver struct{} + +func (l leakingRoutineObserver) RunningRoutineCount(count int) { + fmt.Println("running routine count:", count) +} +... +... +``` + +Next, to ensure the observer is automatically notified of all running routines, we need to register the +observer and start the monitor. + +```go +func main() { + async.Manager(async.WithSnapshottingInterval(500 * time.Millisecond)).Monitor().Start() + async.Manager().AddObserver(&leakingRoutineObserver{}) + ... + ... +} +``` + +The output will be something like this: +``` +running routine count: 2 +running routine count: 3 +running routine count: 3 +running routine count: 2 +running routine count: 4 +running routine count: 4 +running routine count: 2 +running routine count: 4 +running routine count: 4 +running routine count: 2 +running routine count: 3 +running routine count: 5 +running routine count: 3 +running routine count: 3 +running routine count: 5 +running routine count: 5 +running routine count: 3 +running routine count: 4 +running routine count: 4 +running routine count: 4 +running routine count: 5 +running routine count: 5 +running routine count: 7 +``` + +We can see we have a routine leak, but we still have no idea of what is the routine leak. \ No newline at end of file diff --git a/examples/routine_leak/example2/step3/app_leaking_routine.go b/examples/routine_leak/example2/step3/app_leaking_routine.go new file mode 100644 index 0000000..4c7863e --- /dev/null +++ b/examples/routine_leak/example2/step3/app_leaking_routine.go @@ -0,0 +1,83 @@ +package main + +import ( + "context" + "fmt" + "github.com/openshift-online/async-routine" + "github.com/openshift-online/async-routine/examples/routine_leak/example2/data" + "io" + "math/rand" + "net/http" + "strconv" + "time" +) + +func main() { + async.Manager(async.WithSnapshottingInterval(500 * time.Millisecond)).Monitor().Start() + async.Manager().AddObserver(&leakingRoutineObserver{}) + for { + for i := 0; i < 10; i++ { + url := data.Websites[rand.Intn(len(data.Websites))] + async.NewAsyncRoutine( + "do-job", + context.Background(), + func() { + doJob(url) + }). + Run() + time.Sleep(500 * time.Millisecond) + } + } +} + +func doJob(url string) { + resultChan := make(chan int64) + async.NewAsyncRoutine( + "get-website-size", + context.Background(), + func() { + getResponseSize(url, resultChan) + }).Run() + size := <-resultChan + // do something fun with the size - here we just avoid the compilation error + size = size +} + +// getResponseSize fetches the given URL and sends the response size (in bytes) to the provided channel. +// Returns an error if the site does not exist or the request fails. +func getResponseSize(url string, ch chan<- int64) error { + // Perform the HTTP request + res, err := http.Get(url) + if err != nil { + return fmt.Errorf("site unreachable: %w", err) + } + defer res.Body.Close() + + // Handle HTTP errors + if res.StatusCode != http.StatusOK { + switch res.StatusCode { + case http.StatusNotFound: + return fmt.Errorf("site does not exist (404 Not Found)") + default: + return fmt.Errorf("invalid HTTP response: %d %s", res.StatusCode, http.StatusText(res.StatusCode)) + } + } + + // Try to use Content-Length header if available + if contentLength := res.Header.Get("Content-Length"); contentLength != "" { + size, err := strconv.ParseInt(contentLength, 10, 64) + if err == nil && size > 0 { + ch <- size + return nil + } + } + + // Calculate the size by reading the response body + body, err := io.ReadAll(res.Body) + if err != nil { + return fmt.Errorf("failed to read response body: %w", err) + } + + ch <- int64(len(body)) + return nil +} diff --git a/examples/routine_leak/example2/step3/leaking_routine_observer.go b/examples/routine_leak/example2/step3/leaking_routine_observer.go new file mode 100644 index 0000000..cf38f54 --- /dev/null +++ b/examples/routine_leak/example2/step3/leaking_routine_observer.go @@ -0,0 +1,22 @@ +package main + +import ( + "fmt" + "github.com/openshift-online/async-routine" +) + +var _ async.RoutinesObserver = (*leakingRoutineObserver)(nil) + +type leakingRoutineObserver struct{} + +func (l leakingRoutineObserver) RoutineStarted(routine async.AsyncRoutine) {} + +func (l leakingRoutineObserver) RoutineFinished(routine async.AsyncRoutine) {} + +func (l leakingRoutineObserver) RoutineExceededTimebox(routine async.AsyncRoutine) {} + +func (l leakingRoutineObserver) RunningRoutineCount(count int) { + fmt.Println("running routine count:", count) +} + +func (l leakingRoutineObserver) RunningRoutineByNameCount(name string, count int) {} diff --git a/examples/routine_leak/example2/step4/README.md b/examples/routine_leak/example2/step4/README.md new file mode 100644 index 0000000..4526942 --- /dev/null +++ b/examples/routine_leak/example2/step4/README.md @@ -0,0 +1,50 @@ +# STEP 4 - Identify which routine is being leaked + +In this step, we extend the observer by implementing a method to track the number of active instances +for each routine type at any given time: + +```go +type leakingRoutineObserver struct{} +... +... +func (l leakingRoutineObserver) RunningRoutineByNameCount(name string, count int) { + fmt.Printf("Routine count for %s: %d\n", name, count) +} +... +... +``` + +The output will be similar to this: +``` +... +running routine count: 10 +Routine count for do-job: 9 +Routine count for async-routine-monitor: 1 +running routine count: 11 +Routine count for do-job: 10 +Routine count for async-routine-monitor: 1 +running routine count: 11 +Routine count for do-job: 10 +Routine count for async-routine-monitor: 1 +running routine count: 11 +Routine count for do-job: 10 +Routine count for async-routine-monitor: 1 +running routine count: 11 +Routine count for do-job: 10 +Routine count for async-routine-monitor: 1 +running routine count: 11 +Routine count for do-job: 10 +Routine count for async-routine-monitor: 1 +running routine count: 11 +Routine count for do-job: 10 +Routine count for async-routine-monitor: 1 +running routine count: 11 +Routine count for do-job: 10 +Routine count for async-routine-monitor: 1 +running routine count: 11 +Routine count for do-job: 10 +Routine count for async-routine-monitor: 1 +``` +The `async-routine-montor` is the routine the manager started when we started the monitor. +Now it is clear: we are leaking the `do-job` routine. However, we don't have any context, +so it is hard to understand why and when the routine is leaked. \ No newline at end of file diff --git a/examples/routine_leak/example2/step4/app_leaking_routine.go b/examples/routine_leak/example2/step4/app_leaking_routine.go new file mode 100644 index 0000000..4c7863e --- /dev/null +++ b/examples/routine_leak/example2/step4/app_leaking_routine.go @@ -0,0 +1,83 @@ +package main + +import ( + "context" + "fmt" + "github.com/openshift-online/async-routine" + "github.com/openshift-online/async-routine/examples/routine_leak/example2/data" + "io" + "math/rand" + "net/http" + "strconv" + "time" +) + +func main() { + async.Manager(async.WithSnapshottingInterval(500 * time.Millisecond)).Monitor().Start() + async.Manager().AddObserver(&leakingRoutineObserver{}) + for { + for i := 0; i < 10; i++ { + url := data.Websites[rand.Intn(len(data.Websites))] + async.NewAsyncRoutine( + "do-job", + context.Background(), + func() { + doJob(url) + }). + Run() + time.Sleep(500 * time.Millisecond) + } + } +} + +func doJob(url string) { + resultChan := make(chan int64) + async.NewAsyncRoutine( + "get-website-size", + context.Background(), + func() { + getResponseSize(url, resultChan) + }).Run() + size := <-resultChan + // do something fun with the size - here we just avoid the compilation error + size = size +} + +// getResponseSize fetches the given URL and sends the response size (in bytes) to the provided channel. +// Returns an error if the site does not exist or the request fails. +func getResponseSize(url string, ch chan<- int64) error { + // Perform the HTTP request + res, err := http.Get(url) + if err != nil { + return fmt.Errorf("site unreachable: %w", err) + } + defer res.Body.Close() + + // Handle HTTP errors + if res.StatusCode != http.StatusOK { + switch res.StatusCode { + case http.StatusNotFound: + return fmt.Errorf("site does not exist (404 Not Found)") + default: + return fmt.Errorf("invalid HTTP response: %d %s", res.StatusCode, http.StatusText(res.StatusCode)) + } + } + + // Try to use Content-Length header if available + if contentLength := res.Header.Get("Content-Length"); contentLength != "" { + size, err := strconv.ParseInt(contentLength, 10, 64) + if err == nil && size > 0 { + ch <- size + return nil + } + } + + // Calculate the size by reading the response body + body, err := io.ReadAll(res.Body) + if err != nil { + return fmt.Errorf("failed to read response body: %w", err) + } + + ch <- int64(len(body)) + return nil +} diff --git a/examples/routine_leak/example2/step4/leaking_routine_observer.go b/examples/routine_leak/example2/step4/leaking_routine_observer.go new file mode 100644 index 0000000..674f456 --- /dev/null +++ b/examples/routine_leak/example2/step4/leaking_routine_observer.go @@ -0,0 +1,24 @@ +package main + +import ( + "fmt" + "github.com/openshift-online/async-routine" +) + +var _ async.RoutinesObserver = (*leakingRoutineObserver)(nil) + +type leakingRoutineObserver struct{} + +func (l leakingRoutineObserver) RoutineStarted(routine async.AsyncRoutine) {} + +func (l leakingRoutineObserver) RoutineFinished(routine async.AsyncRoutine) {} + +func (l leakingRoutineObserver) RoutineExceededTimebox(routine async.AsyncRoutine) {} + +func (l leakingRoutineObserver) RunningRoutineCount(count int) { + fmt.Println("running routine count:", count) +} + +func (l leakingRoutineObserver) RunningRoutineByNameCount(name string, count int) { + fmt.Printf("Routine count for %s: %d\n", name, count) +} diff --git a/examples/routine_leak/example2/step5/README.md b/examples/routine_leak/example2/step5/README.md new file mode 100644 index 0000000..e73923c --- /dev/null +++ b/examples/routine_leak/example2/step5/README.md @@ -0,0 +1,95 @@ +# STEP 5 - Adding some context + +The `AsyncRoutineManager` gives the ability to add arbitrary data to each routine so that each execution can be tied to a set of data. +We will leverage this ability to restrict even more our view to the leaked routine: we will timebox +the routine to 1 second of execution and we add the received value as routine data. + +```go + async.NewAsyncRoutine("do-job", context.Background(), + func() { + getWebsiteResponseSize(url) + }). + Timebox(5*time.Second). + WithData("url", url). + Run() + time.Sleep(500 * time.Millisecond) +``` + +Now the output will be similar to: +``` +... +routine: [count: 6] +routine: [count: 6] +leaked routine: [name: do-job started-at: 2025-05-14 09:54:54.415189 +0000 UTC data: map[url:https://fool-bad.com]] +routine: [count: 4] +leaked routine: [name: do-job started-at: 2025-05-14 09:54:54.415189 +0000 UTC data: map[url:https://fool-bad.com]] +leaked routine: [name: do-job started-at: 2025-05-14 09:54:54.915739 +0000 UTC data: map[url:https://weather-bad.com]] +routine: [count: 4] +... +``` +Analyzing the data, we observe that the routine leaks occur when there is an error with the website url. +Inspecting the code we see that `getWebsiteResponseSize` calls `getResponseSize` but ignores any error: +```go + async.NewAsyncRoutine( + "get-website-size", + context.Background(), + func() { + getResponseSize(url, resultChan) + }).Run() + + ... + ... + func getResponseSize(url string, ch chan<- int64) error { +``` + +Let's fix the code: +```go + async.NewAsyncRoutine( + "get-website-size", + context.Background(), + func() { + err := getResponseSize(url, resultChan) + if err != nil { + resultChan <- -1 + fmt.Println("error fetching website size:", err) + } + }).Run() + + ... + ... + func getResponseSize(url string, ch chan<- int64) error { +``` + +Now the output is: +```go +routine: [count: 1] +Error getting response size: site unreachable: Get "https://duolingo-bad.com": dial tcp: lookup duolingo-bad.com: no such host +routine: [count: 1] +routine: [count: 1] +Error getting response size: site unreachable: Get "https://trulia-bad.com": dial tcp: lookup trulia-bad.com: no such host +routine: [count: 1] +routine: [count: 1] +routine: [count: 3] +routine: [count: 3] +routine: [count: 3] +routine: [count: 5] +routine: [count: 1] +routine: [count: 1] +routine: [count: 3] +routine: [count: 1] +routine: [count: 3] +routine: [count: 1] +routine: [count: 1] +routine: [count: 3] +routine: [count: 1] +routine: [count: 1] +routine: [count: 3] +routine: [count: 3] +routine: [count: 1] +routine: [count: 1] +routine: [count: 1] +routine: [count: 1] +``` + +and we don't have leaked routines anymore: we can say that because we don't have any routine exceeding the timebox +anymore and the routine count always drops to 1 (the only running routine: the `async-routine-monitor` one) \ No newline at end of file diff --git a/examples/routine_leak/example2/step5/app_leaking_routine.go b/examples/routine_leak/example2/step5/app_leaking_routine.go new file mode 100644 index 0000000..dc2b898 --- /dev/null +++ b/examples/routine_leak/example2/step5/app_leaking_routine.go @@ -0,0 +1,83 @@ +package main + +import ( + "context" + "fmt" + "github.com/openshift-online/async-routine" + "github.com/openshift-online/async-routine/examples/routine_leak/example2/data" + "io" + "math/rand" + "net/http" + "strconv" + "time" +) + +func main() { + async.Manager(async.WithSnapshottingInterval(500 * time.Millisecond)).Monitor().Start() + async.Manager().AddObserver(&leakingRoutineObserver{}) + for { + for i := 0; i < 10; i++ { + url := data.Websites[rand.Intn(len(data.Websites))] + async.NewAsyncRoutine("do-job", context.Background(), + func() { + doJob(url) + }). + Timebox(5*time.Second). + WithData("url", url). + Run() + time.Sleep(500 * time.Millisecond) + } + } +} + +func doJob(url string) { + resultChan := make(chan int64) + async.NewAsyncRoutine( + "get-website-size", + context.Background(), + func() { + getResponseSize(url, resultChan) + }).Run() + size := <-resultChan + // do something fun with the size - here we just avoid the compilation error + size = size +} + +// getResponseSize fetches the given URL and sends the response size (in bytes) to the provided channel. +// Returns an error if the site does not exist or the request fails. +func getResponseSize(url string, ch chan<- int64) error { + // Perform the HTTP request + res, err := http.Get(url) + if err != nil { + return fmt.Errorf("site unreachable: %w", err) + } + defer res.Body.Close() + + // Handle HTTP errors + if res.StatusCode != http.StatusOK { + switch res.StatusCode { + case http.StatusNotFound: + return fmt.Errorf("site does not exist (404 Not Found)") + default: + return fmt.Errorf("invalid HTTP response: %d %s", res.StatusCode, http.StatusText(res.StatusCode)) + } + } + + // Try to use Content-Length header if available + if contentLength := res.Header.Get("Content-Length"); contentLength != "" { + size, err := strconv.ParseInt(contentLength, 10, 64) + if err == nil && size > 0 { + ch <- size + return nil + } + } + + // Calculate the size by reading the response body + body, err := io.ReadAll(res.Body) + if err != nil { + return fmt.Errorf("failed to read response body: %w", err) + } + + ch <- int64(len(body)) + return nil +} diff --git a/examples/routine_leak/example2/step5/leaking_routine_observer.go b/examples/routine_leak/example2/step5/leaking_routine_observer.go new file mode 100644 index 0000000..a3e237f --- /dev/null +++ b/examples/routine_leak/example2/step5/leaking_routine_observer.go @@ -0,0 +1,25 @@ +package main + +import ( + "fmt" + "github.com/openshift-online/async-routine" +) + +var _ async.RoutinesObserver = (*leakingRoutineObserver)(nil) + +type leakingRoutineObserver struct{} + +func (l leakingRoutineObserver) RoutineStarted(routine async.AsyncRoutine) {} + +func (l leakingRoutineObserver) RoutineFinished(routine async.AsyncRoutine) {} + +func (l leakingRoutineObserver) RoutineExceededTimebox(routine async.AsyncRoutine) { + fmt.Printf("leaked routine: [name: %s started-at: %v data: %s]\n", routine.Name(), routine.StartedAt(), routine.GetData()) +} + +func (l leakingRoutineObserver) RunningRoutineCount(count int) { + fmt.Println("running routine count:", count) +} + +func (l leakingRoutineObserver) RunningRoutineByNameCount(name string, count int) { +}