Source code for neosqlite.collection.query_engine.find_operations

"""Find operations for the QueryEngine."""

from __future__ import annotations

from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    from ..client_session import ClientSession

from ...sql_utils import quote_table_name
from ..cursor import Cursor
from ..jsonb_support import json_data_column

# Import feature detection
from ..query_helper import _supports_returning_clause, get_force_fallback
from ..raw_batch_cursor import RawBatchCursor
from ..type_utils import validate_session
from .base import QueryEngineProtocol


[docs] class FindOperationsMixin(QueryEngineProtocol): """Mixin class providing find operations for QueryEngine."""
[docs] def find( self, filter: dict[str, Any] | None = None, projection: dict[str, Any] | None = None, hint: str | None = None, session: ClientSession | None = None, ) -> Cursor: """ Query the database and retrieve documents matching the provided filter. Args: filter (dict[str, Any] | None): A dictionary specifying the query criteria. projection (dict[str, Any] | None): A dictionary specifying which fields to return. hint (str | None): A string specifying the index to use. session (ClientSession, optional): A ClientSession for transactions. Returns: Cursor: A cursor object to iterate over the results. """ validate_session(session, self.collection._database) # Apply ID type normalization to handle cases where users query 'id' with ObjectId if filter is not None: filter = self.helpers._normalize_id_query(filter) return Cursor( self.collection, filter, projection, hint, session=session )
[docs] def find_raw_batches( self, filter: dict[str, Any] | None = None, projection: dict[str, Any] | None = None, hint: str | None = None, batch_size: int = 100, session: ClientSession | None = None, ) -> RawBatchCursor: """ Query the database and retrieve batches of raw JSON. Similar to the :meth:`find` method but returns a :class:`~neosqlite.raw_batch_cursor.RawBatchCursor`. This method returns raw JSON batches which can be more efficient for certain use cases where you want to process data in batches rather than individual documents. Args: filter (dict[str, Any] | None): A dictionary specifying the query criteria. projection (dict[str, Any] | None): A dictionary specifying which fields to return. hint (str | None): A string specifying the index to use. batch_size (int): The number of documents to include in each batch. session (ClientSession, optional): A ClientSession for transactions. Returns: RawBatchCursor instance. """ validate_session(session, self.collection._database) return RawBatchCursor( self.collection, filter, projection, hint, batch_size, session=session, )
[docs] def find_one( self, filter: dict[str, Any] | None = None, projection: dict[str, Any] | None = None, hint: str | None = None, session: ClientSession | None = None, ) -> dict[str, Any] | None: """ Find a single document matching the filter. Args: filter (dict[str, Any]): A dictionary specifying the filter conditions. projection (dict[str, Any]): A dictionary specifying which fields to return. hint (str): A string specifying the index to use (not used in SQLite). session (ClientSession, optional): A ClientSession for transactions. Returns: dict[str, Any]: A dictionary representing the found document, or None if no document matches. """ validate_session(session, self.collection._database) # Apply ID type normalization to handle cases where users query 'id' with ObjectId if filter is not None: filter = self.helpers._normalize_id_query(filter) try: return next( iter( self.find(filter, projection, hint, session=session).limit( 1 ) ) ) except StopIteration: return None
[docs] def find_one_and_delete( self, filter: dict[str, Any], projection: dict[str, Any] | None = None, sort: list[tuple[str, int]] | None = None, session: ClientSession | None = None, **kwargs: Any, ) -> dict[str, Any] | None: """ Find a single document and delete it. Args: filter (dict[str, Any]): A dictionary specifying the filter criteria. projection (dict[str, Any]): A dictionary specifying which fields to return. sort (list[tuple[str, int]]): A list of (key, direction) pairs for sorting. session (ClientSession, optional): A ClientSession for transactions. **kwargs: Additional keyword arguments. Returns: dict[str, Any] | None: The document before it was deleted, or None if not found. """ validate_session(session, self.collection._database) # Apply ID type normalization to handle cases where users query 'id' with ObjectId filter = self.helpers._normalize_id_query(filter) # Check if RETURNING clause is supported (Tier-1 optimization) use_returning = ( _supports_returning_clause() and not get_force_fallback() ) # Build sorting clause order_by = "" if sort: order_by = self.helpers._build_sort_clause(dict(sort)) # Use direct query to get integer ID for the delete operation where_clause, params = self.sql_translator.translate_match(filter) if not where_clause: # Fallback: use Python-based approach for complex queries cursor = self.find(filter, projection, session=session) if sort: cursor.sort(sort) try: doc = next(iter(cursor.limit(1))) if doc: int_doc_id = self._get_integer_id_for_oid(doc["_id"]) self.helpers._internal_delete(int_doc_id) return doc except StopIteration: pass return None if use_returning: # Tier-1: Use RETURNING clause for atomic find-and-delete # Note: SQLite doesn't support LIMIT with DELETE RETURNING directly # We need to use a subquery approach jsonb = self._jsonb_supported data_col = json_data_column(jsonb) cmd = ( f"DELETE FROM {quote_table_name(self.collection.name)} " f"WHERE id = (SELECT id FROM {quote_table_name(self.collection.name)} " f"{where_clause} {order_by} LIMIT 1) " f"RETURNING id, _id, {data_col} as data" ) cursor = self.collection.db.execute(cmd, params) if row := cursor.fetchone(): int_id, stored_id, data = row return self.collection._load_with_stored_id( int_id, data, stored_id ) return None else: # Tier-1 (Fallback): Two-step process (SELECT then DELETE) # Used when RETURNING clause is not supported (SQLite < 3.35.0) jsonb = self._jsonb_supported cmd = ( f"SELECT id, _id, {json_data_column(jsonb)} as data " f"FROM {quote_table_name(self.collection.name)} " f"{where_clause} {order_by} LIMIT 1" ) cursor = self.collection.db.execute(cmd, params) if row := cursor.fetchone(): int_id, stored_id, data = row doc = self.collection._load_with_stored_id( int_id, data, stored_id ) self.helpers._internal_delete(int_id) return doc return None
[docs] def find_one_and_replace( self, filter: dict[str, Any], replacement: dict[str, Any], projection: dict[str, Any] | None = None, sort: list[tuple[str, int]] | None = None, upsert: bool = False, return_document: bool = False, session: ClientSession | None = None, **kwargs: Any, ) -> dict[str, Any] | None: """ Find a single document and replace it. Args: filter (dict[str, Any]): A dictionary specifying the filter criteria. replacement (dict[str, Any]): The replacement document. projection (dict[str, Any]): A dictionary specifying which fields to return. sort (list[tuple[str, int]]): A list of (key, direction) pairs for sorting. upsert (bool): If True, perform an upsert if no document matches. return_document (bool): If True, return the updated document. session (ClientSession, optional): A ClientSession for transactions. **kwargs: Additional keyword arguments. Returns: dict[str, Any] | None: The document before or after replacement, or None if not found. """ validate_session(session, self.collection._database) # Apply ID type normalization to handle cases where users query 'id' with ObjectId filter = self.helpers._normalize_id_query(filter) # Check if RETURNING clause is supported (Tier-1 optimization) use_returning = ( _supports_returning_clause() and not get_force_fallback() ) # Build sorting clause order_by = "" if sort: order_by = self.helpers._build_sort_clause(dict(sort)) # Find document and get its integer ID for the replace operation where_clause, params = self.sql_translator.translate_match(filter) if not where_clause: # Fallback: use Python-based approach for complex queries cursor = self.find(filter, projection, session=session) if sort: cursor.sort(sort) try: doc = next(iter(cursor.limit(1))) if doc: int_doc_id = self._get_integer_id_for_oid(doc["_id"]) self.helpers._internal_replace(int_doc_id, replacement) if return_document: return self.find_one( {"_id": doc["_id"]}, projection, session=session ) return doc except StopIteration: pass if upsert: res = self.insert_one(replacement, session=session) if return_document: return self.find_one( {"_id": res.inserted_id}, projection, session=session ) return None return None if use_returning: # Tier-1: Use RETURNING clause for atomic find-and-replace jsonb = self._jsonb_supported cmd = ( f"SELECT id, _id, {json_data_column(jsonb)} as data " f"FROM {quote_table_name(self.collection.name)} " f"{where_clause} {order_by} LIMIT 1" ) cursor = self.collection.db.execute(cmd, params) if row := cursor.fetchone(): int_id, stored_id, data = row original_doc = self.collection._load_with_stored_id( int_id, data, stored_id ) # Perform the replace self.helpers._internal_replace(int_id, replacement) if return_document: # Use RETURNING to get the updated document jsonb = self._jsonb_supported data_col = json_data_column(jsonb) update_cmd = ( f"UPDATE {quote_table_name(self.collection.name)} " f"SET data = ? WHERE id = ? " f"RETURNING id, _id, {data_col} as data" ) from ..json_helpers import neosqlite_json_dumps update_cursor = self.collection.db.execute( update_cmd, (neosqlite_json_dumps(replacement), int_id) ) update_row = update_cursor.fetchone() if update_row: return self.collection._load_with_stored_id( update_row[0], update_row[2], update_row[1] ) return original_doc # No document found, handle upsert if upsert: res = self.insert_one(replacement, session=session) if return_document: return self.find_one( {"_id": res.inserted_id}, projection, session=session ) return None return None else: # Tier-1 (Fallback): Two-step process # Used when RETURNING clause is not supported (SQLite < 3.35.0) jsonb = self._jsonb_supported cmd = ( f"SELECT id, _id, {json_data_column(jsonb)} as data " f"FROM {quote_table_name(self.collection.name)} " f"{where_clause} {order_by} LIMIT 1" ) cursor = self.collection.db.execute(cmd, params) if row := cursor.fetchone(): int_id, stored_id, data = row original_doc = self.collection._load_with_stored_id( int_id, data, stored_id ) self.helpers._internal_replace(int_id, replacement) if return_document: return self.find_one( {"_id": original_doc["_id"]}, projection, session=session, ) return original_doc if upsert: res = self.insert_one(replacement, session=session) if return_document: return self.find_one( {"_id": res.inserted_id}, projection, session=session ) return None return None
[docs] def find_one_and_update( self, filter: dict[str, Any], update: dict[str, Any], projection: dict[str, Any] | None = None, sort: list[tuple[str, int]] | None = None, upsert: bool = False, return_document: bool = False, array_filters: list[dict[str, Any]] | None = None, session: ClientSession | None = None, **kwargs: Any, ) -> dict[str, Any] | None: """ Find and update a single document. Args: filter (dict[str, Any]): A dictionary specifying the filter criteria. update (dict[str, Any]): A dictionary specifying the update operations. projection (dict[str, Any]): A dictionary specifying which fields to return. sort (list[tuple[str, int]]): A list of (key, direction) pairs for sorting. upsert (bool): If True, perform an upsert if no document matches. return_document (bool): If True, return the updated document. array_filters (list[dict[str, Any]]): Filters for array updates. session (ClientSession, optional): A ClientSession for transactions. **kwargs: Additional keyword arguments. Returns: dict[str, Any] | None: The original document (before update), or None if no document was found and updated. """ validate_session(session, self.collection._database) # Apply ID type normalization to handle cases where users query 'id' with ObjectId filter = self.helpers._normalize_id_query(filter) # Check if RETURNING clause is supported (Tier-1 optimization) use_returning = ( _supports_returning_clause() and not get_force_fallback() ) # Build sorting clause order_by = "" if sort: order_by = self.helpers._build_sort_clause(dict(sort)) # Find document and get its integer ID for the update operation where_clause, params = self.sql_translator.translate_match(filter) if not where_clause: # Fallback: use Python-based approach for complex queries cursor = self.find(filter, projection, session=session) if sort: cursor.sort(sort) try: doc = next(iter(cursor.limit(1))) if doc: int_doc_id = self._get_integer_id_for_oid(doc["_id"]) # Load the document for the update processing doc_to_update = self.find_one( {"_id": doc["_id"]}, session=session ) if doc_to_update is not None: self.helpers._internal_update( int_doc_id, update, doc_to_update, array_filters, filter, ) if return_document: return self.find_one( {"_id": doc["_id"]}, projection, session=session ) return doc except StopIteration: pass if upsert: # Basic upsert logic new_doc = dict(filter) res = self.insert_one(new_doc, session=session) self.update_one( {"_id": res.inserted_id}, update, array_filters=array_filters, session=session, ) if return_document: return self.find_one( {"_id": res.inserted_id}, projection, session=session ) return None return None if use_returning: # Tier-1: Use RETURNING clause for atomic find-and-update jsonb = self._jsonb_supported cmd = ( f"SELECT id, _id, {json_data_column(jsonb)} as data " f"FROM {quote_table_name(self.collection.name)} " f"{where_clause} {order_by} LIMIT 1" ) cursor = self.collection.db.execute(cmd, params) if row := cursor.fetchone(): int_id, stored_id, data = row original_doc = self.collection._load_with_stored_id( int_id, data, stored_id ) # Perform the update self.helpers._internal_update( int_id, update, original_doc, array_filters, filter ) if return_document: # Fetch the updated document return self.find_one( {"_id": original_doc["_id"]}, projection, session=session, ) return original_doc # No document found, handle upsert if upsert: new_doc = dict(filter) res = self.insert_one(new_doc, session=session) self.update_one( {"_id": res.inserted_id}, update, array_filters=array_filters, session=session, ) if return_document: return self.find_one( {"_id": res.inserted_id}, projection, session=session ) return None return None else: # Tier-1 (Fallback): Two-step process # Used when RETURNING clause is not supported (SQLite < 3.35.0) jsonb = self._jsonb_supported cmd = ( f"SELECT id, _id, {json_data_column(jsonb)} as data " f"FROM {quote_table_name(self.collection.name)} " f"{where_clause} {order_by} LIMIT 1" ) cursor = self.collection.db.execute(cmd, params) if row := cursor.fetchone(): int_id, stored_id, data = row original_doc = self.collection._load_with_stored_id( int_id, data, stored_id ) self.helpers._internal_update( int_id, update, original_doc, array_filters, filter ) if return_document: return self.find_one( {"_id": original_doc["_id"]}, projection, session=session, ) return original_doc if upsert: # Basic upsert logic new_doc = dict(filter) res = self.insert_one(new_doc, session=session) self.update_one( {"_id": res.inserted_id}, update, array_filters=array_filters, session=session, ) if return_document: return self.find_one( {"_id": res.inserted_id}, projection, session=session ) return None return None