diff --git a/.jules/sentinel.md b/.jules/sentinel.md new file mode 100644 index 00000000..264e14c1 --- /dev/null +++ b/.jules/sentinel.md @@ -0,0 +1,4 @@ +## 2024-03-28 - [CRITICAL] Fix SQL injection risk in dynamic database queries +**Vulnerability:** SQL injection via bypass of Python's `assert` in `**kwargs` unpacking for dynamic query construction. +**Learning:** `streamrip/db.py` used `assert` to validate dictionary keys (`**kwargs` passed to `contains`) against allowed schema columns to build an SQL query string safely. However, `assert` statements can be optimized away entirely when Python is run with the `-O` flag. Since `**kwargs` allows any identifier, a malicious caller (or corrupted input) could bypass validation and inject arbitrary SQL column names (which are concatenated directly into the query string) if `-O` is used. Furthermore, `remove` did not validate `**kwargs` keys at all, presenting a similar SQL injection vulnerability. +**Prevention:** Explicitly validate keys against the allowed schema columns using `raise ValueError` instead of `assert` to ensure validation always runs regardless of execution flags. Ensure all dynamic query construction pathways (e.g., `remove`, `contains`) sanitize/validate their input keys. \ No newline at end of file diff --git a/streamrip/db.py b/streamrip/db.py index a3558559..392abf24 100644 --- a/streamrip/db.py +++ b/streamrip/db.py @@ -113,9 +113,9 @@ def contains(self, **items) -> bool: :rtype: bool """ allowed_keys = set(self.structure.keys()) - assert all( - key in allowed_keys for key in items.keys() - ), f"Invalid key. Valid keys: {allowed_keys}" + # 🛡️ Sentinel: Use ValueError instead of assert to prevent bypass via python -O + if not all(key in allowed_keys for key in items.keys()): + raise ValueError(f"Invalid key. Valid keys: {allowed_keys}") items = {k: str(v) for k, v in items.items()} @@ -155,6 +155,11 @@ def remove(self, **items): :param items: """ + allowed_keys = set(self.structure.keys()) + # 🛡️ Sentinel: Validate keys to prevent SQL injection via **kwargs + if not all(key in allowed_keys for key in items.keys()): + raise ValueError(f"Invalid key. Valid keys: {allowed_keys}") + conditions = " AND ".join(f"{key}=?" for key in items.keys()) command = f"DELETE FROM {self.name} WHERE {conditions}"