Source code for neosqlite.gridfs.gridfs_legacy

import io
import logging
from typing import Any

from .._sqlite import sqlite3

# Import ObjectId for GridFS
from ..objectid import ObjectId
from .errors import NoFile
from .grid_file import GridIn, GridOut, GridOutCursor
from .gridfs_bucket import GridFSBucket

logger = logging.getLogger(__name__)


[docs] class GridFS: """ A legacy GridFS interface for storing and retrieving files in SQLite. This class provides the legacy PyMongo-compatible GridFS interface, which is simpler to use than the GridFSBucket API but less flexible. """
[docs] def __init__( self, db: sqlite3.Connection, collection_name: str = "fs", ): """ Initialize a new GridFS instance. Args: db: SQLite database connection collection_name: The collection name for the GridFS files (default: "fs") """ self._db = db self._bucket = GridFSBucket(db, bucket_name=collection_name)
[docs] def put( self, data: bytes | io.IOBase, filename: str | None = None, **kwargs: Any, ) -> ObjectId: """ Put data into GridFS. Args: data: The data to store (bytes or file-like object) filename: The filename to use (optional) **kwargs: Additional metadata fields Returns: The ObjectId of the stored file document """ # Extract metadata from kwargs metadata = {} for key, value in kwargs.items(): # In legacy GridFS, all extra parameters become metadata except encoding if key != "encoding": metadata[key] = value # If filename is provided in kwargs, use it if filename is None and "filename" in kwargs: filename = kwargs["filename"] # Default filename if none provided if filename is None: filename = "file" # Upload the data - this will return an ObjectId return self._bucket.upload_from_stream( filename, data, metadata=metadata or None )
[docs] def get(self, file_id: ObjectId | str | int) -> GridOut: """ Get a file from GridFS by its _id. Args: file_id: The _id of the file to retrieve (ObjectId, hex string, or integer ID) Returns: A GridOut instance for reading the file """ return self._bucket.open_download_stream(file_id)
[docs] def get_version(self, filename: str, version: int = -1) -> GridOut: """ Get a file from GridFS by filename and version. Args: filename: The name of the file to retrieve version: The version number (-1 for latest, 0 for first, etc.) Returns: A GridOut instance for reading the file """ return self._bucket.open_download_stream_by_name(filename, version)
[docs] def get_last_version(self, filename: str) -> GridOut: """ Get the most recent version of a file from GridFS by filename. Args: filename: The name of the file to retrieve Returns: A GridOut instance for reading the file """ return self._bucket.open_download_stream_by_name(filename, -1)
[docs] def delete(self, file_id: ObjectId | str | int) -> None: """ Delete a file from GridFS by its _id. Args: file_id: The _id of the file to delete (ObjectId, hex string, or integer ID) """ try: self._bucket.delete(file_id) except NoFile as e: # Legacy GridFS API is more lenient - doesn't raise exception for nonexistent files logger.debug( f"Legacy GridFS delete: file '{file_id}' not found: {e}" ) pass
[docs] def list(self) -> list: """ List all filenames in GridFS. Returns: A list of filenames """ cursor = self._bucket.find({}) filenames = [] for grid_out in cursor: if grid_out.filename not in filenames: filenames.append(grid_out.filename) return filenames
[docs] def find(self, filter: dict[str, Any] | None = None) -> GridOutCursor: """ Find files in GridFS that match the filter. Args: filter: The filter to apply when searching for files Returns: A GridOutCursor instance """ return self._bucket.find(filter or {})
[docs] def find_one(self, filter: dict[str, Any] | None = None) -> GridOut | None: """ Find a single file in GridFS that matches the filter. Args: filter: The filter to apply when searching for files Returns: A GridOut instance for reading the file, or None if not found """ cursor = self._bucket.find(filter or {}) try: return next(iter(cursor)) except StopIteration: return None
[docs] def exists( self, file_id: ObjectId | str | int | None = None, **kwargs: Any ) -> bool: """ Check if a file exists in GridFS. Args: file_id: The _id of the file to check (ObjectId, hex string, or integer ID) **kwargs: Additional filter criteria (e.g., filename="test.txt") Returns: True if the file exists, False otherwise """ if file_id is not None: try: self._bucket.open_download_stream(file_id) return True except NoFile: return False elif kwargs: # Check by other criteria cursor = self._bucket.find(kwargs) try: next(iter(cursor)) return True except StopIteration: return False else: # No criteria provided return False
[docs] def new_file(self, **kwargs: Any) -> GridIn: """ Create a new file in GridFS and return a GridIn instance to which data can be written. Args: **kwargs: Arguments to pass to the GridIn constructor (e.g., filename, metadata) Returns: A GridIn instance for writing the file contents """ # Extract parameters from kwargs filename = kwargs.pop("filename", "file") metadata = kwargs.pop("metadata", None) or kwargs or None # Handle the case where _id is specified file_id = kwargs.pop("_id", None) if file_id is not None: return self._bucket.open_upload_stream_with_id( file_id, filename, metadata=metadata ) else: return self._bucket.open_upload_stream(filename, metadata=metadata)
# Backward compatibility alias GridFS.__name__ = "GridFS"