@@ -1260,7 +1260,7 @@ impl Build {
1260
1260
1261
1261
#[ cfg( feature = "parallel" ) ]
1262
1262
fn compile_objects ( & self , objs : & [ Object ] , print : & PrintThread ) -> Result < ( ) , Error > {
1263
- use std:: sync:: { mpsc:: channel , Once } ;
1263
+ use std:: sync:: { mpsc, Once } ;
1264
1264
1265
1265
if objs. len ( ) <= 1 {
1266
1266
for obj in objs {
@@ -1299,16 +1299,88 @@ impl Build {
1299
1299
// acquire the appropriate tokens, Once all objects have been compiled
1300
1300
// we wait on all the processes and propagate the results of compilation.
1301
1301
1302
- let ( tx, rx) = channel :: < ( _ , String , KillOnDrop , _ ) > ( ) ;
1302
+ let ( tx, rx) = mpsc :: channel :: < ( _ , String , KillOnDrop , _ ) > ( ) ;
1303
1303
1304
1304
// Since jobserver::Client::acquire can block, waiting
1305
1305
// must be done in parallel so that acquire won't block forever.
1306
1306
let wait_thread = thread:: Builder :: new ( ) . spawn ( move || {
1307
- for ( cmd, program, mut child, _token) in rx {
1308
- wait_on_child ( & cmd, & program, & mut child. 0 ) ?;
1309
- }
1307
+ let mut error = None ;
1308
+ let mut pendings = Vec :: new ( ) ;
1309
+ // Buffer the stdout
1310
+ let mut stdout = io:: BufWriter :: with_capacity ( 128 , io:: stdout ( ) ) ;
1311
+ let mut backoff_cnt = 0 ;
1312
+
1313
+ loop {
1314
+ let mut has_made_progress = false ;
1315
+
1316
+ // Reading new pending tasks
1317
+ loop {
1318
+ match rx. try_recv ( ) {
1319
+ Ok ( pending) => {
1320
+ has_made_progress = true ;
1321
+ pendings. push ( pending)
1322
+ }
1323
+ Err ( mpsc:: TryRecvError :: Disconnected ) if pendings. is_empty ( ) => {
1324
+ let _ = stdout. flush ( ) ;
1325
+ return if let Some ( err) = error {
1326
+ Err ( err)
1327
+ } else {
1328
+ Ok ( ( ) )
1329
+ } ;
1330
+ }
1331
+ _ => break ,
1332
+ }
1333
+ }
1310
1334
1311
- Ok ( ( ) )
1335
+ // Try waiting on them.
1336
+ pendings. retain_mut ( |( cmd, program, child, _) | {
1337
+ match try_wait_on_child ( cmd, program, & mut child. 0 , & mut stdout) {
1338
+ Ok ( Some ( ( ) ) ) => {
1339
+ // Task done, remove the entry
1340
+ has_made_progress = true ;
1341
+ false
1342
+ }
1343
+ Ok ( None ) => true , // Task still not finished, keep the entry
1344
+ Err ( err) => {
1345
+ // Task fail, remove the entry.
1346
+ has_made_progress = true ;
1347
+
1348
+ // Since we can only return one error, log the error to make
1349
+ // sure users always see all the compilation failures.
1350
+ let _ = writeln ! ( stdout, "cargo:warning={}" , err) ;
1351
+ error = Some ( err) ;
1352
+
1353
+ false
1354
+ }
1355
+ }
1356
+ } ) ;
1357
+
1358
+ if !has_made_progress {
1359
+ if backoff_cnt > 3 {
1360
+ // We have yielded at least three times without making'
1361
+ // any progress, so we will sleep for a while.
1362
+ let duration =
1363
+ std:: time:: Duration :: from_millis ( 100 * ( backoff_cnt - 3 ) . min ( 10 ) ) ;
1364
+ thread:: sleep ( duration) ;
1365
+ } else {
1366
+ // Given that we spawned a lot of compilation tasks, it is unlikely
1367
+ // that OS cannot find other ready task to execute.
1368
+ //
1369
+ // If all of them are done, then we will yield them and spawn more,
1370
+ // or simply returns.
1371
+ //
1372
+ // Thus this will not be turned into a busy-wait loop and it will not
1373
+ // waste CPU resource.
1374
+ thread:: yield_now ( ) ;
1375
+ }
1376
+ }
1377
+
1378
+ backoff_cnt = if has_made_progress {
1379
+ 0
1380
+ } else {
1381
+ backoff_cnt + 1
1382
+ } ;
1383
+ }
1312
1384
} ) ?;
1313
1385
1314
1386
for obj in objs {
@@ -1317,10 +1389,10 @@ impl Build {
1317
1389
1318
1390
let child = spawn ( & mut cmd, & program, print. pipe_writer_cloned ( ) ?. unwrap ( ) ) ?;
1319
1391
1320
- if tx. send ( ( cmd, program, KillOnDrop ( child) , token) ) . is_err ( ) {
1321
- break ;
1322
- }
1392
+ tx. send ( ( cmd, program, KillOnDrop ( child) , token) )
1393
+ . expect ( "Wait thread must be alive until all compilation jobs are done, otherwise we risk deadlock" ) ;
1323
1394
}
1395
+ // Drop tx so that the wait_thread could return
1324
1396
drop ( tx) ;
1325
1397
1326
1398
return wait_thread. join ( ) . expect ( "wait_thread panics" ) ;
@@ -3545,6 +3617,40 @@ fn wait_on_child(cmd: &Command, program: &str, child: &mut Child) -> Result<(),
3545
3617
}
3546
3618
}
3547
3619
3620
+ #[ cfg( feature = "parallel" ) ]
3621
+ fn try_wait_on_child (
3622
+ cmd : & Command ,
3623
+ program : & str ,
3624
+ child : & mut Child ,
3625
+ stdout : & mut dyn io:: Write ,
3626
+ ) -> Result < Option < ( ) > , Error > {
3627
+ match child. try_wait ( ) {
3628
+ Ok ( Some ( status) ) => {
3629
+ let _ = writeln ! ( stdout, "{}" , status) ;
3630
+
3631
+ if status. success ( ) {
3632
+ Ok ( Some ( ( ) ) )
3633
+ } else {
3634
+ Err ( Error :: new (
3635
+ ErrorKind :: ToolExecError ,
3636
+ format ! (
3637
+ "Command {:?} with args {:?} did not execute successfully (status code {})." ,
3638
+ cmd, program, status
3639
+ ) ,
3640
+ ) )
3641
+ }
3642
+ }
3643
+ Ok ( None ) => Ok ( None ) ,
3644
+ Err ( e) => Err ( Error :: new (
3645
+ ErrorKind :: ToolExecError ,
3646
+ format ! (
3647
+ "Failed to wait on spawned child process, command {:?} with args {:?}: {}." ,
3648
+ cmd, program, e
3649
+ ) ,
3650
+ ) ) ,
3651
+ }
3652
+ }
3653
+
3548
3654
fn run_inner ( cmd : & mut Command , program : & str , pipe_writer : File ) -> Result < ( ) , Error > {
3549
3655
let mut child = spawn ( cmd, program, pipe_writer) ?;
3550
3656
wait_on_child ( cmd, program, & mut child)
0 commit comments