# Source code for neosqlite.collection.query_helper.crud_operations
"""CRUD operations for QueryHelper."""
import logging
from typing import TYPE_CHECKING, Any
from ..._sqlite import sqlite3
from ...objectid import ObjectId
from ...sql_utils import quote_table_name
from ..json_helpers import neosqlite_json_dumps
logger = logging.getLogger(__name__)
if TYPE_CHECKING:
from .. import Collection
[docs]
class CRUDOperationsMixin:
    """Mixin providing CRUD operations for QueryHelper.

    The attributes annotated below are only declared here for type checkers;
    presumably they are supplied by the class this mixin is combined with.
    """

    collection: "Collection"
    # Called with a document ``_id``; its result is bound to ``WHERE id = ?``.
    _get_integer_id_for_oid: Any
    # Called with a JSON string; a falsy result makes the insert fail.
    _validate_json_document: Any
    # Called with a JSON string; a result >= 0 is reported as the error position.
    _get_json_error_position: Any

    def _internal_insert(self, document: dict[str, Any]) -> Any:
        """
        Insert a document into the collection and return its ``_id``.

        A deep copy of ``document`` with ``_id`` removed is passed through
        ``_convert_bytes_to_binary``, serialized with ``neosqlite_json_dumps``
        and validated before it is written to the ``data`` column. The
        resolved ``_id`` is written, in string form, to the ``_id`` column.

        ``_id`` resolution:

        * missing or ``None``: a new ``ObjectId`` is generated and also
          written back to ``document["_id"]`` so the caller's document agrees
          with what was stored.
        * 24-character string: converted to an ``ObjectId`` when it is a
          valid one, otherwise kept unchanged.
        * anything else: kept unchanged.

        Args:
            document (dict): The document to insert. Must be a dictionary.

        Returns:
            The resolved ``_id`` of the inserted document (a generated or
            parsed ``ObjectId``, or the caller-provided value). This is not
            the auto-increment row id.

        Raises:
            MalformedDocument: If the document is not a dictionary
            ValueError: If the serialized document fails JSON validation
            sqlite3.Error: If the insert reports no last row id
        """
        from copy import deepcopy

        from ...exceptions import MalformedDocument
        from .utils import _convert_bytes_to_binary

        if not isinstance(document, dict):
            raise MalformedDocument(
                f"document must be a dictionary, not a {type(document)}"
            )

        # Work on a copy so that dropping ``_id`` (it has its own column)
        # does not touch the caller's dictionary.
        doc_to_insert = deepcopy(document)
        doc_to_insert.pop("_id", None)
        # Convert any bytes objects to Binary objects for JSON serialization
        doc_to_insert = _convert_bytes_to_binary(doc_to_insert)

        json_str = neosqlite_json_dumps(doc_to_insert)
        if not self._validate_json_document(json_str):
            # Include the error position in the message when one is reported
            error_pos = self._get_json_error_position(json_str)
            if error_pos >= 0:
                raise ValueError(
                    f"Invalid JSON document at position {error_pos}"
                )
            raise ValueError("Invalid JSON document")

        # A missing ``_id`` and an explicit ``_id=None`` are treated alike:
        # both get a freshly generated ObjectId.
        provided_id = document.get("_id")
        needs_generated_id = provided_id is None

        resolved_id: ObjectId | Any
        if needs_generated_id:
            resolved_id = ObjectId()
        elif isinstance(provided_id, str) and len(provided_id) == 24:
            try:
                resolved_id = ObjectId(provided_id)
            except ValueError as e:
                # Not a valid ObjectId string: keep the original value
                logger.debug(
                    "Provided _id '%s' is not a valid ObjectId: %s",
                    provided_id,
                    e,
                )
                resolved_id = provided_id
        else:
            # ObjectId instances and every other type are used as given
            resolved_id = provided_id

        # The id is always bound in string form. The previous
        # ``hasattr(..., "__str__")`` guard was true for every object, so
        # this is the same value that was bound before.
        cursor = self.collection.db.execute(
            f"INSERT INTO {quote_table_name(self.collection.name)}(data, _id) VALUES (?, ?)",
            (json_str, str(resolved_id)),
        )
        if cursor.lastrowid is None:
            raise sqlite3.Error("Failed to get last row id.")

        # Only write back when the id was generated here; a real
        # user-provided ``_id`` value is left exactly as the caller passed it.
        if needs_generated_id:
            document["_id"] = resolved_id
        return resolved_id

    def _internal_replace(
        self, doc_id: Any, replacement: dict[str, Any]
    ) -> None:
        """
        Replace the stored ``data`` of an existing document.

        The replacement is serialized with ``neosqlite_json_dumps`` exactly as
        given. Unlike ``_internal_insert``, no copy is made, ``_id`` is not
        removed, and no bytes conversion or JSON validation happens here.

        Args:
            doc_id (Any): The ID of the document to replace (can be ObjectId, int, etc.).
            replacement (dict[str, Any]): The new document to replace the existing one.
        """
        # Rows are addressed by their integer ``id`` column, so translate first
        row_id = self._get_integer_id_for_oid(doc_id)
        table = quote_table_name(self.collection.name)
        self.collection.db.execute(
            f"UPDATE {table} SET data = ? WHERE id = ?",
            (neosqlite_json_dumps(replacement), row_id),
        )

    def _internal_delete(self, doc_id: Any) -> None:
        """
        Delete the document identified by ``doc_id`` from the collection.

        Args:
            doc_id (Any): The ID of the document to delete (can be ObjectId, int, etc.).
        """
        # Rows are addressed by their integer ``id`` column, so translate first
        row_id = self._get_integer_id_for_oid(doc_id)
        table = quote_table_name(self.collection.name)
        self.collection.db.execute(
            f"DELETE FROM {table} WHERE id = ?",
            (row_id,),
        )