Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 92 additions & 1 deletion shotgun_api3/shotgun.py
Original file line number Diff line number Diff line change
Expand Up @@ -4837,9 +4837,11 @@ def _translate_filters_dict(sg_filter):


def _translate_filters_list(filters):
# Deduplicate filters to avoid redundant conditions
deduplicated_filters = remove_duplicate_filters(filters)
conditions = []

for sg_filter in filters:
for sg_filter in deduplicated_filters:
if isinstance(sg_filter, (list, tuple)):
conditions.append(_translate_filters_simple(sg_filter))
elif isinstance(sg_filter, dict):
Expand All @@ -4852,6 +4854,95 @@ def _translate_filters_list(filters):
return conditions


# =============================================================================
# FILTER DEDUPLICATION UTILITIES
# =============================================================================

def normalize_filter(filter_obj):
    """
    Build a canonical, hashable representation of a filter for comparison.

    Dict items are sorted by key so dictionaries that differ only in key
    order normalize to the same value; lists and tuples both become tuples
    of normalized items.  Scalars pass through unchanged.
    """
    if isinstance(filter_obj, dict):
        # Sort (key, normalized value) pairs so key order never matters.
        pairs = [(key, normalize_filter(value)) for key, value in filter_obj.items()]
        pairs.sort()
        return tuple(pairs)
    if isinstance(filter_obj, (list, tuple)):
        return tuple(normalize_filter(element) for element in filter_obj)
    return filter_obj


def complex_filter(filter_obj):
    """
    Return True if *filter_obj* is a complex filter group.

    In the Python API, complex filters are dicts carrying a 'filters' key
    (the input format), as opposed to the server-side 'conditions' format.

    Examples:
        complex_filter({"filter_operator": "and", "filters": []}) -> True
        complex_filter({"path": "id", "values": [1]}) -> False
    """
    if not isinstance(filter_obj, dict):
        return False
    return "filters" in filter_obj


def deduplicate_nested_conditions(sg_filter, unique_normalized_filters):
    """
    Return a version of *sg_filter* whose nested conditions are deduplicated.

    Simple filters are returned unchanged.  For a complex filter, each nested
    group is deduplicated against a FRESH set rather than the caller's
    *unique_normalized_filters*: removing a condition from a nested group just
    because an identical condition exists in an enclosing scope can change the
    query's meaning.  For example, dropping ``A`` from an ``or`` group when
    ``A`` also appears at the top level turns ``A and (A or B)`` -- which is
    equivalent to ``A`` -- into ``A and B``.  Within a single group, removing
    duplicates is always safe because ``and``/``or`` are idempotent.

    :param sg_filter: a simple (list/tuple) or complex (dict) filter.
    :param unique_normalized_filters: set tracking duplicates at the caller's
        own nesting level.  Kept for interface compatibility; nested groups
        are intentionally deduplicated in their own scope instead.
    :returns: *sg_filter* itself if simple, otherwise a shallow copy with its
        "filters" list deduplicated.
    """
    if not complex_filter(sg_filter):
        return sg_filter

    # Deduplicate the nested group in its own scope (fresh set) so conditions
    # are only dropped when they repeat within this same group.
    nested_conditions = deduplicate(sg_filter["filters"], set())
    new_filter = sg_filter.copy()
    new_filter["filters"] = nested_conditions
    return new_filter


def deduplicate(filters, unique_normalized_filters):
    """
    Recursively deduplicate a list of filters, preserving order.

    The shared *unique_normalized_filters* set records every normalized
    filter already accepted, so duplicates are dropped wherever that same
    set is threaded through.

    :param filters: list of simple or complex filters.
    :param unique_normalized_filters: set of normalized filters seen so far;
        mutated in place as new filters are accepted.
    :returns: new list with duplicate filters removed.
    """
    result = []

    for raw_filter in filters:
        # Deduplicate any nested conditions inside this filter first.
        candidate = deduplicate_nested_conditions(raw_filter, unique_normalized_filters)

        # A complex group whose conditions all turned out to be duplicates is
        # dropped -- but only when its operator is a recognized one; filters
        # with invalid operators are kept so the normal translation-time
        # error handling can reject them.
        if complex_filter(candidate) and not candidate.get("filters"):
            if candidate.get("filter_operator") in ("all", "and", "any", "or"):
                continue

        fingerprint = normalize_filter(candidate)
        if fingerprint in unique_normalized_filters:
            continue

        unique_normalized_filters.add(fingerprint)
        result.append(candidate)

    return result


def remove_duplicate_filters(filters):
    """
    Remove duplicate filters from a list of filters while preserving order.

    Non-list inputs are returned untouched.  Any unexpected error during
    deduplication falls back to returning the original filters, since
    deduplication is an optimization and must never make a query fail.

    :param filters: List of filter objects to deduplicate
    :returns: List with duplicates removed, preserving original order
    """
    # Anything that is not a list/tuple is passed through as-is.
    if not isinstance(filters, (list, tuple)):
        return filters

    try:
        return deduplicate(filters, set())
    except Exception:
        # Fail-safe: return the input unchanged rather than crash the query.
        return filters


def _translate_filters_simple(sg_filter):
condition = {"path": sg_filter[0], "relation": sg_filter[1]}

Expand Down
35 changes: 35 additions & 0 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -767,6 +767,27 @@ def test_simple_summary(self):
assert result["groups"][0]["summaries"]
assert result["summaries"]

def test_summarize_with_duplicate_filters(self):
    """summarize() must return identical results with and without duplicate filters."""
    summaries = [{"field": "id", "type": "count"}]
    grouping = [{"direction": "asc", "field": "id", "type": "exact"}]
    project_filter = ["project", "is", self.project]

    # Baseline: a single, clean filter.
    baseline = self.sg.summarize(
        "Shot",
        filters=[project_filter],
        summary_fields=summaries,
        grouping=grouping,
    )

    # Same query with the filter repeated; deduplication should make the
    # two calls equivalent.
    duplicated = self.sg.summarize(
        "Shot",
        filters=[project_filter, project_filter],
        summary_fields=summaries,
        grouping=grouping,
    )

    self.assertEqual(baseline, duplicated)

def test_summary_include_archived_projects(self):
"""Test summarize with archived project"""
if self.sg.server_caps.version > (5, 3, 13):
Expand Down Expand Up @@ -1342,6 +1363,20 @@ def test_find(self):
self.assertEqual("Version", version["type"])
self.assertEqual(self.version["id"], version["id"])

def test_find_with_duplicate_filters(self):
    """find() must return identical results with and without duplicate filters."""
    project_filter = ["project", "is", self.project]

    # Baseline: a single, clean filter.
    baseline = self.sg.find("Shot", [project_filter], ["id", "code"])

    # Same query with the filter repeated three times; deduplication should
    # make the two calls equivalent.
    duplicated = self.sg.find("Shot", [project_filter] * 3, ["id", "code"])

    self.assertEqual(baseline, duplicated)

def _id_in_result(self, entity_type, filters, expected_id):
"""
Checks that a given id matches that of entities returned
Expand Down
198 changes: 198 additions & 0 deletions tests/test_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,204 @@ def test_urlib(self):
assert response is not None


class TestFilterDeduplication(unittest.TestCase):
    """Unit tests for the filter deduplication utility functions."""

    def setUp(self):
        """Bind the functions under test to the instance."""
        from shotgun_api3.shotgun import (
            normalize_filter,
            complex_filter,
            remove_duplicate_filters
        )
        self.normalize_filter = normalize_filter
        self.complex_filter = complex_filter
        self.remove_duplicate_filters = remove_duplicate_filters

    def test_normalize_filter_simple_list(self):
        """Test normalizing simple list-based filters"""
        filter_obj = ["project", "is", {"type": "Project", "id": 123}]
        result = self.normalize_filter(filter_obj)
        expected = ("project", "is", (("id", 123), ("type", "Project")))
        self.assertEqual(result, expected)

    def test_normalize_filter_simple_dict(self):
        """Test normalizing simple dict-based filters"""
        filter_obj = {"path": "project", "relation": "is", "values": [{"type": "Project", "id": 123}]}
        result = self.normalize_filter(filter_obj)

        # Keys are sorted alphabetically: path < relation < values.
        expected = (
            ("path", "project"),
            ("relation", "is"),
            ("values", ((("id", 123), ("type", "Project")),)),
        )
        self.assertEqual(result, expected)

        # A dict with the same content but a different key order must
        # normalize to the identical value.
        reordered = {"values": [{"id": 123, "type": "Project"}], "relation": "is", "path": "project"}
        self.assertEqual(result, self.normalize_filter(reordered))

    def test_normalize_filter_complex_nested(self):
        """Test normalizing complex nested filters"""
        complex_filter_obj = {
            "filter_operator": "and",
            "filters": [
                ["project", "is", {"type": "Project", "id": 123}],
                ["sg_status_list", "is", "rev"]
            ]
        }
        result = self.normalize_filter(complex_filter_obj)

        # "filter_operator" sorts before "filters" ('_' < 's'); nested
        # filters are normalized recursively.
        expected = (
            ("filter_operator", "and"),
            ("filters", (
                ("project", "is", (("id", 123), ("type", "Project"))),
                ("sg_status_list", "is", "rev"),
            )),
        )
        self.assertEqual(result, expected)

    def test_complex_filter_detection_true(self):
        """Test complex_filter() returns True for complex filters"""
        complex_filters = [
            {"filter_operator": "and", "filters": []},
            {"filters": [["project", "is", {"type": "Project", "id": 123}]]},
            {"filter_operator": "or", "filters": [["id", "is", 1]]}
        ]
        for f in complex_filters:
            with self.subTest(filter=f):
                self.assertTrue(self.complex_filter(f))

    def test_complex_filter_detection_false(self):
        """Test complex_filter() returns False for simple filters"""
        simple_filters = [
            ["project", "is", {"type": "Project", "id": 123}],
            {"path": "id", "values": [1]},
            {"path": "sg_status_list", "relation": "is", "values": ["rev"]},
            "simple_string",
            123,
            None
        ]
        for f in simple_filters:
            with self.subTest(filter=f):
                self.assertFalse(self.complex_filter(f))

    def test_remove_duplicate_filters_simple(self):
        """Test removing duplicates from simple filter list"""
        project_filter = ["project", "is", {"type": "Project", "id": 123}]
        status_filter = ["sg_status_list", "is", "rev"]
        filters = [
            project_filter,
            status_filter,
            project_filter,  # duplicate
            ["entity", "type_is", "Shot"],
            project_filter  # duplicate
        ]
        result = self.remove_duplicate_filters(filters)

        # Should keep 3 unique filters, in first-seen order.
        self.assertEqual(len(result), 3)
        self.assertEqual(result[0], project_filter)
        self.assertEqual(result[1], status_filter)
        self.assertEqual(result[2], ["entity", "type_is", "Shot"])

    def test_remove_duplicate_filters_complex(self):
        """Test removing duplicates with complex nested filters"""
        simple_filter = ["project", "is", {"type": "Project", "id": 123}]
        complex_filter_1 = {
            "filter_operator": "and",
            "filters": [
                ["sg_status_list", "is", "rev"],
                ["entity", "type_is", "Shot"]
            ]
        }
        complex_filter_2 = {
            "filter_operator": "or",
            "filters": [["id", "is", 1]]
        }
        filters = [
            simple_filter,
            complex_filter_1,
            simple_filter,  # duplicate
            complex_filter_2,
            complex_filter_1  # duplicate
        ]
        result = self.remove_duplicate_filters(filters)

        # Should keep 3 unique filters, in first-seen order.
        self.assertEqual(len(result), 3)
        self.assertEqual(result[0], simple_filter)
        self.assertEqual(result[1], complex_filter_1)
        self.assertEqual(result[2], complex_filter_2)

    def test_remove_duplicate_filters_preserves_order(self):
        """Test that order is preserved for remaining filters"""
        filter_a = ["field_a", "is", "value_a"]
        filter_b = ["field_b", "is", "value_b"]
        filter_c = ["field_c", "is", "value_c"]
        filters = [filter_a, filter_b, filter_a, filter_c, filter_b]
        result = self.remove_duplicate_filters(filters)
        expected = [filter_a, filter_b, filter_c]  # Should preserve order

        self.assertEqual(result, expected)

    def test_remove_duplicate_filters_edge_cases(self):
        """Test edge cases: empty lists, single item, no duplicates, non-list input"""
        # Empty list
        self.assertEqual(self.remove_duplicate_filters([]), [])

        # Single item
        single_filter = [["project", "is", {"type": "Project", "id": 123}]]
        self.assertEqual(self.remove_duplicate_filters(single_filter), single_filter)

        # No duplicates
        unique_filters = [
            ["project", "is", {"type": "Project", "id": 123}],
            ["sg_status_list", "is", "rev"],
            ["entity", "type_is", "Shot"]
        ]
        self.assertEqual(self.remove_duplicate_filters(unique_filters), unique_filters)

        # Non-list input is passed through unchanged (fail-safe behavior).
        self.assertIsNone(self.remove_duplicate_filters(None))

    def test_deduplicate_nested_conditions(self):
        """Test deduplicating nested conditions in complex filters - testing via remove_duplicate_filters"""
        nested_filter = {
            "filter_operator": "and",
            "filters": [
                ["project", "is", {"type": "Project", "id": 123}],
                ["sg_status_list", "is", "rev"],
                ["project", "is", {"type": "Project", "id": 123}]  # duplicate
            ]
        }
        result = self.remove_duplicate_filters([nested_filter])

        # Should have one complex filter with deduplicated nested conditions
        self.assertEqual(len(result), 1)
        result_filter = result[0]
        self.assertEqual(result_filter["filter_operator"], "and")
        self.assertEqual(len(result_filter["filters"]), 2)
        self.assertEqual(result_filter["filters"][0], ["project", "is", {"type": "Project", "id": 123}])
        self.assertEqual(result_filter["filters"][1], ["sg_status_list", "is", "rev"])

    def test_deduplicate_function_mixed_filters(self):
        """Test main function with mixed simple and complex filters"""
        project_filter = ["project", "is", {"type": "Project", "id": 123}]
        group_filter = {
            "filter_operator": "and",
            "filters": [
                ["sg_status_list", "is", "rev"],
                project_filter,  # also present at top level
                ["entity", "type_is", "Shot"]
            ]
        }
        filters = [
            project_filter,
            group_filter,
            project_filter,  # Top-level duplication
        ]
        result = self.remove_duplicate_filters(filters)

        # Should have 2 items (project_filter + processed group filter)
        self.assertEqual(len(result), 2)
        self.assertEqual(result[0], project_filter)

        # Complex filter should still be a complex filter group
        self.assertIsInstance(result[1], dict)
        self.assertEqual(result[1]["filter_operator"], "and")


class TestMimetypesFix(unittest.TestCase):
"""
Makes sure that the mimetypes fix will be imported.
Expand Down
Loading