Source code for neosqlite.collection.query_helper

from typing import Any

# Import sqlite3 for type hints and potential direct usage
from ..._sqlite import sqlite3 as sqlite3  # noqa: F401
from ..jsonb_support import (
    _get_json_each_function,
    supports_jsonb,
    supports_jsonb_each,
)

# Import the centralized ID normalization function
from ..type_correction import normalize_id_query_for_db
from .aggregation import AggregationMixin

# Import mixin modules
from .crud_operations import CRUDOperationsMixin

# Import helper functions
from .helpers import (
    _get_integer_id_for_oid,
    _get_json_error_position,
    _validate_json_document,
)
from .query_builder import QueryBuilderMixin
from .query_optimizer import QueryOptimizerMixin
from .translation_cache import TranslationCache  # noqa: F401
from .update_operations import UpdateOperationsMixin

# Import utility functions
from .utils import (
    _convert_bytes_to_binary as _convert_bytes_to_binary,
)
from .utils import (
    _get_json_function as _get_json_function,
)
from .utils import (
    _get_json_function_prefix,
)
from .utils import (
    _is_numeric_value as _is_numeric_value,
)
from .utils import (
    _supports_returning_clause as _supports_returning_clause,
)
from .utils import (
    _validate_inc_mul_field_value as _validate_inc_mul_field_value,
)
from .utils import (
    get_force_fallback as get_force_fallback,
)
from .utils import (
    set_force_fallback as set_force_fallback,
)


[docs] class QueryHelper( CRUDOperationsMixin, UpdateOperationsMixin, QueryBuilderMixin, AggregationMixin, QueryOptimizerMixin, ): """ A helper class for the QueryEngine that provides methods for building queries, performing updates, and processing aggregation pipelines. This class contains the core logic for translating MongoDB-like queries and operations into SQL statements that can be executed against the SQLite database. It handles both simple operations that can be done directly with SQL JSON functions and complex operations that require Python-based processing. The class is composed of several mixins: - CRUDOperationsMixin: Insert, replace, delete operations - UpdateOperationsMixin: Update operations (SQL and Python-based) - QueryBuilderMixin: WHERE clause building and query application - AggregationMixin: Aggregation pipeline processing - QueryOptimizerMixin: Query optimization and cost estimation """
[docs] def __init__(self, collection): """ Initialize the QueryHelper with a collection. Args: collection: The collection instance this QueryHelper will operate on. """ self.collection = collection # Access debug flag from the database connection if available self.debug = ( getattr(collection.database, "debug", False) if hasattr(collection, "database") else False ) # Check if JSONB is supported for this connection self._jsonb_supported = supports_jsonb(collection.db) # Check if jsonb_each is supported (requires SQLite 3.51.0+) self._jsonb_each_supported = supports_jsonb_each(collection.db) # Cache the function prefix for performance self._json_function_prefix = _get_json_function_prefix( self._jsonb_supported ) # Cache the correct json_each function name self._json_each_function = _get_json_each_function( self._jsonb_supported, self._jsonb_each_supported ) # Initialize Tier-2 evaluator for complex $expr queries # Import here to avoid circular imports from ..expr_temp_table import TempTableExprEvaluator # Get cache size from database connection (defaults to 100) cache_size = ( getattr(collection.database, "_translation_cache_size", 100) if hasattr(collection, "database") else 100 ) self.tier2_evaluator = TempTableExprEvaluator( collection.db, data_column=( collection.query_engine._data_column if hasattr(collection, "query_engine") and hasattr(collection.query_engine, "_data_column") else "data" ), translation_cache_size=cache_size, )
[docs] def cleanup(self) -> None: """Clean up resources used by the QueryHelper.""" if hasattr(self, "tier2_evaluator"): self.tier2_evaluator.cleanup_temp_tables()
[docs] def _normalize_id_query(self, query: dict[str, Any]) -> dict[str, Any]: """ Normalize ID types in a query dictionary to correct common mismatches. This method delegates to the centralized normalize_id_query_for_db function to ensure consistent ID handling across all NeoSQLite components. Args: query: The query dictionary to process Returns: A new query dictionary with corrected ID types """ return normalize_id_query_for_db(query)
[docs] def _get_integer_id_for_oid(self, oid: Any) -> int: """ Get the integer ID for a given ObjectId or other ID type. Args: oid: The ID value (can be ObjectId, int, str, etc.) Returns: int: The integer ID from the database """ return _get_integer_id_for_oid(self.collection, oid)
[docs] def _validate_json_document(self, json_str: str) -> bool: """ Validate JSON document using SQLite's json_valid function. Args: json_str: The JSON string to validate Returns: bool: True if valid, False otherwise """ return _validate_json_document(self.collection.db, json_str)
[docs] def _get_json_error_position(self, json_str: str) -> int: """ Get position of JSON error using json_error_position(). Args: json_str: The JSON string to check Returns: int: Position of error, or -1 if valid/not supported """ return _get_json_error_position(self.collection.db, json_str)
[docs] def _build_expr_where_clause( self, query: dict[str, Any] ) -> tuple[str, list[Any], list[str]] | None: """ Build a SQL WHERE clause for $expr queries using the 3-tier approach. Also handles other query fields combined with $expr. Tier Selection Logic: - Tier 1 (Simple): Direct SQL WHERE with json_extract/jsonb_extract - Tier 2 (Complex): Temporary tables with pre-computed field extractions - Tier 3 (Fallback): Python evaluation for unsupported operations Args: query: Query dictionary containing $expr and potentially other fields Returns: Tuple of (SQL WHERE clause, parameters, tables) or None for Python fallback """ if "$expr" not in query: return None expr = query["$expr"] if not isinstance(expr, dict): return None # Import here to avoid circular imports from ..expr_evaluator import ExprEvaluator # Create evaluator instance for Tier 1 tier1_evaluator = ExprEvaluator( ( self.collection.query_engine._data_column if hasattr(self.collection.query_engine, "_data_column") else "data" ), self.collection.db, ) # Determine complexity tier based on expression analysis tier = self._analyze_expr_complexity(expr) # Check for force fallback (kill switch) force_python = get_force_fallback() # Tier selection with kill switch awareness if force_python or tier >= 3: # Tier 3: Python fallback (kill switch or too complex) return None elif tier == 2: # Tier 2: Try temporary tables approach # Get the main query from Tier 2 evaluator tier2_result = self.tier2_evaluator.evaluate( expr, self.collection.name, None, # Filter expr not used yet ) if tier2_result[0] is not None: # Success - return the full query with cleanup tables return tier2_result # If Tier 2 fails, fall back to Tier 1 sql_expr, params = tier1_evaluator.evaluate( expr, tier=1, force_python=False ) if sql_expr is None: return None # Build WHERE clause with other fields return self._combine_expr_with_other_fields( sql_expr, params, query, expr ) else: # Tier 1: Direct SQL evaluation sql_expr, params = tier1_evaluator.evaluate( expr, tier=1, force_python=False ) if sql_expr is None: # Python fallback - return None to force Python filtering return None # Build WHERE clause with other fields return self._combine_expr_with_other_fields( sql_expr, params, query, expr )
[docs] def _build_other_fields_clause( self, query: dict[str, Any], expr: dict[str, Any] ) -> tuple[str, list[Any]] | None: """Helper to build WHERE clause for non-$expr fields.""" where_parts = [] all_params = [] for field, value in query.items(): if field == "$expr": continue if field in ("$and", "$or", "$nor", "$not"): return None field_result = self._build_field_clause(field, value) if field_result is None: return None field_clause, field_params = field_result where_parts.append(field_clause) all_params.extend(field_params) if not where_parts: return "", [] return " AND ".join(where_parts), all_params
[docs] def _analyze_expr_complexity(self, expr: dict[str, Any]) -> int: """ Analyze expression complexity to determine appropriate tier. Complexity scoring: - Base expression: 1 point - Each nested operator: +1 point - Arithmetic operators: +1 point each - Conditional operators: +2 points each - Array operators: +2 points each - Type conversion: +1 point each Tier thresholds: - 1-2: Tier 1 (simple SQL WHERE) - 3-8: Tier 2 (temporary tables) - 9+: Tier 3 (Python fallback) Args: expr: The $expr expression Returns: int: Complexity score """ if not isinstance(expr, dict) or len(expr) != 1: return 0 score = 1 # Base score for operator, operands in expr.items(): # Recurse into operands for all operators if isinstance(operands, list): for op in operands: if isinstance(op, dict): score += self._analyze_expr_complexity(op) elif isinstance(operands, dict): score += self._analyze_expr_complexity(operands) # Add operator-specific complexity match operator: case "$add" | "$subtract" | "$multiply" | "$divide" | "$mod": score += 1 case "$cond" | "$switch": score += 2 case "$size" | "$in" | "$arrayElemAt" | "$first" | "$last": score += 2 case "$filter" | "$map" | "$reduce": # Array transformation operators are complex score += 3 case "$concat" | "$toLower" | "$toUpper" | "$substr": score += 1 case "$abs" | "$ceil" | "$floor" | "$round" | "$trunc": score += 1 case "$dateAdd" | "$dateSubtract" | "$dateDiff": score += 1 case "$regexFind" | "$regexFindAll": # Regex operations require Python evaluation score += 2 case "$cmp": score += 1 case "$ifNull" | "$type" | "$toString" | "$toInt": score += 1 # Comparison and logical operators don't add extra complexity # (their complexity comes from their operands which are already counted) return score
[docs] def _combine_expr_with_other_fields( self, sql_expr: str, params: list[Any], query: dict[str, Any], expr: dict[str, Any], ) -> tuple[str, list[Any], list[str]] | None: """ Combine $expr SQL with other query fields. Args: sql_expr: The $expr SQL expression params: SQL parameters query: Full query dictionary expr: The $expr expression Returns: Tuple of (WHERE clause, parameters, tables) or None for Python fallback """ # Build WHERE clause starting with $expr # MongoDB $expr truthiness: NOT (null, 0, false, undefined). # Handle both integer booleans (0/1) and JSON booleans (json('true')/json('false')). if "json('true')" in sql_expr or 'json("true")' in sql_expr: # Expression returns JSON booleans - compare to json('true') truthy_expr = f"({sql_expr} = json('true'))" else: # Expression returns integers - use != 0 truthy_expr = f"COALESCE(({sql_expr}), 0) != 0" where_parts = [f"({truthy_expr})"] all_params: list[Any] = list(params) # Process other query fields (excluding $expr) for field, value in query.items(): if field == "$expr": continue # Handle logical operators - fall back to Python if field in ("$and", "$or", "$nor", "$not"): return None # Build clause for regular fields field_result = self._build_field_clause(field, value) if field_result is None: # If any field can't be handled in SQL, fall back to Python return None field_clause, field_params = field_result where_parts.append(field_clause) all_params.extend(field_params) return f"WHERE {' AND '.join(where_parts)}", all_params, []