@@ -2,20 +2,20 @@ package tunnel
2
2
3
3
import (
4
4
"context"
5
- "errors"
5
+ goerrors "errors"
6
6
"fmt"
7
7
"slices"
8
8
"sync"
9
9
10
10
"github.com/pion/ice/v4"
11
+ "k8s.io/apimachinery/pkg/api/errors"
11
12
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
12
13
"k8s.io/client-go/util/retry"
13
14
"k8s.io/utils/ptr"
14
15
ctrl "sigs.k8s.io/controller-runtime"
15
16
"sigs.k8s.io/controller-runtime/pkg/builder"
16
17
"sigs.k8s.io/controller-runtime/pkg/client"
17
18
"sigs.k8s.io/controller-runtime/pkg/controller"
18
- "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
19
19
clog "sigs.k8s.io/controller-runtime/pkg/log"
20
20
"sigs.k8s.io/controller-runtime/pkg/predicate"
21
21
@@ -24,10 +24,6 @@ import (
24
24
corev1alpha "github.com/apoxy-dev/apoxy-cli/api/core/v1alpha"
25
25
)
26
26
27
- const (
28
- tunnelPeerOfferFinalizer = "tunnelpeeroffer.apoxy.dev/finalizer"
29
- )
30
-
31
27
type tunnelPeerOfferReconciler struct {
32
28
client.Client
33
29
@@ -53,73 +49,81 @@ func (r *tunnelPeerOfferReconciler) Reconcile(ctx context.Context, req ctrl.Requ
53
49
log .Info ("Reconciling TunnelPeerOffer" )
54
50
55
51
tunnelPeerOffer := & corev1alpha.TunnelPeerOffer {}
56
- if err := r .Get (ctx , req .NamespacedName , tunnelPeerOffer ); err != nil {
57
- return ctrl.Result {}, client .IgnoreNotFound (err )
58
- }
59
-
60
- if tunnelPeerOffer .Spec .RemoteTunnelNodeName == r .localTunnelNodeName { // Remote offer.
61
- remoteName , err := getOfferOwner (ctx , r .Client , tunnelPeerOffer )
62
- if err != nil {
63
- return ctrl.Result {}, err
64
- }
65
- log .Info ("Offer controlled by remote node, starting ICE negotiation" , "RemotePeer" , remoteName )
66
-
52
+ if err := r .Get (ctx , req .NamespacedName , tunnelPeerOffer ); client .IgnoreNotFound (err ) != nil {
53
+ return ctrl.Result {}, err
54
+ } else if errors .IsNotFound (err ) {
55
+ remoteName := req .Name
67
56
r .mu .Lock ()
68
- defer r .mu .Unlock ()
69
- peer , ok := r .peers [remoteName ]
70
- if ! ok { // Haven't started our end of the ICE negotiation yet.
71
- return ctrl.Result {Requeue : true }, nil
72
- }
73
-
74
- remoteOffer := tunnelPeerOffer .Spec .Offer
75
- if remoteOffer == nil {
76
- log .Info ("ICE offer not yet created" )
77
- return ctrl.Result {}, nil // Will re-trigger when offer is created.
57
+ if peer , ok := r .peers [remoteName ]; ok {
58
+ peer .Close ()
59
+ delete (r .peers , remoteName )
60
+ log .Info ("Deleted TunnelPeerOffer" )
78
61
}
62
+ r .mu .Unlock ()
79
63
80
- log .Info ("Connecting to remote peer" , "RemotePeer" , remoteName )
81
-
82
- return r .connect (ctx , remoteName , req , peer , remoteOffer )
64
+ return ctrl.Result {}, nil
83
65
}
84
66
85
- remoteName := tunnelPeerOffer .Spec .RemoteTunnelNodeName
86
-
87
- if tunnelPeerOffer .ObjectMeta .DeletionTimestamp .IsZero () {
88
- if ! controllerutil .ContainsFinalizer (tunnelPeerOffer , tunnelPeerOfferFinalizer ) {
89
- controllerutil .AddFinalizer (tunnelPeerOffer , tunnelPeerOfferFinalizer )
90
- if err := r .Update (ctx , tunnelPeerOffer ); err != nil {
67
+ if tunnelPeerOffer .Spec .RemoteTunnelNodeName == r .localTunnelNodeName { // Remote offer.
68
+ switch tunnelPeerOffer .Status .Phase {
69
+ case corev1alpha .TunnelPeerOfferPhaseConnected :
70
+ log .Info ("Already connected, ignoring" )
71
+ return ctrl.Result {}, nil
72
+ case corev1alpha .TunnelPeerOfferPhaseFailed :
73
+ log .Info ("Remote peer offer is in failed, ignoring" )
74
+ return ctrl.Result {}, nil
75
+ case corev1alpha .TunnelPeerOfferPhaseConnecting :
76
+ remoteName , err := getOfferOwner (ctx , r .Client , tunnelPeerOffer )
77
+ if err != nil {
91
78
return ctrl.Result {}, err
92
79
}
93
- }
94
- } else {
95
- if controllerutil . ContainsFinalizer ( tunnelPeerOffer , tunnelPeerOfferFinalizer ) {
80
+
81
+ log . Info ( "Offer controlled by remote node, starting ICE negotiation" , "RemotePeer" , remoteName )
82
+
96
83
r .mu .Lock ()
97
- if peer , ok := r .peers [remoteName ]; ok {
98
- peer .Close ()
99
- delete (r .peers , remoteName )
84
+ defer r .mu .Unlock ()
85
+ peer , ok := r .peers [remoteName ]
86
+ if ! ok { // Haven't started our end of the ICE negotiation yet.
87
+ return ctrl.Result {Requeue : true }, nil
100
88
}
101
- r .mu .Unlock ()
102
- controllerutil .RemoveFinalizer (tunnelPeerOffer , tunnelPeerOfferFinalizer )
103
- if err := r .Update (ctx , tunnelPeerOffer ); err != nil {
104
- return ctrl.Result {}, err
89
+
90
+ remoteOffer := tunnelPeerOffer .Spec .Offer
91
+ if remoteOffer == nil {
92
+ log .Info ("ICE offer not yet created" )
93
+ return ctrl.Result {}, nil // Will re-trigger when offer is created.
105
94
}
106
- log .Info ("Deleted TunnelPeerOffer" )
107
- }
108
95
109
- log .V ( 1 ). Info ("TunnelPeerOffer is being deleted" )
96
+ log .Info ("Connecting to remote peer" , "RemotePeer" , remoteName )
110
97
111
- return ctrl.Result {}, nil // Already deleted, nothing to do.
98
+ return r .connect (ctx , remoteName , req , peer , remoteOffer )
99
+ default :
100
+ log .Info ("Remote offer is in unknown state, ignoring" )
101
+ return ctrl.Result {}, nil
102
+ }
112
103
}
113
104
114
- log .Info ("Offer controlled by local node, starting ICE negotiation" , "RemotePeer" , remoteName )
105
+ remoteName := tunnelPeerOffer .Spec .RemoteTunnelNodeName
106
+ log .Info ("Offer controlled by local node" , "RemotePeer" , remoteName )
115
107
116
108
r .mu .Lock ()
117
109
defer r .mu .Unlock ()
118
110
peer , ok := r .peers [remoteName ]
119
- if ok { // Already connected, just return.
120
- return ctrl.Result {}, nil
111
+ if ok {
112
+ switch tunnelPeerOffer .Status .Phase {
113
+ case corev1alpha .TunnelPeerOfferPhaseConnected , corev1alpha .TunnelPeerOfferPhaseConnecting :
114
+ return ctrl.Result {}, nil // Already connected, or connecting, just return.
115
+ case corev1alpha .TunnelPeerOfferPhaseFailed :
116
+ log .Info ("Failed state, cleaning up peer" )
117
+ peer .Close ()
118
+ delete (r .peers , remoteName )
119
+ return ctrl.Result {}, nil
120
+ default :
121
+ return ctrl.Result {}, nil
122
+ }
121
123
}
122
124
125
+ log .Info ("Starting ICE negotiation" , "RemotePeer" , remoteName )
126
+
123
127
var err error
124
128
isControlling := r .localTunnelNodeName > remoteName
125
129
peer , err = r .bind .NewPeer (ctx , isControlling )
@@ -149,6 +153,45 @@ func (r *tunnelPeerOfferReconciler) Reconcile(ctx context.Context, req ctrl.Requ
149
153
log .Error (err , "Failed to update tunnel peer offer status" )
150
154
}
151
155
}
156
+ peer .OnConnected = func () {
157
+ log .Info ("ICE connection established" )
158
+ if err := retry .RetryOnConflict (retry .DefaultRetry , func () error {
159
+ var tn corev1alpha.TunnelPeerOffer
160
+ if err := r .Get (ctx , req .NamespacedName , & tn ); err != nil {
161
+ return err
162
+ }
163
+ tn .Status .Conditions = append (tn .Status .Conditions , metav1.Condition {
164
+ Type : "Connected" ,
165
+ Status : metav1 .ConditionTrue ,
166
+ Reason : "IceConnected" ,
167
+ LastTransitionTime : metav1 .Now (),
168
+ })
169
+ tn .Status .Phase = corev1alpha .TunnelPeerOfferPhaseConnected
170
+ return r .Status ().Update (ctx , & tn )
171
+ }); err != nil {
172
+ log .Error (err , "Failed to update tunnel peer offer status" )
173
+ }
174
+ }
175
+ peer .OnDisconnected = func (msg string ) {
176
+ log .Info ("ICE connection disconnected" , "Reason" , msg )
177
+ if err := retry .RetryOnConflict (retry .DefaultRetry , func () error {
178
+ var tn corev1alpha.TunnelPeerOffer
179
+ if err := r .Get (ctx , req .NamespacedName , & tn ); err != nil {
180
+ return err
181
+ }
182
+ tn .Status .Phase = corev1alpha .TunnelPeerOfferPhaseFailed
183
+ tn .Status .Conditions = append (tn .Status .Conditions , metav1.Condition {
184
+ Type : "ICEConnected" ,
185
+ Status : metav1 .ConditionFalse ,
186
+ Reason : "Failed" ,
187
+ Message : fmt .Sprintf ("Peer %s failed to connect: %v" , remoteName , msg ),
188
+ LastTransitionTime : metav1 .Now (),
189
+ })
190
+ return r .Status ().Update (ctx , & tn )
191
+ }); err != nil {
192
+ log .Error (err , "Failed to update tunnel peer offer status" )
193
+ }
194
+ }
152
195
if err := peer .Init (ctx ); err != nil {
153
196
log .Error (err , "Failed to initialize ICE peer" )
154
197
peer .Close ()
@@ -157,6 +200,17 @@ func (r *tunnelPeerOfferReconciler) Reconcile(ctx context.Context, req ctrl.Requ
157
200
158
201
r .peers [remoteName ] = peer
159
202
203
+ if err := retry .RetryOnConflict (retry .DefaultRetry , func () error {
204
+ var tn corev1alpha.TunnelPeerOffer
205
+ if err := r .Get (ctx , req .NamespacedName , & tn ); err != nil {
206
+ return err
207
+ }
208
+ tn .Status .Phase = corev1alpha .TunnelPeerOfferPhaseConnecting
209
+ return r .Status ().Update (ctx , & tn )
210
+ }); err != nil {
211
+ log .Error (err , "Failed to update tunnel peer offer status" )
212
+ }
213
+
160
214
return ctrl.Result {}, nil // Wait until the remote offer is created.
161
215
}
162
216
@@ -180,19 +234,21 @@ func (r *tunnelPeerOfferReconciler) connect(
180
234
}
181
235
182
236
go func () {
183
- if err := peer .Connect (ctx , remoteName ); err != nil && ! errors .Is (err , ice .ErrMultipleStart ) {
237
+ if err := peer .Connect (ctx , remoteName ); err != nil && ! goerrors .Is (err , ice .ErrMultipleStart ) {
184
238
log .Error (err , "Failed to connect to ICE peer" )
185
239
186
240
if err := retry .RetryOnConflict (retry .DefaultRetry , func () error {
187
241
var tunnelPeerOffer corev1alpha.TunnelPeerOffer
188
242
if err := r .Get (ctx , req .NamespacedName , & tunnelPeerOffer ); err != nil {
189
243
return err
190
244
}
245
+ tunnelPeerOffer .Status .Phase = corev1alpha .TunnelPeerOfferPhaseFailed
191
246
tunnelPeerOffer .Status .Conditions = append (tunnelPeerOffer .Status .Conditions , metav1.Condition {
192
- Type : "Connected" ,
193
- Status : metav1 .ConditionFalse ,
194
- Reason : "Failed" ,
195
- Message : fmt .Sprintf ("Peer %s failed to connect: %v" , r .localTunnelNodeName , err ),
247
+ Type : "Connected" ,
248
+ Status : metav1 .ConditionFalse ,
249
+ Reason : "Failed" ,
250
+ Message : fmt .Sprintf ("Peer %s failed to connect: %v" , r .localTunnelNodeName , err ),
251
+ LastTransitionTime : metav1 .Now (),
196
252
})
197
253
return r .Status ().Update (ctx , & tunnelPeerOffer )
198
254
}); err != nil {
@@ -208,26 +264,10 @@ func (r *tunnelPeerOfferReconciler) connect(
208
264
peer .Close ()
209
265
210
266
return
211
- } else if errors .Is (err , ice .ErrMultipleStart ) {
267
+ } else if goerrors .Is (err , ice .ErrMultipleStart ) {
212
268
log .Info ("ICE connection already established, ignoring" )
213
269
return
214
270
}
215
-
216
- if err := retry .RetryOnConflict (retry .DefaultRetry , func () error {
217
- var tunnelPeerOffer corev1alpha.TunnelPeerOffer
218
- if err := r .Get (ctx , req .NamespacedName , & tunnelPeerOffer ); err != nil {
219
- return err
220
- }
221
- tunnelPeerOffer .Status .Conditions = append (tunnelPeerOffer .Status .Conditions , metav1.Condition {
222
- Type : "Connected" ,
223
- Status : metav1 .ConditionTrue ,
224
- Reason : "Success" ,
225
- Message : fmt .Sprintf ("Peer %s successfully connected" , r .localTunnelNodeName ),
226
- })
227
- return r .Status ().Update (ctx , & tunnelPeerOffer )
228
- }); err != nil {
229
- log .Error (err , "Failed to update tunnel peer offer status" )
230
- }
231
271
}()
232
272
233
273
return ctrl.Result {}, nil
0 commit comments