neosqlite package

Subpackages

Submodules

Module contents

class neosqlite.AggregationCursor(collection: Collection, pipeline: list[dict[str, Any]], allowDiskUse: bool | None = None, batchSize: int | None = None, session: ClientSession | None = None, **kwargs: Any)[source]

Bases: object

A cursor that iterates over the results of an aggregation pipeline.

This cursor implements the same interface as PyMongo’s CommandCursor, allowing iteration over aggregation results.

__init__(collection: Collection, pipeline: list[dict[str, Any]], allowDiskUse: bool | None = None, batchSize: int | None = None, session: ClientSession | None = None, **kwargs: Any)[source]

Initialize the AggregationCursor.

Parameters:
  • collection – The collection to run the aggregation on

  • pipeline – The aggregation pipeline to execute

  • allowDiskUse – Ignored in NeoSQLite (kept for PyMongo compatibility)

  • batchSize – Batch size for results (kept for PyMongo compatibility)

  • session – A ClientSession for transactions

  • **kwargs – Additional keyword arguments for PyMongo compatibility

max_await_time_ms(max_await_time_ms: int | None) AggregationCursor[source]

Set the maximum time to wait for new documents (for tailable cursors).

Parameters:

max_await_time_ms (int, optional) – The maximum time to wait in milliseconds.

Returns:

The cursor object with the max_await_time_ms applied.

Return type:

AggregationCursor

add_option(mask: int) AggregationCursor[source]

Set query flags (bitmask) for this cursor.

Parameters:

mask (int) – The bitmask of options to set.

Returns:

The cursor object with the options applied.

Return type:

AggregationCursor

remove_option(mask: int) AggregationCursor[source]

Unset query flags (bitmask) for this cursor.

Parameters:

mask (int) – The bitmask of options to unset.

Returns:

The cursor object with the options removed.

Return type:

AggregationCursor

property session: Any | None

Get the ClientSession associated with this cursor.

Returns:

The ClientSession, or None if no session is associated.

Return type:

Any | None

property cursor_id: int

Get the ID of this cursor.

Returns:

The cursor ID (always 0 for NeoSQLite).

Return type:

int

sort(key=None, reverse=False)[source]

Sort the results in-place.

Parameters:
  • key – A function to extract a comparison key from each element

  • reverse – If True, sort in descending order

Returns:

The cursor itself for chaining

_execute() None[source]

Execute the aggregation pipeline and store the results.

_estimate_result_size() int[source]

Estimate the size of the aggregation result in bytes.

Returns:

Estimated size in bytes

next() dict[str, Any][source]

Get the next document in the aggregation result.

Returns:

The next document in the result set

Raises:

StopIteration – When there are no more documents

to_list() list[dict[str, Any]][source]

Convert the cursor to a list of documents.

Returns:

A list containing all documents in the result set

batch_size(size: int) AggregationCursor[source]

Set the batch size for memory-constrained processing.

Parameters:

size – The batch size to use

Returns:

The cursor itself for chaining

allow_disk_use(allow: bool = True) AggregationCursor[source]

Enable or disable disk use for aggregation operations.

This method supports both method chaining (PyMongo fluent style) and is also settable via the allowDiskUse parameter to aggregate().

Parameters:

allow – Whether to allow disk use

Returns:

The cursor itself for chaining

use_quez(use_quez: bool = True) AggregationCursor[source]

Enable or disable quez memory-constrained processing.

Parameters:

use_quez – Whether to use quez for memory-constrained processing

Returns:

The cursor itself for chaining

property retrieved: int

Return the number of documents retrieved from the cursor.

Returns:

The number of documents retrieved so far

Return type:

int

property alive: bool

Check if the cursor has more documents to iterate.

Returns:

True if the cursor may have more documents, False if exhausted

Return type:

bool

property collection

Return a reference to the collection this cursor is iterating over.

Returns:

The collection associated with this cursor

Return type:

Collection

property address: tuple | None

Return the address of the database.

Returns:

A tuple of (database_path, 0) after execution starts,

None before the cursor has been executed.

Return type:

tuple | None

get_quez_stats() dict[str, Any] | None[source]

Get quez compression statistics if quez is being used.

Returns:

Dict with compression statistics or None if quez is not being used. Statistics include: - ‘count’: Number of items in the queue - ‘raw_size_bytes’: Total raw size of items in bytes - ‘compressed_size_bytes’: Total compressed size of items in bytes - ‘compression_ratio_pct’: Compression ratio as percentage (None if empty)

class neosqlite.AutoVacuumMode[source]

Bases: object

Represents SQLite auto_vacuum modes. Controls how SQLite reclaims space after deleting data.

NONE = 0
FULL = 1
INCREMENTAL = 2
classmethod validate(mode: int | str) int[source]

Validate and normalize an auto_vacuum mode.

Parameters:

mode – The auto_vacuum mode (0, 1, 2) or string (“NONE”, “FULL”, “INCREMENTAL”).

Returns:

The normalized auto_vacuum mode value.

Return type:

int

Raises:

ValueError – If the auto_vacuum mode is not valid.

classmethod to_string(mode: int) str[source]

Convert numeric mode to string representation.

class neosqlite.Binary(data: bytes | bytearray | memoryview, subtype: int = 0)[source]

Bases: bytes

A BSON Binary-like class for representing binary data in neosqlite.

This class provides a PyMongo-compatible interface for storing binary data directly in documents (outside of GridFS). The data is automatically encoded/decoded when stored in SQLite JSON documents.

Example usage:

# Create binary data binary_data = Binary(b”some binary content”)

# Store in document collection.insert_one({“data”: binary_data})

# Retrieve and use doc = collection.find_one({}) retrieved_data = doc[“data”] # Returns Binary instance raw_bytes = bytes(retrieved_data) # Convert to bytes if needed

_subtype: int
BINARY_SUBTYPE = 0
FUNCTION_SUBTYPE = 1
OLD_BINARY_SUBTYPE = 2
UUID_SUBTYPE = 4
MD5_SUBTYPE = 5
COLUMN_SUBTYPE = 7
SENSITIVE_SUBTYPE = 8
VECTOR_SUBTYPE = 9
USER_DEFINED_SUBTYPE = 128
property subtype: int

Get the binary subtype.

encode_for_storage() dict[source]

Encode the binary data for JSON storage.

Returns:

A dictionary representation for JSON storage

classmethod decode_from_storage(encoded_data: dict) Binary[source]

Decode binary data from JSON storage.

Parameters:

encoded_data – Dictionary representation from JSON storage

Returns:

A Binary instance

classmethod from_uuid(uuid_value, uuid_representation=None)[source]

Create a Binary instance from a UUID.

Parameters:
  • uuid_value – A UUID instance

  • uuid_representation – UUID representation (ignored in neosqlite)

Returns:

A Binary instance with UUID_SUBTYPE

as_uuid(uuid_representation=None)[source]

Convert this Binary instance to a UUID.

Returns:

A UUID instance

Raises:

ValueError – If the subtype is not UUID_SUBTYPE

class neosqlite.BulkOperationExecutor(collection: neosqlite.Collection, ordered: bool = True)[source]

Bases: object

Executor for bulk operations.

__init__(collection: neosqlite.Collection, ordered: bool = True)[source]

Initialize the BulkOperationExecutor.

This method initializes a new BulkOperationExecutor with the given collection and ordering flag. The executor will execute operations in the order they are added if the ordered flag is True. Otherwise, the executor may execute operations in any order.

Parameters:
  • collection (neosqlite.Collection) – The collection to perform operations on.

  • ordered (bool, optional) – Whether to execute operations in order. Defaults to True.

add(operation)[source]

Add an operation to the bulk operations list.

For PyMongo API compatibility, accepts InsertOne, UpdateOne, DeleteOne operations.

Parameters:

operation – The operation to add (InsertOne, UpdateOne, DeleteOne, etc.)

Returns:

The current executor for chaining

Return type:

BulkOperationExecutor

insert(document: dict[str, Any])[source]

Add an insert operation to the bulk operations list.

This method appends an insert operation to the bulk operations list. The operation will insert the specified document into the collection.

Parameters:

document (dict[str, Any]) – The document to be inserted.

Returns:

The current context object for chaining further operations.

Return type:

BulkOperationContext

find(filter: dict[str, Any])[source]

Create a context for find-based operations.

This method creates a new BulkOperationContext for find-based operations with the given filter.

Parameters:

filter (dict[str, Any]) – The filter to be used for find operations.

Returns:

A new BulkOperationContext object for chaining find operations.

Return type:

BulkOperationContext

execute(session: ClientSession | None = None) BulkWriteResult[source]

Execute all bulk operations.

This method executes all bulk operations in the current context. If ordered is True, operations are executed in the order they were added. Otherwise, operations may be executed in any order.

Parameters:

session (ClientSession, optional) – A ClientSession for transactions.

Returns:

A result object containing the counts of inserted, matched, modified, deleted, and upserted documents.

Return type:

BulkWriteResult

_execute_ordered(session: ClientSession | None = None) BulkWriteResult[source]

Execute operations in order.

Parameters:

session (ClientSession, optional) – A ClientSession for transactions.

Returns:

A result object containing the counts of inserted, matched, modified, deleted, and upserted documents.

Return type:

BulkWriteResult

_execute_unordered(session: ClientSession | None = None) BulkWriteResult[source]

Execute operations in any order, continuing on individual failures.

Unlike ordered execution, a failure in one operation does not prevent subsequent operations from being attempted.

Parameters:

session (ClientSession, optional) – A ClientSession for transactions.

Returns:

A result object containing the counts of inserted, matched, modified, deleted, and upserted documents.

Return type:

BulkWriteResult

class neosqlite.BulkWriteResult(inserted_count: int, matched_count: int, modified_count: int, deleted_count: int, upserted_count: int)[source]

Bases: object

Represents the result of a bulk write operation.

__init__(inserted_count: int, matched_count: int, modified_count: int, deleted_count: int, upserted_count: int)[source]

Initialize a BulkWriteResult object with counts of various operations.

Parameters:
  • inserted_count (int) – The number of documents that were inserted.

  • matched_count (int) – The number of documents that matched the filter criteria.

  • modified_count (int) – The number of documents that were modified.

  • deleted_count (int) – The number of documents that were deleted.

  • upserted_count (int) – The number of documents that were upserted.

class neosqlite.ChangeStream(collection: Collection, pipeline: list[dict[str, Any]] | None = None, full_document: str | None = None, resume_after: dict[str, Any] | None = None, max_await_time_ms: int | None = None, batch_size: int | None = None, collation: dict[str, Any] | None = None, start_at_operation_time: Any | None = None, session: ClientSession | None = None, start_after: dict[str, Any] | None = None)[source]

Bases: object

A change stream that watches for changes on a collection.

This implementation uses SQLite’s built-in features to monitor changes. It provides an iterator interface to receive change events.

__init__(collection: Collection, pipeline: list[dict[str, Any]] | None = None, full_document: str | None = None, resume_after: dict[str, Any] | None = None, max_await_time_ms: int | None = None, batch_size: int | None = None, collation: dict[str, Any] | None = None, start_at_operation_time: Any | None = None, session: ClientSession | None = None, start_after: dict[str, Any] | None = None)[source]

Initialize a change stream for a specific collection.

Parameters:
  • collection (Collection) – The collection to monitor for changes.

  • pipeline (list[dict[str, Any]], optional) – A pipeline of operations to apply to the change stream.

  • full_document (str, optional) – Specifies whether to include the full document in change events.

  • resume_after (dict[str, Any], optional) – A resume token to start the change stream from a specific point.

  • max_await_time_ms (int, optional) – The maximum time in milliseconds to wait for change events.

  • batch_size (int, optional) – The batch size for the change stream.

  • collation (dict[str, Any], optional) – Collation options to apply to change events.

  • start_at_operation_time (Any, optional) – Operation time to start the change stream from.

  • session (Any, optional) – The session to use for the change stream.

  • start_after (dict[str, Any], optional) – A document ID to start the change stream from.

static _sanitize_collection_name(name: str) str[source]

Validate and sanitize a collection name to prevent SQL injection.

Parameters:

name – The collection name to validate.

Returns:

The validated collection name.

Raises:

ValueError – If the collection name contains invalid characters.

_setup_triggers()[source]

Set up SQLite triggers to capture changes to the collection.

This method ensures that triggers are created in the SQLite database to log INSERT, UPDATE, and DELETE operations on the specified collection. These triggers insert records into a change tracking table, enabling the change stream to monitor these events.

Triggers are created dynamically using SQL commands. They are designed to capture the essential details of each change operation, including the operation type, document ID, and data.

_cleanup_triggers()[source]

Clean up the triggers when the change stream is closed.

This method ensures that triggers created for capturing changes to the collection are properly dropped from the SQLite database when the change stream is no longer needed. This cleanup helps in freeing up resources and avoiding unnecessary logging.

The method handles exceptions gracefully, ensuring that any errors during the cleanup process are ignored, thus allowing the change stream to close without interruption.

close() None[source]

Close the change stream and clean up resources.

This method ensures that the change stream is properly closed and resources are released. It sets the _closed flag to True and calls the _cleanup_triggers method to clean up any triggers that were set up for capturing changes to the collection. This helps in freeing up resources and avoiding unnecessary logging or data handling.

By calling this method, the change stream is effectively terminated, and no further change events will be received.

class neosqlite.CodecOptions(document_class: type = <class 'dict'>, tz_aware: bool = False, uuid_representation: int = 0)[source]

Bases: object

Represents codec options for MongoDB compatibility.

__init__(document_class: type = <class 'dict'>, tz_aware: bool = False, uuid_representation: int = 0)[source]
class neosqlite.Collection(db: Connection, name: str, create: bool = True, database=None, **kwargs: Any)[source]

Bases: object

Provides a class representing a collection in a SQLite database.

This class encapsulates operations on a collection such as inserting, updating, deleting, and querying documents.

__init__(db: Connection, name: str, create: bool = True, database=None, **kwargs: Any)[source]

Initialize a new collection object.

Parameters:
  • db – Database object to which the collection belongs.

  • name – Name of the collection.

  • create – Whether to create the collection table if it doesn’t exist.

  • database – Database object that contains this collection.

  • **kwargs – Additional options for collection creation.

cleanup() None[source]

Clean up resources used by the collection.

_load(id: int, data: str | bytes, stored_id: Any | None = None) dict[str, Any][source]

Deserialize and load a document from its ID and JSON data.

Deserialize the JSON string or bytes back into a Python dictionary, add the document ID to it, and return the document.

Parameters:
  • id (int) – The document ID.

  • data (str | bytes) – The JSON string or bytes representing the document.

  • stored_id (Any, optional) – The stored _id value if already retrieved.

Returns:

The deserialized document with the _id field added.

Return type:

dict[str, Any]

_parse_stored_id(stored_id: Any) Any[source]

Parse a value retrieved from the _id column into its appropriate Python type.

Parameters:

stored_id – The raw value from the _id column.

Returns:

The parsed value (e.g., ObjectId, int, str, or None).

Return type:

Any

_load_with_stored_id(id_val: int, data: str | bytes, stored_id_val) dict[str, Any][source]

Deserialize and load a document with the stored _id value.

Parameters:
  • id_val (int) – The auto-increment document ID.

  • data (str | bytes) – The JSON string or bytes representing the document.

  • stored_id_val – The stored _id value from the _id column.

Returns:

The deserialized document with the _id field added.

Return type:

dict[str, Any]

_resolve_stored_id(stored_id_val: Any, fallback_id: int) ObjectId | Any[source]

Resolve the stored _id value, attempting to parse as ObjectId.

Parameters:
  • stored_id_val – The stored _id value from the _id column.

  • fallback_id – Fallback auto-increment ID if stored_id_val is None.

Returns:

ObjectId or the original stored_id_val, or fallback_id.

_get_stored_id(doc_id: int) ObjectId | int | str | None[source]

Retrieve the stored _id for a document from the _id column.

Parameters:

doc_id (int) – The document ID.

Returns:

The stored _id value, or None if the column doesn’t exist yet.

Return type:

ObjectId | int | None

_get_val(item: dict[str, Any], key: Any) Any[source]

Retrieves a value from a dictionary using a key, handling nested keys and optional prefixes.

Parameters:
  • item (dict[str, Any]) – The dictionary to search.

  • key (Any) – The key to retrieve. If a string, may include nested keys separated by dots or be prefixed with ‘$’. If non-string, returns the key itself (for literal values like $group _id).

Returns:

The value associated with the key, or None if the key is not found.

Return type:

Any

_set_val(item: dict[str, Any], key: str, value: Any) None[source]

Sets a value in a dictionary using a key, handling nested keys and optional prefixes.

Parameters:
  • item (dict[str, Any]) – The dictionary to modify.

  • key (str) – The key to set, may include nested keys separated by dots or may be prefixed with ‘$.

  • value (Any) – The value to set.

create(**kwargs: Any)[source]

Initialize the collection table if it does not exist.

This method creates a table with an ‘id’ column, a ‘_id’ column for ObjectId storage, and a ‘data’ column for storing JSON data. If the JSONB data type is supported, it will be used, otherwise, TEXT data type will be used.

_ensure_id_column_exists()[source]

Ensure that the _id column exists in the collection table for backward compatibility.

rename(new_name: str) None[source]

Renames the collection to the specified new name. If the new name is the same as the current name, does nothing.

Checks if a table with the new name exists and raises an error if it does. Renames the underlying table and updates the collection’s name.

Parameters:

new_name (str) – The new name for the collection.

Raises:

sqlite3.Error – If a collection with the new name already exists.

options() dict[str, Any][source]

Retrieves options set on this collection.

Returns:

A dictionary containing various options for the collection,

including the table’s name, columns, indexes, and count of documents.

Return type:

dict

insert_one(document: dict[str, Any], session: ClientSession | None = None) InsertOneResult[source]

This is a delegating method. For implementation details, see the core logic in insert_one().

insert_many(documents: list[dict[str, Any]], ordered: bool = True, session: ClientSession | None = None) InsertManyResult[source]

This is a delegating method. For implementation details, see the core logic in insert_many().

update_one(filter: dict[str, Any], update: dict[str, Any], upsert: bool = False, array_filters: list[dict[str, Any]] | None = None, session: ClientSession | None = None) UpdateResult[source]

This is a delegating method. For implementation details, see the core logic in update_one().

update_many(filter: dict[str, Any], update: dict[str, Any], upsert: bool = False, array_filters: list[dict[str, Any]] | None = None, session: ClientSession | None = None) UpdateResult[source]

This is a delegating method. For implementation details, see the core logic in update_many().

replace_one(filter: dict[str, Any], replacement: dict[str, Any], upsert: bool = False, session: ClientSession | None = None) UpdateResult[source]

This is a delegating method. For implementation details, see the core logic in replace_one().

delete_one(filter: dict[str, Any], session: ClientSession | None = None) DeleteResult[source]

Delete a single document.

For GridFS system collections (e.g., fs_files, fs_chunks), this method automatically delegates to GridFSBucket.delete() to handle the different schema and properly clean up both files and chunks.

Parameters:
  • filter – Query filter to match document to delete

  • session – A ClientSession for transactions.

Returns:

Result of the delete operation

Return type:

DeleteResult

_delete_one_as_gridfs(filter: dict[str, Any])[source]

Delete a single document from a GridFS system collection using GridFSBucket API.

This properly handles GridFS deletion by removing both the file document and associated chunks.

Parameters:

filter – Query filter to match document to delete

Returns:

Result of the delete operation

Return type:

DeleteResult

delete_many(filter: dict[str, Any], session: ClientSession | None = None) DeleteResult[source]

Delete multiple documents.

For GridFS system collections (e.g., fs_files, fs_chunks), this method automatically delegates to GridFSBucket.delete() to handle the different schema and properly clean up both files and chunks.

Parameters:
  • filter – Query filter to match documents to delete

  • session – A ClientSession for transactions.

Returns:

Result of the delete operation

Return type:

DeleteResult

_delete_many_as_gridfs(filter: dict[str, Any])[source]

Delete multiple documents from a GridFS system collection using GridFSBucket API.

This properly handles GridFS deletion by removing both file documents and associated chunks.

Parameters:

filter – Query filter to match documents to delete

Returns:

Result of the delete operation

Return type:

DeleteResult

find(filter: dict[str, Any] | None = None, projection: dict[str, Any] | None = None, hint: str | None = None, session: ClientSession | None = None, **kwargs: Any) Cursor[source]

Find documents in the collection.

For GridFS system collections (e.g., fs_files, fs_chunks), this method automatically delegates to GridFSBucket.find() to handle the different schema.

Parameters:
  • filter – Query filter

  • projection – Field projection (not supported for GridFS collections)

  • hint – Index hint (not supported for GridFS collections)

  • session – A ClientSession for transactions.

Returns:

Query results

Return type:

Cursor or GridOutCursor

_is_gridfs_collection() bool[source]

Check if this collection is a GridFS system collection.

Uses a two-step verification: 1. Check naming convention (ends with _files or _chunks) 2. Verify schema has GridFS-specific columns

Returns:

True if this is a GridFS system collection

Return type:

bool

_find_as_gridfs(filter: dict[str, Any] | None = None, session: ClientSession | None = None)[source]

Execute find on a GridFS system collection using GridFSBucket API.

This allows PyMongo-style access like db.fs.files.find({…}) to work by delegating to the GridFSBucket.find() method which understands the GridFS schema.

Parameters:
  • filter – Query filter

  • session – A ClientSession for transactions.

Returns:

Cursor over GridOut objects

Return type:

GridOutCursor

find_raw_batches(filter: dict[str, Any] | None = None, projection: dict[str, Any] | None = None, hint: str | None = None, batch_size: int = 100, session: ClientSession | None = None) RawBatchCursor[source]

This is a delegating method. For implementation details, see the core logic in find_raw_batches().

find_one(filter: dict[str, Any] | None = None, projection: dict[str, Any] | None = None, hint: str | None = None, session: ClientSession | None = None) dict[str, Any] | None[source]

Find a single document.

For GridFS system collections (e.g., fs_files, fs_chunks), this method automatically delegates to GridFSBucket.find() to handle the different schema.

Parameters:
  • filter – Query filter

  • projection – Field projection (not supported for GridFS collections)

  • hint – Index hint (not supported for GridFS collections)

  • session – A ClientSession for transactions.

Returns:

Query result

Return type:

Dict or GridOut or None

count_documents(filter: dict[str, Any], session: ClientSession | None = None) int[source]

This is a delegating method. For implementation details, see the core logic in count_documents().

estimated_document_count(options: dict[str, Any] | None = None, session: ClientSession | None = None) int[source]

Get an estimated count of documents in the collection.

This is a delegating method. For implementation details, see the core logic in estimated_document_count().

Parameters:
  • options (dict[str, Any], optional) – Options for the count operation. Supported options (for PyMongo API compatibility): - maxTimeMS: Maximum execution time in milliseconds (ignored in NeoSQLite) - hint: Index to use for the count (ignored in NeoSQLite)

  • session – A ClientSession for transactions.

Returns:

Estimated number of documents in the collection

Return type:

int

Note

This method returns an estimate based on SQLite metadata, which is fast but may not be exact. For an exact count, use count_documents({}). The options parameter is accepted for PyMongo API compatibility but most options are not applicable to SQLite.

find_one_and_delete(filter: dict[str, Any], projection: dict[str, Any] | None = None, sort: list[tuple[str, int]] | None = None, session: ClientSession | None = None, **kwargs: Any) dict[str, Any] | None[source]

This is a delegating method. For implementation details, see the core logic in find_one_and_delete().

find_one_and_replace(filter: dict[str, Any], replacement: dict[str, Any], projection: dict[str, Any] | None = None, sort: list[tuple[str, int]] | None = None, upsert: bool = False, return_document: bool = False, session: ClientSession | None = None, **kwargs: Any) dict[str, Any] | None[source]

This is a delegating method. For implementation details, see the core logic in find_one_and_replace().

find_one_and_update(filter: dict[str, Any], update: dict[str, Any], projection: dict[str, Any] | None = None, sort: list[tuple[str, int]] | None = None, upsert: bool = False, return_document: bool = False, array_filters: list[dict[str, Any]] | None = None, session: ClientSession | None = None, **kwargs: Any) dict[str, Any] | None[source]

This is a delegating method. For implementation details, see the core logic in find_one_and_update().

aggregate(pipeline: list[dict[str, Any]], allowDiskUse: bool | None = None, batchSize: int | None = None, session: ClientSession | None = None, **kwargs: Any) AggregationCursor[source]

This is a delegating method. For implementation details, see the core logic in aggregate().

Parameters:
  • pipeline – The aggregation pipeline to execute

  • allowDiskUse – Ignored in NeoSQLite (kept for PyMongo compatibility)

  • batchSize – Batch size for results (kept for PyMongo compatibility)

  • session – A ClientSession for transactions.

  • **kwargs – Additional keyword arguments for PyMongo compatibility

Returns:

An AggregationCursor instance

aggregate_raw_batches(pipeline: list[dict[str, Any]], batch_size: int = 100, session: ClientSession | None = None) RawBatchCursor[source]

This is a delegating method. For implementation details, see the core logic in aggregate_raw_batches().

distinct(key: str, filter: dict[str, Any] | None = None, session: ClientSession | None = None) list[Any][source]

This is a delegating method. For implementation details, see the core logic in distinct().

bulk_write(requests: list[Any], ordered: bool = True, session: ClientSession | None = None) BulkWriteResult[source]

This is a delegating method. For implementation details, see the core logic in bulk_write().

initialize_ordered_bulk_op() BulkOperationExecutor[source]

This is a delegating method. For implementation details, see the core logic in initialize_ordered_bulk_op().

Deprecated since version Use: bulk_write() instead.

initialize_unordered_bulk_op() BulkOperationExecutor[source]

This is a delegating method. For implementation details, see the core logic in initialize_unordered_bulk_op().

Deprecated since version Use: bulk_write() instead.

create_index(key: str | list[str] | list[tuple[str, int]], reindex: bool = True, sparse: bool = False, unique: bool = False, fts: bool = False, tokenizer: str | None = None, datetime_field: bool = False)[source]

This is a delegating method. For implementation details, see the core logic in create_index().

create_search_index(key: str, tokenizer: str | None = None)[source]

This is a delegating method. For implementation details, see the core logic in create_search_index().

create_indexes(indexes: list[IndexModel]) list[str][source]

This is a delegating method. For implementation details, see the core logic in create_indexes().

create_search_indexes(indexes: list[str]) list[str][source]

This is a delegating method. For implementation details, see the core logic in create_search_indexes().

reindex(table: str, sparse: bool = False, documents: list[dict[str, Any]] | None = None)[source]

This is a delegating method. For implementation details, see the core logic in reindex().

list_indexes(as_keys: Literal[True]) list[list[str]][source]
list_indexes(as_keys: Literal[False] = False) list[str]

This is a delegating method. For implementation details, see the core logic in list_indexes().

list_search_indexes() list[str][source]

This is a delegating method. For implementation details, see the core logic in list_search_indexes().

update_search_index(key: str, tokenizer: str | None = None)[source]

This is a delegating method. For implementation details, see the core logic in update_search_index().

drop_index(index: str)[source]

This is a delegating method. For implementation details, see the core logic in drop_index().

drop_search_index(index: str)[source]

This is a delegating method. For implementation details, see the core logic in drop_search_index().

drop_indexes()[source]

This is a delegating method. For implementation details, see the core logic in drop_indexes().

index_information() dict[str, Any][source]

This is a delegating method. For implementation details, see the core logic in index_information().

property client: Connection

Get the MongoClient instance (returns the parent Connection).

Returns:

The parent connection instance.

Return type:

Connection

property codec_options: Any

Get the codec options for this collection.

Returns:

The codec options.

Return type:

Any

property read_preference: Any

Get the read preference for this collection.

Returns:

The read preference.

Return type:

Any

property write_concern: Any

Get the write concern for this collection.

Returns:

The write concern.

Return type:

Any

property read_concern: Any

Get the read concern for this collection.

Returns:

The read concern.

Return type:

Any

property database: Connection

Get the database that this collection is a part of.

Returns:

The connection object this collection is associated with.

Return type:

Connection

property db_path: str

Get the path to the database file.

Returns:

The database file path.

Return type:

str

property full_name: str

Get the full name of the collection (database.collection).

Returns:

The full name of the collection

Return type:

str

Example

>>> db = Connection("test.db")
>>> coll = db.my_collection
>>> print(coll.full_name)
'test.my_collection'
with_options(codec_options=None, read_preference=None, write_concern=None, read_concern=None)[source]

Get a clone of this collection with different options.

Note: NeoSQLite is a single-node database, so read_preference, write_concern, and read_concern are stored for API compatibility but don’t affect query behavior.

Parameters:
  • codec_options – Codec options (stored for compatibility, not used)

  • read_preference – Read preference (stored for compatibility, not used)

  • write_concern – Write concern (stored for compatibility, not used)

  • read_concern – Read concern (stored for compatibility, not used)

Returns:

A new collection instance with the specified options

Return type:

Collection

Example

>>> coll = db.my_collection
>>> coll_with_options = coll.with_options(write_concern={"w": "majority"})
_object_exists(type_: str, name: str) bool[source]

Check if an object (table or index) of a specific type and name exists within the database.

Parameters:
  • type (str) – The type of object to check, either “table” or “index”.

  • name (str) – The name of the object to check.

Returns:

True if the object exists, False otherwise.

Return type:

bool

drop()[source]

Drop the entire collection.

This method removes the collection (table) from the database. After calling this method, the collection will no longer exist in the database.

watch(pipeline: list[dict[str, Any]] | None = None, full_document: str | None = None, resume_after: dict[str, Any] | None = None, max_await_time_ms: int | None = None, batch_size: int | None = None, collation: dict[str, Any] | None = None, start_at_operation_time: Any | None = None, session: ClientSession | None = None, start_after: dict[str, Any] | None = None) ChangeStream[source]

Monitor changes on this collection using SQLite’s change tracking features.

This method creates a change stream that allows iterating over change events generated by modifications to the collection. While SQLite doesn’t natively support change streams like MongoDB, this implementation uses triggers and SQLite’s built-in change tracking mechanisms to provide similar functionality.

Parameters:
  • pipeline (list[dict[str, Any]]) – Aggregation pipeline stages to apply to change events.

  • full_document (str) – Determines how the ‘fullDocument’ field is populated in change events.

  • resume_after (dict[str, Any]) – Logical starting point for the change stream.

  • max_await_time_ms (int) – Maximum time to wait for new documents in milliseconds.

  • batch_size (int) – Number of documents to return per batch.

  • collation (dict[str, Any]) – Collation settings for the operation.

  • start_at_operation_time (Any) – Operation time to start monitoring from.

  • session (ClientSession) – Client session for the operation.

  • start_after (dict[str, Any]) – Logical starting point for the change stream.

Returns:

A change stream object that can be iterated over to receive change events.

Return type:

ChangeStream

exception neosqlite.CollectionInvalid[source]

Bases: Exception

Exception raised when a collection is invalid.

class neosqlite.Connection(*args: Any, **kwargs: Any)[source]

Bases: object

Represents a connection to an NeoSQLite database.

Provides methods for managing collections, executing SQL queries, and handling database lifecycle events. Supports PyMongo-like API.

DEFAULT_TRANSLATION_CACHE_SIZE = 100
__init__(*args: Any, **kwargs: Any) None[source]

Initialize a new database connection.

Parameters:
  • *args – Positional arguments passed to sqlite3.connect().

  • **kwargs

    Keyword arguments passed to sqlite3.connect(). Special kwargs: - tokenizers: List of tuples (name, path) for FTS5 tokenizers to load - debug: Boolean flag to enable debug printing - name: Optional name for the database (for PyMongo API compatibility) - _is_clone: Internal flag for cloning (not for public use) - journal_mode: Optional journal mode (default: “WAL”) - auto_vacuum: Auto vacuum mode (default: INCREMENTAL).

    Can be 0/NONE, 1/FULL, 2/INCREMENTAL, or “NONE”/”FULL”/”INCREMENTAL”. If database has different auto_vacuum setting, migration may be triggered.

    • translation_cache: SQL translation cache size (default: 100, 0 to disable)

property db_path: str

Get the path to the database file.

Returns:

The database file path.

Return type:

str

connect(*args: Any, **kwargs: Any) None[source]

Establish a connection to the SQLite database.

Configures the database connection with the provided arguments, sets up SQLite-specific settings like isolation level and journal mode, and loads custom FTS5 tokenizers if specified. This method does not return a value.

Parameters:
  • *args – Positional arguments passed to sqlite3.connect().

  • **kwargs – Keyword arguments passed to sqlite3.connect().

_register_custom_functions() None[source]

Register custom SQLite functions, including regex operators.

_check_and_migrate_autovacuum(*args: Any, **kwargs: Any) None[source]

Check auto_vacuum setting and migrate if needed. Called after initial connection is established.

_migrate_to_autovacuum(*args: Any, **kwargs: Any) None[source]

Migrate database to a new auto_vacuum mode.

Delegates to migration.migrate_autovacuum() and then reconnects.

cleanup() None[source]

Clean up all collection resources associated with this connection.

close() None[source]

Close the database connection.

Commits any pending transaction and properly closes the underlying SQLite connection. This method ensures resources are released and the connection is no longer usable after being called.

property client: Connection

Get the MongoClient instance (returns self for NeoSQLite).

Returns:

The connection instance itself.

Return type:

Connection

property codec_options: Any

Get the codec options for this connection.

Returns:

The codec options.

Return type:

Any

property read_preference: Any

Get the read preference for this connection.

Returns:

The read preference.

Return type:

Any

property write_concern: Any

Get the write concern for this connection.

Returns:

The write concern.

Return type:

Any

property read_concern: Any

Get the read concern for this connection.

Returns:

The read concern.

Return type:

Any

start_session(causal_consistency: bool | None = None, default_transaction_options: dict[str, Any] | None = None, **kwargs: Any) ClientSession[source]

Start a new client session for transactions.

This method provides PyMongo-compatible session management by wrapping SQLite’s native ACID transactions.

Parameters:
  • causal_consistency (bool, optional) – Whether to enable causal consistency. Ignored in NeoSQLite (stored for API compatibility).

  • default_transaction_options (dict, optional) – Default transaction options.

  • **kwargs – Additional session arguments.

Returns:

A new ClientSession instance.

Return type:

ClientSession

drop_collection(name: str) None[source]

Drop a collection (table) from the database.

Parameters:

name (str) – The name of the collection (table) to drop. If the table does not exist, the operation is silently ignored due to the use of IF EXISTS in the SQL command.

create_collection(name: str, **kwargs) Collection[source]

Create a new collection with specific options.

Parameters:
  • name (str) – The name of the collection to create.

  • **kwargs – Additional options for collection creation.

Returns:

The newly created collection.

Return type:

Collection

Raises:

CollectionInvalid – If a collection with the given name already exists.

get_collection(name: str, **kwargs) Collection[source]

Get a collection by name.

Parameters:
  • name (str) – The name of the collection to get.

  • **kwargs – Additional options for collection access.

Returns:

The collection instance.

Return type:

Collection

rename_collection(old_name: str, new_name: str) None[source]

Rename a collection.

Parameters:
  • old_name (str) – The current name of the collection.

  • new_name (str) – The new name for the collection.

Raises:

CollectionInvalid – If the old collection doesn’t exist or new name already exists.

list_collection_names() list[str][source]

List all collection names in the database.

Returns:

A list of all collection names in the database, excluding internal SQLite tables (sqlite_sequence, sqlite_stat*, etc.)

Return type:

list[str]

list_collections() list[dict[str, Any]][source]

Get detailed information about collections in the database.

Returns:

A list of dictionaries containing collection information.

Each dictionary has ‘name’ and ‘options’ keys.

Return type:

list[dict[str, Any]]

command(command: str | dict[str, Any], value: Any | None = None, **kwargs: Any) dict[str, Any][source]

Issue a database command and return the response.

This method provides PyMongo-compatible command execution for SQLite. It supports various commands including PRAGMA commands, introspection, and utility commands.

Parameters:
  • command – The command to execute. Can be: - A string (e.g., “table_info”, “integrity_check”) - A dict with command name as key (e.g., {“ping”: 1})

  • value – Optional command value (for string commands)

  • **kwargs – Additional command arguments

Returns:

Command response

Return type:

dict[str, Any]

Supported Commands:
  • “ping” or {“ping”: 1} - Returns {“ok”: 1}

  • “serverStatus” - Returns SQLite version info

  • “listCollections” - Returns collection list

  • “table_info” - Returns table schema (PRAGMA table_info)

  • “integrity_check” - Returns integrity check results

  • “validate” - Returns integrity check for a collection

  • “reIndex” - Rebuilds indexes for a collection

  • “foreign_key_check” - Returns foreign key check results

  • “index_list” - Returns index list for a table

  • “vacuum” - Runs full VACUUM command

  • “compact” - MongoDB-compatible compact (see below)

  • “wal_checkpoint” - WAL checkpoint control (NeoSQLite extension)

  • “cache_size” - Get/set cache size (NeoSQLite extension)

  • “busy_timeout” - Get/set busy timeout (NeoSQLite extension)

  • “query_only” - Get/set read-only mode (NeoSQLite extension)

  • “analyze” - Runs ANALYZE command

compact Command:

MongoDB-compatible compact implementation with NeoSQLite extensions.

Args:

collection: Collection name (ignored, operates on entire database) dryRun: If true, returns estimate without actually compacting freeSpaceTargetMB: Target in MB (default: 20). Only compacts if

free space >= threshold. Uses incremental vacuum in batches. Set to 0 for full VACUUM instead of incremental.

comment: Optional comment (ignored, for MongoDB compatibility)

Returns:

dryRun: {“estimatedBytesFreed”: <bytes>, “ok”: 1} compact: {“bytesFreed”: <bytes>, “ok”: 1}

Examples:
>>> db.command("compact", "collection")  # 20MB threshold + incremental
>>> db.command("compact", "collection", dryRun=True)  # Estimate only
>>> db.command("compact", "collection", freeSpaceTargetMB=10)  # 10MB threshold + incremental
>>> db.command("compact", "collection", freeSpaceTargetMB=0)  # Full vacuum

Example

>>> db = Connection("test.db")
>>> result = db.command("ping")
>>> print(result)
{'ok': 1.0}
cursor_command(command: str | dict[str, Any], value: Any | None = None, **kwargs: Any) AggregationCursor[source]

Execute a database command and return a cursor for its results.

This method provides PyMongo-compatible cursor-based command execution. It wraps the results of command() in an AggregationCursor, allowing them to be iterated.

Parameters:
  • command – The command to execute.

  • value – Optional command value.

  • **kwargs – Additional command arguments.

Returns:

A cursor over the command results.

Return type:

AggregationCursor

dereference(dbref: dict[str, Any]) dict[str, Any] | None[source]

Resolve a DBRef object.

This method provides PyMongo-compatible DBRef resolution by performing a find_one on the target collection using the provided $id.

Parameters:

dbref – A dictionary representing a DBRef with ‘$ref’ and ‘$id’ keys.

Returns:

The referenced document, or None if not found.

Return type:

dict[str, Any] | None

with_options(codec_options: Any | None = None, read_preference: Any | None = None, write_concern: Any | None = None, read_concern: Any | None = None) Connection[source]

Get a clone of this database with different options.

This method returns a new Connection instance with the specified options. For PyMongo API compatibility, the options are stored but not actively used since SQLite has different semantics.

Parameters:
  • codec_options (Any, optional) – Codec options for encoding/decoding. Ignored in NeoSQLite (stored for API compatibility).

  • read_preference (Any, optional) – Read preference for replica sets. Ignored in NeoSQLite (stored for API compatibility).

  • write_concern (Any, optional) – Write concern for durability settings. Stored for API compatibility.

  • read_concern (Any, optional) – Read concern for consistency settings. Ignored in NeoSQLite (stored for API compatibility).

Returns:

A new Connection instance with the same underlying database

but with the specified options stored.

Return type:

Connection

Note

NeoSQLite stores these options for PyMongo API compatibility, but they don’t affect SQLite behavior since SQLite doesn’t have replica sets, codec options, or the same consistency/durability model.

Example

>>> db = Connection("test.db")
>>> db_with_options = db.with_options(
...     write_concern={"w": "majority"},
...     read_preference={"mode": "primaryPreferred"}
... )
_apply_write_concern(write_concern: Any) None[source]

Apply write concern to the underlying SQLite connection via PRAGMAs.

  • w: 0 -> PRAGMA synchronous = OFF

  • w: 1 -> PRAGMA synchronous = NORMAL

  • j: True -> PRAGMA synchronous = FULL

Parameters:

write_concern (Any) – The write concern to apply (dict or WriteConcern object).

transaction() Iterator[None][source]

Context manager for handling database transactions.

Ensures atomicity by beginning a transaction on entry, committing on successful exit, and rolling back in case of exceptions. This allows using the connection in a ‘with’ statement to manage transaction boundaries safely.

Yields control to the block, and automatically commits or rolls back based on execution outcome.

class neosqlite.Cursor(collection: Collection, filter: dict[str, Any] | None = None, projection: dict[str, Any] | None = None, hint: str | None = None, session: ClientSession | None = None, tables_to_cleanup: list[str] | None = None)[source]

Bases: object

Class representing a cursor for iterating over documents in a collection with applied filters, projections, sorting, and pagination.

__init__(collection: Collection, filter: dict[str, Any] | None = None, projection: dict[str, Any] | None = None, hint: str | None = None, session: ClientSession | None = None, tables_to_cleanup: list[str] | None = None)[source]

Initialize a new cursor instance.

Parameters:
  • collection (Collection) – The collection to operate on.

  • filter (dict[str, Any], optional) – Filter criteria to apply to the documents.

  • projection (dict[str, Any], optional) – Projection criteria to specify which fields to include.

  • hint (str, optional) – Hint for the database to improve query performance.

  • session (ClientSession, optional) – A ClientSession for transactions.

  • tables_to_cleanup (list[str], optional) – List of temporary tables to drop when closed.

max_await_time_ms(max_await_time_ms: int | None) Cursor[source]

Set the maximum time to wait for new documents (for tailable cursors).

Parameters:

max_await_time_ms (int, optional) – The maximum time to wait in milliseconds.

Returns:

The cursor object with the max_await_time_ms applied.

Return type:

Cursor

add_option(mask: int) Cursor[source]

Set query flags (bitmask) for this cursor.

Parameters:

mask (int) – The bitmask of options to set.

Returns:

The cursor object with the options applied.

Return type:

Cursor

remove_option(mask: int) Cursor[source]

Unset query flags (bitmask) for this cursor.

Parameters:

mask (int) – The bitmask of options to unset.

Returns:

The cursor object with the options removed.

Return type:

Cursor

property session: Any | None

Get the ClientSession associated with this cursor.

Returns:

The ClientSession, or None if no session is associated.

Return type:

Any | None

property cursor_id: int

Get the ID of this cursor.

Returns:

The cursor ID (always 0 for NeoSQLite).

Return type:

int

limit(limit: int) Cursor[source]

Limit the number of documents returned by the cursor.

Parameters:

limit (int) – The maximum number of documents to return.

Returns:

The cursor object with the limit applied.

Return type:

Cursor

skip(skip: int) Cursor[source]

Skip the specified number of documents when iterating over the cursor.

Parameters:

skip (int) – The number of documents to skip.

Returns:

The cursor object with the skip applied.

Return type:

Cursor

sort(key_or_list: str | list[tuple], direction: int | None = None) Cursor[source]

Sort the documents returned by the cursor.

Parameters:
  • key_or_list (str | list[tuple]) – The key or list of keys to sort by.

  • direction (int, optional) – The sorting direction (ASCENDING or DESCENDING). Defaults to ASCENDING if None.

Returns:

The cursor object with the sorting applied.

Return type:

Cursor

batch_size(size: int) Cursor[source]

Set the batch size for the cursor.

MongoDB-compatible batchSize parameter for controlling memory usage. Results are fetched from the database in batches of the specified size.

Parameters:

size (int) – The batch size (default: 101, matches MongoDB)

Returns:

The cursor object for chaining

Return type:

Cursor

Example

>>> cursor = collection.find().batch_size(50)
hint(hint: str | list[tuple[str, int]]) Cursor[source]

Set the index hint for the cursor.

Parameters:

hint – The index name (str) or list of (field, direction) tuples

Returns:

The cursor object with the hint applied

Return type:

Cursor

min(min_spec: list[tuple[str, Any]] | tuple[str, Any]) Cursor[source]

Set the minimum bound for index queries.

This method sets a lower bound on the index values to be scanned. Only documents with index values greater than or equal to the specified minimum will be returned.

Parameters:

min_spec – A list of (field, value) tuples specifying the inclusive lower bound, e.g., [(“age”, 18)]

Returns:

The cursor object with the minimum bound applied

Return type:

Cursor

Raises:

TypeError – If min_spec is not a list or tuple

Example

>>> cursor = collection.find({"age": {"$gte": 18}}).min([("age", 18)])
max(max_spec: list[tuple[str, Any]] | tuple[str, Any]) Cursor[source]

Set the maximum bound for index queries.

This method sets an upper bound on the index values to be scanned. Only documents with index values less than the specified maximum will be returned.

Parameters:

max_spec – A list of (field, value) tuples specifying the exclusive upper bound, e.g., [(“age”, 65)]

Returns:

The cursor object with the maximum bound applied

Return type:

Cursor

Raises:

TypeError – If max_spec is not a list or tuple

Example

>>> cursor = collection.find({"age": {"$lte": 65}}).max([("age", 65)])
collation(collation: dict[str, Any]) Cursor[source]

Set the collation for the cursor.

Collation allows users to specify language-specific rules for string comparison, such as rules for lettercase and accent marks.

Parameters:

collation (dict[str, Any]) – A dictionary specifying collation options: - locale (str): Language locale (e.g., “en_US”, “fr_FR”, “de_DE”) - caseLevel (bool): Whether to include case comparison - caseFirst (str): “upper”, “lower”, or “off” - strength (int): Comparison strength (1-5) - numericOrdering (bool): Compare numbers numerically - alternate (str): “shifted” or “non-ignorable” - backwards (bool): Sort backwards (for French)

Returns:

The cursor object with the collation applied

Return type:

Cursor

Example

>>> cursor = collection.find({"name": {"$gte": "A"}}).collation(
...     {"locale": "fr_FR", "strength": 2}
... )

Note

NeoSQLite maps common locales to SQLite collations: - Default/unknown: BINARY (case-sensitive) - Locales with case-insensitive: NOCASE - Custom collations can be registered via Connection tokenizers

where(predicate: Callable[[dict[str, Any]], bool]) Cursor[source]

Filter cursor results using a Python predicate function.

This is a Tier-3 (Python fallback) method that applies a Python function to filter documents after they are retrieved from the database.

Parameters:

predicate (Callable[[dict[str, Any]], bool]) – A function that takes a document and returns True if the document should be included.

Returns:

The cursor object with the predicate applied

Return type:

Cursor

Example

>>> def is_high_value(doc):
...     return doc.get('value', 0) > 10
>>> cursor = collection.find({}).where(is_high_value)

Note

This method uses Python-based filtering (Tier-3), which means all matching documents are retrieved from the database first, then filtered in Python. For better performance, use MongoDB-style query operators in the find() filter when possible.

comment(comment: str) Cursor[source]

Add a comment to the query for debugging and profiling.

The comment is injected as a SQL comment in the generated query, which can be useful for query profiling and debugging.

Parameters:

comment (str) – The comment text to add to the query

Returns:

The cursor object with the comment applied

Return type:

Cursor

Example

>>> cursor = collection.find({"age": {"$gte": 18}}).comment("find adults")
to_list(length: int | None = None) list[dict[str, Any]][source]

Convert the cursor to a list of documents.

This method efficiently converts the cursor contents to a list. If length is specified, returns at most that many documents.

Parameters:

length (int, optional) – Maximum number of documents to return. If None, returns all documents.

Returns:

List of documents in the cursor

Return type:

list[dict[str, Any]]

Example

>>> cursor = collection.find({"age": {"$gte": 18}})
>>> adults = cursor.to_list()
>>> first_5 = cursor.to_list(5)
clone() Cursor[source]

Create a clone of this cursor with the same options.

Returns a new cursor with the same filter, projection, hint, sort, skip, and limit settings. The clone is unevaluated and can be iterated independently.

Returns:

A new cursor with the same settings

Return type:

Cursor

Example

>>> cursor = collection.find({"age": {"$gte": 18}}).limit(10)
>>> clone = cursor.clone()
>>> results1 = list(cursor)
>>> results2 = list(clone)
property retrieved: int

Return the number of documents retrieved from the cursor.

This property tracks how many documents have been iterated over since the cursor was created or last reset.

Returns:

The number of documents retrieved so far

Return type:

int

Example

>>> cursor = collection.find({}).limit(10)
>>> docs = list(cursor)
>>> cursor.retrieved
10
property alive: bool

Check if the cursor has more documents to iterate.

In NeoSQLite, a cursor is considered alive if it hasn’t been fully exhausted. This is a simplified implementation for PyMongo API compatibility.

Returns:

True if the cursor may have more documents, False if exhausted

Return type:

bool

Note

NeoSQLite cursors are re-iterable, so this property tracks whether the current iteration has been completed.

Example

>>> cursor = collection.find({}).limit(10)
>>> cursor.alive
True
>>> list(cursor)
>>> cursor.alive
False
property collection

Return a reference to the collection this cursor is iterating over.

Returns:

The collection associated with this cursor

Return type:

Collection

Example

>>> cursor = collection.find({})
>>> cursor.collection
Collection(database, "collection_name")
>>> cursor.collection.name
'collection_name'
property address: tuple | None

Return the address of the database.

For NeoSQLite, this returns a tuple representing the database connection. This is a simplified implementation for PyMongo API compatibility.

Returns:

A tuple of (database_path, 0) after iteration starts,

None before the cursor has been executed. - For file databases: (‘sqlite:///path/to/file.db’, 0) - For memory databases: (‘sqlite::memory:’, 0)

Return type:

tuple | None

Note

SQLite is an embedded database without a server, so this returns the database path instead of a network address. Returns None until the cursor has been iterated, matching PyMongo behavior.

Example

>>> cursor = collection.find({})
>>> cursor.address  # Before iteration
None
>>> list(cursor)
>>> cursor.address  # After iteration
('sqlite::memory:', 0)  # or ('sqlite:///path/to/file.db', 0)
explain(verbosity: str = 'queryPlanner') dict[str, Any][source]

Return the query execution plan.

Uses SQLite’s EXPLAIN QUERY PLAN to provide information about how the query will be executed, including index usage.

Parameters:

verbosity (str) – Verbosity level - “executionStats” or “queryPlanner” (kept for PyMongo compatibility, SQLite always returns full plan) Defaults to “queryPlanner”.

Returns:

Query execution plan with the following structure:
  • queryPlanner: Information about the query plan
    • winningPlan: List of plan stages

    • indexUsage: Information about index usage

  • executionStats: Execution statistics (if verbosity=”executionStats”)
    • nReturned: Number of documents returned

    • executionTimeMillis: Execution time in milliseconds

Return type:

dict[str, Any]

Example

>>> cursor = collection.find({"age": {"$gte": 18}})
>>> plan = cursor.explain()
>>> print(plan['queryPlanner']['winningPlan'])
_execute_query() Iterator[dict[str, Any]][source]

Execute the query and yield the results after applying filters, sorting, pagination, and projection.

Yields:

dict[str, Any] – A dictionary representing each document in the result set.

_get_filtered_documents() Iterable[dict[str, Any]][source]

Retrieve documents based on the filter criteria, applying SQL-based filtering where possible, or falling back to Python-based filtering for complex queries. For datetime queries, use the specialized datetime query processor.

Returns:

An iterable of dictionaries representing

the documents that match the filter criteria.

Return type:

Iterable[dict[str, Any]]

_handle_python_fallback() Iterable[dict[str, Any]][source]

Handle complex queries by filtering all documents in Python.

_handle_expr_query() Iterable[dict[str, Any]][source]

Handle $expr queries with SQL evaluation when possible, Python fallback otherwise.

This method uses the query helper’s _build_expr_where_clause to attempt SQL evaluation (Tiers 1 and 2), and falls back to Python evaluation (Tier 3) if needed.

_build_minmax_clause(where_clause: str, params: tuple, min_spec: dict[str, Any] | None, max_spec: dict[str, Any] | None) tuple[source]

Build SQL clause for min/max index bounds.

Parameters:
  • where_clause – Existing WHERE clause (may be empty)

  • params – Existing parameters

  • min_spec – Minimum bound specification

  • max_spec – Maximum bound specification

Returns:

Tuple of (new WHERE clause, new parameters)

_get_collate_clause() str[source]

Get the SQL COLLATE clause based on collation settings.

Returns:

COLLATE clause or empty string if no collation is set

Return type:

str

Note

Maps MongoDB collation locales to SQLite collations: - Case-insensitive locales → NOCASE - Default/unknown → BINARY (case-sensitive) Custom collations can be registered via Connection tokenizers.

Note: COLLATE is applied to ORDER BY clauses, not WHERE clauses. For WHERE clause string comparisons, use _apply_collation_to_expr().

_contains_datetime_operations(query: dict[str, Any]) bool[source]

Check if a query contains datetime operations that should use the datetime processor.

Parameters:

query – MongoDB-style query dictionary

Returns:

True if query contains datetime operations, False otherwise

_is_datetime_value(value: Any) bool[source]

Check if a value is a datetime object or datetime string.

Parameters:

value – Value to check

Returns:

True if value is datetime-related, False otherwise

_is_datetime_regex(pattern: str) bool[source]

Check if a regex pattern is likely to be for datetime matching.

Parameters:

pattern – Regex pattern string

Returns:

True if pattern is likely datetime-related, False otherwise

_load_documents(rows) Iterable[dict[str, Any]][source]

Load documents from rows returned by the database query, including handling both id and _id.

Parameters:

rows – Database result rows containing id, _id, and data

Returns:

An iterable of loaded documents

Return type:

Iterable[dict[str, Any]]

_apply_sorting(docs: Iterable[dict[str, Any]]) list[dict[str, Any]][source]

Sort the documents based on the specified sorting criteria.

Parameters:

docs (Iterable[dict[str, Any]]) – The iterable of documents to sort.

Returns:

A list of dictionaries representing the documents

sorted by the specified criteria.

Return type:

list[dict[str, Any]]

_apply_pagination(docs: Iterable[dict[str, Any]]) list[dict[str, Any]][source]

Apply skip and limit to the documents.

Parameters:

docs (Iterable[dict[str, Any]]) – The iterable of documents to apply pagination to.

Returns:

A list of dictionaries representing the documents

after applying skip and limit.

Return type:

list[dict[str, Any]]

_apply_projection(docs: Iterable[dict[str, Any]]) list[dict[str, Any]][source]

Apply projection to the documents.

Parameters:

docs (Iterable[dict[str, Any]]) – The iterable of documents to apply projection to.

Returns:

A list of dictionaries representing the documents

after applying the projection.

Return type:

list[dict[str, Any]]

close() None[source]

Close the cursor and release any resources.

_cleanup_tables() None[source]

Drop any temporary tables created for this cursor.

class neosqlite.DeleteOne(filter: dict[str, Any])[source]

Bases: object

Represents a delete operation for a single document.

__init__(filter: dict[str, Any])[source]

Initialize a DeleteOne object.

Parameters:

filter (dict[str, Any]) – The filter criteria for selecting the document to delete.

class neosqlite.DeleteResult(deleted_count: int)[source]

Bases: object

Represents the result of a single delete operation.

__init__(deleted_count: int)[source]

Initialize a DeleteResult object with the count of deleted documents.

Parameters:

deleted_count (int) – The number of documents that were deleted.

property deleted_count: int

The number of documents that were deleted.

Returns:

The number of documents that were deleted.

Return type:

int

class neosqlite.InsertManyResult(inserted_ids: list[Any])[source]

Bases: object

Represents the result of a multiple insert operation.

__init__(inserted_ids: list[Any])[source]

Initialize an InsertManyResult object.

Parameters:

inserted_ids (list[Any]) – The IDs of the inserted documents.

property inserted_ids: list[Any]

The IDs of the inserted documents.

Returns:

The IDs of the inserted documents.

Return type:

list[Any]

class neosqlite.InsertOne(document: dict[str, Any])[source]

Bases: object

Represents an insert operation for a single document.

__init__(document: dict[str, Any])[source]

Initialize an InsertOne object.

Parameters:

document (dict[str, Any]) – The document to be inserted.

class neosqlite.InsertOneResult(inserted_id: Any)[source]

Bases: object

Represents the result of a single insert operation.

__init__(inserted_id: Any)[source]

Initialize an InsertOneResult object.

Parameters:

inserted_id (Any) – The ID of the inserted document.

property inserted_id: Any

The ID of the inserted document.

Returns:

The ID of the inserted document.

Return type:

Any

exception neosqlite.InvalidOperation[source]

Bases: Exception

Exception raised when an operation is not valid in the current state.

class neosqlite.IndexModel(keys: str | list[str] | list[tuple[str, int]], **kwargs: Any)[source]

Bases: object

Create an Index instance.

For use with create_indexes().

Takes either a single key or a list containing (key, direction) pairs or keys. If no direction is given, ASCENDING will be assumed.

Parameters:
  • keys – A single key or a list of (key, direction) pairs or keys.

  • **kwargs – Additional index creation options (unique, sparse, etc.).

__init__(keys: str | list[str] | list[tuple[str, int]], **kwargs: Any)[source]
property document: dict[str, Any]

An index document suitable for passing to the createIndexes command.

exception neosqlite.MalformedDocument[source]

Bases: Exception

Exception raised when a document is malformed.

exception neosqlite.MalformedQueryException[source]

Bases: Exception

Exception raised when a query is malformed.

class neosqlite.RawBatchCursor(collection: Collection, filter: dict[str, Any] | None = None, projection: dict[str, Any] | None = None, hint: str | None = None, batch_size: int = 100, pipeline: list[dict[str, Any]] | None = None, session: ClientSession | None = None)[source]

Bases: object

A cursor that returns raw batches of JSON data instead of individual documents.

__init__(collection: Collection, filter: dict[str, Any] | None = None, projection: dict[str, Any] | None = None, hint: str | None = None, batch_size: int = 100, pipeline: list[dict[str, Any]] | None = None, session: ClientSession | None = None)[source]

Initialize a RawBatchCursor object.

Parameters:
  • collection (Collection) – The collection associated with this cursor.

  • filter (dict[str, Any]) – A dictionary representing the filter criteria for the documents.

  • projection (dict[str, Any]) – A dictionary representing the projection criteria for the documents.

  • hint (str) – A string hinting at the index to use for the query.

  • batch_size (int) – The number of documents to return in each batch.

  • pipeline (list[dict[str, Any]]) – An optional aggregation pipeline to execute.

  • session (ClientSession, optional) – A ClientSession for transactions.

batch_size(batch_size: int) RawBatchCursor[source]

Set the batch size for this cursor.

Parameters:

batch_size (int) – The number of documents to return in each batch.

Returns:

This cursor object, for method chaining.

Return type:

RawBatchCursor

_cleanup() None[source]

Clean up temporary tables.

class neosqlite.ReadConcern(level: str | None = None)[source]

Bases: object

Represents a read concern for MongoDB compatibility.

__init__(level: str | None = None)[source]
property level: str | None
property ok_for_legacy: bool
property document: dict[str, Any]
class neosqlite.ReadPreference(mode: int, tag_sets: list | None = None, max_staleness_ms: int | None = None, hedge: dict | None = None)[source]

Bases: object

Represents a read preference for MongoDB compatibility. (Mostly a placeholder in NeoSQLite as SQLite is single-node).

PRIMARY = 0
PRIMARY_PREFERRED = 1
SECONDARY = 2
SECONDARY_PREFERRED = 3
NEAREST = 4
__init__(mode: int, tag_sets: list | None = None, max_staleness_ms: int | None = None, hedge: dict | None = None)[source]
property mode: int
property document: dict[str, Any]
class neosqlite.UpdateOne(filter: dict[str, Any], update: dict[str, Any], upsert: bool = False)[source]

Bases: object

Represents an update operation for a single document.

__init__(filter: dict[str, Any], update: dict[str, Any], upsert: bool = False)[source]

Initialize an UpdateOne object.

Parameters:
  • filter (dict[str, Any]) – The filter criteria for selecting the document to update.

  • update (dict[str, Any]) – The update operations to apply to the selected document.

  • upsert (bool, optional) – If True, insert the document if no document matches the filter criteria. Defaults to False.

class neosqlite.UpdateResult(matched_count: int, modified_count: int, upserted_id: Any | None)[source]

Bases: object

Represents the result of a single update operation.

__init__(matched_count: int, modified_count: int, upserted_id: Any | None)[source]

Initialize an UpdateResult object.

Parameters:
  • matched_count (int) – The number of documents that matched the filter criteria.

  • modified_count (int) – The number of documents that were modified.

  • upserted_id (Any | None) – The ID of the inserted document if an upsert operation was performed; None otherwise.

property matched_count: int

The number of documents that matched the filter criteria.

Returns:

The number of documents that matched the filter criteria.

Return type:

int

property modified_count: int

The number of documents that were modified.

Returns:

The number of documents that were modified.

Return type:

int

property upserted_id: Any | None

The ID of the inserted document.

Returns:

The ID of the inserted document if an upsert operation was

performed; None otherwise.

Return type:

Any | None

class neosqlite.WriteConcern(w: int | str | None = None, wtimeout: int | None = None, j: bool | None = None, fsync: bool | None = None)[source]

Bases: object

Represents a write concern for MongoDB compatibility. Maps to SQLite synchronous PRAGMAs.

__init__(w: int | str | None = None, wtimeout: int | None = None, j: bool | None = None, fsync: bool | None = None)[source]
property document: dict[str, Any]
property acknowledged: bool
class neosqlite.GridFSBucket(db: Connection, bucket_name: str = 'fs', chunk_size_bytes: int = 261120, write_concern: dict[str, Any] | None = None, read_preference: Any | None = None, disable_md5: bool = False)[source]

Bases: object

A GridFSBucket-like class for storing large files in SQLite.

This implementation provides a PyMongo-compatible interface for GridFS functionality using SQLite as the backend storage.

__init__(db: Connection, bucket_name: str = 'fs', chunk_size_bytes: int = 261120, write_concern: dict[str, Any] | None = None, read_preference: Any | None = None, disable_md5: bool = False)[source]

Initialize a new GridFSBucket instance.

Parameters:
  • db – SQLite database connection

  • bucket_name – The bucket name for the GridFS files (default: “fs”)

  • chunk_size_bytes – The chunk size in bytes (default: 255KB)

  • write_concern – Write concern settings (simulated for compatibility)

  • read_preference – Read preference settings (not applicable to SQLite)

  • disable_md5 – Disable MD5 checksum calculation for performance

_migrate_legacy_tables_if_needed()[source]

Migrate legacy GridFS tables from dot-based names to underscore-based names.

_apply_write_concern()[source]

Apply write concern settings to SQLite connection.

_create_collections()[source]

Create the files and chunks collections (tables) if they don’t exist.

_migrate_table_schema()[source]

Migrate existing tables to add new columns for content_type and aliases.

_serialize_metadata(metadata: dict[str, Any] | None) str | None[source]

Serialize metadata to JSON string.

Parameters:

metadata – Metadata dictionary to serialize

Returns:

JSON string representation or None

_deserialize_metadata(metadata_str: str | None) dict[str, Any] | None[source]

Deserialize metadata from JSON string.

Parameters:

metadata_str – JSON string representation of metadata

Returns:

Metadata dictionary or None

_force_sync_if_needed()[source]

Force database synchronization if write concern requires it.

upload_from_stream(filename: str, source: bytes | IOBase, chunk_size_bytes: int | None = None, metadata: dict[str, Any] | None = None, session: Any | None = None) ObjectId[source]

Uploads a user file to a GridFS bucket.

Reads the contents of the user file from source and uploads it as chunks in the chunks collection. After all the chunks have been uploaded, it creates a file document in the files collection.

Parameters:
  • filename – The name of the file to upload

  • source – The source data (bytes or file-like object)

  • chunk_size_bytes – Bytes per chunk (defaults to bucket’s chunk_size_bytes)

  • metadata – Optional metadata for the file

  • session – A ClientSession for transactions.

Returns:

The ObjectId of the uploaded file document

_insert_chunks(file_id: int, data: bytes)[source]

Split data into chunks and insert them into the chunks collection.

Parameters:
  • file_id – The ID of the file document

  • data – The data to be chunked

download_to_stream(file_id: ObjectId | str | int, destination: IOBase) None[source]

Downloads the contents of the stored file specified by file_id and writes the contents to destination.

Parameters:
  • file_id – The _id of the file document (ObjectId, hex string, or integer ID)

  • destination – A file-like object to which the file contents will be written

_get_integer_id_for_file(file_id: ObjectId | str | int) int | None[source]

Convert a file identifier (ObjectId, hex string, or integer) to an integer ID.

Parameters:

file_id – The file identifier (ObjectId, hex string, or integer ID)

Returns:

The integer ID corresponding to the file, or None if not found

download_to_stream_by_name(filename: str, destination: IOBase, revision: int = -1) None[source]

Downloads the contents of the stored file specified by filename and writes the contents to destination.

Parameters:
  • filename – The name of the file to download

  • destination – A file-like object to which the file contents will be written

  • revision – The revision of the file to download (default: -1 for latest)

_get_file_id_by_name(filename: str, revision: int = -1) int[source]

Get the file ID for a given filename and revision.

Parameters:
  • filename – The name of the file

  • revision – The revision number (-1 for latest, 0 for first, etc.)

Returns:

The integer _id of the file document

open_download_stream(file_id: ObjectId | str | int) GridOut[source]

Opens a stream to read the contents of the stored file specified by file_id.

Parameters:

file_id – The _id of the file document (ObjectId, hex string, or integer ID)

Returns:

A GridOut instance to read the file contents

open_download_stream_by_name(filename: str, revision: int = -1) GridOut[source]

Opens a stream to read the contents of the stored file specified by filename.

Parameters:
  • filename – The name of the file to read

  • revision – The revision of the file to read (default: -1 for latest)

Returns:

A GridOut instance to read the file contents

delete(file_id: ObjectId | str | int) None[source]

Given a file_id, delete the stored file’s files collection document and associated chunks from a GridFS bucket.

Parameters:

file_id – The _id of the file document (ObjectId, hex string, or integer ID)

find(filter: dict[str, Any] | None = None, session: Any | None = None) GridOutCursor[source]

Find and return the files collection documents that match filter.

Parameters:
  • filter – The filter to apply when searching for files

  • session – A ClientSession for transactions.

Returns:

A GridOutCursor instance

open_upload_stream(filename: str, chunk_size_bytes: int | None = None, metadata: dict[str, Any] | None = None, session: Any | None = None) GridIn[source]

Opens a stream for writing a file to a GridFS bucket.

Parameters:
  • filename – The name of the file to upload

  • chunk_size_bytes – Bytes per chunk (defaults to bucket’s chunk_size_bytes)

  • metadata – Optional metadata for the file

  • session – A ClientSession for transactions.

Returns:

A GridIn instance to write the file contents

upload_from_stream_with_id(file_id: ObjectId | int, filename: str, source: bytes | IOBase, chunk_size_bytes: int | None = None, metadata: dict[str, Any] | None = None, session: Any | None = None)[source]

Uploads a user file to a GridFS bucket with a custom file id.

Parameters:
  • file_id – The custom _id for the file document (ObjectId or integer ID)

  • filename – The name of the file to upload

  • source – The source data (bytes or file-like object)

  • chunk_size_bytes – Bytes per chunk (defaults to bucket’s chunk_size_bytes)

  • metadata – Optional metadata for the file

  • session – A ClientSession for transactions.

open_upload_stream_with_id(file_id: ObjectId | int, filename: str, metadata: dict[str, Any] | None = None, content_type: str | None = None, aliases: list[str] | None = None) GridIn[source]

Opens a stream for writing a file to a GridFS bucket with a custom file id.

Parameters:
  • file_id – The custom _id for the file document (ObjectId or integer ID)

  • filename – The name of the file to upload

  • metadata – Optional metadata for the file

  • content_type – Optional MIME type of the file

  • aliases – Optional list of alternative names for the file

Returns:

A GridIn instance to write the file contents

delete_by_name(filename: str) None[source]

Delete all stored file documents and associated chunks with the given filename.

Parameters:

filename – The name of the file to delete

rename(file_id: ObjectId | str | int, new_filename: str) None[source]

Rename a stored file with the specified file_id to a new filename.

Parameters:
  • file_id – The _id of the file to rename (ObjectId, hex string, or integer ID)

  • new_filename – The new name for the file

rename_by_name(filename: str, new_filename: str) None[source]

Rename all stored files with the specified filename to a new filename.

Parameters:
  • filename – The current name of the files to rename

  • new_filename – The new name for the files

drop() None[source]

Remove all files and chunks from the bucket.

This method deletes all data in the GridFS bucket, including all files and their associated chunks.

list() list[str][source]

List all unique filenames in the GridFS bucket.

Returns:

A list of unique filenames

class neosqlite.GridFS(db: Connection, collection_name: str = 'fs')[source]

Bases: object

A legacy GridFS interface for storing and retrieving files in SQLite.

This class provides the legacy PyMongo-compatible GridFS interface, which is simpler to use than the GridFSBucket API but less flexible.

__init__(db: Connection, collection_name: str = 'fs')[source]

Initialize a new GridFS instance.

Parameters:
  • db – SQLite database connection

  • collection_name – The collection name for the GridFS files (default: “fs”)

put(data: bytes | IOBase, filename: str | None = None, **kwargs: Any) ObjectId[source]

Put data into GridFS.

Parameters:
  • data – The data to store (bytes or file-like object)

  • filename – The filename to use (optional)

  • **kwargs – Additional metadata fields

Returns:

The ObjectId of the stored file document

get(file_id: ObjectId | str | int) GridOut[source]

Get a file from GridFS by its _id.

Parameters:

file_id – The _id of the file to retrieve (ObjectId, hex string, or integer ID)

Returns:

A GridOut instance for reading the file

get_version(filename: str, version: int = -1) GridOut[source]

Get a file from GridFS by filename and version.

Parameters:
  • filename – The name of the file to retrieve

  • version – The version number (-1 for latest, 0 for first, etc.)

Returns:

A GridOut instance for reading the file

get_last_version(filename: str) GridOut[source]

Get the most recent version of a file from GridFS by filename.

Parameters:

filename – The name of the file to retrieve

Returns:

A GridOut instance for reading the file

delete(file_id: ObjectId | str | int) None[source]

Delete a file from GridFS by its _id.

Parameters:

file_id – The _id of the file to delete (ObjectId, hex string, or integer ID)

list() list[source]

List all filenames in GridFS.

Returns:

A list of filenames

find(filter: dict[str, Any] | None = None) GridOutCursor[source]

Find files in GridFS that match the filter.

Parameters:

filter – The filter to apply when searching for files

Returns:

A GridOutCursor instance

find_one(filter: dict[str, Any] | None = None) GridOut | None[source]

Find a single file in GridFS that matches the filter.

Parameters:

filter – The filter to apply when searching for files

Returns:

A GridOut instance for reading the file, or None if not found

exists(file_id: ObjectId | str | int | None = None, **kwargs: Any) bool[source]

Check if a file exists in GridFS.

Parameters:
  • file_id – The _id of the file to check (ObjectId, hex string, or integer ID)

  • **kwargs – Additional filter criteria (e.g., filename=”test.txt”)

Returns:

True if the file exists, False otherwise

new_file(**kwargs: Any) GridIn[source]

Create a new file in GridFS and return a GridIn instance to which data can be written.

Parameters:

**kwargs – Arguments to pass to the GridIn constructor (e.g., filename, metadata)

Returns:

A GridIn instance for writing the file contents