diff --git a/.jules/sentinel.md b/.jules/sentinel.md new file mode 100644 index 00000000..fb3feafb --- /dev/null +++ b/.jules/sentinel.md @@ -0,0 +1,4 @@ +## 2024-05-18 - [CRITICAL] SQL Injection via `**kwargs` unpacked dictionary keys in dynamic queries +**Vulnerability:** The database wrapper in `streamrip/db.py` used unsafe unpacking of arbitrary user-supplied dictionaries `**items` to create dynamic SQL commands. This occurred in functions like `remove` without prior column validation, or with `assert` validation that would be optimized out in production. +**Learning:** `assert` should never be used as a security mechanism for input validation, because running python with the `-O` flag entirely strips all assertions from bytecode, leaving functions completely vulnerable. Dictionary unpacking for building dynamic queries can bypass standard identifier checks, which enables SQL injection inside parameterized keys (`f"{key}=?"`). +**Prevention:** Always explicitly check untrusted structure keys against allowed/expected schemas, and explicitly raise runtime exceptions (`raise ValueError()`) for validation instead of using Python's `assert` keyword. diff --git a/streamrip/db.py b/streamrip/db.py index a3558559..cc0beacd 100644 --- a/streamrip/db.py +++ b/streamrip/db.py @@ -113,9 +113,8 @@ def contains(self, **items) -> bool: :rtype: bool """ allowed_keys = set(self.structure.keys()) - assert all( - key in allowed_keys for key in items.keys() - ), f"Invalid key. Valid keys: {allowed_keys}" + if not all(key in allowed_keys for key in items.keys()): + raise ValueError(f"Invalid key. Valid keys: {allowed_keys}") items = {k: str(v) for k, v in items.items()} @@ -132,7 +131,8 @@ def add(self, items: tuple[str]): :param items: Column-name + value. Values must be provided for all cols. :type items: Tuple[str] """ - assert len(items) == len(self.structure) + if len(items) != len(self.structure): + raise ValueError(f"Expected {len(self.structure)} items, got {len(items)}") params = ", ".join(self.structure.keys()) question_marks = ", ".join("?" for _ in items) @@ -155,6 +155,10 @@ def remove(self, **items): :param items: """ + allowed_keys = set(self.structure.keys()) + if not all(key in allowed_keys for key in items.keys()): + raise ValueError(f"Invalid key. Valid keys: {allowed_keys}") + conditions = " AND ".join(f"{key}=?" for key in items.keys()) command = f"DELETE FROM {self.name} WHERE {conditions}"