4
4
package utils
5
5
6
6
import (
7
+ "context"
7
8
"errors"
8
9
"fmt"
9
10
"os"
@@ -19,7 +20,6 @@ import (
19
20
)
20
21
21
22
var unmount = syscall .Unmount
22
- var command = exec .Command
23
23
24
24
var ErrTimeoutWaitProcess = errors .New ("timeout waiting for process to end" )
25
25
@@ -33,37 +33,59 @@ type MounterOptsUtils struct {
33
33
34
34
func (su * MounterOptsUtils ) FuseMount (path string , comm string , args []string ) error {
35
35
klog .Info ("-FuseMount-" )
36
- klog .Infof ("FuseMount params:\n \t path: <%s>\n \t command: <%s>\n \t args: <%v>" , path , comm , args )
37
- cmd := command (comm , args ... )
36
+ klog .Infof ("FuseMount: params:\n \t path: <%s>\n \t command: <%s>\n \t args: <%v>" , path , comm , args )
37
+
38
+ ctx , cancel := context .WithCancel (context .Background ())
39
+ defer cancel ()
40
+
41
+ cmd := exec .CommandContext (ctx , comm , args ... )
38
42
err := cmd .Start ()
39
43
if err != nil {
40
44
klog .Errorf ("FuseMount: command start failed: mounter=%s, args=%v, error=%v" , comm , args , err )
41
45
return fmt .Errorf ("FuseMount: '%s' command start failed: %v" , comm , err )
42
46
}
43
47
klog .Infof ("FuseMount: command 'start' succeeded for '%s' mounter" , comm )
44
48
45
- err = cmd .Wait ()
46
- if err != nil {
47
- klog .Warningf ("FuseMount: command wait failed: mounter=%s, args=%v, error=%v" , comm , args , err )
48
- klog .Infof ("FuseMount: checking if path already exists and is a mountpoint. path=%s" , path )
49
- if mounted , err1 := isMountpoint (path ); err1 == nil && mounted { // check if bucket already got mounted
50
- klog .Infof ("bucket is already mounted using '%s' mounter" , comm )
51
- return nil
49
+ waitCh := make (chan error , 1 )
50
+ mountCh := make (chan error , 1 )
51
+
52
+ go func () {
53
+ waitCh <- cmd .Wait ()
54
+ }()
55
+
56
+ go func () {
57
+ mountCh <- waitForMount (ctx , path , 2 * time .Second , 90 * time .Second ) // kubelet retries NodePublishVolume after 120 seconds
58
+ }()
59
+
60
+ select {
61
+ case err := <- waitCh :
62
+ if err != nil {
63
+ klog .Warningf ("FuseMount: command 'wait' failed: mounter=%s, args=%v, error=%v" , comm , args , err )
64
+ klog .Infof ("FuseMount: checking if path already exists and is a mountpoint: path=%s" , path )
65
+ if mounted , err1 := isMountpoint (path ); err1 == nil && mounted { // check if bucket already got mounted
66
+ klog .Infof ("bucket is already mounted using '%s' mounter" , comm )
67
+ return nil
68
+ }
69
+ return fmt .Errorf ("'%s' mount failed: %v" , comm , err )
70
+ }
71
+ klog .Infof ("FuseMount: command 'wait' succeeded for '%s' mounter" , comm )
72
+ if err := waitForMount (ctx , path , 0 , 30 * time .Second ); err != nil {
73
+ return err
52
74
}
53
- klog .Errorf ("FuseMount: path is not mountpoint. Mount failed: mounter=%s, path=%s" , comm , path )
54
- return fmt .Errorf ("'%s' mount failed: %v" , comm , err )
55
- }
56
75
57
- klog .Infof ("FuseMount: command 'wait' succeeded for '%s' mounter" , comm )
58
- if err := waitForMount (path , 10 * time .Second ); err != nil {
59
- return err
76
+ case err := <- mountCh :
77
+ if err != nil {
78
+ klog .Errorf ("FuseMount: path is not mountpoint. Mount failed: mounter=%s, path=%s" , comm , path )
79
+ return fmt .Errorf ("'%s' mount failed: %v" , comm , err )
80
+ }
60
81
}
82
+
61
83
klog .Infof ("bucket mounted successfully using '%s' mounter" , comm )
62
84
return nil
63
85
}
64
86
65
87
func (su * MounterOptsUtils ) FuseUnmount (path string ) error {
66
- klog .Info ("-fuseUnmount -" )
88
+ klog .Info ("-FuseUnmount -" )
67
89
// check if mountpoint exists
68
90
isMount , checkMountErr := isMountpoint (path )
69
91
if isMount || checkMountErr != nil {
@@ -136,23 +158,31 @@ func isMountpoint(pathname string) (bool, error) {
136
158
return false , nil
137
159
}
138
160
139
- func waitForMount (path string , timeout time.Duration ) error {
161
+ func waitForMount (ctx context.Context , path string , initialDelaySeconds , timeout time.Duration ) error {
162
+ if initialDelaySeconds > 0 {
163
+ time .Sleep (initialDelaySeconds )
164
+ }
140
165
var elapsed time.Duration
141
166
attempt := 1
142
167
for {
143
- isMount , err := k8sMountUtils .New ("" ).IsMountPoint (path )
144
- if err == nil && isMount {
145
- klog .Infof ("Path is a mountpoint: pathname: %s" , path )
146
- return nil
147
- }
168
+ select {
169
+ case <- ctx .Done ():
170
+ return ctx .Err ()
171
+ default :
172
+ isMount , err := k8sMountUtils .New ("" ).IsMountPoint (path )
173
+ if err == nil && isMount {
174
+ klog .Infof ("Path is a mountpoint: pathname: %s" , path )
175
+ return nil
176
+ }
148
177
149
- klog .Infof ("Mountpoint check in progress: attempt=%d, path=%s, isMount=%v, err=%v" , attempt , path , isMount , err )
150
- time .Sleep (constants .Interval )
151
- elapsed += constants .Interval
152
- if elapsed >= timeout {
153
- return fmt .Errorf ("timeout waiting for mount. Last check response: isMount=%v, err=%v" , isMount , err )
178
+ klog .Infof ("Mountpoint check in progress: attempt=%d, path=%s, isMount=%v, err=%v, timeout=%v" , attempt , path , isMount , err , timeout )
179
+ time .Sleep (constants .Interval )
180
+ elapsed += constants .Interval
181
+ if elapsed >= timeout {
182
+ return fmt .Errorf ("timeout waiting for mount. Last check response: isMount=%v, err=%v, timeout=%v" , isMount , err , constants .Timeout )
183
+ }
184
+ attempt ++
154
185
}
155
- attempt ++
156
186
}
157
187
}
158
188
0 commit comments