@@ -17,7 +17,6 @@ limitations under the License.
1717package plugin
1818
1919import (
20- "context"
2120 "fmt"
2221 "io"
2322 "os/exec"
@@ -143,15 +142,15 @@ func readFromReader(reader io.ReadCloser, maxBytes int64) ([]byte, error) {
143142}
144143
145144func (p * Plugin ) run (rule cpmtypes.CustomRule ) (exitStatus cpmtypes.Status , output string ) {
146- var ctx context. Context
147- var cancel context. CancelFunc
145+ isTimeout := false
146+ isHung := false
148147
148+ var timeoutDuration time.Duration
149149 if rule .Timeout != nil && * rule .Timeout < * p .config .PluginGlobalConfig .Timeout {
150- ctx , cancel = context . WithTimeout ( context . Background (), * rule .Timeout )
150+ timeoutDuration = * rule .Timeout
151151 } else {
152- ctx , cancel = context . WithTimeout ( context . Background (), * p .config .PluginGlobalConfig .Timeout )
152+ timeoutDuration = * p .config .PluginGlobalConfig .Timeout
153153 }
154- defer cancel ()
155154
156155 cmd := util .Exec (rule .Path , rule .Args ... )
157156
@@ -170,37 +169,6 @@ func (p *Plugin) run(rule cpmtypes.CustomRule) (exitStatus cpmtypes.Status, outp
170169 return cpmtypes .Unknown , "Error in starting plugin. Please check the error log"
171170 }
172171
173- waitChan := make (chan struct {})
174- defer close (waitChan )
175-
176- var m sync.Mutex
177- timeout := false
178-
179- go func () {
180- select {
181- case <- ctx .Done ():
182- if ctx .Err () == context .Canceled {
183- return
184- }
185- klog .Errorf ("Error in running plugin timeout %q" , rule .Path )
186- if cmd .Process == nil || cmd .Process .Pid == 0 {
187- klog .Errorf ("Error in cmd.Process check %q" , rule .Path )
188- break
189- }
190-
191- m .Lock ()
192- timeout = true
193- m .Unlock ()
194-
195- err := util .Kill (cmd )
196- if err != nil {
197- klog .Errorf ("Error in kill process %d, %v" , cmd .Process .Pid , err )
198- }
199- case <- waitChan :
200- return
201- }
202- }()
203-
204172 var (
205173 wg sync.WaitGroup
206174 stdout []byte
@@ -220,14 +188,46 @@ func (p *Plugin) run(rule cpmtypes.CustomRule) (exitStatus cpmtypes.Status, outp
220188 }()
221189 // This will wait for the reads to complete. If the execution times out, the pipes
222190 // will be closed and the wait group unblocks.
223- wg .Wait ()
191+ // If the plugin process or one of its sub-processes hangs (due to GPU device errors or other reasons),
192+ // wg.Wait() would block forever, so the wait on the wait group is bounded by a timeout.
193+ waitChan := make (chan struct {})
194+ go func () {
195+ wg .Wait ()
196+ close (waitChan )
197+ }()
198+ select {
199+ case <- waitChan :
200+ // The reads are done.
201+ break
202+ case <- time .After (timeoutDuration ):
203+ klog .Errorf ("Waiting for command output timed out when running plugin %q" , rule .Path )
204+ isTimeout = true
205+ err := util .Kill (cmd )
206+ if err != nil {
207+ klog .Errorf ("Error when killing process %d: %v" , cmd .Process .Pid , err )
208+ } else {
209+ klog .Infof ("Killed process %d successfully" , cmd .Process .Pid )
210+ }
224211
225- if stdoutErr != nil {
212+ // Check if the process is in D state. If it is, the process is hung and cannot be killed.
213+ // It also means that the plugin cannot report the correct status, so Unknown status is reported instead.
214+ // On a GPU machine, a plugin with a Python script calling the pynvml API may hang in D state due to GPU device errors.
215+ if util .IsProcessInDState (cmd .Process .Pid ) {
216+ klog .Errorf ("Process %d is hung in D state" , cmd .Process .Pid )
217+ isHung = true
218+ }
219+ }
220+
221+ if isHung {
222+ return cpmtypes .Unknown , fmt .Sprintf ("Process is hung when running plugin %s" , rule .Path )
223+ }
224+
225+ if ! isTimeout && stdoutErr != nil {
226226 klog .Errorf ("Error reading stdout for plugin %q: error - %v" , rule .Path , err )
227227 return cpmtypes .Unknown , "Error reading stdout for plugin. Please check the error log"
228228 }
229229
230- if stderrErr != nil {
230+ if ! isTimeout && stderrErr != nil {
231231 klog .Errorf ("Error reading stderr for plugin %q: error - %v" , rule .Path , err )
232232 return cpmtypes .Unknown , "Error reading stderr for plugin. Please check the error log"
233233 }
@@ -239,16 +239,13 @@ func (p *Plugin) run(rule cpmtypes.CustomRule) (exitStatus cpmtypes.Status, outp
239239 }
240240 }
241241
242- // trim suffix useless bytes
243- output = string (stdout )
244- output = strings .TrimSpace (output )
245-
246- m .Lock ()
247- cmdKilled := timeout
248- m .Unlock ()
249-
250- if cmdKilled {
251- output = fmt .Sprintf ("Timeout when running plugin %q: state - %s. output - %q" , rule .Path , cmd .ProcessState .String (), output )
242+ stderrStr := ""
243+ if isTimeout {
244+ output = fmt .Sprintf ("Timeout when running plugin %q: state - %s. output - %q" , rule .Path , cmd .ProcessState .String (), "" )
245+ } else {
246+ // trim suffix useless bytes
247+ output = strings .TrimSpace (string (stdout ))
248+ stderrStr = strings .TrimSpace (string (stderr ))
252249 }
253250
254251 // cut at position max_output_length if stdout is longer than max_output_length bytes
@@ -259,13 +256,13 @@ func (p *Plugin) run(rule cpmtypes.CustomRule) (exitStatus cpmtypes.Status, outp
259256 exitCode := cmd .ProcessState .Sys ().(syscall.WaitStatus ).ExitStatus ()
260257 switch exitCode {
261258 case 0 :
262- logPluginStderr (rule , string ( stderr ) , 3 )
259+ logPluginStderr (rule , stderrStr , 3 )
263260 return cpmtypes .OK , output
264261 case 1 :
265- logPluginStderr (rule , string ( stderr ) , 0 )
262+ logPluginStderr (rule , stderrStr , 0 )
266263 return cpmtypes .NonOK , output
267264 default :
268- logPluginStderr (rule , string ( stderr ) , 0 )
265+ logPluginStderr (rule , stderrStr , 0 )
269266 return cpmtypes .Unknown , output
270267 }
271268}
0 commit comments