diff --git a/.jules/sentinel.md b/.jules/sentinel.md new file mode 100644 index 00000000..af9b7691 --- /dev/null +++ b/.jules/sentinel.md @@ -0,0 +1,5 @@ + +## 2024-05-18 - [CRITICAL] SQL Injection via Database kwargs Unpacking +**Vulnerability:** The `streamrip/db.py` `DatabaseBase.contains` and `remove` methods accepted `**items` (kwargs) and directly used their keys in dynamic SQL condition string formatting (`" AND ".join(f"{key}=?" for key in items.keys())`). While `contains` attempted validation, it used `assert all(...)`, which can be completely bypassed if Python is run with `-O` (optimization). The `remove` method had no validation at all. This allowed an attacker to pass arbitrary keys (e.g., `contains(**{"1=1; DROP TABLE users;--": "val"})`) resulting in SQL injection. +**Learning:** Never rely on `assert` for security validation, as assertions are removed in optimized Python execution modes, leaving the code vulnerable. Furthermore, unpacking `**kwargs` allows arbitrary parameter names that bypass standard Python identifier rules, so dictionary keys must be strictly validated against an allowlist before being used in SQL string formatting or other sensitive contexts. +**Prevention:** Always use explicit `if not ...: raise ValueError(...)` or similar control flow mechanisms for security and data integrity validation. Strictly validate all keys from `**kwargs` against an allowed schema (e.g., allowed database columns) before dynamically constructing SQL queries, even when using parameterized values, as the keys themselves form the SQL structure. diff --git a/streamrip/db.py b/streamrip/db.py index a3558559..6cc9039c 100644 --- a/streamrip/db.py +++ b/streamrip/db.py @@ -113,9 +113,8 @@ def contains(self, **items) -> bool: :rtype: bool """ allowed_keys = set(self.structure.keys()) - assert all( - key in allowed_keys for key in items.keys() - ), f"Invalid key. Valid keys: {allowed_keys}" + if not all(key in allowed_keys for key in items.keys()): + raise ValueError(f"Invalid key. Valid keys: {allowed_keys}") items = {k: str(v) for k, v in items.items()} @@ -155,6 +154,10 @@ def remove(self, **items): :param items: """ + allowed_keys = set(self.structure.keys()) + if not all(key in allowed_keys for key in items.keys()): + raise ValueError(f"Invalid key. Valid keys: {allowed_keys}") + conditions = " AND ".join(f"{key}=?" for key in items.keys()) command = f"DELETE FROM {self.name} WHERE {conditions}"