@@ -108,11 +108,13 @@ MpiHost::Result MpiHost::terminate()
     for (Size i = 1; i < m_nodes.count(); i++)
     {
         static u8 packet[MpiProxy::MaximumPacketSize];
-        Size packetSize = sizeof(packet);
+        Size packetSize = sizeof(MpiProxy::Header);
 
         // Send terminate request to the remote node
         MpiProxy::Header *hdr = (MpiProxy::Header *) packet;
         hdr->operation = MpiProxy::MpiOpTerminate;
+        hdr->coreId = m_nodes[i]->coreId;
+        hdr->rankId = i;
 
         // Send the packet
         const Result sendResult = sendPacket(i, packet, sizeof(MpiProxy::Header));
@@ -130,7 +132,7 @@ MpiHost::Result MpiHost::terminate()
             return recvResult;
         }
 
-        // The packet must be a data response
+        // The packet must be a terminate response
         const MpiProxy::Header *header = (const MpiProxy::Header *) packet;
         if (header->operation != MpiProxy::MpiOpTerminate)
         {
@@ -212,6 +214,7 @@ MpiHost::Result MpiHost::send(const void *buf,
     MpiProxy::Header *hdr = (MpiProxy::Header *) packet;
     hdr->operation = MpiProxy::MpiOpSend;
     hdr->result = 0;
+    hdr->coreId = node->coreId;
     hdr->rankId = dest;
     hdr->datatype = datatype;
     hdr->datacount = count;
@@ -251,6 +254,7 @@ MpiHost::Result MpiHost::receive(void *buf,
     // Send receive data request to the remote node
     MpiProxy::Header *hdr = (MpiProxy::Header *) packet;
     hdr->operation = MpiProxy::MpiOpRecv;
+    hdr->coreId = node->coreId;
     hdr->rankId = source;
     hdr->datatype = datatype;
     hdr->datacount = count;
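
Every request in the hunks above starts with an MpiProxy::Header at the front of the UDP payload. For orientation only, here is a hypothetical reconstruction of that header from the fields this commit assigns; the authoritative definition lives in MpiProxy.h, and the real field types, widths and ordering may differ:

#include <stdint.h>

typedef uint8_t  u8;
typedef uint32_t u32;

// Hypothetical sketch of MpiProxy::Header: the field set is taken from the
// assignments in this commit, but the types and layout are assumptions.
struct Header
{
    u8  operation;  // MpiOpSend, MpiOpRecv, MpiOpExec or MpiOpTerminate
    u8  result;     // status code, filled in on responses
    u32 coreId;     // physical core of the peer (now stamped on every request)
    u32 rankId;     // MPI rank the request addresses
    u32 datatype;   // payload datatype, used by MpiOpSend/MpiOpRecv
    u32 datacount;  // number of datatype elements, used by MpiOpSend/MpiOpRecv
    u32 coreCount;  // total node count, sent with MpiOpExec
};
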
@@ -414,7 +418,11 @@ MpiHost::Result MpiHost::parseHostsFile(const char *hostsfile)
 MpiHost::Result MpiHost::startProcesses(int argc,
                                         char **argv)
 {
+    const Size NumOfParallelStart = 32;
+    static u8 packet[MpiProxy::MaximumPacketSize];
+    MpiProxy::Header *hdr = (MpiProxy::Header *) packet;
     String cmdline;
+    Size startIndex = 1, startCount = 0;
 
     DEBUG("argc = " << argc);
@@ -439,45 +447,57 @@ MpiHost::Result MpiHost::startProcesses(int argc,
     // Start remote processes with the constructed command line
     NOTICE("cmdline = " << *cmdline);
 
-    for (Size i = 1; i < m_nodes.count(); i++)
+    // Send out packets to all the hosts
+    while (startIndex < m_nodes.count())
     {
-        in_addr nodeAddr;
-        nodeAddr.s_addr = m_nodes[i]->ipAddress;
-
-        NOTICE("nodes[" << i << "] = " << inet_ntoa(nodeAddr) <<
-               ":" << m_nodes[i]->udpPort << ":" << m_nodes[i]->coreId);
+        const Size receiveIndex = startIndex;
 
-        // Construct packet to send
-        u8 packet[MpiProxy::MaximumPacketSize];
-        MpiProxy::Header *hdr = (MpiProxy::Header *) packet;
-        hdr->operation = MpiProxy::MpiOpExec;
-        hdr->result = 0;
-        hdr->rankId = i;
-        hdr->coreId = m_nodes[i]->coreId;
-
-        hdr->coreCount = m_nodes.count();
-
-        // Append command-line after the header
-        MemoryBlock::copy(packet + sizeof(MpiProxy::Header), *cmdline,
-                          sizeof(packet) - sizeof(MpiProxy::Header));
-
-        // Send the packet
-        const Result sendResult = sendPacket(i, packet, sizeof(MpiProxy::Header) + cmdline.length());
-        if (sendResult != MPI_SUCCESS)
+        // Limit the number of parallel requests
+        while (startIndex < m_nodes.count() && startCount < NumOfParallelStart)
         {
-            ERROR("failed to send packet to nodeId " << i << ": result = " << (int) sendResult);
-            return sendResult;
+            in_addr nodeAddr;
+            nodeAddr.s_addr = m_nodes[startIndex]->ipAddress;
+
+            NOTICE("nodes[" << startIndex << "] = " << inet_ntoa(nodeAddr) <<
+                   ":" << m_nodes[startIndex]->udpPort << ":" << m_nodes[startIndex]->coreId);
+
+            // Construct packet to send
+            hdr->operation = MpiProxy::MpiOpExec;
+            hdr->result = 0;
+            hdr->rankId = startIndex;
+            hdr->coreId = m_nodes[startIndex]->coreId;
+            hdr->coreCount = m_nodes.count();
+
+            // Append command-line after the header
+            MemoryBlock::copy((char *) packet + sizeof(MpiProxy::Header), *cmdline,
+                              sizeof(packet) - sizeof(MpiProxy::Header));
+
+            // Send the packet
+            const Result sendResult = sendPacket(startIndex, packet, sizeof(MpiProxy::Header) + cmdline.length());
+            if (sendResult != MPI_SUCCESS)
+            {
+                ERROR("failed to send packet to nodeId " << startIndex << ": result = " << (int) sendResult);
+                return sendResult;
+            }
+            startIndex++;
+            startCount++;
         }
 
-        // Wait for acknowledge
-        Size sz;
-        const Result recvResult = receivePacket(i, MpiProxy::MpiOpExec, packet, sz);
-        if (recvResult != MPI_SUCCESS)
+        // Wait for acknowledge of each node
+        for (Size i = receiveIndex; i < startIndex; i++)
         {
-            ERROR("failed to receive acknowledge for MpiOpExec from nodeId " <<
-                  i << ": result = " << (int) recvResult);
-            return recvResult;
+            Size sz;
+            sz = sizeof(MpiProxy::Header);
+
+            const Result recvResult = receivePacket(i, MpiProxy::MpiOpExec, &packet, sz);
+            if (recvResult != MPI_SUCCESS)
+            {
+                ERROR("failed to receive acknowledge for MpiOpExec from nodeId " <<
+                      i << ": result = " << (int) recvResult);
+                return recvResult;
+            }
        }
+        startCount = 0;
    }
 
     return MPI_SUCCESS;
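
The rewritten startProcesses() above turns the old one-node-at-a-time handshake into a windowed one: fan out up to NumOfParallelStart (32) MpiOpExec requests, then collect one acknowledgement per outstanding request before starting the next batch. Reduced to its control flow, the pattern looks like the sketch below; startInBatches, send and recv are illustrative stand-ins for the real sendPacket()/receivePacket() calls:

#include <cstddef>

// Windowed fan-out: issue at most `window` unacknowledged requests at a
// time, then drain the acknowledgements before moving to the next batch.
template <typename SendFn, typename RecvFn>
bool startInBatches(std::size_t nodeCount, SendFn send, RecvFn recv,
                    std::size_t window = 32)
{
    std::size_t next = 1;                    // rank 0 is the local master
    while (next < nodeCount)
    {
        const std::size_t first = next;

        // Phase 1: fan out up to `window` exec requests
        for (; next < nodeCount && next - first < window; next++)
        {
            if (!send(next))
                return false;
        }

        // Phase 2: one acknowledgement per outstanding request
        for (std::size_t i = first; i < next; i++)
        {
            if (!recv(i))
                return false;
        }
    }
    return true;
}

Bounding the window presumably keeps a large cluster from replying all at once and overflowing the host's UDP receive buffer, while still overlapping the slow process-launch round trips within each batch.
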
@@ -529,7 +549,8 @@ MpiHost::Result MpiHost::receivePacket(const Size nodeId,
 
     in_addr nodeAddr;
     nodeAddr.s_addr = node->ipAddress;
-    DEBUG("node = " << inet_ntoa(nodeAddr) << " operation = " << (int) operation);
+    DEBUG("nodeId = " << nodeId << " addr = " << inet_ntoa(nodeAddr) <<
+          " operation = " << (int) operation << " size = " << size);
 
     // Process buffered packets first
     for (ListIterator<Packet *> i(m_packetBuffers[nodeId]); i.hasCurrent(); i++)
@@ -539,7 +560,6 @@ MpiHost::Result MpiHost::receivePacket(const Size nodeId,
 
         if (hdr->operation == operation)
        {
-            DEBUG("buffered packet: " << pkt->size << " bytes");
             MemoryBlock::copy(packet, pkt->data, pkt->size);
             size = pkt->size;
             delete[] pkt->data;
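
As the hunk above shows, receivePacket() serves a request from the per-node packet buffer before reading the socket: a datagram read while waiting for node A may belong to node B, so it gets queued and replayed later. A minimal stand-alone sketch of that buffer-first lookup, with STL containers as stand-ins for the FreeNOS List and Packet types:

#include <cstdint>
#include <deque>
#include <optional>
#include <utility>
#include <vector>

struct BufferedPacket
{
    uint8_t operation;          // MPI operation stored in the packet header
    std::vector<uint8_t> data;  // full datagram, header included
};

// Return (and remove) the first buffered packet matching `op`, if any.
std::optional<BufferedPacket> takeBuffered(std::deque<BufferedPacket> &queue,
                                           uint8_t op)
{
    for (auto it = queue.begin(); it != queue.end(); ++it)
    {
        if (it->operation == op)
        {
            BufferedPacket pkt = std::move(*it);
            queue.erase(it);
            return pkt;
        }
    }
    return std::nullopt;        // nothing buffered: fall through to recvfrom()
}
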
@@ -555,25 +575,28 @@ MpiHost::Result MpiHost::receivePacket(const Size nodeId,
     {
         struct sockaddr_in addr;
         socklen_t len = sizeof(addr);
+        const Size recvSize = size;
 
         // Receive UDP datagram
-        int r = recvfrom(m_sock, packet, size, 0,
+        int r = recvfrom(m_sock, packet, recvSize, 0,
                          (struct sockaddr *) &addr, &len);
         if (r < 0)
         {
-            ERROR("failed to receive UDP datagram: " << strerror(errno));
+            ERROR("failed to receive UDP datagram on socket " << m_sock << ": " << strerror(errno));
             return MPI_ERR_IO;
         }
 
-        size = r;
+        const MpiProxy::Header *hdr = (const MpiProxy::Header *) packet;
+
         DEBUG("received " << r << " bytes from " << inet_ntoa(addr.sin_addr) <<
-              " at port " << htons(addr.sin_port));
+              ":" << htons(addr.sin_port) << " with coreId = " << hdr->coreId <<
+              " rankId = " << hdr->rankId);
 
         // Is this packet targeted for the given node?
-        if (addr.sin_addr.s_addr == node->ipAddress && htons(addr.sin_port) == node->udpPort)
+        if (addr.sin_addr.s_addr == node->ipAddress &&
+            htons(addr.sin_port) == node->udpPort &&
+            hdr->coreId == node->coreId)
         {
-            const MpiProxy::Header *hdr = (const MpiProxy::Header *) packet;
-
             // Verify the MPI operation
             if (hdr->operation != operation)
             {
@@ -583,27 +606,52 @@ MpiHost::Result MpiHost::receivePacket(const Size nodeId,
                 return MPI_ERR_IO;
             }
 
+            DEBUG("done");
+            size = r;
             return MPI_SUCCESS;
         }
         // Add the packet to internal buffers for later retrieval
         else
         {
-            Packet *pkt = new Packet;
-            if (!pkt)
+            Size otherNodeId = 0;
+
+            // Find the corresponding node
+            for (Size i = 0; i < m_nodes.count(); i++)
             {
-                ERROR("failed to allocate Packet struct for buffering: " << strerror(errno));
-                return MPI_ERR_NO_MEM;
+                if (addr.sin_addr.s_addr == m_nodes[i]->ipAddress &&
+                    htons(addr.sin_port) == m_nodes[i]->udpPort &&
+                    hdr->coreId == m_nodes[i]->coreId)
+                {
+                    otherNodeId = i;
+                    break;
+                }
             }
 
-            pkt->data = new u8[size];
-            if (!pkt->data)
+            if (otherNodeId == 0)
             {
-                ERROR("failed to allocate memory for buffered packet: " << strerror(errno));
-                return MPI_ERR_NO_MEM;
+                ERROR("nodeId not found for packet from " << inet_ntoa(addr.sin_addr) <<
+                      " at port " << htons(addr.sin_port));
+            }
+            else
+            {
+                Packet *pkt = new Packet;
+                if (!pkt)
+                {
+                    ERROR("failed to allocate Packet struct for buffering: " << strerror(errno));
+                    return MPI_ERR_NO_MEM;
+                }
+
+                pkt->data = new u8[r];
+                if (!pkt->data)
+                {
+                    ERROR("failed to allocate memory for buffered packet: " << strerror(errno));
+                    return MPI_ERR_NO_MEM;
+                }
+
+                MemoryBlock::copy(pkt->data, hdr, r);
+                pkt->size = r;
+                m_packetBuffers[otherNodeId]->append(pkt);
             }
-
-            pkt->size = size;
-            m_packetBuffers[nodeId]->append(pkt);
         }
     }
 }
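
With this last hunk, a stray datagram is attributed to its owner by the full (IP address, UDP port, coreId) triple instead of being credited to whichever node the caller happened to be waiting on. The matching rule, isolated with simplified types (the real code walks m_nodes the same way):

#include <cstddef>
#include <cstdint>
#include <optional>
#include <vector>

struct NodeInfo
{
    uint32_t ipAddress;  // IPv4 address in network byte order
    uint16_t udpPort;    // proxy port on that host
    uint32_t coreId;     // physical core behind that proxy
};

// Map a datagram's source address and embedded coreId back to a node index.
std::optional<std::size_t> findNode(const std::vector<NodeInfo> &nodes,
                                    uint32_t srcIp, uint16_t srcPort,
                                    uint32_t coreId)
{
    for (std::size_t i = 1; i < nodes.size(); i++)  // index 0 is the master
    {
        if (nodes[i].ipAddress == srcIp &&
            nodes[i].udpPort == srcPort &&
            nodes[i].coreId == coreId)
        {
            return i;
        }
    }
    return std::nullopt;  // unknown sender: log and drop
}

In the commit itself, index 0 doubles as the not-found sentinel, which appears safe because the other loops in this file start at index 1, treating node 0 as the local host whose traffic never arrives over the socket; the std::optional above just makes that convention explicit.
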