from __future__ import annotations
import logging
import warnings
from typing import TYPE_CHECKING, Any, Literal, overload
from neosqlite.collection.json_helpers import neosqlite_json_loads
from .._sqlite import sqlite3
from ..bulk_operations import BulkOperationExecutor
from ..changestream import ChangeStream
from ..index_model import IndexModel
from ..objectid import ObjectId
from ..results import (
BulkWriteResult,
DeleteResult,
InsertManyResult,
InsertOneResult,
UpdateResult,
)
from ..sql_utils import quote_table_name
from .aggregation_cursor import AggregationCursor
from .cursor import Cursor
from .index_manager import IndexManager
from .query_engine import QueryEngine
from .raw_batch_cursor import RawBatchCursor
from .schema_utils import (
create_unique_index_on_id,
get_table_info,
)
from .type_utils import validate_session
if TYPE_CHECKING:
from ..client_session import ClientSession
from ..connection import Connection
logger = logging.getLogger(__name__)
[docs]
class Collection:
"""
Provides a class representing a collection in a SQLite database.
This class encapsulates operations on a collection such as inserting,
updating, deleting, and querying documents.
"""
[docs]
def __init__(
self,
db: sqlite3.Connection,
name: str,
create: bool = True,
database=None,
**kwargs: Any,
):
"""
Initialize a new collection object.
Args:
db: Database object to which the collection belongs.
name: Name of the collection.
create: Whether to create the collection table if it doesn't exist.
database: Database object that contains this collection.
**kwargs: Additional options for collection creation.
"""
self.db = db
self.name = name
self._database = database
self.indexes = IndexManager(self)
self.query_engine = QueryEngine(self)
self._options = kwargs
if create:
self.create(**kwargs)
[docs]
def cleanup(self) -> None:
"""Clean up resources used by the collection."""
if hasattr(self, "query_engine"):
self.query_engine.cleanup()
# --- Collection helper methods ---
[docs]
def _load(
self, id: int, data: str | bytes, stored_id: Any = None
) -> dict[str, Any]:
"""
Deserialize and load a document from its ID and JSON data.
Deserialize the JSON string or bytes back into a Python dictionary,
add the document ID to it, and return the document.
Args:
id (int): The document ID.
data (str | bytes): The JSON string or bytes representing the document.
stored_id (Any, optional): The stored _id value if already retrieved.
Returns:
dict[str, Any]: The deserialized document with the _id field added.
"""
if isinstance(data, bytes):
data = data.decode("utf-8")
document: dict[str, Any] = neosqlite_json_loads(data)
# If stored_id is provided, parse it. Otherwise look it up or use the auto-increment id
final_id = (
self._parse_stored_id(stored_id) if stored_id is not None else None
)
if final_id is None:
final_id = self._get_stored_id(id)
document["_id"] = final_id if final_id is not None else id
return document
[docs]
def _parse_stored_id(self, stored_id: Any) -> Any:
"""
Parse a value retrieved from the _id column into its appropriate Python type.
Args:
stored_id: The raw value from the _id column.
Returns:
Any: The parsed value (e.g., ObjectId, int, str, or None).
"""
match stored_id:
case None:
return None
case str() as s if len(s) == 24:
try:
return ObjectId(s)
except (ValueError, ImportError) as e:
logger.debug(
f"Failed to parse stored _id value '{s}' as ObjectId: {e}"
)
return s
case str() as s if (s.startswith("{") and s.endswith("}")) or (
s.startswith("[") and s.endswith("]")
):
try:
return neosqlite_json_loads(s)
except Exception as e:
logger.debug(
f"Failed to parse JSON string in _get_id_value: {e}"
)
return s
case _:
return stored_id
[docs]
def _load_with_stored_id(
self, id_val: int, data: str | bytes, stored_id_val
) -> dict[str, Any]:
"""
Deserialize and load a document with the stored _id value.
Args:
id_val (int): The auto-increment document ID.
data (str | bytes): The JSON string or bytes representing the document.
stored_id_val: The stored _id value from the _id column.
Returns:
dict[str, Any]: The deserialized document with the _id field added.
"""
try:
if isinstance(data, bytes):
data = data.decode("utf-8")
document: dict[str, Any] = neosqlite_json_loads(data)
except (UnicodeDecodeError, ValueError, TypeError) as e:
logger.warning(f"Skipping corrupted document (id={id_val}): {e}")
# Return a minimal document with just the _id to allow processing to continue
# The document won't match most filters anyway
_id = self._resolve_stored_id(stored_id_val, id_val)
return {"_id": _id, "__neosqlite_corrupted__": True}
# Use the stored _id value if available, otherwise fall back to the auto-increment id
_id = self._resolve_stored_id(stored_id_val, id_val)
document["_id"] = _id
return document
[docs]
def _resolve_stored_id(
self, stored_id_val: Any, fallback_id: int
) -> ObjectId | Any:
"""
Resolve the stored _id value, attempting to parse as ObjectId.
Args:
stored_id_val: The stored _id value from the _id column.
fallback_id: Fallback auto-increment ID if stored_id_val is None.
Returns:
ObjectId or the original stored_id_val, or fallback_id.
"""
if stored_id_val is not None:
# Try to decode as ObjectId if it looks like one
if isinstance(stored_id_val, str) and len(stored_id_val) == 24:
try:
return ObjectId(stored_id_val)
except ValueError as e:
logger.debug(
f"Failed to parse stored _id value '{stored_id_val}' as ObjectId: {e}"
)
return stored_id_val
else:
return stored_id_val
else:
# Fallback to the auto-increment ID for backward compatibility
return fallback_id
[docs]
def _get_stored_id(self, doc_id: int) -> ObjectId | int | str | None:
"""
Retrieve the stored _id for a document from the _id column.
Args:
doc_id (int): The document ID.
Returns:
ObjectId | int | None: The stored _id value, or None if the column doesn't exist yet.
"""
try:
# Check if the _id column exists
cursor = self.db.execute(
"SELECT name FROM pragma_table_info(?) WHERE name = '_id'",
(self.name,),
)
column_exists = cursor.fetchone() is not None
if column_exists:
cursor = self.db.execute(
f"SELECT _id FROM {quote_table_name(self.name)} WHERE id = ?",
(doc_id,),
)
if (row := cursor.fetchone()) and row[0] is not None:
return self._parse_stored_id(row[0])
else:
# If no row is found or row[0] is None, return None
return None
else:
# For backward compatibility, if _id column doesn't exist, return the original ID
return doc_id
except Exception as e:
# If there's any error retrieving the _id, return None
logger.debug(
f"Error in _get_stored_id for collection '{self.name}': {e}"
)
return None
[docs]
def _get_val(self, item: dict[str, Any], key: Any) -> Any:
"""
Retrieves a value from a dictionary using a key, handling nested keys and
optional prefixes.
Args:
item (dict[str, Any]): The dictionary to search.
key (Any): The key to retrieve. If a string, may include nested keys
separated by dots or be prefixed with '$'. If non-string,
returns the key itself (for literal values like $group _id).
Returns:
Any: The value associated with the key, or None if the key is not found.
"""
if not isinstance(key, str):
return key
if key.startswith("$"):
key = key[1:]
val: Any = item
for k in key.split("."):
if val is None:
return None
val = val.get(k)
return val
[docs]
def _set_val(self, item: dict[str, Any], key: str, value: Any) -> None:
"""
Sets a value in a dictionary using a key, handling nested keys and
optional prefixes.
Args:
item (dict[str, Any]): The dictionary to modify.
key (str): The key to set, may include nested keys separated by dots
or may be prefixed with \'$.
value (Any): The value to set.
"""
if key.startswith("$"):
key = key[1:]
keys = key.split(".")
current = item
# Navigate to the parent of the target key
for k in keys[:-1]:
if k not in current or not isinstance(current[k], dict):
current[k] = {}
current = current[k]
# Set the value at the target key
current[keys[-1]] = value
# --- Collection methods ---
[docs]
def create(self, **kwargs: Any):
"""
Initialize the collection table if it does not exist.
This method creates a table with an 'id' column, a '_id' column for
ObjectId storage, and a 'data' column for storing JSON data.
If the JSONB data type is supported, it will be used,
otherwise, TEXT data type will be used.
"""
validator = kwargs.get("validator")
check_clause = ""
if validator and "$jsonSchema" in validator:
from .query_helper.schema_compiler import compile_schema_to_sql
schema_sql = compile_schema_to_sql(
validator["$jsonSchema"],
jsonb=self.query_engine._jsonb_supported,
)
if schema_sql and schema_sql != "1":
check_clause = f", CHECK ({schema_sql})"
# Use the QueryEngine's cached JSONB support flag
if self.query_engine._jsonb_supported:
self.db.execute(f"""
CREATE TABLE IF NOT EXISTS {quote_table_name(self.name)} (
id INTEGER PRIMARY KEY AUTOINCREMENT,
_id JSONB,
data JSONB NOT NULL
{check_clause}
)""")
else:
self.db.execute(f"""
CREATE TABLE IF NOT EXISTS {quote_table_name(self.name)} (
id INTEGER PRIMARY KEY AUTOINCREMENT,
_id TEXT,
data TEXT NOT NULL
{check_clause}
)
""")
# Create unique index on _id column for faster lookups
create_unique_index_on_id(self.db, self.name)
# Add the _id column if it doesn't exist (for backward compatibility)
self._ensure_id_column_exists()
[docs]
def _ensure_id_column_exists(self):
"""
Ensure that the _id column exists in the collection table for backward compatibility.
"""
try:
# Check if _id column exists using PRAGMA table_info
cursor = self.db.execute(
"SELECT name FROM pragma_table_info(?) WHERE name = '_id'",
(self.name,),
)
column_exists = cursor.fetchone() is not None
if not column_exists:
# Add the _id column using the same type as the data column
if self.query_engine._jsonb_supported:
self.db.execute(
f"ALTER TABLE {quote_table_name(self.name)} ADD COLUMN _id JSONB"
)
else:
self.db.execute(
f"ALTER TABLE {quote_table_name(self.name)} ADD COLUMN _id TEXT"
)
# Create unique index on _id column for faster lookups
create_unique_index_on_id(self.db, self.name)
except Exception as e:
# If we can't add the column, continue without it (for backward compatibility)
logger.debug(
f"Failed to add _id column to collection '{self.name}': {e}"
)
pass
def __getattr__(self, name: str) -> Any:
"""
Support GridFS-style nested access like collection.files, collection.chunks.
For GridFS compatibility, allow access to sub-collections like 'files' and 'chunks'
under a bucket name (e.g., db.fs.files).
"""
if name in ("files", "chunks"):
full_name = f"{quote_table_name(self.name)}_{name}"
return Collection(self.db, full_name, database=self._database)
raise AttributeError(
f"'{self.__class__.__name__}' object has no attribute '{name}'"
)
[docs]
def rename(self, new_name: str) -> None:
"""
Renames the collection to the specified new name.
If the new name is the same as the current name, does nothing.
Checks if a table with the new name exists and raises an error if it does.
Renames the underlying table and updates the collection's name.
Args:
new_name (str): The new name for the collection.
Raises:
sqlite3.Error: If a collection with the new name already exists.
"""
# If the new name is the same as the current name, do nothing
if new_name == self.name:
return
# Check if a collection with the new name already exists
if self._object_exists(type_="table", name=new_name):
raise sqlite3.Error(f"Collection '{new_name}' already exists")
# Rename the table
self.db.execute(
f"ALTER TABLE {quote_table_name(self.name)} RENAME TO {quote_table_name(new_name)}"
)
# Update the collection name
self.name = new_name
[docs]
def options(self) -> dict[str, Any]:
"""
Retrieves options set on this collection.
Returns:
dict: A dictionary containing various options for the collection,
including the table's name, columns, indexes, and count of documents.
"""
# For SQLite, we can provide information about the table structure
options: dict[str, Any] = {
"name": self.name,
}
# Get table information
try:
table_info = get_table_info(self.db, self.name)
options["columns"] = table_info["columns"]
options["indexes"] = table_info["indexes"]
# Get row count
if count_row := self.db.execute(
f"SELECT COUNT(*) FROM {quote_table_name(self.name)}"
).fetchone():
options["count"] = (
int(count_row[0]) if count_row[0] is not None else 0
)
else:
options["count"] = 0
# Add PyMongo compatibility options
options["codec_options"] = self.codec_options
options["read_preference"] = self.read_preference
options["write_concern"] = self.write_concern
options["read_concern"] = self.read_concern
return options
except sqlite3.Error as e:
# If we can't get detailed information, return basic info
logger.debug(
f"Failed to get collection details for '{self.name}': {e}"
)
options["columns"] = []
options["indexes"] = []
options["count"] = 0
return options
# --- Querying methods delegated to QueryEngine ---
[docs]
def insert_one(
self, document: dict[str, Any], session: ClientSession | None = None
) -> InsertOneResult:
"""
This is a delegating method. For implementation details, see the
core logic in :meth:`~neosqlite.collection.query_engine.QueryEngine.insert_one`.
"""
return self.query_engine.insert_one(document, session=session)
[docs]
def insert_many(
self,
documents: list[dict[str, Any]],
ordered: bool = True,
session: ClientSession | None = None,
) -> InsertManyResult:
"""
This is a delegating method. For implementation details, see the
core logic in :meth:`~neosqlite.collection.query_engine.QueryEngine.insert_many`.
"""
return self.query_engine.insert_many(
documents, ordered=ordered, session=session
)
[docs]
def update_one(
self,
filter: dict[str, Any],
update: dict[str, Any],
upsert: bool = False,
array_filters: list[dict[str, Any]] | None = None,
session: ClientSession | None = None,
) -> UpdateResult:
"""
This is a delegating method. For implementation details, see the
core logic in :meth:`~neosqlite.collection.query_engine.QueryEngine.update_one`.
"""
return self.query_engine.update_one(
filter,
update,
upsert=upsert,
array_filters=array_filters,
session=session,
)
[docs]
def update_many(
self,
filter: dict[str, Any],
update: dict[str, Any],
upsert: bool = False,
array_filters: list[dict[str, Any]] | None = None,
session: ClientSession | None = None,
) -> UpdateResult:
"""
This is a delegating method. For implementation details, see the
core logic in :meth:`~neosqlite.collection.query_engine.QueryEngine.update_many`.
"""
return self.query_engine.update_many(
filter,
update,
upsert=upsert,
array_filters=array_filters,
session=session,
)
[docs]
def replace_one(
self,
filter: dict[str, Any],
replacement: dict[str, Any],
upsert: bool = False,
session: ClientSession | None = None,
) -> UpdateResult:
"""
This is a delegating method. For implementation details, see the
core logic in :meth:`~neosqlite.collection.query_engine.QueryEngine.replace_one`.
"""
return self.query_engine.replace_one(
filter, replacement, upsert=upsert, session=session
)
[docs]
def delete_one(
self, filter: dict[str, Any], session: ClientSession | None = None
) -> DeleteResult:
"""
Delete a single document.
For GridFS system collections (e.g., fs_files, fs_chunks), this method
automatically delegates to GridFSBucket.delete() to handle the different schema
and properly clean up both files and chunks.
Args:
filter: Query filter to match document to delete
session: A ClientSession for transactions.
Returns:
DeleteResult: Result of the delete operation
"""
# Check if this is a GridFS system collection
if self._is_gridfs_collection():
return self._delete_one_as_gridfs(filter)
return self.query_engine.delete_one(filter, session=session)
[docs]
def _delete_one_as_gridfs(self, filter: dict[str, Any]):
"""
Delete a single document from a GridFS system collection using GridFSBucket API.
This properly handles GridFS deletion by removing both the file document
and associated chunks.
Args:
filter: Query filter to match document to delete
Returns:
DeleteResult: Result of the delete operation
"""
from ..gridfs import GridFSBucket
from ..results import DeleteResult
# Extract bucket name from collection name
if self.name.endswith("_files"):
bucket_name = self.name.removesuffix("_files")
elif self.name.endswith("_chunks"):
bucket_name = self.name.removesuffix("_chunks")
else:
raise RuntimeError(
f"Invalid GridFS collection name: {quote_table_name(self.name)}"
)
# Find the file(s) to delete
bucket = GridFSBucket(self.db, bucket_name=bucket_name)
cursor = bucket.find(filter)
files = list(cursor)
if not files:
# No files found, nothing deleted
return DeleteResult(0)
# Delete the first matching file (and its chunks)
file_to_delete = files[0]
bucket.delete(file_to_delete._id)
# Return DeleteResult with deleted count
return DeleteResult(1)
[docs]
def delete_many(
self, filter: dict[str, Any], session: ClientSession | None = None
) -> DeleteResult:
"""
Delete multiple documents.
For GridFS system collections (e.g., fs_files, fs_chunks), this method
automatically delegates to GridFSBucket.delete() to handle the different schema
and properly clean up both files and chunks.
Args:
filter: Query filter to match documents to delete
session: A ClientSession for transactions.
Returns:
DeleteResult: Result of the delete operation
"""
# Check if this is a GridFS system collection
if self._is_gridfs_collection():
return self._delete_many_as_gridfs(filter)
return self.query_engine.delete_many(filter, session=session)
[docs]
def _delete_many_as_gridfs(self, filter: dict[str, Any]):
"""
Delete multiple documents from a GridFS system collection using GridFSBucket API.
This properly handles GridFS deletion by removing both file documents
and associated chunks.
Args:
filter: Query filter to match documents to delete
Returns:
DeleteResult: Result of the delete operation
"""
from ..gridfs import GridFSBucket
from ..results import DeleteResult
# Extract bucket name from collection name
if self.name.endswith("_files"):
bucket_name = self.name.removesuffix("_files")
elif self.name.endswith("_chunks"):
bucket_name = self.name.removesuffix("_chunks")
else:
raise RuntimeError(
f"Invalid GridFS collection name: {quote_table_name(self.name)}"
)
# Find the files to delete
bucket = GridFSBucket(self.db, bucket_name=bucket_name)
cursor = bucket.find(filter)
files = list(cursor)
if not files:
# No files found, nothing deleted
return DeleteResult(0)
# Delete all matching files (and their chunks)
deleted_count = 0
for file in files:
bucket.delete(file._id)
deleted_count += 1
# Return DeleteResult with deleted count
return DeleteResult(deleted_count)
[docs]
def find(
self,
filter: dict[str, Any] | None = None,
projection: dict[str, Any] | None = None,
hint: str | None = None,
session: ClientSession | None = None,
**kwargs: Any,
) -> Cursor:
"""
Find documents in the collection.
For GridFS system collections (e.g., fs_files, fs_chunks), this method
automatically delegates to GridFSBucket.find() to handle the different schema.
Args:
filter: Query filter
projection: Field projection (not supported for GridFS collections)
hint: Index hint (not supported for GridFS collections)
session: A ClientSession for transactions.
Returns:
Cursor or GridOutCursor: Query results
"""
# Check if this is a GridFS system collection
if self._is_gridfs_collection():
return self._find_as_gridfs(filter, session=session)
return self.query_engine.find(filter, projection, hint, session=session)
[docs]
def _is_gridfs_collection(self) -> bool:
"""
Check if this collection is a GridFS system collection.
Uses a two-step verification:
1. Check naming convention (ends with _files or _chunks)
2. Verify schema has GridFS-specific columns
Returns:
bool: True if this is a GridFS system collection
"""
# Step 1: Check naming convention
if not (self.name.endswith("_files") or self.name.endswith("_chunks")):
return False
# Step 2: Verify schema has GridFS-specific columns
# GridFS files table has: filename, length, chunkSize, uploadDate, md5, metadata
# GridFS chunks table has: files_id, n, data
try:
cursor = self.db.execute(
f"PRAGMA table_info({quote_table_name(self.name)})"
)
columns = {row[1] for row in cursor} # Column names are in index 1
if self.name.endswith("_files"):
# Check for GridFS files table columns
gridfs_columns = {
"filename",
"length",
"chunkSize",
"uploadDate",
"metadata",
}
return gridfs_columns.issubset(columns)
elif self.name.endswith("_chunks"):
# Check for GridFS chunks table columns
gridfs_columns = {"files_id", "n", "data"}
return gridfs_columns.issubset(columns)
except Exception as e:
# If we can't check schema, fall back to naming convention
logger.debug(
f"Failed to check schema for GridFS identification in '{self.name}': {e}"
)
pass
# Default to naming convention if schema check fails
return self.name.endswith("_files") or self.name.endswith("_chunks")
[docs]
def _find_as_gridfs(
self,
filter: dict[str, Any] | None = None,
session: ClientSession | None = None,
):
"""
Execute find on a GridFS system collection using GridFSBucket API.
This allows PyMongo-style access like db.fs.files.find({...}) to work
by delegating to the GridFSBucket.find() method which understands the
GridFS schema.
Args:
filter: Query filter
session: A ClientSession for transactions.
Returns:
GridOutCursor: Cursor over GridOut objects
"""
from ..gridfs import GridFSBucket
# Extract bucket name from collection name (e.g., "fs_files" -> "fs")
if self.name.endswith("_files"):
bucket_name = self.name.removesuffix("_files")
elif self.name.endswith("_chunks"):
bucket_name = self.name.removesuffix("_chunks")
else:
# Should not happen if _is_gridfs_collection() is correct
raise RuntimeError(
f"Invalid GridFS collection name: {quote_table_name(self.name)}"
)
# Create GridFSBucket and delegate find operation
bucket = GridFSBucket(self.db, bucket_name=bucket_name)
return bucket.find(filter, session=session)
[docs]
def find_raw_batches(
self,
filter: dict[str, Any] | None = None,
projection: dict[str, Any] | None = None,
hint: str | None = None,
batch_size: int = 100,
session: ClientSession | None = None,
) -> RawBatchCursor:
"""
This is a delegating method. For implementation details, see the
core logic in :meth:`~neosqlite.collection.query_engine.QueryEngine.find_raw_batches`.
"""
return self.query_engine.find_raw_batches(
filter, projection, hint, batch_size, session=session
)
[docs]
def find_one(
self,
filter: dict[str, Any] | None = None,
projection: dict[str, Any] | None = None,
hint: str | None = None,
session: ClientSession | None = None,
) -> dict[str, Any] | None:
"""
Find a single document.
For GridFS system collections (e.g., fs_files, fs_chunks), this method
automatically delegates to GridFSBucket.find() to handle the different schema.
Args:
filter: Query filter
projection: Field projection (not supported for GridFS collections)
hint: Index hint (not supported for GridFS collections)
session: A ClientSession for transactions.
Returns:
Dict or GridOut or None: Query result
"""
# Check if this is a GridFS system collection
if self._is_gridfs_collection():
cursor = self._find_as_gridfs(filter)
# Return first result or None
for doc in cursor:
return doc
return None
return self.query_engine.find_one(filter, projection, hint)
[docs]
def count_documents(
self, filter: dict[str, Any], session: ClientSession | None = None
) -> int:
"""
This is a delegating method. For implementation details, see the
core logic in :meth:`~neosqlite.collection.query_engine.QueryEngine.count_documents`.
"""
return self.query_engine.count_documents(filter, session=session)
[docs]
def estimated_document_count(
self,
options: dict[str, Any] | None = None,
session: ClientSession | None = None,
) -> int:
"""
Get an estimated count of documents in the collection.
This is a delegating method. For implementation details, see the
core logic in :meth:`~neosqlite.collection.query_engine.QueryEngine.estimated_document_count`.
Args:
options (dict[str, Any], optional): Options for the count operation.
Supported options (for PyMongo API compatibility):
- maxTimeMS: Maximum execution time in milliseconds (ignored in NeoSQLite)
- hint: Index to use for the count (ignored in NeoSQLite)
session: A ClientSession for transactions.
Returns:
int: Estimated number of documents in the collection
Note:
This method returns an estimate based on SQLite metadata, which is fast
but may not be exact. For an exact count, use count_documents({}).
The options parameter is accepted for PyMongo API compatibility but
most options are not applicable to SQLite.
"""
# Options are accepted for API compatibility but not used
# maxTimeMS, hint, etc. are MongoDB-specific
return self.query_engine.estimated_document_count(session=session)
[docs]
def find_one_and_delete(
self,
filter: dict[str, Any],
projection: dict[str, Any] | None = None,
sort: list[tuple[str, int]] | None = None,
session: ClientSession | None = None,
**kwargs: Any,
) -> dict[str, Any] | None:
"""
This is a delegating method. For implementation details, see the
core logic in :meth:`~neosqlite.collection.query_engine.QueryEngine.find_one_and_delete`.
"""
return self.query_engine.find_one_and_delete(
filter, projection=projection, sort=sort, session=session, **kwargs
)
[docs]
def find_one_and_replace(
self,
filter: dict[str, Any],
replacement: dict[str, Any],
projection: dict[str, Any] | None = None,
sort: list[tuple[str, int]] | None = None,
upsert: bool = False,
return_document: bool = False,
session: ClientSession | None = None,
**kwargs: Any,
) -> dict[str, Any] | None:
"""
This is a delegating method. For implementation details, see the
core logic in :meth:`~neosqlite.collection.query_engine.QueryEngine.find_one_and_replace`.
"""
return self.query_engine.find_one_and_replace(
filter,
replacement,
projection=projection,
sort=sort,
upsert=upsert,
return_document=return_document,
session=session,
**kwargs,
)
[docs]
def find_one_and_update(
self,
filter: dict[str, Any],
update: dict[str, Any],
projection: dict[str, Any] | None = None,
sort: list[tuple[str, int]] | None = None,
upsert: bool = False,
return_document: bool = False,
array_filters: list[dict[str, Any]] | None = None,
session: ClientSession | None = None,
**kwargs: Any,
) -> dict[str, Any] | None:
"""
This is a delegating method. For implementation details, see the
core logic in :meth:`~neosqlite.collection.query_engine.QueryEngine.find_one_and_update`.
"""
return self.query_engine.find_one_and_update(
filter,
update,
projection=projection,
sort=sort,
upsert=upsert,
return_document=return_document,
array_filters=array_filters,
session=session,
**kwargs,
)
[docs]
def aggregate(
self,
pipeline: list[dict[str, Any]],
allowDiskUse: bool | None = None,
batchSize: int | None = None,
session: ClientSession | None = None,
**kwargs: Any,
) -> AggregationCursor:
"""
This is a delegating method. For implementation details, see the
core logic in :meth:`~neosqlite.collection.query_engine.QueryEngine.aggregate`.
Args:
pipeline: The aggregation pipeline to execute
allowDiskUse: Ignored in NeoSQLite (kept for PyMongo compatibility)
batchSize: Batch size for results (kept for PyMongo compatibility)
session: A ClientSession for transactions.
**kwargs: Additional keyword arguments for PyMongo compatibility
Returns:
An AggregationCursor instance
"""
return AggregationCursor(
self,
pipeline,
allowDiskUse=allowDiskUse,
batchSize=batchSize,
session=session,
**kwargs,
)
[docs]
def aggregate_raw_batches(
self,
pipeline: list[dict[str, Any]],
batch_size: int = 100,
session: ClientSession | None = None,
) -> RawBatchCursor:
"""
This is a delegating method. For implementation details, see the
core logic in :meth:`~neosqlite.collection.query_engine.QueryEngine.aggregate_raw_batches`.
"""
return self.query_engine.aggregate_raw_batches(
pipeline, batch_size, session=session
)
[docs]
def distinct(
self,
key: str,
filter: dict[str, Any] | None = None,
session: ClientSession | None = None,
) -> list[Any]:
"""
This is a delegating method. For implementation details, see the
core logic in :meth:`~neosqlite.collection.query_engine.QueryEngine.distinct`.
"""
return self.query_engine.distinct(key, filter, session=session)
# --- Bulk Write methods delegated to QueryEngine ---
[docs]
def bulk_write(
self,
requests: list[Any],
ordered: bool = True,
session: ClientSession | None = None,
) -> BulkWriteResult:
"""
This is a delegating method. For implementation details, see the
core logic in :meth:`~neosqlite.collection.query_engine.QueryEngine.bulk_write`.
"""
return self.query_engine.bulk_write(requests, ordered, session=session)
[docs]
def initialize_ordered_bulk_op(self) -> BulkOperationExecutor:
"""
This is a delegating method. For implementation details, see the
core logic in :meth:`~neosqlite.collection.query_engine.QueryEngine.initialize_ordered_bulk_op`.
.. deprecated::
Use :meth:`bulk_write` instead.
"""
warnings.warn(
"initialize_ordered_bulk_op is deprecated, use bulk_write instead",
DeprecationWarning,
stacklevel=2,
)
return self.query_engine.initialize_ordered_bulk_op()
[docs]
def initialize_unordered_bulk_op(self) -> BulkOperationExecutor:
"""
This is a delegating method. For implementation details, see the
core logic in :meth:`~neosqlite.collection.query_engine.QueryEngine.initialize_unordered_bulk_op`.
.. deprecated::
Use :meth:`bulk_write` instead.
"""
warnings.warn(
"initialize_unordered_bulk_op is deprecated, use bulk_write instead",
DeprecationWarning,
stacklevel=2,
)
return self.query_engine.initialize_unordered_bulk_op()
# --- Indexing methods delegated to IndexManager ---
[docs]
def create_index(
self,
key: str | list[str] | list[tuple[str, int]],
reindex: bool = True,
sparse: bool = False,
unique: bool = False,
fts: bool = False,
tokenizer: str | None = None,
datetime_field: bool = False,
):
"""
This is a delegating method. For implementation details, see the
core logic in :meth:`~neosqlite.collection.index_manager.IndexManager.create_index`.
"""
self.indexes.create_index(
key, reindex, sparse, unique, fts, tokenizer, datetime_field
)
[docs]
def create_search_index(
self,
key: str,
tokenizer: str | None = None,
):
"""
This is a delegating method. For implementation details, see the
core logic in :meth:`~neosqlite.collection.index_manager.IndexManager.create_search_index`.
"""
return self.indexes.create_search_index(key, tokenizer)
[docs]
def create_indexes(
self,
indexes: list[IndexModel],
) -> list[str]:
"""
This is a delegating method. For implementation details, see the
core logic in :meth:`~neosqlite.collection.index_manager.IndexManager.create_indexes`.
"""
return self.indexes.create_indexes(indexes)
[docs]
def create_search_indexes(
self,
indexes: list[str],
) -> list[str]:
"""
This is a delegating method. For implementation details, see the
core logic in :meth:`~neosqlite.collection.index_manager.IndexManager.create_search_indexes`.
"""
return self.indexes.create_search_indexes(indexes)
[docs]
def reindex(
self,
table: str,
sparse: bool = False,
documents: list[dict[str, Any]] | None = None,
):
"""
This is a delegating method. For implementation details, see the
core logic in :meth:`~neosqlite.collection.index_manager.IndexManager.reindex`.
"""
self.indexes.reindex(table, sparse, documents)
@overload
def list_indexes(self, as_keys: Literal[True]) -> list[list[str]]: ...
@overload
def list_indexes(self, as_keys: Literal[False] = False) -> list[str]: ...
[docs]
def list_indexes(
self,
as_keys: bool = False,
) -> list[str] | list[list[str]]:
"""
This is a delegating method. For implementation details, see the
core logic in :meth:`~neosqlite.collection.index_manager.IndexManager.list_indexes`.
"""
# This explicit check is the key to solving the Mypy error on overloading.
if as_keys:
# Inside this block, Mypy knows 'as_keys' is Literal[True].
return self.indexes.list_indexes(as_keys)
else:
# Inside this block, Mypy knows 'as_keys' is Literal[False].
return self.indexes.list_indexes(as_keys)
[docs]
def list_search_indexes(self) -> list[str]:
"""
This is a delegating method. For implementation details, see the
core logic in :meth:`~neosqlite.collection.index_manager.IndexManager.list_search_indexes`.
"""
return self.indexes.list_search_indexes()
[docs]
def update_search_index(self, key: str, tokenizer: str | None = None):
"""
This is a delegating method. For implementation details, see the
core logic in :meth:`~neosqlite.collection.index_manager.IndexManager.update_search_index`.
"""
self.indexes.update_search_index(key, tokenizer)
[docs]
def drop_index(self, index: str):
"""
This is a delegating method. For implementation details, see the
core logic in :meth:`~neosqlite.collection.index_manager.IndexManager.drop_index`.
"""
self.indexes.drop_index(index)
[docs]
def drop_search_index(self, index: str):
"""
This is a delegating method. For implementation details, see the
core logic in :meth:`~neosqlite.collection.index_manager.IndexManager.drop_search_index`.
"""
self.indexes.drop_search_index(index)
[docs]
def drop_indexes(self):
"""
This is a delegating method. For implementation details, see the
core logic in :meth:`~neosqlite.collection.index_manager.IndexManager.drop_indexes`.
"""
self.indexes.drop_indexes()
# --- Other methods ---
@property
def client(self) -> Connection:
"""
Get the MongoClient instance (returns the parent Connection).
Returns:
Connection: The parent connection instance.
"""
return self.database
@property
def codec_options(self) -> Any:
"""
Get the codec options for this collection.
Returns:
Any: The codec options.
"""
if hasattr(self, "_codec_options") and self._codec_options is not None:
return self._codec_options
return self.database.codec_options if self.database else None
@property
def read_preference(self) -> Any:
"""
Get the read preference for this collection.
Returns:
Any: The read preference.
"""
if (
hasattr(self, "_read_preference")
and self._read_preference is not None
):
return self._read_preference
return self.database.read_preference if self.database else None
@property
def write_concern(self) -> Any:
"""
Get the write concern for this collection.
Returns:
Any: The write concern.
"""
if hasattr(self, "_write_concern") and self._write_concern is not None:
return self._write_concern
return self.database.write_concern if self.database else None
@property
def read_concern(self) -> Any:
"""
Get the read concern for this collection.
Returns:
Any: The read concern.
"""
if hasattr(self, "_read_concern") and self._read_concern is not None:
return self._read_concern
return self.database.read_concern if self.database else None
@property
def database(self) -> Connection:
"""
Get the database that this collection is a part of.
Returns:
Connection: The connection object this collection is associated with.
"""
return self._database
@property
def db_path(self) -> str:
"""
Get the path to the database file.
Returns:
str: The database file path.
"""
return self.database.db_path if self.database else ":memory:"
@property
def full_name(self) -> str:
"""
Get the full name of the collection (database.collection).
Returns:
str: The full name of the collection
Example:
>>> db = Connection("test.db")
>>> coll = db.my_collection
>>> print(coll.full_name)
'test.my_collection'
"""
if self._database and hasattr(self._database, "name"):
return f"{self._database.name}.{quote_table_name(self.name)}"
return self.name
[docs]
def with_options(
self,
codec_options=None,
read_preference=None,
write_concern=None,
read_concern=None,
):
"""
Get a clone of this collection with different options.
Note: NeoSQLite is a single-node database, so read_preference,
write_concern, and read_concern are stored for API compatibility
but don't affect query behavior.
Args:
codec_options: Codec options (stored for compatibility, not used)
read_preference: Read preference (stored for compatibility, not used)
write_concern: Write concern (stored for compatibility, not used)
read_concern: Read concern (stored for compatibility, not used)
Returns:
Collection: A new collection instance with the specified options
Example:
>>> coll = db.my_collection
>>> coll_with_options = coll.with_options(write_concern={"w": "majority"})
"""
# Create a new collection instance (clone)
clone = Collection(
self.db,
self.name,
create=False,
database=self._database,
)
# Store options for API compatibility
clone._codec_options = codec_options
clone._read_preference = read_preference
clone._write_concern = write_concern
clone._read_concern = read_concern
return clone
[docs]
def _object_exists(self, type_: str, name: str) -> bool:
"""
Check if an object (table or index) of a specific type and name exists within the database.
Args:
type_ (str): The type of object to check, either "table" or "index".
name (str): The name of the object to check.
Returns:
bool: True if the object exists, False otherwise.
"""
match type_:
case "table":
if row := self.db.execute(
"SELECT COUNT(1) FROM sqlite_master WHERE type = ? AND name = ?",
(type_, name.strip("[]")),
).fetchone():
return int(row[0]) > 0
return False
case "index":
# For indexes, check if it exists with our naming convention
if row := self.db.execute(
"SELECT COUNT(1) FROM sqlite_master WHERE type = ? AND name = ?",
(type_, name),
).fetchone():
return int(row[0]) > 0
return False
case _:
return False
[docs]
def drop(self):
"""
Drop the entire collection.
This method removes the collection (table) from the database. After calling
this method, the collection will no longer exist in the database.
"""
self.db.execute(f"DROP TABLE IF EXISTS {quote_table_name(self.name)}")
[docs]
def watch(
self,
pipeline: list[dict[str, Any]] | None = None,
full_document: str | None = None,
resume_after: dict[str, Any] | None = None,
max_await_time_ms: int | None = None,
batch_size: int | None = None,
collation: dict[str, Any] | None = None,
start_at_operation_time: Any | None = None,
session: ClientSession | None = None,
start_after: dict[str, Any] | None = None,
) -> ChangeStream:
"""
Monitor changes on this collection using SQLite's change tracking features.
This method creates a change stream that allows iterating over change events
generated by modifications to the collection. While SQLite doesn't natively
support change streams like MongoDB, this implementation uses triggers and
SQLite's built-in change tracking mechanisms to provide similar functionality.
Args:
pipeline (list[dict[str, Any]]): Aggregation pipeline stages to apply to change events.
full_document (str): Determines how the 'fullDocument' field is populated in change events.
resume_after (dict[str, Any]): Logical starting point for the change stream.
max_await_time_ms (int): Maximum time to wait for new documents in milliseconds.
batch_size (int): Number of documents to return per batch.
collation (dict[str, Any]): Collation settings for the operation.
start_at_operation_time (Any): Operation time to start monitoring from.
session (ClientSession): Client session for the operation.
start_after (dict[str, Any]): Logical starting point for the change stream.
Returns:
ChangeStream: A change stream object that can be iterated over to receive change events.
"""
validate_session(session, self._database)
return ChangeStream(
collection=self,
pipeline=pipeline,
full_document=full_document,
resume_after=resume_after,
max_await_time_ms=max_await_time_ms,
batch_size=batch_size,
collation=collation,
start_at_operation_time=start_at_operation_time,
session=session,
start_after=start_after,
)