Skip to content

Commit 5d23e51

Browse files
committed
fix: fix param in create service.
Signed-off-by: X1aoZEOuO <[email protected]>
1 parent aecfda5 commit 5d23e51

File tree

3 files changed

+28
-14
lines changed

3 files changed

+28
-14
lines changed

cmd/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626

2727
"k8s.io/apimachinery/pkg/runtime"
2828
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
29+
"k8s.io/client-go/dynamic"
2930
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
3031
ctrl "sigs.k8s.io/controller-runtime"
3132
"sigs.k8s.io/controller-runtime/pkg/healthz"
@@ -180,7 +181,6 @@ func setupControllers(mgr ctrl.Manager, certsReady chan struct{}, enableServerle
180181
os.Exit(1)
181182
}
182183

183-
184184
if enableServerless {
185185
dynamicClient, err := dynamic.NewForConfig(mgr.GetConfig())
186186
if err != nil {

pkg/controller/inference/activator_controller.go

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ import (
4242
"sigs.k8s.io/controller-runtime/pkg/predicate"
4343
"sigs.k8s.io/controller-runtime/pkg/reconcile"
4444

45-
llmazcoreapi "github.com/inftyai/llmaz/api/core/v1alpha1"
4645
llmazcorev1alpha1 "github.com/inftyai/llmaz/api/core/v1alpha1"
4746
)
4847

@@ -93,6 +92,7 @@ func (r *ActivatorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
9392
return ctrl.Result{}, err
9493
}
9594

95+
// nolint:staticcheck
9696
ep := &corev1.Endpoints{}
9797
if err := r.Get(ctx, req.NamespacedName, ep); err != nil {
9898
if errors.IsNotFound(err) {
@@ -113,8 +113,7 @@ func (r *ActivatorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
113113
if len(ep.Subsets) == 0 {
114114
// If the endpoints are empty, inject the activator IP
115115
return ctrl.Result{}, r.injectEndpoint(ctx, ep, svc, ports)
116-
} else if ep.Subsets[0].Addresses != nil &&
117-
len(ep.Subsets[0].Addresses) > 0 &&
116+
} else if len(ep.Subsets[0].Addresses) > 0 &&
118117
ep.Subsets[0].Addresses[0].IP != r.ip {
119118
// If the endpoints are not empty and not the activator IP, forward the traffic
120119
return ctrl.Result{}, r.forwardEndpoint(ctx, ep, ports)
@@ -127,7 +126,7 @@ func (r *ActivatorReconciler) needInject(svc *corev1.Service) ([]corev1.ServiceP
127126
if svc == nil || svc.Annotations == nil {
128127
return nil, false
129128
}
130-
if _, ok := svc.Annotations[llmazcoreapi.ModelActivatorAnnoKey]; !ok {
129+
if _, ok := svc.Annotations[llmazcorev1alpha1.ModelActivatorAnnoKey]; !ok {
131130
return nil, false
132131
}
133132
if len(svc.Spec.Ports) == 0 || svc.Spec.Type != corev1.ServiceTypeClusterIP {
@@ -148,7 +147,7 @@ func (r *ActivatorReconciler) needInject(svc *corev1.Service) ([]corev1.ServiceP
148147
}
149148

150149
func (r *ActivatorReconciler) restoreSelectorIfNeeded(ctx context.Context, svc *corev1.Service) error {
151-
selectorStr := svc.Annotations[llmazcoreapi.CachedModelActivatorAnnoKey]
150+
selectorStr := svc.Annotations[llmazcorev1alpha1.CachedModelActivatorAnnoKey]
152151
if selectorStr == "" {
153152
return nil
154153
}
@@ -160,7 +159,7 @@ func (r *ActivatorReconciler) restoreSelectorIfNeeded(ctx context.Context, svc *
160159
}
161160

162161
updatedSvc := svc.DeepCopy()
163-
delete(updatedSvc.Annotations, llmazcoreapi.CachedModelActivatorAnnoKey)
162+
delete(updatedSvc.Annotations, llmazcorev1alpha1.CachedModelActivatorAnnoKey)
164163
updatedSvc.Spec.Selector = sel
165164

166165
if err := r.Update(ctx, updatedSvc); err != nil {
@@ -172,7 +171,9 @@ func (r *ActivatorReconciler) restoreSelectorIfNeeded(ctx context.Context, svc *
172171
return nil
173172
}
174173

174+
// nolint:staticcheck
175175
func (r *ActivatorReconciler) injectEndpoint(ctx context.Context, ep *corev1.Endpoints, svc *corev1.Service, ports []corev1.ServicePort) error {
176+
// nolint:staticcheck
176177
subsets := make([]corev1.EndpointSubset, 0, len(ports))
177178
for _, port := range ports {
178179
ds, err := r.portManager.AddTarget(ep.Name, ep.Namespace, int(port.Port))
@@ -185,6 +186,7 @@ func (r *ActivatorReconciler) injectEndpoint(ctx context.Context, ep *corev1.End
185186
"listenerPort", ds.Listener.Port(),
186187
)
187188

189+
// nolint:staticcheck
188190
subsets = append(subsets, corev1.EndpointSubset{
189191
Addresses: []corev1.EndpointAddress{{IP: r.ip}},
190192
Ports: []corev1.EndpointPort{{
@@ -207,7 +209,7 @@ func (r *ActivatorReconciler) injectEndpoint(ctx context.Context, ep *corev1.End
207209
if updatedSvc.Annotations == nil {
208210
updatedSvc.Annotations = make(map[string]string)
209211
}
210-
updatedSvc.Annotations[llmazcoreapi.CachedModelActivatorAnnoKey] = string(selectorBytes)
212+
updatedSvc.Annotations[llmazcorev1alpha1.CachedModelActivatorAnnoKey] = string(selectorBytes)
211213
updatedSvc.Spec.Selector = nil
212214
return r.Update(ctx, updatedSvc)
213215
}
@@ -226,7 +228,8 @@ func (r *ActivatorReconciler) handleServiceDeletion(namespace, name string) {
226228
}
227229
}
228230

229-
func (r *ActivatorReconciler) forwardEndpoint(ctx context.Context, ep *corev1.Endpoints, ports []corev1.ServicePort) error {
231+
// nolint:staticcheck
232+
func (r *ActivatorReconciler) forwardEndpoint(_ context.Context, ep *corev1.Endpoints, ports []corev1.ServicePort) error {
230233
for _, port := range ports {
231234
ds := r.portManager.RemoveTarget(ep.Name, ep.Namespace, int(port.Port))
232235
if ds == nil {
@@ -262,6 +265,7 @@ func (r *ActivatorReconciler) forwardEndpoint(ctx context.Context, ep *corev1.En
262265
return nil
263266
}
264267

268+
// nolint:staticcheck
265269
func (r *ActivatorReconciler) getEndpointAddress(ep *corev1.Endpoints, ports []corev1.ServicePort, target *Target) (string, error) {
266270
for _, port := range ports {
267271
if int(port.Port) != target.Port {
@@ -293,7 +297,7 @@ func (r *ActivatorReconciler) scaleUp(pi *PortInformation) {
293297
return
294298
}
295299

296-
name := svc.Annotations[llmazcoreapi.ModelActivatorAnnoKey]
300+
name := svc.Annotations[llmazcorev1alpha1.ModelActivatorAnnoKey]
297301
if name == "" {
298302
activatorControllerLog.Error(nil, "Scale annotation not found")
299303
return
@@ -356,7 +360,7 @@ func (r *ActivatorReconciler) SetupWithManager(mgr ctrl.Manager) error {
356360
hasActivatorAnnotation := func(obj client.Object) bool {
357361
// Make sure the object has the activator annotation
358362
annotations := obj.GetAnnotations()
359-
_, ok := annotations[llmazcoreapi.ModelActivatorAnnoKey]
363+
_, ok := annotations[llmazcorev1alpha1.ModelActivatorAnnoKey]
360364
if ok {
361365
activatorControllerLog.V(4).Info("Object has activator annotation", "object", obj.GetName())
362366
}
@@ -378,6 +382,7 @@ func (r *ActivatorReconciler) SetupWithManager(mgr ctrl.Manager) error {
378382
},
379383
})).
380384
Watches(
385+
// nolint:staticcheck
381386
&corev1.Endpoints{},
382387
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, obj client.Object) []reconcile.Request {
383388
return []reconcile.Request{
@@ -403,8 +408,17 @@ func (r *ActivatorReconciler) SetupWithManager(mgr ctrl.Manager) error {
403408
}
404409

405410
func tunnel(a, b net.Conn) {
406-
go io.Copy(a, b)
407-
go io.Copy(b, a)
411+
go func() {
412+
if _, err := io.Copy(a, b); err != nil {
413+
activatorControllerLog.Error(err, "Failed to copy")
414+
}
415+
}()
416+
417+
go func() {
418+
if _, err := io.Copy(b, a); err != nil {
419+
activatorControllerLog.Error(err, "Failed to copy")
420+
}
421+
}()
408422
}
409423

410424
type Listener interface {

pkg/controller/inference/service_controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ func (r *ServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
131131
}
132132

133133
// Create a service for the leader pods of the lws for loadbalancing.
134-
if err := CreateServiceIfNotExists(ctx, r.Client, r.Scheme, service); err != nil {
134+
if err := CreateServiceIfNotExists(ctx, r.Client, r.Scheme, service, models); err != nil {
135135
return ctrl.Result{}, err
136136
}
137137

0 commit comments

Comments
 (0)