diff --git a/src/base/task_desc.proto b/src/base/task_desc.proto index 2e828348c..f90ffa2e0 100644 --- a/src/base/task_desc.proto +++ b/src/base/task_desc.proto @@ -89,4 +89,6 @@ message TaskDescriptor { repeated LabelSelector label_selectors = 33; /* Affinity */ Affinity affinity = 34; + // Namespace + string task_namespace = 35; } diff --git a/src/engine/coordinator.cc b/src/engine/coordinator.cc index c8627b69b..dd941bc4d 100644 --- a/src/engine/coordinator.cc +++ b/src/engine/coordinator.cc @@ -122,7 +122,7 @@ Coordinator::Coordinator() job_table_, associated_resources_, local_resource_topology_, object_store_, task_table_, knowledge_base, topology_manager_, m_adapter_, NULL, uuid_, FLAGS_listen_uri, time_manager_, - trace_generator_); + trace_generator_, NULL, NULL); } else { // Unknown scheduler specified, error. LOG(FATAL) << "Unknown or unrecognized scheduler '" << FLAGS_scheduler diff --git a/src/scheduling/event_driven_scheduler.cc b/src/scheduling/event_driven_scheduler.cc index be9e9f985..2e6f451ec 100644 --- a/src/scheduling/event_driven_scheduler.cc +++ b/src/scheduling/event_driven_scheduler.cc @@ -66,9 +66,12 @@ EventDrivenScheduler::EventDrivenScheduler( ResourceID_t coordinator_res_id, const string& coordinator_uri, TimeInterface* time_manager, - TraceGenerator* trace_generator) - : SchedulerInterface(job_map, knowledge_base, resource_map, resource_topology, - object_store, task_map), + TraceGenerator* trace_generator, + unordered_map>>* labels_map, + vector *affinity_antiaffinity_tasks) + : SchedulerInterface(job_map, knowledge_base, resource_map, + resource_topology, object_store, task_map, labels_map, + affinity_antiaffinity_tasks), coordinator_uri_(coordinator_uri), coordinator_res_id_(coordinator_res_id), event_notifier_(event_notifier), @@ -646,9 +649,44 @@ void EventDrivenScheduler::LazyGraphReduction( current_task->state() == TaskDescriptor::BLOCKING) { if (!will_block || (current_task->dependencies_size() == 0 && 
current_task->outputs_size() == 0)) { - current_task->set_state(TaskDescriptor::RUNNABLE); - InsertTaskIntoRunnables(JobIDFromString(current_task->job_id()), - current_task->uid()); + // Pod affinity/anti-affinity + if (current_task->has_affinity() && + (current_task->affinity().has_pod_affinity() || + current_task->affinity().has_pod_anti_affinity())) { + if (queue_based_schedule == false || one_task_runnable == true) + continue; + // Requeue RUNNABLE tasks as CREATED at the tail, then make the first + // CREATED task runnable and stop; IDs not in task_map_ are skipped. + auto task_itr = affinity_antiaffinity_tasks_->begin(); + while (task_itr != affinity_antiaffinity_tasks_->end()) { + TaskDescriptor* tdp = FindPtrOrNull(*task_map_, *task_itr); + if (!tdp) { task_itr++; continue; } + if ((tdp->state() == TaskDescriptor::RUNNABLE) && + (one_task_runnable == false)) { + TaskID_t task_id = *task_itr; + tdp->set_state(TaskDescriptor::CREATED); + affinity_antiaffinity_tasks_->erase(task_itr); + affinity_antiaffinity_tasks_->push_back(task_id); + JobID_t tdp_job_id = JobIDFromString(tdp->job_id()); + runnable_tasks_[tdp_job_id].erase(task_id); + // erase() invalidated task_itr; rescan from the front. + task_itr = affinity_antiaffinity_tasks_->begin(); + continue; + } + if (tdp->state() == TaskDescriptor::CREATED) { + tdp->set_state(TaskDescriptor::RUNNABLE); + InsertTaskIntoRunnables(JobIDFromString(tdp->job_id()), + tdp->uid()); + one_task_runnable = true; + break; + } + task_itr++; + } + } else { + current_task->set_state(TaskDescriptor::RUNNABLE); + InsertTaskIntoRunnables(JobIDFromString(current_task->job_id()), + current_task->uid()); + } } } } diff --git a/src/scheduling/event_driven_scheduler.h b/src/scheduling/event_driven_scheduler.h index a966f8b55..477531404 100644 --- a/src/scheduling/event_driven_scheduler.h +++ b/src/scheduling/event_driven_scheduler.h @@ -62,7 +62,10 @@ class EventDrivenScheduler : public SchedulerInterface { ResourceID_t coordinator_res_id, const string& coordinator_uri, TimeInterface* time_manager, - TraceGenerator* trace_generator); + 
TraceGenerator* trace_generator, + unordered_map>>* + labels_map, + vector *affinity_antiaffinity_tasks); ~EventDrivenScheduler(); virtual void AddJob(JobDescriptor* jd_ptr); ResourceID_t* BoundResourceForTask(TaskID_t task_id); @@ -183,6 +186,9 @@ class EventDrivenScheduler : public SchedulerInterface { shared_ptr topology_manager_; TimeInterface* time_manager_; TraceGenerator* trace_generator_; + //Pod affinity/anti-affinity + bool one_task_runnable; + bool queue_based_schedule; }; } // namespace scheduler diff --git a/src/scheduling/firmament_scheduler_service.cc b/src/scheduling/firmament_scheduler_service.cc index 9aa2db970..a924ff0c7 100644 --- a/src/scheduling/firmament_scheduler_service.cc +++ b/src/scheduling/firmament_scheduler_service.cc @@ -19,6 +19,7 @@ */ #include +#include #include "base/resource_status.h" #include "base/resource_topology_node_desc.pb.h" @@ -57,6 +58,7 @@ DEFINE_string(firmament_scheduler_service_address, "127.0.0.1", DEFINE_string(firmament_scheduler_service_port, "9090", "The port of the scheduler service"); DEFINE_string(service_scheduler, "flow", "Scheduler to use: flow | simple"); +DEFINE_uint64(queue_based_scheduling_time, 1, "Queue Based Schedule run time"); namespace firmament { @@ -78,7 +80,8 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service { job_map_, resource_map_, top_level_res_status->mutable_topology_node(), obj_store_, task_map_, knowledge_base_, topology_manager_, sim_messaging_adapter_, NULL, - top_level_res_id_, "", &wall_time_, trace_generator_); + top_level_res_id_, "", &wall_time_, trace_generator_, &labels_map_, + &affinity_antiaffinity_tasks_); } else if (FLAGS_service_scheduler == "simple") { scheduler_ = new SimpleScheduler( job_map_, resource_map_, @@ -142,6 +145,24 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service { if (deltas.size()) { LOG(INFO) << "Got " << deltas.size() << " scheduling deltas"; } + + // Schedule tasks having pod 
affinity/anti-affinity + chrono::high_resolution_clock::time_point start = + chrono::high_resolution_clock::now(); + chrono::duration time_spent( + chrono::duration_values::zero()); + while (affinity_antiaffinity_tasks_.size() && + (time_spent.count() < FLAGS_queue_based_scheduling_time)) { + scheduler_->ScheduleAllQueueJobs(&sstat, &deltas); + chrono::high_resolution_clock::time_point end = + chrono::high_resolution_clock::now(); + time_spent = chrono::duration_cast(end - start); + } + if(deltas.size()) { + LOG(INFO) << "QueueBasedSchedule: Got " << deltas.size() + << " scheduling deltas"; + } + for (auto& d : deltas) { // LOG(INFO) << "Delta: " << d.DebugString(); SchedulingDelta* ret_delta = reply->add_deltas(); @@ -162,6 +183,39 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service { return Status::OK; } + // Pod affinity/anti-affinity: erase td from labels_map_ (pruning buckets + // that become empty) and from affinity_antiaffinity_tasks_. + void RemoveTaskFromLabelsMap(const TaskDescriptor& td) { + for (const auto& label : td.labels()) { + unordered_map>* label_values = + FindOrNull(labels_map_, label.key()); + if (!label_values) continue; + vector* labels_map_tasks = + FindOrNull(*label_values, label.value()); + if (labels_map_tasks) { + vector::iterator it_pos = find( + labels_map_tasks->begin(), labels_map_tasks->end(), td.uid()); + if (it_pos != labels_map_tasks->end()) { + labels_map_tasks->erase(it_pos); + if (!labels_map_tasks->size()) { + label_values->erase(label.value()); + if (label_values->empty()) labels_map_.erase(label.key()); + } + } + } + } + if (td.has_affinity() && (td.affinity().has_pod_affinity() || + td.affinity().has_pod_anti_affinity())) { + vector::iterator it = + find(affinity_antiaffinity_tasks_.begin(), + affinity_antiaffinity_tasks_.end(), td.uid()); + if (it != affinity_antiaffinity_tasks_.end()) { + affinity_antiaffinity_tasks_.erase(it); + } + } + } + + Status TaskCompleted(ServerContext* context, const TaskUID* tid_ptr, TaskCompletedResponse* reply) override { TaskDescriptor* td_ptr = 
FindPtrOrNull(*task_map_, tid_ptr->task_uid()); @@ -182,6 +236,7 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service { return Status::OK; } td_ptr->set_finish_time(wall_time_.GetCurrentTimestamp()); + RemoveTaskFromLabelsMap(*td_ptr); TaskFinalReport report; scheduler_->HandleTaskCompletion(td_ptr, &report); kb_populator_->PopulateTaskFinalReport(*td_ptr, &report); @@ -212,6 +267,7 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service { if (!td_ptr->scheduled_to_resource().empty()) { UpdateMachineSamplesToKnowledgeBaseStatically(td_ptr, true); } + RemoveTaskFromLabelsMap(*td_ptr); scheduler_->HandleTaskFailure(td_ptr); reply->set_type(TaskReplyType::TASK_FAILED_OK); return Status::OK; @@ -226,6 +282,7 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service { reply->set_type(TaskReplyType::TASK_NOT_FOUND); return Status::OK; } + RemoveTaskFromLabelsMap(*td_ptr); // TODO(jagadish): We need to remove below code once we start // getting machine resource stats samples from poseidon i.e., heapster. // Currently updating machine samples statically based on state of pod. 
@@ -261,6 +318,38 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service { return Status::OK; } + // Pod affinity/anti-affinity + // Adding labels of task to the labels_map_ + void AddTaskToLabelsMap(const TaskDescriptor& td) { + TaskID_t task_id = td.uid(); + for (const auto& label : td.labels()) { + unordered_map>* label_values = + FindOrNull(labels_map_, label.key()); + if (!label_values) { + vector tasks; + tasks.push_back(task_id); + unordered_map> values; + CHECK(InsertIfNotPresent(&values, label.value(), tasks)); + CHECK(InsertIfNotPresent(&labels_map_, label.key(), values)); + } else { + vector* labels_map_tasks = + FindOrNull(*label_values, label.value()); + if (!labels_map_tasks) { + vector value_tasks; + value_tasks.push_back(task_id); + CHECK( + InsertIfNotPresent(&(*label_values), label.value(), value_tasks)); + } else { + labels_map_tasks->push_back(task_id); + } + } + } + if (td.has_affinity() && (td.affinity().has_pod_affinity() || + td.affinity().has_pod_anti_affinity())) { + affinity_antiaffinity_tasks_.push_back(task_id); + } + } + Status TaskSubmitted(ServerContext* context, const TaskDescription* task_desc_ptr, TaskSubmittedResponse* reply) override { @@ -275,6 +364,7 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service { reply->set_type(TaskReplyType::TASK_STATE_NOT_CREATED); return Status::OK; } + AddTaskToLabelsMap(task_desc_ptr->task_descriptor()); JobID_t job_id = JobIDFromString(task_desc_ptr->task_descriptor().job_id()); JobDescriptor* jd_ptr = FindOrNull(*job_map_, job_id); // LOG(INFO) << "Job id is " << job_id ; @@ -534,6 +624,9 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service { unordered_map> job_num_tasks_to_remove_; KnowledgeBasePopulator* kb_populator_; + //Pod affinity/anti-affinity + unordered_map>> labels_map_; + vector affinity_antiaffinity_tasks_; ResourceStatus* CreateTopLevelResource() { ResourceID_t res_id = GenerateResourceID(); diff 
--git a/src/scheduling/flow/cpu_cost_model.cc b/src/scheduling/flow/cpu_cost_model.cc index 06ef9b60e..2cd1e0682 100644 --- a/src/scheduling/flow/cpu_cost_model.cc +++ b/src/scheduling/flow/cpu_cost_model.cc @@ -38,10 +38,16 @@ namespace firmament { CpuCostModel::CpuCostModel(shared_ptr resource_map, shared_ptr task_map, - shared_ptr knowledge_base) + shared_ptr knowledge_base, + unordered_map>>* labels_map) : resource_map_(resource_map), task_map_(task_map), - knowledge_base_(knowledge_base) {} + knowledge_base_(knowledge_base), + labels_map_(labels_map) { + // Set an initial value for infinity -- this overshoots a bit; would be nice + // to have a tighter bound based on actual costs observed + infinity_ = omega_ * CpuMemCostVector_t::dimensions_; +} ArcDescriptor CpuCostModel::TaskToUnscheduledAgg(TaskID_t task_id) { return ArcDescriptor(2560000, 1ULL, 0ULL); @@ -88,7 +94,9 @@ ArcDescriptor CpuCostModel::EquivClassToResourceNode(EquivClass_t ec, ArcDescriptor CpuCostModel::EquivClassToEquivClass(EquivClass_t ec1, EquivClass_t ec2) { - CpuMemCostVector_t* resource_request = FindOrNull(ec_resource_requirement_, ec1); + CpuMemCostVector_t cost_vector; + CpuMemResVector_t* resource_request = + FindOrNull(ec_resource_requirement_, ec1); CHECK_NOTNULL(resource_request); ResourceID_t* machine_res_id = FindOrNull(ec_to_machine_, ec2); CHECK_NOTNULL(machine_res_id); @@ -96,7 +104,7 @@ ArcDescriptor CpuCostModel::EquivClassToEquivClass(EquivClass_t ec1, CHECK_NOTNULL(rs); const ResourceDescriptor& rd = rs->topology_node().resource_desc(); CHECK_EQ(rd.type(), ResourceDescriptor::RESOURCE_MACHINE); - CpuMemCostVector_t available_resources; + CpuMemResVector_t available_resources; available_resources.cpu_cores_ = static_cast(rd.available_resources().cpu_cores()); available_resources.ram_cap_ = @@ -113,49 +121,126 @@ ArcDescriptor CpuCostModel::EquivClassToEquivClass(EquivClass_t ec1, ec_index * resource_request->cpu_cores_; available_resources.ram_cap_ = 
rd.available_resources().ram_cap() - ec_index * resource_request->ram_cap_; - int64_t cpu_cost = + // Expressing Least Requested Priority. + float cpu_fraction = ((rd.resource_capacity().cpu_cores() - available_resources.cpu_cores_) / - (float)rd.resource_capacity().cpu_cores()) * - omega_; - int64_t ram_cost = + (float)rd.resource_capacity().cpu_cores()); + float ram_fraction = ((rd.resource_capacity().ram_cap() - available_resources.ram_cap_) / - (float)rd.resource_capacity().ram_cap()) * - omega_; - int64_t cost = cpu_cost + ram_cost; + (float)rd.resource_capacity().ram_cap()); + int64_t cpu_cost = cpu_fraction * omega_; + int64_t ram_cost = ram_fraction * omega_; + int64_t cpu_ram_cost = (cpu_cost + ram_cost) / 2; + cost_vector.cpu_mem_cost_ = cpu_ram_cost; + + // Expressing Balanced Resource Allocation Priority. + // Compute variance for two fractions, cpu and ram. + float mean = (cpu_fraction + ram_fraction) / float(2); + float variance = (((cpu_fraction - mean) * (cpu_fraction - mean)) + + ((ram_fraction - mean) * (ram_fraction - mean))) / + float(2); + // Since the variance is between positive fractions, it will be positive + // fraction. Variance lets the cost to be higher for node which has high + // variance and multiplying it with omega_(1000) provides the scaling + // factor needed. + int64_t balanced_resource_cost = variance * omega_; + cost_vector.balanced_res_cost_ = balanced_resource_cost; + + // Expressing Node Affinity priority. 
const TaskDescriptor* td_ptr = FindOrNull(ec_to_td_requirements, ec1); CHECK_NOTNULL(td_ptr); - int64_t sum_of_weights = 0; + int64_t node_affinity_normalized_score = 0; + int64_t pod_affinity_normalized_score = 0; if (td_ptr->has_affinity()) { const Affinity& affinity = td_ptr->affinity(); if (affinity.has_node_affinity()) { if (affinity.node_affinity() .preferredduringschedulingignoredduringexecution_size()) { - // Match PreferredDuringSchedulingIgnoredDuringExecution term by term - for (auto& preferredSchedulingTerm : - affinity.node_affinity() - .preferredduringschedulingignoredduringexecution()) { - // If weight is zero then skip preferredSchedulingTerm. - if (!preferredSchedulingTerm.weight()) { - continue; - } - // A null or empty node selector term matches no objects. - if (!preferredSchedulingTerm.has_preference()) { - continue; - } - if (scheduler::NodeMatchesNodeSelectorTerm( - rd, preferredSchedulingTerm.preference())) { - sum_of_weights += preferredSchedulingTerm.weight(); + unordered_map>* + nodes_priority_scores_ptr = + FindOrNull(ec_to_node_priority_scores, ec1); + CHECK_NOTNULL(nodes_priority_scores_ptr); + PriorityScoresList_t* priority_scores_struct_ptr = + FindOrNull(*nodes_priority_scores_ptr, *machine_res_id); + CHECK_NOTNULL(priority_scores_struct_ptr); + PriorityScore_t& node_affinity_score = + priority_scores_struct_ptr->node_affinity_priority; + if (node_affinity_score.satisfy) { + MinMaxScores_t* max_min_priority_scores = + FindOrNull(ec_to_max_min_priority_scores, ec1); + CHECK_NOTNULL(max_min_priority_scores); + if (node_affinity_score.final_score == -1) { + // Normalised node affinity score is not calculated for this + // machine, so calculate and store it once. 
+ int64_t max_score = + max_min_priority_scores->node_affinity_priority.max_score; + if (max_score) { + node_affinity_normalized_score = + (node_affinity_score.score / (float)(max_score)) * omega_; + node_affinity_score.final_score = node_affinity_normalized_score; + } + } else { + // Normalized node affinity score is already calculated, so use it. + node_affinity_normalized_score = node_affinity_score.final_score; } } } } + + // Expressing pod affinity/anti-affinity priority scores. + if ((affinity.has_pod_affinity() && + affinity.pod_affinity() + .preferredduringschedulingignoredduringexecution_size()) || + (affinity.has_pod_anti_affinity() && + affinity.pod_anti_affinity() + .preferredduringschedulingignoredduringexecution_size())) { + unordered_map>* + nodes_priority_scores_ptr = + FindOrNull(ec_to_node_priority_scores, ec1); + CHECK_NOTNULL(nodes_priority_scores_ptr); + PriorityScoresList_t* priority_scores_struct_ptr = + FindOrNull(*nodes_priority_scores_ptr, *machine_res_id); + CHECK_NOTNULL(priority_scores_struct_ptr); + PriorityScore_t& pod_affinity_score = + priority_scores_struct_ptr->pod_affinity_priority; + MinMaxScores_t* max_min_priority_scores = + FindOrNull(ec_to_max_min_priority_scores, ec1); + CHECK_NOTNULL(max_min_priority_scores); + if (pod_affinity_score.final_score == -1) { + int64_t max_score = + max_min_priority_scores->pod_affinity_priority.max_score; + int64_t min_score = + max_min_priority_scores->pod_affinity_priority.min_score; + if ((max_score - min_score) > 0) { + // Float division as for node affinity, so the ratio is not truncated. + pod_affinity_normalized_score = + ((pod_affinity_score.score - min_score) / + static_cast<float>(max_score - min_score)) * omega_; + } + pod_affinity_score.final_score = pod_affinity_normalized_score; + } else { + pod_affinity_normalized_score = pod_affinity_score.final_score; + } + } } - // TODO(jagadish): We need to tune max_sum_of_weights to max possible value - // with the help of real time node affinty soft constraints requirements. 
- if (sum_of_weights > max_sum_of_weights) { - sum_of_weights = max_sum_of_weights; - } - return ArcDescriptor(cost + max_sum_of_weights - sum_of_weights, 1ULL, 0ULL); + + cost_vector.node_affinity_soft_cost_ = + omega_ - node_affinity_normalized_score; + cost_vector.pod_affinity_soft_cost_ = omega_ - pod_affinity_normalized_score; + Cost_t final_cost = FlattenCostVector(cost_vector); + return ArcDescriptor(final_cost, 1ULL, 0ULL); +} + +Cost_t CpuCostModel::FlattenCostVector(CpuMemCostVector_t cv) { + int64_t accumulator = 0; + accumulator += cv.cpu_mem_cost_; + accumulator += cv.balanced_res_cost_; + accumulator += cv.node_affinity_soft_cost_; + accumulator += cv.pod_affinity_soft_cost_; + if (accumulator > infinity_) infinity_ = accumulator + 1; + return accumulator; } vector* CpuCostModel::GetTaskEquivClasses(TaskID_t task_id) { @@ -163,7 +248,7 @@ vector* CpuCostModel::GetTaskEquivClasses(TaskID_t task_id) { vector* ecs = new vector(); TaskDescriptor* td_ptr = FindPtrOrNull(*task_map_, task_id); CHECK_NOTNULL(td_ptr); - CpuMemCostVector_t* task_resource_request = + CpuMemResVector_t* task_resource_request = FindOrNull(task_resource_requirement_, task_id); CHECK_NOTNULL(task_resource_request); size_t task_agg = 0; @@ -173,23 +258,22 @@ vector* CpuCostModel::GetTaskEquivClasses(TaskID_t task_id) { // future. task_agg = HashJobID(*td_ptr); } else if (td_ptr->label_selectors_size()) { - task_agg = scheduler::HashSelectors(td_ptr->label_selectors()); - // And also hash the cpu and mem requests. - boost::hash_combine( + task_agg = scheduler::HashSelectors(td_ptr->label_selectors()); + // And also hash the cpu and mem requests. + boost::hash_combine( task_agg, to_string(task_resource_request->cpu_cores_) + "cpumem" + to_string(task_resource_request->ram_cap_)); } else { - // For other tasks, only hash the cpu and mem requests. 
- boost::hash_combine( - task_agg, to_string(task_resource_request->cpu_cores_) + "cpumem" + - to_string(task_resource_request->ram_cap_)); + // For other tasks, only hash the cpu and mem requests. + boost::hash_combine( + task_agg, to_string(task_resource_request->cpu_cores_) + "cpumem" + + to_string(task_resource_request->ram_cap_)); } EquivClass_t resource_request_ec = static_cast(task_agg); ecs->push_back(resource_request_ec); InsertIfNotPresent(&ec_resource_requirement_, resource_request_ec, *task_resource_request); - InsertIfNotPresent(&ec_to_td_requirements, resource_request_ec, - *td_ptr); + InsertIfNotPresent(&ec_to_td_requirements, resource_request_ec, *td_ptr); return ecs; } @@ -208,22 +292,491 @@ vector* CpuCostModel::GetTaskPreferenceArcs(TaskID_t task_id) { return pref_res; } +void CpuCostModel::CalculatePrioritiesCost(const EquivClass_t ec, + const ResourceDescriptor& rd) { + // Calculate priorities cost for node affinity. + const TaskDescriptor* td = FindOrNull(ec_to_td_requirements, ec); + CHECK_NOTNULL(td); + int64_t sum_of_weights = 0; + if (td->has_affinity()) { + const Affinity& affinity = td->affinity(); + if (affinity.has_node_affinity()) { + if (affinity.node_affinity() + .preferredduringschedulingignoredduringexecution_size()) { + // Match PreferredDuringSchedulingIgnoredDuringExecution term by term + for (auto& preferredSchedulingTerm : + affinity.node_affinity() + .preferredduringschedulingignoredduringexecution()) { + // If weight is zero then skip preferredSchedulingTerm. + if (!preferredSchedulingTerm.weight()) { + continue; + } + // A null or empty node selector term matches no objects. + if (!preferredSchedulingTerm.has_preference()) { + continue; + } + if (scheduler::NodeMatchesNodeSelectorTerm( + rd, preferredSchedulingTerm.preference())) { + sum_of_weights += preferredSchedulingTerm.weight(); + } + } + // Fill the node priority min, max and actual scores which will + // be used in cost calculation. 
+ unordered_map>* + nodes_priority_scores_ptr = + FindOrNull(ec_to_node_priority_scores, ec); + if (!nodes_priority_scores_ptr) { + // For this EC, no node to priority scores map exists, so initialize + // it. + unordered_map> + node_to_priority_scores_map; + InsertIfNotPresent(&ec_to_node_priority_scores, ec, + node_to_priority_scores_map); + nodes_priority_scores_ptr = + FindOrNull(ec_to_node_priority_scores, ec); + } + CHECK_NOTNULL(nodes_priority_scores_ptr); + ResourceID_t res_id = ResourceIDFromString(rd.uuid()); + PriorityScoresList_t* priority_scores_struct_ptr = + FindOrNull(*nodes_priority_scores_ptr, res_id); + if (!priority_scores_struct_ptr) { + // Priority scores is empty for this node, so initialize it zero. + PriorityScoresList_t priority_scores_list; + InsertIfNotPresent(nodes_priority_scores_ptr, res_id, + priority_scores_list); + priority_scores_struct_ptr = + FindOrNull(*nodes_priority_scores_ptr, res_id); + } + CHECK_NOTNULL(priority_scores_struct_ptr); + // Store the node affinity min, max and actual priority scores that will + // be utilized in calculating normalized cost. + PriorityScore_t& node_affinity_score = + priority_scores_struct_ptr->node_affinity_priority; + if (!sum_of_weights) { + // If machine does not satisfies soft constraint then we flag machine + // such that cost of omega_ is used in cost calculation. + node_affinity_score.satisfy = false; + } + if (node_affinity_score.satisfy) { + // Machine satisfies soft constraints. + // Store the node affinity min, max and actual priority scores. 
+ node_affinity_score.score = sum_of_weights; + MinMaxScores_t* max_min_priority_scores = + FindOrNull(ec_to_max_min_priority_scores, ec); + if (!max_min_priority_scores) { + MinMaxScores_t priority_scores_list; + InsertIfNotPresent(&ec_to_max_min_priority_scores, ec, + priority_scores_list); + max_min_priority_scores = + FindOrNull(ec_to_max_min_priority_scores, ec); + } + MinMaxScore_t& min_max_node_affinity_score = + max_min_priority_scores->node_affinity_priority; + if (min_max_node_affinity_score.max_score < sum_of_weights || + min_max_node_affinity_score.max_score == -1) { + min_max_node_affinity_score.max_score = sum_of_weights; + } + } + } + } + } +} + +// Pod affinity/anti-affinity +bool CpuCostModel::MatchExpressionWithPodLabels( + const ResourceDescriptor& rd, const LabelSelectorRequirement& expression) { + unordered_map>* label_values = + FindOrNull(*labels_map_, expression.key()); + if (label_values) { + for (auto& value : expression.values()) { + vector* labels_map_tasks = FindOrNull(*label_values, value); + if (labels_map_tasks) { + for (auto task_id : *labels_map_tasks) { + TaskDescriptor* tdp = FindPtrOrNull(*task_map_, task_id); + if (tdp) { + if (!HasNamespace(tdp->task_namespace())) continue; + if (tdp->state() == TaskDescriptor::RUNNING) { + ResourceID_t pu_res_id = + ResourceIDFromString(tdp->scheduled_to_resource()); + ResourceID_t machine_res_id = MachineResIDForResource(pu_res_id); + ResourceID_t res_id = ResourceIDFromString(rd.uuid()); + if (machine_res_id == res_id) { + return true; + } + } + } + } + } + } + } + return false; +} + +bool CpuCostModel::NotMatchExpressionWithPodLabels( + const ResourceDescriptor& rd, const LabelSelectorRequirement& expression) { + bool namespace_match = false; + unordered_map>* label_values = + FindOrNull(*labels_map_, expression.key()); + if (label_values) { + for (auto& value : expression.values()) { + vector* labels_map_tasks = FindOrNull(*label_values, value); + if (labels_map_tasks) { + for (auto 
task_id : *labels_map_tasks) { + TaskDescriptor* tdp = FindPtrOrNull(*task_map_, task_id); + if (tdp) { + if (tdp->state() == TaskDescriptor::RUNNING) { + ResourceID_t pu_res_id = + ResourceIDFromString(tdp->scheduled_to_resource()); + ResourceID_t machine_res_id = MachineResIDForResource(pu_res_id); + ResourceID_t res_id = ResourceIDFromString(rd.uuid()); + if (machine_res_id == res_id) return false; + } + // Read the namespace only while tdp is known to be non-NULL; an + // ID missing from task_map_ must not be dereferenced. + if (HasNamespace(tdp->task_namespace())) namespace_match = true; + } + } + } + } + } + if (namespace_match == false) return false; + return true; +} + +bool CpuCostModel::MatchExpressionKeyWithPodLabels( + const ResourceDescriptor& rd, const LabelSelectorRequirement& expression) { + unordered_map>* label_values = + FindOrNull(*labels_map_, expression.key()); + if (label_values) { + for (auto it = label_values->begin(); it != label_values->end(); it++) { + for (auto task_id : it->second) { + TaskDescriptor* tdp = FindPtrOrNull(*task_map_, task_id); + if (tdp) { + if (!HasNamespace(tdp->task_namespace())) continue; + if (tdp->state() == TaskDescriptor::RUNNING) { + ResourceID_t pu_res_id = + ResourceIDFromString(tdp->scheduled_to_resource()); + ResourceID_t machine_res_id = MachineResIDForResource(pu_res_id); + ResourceID_t res_id = ResourceIDFromString(rd.uuid()); + if (machine_res_id == res_id) { + return true; + } + } + } + } + } + } + return false; +} + +bool CpuCostModel::NotMatchExpressionKeyWithPodLabels( + const ResourceDescriptor& rd, const LabelSelectorRequirement& expression) { + bool namespace_match = false; + unordered_map>* label_values = + FindOrNull(*labels_map_, expression.key()); + if (label_values) { + for (auto it = label_values->begin(); it != label_values->end(); it++) { + for (auto task_id : it->second) { + TaskDescriptor* tdp = FindPtrOrNull(*task_map_, task_id); + if (tdp) { + if (tdp->state() == TaskDescriptor::RUNNING) { + ResourceID_t pu_res_id = + ResourceIDFromString(tdp->scheduled_to_resource()); + ResourceID_t machine_res_id 
= MachineResIDForResource(pu_res_id); + ResourceID_t res_id = ResourceIDFromString(rd.uuid()); + if (machine_res_id == res_id) return false; + } + // Read the namespace only while tdp is known to be non-NULL; an + // ID missing from task_map_ must not be dereferenced. + if (HasNamespace(tdp->task_namespace())) namespace_match = true; + } + } + } + } + if (namespace_match == false) return false; + return true; +} + +bool CpuCostModel::SatisfiesPodAntiAffinityMatchExpression( + const ResourceDescriptor& rd, + const LabelSelectorRequirementAntiAff& expression) { + LabelSelectorRequirement expression_selector; + expression_selector.set_key(expression.key()); + expression_selector.set_operator_(expression.operator_()); + for (auto& value : expression.values()) { + expression_selector.add_values(value); + } + if (expression.operator_() == std::string("In")) { + if (!MatchExpressionWithPodLabels(rd, expression_selector)) return true; + } else if (expression.operator_() == std::string("NotIn")) { + if (!NotMatchExpressionWithPodLabels(rd, expression_selector)) return true; + } else if (expression.operator_() == std::string("Exists")) { + if (!MatchExpressionKeyWithPodLabels(rd, expression_selector)) return true; + } else if (expression.operator_() == std::string("DoesNotExist")) { + if (!NotMatchExpressionKeyWithPodLabels(rd, expression_selector)) + return true; + } else { + LOG(FATAL) << "Unsupported selector type: " << expression.operator_(); + return false; + } + return false; +} + +bool CpuCostModel::SatisfiesPodAffinityMatchExpression( + const ResourceDescriptor& rd, const LabelSelectorRequirement& expression) { + if (expression.operator_() == std::string("In")) { + if (MatchExpressionWithPodLabels(rd, expression)) return true; + } else if (expression.operator_() == std::string("NotIn")) { + if (NotMatchExpressionWithPodLabels(rd, expression)) return true; + } else if (expression.operator_() == std::string("Exists")) { + if (MatchExpressionKeyWithPodLabels(rd, expression)) return true; + } else if (expression.operator_() == std::string("DoesNotExist")) { + if 
(NotMatchExpressionKeyWithPodLabels(rd, expression)) return true; + } else { + LOG(FATAL) << "Unsupported selector type: " << expression.operator_(); + return false; + } + return false; +} + +bool CpuCostModel::SatisfiesPodAntiAffinityMatchExpressions( + const ResourceDescriptor& rd, + const RepeatedPtrField& matchexpressions) { + for (auto& expression : matchexpressions) { + if (SatisfiesPodAntiAffinityMatchExpression(rd, expression)) { + continue; + } else { + return false; + } + } + return true; +} + +bool CpuCostModel::SatisfiesPodAffinityMatchExpressions( + const ResourceDescriptor& rd, + const RepeatedPtrField& matchexpressions) { + for (auto& expression : matchexpressions) { + if (SatisfiesPodAffinityMatchExpression(rd, expression)) { + continue; + } else { + return false; + } + } + return true; +} + +bool CpuCostModel::SatisfiesPodAntiAffinityTerm( + const ResourceDescriptor& rd, const TaskDescriptor& td, + const PodAffinityTermAntiAff& term) { + if (!term.namespaces_size()) { + namespaces.insert(td.task_namespace()); + } else { + for (auto name : term.namespaces()) { + namespaces.insert(name); + } + } + if (term.has_labelselector()) { + if (term.labelselector().matchexpressions_size()) { + if (!SatisfiesPodAntiAffinityMatchExpressions( + rd, term.labelselector().matchexpressions())) + return false; + } + } + return true; +} + +bool CpuCostModel::SatisfiesPodAffinityTerm(const ResourceDescriptor& rd, + const TaskDescriptor& td, + const PodAffinityTerm& term) { + if (!term.namespaces_size()) { + namespaces.insert(td.task_namespace()); + } else { + for (auto name : term.namespaces()) { + namespaces.insert(name); + } + } + if (term.has_labelselector()) { + if (term.labelselector().matchexpressions_size()) { + if (!SatisfiesPodAffinityMatchExpressions( + rd, term.labelselector().matchexpressions())) + return false; + } + } + return true; +} + +bool CpuCostModel::SatisfiesPodAntiAffinityTerms( + const ResourceDescriptor& rd, const TaskDescriptor& td, + const 
RepeatedPtrField& podantiaffinityterms) { + for (auto& term : podantiaffinityterms) { + if (!SatisfiesPodAntiAffinityTerm(rd, td, term)) return false; + } + return true; +} + +bool CpuCostModel::SatisfiesPodAffinityTerms( + const ResourceDescriptor& rd, const TaskDescriptor& td, + const RepeatedPtrField& podaffinityterms) { + for (auto& term : podaffinityterms) { + if (!SatisfiesPodAffinityTerm(rd, td, term)) return false; + } + return true; +} + +// Hard constraint check for pod affinity/anti-affinity. +bool CpuCostModel::SatisfiesPodAffinityAntiAffinityRequired( + const ResourceDescriptor& rd, const TaskDescriptor& td) { + if (td.has_affinity()) { + Affinity affinity = td.affinity(); + if (affinity.has_pod_anti_affinity()) { + if (affinity.pod_anti_affinity() + .requiredduringschedulingignoredduringexecution_size()) { + if (!SatisfiesPodAntiAffinityTerms( + rd, td, + affinity.pod_anti_affinity() + .requiredduringschedulingignoredduringexecution())) { + return false; + } + } + } + if (affinity.has_pod_affinity()) { + if (affinity.pod_affinity() + .requiredduringschedulingignoredduringexecution_size()) { + if (!SatisfiesPodAffinityTerms( + rd, td, + affinity.pod_affinity() + .requiredduringschedulingignoredduringexecution())) { + return false; + } + } + } + } + return true; +} + +// Soft constraint check for pod affinity/anti-affinity. 
+void CpuCostModel::CalculatePodAffinityAntiAffinityPreference( + const ResourceDescriptor& rd, const TaskDescriptor& td, + const EquivClass_t ec) { + if (td.has_affinity()) { + Affinity affinity = td.affinity(); + int32_t sum_of_weights = 0; + if (affinity.has_pod_anti_affinity()) { + if (affinity.pod_anti_affinity() + .preferredduringschedulingignoredduringexecution_size()) { + for (auto& weightedpodantiaffinityterm : + affinity.pod_anti_affinity() + .preferredduringschedulingignoredduringexecution()) { + if (!weightedpodantiaffinityterm.weight()) continue; + if (weightedpodantiaffinityterm.has_podaffinityterm()) { + if (SatisfiesPodAntiAffinityTerm( + rd, td, weightedpodantiaffinityterm.podaffinityterm())) { + sum_of_weights += weightedpodantiaffinityterm.weight(); + } + } + } + } + } + if (affinity.has_pod_affinity()) { + if (affinity.pod_affinity() + .preferredduringschedulingignoredduringexecution_size()) { + for (auto& weightedpodaffinityterm : + affinity.pod_affinity() + .preferredduringschedulingignoredduringexecution()) { + if (!weightedpodaffinityterm.weight()) continue; + if (weightedpodaffinityterm.has_podaffinityterm()) { + if (SatisfiesPodAffinityTerm( + rd, td, weightedpodaffinityterm.podaffinityterm())) { + sum_of_weights += weightedpodaffinityterm.weight(); + } + } + } + } + } + unordered_map>* nodes_priority_scores_ptr = + FindOrNull(ec_to_node_priority_scores, ec); + if (!nodes_priority_scores_ptr) { + unordered_map> + node_to_priority_scores_map; + InsertIfNotPresent(&ec_to_node_priority_scores, ec, + node_to_priority_scores_map); + nodes_priority_scores_ptr = FindOrNull(ec_to_node_priority_scores, ec); + } + CHECK_NOTNULL(nodes_priority_scores_ptr); + ResourceID_t res_id = ResourceIDFromString(rd.uuid()); + PriorityScoresList_t* priority_scores_struct_ptr = + FindOrNull(*nodes_priority_scores_ptr, res_id); + if (!priority_scores_struct_ptr) { + PriorityScoresList_t priority_scores_list; + InsertIfNotPresent(nodes_priority_scores_ptr, res_id, + 
priority_scores_list); + priority_scores_struct_ptr = + FindOrNull(*nodes_priority_scores_ptr, res_id); + } + CHECK_NOTNULL(priority_scores_struct_ptr); + PriorityScore_t& pod_affinity_score = + priority_scores_struct_ptr->pod_affinity_priority; + if (!sum_of_weights) { + pod_affinity_score.satisfy = false; + } + pod_affinity_score.score = sum_of_weights; + MinMaxScores_t* max_min_priority_scores = + FindOrNull(ec_to_max_min_priority_scores, ec); + if (!max_min_priority_scores) { + MinMaxScores_t priority_scores_list; + InsertIfNotPresent(&ec_to_max_min_priority_scores, ec, + priority_scores_list); + max_min_priority_scores = FindOrNull(ec_to_max_min_priority_scores, ec); + } + CHECK_NOTNULL(max_min_priority_scores); + MinMaxScore_t& min_max_pod_affinity_score = + max_min_priority_scores->pod_affinity_priority; + if (min_max_pod_affinity_score.max_score < sum_of_weights || + min_max_pod_affinity_score.max_score == -1) { + min_max_pod_affinity_score.max_score = sum_of_weights; + } + if (min_max_pod_affinity_score.min_score > sum_of_weights || + min_max_pod_affinity_score.min_score == -1) { + min_max_pod_affinity_score.min_score = sum_of_weights; + } + } +} + vector* CpuCostModel::GetEquivClassToEquivClassesArcs( EquivClass_t ec) { vector* pref_ecs = new vector(); - CpuMemCostVector_t* task_resource_request = + CpuMemResVector_t* task_resource_request = FindOrNull(ec_resource_requirement_, ec); if (task_resource_request) { + // Clear priority scores for node affinity and pod affinity. + // TODO(jagadish): Currently we clear old affinity scores, and restore new + // scores. But we are not clearing it just after scheduling round completed, + // we are clearing in the subsequent scheduling round, need to improve this. 
+ ec_to_node_priority_scores.clear(); for (auto& ec_machines : ecs_for_machines_) { ResourceStatus* rs = FindPtrOrNull(*resource_map_, ec_machines.first); CHECK_NOTNULL(rs); const ResourceDescriptor& rd = rs->topology_node().resource_desc(); const TaskDescriptor* td_ptr = FindOrNull(ec_to_td_requirements, ec); if (td_ptr) { - // Checking whether machine satisfies node selectot and node affinity. - if (!scheduler::SatisfiesNodeSelectorAndNodeAffinity(rd, *td_ptr)) continue; + // Checking whether machine satisfies node selector and node affinity. + if (scheduler::SatisfiesNodeSelectorAndNodeAffinity(rd, *td_ptr)) { + // Calculate costs for all priorities. + CalculatePrioritiesCost(ec, rd); + } else + continue; + // Checking pod affinity/anti-affinity + if (SatisfiesPodAffinityAntiAffinityRequired(rd, *td_ptr)) { + CalculatePodAffinityAntiAffinityPreference(rd, *td_ptr, ec); + } else { + continue; + } } - CpuMemCostVector_t available_resources; + CpuMemResVector_t available_resources; available_resources.cpu_cores_ = static_cast(rd.available_resources().cpu_cores()); available_resources.ram_cap_ = @@ -233,7 +786,7 @@ vector* CpuCostModel::GetEquivClassToEquivClassesArcs( FindOrNull(ecs_for_machines_, res_id); CHECK_NOTNULL(ecs_for_machine); uint64_t index = 0; - CpuMemCostVector_t cur_resource; + CpuMemResVector_t cur_resource; for (cur_resource = *task_resource_request; cur_resource.cpu_cores_ <= available_resources.cpu_cores_ && cur_resource.ram_cap_ <= available_resources.ram_cap_ && @@ -265,10 +818,10 @@ void CpuCostModel::AddMachine(ResourceTopologyNodeDescriptor* rtnd_ptr) { void CpuCostModel::AddTask(TaskID_t task_id) { const TaskDescriptor& td = GetTask(task_id); - CpuMemCostVector_t resource_request; + CpuMemResVector_t resource_request; resource_request.cpu_cores_ = static_cast(td.resource_request().cpu_cores()); - resource_request.ram_cap_ = + resource_request.ram_cap_ = static_cast(td.resource_request().ram_cap()); 
CHECK(InsertIfNotPresent(&task_resource_requirement_, task_id, resource_request)); @@ -349,6 +902,8 @@ void CpuCostModel::PrepareStats(FlowGraphNode* accumulator) { CHECK_NOTNULL(accumulator->rd_ptr_); accumulator->rd_ptr_->clear_num_running_tasks_below(); accumulator->rd_ptr_->clear_num_slots_below(); + // Clear maps related to priority scores. + ec_to_node_priority_scores.clear(); } FlowGraphNode* CpuCostModel::UpdateStats(FlowGraphNode* accumulator, diff --git a/src/scheduling/flow/cpu_cost_model.h b/src/scheduling/flow/cpu_cost_model.h index ca72ca373..ecc5ce1aa 100644 --- a/src/scheduling/flow/cpu_cost_model.h +++ b/src/scheduling/flow/cpu_cost_model.h @@ -35,16 +35,53 @@ namespace firmament { -typedef struct CpuMemCostVector { +struct CpuMemCostVector_t { + // record number of dimensions here + static const int16_t dimensions_ = 3; + uint64_t cpu_mem_cost_; + uint64_t balanced_res_cost_; + uint64_t node_affinity_soft_cost_; + uint64_t pod_affinity_soft_cost_; + CpuMemCostVector_t() + : cpu_mem_cost_(0), balanced_res_cost_(0), node_affinity_soft_cost_(0), pod_affinity_soft_cost_(0) {} +}; + +struct CpuMemResVector_t { uint64_t cpu_cores_; uint64_t ram_cap_; -} CpuMemCostVector_t; +}; + +struct MinMaxScore_t { + int64_t min_score; + int64_t max_score; + MinMaxScore_t() : min_score(-1), max_score(-1) {} +}; + +struct MinMaxScores_t { + MinMaxScore_t node_affinity_priority; + MinMaxScore_t pod_affinity_priority; +}; + +struct PriorityScore_t { + // Flag that indicates whether soft constraints are satisfied or not. 
+ bool satisfy; + int64_t score; + int64_t final_score; + PriorityScore_t() : satisfy(true), score(0), final_score(-1) {} +}; + +struct PriorityScoresList_t { + PriorityScore_t node_affinity_priority; + PriorityScore_t pod_affinity_priority; +}; class CpuCostModel : public CostModelInterface { public: CpuCostModel(shared_ptr resource_map, shared_ptr task_map, - shared_ptr knowledge_base); + shared_ptr knowledge_base, + unordered_map>>* + labels_map); // Costs pertaining to leaving tasks unscheduled ArcDescriptor TaskToUnscheduledAgg(TaskID_t task_id); ArcDescriptor UnscheduledAggToSink(JobID_t job_id); @@ -61,10 +98,52 @@ class CpuCostModel : public CostModelInterface { ArcDescriptor TaskToEquivClassAggregator(TaskID_t task_id, EquivClass_t tec); ArcDescriptor EquivClassToResourceNode(EquivClass_t tec, ResourceID_t res_id); ArcDescriptor EquivClassToEquivClass(EquivClass_t tec1, EquivClass_t tec2); + // Calculate costs pertaining to pod priorities such node affinity, pod + // affinity etc. + void CalculatePrioritiesCost(const EquivClass_t ec, + const ResourceDescriptor& rd); // Get the type of equiv class. 
vector* GetTaskEquivClasses(TaskID_t task_id); vector* GetOutgoingEquivClassPrefArcs(EquivClass_t tec); vector* GetTaskPreferenceArcs(TaskID_t task_id); + // Pod anti-affinity + bool MatchExpressionWithPodLabels(const ResourceDescriptor& rd, + const LabelSelectorRequirement& expression); + bool NotMatchExpressionWithPodLabels( + const ResourceDescriptor& rd, const LabelSelectorRequirement& expression); + bool MatchExpressionKeyWithPodLabels( + const ResourceDescriptor& rd, const LabelSelectorRequirement& expression); + bool NotMatchExpressionKeyWithPodLabels( + const ResourceDescriptor& rd, const LabelSelectorRequirement& expression); + bool SatisfiesPodAntiAffinityMatchExpression( + const ResourceDescriptor& rd, + const LabelSelectorRequirementAntiAff& expression); + bool SatisfiesPodAffinityMatchExpression( + const ResourceDescriptor& rd, const LabelSelectorRequirement& expression); + bool SatisfiesPodAntiAffinityMatchExpressions( + const ResourceDescriptor& rd, + const RepeatedPtrField& + matchexpressions); + bool SatisfiesPodAffinityMatchExpressions( + const ResourceDescriptor& rd, + const RepeatedPtrField& matchexpressions); + bool SatisfiesPodAntiAffinityTerm(const ResourceDescriptor& rd, + const TaskDescriptor& td, + const PodAffinityTermAntiAff& term); + bool SatisfiesPodAffinityTerm(const ResourceDescriptor& rd, + const TaskDescriptor& td, + const PodAffinityTerm& term); + bool SatisfiesPodAntiAffinityTerms( + const ResourceDescriptor& rd, const TaskDescriptor& td, + const RepeatedPtrField& podantiaffinityterms); + bool SatisfiesPodAffinityTerms( + const ResourceDescriptor& rd, const TaskDescriptor& td, + const RepeatedPtrField& podaffinityterms); + bool SatisfiesPodAffinityAntiAffinityRequired(const ResourceDescriptor& rd, + const TaskDescriptor& td); + void CalculatePodAffinityAntiAffinityPreference(const ResourceDescriptor& rd, + const TaskDescriptor& td, + const EquivClass_t ec); vector* GetEquivClassToEquivClassesArcs(EquivClass_t tec); void 
AddMachine(ResourceTopologyNodeDescriptor* rtnd_ptr); void AddTask(TaskID_t task_id); @@ -78,6 +157,8 @@ class CpuCostModel : public CostModelInterface { // Fixed value for OMEGA, the normalization ceiling for each dimension's cost // value const Cost_t omega_ = 1000; + // Largest cost seen so far, plus one + Cost_t infinity_; FRIEND_TEST(CpuCostModelTest, AddMachine); FRIEND_TEST(CpuCostModelTest, AddTask); FRIEND_TEST(CpuCostModelTest, EquivClassToEquivClass); @@ -86,9 +167,7 @@ class CpuCostModel : public CostModelInterface { FRIEND_TEST(CpuCostModelTest, GetOutgoingEquivClassPrefArcs); FRIEND_TEST(CpuCostModelTest, GetTaskEquivClasses); FRIEND_TEST(CpuCostModelTest, MachineResIDForResource); - // TODO(jagadish): Fixed value of some big positive number which needs to added to cost to keep cost - // positive. We need to come up with correct formula in future. - const Cost_t max_sum_of_weights = 1000; + Cost_t FlattenCostVector(CpuMemCostVector_t cv); EquivClass_t GetMachineEC(const string& machine_name, uint64_t ec_index); ResourceID_t MachineResIDForResource(ResourceID_t res_id); inline const TaskDescriptor& GetTask(TaskID_t task_id) { @@ -96,6 +175,13 @@ class CpuCostModel : public CostModelInterface { CHECK_NOTNULL(td); return *td; } + inline bool HasNamespace(const string name) { + if (namespaces.find(name) == namespaces.end()) { + return false; + } else { + return true; + } + } shared_ptr resource_map_; // The task map used in the rest of the system @@ -104,10 +190,10 @@ class CpuCostModel : public CostModelInterface { shared_ptr knowledge_base_; unordered_map task_cpu_cores_requirement_; unordered_map task_rx_bw_requirement_; - unordered_map task_resource_requirement_; + unordered_map task_resource_requirement_; unordered_map ec_cpu_cores_requirement_; unordered_map ec_rx_bw_requirement_; - unordered_map ec_resource_requirement_; + unordered_map ec_resource_requirement_; unordered_map, boost::hash> ecs_for_machines_; unordered_map ec_to_machine_; @@ -115,6 
+201,13 @@ class CpuCostModel : public CostModelInterface { unordered_map> ec_to_label_selectors; unordered_map ec_to_td_requirements; + unordered_map>> + ec_to_node_priority_scores; + unordered_map ec_to_max_min_priority_scores; + // Pod affinity/anti-affinity + unordered_map>>* labels_map_; + unordered_set namespaces; }; } // namespace firmament diff --git a/src/scheduling/flow/cpu_cost_model_test.cc b/src/scheduling/flow/cpu_cost_model_test.cc index 872d6433e..93314dabc 100644 --- a/src/scheduling/flow/cpu_cost_model_test.cc +++ b/src/scheduling/flow/cpu_cost_model_test.cc @@ -49,7 +49,8 @@ class CpuCostModelTest : public ::testing::Test { resource_map_.reset(new ResourceMap_t); task_map_.reset(new TaskMap_t); knowledge_base_.reset(new KnowledgeBase); - cost_model = new CpuCostModel(resource_map_, task_map_, knowledge_base_); + cost_model = + new CpuCostModel(resource_map_, task_map_, knowledge_base_, NULL); } virtual ~CpuCostModelTest() { @@ -112,7 +113,7 @@ TEST_F(CpuCostModelTest, AddTask) { TaskID_t task_id = td_ptr->uid(); td_ptr->mutable_resource_request()->set_cpu_cores(10.0); cost_model->AddTask(task_id); - unordered_map::iterator it = + unordered_map::iterator it = cost_model->task_resource_requirement_.find(task_id); EXPECT_NE(cost_model->task_resource_requirement_.end(), it); EXPECT_FLOAT_EQ(10.0, it->second.cpu_cores_); @@ -162,8 +163,8 @@ TEST_F(CpuCostModelTest, EquivClassToEquivClass) { // Calculate cost of arc between main EC and second machine EC. ArcDescriptor arc_cost2 = cost_model->EquivClassToEquivClass( (*equiv_classes)[0], machine_equiv_classes[1]); - EXPECT_EQ(2000, arc_cost1.cost_); - EXPECT_EQ(2051, arc_cost2.cost_); + EXPECT_EQ(1500, arc_cost1.cost_); + EXPECT_EQ(1525, arc_cost2.cost_); // Cost of arc between main EC and first machine EC should be less than // cost of arc between main EC and second machine EC. 
EXPECT_LT(arc_cost1.cost_, arc_cost2.cost_); @@ -292,7 +293,7 @@ TEST_F(CpuCostModelTest, GatherStats) { sink_node->type_ = FlowNodeType::SINK; // Test GatherStats from sink to PU. cost_model->GatherStats(pu_node, sink_node); - // Verifying number of slots, runnning tasks on PU. + // Verifying number of slots, running tasks on PU. EXPECT_EQ(2U, rd_ptr2->num_running_tasks_below()); EXPECT_EQ(FLAGS_max_tasks_per_pu, rd_ptr2->num_slots_below()); // Test GatherStats from PU to Machine. diff --git a/src/scheduling/flow/flow_scheduler.cc b/src/scheduling/flow/flow_scheduler.cc index fcd2e2196..42f505cec 100644 --- a/src/scheduling/flow/flow_scheduler.cc +++ b/src/scheduling/flow/flow_scheduler.cc @@ -90,11 +90,14 @@ FlowScheduler::FlowScheduler( ResourceID_t coordinator_res_id, const string& coordinator_uri, TimeInterface* time_manager, - TraceGenerator* trace_generator) + TraceGenerator* trace_generator, + unordered_map>>* labels_map, + vector *affinity_antiaffinity_tasks) : EventDrivenScheduler(job_map, resource_map, resource_topology, object_store, task_map, knowledge_base, topo_mgr, m_adapter, event_notifier, coordinator_res_id, - coordinator_uri, time_manager, trace_generator), + coordinator_uri, time_manager, trace_generator, + labels_map, affinity_antiaffinity_tasks), topology_manager_(topo_mgr), last_updated_time_dependent_costs_(0ULL), leaf_res_ids_(new unordered_setset_scheduled_to_resource(rd_ptr->uuid()); flow_graph_manager_->TaskScheduled(td_ptr->uid(), ResourceIDFromString(rd_ptr->uuid())); + // Pod affinity/anti-affinity + if (td_ptr->has_affinity() && (td_ptr->affinity().has_pod_affinity() || + td_ptr->affinity().has_pod_anti_affinity())) { + vector::iterator it = + find(affinity_antiaffinity_tasks_->begin(), + affinity_antiaffinity_tasks_->end(), td_ptr->uid()); + if (it != affinity_antiaffinity_tasks_->end()) { + affinity_antiaffinity_tasks_->erase(it); + } + } EventDrivenScheduler::HandleTaskPlacement(td_ptr, rd_ptr); } @@ -417,16 +431,29 @@ uint64_t 
FlowScheduler::ScheduleAllJobs(SchedulerStats* scheduler_stats) { return ScheduleAllJobs(scheduler_stats, NULL); } +uint64_t FlowScheduler::ScheduleAllQueueJobs(SchedulerStats* scheduler_stats, + vector* deltas) { + boost::lock_guard lock(scheduling_lock_); + queue_based_schedule = true; + uint64_t num_scheduled_tasks = ScheduleAllJobs(scheduler_stats, deltas); + queue_based_schedule = false; + return num_scheduled_tasks; +} + uint64_t FlowScheduler::ScheduleAllJobs(SchedulerStats* scheduler_stats, vector* deltas) { boost::lock_guard lock(scheduling_lock_); vector jobs; + //Pod affinity/anti-affinity + one_task_runnable = false; for (auto& job_id_jd : jobs_to_schedule_) { if (ComputeRunnableTasksForJob(job_id_jd.second).size() > 0) { jobs.push_back(job_id_jd.second); } } uint64_t num_scheduled_tasks = ScheduleJobs(jobs, scheduler_stats, deltas); + //Pod affinity/anti-affinity + one_task_runnable = false; return num_scheduled_tasks; } diff --git a/src/scheduling/flow/flow_scheduler.h b/src/scheduling/flow/flow_scheduler.h index e593c4df4..75e3a23f8 100644 --- a/src/scheduling/flow/flow_scheduler.h +++ b/src/scheduling/flow/flow_scheduler.h @@ -65,7 +65,10 @@ class FlowScheduler : public EventDrivenScheduler { ResourceID_t coordinator_res_id, const string& coordinator_uri, TimeInterface* time_manager, - TraceGenerator* trace_generator); + TraceGenerator* trace_generator, + unordered_map>>* + labels_map, + vector *affinity_antiaffinity_tasks); ~FlowScheduler(); virtual void DeregisterResource(ResourceTopologyNodeDescriptor* rtnd_ptr); virtual void HandleJobCompletion(JobID_t job_id); @@ -88,6 +91,8 @@ class FlowScheduler : public EventDrivenScheduler { bool local, bool simulated); virtual uint64_t ScheduleAllJobs(SchedulerStats* scheduler_stats); + virtual uint64_t ScheduleAllQueueJobs(SchedulerStats* scheduler_stats, + vector* deltas); virtual uint64_t ScheduleAllJobs(SchedulerStats* scheduler_stats, vector* deltas); virtual uint64_t ScheduleJob(JobDescriptor* 
jd_ptr, diff --git a/src/scheduling/scheduler_interface.h b/src/scheduling/scheduler_interface.h index 5caf75271..81610eef6 100644 --- a/src/scheduling/scheduler_interface.h +++ b/src/scheduling/scheduler_interface.h @@ -72,10 +72,15 @@ class SchedulerInterface : public PrintableInterface { shared_ptr resource_map, ResourceTopologyNodeDescriptor* resource_topology, shared_ptr object_store, - shared_ptr task_map) + shared_ptr task_map, + unordered_map>>* + labels_map, + vector *affinity_antiaffinity_tasks) : job_map_(job_map), knowledge_base_(knowledge_base), resource_map_(resource_map), task_map_(task_map), - object_store_(object_store), resource_topology_(resource_topology) {} + object_store_(object_store), resource_topology_(resource_topology), + labels_map_(labels_map), + affinity_antiaffinity_tasks_(affinity_antiaffinity_tasks) {} /** * Adds a new job. The job will be scheduled on the next run of the scheduler @@ -242,6 +247,14 @@ class SchedulerInterface : public PrintableInterface { virtual uint64_t ScheduleAllJobs(SchedulerStats* scheduler_stats) = 0; virtual uint64_t ScheduleAllJobs(SchedulerStats* scheduler_stats, vector* deltas) = 0; + /** + * Runs a scheduling iteration for all active queue based jobs. + * @return the number of tasks scheduled + */ + virtual uint64_t ScheduleAllQueueJobs(SchedulerStats* scheduler_stats, + vector* deltas) { + return 0; +} /** * Schedules all runnable tasks in a job. 
@@ -298,6 +311,9 @@ class SchedulerInterface : public PrintableInterface { shared_ptr object_store_; // Resource topology (including any registered remote resources) ResourceTopologyNodeDescriptor* resource_topology_; + //Pod affinity/anti-affinity + unordered_map>>* labels_map_; + vector *affinity_antiaffinity_tasks_; }; } // namespace scheduler diff --git a/src/scheduling/simple/simple_scheduler.cc b/src/scheduling/simple/simple_scheduler.cc index 2803255a0..874f6ce0f 100644 --- a/src/scheduling/simple/simple_scheduler.cc +++ b/src/scheduling/simple/simple_scheduler.cc @@ -59,7 +59,8 @@ SimpleScheduler::SimpleScheduler( : EventDrivenScheduler(job_map, resource_map, resource_topology, object_store, task_map, knowledge_base, topo_mgr, m_adapter, event_notifier, coordinator_res_id, - coordinator_uri, time_manager, trace_generator) { + coordinator_uri, time_manager, trace_generator, NULL, + NULL) { VLOG(1) << "SimpleScheduler initiated."; } diff --git a/src/sim/simulator_bridge.cc b/src/sim/simulator_bridge.cc index 47bdf3f2f..6befa5a5f 100644 --- a/src/sim/simulator_bridge.cc +++ b/src/sim/simulator_bridge.cc @@ -90,7 +90,7 @@ SimulatorBridge::SimulatorBridge(EventManager* event_manager, shared_ptr( new machine::topology::TopologyManager), messaging_adapter_, this, root_uuid, "http://localhost", - simulated_time_, trace_generator_); + simulated_time_, trace_generator_, NULL, NULL); } else { scheduler_ = new scheduler::SimpleScheduler( job_map_, resource_map_, &rtn_root_,