Commits (47 total; changes shown from 34 commits)
f4ce3f1
chore: migrated to multi-cluster provider
vertex451 Aug 29, 2025
4999ab2
pulled main
vertex451 Sep 5, 2025
82ce421
addressed comments
vertex451 Sep 5, 2025
70e5a05
fixed kcp gateway connection
vertex451 Sep 19, 2025
c04cab2
removed http2 workaround
vertex451 Sep 19, 2025
b5a328d
linter
vertex451 Sep 19, 2025
903e39a
cleanup
vertex451 Sep 19, 2025
e84c1f7
removed tls
vertex451 Sep 19, 2025
9919cf0
iterate
vertex451 Sep 19, 2025
3f5976f
fix discovery check
vertex451 Sep 22, 2025
fea6b34
fixed virtual workspace
vertex451 Sep 22, 2025
b8f3ade
dynamic virtual workspace resolution
vertex451 Sep 23, 2025
1be4f83
made app exit on virtual-ws failure
vertex451 Sep 23, 2025
0c9ac2d
incapsulated clusterNameCtxKey
vertex451 Sep 23, 2025
1aa7ca0
added dedicated helper for ws path
vertex451 Sep 23, 2025
a5efb28
pulled main
vertex451 Sep 24, 2025
36eff27
centralized context keys
vertex451 Sep 24, 2025
907f251
removed redundant wrappers
vertex451 Sep 24, 2025
c7193e2
improved logging in ClusterPathResolver
vertex451 Sep 24, 2025
b366f2b
made clusterPathResolver as a KCPManager field
vertex451 Sep 24, 2025
98ab379
removed hardcoded fallback in clusterpath
vertex451 Sep 24, 2025
70b65c2
iterate
vertex451 Sep 24, 2025
ad6be7a
iterate
vertex451 Sep 24, 2025
e9b4c33
cleanup
vertex451 Sep 25, 2025
5c4741d
consistent error handling
vertex451 Sep 25, 2025
e3b5705
add validation of virtual ws fields
vertex451 Sep 25, 2025
7d6d82c
differentiate between shutdown reasons
vertex451 Sep 25, 2025
330ffe4
corrected virtual ws patter check
vertex451 Sep 25, 2025
282ef8b
exponential backoff
vertex451 Sep 25, 2025
5b62aa1
removed deprecated alias
vertex451 Sep 25, 2025
3ab0b6e
iterate
vertex451 Sep 25, 2025
3225d75
pulled main
vertex451 Sep 26, 2025
d01584a
improve reconciler coversage
vertex451 Sep 26, 2025
bcc07f3
coverage
vertex451 Sep 26, 2025
988e7b7
scalars test
vertex451 Sep 26, 2025
b808fd2
more tests
vertex451 Sep 26, 2025
1869a61
addressed scalar bug
vertex451 Sep 29, 2025
e8beffd
Merge branch 'main' of github.com:platform-mesh/kubernetes-graphql-ga…
vertex451 Sep 29, 2025
779f42c
iterate
vertex451 Sep 29, 2025
f6bab1c
replaced log.Error with log.Fatal
vertex451 Oct 2, 2025
97cc1c0
replaced context.Background with t.Context() in tests
vertex451 Oct 2, 2025
4494be2
decomposed isDiscoveryRequest logic
vertex451 Oct 2, 2025
409185a
removed redundant url parsing
vertex451 Oct 2, 2025
662bcef
normal shutdown in case of context cancelation
vertex451 Oct 2, 2025
e695399
eliminated false positive in worksapce detection
vertex451 Oct 2, 2025
73cc293
added type asser
vertex451 Oct 2, 2025
00c6e87
fix bug in ws
vertex451 Oct 2, 2025
19 changes: 12 additions & 7 deletions cmd/gateway.go
@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"net/http"
"os"
"time"

openmfpcontext "github.com/platform-mesh/golang-commons/context"
@@ -29,19 +30,22 @@ var gatewayCmd = &cobra.Command{
defer shutdown()

if err := initializeSentry(ctx, log); err != nil {
log.Fatal().Err(err).Msg("Failed to initialize Sentry")
log.Error().Err(err).Msg("Failed to initialize Sentry")
Reviewer comment:

Just out of curiosity: what is the reason to switch to log.Error(), as log.Fatal() also does the os.Exit(1) - is it the log level?

Contributor Author reply:

After your comment I double-checked the behavior: log.Fatal does basically the same as log.Error + os.Exit, but additionally it flushes any buffered messages.
Replaced log.Error + os.Exit with log.Fatal.
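For illustration, here is a minimal sketch of the two exit paths discussed in this thread. It is written against plain zerolog as an assumption (the project's golang-commons logger exposes a similar chained API); whether Fatal additionally flushes buffered output depends on the concrete logger implementation.

package main

import (
	"errors"
	"os"

	"github.com/rs/zerolog"
)

func main() {
	log := zerolog.New(os.Stderr).With().Timestamp().Logger()

	if err := doWork(); err != nil {
		// Option A: two steps - log the error, then exit explicitly.
		log.Error().Err(err).Msg("failed to do work")
		os.Exit(1)

		// Option B: one step - Fatal writes the event and then calls
		// os.Exit(1) itself, so deferred functions do not run.
		// log.Fatal().Err(err).Msg("failed to do work")
	}
}

func doWork() error { return errors.New("boom") }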

os.Exit(1)
}

ctrl.SetLogger(log.Logr())

gatewayInstance, err := manager.NewGateway(ctx, log, appCfg)
if err != nil {
log.Fatal().Err(err).Msg("Failed to create gateway")
log.Error().Err(err).Msg("Failed to create gateway")
os.Exit(1)
}

tracingShutdown, err := initializeTracing(ctx, log)
if err != nil {
log.Fatal().Err(err).Msg("Failed to initialize tracing")
log.Error().Err(err).Msg("Failed to initialize tracing")
os.Exit(1)
}
defer func() {
if err := tracingShutdown(ctx); err != nil {
@@ -50,7 +54,8 @@
}()

if err := runServers(ctx, log, gatewayInstance); err != nil {
log.Fatal().Err(err).Msg("Failed to run servers")
log.Error().Err(err).Msg("Failed to run servers")
os.Exit(1)
}
},
}
@@ -65,7 +70,7 @@ func initializeSentry(ctx context.Context, log *logger.Logger) error {
defaultCfg.Image.Name, defaultCfg.Image.Tag,
)
if err != nil {
log.Fatal().Err(err).Msg("Sentry init failed")
return fmt.Errorf("sentry init failed: %w", err)
}

defer openmfpcontext.Recover(log)
@@ -76,14 +81,14 @@ func initializeTracing(ctx context.Context, log *logger.Logger) (func(ctx contex
if defaultCfg.Tracing.Enabled {
shutdown, err := traces.InitProvider(ctx, defaultCfg.Tracing.Collector)
if err != nil {
log.Fatal().Err(err).Msg("unable to start gRPC-Sidecar TracerProvider")
return nil, fmt.Errorf("unable to start gRPC-Sidecar TracerProvider: %w", err)
}
return shutdown, nil
}

shutdown, err := traces.InitLocalProvider(ctx, defaultCfg.Tracing.Collector, false)
if err != nil {
log.Fatal().Err(err).Msg("unable to start local TracerProvider")
return nil, fmt.Errorf("unable to start local TracerProvider: %w", err)
}
return shutdown, nil
}
62 changes: 49 additions & 13 deletions cmd/listener.go
@@ -3,6 +3,7 @@ package cmd
import (
"context"
"crypto/tls"
"os"

kcpapis "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha1"
kcpcore "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1"
@@ -77,7 +78,20 @@ var listenCmd = &cobra.Command{
Run: func(cmd *cobra.Command, args []string) {
log.Info().Str("LogLevel", log.GetLevel().String()).Msg("Starting the Listener...")

ctx := ctrl.SetupSignalHandler()
// Set up signal handler and create a cancellable context for coordinated shutdown
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Error channel to distinguish error-triggered shutdown from normal signals
errCh := make(chan error, 1)

// Set up signal handling
signalCtx := ctrl.SetupSignalHandler()
go func() {
<-signalCtx.Done()
log.Info().Msg("received shutdown signal, initiating graceful shutdown")
cancel()
}()

restCfg := ctrl.GetConfigOrDie()

mgrOpts := ctrl.Options{
@@ -93,7 +107,8 @@
Scheme: scheme,
})
if err != nil {
log.Fatal().Err(err).Msg("failed to create client from config")
log.Error().Err(err).Msg("failed to create client from config")
os.Exit(1)
}

reconcilerOpts := reconciler.ReconcilerOpts{
@@ -105,44 +120,65 @@
}

// Create the appropriate reconciler based on configuration
var reconcilerInstance reconciler.CustomReconciler
var reconcilerInstance reconciler.ControllerProvider
if appCfg.EnableKcp {
kcpReconciler, err := kcp.NewKCPReconciler(appCfg, reconcilerOpts, log)
kcpManager, err := kcp.NewKCPManager(appCfg, reconcilerOpts, log)
if err != nil {
log.Fatal().Err(err).Msg("unable to create KCP reconciler")
log.Error().Err(err).Msg("unable to create KCP manager")
os.Exit(1)
}
reconcilerInstance = kcpManager

// Start virtual workspace watching if path is configured
if appCfg.Listener.VirtualWorkspacesConfigPath != "" {
go func() {
if err := kcpReconciler.StartVirtualWorkspaceWatching(ctx, appCfg.Listener.VirtualWorkspacesConfigPath); err != nil {
log.Fatal().Err(err).Msg("failed to start virtual workspace watching")
if err := kcpManager.StartVirtualWorkspaceWatching(ctx, appCfg.Listener.VirtualWorkspacesConfigPath); err != nil {
log.Error().Err(err).Msg("virtual workspace watching failed, initiating graceful shutdown")
select {
case errCh <- err:
default:
}
cancel() // Trigger coordinated shutdown
}
}()
}

reconcilerInstance = kcpReconciler
} else {
ioHandler, err := workspacefile.NewIOHandler(appCfg.OpenApiDefinitionsPath)
if err != nil {
log.Fatal().Err(err).Msg("unable to create IO handler")
log.Error().Err(err).Msg("unable to create IO handler")
os.Exit(1)
}

reconcilerInstance, err = clusteraccess.NewClusterAccessReconciler(ctx, appCfg, reconcilerOpts, ioHandler, apischema.NewResolver(log), log)
if err != nil {
log.Fatal().Err(err).Msg("unable to create cluster access reconciler")
log.Error().Err(err).Msg("unable to create cluster access reconciler")
os.Exit(1)
}
}

// Setup reconciler with its own manager and start everything
// Use the original context for the manager - it will be cancelled if watcher fails
if err := startManagerWithReconciler(ctx, reconcilerInstance); err != nil {
log.Fatal().Err(err).Msg("failed to start manager with reconciler")
log.Error().Err(err).Msg("failed to start manager with reconciler")
os.Exit(1)
}

// Determine exit reason: error-triggered vs. normal signal
select {
case err := <-errCh:
Reviewer comment:

tbh I am not fully happy with the context and error handling; maybe we find a better solution in the future.

Contributor Author reply:

I can fix context propagation, but I am not sure what you would improve here in this select statement.
Please give details if you have any, and I will create an issue.

Reviewer reply:

Attention, untested: this could be an idea using errgroup:

	Run: func(cmd *cobra.Command, args []string) {
		log.Info().Str("LogLevel", log.GetLevel().String()).Msg("Starting the Listener...")

		// Set up signal handling
		signalCtx := ctrl.SetupSignalHandler()
		ctx, cancel := context.WithCancel(signalCtx)
		defer cancel()

		restCfg := ctrl.GetConfigOrDie()

		mgrOpts := ctrl.Options{
			Scheme:                 scheme,
			Metrics:                metricsServerOptions,
			WebhookServer:          webhookServer,
			HealthProbeBindAddress: defaultCfg.HealthProbeBindAddress,
			LeaderElection:         defaultCfg.LeaderElection.Enabled,
			LeaderElectionID:       "72231e1f.platform-mesh.io",
		}

		clt, err := client.New(restCfg, client.Options{
			Scheme: scheme,
		})
		if err != nil {
			log.Fatal().Err(err).Msg("failed to create client from config")
		}

		reconcilerOpts := reconciler.ReconcilerOpts{
			Scheme:                 scheme,
			Client:                 clt,
			Config:                 restCfg,
			ManagerOpts:            mgrOpts,
			OpenAPIDefinitionsPath: appCfg.OpenApiDefinitionsPath,
		}

		eg, eCtx := errgroup.WithContext(ctx)

		// Create the appropriate reconciler based on configuration
		var reconcilerInstance reconciler.ControllerProvider
		if appCfg.EnableKcp {
			kcpManager, err := kcp.NewKCPManager(appCfg, reconcilerOpts, log)
			if err != nil {
				log.Fatal().Err(err).Msg("unable to create KCP manager")
			}
			reconcilerInstance = kcpManager

			// Start virtual workspace watching if path is configured
			if appCfg.Listener.VirtualWorkspacesConfigPath != "" {
				eg.Go(func() error {
					if err := kcpManager.StartVirtualWorkspaceWatching(eCtx, appCfg.Listener.VirtualWorkspacesConfigPath); err != nil {
						return err
					}

					return nil
				})
			}
		} else {
			ioHandler, err := workspacefile.NewIOHandler(appCfg.OpenApiDefinitionsPath)
			if err != nil {
				log.Fatal().Err(err).Msg("unable to create IO handler")
			}

			reconcilerInstance, err = clusteraccess.NewClusterAccessReconciler(ctx, appCfg, reconcilerOpts, ioHandler, apischema.NewResolver(log), log)
			if err != nil {
				log.Fatal().Err(err).Msg("unable to create cluster access reconciler")
			}
		}

		// Setup reconciler with its own manager and start everything
		// Use the original context for the manager - it will be cancelled if watcher fails
		eg.Go(func() error {
			if err := startManagerWithReconciler(eCtx, reconcilerInstance); err != nil {
				log.Error().Err(err).Msg("failed to start manager with reconciler")
				return err
			}

			return nil
		})

		err = eg.Wait()
		if err != nil {
			log.Fatal().Err(err).Msg("exiting due to critical component failure")
		}

		log.Info().Msg("graceful shutdown complete")
	},

Maybe we don't even need our own context and can only use the signal context. The errgroup returns its own context that is cancelled if one function returns with an error.

Contributor Author reply:

I will also do it in this issue: #62
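As a small, self-contained illustration of the errgroup suggestion above (illustrative only, not code from this PR): the context returned by errgroup.WithContext is cancelled as soon as any goroutine in the group returns a non-nil error, which is what lets the sibling shut down cleanly.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	eg, ctx := errgroup.WithContext(context.Background())

	// Simulated watcher that fails shortly after starting.
	eg.Go(func() error {
		time.Sleep(100 * time.Millisecond)
		return errors.New("watcher failed")
	})

	// Simulated manager that runs until the shared context is cancelled.
	eg.Go(func() error {
		<-ctx.Done()
		fmt.Println("manager: shutting down:", ctx.Err())
		return ctx.Err()
	})

	// Wait returns the first non-nil error from the group.
	if err := eg.Wait(); err != nil {
		fmt.Println("exit reason:", err)
	}
}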

if err != nil {
log.Error().Err(err).Msg("exiting due to critical component failure")
os.Exit(1)
}
default:
// Normal, graceful shutdown via signal
log.Info().Msg("graceful shutdown complete")
}
},
}

// startManagerWithReconciler handles the common manager setup and start operations
func startManagerWithReconciler(ctx context.Context, reconciler reconciler.CustomReconciler) error {
func startManagerWithReconciler(ctx context.Context, reconciler reconciler.ControllerProvider) error {
mgr := reconciler.GetManager()

if err := reconciler.SetupWithManager(mgr); err != nil {
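For reference, the shutdown-reason pattern used in the listener above, condensed into a standalone runnable sketch: a buffered channel of size one records the first fatal error, the non-blocking send avoids leaking the sending goroutine, and the final select distinguishes an error-triggered exit from a normal, signal-triggered one.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	errCh := make(chan error, 1) // buffer of one: keep only the first error

	go func() {
		// Simulated background watcher failing after some work.
		time.Sleep(50 * time.Millisecond)
		err := errors.New("virtual workspace watching failed")
		select {
		case errCh <- err: // record the error if the slot is free
		default: // an error was already recorded; drop this one
		}
		cancel() // trigger coordinated shutdown
	}()

	<-ctx.Done() // block until a signal or an error cancels the context

	select {
	case err := <-errCh:
		fmt.Println("exiting due to critical component failure:", err)
	default:
		fmt.Println("graceful shutdown complete")
	}
}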
1 change: 1 addition & 0 deletions cmd/root.go
@@ -78,6 +78,7 @@ func initConfig() {
// Gateway URL
v.SetDefault("gateway-url-virtual-workspace-prefix", "virtual-workspace")
v.SetDefault("gateway-url-default-kcp-workspace", "root")
v.SetDefault("gateway-url-kcp-workspace-pattern", "root:orgs:{org}")
v.SetDefault("gateway-url-graphql-suffix", "graphql")
}

1 change: 1 addition & 0 deletions common/config/config.go
@@ -9,6 +9,7 @@ type Config struct {
Url struct {
VirtualWorkspacePrefix string `mapstructure:"gateway-url-virtual-workspace-prefix"`
DefaultKcpWorkspace string `mapstructure:"gateway-url-default-kcp-workspace"`
KcpWorkspacePattern string `mapstructure:"gateway-url-kcp-workspace-pattern"`
GraphqlSuffix string `mapstructure:"gateway-url-graphql-suffix"`
} `mapstructure:",squash"`

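The new gateway-url-kcp-workspace-pattern default, root:orgs:{org}, reads as a template with an {org} placeholder. Below is a hypothetical sketch of how such a pattern could be expanded; the expandPattern helper and its substitution behavior are assumptions for illustration, not code from this PR.

package main

import (
	"fmt"
	"strings"
)

// expandPattern is a hypothetical helper: it substitutes the {org}
// placeholder in a workspace pattern such as "root:orgs:{org}".
func expandPattern(pattern, org string) string {
	return strings.ReplaceAll(pattern, "{org}", org)
}

func main() {
	pattern := "root:orgs:{org}"                // default from cmd/root.go above
	fmt.Println(expandPattern(pattern, "acme")) // root:orgs:acme
}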
62 changes: 62 additions & 0 deletions common/mocks/mock_Client.go

Some generated files are not rendered by default.

62 changes: 62 additions & 0 deletions common/mocks/mock_WithWatch.go

Some generated files are not rendered by default.

14 changes: 10 additions & 4 deletions common/watcher/watcher_test.go
@@ -362,6 +362,8 @@ func TestWatchSingleFile_InvalidDirectory(t *testing.T) {
}

func TestWatchSingleFile_RealFile(t *testing.T) {
t.Parallel() // Run in parallel to avoid resource conflicts

log := testlogger.New().HideLogOutput().Logger
handler := &MockFileEventHandler{}

@@ -388,15 +390,15 @@
watchDone <- watcher.WatchSingleFile(ctx, tempFile, 50) // 50ms debounce
}()

// Give the watcher time to start
time.Sleep(30 * time.Millisecond)
// Give the watcher more time to start (increased for parallel execution)
time.Sleep(100 * time.Millisecond)

// Modify the file to trigger an event
err = os.WriteFile(tempFile, []byte("modified"), 0644)
require.NoError(t, err)

// Give time for file change to be detected and debounced
time.Sleep(150 * time.Millisecond) // 50ms debounce + extra buffer
// Give more time for file change to be detected and debounced (increased for parallel execution)
time.Sleep(300 * time.Millisecond) // 50ms debounce + extra buffer for parallel tests

// Wait for watch to finish (should timeout after remaining time)
err = <-watchDone
@@ -626,6 +628,8 @@ func TestHandleEvent_StatError(t *testing.T) {
}

func TestWatchSingleFile_WithDebounceTimer(t *testing.T) {
t.Parallel() // Run in parallel to avoid resource conflicts

log := testlogger.New().HideLogOutput().Logger
handler := &MockFileEventHandler{}

@@ -976,6 +980,8 @@ func TestWatchDirectory_ErrorInLoop(t *testing.T) {

// TestWatchSingleFile_TimerStop tests the timer stop path in watchWithDebounce
func TestWatchSingleFile_TimerStop(t *testing.T) {
t.Parallel() // Run in parallel to avoid resource conflicts

log := testlogger.New().HideLogOutput().Logger
handler := &MockFileEventHandler{}

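The timing adjustments above exercise the watcher's debounce: a burst of file events keeps resetting a timer, and only the final event is handled once the quiet period elapses. Below is a minimal sketch of that pattern built on fsnotify; the library choice and structure are assumptions, and the project's common/watcher package may differ.

package main

import (
	"fmt"
	"time"

	"github.com/fsnotify/fsnotify"
)

func watchWithDebounce(path string, debounce time.Duration) error {
	w, err := fsnotify.NewWatcher()
	if err != nil {
		return err
	}
	defer w.Close()

	if err := w.Add(path); err != nil {
		return err
	}

	var timer *time.Timer
	for {
		select {
		case ev, ok := <-w.Events:
			if !ok {
				return nil
			}
			// Reset (or start) the debounce timer; only the last
			// event in a burst survives the quiet window.
			if timer != nil {
				timer.Stop()
			}
			timer = time.AfterFunc(debounce, func() {
				fmt.Println("handling change:", ev.Name)
			})
		case err, ok := <-w.Errors:
			if !ok {
				return nil
			}
			return err
		}
	}
}

func main() {
	_ = watchWithDebounce("/tmp/example.yaml", 50*time.Millisecond)
}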