From eed1eeb4cc6fa779b8c2c38dda43c11b6d9fdaac Mon Sep 17 00:00:00 2001 From: Massimiliano Ziccardi Date: Wed, 7 May 2025 10:25:14 +0200 Subject: [PATCH 1/2] Adds some example --- .../without_async_routine_manager.go | 57 ++++++++++ .../with_async_routine_manager.go | 107 ++++++++++++++++++ examples/routine_leak/example2/README.md | 13 +++ .../routine_leak/example2/step1/README.md | 24 ++++ .../example2/step1/app_leaking_routine.go | 44 +++++++ .../routine_leak/example2/step2/README.md | 7 ++ .../example2/step2/app_leaking_routine.go | 52 +++++++++ .../routine_leak/example2/step3/README.md | 40 +++++++ .../example2/step3/app_leaking_routine.go | 51 +++++++++ .../step3/leaking_routine_observer.go | 22 ++++ .../routine_leak/example2/step4/README.md | 37 ++++++ .../example2/step4/app_leaking_routine.go | 51 +++++++++ .../step4/leaking_routine_observer.go | 24 ++++ .../routine_leak/example2/step5/README.md | 27 +++++ .../example2/step5/app_leaking_routine.go | 54 +++++++++ .../step5/leaking_routine_observer.go | 24 ++++ 16 files changed, 634 insertions(+) create mode 100644 examples/routine_leak/example1_no_async_routine_manager/without_async_routine_manager.go create mode 100644 examples/routine_leak/example1_with_async_routine_manager/with_async_routine_manager.go create mode 100644 examples/routine_leak/example2/README.md create mode 100644 examples/routine_leak/example2/step1/README.md create mode 100644 examples/routine_leak/example2/step1/app_leaking_routine.go create mode 100644 examples/routine_leak/example2/step2/README.md create mode 100644 examples/routine_leak/example2/step2/app_leaking_routine.go create mode 100644 examples/routine_leak/example2/step3/README.md create mode 100644 examples/routine_leak/example2/step3/app_leaking_routine.go create mode 100644 examples/routine_leak/example2/step3/leaking_routine_observer.go create mode 100644 examples/routine_leak/example2/step4/README.md create mode 100644 examples/routine_leak/example2/step4/app_leaking_routine.go create mode 100644 examples/routine_leak/example2/step4/leaking_routine_observer.go create mode 100644 examples/routine_leak/example2/step5/README.md create mode 100644 examples/routine_leak/example2/step5/app_leaking_routine.go create mode 100644 examples/routine_leak/example2/step5/leaking_routine_observer.go diff --git a/examples/routine_leak/example1_no_async_routine_manager/without_async_routine_manager.go b/examples/routine_leak/example1_no_async_routine_manager/without_async_routine_manager.go new file mode 100644 index 0000000..6c41327 --- /dev/null +++ b/examples/routine_leak/example1_no_async_routine_manager/without_async_routine_manager.go @@ -0,0 +1,57 @@ +package main + +import ( + "context" + "fmt" + "github.com/openshift-online/async-routine/opid" + "log/slog" + "os" + "runtime/pprof" + "time" +) + +func main() { + slog.Info("Program started") + + for i := 0; i < 10; i++ { + foo(opid.NewContext()) + } + + // Wait enough time to have some routine started + time.Sleep(4 * time.Second) + + pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) + fmt.Scanln() +} + +func foo(ctx context.Context) { + slog.Info("foo() started", + "opid", opid.FromContext(ctx)) + bar(ctx) + slog.Info("foo() ended", + "opid", opid.FromContext(ctx)) +} + +func bar(ctx context.Context) { + slog.Info("bar() started", + "opid", opid.FromContext(ctx)) + go parentGoroutine(ctx) + slog.Info("bar() ended", + "opid", opid.FromContext(ctx)) +} + +func parentGoroutine(ctx context.Context) { + slog.Info("parentGoroutine() started", + "opid", opid.FromContext(ctx)) + + go stuckInSelect() + time.Sleep(500 * time.Millisecond) + slog.Info("parentGoroutine() ended", + "opid", opid.FromContext(ctx)) +} + +func stuckInSelect() { + slog.Info("stuckInSelect() started") + select {} + slog.Info("stuckInSelect() ended") +} diff --git a/examples/routine_leak/example1_with_async_routine_manager/with_async_routine_manager.go b/examples/routine_leak/example1_with_async_routine_manager/with_async_routine_manager.go new file mode 100644 index 0000000..a01c23c --- /dev/null +++ b/examples/routine_leak/example1_with_async_routine_manager/with_async_routine_manager.go @@ -0,0 +1,107 @@ +package main + +import ( + "context" + "github.com/openshift-online/async-routine" + "github.com/openshift-online/async-routine/opid" + "log/slog" + "time" +) + +var _ async.RoutinesObserver = (*exampleRoutineObserver)(nil) + +func main() { + slog.Info("Program started") + + // Setup the AsyncRoutineManager + async.Manager( + // Take a snapshot every 2 seconds + async.WithSnapshottingInterval(2 * time.Second)). + // Start the routine monitor + Monitor().Start() + + // Add our custom observer to the list of routine observers + _ = async.Manager().AddObserver(&exampleRoutineObserver{}) + + for i := 0; i < 10; i++ { + foo(opid.NewContext()) + } + + // Wait enough time to have some routine started + time.Sleep(4 * time.Second) +} + +func foo(ctx context.Context) { + slog.Info("foo() started", + "opid", opid.FromContext(ctx)) + bar(ctx) + slog.Info("foo() ended", + "opid", opid.FromContext(ctx)) +} + +func bar(ctx context.Context) { + slog.Info("bar() started", + "opid", opid.FromContext(ctx)) + async.NewAsyncRoutine("parent go routine", ctx, + func() { + parentGoroutine(ctx) + }). + Timebox(2 * time.Second). + Run() + slog.Info("bar() ended", + "opid", opid.FromContext(ctx)) +} + +func parentGoroutine(ctx context.Context) { + slog.Info("parentGoroutine() started", + "opid", opid.FromContext(ctx)) + + async.NewAsyncRoutine("stuck in select", ctx, stuckInSelect). + Timebox(2 * time.Second). + Run() + time.Sleep(500 * time.Millisecond) + + slog.Info("parentGoroutine() ended", + "opid", opid.FromContext(ctx)) +} + +func stuckInSelect() { + slog.Info("parentGoroutine() started") + select {} + slog.Info("parentGoroutine() ended") +} + +type exampleRoutineObserver struct{} + +func (e exampleRoutineObserver) RoutineStarted(routine async.AsyncRoutine) { + slog.Info("Routine started", + "name", routine.Name(), + "opid", routine.OpId(), + "parent-opid", routine.OriginatorOpId(), + ) +} + +func (e exampleRoutineObserver) RoutineFinished(routine async.AsyncRoutine) { + slog.Info("Routine finished", + "name", routine.Name(), + "opid", routine.OpId(), + "parent-opid", routine.OriginatorOpId(), + ) +} + +func (e exampleRoutineObserver) RoutineExceededTimebox(routine async.AsyncRoutine) { + slog.Warn("Routine exceeded timebox", + "name", routine.Name(), + "opid", routine.OpId(), + "parent-opid", routine.OriginatorOpId(), + "startedAt", routine.StartedAt(), + ) +} + +func (e exampleRoutineObserver) RunningRoutineCount(count int) { + // nothing to do in this example +} + +func (e exampleRoutineObserver) RunningRoutineByNameCount(name string, count int) { + // nothing to do in this example +} diff --git a/examples/routine_leak/example2/README.md b/examples/routine_leak/example2/README.md new file mode 100644 index 0000000..a598044 --- /dev/null +++ b/examples/routine_leak/example2/README.md @@ -0,0 +1,13 @@ +# Routine Leak Detection with AsyncRoutineManager +This demonstration illustrates how the `AsyncRoutineManager` help identifying routine leaks and pinpoint their source. + +Each folder, named step1 to stepN, adds a progressive integration of the `AsyncRoutineManager`, starting from the +naked code of `step1` to the full integration in the last step. + +The application we are going to use for this demonstration is very simple: + +1. The `main` function starts the `doJob` go routine and then, every 4 seconds, prints the `pproof` goroutine dump +2. `doJob` runs indefinitely and every 50 milliseconds invokes the `foo` function +3. `foo` just invokes `bar` wich in turn starts the `parentGoRoutine` routine +4. `parentGoRoutine` starts the `doInterestingStuff` routine passing a random number between 0 and 99 +5. `doInterestingStuff` hangs indefinitely if the received value is a multiple of 4, otherwise exits \ No newline at end of file diff --git a/examples/routine_leak/example2/step1/README.md b/examples/routine_leak/example2/step1/README.md new file mode 100644 index 0000000..e7a428f --- /dev/null +++ b/examples/routine_leak/example2/step1/README.md @@ -0,0 +1,24 @@ +# STEP1 - the leaking code + +In this step, we will use the native `go` keyword to start go routines. +The code is very simple: + +1. The `main` function starts the `doJob` go routine and then, every 4 seconds, prints the `pproof` goroutine dump +2. `doJob` runs indefinitely and every 50 milliseconds invokes the `foo` function +3. `foo` just invokes `bar` wich in turn starts the `parentGoRoutine` routine +4. `parentGoRoutine` starts the `doInterestingStuff` routine passing a random number between 0 and 99 +5. `doInterestingStuff` hangs indefinitely if the received value is a multiple of 4, otherwise exits + +Running the application we can see how the total number of goroutine keeps increasing: +``` +goroutine profile: total 32 +20 @ 0x100e9caa8 0x100e7d520 0x100ef588c 0x100ea42b4 +# 0x100ef588b main.doInterestingStuff+0x3b +... +... +goroutine profile: total 50 +38 @ 0x100e9caa8 0x100e7d520 0x100ef588c 0x100ea42b4 +# 0x100ef588b main.doInterestingStuff+0x3b +``` + +We can see that the leaked routine os the `doInterestingStuff` but we have no idea of the context of execution. \ No newline at end of file diff --git a/examples/routine_leak/example2/step1/app_leaking_routine.go b/examples/routine_leak/example2/step1/app_leaking_routine.go new file mode 100644 index 0000000..83b182a --- /dev/null +++ b/examples/routine_leak/example2/step1/app_leaking_routine.go @@ -0,0 +1,44 @@ +package main + +import ( + "math/rand" + "os" + "runtime/pprof" + "time" +) + +func main() { + go doJob() + + for { + time.Sleep(4 * time.Second) + _ = pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) + } + +} + +func doJob() { + for { + foo() + time.Sleep(50 * time.Millisecond) + } +} + +func foo() { + bar() +} + +func bar() { + go parentGoroutine() +} + +func parentGoroutine() { + go doInterestingStuff(rand.Intn(100)) + time.Sleep(500 * time.Millisecond) +} + +func doInterestingStuff(value int) { + if value%4 == 0 { + select {} + } +} diff --git a/examples/routine_leak/example2/step2/README.md b/examples/routine_leak/example2/step2/README.md new file mode 100644 index 0000000..e121b4b --- /dev/null +++ b/examples/routine_leak/example2/step2/README.md @@ -0,0 +1,7 @@ +# STEP2 - integrating the AsyncRoutineManager + +In this step, we integrate the `AsyncRoutineManager` to handle routine management. +In fact, incorporating the `AsyncRoutineManager` is almost as simple as replacing the `go` keyword with a call +to the `NewAsyncRoutine` function. +Running this code produces the same behavior as in STEP 1, but now we have established the foundation to +fully leverage the capabilities of the `AsyncRoutineManager`. \ No newline at end of file diff --git a/examples/routine_leak/example2/step2/app_leaking_routine.go b/examples/routine_leak/example2/step2/app_leaking_routine.go new file mode 100644 index 0000000..200d261 --- /dev/null +++ b/examples/routine_leak/example2/step2/app_leaking_routine.go @@ -0,0 +1,52 @@ +package main + +import ( + "context" + "github.com/openshift-online/async-routine" + "math/rand" + "os" + "runtime/pprof" + "time" +) + +func main() { + async. + NewAsyncRoutine("main-job", context.Background(), doJob). + Run() + + for { + time.Sleep(4 * time.Second) + _ = pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) + } + +} + +func doJob() { + for { + foo() + time.Sleep(50 * time.Millisecond) + } +} + +func foo() { + bar() +} + +func bar() { + async. + NewAsyncRoutine("parent-go-routine", context.Background(), parentGoroutine). + Run() +} + +func parentGoroutine() { + async. + NewAsyncRoutine("run-command", context.Background(), func() { doInterestingStuff(rand.Intn(100)) }). + Run() + time.Sleep(500 * time.Millisecond) +} + +func doInterestingStuff(value int) { + if value%4 == 0 { + select {} + } +} diff --git a/examples/routine_leak/example2/step3/README.md b/examples/routine_leak/example2/step3/README.md new file mode 100644 index 0000000..dd9c598 --- /dev/null +++ b/examples/routine_leak/example2/step3/README.md @@ -0,0 +1,40 @@ +# STEP3 - Replacing the pproof dump with a Routine Observer + +In this example, we implement an observer that prints the number of running routine: +```go + +type leakingRoutineObserver struct{} + +func (l leakingRoutineObserver) RunningRoutineCount(count int) { + fmt.Println("running routine count:", count) +} +... +... +``` + +Next, to ensure the observer is automatically notified of all running routines, we need to register the +observer and start the monitor. + +```go +func main() { + async.Manager(async.WithSnapshottingInterval(500 * time.Millisecond)).Monitor().Start() + async.Manager().AddObserver(&leakingRoutineObserver{}) + ... + ... +} +``` + +The output will be something like this: +``` +running routine count: 14 +running routine count: 17 +running routine count: 19 +running routine count: 21 +running routine count: 23 +running routine count: 28 +running routine count: 30 +running routine count: 31 +... +``` + +We can see we have a routine leak, but we still have no idea of what is the routine leak. \ No newline at end of file diff --git a/examples/routine_leak/example2/step3/app_leaking_routine.go b/examples/routine_leak/example2/step3/app_leaking_routine.go new file mode 100644 index 0000000..bcc9c5b --- /dev/null +++ b/examples/routine_leak/example2/step3/app_leaking_routine.go @@ -0,0 +1,51 @@ +package main + +import ( + "context" + "fmt" + "github.com/openshift-online/async-routine" + "math/rand" + "time" +) + +func main() { + async.Manager(async.WithSnapshottingInterval(500 * time.Millisecond)).Monitor().Start() + async.Manager().AddObserver(&leakingRoutineObserver{}) + + async. + NewAsyncRoutine("main-job", context.Background(), doJob). + Run() + + // wait for enter to be pressed + _, _ = fmt.Scanln() +} + +func doJob() { + for { + foo() + time.Sleep(50 * time.Millisecond) + } +} + +func foo() { + bar() +} + +func bar() { + async. + NewAsyncRoutine("parent-go-routine", context.Background(), parentGoroutine). + Run() +} + +func parentGoroutine() { + async. + NewAsyncRoutine("run-command", context.Background(), func() { doInterestingStuff(rand.Intn(100)) }). + Run() + time.Sleep(500 * time.Millisecond) +} + +func doInterestingStuff(value int) { + if value%4 == 0 { + select {} + } +} diff --git a/examples/routine_leak/example2/step3/leaking_routine_observer.go b/examples/routine_leak/example2/step3/leaking_routine_observer.go new file mode 100644 index 0000000..cf38f54 --- /dev/null +++ b/examples/routine_leak/example2/step3/leaking_routine_observer.go @@ -0,0 +1,22 @@ +package main + +import ( + "fmt" + "github.com/openshift-online/async-routine" +) + +var _ async.RoutinesObserver = (*leakingRoutineObserver)(nil) + +type leakingRoutineObserver struct{} + +func (l leakingRoutineObserver) RoutineStarted(routine async.AsyncRoutine) {} + +func (l leakingRoutineObserver) RoutineFinished(routine async.AsyncRoutine) {} + +func (l leakingRoutineObserver) RoutineExceededTimebox(routine async.AsyncRoutine) {} + +func (l leakingRoutineObserver) RunningRoutineCount(count int) { + fmt.Println("running routine count:", count) +} + +func (l leakingRoutineObserver) RunningRoutineByNameCount(name string, count int) {} diff --git a/examples/routine_leak/example2/step4/README.md b/examples/routine_leak/example2/step4/README.md new file mode 100644 index 0000000..b479e66 --- /dev/null +++ b/examples/routine_leak/example2/step4/README.md @@ -0,0 +1,37 @@ +# STEP 4 - Identify which routine is being leaked + +In this step, we extend the observer by implementing a method to track the number of active instances +for each routine type at any given time: + +```go +type leakingRoutineObserver struct{} +... +... +func (l leakingRoutineObserver) RunningRoutineByNameCount(name string, count int) { + fmt.Printf("Routine count for %s: %d\n", name, count) +} +... +... +``` + +The output will be similar to this: +``` +running routine count: 30 +Routine count for run-command: 18 +Routine count for async-routine-monitor: 1 +Routine count for parent-go-routine: 10 +Routine count for main-job: 1 +running routine count: 31 +Routine count for run-command: 20 +Routine count for parent-go-routine: 9 +Routine count for async-routine-monitor: 1 +Routine count for main-job: 1 +running routine count: 37 +Routine count for run-command: 25 +Routine count for parent-go-routine: 10 +Routine count for async-routine-monitor: 1 +Routine count for main-job: 1 +``` + +Now it is clear: we are leaking the `run-command` routine. However, we don't have any context, +so it is hard to understand why and when the routine is leaked. \ No newline at end of file diff --git a/examples/routine_leak/example2/step4/app_leaking_routine.go b/examples/routine_leak/example2/step4/app_leaking_routine.go new file mode 100644 index 0000000..bcc9c5b --- /dev/null +++ b/examples/routine_leak/example2/step4/app_leaking_routine.go @@ -0,0 +1,51 @@ +package main + +import ( + "context" + "fmt" + "github.com/openshift-online/async-routine" + "math/rand" + "time" +) + +func main() { + async.Manager(async.WithSnapshottingInterval(500 * time.Millisecond)).Monitor().Start() + async.Manager().AddObserver(&leakingRoutineObserver{}) + + async. + NewAsyncRoutine("main-job", context.Background(), doJob). + Run() + + // wait for enter to be pressed + _, _ = fmt.Scanln() +} + +func doJob() { + for { + foo() + time.Sleep(50 * time.Millisecond) + } +} + +func foo() { + bar() +} + +func bar() { + async. + NewAsyncRoutine("parent-go-routine", context.Background(), parentGoroutine). + Run() +} + +func parentGoroutine() { + async. + NewAsyncRoutine("run-command", context.Background(), func() { doInterestingStuff(rand.Intn(100)) }). + Run() + time.Sleep(500 * time.Millisecond) +} + +func doInterestingStuff(value int) { + if value%4 == 0 { + select {} + } +} diff --git a/examples/routine_leak/example2/step4/leaking_routine_observer.go b/examples/routine_leak/example2/step4/leaking_routine_observer.go new file mode 100644 index 0000000..674f456 --- /dev/null +++ b/examples/routine_leak/example2/step4/leaking_routine_observer.go @@ -0,0 +1,24 @@ +package main + +import ( + "fmt" + "github.com/openshift-online/async-routine" +) + +var _ async.RoutinesObserver = (*leakingRoutineObserver)(nil) + +type leakingRoutineObserver struct{} + +func (l leakingRoutineObserver) RoutineStarted(routine async.AsyncRoutine) {} + +func (l leakingRoutineObserver) RoutineFinished(routine async.AsyncRoutine) {} + +func (l leakingRoutineObserver) RoutineExceededTimebox(routine async.AsyncRoutine) {} + +func (l leakingRoutineObserver) RunningRoutineCount(count int) { + fmt.Println("running routine count:", count) +} + +func (l leakingRoutineObserver) RunningRoutineByNameCount(name string, count int) { + fmt.Printf("Routine count for %s: %d\n", name, count) +} diff --git a/examples/routine_leak/example2/step5/README.md b/examples/routine_leak/example2/step5/README.md new file mode 100644 index 0000000..7af4643 --- /dev/null +++ b/examples/routine_leak/example2/step5/README.md @@ -0,0 +1,27 @@ +# STEP 5 - Adding some context + +The `AsyncRoutineManager` gives the ability to add arbitrary data to each routine so that each execution can be tied to a set of data. +We will leverage this ability to restrict even more our view to the leaked routine: we will timebox +the routine to 1 second of execution and we add the received value as routine data. + +```go + async. + NewAsyncRoutine("run-command", context.Background(), func() { doInterestingStuff(data) }). + WithData("data", fmt.Sprintf("%d", data)). + Timebox(1 * time.Second). + Run() +``` + +Now the output will be similar to: +``` +leaked routine: [name: run-command started-at: 2025-05-07 15:35:58.218663 +0000 UTC data: map[data:52]] +leaked routine: [name: run-command started-at: 2025-05-07 15:35:59.038886 +0000 UTC data: map[data:56]] +leaked routine: [name: run-command started-at: 2025-05-07 15:35:58.218663 +0000 UTC data: map[data:52]] +leaked routine: [name: run-command started-at: 2025-05-07 15:35:58.987592 +0000 UTC data: map[data:72]] +leaked routine: [name: run-command started-at: 2025-05-07 15:35:58.834007 +0000 UTC data: map[data:16]] +leaked routine: [name: run-command started-at: 2025-05-07 15:35:58.885224 +0000 UTC data: map[data:84]] +leaked routine: [name: run-command started-at: 2025-05-07 15:35:58.680921 +0000 UTC data: map[data:44]] +leaked routine: [name: run-command started-at: 2025-05-07 15:35:59.038886 +0000 UTC data: map[data:56]] +``` +Analyzing the data, we observe that the routine leaks occur only when the data is even. +Upon closer inspection, it becomes evident that the leaks happen specifically when the data is divisible by 4. \ No newline at end of file diff --git a/examples/routine_leak/example2/step5/app_leaking_routine.go b/examples/routine_leak/example2/step5/app_leaking_routine.go new file mode 100644 index 0000000..d1c0833 --- /dev/null +++ b/examples/routine_leak/example2/step5/app_leaking_routine.go @@ -0,0 +1,54 @@ +package main + +import ( + "context" + "fmt" + "github.com/openshift-online/async-routine" + "math/rand" + "time" +) + +func main() { + async.Manager(async.WithSnapshottingInterval(500 * time.Millisecond)).Monitor().Start() + async.Manager().AddObserver(&leakingRoutineObserver{}) + + async. + NewAsyncRoutine("main-job", context.Background(), doJob). + Run() + + // wait for enter to be pressed + _, _ = fmt.Scanln() +} + +func doJob() { + for { + foo() + time.Sleep(50 * time.Millisecond) + } +} + +func foo() { + bar() +} + +func bar() { + async. + NewAsyncRoutine("parent-go-routine", context.Background(), parentGoroutine). + Run() +} + +func parentGoroutine() { + data := rand.Intn(100) + async. + NewAsyncRoutine("run-command", context.Background(), func() { doInterestingStuff(data) }). + WithData("data", fmt.Sprintf("%d", data)). + Timebox(1 * time.Second). + Run() + time.Sleep(500 * time.Millisecond) +} + +func doInterestingStuff(value int) { + if value%4 == 0 { + select {} + } +} diff --git a/examples/routine_leak/example2/step5/leaking_routine_observer.go b/examples/routine_leak/example2/step5/leaking_routine_observer.go new file mode 100644 index 0000000..0bd201b --- /dev/null +++ b/examples/routine_leak/example2/step5/leaking_routine_observer.go @@ -0,0 +1,24 @@ +package main + +import ( + "fmt" + "github.com/openshift-online/async-routine" +) + +var _ async.RoutinesObserver = (*leakingRoutineObserver)(nil) + +type leakingRoutineObserver struct{} + +func (l leakingRoutineObserver) RoutineStarted(routine async.AsyncRoutine) {} + +func (l leakingRoutineObserver) RoutineFinished(routine async.AsyncRoutine) {} + +func (l leakingRoutineObserver) RoutineExceededTimebox(routine async.AsyncRoutine) { + fmt.Printf("leaked routine: [name: %s started-at: %v data: %s]\n", routine.Name(), routine.StartedAt(), routine.GetData()) +} + +func (l leakingRoutineObserver) RunningRoutineCount(count int) { +} + +func (l leakingRoutineObserver) RunningRoutineByNameCount(name string, count int) { +} From 8ffa3c26c4584126d97cf0b83c3109ec19c15852 Mon Sep 17 00:00:00 2001 From: Massimiliano Ziccardi Date: Tue, 13 May 2025 10:44:58 +0200 Subject: [PATCH 2/2] chore: updated examples --- examples/routine_leak/example2/README.md | 10 +- examples/routine_leak/example2/data/data.go | 69 +++++++++++++ .../routine_leak/example2/step1/README.md | 24 ++--- .../example2/step1/app_leaking_routine.go | 71 +++++++++----- .../example2/step2/app_leaking_routine.go | 88 +++++++++++------ .../routine_leak/example2/step3/README.md | 32 ++++-- .../example2/step3/app_leaking_routine.go | 88 +++++++++++------ .../routine_leak/example2/step4/README.md | 41 +++++--- .../example2/step4/app_leaking_routine.go | 88 +++++++++++------ .../routine_leak/example2/step5/README.md | 98 ++++++++++++++++--- .../example2/step5/app_leaking_routine.go | 91 +++++++++++------ .../step5/leaking_routine_observer.go | 1 + 12 files changed, 509 insertions(+), 192 deletions(-) create mode 100644 examples/routine_leak/example2/data/data.go diff --git a/examples/routine_leak/example2/README.md b/examples/routine_leak/example2/README.md index a598044..fd5880a 100644 --- a/examples/routine_leak/example2/README.md +++ b/examples/routine_leak/example2/README.md @@ -6,8 +6,8 @@ naked code of `step1` to the full integration in the last step. The application we are going to use for this demonstration is very simple: -1. The `main` function starts the `doJob` go routine and then, every 4 seconds, prints the `pproof` goroutine dump -2. `doJob` runs indefinitely and every 50 milliseconds invokes the `foo` function -3. `foo` just invokes `bar` wich in turn starts the `parentGoRoutine` routine -4. `parentGoRoutine` starts the `doInterestingStuff` routine passing a random number between 0 and 99 -5. `doInterestingStuff` hangs indefinitely if the received value is a multiple of 4, otherwise exits \ No newline at end of file +1. The `main` function repeatedly starts cycles where it processes multiple websites. + For each cycle, randomly selects 10 website URLs from a predefined list. + For each selected URL, asynchronously invokes the `getWebsiteResponseSize` +2. The `getWebsiteResponseSize` asynchronously calls the `getResponseSize` and do some fun stuff with the result +3. The `getResponseSize` contacts the site and returns the response size diff --git a/examples/routine_leak/example2/data/data.go b/examples/routine_leak/example2/data/data.go new file mode 100644 index 0000000..00b6135 --- /dev/null +++ b/examples/routine_leak/example2/data/data.go @@ -0,0 +1,69 @@ +package data + +var Websites = []string{ + "https://google.com", + "https://facebook.com", + "https://youtube.com", + "https://amazon.com", + "https://wikipedia.com", + "https://instagram.com", + "https://linkedin.com", + "https://reddit.com", + "https://ebay.com", + "https://microsoft.com", + "https://apple.com", + "https://walmart.com", + "https://espn-bad.com", + "https://bbc.com", + "https://cnn.com", + "https://foxnews.com", + "https://nytimes.com", + "https://forbes.com", + "https://cnbc.com", + "https://theguardian.com", + "https://nbcnews.com", + "https://abc.com", + "https://time.com", + "https://nationalgeographic.com", + "https://wired-bad.com", + "https://techcrunch.com", + "https://engadget.com", + "https://gizmodo.com", + "https://mashable.com", + "https://stackoverflow.com", + "https://github.com", + "https://medium-bad.com", + "https://dropbox.com", + "https://box.com", + "https://slack.com", + "https://zoom.com", + "https://stripe.com", + "https://airbnb.com", + "https://booking.com", + "https://hotels-bad.com", + "https://kayak.com", + "https://monster.com", + "https://edx.com", + "https://duolingo-bad.com", + "https://britannica-bad.com", + "https://dictionary.com", + "https://merriam-webster.com", + "https://thesaurus.com", + "https://weather-bad.com", + "https://trulia-bad.com", + "https://epicurious.com", + "https://tasteofhome.com", + "https://healthline.com", + "https://webmd.com", + "https://mayoclinic-bad.com", + "https://everydayhealth.com", + "https://livestrong.com", + "https://fool-bad.com", + "https://seekingalpha.com", + "https://nerdwallet.com", + "https://bankrate.com", + "https://creditkarma.com", + "https://mint.com", + "https://fidelity.com", + "https://vanguard.com", +} diff --git a/examples/routine_leak/example2/step1/README.md b/examples/routine_leak/example2/step1/README.md index e7a428f..1063728 100644 --- a/examples/routine_leak/example2/step1/README.md +++ b/examples/routine_leak/example2/step1/README.md @@ -3,22 +3,22 @@ In this step, we will use the native `go` keyword to start go routines. The code is very simple: -1. The `main` function starts the `doJob` go routine and then, every 4 seconds, prints the `pproof` goroutine dump -2. `doJob` runs indefinitely and every 50 milliseconds invokes the `foo` function -3. `foo` just invokes `bar` wich in turn starts the `parentGoRoutine` routine -4. `parentGoRoutine` starts the `doInterestingStuff` routine passing a random number between 0 and 99 -5. `doInterestingStuff` hangs indefinitely if the received value is a multiple of 4, otherwise exits +1. The `main` function repeatedly starts cycles where it processes multiple websites. + For each cycle, randomly selects 10 website URLs from a predefined list. + For each selected URL, asynchronously invokes the `getWebsiteResponseSize` +2. The `getWebsiteResponseSize` asynchronously calls the `getResponseSize` and do some fun stuff with the result +3. The `getResponseSize` contacts the site and returns the response size Running the application we can see how the total number of goroutine keeps increasing: ``` -goroutine profile: total 32 -20 @ 0x100e9caa8 0x100e7d520 0x100ef588c 0x100ea42b4 -# 0x100ef588b main.doInterestingStuff+0x3b +goroutine profile: total 18 +14 @ 0x102d71768 0x102d361a8 0x102d70a10 0x102db7a38 0x102db830c 0x102db82fd 0x102e451d8 0x102e4d974 0x102e87430 0x102dcd820 0x102e87610 0x102e84dac 0x102e8ad44 0x102e8ad45 0x102eb83b4 0x102d9d940 0x102ee5b18 0x102ee5aed 0x102ee6148 0x102ef6b50 0x102ef60c8 0x102d79aa4 +# 0x102d70a0f internal/poll.runtime_pollWait+0x9f ... ... -goroutine profile: total 50 -38 @ 0x100e9caa8 0x100e7d520 0x100ef588c 0x100ea42b4 -# 0x100ef588b main.doInterestingStuff+0x3b +goroutine profile: total 33 +26 @ 0x102d71768 0x102d361a8 0x102d70a10 0x102db7a38 0x102db830c 0x102db82fd 0x102e451d8 0x102e4d974 0x102e87430 0x102dcd820 0x102e87610 0x102e84dac 0x102e8ad44 0x102e8ad45 0x102eb83b4 0x102d9d940 0x102ee5b18 0x102ee5aed 0x102ee6148 0x102ef6b50 0x102ef60c8 0x102d79aa4 +# 0x102d70a0f internal/poll.runtime_pollWait+0x9f ``` -We can see that the leaked routine os the `doInterestingStuff` but we have no idea of the context of execution. \ No newline at end of file +Understanding which routine we are leaking is not immediate. diff --git a/examples/routine_leak/example2/step1/app_leaking_routine.go b/examples/routine_leak/example2/step1/app_leaking_routine.go index 83b182a..54adaca 100644 --- a/examples/routine_leak/example2/step1/app_leaking_routine.go +++ b/examples/routine_leak/example2/step1/app_leaking_routine.go @@ -1,44 +1,71 @@ package main import ( + "fmt" + "github.com/openshift-online/async-routine/examples/routine_leak/example2/data" + "io" "math/rand" + "net/http" "os" "runtime/pprof" + "strconv" "time" ) func main() { - go doJob() - for { - time.Sleep(4 * time.Second) + for i := 0; i < 10; i++ { + url := data.Websites[rand.Intn(len(data.Websites))] + go doJob(url) + time.Sleep(500 * time.Millisecond) + } _ = pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) } - } -func doJob() { - for { - foo() - time.Sleep(50 * time.Millisecond) - } +func doJob(url string) { + resultChan := make(chan int64) + go getResponseSize(url, resultChan) + size := <-resultChan + // do something fun with the size - here we just avoid the compilation error + size = size } -func foo() { - bar() -} +// getResponseSize fetches the given URL and sends the response size (in bytes) to the provided channel. +// Returns an error if the site does not exist or the request fails. +func getResponseSize(url string, ch chan<- int64) error { + // Perform the HTTP request + res, err := http.Get(url) + if err != nil { + return fmt.Errorf("site unreachable: %w", err) + } + defer res.Body.Close() -func bar() { - go parentGoroutine() -} + // Handle HTTP errors + if res.StatusCode != http.StatusOK { + switch res.StatusCode { + case http.StatusNotFound: + return fmt.Errorf("site does not exist (404 Not Found)") + default: + return fmt.Errorf("invalid HTTP response: %d %s", res.StatusCode, http.StatusText(res.StatusCode)) + } + } -func parentGoroutine() { - go doInterestingStuff(rand.Intn(100)) - time.Sleep(500 * time.Millisecond) -} + // Try to use Content-Length header if available + if contentLength := res.Header.Get("Content-Length"); contentLength != "" { + size, err := strconv.ParseInt(contentLength, 10, 64) + if err == nil && size > 0 { + ch <- size + return nil + } + } -func doInterestingStuff(value int) { - if value%4 == 0 { - select {} + // Calculate the size by reading the response body + body, err := io.ReadAll(res.Body) + if err != nil { + return fmt.Errorf("failed to read response body: %w", err) } + + ch <- int64(len(body)) + return nil } diff --git a/examples/routine_leak/example2/step2/app_leaking_routine.go b/examples/routine_leak/example2/step2/app_leaking_routine.go index 200d261..95f7a6a 100644 --- a/examples/routine_leak/example2/step2/app_leaking_routine.go +++ b/examples/routine_leak/example2/step2/app_leaking_routine.go @@ -2,51 +2,83 @@ package main import ( "context" + "fmt" "github.com/openshift-online/async-routine" + "github.com/openshift-online/async-routine/examples/routine_leak/example2/data" + "io" "math/rand" + "net/http" "os" "runtime/pprof" + "strconv" "time" ) func main() { - async. - NewAsyncRoutine("main-job", context.Background(), doJob). - Run() - for { - time.Sleep(4 * time.Second) + for i := 0; i < 10; i++ { + url := data.Websites[rand.Intn(len(data.Websites))] + async.NewAsyncRoutine( + "do-job", + context.Background(), + func() { + doJob(url) + }). + Run() + time.Sleep(500 * time.Millisecond) + } _ = pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) } - } -func doJob() { - for { - foo() - time.Sleep(50 * time.Millisecond) - } +func doJob(url string) { + resultChan := make(chan int64) + async.NewAsyncRoutine( + "get-website-size", + context.Background(), + func() { + getResponseSize(url, resultChan) + }).Run() + size := <-resultChan + // do something fun with the size - here we just avoid the compilation error + size = size } -func foo() { - bar() -} +// GetResponseSize fetches the given URL and sends the response size (in bytes) to the provided channel. +// Returns an error if the site does not exist or the request fails. +func getResponseSize(url string, ch chan<- int64) error { + // Perform the HTTP request + res, err := http.Get(url) + if err != nil { + return fmt.Errorf("site unreachable: %w", err) + } + defer res.Body.Close() -func bar() { - async. - NewAsyncRoutine("parent-go-routine", context.Background(), parentGoroutine). - Run() -} + // Handle HTTP errors + if res.StatusCode != http.StatusOK { + switch res.StatusCode { + case http.StatusNotFound: + return fmt.Errorf("site does not exist (404 Not Found)") + default: + return fmt.Errorf("invalid HTTP response: %d %s", res.StatusCode, http.StatusText(res.StatusCode)) + } + } -func parentGoroutine() { - async. - NewAsyncRoutine("run-command", context.Background(), func() { doInterestingStuff(rand.Intn(100)) }). - Run() - time.Sleep(500 * time.Millisecond) -} + // Try to use Content-Length header if available + if contentLength := res.Header.Get("Content-Length"); contentLength != "" { + size, err := strconv.ParseInt(contentLength, 10, 64) + if err == nil && size > 0 { + ch <- size + return nil + } + } -func doInterestingStuff(value int) { - if value%4 == 0 { - select {} + // Calculate the size by reading the response body + body, err := io.ReadAll(res.Body) + if err != nil { + return fmt.Errorf("failed to read response body: %w", err) } + + ch <- int64(len(body)) + return nil } diff --git a/examples/routine_leak/example2/step3/README.md b/examples/routine_leak/example2/step3/README.md index dd9c598..1af3858 100644 --- a/examples/routine_leak/example2/step3/README.md +++ b/examples/routine_leak/example2/step3/README.md @@ -26,15 +26,29 @@ func main() { The output will be something like this: ``` -running routine count: 14 -running routine count: 17 -running routine count: 19 -running routine count: 21 -running routine count: 23 -running routine count: 28 -running routine count: 30 -running routine count: 31 -... +running routine count: 2 +running routine count: 3 +running routine count: 3 +running routine count: 2 +running routine count: 4 +running routine count: 4 +running routine count: 2 +running routine count: 4 +running routine count: 4 +running routine count: 2 +running routine count: 3 +running routine count: 5 +running routine count: 3 +running routine count: 3 +running routine count: 5 +running routine count: 5 +running routine count: 3 +running routine count: 4 +running routine count: 4 +running routine count: 4 +running routine count: 5 +running routine count: 5 +running routine count: 7 ``` We can see we have a routine leak, but we still have no idea of what is the routine leak. \ No newline at end of file diff --git a/examples/routine_leak/example2/step3/app_leaking_routine.go b/examples/routine_leak/example2/step3/app_leaking_routine.go index bcc9c5b..4c7863e 100644 --- a/examples/routine_leak/example2/step3/app_leaking_routine.go +++ b/examples/routine_leak/example2/step3/app_leaking_routine.go @@ -4,48 +4,80 @@ import ( "context" "fmt" "github.com/openshift-online/async-routine" + "github.com/openshift-online/async-routine/examples/routine_leak/example2/data" + "io" "math/rand" + "net/http" + "strconv" "time" ) func main() { async.Manager(async.WithSnapshottingInterval(500 * time.Millisecond)).Monitor().Start() async.Manager().AddObserver(&leakingRoutineObserver{}) - - async. - NewAsyncRoutine("main-job", context.Background(), doJob). - Run() - - // wait for enter to be pressed - _, _ = fmt.Scanln() -} - -func doJob() { for { - foo() - time.Sleep(50 * time.Millisecond) + for i := 0; i < 10; i++ { + url := data.Websites[rand.Intn(len(data.Websites))] + async.NewAsyncRoutine( + "do-job", + context.Background(), + func() { + doJob(url) + }). + Run() + time.Sleep(500 * time.Millisecond) + } } } -func foo() { - bar() +func doJob(url string) { + resultChan := make(chan int64) + async.NewAsyncRoutine( + "get-website-size", + context.Background(), + func() { + getResponseSize(url, resultChan) + }).Run() + size := <-resultChan + // do something fun with the size - here we just avoid the compilation error + size = size } -func bar() { - async. - NewAsyncRoutine("parent-go-routine", context.Background(), parentGoroutine). - Run() -} +// getResponseSize fetches the given URL and sends the response size (in bytes) to the provided channel. +// Returns an error if the site does not exist or the request fails. +func getResponseSize(url string, ch chan<- int64) error { + // Perform the HTTP request + res, err := http.Get(url) + if err != nil { + return fmt.Errorf("site unreachable: %w", err) + } + defer res.Body.Close() -func parentGoroutine() { - async. - NewAsyncRoutine("run-command", context.Background(), func() { doInterestingStuff(rand.Intn(100)) }). - Run() - time.Sleep(500 * time.Millisecond) -} + // Handle HTTP errors + if res.StatusCode != http.StatusOK { + switch res.StatusCode { + case http.StatusNotFound: + return fmt.Errorf("site does not exist (404 Not Found)") + default: + return fmt.Errorf("invalid HTTP response: %d %s", res.StatusCode, http.StatusText(res.StatusCode)) + } + } -func doInterestingStuff(value int) { - if value%4 == 0 { - select {} + // Try to use Content-Length header if available + if contentLength := res.Header.Get("Content-Length"); contentLength != "" { + size, err := strconv.ParseInt(contentLength, 10, 64) + if err == nil && size > 0 { + ch <- size + return nil + } } + + // Calculate the size by reading the response body + body, err := io.ReadAll(res.Body) + if err != nil { + return fmt.Errorf("failed to read response body: %w", err) + } + + ch <- int64(len(body)) + return nil } diff --git a/examples/routine_leak/example2/step4/README.md b/examples/routine_leak/example2/step4/README.md index b479e66..4526942 100644 --- a/examples/routine_leak/example2/step4/README.md +++ b/examples/routine_leak/example2/step4/README.md @@ -16,22 +16,35 @@ func (l leakingRoutineObserver) RunningRoutineByNameCount(name string, count int The output will be similar to this: ``` -running routine count: 30 -Routine count for run-command: 18 +... +running routine count: 10 +Routine count for do-job: 9 +Routine count for async-routine-monitor: 1 +running routine count: 11 +Routine count for do-job: 10 +Routine count for async-routine-monitor: 1 +running routine count: 11 +Routine count for do-job: 10 +Routine count for async-routine-monitor: 1 +running routine count: 11 +Routine count for do-job: 10 Routine count for async-routine-monitor: 1 -Routine count for parent-go-routine: 10 -Routine count for main-job: 1 -running routine count: 31 -Routine count for run-command: 20 -Routine count for parent-go-routine: 9 +running routine count: 11 +Routine count for do-job: 10 Routine count for async-routine-monitor: 1 -Routine count for main-job: 1 -running routine count: 37 -Routine count for run-command: 25 -Routine count for parent-go-routine: 10 +running routine count: 11 +Routine count for do-job: 10 +Routine count for async-routine-monitor: 1 +running routine count: 11 +Routine count for do-job: 10 +Routine count for async-routine-monitor: 1 +running routine count: 11 +Routine count for do-job: 10 +Routine count for async-routine-monitor: 1 +running routine count: 11 +Routine count for do-job: 10 Routine count for async-routine-monitor: 1 -Routine count for main-job: 1 ``` - -Now it is clear: we are leaking the `run-command` routine. However, we don't have any context, +The `async-routine-montor` is the routine the manager started when we started the monitor. +Now it is clear: we are leaking the `do-job` routine. However, we don't have any context, so it is hard to understand why and when the routine is leaked. \ No newline at end of file diff --git a/examples/routine_leak/example2/step4/app_leaking_routine.go b/examples/routine_leak/example2/step4/app_leaking_routine.go index bcc9c5b..4c7863e 100644 --- a/examples/routine_leak/example2/step4/app_leaking_routine.go +++ b/examples/routine_leak/example2/step4/app_leaking_routine.go @@ -4,48 +4,80 @@ import ( "context" "fmt" "github.com/openshift-online/async-routine" + "github.com/openshift-online/async-routine/examples/routine_leak/example2/data" + "io" "math/rand" + "net/http" + "strconv" "time" ) func main() { async.Manager(async.WithSnapshottingInterval(500 * time.Millisecond)).Monitor().Start() async.Manager().AddObserver(&leakingRoutineObserver{}) - - async. - NewAsyncRoutine("main-job", context.Background(), doJob). - Run() - - // wait for enter to be pressed - _, _ = fmt.Scanln() -} - -func doJob() { for { - foo() - time.Sleep(50 * time.Millisecond) + for i := 0; i < 10; i++ { + url := data.Websites[rand.Intn(len(data.Websites))] + async.NewAsyncRoutine( + "do-job", + context.Background(), + func() { + doJob(url) + }). + Run() + time.Sleep(500 * time.Millisecond) + } } } -func foo() { - bar() +func doJob(url string) { + resultChan := make(chan int64) + async.NewAsyncRoutine( + "get-website-size", + context.Background(), + func() { + getResponseSize(url, resultChan) + }).Run() + size := <-resultChan + // do something fun with the size - here we just avoid the compilation error + size = size } -func bar() { - async. - NewAsyncRoutine("parent-go-routine", context.Background(), parentGoroutine). - Run() -} +// getResponseSize fetches the given URL and sends the response size (in bytes) to the provided channel. +// Returns an error if the site does not exist or the request fails. +func getResponseSize(url string, ch chan<- int64) error { + // Perform the HTTP request + res, err := http.Get(url) + if err != nil { + return fmt.Errorf("site unreachable: %w", err) + } + defer res.Body.Close() -func parentGoroutine() { - async. - NewAsyncRoutine("run-command", context.Background(), func() { doInterestingStuff(rand.Intn(100)) }). - Run() - time.Sleep(500 * time.Millisecond) -} + // Handle HTTP errors + if res.StatusCode != http.StatusOK { + switch res.StatusCode { + case http.StatusNotFound: + return fmt.Errorf("site does not exist (404 Not Found)") + default: + return fmt.Errorf("invalid HTTP response: %d %s", res.StatusCode, http.StatusText(res.StatusCode)) + } + } -func doInterestingStuff(value int) { - if value%4 == 0 { - select {} + // Try to use Content-Length header if available + if contentLength := res.Header.Get("Content-Length"); contentLength != "" { + size, err := strconv.ParseInt(contentLength, 10, 64) + if err == nil && size > 0 { + ch <- size + return nil + } } + + // Calculate the size by reading the response body + body, err := io.ReadAll(res.Body) + if err != nil { + return fmt.Errorf("failed to read response body: %w", err) + } + + ch <- int64(len(body)) + return nil } diff --git a/examples/routine_leak/example2/step5/README.md b/examples/routine_leak/example2/step5/README.md index 7af4643..e73923c 100644 --- a/examples/routine_leak/example2/step5/README.md +++ b/examples/routine_leak/example2/step5/README.md @@ -5,23 +5,91 @@ We will leverage this ability to restrict even more our view to the leaked routi the routine to 1 second of execution and we add the received value as routine data. ```go - async. - NewAsyncRoutine("run-command", context.Background(), func() { doInterestingStuff(data) }). - WithData("data", fmt.Sprintf("%d", data)). - Timebox(1 * time.Second). - Run() + async.NewAsyncRoutine("do-job", context.Background(), + func() { + getWebsiteResponseSize(url) + }). + Timebox(5*time.Second). + WithData("url", url). + Run() + time.Sleep(500 * time.Millisecond) ``` Now the output will be similar to: ``` -leaked routine: [name: run-command started-at: 2025-05-07 15:35:58.218663 +0000 UTC data: map[data:52]] -leaked routine: [name: run-command started-at: 2025-05-07 15:35:59.038886 +0000 UTC data: map[data:56]] -leaked routine: [name: run-command started-at: 2025-05-07 15:35:58.218663 +0000 UTC data: map[data:52]] -leaked routine: [name: run-command started-at: 2025-05-07 15:35:58.987592 +0000 UTC data: map[data:72]] -leaked routine: [name: run-command started-at: 2025-05-07 15:35:58.834007 +0000 UTC data: map[data:16]] -leaked routine: [name: run-command started-at: 2025-05-07 15:35:58.885224 +0000 UTC data: map[data:84]] -leaked routine: [name: run-command started-at: 2025-05-07 15:35:58.680921 +0000 UTC data: map[data:44]] -leaked routine: [name: run-command started-at: 2025-05-07 15:35:59.038886 +0000 UTC data: map[data:56]] +... +routine: [count: 6] +routine: [count: 6] +leaked routine: [name: do-job started-at: 2025-05-14 09:54:54.415189 +0000 UTC data: map[url:https://fool-bad.com]] +routine: [count: 4] +leaked routine: [name: do-job started-at: 2025-05-14 09:54:54.415189 +0000 UTC data: map[url:https://fool-bad.com]] +leaked routine: [name: do-job started-at: 2025-05-14 09:54:54.915739 +0000 UTC data: map[url:https://weather-bad.com]] +routine: [count: 4] +... ``` -Analyzing the data, we observe that the routine leaks occur only when the data is even. -Upon closer inspection, it becomes evident that the leaks happen specifically when the data is divisible by 4. \ No newline at end of file +Analyzing the data, we observe that the routine leaks occur when there is an error with the website url. +Inspecting the code we see that `getWebsiteResponseSize` calls `getResponseSize` but ignores any error: +```go + async.NewAsyncRoutine( + "get-website-size", + context.Background(), + func() { + getResponseSize(url, resultChan) + }).Run() + + ... + ... + func getResponseSize(url string, ch chan<- int64) error { +``` + +Let's fix the code: +```go + async.NewAsyncRoutine( + "get-website-size", + context.Background(), + func() { + err := getResponseSize(url, resultChan) + if err != nil { + resultChan <- -1 + fmt.Println("error fetching website size:", err) + } + }).Run() + + ... + ... + func getResponseSize(url string, ch chan<- int64) error { +``` + +Now the output is: +```go +routine: [count: 1] +Error getting response size: site unreachable: Get "https://duolingo-bad.com": dial tcp: lookup duolingo-bad.com: no such host +routine: [count: 1] +routine: [count: 1] +Error getting response size: site unreachable: Get "https://trulia-bad.com": dial tcp: lookup trulia-bad.com: no such host +routine: [count: 1] +routine: [count: 1] +routine: [count: 3] +routine: [count: 3] +routine: [count: 3] +routine: [count: 5] +routine: [count: 1] +routine: [count: 1] +routine: [count: 3] +routine: [count: 1] +routine: [count: 3] +routine: [count: 1] +routine: [count: 1] +routine: [count: 3] +routine: [count: 1] +routine: [count: 1] +routine: [count: 3] +routine: [count: 3] +routine: [count: 1] +routine: [count: 1] +routine: [count: 1] +routine: [count: 1] +``` + +and we don't have leaked routines anymore: we can say that because we don't have any routine exceeding the timebox +anymore and the routine count always drops to 1 (the only running routine: the `async-routine-monitor` one) \ No newline at end of file diff --git a/examples/routine_leak/example2/step5/app_leaking_routine.go b/examples/routine_leak/example2/step5/app_leaking_routine.go index d1c0833..dc2b898 100644 --- a/examples/routine_leak/example2/step5/app_leaking_routine.go +++ b/examples/routine_leak/example2/step5/app_leaking_routine.go @@ -4,51 +4,80 @@ import ( "context" "fmt" "github.com/openshift-online/async-routine" + "github.com/openshift-online/async-routine/examples/routine_leak/example2/data" + "io" "math/rand" + "net/http" + "strconv" "time" ) func main() { async.Manager(async.WithSnapshottingInterval(500 * time.Millisecond)).Monitor().Start() async.Manager().AddObserver(&leakingRoutineObserver{}) - - async. - NewAsyncRoutine("main-job", context.Background(), doJob). - Run() - - // wait for enter to be pressed - _, _ = fmt.Scanln() -} - -func doJob() { for { - foo() - time.Sleep(50 * time.Millisecond) + for i := 0; i < 10; i++ { + url := data.Websites[rand.Intn(len(data.Websites))] + async.NewAsyncRoutine("do-job", context.Background(), + func() { + doJob(url) + }). + Timebox(5*time.Second). + WithData("url", url). + Run() + time.Sleep(500 * time.Millisecond) + } } } -func foo() { - bar() +func doJob(url string) { + resultChan := make(chan int64) + async.NewAsyncRoutine( + "get-website-size", + context.Background(), + func() { + getResponseSize(url, resultChan) + }).Run() + size := <-resultChan + // do something fun with the size - here we just avoid the compilation error + size = size } -func bar() { - async. - NewAsyncRoutine("parent-go-routine", context.Background(), parentGoroutine). - Run() -} +// getResponseSize fetches the given URL and sends the response size (in bytes) to the provided channel. +// Returns an error if the site does not exist or the request fails. +func getResponseSize(url string, ch chan<- int64) error { + // Perform the HTTP request + res, err := http.Get(url) + if err != nil { + return fmt.Errorf("site unreachable: %w", err) + } + defer res.Body.Close() -func parentGoroutine() { - data := rand.Intn(100) - async. - NewAsyncRoutine("run-command", context.Background(), func() { doInterestingStuff(data) }). - WithData("data", fmt.Sprintf("%d", data)). - Timebox(1 * time.Second). - Run() - time.Sleep(500 * time.Millisecond) -} + // Handle HTTP errors + if res.StatusCode != http.StatusOK { + switch res.StatusCode { + case http.StatusNotFound: + return fmt.Errorf("site does not exist (404 Not Found)") + default: + return fmt.Errorf("invalid HTTP response: %d %s", res.StatusCode, http.StatusText(res.StatusCode)) + } + } -func doInterestingStuff(value int) { - if value%4 == 0 { - select {} + // Try to use Content-Length header if available + if contentLength := res.Header.Get("Content-Length"); contentLength != "" { + size, err := strconv.ParseInt(contentLength, 10, 64) + if err == nil && size > 0 { + ch <- size + return nil + } } + + // Calculate the size by reading the response body + body, err := io.ReadAll(res.Body) + if err != nil { + return fmt.Errorf("failed to read response body: %w", err) + } + + ch <- int64(len(body)) + return nil } diff --git a/examples/routine_leak/example2/step5/leaking_routine_observer.go b/examples/routine_leak/example2/step5/leaking_routine_observer.go index 0bd201b..a3e237f 100644 --- a/examples/routine_leak/example2/step5/leaking_routine_observer.go +++ b/examples/routine_leak/example2/step5/leaking_routine_observer.go @@ -18,6 +18,7 @@ func (l leakingRoutineObserver) RoutineExceededTimebox(routine async.AsyncRoutin } func (l leakingRoutineObserver) RunningRoutineCount(count int) { + fmt.Println("running routine count:", count) } func (l leakingRoutineObserver) RunningRoutineByNameCount(name string, count int) {