"""Source code for ``neosqlite.collection.cursor``: the query ``Cursor`` class."""

from __future__ import annotations

import logging
import time
from collections.abc import Callable, Iterable, Iterator
from copy import deepcopy
from functools import partial
from typing import TYPE_CHECKING, Any

from .json_path_utils import parse_json_path
from .jsonb_support import _get_json_function_prefix, json_data_column
from .type_utils import validate_session

if TYPE_CHECKING:
    from ..client_session import ClientSession
    from . import Collection

# Module-level logger used by Cursor for debug and warning diagnostics.
logger = logging.getLogger(__name__)

# Sort-direction values accepted by Cursor.sort() and compared against in
# Cursor._apply_sorting(); they mirror PyMongo's ASCENDING/DESCENDING values.
ASCENDING = 1
DESCENDING = -1


class Cursor:
    """
    Class representing a cursor for iterating over documents in a collection
    with applied filters, projections, sorting, and pagination.

    The cursor is lazy: no SQL runs until it is iterated, and every call to
    ``iter()`` re-executes the query, so the same cursor can be iterated more
    than once. Builder methods (``limit``, ``skip``, ``sort``, ...) mutate the
    cursor and return it so calls can be chained.
    """

    def __init__(
        self,
        collection: Collection,
        filter: dict[str, Any] | None = None,
        projection: dict[str, Any] | None = None,
        hint: str | None = None,
        session: ClientSession | None = None,
        tables_to_cleanup: list[str] | None = None,
    ):
        """
        Initialize a new cursor instance.

        Args:
            collection (Collection): The collection to operate on.
            filter (dict[str, Any], optional): Filter criteria to apply to the
                documents.
            projection (dict[str, Any], optional): Projection criteria to
                specify which fields to include.
            hint (str, optional): Hint for the database to improve query
                performance.
            session (ClientSession, optional): A ClientSession for
                transactions.
            tables_to_cleanup (list[str], optional): List of temporary tables
                to drop when closed. The list is copied, so the caller's list
                is never mutated by this cursor.
        """
        self._collection = collection
        self._query_helpers = collection.query_engine.helpers
        self._filter = filter or {}
        self._projection = projection or {}
        self._hint: str | list[tuple[str, int]] | None = hint
        self._comment: str | None = None
        self._min: dict[str, Any] | None = None
        self._max: dict[str, Any] | None = None
        self._collation: dict[str, Any] | None = None
        self._where_predicate: Callable[[dict[str, Any]], bool] | None = None
        self._skip = 0
        self._limit: int | None = None
        self._sort: dict[str, int] | None = None
        self._retrieved: int = 0
        self._batch_size = 101  # MongoDB-compatible default
        self._max_await_time_ms: int | None = None
        self._options = 0  # query-flag bitmask (add_option/remove_option)
        self._session = session
        # Copy: we extend() this list later and must not mutate the caller's.
        self._tables_to_cleanup: list[str] = list(tables_to_cleanup or [])
        self._closed = False
        # Execution-state flags maintained by _execute_query().
        self._iterated = False
        self._exhausted = False
        self._sql_handled_sort = False
        self._sql_handled_pagination = False

        # Validate session
        validate_session(session, collection._database)

    # ------------------------------------------------------------------
    # Builder / option methods
    # ------------------------------------------------------------------

    def max_await_time_ms(self, max_await_time_ms: int | None) -> Cursor:
        """
        Set the maximum time to wait for new documents (for tailable cursors).

        The value is stored only; it is not consulted elsewhere in this class.

        Args:
            max_await_time_ms (int, optional): The maximum time to wait in
                milliseconds.

        Returns:
            Cursor: The cursor object with the max_await_time_ms applied.
        """
        self._max_await_time_ms = max_await_time_ms
        return self

    def add_option(self, mask: int) -> Cursor:
        """
        Set query flags (bitmask) for this cursor.

        Args:
            mask (int): The bitmask of options to set.

        Returns:
            Cursor: The cursor object with the options applied.
        """
        self._options = getattr(self, "_options", 0) | mask
        return self

    def remove_option(self, mask: int) -> Cursor:
        """
        Unset query flags (bitmask) for this cursor.

        Args:
            mask (int): The bitmask of options to unset.

        Returns:
            Cursor: The cursor object with the options removed.
        """
        self._options = getattr(self, "_options", 0) & ~mask
        return self

    @property
    def session(self) -> Any | None:
        """
        Get the ClientSession associated with this cursor.

        Returns:
            Any | None: The ClientSession, or None if no session is associated.
        """
        return getattr(self, "_session", None)

    @property
    def cursor_id(self) -> int:
        """
        Get the ID of this cursor.

        Returns:
            int: The cursor ID (always 0 for NeoSQLite).
        """
        return 0

    def __iter__(self) -> Iterator[dict[str, Any]]:
        """
        Return an iterator over the documents in the cursor.

        Each call starts a fresh execution of the query.

        Returns:
            Iterator[dict[str, Any]]: An iterator yielding documents that
            match the filter, projection, sorting, and pagination criteria.
        """
        return self._execute_query()

    def limit(self, limit: int) -> Cursor:
        """
        Limit the number of documents returned by the cursor.

        Args:
            limit (int): The maximum number of documents to return.

        Returns:
            Cursor: The cursor object with the limit applied.
        """
        self._limit = limit
        return self

    def skip(self, skip: int) -> Cursor:
        """
        Skip the specified number of documents when iterating over the cursor.

        Args:
            skip (int): The number of documents to skip.

        Returns:
            Cursor: The cursor object with the skip applied.
        """
        self._skip = skip
        return self

    def sort(
        self,
        key_or_list: str | list[tuple],
        direction: int | None = None,
    ) -> Cursor:
        """
        Sort the documents returned by the cursor.

        Args:
            key_or_list (str | list[tuple]): The key or list of
                ``(key, direction)`` pairs to sort by.
            direction (int, optional): The sorting direction (ASCENDING or
                DESCENDING) when ``key_or_list`` is a single key. Defaults to
                ASCENDING if None.

        Returns:
            Cursor: The cursor object with the sorting applied.
        """
        if isinstance(key_or_list, str):
            self._sort = {key_or_list: direction or ASCENDING}
        else:
            self._sort = dict(key_or_list)
        return self

    def batch_size(self, size: int) -> Cursor:
        """
        Set the batch size for the cursor.

        MongoDB-compatible batchSize parameter for controlling memory usage.
        Results are fetched from the database in batches of the specified size.

        Args:
            size (int): The batch size (default: 101, matches MongoDB)

        Returns:
            Cursor: The cursor object for chaining

        Raises:
            ValueError: If ``size`` is less than 1.

        Example:
            >>> cursor = collection.find().batch_size(50)
        """
        if size < 1:
            raise ValueError("batch_size must be at least 1")
        self._batch_size = size
        return self

    def hint(self, hint: str | list[tuple[str, int]]) -> Cursor:
        """
        Set the index hint for the cursor.

        Args:
            hint: The index name (str) or list of (field, direction) tuples

        Returns:
            Cursor: The cursor object with the hint applied
        """
        self._hint = hint
        return self

    @staticmethod
    def _normalize_bound_spec(
        spec: list[tuple[str, Any]] | tuple[Any, ...],
    ) -> dict[str, Any]:
        """
        Convert a min()/max() bound spec into a ``{field: value}`` dict.

        Accepts a list/tuple of ``(field, value)`` pairs, or a single
        ``(field, value)`` pair given directly as a 2-tuple whose first item
        is a string (the form advertised by the min()/max() signatures).

        Raises:
            TypeError: If ``spec`` is not a list or tuple.
        """
        if not isinstance(spec, (list, tuple)):
            raise TypeError(
                f"spec must be an instance of list or tuple, not {type(spec)}"
            )
        # A bare ("field", value) pair would otherwise be fed to dict() as
        # two "pairs" and either fail or produce garbage keys.
        if isinstance(spec, tuple) and len(spec) == 2 and isinstance(spec[0], str):
            return {spec[0]: spec[1]}
        return dict(spec)

    def min(self, min_spec: list[tuple[str, Any]] | tuple[str, Any]) -> Cursor:
        """
        Set the minimum bound for index queries.

        Only documents whose field values are greater than or equal to the
        specified minimum are returned (inclusive lower bound).

        Args:
            min_spec: A list of (field, value) tuples, or a single
                (field, value) tuple, e.g. ``[("age", 18)]`` or ``("age", 18)``

        Returns:
            Cursor: The cursor object with the minimum bound applied

        Raises:
            TypeError: If min_spec is not a list or tuple

        Example:
            >>> cursor = collection.find({"age": {"$gte": 18}}).min([("age", 18)])
        """
        self._min = self._normalize_bound_spec(min_spec)
        return self

    def max(self, max_spec: list[tuple[str, Any]] | tuple[str, Any]) -> Cursor:
        """
        Set the maximum bound for index queries.

        Only documents whose field values are strictly less than the
        specified maximum are returned (exclusive upper bound).

        Args:
            max_spec: A list of (field, value) tuples, or a single
                (field, value) tuple, e.g. ``[("age", 65)]`` or ``("age", 65)``

        Returns:
            Cursor: The cursor object with the maximum bound applied

        Raises:
            TypeError: If max_spec is not a list or tuple

        Example:
            >>> cursor = collection.find({"age": {"$lte": 65}}).max([("age", 65)])
        """
        self._max = self._normalize_bound_spec(max_spec)
        return self

    def collation(self, collation: dict[str, Any]) -> Cursor:
        """
        Set the collation for the cursor.

        Collation allows users to specify language-specific rules for string
        comparison, such as rules for lettercase and accent marks.

        Args:
            collation (dict[str, Any]): A dictionary specifying collation
                options:
                - locale (str): Language locale (e.g., "en_US", "fr_FR")
                - caseLevel (bool): Whether to include case comparison
                - caseFirst (str): "upper", "lower", or "off"
                - strength (int): Comparison strength (1-5)
                - numericOrdering (bool): Compare numbers numerically
                - alternate (str): "shifted" or "non-ignorable"
                - backwards (bool): Sort backwards (for French)

        Returns:
            Cursor: The cursor object with the collation applied

        Example:
            >>> cursor = collection.find({"name": {"$gte": "A"}}).collation(
            ...     {"locale": "fr_FR", "strength": 2}
            ... )

        Note:
            Only ``strength`` and ``caseLevel`` are read in this class, to
            decide between case-sensitive and case-insensitive ordering.
        """
        self._collation = collation
        return self

    def where(self, predicate: Callable[[dict[str, Any]], bool]) -> Cursor:
        """
        Filter cursor results using a Python predicate function.

        This is a Tier-3 (Python fallback) method: the predicate runs on
        documents after they are retrieved from the database, for every query
        path (plain filters, ``$expr`` and datetime queries alike). SQL
        LIMIT/OFFSET is disabled while a predicate is set so that skip/limit
        are applied to the predicate-filtered results.

        Args:
            predicate (Callable[[dict[str, Any]], bool]): A function that
                takes a document and returns True if it should be included.

        Returns:
            Cursor: The cursor object with the predicate applied

        Example:
            >>> def is_high_value(doc):
            ...     return doc.get('value', 0) > 10
            >>> cursor = collection.find({}).where(is_high_value)
        """
        self._where_predicate = predicate
        return self

    def comment(self, comment: str) -> Cursor:
        """
        Add a comment to the query for debugging and profiling.

        The comment is injected as a ``/* ... */`` SQL comment in front of the
        generated query; comment delimiters inside it are neutralised.

        Args:
            comment (str): The comment text to add to the query

        Returns:
            Cursor: The cursor object with the comment applied

        Example:
            >>> cursor = collection.find({"age": {"$gte": 18}}).comment("find adults")
        """
        self._comment = comment
        return self

    def to_list(self, length: int | None = None) -> list[dict[str, Any]]:
        """
        Convert the cursor to a list of documents.

        Args:
            length (int, optional): Maximum number of documents to return.
                If None, returns all documents. Only ``length`` documents are
                counted in :attr:`retrieved`. (A negative value keeps the
                legacy slice semantics ``results[:length]``.)

        Returns:
            list[dict[str, Any]]: List of documents in the cursor

        Example:
            >>> cursor = collection.find({"age": {"$gte": 18}})
            >>> adults = cursor.to_list()
            >>> first_5 = cursor.to_list(5)
        """
        if length is None:
            return list(self)
        if length < 0:
            return list(self)[:length]
        from itertools import islice

        # Stop pulling from the iterator once `length` docs were taken.
        return list(islice(self, length))

    def clone(self) -> Cursor:
        """
        Create a clone of this cursor with the same options.

        The clone copies filter, projection, hint, sort, skip, limit, comment,
        min/max bounds, collation, where predicate, batch size, options,
        max-await time and session. It is unevaluated, starts with
        ``retrieved == 0`` and owns no temporary tables of the original.

        Returns:
            Cursor: A new cursor with the same settings

        Example:
            >>> cursor = collection.find({"age": {"$gte": 18}}).limit(10)
            >>> clone = cursor.clone()
            >>> results1 = list(cursor)
            >>> results2 = list(clone)
        """
        cloned = Cursor(
            self._collection,
            filter=deepcopy(self._filter),
            projection=deepcopy(self._projection),
            hint=self._hint,  # type: ignore[arg-type]
        )
        cloned._skip = self._skip
        cloned._limit = self._limit
        cloned._sort = deepcopy(self._sort) if self._sort else None
        cloned._comment = self._comment
        cloned._min = deepcopy(self._min) if self._min else None
        cloned._max = deepcopy(self._max) if self._max else None
        cloned._collation = deepcopy(self._collation) if self._collation else None
        # Functions can't be deep copied; sharing the callable is intended.
        cloned._where_predicate = self._where_predicate
        cloned._batch_size = self._batch_size
        cloned._options = getattr(self, "_options", 0)
        cloned._max_await_time_ms = getattr(self, "_max_await_time_ms", None)
        # Assigned after construction (as before) so that cloning never
        # re-validates the session; _execute_query() validates it anyway.
        cloned._session = self._session
        cloned._retrieved = 0  # Clone starts fresh
        return cloned

    # ------------------------------------------------------------------
    # PyMongo-compatible state properties
    # ------------------------------------------------------------------

    @property
    def retrieved(self) -> int:
        """
        Return the number of documents retrieved from the cursor.

        The counter accumulates across iterations of the same cursor.

        Returns:
            int: The number of documents retrieved so far

        Example:
            >>> cursor = collection.find({}).limit(10)
            >>> docs = list(cursor)
            >>> cursor.retrieved
            10
        """
        return self._retrieved

    @property
    def alive(self) -> bool:
        """
        Check if the cursor may have more documents to iterate.

        A cursor is dead once the current iteration has run to completion,
        or once ``limit`` documents have been retrieved. This is a simplified
        implementation for PyMongo API compatibility.

        Returns:
            bool: True if the cursor may have more documents, False if
            exhausted

        Note:
            NeoSQLite cursors are re-iterable; starting a new iteration makes
            the cursor alive again until that iteration completes.

        Example:
            >>> cursor = collection.find({}).limit(10)
            >>> cursor.alive
            True
            >>> list(cursor)
            >>> cursor.alive
            False
        """
        # Previously an empty result (or one shorter than the limit) left
        # the cursor "alive" forever after being fully consumed.
        if getattr(self, "_exhausted", False):
            return False
        if self._limit is not None:
            return self._retrieved < self._limit
        return True

    @property
    def collection(self):
        """
        Return a reference to the collection this cursor is iterating over.

        Returns:
            Collection: The collection associated with this cursor

        Example:
            >>> cursor = collection.find({})
            >>> cursor.collection.name
            'collection_name'
        """
        return self._collection

    @property
    def address(self) -> tuple | None:
        """
        Return the address of the database.

        For NeoSQLite, this returns a tuple representing the database
        connection. This is a simplified implementation for PyMongo API
        compatibility.

        Returns:
            tuple | None: A tuple of (database_path, 0) after iteration
            starts, None before the cursor has been executed.
            - For file databases: ('sqlite:///path/to/file.db', 0)
            - For memory databases: ('sqlite::memory:', 0)

        Note:
            SQLite is an embedded database without a server, so this returns
            the database path (from ``PRAGMA database_list``) instead of a
            network address. Returns None until the cursor has been iterated,
            matching PyMongo behavior.

        Example:
            >>> cursor = collection.find({})
            >>> cursor.address  # Before iteration
            None
            >>> list(cursor)
            >>> cursor.address  # After iteration
            ('sqlite::memory:', 0)
        """
        # Match PyMongo behavior: None before iteration, tuple after
        if self._retrieved == 0 and not getattr(self, "_iterated", False):
            return None

        # Third column of PRAGMA database_list is the database file name.
        db_name = self._collection.db.execute("PRAGMA database_list").fetchone()[2]
        if not db_name or db_name == ":memory:":
            return ("sqlite::memory:", 0)
        return (f"sqlite://{db_name}", 0)

    # ------------------------------------------------------------------
    # Query planning
    # ------------------------------------------------------------------

    def explain(self, verbosity: str = "queryPlanner") -> dict[str, Any]:
        """
        Return the query execution plan.

        Uses SQLite's EXPLAIN QUERY PLAN on the SQL this cursor would run for
        a plain (non-``$expr``) filter, including min/max bounds, sorting and
        -- only when execution would use it -- SQL pagination.

        Args:
            verbosity (str): Verbosity level - "executionStats" or
                "queryPlanner". Defaults to "queryPlanner".

        Returns:
            dict[str, Any]: Query execution plan with the following structure:
                - queryPlanner: Information about the query plan
                    - winningPlan: List of plan stages
                    - indexUsage: Information about index usage
                - executionStats (if verbosity="executionStats")
                    - nReturned: Number of documents returned
                    - executionTimeMillis: Execution time in milliseconds
            If SQLite rejects the plan request or execution fails, an
            ``"error"`` key is included instead of raising.

        Note:
            executionStats runs the query on a clone, so this cursor's
            ``retrieved``/``alive`` state is not affected.

        Example:
            >>> cursor = collection.find({"age": {"$gte": 18}})
            >>> plan = cursor.explain()
            >>> print(plan['queryPlanner']['winningPlan'])
        """
        sql, params = self._build_explain_sql()

        try:
            plan_rows = self._collection.db.execute(
                f"EXPLAIN QUERY PLAN {sql}", params
            ).fetchall()

            # Row format: (id, parent, notused, detail)
            winning_plan = []
            index_usage = []
            for row in plan_rows:
                detail = row[3] if len(row) > 3 else str(row)
                winning_plan.append({"detail": detail})
                if "USING INDEX" in detail or "USING COVERING INDEX" in detail:
                    index_usage.append({"detail": detail})

            result: dict[str, Any] = {
                "queryPlanner": {
                    "winningPlan": winning_plan,
                    "indexUsage": index_usage,
                }
            }

            if verbosity == "executionStats":
                # Execute on a clone so this cursor's counters stay untouched;
                # the context manager drops any temp tables the clone creates.
                with self.clone() as probe:
                    start = time.perf_counter()
                    n_returned = sum(1 for _ in probe)
                    elapsed_ms = (time.perf_counter() - start) * 1000
                result["executionStats"] = {
                    "nReturned": n_returned,
                    "executionTimeMillis": round(elapsed_ms, 2),
                }

            return result
        except Exception as e:
            logger.debug(f"{e=}")
            return {
                "queryPlanner": {
                    "winningPlan": [{"detail": f"Error getting plan: {e}"}],
                    "indexUsage": [],
                },
                "error": str(e),
            }

    def _build_explain_sql(self) -> tuple[str, tuple]:
        """Build the (sql, params) pair the plain-filter execution path uses."""
        where_result = self._query_helpers._build_simple_where_clause(self._filter)
        sort_clause = self._query_helpers._build_sort_clause(
            self._sort, self._collation
        )
        if where_result is None:
            return self._build_fallback_select(sort_clause)
        sql, params, _ = self._build_filtered_select(where_result, sort_clause)
        return sql, params

    # ------------------------------------------------------------------
    # SQL building helpers
    # ------------------------------------------------------------------

    def _select_prefix(self) -> str:
        """Return ``SELECT id, _id, <data> as data FROM <collection>``."""
        jsonb = self._collection.query_engine._jsonb_supported
        return (
            f"SELECT id, _id, {json_data_column(jsonb)} as data "
            f"FROM {self._collection.name}"
        )

    def _apply_comment(self, cmd: str) -> str:
        """
        Prefix ``cmd`` with the user comment as a ``/* ... */`` SQL comment.

        Delimiters are neutralised by inserting spaces rather than deleting
        them: deleting can synthesise a new ``*/`` (e.g. ``"*--/"`` ->
        ``"*/"``) and let the comment text escape into the SQL statement.
        """
        if not self._comment:
            return cmd
        safe_comment = (
            self._comment.replace("--", "- -")
            .replace("/*", "/ *")
            .replace("*/", "* /")
        )
        return f"/* {safe_comment} */ {cmd}"

    def _sql_pagination_clause(self, sort_clause: str) -> str:
        """
        Return the SQL LIMIT/OFFSET clause, or "" when it must be done in Python.

        SQL pagination is only correct if every later stage keeps all rows and
        their order, so it is skipped when a Python predicate or ``$jsonSchema``
        filter runs afterwards, or when a requested sort could not be expressed
        in SQL (``sort_clause`` empty) and will be applied in Python.
        """
        if self._where_predicate:
            return ""
        if "$jsonSchema" in self._filter:
            return ""
        if self._sort and not sort_clause:
            return ""
        return self._query_helpers._build_pagination_clause(self._limit, self._skip)

    def _build_filtered_select(
        self, where_result: tuple, sort_clause: str
    ) -> tuple[str, tuple, str]:
        """
        Build the SELECT for a SQL-expressible filter.

        Registers any temporary tables for cleanup and applies min/max bounds.

        Returns:
            tuple: ``(sql, params, pagination_clause)``.
        """
        where_clause, where_params, tables = where_result
        if tables:
            self._tables_to_cleanup.extend(tables)
        params: tuple = tuple(where_params)
        if self._min or self._max:
            where_clause, params = self._build_minmax_clause(
                where_clause, params, self._min, self._max
            )
        pagination_clause = self._sql_pagination_clause(sort_clause)
        sql = f"{self._select_prefix()} {where_clause}{sort_clause}{pagination_clause}"
        return sql, params, pagination_clause

    def _build_fallback_select(self, sort_clause: str) -> tuple[str, tuple]:
        """
        Build the SELECT used when filtering happens in Python.

        Only min/max bounds and sorting are pushed to SQL; never pagination,
        because the Python filter may drop rows a SQL LIMIT would have kept.

        Returns:
            tuple: ``(sql, params)``.
        """
        sql = self._select_prefix()
        params: tuple = ()
        if self._min or self._max:
            minmax_clause, params = self._build_minmax_clause(
                "", (), self._min, self._max
            )
            sql = f"{sql} {minmax_clause}"
        return f"{sql}{sort_clause}", params

    def _iter_documents(self, db_cursor: Any) -> Iterator[dict[str, Any]]:
        """Yield documents from ``db_cursor``, fetching ``_batch_size`` rows at a time."""
        while True:
            rows = db_cursor.fetchmany(self._batch_size)
            if not rows:
                return
            yield from self._load_documents(rows)

    # ------------------------------------------------------------------
    # Execution
    # ------------------------------------------------------------------

    def _execute_query(self) -> Iterator[dict[str, Any]]:
        """
        Execute the query and yield the results after applying filters,
        sorting, pagination, and projection.

        Sorting and pagination are skipped here when the SQL query already
        performed them (see the ``_sql_handled_*`` flags set while fetching).

        Yields:
            dict[str, Any]: A dictionary representing each document in the
            result set.
        """
        validate_session(self._session, self._collection._database)

        self._exhausted = False
        # The fetch stage flips these when SQL performed sort/pagination.
        self._sql_handled_sort = False
        self._sql_handled_pagination = False

        docs = self._get_filtered_documents()

        if not self._sql_handled_sort:
            docs = self._apply_sorting(docs)
        if not self._sql_handled_pagination:
            docs = self._apply_pagination(docs)
        docs = self._apply_projection(docs)

        # Mark cursor as having been iterated (for address property)
        self._iterated = True

        for doc in docs:
            self._retrieved += 1
            yield doc

        self._exhausted = True

    def _get_filtered_documents(self) -> Iterable[dict[str, Any]]:
        """
        Retrieve the documents matching the filter.

        Candidates come from the datetime processor, the ``$expr`` handler, a
        SQL WHERE clause, or the Python fallback. The Python-side post-filters
        (``where()`` predicate and ``$jsonSchema``) are then applied to every
        source uniformly.

        Returns:
            Iterable[dict[str, Any]]: The documents that match the filter.
        """
        docs = self._fetch_candidate_documents()

        # Apply where predicate if specified (Tier-3 Python filtering)
        if self._where_predicate:
            docs = filter(self._where_predicate, docs)  # type: ignore[arg-type]

        # Apply $jsonSchema filter if present
        if "$jsonSchema" in self._filter:
            from .query_helper.schema_validator import matches_json_schema

            schema = self._filter["$jsonSchema"]
            docs = (doc for doc in docs if matches_json_schema(doc, schema))

        return docs

    def _fetch_candidate_documents(self) -> Iterable[dict[str, Any]]:
        """Select the query strategy and return the documents it produces."""
        if self._contains_datetime_operations(self._filter):
            try:
                from .datetime_query_processor import DateTimeQueryProcessor

                datetime_processor = DateTimeQueryProcessor(self._collection)
                return datetime_processor.process_datetime_query(self._filter)
            except Exception as e:
                # If datetime processor fails, fall back to normal processing
                logger.debug("DateTime processor failed, falling back: %s", e)

        if "$expr" in self._filter:
            return self._handle_expr_query()

        where_result = self._query_helpers._build_simple_where_clause(self._filter)
        if where_result is None:
            return self._handle_python_fallback()

        sort_clause = self._query_helpers._build_sort_clause(
            self._sort, self._collation
        )
        cmd, params, pagination_clause = self._build_filtered_select(
            where_result, sort_clause
        )
        self._sql_handled_sort = bool(sort_clause)
        self._sql_handled_pagination = bool(pagination_clause)

        db_cursor = self._collection.db.execute(self._apply_comment(cmd), params)
        return self._iter_documents(db_cursor)

    def _handle_python_fallback(self) -> Iterable[dict[str, Any]]:
        """
        Handle complex queries by filtering all documents in Python.

        The SQL query still applies sorting and min/max bounds. It runs
        eagerly, rather than as a lazy generator, so that
        ``_sql_handled_sort`` is set before ``_execute_query`` decides
        whether to sort in Python.
        """
        sort_clause = self._query_helpers._build_sort_clause(
            self._sort, self._collation
        )
        cmd, params = self._build_fallback_select(sort_clause)
        self._sql_handled_sort = bool(sort_clause)
        # Pagination stays in Python: the filter below may drop rows.

        db_cursor = self._collection.db.execute(self._apply_comment(cmd), params)
        matches = partial(self._query_helpers._apply_query, self._filter)
        return (doc for doc in self._iter_documents(db_cursor) if matches(doc))

    def _handle_expr_query(self) -> Iterable[dict[str, Any]]:
        """
        Handle $expr queries with SQL evaluation when possible, Python
        fallback otherwise.

        Tier 1: the helper returns a WHERE clause; sorting, pagination and
        min/max bounds are added in SQL. Tier 2: the helper returns a complete
        SELECT (possibly using temp tables); sort/pagination are left to
        Python. Tier 3: every document is evaluated in Python.
        """
        from .expr_evaluator import ExprEvaluator

        expr_result = self._query_helpers._build_expr_where_clause(self._filter)
        if expr_result is None:
            return self._handle_expr_python_fallback(ExprEvaluator)

        sql_or_where, params, tables = expr_result
        if tables:
            self._tables_to_cleanup.extend(tables)

        if sql_or_where.strip().upper().startswith("SELECT"):
            # Tier 2: full SELECT; Python handles sorting and pagination.
            cmd = sql_or_where
            self._sql_handled_sort = False
            self._sql_handled_pagination = False
        else:
            # Tier 1: WHERE clause only
            where_clause = sql_or_where
            if self._min or self._max:
                where_clause, params = self._build_minmax_clause(
                    where_clause, tuple(params), self._min, self._max
                )
            sort_clause = self._query_helpers._build_sort_clause(
                self._sort, self._collation
            )
            pagination_clause = self._sql_pagination_clause(sort_clause)
            cmd = (
                f"{self._select_prefix()} "
                f"{where_clause}{sort_clause}{pagination_clause}"
            )
            self._sql_handled_sort = bool(sort_clause)
            self._sql_handled_pagination = bool(pagination_clause)

        db_cursor = self._collection.db.execute(self._apply_comment(cmd), params)
        return list(self._iter_documents(db_cursor))

    def _handle_expr_python_fallback(
        self, evaluator_cls: Any
    ) -> Iterable[dict[str, Any]]:
        """
        Tier 3 of ``$expr`` handling: evaluate the expression in Python.

        Args:
            evaluator_cls: The ExprEvaluator class (imported lazily by the
                caller).
        """
        expr = self._filter["$expr"]
        # Create evaluator with database connection for JSONB support detection
        evaluator = evaluator_cls(db_connection=self._collection.db)

        sort_clause = self._query_helpers._build_sort_clause(
            self._sort, self._collation
        )
        cmd, params = self._build_fallback_select(sort_clause)
        self._sql_handled_sort = bool(sort_clause)

        db_cursor = self._collection.db.execute(self._apply_comment(cmd), params)
        # Materialise before evaluating: the evaluator shares the connection
        # and must not run statements while this read cursor is still open.
        all_docs = list(self._iter_documents(db_cursor))

        def expr_filter(doc: dict[str, Any]) -> bool:
            """Return the $expr result for ``doc``; False if evaluation fails."""
            try:
                return evaluator.evaluate_python(expr, doc)
            except Exception as e:
                logger.warning("$expr evaluation failed for document: %s", e)
                return False

        return filter(expr_filter, all_docs)  # type: ignore[arg-type]

    def _build_minmax_clause(
        self,
        where_clause: str,
        params: tuple,
        min_spec: dict[str, Any] | None,
        max_spec: dict[str, Any] | None,
    ) -> tuple:
        """
        Build SQL clause for min/max index bounds.

        Adds ``<extract>(data, path) >= ?`` for each min field and
        ``<extract>(data, path) < ?`` for each max field. An existing
        ``WHERE <cond>`` is parenthesised before ANDing, so a top-level OR in
        it keeps its meaning.

        Args:
            where_clause: Existing WHERE clause (may be empty)
            params: Existing parameters
            min_spec: Minimum bound specification (inclusive)
            max_spec: Maximum bound specification (exclusive)

        Returns:
            Tuple of (new WHERE clause, new parameters); the inputs unchanged
            when neither spec has fields.
        """
        jsonb = self._collection.query_engine._jsonb_supported
        json_func = _get_json_function_prefix(jsonb)

        conditions: list[str] = []
        new_params = list(params)
        for spec, op in ((min_spec, ">="), (max_spec, "<")):
            for field, value in (spec or {}).items():
                # The path stays a literal so expression indexes can match;
                # double single quotes so a field name cannot break out.
                json_path = parse_json_path(field).replace("'", "''")
                conditions.append(f"{json_func}_extract(data, '{json_path}') {op} ?")
                new_params.append(value)

        if not conditions:
            return where_clause, params

        conditions_sql = " AND ".join(conditions)
        existing = (where_clause or "").strip()
        if not existing:
            return f"WHERE {conditions_sql}", tuple(new_params)

        parts = existing.split(None, 1)
        if parts[0].upper() == "WHERE":
            body = parts[1] if len(parts) > 1 else ""
            if body:
                return f"WHERE ({body}) AND {conditions_sql}", tuple(new_params)
            return f"WHERE {conditions_sql}", tuple(new_params)

        # Unrecognised shape: keep the historical behaviour of appending.
        return f"{where_clause.rstrip()} AND {conditions_sql}", tuple(new_params)

    def _is_case_insensitive_collation(self) -> bool:
        """
        Return True when the collation asks for case-insensitive comparison.

        Shared by the SQL COLLATE clause and Python-side sorting so that both
        agree.
        """
        if not self._collation:
            return False
        strength = self._collation.get("strength", 3)
        case_level = self._collation.get("caseLevel", False)
        # NOTE(review): MongoDB treats strength >= 3 as case-sensitive and
        # caseLevel=True as "consider case" at strengths 1-2, which would be
        # `strength <= 2 and not case_level`. The existing rule is kept for
        # compatibility with the SQL sort builder; confirm intended semantics.
        return strength <= 2 or not case_level

    def _get_collate_clause(self) -> str:
        """
        Get the SQL COLLATE clause based on collation settings.

        Returns:
            str: " COLLATE NOCASE" for case-insensitive collations, otherwise
            an empty string (SQLite's default BINARY comparison).

        Note:
            COLLATE is intended for ORDER BY clauses, not WHERE clauses.
        """
        if self._is_case_insensitive_collation():
            return " COLLATE NOCASE"
        return ""

    def _contains_datetime_operations(self, query: dict[str, Any]) -> bool:
        """
        Check if a query contains datetime operations that should use the
        datetime processor.

        Recurses into ``$and``/``$or``/``$nor`` lists and a top-level
        ``$not``, and inspects the operator dicts of field conditions.

        Args:
            query: MongoDB-style query dictionary

        Returns:
            True if query contains datetime operations, False otherwise
        """
        if not isinstance(query, dict):
            return False

        for field, value in query.items():
            if field in ("$and", "$or", "$nor"):
                if isinstance(value, list) and any(
                    isinstance(condition, dict)
                    and self._contains_datetime_operations(condition)
                    for condition in value
                ):
                    return True
            elif field == "$not":
                if isinstance(value, dict) and self._contains_datetime_operations(
                    value
                ):
                    return True
            elif isinstance(value, dict) and self._operators_use_datetime(value):
                return True
        return False

    def _operators_use_datetime(self, operators: dict[str, Any]) -> bool:
        """Return True if a field's operator dict compares against dates."""
        for operator, op_value in operators.items():
            if operator in ("$gte", "$gt", "$lte", "$lt", "$eq", "$ne"):
                if self._is_datetime_value(op_value):
                    return True
            elif operator in ("$in", "$nin"):
                if isinstance(op_value, list) and any(
                    self._is_datetime_value(item) for item in op_value
                ):
                    return True
            elif operator == "$type":
                # 9 is the BSON date type number in MongoDB
                if op_value in (9, "date", "Date"):
                    return True
            elif operator == "$regex":
                if self._is_datetime_regex(op_value):
                    return True
        return False

    def _is_datetime_value(self, value: Any) -> bool:
        """
        Check if a value is a datetime object or datetime string.

        Args:
            value: Value to check

        Returns:
            True if value is datetime-related, False otherwise
        """
        from .datetime_utils import is_datetime_value

        return is_datetime_value(value)

    def _is_datetime_regex(self, pattern: str) -> bool:
        """
        Check if a regex pattern is likely to be for datetime matching.

        Args:
            pattern: Regex pattern string

        Returns:
            True if pattern is likely datetime-related, False otherwise
        """
        from .datetime_utils import is_datetime_regex

        return is_datetime_regex(pattern)

    def _load_documents(self, rows) -> Iterable[dict[str, Any]]:
        """
        Load documents from rows returned by the database query.

        Args:
            rows: Database result rows of ``(id, _id, data)``

        Returns:
            Iterable[dict[str, Any]]: An iterable of loaded documents
        """
        for id_val, stored_id_val, data_val in rows:
            yield self._collection._load_with_stored_id(
                id_val, data_val, stored_id_val
            )

    # ------------------------------------------------------------------
    # Python-side post-processing
    # ------------------------------------------------------------------

    @staticmethod
    def _lowered(get_val: Callable[[dict[str, Any]], Any], doc: dict[str, Any]) -> Any:
        """Sort key: the extracted value, lower-cased when it is a string."""
        val = get_val(doc)
        return val.lower() if isinstance(val, str) else val

    def _apply_sorting(
        self, docs: Iterable[dict[str, Any]]
    ) -> list[dict[str, Any]]:
        """
        Sort the documents based on the specified sorting criteria.

        Keys are applied from last to first with a stable sort, which yields
        the multi-key ordering. String values are lower-cased when the
        collation is case-insensitive.

        Args:
            docs (Iterable[dict[str, Any]]): The documents to sort.

        Returns:
            list[dict[str, Any]]: The documents sorted by the criteria.
        """
        sorted_docs = list(docs)
        if not self._sort:
            return sorted_docs

        case_insensitive = self._is_case_insensitive_collation()
        for key in reversed(list(self._sort)):
            get_val = partial(self._collection._get_val, key=key)
            key_func = partial(self._lowered, get_val) if case_insensitive else get_val
            sorted_docs.sort(key=key_func, reverse=self._sort[key] == DESCENDING)
        return sorted_docs

    def _apply_pagination(
        self, docs: Iterable[dict[str, Any]]
    ) -> list[dict[str, Any]]:
        """
        Apply skip and limit to the documents.

        Args:
            docs (Iterable[dict[str, Any]]): The documents to paginate.

        Returns:
            list[dict[str, Any]]: The documents after applying skip and limit.
        """
        skipped_docs = list(docs)[self._skip :]
        if self._limit is not None:
            return skipped_docs[: self._limit]
        return skipped_docs

    def _apply_projection(
        self, docs: Iterable[dict[str, Any]]
    ) -> list[dict[str, Any]]:
        """
        Apply projection to the documents.

        Args:
            docs (Iterable[dict[str, Any]]): The documents to project.

        Returns:
            list[dict[str, Any]]: The documents after applying the projection.
        """
        project = partial(self._query_helpers._apply_projection, self._projection)
        return [project(doc) for doc in docs]

    # ------------------------------------------------------------------
    # Resource management
    # ------------------------------------------------------------------

    def close(self) -> None:
        """
        Close the cursor and drop any temporary tables it owns.

        Calling close() more than once is a no-op.
        """
        if getattr(self, "_closed", False):
            return
        self._cleanup_tables()
        self._closed = True

    def __del__(self) -> None:
        """
        Ensure resources are cleaned up when the cursor is garbage collected.
        """
        try:
            self.close()
        except Exception as e:
            # logger may already be None during interpreter shutdown.
            if logger is not None:
                logger.debug(f"{e=}")

    def __enter__(self) -> Cursor:
        """
        Enter the context manager.

        Returns:
            Cursor: The cursor itself.
        """
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """
        Exit the context manager and close the cursor.

        Args:
            exc_type: The exception type if an exception was raised.
            exc_val: The exception value if an exception was raised.
            exc_tb: The exception traceback if an exception was raised.
        """
        self.close()

    def _cleanup_tables(self) -> None:
        """
        Drop any temporary tables created for this cursor (best effort).

        Failures are logged at debug level and do not stop the remaining drops.
        """
        tables = getattr(self, "_tables_to_cleanup", None)
        if not tables:
            return

        for table in tables:
            try:
                self._collection.db.execute(f"DROP TABLE IF EXISTS {table}")
            except Exception as e:
                logger.debug(
                    "Failed to drop temporary table '%s' during cleanup: %s",
                    table,
                    e,
                )

        self._tables_to_cleanup = []