feat(core): add support for filtering measurements in sample store #1029

Description

@AlessandroPomponio

Plan: Add Arbitrary Filter Support to SQLSampleStore Methods

Issue: #1029
Depends on: Issue for refactoring simulate_json_contains_on_sqlite() (to be created)

Overview

Add database-level filtering capabilities to SQLSampleStore methods that deal
with measurement requests and results. This enhancement will enable efficient,
database-side filtering of measurement data, improving performance and enabling
more sophisticated query capabilities for CLI commands and API consumers.

Current State Analysis

Existing Infrastructure

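The codebase already has filtering infrastructure used by the metastore:
resource_filter_by_arbitrary_selection() and simulate_json_contains_on_sqlite(),
with filters produced by prepare_query_filters_for_db().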

Database Schema

From _create_source_table():

measurement_requests table:

  • uid (CHAR(36), primary key)
  • experiment_reference (Text)
  • operation_id (String(256))
  • request_index (Integer)
  • request_id (String(256))
  • type (String(256))
  • status (String(256))
  • metadata (JSON)
  • timestamp (DateTime with timezone)

measurement_results table:

  • uid (CHAR(36), primary key)
  • entity_id (Text)
  • data (JSON - contains full MeasurementResult)

measurement_requests_results junction table:

  • request_uid (CHAR(36), foreign key)
  • result_uid (CHAR(36), foreign key)
  • entity_index (Integer)
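
For experimenting with the queries in this plan, the three tables can be approximated in an in-memory SQLite database. This is only a sketch: the `demo` prefix, the `create_demo_schema` helper, and the SQLite type names are assumptions for illustration, and the real tables are created by _create_source_table().

```python
import sqlite3

# Hypothetical prefix; the real one comes from the sample store's table name.
PREFIX = "demo"

SCHEMA = f"""
CREATE TABLE {PREFIX}_measurement_requests (
    uid CHAR(36) PRIMARY KEY,
    experiment_reference TEXT,
    operation_id VARCHAR(256),
    request_index INTEGER,
    request_id VARCHAR(256),
    type VARCHAR(256),
    status VARCHAR(256),
    metadata JSON,
    timestamp DATETIME
);
CREATE TABLE {PREFIX}_measurement_results (
    uid CHAR(36) PRIMARY KEY,
    entity_id TEXT,
    data JSON
);
CREATE TABLE {PREFIX}_measurement_requests_results (
    request_uid CHAR(36) REFERENCES {PREFIX}_measurement_requests(uid),
    result_uid CHAR(36) REFERENCES {PREFIX}_measurement_results(uid),
    entity_index INTEGER
);
"""


def create_demo_schema() -> sqlite3.Connection:
    """Create the three tables in a throwaway in-memory database."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(SCHEMA)
    return conn
```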

Target Methods for Enhancement

The following methods in orchestrator/core/samplestore/sql.py will be enhanced:

  1. measurement_requests_for_operation() - Fetch requests for an operation
  2. measurement_results_for_operation() - Fetch results for an operation
  3. complete_measurement_request_with_results_timeseries() - Fetch requests
    with results

Note: measurement_request_by_id() does not need filtering as it fetches by
specific ID.

Prerequisites

Before implementing this issue, the following refactoring must be completed in a
separate issue:

Refactor simulate_json_contains_on_sqlite() to be Generic

Current function (orchestrator/metastore/sql/statements.py:13):

def simulate_json_contains_on_sqlite(path: str, candidate: str) -> str:

Required refactored signature:

def simulate_json_contains_on_sqlite(
    path: str,
    candidate: str,
    table_name: str = "resources",
    json_column: str = "data",
    id_column: str = "identifier",
) -> str:

Changes needed:

  1. Add optional parameters with defaults for backward compatibility
  2. Replace hardcoded resources r with {table_name} t
  3. Replace hardcoded r.data with t.{json_column}
  4. Replace hardcoded r.identifier with t.{id_column}
  5. Update check_field_in_sqlite_json_document() to accept an id_column
    parameter
  6. Add tests for new functionality and backward compatibility
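
The changes listed above can be sketched as follows. The existing function body is not shown in this issue, so the SQL below is a hypothetical stand-in built on SQLite's json_each() and json_extract(); the name `simulate_json_contains_sketch` is likewise an assumption. The point is only to show how defaults keep existing callers working while the table, JSON column, and id column become parameters.

```python
import sqlite3


def simulate_json_contains_sketch(
    path: str,
    candidate: str,
    table_name: str = "resources",
    json_column: str = "data",
    id_column: str = "identifier",
) -> str:
    """Return a SQLite condition approximating JSON_CONTAINS for scalar candidates.

    Hypothetical body: values are interpolated only to keep the sketch short.
    Real code should validate identifiers and escape or bind values.
    """
    return (
        f"{table_name}.{id_column} IN ("
        f"SELECT t.{id_column} "
        f"FROM {table_name} t, json_each(t.{json_column}, '{path}') AS je "
        f"WHERE je.value = json_extract('{candidate}', '$'))"
    )


# Default parameters target the metastore's resources table, as before.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE resources (identifier TEXT, data JSON)")
conn.executemany(
    "INSERT INTO resources VALUES (?, ?)",
    [("a", '{"kind": "operation"}'), ("b", '{"kind": "space"}')],
)
condition = simulate_json_contains_sketch("$.kind", '"operation"')
rows = conn.execute(f"SELECT identifier FROM resources WHERE {condition}").fetchall()
```

A samplestore caller would pass its own names, for example table_name set to the measurement results table, json_column="data" and id_column="uid".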

Why separate issue: This is a foundational refactoring that affects the
metastore and should be tested independently before being used for samplestore
filtering.


Implementation Plan

Phase 1: Core Filter Infrastructure

1.1 Extend Existing SQL Statement Utilities

File:
orchestrator/metastore/sql/statements.py

Add new functions following the existing pattern in this file:

def measurement_request_filter_by_arbitrary_selection(
    path: str,
    candidate: str,
    table_name: str,
    needs_where: bool = False,
    dialect: Literal["mysql", "sqlite"] = "mysql",
) -> str:
    """
    Build WHERE clause fragment for filtering measurement requests.

    Similar to resource_filter_by_arbitrary_selection but adapted for
    measurement_requests table structure with direct columns and JSON fields.

    Args:
        path: JSON path (e.g., "$.status", "$.metadata.key")
        candidate: JSON-encoded value to match
        table_name: Name of the measurement_requests table
        needs_where: If True, prefix with WHERE; otherwise prefix with AND
        dialect: Database dialect (mysql or sqlite)

    Returns:
        SQL WHERE clause fragment
    """

Implementation Strategy:

  • Check if path maps to a direct column (status, request_id, etc.)
  • If yes, generate simple column comparison: table.column = value
  • If no, determine which JSON column to query (metadata, experiment_reference)
  • Use JSON_CONTAINS (MySQL) or call simulate_json_contains_on_sqlite with
    appropriate table/column parameters (SQLite)

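
The strategy above might look roughly like this for the MySQL dialect. It is a sketch under assumptions: `request_filter_sketch` and `sql_literal` are hypothetical names, the mappings are reduced subsets, the SQLite branch is left out because it depends on the refactored simulate_json_contains_on_sqlite(), and the quoting shown is not a substitute for bound parameters.

```python
import json

# Assumed subsets of the mappings proposed in this plan.
DIRECT_COLUMNS = {"$.status": "status", "$.requestIndex": "request_index"}
JSON_COLUMNS = {"$.metadata": "metadata", "$.experimentReference": "experiment_reference"}


def sql_literal(candidate: str) -> str:
    """Turn a JSON-encoded scalar into a SQL literal."""
    value = json.loads(candidate)
    if isinstance(value, bool):
        return "1" if value else "0"
    if isinstance(value, (int, float)):
        return str(value)
    if isinstance(value, str):
        return "'" + value.replace("'", "''") + "'"
    raise TypeError(f"Only scalar candidates are supported, got {type(value).__name__}")


def request_filter_sketch(path, candidate, table_name, needs_where=False, dialect="mysql"):
    prefix = " WHERE " if needs_where else " AND "
    # 1. Direct column: plain comparison, no JSON functions involved.
    if path in DIRECT_COLUMNS:
        return f"{prefix}{table_name}.{DIRECT_COLUMNS[path]} = {sql_literal(candidate)}"
    # 2. JSON column: strip the column's root from the path and query the rest.
    for root, column in JSON_COLUMNS.items():
        if path == root or path.startswith(root + "."):
            sub_path = "$" + path[len(root):]
            if dialect != "mysql":
                raise NotImplementedError("SQLite branch omitted in this sketch")
            escaped = candidate.replace("'", "''")
            return f"{prefix}JSON_CONTAINS({table_name}.{column}, '{escaped}', '{sub_path}')"
    raise ValueError(f"Unsupported filter path: {path}")
```

For instance, `request_filter_sketch("$.metadata.key", '"v"', "t")` yields a JSON_CONTAINS condition on t.metadata with the sub-path `$.key`, while `"$.status"` becomes a plain column comparison.
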
def measurement_result_filter_by_arbitrary_selection(
    path: str,
    candidate: str,
    table_name: str,
    needs_where: bool = False,
    dialect: Literal["mysql", "sqlite"] = "mysql",
) -> str:
    """
    Build WHERE clause fragment for filtering measurement results.

    Similar to resource_filter_by_arbitrary_selection but adapted for
    measurement_results table structure (uid, entity_id, data JSON column).

    Args:
        path: JSON path (e.g., "$.uid", "$.entityIdentifier", "$.metadata.key")
        candidate: JSON-encoded value to match
        table_name: Name of the measurement_results table
        needs_where: If True, prefix with WHERE; otherwise prefix with AND
        dialect: Database dialect (mysql or sqlite)

    Returns:
        SQL WHERE clause fragment
    """

Implementation Strategy:

  • Check if path maps to direct column (uid, entity_id)
  • If yes, generate simple column comparison
  • If no, query the data JSON column using appropriate JSON path
  • Call simulate_json_contains_on_sqlite with table_name, json_column="data",
    id_column="uid"

1.2 Column Mapping Constants

Add to statements.py:

# Direct column mappings for measurement requests
MEASUREMENT_REQUEST_COLUMN_MAPPINGS = {
    "$.requestid": "request_id",
    "$.requestIndex": "request_index",
    "$.status": "status",
    "$.operation_id": "operation_id",
    "$.timestamp": "timestamp",
    "$.type": "type",
}

# Direct column mappings for measurement results
MEASUREMENT_RESULT_COLUMN_MAPPINGS = {
    "$.uid": "uid",
    "$.entityIdentifier": "entity_id",
}

# JSON column mappings for measurement requests
MEASUREMENT_REQUEST_JSON_COLUMNS = {
    "$.metadata": "metadata",
    "$.experimentReference": "experiment_reference",
}
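
Because these constants duplicate knowledge of the table layout, a small consistency check could guard against drift. The sketch below copies the column names from the schema listed earlier in this plan; `unmapped_targets` is a hypothetical helper name.

```python
# Column names copied from the schema listed in this plan.
REQUEST_TABLE_COLUMNS = {
    "uid", "experiment_reference", "operation_id", "request_index",
    "request_id", "type", "status", "metadata", "timestamp",
}
RESULT_TABLE_COLUMNS = {"uid", "entity_id", "data"}

MEASUREMENT_REQUEST_COLUMN_MAPPINGS = {
    "$.requestid": "request_id",
    "$.requestIndex": "request_index",
    "$.status": "status",
    "$.operation_id": "operation_id",
    "$.timestamp": "timestamp",
    "$.type": "type",
}
MEASUREMENT_RESULT_COLUMN_MAPPINGS = {
    "$.uid": "uid",
    "$.entityIdentifier": "entity_id",
}
MEASUREMENT_REQUEST_JSON_COLUMNS = {
    "$.metadata": "metadata",
    "$.experimentReference": "experiment_reference",
}


def unmapped_targets(mapping: dict[str, str], table_columns: set[str]) -> list[str]:
    """Return mapping targets that are not columns of the given table."""
    return sorted(set(mapping.values()) - table_columns)
```

A unit test can then assert that each call returns an empty list.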

Phase 2: Update SQLSampleStore Methods

2.1 Update measurement_requests_for_operation()

Current signature (line 1460):

def measurement_requests_for_operation(
    self, operation_id: str
) -> list[MeasurementRequest]:

New signature:

def measurement_requests_for_operation(
    self,
    operation_id: str,
    filters: list[dict[str, str]] | None = None,
) -> list[MeasurementRequest]:
    """
    Fetch measurement requests for an operation, optionally filtered.

    Args:
        operation_id: The operation identifier
        filters: Optional DB-level filters from prepare_query_filters_for_db()
                 Format: [{"$.path": "value"}, ...]
                 Supports filtering on request fields (status, requestIndex, etc.)
                 and nested JSON fields (metadata.*, experimentReference.*)

    Returns:
        List of MeasurementRequest objects matching the filters
    """

Implementation approach:

# Build base query
query = f"""
    SELECT uid, experiment_reference, operation_id, request_index,
           request_id, type, status, metadata, timestamp
    FROM {self._tablename}_measurement_requests
    WHERE operation_id = :operation_id
"""

# Add filters if provided
if filters:
    from orchestrator.metastore.sql.statements import (
        measurement_request_filter_by_arbitrary_selection
    )

    for filter_dict in filters:
        for path, candidate in filter_dict.items():
            filter_clause = measurement_request_filter_by_arbitrary_selection(
                path=path,
                candidate=candidate,
                table_name=f"{self._tablename}_measurement_requests",
                needs_where=False,
                dialect=self.engine.dialect.name
            )
            query += filter_clause

query += " ORDER BY request_index"

# Execute query and convert to MeasurementRequest objects
# (existing cursor-to-pydantic conversion logic)
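
One variation worth considering is to keep filter values out of the SQL text and pass them as bound parameters, the same way operation_id is bound. The following sketch assumes direct-column filters only and uses a hypothetical `build_request_query` helper and a simplified `demo_requests` table; it is not the planned implementation.

```python
from __future__ import annotations

import json
import sqlite3

# Assumed subset of the direct column mappings.
DIRECT_COLUMNS = {"$.status": "status", "$.requestIndex": "request_index"}


def build_request_query(table: str, filters: list[dict[str, str]] | None = None):
    """Return (sql, params) with one bound parameter per filter value."""
    sql = (
        f"SELECT uid, request_index, status FROM {table} "
        "WHERE operation_id = :operation_id"
    )
    params: dict[str, object] = {}
    for i, filter_dict in enumerate(filters or []):
        for j, (path, candidate) in enumerate(filter_dict.items()):
            column = DIRECT_COLUMNS[path]  # unmapped paths raise KeyError here
            name = f"filter_{i}_{j}"
            sql += f" AND {column} = :{name}"
            params[name] = json.loads(candidate)  # candidates are JSON-encoded
    return sql + " ORDER BY request_index", params


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE demo_requests "
    "(uid TEXT, operation_id TEXT, request_index INTEGER, status TEXT)"
)
conn.executemany(
    "INSERT INTO demo_requests VALUES (?, ?, ?, ?)",
    [("r1", "op-1", 0, "Success"), ("r2", "op-1", 1, "Failed"), ("r3", "op-2", 0, "Success")],
)
sql, params = build_request_query("demo_requests", [{"$.status": '"Success"'}])
rows = conn.execute(sql, {"operation_id": "op-1", **params}).fetchall()
```

With filters=None the generated SQL is the unfiltered query, so existing callers would see no change.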

2.2 Update measurement_results_for_operation()

Current signature (line 1519):

def measurement_results_for_operation(
    self, operation_id: str
) -> list[MeasurementResult]:

New signature:

def measurement_results_for_operation(
    self,
    operation_id: str,
    filters: list[dict[str, str]] | None = None,
) -> list[MeasurementResult]:
    """
    Fetch measurement results for an operation, optionally filtered.

    Args:
        operation_id: The operation identifier
        filters: Optional DB-level filters from prepare_query_filters_for_db()
                 Format: [{"$.path": "value"}, ...]
                 Supports filtering on result fields (uid, entityIdentifier, etc.)
                 and nested JSON fields in the data column

    Returns:
        List of MeasurementResult objects matching the filters
    """

Implementation approach:

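results_table = f"{self._tablename}_measurement_results"

# The results table is left unaliased so that columns qualified with
# table_name by the filter helper resolve against this FROM clause.
query = f"""
    SELECT DISTINCT {results_table}.uid, {results_table}.entity_id, {results_table}.data
    FROM {results_table}
    JOIN {self._tablename}_measurement_requests_results mrr
        ON {results_table}.uid = mrr.result_uid
    JOIN {self._tablename}_measurement_requests req ON mrr.request_uid = req.uid
    WHERE req.operation_id = :operation_id
"""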

if filters:
    from orchestrator.metastore.sql.statements import (
        measurement_result_filter_by_arbitrary_selection
    )

    for filter_dict in filters:
        for path, candidate in filter_dict.items():
            filter_clause = measurement_result_filter_by_arbitrary_selection(
                path=path,
                candidate=candidate,
                table_name=f"{self._tablename}_measurement_results",
                needs_where=False,
                dialect=self.engine.dialect.name
            )
            query += filter_clause

# Execute query and convert to MeasurementResult objects
# (existing conversion logic)

2.3 Update complete_measurement_request_with_results_timeseries()

Current signature (line 1702):

def complete_measurement_request_with_results_timeseries(
    self,
    operation_id: str,
    entity_identifiers: list[str] | None = None,
    experiment_identifiers: list[str] | None = None,
    property_identifiers: list[str] | None = None,
) -> pd.DataFrame:

New signature:

def complete_measurement_request_with_results_timeseries(
    self,
    operation_id: str,
    entity_identifiers: list[str] | None = None,
    experiment_identifiers: list[str] | None = None,
    property_identifiers: list[str] | None = None,
    filters: list[dict[str, str]] | None = None,
) -> pd.DataFrame:
    """
    ... existing docstring ...

    Args:
        ... existing args ...
        filters: Optional DB-level filters from prepare_query_filters_for_db()
                 Can filter on both request and result fields
    """

Implementation: Apply filters in the complex JOIN query that combines
requests and results.

Phase 3: Filter Path Mapping Strategy

3.1 Direct Column Mappings

For performance, map common JSON paths directly to table columns:

  • Avoids JSON parsing overhead
  • Uses indexed columns where available
  • Falls back to JSON queries for unmapped paths

3.2 JSON Field Handling

For paths not in direct mappings:

  • Use existing resource_filter_by_arbitrary_selection() from metastore
  • Handles both MySQL JSON_CONTAINS and SQLite simulation
  • Supports nested path queries

3.3 Special Cases

Nested result filters (e.g., $.measurements[0].entityIdentifier):

  • When filtering requests by result properties, caller must add JOIN to results
    table
  • Extract the nested path and apply to results table
  • Return distinct requests to avoid duplicates
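
The need for DISTINCT can be seen in a small SQLite example. The table names and data below are made up for illustration; request r1 has two results for the same entity, so the joined query returns it twice unless duplicates are removed.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE demo_requests (uid TEXT PRIMARY KEY, operation_id TEXT, request_index INTEGER);
CREATE TABLE demo_results (uid TEXT PRIMARY KEY, entity_id TEXT);
CREATE TABLE demo_requests_results (request_uid TEXT, result_uid TEXT, entity_index INTEGER);
INSERT INTO demo_requests VALUES ('r1', 'op-1', 0), ('r2', 'op-1', 1);
INSERT INTO demo_results VALUES ('m1', 'entity-a'), ('m2', 'entity-a'), ('m3', 'entity-b');
INSERT INTO demo_requests_results VALUES ('r1', 'm1', 0), ('r1', 'm2', 1), ('r2', 'm3', 0);
""")

# Filter requests by a property of their results via the junction table.
QUERY = """
    SELECT {distinct} req.uid, req.request_index
    FROM demo_requests req
    JOIN demo_requests_results mrr ON mrr.request_uid = req.uid
    JOIN demo_results res ON res.uid = mrr.result_uid
    WHERE req.operation_id = :operation_id AND res.entity_id = :entity_id
    ORDER BY req.request_index
"""
params = {"operation_id": "op-1", "entity_id": "entity-a"}

with_duplicates = conn.execute(QUERY.format(distinct=""), params).fetchall()
distinct_rows = conn.execute(QUERY.format(distinct="DISTINCT"), params).fetchall()
```

Here `with_duplicates` contains r1 twice, while `distinct_rows` contains it once.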

Timestamp comparisons:

  • Support ISO 8601 string comparisons
  • Consider adding operators (>, <, >=, <=) in future enhancement
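
One caveat for string comparisons: ISO 8601 strings only sort chronologically when they share the same offset and precision. A minimal sketch of normalizing to UTC before comparing, using a hypothetical `to_utc_iso` helper:

```python
from datetime import datetime, timezone


def to_utc_iso(value: str) -> str:
    """Normalize an offset-aware ISO 8601 timestamp to a UTC string."""
    parsed = datetime.fromisoformat(value)
    if parsed.tzinfo is None:
        raise ValueError("timestamp must include a UTC offset")
    return parsed.astimezone(timezone.utc).isoformat(timespec="seconds")


a = "2024-05-01T12:00:00+02:00"  # 10:00 UTC
b = "2024-05-01T11:00:00+00:00"  # 11:00 UTC

raw_says_a_first = a < b  # compares characters, ignores the offset
normalized_says_a_first = to_utc_iso(a) < to_utc_iso(b)
```

The raw comparison orders `b` before `a` even though `a` is the earlier instant; after normalization the order is correct.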

Phase 4: Testing Strategy

4.1 Unit Tests

File: tests/metastore/sql/test_measurement_filters.py (new)

Test the filter building functions in isolation:

def test_measurement_request_filter_direct_columns():
    """Test direct column mapping for common fields."""

def test_measurement_request_filter_json_fields():
    """Test JSON field queries for metadata and experimentReference."""

def test_measurement_request_filter_multiple():
    """Test combining multiple filters with AND logic."""

def test_measurement_request_filter_mysql_vs_sqlite():
    """Test dialect-specific SQL generation."""

def test_measurement_result_filter_direct_columns():
    """Test direct column mapping for result fields."""

def test_measurement_result_filter_json_data():
    """Test JSON queries on the data column."""

def test_measurement_filter_empty_or_none():
    """Test handling of empty or None filter lists."""

def test_measurement_filter_invalid_paths():
    """Test handling of invalid JSON paths."""

def test_simulate_json_contains_generic():
    """Test refactored simulate_json_contains_on_sqlite with custom table/columns."""

def test_simulate_json_contains_backward_compatibility():
    """Test that existing resource queries still work with default parameters."""

4.2 Integration Tests

File: tests/core/samplestore/test_sql_samplestore_filtering.py (new)

Test the full filtering workflow with actual database:

def test_filter_requests_by_status(sql_samplestore_with_data):
    """Filter requests by status field."""

def test_filter_requests_by_request_index(sql_samplestore_with_data):
    """Filter requests by requestIndex."""

def test_filter_requests_by_experiment(sql_samplestore_with_data):
    """Filter requests by experimentReference fields."""

def test_filter_requests_by_metadata(sql_samplestore_with_data):
    """Filter requests by metadata fields."""

def test_filter_results_by_entity(sql_samplestore_with_data):
    """Filter results by entityIdentifier."""

def test_filter_results_by_validity(sql_samplestore_with_data):
    """Filter results by valid/invalid state."""

def test_filter_multiple_conditions(sql_samplestore_with_data):
    """Test multiple filters combined with AND logic."""

def test_filter_performance_large_dataset(sql_samplestore_with_large_data):
    """Verify filtering performance on large datasets."""

def test_filter_sqlite_and_mysql_consistency(sql_samplestore_both_dialects):
    """Ensure filters produce same results on both dialects."""

4.3 Test Fixtures

Create fixtures in tests/fixtures/samplestore/:

@pytest.fixture
def sql_samplestore_with_data():
    """
    Create a samplestore with diverse data for filter testing:
    - Multiple operations
    - Various request statuses
    - Different experiment references
    - Rich metadata fields
    - Valid and invalid results
    """

Phase 5: Update Base Class

5.1 Update ActiveSampleStore Abstract Methods

File: orchestrator/core/samplestore/base.py

Add filters parameter to abstract method signatures:

@abc.abstractmethod
def measurement_requests_for_operation(
    self,
    operation_id: str,
    filters: list[dict[str, str]] | None = None,
) -> list[MeasurementRequest]:
    """
    Fetch measurement requests for an operation, optionally filtered.

    Args:
        operation_id: The operation identifier
        filters: Optional filters in prepare_query_filters_for_db() format

    Returns:
        List of MeasurementRequest objects
    """

@abc.abstractmethod
def measurement_results_for_operation(
    self,
    operation_id: str,
    filters: list[dict[str, str]] | None = None,
) -> list[MeasurementResult]:
    """
    Fetch measurement results for an operation, optionally filtered.

    Args:
        operation_id: The operation identifier
        filters: Optional filters in prepare_query_filters_for_db() format

    Returns:
        List of MeasurementResult objects
    """

5.2 Update Other Implementations

Ensure other samplestore implementations handle the new parameter:

  • CSV SampleStore: Can ignore filters (return all data) or implement
    in-memory filtering
  • Mock SampleStore: Can ignore filters or implement simple filtering for
    tests

Implementation Order

Step 1: Add Measurement Filter Functions

  • Add functions to orchestrator/metastore/sql/statements.py
  • Implement measurement_request_filter_by_arbitrary_selection()
  • Implement measurement_result_filter_by_arbitrary_selection()
  • Define column mapping constants
  • Write unit tests in tests/metastore/sql/test_measurement_filters.py

Step 2: Update measurement_requests_for_operation()

  • Add filters parameter to method signature
  • Integrate filter building into SQL query
  • Update existing tests to verify backward compatibility
  • Add new tests for filtering behavior

Step 3: Update measurement_results_for_operation()

  • Add filters parameter to method signature
  • Handle JOIN for request filters if needed
  • Update existing tests
  • Add new tests for result filtering

Step 4: Update Base Class

  • Update abstract method signatures in ActiveSampleStore
  • Update CSV and other implementations to accept (and optionally ignore) filters
  • Verify all implementations compile and tests pass

Step 5: Integration Testing

  • Create comprehensive integration test suite
  • Test both MySQL and SQLite dialects
  • Verify performance with realistic datasets
  • Test edge cases and error handling

Step 6: Update complete_measurement_request_with_results_timeseries()

  • Add filter support to complex timeseries query
  • Test with various filter combinations
  • Verify performance impact

Key Design Decisions

1. Filter Format

Use existing prepare_query_filters_for_db() format: [{"$.path": "value"}]

  • Rationale: Consistent with metastore filtering, already handles JSON
    encoding, CLI integration ready
  • Trade-off: Requires understanding of JSON path syntax, but this is already
    documented

2. Parameter Naming

Use filters instead of field_selectors:

  • Rationale: More generic and intuitive, aligns with CLI --filter flag
  • Future: Can standardize across codebase (metastore, etc.) in separate
    refactoring

3. Direct Column vs JSON Queries

  • Strategy: Map common paths to direct column comparisons (faster), fall
    back to JSON queries for nested/metadata fields
  • Rationale: Performance optimization for common cases while maintaining
    flexibility
  • Documentation: Document performance implications in method docstrings

4. Result-Level Filters on Requests

When filtering requests by result properties:

  • Strategy: Automatically add JOIN to results table, apply filter on joined
    results, return distinct requests
  • Rationale: Enables powerful cross-table filtering without exposing JOIN
    complexity to caller
  • Trade-off: May have performance impact on large datasets, but provides
    necessary functionality

5. Backward Compatibility

  • Strategy: Make filters optional (default None) so that existing code
    continues to work unchanged
  • Rationale: No breaking changes to API, gradual adoption possible
  • Migration: Existing callers can add filtering incrementally

6. Error Handling

  • Strategy: Invalid filter paths are logged and ignored (don't fail the
    query)
  • Rationale: Graceful degradation, allows partial filtering even if some
    paths are invalid
  • Alternative: Could raise exceptions for invalid paths, but this is more
    brittle
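
The log-and-ignore strategy could be sketched as a pre-processing step. The path pattern below is an assumption about what counts as a valid path (dotted names and numeric indexes), and `drop_invalid_filters` is a hypothetical helper.

```python
from __future__ import annotations

import logging
import re

logger = logging.getLogger(__name__)

# Assumed grammar: "$" followed by .name or [index] segments.
_PATH_PATTERN = re.compile(r"^\$(\.[A-Za-z_][A-Za-z0-9_]*|\[\d+\])+$")


def drop_invalid_filters(filters: list[dict[str, str]] | None) -> list[dict[str, str]]:
    """Return only filters with well-formed paths, logging the rest."""
    kept = []
    for filter_dict in filters or []:
        valid = {}
        for path, candidate in filter_dict.items():
            if _PATH_PATTERN.match(path):
                valid[path] = candidate
            else:
                logger.warning("Ignoring filter with invalid path: %r", path)
        if valid:
            kept.append(valid)
    return kept
```

Note that an ignored filter makes the query return more rows than the caller asked for, so the warning needs to be visible to whoever issued the filter.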

Success Criteria

  • ✅ All target methods accept filters parameter
  • ✅ Filters work correctly on both MySQL and SQLite
  • ✅ Direct column filters use efficient SQL comparisons
  • ✅ JSON field filters use appropriate JSON functions
  • ✅ Result-level filters properly JOIN tables when needed
  • ✅ Multiple filters combine with AND logic
  • ✅ Unit test coverage >90% for filter utilities
  • ✅ Integration tests pass on both dialects
  • ✅ Performance acceptable for typical datasets (no significant regression)
  • ✅ Backward compatibility maintained (existing code works unchanged)
  • ✅ Documentation updated with filter examples

Future Enhancements

Operator Support

Currently filters only support equality. Future enhancements could add:

  • Comparison operators: >, <, >=, <=, !=
  • Pattern matching: LIKE, REGEX
  • Range queries: BETWEEN
  • Set membership: IN

OR Logic

Currently filters are combined with AND. Could add support for OR logic:

  • Syntax: [{"$.status": ["Success", "Failed"]}] for OR within a field
  • Complex expressions: Nested AND/OR combinations
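
For the list syntax, one possible translation is an IN clause with one bound parameter per value. This is a sketch of a possible future design, not part of the current plan; `in_clause` is a hypothetical helper.

```python
from __future__ import annotations


def in_clause(column: str, values: list[object], prefix: str) -> tuple[str, dict[str, object]]:
    """Build "column IN (:p_0, :p_1, ...)" plus the matching parameter dict."""
    if not values:
        raise ValueError("IN requires at least one value")
    names = [f"{prefix}_{i}" for i in range(len(values))]
    placeholders = ", ".join(f":{name}" for name in names)
    return f"{column} IN ({placeholders})", dict(zip(names, values))


clause, params = in_clause("status", ["Success", "Failed"], "status")
```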

Performance Optimization

  • Add indexes on commonly filtered columns
  • Query plan analysis and optimization
  • Caching for repeated filter patterns

Filter Validation

  • Validate filter paths against schema
  • Provide helpful error messages for invalid paths
  • Auto-complete suggestions for valid paths

Documentation Updates

After implementation, update:

  • Method docstrings with filter examples
  • User guide with filtering patterns
  • API reference with filter format specification
  • Performance considerations for different filter types
