from __future__ import annotations
import logging
import time
from collections.abc import Callable, Iterable, Iterator
from copy import deepcopy
from functools import partial
from typing import TYPE_CHECKING, Any
from .json_path_utils import parse_json_path
from .jsonb_support import _get_json_function_prefix, json_data_column
from .type_utils import validate_session
if TYPE_CHECKING:
from ..client_session import ClientSession
from . import Collection
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Sort-direction constants; Cursor._apply_sorting compares stored directions
# against DESCENDING, and Cursor.sort falls back to ASCENDING.
ASCENDING = 1
DESCENDING = -1
[docs]
class Cursor:
"""
Class representing a cursor for iterating over documents in a collection with
applied filters, projections, sorting, and pagination.
"""
[docs]
def __init__(
    self,
    collection: Collection,
    filter: dict[str, Any] | None = None,
    projection: dict[str, Any] | None = None,
    hint: str | None = None,
    session: ClientSession | None = None,
    tables_to_cleanup: list[str] | None = None,
):
    """
    Set up an unevaluated cursor over ``collection``.

    No query is run here; the constructor only records the query
    definition and validates the session.

    Args:
        collection (Collection): Collection the cursor reads from.
        filter (dict[str, Any], optional): Query filter; a falsy value
            becomes an empty dict.
        projection (dict[str, Any], optional): Field projection; a falsy
            value becomes an empty dict.
        hint (str, optional): Index hint stored on the cursor.
        session (ClientSession, optional): Session checked with
            ``validate_session`` against the collection's database.
        tables_to_cleanup (list[str], optional): Temporary tables to drop
            when the cursor is closed.
    """
    # Target collection and the helper object that builds SQL fragments.
    self._collection = collection
    self._query_helpers = collection.query_engine.helpers

    # Query definition supplied by the caller.
    self._filter = filter if filter else {}
    self._projection = projection if projection else {}
    self._hint: str | list[tuple[str, int]] | None = hint

    # Optional modifiers, all unset until the chaining methods are called.
    self._comment: str | None = None
    self._min: dict[str, Any] | None = None
    self._max: dict[str, Any] | None = None
    self._collation: dict[str, Any] | None = None
    self._where_predicate: Callable[[dict[str, Any]], bool] | None = None

    # Pagination / ordering state.
    self._skip = 0
    self._limit: int | None = None
    self._sort: dict[str, int] | None = None

    # Iteration bookkeeping.
    self._retrieved: int = 0
    self._batch_size = 101  # MongoDB-compatible default

    # Session and cleanup state.
    self._session = session
    self._tables_to_cleanup = tables_to_cleanup if tables_to_cleanup else []
    self._closed = False

    # Validation runs last so every attribute above already exists.
    validate_session(session, collection._database)
[docs]
def max_await_time_ms(self, max_await_time_ms: int | None) -> Cursor:
    """
    Record the maximum await time on the cursor.

    The value is only stored; nothing else in this class reads it.

    Args:
        max_await_time_ms (int, optional): Wait time in milliseconds.

    Returns:
        Cursor: This cursor, to allow chaining.
    """
    wait_ms = max_await_time_ms
    self._max_await_time_ms = wait_ms
    return self
[docs]
def add_option(self, mask: int) -> Cursor:
    """
    Turn on the query-flag bits given in ``mask``.

    Args:
        mask (int): Bitmask of flags to set.

    Returns:
        Cursor: This cursor, to allow chaining.
    """
    # _options is created lazily; treat a missing attribute as "no flags set".
    current_flags = getattr(self, "_options", 0)
    self._options = current_flags | mask
    return self
[docs]
def remove_option(self, mask: int) -> Cursor:
    """
    Turn off the query-flag bits given in ``mask``.

    Args:
        mask (int): Bitmask of flags to clear.

    Returns:
        Cursor: This cursor, to allow chaining.
    """
    # _options is created lazily; treat a missing attribute as "no flags set".
    current_flags = getattr(self, "_options", 0)
    self._options = current_flags & ~mask
    return self
@property
def session(self) -> Any | None:
    """
    The ClientSession attached to this cursor.

    Returns:
        Any | None: The stored session, or None when no ``_session``
        attribute exists or it is None.
    """
    try:
        return self._session
    except AttributeError:
        return None
@property
def cursor_id(self) -> int:
    """
    Identifier of this cursor.

    Returns:
        int: Always 0; this implementation has no server-side cursor ids.
    """
    fixed_id = 0
    return fixed_id
def __iter__(self) -> Iterator[dict[str, Any]]:
    """
    Start a fresh iteration over the cursor's results.

    Returns:
        Iterator[dict[str, Any]]: The generator produced by
        ``_execute_query``.
    """
    result_iterator = self._execute_query()
    return result_iterator
[docs]
def limit(self, limit: int) -> Cursor:
    """
    Cap the number of documents the cursor will return.

    Args:
        limit (int): Maximum number of documents.

    Returns:
        Cursor: This cursor, to allow chaining.
    """
    max_documents = limit
    self._limit = max_documents
    return self
[docs]
def skip(self, skip: int) -> Cursor:
    """
    Set how many leading documents to drop from the results.

    Args:
        skip (int): Number of documents to skip.

    Returns:
        Cursor: This cursor, to allow chaining.
    """
    offset = skip
    self._skip = offset
    return self
[docs]
def sort(
    self,
    key_or_list: str | list[tuple],
    direction: int | None = None,
) -> Cursor:
    """
    Define the ordering of the returned documents.

    Args:
        key_or_list (str | list[tuple]): A single field name, or a list of
            (field, direction) pairs.
        direction (int, optional): Direction for the single-field form;
            a falsy value means ASCENDING. Ignored for the list form.

    Returns:
        Cursor: This cursor, to allow chaining.
    """
    # List form: the pairs already carry their own directions.
    if not isinstance(key_or_list, str):
        self._sort = dict(key_or_list)
        return self

    # Single-field form.
    chosen_direction = direction if direction else ASCENDING
    self._sort = {key_or_list: chosen_direction}
    return self
[docs]
def batch_size(self, size: int) -> Cursor:
    """
    Choose how many rows are fetched from the database per batch.

    Args:
        size (int): Rows per fetch; must be at least 1.

    Returns:
        Cursor: This cursor, to allow chaining.

    Raises:
        ValueError: If ``size`` is smaller than 1.

    Example:
        >>> cursor = collection.find().batch_size(50)
    """
    too_small = size < 1
    if too_small:
        raise ValueError("batch_size must be at least 1")

    self._batch_size = size
    return self
[docs]
def hint(self, hint: str | list[tuple[str, int]]) -> Cursor:
    """
    Store an index hint on the cursor.

    Args:
        hint: Index name, or a list of (field, direction) tuples.

    Returns:
        Cursor: This cursor, to allow chaining.
    """
    index_hint = hint
    self._hint = index_hint  # type: ignore[assignment]
    return self
[docs]
def min(self, min_spec: list[tuple[str, Any]] | tuple[str, Any]) -> Cursor:
    """
    Set the inclusive lower bound applied to the query.

    The bound is stored as a field -> value mapping and later turned into
    ``>=`` conditions by ``_build_minmax_clause``.

    Args:
        min_spec: Either a sequence of (field, value) pairs, e.g.
            ``[("age", 18)]``, or a single bare ``("age", 18)`` pair.

    Returns:
        Cursor: This cursor, to allow chaining.

    Raises:
        TypeError: If ``min_spec`` is not a list or tuple.

    Example:
        >>> cursor = collection.find({"age": {"$gte": 18}}).min([("age", 18)])
    """
    if not isinstance(min_spec, (list, tuple)):
        raise TypeError(
            f"spec must be an instance of list or tuple, not {type(min_spec)}"
        )
    # The signature advertises a bare (field, value) tuple, but dict() would
    # try to unpack the field name itself as a pair. A 2-tuple whose first
    # item is a string is therefore treated as one pair and wrapped.
    if (
        isinstance(min_spec, tuple)
        and len(min_spec) == 2
        and isinstance(min_spec[0], str)
    ):
        min_spec = [min_spec]  # type: ignore[list-item]
    self._min = dict(min_spec)  # type: ignore[arg-type]
    return self
[docs]
def max(self, max_spec: list[tuple[str, Any]] | tuple[str, Any]) -> Cursor:
    """
    Set the exclusive upper bound applied to the query.

    The bound is stored as a field -> value mapping and later turned into
    ``<`` conditions by ``_build_minmax_clause``.

    Args:
        max_spec: Either a sequence of (field, value) pairs, e.g.
            ``[("age", 65)]``, or a single bare ``("age", 65)`` pair.

    Returns:
        Cursor: This cursor, to allow chaining.

    Raises:
        TypeError: If ``max_spec`` is not a list or tuple.

    Example:
        >>> cursor = collection.find({"age": {"$lte": 65}}).max([("age", 65)])
    """
    if not isinstance(max_spec, (list, tuple)):
        raise TypeError(
            f"spec must be an instance of list or tuple, not {type(max_spec)}"
        )
    # The signature advertises a bare (field, value) tuple, but dict() would
    # try to unpack the field name itself as a pair. A 2-tuple whose first
    # item is a string is therefore treated as one pair and wrapped.
    if (
        isinstance(max_spec, tuple)
        and len(max_spec) == 2
        and isinstance(max_spec[0], str)
    ):
        max_spec = [max_spec]  # type: ignore[list-item]
    self._max = dict(max_spec)  # type: ignore[arg-type]
    return self
[docs]
def collation(self, collation: dict[str, Any]) -> Cursor:
    """
    Attach a collation document to the cursor.

    The dictionary is stored as given. Within this class only the
    ``strength`` and ``caseLevel`` keys are read (by the sorting helpers);
    it is also passed to the query helper that builds the ORDER BY clause.

    Args:
        collation (dict[str, Any]): MongoDB-style collation options such
            as ``locale``, ``strength`` and ``caseLevel``.

    Returns:
        Cursor: This cursor, to allow chaining.

    Example:
        >>> cursor = collection.find({"name": {"$gte": "A"}}).collation(
        ...     {"locale": "fr_FR", "strength": 2}
        ... )
    """
    collation_spec = collation
    self._collation = collation_spec
    return self
[docs]
def where(self, predicate: Callable[[dict[str, Any]], bool]) -> Cursor:
    """
    Register a Python predicate used to post-filter documents.

    The predicate runs in Python on documents already loaded from the
    database, so SQL-level pagination is skipped whenever one is set.

    Args:
        predicate (Callable[[dict[str, Any]], bool]): Called with each
            document; a truthy result keeps the document.

    Returns:
        Cursor: This cursor, to allow chaining.

    Example:
        >>> def is_high_value(doc):
        ...     return doc.get('value', 0) > 10
        >>> cursor = collection.find({}).where(is_high_value)
    """
    document_test = predicate
    self._where_predicate = document_test
    return self
[docs]
def to_list(self, length: int | None = None) -> list[dict[str, Any]]:
    """
    Materialise the cursor's results as a list.

    The cursor is always iterated to the end; ``length`` only trims the
    list that is returned.

    Args:
        length (int, optional): Maximum number of documents to return.
            None returns everything.

    Returns:
        list[dict[str, Any]]: The documents, truncated to ``length`` if given.

    Example:
        >>> cursor = collection.find({"age": {"$gte": 18}})
        >>> adults = cursor.to_list()
        >>> first_5 = cursor.to_list(5)
    """
    documents = list(self)
    return documents if length is None else documents[:length]
[docs]
def clone(self) -> Cursor:
    """
    Create an unevaluated copy of this cursor with the same options.

    The copy carries the filter, projection, hint, sort, skip, limit,
    comment, min/max bounds, collation, where-predicate, session, batch
    size and, when they were set, the option flags and max-await time.
    Mutable option containers are copied, so changing one cursor never
    affects the other. The temp-table cleanup list is not shared.

    Returns:
        Cursor: A new cursor with the same settings and ``retrieved == 0``.

    Example:
        >>> cursor = collection.find({"age": {"$gte": 18}}).limit(10)
        >>> clone = cursor.clone()
        >>> results1 = list(cursor)
        >>> results2 = list(clone)
    """
    cloned = Cursor(
        self._collection,
        filter=deepcopy(self._filter),
        # Copy the projection too, so the clone does not alias our dict.
        projection=deepcopy(self._projection),
        hint=deepcopy(self._hint),  # type: ignore[arg-type]
    )
    cloned._skip = self._skip
    cloned._limit = self._limit
    cloned._sort = deepcopy(self._sort) if self._sort else None
    cloned._comment = self._comment
    cloned._min = deepcopy(self._min) if self._min else None
    cloned._max = deepcopy(self._max) if self._max else None
    cloned._collation = (
        deepcopy(self._collation) if self._collation else None
    )
    # The predicate is shared by reference; functions can't be deep copied.
    cloned._where_predicate = self._where_predicate
    cloned._session = self._session
    # Previously dropped: without this the clone silently reverted to the
    # default fetch size.
    cloned._batch_size = self._batch_size
    # These two attributes exist only if their setters were called.
    if hasattr(self, "_options"):
        cloned._options = self._options
    if hasattr(self, "_max_await_time_ms"):
        cloned._max_await_time_ms = self._max_await_time_ms
    cloned._retrieved = 0  # Clone starts fresh
    return cloned
@property
def retrieved(self) -> int:
    """
    Number of documents this cursor has yielded so far.

    The counter is incremented once per document yielded by
    ``_execute_query`` and is not reset between iterations.

    Returns:
        int: The running count of yielded documents.

    Example:
        >>> cursor = collection.find({}).limit(10)
        >>> docs = list(cursor)
        >>> cursor.retrieved
        10
    """
    yielded_so_far = self._retrieved
    return yielded_so_far
@property
def alive(self) -> bool:
    """
    Report whether the cursor may still produce documents.

    A cursor stops being alive once an iteration has run to completion
    (``_execute_query`` sets ``_exhausted``) or once as many documents as
    the configured limit have been retrieved.

    Returns:
        bool: True if the cursor may have more documents, False otherwise.

    Example:
        >>> cursor = collection.find({}).limit(10)
        >>> cursor.alive
        True
        >>> list(cursor)
        >>> cursor.alive
        False
    """
    # A completed iteration is final, even if it produced no documents or
    # fewer than the limit. Checking the counters alone reported such
    # cursors as alive forever.
    if getattr(self, "_exhausted", False):
        return False
    if self._limit is not None:
        return self._retrieved < self._limit
    return True
@property
def collection(self):
    """
    The collection this cursor was created for.

    Returns:
        Collection: The object passed to the constructor.

    Example:
        >>> cursor = collection.find({})
        >>> cursor.collection.name
        'collection_name'
    """
    owning_collection = self._collection
    return owning_collection
@property
def address(self) -> tuple | None:
    """
    Pseudo-address of the database behind this cursor.

    Returns:
        tuple | None: None until the cursor has been iterated (nothing
        retrieved and no ``_iterated`` marker). Afterwards:
            - ('sqlite::memory:', 0) when the file reported by
              ``PRAGMA database_list`` is ``":memory:"`` or empty
            - ('sqlite://<file>', 0) otherwise

    Example:
        >>> cursor = collection.find({})
        >>> cursor.address  # Before iteration
        None
        >>> list(cursor)
        >>> cursor.address  # After iteration
        ('sqlite::memory:', 0)  # or ('sqlite:///path/to/file.db', 0)
    """
    never_run = self._retrieved == 0 and not hasattr(self, "_iterated")
    if never_run:
        return None

    # The third column of the first PRAGMA row holds the database file.
    first_row = self._collection.db.execute("PRAGMA database_list").fetchone()
    db_file = first_row[2]

    if db_file in (":memory:", ""):
        return ("sqlite::memory:", 0)
    return (f"sqlite://{db_file}", 0)
[docs]
def explain(self, verbosity: str = "queryPlanner") -> dict[str, Any]:
    """
    Return SQLite's query plan for this cursor's query.

    The SQL is rebuilt from the filter, sort, skip and limit and run through
    ``EXPLAIN QUERY PLAN``.

    NOTE(review): min/max bounds, the where-predicate and the $expr /
    datetime / Python-fallback paths are not reflected here, so the
    plan can differ from the statement that iteration really runs.

    Args:
        verbosity (str): "queryPlanner" (default) or "executionStats".
            With "executionStats" the cursor is also iterated once to
            collect timing, which advances ``retrieved``.

    Returns:
        dict[str, Any]: A dictionary with:
            - queryPlanner.winningPlan: one ``{"detail": ...}`` per plan row
            - queryPlanner.indexUsage: the rows that mention an index
            - executionStats (only when requested): ``nReturned`` and
              ``executionTimeMillis``
        If the plan (or the stats run) raises, the plan contains a single
        error detail and an ``"error"`` key holds the message.

    Example:
        >>> cursor = collection.find({"age": {"$gte": 18}})
        >>> plan = cursor.explain()
        >>> print(plan['queryPlanner']['winningPlan'])
    """
    where_result = self._query_helpers._build_simple_where_clause(
        self._filter
    )
    sort_clause = self._query_helpers._build_sort_clause(
        self._sort, self._collation
    )
    pagination_clause = self._query_helpers._build_pagination_clause(
        self._limit, self._skip
    )

    jsonb = self._collection.query_engine._jsonb_supported
    select_head = (
        f"SELECT id, _id, {json_data_column(jsonb)} as data "
        f"FROM {self._collection.name}"
    )

    if where_result is not None:
        where_clause, params, tables = where_result
        # Building the clause may have created temporary tables. Register
        # them so close() drops them; they used to be leaked here.
        if tables:
            self._tables_to_cleanup.extend(tables)
        sql = f"{select_head} {where_clause}{sort_clause}{pagination_clause}"
    else:
        # No SQL-expressible filter - plain select
        sql = f"{select_head}{sort_clause}{pagination_clause}"
        params = ()

    try:
        plan_rows = self._collection.db.execute(
            f"EXPLAIN QUERY PLAN {sql}", params
        ).fetchall()

        # Row format: (id, parent, notused, detail)
        winning_plan = []
        index_usage = []
        for row in plan_rows:
            detail = row[3] if len(row) > 3 else str(row)
            entry = {"detail": detail}
            winning_plan.append(entry)
            if "USING INDEX" in detail or "USING COVERING INDEX" in detail:
                index_usage.append({"detail": detail})

        result: dict[str, Any] = {
            "queryPlanner": {
                "winningPlan": winning_plan,
                "indexUsage": index_usage,
            }
        }

        if verbosity == "executionStats":
            # Run the query for real. perf_counter is monotonic, so the
            # elapsed time cannot go negative if the wall clock is adjusted.
            started = time.perf_counter()
            results = list(self)
            execution_time_ms: float = (time.perf_counter() - started) * 1000
            result["executionStats"] = {
                "nReturned": len(results),
                "executionTimeMillis": round(execution_time_ms, 2),
            }
        return result
    except Exception as e:
        # Best effort: report the failure in the plan instead of raising.
        logger.debug("e=%r", e)
        return {
            "queryPlanner": {
                "winningPlan": [{"detail": f"Error getting plan: {e}"}],
                "indexUsage": [],
            },
            "error": str(e),
        }
[docs]
def _execute_query(self) -> Iterator[dict[str, Any]]:
    """
    Run the query pipeline and yield the resulting documents.

    Being a generator, nothing below runs until the first document is
    requested. Filtering comes first; sorting and skip/limit are applied
    in Python only when the SQL layer did not already handle them;
    projection comes last.

    Yields:
        dict[str, Any]: Each projected document, in result order.
    """
    validate_session(self._session, self._collection._database)

    # The filtering step flips these flags when its SQL already contains
    # ORDER BY / LIMIT-OFFSET.
    self._sql_handled_sort = False
    self._sql_handled_pagination = False

    matched = self._get_filtered_documents()
    if not self._sql_handled_sort:
        matched = self._apply_sorting(matched)
    if not self._sql_handled_pagination:
        matched = self._apply_pagination(matched)
    projected = self._apply_projection(matched)

    # Read by the address property.
    self._iterated = True

    for document in projected:
        self._retrieved += 1
        yield document

    # Reached only when the consumer drains the generator.
    self._exhausted = True
[docs]
def _get_filtered_documents(self) -> Iterable[dict[str, Any]]:
    """
    Produce the documents matching the cursor's filter.

    Strategy, in order:
        1. Filters containing datetime operations go to the
           DateTimeQueryProcessor; if that raises, processing continues.
        2. Filters with ``$expr`` go to ``_handle_expr_query``.
        3. Filters the query helper can express as SQL run as one SELECT,
           with min/max bounds, ORDER BY and (when safe) LIMIT/OFFSET.
        4. Everything else goes to ``_handle_python_fallback``.

    The ``where()`` predicate is applied to the result of every path.
    It used to be silently skipped on the datetime and ``$expr`` paths.

    Returns:
        Iterable[dict[str, Any]]: The matching documents.
    """
    predicate = self._where_predicate

    def apply_where(candidates: Iterable[dict[str, Any]]):
        """Apply the Tier-3 Python predicate, if one is set."""
        if predicate:
            return filter(predicate, candidates)  # type: ignore[arg-type]
        return candidates

    if self._contains_datetime_operations(self._filter):
        try:
            from .datetime_query_processor import DateTimeQueryProcessor

            datetime_processor = DateTimeQueryProcessor(self._collection)
            datetime_docs = datetime_processor.process_datetime_query(
                self._filter
            )
        except Exception as e:
            # Deliberate best effort: fall through to normal processing.
            logger.debug("DateTime processor failed, falling back: %s", e)
        else:
            return apply_where(datetime_docs)

    # Special handling for $expr queries
    if "$expr" in self._filter:
        return apply_where(self._handle_expr_query())

    where_result = self._query_helpers._build_simple_where_clause(
        self._filter
    )
    has_schema_filter = "$jsonSchema" in self._filter

    docs: Iterable[dict[str, Any]]
    if where_result is not None:
        where_clause, params, tables = where_result

        # Temp tables created for the clause are dropped on close().
        if tables:
            self._tables_to_cleanup.extend(tables)

        if self._min or self._max:
            where_clause, params = self._build_minmax_clause(
                where_clause, tuple(params), self._min, self._max
            )

        sort_clause = self._query_helpers._build_sort_clause(
            self._sort, self._collation
        )

        # LIMIT/OFFSET in SQL is only correct when no Python-side filter
        # (where predicate or $jsonSchema check) can remove rows later.
        if predicate or has_schema_filter:
            pagination_clause = ""
        else:
            pagination_clause = self._query_helpers._build_pagination_clause(
                self._limit, self._skip
            )

        jsonb = self._collection.query_engine._jsonb_supported
        cmd = (
            f"SELECT id, _id, {json_data_column(jsonb)} as data "
            f"FROM {self._collection.name} {where_clause}{sort_clause}{pagination_clause}"
        )

        # Tell _execute_query which steps it can skip.
        if sort_clause:
            self._sql_handled_sort = True
        if pagination_clause:
            self._sql_handled_pagination = True

        if self._comment:
            # Break up comment delimiters by inserting a space. Deleting
            # them is bypassable: "**//" collapses to "*/" once the inner
            # "*/" is removed, ending the SQL comment early.
            safe_comment = (
                self._comment.replace("--", "- -")
                .replace("/*", "/ *")
                .replace("*/", "* /")
            )
            cmd = f"/* {safe_comment} */ {cmd}"

        db_cursor = self._collection.db.execute(cmd, params)

        def doc_generator():
            """Stream rows in batches of ``_batch_size``."""
            while True:
                rows = db_cursor.fetchmany(self._batch_size)
                if not rows:
                    break
                yield from self._load_documents(rows)

        docs = doc_generator()
    else:
        # Fall back to Python-based filtering
        docs = self._handle_python_fallback()

    docs = apply_where(docs)

    if has_schema_filter:
        from .query_helper.schema_validator import matches_json_schema

        schema = self._filter["$jsonSchema"]
        docs = (doc for doc in docs if matches_json_schema(doc, schema))
    return docs
[docs]
def _handle_python_fallback(self) -> Iterable[dict[str, Any]]:
    """
    Handle filters SQL cannot express by testing every document in Python.

    The SELECT is built and executed as soon as this method is called, so
    ``_sql_handled_sort`` is already set when ``_execute_query`` checks it.
    Previously the whole method was a generator: the flag was only set
    once the caller began consuming rows, after it had already decided to
    sort again in Python.

    SQL pagination is never used here, because the Python filter may reject
    rows that LIMIT/OFFSET would have counted.

    Returns:
        Iterable[dict[str, Any]]: A lazy iterable of the documents accepted
        by the query helper's ``_apply_query``.
    """
    # Sorting can still be delegated to SQL even though filtering cannot.
    sort_clause = self._query_helpers._build_sort_clause(
        self._sort, self._collation
    )

    jsonb = self._collection.query_engine._jsonb_supported
    cmd = (
        f"SELECT id, _id, {json_data_column(jsonb)} as data "
        f"FROM {self._collection.name}"
    )

    params: tuple = ()
    if self._min or self._max:
        minmax_clause, params = self._build_minmax_clause(
            "", (), self._min, self._max
        )
        # Drop a leading WHERE keyword as a prefix. str.lstrip("WHERE")
        # strips a set of characters and only worked by accident.
        conditions = minmax_clause.strip()
        if conditions.upper().startswith("WHERE"):
            conditions = conditions[len("WHERE"):].lstrip()
        cmd = f"{cmd} WHERE {conditions}"

    cmd = f"{cmd}{sort_clause}"
    if sort_clause:
        self._sql_handled_sort = True

    if self._comment:
        safe_comment = (
            self._comment.replace("--", "- -")
            .replace("/*", "/ *")
            .replace("*/", "* /")
        )
        cmd = f"/* {safe_comment} */ {cmd}"

    db_cursor = self._collection.db.execute(cmd, params)
    matches = partial(self._query_helpers._apply_query, self._filter)

    def matching_documents():
        """Stream rows in batches, keeping those that pass the filter."""
        while True:
            rows = db_cursor.fetchmany(self._batch_size)
            if not rows:
                break
            for doc in self._load_documents(rows):
                if matches(doc):
                    yield doc

    return matching_documents()
[docs]
def _handle_expr_query(self) -> Iterable[dict[str, Any]]:
    """
    Evaluate a ``$expr`` filter, in SQL when possible and in Python otherwise.

    The query helper's ``_build_expr_where_clause`` decides the route:
        - A full ``SELECT`` statement (Tier 2) is executed as-is; sorting
          and pagination are left to the Python pipeline.
        - Any other string (Tier 1) is treated as a WHERE clause and gets
          ORDER BY and, when no where-predicate is set, LIMIT/OFFSET.
        - ``None`` (Tier 3) means every document is loaded, optionally
          pre-sorted by SQL, and tested with ``ExprEvaluator``.

    Returns:
        Iterable[dict[str, Any]]: A list (SQL tiers) or a lazy filter
        (Python tier) of matching documents.
    """
    from .expr_evaluator import ExprEvaluator

    def with_comment(sql: str) -> str:
        """Prefix ``sql`` with the cursor comment, neutralising delimiters."""
        if not self._comment:
            return sql
        # Insert spaces instead of deleting the delimiters. Deletion is
        # bypassable: "**//" collapses to "*/" after the inner "*/" is
        # removed, ending the SQL comment early.
        safe_comment = (
            self._comment.replace("--", "- -")
            .replace("/*", "/ *")
            .replace("*/", "* /")
        )
        return f"/* {safe_comment} */ {sql}"

    def fetch_all(db_cursor) -> list[dict[str, Any]]:
        """Drain ``db_cursor`` in batches of ``_batch_size``."""
        loaded: list[dict[str, Any]] = []
        while True:
            rows = db_cursor.fetchmany(self._batch_size)
            if not rows:
                break
            loaded.extend(self._load_documents(rows))
        return loaded

    def select_head() -> str:
        """Build the SELECT ... FROM prefix shared by Tier 1 and Tier 3."""
        jsonb = self._collection.query_engine._jsonb_supported
        return (
            f"SELECT id, _id, {json_data_column(jsonb)} as data "
            f"FROM {self._collection.name}"
        )

    expr_result = self._query_helpers._build_expr_where_clause(self._filter)

    if expr_result is not None:
        sql_or_where, params, tables = expr_result

        # Temp tables created for the query are dropped on close().
        if tables:
            self._tables_to_cleanup.extend(tables)

        if sql_or_where.strip().upper().startswith("SELECT"):
            # Tier 2: the helper produced the whole statement. It carries
            # no ORDER BY / LIMIT, so the Python pipeline must do both.
            cmd = sql_or_where
            self._sql_handled_sort = False
            self._sql_handled_pagination = False
        else:
            # Tier 1: WHERE clause only
            sort_clause = self._query_helpers._build_sort_clause(
                self._sort, self._collation
            )
            # A Python predicate still has to see every SQL match, so
            # LIMIT/OFFSET cannot be pushed down when one is set.
            if self._where_predicate:
                pagination_clause = ""
            else:
                pagination_clause = (
                    self._query_helpers._build_pagination_clause(
                        self._limit, self._skip
                    )
                )
            cmd = f"{select_head()} {sql_or_where}{sort_clause}{pagination_clause}"
            if sort_clause:
                self._sql_handled_sort = True
            if pagination_clause:
                self._sql_handled_pagination = True

        db_cursor = self._collection.db.execute(with_comment(cmd), params)
        return fetch_all(db_cursor)

    # Tier 3: evaluate the expression in Python.
    expr = self._filter["$expr"]
    evaluator = ExprEvaluator(db_connection=self._collection.db)

    # Sorting can still be delegated to SQL; pagination cannot, because the
    # expression may reject rows.
    sort_clause = self._query_helpers._build_sort_clause(
        self._sort, self._collation
    )
    cmd = f"{select_head()}{sort_clause}"
    if sort_clause:
        self._sql_handled_sort = True

    db_cursor = self._collection.db.execute(with_comment(cmd))
    all_docs = fetch_all(db_cursor)

    def expr_filter(doc: dict[str, Any]) -> bool:
        """
        Evaluate the expression for one document.

        A document whose evaluation raises is logged and excluded.
        """
        try:
            return evaluator.evaluate_python(expr, doc)
        except Exception as e:
            logger.warning("$expr evaluation failed for document: %s", e)
            return False

    return filter(expr_filter, all_docs)  # type: ignore[arg-type]
[docs]
def _build_minmax_clause(
    self,
    where_clause: str,
    params: tuple,
    min_spec: dict[str, Any] | None,
    max_spec: dict[str, Any] | None,
) -> tuple:
    """
    Extend a WHERE clause with the min/max bound conditions.

    Each min field contributes ``<extract>(data, '<path>') >= ?`` and each
    max field ``<extract>(data, '<path>') < ?``, all joined with AND.

    Args:
        where_clause: Existing WHERE clause (may be empty or blank).
        params: Parameters already bound for ``where_clause``.
        min_spec: Field -> inclusive lower bound, or None.
        max_spec: Field -> exclusive upper bound, or None.

    Returns:
        Tuple of (clause, parameters). With no bounds the inputs are
        returned unchanged; otherwise the parameters are a new tuple of the
        existing values followed by the bound values.
    """
    bound_sql: list[str] = []
    bound_values = list(params)

    jsonb = self._collection.query_engine._jsonb_supported
    extract_fn = f"{_get_json_function_prefix(jsonb)}_extract"

    # Lower bounds first, then upper bounds, matching parameter order.
    # NOTE(review): the JSON path is interpolated into the SQL text; this
    # assumes parse_json_path returns a quote-safe path - confirm.
    for spec, comparator in ((min_spec, ">="), (max_spec, "<")):
        if not spec:
            continue
        for field, value in spec.items():
            json_path = parse_json_path(field)
            bound_sql.append(
                f"{extract_fn}(data, '{json_path}') {comparator} ?"
            )
            bound_values.append(value)

    if not bound_sql:
        return where_clause, params

    joined = " AND ".join(bound_sql)
    if where_clause and where_clause.strip():
        # NOTE(review): appended without parentheses; assumes the existing
        # clause has no unparenthesised top-level OR - confirm.
        combined = f"{where_clause.rstrip()} AND {joined}"
    else:
        combined = f"WHERE {joined}"
    return combined, tuple(bound_values)
[docs]
def _get_collate_clause(self) -> str:
    """
    Translate the cursor's collation into a SQL COLLATE clause.

    Only ``strength`` (default 3) and ``caseLevel`` (default False) are
    consulted:
        - strength 1-2 without caseLevel -> " COLLATE NOCASE"
        - strength 3+, or caseLevel enabled -> "" (SQLite's default
          case-sensitive comparison)

    The previous condition used ``or``, which made every collation without
    ``caseLevel`` case-insensitive, including the default strength 3.

    Returns:
        str: " COLLATE NOCASE" or an empty string.
    """
    if not self._collation:
        return ""

    strength = self._collation.get("strength", 3)
    case_level = self._collation.get("caseLevel", False)

    # Case is ignored only at primary/secondary strength, and only when
    # caseLevel has not re-enabled case comparison.
    if strength <= 2 and not case_level:
        return " COLLATE NOCASE"
    return ""
[docs]
def _contains_datetime_operations(self, query: dict[str, Any]) -> bool:
    """
    Decide whether a filter should be routed to the datetime processor.

    A filter qualifies when any field condition, including those nested
    under ``$and`` / ``$or`` / ``$nor`` / ``$not``, has one of:
        - a comparison operator whose operand is a datetime value
        - ``$in`` / ``$nin`` with a list containing a datetime value
        - ``$type`` of 9, "date" or "Date"
        - ``$regex`` with a datetime-looking pattern

    Args:
        query: MongoDB-style query dictionary.

    Returns:
        True if the query contains datetime operations, False otherwise
        (also for non-dict input).
    """
    if not isinstance(query, dict):
        return False

    comparison_ops = ("$gte", "$gt", "$lte", "$lt", "$eq", "$ne")

    for field, value in query.items():
        # Logical combinators: recurse into each sub-condition.
        if field in ("$and", "$or", "$nor"):
            if isinstance(value, list):
                for sub_query in value:
                    if isinstance(
                        sub_query, dict
                    ) and self._contains_datetime_operations(sub_query):
                        return True
            continue

        if field == "$not":
            if isinstance(value, dict) and self._contains_datetime_operations(
                value
            ):
                return True
            continue

        # Plain values (implicit equality) are never treated as datetime ops.
        if not isinstance(value, dict):
            continue

        for operator, operand in value.items():
            if operator in comparison_ops:
                found = self._is_datetime_value(operand)
            elif operator in ("$in", "$nin"):
                found = isinstance(operand, list) and any(
                    self._is_datetime_value(item) for item in operand
                )
            elif operator == "$type":
                # 9 is date type in MongoDB
                found = operand in (9, "date", "Date")
            elif operator == "$regex":
                found = self._is_datetime_regex(operand)
            else:
                found = False
            if found:
                return True

    return False
[docs]
def _is_datetime_value(self, value: Any) -> bool:
    """
    Tell whether ``value`` counts as a datetime for query routing.

    Delegates to ``datetime_utils.is_datetime_value``.

    Args:
        value: Value to check.

    Returns:
        Whatever the delegate returns for ``value``.
    """
    # Imported locally, as in the rest of this class's datetime handling.
    from . import datetime_utils

    return datetime_utils.is_datetime_value(value)
[docs]
def _is_datetime_regex(self, pattern: str) -> bool:
    """
    Tell whether a regex pattern looks like a datetime matcher.

    Delegates to ``datetime_utils.is_datetime_regex``.

    Args:
        pattern: Regex pattern string.

    Returns:
        Whatever the delegate returns for ``pattern``.
    """
    # Imported locally, as in the rest of this class's datetime handling.
    from . import datetime_utils

    return datetime_utils.is_datetime_regex(pattern)
[docs]
def _load_documents(self, rows) -> Iterable[dict[str, Any]]:
    """
    Lazily turn database rows into documents.

    Args:
        rows: Iterable of ``(id, _id, data)`` rows, in the column order
            used by this class's SELECT statements.

    Returns:
        Iterable[dict[str, Any]]: One document per row, built by the
        collection's ``_load_with_stored_id``.
    """
    for row_id, stored_id, payload in rows:
        # The loader takes the data before the stored _id.
        yield self._collection._load_with_stored_id(
            row_id, payload, stored_id
        )
[docs]
def _apply_sorting(
    self, docs: Iterable[dict[str, Any]]
) -> list[dict[str, Any]]:
    """
    Sort documents in Python according to the cursor's sort specification.

    Sorting is done as successive stable sorts, from the least significant
    key to the most significant. Two adjustments are made to sort values:
        - Strings are lower-cased when the collation is case-insensitive
          (strength 1-2 without ``caseLevel``). The previous ``or``
          condition also lower-cased at the default strength 3.
        - Missing/None values sort before all others instead of raising
          TypeError when compared with real values.

    Args:
        docs (Iterable[dict[str, Any]]): The documents to sort.

    Returns:
        list[dict[str, Any]]: The documents as a list, sorted when a sort
        specification is set.
    """
    sorted_docs = list(docs)
    if not self._sort:
        return sorted_docs

    case_insensitive = False
    if self._collation:
        strength = self._collation.get("strength", 3)
        case_level = self._collation.get("caseLevel", False)
        case_insensitive = strength <= 2 and not case_level

    def make_key(get_val):
        """Build a key function that orders None first and folds case."""

        def key_func(doc):
            val = get_val(doc)
            if case_insensitive and isinstance(val, str):
                val = val.lower()
            # (False, None) sorts before (True, value), so None is never
            # compared with a real value.
            return (val is not None, val)

        return key_func

    # Apply the least significant key first; each later stable sort keeps
    # the earlier order among equal elements.
    for key in reversed(list(self._sort.keys())):
        get_val = partial(self._collection._get_val, key=key)
        reverse = self._sort[key] == DESCENDING
        sorted_docs.sort(key=make_key(get_val), reverse=reverse)
    return sorted_docs
[docs]
def _apply_projection(
    self, docs: Iterable[dict[str, Any]]
) -> list[dict[str, Any]]:
    """
    Run every document through the query helper's projection.

    Args:
        docs (Iterable[dict[str, Any]]): Documents to project.

    Returns:
        list[dict[str, Any]]: The projected documents, in input order.
    """
    project = self._query_helpers._apply_projection
    projection = self._projection
    return [project(projection, doc) for doc in docs]
[docs]
def close(self) -> None:
    """
    Release the cursor's resources; safe to call more than once.

    The first call drops the tracked temporary tables and marks the cursor
    closed; later calls do nothing.
    """
    if not self._closed:
        self._cleanup_tables()
        self._closed = True
def __del__(self) -> None:
    """
    Best-effort cleanup when the cursor is garbage collected.

    Any error from ``close()`` is swallowed and, if the module logger is
    still available, logged at debug level.
    """
    try:
        self.close()
    except Exception as e:
        # The logger is checked for None before use.
        if logger is None:
            return
        logger.debug(f"{e=}")
def __enter__(self) -> Cursor:
    """
    Begin a ``with`` block.

    Returns:
        Cursor: This cursor, bound to the ``as`` target.
    """
    managed_cursor = self
    return managed_cursor
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
    """
    End a ``with`` block by closing the cursor.

    Returns None, so an exception raised inside the block propagates.

    Args:
        exc_type: Exception type, if the block raised.
        exc_val: Exception instance, if the block raised.
        exc_tb: Traceback, if the block raised.
    """
    # The exception details are intentionally unused.
    del exc_type, exc_val, exc_tb
    self.close()
[docs]
def _cleanup_tables(self) -> None:
    """
    Drop the temporary tables registered on this cursor.

    Each table is dropped with ``DROP TABLE IF EXISTS`` on the collection's
    connection. A failure is logged at debug level and does not stop the
    remaining drops. The list is emptied afterwards either way.
    """
    pending_tables = self._tables_to_cleanup
    if not pending_tables:
        return

    connection = self._collection.db
    for table in pending_tables:
        try:
            connection.execute(f"DROP TABLE IF EXISTS {table}")
        except Exception as e:
            # Best effort: keep going with the other tables.
            logger.debug(
                f"Failed to drop temporary table '{table}' during cleanup: {e}"
            )

    self._tables_to_cleanup = []