Skip to content

Conversation

MichaelASuarez
Copy link

Modified the Yellowbrick vector store to include IVF Indexing and allow for metadata filtering via JSON when doing similarity searches. All changes are in Yellowbrick.py and test_yellowbrick.py (for modified test suite).

Comment on lines 1742 to 1945
def jsonFilter2sqlWhere(
filter_dict: Dict[str, Any], metadata_column: str = "metadata"
) -> str:
"""
Convert Pinecone filter syntax to Yellowbrick SQL WHERE clause using
JSON path syntax.
Args:
filter_dict: Pinecone-style filter dictionary
metadata_column: Name of the JSONB column containing metadata
(default: "metadata")
Returns:
SQL WHERE clause string using Yellowbrick JSON path syntax
Example:
filter = {"genre": {"$eq": "documentary"}, "year": {"$gte": 2020}}
result = jsonFilter2sqlWhere(filter)
# Returns: "(metadata:$.genre::TEXT = 'documentary' AND
# metadata:$.year::INTEGER >= 2020)"
"""
if not filter_dict:
return "1=1" # No filter condition

return _process_filter_dict(filter_dict, metadata_column)


def _process_filter_dict(filter_dict: Dict[str, Any], metadata_column: str) -> str:
"""Process a filter dictionary and return SQL WHERE clause."""
conditions = []

for key, value in filter_dict.items():
if key == "$and":
and_conditions = []
for condition in value:
and_conditions.append(_process_filter_dict(condition, metadata_column))
conditions.append(f"({' AND '.join(and_conditions)})")

elif key == "$or":
or_conditions = []
for condition in value:
or_conditions.append(_process_filter_dict(condition, metadata_column))
conditions.append(f"({' OR '.join(or_conditions)})")

else:
# Regular field condition
field_condition = _process_field_condition(key, value, metadata_column)
conditions.append(field_condition)

if len(conditions) == 1:
return conditions[0]
else:
# Multiple conditions at same level are implicitly AND
return f"({' AND '.join(conditions)})"


def _process_field_condition(
field_name: str, condition: Any, metadata_column: str
) -> str:
"""Process a single field condition."""

# Handle simple equality (shorthand syntax)
if not isinstance(condition, dict):
return _create_json_condition(field_name, "$eq", condition, metadata_column)

# Handle operator-based conditions
conditions = []
for operator, value in condition.items():
sql_condition = _create_json_condition(
field_name, operator, value, metadata_column
)
conditions.append(sql_condition)

if len(conditions) == 1:
return conditions[0]
else:
# Multiple operators on same field are implicitly AND
return f"({' AND '.join(conditions)})"


def _create_json_condition(
field_name: str, operator: str, value: Any, metadata_column: str
) -> str:
"""Create a single JSON condition using Yellowbrick JSON path syntax."""

# Escape field name for JSON path if it contains special characters
escaped_field = _escape_json_field_name(field_name)
json_path = f"{metadata_column}:$.{escaped_field}"

# Determine the cast type for the field
cast_type = (
f"::{_get_cast_type(value)}" if not isinstance(value, bool) else "::BOOLEAN"
)

if operator == "$eq":
return f"{json_path}{cast_type} = {_format_sql_value(value)}"

elif operator == "$ne":
return f"{json_path}{cast_type} != {_format_sql_value(value)}"

elif operator == "$gt":
return f"{json_path}{cast_type} > {_format_sql_value(value)}"

elif operator == "$gte":
return f"{json_path}{cast_type} >= {_format_sql_value(value)}"

elif operator == "$lt":
return f"{json_path}{cast_type} < {_format_sql_value(value)}"

elif operator == "$lte":
return f"{json_path}{cast_type} <= {_format_sql_value(value)}"

elif operator == "$in":
if not isinstance(value, list):
raise ValueError(f"$in operator requires a list, got {type(value)}")

# Use Yellowbrick's supported IN syntax
formatted_values = ", ".join(_format_sql_value(v) for v in value)
return f"{json_path}{cast_type} IN ({formatted_values})"

elif operator == "$nin":
if not isinstance(value, list):
raise ValueError(f"$nin operator requires a list, got {type(value)}")

# For NOT IN operations, convert to AND of != conditions
nin_conditions = [
f"{json_path}{cast_type} != {_format_sql_value(v)}" for v in value
]
return f"({' AND '.join(nin_conditions)})"

elif operator == "$exists":
base_json_path = f"{metadata_column}:$.{escaped_field}"
if value:
return f"{base_json_path} NULL ON ERROR IS NOT NULL"
else:
return f"{base_json_path} NULL ON ERROR IS NULL"

else:
raise ValueError(f"Unsupported operator: {operator}")


def _escape_json_field_name(field_name: str) -> str:
"""
Escape field names for JSON path expressions in Yellowbrick.
Uses bracket notation for fields with special characters.
"""
# Check if field name contains special characters that need bracket notation
special_chars = [
".",
" ",
"'",
'"',
"[",
"]",
":",
"-",
"+",
"*",
"/",
"\\",
"(",
")",
"{",
"}",
]

if any(char in field_name for char in special_chars):
# Use bracket notation and escape quotes
escaped = field_name.replace("'", "''")
return f"['{escaped}']"
else:
# Use dot notation for simple field names
return field_name


def _get_cast_type(value: Any) -> str:
"""Determine the appropriate SQL cast type based on Python value type."""
if isinstance(value, int):
return "INTEGER"
elif isinstance(value, float):
return "DOUBLE PRECISION"
elif isinstance(value, bool):
return "BOOLEAN"
elif isinstance(value, str):
return "TEXT"
else:
return "TEXT" # Default to TEXT for other types


def _format_sql_value(value: Any) -> str:
"""Format a Python value for SQL."""
if value is None:
return "NULL"
elif isinstance(value, bool):
return "true" if value else "false"
elif isinstance(value, (int, float)):
return str(value)
elif isinstance(value, str):
# Escape single quotes by doubling them
escaped = value.replace("'", "''")
return f"'{escaped}'"
else:
# For other types, convert to JSON string
return f"'{json.dumps(value)}'"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The helper functions _process_filter_dict, _process_field_condition, _create_json_condition, _escape_json_field_name, _get_cast_type, and _format_sql_value lack Google-style docstrings with Args sections. While jsonFilter2sqlWhere has proper documentation, all functions should include comprehensive docstrings with Args sections to maintain consistent code documentation standards.

Spotted by Diamond (based on custom rule: Code quality)

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.

Comment on lines 553 to 558
whereClause = "1=1"
if kwargs.get("filter") is not None:
filter_value = kwargs.get("filter")
if filter_value is not None:
jsFilter = json.loads(filter_value)
whereClause = jsonFilter2sqlWhere(jsFilter, "v3.metadata")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Variable naming violates Python conventions and logic contains redundant checks. The variables whereClause, filter_value, and jsFilter should use snake_case naming (where_clause, filter_dict). The nested null checks for filter_value are redundant since the outer condition already verifies kwargs.get("filter") is not None. Simplify the logic flow and adopt consistent naming conventions to improve code readability and maintainability.

Suggested change
whereClause = "1=1"
if kwargs.get("filter") is not None:
filter_value = kwargs.get("filter")
if filter_value is not None:
jsFilter = json.loads(filter_value)
whereClause = jsonFilter2sqlWhere(jsFilter, "v3.metadata")
whereClause = "1=1"
if kwargs.get("filter") is not None:
filter_value = kwargs.get("filter")
jsFilter = json.loads(filter_value)
whereClause = jsonFilter2sqlWhere(jsFilter, "v3.metadata")

Spotted by Diamond (based on custom rule: Code quality)

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.

Comment on lines 685 to 725
'''
centroid_id = sql.Literal(
self._find_centroid(cursor, "tmp_" + self._table)
)
sql_query = sql.SQL(
"""
SELECT
text,
metadata,
score
FROM
(SELECT
v5.doc_id doc_id,
SUM(v1.embedding * v5.embedding) /
(SQRT(SUM(v1.embedding * v1.embedding)) *
SQRT(SUM(v5.embedding * v5.embedding))) AS score
FROM
{tmp_embedding} v1
INNER JOIN
{ivf_index_table} v5
ON v1.embedding_id = v5.embedding_id
WHERE v5.id = {centroid_id}
GROUP BY v5.doc_id
ORDER BY score DESC LIMIT %s
) v4
INNER JOIN
{content_table} v3
ON v4.doc_id = v3.doc_id
where {whereClause}
ORDER BY score DESC
"""
).format(
content_table=content_table,
ivf_index_table=ivf_index_table,
centroid_id=centroid_id,
tmp_embedding=tmp_embedding,
whereClause=sql.SQL(whereClause)
)
cursor.execute(sql_query, (k,))
results = cursor.fetchall()
'''
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the commented-out code block (lines 685-725) containing an alternative IVF search implementation. Commented-out code creates maintenance overhead and reduces code clarity. If this implementation is needed for future reference, preserve it in version control history or project documentation instead of the production codebase.

Spotted by Diamond (based on custom rule: Code quality)

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.

@MichaelASuarez
Copy link
Author

Hi! I've pushed the requested changes. The workflow is awaiting approval to run the CI checks. Ready for review when you have time. Thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant