Source code for neosqlite.collection.datetime_query_processor

"""
Datetime query processor for NeoSQLite with three-tier fallback mechanism.

This module provides a three-tier approach for handling datetime queries:
1. SQL tier: Direct SQL processing with json_* functions
2. Temp table tier: Temporary table approach for complex queries
3. Python tier: Pure Python processing as fallback

The SQL and temp table queries always use json_* functions not jsonb_* because
we need to compare the datetime string, querying with jsonb_* will get byte instead of string.
This extends the existing text search functionality to also use only json_* functions.
"""

from __future__ import annotations

import hashlib
import logging
import uuid
from typing import Any

from ..sql_utils import quote_table_name

logger = logging.getLogger(__name__)
from .jsonb_support import supports_jsonb
from .query_helper import QueryHelper
from .sql_translator_unified import SQLFieldAccessor, SQLTranslator
from .temporary_table_aggregation import (
    aggregation_pipeline_context,
)


[docs] class DateTimeQueryProcessor: """ Process datetime queries using a three-tier fallback mechanism. The three-tier approach: 1. SQL tier: Direct SQL processing with json_* functions 2. Temp table tier: Temporary table approach for complex queries 3. Python tier: Pure Python processing as fallback The SQL and temp table queries always use json_* functions not jsonb_* because we need to compare the datetime string, querying with jsonb_* will get byte instead of string. """
[docs] def __init__( self, collection, query_engine=None, use_global_kill_switch=False ): """ Initialize the DateTimeQueryProcessor with a collection. Args: collection: The NeoSQLite collection to process datetime queries on query_engine: Optional QueryEngine instance for accessing helpers use_global_kill_switch: If True, use the global kill switch; if False, use local kill switch only """ self.collection = collection self.db = collection.db self.query_engine = query_engine self.helpers = ( query_engine.helpers if query_engine else QueryHelper(collection) ) self.sql_translator = SQLTranslator(collection.name, "data", "id") # Check if JSONB is supported for this connection self._jsonb_supported = supports_jsonb(self.db) # Configuration for kill switch behavior - DEFAULT TO LOCAL for better isolation self._use_global_kill_switch = use_global_kill_switch # Local kill switch state (isolated to this instance) self._local_kill_switch = False
[docs] def set_kill_switch(self, enabled: bool): """ Set the kill switch to force fallback to Python implementation. Behavior depends on initialization setting: - If use_global_kill_switch=True: Sets the global kill switch (affects entire app) - If use_global_kill_switch=False: Sets local kill switch (affects only this instance) Args: enabled: If True, forces fallback to Python implementation """ if self._use_global_kill_switch: # Import locally to avoid circular imports and minimize global state touch from .query_helper import set_force_fallback set_force_fallback(enabled) else: self._local_kill_switch = enabled
[docs] def is_kill_switch_enabled(self) -> bool: """ Check if the kill switch is enabled. Behavior depends on initialization setting: - If use_global_kill_switch=True: Checks global kill switch - If use_global_kill_switch=False: Checks local kill switch Returns: True if kill switch is enabled, False otherwise """ if self._use_global_kill_switch: # Import locally to avoid circular imports and minimize global state touch from .query_helper import get_force_fallback return get_force_fallback() else: return self._local_kill_switch
[docs] def process_datetime_query( self, query: dict[str, Any], use_kill_switch: bool | None = None ) -> list[dict[str, Any]]: """ Process a datetime query using the three-tier fallback mechanism. Args: query: MongoDB-style query dictionary containing datetime operations use_kill_switch: Optional override for kill switch setting Returns: List of matching documents """ # Check if kill switch is enabled (configured setting or parameter override) force_python = ( use_kill_switch if use_kill_switch is not None else self.is_kill_switch_enabled() ) # First, check if query contains datetime operations if not self._contains_datetime_operations(query): # If not datetime query, return empty list return [] # Try SQL tier first if not force_python: try: result = self._process_with_sql_tier(query) if result is not None: return result except Exception as e: # If SQL/temp table tier fails, fall through to next tier logger.debug(f"{e=}") pass # Try temporary table tier if not force_python: try: result = self._process_with_temp_table_tier(query) if result is not None: return result except Exception as e: # If temp table tier fails, fall through to Python tier logger.debug( f"Temp table tier failed in datetime processor: {e}" ) pass # Fallback to Python tier return self._process_with_python_tier(query)
[docs] def _contains_datetime_operations(self, query: dict[str, Any]) -> bool: """ Check if a query contains datetime operations. Args: query: MongoDB-style query dictionary Returns: True if query contains datetime operations, False otherwise """ for field, value in query.items(): if field in ("$and", "$or", "$nor"): if isinstance(value, list): for condition in value: if isinstance( condition, dict ) and self._contains_datetime_operations(condition): return True elif field == "$not": if isinstance( value, dict ) and self._contains_datetime_operations(value): return True elif isinstance(value, dict): # Check for datetime-related operators for operator, op_value in value.items(): if operator in ("$gte", "$gt", "$lte", "$lt", "$eq", "$ne"): # Check if the value is a datetime object or datetime string if self._is_datetime_value(op_value): return True elif operator in ("$in", "$nin"): # For $in and $nin, check if any value in the list is a datetime if isinstance(op_value, list): if any( self._is_datetime_value(item) for item in op_value ): return True elif operator == "$type": # Check if looking for date type if op_value in ( 9, "date", "Date", ): # 9 is date type in MongoDB return True elif operator == "$regex": # Check if it's a datetime regex pattern if self._is_datetime_regex(op_value): return True return False
[docs] def _is_datetime_value(self, value: Any) -> bool: """ Check if a value is a datetime object or datetime string. Args: value: Value to check Returns: True if value is datetime-related, False otherwise """ from .datetime_utils import is_datetime_value return is_datetime_value(value)
[docs] def _is_datetime_regex(self, pattern: str) -> bool: """ Check if a pattern is likely to be datetime-related. Args: pattern: Pattern string (could be a regex pattern or a datetime string) Returns: True if pattern is likely datetime-related, False otherwise """ from .datetime_utils import is_datetime_regex return is_datetime_regex(pattern)
[docs] def _process_with_sql_tier( self, query: dict[str, Any] ) -> list[dict[str, Any]] | None: """ Process datetime query using SQL tier with json_* functions. Args: query: MongoDB-style query dictionary Returns: List of matching documents if successful, None otherwise """ # Check if we should force Python fallback using the configured setting if self.is_kill_switch_enabled(): return None # Create a custom SQLFieldAccessor that ensures json_* functions are used # for datetime queries as required (as done for $text FTS queries) field_accessor = SQLFieldAccessor( data_column="data", id_column="id", jsonb_supported=False, # Force use of json_* functions for datetime ) # Create a custom translator with the field accessor that uses json_* functions from .sql_translator_unified import ( SQLClauseBuilder, SQLOperatorTranslator, ) operator_translator = SQLOperatorTranslator(field_accessor) clause_builder = SQLClauseBuilder(field_accessor, operator_translator) # Translate the match query to WHERE clause using json_* functions only where_clause, params = clause_builder.build_where_clause( query, query_param=query ) # If translation failed, return None to trigger fallback if where_clause is None: return None # Build the SQL query using json_* functions for datetime comparison cmd = f"SELECT id, data FROM {quote_table_name(self.collection.name)} {where_clause}" try: cursor = self.db.execute(cmd, params) results = [ self.collection._load(row[0], row[1]) for row in cursor.fetchall() ] return results except Exception as e: # If SQL execution fails, return None to trigger fallback logger.debug(f"SQL execution failed in datetime processor: {e}") return None
[docs] def _process_with_temp_table_tier( self, query: dict[str, Any] ) -> list[dict[str, Any]] | None: """ Process datetime query using temporary table approach. Args: query: MongoDB-style query dictionary Returns: List of matching documents if successful, None otherwise """ # Check if we should force Python fallback using the configured setting if self.is_kill_switch_enabled(): return None # Use a more robust approach with temporary table for complex datetime queries try: # Create a unique pipeline ID for this operation query_str = str(sorted(query.items())) pipeline_id = f"datetime_{hashlib.sha256(query_str.encode()).hexdigest()[:8]}_{uuid.uuid4().hex[:4]}" with aggregation_pipeline_context( self.db, pipeline_id ) as create_temp: # Create base temp table with all documents base_stage = {"_base": True} temp_table = create_temp( base_stage, f"SELECT id, data FROM {quote_table_name(self.collection.name)}", ) # To ensure we use json_* functions for datetime queries, # we need to create a custom clause builder that forces json_* usage from .sql_translator_unified import ( SQLClauseBuilder, SQLFieldAccessor, SQLOperatorTranslator, ) field_accessor = SQLFieldAccessor( data_column="data", id_column="id", jsonb_supported=False, # Force use of json_* functions ) operator_translator = SQLOperatorTranslator(field_accessor) clause_builder = SQLClauseBuilder( field_accessor, operator_translator ) # Translate the match query to WHERE clause using json_* functions only where_clause, params = clause_builder.build_where_clause( query, context="temp_table", query_param=query ) if where_clause is None: # If we can't translate the query, return None to trigger Python fallback return None # Create filtered temporary table using json_* functions filtered_stage = {"$datetime_filter": query} result_table = create_temp( filtered_stage, f"SELECT * FROM {temp_table} {where_clause}", params, ) # Retrieve results from the filtered table cursor = self.db.execute(f"SELECT id, data FROM {result_table}") results = [ self.collection._load(row[0], row[1]) for row in cursor.fetchall() ] return results except Exception as e: # If temp table processing fails, return None to trigger Python fallback logger.debug( f"Temp table processing failed in datetime processor: {e}" ) return None
[docs] def _process_with_python_tier( self, query: dict[str, Any] ) -> list[dict[str, Any]]: """ Process datetime query using pure Python implementation. Args: query: MongoDB-style query dictionary Returns: List of matching documents """ # Fetch all documents and apply the query using Python all_docs = list(self.collection.find({})) results = [] for doc in all_docs: if self.helpers._apply_query(query, doc): results.append(doc) return results
[docs] class EnhancedDateTimeQueryProcessor(DateTimeQueryProcessor): """ Enhanced datetime query processor with additional datetime-specific query operators. """
[docs] def __init__(self, collection, query_engine=None): """ Initialize the EnhancedDateTimeQueryProcessor with a collection. Args: collection: The NeoSQLite collection to process datetime queries on query_engine: Optional QueryEngine instance for accessing helpers """ super().__init__(collection, query_engine)
[docs] def _apply_datetime_query( self, query: dict[str, Any], document: dict[str, Any] ) -> bool: """ Apply datetime-specific query operations to a document. Args: query: MongoDB-style query dictionary with datetime operations document: Document to check against the query Returns: True if document matches query, False otherwise """ # This method extends the base functionality to handle complex datetime operations # For now, we'll use the base query helper but could add more sophisticated datetime handling return self.helpers._apply_query(query, document)
[docs] def process_complex_datetime_query( self, query: dict[str, Any], use_kill_switch: bool | None = None ) -> list[dict[str, Any]]: """ Process complex datetime queries with additional datetime-specific logic. Args: query: MongoDB-style query dictionary containing complex datetime operations use_kill_switch: Optional override for kill switch setting Returns: List of matching documents """ # Check if kill switch is enabled (configured setting or parameter override) force_python = ( use_kill_switch if use_kill_switch is not None else self.is_kill_switch_enabled() ) # For complex datetime queries, we might have nested conditions or date ranges if not self._contains_datetime_operations(query): # If not datetime query, return empty list or process normally return [] # Try SQL tier first if not force_python: try: result = self._process_with_sql_tier(query) if result is not None: return result except Exception as e: # If SQL/temp table tier fails, fall through to next tier logger.debug(f"{e=}") pass # Try temporary table tier if not force_python: try: result = self._process_with_temp_table_tier(query) if result is not None: return result except Exception as e: # If temp table tier fails, fall through to Python tier logger.debug( f"Temp table tier failed in datetime processor: {e}" ) pass # Fallback to enhanced Python tier return self._process_with_enhanced_python_tier(query)
[docs] def _process_with_enhanced_python_tier( self, query: dict[str, Any] ) -> list[dict[str, Any]]: """ Process datetime query using enhanced pure Python implementation. Args: query: MongoDB-style query dictionary Returns: List of matching documents """ # Fetch all documents and apply the enhanced datetime query using Python all_docs = list(self.collection.find({})) results = [] for doc in all_docs: if self._apply_datetime_query(query, doc): results.append(doc) return results