@@ -11,22 +11,38 @@ import (
11
11
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
12
12
"k8s.io/apimachinery/pkg/runtime/schema"
13
13
"k8s.io/apimachinery/pkg/types"
14
+ "sigs.k8s.io/controller-runtime/pkg/client"
14
15
"sigs.k8s.io/controller-runtime/pkg/log"
15
16
16
17
mcpv1alpha1 "github.com/stacklok/toolhive/cmd/thv-operator/api/v1alpha1"
17
18
)
18
19
19
- // kagentToolServerGVK defines the GroupVersionKind for kagent ToolServer
20
+ // kagentToolServerGVK defines the GroupVersionKind for kagent v1alpha1 ToolServer
20
21
var kagentToolServerGVK = schema.GroupVersionKind {
21
22
Group : "kagent.dev" ,
22
23
Version : "v1alpha1" ,
23
24
Kind : "ToolServer" ,
24
25
}
25
26
27
+ // kagentRemoteMCPServerGVK defines the GroupVersionKind for kagent v1alpha2 RemoteMCPServer
28
+ var kagentRemoteMCPServerGVK = schema.GroupVersionKind {
29
+ Group : "kagent.dev" ,
30
+ Version : "v1alpha2" ,
31
+ Kind : "RemoteMCPServer" ,
32
+ }
33
+
26
34
// Constants for kagent config types
27
35
const (
36
+ // v1alpha1 config types
28
37
kagentConfigTypeSSE = "sse"
29
38
kagentConfigTypeStreamableHTTP = "streamableHttp"
39
+
40
+ // v1alpha2 protocol types
41
+ kagentProtocolSSE = "SSE"
42
+ kagentProtocolStreamableHTTP = "STREAMABLE_HTTP"
43
+
44
+ // Environment variable for kagent API version preference
45
+ kagentAPIVersionEnv = "KAGENT_API_VERSION"
30
46
)
31
47
32
48
// isKagentIntegrationEnabled checks if kagent integration is enabled via environment variable
@@ -42,89 +58,178 @@ func isKagentIntegrationEnabled() bool {
42
58
return result
43
59
}
44
60
45
- // ensureKagentToolServer ensures a kagent ToolServer resource exists for the ToolHive MCPServer
61
+ // getPreferredKagentAPIVersion returns the preferred kagent API version
62
+ // Defaults to v1alpha1 for backward compatibility, but can be overridden
63
+ // via KAGENT_API_VERSION environment variable
64
+ func getPreferredKagentAPIVersion () string {
65
+ version := os .Getenv (kagentAPIVersionEnv )
66
+ if version == "v1alpha2" {
67
+ return "v1alpha2"
68
+ }
69
+ // Default to v1alpha1 for backward compatibility
70
+ return "v1alpha1"
71
+ }
72
+
73
+ // detectKagentAPIVersion detects which kagent API version is available in the cluster
74
+ func (r * MCPServerReconciler ) detectKagentAPIVersion (ctx context.Context ) (string , error ) {
75
+ // First check if user has a preference
76
+ preferred := getPreferredKagentAPIVersion ()
77
+
78
+ // Try to list resources of the preferred version to see if it's available
79
+ if preferred == "v1alpha2" {
80
+ // Try v1alpha2 RemoteMCPServer
81
+ list := & unstructured.UnstructuredList {}
82
+ list .SetGroupVersionKind (schema.GroupVersionKind {
83
+ Group : "kagent.dev" ,
84
+ Version : "v1alpha2" ,
85
+ Kind : "RemoteMCPServerList" ,
86
+ })
87
+
88
+ // We just want to check if the API exists, limit to 1 item
89
+ if err := r .List (ctx , list , & client.ListOptions {Limit : 1 }); err == nil {
90
+ return "v1alpha2" , nil
91
+ }
92
+ }
93
+
94
+ // Try v1alpha1 ToolServer
95
+ list := & unstructured.UnstructuredList {}
96
+ list .SetGroupVersionKind (schema.GroupVersionKind {
97
+ Group : "kagent.dev" ,
98
+ Version : "v1alpha1" ,
99
+ Kind : "ToolServerList" ,
100
+ })
101
+
102
+ if err := r .List (ctx , list , & client.ListOptions {Limit : 1 }); err == nil {
103
+ return "v1alpha1" , nil
104
+ }
105
+
106
+ // If neither works, return the preferred version anyway
107
+ // The actual resource creation will fail with a clear error
108
+ return preferred , nil
109
+ }
110
+
111
+ // ensureKagentToolServer ensures a kagent resource exists for the ToolHive MCPServer
112
+ // It automatically detects and uses the appropriate kagent API version
46
113
func (r * MCPServerReconciler ) ensureKagentToolServer (ctx context.Context , mcpServer * mcpv1alpha1.MCPServer ) error {
47
114
logger := log .FromContext (ctx )
48
115
49
116
// Check if kagent integration is enabled
50
117
if ! isKagentIntegrationEnabled () {
51
- // If not enabled, ensure any existing kagent ToolServer is deleted
118
+ // If not enabled, ensure any existing kagent resources are deleted
52
119
return r .deleteKagentToolServer (ctx , mcpServer )
53
120
}
54
121
55
- // Create the kagent ToolServer object
56
- kagentToolServer := r .createKagentToolServerObject (mcpServer )
122
+ // Detect which kagent API version to use
123
+ apiVersion , err := r .detectKagentAPIVersion (ctx )
124
+ if err != nil {
125
+ logger .Error (err , "Failed to detect kagent API version, using default" , "default" , apiVersion )
126
+ }
127
+
128
+ logger .V (1 ).Info ("Using kagent API version" , "version" , apiVersion )
129
+
130
+ // Create the appropriate kagent resource based on API version
131
+ var kagentResource * unstructured.Unstructured
132
+ var gvk schema.GroupVersionKind
133
+
134
+ if apiVersion == "v1alpha2" {
135
+ kagentResource = r .createKagentRemoteMCPServerObject (mcpServer )
136
+ gvk = kagentRemoteMCPServerGVK
137
+ } else {
138
+ kagentResource = r .createKagentToolServerObject (mcpServer )
139
+ gvk = kagentToolServerGVK
140
+ }
57
141
58
- // Check if the kagent ToolServer already exists
142
+ // Check if the kagent resource already exists
59
143
existing := & unstructured.Unstructured {}
60
- existing .SetGroupVersionKind (kagentToolServerGVK )
61
- err : = r .Get (ctx , types.NamespacedName {
62
- Name : kagentToolServer .GetName (),
63
- Namespace : kagentToolServer .GetNamespace (),
144
+ existing .SetGroupVersionKind (gvk )
145
+ err = r .Get (ctx , types.NamespacedName {
146
+ Name : kagentResource .GetName (),
147
+ Namespace : kagentResource .GetNamespace (),
64
148
}, existing )
65
149
66
150
if errors .IsNotFound (err ) {
67
- // Create the kagent ToolServer
68
- logger .Info ("Creating kagent ToolServer" ,
69
- "name" , kagentToolServer .GetName (),
70
- "namespace" , kagentToolServer .GetNamespace ())
71
- if err := r .Create (ctx , kagentToolServer ); err != nil {
72
- return fmt .Errorf ("failed to create kagent ToolServer: %w" , err )
151
+ // Create the kagent resource
152
+ logger .Info ("Creating kagent resource" ,
153
+ "kind" , gvk .Kind ,
154
+ "version" , gvk .Version ,
155
+ "name" , kagentResource .GetName (),
156
+ "namespace" , kagentResource .GetNamespace ())
157
+ if err := r .Create (ctx , kagentResource ); err != nil {
158
+ return fmt .Errorf ("failed to create kagent %s: %w" , gvk .Kind , err )
73
159
}
74
160
return nil
75
161
} else if err != nil {
76
- return fmt .Errorf ("failed to get kagent ToolServer : %w" , err )
162
+ return fmt .Errorf ("failed to get kagent %s : %w" , gvk . Kind , err )
77
163
}
78
164
79
- // Update the kagent ToolServer if needed
165
+ // Update the kagent resource if needed
80
166
existingSpec , _ , _ := unstructured .NestedMap (existing .Object , "spec" )
81
- desiredSpec , _ , _ := unstructured .NestedMap (kagentToolServer .Object , "spec" )
167
+ desiredSpec , _ , _ := unstructured .NestedMap (kagentResource .Object , "spec" )
82
168
83
169
if ! equality .Semantic .DeepEqual (existingSpec , desiredSpec ) {
84
- logger .Info ("Updating kagent ToolServer" ,
85
- "name" , kagentToolServer .GetName (),
86
- "namespace" , kagentToolServer .GetNamespace ())
87
- existing .Object ["spec" ] = kagentToolServer .Object ["spec" ]
170
+ logger .Info ("Updating kagent resource" ,
171
+ "kind" , gvk .Kind ,
172
+ "version" , gvk .Version ,
173
+ "name" , kagentResource .GetName (),
174
+ "namespace" , kagentResource .GetNamespace ())
175
+ existing .Object ["spec" ] = kagentResource .Object ["spec" ]
88
176
if err := r .Update (ctx , existing ); err != nil {
89
- return fmt .Errorf ("failed to update kagent ToolServer : %w" , err )
177
+ return fmt .Errorf ("failed to update kagent %s : %w" , gvk . Kind , err )
90
178
}
91
179
}
92
180
93
181
return nil
94
182
}
95
183
96
- // deleteKagentToolServer deletes the kagent ToolServer if it exists
184
+ // deleteKagentToolServer deletes any kagent resources (v1alpha1 or v1alpha2) if they exist
97
185
func (r * MCPServerReconciler ) deleteKagentToolServer (ctx context.Context , mcpServer * mcpv1alpha1.MCPServer ) error {
98
186
logger := log .FromContext (ctx )
187
+ resourceName := fmt .Sprintf ("toolhive-%s" , mcpServer .Name )
99
188
100
- kagentToolServer := & unstructured.Unstructured {}
101
- kagentToolServer .SetGroupVersionKind (kagentToolServerGVK )
102
- kagentToolServer .SetName (fmt .Sprintf ("toolhive-%s" , mcpServer .Name ))
103
- kagentToolServer .SetNamespace (mcpServer .Namespace )
189
+ // Try to delete v1alpha1 ToolServer
190
+ toolServer := & unstructured.Unstructured {}
191
+ toolServer .SetGroupVersionKind (kagentToolServerGVK )
192
+ toolServer .SetName (resourceName )
193
+ toolServer .SetNamespace (mcpServer .Namespace )
104
194
105
195
err := r .Get (ctx , types.NamespacedName {
106
- Name : kagentToolServer .GetName (),
107
- Namespace : kagentToolServer .GetNamespace (),
108
- }, kagentToolServer )
196
+ Name : toolServer .GetName (),
197
+ Namespace : toolServer .GetNamespace (),
198
+ }, toolServer )
109
199
110
- if errors .IsNotFound (err ) {
111
- // Already deleted
112
- return nil
113
- } else if err != nil {
114
- return fmt .Errorf ("failed to get kagent ToolServer for deletion: %w" , err )
200
+ if err == nil {
201
+ logger .Info ("Deleting kagent ToolServer" ,
202
+ "name" , toolServer .GetName (),
203
+ "namespace" , toolServer .GetNamespace ())
204
+ if err := r .Delete (ctx , toolServer ); err != nil && ! errors .IsNotFound (err ) {
205
+ return fmt .Errorf ("failed to delete kagent ToolServer: %w" , err )
206
+ }
115
207
}
116
208
117
- logger .Info ("Deleting kagent ToolServer" ,
118
- "name" , kagentToolServer .GetName (),
119
- "namespace" , kagentToolServer .GetNamespace ())
120
- if err := r .Delete (ctx , kagentToolServer ); err != nil && ! errors .IsNotFound (err ) {
121
- return fmt .Errorf ("failed to delete kagent ToolServer: %w" , err )
209
+ // Try to delete v1alpha2 RemoteMCPServer
210
+ remoteMCPServer := & unstructured.Unstructured {}
211
+ remoteMCPServer .SetGroupVersionKind (kagentRemoteMCPServerGVK )
212
+ remoteMCPServer .SetName (resourceName )
213
+ remoteMCPServer .SetNamespace (mcpServer .Namespace )
214
+
215
+ err = r .Get (ctx , types.NamespacedName {
216
+ Name : remoteMCPServer .GetName (),
217
+ Namespace : remoteMCPServer .GetNamespace (),
218
+ }, remoteMCPServer )
219
+
220
+ if err == nil {
221
+ logger .Info ("Deleting kagent RemoteMCPServer" ,
222
+ "name" , remoteMCPServer .GetName (),
223
+ "namespace" , remoteMCPServer .GetNamespace ())
224
+ if err := r .Delete (ctx , remoteMCPServer ); err != nil && ! errors .IsNotFound (err ) {
225
+ return fmt .Errorf ("failed to delete kagent RemoteMCPServer: %w" , err )
226
+ }
122
227
}
123
228
124
229
return nil
125
230
}
126
231
127
- // createKagentToolServerObject creates an unstructured kagent ToolServer object
232
+ // createKagentToolServerObject creates an unstructured kagent v1alpha1 ToolServer object
128
233
func (* MCPServerReconciler ) createKagentToolServerObject (mcpServer * mcpv1alpha1.MCPServer ) * unstructured.Unstructured {
129
234
kagentToolServer := & unstructured.Unstructured {}
130
235
kagentToolServer .SetGroupVersionKind (kagentToolServerGVK )
@@ -200,3 +305,66 @@ func (*MCPServerReconciler) createKagentToolServerObject(mcpServer *mcpv1alpha1.
200
305
201
306
return kagentToolServer
202
307
}
308
+
309
+ // createKagentRemoteMCPServerObject creates an unstructured kagent v1alpha2 RemoteMCPServer object
310
+ func (* MCPServerReconciler ) createKagentRemoteMCPServerObject (mcpServer * mcpv1alpha1.MCPServer ) * unstructured.Unstructured {
311
+ remoteMCPServer := & unstructured.Unstructured {}
312
+ remoteMCPServer .SetGroupVersionKind (kagentRemoteMCPServerGVK )
313
+ remoteMCPServer .SetName (fmt .Sprintf ("toolhive-%s" , mcpServer .Name ))
314
+ remoteMCPServer .SetNamespace (mcpServer .Namespace )
315
+
316
+ // Build the service URL for the ToolHive MCP server
317
+ serviceName := createServiceName (mcpServer .Name )
318
+ serviceURL := fmt .Sprintf ("http://%s.%s.svc.cluster.local:%d" ,
319
+ serviceName , mcpServer .Namespace , mcpServer .Spec .Port )
320
+
321
+ // Determine the protocol based on ToolHive transport
322
+ var protocol string
323
+ switch mcpServer .Spec .Transport {
324
+ case "sse" :
325
+ protocol = kagentProtocolSSE
326
+ case "streamable-http" :
327
+ protocol = kagentProtocolStreamableHTTP
328
+ default :
329
+ // For stdio or any other transport, default to SSE
330
+ // since ToolHive exposes everything via HTTP
331
+ protocol = kagentProtocolSSE
332
+ }
333
+
334
+ // Build the spec for v1alpha2 RemoteMCPServer
335
+ spec := map [string ]interface {}{
336
+ "description" : fmt .Sprintf ("ToolHive MCP Server: %s" , mcpServer .Name ),
337
+ "url" : serviceURL ,
338
+ "protocol" : protocol ,
339
+ // terminateOnClose defaults to true which is what we want
340
+ }
341
+
342
+ // Add timeout if needed (optional, using default for now)
343
+ // spec["timeout"] = "30s"
344
+
345
+ remoteMCPServer .Object = map [string ]interface {}{
346
+ "apiVersion" : "kagent.dev/v1alpha2" ,
347
+ "kind" : "RemoteMCPServer" ,
348
+ "metadata" : map [string ]interface {}{
349
+ "name" : remoteMCPServer .GetName (),
350
+ "namespace" : remoteMCPServer .GetNamespace (),
351
+ "labels" : map [string ]interface {}{
352
+ "toolhive.stacklok.dev/managed-by" : "toolhive-operator" ,
353
+ "toolhive.stacklok.dev/mcpserver" : mcpServer .Name ,
354
+ },
355
+ "ownerReferences" : []interface {}{
356
+ map [string ]interface {}{
357
+ "apiVersion" : "toolhive.stacklok.dev/v1alpha1" ,
358
+ "kind" : "MCPServer" ,
359
+ "name" : mcpServer .Name ,
360
+ "uid" : string (mcpServer .UID ),
361
+ "controller" : true ,
362
+ "blockOwnerDeletion" : true ,
363
+ },
364
+ },
365
+ },
366
+ "spec" : spec ,
367
+ }
368
+
369
+ return remoteMCPServer
370
+ }
0 commit comments