diff --git a/.vscode/settings.json b/.vscode/settings.json index 8f4c05cc..7a73a41b 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,3 +1,2 @@ { - "julia.environmentPath": "/Users/epacuit/Dropbox/code/voting-scripts/pref_voting" } \ No newline at end of file diff --git a/docs/source/generate_weighted_majority_graphs.md b/docs/source/generate_weighted_majority_graphs.md index 56527a50..af2b2151 100644 --- a/docs/source/generate_weighted_majority_graphs.md +++ b/docs/source/generate_weighted_majority_graphs.md @@ -1,4 +1,4 @@ -Generate Weighted Majority Graphs +Generate (Weighted) Majority Graphs ======================================= ## Generate Linearly Edge-Ordered Tournaments @@ -32,13 +32,27 @@ Generate Weighted Majority Graphs ## Enumerate Canonical Objects +```{eval-rst} +.. autofunction:: pref_voting.generate_weighted_majority_graphs.enumerate_tournaments + +``` + ```{eval-rst} .. autofunction:: pref_voting.generate_weighted_majority_graphs.enumerate_canonical_edge_ordered_tournaments ``` +```{eval-rst} +.. autofunction:: pref_voting.generate_weighted_majority_graphs.enumerate_canonical_weakly_edge_ordered_tournaments + +``` + ```{eval-rst} .. autofunction:: pref_voting.generate_weighted_majority_graphs.enumerate_uniquely_weighted_margin_graphs ``` +```{eval-rst} +.. autofunction:: pref_voting.generate_weighted_majority_graphs.enumerate_margin_graphs + +``` diff --git a/docs/source/iterative_methods.md b/docs/source/iterative_methods.md index 79a6f5ae..59736b95 100644 --- a/docs/source/iterative_methods.md +++ b/docs/source/iterative_methods.md @@ -84,6 +84,78 @@ To illustrate the difference with respect to the second question, consider Insta ``` +### Top-N Instant Runoff for Truncated Linear Orders + +```{eval-rst} + +.. autofunction:: pref_voting.iterative_methods.top_n_instant_runoff_for_truncated_linear_orders + +``` + +## Approval IRV + +```{eval-rst} + +.. autofunction:: pref_voting.iterative_methods.approval_irv + +``` + +### Approval IRV TB + +```{eval-rst} + +.. autofunction:: pref_voting.iterative_methods.approval_irv_tb + +``` + +### Approval IRV PUT + +```{eval-rst} + +.. autofunction:: pref_voting.iterative_methods.approval_irv_put + +``` + +### Approval IRV with Explanation + +```{eval-rst} + +.. autofunction:: pref_voting.iterative_methods.approval_irv_with_explanation + +``` + +## Split IRV + +```{eval-rst} + +.. autofunction:: pref_voting.iterative_methods.split_irv + +``` + +### Split IRV TB + +```{eval-rst} + +.. autofunction:: pref_voting.iterative_methods.split_irv_tb + +``` + +### Split IRV PUT + +```{eval-rst} + +.. autofunction:: pref_voting.iterative_methods.split_irv_put + +``` + +### Split IRV with Explanation + +```{eval-rst} + +.. autofunction:: pref_voting.iterative_methods.split_irv_with_explanation + +``` + ## Plurality With Runoff PUT ```{eval-rst} @@ -92,6 +164,14 @@ To illustrate the difference with respect to the second question, consider Insta ``` +### Plurality With Runoff PUT with Explanation + +```{eval-rst} + +.. autofunction:: pref_voting.iterative_methods.plurality_with_runoff_put_with_explanation + +``` + ## Benham ```{eval-rst} diff --git a/docs/source/variable_voter_axioms.md b/docs/source/variable_voter_axioms.md index 8bef128e..8dea67d6 100644 --- a/docs/source/variable_voter_axioms.md +++ b/docs/source/variable_voter_axioms.md @@ -99,6 +99,26 @@ Variable Voter Axioms ``` +### Single-Voter Resolvability with Truncation + +```{eval-rst} + +.. autofunction:: pref_voting.variable_voter_axioms.has_single_voter_resolvability_violation_with_truncation + +.. autofunction:: pref_voting.variable_voter_axioms.find_all_single_voter_resolvability_violations_with_truncation + +``` + +### Single-Voter Resolvability with Ties + +```{eval-rst} + +.. autofunction:: pref_voting.variable_voter_axioms.has_single_voter_resolvability_violation_with_ties + +.. autofunction:: pref_voting.variable_voter_axioms.find_all_single_voter_resolvability_violations_with_ties + +``` + ## Weak Single-Voter Resolvability ```{eval-rst} @@ -133,4 +153,4 @@ Variable Voter Axioms .. autofunction:: pref_voting.variable_voter_axioms.has_nonlinear_neutral_reversal_violation .. autofunction:: pref_voting.variable_voter_axioms.find_all_nonlinear_neutral_reversal_violations -``` \ No newline at end of file +``` diff --git a/pref_voting/.claude/settings.local.json b/pref_voting/.claude/settings.local.json new file mode 100644 index 00000000..85547bd9 --- /dev/null +++ b/pref_voting/.claude/settings.local.json @@ -0,0 +1,3 @@ +{ + "enableAllProjectMcpServers": false +} diff --git a/pref_voting/generate_weighted_majority_graphs.py b/pref_voting/generate_weighted_majority_graphs.py index 840e4c54..0a8d7575 100644 --- a/pref_voting/generate_weighted_majority_graphs.py +++ b/pref_voting/generate_weighted_majority_graphs.py @@ -1,5 +1,5 @@ ''' - File: generate_margin_graphs.py + File: generate_weighted_majority_graphs.py Author: Wes Holliday (wesholliday@berkeley.edu) and Eric Pacuit (epacuit@umd.edu) Date: July 14, 2022 Updated: December 19, 2022 @@ -8,7 +8,6 @@ ''' - import networkx as nx from itertools import combinations from pref_voting.helper import sublists, compositions, enumerate_compositions, convex_lexicographic_sublists @@ -221,7 +220,87 @@ def pair(p): return MarginGraph(candidates, w_edges) -## Generating Canonical MarginGraphs without Tied Margins +## Enumerating Canonical Majority Graphs + +def _canonical_tournament(num_verts, edges): + """ + Return (certificate, canonical_edges) for a tournament on 0,...,num_verts-1. + + The certificate is the lex-smallest upper-triangle bitstring over all + relabelings of the vertices. canonical_edges is the corresponding + canonically labeled edge set. + """ + edge_set = set(edges) + pairs = list(combinations(range(num_verts), 2)) + + best_bits = None + for perm in permutations(range(num_verts)): + bits = tuple( + 1 if (perm[i], perm[j]) in edge_set else 0 + for i, j in pairs + ) + if best_bits is None or bits < best_bits: + best_bits = bits + + canonical_edges = [ + (i, j) if bit else (j, i) + for bit, (i, j) in zip(best_bits, pairs) + ] + return best_bits, canonical_edges + + +def enumerate_tournaments(num_cands, candidates=None, cmap=None): + """ + Enumerate one MajorityGraph representative from each isomorphism class + of tournaments on num_cands candidates. + """ + if not isinstance(num_cands, int) or num_cands < 0: + raise ValueError("num_cands must be a nonnegative integer.") + + if candidates is None: + candidates = list(range(num_cands)) + else: + candidates = list(candidates) + if len(candidates) != num_cands: + raise ValueError("candidates must have length num_cands.") + + if num_cands <= 1: + return [MajorityGraph(candidates, [], cmap=cmap)] + + # One canonically labeled representative on vertices 0,...,k-1 + # for each isomorphism class at size k. + canonical_edge_sets = [[]] + + for new_v in range(1, num_cands): + seen = {} + + for old_edges in canonical_edge_sets: + # Bit i = 0 means i -> new_v, bit i = 1 means new_v -> i. + for mask in range(1 << new_v): + new_edges = list(old_edges) + + for i in range(new_v): + if (mask >> i) & 1: + new_edges.append((new_v, i)) + else: + new_edges.append((i, new_v)) + + cert, canon_edges = _canonical_tournament(new_v + 1, new_edges) + if cert not in seen: + seen[cert] = canon_edges + + canonical_edge_sets = [seen[cert] for cert in sorted(seen)] + + return [ + MajorityGraph( + candidates, + [(candidates[i], candidates[j]) for i, j in edges], + cmap=cmap, + ) + for edges in canonical_edge_sets + ] + +## Enumerating Canonical MarginGraphs without Tied Margins def _enumerate_ceots(num_cands, num_edges, partial_ceot, used_nodes, next_node): @@ -338,7 +417,7 @@ def enumerate_uniquely_weighted_margin_graphs(num_cands, weight_domain): [(e[0], e[1], weight_list[eidx]) for eidx, e in enumerate(reversed(ceot))]) -## Generating Canonical MarginGraphs with Tied Margins +## Enumerating Canonical MarginGraphs with Tied Margins def _enumerate_cweots_as_edgelist(num_cands, include_weak_tournaments=True): diff --git a/pref_voting/grade_profiles.py b/pref_voting/grade_profiles.py index 728d4c83..f2183bc1 100644 --- a/pref_voting/grade_profiles.py +++ b/pref_voting/grade_profiles.py @@ -78,7 +78,7 @@ def __init__( self.use_grade_order = grade_order is not None - self.compare_function = lambda v1, v2: (v1 > v2) - (v2 > v1) if grade_order is None else lambda v1, v2: (grade_order.index(v1) < grade_order.index(v2)) - (grade_order.index(v2) < grade_order.index(v1)) + self.compare_function = (lambda v1, v2: (v1 > v2) - (v2 > v1)) if grade_order is None else (lambda v1, v2: (grade_order.index(v1) < grade_order.index(v2)) - (grade_order.index(v2) < grade_order.index(v1))) self.gmap = gmap if gmap is not None else {g: str(g) for g in self.grades} """The candidate map is a dictionary associating an alternative with the name used when displaying a alternative.""" diff --git a/pref_voting/iterative_methods.py b/pref_voting/iterative_methods.py index 051389b8..6ddf5a95 100644 --- a/pref_voting/iterative_methods.py +++ b/pref_voting/iterative_methods.py @@ -2,7 +2,7 @@ File: iterative_methods.py Author: Wes Holliday (wesholliday@berkeley.edu) and Eric Pacuit (epacuit@umd.edu) Date: January 6, 2022 - Update: October 2, 2023 + Update: February 7, 2026 Implementations of iterative voting methods. ''' @@ -20,55 +20,234 @@ from pref_voting.profiles import Profile from pref_voting.profiles_with_ties import ProfileWithTies -def _instant_runoff_basic(profile,curr_cands = None): - "The basic implementation of instant runoff" +def _validate_tie_breaker(tie_breaker, candidates): + """Validate a tie-breaker and return a dict mapping candidates to positions.""" + if tie_breaker is None: + return None + tb_pos = {c: i for i, c in enumerate(tie_breaker)} + if len(tb_pos) != len(tie_breaker): + raise ValueError("tie_breaker contains duplicates.") + missing = [c for c in candidates if c not in tb_pos] + if missing: + raise ValueError(f"tie_breaker missing candidates: {sorted(missing)}") + return tb_pos + +def _instant_runoff_basic(profile, curr_cands=None, tie_breaker=None, score_method=None, exit_on_majority=True): + """The basic implementation of instant runoff. + + If tie_breaker is provided, eliminate one candidate at a time using the tie_breaker + to select among tied candidates. tie_breaker[0] has lowest priority (eliminated first). + If tie_breaker is None, eliminate all tied candidates simultaneously. + + Args: + profile (Profile or ProfileWithTies): The profile to use + curr_cands (List[int], optional): Candidates to consider + tie_breaker (List[int], optional): Tie-breaking order (tie_breaker[0] eliminated first) + score_method (str, optional): For ProfileWithTies only. "approval" or "split". + For Profile, this is ignored (always uses plurality). + exit_on_majority (bool): If True, stop as soon as a candidate has a strict majority + of first-place votes. If False, continue until one candidate + remains (or all are tied). Default is True. + + Returns: + A sorted list of winners + """ # need the total number of all candidates in a profile to check when all candidates have been removed num_cands = profile.num_cands candidates = profile.candidates if curr_cands is None else curr_cands - cands_to_ignore = np.empty(0) if curr_cands is None else np.array([c for c in profile.candidates if c not in curr_cands]) - - strict_maj_size = profile.strict_maj_size() - rs, rcounts = profile.rankings_counts # get all the ranking data + if len(candidates) == 0: + return [] + + tb_pos = _validate_tie_breaker(tie_breaker, candidates) - winners = [c for c in candidates - if _num_rank_first(rs, rcounts, cands_to_ignore, c) >= strict_maj_size] + # Dispatch based on profile type + if isinstance(profile, ProfileWithTies): + # Use tops_scores for ProfileWithTies + # Note: exit_on_majority is ignored for ProfileWithTies because approval/split + # scores can exceed the number of voters (a voter can approve multiple candidates) + sm = score_method if score_method is not None else "approval" + if sm not in ("approval", "split"): + raise ValueError("score_method must be 'approval' or 'split'") + + remaining_cands = set(candidates) + + while len(remaining_cands) > 1: + # Compute scores based on score_method (use sorted for determinism) + scores = profile.tops_scores(curr_cands=sorted(remaining_cands), score_type=sm) + + min_score = min(scores.values()) + lowest_cands = [c for c, s in scores.items() if _scores_equal(s, min_score)] + + # Handle the all-tied case explicitly first + if len(lowest_cands) == len(remaining_cands): + if tb_pos is None: + # No tie-breaker, return all as winners + return sorted(remaining_cands) + else: + # Use tie-breaker to eliminate one and continue + cand_to_remove = min(remaining_cands, key=lambda c: tb_pos[c]) + remaining_cands.remove(cand_to_remove) + continue + + # If tie_breaker is provided and there's a tie, eliminate one candidate + if tb_pos is not None and len(lowest_cands) > 1: + cand_to_remove = min(lowest_cands, key=lambda c: tb_pos[c]) + remaining_cands.remove(cand_to_remove) + else: + # Remove all candidates with lowest score + remaining_cands -= set(lowest_cands) + + return sorted(remaining_cands) + + elif isinstance(profile, Profile): + # Profile: use the original NumPy-optimized code with plurality scores + cands_to_ignore = np.empty(0, dtype=int) if curr_cands is None else np.array([c for c in profile.candidates if c not in curr_cands], dtype=int) - while len(winners) == 0: - plurality_scores = {c: _num_rank_first(rs, rcounts, cands_to_ignore, c) for c in candidates - if not isin(cands_to_ignore,c)} - min_plurality_score = min(plurality_scores.values()) - lowest_first_place_votes = np.array([c for c in plurality_scores.keys() - if plurality_scores[c] == min_plurality_score]) + strict_maj_size = profile.strict_maj_size() + + rs, rcounts = profile.rankings_counts # get all the ranking data - # remove cands with lowest plurality score - cands_to_ignore = np.concatenate((cands_to_ignore, lowest_first_place_votes), axis=None) - if len(cands_to_ignore) == num_cands: # removed all of the candidates - winners = sorted(lowest_first_place_votes) - else: + # Check for majority winner at the start if exit_on_majority is True + if exit_on_majority: winners = [c for c in candidates - if not isin(cands_to_ignore,c) and _num_rank_first(rs, rcounts, cands_to_ignore, c) >= strict_maj_size] - - return sorted(winners) + if _num_rank_first(rs, rcounts, cands_to_ignore, c) >= strict_maj_size] + else: + winners = [] + + while len(winners) == 0: + plurality_scores = {c: _num_rank_first(rs, rcounts, cands_to_ignore, c) for c in candidates + if not isin(cands_to_ignore,c)} + min_plurality_score = min(plurality_scores.values()) + lowest_first_place_votes = np.array([c for c in plurality_scores.keys() + if plurality_scores[c] == min_plurality_score], dtype=int) + + # If tie_breaker is provided, eliminate only the candidate with lowest TB priority + if tb_pos is not None and len(lowest_first_place_votes) > 1: + cand_to_remove = min(lowest_first_place_votes, key=lambda c: tb_pos[c]) + cands_to_ignore = np.concatenate((cands_to_ignore, [cand_to_remove]), axis=None) + else: + # remove all cands with lowest plurality score + cands_to_ignore = np.concatenate((cands_to_ignore, lowest_first_place_votes), axis=None) + + if len(cands_to_ignore) == num_cands: # removed all of the candidates + winners = sorted(lowest_first_place_votes) + else: + # Check for majority winner if exit_on_majority is True + if exit_on_majority: + winners = [c for c in candidates + if not isin(cands_to_ignore,c) and _num_rank_first(rs, rcounts, cands_to_ignore, c) >= strict_maj_size] + else: + # Check if only one candidate remains + remaining = [c for c in candidates if not isin(cands_to_ignore, c)] + if len(remaining) == 1: + winners = remaining + + return sorted(winners) + + else: + raise TypeError(f"Expected Profile or ProfileWithTies, got {type(profile)}") -def _instant_runoff_recursive(profile, curr_cands = None): +def _instant_runoff_recursive(profile, curr_cands=None, tie_breaker=None, _tb_pos=None): "A recursive implementation of instant runoff" candidates = curr_cands if curr_cands is not None else profile.candidates + if len(candidates) == 0: + return [] + if len(candidates) == 1: + return sorted(candidates) + + # Validate tie_breaker once on the first call + if tie_breaker is not None and _tb_pos is None: + _tb_pos = _validate_tie_breaker(tie_breaker, candidates) + cands_to_ignore = np.array([c for c in profile.candidates if c not in candidates]) rs, rcounts = profile.rankings_counts # get all the ranking data - plurality_scores = {c: _num_rank_first(rs, rcounts, cands_to_ignore, c) for c in candidates if not isin(cands_to_ignore,c)} + plurality_scores = {c: _num_rank_first(rs, rcounts, cands_to_ignore, c) for c in candidates if not isin(cands_to_ignore,c)} min_plurality_score = min(plurality_scores.values()) - lowest_first_place_votes = np.array([c for c in plurality_scores.keys() - if plurality_scores[c] == min_plurality_score]) + lowest_first_place_votes = [c for c in plurality_scores.keys() + if plurality_scores[c] == min_plurality_score] if len(lowest_first_place_votes) == len(candidates): - return sorted(lowest_first_place_votes) - + # If all remaining candidates are tied and there is no tie-breaker, + # they are all winners. Otherwise, eliminate one according to the + # tie-breaker and continue. + if _tb_pos is None: + return sorted(lowest_first_place_votes) + cand_to_remove = min(lowest_first_place_votes, key=lambda c: _tb_pos[c]) + return _instant_runoff_recursive( + profile, + [c for c in candidates if c != cand_to_remove], + tie_breaker=tie_breaker, + _tb_pos=_tb_pos + ) + + if _tb_pos is not None and len(lowest_first_place_votes) > 1: + cand_to_remove = min(lowest_first_place_votes, key=lambda c: _tb_pos[c]) + return _instant_runoff_recursive(profile, [c for c in candidates if c != cand_to_remove], tie_breaker=tie_breaker, _tb_pos=_tb_pos) else: - return _instant_runoff_recursive(profile, [c for c in candidates if c not in lowest_first_place_votes]) + return _instant_runoff_recursive(profile, [c for c in candidates if c not in lowest_first_place_votes], tie_breaker=tie_breaker, _tb_pos=_tb_pos) + + +# Constant for float comparison in Split-IRV +FLOAT_TOLERANCE = 1e-12 + +def _scores_equal(a, b, tol=FLOAT_TOLERANCE): + """Check if two scores are equal, handling floats (including numpy.float64) for Split-IRV.""" + # Convert to float to handle numpy.float64 and other numeric types + return abs(float(a) - float(b)) <= tol + + +def _instant_runoff_put_for_profile_with_ties(profile, curr_cands=None, score_method="approval"): + """ + Instant Runoff PUT for ProfileWithTies using approval or split scoring. + + Under PUT, when candidates are tied for lowest score, we branch on eliminating + each one and return the union of all possible winners. + + Note that the only base case is when one candidate remains. We do not + return all candidates when they are all tied; we still branch and recurse. + + Note: exit_on_majority is not supported for approval/split scoring because + approval scores can exceed the number of voters (a voter can approve multiple + candidates). + + Args: + profile (ProfileWithTies): A profile with possible ties in ballots + curr_cands (List[int], optional): Candidates to consider + score_method (str): "approval" or "split" + + Returns: + A sorted list of all possible winners + """ + if score_method not in ("approval", "split"): + raise ValueError("score_method must be 'approval' or 'split'") + + candidates = list(profile.candidates if curr_cands is None else curr_cands) + + if len(candidates) == 0: + return [] + + if len(candidates) == 1: + return candidates + + # Compute scores + scores = profile.tops_scores(curr_cands=candidates, score_type=score_method) + + # Find candidates with lowest score + min_score = min(scores.values()) + lowest_cands = [c for c, s in scores.items() if _scores_equal(s, min_score)] + + # Recursively explore all elimination paths + winners = set() + for cand_to_remove in lowest_cands: + new_cands = [c for c in candidates if c != cand_to_remove] + new_winners = _instant_runoff_put_for_profile_with_ties(profile, curr_cands=new_cands, score_method=score_method) + winners.update(new_winners) + + return sorted(winners) -def _instant_runoff_for_truncated_linear_orders(profile, curr_cands = None, threshold = None, hide_warnings = True): +def _instant_runoff_for_truncated_linear_orders(profile, curr_cands = None, threshold = None, hide_warnings = True): """ Instant Runoff for Truncated Linear Orders. Iteratively remove the candidates with the fewest number of first place votes, until there is a candidate with more than the threshold number of first-place votes. If a threshold is not set, then it is strictly more than half of the non-empty ballots. @@ -105,6 +284,11 @@ def _instant_runoff_for_truncated_linear_orders(profile, curr_cands = None, thre assert all([not r.has_overvote() for r in profile.rankings]), "Instant Runoff is only defined when all the ballots are truncated linear orders." curr_cands = profile.candidates if curr_cands is None else curr_cands + if len(curr_cands) == 0: + return [] + + # Track whether threshold was explicitly provided + threshold_is_default = threshold is None # we need to remove empty rankings during the algorithm, so make a copy of the profile prof2 = copy.deepcopy(profile) @@ -114,23 +298,30 @@ def _instant_runoff_for_truncated_linear_orders(profile, curr_cands = None, thre # remove the empty rankings _prof.remove_empty_rankings() - threshold = threshold if threshold is not None else _prof.strict_maj_size() + if len(_prof.candidates) == 0: + return [] - remaining_candidates = _prof.candidates - - pl_scores = _prof.plurality_scores() - max_pl_score = max(pl_scores.values()) + remaining_candidates = list(_prof.candidates) - while max_pl_score < threshold: - + while True: reduced_prof = _prof.remove_candidates([c for c in _prof.candidates if c not in remaining_candidates]) # after removing the candidates, there might be some empty ballots. reduced_prof.remove_empty_rankings() pl_scores = reduced_prof.plurality_scores() + if len(pl_scores) == 0: + return [] + + # Update threshold if default (based on remaining ballots) + if threshold_is_default: + threshold = reduced_prof.strict_maj_size() + + max_pl_score = max(pl_scores.values()) + if max_pl_score >= threshold: + break + min_pl_score = min(pl_scores.values()) - cands_to_remove = [c for c in pl_scores.keys() if pl_scores[c] == min_pl_score] if not hide_warnings and len(cands_to_remove) > 1: @@ -139,38 +330,35 @@ def _instant_runoff_for_truncated_linear_orders(profile, curr_cands = None, thre if len(cands_to_remove) == len(reduced_prof.candidates): # all remaining candidates have the same plurality score. break - - # possibly update the threshold, so that it is a strict majority of the remaining ballots - threshold = threshold if threshold is not None else reduced_prof.strict_maj_size() - max_pl_score = max(pl_scores.values()) remaining_candidates = [c for c in remaining_candidates if c not in cands_to_remove] - + # final result reduced_prof = _prof.remove_candidates([c for c in _prof.candidates if c not in remaining_candidates]) - - # after removing the candidates, there might be some empty ballots. reduced_prof.remove_empty_rankings() - pl_scores = reduced_prof.plurality_scores() - + if len(pl_scores) == 0: + return [] max_pl_score = max(pl_scores.values()) - return sorted([c for c in pl_scores.keys() if pl_scores[c] == max_pl_score]) @vm(name = "Instant Runoff", - input_types=[ElectionTypes.PROFILE]) -def instant_runoff(profile, curr_cands = None, algorithm = "basic", **kwargs): + input_types=[ElectionTypes.PROFILE, ElectionTypes.PROFILE_WITH_TIES]) +def instant_runoff(profile, curr_cands = None, algorithm = "basic", tie_breaker=None, score_method=None, exit_on_majority=True): """ If there is a majority winner then that candidate is the winner. If there is no majority winner, then remove all candidates that are ranked first by the fewest number of voters. Continue removing candidates with the fewest number first-place votes until there is a candidate with a majority of first place votes. .. important:: - If there is more than one candidate with the fewest number of first-place votes, then *all* such candidates are removed from the profile. + If there is more than one candidate with the fewest number of first-place votes and ``tie_breaker`` is None, then *all* such candidates are removed from the profile. If ``tie_breaker`` is provided, only one candidate is removed at a time (the one with lowest priority in the tie_breaker). Args: - profile (Profile): An anonymous profile of linear orders on a set of candidates + profile (Profile or ProfileWithTies): An anonymous profile of linear orders or weak orders on a set of candidates curr_cands (List[int], optional): If set, then find the winners for the profile restricted to the candidates in ``curr_cands`` algorithm (str, optional): The algorithm to use. Options are "basic" and "recursive". The default is "basic". + tie_breaker (List[int], optional): If provided, use this linear order to break ties. tie_breaker[0] has lowest priority (eliminated first among tied). + score_method (str, optional): For ProfileWithTies only. "approval" (default) or "split". + exit_on_majority (bool): If True (default), stop as soon as a candidate has a strict majority + of first-place votes. If False, continue until one candidate remains. Returns: A sorted list of candidates @@ -197,17 +385,19 @@ def instant_runoff(profile, curr_cands = None, algorithm = "basic", **kwargs): hare.display(prof) """ - if isinstance(profile, Profile): + if isinstance(profile, Profile): if algorithm == "basic": - return _instant_runoff_basic(profile, curr_cands = curr_cands) - + return _instant_runoff_basic(profile, curr_cands=curr_cands, tie_breaker=tie_breaker, exit_on_majority=exit_on_majority) elif algorithm == "recursive": - return _instant_runoff_recursive(profile, curr_cands = curr_cands) - + return _instant_runoff_recursive(profile, curr_cands=curr_cands, tie_breaker=tie_breaker) else: raise ValueError("Algorithm must be either 'basic' or 'recursive'.") - elif isinstance(profile, ProfileWithTies): - return _instant_runoff_for_truncated_linear_orders(profile, curr_cands = curr_cands, **kwargs) + elif isinstance(profile, ProfileWithTies): + # Use unified _instant_runoff_basic for ProfileWithTies + sm = score_method if score_method is not None else "approval" + return _instant_runoff_basic(profile, curr_cands=curr_cands, tie_breaker=tie_breaker, score_method=sm, exit_on_majority=exit_on_majority) + else: + raise TypeError(f"Expected Profile or ProfileWithTies, got {type(profile)}") # Create some aliases for instant runoff instant_runoff.set_name("Hare") hare = copy.deepcopy(instant_runoff) @@ -256,14 +446,17 @@ def instant_runoff_ranking(profile, curr_cands = None): return ranking @vm(name = "Instant Runoff TB", - input_types=[ElectionTypes.PROFILE]) -def instant_runoff_tb(profile, curr_cands = None, tie_breaker = None): + input_types=[ElectionTypes.PROFILE, ElectionTypes.PROFILE_WITH_TIES]) +def instant_runoff_tb(profile, curr_cands = None, tie_breaker = None, score_method=None, exit_on_majority=True): """Instant Runoff (``instant_runoff``) with tie breaking: If there is more than one candidate with the fewest number of first-place votes, then remove the candidate with lowest in the tie_breaker ranking from the profile. Args: - profile (Profile): An anonymous profile of linear orders on a set of candidates + profile (Profile or ProfileWithTies): An anonymous profile of linear orders or weak orders on a set of candidates curr_cands (List[int], optional): If set, then find the winners for the profile restricted to the candidates in ``curr_cands`` tie_breaker (List[int]): A list of the candidates in the profile to be used as a tiebreaker. + score_method (str, optional): For ProfileWithTies only. "approval" (default) or "split". + exit_on_majority (bool): If True (default), stop as soon as a candidate has a strict majority + of first-place votes. If False, continue until one candidate remains. Returns: A sorted list of candidates @@ -286,53 +479,22 @@ def instant_runoff_tb(profile, curr_cands = None, tie_breaker = None): instant_runoff_tb.display(prof, tie_breaker=[1, 2, 0]) """ - # the tie_breaker is any linear order (i.e., list) of the candidates - tb = tie_breaker if tie_breaker is not None else list(range(profile.num_cands)) - - # need the total number of all candidates in a profile to check when all candidates have been removed - num_cands = profile.num_cands - - candidates = profile.candidates if curr_cands is None else curr_cands - cands_to_ignore = np.empty(0) if curr_cands is None else np.array([c for c in profile.candidates if c not in curr_cands]) - - strict_maj_size = profile.strict_maj_size() - - rs, rcounts = profile.rankings_counts # get all the ranking data - - winners = [c for c in candidates - if _num_rank_first(rs, rcounts, cands_to_ignore, c) >= strict_maj_size] - - while len(winners) == 0: - plurality_scores = {c: _num_rank_first(rs, rcounts, cands_to_ignore, c) for c in candidates if not isin(cands_to_ignore,c)} - min_plurality_score = min(plurality_scores.values()) - lowest_first_place_votes = np.array([c for c in plurality_scores.keys() - if plurality_scores[c] == min_plurality_score]) - - cand_to_remove = lowest_first_place_votes[0] - for c in lowest_first_place_votes[1:]: - if tb.index(c) < tb.index(cand_to_remove): - cand_to_remove = c - - # remove cands with lowest plurality winners - cands_to_ignore = np.concatenate((cands_to_ignore, cand_to_remove), axis=None) - if len(cands_to_ignore) == num_cands: #all the candidates where removed - winners = sorted(lowest_first_place_votes) - else: - winners = [c for c in candidates - if not isin(cands_to_ignore,c) and _num_rank_first(rs, rcounts, cands_to_ignore, c) >= strict_maj_size] - - return sorted(winners) + tb = tie_breaker if tie_breaker is not None else list(profile.candidates) + return instant_runoff(profile, curr_cands=curr_cands, tie_breaker=tb, score_method=score_method, exit_on_majority=exit_on_majority) @vm(name = "Instant Runoff PUT", - input_types=[ElectionTypes.PROFILE]) -def instant_runoff_put(profile, curr_cands = None): + input_types=[ElectionTypes.PROFILE, ElectionTypes.PROFILE_WITH_TIES]) +def instant_runoff_put(profile, curr_cands = None, score_method=None, exit_on_majority=True): """ Instant Runoff (:func:`instant_runoff`) with parallel universe tie-breaking (PUT), defined recursively: if there is a candidate with a strict majority of first-place votes, that candidate is the IRV-PUT winner; otherwise a candidate x is an IRV-PUT winner if there is some candidate y with a minimal number of first-place votes such that after removing y from the profile, x is an IRV-PUT winner. Args: - profile (Profile): An anonymous profile of linear orders on a set of candidates + profile (Profile or ProfileWithTies): An anonymous profile of linear orders or weak orders on a set of candidates curr_cands (List[int], optional): If set, then find the winners for the profile restricted to the candidates in ``curr_cands`` + score_method (str, optional): For ProfileWithTies only. "approval" (default) or "split". + exit_on_majority (bool): If True (default), stop as soon as a candidate has a strict majority + of first-place votes. If False, continue until one candidate remains. Returns: A sorted list of candidates @@ -369,36 +531,54 @@ def instant_runoff_put(profile, curr_cands = None): """ - - candidates = profile.candidates if curr_cands is None else curr_cands - - plurality_scores = profile.plurality_scores(candidates) - - strict_maj_size = profile.strict_maj_size() - majority_winner = [cand for cand, score in plurality_scores.items() if score >= strict_maj_size] + if isinstance(profile, Profile): + candidates = profile.candidates if curr_cands is None else curr_cands + + if len(candidates) == 0: + return [] + + if len(candidates) == 1: + return list(candidates) - if len(majority_winner) > 0: - return majority_winner - - original_num_cands = len(candidates) - - # immediately eliminate candidates with plurality score 0 - # this is safe, because every elimination order will eliminate all these candidates first (in some order) - candidates = [cand for cand in candidates if plurality_scores[cand] > 0] - if len(candidates) < original_num_cands: - # if we removed some candidates, we need to update the plurality scores plurality_scores = profile.plurality_scores(candidates) - # plurality losers - worst_score = min(plurality_scores.values()) - cands_to_remove = [cand for cand, value in plurality_scores.items() if value == worst_score] - - winners = [] - for cand_to_remove in cands_to_remove: - new_winners = instant_runoff_put(profile, curr_cands = [c for c in candidates if not c == cand_to_remove]) - winners = winners + new_winners + # Check for majority winner if exit_on_majority is True + if exit_on_majority: + strict_maj_size = profile.strict_maj_size() + majority_winner = [cand for cand, score in plurality_scores.items() if score >= strict_maj_size] + if len(majority_winner) > 0: + return majority_winner + + original_num_cands = len(candidates) + + # immediately eliminate candidates with plurality score 0 + # this is safe, because every elimination order will eliminate all these candidates first (in some order) + candidates = [cand for cand in candidates if plurality_scores[cand] > 0] + if len(candidates) == 0: + # All candidates had score 0, return all original candidates + return sorted(profile.candidates if curr_cands is None else curr_cands) + if len(candidates) < original_num_cands: + # if we removed some candidates, we need to update the plurality scores + plurality_scores = profile.plurality_scores(candidates) + + # plurality losers + worst_score = min(plurality_scores.values()) + cands_to_remove = [cand for cand, value in plurality_scores.items() if value == worst_score] + + winners = [] + for cand_to_remove in cands_to_remove: + new_winners = instant_runoff_put(profile, curr_cands = [c for c in candidates if not c == cand_to_remove], exit_on_majority=exit_on_majority) + winners = winners + new_winners + + return sorted(set(winners)) - return sorted(set(winners)) + elif isinstance(profile, ProfileWithTies): + sm = score_method if score_method is not None else "approval" + # Note: exit_on_majority is not passed to ProfileWithTies because approval/split + # scores can exceed the number of voters, making majority check meaningless + return _instant_runoff_put_for_profile_with_ties(profile, curr_cands=curr_cands, score_method=sm) + else: + raise TypeError(f"Expected Profile or ProfileWithTies, got {type(profile)}") # Create some aliases for instant runoff @@ -413,13 +593,17 @@ def instant_runoff_put(profile, curr_cands = None): instant_runoff_put.set_name("Instant Runoff PUT") -def instant_runoff_with_explanation(profile, curr_cands = None): +def instant_runoff_with_explanation(profile, curr_cands=None, tie_breaker=None, score_method=None, exit_on_majority=True): """ Instant Runoff with an explanation. In addition to the winner(s), return the order in which the candidates are eliminated as a list of lists. Args: profile (Profile): An anonymous profile of linear orders on a set of candidates curr_cands (List[int], optional): If set, then find the winners for the profile restricted to the candidates in ``curr_cands`` + tie_breaker (List[int], optional): If provided, use this linear order to break ties. tie_breaker[0] has lowest priority (eliminated first among tied). + score_method (str, optional): For ProfileWithTies only. "approval" (default) or "split". + exit_on_majority (bool): If True (default), stop as soon as a candidate has a strict majority + of first-place votes. If False, continue until one candidate remains. Returns: A sorted list of candidates @@ -456,39 +640,112 @@ def instant_runoff_with_explanation(profile, curr_cands = None): print(f"order of elimination: {exp}") """ - # need the total number of all candidates in a profile to check when all candidates have been removed - num_cands = profile.num_cands - - candidates = profile.candidates if curr_cands is None else curr_cands - cands_to_ignore = np.empty(0) if curr_cands is None else np.array([c for c in profile.candidates if c not in curr_cands]) - - strict_maj_size = profile.strict_maj_size() - - rs, rcounts = profile.rankings_counts # get all the ranking data - - - winners = [c for c in candidates - if _num_rank_first(rs, rcounts, cands_to_ignore, c) >= strict_maj_size] - elims_list = list() - - while len(winners) == 0: - plurality_scores = {c: _num_rank_first(rs, rcounts, cands_to_ignore, c) for c in candidates - if not isin(cands_to_ignore,c)} - min_plurality_score = min(plurality_scores.values()) - lowest_first_place_votes = np.array([c for c in plurality_scores.keys() - if plurality_scores[c] == min_plurality_score]) + if isinstance(profile, Profile): + # need the total number of all candidates in a profile to check when all candidates have been removed + num_cands = profile.num_cands + + candidates = profile.candidates if curr_cands is None else curr_cands + + if len(candidates) == 0: + return [], [] + + cands_to_ignore = np.empty(0, dtype=int) if curr_cands is None else np.array([c for c in profile.candidates if c not in curr_cands], dtype=int) - elims_list.append(list(lowest_first_place_votes)) + strict_maj_size = profile.strict_maj_size() + + rs, rcounts = profile.rankings_counts # get all the ranking data + + # Validate tie_breaker if provided + tb_pos = _validate_tie_breaker(tie_breaker, candidates) - # remove cands with lowest plurality winners - cands_to_ignore = np.concatenate((cands_to_ignore, lowest_first_place_votes), axis=None) - if len(cands_to_ignore) == num_cands: # removed all of the candidates - winners = sorted(lowest_first_place_votes) - else: + # Check for majority winner at start if exit_on_majority is True + if exit_on_majority: winners = [c for c in candidates - if not isin(cands_to_ignore,c) and _num_rank_first(rs, rcounts, cands_to_ignore, c) >= strict_maj_size] - - return sorted(winners), elims_list + if _num_rank_first(rs, rcounts, cands_to_ignore, c) >= strict_maj_size] + else: + winners = [] + elims_list = list() + + while len(winners) == 0: + plurality_scores = {c: _num_rank_first(rs, rcounts, cands_to_ignore, c) for c in candidates + if not isin(cands_to_ignore,c)} + min_plurality_score = min(plurality_scores.values()) + lowest_first_place_votes = [c for c in plurality_scores.keys() + if plurality_scores[c] == min_plurality_score] + + if tb_pos is not None and len(lowest_first_place_votes) > 1: + cand_to_remove = min(lowest_first_place_votes, key=lambda c: tb_pos[c]) + elims_list.append([cand_to_remove]) + cands_to_ignore = np.concatenate((cands_to_ignore, [cand_to_remove]), axis=None) + else: + elims_list.append(list(lowest_first_place_votes)) + # remove cands with lowest plurality winners + cands_to_ignore = np.concatenate((cands_to_ignore, lowest_first_place_votes), axis=None) + + if len(cands_to_ignore) == num_cands: # removed all of the candidates + winners = sorted(lowest_first_place_votes) + else: + remaining = [c for c in candidates if not isin(cands_to_ignore, c)] + if len(remaining) == 1: + # Only one candidate left + winners = remaining + elif exit_on_majority: + winners = [c for c in candidates + if not isin(cands_to_ignore,c) and _num_rank_first(rs, rcounts, cands_to_ignore, c) >= strict_maj_size] + else: + winners = [] + + return sorted(winners), elims_list + + elif isinstance(profile, ProfileWithTies): + # Note: exit_on_majority is ignored for ProfileWithTies because approval/split + # scores can exceed the number of voters (a voter can approve multiple candidates) + sm = score_method if score_method is not None else "approval" + if sm not in ("approval", "split"): + raise ValueError(f"score_method must be 'approval' or 'split', got '{sm}'") + + candidates = list(profile.candidates if curr_cands is None else curr_cands) + + if len(candidates) == 0: + return [], [] + + remaining_cands = set(candidates) + elims_list = list() + + # Validate tie_breaker if provided + tb_pos = _validate_tie_breaker(tie_breaker, candidates) + + while len(remaining_cands) > 1: + scores = profile.tops_scores(curr_cands=sorted(remaining_cands), score_type=sm) + + min_score = min(scores.values()) + lowest_cands = sorted([c for c in scores.keys() if _scores_equal(scores[c], min_score)]) + + if len(lowest_cands) == len(remaining_cands): + # All remaining candidates tied for lowest + if tb_pos is None: + # No tie-breaker, record the terminal tie round and return all as winners + elims_list.append(list(lowest_cands)) + break + else: + # Use tie-breaker to eliminate one + cand_to_remove = min(remaining_cands, key=lambda c: tb_pos[c]) + elims_list.append([cand_to_remove]) + remaining_cands.remove(cand_to_remove) + continue + + if tb_pos is not None and len(lowest_cands) > 1: + cand_to_remove = min(lowest_cands, key=lambda c: tb_pos[c]) + elims_list.append([cand_to_remove]) + remaining_cands.remove(cand_to_remove) + else: + elims_list.append(list(lowest_cands)) + remaining_cands -= set(lowest_cands) + + return sorted(remaining_cands), elims_list + + else: + raise TypeError(f"Expected Profile or ProfileWithTies, got {type(profile)}") @vm(name="Instant Runoff (Truncated Linear Orders)", input_types=[ElectionTypes.TRUNCATED_LINEAR_PROFILE]) @@ -525,62 +782,92 @@ def instant_runoff_for_truncated_linear_orders(profile, curr_cands = None, thres """ - - assert all([not r.has_overvote() for r in profile.rankings]), "Instant Runoff is only defined when all the ballots are truncated linear orders." - - curr_cands = profile.candidates if curr_cands is None else curr_cands + return _instant_runoff_for_truncated_linear_orders(profile, curr_cands=curr_cands, threshold=threshold, hide_warnings=hide_warnings) - # we need to remove empty rankings during the algorithm, so make a copy of the profile - prof2 = copy.deepcopy(profile) - - _prof = prof2.remove_candidates([c for c in profile.candidates if c not in curr_cands]) - # remove the empty rankings - _prof.remove_empty_rankings() +@vm(name="Approval IRV", + input_types=[ElectionTypes.PROFILE_WITH_TIES]) +def approval_irv(profile, curr_cands=None, tie_breaker=None): + """ + Approval-based Instant Runoff Voting for ProfileWithTies. - threshold = threshold if threshold is not None else _prof.strict_maj_size() + Based on Delemazure & Peters (2024) "Approval-Based Instant-Runoff Voting" (https://arxiv.org/abs/2404.11407). + Each voter's ballot approves all candidates at their top rank among remaining candidates. + Candidates with the fewest approvals are eliminated. - remaining_candidates = _prof.candidates - - pl_scores = _prof.plurality_scores() - max_pl_score = max(pl_scores.values()) + Args: + profile (ProfileWithTies): A profile with possible ties in ballots + curr_cands (List[int], optional): Candidates to consider + tie_breaker (List[int], optional): If provided, eliminate one at a time using this order. + tie_breaker[0] has lowest priority (eliminated first). - while max_pl_score < threshold: + Returns: + A sorted list of winners + """ + return instant_runoff(profile, curr_cands=curr_cands, tie_breaker=tie_breaker, score_method="approval") - reduced_prof = _prof.remove_candidates([c for c in _prof.candidates if c not in remaining_candidates]) - - # after removing the candidates, there might be some empty ballots. - reduced_prof.remove_empty_rankings() - pl_scores = reduced_prof.plurality_scores() - min_pl_score = min(pl_scores.values()) - - cands_to_remove = [c for c in pl_scores.keys() if pl_scores[c] == min_pl_score] +@vm(name="Approval IRV TB", + input_types=[ElectionTypes.PROFILE_WITH_TIES]) +def approval_irv_tb(profile, curr_cands=None, tie_breaker=None): + """Approval IRV with tie-breaking. See :func:`approval_irv` for details.""" + tb = tie_breaker if tie_breaker is not None else list(profile.candidates) + return approval_irv(profile, curr_cands=curr_cands, tie_breaker=tb) - if not hide_warnings and len(cands_to_remove) > 1: - print(f"Warning: multiple candidates removed in a round: {', '.join(map(str,cands_to_remove))}") - - if len(cands_to_remove) == len(reduced_prof.candidates): - # all remaining candidates have the same plurality score. - break - - # possibly update the threshold, so that it is a strict majority of the remaining ballots - threshold = threshold if threshold is not None else reduced_prof.strict_maj_size() - max_pl_score = max(pl_scores.values()) - remaining_candidates = [c for c in remaining_candidates if c not in cands_to_remove] +@vm(name="Approval IRV PUT", + input_types=[ElectionTypes.PROFILE_WITH_TIES]) +def approval_irv_put(profile, curr_cands=None): + """Approval IRV with parallel universe tie-breaking. See :func:`approval_irv` for details.""" + return instant_runoff_put(profile, curr_cands=curr_cands, score_method="approval") - reduced_prof = _prof.remove_candidates([c for c in _prof.candidates if c not in remaining_candidates]) +def approval_irv_with_explanation(profile, curr_cands=None, tie_breaker=None): + """Approval IRV with explanation. See :func:`instant_runoff_with_explanation` for details.""" + return instant_runoff_with_explanation(profile, curr_cands=curr_cands, tie_breaker=tie_breaker, score_method="approval") - # after removing the candidates, there might be some empty ballots. - reduced_prof.remove_empty_rankings() - - pl_scores = reduced_prof.plurality_scores() + +@vm(name="Split IRV", + input_types=[ElectionTypes.PROFILE_WITH_TIES]) +def split_irv(profile, curr_cands=None, tie_breaker=None): + """ + Split-based Instant Runoff Voting for ProfileWithTies. - max_pl_score = max(pl_scores.values()) + Based on Delemazure & Peters (2024) "Approval-Based Instant-Runoff Voting" (https://arxiv.org/abs/2404.11407). + Each voter's vote is split equally among all candidates at their top rank among remaining candidates. + Candidates with the lowest split score are eliminated. - return sorted([c for c in pl_scores.keys() if pl_scores[c] == max_pl_score]) + Args: + profile (ProfileWithTies): A profile with possible ties in ballots + curr_cands (List[int], optional): Candidates to consider + tie_breaker (List[int], optional): If provided, eliminate one at a time using this order. + tie_breaker[0] has lowest priority (eliminated first). + + Returns: + A sorted list of winners + """ + return instant_runoff(profile, curr_cands=curr_cands, tie_breaker=tie_breaker, score_method="split") + + +@vm(name="Split IRV TB", + input_types=[ElectionTypes.PROFILE_WITH_TIES]) +def split_irv_tb(profile, curr_cands=None, tie_breaker=None): + """Split IRV with tie-breaking. See :func:`split_irv` for details.""" + tb = tie_breaker if tie_breaker is not None else list(profile.candidates) + return split_irv(profile, curr_cands=curr_cands, tie_breaker=tb) + + +@vm(name="Split IRV PUT", + input_types=[ElectionTypes.PROFILE_WITH_TIES]) +def split_irv_put(profile, curr_cands=None): + """Split IRV with parallel universe tie-breaking. See :func:`split_irv` for details.""" + return instant_runoff_put(profile, curr_cands=curr_cands, score_method="split") + + +def split_irv_with_explanation(profile, curr_cands=None, tie_breaker=None): + """Split IRV with explanation. See :func:`instant_runoff_with_explanation` for details.""" + return instant_runoff_with_explanation(profile, curr_cands=curr_cands, tie_breaker=tie_breaker, score_method="split") + def top_n_instant_runoff_for_truncated_linear_orders( profile, @@ -1239,7 +1526,7 @@ def baldwin_tb(profile, curr_cands = None, tie_breaker=None): cands_to_ignore = np.concatenate((cands_to_ignore, np.array([cand_to_remove])), axis=None) winners = list() - if cands_to_ignore.shape[0] == all_num_cands: # all candidates have lowest Borda score + if cands_to_ignore.shape[0] == num_cands: # all candidates have lowest Borda score winners = sorted(last_place_borda_scores) else: # remove the candidates with lowest Borda score num_cands = len(candidates) @@ -1373,7 +1660,8 @@ def baldwin_with_explanation(profile, curr_cands = None): rcounts = profile._rcounts # get all the ranking data rankings = profile._rankings if curr_cands is None else _find_updated_profile(profile._rankings, np.array([c for c in profile.candidates if c not in curr_cands]), all_num_cands) - cands_to_ignore = np.empty(0) if curr_cands is None else np.array([c for c in profile.candidates if c not in curr_cands]) + cands_to_ignore = np.empty(0) + num_cands = len(candidates) borda_scores = {c: _borda_score(rankings, rcounts, len(candidates), c) for c in candidates} @@ -1385,10 +1673,9 @@ def baldwin_with_explanation(profile, curr_cands = None): cands_to_ignore = np.concatenate((cands_to_ignore, np.array(last_place_borda_scores)), axis=None) winners = list() - if cands_to_ignore.shape[0] == all_num_cands: # all candidates have lowest Borda score + if cands_to_ignore.shape[0] == num_cands: # all candidates have lowest Borda score winners = sorted(last_place_borda_scores) else: # remove the candidates with lowest Borda score - num_cands = len(candidates) updated_rankings = _find_updated_profile(rankings, cands_to_ignore, num_cands) while len(winners) == 0: @@ -1401,7 +1688,7 @@ def baldwin_with_explanation(profile, curr_cands = None): cands_to_ignore = np.concatenate((cands_to_ignore, np.array(last_place_borda_scores)), axis=None) - if cands_to_ignore.shape[0] == all_num_cands: # removed all remaining candidates + if cands_to_ignore.shape[0] == num_cands: # removed all remaining candidates winners = sorted(last_place_borda_scores) elif num_cands - cands_to_ignore.shape[0] == 1: # only one candidate remains winners = sorted([c for c in candidates if c not in cands_to_ignore]) @@ -1448,6 +1735,7 @@ def strict_nanson(profile, curr_cands = None): rcounts = profile._rcounts # get all the ranking data rankings = profile._rankings if curr_cands is None else _find_updated_profile(profile._rankings, np.array([c for c in profile.candidates if c not in curr_cands]), all_num_cands) cands_to_ignore = np.empty(0) + num_cands = len(candidates) borda_scores = {c: _borda_score(rankings, rcounts, len(candidates), c) for c in candidates} @@ -1456,10 +1744,9 @@ def strict_nanson(profile, curr_cands = None): cands_to_ignore = np.concatenate((cands_to_ignore, below_borda_avg_candidates), axis=None) winners = list() - if cands_to_ignore.shape[0] == all_num_cands: # all candidates have same Borda score + if cands_to_ignore.shape[0] == num_cands: # all candidates have same Borda score winners = sorted(candidates) else: - num_cands = len(candidates) updated_rankings = _find_updated_profile(rankings, cands_to_ignore, num_cands) while len(winners) == 0: @@ -1473,7 +1760,7 @@ def strict_nanson(profile, curr_cands = None): cands_to_ignore = np.concatenate((cands_to_ignore, below_borda_avg_candidates), axis=None) - if (below_borda_avg_candidates.shape[0] == 0) or ((all_num_cands - cands_to_ignore.shape[0]) == 1): + if (below_borda_avg_candidates.shape[0] == 0) or ((num_cands - cands_to_ignore.shape[0]) == 1): winners = sorted([c for c in candidates if c not in cands_to_ignore]) else: updated_rankings = _find_updated_profile(rankings, cands_to_ignore, num_cands) @@ -1511,6 +1798,7 @@ def strict_nanson_with_explanation(profile, curr_cands = None): rcounts = profile._rcounts # get all the ranking data rankings = profile._rankings if curr_cands is None else _find_updated_profile(profile._rankings, np.array([c for c in profile.candidates if c not in curr_cands]), all_num_cands) cands_to_ignore = np.empty(0) + num_cands = len(candidates) elim_list = list() borda_scores = {c: _borda_score(rankings, rcounts, len(candidates), c) for c in candidates} @@ -1520,13 +1808,12 @@ def strict_nanson_with_explanation(profile, curr_cands = None): cands_to_ignore = np.concatenate((cands_to_ignore, np.array(below_borda_avg_candidates)), axis=None) winners = list() - if cands_to_ignore.shape[0] == all_num_cands: # all candidates have same Borda score + if cands_to_ignore.shape[0] == num_cands: # all candidates have same Borda score elim_list.append({"avg_borda_score": avg_borda_score, "elim_cands": below_borda_avg_candidates, "borda_scores": borda_scores}) winners = sorted(candidates) else: - num_cands = len(candidates) elim_list.append({"avg_borda_score": avg_borda_score, "elim_cands": below_borda_avg_candidates, "borda_scores": borda_scores}) @@ -1546,7 +1833,7 @@ def strict_nanson_with_explanation(profile, curr_cands = None): "elim_cands": below_borda_avg_candidates, "borda_scores": borda_scores}) - if (len(below_borda_avg_candidates) == 0) or ((all_num_cands - cands_to_ignore.shape[0]) == 1): + if (len(below_borda_avg_candidates) == 0) or ((num_cands - cands_to_ignore.shape[0]) == 1): winners = sorted([c for c in candidates if c not in cands_to_ignore]) else: updated_rankings = _find_updated_profile(rankings, cands_to_ignore, num_cands) @@ -1593,7 +1880,8 @@ def weak_nanson(profile, curr_cands = None): rcounts = profile._rcounts # get all the ranking data rankings = profile._rankings if curr_cands is None else _find_updated_profile(profile._rankings, np.array([c for c in profile.candidates if c not in curr_cands]), all_num_cands) - cands_to_ignore = np.empty(0) if curr_cands is None else np.array([c for c in profile.candidates if c not in curr_cands]) + cands_to_ignore = np.empty(0) + num_cands = len(candidates) borda_scores = {c: _borda_score(rankings, rcounts, len(candidates), c) for c in candidates} @@ -1604,12 +1892,11 @@ def weak_nanson(profile, curr_cands = None): cands_to_ignore = np.concatenate((cands_to_ignore, below_borda_avg_candidates), axis=None) winners = list() - if cands_to_ignore.shape[0] == all_num_cands: # all candidates have same Borda score + if cands_to_ignore.shape[0] == num_cands: # all candidates have same Borda score winners = sorted(candidates) - elif all_num_cands - cands_to_ignore.shape[0] == 1: # one candidate remains + elif num_cands - cands_to_ignore.shape[0] == 1: # one candidate remains winners = [c for c in candidates if not isin(cands_to_ignore, c)] else: - num_cands = len(candidates) updated_rankings = _find_updated_profile(rankings, cands_to_ignore, num_cands) while len(winners) == 0: @@ -1625,9 +1912,9 @@ def weak_nanson(profile, curr_cands = None): cands_to_ignore = np.concatenate((cands_to_ignore, below_borda_avg_candidates), axis=None) - if cands_to_ignore.shape[0] == all_num_cands: # all remaining candidates have been removed + if cands_to_ignore.shape[0] == num_cands: # all remaining candidates have been removed winners = sorted(below_borda_avg_candidates) - elif all_num_cands - cands_to_ignore.shape[0] == 1: # one candidate remains + elif num_cands - cands_to_ignore.shape[0] == 1: # one candidate remains winners = [c for c in candidates if not isin(cands_to_ignore, c)] else: updated_rankings = _find_updated_profile(rankings, cands_to_ignore, num_cands) @@ -1665,7 +1952,8 @@ def weak_nanson_with_explanation(profile, curr_cands = None): rcounts = profile._rcounts # get all the ranking data rankings = profile._rankings if curr_cands is None else _find_updated_profile(profile._rankings, np.array([c for c in profile.candidates if c not in curr_cands]), all_num_cands) - cands_to_ignore = np.empty(0) if curr_cands is None else np.array([c for c in profile.candidates if c not in curr_cands]) + cands_to_ignore = np.empty(0) + num_cands = len(candidates) elim_list = list() borda_scores = {c: _borda_score(rankings, rcounts, len(candidates), c) for c in candidates} @@ -1676,18 +1964,17 @@ def weak_nanson_with_explanation(profile, curr_cands = None): cands_to_ignore = np.concatenate((cands_to_ignore, np.array(below_borda_avg_candidates)), axis=None) winners = list() - if cands_to_ignore.shape[0] == all_num_cands: # all candidates have same Borda score + if cands_to_ignore.shape[0] == num_cands: # all candidates have same Borda score elim_list.append({"avg_borda_score": avg_borda_score, "elim_cands": below_borda_avg_candidates, "borda_scores": borda_scores}) winners = sorted(candidates) - elif all_num_cands - cands_to_ignore.shape[0] == 1: # one candidate remains + elif num_cands - cands_to_ignore.shape[0] == 1: # one candidate remains elim_list.append({"avg_borda_score": avg_borda_score, "elim_cands": below_borda_avg_candidates, "borda_scores": borda_scores}) winners = [c for c in candidates if not isin(cands_to_ignore, c)] else: - num_cands = len(candidates) elim_list.append({"avg_borda_score": avg_borda_score, "elim_cands": below_borda_avg_candidates, "borda_scores": borda_scores}) @@ -1709,9 +1996,9 @@ def weak_nanson_with_explanation(profile, curr_cands = None): "elim_cands": below_borda_avg_candidates, "borda_scores": borda_scores}) - if cands_to_ignore.shape[0] == all_num_cands: # all remaining candidates have been removed + if cands_to_ignore.shape[0] == num_cands: # all remaining candidates have been removed winners = sorted(below_borda_avg_candidates) - elif all_num_cands - cands_to_ignore.shape[0] == 1: # one candidate remains + elif num_cands - cands_to_ignore.shape[0] == 1: # one candidate remains winners = [c for c in candidates if not isin(cands_to_ignore, c)] else: updated_rankings = _find_updated_profile(rankings, cands_to_ignore, num_cands) @@ -1911,7 +2198,7 @@ def benham_tb(profile, curr_cands = None, tie_breaker = None): num_cands = profile.num_cands candidates = profile.candidates if curr_cands is None else curr_cands - cands_to_ignore = np.empty(0) if curr_cands is None else np.array([c for c in profile.candidates if c not in curr_cands]) + cands_to_ignore = np.empty(0, dtype=int) if curr_cands is None else np.array([c for c in profile.candidates if c not in curr_cands], dtype=int) rs, rcounts = profile.rankings_counts # get all the ranking data @@ -1931,7 +2218,7 @@ def benham_tb(profile, curr_cands = None, tie_breaker = None): cand_to_remove = c # remove cands with lowest plurality winners - cands_to_ignore = np.concatenate((cands_to_ignore, cand_to_remove), axis=None) + cands_to_ignore = np.concatenate((cands_to_ignore, [cand_to_remove]), axis=None) if len(cands_to_ignore) == num_cands: #all the candidates where removed winners = sorted(lowest_first_place_votes) else: @@ -2230,7 +2517,7 @@ def plurality_veto(profile, curr_cands=None, voter_order=None): Returns: A sorted list of candidates - warning:: + .. warning:: If no voter order is specified, the method uses the default order of voter rankings in the profile. Note that anonymizing a profile changes the order of voter rankings. """ candidates = profile.candidates if curr_cands is None else curr_cands @@ -2241,11 +2528,21 @@ def plurality_veto(profile, curr_cands=None, voter_order=None): # If no voter order specified, use default order if voter_order is None: voter_order = list(range(profile.num_voters)) + else: + voter_order = list(voter_order) + if len(voter_order) != profile.num_voters or set(voter_order) != set(range(profile.num_voters)): + raise ValueError("voter_order must be a permutation of range(profile.num_voters).") # Track non-eliminated candidates and last remaining active_candidates = set(candidates) last_remaining = None # Track the last remaining candidate + # Eliminate candidates with zero initial plurality score + zero_initial = sorted([c for c in candidates if scores[c] == 0]) + for c in zero_initial: + active_candidates.remove(c) + last_remaining = c + # Process each voter for voter in voter_order: # Get remaining candidates with positive scores @@ -2288,7 +2585,7 @@ def plurality_veto_with_explanation(profile, curr_cands=None, voter_order=None): Returns: tuple: A tuple containing (winner list, explanation string) - warning:: + .. warning:: If no voter order is specified, the method uses the default order of voter rankings in the profile. Note that anonymizing a profile changes the order of voter rankings. """ curr_cands = profile.candidates if curr_cands is None else curr_cands @@ -2296,15 +2593,19 @@ def plurality_veto_with_explanation(profile, curr_cands=None, voter_order=None): if voter_order is None: voter_order = list(range(profile.num_voters)) + else: + voter_order = list(voter_order) + if len(voter_order) != profile.num_voters or set(voter_order) != set(range(profile.num_voters)): + raise ValueError("voter_order must be a permutation of range(profile.num_voters).") explanation = [ "Initial plurality scores: " + str(dict(scores)), ] # Note any candidates eliminated due to zero initial plurality scores - zero_initial = [c for c in curr_cands if scores[c] == 0] + zero_initial = sorted([c for c in curr_cands if scores[c] == 0]) if zero_initial: - explanation.append(f"Candidates eliminated due to zero initial plurality score: {sorted(zero_initial)}") + explanation.append(f"Candidates eliminated due to zero initial plurality score: {zero_initial}") explanation.append("") # Add blank line active_candidates = set(curr_cands) @@ -2321,18 +2622,18 @@ def plurality_veto_with_explanation(profile, curr_cands=None, voter_order=None): explanation.append("All remaining candidates have score 0") if last_remaining is not None: explanation.append(f"Winners are candidates [{last_remaining}] (highest remaining scores)") - return [last_remaining], "\\n".join(explanation) + return [last_remaining], "\n".join(explanation) else: winners = sorted(active_candidates) explanation.append(f"Winners are candidates {winners} (highest remaining scores)") - return winners, "\\n".join(explanation) + return winners, "\n".join(explanation) # If only one candidate remains with positive score, they are the winner if len(remaining) == 1: winners = sorted(remaining) explanation.append(f"Only one candidate remains with positive score") explanation.append(f"Winners: {winners} (highest remaining scores)") - return winners, "\\n".join(explanation) + return winners, "\n".join(explanation) ranking = profile.rankings[voter] # Filter ranking to show only active candidates @@ -2353,12 +2654,12 @@ def plurality_veto_with_explanation(profile, curr_cands=None, voter_order=None): if last_remaining is not None: explanation.append(f"Winners: [{last_remaining}] (highest remaining scores)") - return [last_remaining], "\\n".join(explanation) + return [last_remaining], "\n".join(explanation) else: max_score = max(scores.values()) winners = sorted([c for c in curr_cands if scores[c] == max_score]) explanation.append(f"Winners: {winners} (highest remaining scores)") - return winners, "\\n".join(explanation) + return winners, "\n".join(explanation) @vm(name="Consensus Builder", input_types=[ElectionTypes.PROFILE]) @@ -2422,4 +2723,4 @@ def consensus_builder(profile, curr_cands=None, consensus_building_ranking=None, weak_nanson_with_explanation, iterated_removal_cl_with_explanation, plurality_veto_with_explanation -] \ No newline at end of file +] diff --git a/pref_voting/pref_grade_profile.py b/pref_voting/pref_grade_profile.py new file mode 100644 index 00000000..b4f78311 --- /dev/null +++ b/pref_voting/pref_grade_profile.py @@ -0,0 +1,1370 @@ +''' + File: pref_grade_profile.py + Author: Wes Holliday (wesholliday@berkeley.edu) and Eric Pacuit (epacuit@umd.edu) + Date: March 28, 2026 + + A class that represents profiles in which each voter submits both a + (truncated) strict weak order and an assignment of grades. +''' + +import copy +import numpy as np +from math import ceil +from tabulate import tabulate +from pref_voting.rankings import Ranking +from pref_voting.mappings import Grade, _Mapping +from pref_voting.profiles import Profile +from pref_voting.profiles_with_ties import ProfileWithTies +from pref_voting.grade_profiles import GradeProfile +from pref_voting.scoring_methods import symmetric_borda_scores +from pref_voting.weighted_majority_graphs import ( + MajorityGraph, + MarginGraph, + SupportGraph, +) +import pandas as pd +import matplotlib.pyplot as plt + + +class PrefGradeProfile(object): + """An anonymous profile in which each voter submits both a (truncated) strict weak order + and an assignment of grades. + + :param rankings: List of rankings in the profile, where a ranking is either a :class:`Ranking` object or a dictionary. + :type rankings: list[dict[int or str: int]] or list[Ranking] + :param grade_maps: List of grades in the profile, where a grade is either a :class:`Grade` object or a dictionary. + :type grade_maps: list[dict[int or str: int or str]] or list[Grade] + :param grades: List of grades. + :type grades: list[int or str] + :param rcounts: List of the number of voters associated with each ranking/grade pair. Should be the same length as rankings and grade_maps. If not provided, it is assumed that 1 voter submitted each element. + :type rcounts: list[int], optional + :param candidates: List of candidates in the profile. If not provided, this is the union of candidates appearing in the rankings and grade maps. + :type candidates: list[int] or list[str], optional + :param cmap: Dictionary mapping candidates to candidate names (strings). If not provided, each candidate name is mapped to itself. + :type cmap: dict[int or str: str], optional + :param gmap: Dictionary mapping grades to grade names (strings). If not provided, each grade is mapped to itself. + :type gmap: dict[int or str: str], optional + :param grade_order: A list of the grades representing the order of the grades. It is assumed the grades are listed from largest to smallest. If not provided, the grades are assumed to be numbers and compared using the greater-than relation. + :type grade_order: list[int or str], optional + + :Example: + + The following code creates a profile in which 2 voters submit the ranking + 0 first, 1 second, 2 third along with grades {0: 5, 1: 3, 2: 1}; + and 3 voters submit the ranking with 1 and 2 tied for first and 0 second + along with grades {0: 2, 1: 4, 2: 4}: + + .. code-block:: python + + pgprof = PrefGradeProfile( + [{0: 1, 1: 2, 2: 3}, {1: 1, 2: 1, 0: 2}], + [{0: 5, 1: 3, 2: 1}, {0: 2, 1: 4, 2: 4}], + [1, 2, 3, 4, 5], + rcounts=[2, 3], + ) + + pgprof.display() + + """ + + def __init__( + self, + rankings, + grade_maps, + grades, + rcounts=None, + candidates=None, + cmap=None, + gmap=None, + grade_order=None, + ): + """Constructor method""" + + assert len(rankings) == len( + grade_maps + ), "The number of rankings must be the same as the number of grade maps" + + assert rcounts is None or len(rankings) == len( + rcounts + ), "The number of rankings must be the same as the number of rcounts" + + # Determine candidates from both rankings and grade_maps + get_cands_ranking = lambda r: list(r.keys()) if type(r) == dict else r.cands + get_cands_grade = lambda g: list(g.keys()) if type(g) == dict else g.graded_candidates + + if candidates is not None: + self.candidates = sorted(candidates) + else: + ranking_cands = set([c for r in rankings for c in get_cands_ranking(r)]) + grade_cands = set([c for g in grade_maps for c in get_cands_grade(g)]) + self.candidates = sorted(list(ranking_cands | grade_cands)) + """The candidates in the profile. """ + + self.num_cands = len(self.candidates) + """The number of candidates in the profile.""" + + self.cmap = cmap if cmap is not None else {c: str(c) for c in self.candidates} + """The candidate map is a dictionary associating a candidate with the name used when displaying a candidate.""" + + # --- Ranking data (from ProfileWithTies) --- + + self._rankings = [ + Ranking(r, cmap=self.cmap) + if type(r) == dict + else Ranking(r.rmap, cmap=self.cmap) + for r in rankings + ] + """The list of rankings in the profile (each ranking is a :class:`Ranking` object).""" + + self.ranks = list(range(1, self.num_cands + 1)) + """The ranks that are possible in the profile. """ + + self.cindices = list(range(self.num_cands)) + self._cand_to_cindex = {c: i for i, c in enumerate(self.candidates)} + self.cand_to_cindex = lambda c: self._cand_to_cindex[c] + self._cindex_to_cand = {i: c for i, c in enumerate(self.candidates)} + self.cindex_to_cand = lambda i: self._cindex_to_cand[i] + """Maps candidates to their index in the list of candidates and vice versa. """ + + # --- Grade data (from GradeProfile) --- + + self.grades = grades + """The grades in the profile. """ + + self.can_sum_grades = all([isinstance(g, (float, int)) for g in self.grades]) + + self.grade_order = grade_order if grade_order is not None else sorted(self.grades, reverse=True) + """The order of the grades. If None, then order from largest to smallest""" + + self.use_grade_order = grade_order is not None + + self.compare_function = ( + (lambda v1, v2: (v1 > v2) - (v2 > v1)) + if grade_order is None + else (lambda v1, v2: (grade_order.index(v1) < grade_order.index(v2)) - (grade_order.index(v2) < grade_order.index(v1))) + ) + + self.gmap = gmap if gmap is not None else {g: str(g) for g in self.grades} + """The grade map is a dictionary associating a grade with the name used when displaying a grade.""" + + self._grades = [ + Grade( + g_map, + self.grades, + candidates=self.candidates, + cmap=self.cmap, + gmap=self.gmap, + compare_function=self.compare_function, + ) + if type(g_map) == dict + else Grade( + g_map.as_dict(), + self.grades, + candidates=self.candidates, + cmap=self.cmap, + gmap=self.gmap, + compare_function=self.compare_function, + ) + for g_map in grade_maps + ] + """The list of grades in the profile (each grade is a :class:`Grade` object).""" + + # --- Shared data --- + + self.rcounts = [1] * len(rankings) if rcounts is None else list(rcounts) + + self.num_voters = np.sum(self.rcounts) + """The number of voters in the profile. """ + + self.using_extended_strict_preference = False + """A flag indicating whether the profile is using extended strict preferences when calculating supports, margins, etc.""" + + # memoize the supports (based on rankings) + self._supports = { + c1: { + c2: sum( + n + for r, n in zip(self._rankings, self.rcounts) + if r.strict_pref(c1, c2) + ) + for c2 in self.candidates + } + for c1 in self.candidates + } + + # ========================================================================= + # Ranking-related methods (from ProfileWithTies) + # ========================================================================= + + def use_extended_strict_preference(self): + """ + Redefine the supports so that *extended strict preferences* are used. Using extended strict preference may change the margins between candidates. + """ + + self.using_extended_strict_preference = True + self._supports = { + c1: { + c2: sum( + n + for r, n in zip(self._rankings, self.rcounts) + if r.extended_strict_pref(c1, c2) + ) + for c2 in self.candidates + } + for c1 in self.candidates + } + + def use_strict_preference(self): + """ + Redefine the supports so that strict preferences are used. Using strict preference may change the margins between candidates. + """ + + self.using_extended_strict_preference = False + self._supports = { + c1: { + c2: sum( + n + for r, n in zip(self._rankings, self.rcounts) + if r.strict_pref(c1, c2) + ) + for c2 in self.candidates + } + for c1 in self.candidates + } + + @property + def rankings(self): + """ + Return a list of all individual rankings in the profile. + """ + + return [ + r + for ridx, r in enumerate(self._rankings) + for _ in range(self.rcounts[ridx]) + ] + + @property + def rankings_as_indifference_list(self): + """ + Return a list of all individual rankings as indifference lists in the profile. + """ + + return [ + r.to_indiff_list() + for ridx, r in enumerate(self._rankings) + for _ in range(self.rcounts[ridx]) + ] + + @property + def ranking_types(self): + """ + Return a list of the types of rankings in the profile. + """ + + unique_rankings = [] + for r in self._rankings: + if r not in unique_rankings: + unique_rankings.append(r) + return unique_rankings + + @property + def rankings_counts(self): + """ + Returns the rankings and the counts of each ranking. + """ + + return self._rankings, self.rcounts + + @property + def rankings_as_dicts_counts(self): + """ + Returns the rankings represented as dictionaries and the counts of each ranking. + """ + + return [r.rmap for r in self._rankings], self.rcounts + + def support(self, c1, c2): + """ + Returns the support of candidate ``c1`` over candidate ``c2``, where the support is the number of voters that rank ``c1`` strictly above ``c2``. + """ + + return self._supports[c1][c2] + + def margin(self, c1, c2): + """ + Returns the margin of candidate ``c1`` over candidate ``c2``, where the margin is the number of voters that rank ``c1`` strictly above ``c2`` minus the number of voters that rank ``c2`` strictly above ``c1``. + """ + + return self._supports[c1][c2] - self._supports[c2][c1] + + @property + def margin_matrix(self): + """Returns the margin matrix of the profile, where the entry at row ``i`` and column ``j`` is the margin of candidate ``i`` over candidate ``j``.""" + + return np.array( + [ + [ + self.margin( + self.cindex_to_cand(c1_idx), self.cindex_to_cand(c2_idx) + ) + for c2_idx in self.cindices + ] + for c1_idx in self.cindices + ] + ) + + def is_tied(self, c1, c2): + """Returns True if ``c1`` and ``c2`` are tied (i.e., the margin of ``c1`` over ``c2`` is 0).""" + + return self.margin(c1, c2) == 0 + + def dominators(self, cand, curr_cands=None): + """ + Returns the list of candidates that are majority preferred to ``cand`` in the profile restricted to the candidates in ``curr_cands``. + """ + candidates = self.candidates if curr_cands is None else curr_cands + + return [c for c in candidates if self.majority_prefers(c, cand)] + + def dominates(self, cand, curr_cands=None): + """ + Returns the list of candidates that ``cand`` is majority preferred to in the majority graph restricted to ``curr_cands``. + """ + candidates = self.candidates if curr_cands is None else curr_cands + + return [c for c in candidates if self.majority_prefers(cand, c)] + + def ratio(self, c1, c2): + """ + Returns the ratio of the support of ``c1`` over ``c2`` to the support ``c2`` over ``c1``. + """ + + if self.support(c1, c2) > 0 and self.support(c2, c1) > 0: + return self.support(c1, c2) / self.support(c2, c1) + elif self.support(c1, c2) > 0 and self.support(c2, c1) == 0: + return float(self.num_voters + self.support(c1, c2)) + elif self.support(c1, c2) == 0 and self.support(c2, c1) > 0: + return 1 / (self.num_voters + self.support(c2, c1)) + elif self.support(c1, c2) == 0 and self.support(c2, c1) == 0: + return 1 + + def majority_prefers(self, c1, c2): + """Returns True if ``c1`` is majority preferred to ``c2``.""" + + return self.margin(c1, c2) > 0 + + def strength_matrix(self, curr_cands=None, strength_function=None): + """ + Return the strength matrix of the profile. The strength matrix is a matrix where the entry in row :math:`i` and column :math:`j` is the number of voters that rank the candidate with index :math:`i` over the candidate with index :math:`j`. If ``curr_cands`` is provided, then the strength matrix is restricted to the candidates in ``curr_cands``. If ``strength_function`` is provided, then the strength matrix is computed using the strength function. + """ + + if curr_cands is not None: + cindices = [cidx for cidx, _ in enumerate(curr_cands)] + cindex_to_cand = lambda cidx: curr_cands[cidx] + cand_to_cindex = lambda c: cindices[curr_cands.index(c)] + strength_function = ( + self.margin if strength_function is None else strength_function + ) + strength_matrix = np.array( + [ + [ + strength_function( + cindex_to_cand(a_idx), cindex_to_cand(b_idx) + ) + for b_idx in cindices + ] + for a_idx in cindices + ] + ) + else: + cindices = self.cindices + cindex_to_cand = self.cindex_to_cand + cand_to_cindex = self.cand_to_cindex + strength_matrix = ( + np.array(self.margin_matrix) + if strength_function is None + else np.array( + [ + [ + strength_function( + cindex_to_cand(a_idx), cindex_to_cand(b_idx) + ) + for b_idx in cindices + ] + for a_idx in cindices + ] + ) + ) + + return strength_matrix, cand_to_cindex + + def condorcet_winner(self, curr_cands=None): + """Returns the Condorcet winner in the profile restricted to ``curr_cands`` if one exists, otherwise return None. + + The **Condorcet winner** is the candidate that is majority preferred to every other candidate. + """ + curr_cands = curr_cands if curr_cands is not None else self.candidates + + cw = None + for c in curr_cands: + if all( + [self.majority_prefers(c, c1) for c1 in curr_cands if c1 != c] + ): + cw = c + break + return cw + + def condorcet_loser(self, curr_cands=None): + """Returns the Condorcet loser in the profile restricted to ``curr_cands`` if one exists, otherwise return None. + + A candidate :math:`c` is a **Condorcet loser** if every other candidate is majority preferred to :math:`c`. + """ + + curr_cands = curr_cands if curr_cands is not None else self.candidates + + cl = None + for c1 in curr_cands: + if all( + [self.majority_prefers(c2, c1) for c2 in curr_cands if c1 != c2] + ): + cl = c1 + break # if a Condorcet loser exists, then it is unique + return cl + + def weak_condorcet_winner(self, curr_cands=None): + """Returns a list of the weak Condorcet winners in the profile restricted to ``curr_cands`` (which may be empty). + + A candidate :math:`c` is a **weak Condorcet winner** if there is no other candidate that is majority preferred to :math:`c`. + + .. note:: While the Condorcet winner is unique if it exists, there may be multiple weak Condorcet winners. + """ + + curr_cands = curr_cands if curr_cands is not None else self.candidates + + weak_cw = list() + for c1 in curr_cands: + if not any( + [ + self.majority_prefers(c2, c1) + for c2 in curr_cands + if c1 != c2 + ] + ): + weak_cw.append(c1) + return sorted(weak_cw) if len(weak_cw) > 0 else None + + def copeland_scores(self, curr_cands=None, scores=(1, 0, -1)): + """The Copeland scores in the profile restricted to the candidates in ``curr_cands``. + + The **Copeland score** for candidate :math:`c` is calculated as follows: :math:`c` receives ``scores[0]`` points for every candidate that :math:`c` is majority preferred to, ``scores[1]`` points for every candidate that is tied with :math:`c`, and ``scores[2]`` points for every candidate that is majority preferred to :math:`c`. The default ``scores`` is ``(1, 0, -1)``. + + :param curr_cands: restrict attention to candidates in this list. Defaults to all candidates in the profile if not provided. + :type curr_cands: list[int], optional + :param scores: the scores used to calculate the Copeland score of a candidate :math:`c`: ``scores[0]`` is for the candidates that :math:`c` is majority preferred to; ``scores[1]`` is the number of candidates tied with :math:`c`; and ``scores[2]`` is the number of candidate majority preferred to :math:`c`. The default value is ``scores = (1, 0, -1)`` + :type scores: tuple[int], optional + :returns: a dictionary associating each candidate in ``curr_cands`` with its Copeland score. + + """ + + wscore, tscore, lscore = scores + candidates = self.candidates if curr_cands is None else curr_cands + c_scores = {c: 0.0 for c in candidates} + for c1 in candidates: + for c2 in candidates: + if self.majority_prefers(c1, c2): + c_scores[c1] += wscore + elif self.majority_prefers(c2, c1): + c_scores[c1] += lscore + elif c1 != c2: + c_scores[c1] += tscore + return c_scores + + def strict_maj_size(self): + """Returns the strict majority of the number of voters.""" + + return int( + self.num_voters / 2 + 1 + if self.num_voters % 2 == 0 + else int(ceil(float(self.num_voters) / 2)) + ) + + def plurality_scores(self, curr_cands=None): + """ + Return the Plurality Scores of the candidates, assuming that each voter ranks a single candidate in first place. + """ + + if curr_cands is None: + curr_cands = self.candidates + + if any(len(r.first(cs=curr_cands)) > 1 for r in self._rankings): + raise ValueError( + "Cannot find the plurality scores unless all voters rank a unique candidate in first place." + ) + + rankings, rcounts = self.rankings_counts + + plurality_scores = {cand: 0 for cand in curr_cands} + + for ranking, count in zip(rankings, rcounts): + first_place_candidates = ranking.first(cs=curr_cands) + if len(first_place_candidates) == 1: + cand = first_place_candidates[0] + plurality_scores[cand] += count + + return plurality_scores + + def plurality_scores_ignoring_overvotes(self, curr_cands=None): + """ + Return the Plurality scores ignoring empty rankings and overvotes. + """ + + curr_cands = curr_cands if curr_cands is not None else self.candidates + + rankings, rcounts = self.rankings_counts + + return { + cand: sum( + [ + c + for r, c in zip(rankings, rcounts) + if len(r.cands) > 0 and [cand] == r.first(cs=curr_cands) + ] + ) + for cand in curr_cands + } + + def borda_scores(self, curr_cands=None, borda_score_fnc=symmetric_borda_scores): + + curr_cands = self.candidates if curr_cands is None else curr_cands + restricted_prof = self.to_ranking_profile().remove_candidates( + [c for c in self.candidates if c not in curr_cands] + ) + return borda_score_fnc(restricted_prof) + + def tops_scores(self, curr_cands=None, score_type="approval"): + """ + Return the tops scores of the candidates. + """ + + if curr_cands is None: + curr_cands = self.candidates + + rankings, rcounts = self.rankings_counts + + if score_type not in {"approval", "split"}: + raise ValueError( + "Invalid score_type specified. Use 'approval' or 'split'." + ) + + tops_scores = {cand: 0 for cand in curr_cands} + + if score_type == "approval": + for ranking, count in zip(rankings, rcounts): + for cand in curr_cands: + if cand in ranking.first(cs=curr_cands): + tops_scores[cand] += count + + elif score_type == "split": + for ranking, count in zip(rankings, rcounts): + for cand in curr_cands: + if cand in ranking.first(cs=curr_cands): + tops_scores[cand] += ( + count * 1 / len(ranking.first(cs=curr_cands)) + ) + + return tops_scores + + def remove_empty_rankings(self): + """ + Remove the empty rankings from the profile. + """ + new_rankings = list() + new_grades = list() + new_rcounts = list() + + for r, g, c in zip(self._rankings, self._grades, self.rcounts): + + if len(r.cands) != 0: + new_rankings.append(r) + new_grades.append(g) + new_rcounts.append(c) + + self._rankings = new_rankings + self._grades = new_grades + self.rcounts = new_rcounts + + # update the number of voters + self.num_voters = np.sum(self.rcounts) + + if self.using_extended_strict_preference: + self.use_extended_strict_preference() + else: + self.use_strict_preference() + + @property + def is_truncated_linear(self): + """ + Return True if the profile only contains (truncated) linear orders. + """ + return all( + [ + r.is_truncated_linear(len(self.candidates)) + or r.is_linear(len(self.candidates)) + for r in self._rankings + ] + ) + + def num_bullet_votes(self): + """ + Return the number of bullet votes in the profile. + """ + + return sum( + [c for r, c in zip(*self.rankings_counts) if r.is_bullet_vote()] + ) + + def num_empty_rankings(self): + """ + Return the number of empty rankings in the profile. + """ + + return sum( + [c for r, c in zip(*self.rankings_counts) if r.is_empty()] + ) + + def num_linear_orders(self): + """ + Return the number of linear orders in the profile. + """ + + return sum( + [ + c + for r, c in zip(*self.rankings_counts) + if r.is_linear(len(self.candidates)) + ] + ) + + def num_truncated_linear_orders(self): + """ + Return the number of truncated linear orders in the profile. + """ + + return sum( + [ + c + for r, c in zip(*self.rankings_counts) + if r.is_truncated_linear(len(self.candidates)) + ] + ) + + def num_rankings_with_ties(self): + """ + Return the number of rankings with ties in the profile. + """ + + return sum( + [c for r, c in zip(*self.rankings_counts) if r.has_tie()] + ) + + def num_ranked_all_candidates(self): + """ + Return the number of rankings that rank all candidates in the profile. + """ + + return sum( + [ + c + for r, c in zip(*self.rankings_counts) + if all([r.is_ranked(cand) for cand in self.candidates]) + ] + ) + + def num_ranking_each_candidate(self): + """Return a dictionary mapping each candidate to the number of voters that rank the candidate.""" + + return { + cand: sum( + [ + c + for r, c in zip(*self.rankings_counts) + if r.is_ranked(cand) + ] + ) + for cand in self.candidates + } + + def margin_graph(self): + """Returns the margin graph of the profile. See :class:`.MarginGraph`.""" + + return MarginGraph.from_profile(self) + + def support_graph(self): + """Returns the support graph of the profile. See :class:`.SupportGraph`.""" + + return SupportGraph.from_profile(self) + + def majority_graph(self): + """Returns the majority graph of the profile. See :class:`.MajorityGraph`.""" + + return MajorityGraph.from_profile(self) + + def cycles(self): + """Return a list of the cycles in the profile.""" + + return self.margin_graph().cycles() + + def is_uniquely_weighted(self): + """Returns True if the profile is uniquely weighted. + + A profile is **uniquely weighted** when there are no 0 margins and all the margins between any two candidates are unique. + """ + + return MarginGraph.from_profile(self).is_uniquely_weighted() + + def remove_candidates(self, cands_to_ignore): + """Remove all candidates from ``cands_to_ignore`` from the profile. + + :param cands_to_ignore: list of candidates to remove from the profile + :type cands_to_ignore: list[int] + :returns: a profile with candidates from ``cands_to_ignore`` removed. + """ + + updated_rankings = [ + {c: r for c, r in rank.rmap.items() if c not in cands_to_ignore} + for rank in self._rankings + ] + updated_grade_maps = [ + {c: g.val(c) for c in g.graded_candidates if c not in cands_to_ignore} + for g in self._grades + ] + new_candidates = [ + c for c in self.candidates if c not in cands_to_ignore + ] + + restricted_prof = PrefGradeProfile( + updated_rankings, + updated_grade_maps, + self.grades, + rcounts=self.rcounts, + candidates=new_candidates, + cmap=self.cmap, + gmap=self.gmap, + grade_order=self.grade_order if self.use_grade_order else None, + ) + + if self.using_extended_strict_preference: + restricted_prof.use_extended_strict_preference() + + return restricted_prof + + # ========================================================================= + # Grade-related methods (from GradeProfile) + # ========================================================================= + + @property + def grades_counts(self): + """Returns the grades and the counts of each grade.""" + + return self._grades, self.rcounts + + @property + def grade_functions(self): + """Return all of the grade functions in the profile.""" + + gs = list() + for g, c in zip(self._grades, self.rcounts): + gs += [g] * c + return gs + + def has_grade(self, c): + """Return True if ``c`` is assigned a grade by at least one voter.""" + + return any([g.has_grade(c) for g in self._grades]) + + def grade_margin(self, c1, c2, use_extended=False): + """ + Return the grade-based margin of ``c1`` over ``c2``. + """ + if use_extended: + return np.sum( + [ + num + for g, num in zip(*self.grades_counts) + if g.extended_strict_pref(c1, c2) + ] + ) - np.sum( + [ + num + for g, num in zip(*self.grades_counts) + if g.extended_strict_pref(c2, c1) + ] + ) + else: + return np.sum( + [ + num + for g, num in zip(*self.grades_counts) + if g.strict_pref(c1, c2) + ] + ) - np.sum( + [ + num + for g, num in zip(*self.grades_counts) + if g.strict_pref(c2, c1) + ] + ) + + def proportion(self, cand, grade): + """ + Return the proportion of voters that assign ``cand`` the grade ``grade``. + + Note that ``grade`` could be None, in which case the proportion of voters that do not assign ``cand`` a grade is returned. + """ + return ( + np.sum( + [ + num + for g, num in zip(*self.grades_counts) + if g(cand) == grade + ] + ) + / self.num_voters + ) + + def sum(self, c): + """Return the sum of the grades of ``c``. If ``c`` is not assigned a grade by any voter, return None.""" + + assert self.can_sum_grades, "The grades in the profile cannot be summed." + + return ( + np.sum( + [ + g(c) * num + for g, num in zip(*self.grades_counts) + if g.has_grade(c) + ] + ) + if self.has_grade(c) + else None + ) + + def avg(self, c): + """Return the average of the grades of ``c``. If ``c`` is not assigned a grade by any voter, return None.""" + + assert self.can_sum_grades, "The grades in the profile cannot be summed." + + return ( + np.mean([g(c) for g in self.grade_functions if g.has_grade(c)]) + if self.has_grade(c) + else None + ) + + def max(self, c): + """Return the maximum of the grade of ``c``. If ``c`` is not assigned a grade by any voter, return None.""" + + grades_for_c = ( + [-1 * self.grade_order.index(g(c)) for g in self._grades if g.has_grade(c)] + if self.use_grade_order + else [g(c) for g in self._grades if g.has_grade(c)] + ) + + return ( + ( + self.grade_order[-1 * max(grades_for_c)] + if self.use_grade_order + else max(grades_for_c) + ) + if self.has_grade(c) + else None + ) + + def min(self, c): + """Return the minimum of the grades of ``c``. If ``c`` is not assigned a grade by any voter, return None.""" + + grades_for_c = ( + [-1 * self.grade_order.index(g(c)) for g in self._grades if g.has_grade(c)] + if self.use_grade_order + else [g(c) for g in self._grades if g.has_grade(c)] + ) + + return ( + ( + self.grade_order[-1 * min(grades_for_c)] + if self.use_grade_order + else min(grades_for_c) + ) + if self.has_grade(c) + else None + ) + + def median(self, c, use_lower=True, use_average=False): + """Return the median of the grades of ``c``. If ``c`` is not assigned a grade by any voter, return None.""" + + grades_for_c = ( + [ + -1 * self.grade_order.index(g(c)) + for g in self.grade_functions + if g.has_grade(c) + ] + if self.use_grade_order + else [g(c) for g in self.grade_functions if g.has_grade(c)] + ) + + sorted_grades_for_c = sorted(grades_for_c) + num_grades = len(sorted_grades_for_c) + median_idx = num_grades // 2 + if num_grades % 2 == 0: + median_grades = sorted_grades_for_c[median_idx - 1 : median_idx + 1] + else: + median_grades = [sorted_grades_for_c[median_idx]] + + if use_lower: + return ( + ( + self.grade_order[-1 * median_grades[0]] + if self.use_grade_order + else median_grades[0] + ) + if self.has_grade(c) + else None + ) + elif use_average: + return ( + ( + np.average( + [self.grade_order[-1 * m] for m in median_grades] + ) + if self.use_grade_order + else np.average(median_grades) + ) + if self.has_grade(c) + else None + ) + else: + return ( + ( + [self.grade_order[-1 * m] for m in median_grades] + if self.use_grade_order + else median_grades + ) + if self.has_grade(c) + else None + ) + + def sum_grade_function(self): + """Return the sum grade function of the profile.""" + + assert self.can_sum_grades, "The grades in the profile cannot be summed." + + return _Mapping( + {c: self.sum(c) for c in self.candidates if self.has_grade(c)}, + domain=self.candidates, + item_map=self.cmap, + compare_function=self.compare_function, + ) + + def avg_grade_function(self): + """Return the average grade function of the profile.""" + + assert self.can_sum_grades, "The grades in the profile cannot be summed." + + return _Mapping( + {c: self.avg(c) for c in self.candidates if self.has_grade(c)}, + domain=self.candidates, + item_map=self.cmap, + compare_function=self.compare_function, + ) + + def proportion_with_grade(self, cand, grade): + """ + Return the proportion of voters that assign a ``grade`` to ``cand``. + """ + + assert ( + cand in self.candidates + ), f"{cand} is not a candidate in the profile." + assert grade in self.grades, f"{grade} is not a grade in the profile." + + num_with_higher_grade = 0 + for g, num in zip(*self.grades_counts): + if self.compare_function(g(cand), grade) == 0: + num_with_higher_grade += num + return num_with_higher_grade / self.num_voters + + def proportion_with_higher_grade(self, cand, grade): + """ + Return the proportion of voters that assign a strictly higher grade to ``cand`` than ``grade``. + """ + + assert ( + cand in self.candidates + ), f"{cand} is not a candidate in the profile." + assert grade in self.grades, f"{grade} is not a grade in the profile." + + num_with_higher_grade = 0 + for g, num in zip(*self.grades_counts): + if self.compare_function(g(cand), grade) == 1: + num_with_higher_grade += num + return num_with_higher_grade / self.num_voters + + def proportion_with_lower_grade(self, cand, grade): + """ + Return the proportion of voters that assign a strictly lower grade to ``cand`` than ``grade``. + """ + + assert ( + cand in self.candidates + ), f"{cand} is not a candidate in the profile." + assert grade in self.grades, f"{grade} is not a grade in the profile." + + num_with_lower_grade = 0 + for g, num in zip(*self.grades_counts): + if self.compare_function(g(cand), grade) == -1: + num_with_lower_grade += num + return num_with_lower_grade / self.num_voters + + def approval_scores(self): + """ + Return a dictionary representing the approval scores of the candidates in the profile. + """ + + assert ( + self.can_sum_grades + ), "The grades in the profile cannot be summed." + assert sorted(self.grades) == [ + 0, + 1, + ], "The grades in the profile must be 0 and 1." + + return {c: self.sum(c) for c in self.candidates} + + # ========================================================================= + # Conversion methods + # ========================================================================= + + def to_ranking_profile(self): + """Return a :class:`ProfileWithTies` corresponding to the ranking data in this profile.""" + + return ProfileWithTies( + self._rankings, + rcounts=self.rcounts, + candidates=self.candidates, + cmap=self.cmap, + ) + + def to_grade_profile(self): + """Return a :class:`GradeProfile` corresponding to the grade data in this profile.""" + + return GradeProfile( + [g.as_dict() for g in self._grades], + self.grades, + gcounts=self.rcounts, + candidates=self.candidates, + cmap=self.cmap, + gmap=self.gmap, + grade_order=self.grade_order if self.use_grade_order else None, + ) + + # ========================================================================= + # Display and report methods + # ========================================================================= + + def report(self): + """ + Display a report of the types of rankings in the profile. + """ + num_ties = 0 + num_empty_rankings = 0 + num_with_skipped_ranks = 0 + num_trucated_linear_orders = 0 + num_linear_orders = 0 + + rankings, rcounts = self.rankings_counts + + for r, c in zip(rankings, rcounts): + + if r.has_tie(): + num_ties += c + if r.is_empty(): + num_empty_rankings += c + elif r.is_linear(len(self.candidates)): + num_linear_orders += c + elif r.is_truncated_linear(len(self.candidates)): + num_trucated_linear_orders += c + + if r.has_skipped_rank(): + num_with_skipped_ranks += c + print( + f"""There are {len(self.candidates)} candidates and {str(sum(rcounts))} {'ranking: ' if sum(rcounts) == 1 else 'rankings: '} + The number of empty rankings: {num_empty_rankings} + The number of rankings with ties: {num_ties} + The number of linear orders: {num_linear_orders} + The number of truncated linear orders: {num_trucated_linear_orders} + +The number of rankings with skipped ranks: {num_with_skipped_ranks} + + """ + ) + + def display_rankings(self): + """ + Display a list of the rankings in the profile. + """ + rankings, rcounts = self.rankings_counts + + rs = dict() + for r, c in zip(rankings, rcounts): + if str(r) in rs.keys(): + rs[str(r)] += c + else: + rs[str(r)] = c + + for r, c in rs.items(): + print(f"{r}: {c}") + + def display( + self, + cmap=None, + style="pretty", + curr_cands=None, + show_grades=True, + show_totals=False, + ): + """Display the profile as an ascii table (using tabulate). + + The rankings are displayed as in :class:`ProfileWithTies`. If ``show_grades`` + is True, the grade assignment for each voter type is also displayed below + the ranking table. + + :param cmap: the candidate map (overrides the cmap associated with this profile) + :type cmap: dict[int,str], optional + :param style: the table style for tabulate (default ``"pretty"``) + :type style: str, optional + :param curr_cands: list of candidates to display + :type curr_cands: list[int], optional + :param show_grades: whether to also display the grade assignments (default True) + :type show_grades: bool, optional + :param show_totals: whether to display grade totals (sum, median) when showing grades (default False) + :type show_totals: bool, optional + :rtype: None + """ + + _rankings = copy.deepcopy(self._rankings) + _rankings = [r.normalize_ranks() or r for r in _rankings] + curr_cands = curr_cands if curr_cands is not None else self.candidates + cmap = cmap if cmap is not None else self.cmap + + # Display rankings table + existing_ranks = ( + list( + range( + min(min(r.ranks) for r in _rankings), + max(max(r.ranks) for r in _rankings) + 1, + ) + ) + if len(_rankings) > 0 + else [] + ) + print("Rankings:") + print( + tabulate( + [ + [ + " ".join( + [ + str(cmap[c]) + for c in r.cands_at_rank(rank) + if c in curr_cands + ] + ) + for r in _rankings + ] + for rank in existing_ranks + ], + self.rcounts, + tablefmt=style, + ) + ) + + # Display grades table + if show_grades: + print("\nGrades:") + if show_totals: + sum_grade_fnc = self.sum_grade_function() + headers = [""] + self.rcounts + ["Sum", "Median"] + tbl = [ + [cmap[c]] + + [ + self.gmap[g(c)] if g.has_grade(c) else "" + for g in self._grades + ] + + [sum_grade_fnc(c), self.median(c)] + for c in curr_cands + ] + else: + headers = [""] + self.rcounts + tbl = [ + [cmap[c]] + + [ + self.gmap[g(c)] if g.has_grade(c) else "" + for g in self._grades + ] + for c in curr_cands + ] + print(tabulate(tbl, headers=headers)) + + def display_margin_graph(self, cmap=None, curr_cands=None): + """ + Display the margin graph of the profile (restricted to ``curr_cands``) using the ``cmap``. See :class:`.MarginGraph`. + """ + + cmap = cmap if cmap is not None else self.cmap + MarginGraph.from_profile(self, cmap=cmap).display( + curr_cands=curr_cands + ) + + def display_support_graph(self, cmap=None, curr_cands=None): + """ + Display the support graph of the profile (restricted to ``curr_cands``) using the ``cmap``. See :class:`.SupportGraph`. + """ + + cmap = cmap if cmap is not None else self.cmap + SupportGraph.from_profile(self, cmap=cmap).display( + curr_cands=curr_cands + ) + + def visualize_grades(self): + """Visualize the grade assignments as a stacked bar plot.""" + data_for_df = {"Candidate": [], "Grade": [], "Proportion": []} + + for c in self.candidates: + for g in [None] + self.grades: + data_for_df["Candidate"].append(self.cmap[c]) + data_for_df["Grade"].append( + self.gmap[g] if g is not None else "None" + ) + data_for_df["Proportion"].append(self.proportion(c, g)) + df = pd.DataFrame(data_for_df) + + df_pivot = df.pivot( + index="Candidate", columns="Grade", values="Proportion" + ) + + ax = df_pivot.plot(kind="barh", stacked=True, figsize=(10, 6), rot=0) + ax.set_ylabel("Candidate") + ax.set_xlabel("Proportion") + for spine in ["top", "right"]: + ax.spines[spine].set_visible(False) + + ax.legend( + loc="upper center", + bbox_to_anchor=(0.5, 1.1), + ncol=len(self.grades) + 1, + title="Grades", + ) + + plt.show() + + # ========================================================================= + # Description / code-generation methods + # ========================================================================= + + def description(self): + """ + Return the Python code needed to create the profile. + """ + return ( + f"PrefGradeProfile(" + f"{[r.rmap for r in self._rankings]}, " + f"{[g.as_dict() for g in self._grades]}, " + f"{self.grades}, " + f"rcounts={[int(c) for c in self.rcounts]}, " + f"cmap={self.cmap})" + ) + + # ========================================================================= + # Anonymize + # ========================================================================= + + def anonymize(self): + """ + Return a profile which is the anonymized version of this profile. + """ + + anon_rankings = list() + anon_grades = list() + rcounts = list() + for r, g in zip(self.rankings, [gf for gf in self.grade_functions]): + found_it = False + for _ridx, (_r, _g) in enumerate( + zip(anon_rankings, anon_grades) + ): + if r == _r and g.as_dict() == _g.as_dict(): + rcounts[_ridx] += 1 + found_it = True + break + if not found_it: + anon_rankings.append(r) + anon_grades.append(g) + rcounts.append(1) + + prof = PrefGradeProfile( + anon_rankings, + [g.as_dict() for g in anon_grades], + self.grades, + rcounts=rcounts, + cmap=self.cmap, + gmap=self.gmap, + grade_order=self.grade_order if self.use_grade_order else None, + ) + + if self.using_extended_strict_preference: + prof.use_extended_strict_preference() + + return prof + + # ========================================================================= + # Dunder methods + # ========================================================================= + + def __eq__(self, other_prof): + """ + Returns true if two profiles are equal. Two profiles are equal if they have the same rankings and grade assignments. Note that we ignore the cmaps. + """ + + rankings = self.rankings + grades = self.grade_functions + other_rankings = other_prof.rankings[:] + other_grades = other_prof.grade_functions[:] + for r1, g1 in zip(rankings, grades): + for i, (r2, g2) in enumerate( + zip(other_rankings, other_grades) + ): + if r1 == r2 and g1.as_dict() == g2.as_dict(): + del other_rankings[i] + del other_grades[i] + break + else: + return False + + return not other_rankings + + def __add__(self, other_prof): + """ + Returns the sum of two profiles. The sum of two profiles is the profile that contains all the rankings and grade assignments from the first in addition to all the rankings and grade assignments from the second profile. + + Note: the cmaps of the profiles are ignored. + """ + + return PrefGradeProfile( + self._rankings + other_prof._rankings, + [g.as_dict() for g in self._grades] + + [g.as_dict() for g in other_prof._grades], + self.grades, + rcounts=self.rcounts + other_prof.rcounts, + candidates=sorted( + list(set(self.candidates + other_prof.candidates)) + ), + ) diff --git a/pref_voting/variable_voter_axioms.py b/pref_voting/variable_voter_axioms.py index 12124b0d..0cb5d9e1 100644 --- a/pref_voting/variable_voter_axioms.py +++ b/pref_voting/variable_voter_axioms.py @@ -1,7 +1,9 @@ """ File: variable_voter_axioms.py Author: Wes Holliday (wesholliday@berkeley.edu) and Eric Pacuit (epacuit@umd.edu) + Date: March 16, 2024 + Updated: February 4, 2026 Variable voter axioms """ @@ -13,6 +15,7 @@ from pref_voting.helper import weak_orders from pref_voting.rankings import Ranking from pref_voting.generate_profiles import strict_weak_orders +from functools import partial def divide_electorate(prof): """Given a Profile or ProfileWithTies object, yield all possible ways to divide the electorate into two nonempty electorates.""" @@ -3347,40 +3350,78 @@ def find_all_participation_violations(prof, vm, verbose = False, violation_type find_all_violations = find_all_participation_violations, ) -def has_single_voter_resolvability_violation(prof, vm, verbose=False): +def has_single_voter_resolvability_violation(prof, vm, verbose=False, allow_truncation=False, allow_ties=False): """ Single-Voter Resolvability requires that for any profile with multiple winners, each of the tied winners can be made the unique winner by adding a ballot (cf. Weak Single-Voter Resolvability, which only requires that at least one of the tied winners can be made the unique winner by adding a ballot). - If prof is a Profile, returns True if there are multiple vm winners in prof and for one such winner A, there is no linear ballot that can be added to prof to make A the unique winner. + If prof is a Profile and allow_ties is False and allow_truncation is False, returns True if there are multiple vm winners in prof and for one such winner A, there is no linear ballot that can be added to prof to make A the unique winner. + + If prof is a Profile and allow_ties is True, returns True if there are multiple vm winners in prof and for one such winner A, there is no Ranking (allowing ties) that can be added to prof to make A the unique winner. Note: allow_ties takes precedence over allow_truncation. - If prof is a ProfileWithTies, returns True if there are multiple vm winners in prof and for one such winner A, there is no Ranking (allowing ties) that can be added to prof to make A the unique winner. + If prof is a Profile and allow_truncation is True (and allow_ties is False), returns True if there are multiple vm winners in prof and for one such winner A, there is no truncated linear ballot that can be added to prof to make A the unique winner. + + If prof is a ProfileWithTies, returns True if there are multiple vm winners in prof and for one such winner A, there is no Ranking (allowing ties) that can be added to prof to make A the unique winner. Args: prof: a Profile or ProfileWithTies object. vm (VotingMethod): A voting method to test. verbose (bool, default=False): If a violation is found, display the violation. + allow_truncation (bool, default=False): If True and prof is a Profile, allow truncated linear ballots. + allow_ties (bool, default=False): If True and prof is a Profile, allow Rankings with ties. Takes precedence over allow_truncation. Returns: Result of the test (bool): Returns True if there is a violation and False otherwise. """ - winners = vm(prof) - if isinstance(prof,ProfileWithTies): prof.use_extended_strict_preference() + winners = vm(prof) + if len(winners) > 1: for winner in winners: found_voter_to_add = False if isinstance(prof,Profile): - for r in permutations(prof.candidates): - new_prof = Profile(prof.rankings + [r]) - if vm(new_prof) == [winner]: - found_voter_to_add = True - break - + if allow_ties: + # Iterate over all weak orders (rankings possibly with ties) + for _r in weak_orders(prof.candidates): + r = Ranking(_r) + new_prof = ProfileWithTies( + [Ranking({c: rank + 1 for rank, c in enumerate(old_r)}) for old_r in prof.rankings] + [r], + candidates=prof.candidates + ) + new_prof.use_extended_strict_preference() + if vm(new_prof) == [winner]: + found_voter_to_add = True + break + elif allow_truncation: + # Iterate over all truncated linear orders + for subset_size in range(1, len(prof.candidates) + 1): + if found_voter_to_add: + break + for subset in combinations(prof.candidates, subset_size): + if found_voter_to_add: + break + for r in permutations(subset): + rmap = {c: rank + 1 for rank, c in enumerate(r)} + ranking = Ranking(rmap) + new_prof = ProfileWithTies( + [Ranking({c: rank + 1 for rank, c in enumerate(old_r)}) for old_r in prof.rankings] + [ranking], + candidates=prof.candidates + ) + new_prof.use_extended_strict_preference() + if vm(new_prof) == [winner]: + found_voter_to_add = True + break + else: + for r in permutations(prof.candidates): + new_prof = Profile(prof.rankings + [r]) + if vm(new_prof) == [winner]: + found_voter_to_add = True + break + if isinstance(prof,ProfileWithTies): for _r in weak_orders(prof.candidates): r = Ranking(_r) @@ -3389,13 +3430,18 @@ def has_single_voter_resolvability_violation(prof, vm, verbose=False): if vm(new_prof) == [winner]: found_voter_to_add = True break - + if not found_voter_to_add: if verbose: prof = prof.anonymize() if isinstance(prof,Profile): - print(f"Violation of Single-Voter Resolvability for {vm.name}: cannot make {winner} the unique winner by adding a linear ballot.") + if allow_ties: + print(f"Violation of Single-Voter Resolvability for {vm.name}: cannot make {winner} the unique winner by adding a Ranking (possibly with ties).") + elif allow_truncation: + print(f"Violation of Single-Voter Resolvability for {vm.name}: cannot make {winner} the unique winner by adding a (possibly truncated) linear ballot.") + else: + print(f"Violation of Single-Voter Resolvability for {vm.name}: cannot make {winner} the unique winner by adding a linear ballot.") if isinstance(prof,ProfileWithTies): print(f"Violation of Single-Voter Resolvability for {vm.name}: cannot make {winner} the unique winner by adding a Ranking.") print("") @@ -3408,14 +3454,18 @@ def has_single_voter_resolvability_violation(prof, vm, verbose=False): print("") return True - + return False - + return False -def find_all_single_voter_resolvability_violations(prof, vm, verbose=False): +def find_all_single_voter_resolvability_violations(prof, vm, verbose=False, allow_truncation=False, allow_ties=False): """ - If prof is a Profile, returns a list of candidates who win in prof but who cannot be made the unique winner by adding a linear ballot. + If prof is a Profile and allow_ties is False and allow_truncation is False, returns a list of candidates who win in prof but who cannot be made the unique winner by adding a linear ballot. + + If prof is a Profile and allow_ties is True, returns a list of candidates who win in prof but who cannot be made the unique winner by adding a Ranking (allowing ties). Note: allow_ties takes precedence over allow_truncation. + + If prof is a Profile and allow_truncation is True (and allow_ties is False), returns a list of candidates who win in prof but who cannot be made the unique winner by adding a truncated linear ballot. If prof is a ProfileWithTies, returns a list of candidates who win in prof but who cannot be made the unique winner by adding a Ranking (allowing ties). @@ -3423,16 +3473,18 @@ def find_all_single_voter_resolvability_violations(prof, vm, verbose=False): prof: a Profile or ProfileWithTies object. vm (VotingMethod): A voting method to test. verbose (bool, default=False): If a violation is found, display the violation. + allow_truncation (bool, default=False): If True and prof is a Profile, allow truncated linear ballots. + allow_ties (bool, default=False): If True and prof is a Profile, allow Rankings with ties. Takes precedence over allow_truncation. Returns: A List of candidates who win in the given profile but who cannot be made the unique winner by adding a ballot. """ - winners = vm(prof) - if isinstance(prof,ProfileWithTies): prof.use_extended_strict_preference() + winners = vm(prof) + violations = list() if len(winners) > 1: @@ -3441,12 +3493,44 @@ def find_all_single_voter_resolvability_violations(prof, vm, verbose=False): found_voter_to_add = False if isinstance(prof,Profile): - for r in permutations(prof.candidates): - new_prof = Profile(prof.rankings + [r]) - if vm(new_prof) == [winner]: - found_voter_to_add = True - break - + if allow_ties: + # Iterate over all weak orders (rankings possibly with ties) + for _r in weak_orders(prof.candidates): + r = Ranking(_r) + new_prof = ProfileWithTies( + [Ranking({c: rank + 1 for rank, c in enumerate(old_r)}) for old_r in prof.rankings] + [r], + candidates=prof.candidates + ) + new_prof.use_extended_strict_preference() + if vm(new_prof) == [winner]: + found_voter_to_add = True + break + elif allow_truncation: + # Iterate over all truncated linear orders + for subset_size in range(1, len(prof.candidates) + 1): + if found_voter_to_add: + break + for subset in combinations(prof.candidates, subset_size): + if found_voter_to_add: + break + for r in permutations(subset): + rmap = {c: rank + 1 for rank, c in enumerate(r)} + ranking = Ranking(rmap) + new_prof = ProfileWithTies( + [Ranking({c: rank + 1 for rank, c in enumerate(old_r)}) for old_r in prof.rankings] + [ranking], + candidates=prof.candidates + ) + new_prof.use_extended_strict_preference() + if vm(new_prof) == [winner]: + found_voter_to_add = True + break + else: + for r in permutations(prof.candidates): + new_prof = Profile(prof.rankings + [r]) + if vm(new_prof) == [winner]: + found_voter_to_add = True + break + if isinstance(prof,ProfileWithTies): for _r in weak_orders(prof.candidates): r = Ranking(_r) @@ -3455,13 +3539,18 @@ def find_all_single_voter_resolvability_violations(prof, vm, verbose=False): if vm(new_prof) == [winner]: found_voter_to_add = True break - + if not found_voter_to_add: if verbose: prof = prof.anonymize() if isinstance(prof,Profile): - print(f"Violation of Single-Voter Resolvability for {vm.name}: cannot make {winner} the unique winner by adding a linear ballot.") + if allow_ties: + print(f"Violation of Single-Voter Resolvability for {vm.name}: cannot make {winner} the unique winner by adding a Ranking (possibly with ties).") + elif allow_truncation: + print(f"Violation of Single-Voter Resolvability for {vm.name}: cannot make {winner} the unique winner by adding a (possibly truncated) linear ballot.") + else: + print(f"Violation of Single-Voter Resolvability for {vm.name}: cannot make {winner} the unique winner by adding a linear ballot.") if isinstance(prof,ProfileWithTies): print(f"Violation of Single-Voter Resolvability for {vm.name}: cannot make {winner} the unique winner by adding a Ranking.") print("") @@ -3474,15 +3563,43 @@ def find_all_single_voter_resolvability_violations(prof, vm, verbose=False): print("") violations.append(winner) - + return violations +def has_single_voter_resolvability_violation_with_truncation(prof, vm, verbose=False): + """Thin wrapper for :func:`has_single_voter_resolvability_violation` with ``allow_truncation=True``.""" + return has_single_voter_resolvability_violation(prof, vm, verbose=verbose, allow_truncation=True) + +def find_all_single_voter_resolvability_violations_with_truncation(prof, vm, verbose=False): + """Thin wrapper for :func:`find_all_single_voter_resolvability_violations` with ``allow_truncation=True``.""" + return find_all_single_voter_resolvability_violations(prof, vm, verbose=verbose, allow_truncation=True) + +def has_single_voter_resolvability_violation_with_ties(prof, vm, verbose=False): + """Thin wrapper for :func:`has_single_voter_resolvability_violation` with ``allow_ties=True``.""" + return has_single_voter_resolvability_violation(prof, vm, verbose=verbose, allow_ties=True) + +def find_all_single_voter_resolvability_violations_with_ties(prof, vm, verbose=False): + """Thin wrapper for :func:`find_all_single_voter_resolvability_violations` with ``allow_ties=True``.""" + return find_all_single_voter_resolvability_violations(prof, vm, verbose=verbose, allow_ties=True) + single_voter_resolvability = Axiom( "Single-Voter Resolvability", has_violation = has_single_voter_resolvability_violation, find_all_violations = find_all_single_voter_resolvability_violations, ) +single_voter_resolvability_with_truncation = Axiom( + "Single-Voter Resolvability Allowing Truncation", + has_violation = partial(has_single_voter_resolvability_violation, allow_truncation=True), + find_all_violations = partial(find_all_single_voter_resolvability_violations, allow_truncation=True), +) + +single_voter_resolvability_with_ties = Axiom( + "Single-Voter Resolvability Allowing Ties", + has_violation = partial(has_single_voter_resolvability_violation, allow_ties=True), + find_all_violations = partial(find_all_single_voter_resolvability_violations, allow_ties=True), +) + def has_weak_single_voter_resolvability_violation(prof, vm, verbose=False): """ Weak Single-Voter Resolvability requires that for any profile with multiple winners, at least one of the tied winners can be made the unique winner by adding a ballot (cf. Single-Voter Resolvability, which requires that each of the tied winners can be made the unique winner by adding a ballot). @@ -3500,11 +3617,11 @@ def has_weak_single_voter_resolvability_violation(prof, vm, verbose=False): Result of the test (bool): Returns True if there is a violation and False otherwise. """ - winners = vm(prof) - if isinstance(prof,ProfileWithTies): prof.use_extended_strict_preference() + winners = vm(prof) + if len(winners) > 1: for winner in winners: @@ -3816,7 +3933,9 @@ def find_all_nonlinear_neutral_reversal_violations(prof, vm, verbose=False): truncated_involvement, participation, single_voter_resolvability, + single_voter_resolvability_with_truncation, + single_voter_resolvability_with_ties, neutral_reversal, neutral_indifference, nonlinear_neutral_reversal, -] \ No newline at end of file +] diff --git a/tests/test_iterative_methods.py b/tests/test_iterative_methods.py index 239dd454..d2187e14 100644 --- a/tests/test_iterative_methods.py +++ b/tests/test_iterative_methods.py @@ -218,6 +218,13 @@ def test_instant_runoff_for_truncated_linear_orders(): assert instant_runoff_for_truncated_linear_orders(prof) == [0, 1, 2] +def test_instant_runoff_recursive_tie_breaker_full_tie(): + """Recursive and basic IRV should agree when a tie-breaker resolves full ties.""" + prof = Profile([[0, 1, 2], [1, 2, 0], [2, 0, 1]], [1, 1, 1]) + tb = [0, 1, 2] + assert instant_runoff(prof, algorithm="basic", tie_breaker=tb) == [1] + assert instant_runoff(prof, algorithm="recursive", tie_breaker=tb) == [1] + def test_instant_runoff_with_explanation(condorcet_cycle, linear_profile_0): ws, exp = instant_runoff_with_explanation(condorcet_cycle) assert ws == [0, 1, 2] @@ -320,4 +327,354 @@ def test_plurality_veto(): # Test with curr_cands parameter prof = Profile([[0, 1, 2], [1, 0, 2], [2, 0, 1]], rcounts=[1, 1, 1]) - assert plurality_veto(prof, curr_cands={0, 1}) == [0] \ No newline at end of file + assert plurality_veto(prof, curr_cands={0, 1}) == [0] + + +# ============================================================================= +# Tests for Approval-IRV and Split-IRV (ProfileWithTies support) +# Based on Delemazure & Peters (2024) "Approval-Based Instant-Runoff Voting" +# ============================================================================= + +def test_approval_irv_paper_example(): + """Test Approval-IRV on Figure 3 from the paper. + + 5 voters with weak orders over candidates {a=0, b=1, c=2, d=3}: + - 2 voters: a ~ b > c > d (a and b tied at top) + - 1 voter: b > c > d > a + - 1 voter: c > d > a > b + - 1 voter: d > a > b > c + + Paper says: First eliminated is c (ranked on top on 1 ballot), + then d (ranked on top on 1 ballot), then b (ranked on top on 3 ballots). + Winner is a. + """ + prof = ProfileWithTies([ + {0: 1, 1: 1, 2: 2, 3: 3}, # a ~ b > c > d + {0: 1, 1: 1, 2: 2, 3: 3}, # a ~ b > c > d + {1: 1, 2: 2, 3: 3, 0: 4}, # b > c > d > a + {2: 1, 3: 2, 0: 3, 1: 4}, # c > d > a > b + {3: 1, 0: 2, 1: 3, 2: 4}, # d > a > b > c + ], candidates=[0, 1, 2, 3]) + + assert approval_irv(prof) == [0] + + +def test_approval_irv_tb(): + """Test Approval-IRV with tie-breaker.""" + prof = ProfileWithTies([ + {0: 1, 1: 1, 2: 2, 3: 3}, + {0: 1, 1: 1, 2: 2, 3: 3}, + {1: 1, 2: 2, 3: 3, 0: 4}, + {2: 1, 3: 2, 0: 3, 1: 4}, + {3: 1, 0: 2, 1: 3, 2: 4}, + ], candidates=[0, 1, 2, 3]) + + # With tie_breaker [0,1,2,3], candidate 0 has lowest priority. + # Round 1: c=2 and d=3 tied (score=1), eliminate 2 (lower index in TB) + # Round 2: a=0 and d=3 tied (score=2), eliminate 0 (lower index in TB) + # Round 3: b=1 (score=3) vs d=3 (score=2), eliminate d + # Winner: b=1 + assert approval_irv_tb(prof, tie_breaker=[0, 1, 2, 3]) == [1] + + +def test_approval_irv_put(): + """Test Approval-IRV with parallel universe tie-breaking.""" + prof = ProfileWithTies([ + {0: 1, 1: 1, 2: 2, 3: 3}, + {0: 1, 1: 1, 2: 2, 3: 3}, + {1: 1, 2: 2, 3: 3, 0: 4}, + {2: 1, 3: 2, 0: 3, 1: 4}, + {3: 1, 0: 2, 1: 3, 2: 4}, + ], candidates=[0, 1, 2, 3]) + + # PUT explores all elimination paths - both a and b can win depending on path + assert approval_irv_put(prof) == [0, 1] + + +def test_approval_irv_all_tied(): + """Test Approval-IRV when all candidates are tied.""" + prof = ProfileWithTies([ + {0: 1, 1: 1, 2: 1}, # All tied at top + {0: 1, 1: 1, 2: 1}, + ], candidates=[0, 1, 2]) + + # All candidates should be returned as winners + assert approval_irv(prof) == [0, 1, 2] + + +def test_approval_irv_tb_full_tie(): + """Test that TB eliminates one candidate when all are tied for lowest.""" + prof = ProfileWithTies([ + {0: 1, 1: 1, 2: 1}, + ], candidates=[0, 1, 2]) + + # With TB [0, 1, 2], candidate 0 has lowest priority (eliminated first) + # After eliminating 0, we have 1 and 2 tied, so 1 is eliminated + # Winner is 2 + result = approval_irv_tb(prof, tie_breaker=[0, 1, 2]) + assert result == [2] + + +def test_approval_irv_put_full_tie_branches(): + """Test that PUT branches on all tied candidates.""" + prof = ProfileWithTies([ + {0: 1, 1: 1, 2: 1}, + ], candidates=[0, 1, 2]) + + # PUT should explore all elimination paths and return all possible winners + assert approval_irv_put(prof) == [0, 1, 2] + + +def test_approval_irv_with_explanation(): + """Test Approval-IRV with explanation.""" + prof = ProfileWithTies([ + {0: 1, 1: 1, 2: 2, 3: 3}, + {0: 1, 1: 1, 2: 2, 3: 3}, + {1: 1, 2: 2, 3: 3, 0: 4}, + {2: 1, 3: 2, 0: 3, 1: 4}, + {3: 1, 0: 2, 1: 3, 2: 4}, + ], candidates=[0, 1, 2, 3]) + + ws, exp = approval_irv_with_explanation(prof) + assert ws == [0] + assert exp == [[2, 3], [1]] + + +def test_approval_irv_curr_cands(): + """Test Approval-IRV with restricted candidates.""" + prof = ProfileWithTies([ + {0: 1, 1: 1, 2: 2, 3: 3}, + {0: 1, 1: 1, 2: 2, 3: 3}, + {1: 1, 2: 2, 3: 3, 0: 4}, + {2: 1, 3: 2, 0: 3, 1: 4}, + {3: 1, 0: 2, 1: 3, 2: 4}, + ], candidates=[0, 1, 2, 3]) + + # Restrict to candidates 0, 1, 2 + result = approval_irv(prof, curr_cands=[0, 1, 2]) + assert result == [0] + + +def test_split_irv_paper_example(): + """Test Split-IRV on Figure 3 from the paper. + + Split-IRV should elect b instead of a on this example. + """ + prof = ProfileWithTies([ + {0: 1, 1: 1, 2: 2, 3: 3}, + {0: 1, 1: 1, 2: 2, 3: 3}, + {1: 1, 2: 2, 3: 3, 0: 4}, + {2: 1, 3: 2, 0: 3, 1: 4}, + {3: 1, 0: 2, 1: 3, 2: 4}, + ], candidates=[0, 1, 2, 3]) + + assert split_irv(prof) == [1] + + +def test_split_irv_tb(): + """Test Split-IRV with tie-breaker.""" + prof = ProfileWithTies([ + {0: 1, 1: 1, 2: 2, 3: 3}, + {0: 1, 1: 1, 2: 2, 3: 3}, + {1: 1, 2: 2, 3: 3, 0: 4}, + {2: 1, 3: 2, 0: 3, 1: 4}, + {3: 1, 0: 2, 1: 3, 2: 4}, + ], candidates=[0, 1, 2, 3]) + + assert split_irv_tb(prof, tie_breaker=[0, 1, 2, 3]) == [1] + + +def test_split_irv_put(): + """Test Split-IRV with parallel universe tie-breaking.""" + prof = ProfileWithTies([ + {0: 1, 1: 1, 2: 2, 3: 3}, + {0: 1, 1: 1, 2: 2, 3: 3}, + {1: 1, 2: 2, 3: 3, 0: 4}, + {2: 1, 3: 2, 0: 3, 1: 4}, + {3: 1, 0: 2, 1: 3, 2: 4}, + ], candidates=[0, 1, 2, 3]) + + # PUT explores all elimination paths - both a and b can win depending on path + assert split_irv_put(prof) == [0, 1] + + +def test_split_irv_all_tied(): + """Test Split-IRV when all candidates are tied.""" + prof = ProfileWithTies([ + {0: 1, 1: 1, 2: 1}, + {0: 1, 1: 1, 2: 1}, + ], candidates=[0, 1, 2]) + + assert split_irv(prof) == [0, 1, 2] + + +def test_split_irv_float_tolerance(): + """Test that Split-IRV handles float comparison correctly.""" + # 3 voters each with 3 candidates tied at top + # Each candidate gets 1/3 + 1/3 + 1/3 = 1.0 score + prof = ProfileWithTies([ + {0: 1, 1: 1, 2: 1}, + {0: 1, 1: 1, 2: 1}, + {0: 1, 1: 1, 2: 1}, + ], candidates=[0, 1, 2]) + + # All should be tied + assert split_irv(prof) == [0, 1, 2] + + +def test_split_irv_tb_full_tie(): + """Test that TB eliminates one candidate when all are tied for lowest in Split-IRV.""" + prof = ProfileWithTies([ + {0: 1, 1: 1, 2: 1}, + ], candidates=[0, 1, 2]) + + result = split_irv_tb(prof, tie_breaker=[0, 1, 2]) + assert result == [2] + + +def test_split_irv_put_full_tie_branches(): + """Test that PUT branches on all tied candidates in Split-IRV.""" + prof = ProfileWithTies([ + {0: 1, 1: 1, 2: 1}, + ], candidates=[0, 1, 2]) + + assert split_irv_put(prof) == [0, 1, 2] + + +def test_split_irv_with_explanation(): + """Test Split-IRV with explanation.""" + prof = ProfileWithTies([ + {0: 1, 1: 1, 2: 2, 3: 3}, + {0: 1, 1: 1, 2: 2, 3: 3}, + {1: 1, 2: 2, 3: 3, 0: 4}, + {2: 1, 3: 2, 0: 3, 1: 4}, + {3: 1, 0: 2, 1: 3, 2: 4}, + ], candidates=[0, 1, 2, 3]) + + ws, exp = split_irv_with_explanation(prof) + assert ws == [1] + assert exp == [[0, 2, 3]] + + +def test_instant_runoff_with_explanation_profile_with_ties(): + prof = ProfileWithTies([ + {0: 1, 1: 1, 2: 2, 3: 3}, + {0: 1, 1: 1, 2: 2, 3: 3}, + {1: 1, 2: 2, 3: 3, 0: 4}, + {2: 1, 3: 2, 0: 3, 1: 4}, + {3: 1, 0: 2, 1: 3, 2: 4}, + ], candidates=[0, 1, 2, 3]) + + # Default (approval scoring) + ws, exp = instant_runoff_with_explanation(prof) + assert ws == [0] + assert exp == [[2, 3], [1]] + + # Split scoring + ws, exp = instant_runoff_with_explanation(prof, score_method="split") + assert ws == [1] + assert exp == [[0, 2, 3]] + + +def test_approval_vs_split_irv_different_winners(): + """Test that Approval-IRV and Split-IRV can produce different winners.""" + prof = ProfileWithTies([ + {0: 1, 1: 1, 2: 2, 3: 3}, + {0: 1, 1: 1, 2: 2, 3: 3}, + {1: 1, 2: 2, 3: 3, 0: 4}, + {2: 1, 3: 2, 0: 3, 1: 4}, + {3: 1, 0: 2, 1: 3, 2: 4}, + ], candidates=[0, 1, 2, 3]) + + approval_winner = approval_irv(prof) + split_winner = split_irv(prof) + + assert approval_winner == [0] + assert split_winner == [1] + assert approval_winner != split_winner + + +def test_approval_split_same_on_linear_orders(): + """Test that Approval-IRV and Split-IRV give same result on linear orders.""" + # Linear orders (no ties) - both methods should give same result + prof = ProfileWithTies([ + {0: 1, 1: 2, 2: 3}, + {0: 1, 1: 2, 2: 3}, + {1: 1, 0: 2, 2: 3}, + ], candidates=[0, 1, 2]) + + assert approval_irv(prof) == split_irv(prof) + + +def test_instant_runoff_with_profile_with_ties_default(): + """Test that instant_runoff uses approval scoring by default for ProfileWithTies.""" + prof = ProfileWithTies([ + {0: 1, 1: 1, 2: 2, 3: 3}, + {0: 1, 1: 1, 2: 2, 3: 3}, + {1: 1, 2: 2, 3: 3, 0: 4}, + {2: 1, 3: 2, 0: 3, 1: 4}, + {3: 1, 0: 2, 1: 3, 2: 4}, + ], candidates=[0, 1, 2, 3]) + + # Default should be approval scoring + assert instant_runoff(prof) == approval_irv(prof) + + +def test_instant_runoff_with_profile_with_ties_score_method(): + """Test instant_runoff with explicit score_method parameter.""" + prof = ProfileWithTies([ + {0: 1, 1: 1, 2: 2, 3: 3}, + {0: 1, 1: 1, 2: 2, 3: 3}, + {1: 1, 2: 2, 3: 3, 0: 4}, + {2: 1, 3: 2, 0: 3, 1: 4}, + {3: 1, 0: 2, 1: 3, 2: 4}, + ], candidates=[0, 1, 2, 3]) + + assert instant_runoff(prof, score_method="approval") == [0] + assert instant_runoff(prof, score_method="split") == [1] + + +def test_instant_runoff_tb_with_profile_with_ties(): + """Test instant_runoff_tb with ProfileWithTies.""" + prof = ProfileWithTies([ + {0: 1, 1: 1, 2: 2, 3: 3}, + {0: 1, 1: 1, 2: 2, 3: 3}, + {1: 1, 2: 2, 3: 3, 0: 4}, + {2: 1, 3: 2, 0: 3, 1: 4}, + {3: 1, 0: 2, 1: 3, 2: 4}, + ], candidates=[0, 1, 2, 3]) + + # With TB, eliminates one at a time - result is b=1 + result = instant_runoff_tb(prof, tie_breaker=[0, 1, 2, 3]) + assert result == [1] + + +def test_instant_runoff_put_with_profile_with_ties(): + """Test instant_runoff_put with ProfileWithTies.""" + prof = ProfileWithTies([ + {0: 1, 1: 1, 2: 2, 3: 3}, + {0: 1, 1: 1, 2: 2, 3: 3}, + {1: 1, 2: 2, 3: 3, 0: 4}, + {2: 1, 3: 2, 0: 3, 1: 4}, + {3: 1, 0: 2, 1: 3, 2: 4}, + ], candidates=[0, 1, 2, 3]) + + # PUT explores all elimination paths - both a and b can win + result = instant_runoff_put(prof) + assert result == [0, 1] + + +def test_tie_breaker_convention(): + """Test that tie_breaker[0] has lowest priority (eliminated first).""" + prof = ProfileWithTies([ + {0: 1, 1: 1, 2: 1}, + ], candidates=[0, 1, 2]) + + # With TB [0, 1, 2]: 0 eliminated first, then 1, winner is 2 + assert approval_irv_tb(prof, tie_breaker=[0, 1, 2]) == [2] + + # With TB [2, 1, 0]: 2 eliminated first, then 1, winner is 0 + assert approval_irv_tb(prof, tie_breaker=[2, 1, 0]) == [0] + + # With TB [1, 0, 2]: 1 eliminated first, then 0, winner is 2 + assert approval_irv_tb(prof, tie_breaker=[1, 0, 2]) == [2]