import logging
from typing import TYPE_CHECKING, Any, Literal, overload
from .._sqlite import sqlite3
from ..sql_utils import quote_identifier, quote_table_name
from .json_path_utils import parse_json_path
from .jsonb_support import (
_get_json_function_prefix,
_get_json_tree_function,
supports_jsonb,
supports_jsonb_each,
)
if TYPE_CHECKING:
from ..index_model import IndexModel
logger = logging.getLogger(__name__)
[docs]
class IndexManager:
"""
Manages indexes for a NeoSQLite collection.
This class provides functionality to create, list, drop, and manage various types of indexes
including single-field, compound, unique, sparse, FTS (Full Text Search), and datetime indexes.
It handles both regular B-tree indexes using JSON extraction and specialized FTS5 virtual tables
for text search capabilities.
"""
[docs]
def __init__(self, collection):
"""
Initialize the IndexManager with a collection instance.
Args:
collection: The NeoSQLite collection instance to manage indexes for
"""
self.collection = collection
# Check if JSONB is supported for this connection
self._jsonb_supported = supports_jsonb(collection.db)
[docs]
def create_index(
self,
key: str | list[str] | list[tuple[str, int]],
reindex: bool = True,
sparse: bool = False,
unique: bool = False,
fts: bool = False,
tokenizer: str | None = None,
datetime_field: bool = False,
):
"""
Create an index on the specified key(s) for this collection.
Handles both single-key and compound indexes by using SQLite's json_extract
function to create indexes on the JSON-stored data. For compound indexes,
multiple json_extract calls are used for each key in the list.
Args:
key: A string or list of strings representing the field(s) to index.
reindex: Boolean indicating whether to reindex (not used in this implementation).
sparse: Boolean indicating whether the index should be sparse (only include documents with the field).
unique: Boolean indicating whether the index should be unique.
fts: Boolean indicating whether to create an FTS index for text search.
tokenizer: Optional tokenizer to use for FTS index (e.g., 'icu', 'icu_th').
datetime_field: Boolean indicating whether this is a datetime field that requires special indexing.
"""
# For datetime fields, use special indexing.
if datetime_field:
if isinstance(key, str):
self._create_datetime_index(key, unique=unique)
else:
raise ValueError("Compound datetime indexes are not supported")
elif (
isinstance(key, list)
and len(key) == 1
and isinstance(key[0], tuple)
):
# Handle MongoDB tuple format: [("field", "type")]
field, index_type = key[0]
if index_type == "text":
self._create_fts_index(field, tokenizer)
else:
# Create index name (replace dots with underscores for valid identifiers)
index_name = field.replace(".", "_")
# Determine which function to use based on JSONB support
func_prefix = _get_json_function_prefix(self._jsonb_supported)
# Create the index using appropriate JSON/JSONB function
self.collection.db.execute(
(
f"CREATE {'UNIQUE ' if unique else ''}INDEX "
f"IF NOT EXISTS {quote_identifier(f'idx_{self.collection.name}_{index_name}')} "
f"ON {quote_table_name(self.collection.name)}({func_prefix}_extract(data, '{parse_json_path(field)}'))"
)
)
elif isinstance(key, str):
if fts:
# Create FTS index with optional tokenizer
self._create_fts_index(key, tokenizer)
else:
# Create index name (replace dots with underscores for valid identifiers)
index_name = key.replace(".", "_")
# Determine which function to use based on JSONB support
func_prefix = _get_json_function_prefix(self._jsonb_supported)
# Create the index using appropriate JSON/JSONB function
self.collection.db.execute(
(
f"CREATE {'UNIQUE ' if unique else ''}INDEX "
f"IF NOT EXISTS {quote_identifier(f'idx_{self.collection.name}_{index_name}')} "
f"ON {quote_table_name(self.collection.name)}({func_prefix}_extract(data, '{parse_json_path(key)}'))"
)
)
else:
# Compound indexes: must use PyMongo tuple format
# [("field1", 1), ("field2", -1)]
fields: list[str]
if (
isinstance(key, list)
and len(key) > 0
and isinstance(key[0], tuple)
):
fields = [k[0] for k in key]
elif (
isinstance(key, list)
and len(key) == 1
and isinstance(key[0], str)
):
# Single-element list like ["field"] - treat as single-field index
fields = key # type: ignore[assignment]
else:
raise ValueError(
"Compound indexes must use PyMongo tuple format: "
f'[("field1", 1), ("field2", -1)]. Got: {key}'
)
index_name = "_".join(fields).replace(".", "_")
# Determine which function to use based on JSONB support
func_prefix = _get_json_function_prefix(self._jsonb_supported)
# Create the compound index using multiple JSON/JSONB extract calls
index_columns = ", ".join(
f"{func_prefix}_extract(data, '{parse_json_path(f)}')"
for f in fields
)
self.collection.db.execute(
(
f"CREATE {'UNIQUE ' if unique else ''}INDEX "
f"IF NOT EXISTS {quote_identifier(f'idx_{self.collection.name}_{index_name}')} "
f"ON {quote_table_name(self.collection.name)}({index_columns})"
)
)
[docs]
def _create_fts_index(self, field: str, tokenizer: str | None = None):
"""
Creates an FTS5 index on the specified field for text search.
For FTS indexes, we create both JSON and JSONB versions to ensure
compatibility with both types of queries.
For nested array fields (e.g., "comments.text"), we use json_tree() to
properly index all array elements, not just the first one.
Args:
field (str): The field to create the FTS index on.
tokenizer (str, optional): Optional tokenizer to use for the FTS index.
"""
# Create index name (replace dots with underscores for valid identifiers)
index_name = field.replace(".", "_")
fts_table_name = quote_identifier(
f"{self.collection.name}_{index_name}_fts"
)
# Create FTS table with optional tokenizer
# Note: We don't use the 'content' option because we manage the FTS data manually
# to properly support array fields with json_tree()
tokenizer_clause = f"TOKENIZE={tokenizer}" if tokenizer else ""
self.collection.db.execute(f"""
CREATE VIRTUAL TABLE IF NOT EXISTS {fts_table_name}
USING FTS5({index_name}, {tokenizer_clause})
""")
# For nested fields with arrays, use json_tree() to index all matching values
# Convert field path to json_tree fullkey pattern (e.g., "comments.text" -> "$.comments[*].text")
field_parts = field.split(".")
if len(field_parts) == 1:
# Simple field - use direct json_extract
json_path = parse_json_path(field)
self.collection.db.execute(f"""
INSERT INTO {fts_table_name}(rowid, {index_name})
SELECT id, lower(json_extract(data, '{json_path}'))
FROM {quote_table_name(self.collection.name)}
WHERE json_extract(data, '{json_path}') IS NOT NULL
""")
else:
# Nested field - use json_tree/jsonb_tree to find all matching values including array elements
# Concatenate all text values with spaces for FTS indexing
jsonb_each_supported = supports_jsonb_each(self.collection.db)
json_tree_func = _get_json_tree_function(
self._jsonb_supported, jsonb_each_supported
)
last_key = field_parts[-1]
self.collection.db.execute(
f"""
INSERT INTO {fts_table_name}(rowid, {index_name})
SELECT
r.id,
group_concat(lower(t.value), ' ')
FROM {quote_table_name(self.collection.name)} r
JOIN {json_tree_func}(r.data) t
WHERE t.key = ? AND t.type = 'text'
GROUP BY r.id
""",
[last_key],
)
# Create triggers to keep FTS table in sync with main table
# For array fields, we need to delete all entries for the rowid and re-insert all values
field_parts = field.split(".")
# Insert trigger - delete any existing entries and insert all matching values
if len(field_parts) == 1:
# Simple field
json_path = parse_json_path(field)
self.collection.db.execute(f"""
CREATE TRIGGER IF NOT EXISTS {quote_table_name(self.collection.name)}_{index_name}_fts_insert
AFTER INSERT ON {quote_table_name(self.collection.name)}
BEGIN
DELETE FROM {fts_table_name} WHERE rowid = new.id;
INSERT INTO {fts_table_name}(rowid, {index_name})
VALUES (new.id, lower(json_extract(new.data, '{json_path}')));
END
""")
else:
# Nested field - use json_tree/jsonb_tree with GROUP BY
jsonb_each_supported = supports_jsonb_each(self.collection.db)
json_tree_func = _get_json_tree_function(
self._jsonb_supported, jsonb_each_supported
)
self.collection.db.execute(f"""
CREATE TRIGGER IF NOT EXISTS {quote_table_name(self.collection.name)}_{index_name}_fts_insert
AFTER INSERT ON {quote_table_name(self.collection.name)}
BEGIN
DELETE FROM {fts_table_name} WHERE rowid = new.id;
INSERT INTO {fts_table_name}(rowid, {index_name})
SELECT
new.id,
group_concat(lower(t.value), ' ')
FROM {json_tree_func}(new.data) t
WHERE t.key = '{last_key}' AND t.type = 'text'
GROUP BY new.id;
END
""")
# Update trigger - delete old entries and insert new ones
if len(field_parts) == 1:
# Simple field
json_path = parse_json_path(field)
self.collection.db.execute(f"""
CREATE TRIGGER IF NOT EXISTS {quote_table_name(self.collection.name)}_{index_name}_fts_update
AFTER UPDATE ON {quote_table_name(self.collection.name)}
BEGIN
DELETE FROM {fts_table_name} WHERE rowid = old.id;
INSERT INTO {fts_table_name}(rowid, {index_name})
VALUES (new.id, lower(json_extract(new.data, '{json_path}')));
END
""")
else:
# Nested field - use json_tree/jsonb_tree with GROUP BY
jsonb_each_supported = supports_jsonb_each(self.collection.db)
json_tree_func = _get_json_tree_function(
self._jsonb_supported, jsonb_each_supported
)
self.collection.db.execute(f"""
CREATE TRIGGER IF NOT EXISTS {quote_table_name(self.collection.name)}_{index_name}_fts_update
AFTER UPDATE ON {quote_table_name(self.collection.name)}
BEGIN
DELETE FROM {fts_table_name} WHERE rowid = old.id;
INSERT INTO {fts_table_name}(rowid, {index_name})
SELECT
new.id,
group_concat(lower(t.value), ' ')
FROM {json_tree_func}(new.data) t
WHERE t.key = '{last_key}' AND t.type = 'text'
GROUP BY new.id;
END
""")
# Delete trigger - remove all entries for the deleted row
self.collection.db.execute(f"""
CREATE TRIGGER IF NOT EXISTS {quote_table_name(self.collection.name)}_{index_name}_fts_delete
AFTER DELETE ON {quote_table_name(self.collection.name)}
BEGIN
DELETE FROM {fts_table_name} WHERE rowid = old.id;
END
""")
[docs]
def create_indexes(
self,
indexes: list["IndexModel"],
) -> list[str]:
"""
Create multiple indexes at once.
This method accepts a list of IndexModel objects, matching the
PyMongo API.
Args:
indexes: A list of IndexModel objects.
Returns:
list[str]: A list of the names of the indexes that were created.
"""
created_indexes = []
for index_model in indexes:
doc = index_model.document
key = doc.get("key")
if key is None:
continue
unique: bool = bool(doc.get("unique", False))
sparse: bool = bool(doc.get("sparse", False))
fts: bool = bool(doc.get("fts", False))
tokenizer: str | None = doc.get("tokenizer")
# Convert key dict to the format expected by create_index
match key:
case dict():
if len(key) == 1:
field = list(key.keys())[0]
self.create_index(
field,
unique=unique,
sparse=sparse,
fts=fts,
tokenizer=tokenizer,
)
index_name = field.replace(".", "_")
else:
tuple_key = [(f, d) for f, d in key.items()]
self.create_index(
tuple_key,
unique=unique,
sparse=sparse,
fts=fts,
tokenizer=tokenizer,
)
index_name = "_".join(key.keys()).replace(".", "_")
case str():
self.create_index(
key,
unique=unique,
sparse=sparse,
fts=fts,
tokenizer=tokenizer,
)
index_name = key.replace(".", "_")
case [str()]:
self.create_index(
key[0],
unique=unique,
sparse=sparse,
fts=fts,
tokenizer=tokenizer,
)
index_name = key[0].replace(".", "_")
case list() if isinstance(key[0], tuple):
self.create_index(
key,
unique=unique,
sparse=sparse,
fts=fts,
tokenizer=tokenizer,
)
index_name = "_".join(k[0] for k in key).replace(".", "_")
case _:
raise ValueError(f"Invalid key specification: {key}")
created_indexes.append(
f"idx_{quote_table_name(self.collection.name)}_{index_name}"
)
return created_indexes
[docs]
def reindex(
self,
table: str,
sparse: bool = False,
documents: list[dict[str, Any]] | None = None,
):
"""
Reindex the collection.
With native JSON indexing, reindexing is handled automatically by SQLite.
This method is kept for API compatibility but does nothing.
Args:
table (str): The table name (not used in this implementation).
sparse (bool): Whether the index should be sparse (not used in this implementation).
documents (list[dict[str, Any]]): List of documents to reindex (not used in this implementation).
"""
# With native JSON indexing, reindexing is handled automatically by SQLite
# This method is kept for API compatibility but does nothing
pass
@overload
def list_indexes(self, as_keys: Literal[True]) -> list[list[str]]: ...
@overload
def list_indexes(self, as_keys: Literal[False] = False) -> list[str]: ...
[docs]
def list_indexes(
self,
as_keys: bool = False,
) -> list[str] | list[list[str]]:
"""
Retrieve indexes for the collection. Indexes are identified by names following a specific pattern.
Args:
as_keys (bool): If True, return the key names (converted from
underscores to dots) instead of the full index names.
Returns:
list[str] or list[list[str]]: List of index names or keys, depending on the as_keys parameter.
If as_keys is True, each entry is a list containing a single string (the key name).
"""
# Get indexes that match our naming convention
cmd = (
"SELECT name FROM sqlite_master WHERE type='index' AND name LIKE ?"
)
like_pattern = f"idx_{quote_table_name(self.collection.name)}_%"
if as_keys:
# Extract key names from index names
indexes = self.collection.db.execute(
cmd, (like_pattern,)
).fetchall()
result = []
for idx in indexes:
# Skip the automatically created _id index since it should be hidden
# like MongoDB's automatic _id index
if idx[0] == f"idx_{quote_table_name(self.collection.name)}_id":
continue
# Extract key name from index name (idx_collection_key -> key)
key_name = idx[0][
len(f"idx_{quote_table_name(self.collection.name)}_") :
]
# Convert underscores back to dots for nested keys
key_name = key_name.replace("_", ".")
result.append([key_name])
return result
# Return index names
all_indexes = [
idx[0]
for idx in self.collection.db.execute(
cmd, (like_pattern,)
).fetchall()
]
# Filter out the automatically created _id index since it should be hidden
# like MongoDB's automatic _id index
filtered_indexes = [
idx_name
for idx_name in all_indexes
if idx_name != f"idx_{quote_table_name(self.collection.name)}_id"
]
return filtered_indexes
[docs]
def drop_index(self, index: str):
"""
Drop an index from the collection.
Handles both single-key and compound indexes. For compound indexes, the
input should be a list of field names. The index name is generated by
joining the field names with underscores and replacing dots with underscores.
Args:
index (str or list): The name of the index to drop. If a list is provided,
it represents a compound index.
"""
# With native JSON indexing, we just need to drop the index
if isinstance(index, str):
# For single indexes
index_name = index.replace(".", "_")
self.collection.db.execute(
f"DROP INDEX IF EXISTS idx_{quote_table_name(self.collection.name)}_{index_name}"
)
else:
# For compound indexes
index_name = "_".join(index).replace(".", "_")
self.collection.db.execute(
f"DROP INDEX IF EXISTS idx_{quote_table_name(self.collection.name)}_{index_name}"
)
[docs]
def drop_indexes(self):
"""
Drop all indexes associated with this collection.
This method retrieves the list of indexes using the list_indexes method
and drops each one.
"""
indexes = self.list_indexes()
for index in indexes:
# Extract the actual index name from the full name
self.collection.db.execute(f"DROP INDEX IF EXISTS {index}")
[docs]
def create_search_index(
self,
key: str,
tokenizer: str | None = None,
):
"""
Create a search index on the specified key for text search functionality.
This is a convenience method that creates an FTS5 index for efficient text search.
It is equivalent to calling create_index(key, fts=True, tokenizer=tokenizer).
Args:
key: A string representing the field to index for text search.
tokenizer: Optional tokenizer to use for the FTS index (e.g., 'icu').
Returns:
The result of the index creation operation.
"""
return self.create_index(key, fts=True, tokenizer=tokenizer)
[docs]
def create_search_indexes(
self,
indexes: list[str],
) -> list[str]:
"""
Create multiple search indexes at once for text search functionality.
This is a convenience method that creates multiple FTS5 indexes for efficient text search.
It is equivalent to calling create_index(key, fts=True) for each key.
Args:
indexes: A list of strings representing the fields to index for text search.
Returns:
list[str]: A list of the names of the search indexes that were created.
"""
created_indexes = []
for key in indexes:
# Call create_index with fts=True for each key
self.create_index(key, fts=True)
# Generate the index name the same way as in IndexManager
index_name = key.replace(".", "_")
created_indexes.append(
f"idx_{quote_table_name(self.collection.name)}_{index_name}"
)
return created_indexes
[docs]
def list_search_indexes(self) -> list[str]:
"""
List all search indexes for the collection.
This method returns a list of all FTS5 search indexes associated with the collection.
Note: This implementation scans for FTS virtual tables in the database schema.
Returns:
list[str]: A list of search index names.
"""
# Get all FTS tables from sqlite_master
# FTS tables have a specific naming pattern: {collection}_{field}_fts
fts_tables = self.collection.db.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name LIKE ?",
(f"{quote_table_name(self.collection.name)}_%_fts",),
).fetchall()
# Extract the field names from the FTS table names
search_indexes = []
prefix_len = len(f"{quote_table_name(self.collection.name)}_")
suffix_len = len("_fts")
for (table_name,) in fts_tables:
# Extract field name from FTS table name
# Format: {collection}_{field}_fts
field_name = table_name[prefix_len:-suffix_len]
search_indexes.append(field_name)
return search_indexes
[docs]
def update_search_index(self, key: str, tokenizer: str | None = None):
"""
Update a search index by recreating it with potentially new options.
This method drops and recreates a search index, allowing for updates to
tokenizer settings or other index options.
Args:
key: The field name for which to update the search index.
tokenizer: Optional new tokenizer to use for the FTS index.
"""
# Drop the existing FTS index
self.drop_search_index(key)
# Create a new FTS index with the updated options
self.create_search_index(key, tokenizer=tokenizer)
[docs]
def drop_search_index(self, index: str):
"""
Drop a search index from the collection.
This is a convenience method for dropping FTS5 search indexes.
Args:
index (str): The name of the search index to drop (field name).
"""
# For FTS indexes, we need to drop the FTS virtual table and its triggers
# FTS tables have a specific naming pattern: {collection}_{field}_fts
index_name = index.replace(".", "_")
fts_table_name = (
f"{quote_table_name(self.collection.name)}_{index_name}_fts"
)
# Drop the FTS table
self.collection.db.execute(f"DROP TABLE IF EXISTS {fts_table_name}")
# Drop the triggers associated with the FTS table
self.collection.db.execute(
f"DROP TRIGGER IF EXISTS {quote_table_name(self.collection.name)}_{index_name}_fts_insert"
)
self.collection.db.execute(
f"DROP TRIGGER IF EXISTS {quote_table_name(self.collection.name)}_{index_name}_fts_update"
)
self.collection.db.execute(
f"DROP TRIGGER IF EXISTS {quote_table_name(self.collection.name)}_{index_name}_fts_delete"
)
[docs]
def _create_datetime_index(self, key: str, unique: bool = False):
"""Create a timezone normalized datetime index.
This method creates a specialized datetime index with datetime() for timezone
normalization.
Args:
key: The field name to create the datetime index on (e.g., 'timestamp' or 'user.created_at')
unique: Whether the index should be unique
"""
# Generate the column name by replacing dots with underscores and appending '_utc'
# This is just for naming consistency
column_name = f"{key.replace('.', '_')}_utc"
index_sql = f"""
CREATE {"UNIQUE " if unique else ""}INDEX IF NOT EXISTS
idx_{quote_table_name(self.collection.name)}_{column_name}
ON {quote_table_name(self.collection.name)}(datetime(json_extract(data, '{parse_json_path(key)}')))
"""
self.collection.db.execute(index_sql)