neosqlite.collection package

Module contents

class neosqlite.collection.Collection(db: Connection, name: str, create: bool = True, database=None, **kwargs: Any)[source]

Bases: object

Represents a collection in a SQLite database.

This class encapsulates operations on a collection such as inserting, updating, deleting, and querying documents.

__init__(db: Connection, name: str, create: bool = True, database=None, **kwargs: Any)[source]

Initialize a new collection object.

Parameters:
  • db – Database object to which the collection belongs.

  • name – Name of the collection.

  • create – Whether to create the collection table if it doesn’t exist.

  • database – Database object that contains this collection.

  • **kwargs – Additional options for collection creation.

cleanup() None[source]

Clean up resources used by the collection.

_load(id: int, data: str | bytes, stored_id: Any | None = None) dict[str, Any][source]

Deserialize and load a document from its ID and JSON data.

Deserialize the JSON string or bytes back into a Python dictionary, add the document ID to it, and return the document.

Parameters:
  • id (int) – The document ID.

  • data (str | bytes) – The JSON string or bytes representing the document.

  • stored_id (Any, optional) – The stored _id value if already retrieved.

Returns:

The deserialized document with the _id field added.

Return type:

dict[str, Any]
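The deserialization step described above can be sketched with the standard library alone. This is a minimal illustration, not the library's implementation: the function name load_document is hypothetical, and the rule for choosing between the stored _id and the integer row ID is an assumption.

```python
from __future__ import annotations

import json
from typing import Any


def load_document(row_id: int, data: str | bytes, stored_id: Any | None = None) -> dict[str, Any]:
    """Decode a JSON payload and attach an ``_id`` field (illustrative only)."""
    # json.loads accepts both str and UTF-8 encoded bytes.
    document = json.loads(data)
    # Assumption: prefer the stored _id when one was retrieved,
    # otherwise fall back to the integer row ID.
    document["_id"] = stored_id if stored_id is not None else row_id
    return document


doc = load_document(7, b'{"name": "Ada"}')
doc_with_stored_id = load_document(7, '{"name": "Ada"}', stored_id="abc")
```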

_parse_stored_id(stored_id: Any) Any[source]

Parse a value retrieved from the _id column into its appropriate Python type.

Parameters:

stored_id – The raw value from the _id column.

Returns:

The parsed value (e.g., ObjectId, int, str, or None).

Return type:

Any

_load_with_stored_id(id_val: int, data: str | bytes, stored_id_val) dict[str, Any][source]

Deserialize and load a document with the stored _id value.

Parameters:
  • id_val (int) – The auto-increment document ID.

  • data (str | bytes) – The JSON string or bytes representing the document.

  • stored_id_val – The stored _id value from the _id column.

Returns:

The deserialized document with the _id field added.

Return type:

dict[str, Any]

_resolve_stored_id(stored_id_val: Any, fallback_id: int) ObjectId | Any[source]

Resolve the stored _id value, attempting to parse as ObjectId.

Parameters:
  • stored_id_val – The stored _id value from the _id column.

  • fallback_id – Fallback auto-increment ID if stored_id_val is None.

Returns:

The parsed ObjectId, the original stored_id_val if it cannot be parsed as an ObjectId, or fallback_id if stored_id_val is None.

_get_stored_id(doc_id: int) ObjectId | int | str | None[source]

Retrieve the stored _id for a document from the _id column.

Parameters:

doc_id (int) – The document ID.

Returns:

The stored _id value, or None if the column doesn’t exist yet.

Return type:

ObjectId | int | str | None

_get_val(item: dict[str, Any], key: Any) Any[source]

Retrieves a value from a dictionary using a key, handling nested keys and optional prefixes.

Parameters:
  • item (dict[str, Any]) – The dictionary to search.

  • key (Any) – The key to retrieve. If a string, may include nested keys separated by dots or be prefixed with ‘$’. If non-string, returns the key itself (for literal values like $group _id).

Returns:

The value associated with the key, or None if the key is not found.

Return type:

Any
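A lookup with the behavior described above (dotted paths, an optional ‘$’ prefix, non-string keys returned as literals) might look like the following sketch. The name get_val and the exact handling of edge cases are assumptions made for illustration, not the library's code.

```python
from typing import Any


def get_val(item: dict[str, Any], key: Any) -> Any:
    """Resolve a possibly nested, possibly '$'-prefixed key (illustrative only)."""
    # Non-string keys are treated as literal values and returned unchanged.
    if not isinstance(key, str):
        return key
    # Drop a single leading '$' so "$a.b" and "a.b" resolve identically.
    path = key[1:] if key.startswith("$") else key
    current: Any = item
    for part in path.split("."):
        # Stop as soon as the path leaves the dictionary structure.
        if not isinstance(current, dict) or part not in current:
            return None
        current = current[part]
    return current


record = {"user": {"address": {"city": "Oslo"}}}
city = get_val(record, "$user.address.city")
```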

_set_val(item: dict[str, Any], key: str, value: Any) None[source]

Sets a value in a dictionary using a key, handling nested keys and optional prefixes.

Parameters:
  • item (dict[str, Any]) – The dictionary to modify.

  • key (str) – The key to set. May include nested keys separated by dots and may be prefixed with ‘$’.

  • value (Any) – The value to set.
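The nested assignment described above can be sketched as follows. The name set_val is hypothetical, and creating missing intermediate dictionaries (or replacing non-dictionary intermediates) is an assumption about the behavior, not something the documentation states.

```python
from typing import Any


def set_val(item: dict[str, Any], key: str, value: Any) -> None:
    """Assign a value at a possibly nested, possibly '$'-prefixed key (illustrative only)."""
    # Drop a single leading '$' so "$a.b" and "a.b" address the same field.
    path = key[1:] if key.startswith("$") else key
    parts = path.split(".")
    current = item
    for part in parts[:-1]:
        child = current.get(part)
        # Assumption: missing or non-dict intermediates become new dicts.
        if not isinstance(child, dict):
            child = {}
            current[part] = child
        current = child
    current[parts[-1]] = value


record: dict[str, Any] = {"user": {"name": "Ada"}}
set_val(record, "$user.address.city", "Oslo")
```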

create(**kwargs: Any)[source]

Initialize the collection table if it does not exist.

This method creates a table with an ‘id’ column, an ‘_id’ column for ObjectId storage, and a ‘data’ column for storing JSON data. The ‘data’ column uses the JSONB data type if it is supported; otherwise it uses TEXT.
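The three-column layout described above could be created along these lines using the standard sqlite3 module. This is a sketch under stated assumptions: the helper names, the column constraints, the declared type of ‘_id’, and the use_jsonb flag (standing in for whatever support check the library performs) are all hypothetical.

```python
import sqlite3


def quote_ident(name: str) -> str:
    """Quote an SQL identifier, doubling any embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def create_collection_table(conn: sqlite3.Connection, name: str, use_jsonb: bool) -> None:
    # Assumption: the caller has already decided whether JSONB is available.
    data_type = "JSONB" if use_jsonb else "TEXT"
    conn.execute(
        f"CREATE TABLE IF NOT EXISTS {quote_ident(name)} ("
        "id INTEGER PRIMARY KEY AUTOINCREMENT, "
        "_id TEXT, "  # assumed declared type for the ObjectId column
        f"data {data_type} NOT NULL)"
    )


conn = sqlite3.connect(":memory:")
create_collection_table(conn, "users", use_jsonb=False)
# PRAGMA table_info returns one row per column; index 1 is the column name.
columns = [row[1] for row in conn.execute('PRAGMA table_info("users")')]
```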

_ensure_id_column_exists()[source]

Ensure that the _id column exists in the collection table for backward compatibility.

rename(new_name: str) None[source]

Renames the collection to the specified new name. If the new name is the same as the current name, does nothing.

Checks if a table with the new name exists and raises an error if it does. Renames the underlying table and updates the collection’s name.

Parameters:

new_name (str) – The new name for the collection.

Raises:

sqlite3.Error – If a collection with the new name already exists.
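The rename steps described above (no-op on an identical name, existence check, then table rename) can be sketched directly against SQLite. The function name rename_table and the error message are hypothetical; only the sqlite_master lookup and ALTER TABLE statement are standard SQLite.

```python
import sqlite3


def rename_table(conn: sqlite3.Connection, old_name: str, new_name: str) -> None:
    """Rename a table, refusing to overwrite an existing one (illustrative only)."""
    if new_name == old_name:
        return  # nothing to do
    exists = conn.execute(
        "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?",
        (new_name,),
    ).fetchone()
    if exists:
        raise sqlite3.Error(f"Collection '{new_name}' already exists")
    # Identifiers cannot be bound as parameters, so they are quoted inline.
    old_q = '"' + old_name.replace('"', '""') + '"'
    new_q = '"' + new_name.replace('"', '""') + '"'
    conn.execute(f"ALTER TABLE {old_q} RENAME TO {new_q}")


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE a (id INTEGER PRIMARY KEY, data TEXT)")
conn.execute("CREATE TABLE b (id INTEGER PRIMARY KEY, data TEXT)")
rename_table(conn, "a", "c")
tables = sorted(
    row[0] for row in conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
)
```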

options() dict[str, Any][source]

Retrieves options set on this collection.

Returns:

A dictionary containing various options for the collection, including the table’s name, columns, indexes, and count of documents.

Return type:

dict

insert_one(document: dict[str, Any], session: ClientSession | None = None) InsertOneResult[source]

This is a delegating method. For implementation details, see the core logic in insert_one().

insert_many(documents: list[dict[str, Any]], ordered: bool = True, session: ClientSession | None = None) InsertManyResult[source]

This is a delegating method. For implementation details, see the core logic in insert_many().

update_one(filter: dict[str, Any], update: dict[str, Any], upsert: bool = False, array_filters: list[dict[str, Any]] | None = None, session: ClientSession | None = None) UpdateResult[source]

This is a delegating method. For implementation details, see the core logic in update_one().

update_many(filter: dict[str, Any], update: dict[str, Any], upsert: bool = False, array_filters: list[dict[str, Any]] | None = None, session: ClientSession | None = None) UpdateResult[source]

This is a delegating method. For implementation details, see the core logic in update_many().

replace_one(filter: dict[str, Any], replacement: dict[str, Any], upsert: bool = False, session: ClientSession | None = None) UpdateResult[source]

This is a delegating method. For implementation details, see the core logic in replace_one().

delete_one(filter: dict[str, Any], session: ClientSession | None = None) DeleteResult[source]

Delete a single document.

For GridFS system collections (e.g., fs_files, fs_chunks), this method automatically delegates to GridFSBucket.delete() to handle the different schema and properly clean up both files and chunks.

Parameters:
  • filter – Query filter to match document to delete

  • session – A ClientSession for transactions.

Returns:

Result of the delete operation

Return type:

DeleteResult

_delete_one_as_gridfs(filter: dict[str, Any])[source]

Delete a single document from a GridFS system collection using GridFSBucket API.

This properly handles GridFS deletion by removing both the file document and associated chunks.

Parameters:

filter – Query filter to match document to delete

Returns:

Result of the delete operation

Return type:

DeleteResult

delete_many(filter: dict[str, Any], session: ClientSession | None = None) DeleteResult[source]

Delete multiple documents.

For GridFS system collections (e.g., fs_files, fs_chunks), this method automatically delegates to GridFSBucket.delete() to handle the different schema and properly clean up both files and chunks.

Parameters:
  • filter – Query filter to match documents to delete

  • session – A ClientSession for transactions.

Returns:

Result of the delete operation

Return type:

DeleteResult

_delete_many_as_gridfs(filter: dict[str, Any])[source]

Delete multiple documents from a GridFS system collection using GridFSBucket API.

This properly handles GridFS deletion by removing both file documents and associated chunks.

Parameters:

filter – Query filter to match documents to delete

Returns:

Result of the delete operation

Return type:

DeleteResult

find(filter: dict[str, Any] | None = None, projection: dict[str, Any] | None = None, hint: str | None = None, session: ClientSession | None = None, **kwargs: Any) Cursor[source]

Find documents in the collection.

For GridFS system collections (e.g., fs_files, fs_chunks), this method automatically delegates to GridFSBucket.find() to handle the different schema.

Parameters:
  • filter – Query filter

  • projection – Field projection (not supported for GridFS collections)

  • hint – Index hint (not supported for GridFS collections)

  • session – A ClientSession for transactions.

Returns:

Query results

Return type:

Cursor or GridOutCursor

_is_gridfs_collection() bool[source]

Check if this collection is a GridFS system collection.

Uses a two-step verification:

  1. Check the naming convention (name ends with _files or _chunks).

  2. Verify that the schema has GridFS-specific columns.

Returns:

True if this is a GridFS system collection

Return type:

bool
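The two-step check can be sketched as a name test followed by a schema inspection. The column names below are assumptions borrowed from the field names of MongoDB's GridFS documents; the columns NeoSQLite actually looks for are not stated here, and the function name is hypothetical.

```python
import sqlite3

# Assumed marker columns per suffix; the real schema may differ.
ASSUMED_GRIDFS_COLUMNS = {
    "_files": {"filename", "length"},
    "_chunks": {"files_id", "n"},
}


def looks_like_gridfs(conn: sqlite3.Connection, table: str) -> bool:
    """Return True only if both the name and the schema match (illustrative only)."""
    for suffix, required in ASSUMED_GRIDFS_COLUMNS.items():
        # Step 1: naming convention.
        if table.endswith(suffix):
            quoted = '"' + table.replace('"', '""') + '"'
            # Step 2: schema check; index 1 of each row is the column name.
            columns = {row[1] for row in conn.execute(f"PRAGMA table_info({quoted})")}
            return required <= columns
    return False


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE fs_files (id INTEGER PRIMARY KEY, filename TEXT, length INTEGER)")
conn.execute("CREATE TABLE report_files (id INTEGER PRIMARY KEY, data TEXT)")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, data TEXT)")
```

The second step is what keeps an ordinary collection that merely happens to be named like report_files from being treated as a GridFS collection.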

_find_as_gridfs(filter: dict[str, Any] | None = None, session: ClientSession | None = None)[source]

Execute find on a GridFS system collection using GridFSBucket API.

This allows PyMongo-style access like db.fs.files.find({…}) to work by delegating to the GridFSBucket.find() method which understands the GridFS schema.

Parameters:
  • filter – Query filter

  • session – A ClientSession for transactions.

Returns:

Cursor over GridOut objects

Return type:

GridOutCursor

find_raw_batches(filter: dict[str, Any] | None = None, projection: dict[str, Any] | None = None, hint: str | None = None, batch_size: int = 100, session: ClientSession | None = None) RawBatchCursor[source]

This is a delegating method. For implementation details, see the core logic in find_raw_batches().

find_one(filter: dict[str, Any] | None = None, projection: dict[str, Any] | None = None, hint: str | None = None, session: ClientSession | None = None) dict[str, Any] | None[source]

Find a single document.

For GridFS system collections (e.g., fs_files, fs_chunks), this method automatically delegates to GridFSBucket.find() to handle the different schema.

Parameters:
  • filter – Query filter

  • projection – Field projection (not supported for GridFS collections)

  • hint – Index hint (not supported for GridFS collections)

  • session – A ClientSession for transactions.

Returns:

Query result

Return type:

dict[str, Any], GridOut, or None

count_documents(filter: dict[str, Any], session: ClientSession | None = None) int[source]

This is a delegating method. For implementation details, see the core logic in count_documents().

estimated_document_count(options: dict[str, Any] | None = None, session: ClientSession | None = None) int[source]

Get an estimated count of documents in the collection.

This is a delegating method. For implementation details, see the core logic in estimated_document_count().

Parameters:
  • options (dict[str, Any], optional) – Options for the count operation. Supported options (for PyMongo API compatibility):

      - maxTimeMS: Maximum execution time in milliseconds (ignored in NeoSQLite)

      - hint: Index to use for the count (ignored in NeoSQLite)

  • session – A ClientSession for transactions.

Returns:

Estimated number of documents in the collection

Return type:

int

Note

This method returns an estimate based on SQLite metadata, which is fast but may not be exact. For an exact count, use count_documents({}). The options parameter is accepted for PyMongo API compatibility but most options are not applicable to SQLite.

find_one_and_delete(filter: dict[str, Any], projection: dict[str, Any] | None = None, sort: list[tuple[str, int]] | None = None, session: ClientSession | None = None, **kwargs: Any) dict[str, Any] | None[source]

This is a delegating method. For implementation details, see the core logic in find_one_and_delete().

find_one_and_replace(filter: dict[str, Any], replacement: dict[str, Any], projection: dict[str, Any] | None = None, sort: list[tuple[str, int]] | None = None, upsert: bool = False, return_document: bool = False, session: ClientSession | None = None, **kwargs: Any) dict[str, Any] | None[source]

This is a delegating method. For implementation details, see the core logic in find_one_and_replace().

find_one_and_update(filter: dict[str, Any], update: dict[str, Any], projection: dict[str, Any] | None = None, sort: list[tuple[str, int]] | None = None, upsert: bool = False, return_document: bool = False, array_filters: list[dict[str, Any]] | None = None, session: ClientSession | None = None, **kwargs: Any) dict[str, Any] | None[source]

This is a delegating method. For implementation details, see the core logic in find_one_and_update().

aggregate(pipeline: list[dict[str, Any]], allowDiskUse: bool | None = None, batchSize: int | None = None, session: ClientSession | None = None, **kwargs: Any) AggregationCursor[source]

This is a delegating method. For implementation details, see the core logic in aggregate().

Parameters:
  • pipeline – The aggregation pipeline to execute

  • allowDiskUse – Ignored in NeoSQLite (kept for PyMongo compatibility)

  • batchSize – Batch size for results (kept for PyMongo compatibility)

  • session – A ClientSession for transactions.

  • **kwargs – Additional keyword arguments for PyMongo compatibility

Returns:

An AggregationCursor instance

aggregate_raw_batches(pipeline: list[dict[str, Any]], batch_size: int = 100, session: ClientSession | None = None) RawBatchCursor[source]

This is a delegating method. For implementation details, see the core logic in aggregate_raw_batches().

distinct(key: str, filter: dict[str, Any] | None = None, session: ClientSession | None = None) list[Any][source]

This is a delegating method. For implementation details, see the core logic in distinct().

bulk_write(requests: list[Any], ordered: bool = True, session: ClientSession | None = None) BulkWriteResult[source]

This is a delegating method. For implementation details, see the core logic in bulk_write().

initialize_ordered_bulk_op() BulkOperationExecutor[source]

This is a delegating method. For implementation details, see the core logic in initialize_ordered_bulk_op().

Deprecated: Use bulk_write() instead.

initialize_unordered_bulk_op() BulkOperationExecutor[source]

This is a delegating method. For implementation details, see the core logic in initialize_unordered_bulk_op().

Deprecated: Use bulk_write() instead.

create_index(key: str | list[str] | list[tuple[str, int]], reindex: bool = True, sparse: bool = False, unique: bool = False, fts: bool = False, tokenizer: str | None = None, datetime_field: bool = False)[source]

This is a delegating method. For implementation details, see the core logic in create_index().

create_search_index(key: str, tokenizer: str | None = None)[source]

This is a delegating method. For implementation details, see the core logic in create_search_index().

create_indexes(indexes: list[IndexModel]) list[str][source]

This is a delegating method. For implementation details, see the core logic in create_indexes().

create_search_indexes(indexes: list[str]) list[str][source]

This is a delegating method. For implementation details, see the core logic in create_search_indexes().

reindex(table: str, sparse: bool = False, documents: list[dict[str, Any]] | None = None)[source]

This is a delegating method. For implementation details, see the core logic in reindex().

list_indexes(as_keys: Literal[True]) list[list[str]][source]
list_indexes(as_keys: Literal[False] = False) list[str]

This is a delegating method. For implementation details, see the core logic in list_indexes().

list_search_indexes() list[str][source]

This is a delegating method. For implementation details, see the core logic in list_search_indexes().

update_search_index(key: str, tokenizer: str | None = None)[source]

This is a delegating method. For implementation details, see the core logic in update_search_index().

drop_index(index: str)[source]

This is a delegating method. For implementation details, see the core logic in drop_index().

drop_search_index(index: str)[source]

This is a delegating method. For implementation details, see the core logic in drop_search_index().

drop_indexes()[source]

This is a delegating method. For implementation details, see the core logic in drop_indexes().

index_information() dict[str, Any][source]

This is a delegating method. For implementation details, see the core logic in index_information().

property client: Connection

Get the MongoClient instance (returns the parent Connection).

Returns:

The parent connection instance.

Return type:

Connection

property codec_options: Any

Get the codec options for this collection.

Returns:

The codec options.

Return type:

Any

property read_preference: Any

Get the read preference for this collection.

Returns:

The read preference.

Return type:

Any

property write_concern: Any

Get the write concern for this collection.

Returns:

The write concern.

Return type:

Any

property read_concern: Any

Get the read concern for this collection.

Returns:

The read concern.

Return type:

Any

property database: Connection

Get the database that this collection is a part of.

Returns:

The connection object this collection is associated with.

Return type:

Connection

property db_path: str

Get the path to the database file.

Returns:

The database file path.

Return type:

str

property full_name: str

Get the full name of the collection (database.collection).

Returns:

The full name of the collection

Return type:

str

Example

>>> db = Connection("test.db")
>>> coll = db.my_collection
>>> print(coll.full_name)
test.my_collection

with_options(codec_options=None, read_preference=None, write_concern=None, read_concern=None)[source]

Get a clone of this collection with different options.

Note: NeoSQLite is a single-node database, so read_preference, write_concern, and read_concern are stored for API compatibility but don’t affect query behavior.

Parameters:
  • codec_options – Codec options (stored for compatibility, not used)

  • read_preference – Read preference (stored for compatibility, not used)

  • write_concern – Write concern (stored for compatibility, not used)

  • read_concern – Read concern (stored for compatibility, not used)

Returns:

A new collection instance with the specified options

Return type:

Collection

Example

>>> coll = db.my_collection
>>> coll_with_options = coll.with_options(write_concern={"w": "majority"})

_object_exists(type_: str, name: str) bool[source]

Check if an object (table or index) of a specific type and name exists within the database.

Parameters:
  • type_ (str) – The type of object to check, either “table” or “index”.

  • name (str) – The name of the object to check.

Returns:

True if the object exists, False otherwise.

Return type:

bool
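An existence check of this kind is typically a lookup in SQLite's sqlite_master catalog, which lists tables and indexes by type and name. The sketch below shows that lookup with the standard sqlite3 module; the function name is hypothetical and the query is not necessarily the one the library issues.

```python
import sqlite3


def object_exists(conn: sqlite3.Connection, type_: str, name: str) -> bool:
    """Check sqlite_master for an object of the given type and name (illustrative only)."""
    row = conn.execute(
        "SELECT COUNT(1) FROM sqlite_master WHERE type = ? AND name = ?",
        (type_, name),
    ).fetchone()
    return row[0] > 0


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, data TEXT)")
conn.execute("CREATE INDEX idx_users_data ON users (data)")
```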

drop()[source]

Drop the entire collection.

This method removes the collection (table) from the database. After calling this method, the collection will no longer exist in the database.

watch(pipeline: list[dict[str, Any]] | None = None, full_document: str | None = None, resume_after: dict[str, Any] | None = None, max_await_time_ms: int | None = None, batch_size: int | None = None, collation: dict[str, Any] | None = None, start_at_operation_time: Any | None = None, session: ClientSession | None = None, start_after: dict[str, Any] | None = None) ChangeStream[source]

Monitor changes on this collection using SQLite’s change tracking features.

This method creates a change stream that allows iterating over change events generated by modifications to the collection. While SQLite doesn’t natively support change streams like MongoDB, this implementation uses triggers and SQLite’s built-in change tracking mechanisms to provide similar functionality.

Parameters:
  • pipeline (list[dict[str, Any]]) – Aggregation pipeline stages to apply to change events.

  • full_document (str) – Determines how the ‘fullDocument’ field is populated in change events.

  • resume_after (dict[str, Any]) – Logical starting point for the change stream.

  • max_await_time_ms (int) – Maximum time to wait for new documents in milliseconds.

  • batch_size (int) – Number of documents to return per batch.

  • collation (dict[str, Any]) – Collation settings for the operation.

  • start_at_operation_time (Any) – Operation time to start monitoring from.

  • session (ClientSession) – Client session for the operation.

  • start_after (dict[str, Any]) – Logical starting point for the change stream.

Returns:

A change stream object that can be iterated over to receive change events.

Return type:

ChangeStream
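The trigger-based approach mentioned above can be illustrated with plain SQLite: triggers append one row per modification to a log table, and a reader polls that log in order. This is a sketch of the general technique only. The table names, trigger names, and log layout are hypothetical and do not describe NeoSQLite's internal schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE items (id INTEGER PRIMARY KEY, data TEXT);

    -- Hypothetical log table: one row per change, ordered by seq.
    CREATE TABLE change_log (
        seq INTEGER PRIMARY KEY AUTOINCREMENT,
        operation TEXT NOT NULL,
        doc_id INTEGER NOT NULL
    );

    CREATE TRIGGER items_insert AFTER INSERT ON items BEGIN
        INSERT INTO change_log (operation, doc_id) VALUES ('insert', NEW.id);
    END;

    CREATE TRIGGER items_update AFTER UPDATE ON items BEGIN
        INSERT INTO change_log (operation, doc_id) VALUES ('update', NEW.id);
    END;

    CREATE TRIGGER items_delete AFTER DELETE ON items BEGIN
        INSERT INTO change_log (operation, doc_id) VALUES ('delete', OLD.id);
    END;
    """
)

# Each statement below fires the matching trigger.
conn.execute("INSERT INTO items (data) VALUES (?)", ("{}",))
conn.execute("UPDATE items SET data = ? WHERE id = 1", ('{"a": 1}',))
conn.execute("DELETE FROM items WHERE id = 1")

# A consumer reads events in sequence order, much like iterating a stream.
events = conn.execute(
    "SELECT operation, doc_id FROM change_log ORDER BY seq"
).fetchall()
```

Remembering the last seq value that was read gives a natural resume point, which is the role a resume token plays in a MongoDB change stream.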