@@ -44,6 +44,8 @@ class LeaderHeartbeat {
44
44
// Careful, there is a move assignment happening!
45
45
}
46
46
47
+ std::atomic<int > start_id;
48
+
47
49
void startPoller () {
48
50
read_seq = 0 ;
49
51
@@ -202,7 +204,7 @@ class LeaderHeartbeat {
202
204
// std::cout << std::endl;
203
205
// }
204
206
205
- if (leader_pid () == ctx->cc .my_id ) {
207
+ if (leader_pid (start_id. load () ) == ctx->cc .my_id ) {
206
208
want_leader.store (true );
207
209
} else {
208
210
std::this_thread::sleep_for (std::chrono::milliseconds (50 ));
@@ -242,13 +244,15 @@ class LeaderHeartbeat {
242
244
};
243
245
244
246
private:
245
- int leader_pid () {
247
+ int leader_pid (int start_index ) {
246
248
int leader_id = -1 ;
247
249
248
- for (auto &pid : ids) {
250
+ for (int i = 0 ; i < static_cast <int >(ids.size ()); i++) {
251
+ int pid = ids[i];
249
252
// std::cout << pid << " " << status[pid].consecutive_updates <<
250
253
// std::endl;
251
254
if (status[pid].consecutive_updates > 2 ) {
255
+ // leader_id = pid;
252
256
leader_id = pid;
253
257
break ;
254
258
}
@@ -533,7 +537,10 @@ class LeaderSwitcher {
533
537
want_leader{&heartbeat->wantLeaderSignal ()},
534
538
read_slots{ctx->scratchpad .writeLeaderChangeSlots ()},
535
539
sz{read_slots.size ()},
540
+ // leader_start_index{(heartbeat->start_id == 0 ? 0 : heartbeat->start_id + 1)},
536
541
permission_asker{ctx} {
542
+ leader_start_index.store ((heartbeat->start_id .load () == 0 ? 0 : heartbeat->start_id .load () + 1 ));
543
+ std::cout << " inside leaderswitcher constructor: " << leader_start_index << std::endl;
537
544
prepareScanner ();
538
545
}
539
546
@@ -545,7 +552,7 @@ class LeaderSwitcher {
545
552
int force_reset = 0 ;
546
553
auto constexpr shift = 8 * sizeof (uintptr_t ) - 1 ;
547
554
548
- for (int i = 0 ; i < static_cast <int >(sz); i++) {
555
+ for (int i = leader_start_index. load () ; i < static_cast <int >(sz); i++) {
549
556
reading[i] = *reinterpret_cast <uint64_t *>(read_slots[i]);
550
557
force_reset = static_cast <int >(reading[i] >> shift);
551
558
reading[i] &= (1UL << shift) - 1 ;
@@ -556,12 +563,12 @@ class LeaderSwitcher {
556
563
break ;
557
564
}
558
565
}
559
-
560
566
// If you discovered a new request for a leader, notify the main event loop
561
567
// to give permissions to him and switch to follower.
562
568
if (requester > 0 ) {
563
- // std::cout << "Process with pid " << requester
564
- // << " asked for permissions" << std::endl;
569
+ std::cout << " index: " << leader_start_index.load () << std::endl;
570
+ std::cout << " Process with pid " << requester
571
+ << " asked for permissions" << std::endl;
565
572
leader.store (dory::Leader (requester, reading[requester], force_reset));
566
573
want_leader->store (false );
567
574
} else {
@@ -594,10 +601,10 @@ class LeaderSwitcher {
594
601
Leader current_leader = leader.load ();
595
602
if (current_leader != prev_leader || force_permission_request) {
596
603
// std::cout << "Adjusting connections to leader ("
597
- // << int(current_leader.requester) << " "
598
- // << current_leader.requester_value << ") " << (current_leader
599
- // != prev_leader) << " " << force_permission_request <<
600
- // std::endl;
604
+ // << int(current_leader.requester) << " "
605
+ // << current_leader.requester_value << ") " << (current_leader
606
+ // != prev_leader) << " " << force_permission_request <<
607
+ // std::endl;
601
608
602
609
auto orig_leader = prev_leader;
603
610
prev_leader = current_leader;
@@ -611,13 +618,13 @@ class LeaderSwitcher {
611
618
612
619
// GET_TIMESTAMP(ts_start);
613
620
614
- // std::cout << "Asking for permissions: " << hard_reset << std::endl;
621
+ std::cout << " Asking for permissions: " << hard_reset << std::endl;
615
622
// Ask for permission. Wait for everybody to reply
616
623
permission_asker.askForPermissions (hard_reset);
617
624
618
625
// GET_TIMESTAMP(ts_mid);
619
626
620
- // std::cout << "Waiting for approval" << std::endl;
627
+ std::cout << " Waiting for approval" << std::endl;
621
628
// In order to avoid a distributed deadlock (when two processes try
622
629
// to become leaders at the same time), we bail whe the leader
623
630
// changes.
@@ -643,7 +650,7 @@ class LeaderSwitcher {
643
650
// std::cout << "Asking for permissions: " << hard_reset << std::endl;
644
651
645
652
// std::cout << "I (process " << c_ctx->my_id << ") got leader "
646
- // << "approval" << std::endl;
653
+ // << "approval" << std::endl;
647
654
648
655
// GET_TIMESTAMP(ts_start);
649
656
if (hard_reset) {
@@ -828,6 +835,7 @@ class LeaderSwitcher {
828
835
std::vector<uint8_t *> dummy;
829
836
std::vector<uint8_t *> &read_slots;
830
837
size_t sz;
838
+ std::atomic<int > leader_start_index;
831
839
832
840
LeaderPermissionAsker permission_asker;
833
841
@@ -873,6 +881,15 @@ class LeaderElection {
873
881
return leader_switcher.leaderSignal ();
874
882
}
875
883
884
+ // made public
885
+ void stopHeartbreat () {
886
+ if (hb_started) {
887
+ hb_exit_signal.set_value ();
888
+ heartbeat_thd.join ();
889
+ hb_started = false ;
890
+ }
891
+ }
892
+
876
893
private:
877
894
void startHeartbeat () {
878
895
if (hb_started) {
@@ -881,7 +898,15 @@ class LeaderElection {
881
898
hb_started = true ;
882
899
883
900
leader_heartbeat = LeaderHeartbeat (&ctx);
901
+ if (threadConfig.prefix == " Secondary-" )
902
+ leader_heartbeat.start_id .store (1 );
903
+ else
904
+ leader_heartbeat.start_id .store (0 );
884
905
std::future<void > ftr = hb_exit_signal.get_future ();
906
+ uint64_t start_hb = std::chrono::duration_cast<std::chrono::microseconds>(
907
+ std::chrono::high_resolution_clock::now ().time_since_epoch ())
908
+ .count ();
909
+ std::cout << " started heartbeat at " << start_hb << std::endl;
885
910
heartbeat_thd = std::thread ([this , ftr = std::move (ftr)]() {
886
911
leader_heartbeat.startPoller ();
887
912
@@ -947,14 +972,22 @@ class LeaderElection {
947
972
response_blocked.store (false );
948
973
leader_heartbeat.scanHeartbeats ();
949
974
} else if (prev_command == ' c' ) {
950
- response_blocked.store (true );
975
+ response_blocked.store (true );
951
976
leader_heartbeat.retract ();
952
977
}
953
978
954
979
prev_command = current_command;
955
980
956
981
std::this_thread::sleep_for (std::chrono::milliseconds (1 ));
957
982
983
+ // ADDED FARZIN
984
+ // std::cout << "checking exit" << std::endl;
985
+ if (ftr.wait_for (std::chrono::seconds (0 )) ==
986
+ std::future_status::ready) {
987
+ std::cout << " exiting heartbeat..." << std::endl;
988
+ break ;
989
+ }
990
+
958
991
// if (i == 0) {
959
992
// if (ftr.wait_for(std::chrono::seconds(0)) !=
960
993
// std::future_status::timeout) {
@@ -965,7 +998,8 @@ class LeaderElection {
965
998
// std::this_thread::sleep_for(std::chrono::seconds(10));
966
999
}
967
1000
968
- file_watcher_thd.join ();
1001
+ // file_watcher_thd.join();
1002
+ file_watcher_thd.detach ();
969
1003
});
970
1004
971
1005
if (threadConfig.pinThreads ) {
@@ -977,32 +1011,25 @@ class LeaderElection {
977
1011
}
978
1012
}
979
1013
980
- void stopHeartbreat () {
981
- if (hb_started) {
982
- hb_exit_signal.set_value ();
983
- heartbeat_thd.join ();
984
- hb_started = false ;
985
- }
986
- }
987
-
988
1014
void startLeaderSwitcher () {
989
1015
if (switcher_started) {
990
1016
throw std::runtime_error (" Already started" );
991
1017
}
992
1018
switcher_started = true ;
993
-
994
1019
leader_switcher = LeaderSwitcher (&ctx, &leader_heartbeat);
995
1020
std::future<void > ftr = switcher_exit_signal.get_future ();
996
1021
switcher_thd = std::thread ([this , ftr = std::move (ftr)]() {
997
1022
leader_switcher.startPoller ();
998
1023
for (unsigned long long i = 0 ;; i = (i + 1 ) & iterations_ftr_check) {
999
1024
leader_switcher.scanPermissions ();
1000
- if (i == 0 ) {
1001
- if (ftr.wait_for (std::chrono::seconds (0 )) !=
1002
- std::future_status::timeout) {
1003
- break ;
1004
- }
1005
- }
1025
+ // if (i == 0) {
1026
+ // std::cout << "inside exit check" << std::endl;
1027
+ // if (ftr.wait_for(std::chrono::seconds(0)) !=
1028
+ // std::future_status::timeout) {
1029
+ // std::cout << "exiting heartbeat..." << std::endl;
1030
+ // break;
1031
+ // }
1032
+ // }
1006
1033
}
1007
1034
});
1008
1035
0 commit comments