Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .jules/sentinel.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
## 2024-05-18 - [CRITICAL] SQL Injection via `**kwargs` unpacked dictionary keys in dynamic queries
**Vulnerability:** The database wrapper in `streamrip/db.py` used unsafe unpacking of arbitrary user-supplied dictionaries `**items` to create dynamic SQL commands. This occurred in functions like `remove` without prior column validation, or with `assert` validation that would be optimized out in production.
**Learning:** `assert` should never be used as a security mechanism for input validation, because running python with the `-O` flag entirely strips all assertions from bytecode, leaving functions completely vulnerable. Dictionary unpacking for building dynamic queries can bypass standard identifier checks, which enables SQL injection inside parameterized keys (`f"{key}=?"`).
**Prevention:** Always explicitly check untrusted structure keys against allowed/expected schemas, and explicitly raise runtime exceptions (`raise ValueError()`) for validation instead of using Python's `assert` keyword.
12 changes: 8 additions & 4 deletions streamrip/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,8 @@ def contains(self, **items) -> bool:
:rtype: bool
"""
allowed_keys = set(self.structure.keys())
assert all(
key in allowed_keys for key in items.keys()
), f"Invalid key. Valid keys: {allowed_keys}"
if not all(key in allowed_keys for key in items.keys()):
raise ValueError(f"Invalid key. Valid keys: {allowed_keys}")

items = {k: str(v) for k, v in items.items()}

Expand All @@ -132,7 +131,8 @@ def add(self, items: tuple[str]):
:param items: Column-name + value. Values must be provided for all cols.
:type items: Tuple[str]
"""
assert len(items) == len(self.structure)
if len(items) != len(self.structure):
raise ValueError(f"Expected {len(self.structure)} items, got {len(items)}")

params = ", ".join(self.structure.keys())
question_marks = ", ".join("?" for _ in items)
Expand All @@ -155,6 +155,10 @@ def remove(self, **items):

:param items:
"""
allowed_keys = set(self.structure.keys())
if not all(key in allowed_keys for key in items.keys()):
raise ValueError(f"Invalid key. Valid keys: {allowed_keys}")

conditions = " AND ".join(f"{key}=?" for key in items.keys())
command = f"DELETE FROM {self.name} WHERE {conditions}"

Expand Down