diff --git a/shotgun_api3/shotgun.py b/shotgun_api3/shotgun.py index b682e87e..93d85d5e 100644 --- a/shotgun_api3/shotgun.py +++ b/shotgun_api3/shotgun.py @@ -4837,9 +4837,11 @@ def _translate_filters_dict(sg_filter): def _translate_filters_list(filters): + # Deduplicate filters to avoid redundant conditions + deduplicated_filters = remove_duplicate_filters(filters) conditions = [] - for sg_filter in filters: + for sg_filter in deduplicated_filters: if isinstance(sg_filter, (list, tuple)): conditions.append(_translate_filters_simple(sg_filter)) elif isinstance(sg_filter, dict): @@ -4852,6 +4854,95 @@ def _translate_filters_list(filters): return conditions +# ============================================================================= +# FILTER DEDUPLICATION UTILITIES +# ============================================================================= + +def normalize_filter(filter_obj): + """ + Creates a canonical, comparable representation of a filter. + The key step is sorting dictionaries to ensure that different key orders + are treated as identical for comparison purposes. + """ + if isinstance(filter_obj, dict): + return tuple(sorted((k, normalize_filter(v)) for k, v in filter_obj.items())) + elif isinstance(filter_obj, (list, tuple)): + return tuple(normalize_filter(item) for item in filter_obj) + else: + return filter_obj + + +def complex_filter(filter_obj): + """ + Checks if a filter is a complex group by looking for a 'filters' key. + Note: In Python API, complex filters use 'filters' key (input format), + not 'conditions' key (server format). + + Examples: + complex_filter({"filter_operator": "and", "filters": []}) -> True + complex_filter({"path": "id", "values": [1]}) -> False + """ + return isinstance(filter_obj, dict) and 'filters' in filter_obj + + +def deduplicate_nested_conditions(sg_filter, unique_normalized_filters): + """ + For a complex filter, returns a new version with its nested conditions deduplicated. + """ + if not complex_filter(sg_filter): + return sg_filter + + nested_conditions = deduplicate(sg_filter["filters"], unique_normalized_filters) + new_filter = sg_filter.copy() + new_filter["filters"] = nested_conditions + return new_filter + + +def deduplicate(filters, unique_normalized_filters): + """ + The recursive worker that processes a list of filters. It relies on the + shared unique_normalized_filters set to track and remove duplicates + across all levels of nesting. + """ + deduplicated_filters = [] + + for sg_filter in filters: + # First, deduplicate any nested conditions within the current filter + prepared_filter = deduplicate_nested_conditions(sg_filter, unique_normalized_filters) + + # Discard complex filters that become empty after their nested conditions are deduplicated + if complex_filter(prepared_filter) and not prepared_filter.get('filters'): + # But keep invalid filters for proper error handling + if prepared_filter.get('filter_operator') in ['all', 'and', 'any', 'or']: + continue + + normalized_filter = normalize_filter(prepared_filter) + + # The add() method returns None, but we check membership first + if normalized_filter not in unique_normalized_filters: + unique_normalized_filters.add(normalized_filter) + deduplicated_filters.append(prepared_filter) + + return deduplicated_filters + + +def remove_duplicate_filters(filters): + """ + Remove duplicate filters from a list of filters while preserving order. + + :param filters: List of filter objects to deduplicate + :returns: List with duplicates removed, preserving original order + """ + try: + if not isinstance(filters, (list, tuple)): + return filters + + return deduplicate(filters, set()) + except Exception: + # Fail-safe: return original filters if deduplication fails to prevent a crash + return filters + + def _translate_filters_simple(sg_filter): condition = {"path": sg_filter[0], "relation": sg_filter[1]} diff --git a/tests/test_api.py b/tests/test_api.py index 788c9751..bf37fadc 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -767,6 +767,27 @@ def test_simple_summary(self): assert result["groups"][0]["summaries"] assert result["summaries"] + def test_summarize_with_duplicate_filters(self): + """Test that summarize() with duplicate filters returns same results as without duplicates""" + summaries = [{"field": "id", "type": "count"}] + grouping = [{"direction": "asc", "field": "id", "type": "exact"}] + + # Baseline: test without duplicates + filters_clean = [["project", "is", self.project]] + result_clean = self.sg.summarize( + "Shot", filters=filters_clean, summary_fields=summaries, grouping=grouping + ) + + # Test with duplicates - should return deduplicated results + project_filter = ["project", "is", self.project] + filters_duplicated = [project_filter, project_filter] + result_deduplicated = self.sg.summarize( + "Shot", filters=filters_duplicated, summary_fields=summaries, grouping=grouping + ) + + # Results should be identical + self.assertEqual(result_clean, result_deduplicated) + def test_summary_include_archived_projects(self): """Test summarize with archived project""" if self.sg.server_caps.version > (5, 3, 13): @@ -1342,6 +1363,20 @@ def test_find(self): self.assertEqual("Version", version["type"]) self.assertEqual(self.version["id"], version["id"]) + def test_find_with_duplicate_filters(self): + """Test that find() with duplicate filters returns same results as without duplicates""" + # Baseline: test without duplicates + filters_clean = [["project", "is", self.project]] + result_clean = self.sg.find("Shot", filters_clean, ["id", "code"]) + + # Test with duplicates - should return deduplicated results + project_filter = ["project", "is", self.project] + filters_duplicated = [project_filter, project_filter, project_filter] + result_deduplicated = self.sg.find("Shot", filters_duplicated, ["id", "code"]) + + # Results should be identical + self.assertEqual(result_clean, result_deduplicated) + def _id_in_result(self, entity_type, filters, expected_id): """ Checks that a given id matches that of entities returned diff --git a/tests/test_unit.py b/tests/test_unit.py index de996c55..e6472035 100644 --- a/tests/test_unit.py +++ b/tests/test_unit.py @@ -798,6 +798,204 @@ def test_urlib(self): assert response is not None +class TestFilterDeduplication(unittest.TestCase): + """Test filter deduplication utility functions""" + + def setUp(self): + """Set up test fixtures""" + from shotgun_api3.shotgun import ( + normalize_filter, + complex_filter, + remove_duplicate_filters + ) + self.normalize_filter = normalize_filter + self.complex_filter = complex_filter + self.remove_duplicate_filters = remove_duplicate_filters + + def test_normalize_filter_simple_list(self): + """Test normalizing simple list-based filters""" + filter_obj = ["project", "is", {"type": "Project", "id": 123}] + result = self.normalize_filter(filter_obj) + expected = ("project", "is", (("id", 123), ("type", "Project"))) + self.assertEqual(result, expected) + + def test_normalize_filter_simple_dict(self): + """Test normalizing simple dict-based filters""" + filter_obj = {"path": "project", "relation": "is", "values": [{"type": "Project", "id": 123}]} + result = self.normalize_filter(filter_obj) + + # Should be a tuple with sorted dict keys + result_str = str(result) + self.assertIsInstance(result, tuple) + self.assertIn("path", result_str) + self.assertIn("relation", result_str) + self.assertIn("values", result_str) + + def test_normalize_filter_complex_nested(self): + """Test normalizing complex nested filters""" + complex_filter_obj = { + "filter_operator": "and", + "filters": [ + ["project", "is", {"type": "Project", "id": 123}], + ["sg_status_list", "is", "rev"] + ] + } + result = self.normalize_filter(complex_filter_obj) + + # Should contain sorted keys and nested normalized filters + result_str = str(result) + self.assertIsInstance(result, tuple) + self.assertIn("filter_operator", result_str) + self.assertIn("filters", result_str) + + def test_complex_filter_detection_true(self): + """Test complex_filter() returns True for complex filters""" + complex_filters = [ + {"filter_operator": "and", "filters": []}, + {"filters": [["project", "is", {"type": "Project", "id": 123}]]}, + {"filter_operator": "or", "filters": [["id", "is", 1]]} + ] + for f in complex_filters: + with self.subTest(filter=f): + self.assertTrue(self.complex_filter(f)) + + def test_complex_filter_detection_false(self): + """Test complex_filter() returns False for simple filters""" + simple_filters = [ + ["project", "is", {"type": "Project", "id": 123}], + {"path": "id", "values": [1]}, + {"path": "sg_status_list", "relation": "is", "values": ["rev"]}, + "simple_string", + 123, + None + ] + for f in simple_filters: + with self.subTest(filter=f): + self.assertFalse(self.complex_filter(f)) + + def test_remove_duplicate_filters_simple(self): + """Test removing duplicates from simple filter list""" + project_filter = ["project", "is", {"type": "Project", "id": 123}] + status_filter = ["sg_status_list", "is", "rev"] + filters = [ + project_filter, + status_filter, + project_filter, # duplicate + ["entity", "type_is", "Shot"], + project_filter # duplicate + ] + result = self.remove_duplicate_filters(filters) + + # Should have 3 unique sorted filters + self.assertEqual(len(result), 3) + self.assertEqual(result[0], project_filter) + self.assertEqual(result[1], status_filter) + self.assertEqual(result[2], ["entity", "type_is", "Shot"]) + + def test_remove_duplicate_filters_complex(self): + """Test removing duplicates with complex nested filters""" + simple_filter = ["project", "is", {"type": "Project", "id": 123}] + complex_filter_1 = { + "filter_operator": "and", + "filters": [ + ["sg_status_list", "is", "rev"], + ["entity", "type_is", "Shot"] + ] + } + complex_filter_2 = { + "filter_operator": "or", + "filters": [["id", "is", 1]] + } + filters = [ + simple_filter, + complex_filter_1, + simple_filter, # duplicate + complex_filter_2, + complex_filter_1 # duplicate + ] + result = self.remove_duplicate_filters(filters) + + # Should have 3 unique sorted filters + self.assertEqual(len(result), 3) + self.assertEqual(result[0], simple_filter) + self.assertEqual(result[1], complex_filter_1) + self.assertEqual(result[2], complex_filter_2) + + def test_remove_duplicate_filters_preserves_order(self): + """Test that order is preserved for remaining filters""" + filter_a = ["field_a", "is", "value_a"] + filter_b = ["field_b", "is", "value_b"] + filter_c = ["field_c", "is", "value_c"] + filters = [filter_a, filter_b, filter_a, filter_c, filter_b] + result = self.remove_duplicate_filters(filters) + expected = [filter_a, filter_b, filter_c] # Should preserve order + + self.assertEqual(result, expected) + + def test_remove_duplicate_filters_edge_cases(self): + """Test edge cases: empty lists, single item, no duplicates""" + # Empty list + self.assertEqual(self.remove_duplicate_filters([]), []) + + # Single item + single_filter = [["project", "is", {"type": "Project", "id": 123}]] + self.assertEqual(self.remove_duplicate_filters(single_filter), single_filter) + + # No duplicates + unique_filters = [ + ["project", "is", {"type": "Project", "id": 123}], + ["sg_status_list", "is", "rev"], + ["entity", "type_is", "Shot"] + ] + self.assertEqual(self.remove_duplicate_filters(unique_filters), unique_filters) + + def test_deduplicate_nested_conditions(self): + """Test deduplicating nested conditions in complex filters - testing via remove_duplicate_filters""" + nested_filter = { + "filter_operator": "and", + "filters": [ + ["project", "is", {"type": "Project", "id": 123}], + ["sg_status_list", "is", "rev"], + ["project", "is", {"type": "Project", "id": 123}] # duplicate + ] + } + result = self.remove_duplicate_filters([nested_filter]) + + # Should have one complex filter with deduplicated nested conditions + result_filter = result[0] + self.assertEqual(len(result), 1) + self.assertEqual(result_filter["filter_operator"], "and") + self.assertEqual(len(result_filter["filters"]), 2) + self.assertEqual(result_filter["filters"][0], ["project", "is", {"type": "Project", "id": 123}]) + self.assertEqual(result_filter["filters"][1], ["sg_status_list", "is", "rev"]) + + def test_deduplicate_function_mixed_filters(self): + """Test main function with mixed simple and complex filters""" + project_filter = ["project", "is", {"type": "Project", "id": 123}] + complex_filter = { + "filter_operator": "and", + "filters": [ + ["sg_status_list", "is", "rev"], + project_filter, # Nested duplication + ["entity", "type_is", "Shot"] + ] + } + filters = [ + project_filter, + complex_filter, + project_filter, # Top-level duplication + ] + result = self.remove_duplicate_filters(filters) + + # Should have 2 items (project_filter + deduplicated complex_filter) + self.assertEqual(len(result), 2) + self.assertEqual(result[0], project_filter) + + # Complex filter should be processed and nested duplicates removed + self.assertIsInstance(result[1], dict) + self.assertEqual(result[1]["filter_operator"], "and") + + class TestMimetypesFix(unittest.TestCase): """ Makes sure that the mimetypes fix will be imported.