import datetime
import hashlib
import io
import logging
from typing import Any
from .._sqlite import sqlite3
# Import JSONB support utilities to reuse existing implementation
from ..collection.jsonb_support import supports_jsonb
from ..collection.schema_utils import column_exists
# Import the centralized ID normalization and lookup functions
from ..collection.type_correction import (
get_integer_id_for_table,
normalize_id_query_for_db,
)
from ..collection.type_utils import validate_session
# Import ObjectId for MongoDB-compatible ID support
from ..objectid import ObjectId
logger = logging.getLogger(__name__)
from ..sql_utils import quote_table_name
from .errors import FileExists, NoFile
from .grid_file import GridIn, GridOut, GridOutCursor
from .utils import (
deserialize_metadata,
force_sync_if_needed,
serialize_metadata,
)
[docs]
class GridFSBucket:
"""
A GridFSBucket-like class for storing large files in SQLite.
This implementation provides a PyMongo-compatible interface for GridFS
functionality using SQLite as the backend storage.
"""
[docs]
def __init__(
self,
db: sqlite3.Connection,
bucket_name: str = "fs",
chunk_size_bytes: int = 255 * 1024, # 255KB default chunk size
write_concern: dict[str, Any] | None = None,
read_preference: Any | None = None,
disable_md5: bool = False,
):
"""
Initialize a new GridFSBucket instance.
Args:
db: SQLite database connection
bucket_name: The bucket name for the GridFS files (default: "fs")
chunk_size_bytes: The chunk size in bytes (default: 255KB)
write_concern: Write concern settings (simulated for compatibility)
read_preference: Read preference settings (not applicable to SQLite)
disable_md5: Disable MD5 checksum calculation for performance
"""
self._db = db
self._bucket_name = bucket_name
self._chunk_size_bytes = chunk_size_bytes
self._files_collection = f"{bucket_name}_files"
self._chunks_collection = f"{bucket_name}_chunks"
# Process write concern settings
self._write_concern = write_concern or {}
self._read_preference = read_preference
self._disable_md5 = disable_md5
# Apply write concern settings to SQLite
self._apply_write_concern()
# Validate chunk size
if chunk_size_bytes <= 0:
raise ValueError("chunk_size_bytes must be a positive integer")
# Validate write concern settings (basic validation for compatibility)
if write_concern:
# Basic validation - in a real implementation, you might want to do more
if "w" in write_concern and not isinstance(
write_concern["w"], (int, str)
):
raise ValueError(
"write_concern 'w' must be an integer or string"
)
if "wtimeout" in write_concern and not isinstance(
write_concern["wtimeout"], int
):
raise ValueError("write_concern 'wtimeout' must be an integer")
if "j" in write_concern and not isinstance(
write_concern["j"], bool
):
raise ValueError("write_concern 'j' must be a boolean")
# Check for and migrate old GridFS table names (dot-based to underscore-based)
self._migrate_legacy_tables_if_needed()
# Create the necessary tables if they don't exist
self._create_collections()
[docs]
def _migrate_legacy_tables_if_needed(self):
    """
    Migrate legacy GridFS tables from dot-based names to underscore-based names.

    Looks for ``<bucket>.files`` and ``<bucket>.chunks`` in ``sqlite_master``
    and renames each one that exists to the current collection name. If the
    files table cannot be renamed, both collection names fall back to the
    legacy names and no chunks rename is attempted. A failed chunks rename is
    only logged. The chunks table is checked independently of the files
    table.
    """
    legacy_files = f"{self._bucket_name}.files"
    legacy_chunks = f"{self._bucket_name}.chunks"
    new_files = self._files_collection
    new_chunks = self._chunks_collection

    def table_exists(name: str) -> bool:
        row = self._db.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name=?",
            (name,),
        ).fetchone()
        return row is not None

    def rename_table(old_name: str, new_name: str) -> None:
        # Both identifiers are quoted: the legacy name contains a dot, and the
        # new name must resolve to the same table _create_collections builds
        # through quote_table_name.
        escaped_old = old_name.replace('"', '""')
        self._db.execute(
            f'ALTER TABLE "{escaped_old}" RENAME TO {quote_table_name(new_name)}'
        )

    if table_exists(legacy_files):
        try:
            rename_table(legacy_files, new_files)
        except Exception as e:
            logger.warning(
                f"Failed to rename legacy files table '{legacy_files}' to '{new_files}': {e}. "
                f"Using legacy table names."
            )
            self._files_collection = legacy_files
            self._chunks_collection = legacy_chunks
            return

    if table_exists(legacy_chunks):
        try:
            rename_table(legacy_chunks, new_chunks)
        except Exception as e:
            logger.warning(
                f"Failed to rename legacy chunks table '{legacy_chunks}' to '{new_chunks}': {e}. "
                f"GridFS bucket may be in inconsistent state."
            )
[docs]
def _apply_write_concern(self):
    """
    Apply write concern settings to the SQLite connection.

    Exactly one ``PRAGMA synchronous`` statement is issued:

    * ``FULL`` when ``j`` is truthy (journaled writes requested),
    * ``OFF`` when ``w`` equals 0 and ``j`` is not set,
    * ``NORMAL`` otherwise.

    A journal request takes precedence over ``w=0``, mirroring MongoDB where
    ``j: true`` prevails when combined with ``w: 0``.
    """
    if self._write_concern.get("j"):
        level = "FULL"
    elif self._write_concern.get("w", 1) == 0:
        # No acknowledgment requested: trade durability for speed.
        level = "OFF"
    else:
        level = "NORMAL"
    self._db.execute(f"PRAGMA synchronous = {level}")
[docs]
def _create_collections(self):
    """
    Ensure the files and chunks tables, and their indexes, exist.

    The files table uses JSONB columns for ``_id``, ``metadata`` and
    ``aliases`` when ``supports_jsonb`` reports support, TEXT otherwise.
    After the tables and the filename/files_id indexes are created, the
    schema migration runs, followed by a best-effort unique index on ``_id``
    (any failure there is logged at debug level and ignored).
    """
    use_jsonb = supports_jsonb(self._db)
    files_table = quote_table_name(self._files_collection)
    chunks_table = quote_table_name(self._chunks_collection)

    if use_jsonb:
        files_ddl = f"""
            CREATE TABLE IF NOT EXISTS {files_table} (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                _id JSONB,
                filename TEXT,
                length INTEGER,
                chunkSize INTEGER,
                uploadDate TEXT,
                md5 TEXT,
                metadata JSONB,
                content_type TEXT,
                aliases JSONB
            )
        """
    else:
        files_ddl = f"""
            CREATE TABLE IF NOT EXISTS {files_table} (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                _id TEXT,
                filename TEXT,
                length INTEGER,
                chunkSize INTEGER,
                uploadDate TEXT,
                md5 TEXT,
                metadata TEXT,
                content_type TEXT,
                aliases TEXT
            )
        """
    self._db.execute(files_ddl)

    self._db.execute(f"""
        CREATE TABLE IF NOT EXISTS {chunks_table} (
            _id INTEGER PRIMARY KEY AUTOINCREMENT,
            files_id INTEGER,
            n INTEGER,
            data BLOB,
            FOREIGN KEY (files_id) REFERENCES {files_table} (id)
        )
    """)

    filename_index = quote_table_name(f"idx_{self._files_collection}_filename")
    self._db.execute(f"""
        CREATE INDEX IF NOT EXISTS {filename_index}
        ON {files_table} (filename)
    """)

    files_id_index = quote_table_name(f"idx_{self._chunks_collection}_files_id")
    self._db.execute(f"""
        CREATE INDEX IF NOT EXISTS {files_id_index}
        ON {chunks_table} (files_id)
    """)

    # Bring tables created by older versions up to the current column set.
    self._migrate_table_schema()

    # Unique index on _id for faster lookups; failure is tolerated.
    try:
        unique_index = quote_table_name(f"idx_{self._files_collection}_id")
        self._db.execute(
            f"CREATE UNIQUE INDEX IF NOT EXISTS {unique_index} ON {files_table}(_id)"
        )
    except Exception as e:
        logger.debug(f"{e=}")
[docs]
def _migrate_table_schema(self):
    """
    Add the ``content_type`` and ``aliases`` columns to the files table.

    Each column is added only when ``column_exists`` reports it missing.
    ``aliases`` is typed JSONB when ``supports_jsonb`` reports support and
    TEXT otherwise. Any exception is logged at debug level and swallowed.
    """
    table = quote_table_name(self._files_collection)
    try:
        has_content_type = column_exists(
            self._db, self._files_collection, "content_type"
        )
        if not has_content_type:
            self._db.execute(
                f"ALTER TABLE {table} ADD COLUMN content_type TEXT"
            )

        has_aliases = column_exists(self._db, self._files_collection, "aliases")
        if not has_aliases:
            if supports_jsonb(self._db):
                column_type = "JSONB"
            else:
                column_type = "TEXT"
            self._db.execute(
                f"ALTER TABLE {table} ADD COLUMN aliases {column_type}"
            )
    except Exception as e:
        logger.debug(f"{e=}")
[docs]
def _force_sync_if_needed(self):
    """
    Delegate to the shared ``force_sync_if_needed`` helper.

    Passes this bucket's connection and write concern; whether a sync is
    actually performed is decided by that helper.
    """
    connection, concern = self._db, self._write_concern
    force_sync_if_needed(connection, concern)
[docs]
def upload_from_stream(
    self,
    filename: str,
    source: bytes | io.IOBase,
    chunk_size_bytes: int | None = None,
    metadata: dict[str, Any] | None = None,
    session: Any | None = None,
) -> ObjectId:
    """
    Uploads a user file to a GridFS bucket.

    Reads the whole of ``source``, inserts the file document, then stores the
    content as consecutive chunks. The chunk size recorded in the file
    document is the one actually used to split the data.

    Args:
        filename: The name of the file to upload
        source: The source data (bytes-like object or file-like object)
        chunk_size_bytes: Bytes per chunk (defaults to bucket's chunk_size_bytes)
        metadata: Optional metadata for the file
        session: A ClientSession for transactions.

    Returns:
        The ObjectId of the uploaded file document

    Raises:
        TypeError: If ``source`` is neither bytes-like nor has ``read``.
        ValueError: If the effective chunk size is not positive.
        RuntimeError: If SQLite does not report a row id for the new file.
    """
    if session is not None:
        validate_session(session, self._db)

    # A falsy override (None or 0) means "use the bucket default", matching
    # open_upload_stream.
    chunk_size = chunk_size_bytes or self._chunk_size_bytes
    if chunk_size <= 0:
        raise ValueError("chunk_size_bytes must be a positive integer")

    # Get the data from the source
    if isinstance(source, (bytes, bytearray, memoryview)):
        data = bytes(source)
    elif hasattr(source, "read"):
        data = source.read()
    else:
        raise TypeError("source must be bytes or a file-like object")

    # Calculate MD5 hash of the data (unless disabled)
    md5_hash = None if self._disable_md5 else hashlib.md5(data).hexdigest()

    file_oid = ObjectId()
    upload_date = datetime.datetime.now(datetime.timezone.utc).isoformat()

    # Quote identifiers so the statements address the same tables that
    # _create_collections created, whatever the bucket name.
    files_table = quote_table_name(self._files_collection)
    chunks_table = quote_table_name(self._chunks_collection)

    cursor = self._db.execute(
        f"""
        INSERT INTO {files_table}
        (id, _id, filename, length, chunkSize, uploadDate, md5, metadata)
        VALUES (NULL, ?, ?, ?, ?, ?, ?, ?)
        """,
        (
            str(file_oid),  # Store ObjectId as hex string
            filename,
            len(data),
            chunk_size,
            upload_date,
            md5_hash,
            serialize_metadata(metadata),
        ),
    )
    file_id = cursor.lastrowid
    if file_id is None:
        raise RuntimeError("Failed to get file ID")

    # Chunks are written here rather than via _insert_chunks so the
    # per-call chunk size is honoured.
    for n, start in enumerate(range(0, len(data), chunk_size)):
        self._db.execute(
            f"""
            INSERT INTO {chunks_table}
            (files_id, n, data)
            VALUES (?, ?, ?)
            """,
            (file_id, n, data[start : start + chunk_size]),
        )

    # Force sync if write concern requires it
    self._force_sync_if_needed()
    return file_oid
[docs]
def _insert_chunks(
    self, file_id: int, data: bytes, chunk_size: int | None = None
):
    """
    Split data into chunks and insert them into the chunks collection.

    Chunks are numbered from 0 in the ``n`` column; the last chunk may be
    shorter than the chunk size. Empty ``data`` inserts no rows.

    Args:
        file_id: The integer id of the file document the chunks belong to
        data: The data to be chunked
        chunk_size: Bytes per chunk; defaults to the bucket's chunk size

    Raises:
        ValueError: If the effective chunk size is not positive (a
            non-positive step would otherwise silently store nothing).
    """
    size = self._chunk_size_bytes if chunk_size is None else chunk_size
    if size <= 0:
        raise ValueError("chunk size must be a positive integer")

    chunks_table = quote_table_name(self._chunks_collection)
    for n, start in enumerate(range(0, len(data), size)):
        self._db.execute(
            f"""
            INSERT INTO {chunks_table}
            (files_id, n, data)
            VALUES (?, ?, ?)
            """,
            (file_id, n, data[start : start + size]),
        )
[docs]
def download_to_stream(
    self, file_id: ObjectId | str | int, destination: io.IOBase
) -> None:
    """
    Downloads the contents of the stored file specified by file_id
    and writes the contents to destination.

    Chunks are written in ascending ``n`` order, one ``write`` call per chunk.

    Args:
        file_id: The _id of the file document (ObjectId, hex string, or integer ID)
        destination: A file-like object to which the file contents will be written

    Raises:
        NoFile: If the id cannot be resolved or no file row exists for it.
    """
    file_int_id = self._get_integer_id_for_file(file_id)
    if file_int_id is None:
        raise NoFile(f"File with id {file_id} not found")

    files_table = quote_table_name(self._files_collection)
    chunks_table = quote_table_name(self._chunks_collection)

    # Confirm the file document exists before streaming any chunk.
    file_row = self._db.execute(
        f"""
        SELECT length, chunkSize FROM {files_table}
        WHERE id = ?
        """,
        (file_int_id,),
    ).fetchone()
    if file_row is None:
        raise NoFile(f"File with id {file_id} not found")

    chunk_rows = self._db.execute(
        f"""
        SELECT data FROM {chunks_table}
        WHERE files_id = ?
        ORDER BY n ASC
        """,
        (file_int_id,),
    )
    for (chunk,) in chunk_rows:
        destination.write(chunk)
[docs]
def _get_integer_id_for_file(
    self, file_id: ObjectId | str | int
) -> int | None:
    """
    Convert a file identifier (ObjectId, hex string, or integer) to an integer ID.

    The lookup is first delegated to ``get_integer_id_for_table``. If that
    raises ``ValueError`` and ``file_id`` is a string, two fallbacks are
    tried in order: a 24-character string accepted by ``ObjectId`` is matched
    against the ``_id`` column; otherwise the string is parsed as an integer
    and matched against the ``id`` column.

    Args:
        file_id: The file identifier (ObjectId, hex string, or integer ID)

    Returns:
        The integer ID corresponding to the file, or None if not found
    """
    try:
        return get_integer_id_for_table(
            self._db, self._files_collection, file_id
        )
    except ValueError:
        # Only string identifiers have a fallback path.
        if not isinstance(file_id, str):
            return None

    files_table = quote_table_name(self._files_collection)

    if len(file_id) == 24:
        try:
            ObjectId(file_id)  # Validate the hex string
        except ValueError as e:
            logger.debug(f"Invalid ObjectId hex string '{file_id}': {e}")
        else:
            row = self._db.execute(
                f"SELECT id FROM {files_table} WHERE _id = ?",
                (file_id,),
            ).fetchone()
            return row[0] if row else None

    # Try as integer string
    try:
        int_file_id = int(file_id)
    except ValueError as e:
        logger.debug(f"Invalid integer file ID string '{file_id}': {e}")
        return None

    row = self._db.execute(
        f"SELECT id FROM {files_table} WHERE id = ?",
        (int_file_id,),
    ).fetchone()
    return row[0] if row else None
[docs]
def download_to_stream_by_name(
    self, filename: str, destination: io.IOBase, revision: int = -1
) -> None:
    """
    Write the contents of the file stored under ``filename`` to ``destination``.

    The filename and revision are resolved to a file id with
    ``_get_file_id_by_name``; the actual copy is done by
    ``download_to_stream``.

    Args:
        filename: The name of the file to download
        destination: A file-like object to which the file contents will be written
        revision: The revision of the file to download (default: -1 for latest)
    """
    self.download_to_stream(
        self._get_file_id_by_name(filename, revision), destination
    )
[docs]
def _get_file_id_by_name(self, filename: str, revision: int = -1) -> int:
    """
    Get the file ID for a given filename and revision.

    Revisions are ordered by ``uploadDate`` with the integer ``id`` as a
    tie-break. Non-negative revisions count from the oldest upload
    (0 = first); negative revisions count back from the newest
    (-1 = latest, -2 = second latest, ...).

    Args:
        filename: The name of the file
        revision: The revision number (-1 for latest, 0 for first, etc.)

    Returns:
        The integer id of the file document

    Raises:
        NoFile: If no file with that name exists at the requested revision.
    """
    if revision < 0:
        # -1 -> newest (offset 0), -2 -> second newest (offset 1), ...
        direction, offset = "DESC", -revision - 1
    else:
        direction, offset = "ASC", revision

    files_table = quote_table_name(self._files_collection)
    row = self._db.execute(
        f"""
        SELECT id FROM {files_table}
        WHERE filename = ?
        ORDER BY uploadDate {direction}, id {direction}
        LIMIT 1 OFFSET ?
        """,
        (filename, offset),
    ).fetchone()
    if row is None:
        raise NoFile(f"File with name {filename} not found")
    return row[0]
[docs]
def open_download_stream(self, file_id: ObjectId | str | int) -> GridOut:
    """
    Open a read stream for the stored file identified by ``file_id``.

    Args:
        file_id: The _id of the file document (ObjectId, hex string, or integer ID)

    Returns:
        A GridOut instance to read the file contents

    Raises:
        NoFile: If ``file_id`` cannot be resolved to a stored file.
    """
    internal_id = self._get_integer_id_for_file(file_id)
    if internal_id is None:
        raise NoFile(f"File with id {file_id} not found")
    # GridOut is handed the resolved integer id, not the caller's value.
    return GridOut(self._db, self._bucket_name, internal_id)
[docs]
def open_download_stream_by_name(
    self, filename: str, revision: int = -1
) -> GridOut:
    """
    Open a read stream for the file stored under ``filename``.

    The filename and revision are resolved with ``_get_file_id_by_name`` and
    the resulting id is handed to ``GridOut``.

    Args:
        filename: The name of the file to read
        revision: The revision of the file to read (default: -1 for latest)

    Returns:
        A GridOut instance to read the file contents
    """
    return GridOut(
        self._db,
        self._bucket_name,
        self._get_file_id_by_name(filename, revision),
    )
[docs]
def delete(self, file_id: ObjectId | str | int) -> None:
    """
    Given a file_id, delete the stored file's files collection document
    and associated chunks from a GridFS bucket.

    Args:
        file_id: The _id of the file document (ObjectId, hex string, or integer ID)

    Raises:
        NoFile: If the id cannot be resolved, or no file row was deleted.
    """
    file_int_id = self._get_integer_id_for_file(file_id)
    if file_int_id is None:
        raise NoFile(f"File with id {file_id} not found")

    files_table = quote_table_name(self._files_collection)
    chunks_table = quote_table_name(self._chunks_collection)

    # Chunks go first: they reference the files row through files_id.
    self._db.execute(
        f"""
        DELETE FROM {chunks_table}
        WHERE files_id = ?
        """,
        (file_int_id,),
    )
    cursor = self._db.execute(
        f"""
        DELETE FROM {files_table}
        WHERE id = ?
        """,
        (file_int_id,),
    )
    if cursor.rowcount == 0:
        raise NoFile(f"File with id {file_id} not found")
[docs]
def find(
    self, filter: dict[str, Any] | None = None, session: Any | None = None
) -> GridOutCursor:
    """
    Return a cursor over the files collection documents that match ``filter``.

    A non-None filter is passed through ``normalize_id_query_for_db`` before
    being handed to the cursor; a missing or empty filter becomes ``{}``.

    Args:
        filter: The filter to apply when searching for files
        session: A ClientSession for transactions.

    Returns:
        A GridOutCursor instance
    """
    validate_session(session, self._db)
    query = {} if filter is None else normalize_id_query_for_db(filter)
    return GridOutCursor(self._db, self._bucket_name, query or {})
[docs]
def open_upload_stream(
    self,
    filename: str,
    chunk_size_bytes: int | None = None,
    metadata: dict[str, Any] | None = None,
    session: Any | None = None,
) -> GridIn:
    """
    Open a write stream for a new file in this bucket.

    Args:
        filename: The name of the file to upload
        chunk_size_bytes: Bytes per chunk; a falsy value (None or 0) selects
            the bucket's chunk_size_bytes
        metadata: Optional metadata for the file
        session: A ClientSession for transactions.

    Returns:
        A GridIn instance to write the file contents
    """
    validate_session(session, self._db)
    effective_chunk_size = chunk_size_bytes or self._chunk_size_bytes
    return GridIn(
        self._db,
        self._bucket_name,
        effective_chunk_size,
        filename,
        metadata,
        disable_md5=self._disable_md5,
        write_concern=self._write_concern,
    )
[docs]
def upload_from_stream_with_id(
    self,
    file_id: ObjectId | int,
    filename: str,
    source: bytes | io.IOBase,
    chunk_size_bytes: int | None = None,
    metadata: dict[str, Any] | None = None,
    session: Any | None = None,
):
    """
    Uploads a user file to a GridFS bucket with a custom file id.

    ``str(file_id)`` is stored in the ``_id`` column. For an ObjectId the
    integer ``id`` is auto-generated; for any other value ``file_id`` itself
    is used as the integer ``id``. The chunk size recorded in the file
    document is the one actually used to split the data.

    Args:
        file_id: The custom _id for the file document (ObjectId or integer ID)
        filename: The name of the file to upload
        source: The source data (bytes-like object or file-like object)
        chunk_size_bytes: Bytes per chunk (defaults to bucket's chunk_size_bytes)
        metadata: Optional metadata for the file
        session: A ClientSession for transactions.

    Raises:
        FileExists: If a file with this id already exists, including when the
            insert is rejected by a uniqueness constraint.
        TypeError: If ``source`` is neither bytes-like nor has ``read``.
        ValueError: If the effective chunk size is not positive.
    """
    validate_session(session, self._db)

    # A falsy override (None or 0) means "use the bucket default".
    chunk_size = chunk_size_bytes or self._chunk_size_bytes
    if chunk_size <= 0:
        raise ValueError("chunk_size_bytes must be a positive integer")

    files_table = quote_table_name(self._files_collection)
    chunks_table = quote_table_name(self._chunks_collection)

    # ObjectIds and integers are both stored as strings in _id.
    stored_id = str(file_id)
    existing = self._db.execute(
        f"""
        SELECT id FROM {files_table}
        WHERE _id = ?
        """,
        (stored_id,),
    ).fetchone()
    if existing is not None:
        raise FileExists(f"File with id {file_id} already exists")

    # Get the data from the source
    if isinstance(source, (bytes, bytearray, memoryview)):
        data = bytes(source)
    elif hasattr(source, "read"):
        data = source.read()
    else:
        raise TypeError("source must be bytes or a file-like object")

    # Calculate MD5 hash of the data (unless disabled)
    md5_hash = None if self._disable_md5 else hashlib.md5(data).hexdigest()
    upload_date = datetime.datetime.now(datetime.timezone.utc).isoformat()

    # NULL lets SQLite auto-generate the integer id for ObjectId uploads;
    # otherwise the custom id doubles as the primary key.
    primary_key = None if isinstance(file_id, ObjectId) else file_id
    try:
        cursor = self._db.execute(
            f"""
            INSERT INTO {files_table}
            (id, _id, filename, length, chunkSize, uploadDate, md5, metadata)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                primary_key,
                stored_id,
                filename,
                len(data),
                chunk_size,
                upload_date,
                md5_hash,
                serialize_metadata(metadata),
            ),
        )
    except sqlite3.IntegrityError as exc:
        # The integer id (or _id) collides with a row the pre-check missed,
        # e.g. an auto-generated primary key with the same value.
        raise FileExists(f"File with id {file_id} already exists") from exc

    file_int_id = cursor.lastrowid if primary_key is None else primary_key

    # Chunks are written here rather than via _insert_chunks so the
    # per-call chunk size matches the stored chunkSize.
    for n, start in enumerate(range(0, len(data), chunk_size)):
        self._db.execute(
            f"""
            INSERT INTO {chunks_table}
            (files_id, n, data)
            VALUES (?, ?, ?)
            """,
            (file_int_id, n, data[start : start + chunk_size]),
        )

    # Force sync if write concern requires it
    self._force_sync_if_needed()
[docs]
def open_upload_stream_with_id(
    self,
    file_id: ObjectId | int,
    filename: str,
    metadata: dict[str, Any] | None = None,
    content_type: str | None = None,
    aliases: list[str] | None = None,
) -> GridIn:
    """
    Opens a stream for writing a file to a GridFS bucket with a custom file id.

    Args:
        file_id: The custom _id for the file document (ObjectId or integer ID)
        filename: The name of the file to upload
        metadata: Optional metadata for the file
        content_type: Optional MIME type of the file
        aliases: Optional list of alternative names for the file

    Returns:
        A GridIn instance to write the file contents

    Raises:
        FileExists: If a file whose ``_id`` equals ``str(file_id)`` exists.
    """
    files_table = quote_table_name(self._files_collection)

    # ObjectIds and integers are both stored as strings in _id, so a single
    # string lookup covers both kinds of id.
    existing = self._db.execute(
        f"""
        SELECT id FROM {files_table}
        WHERE _id = ?
        """,
        (str(file_id),),
    ).fetchone()
    if existing is not None:
        raise FileExists(f"File with id {file_id} already exists")

    return GridIn(
        self._db,
        self._bucket_name,
        self._chunk_size_bytes,
        filename,
        metadata,
        file_id,  # Can be ObjectId or int
        disable_md5=self._disable_md5,
        write_concern=self._write_concern,
        content_type=content_type,
        aliases=aliases,
    )
[docs]
def delete_by_name(self, filename: str) -> None:
    """
    Delete all stored file documents and associated chunks with the given filename.

    Chunks are matched through the integer ``id`` of the file rows, which is
    what the chunks table's ``files_id`` column stores.

    Args:
        filename: The name of the file to delete

    Raises:
        NoFile: If no file with that name exists.
    """
    files_table = quote_table_name(self._files_collection)
    chunks_table = quote_table_name(self._chunks_collection)

    found = self._db.execute(
        f"""
        SELECT 1 FROM {files_table}
        WHERE filename = ?
        LIMIT 1
        """,
        (filename,),
    ).fetchone()
    if found is None:
        raise NoFile(f"File with name {filename} not found")

    # A subquery avoids building an IN (...) list, so the number of matching
    # revisions is not limited by SQLite's bound-parameter limit.
    self._db.execute(
        f"""
        DELETE FROM {chunks_table}
        WHERE files_id IN (
            SELECT id FROM {files_table} WHERE filename = ?
        )
        """,
        (filename,),
    )
    self._db.execute(
        f"""
        DELETE FROM {files_table}
        WHERE filename = ?
        """,
        (filename,),
    )
[docs]
def rename(self, file_id: ObjectId | str | int, new_filename: str) -> None:
    """
    Rename a stored file with the specified file_id to a new filename.

    Args:
        file_id: The _id of the file to rename (ObjectId, hex string, or integer ID)
        new_filename: The new name for the file

    Raises:
        NoFile: If the id cannot be resolved, or no file row was updated.
    """
    file_int_id = self._get_integer_id_for_file(file_id)
    if file_int_id is None:
        raise NoFile(f"File with id {file_id} not found")

    files_table = quote_table_name(self._files_collection)
    cursor = self._db.execute(
        f"""
        UPDATE {files_table}
        SET filename = ?
        WHERE id = ?
        """,
        (new_filename, file_int_id),
    )
    if cursor.rowcount == 0:
        raise NoFile(f"File with id {file_id} not found")
[docs]
def rename_by_name(self, filename: str, new_filename: str) -> None:
    """
    Rename all stored files with the specified filename to a new filename.

    Args:
        filename: The current name of the files to rename
        new_filename: The new name for the files

    Raises:
        NoFile: If no file row carried ``filename``.
    """
    files_table = quote_table_name(self._files_collection)
    cursor = self._db.execute(
        f"""
        UPDATE {files_table}
        SET filename = ?
        WHERE filename = ?
        """,
        (new_filename, filename),
    )
    if cursor.rowcount == 0:
        raise NoFile(f"File with name {filename} not found")
[docs]
def drop(self) -> None:
    """
    Remove all files and chunks from the bucket.

    Every row is deleted from the chunks table and then from the files
    table; the tables themselves are kept.
    """
    chunks_table = quote_table_name(self._chunks_collection)
    files_table = quote_table_name(self._files_collection)
    # Chunks first: they reference file rows through files_id.
    self._db.execute(f"DELETE FROM {chunks_table}")
    self._db.execute(f"DELETE FROM {files_table}")
[docs]
def list(self) -> list[str]:
    """
    List all unique filenames in the GridFS bucket.

    Returns:
        The distinct filenames, in the order produced by ``ORDER BY filename``
    """
    files_table = quote_table_name(self._files_collection)
    cursor = self._db.execute(
        f"SELECT DISTINCT filename FROM {files_table} ORDER BY filename"
    )
    return [row[0] for row in cursor]