Source code for neosqlite.collection.index_manager

import logging
from typing import TYPE_CHECKING, Any, Literal, overload

from .._sqlite import sqlite3
from ..sql_utils import quote_identifier, quote_table_name
from .json_path_utils import parse_json_path
from .jsonb_support import (
    _get_json_function_prefix,
    _get_json_tree_function,
    supports_jsonb,
    supports_jsonb_each,
)

if TYPE_CHECKING:
    from ..index_model import IndexModel

logger = logging.getLogger(__name__)


[docs] class IndexManager: """ Manages indexes for a NeoSQLite collection. This class provides functionality to create, list, drop, and manage various types of indexes including single-field, compound, unique, sparse, FTS (Full Text Search), and datetime indexes. It handles both regular B-tree indexes using JSON extraction and specialized FTS5 virtual tables for text search capabilities. """
[docs] def __init__(self, collection): """ Initialize the IndexManager with a collection instance. Args: collection: The NeoSQLite collection instance to manage indexes for """ self.collection = collection # Check if JSONB is supported for this connection self._jsonb_supported = supports_jsonb(collection.db)
[docs] def create_index( self, key: str | list[str] | list[tuple[str, int]], reindex: bool = True, sparse: bool = False, unique: bool = False, fts: bool = False, tokenizer: str | None = None, datetime_field: bool = False, ): """ Create an index on the specified key(s) for this collection. Handles both single-key and compound indexes by using SQLite's json_extract function to create indexes on the JSON-stored data. For compound indexes, multiple json_extract calls are used for each key in the list. Args: key: A string or list of strings representing the field(s) to index. reindex: Boolean indicating whether to reindex (not used in this implementation). sparse: Boolean indicating whether the index should be sparse (only include documents with the field). unique: Boolean indicating whether the index should be unique. fts: Boolean indicating whether to create an FTS index for text search. tokenizer: Optional tokenizer to use for FTS index (e.g., 'icu', 'icu_th'). datetime_field: Boolean indicating whether this is a datetime field that requires special indexing. """ # For datetime fields, use special indexing. if datetime_field: if isinstance(key, str): self._create_datetime_index(key, unique=unique) else: raise ValueError("Compound datetime indexes are not supported") elif ( isinstance(key, list) and len(key) == 1 and isinstance(key[0], tuple) ): # Handle MongoDB tuple format: [("field", "type")] field, index_type = key[0] if index_type == "text": self._create_fts_index(field, tokenizer) else: # Create index name (replace dots with underscores for valid identifiers) index_name = field.replace(".", "_") # Determine which function to use based on JSONB support func_prefix = _get_json_function_prefix(self._jsonb_supported) # Create the index using appropriate JSON/JSONB function self.collection.db.execute( ( f"CREATE {'UNIQUE ' if unique else ''}INDEX " f"IF NOT EXISTS {quote_identifier(f'idx_{self.collection.name}_{index_name}')} " f"ON {quote_table_name(self.collection.name)}({func_prefix}_extract(data, '{parse_json_path(field)}'))" ) ) elif isinstance(key, str): if fts: # Create FTS index with optional tokenizer self._create_fts_index(key, tokenizer) else: # Create index name (replace dots with underscores for valid identifiers) index_name = key.replace(".", "_") # Determine which function to use based on JSONB support func_prefix = _get_json_function_prefix(self._jsonb_supported) # Create the index using appropriate JSON/JSONB function self.collection.db.execute( ( f"CREATE {'UNIQUE ' if unique else ''}INDEX " f"IF NOT EXISTS {quote_identifier(f'idx_{self.collection.name}_{index_name}')} " f"ON {quote_table_name(self.collection.name)}({func_prefix}_extract(data, '{parse_json_path(key)}'))" ) ) else: # Compound indexes: must use PyMongo tuple format # [("field1", 1), ("field2", -1)] fields: list[str] if ( isinstance(key, list) and len(key) > 0 and isinstance(key[0], tuple) ): fields = [k[0] for k in key] elif ( isinstance(key, list) and len(key) == 1 and isinstance(key[0], str) ): # Single-element list like ["field"] - treat as single-field index fields = key # type: ignore[assignment] else: raise ValueError( "Compound indexes must use PyMongo tuple format: " f'[("field1", 1), ("field2", -1)]. Got: {key}' ) index_name = "_".join(fields).replace(".", "_") # Determine which function to use based on JSONB support func_prefix = _get_json_function_prefix(self._jsonb_supported) # Create the compound index using multiple JSON/JSONB extract calls index_columns = ", ".join( f"{func_prefix}_extract(data, '{parse_json_path(f)}')" for f in fields ) self.collection.db.execute( ( f"CREATE {'UNIQUE ' if unique else ''}INDEX " f"IF NOT EXISTS {quote_identifier(f'idx_{self.collection.name}_{index_name}')} " f"ON {quote_table_name(self.collection.name)}({index_columns})" ) )
[docs] def _create_fts_index(self, field: str, tokenizer: str | None = None): """ Creates an FTS5 index on the specified field for text search. For FTS indexes, we create both JSON and JSONB versions to ensure compatibility with both types of queries. For nested array fields (e.g., "comments.text"), we use json_tree() to properly index all array elements, not just the first one. Args: field (str): The field to create the FTS index on. tokenizer (str, optional): Optional tokenizer to use for the FTS index. """ # Create index name (replace dots with underscores for valid identifiers) index_name = field.replace(".", "_") fts_table_name = quote_identifier( f"{self.collection.name}_{index_name}_fts" ) # Create FTS table with optional tokenizer # Note: We don't use the 'content' option because we manage the FTS data manually # to properly support array fields with json_tree() tokenizer_clause = f"TOKENIZE={tokenizer}" if tokenizer else "" self.collection.db.execute(f""" CREATE VIRTUAL TABLE IF NOT EXISTS {fts_table_name} USING FTS5({index_name}, {tokenizer_clause}) """) # For nested fields with arrays, use json_tree() to index all matching values # Convert field path to json_tree fullkey pattern (e.g., "comments.text" -> "$.comments[*].text") field_parts = field.split(".") if len(field_parts) == 1: # Simple field - use direct json_extract json_path = parse_json_path(field) self.collection.db.execute(f""" INSERT INTO {fts_table_name}(rowid, {index_name}) SELECT id, lower(json_extract(data, '{json_path}')) FROM {quote_table_name(self.collection.name)} WHERE json_extract(data, '{json_path}') IS NOT NULL """) else: # Nested field - use json_tree/jsonb_tree to find all matching values including array elements # Concatenate all text values with spaces for FTS indexing jsonb_each_supported = supports_jsonb_each(self.collection.db) json_tree_func = _get_json_tree_function( self._jsonb_supported, jsonb_each_supported ) last_key = field_parts[-1] self.collection.db.execute( f""" INSERT INTO {fts_table_name}(rowid, {index_name}) SELECT r.id, group_concat(lower(t.value), ' ') FROM {quote_table_name(self.collection.name)} r JOIN {json_tree_func}(r.data) t WHERE t.key = ? AND t.type = 'text' GROUP BY r.id """, [last_key], ) # Create triggers to keep FTS table in sync with main table # For array fields, we need to delete all entries for the rowid and re-insert all values field_parts = field.split(".") # Insert trigger - delete any existing entries and insert all matching values if len(field_parts) == 1: # Simple field json_path = parse_json_path(field) self.collection.db.execute(f""" CREATE TRIGGER IF NOT EXISTS {quote_table_name(self.collection.name)}_{index_name}_fts_insert AFTER INSERT ON {quote_table_name(self.collection.name)} BEGIN DELETE FROM {fts_table_name} WHERE rowid = new.id; INSERT INTO {fts_table_name}(rowid, {index_name}) VALUES (new.id, lower(json_extract(new.data, '{json_path}'))); END """) else: # Nested field - use json_tree/jsonb_tree with GROUP BY jsonb_each_supported = supports_jsonb_each(self.collection.db) json_tree_func = _get_json_tree_function( self._jsonb_supported, jsonb_each_supported ) self.collection.db.execute(f""" CREATE TRIGGER IF NOT EXISTS {quote_table_name(self.collection.name)}_{index_name}_fts_insert AFTER INSERT ON {quote_table_name(self.collection.name)} BEGIN DELETE FROM {fts_table_name} WHERE rowid = new.id; INSERT INTO {fts_table_name}(rowid, {index_name}) SELECT new.id, group_concat(lower(t.value), ' ') FROM {json_tree_func}(new.data) t WHERE t.key = '{last_key}' AND t.type = 'text' GROUP BY new.id; END """) # Update trigger - delete old entries and insert new ones if len(field_parts) == 1: # Simple field json_path = parse_json_path(field) self.collection.db.execute(f""" CREATE TRIGGER IF NOT EXISTS {quote_table_name(self.collection.name)}_{index_name}_fts_update AFTER UPDATE ON {quote_table_name(self.collection.name)} BEGIN DELETE FROM {fts_table_name} WHERE rowid = old.id; INSERT INTO {fts_table_name}(rowid, {index_name}) VALUES (new.id, lower(json_extract(new.data, '{json_path}'))); END """) else: # Nested field - use json_tree/jsonb_tree with GROUP BY jsonb_each_supported = supports_jsonb_each(self.collection.db) json_tree_func = _get_json_tree_function( self._jsonb_supported, jsonb_each_supported ) self.collection.db.execute(f""" CREATE TRIGGER IF NOT EXISTS {quote_table_name(self.collection.name)}_{index_name}_fts_update AFTER UPDATE ON {quote_table_name(self.collection.name)} BEGIN DELETE FROM {fts_table_name} WHERE rowid = old.id; INSERT INTO {fts_table_name}(rowid, {index_name}) SELECT new.id, group_concat(lower(t.value), ' ') FROM {json_tree_func}(new.data) t WHERE t.key = '{last_key}' AND t.type = 'text' GROUP BY new.id; END """) # Delete trigger - remove all entries for the deleted row self.collection.db.execute(f""" CREATE TRIGGER IF NOT EXISTS {quote_table_name(self.collection.name)}_{index_name}_fts_delete AFTER DELETE ON {quote_table_name(self.collection.name)} BEGIN DELETE FROM {fts_table_name} WHERE rowid = old.id; END """)
[docs] def create_indexes( self, indexes: list["IndexModel"], ) -> list[str]: """ Create multiple indexes at once. This method accepts a list of IndexModel objects, matching the PyMongo API. Args: indexes: A list of IndexModel objects. Returns: list[str]: A list of the names of the indexes that were created. """ created_indexes = [] for index_model in indexes: doc = index_model.document key = doc.get("key") if key is None: continue unique: bool = bool(doc.get("unique", False)) sparse: bool = bool(doc.get("sparse", False)) fts: bool = bool(doc.get("fts", False)) tokenizer: str | None = doc.get("tokenizer") # Convert key dict to the format expected by create_index match key: case dict(): if len(key) == 1: field = list(key.keys())[0] self.create_index( field, unique=unique, sparse=sparse, fts=fts, tokenizer=tokenizer, ) index_name = field.replace(".", "_") else: tuple_key = [(f, d) for f, d in key.items()] self.create_index( tuple_key, unique=unique, sparse=sparse, fts=fts, tokenizer=tokenizer, ) index_name = "_".join(key.keys()).replace(".", "_") case str(): self.create_index( key, unique=unique, sparse=sparse, fts=fts, tokenizer=tokenizer, ) index_name = key.replace(".", "_") case [str()]: self.create_index( key[0], unique=unique, sparse=sparse, fts=fts, tokenizer=tokenizer, ) index_name = key[0].replace(".", "_") case list() if isinstance(key[0], tuple): self.create_index( key, unique=unique, sparse=sparse, fts=fts, tokenizer=tokenizer, ) index_name = "_".join(k[0] for k in key).replace(".", "_") case _: raise ValueError(f"Invalid key specification: {key}") created_indexes.append( f"idx_{quote_table_name(self.collection.name)}_{index_name}" ) return created_indexes
[docs] def reindex( self, table: str, sparse: bool = False, documents: list[dict[str, Any]] | None = None, ): """ Reindex the collection. With native JSON indexing, reindexing is handled automatically by SQLite. This method is kept for API compatibility but does nothing. Args: table (str): The table name (not used in this implementation). sparse (bool): Whether the index should be sparse (not used in this implementation). documents (list[dict[str, Any]]): List of documents to reindex (not used in this implementation). """ # With native JSON indexing, reindexing is handled automatically by SQLite # This method is kept for API compatibility but does nothing pass
@overload def list_indexes(self, as_keys: Literal[True]) -> list[list[str]]: ... @overload def list_indexes(self, as_keys: Literal[False] = False) -> list[str]: ...
[docs] def list_indexes( self, as_keys: bool = False, ) -> list[str] | list[list[str]]: """ Retrieve indexes for the collection. Indexes are identified by names following a specific pattern. Args: as_keys (bool): If True, return the key names (converted from underscores to dots) instead of the full index names. Returns: list[str] or list[list[str]]: List of index names or keys, depending on the as_keys parameter. If as_keys is True, each entry is a list containing a single string (the key name). """ # Get indexes that match our naming convention cmd = ( "SELECT name FROM sqlite_master WHERE type='index' AND name LIKE ?" ) like_pattern = f"idx_{quote_table_name(self.collection.name)}_%" if as_keys: # Extract key names from index names indexes = self.collection.db.execute( cmd, (like_pattern,) ).fetchall() result = [] for idx in indexes: # Skip the automatically created _id index since it should be hidden # like MongoDB's automatic _id index if idx[0] == f"idx_{quote_table_name(self.collection.name)}_id": continue # Extract key name from index name (idx_collection_key -> key) key_name = idx[0][ len(f"idx_{quote_table_name(self.collection.name)}_") : ] # Convert underscores back to dots for nested keys key_name = key_name.replace("_", ".") result.append([key_name]) return result # Return index names all_indexes = [ idx[0] for idx in self.collection.db.execute( cmd, (like_pattern,) ).fetchall() ] # Filter out the automatically created _id index since it should be hidden # like MongoDB's automatic _id index filtered_indexes = [ idx_name for idx_name in all_indexes if idx_name != f"idx_{quote_table_name(self.collection.name)}_id" ] return filtered_indexes
[docs] def drop_index(self, index: str): """ Drop an index from the collection. Handles both single-key and compound indexes. For compound indexes, the input should be a list of field names. The index name is generated by joining the field names with underscores and replacing dots with underscores. Args: index (str or list): The name of the index to drop. If a list is provided, it represents a compound index. """ # With native JSON indexing, we just need to drop the index if isinstance(index, str): # For single indexes index_name = index.replace(".", "_") self.collection.db.execute( f"DROP INDEX IF EXISTS idx_{quote_table_name(self.collection.name)}_{index_name}" ) else: # For compound indexes index_name = "_".join(index).replace(".", "_") self.collection.db.execute( f"DROP INDEX IF EXISTS idx_{quote_table_name(self.collection.name)}_{index_name}" )
[docs] def drop_indexes(self): """ Drop all indexes associated with this collection. This method retrieves the list of indexes using the list_indexes method and drops each one. """ indexes = self.list_indexes() for index in indexes: # Extract the actual index name from the full name self.collection.db.execute(f"DROP INDEX IF EXISTS {index}")
[docs] def index_information(self) -> dict[str, Any]: """ Retrieves information on this collection's indexes. The function fetches all indexes associated with the collection and extracts relevant details such as whether the index is unique and the keys used in the index. It constructs a dictionary where the keys are the index names and the values are dictionaries containing the index information. Returns: dict: A dictionary containing index information. """ info: dict[str, Any] = {} try: # Get all indexes for this collection indexes = self.collection.db.execute( "SELECT name, sql FROM sqlite_master WHERE type='index' AND tbl_name=?", (self.collection.name,), ).fetchall() for idx_name, idx_sql in indexes: # Skip the automatically created _id index since it should be hidden # like MongoDB's automatic _id index if ( idx_name == f"idx_{quote_table_name(self.collection.name)}_id" ): continue # Parse the index information index_info: dict[str, Any] = { "v": 2, # Index version } # Check if it's a unique index if idx_sql and "UNIQUE" in idx_sql.upper(): index_info["unique"] = True else: index_info["unique"] = False # Try to extract key information from the SQL if idx_sql: # Extract key information from json_extract or jsonb_extract expressions import re # Look for both json_extract and jsonb_extract json_extract_matches = re.findall( r"(?:json|jsonb)_extract\(data, '(\$..*?)'\)", idx_sql ) if json_extract_matches: # Convert SQLite JSON paths back to dot notation keys = [] for path in json_extract_matches: # Remove $ and leading dot if path.startswith("$."): path = path[2:] keys.append(path) if len(keys) == 1: index_info["key"] = {keys[0]: 1} else: index_info["key"] = {key: 1 for key in keys} info[idx_name] = index_info except sqlite3.Error as e: logger.debug( f"Failed to get index information for collection '{self.collection.name}': {e}" ) pass return info
[docs] def create_search_index( self, key: str, tokenizer: str | None = None, ): """ Create a search index on the specified key for text search functionality. This is a convenience method that creates an FTS5 index for efficient text search. It is equivalent to calling create_index(key, fts=True, tokenizer=tokenizer). Args: key: A string representing the field to index for text search. tokenizer: Optional tokenizer to use for the FTS index (e.g., 'icu'). Returns: The result of the index creation operation. """ return self.create_index(key, fts=True, tokenizer=tokenizer)
[docs] def create_search_indexes( self, indexes: list[str], ) -> list[str]: """ Create multiple search indexes at once for text search functionality. This is a convenience method that creates multiple FTS5 indexes for efficient text search. It is equivalent to calling create_index(key, fts=True) for each key. Args: indexes: A list of strings representing the fields to index for text search. Returns: list[str]: A list of the names of the search indexes that were created. """ created_indexes = [] for key in indexes: # Call create_index with fts=True for each key self.create_index(key, fts=True) # Generate the index name the same way as in IndexManager index_name = key.replace(".", "_") created_indexes.append( f"idx_{quote_table_name(self.collection.name)}_{index_name}" ) return created_indexes
[docs] def list_search_indexes(self) -> list[str]: """ List all search indexes for the collection. This method returns a list of all FTS5 search indexes associated with the collection. Note: This implementation scans for FTS virtual tables in the database schema. Returns: list[str]: A list of search index names. """ # Get all FTS tables from sqlite_master # FTS tables have a specific naming pattern: {collection}_{field}_fts fts_tables = self.collection.db.execute( "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE ?", (f"{quote_table_name(self.collection.name)}_%_fts",), ).fetchall() # Extract the field names from the FTS table names search_indexes = [] prefix_len = len(f"{quote_table_name(self.collection.name)}_") suffix_len = len("_fts") for (table_name,) in fts_tables: # Extract field name from FTS table name # Format: {collection}_{field}_fts field_name = table_name[prefix_len:-suffix_len] search_indexes.append(field_name) return search_indexes
[docs] def update_search_index(self, key: str, tokenizer: str | None = None): """ Update a search index by recreating it with potentially new options. This method drops and recreates a search index, allowing for updates to tokenizer settings or other index options. Args: key: The field name for which to update the search index. tokenizer: Optional new tokenizer to use for the FTS index. """ # Drop the existing FTS index self.drop_search_index(key) # Create a new FTS index with the updated options self.create_search_index(key, tokenizer=tokenizer)
[docs] def drop_search_index(self, index: str): """ Drop a search index from the collection. This is a convenience method for dropping FTS5 search indexes. Args: index (str): The name of the search index to drop (field name). """ # For FTS indexes, we need to drop the FTS virtual table and its triggers # FTS tables have a specific naming pattern: {collection}_{field}_fts index_name = index.replace(".", "_") fts_table_name = ( f"{quote_table_name(self.collection.name)}_{index_name}_fts" ) # Drop the FTS table self.collection.db.execute(f"DROP TABLE IF EXISTS {fts_table_name}") # Drop the triggers associated with the FTS table self.collection.db.execute( f"DROP TRIGGER IF EXISTS {quote_table_name(self.collection.name)}_{index_name}_fts_insert" ) self.collection.db.execute( f"DROP TRIGGER IF EXISTS {quote_table_name(self.collection.name)}_{index_name}_fts_update" ) self.collection.db.execute( f"DROP TRIGGER IF EXISTS {quote_table_name(self.collection.name)}_{index_name}_fts_delete" )
[docs] def _create_datetime_index(self, key: str, unique: bool = False): """Create a timezone normalized datetime index. This method creates a specialized datetime index with datetime() for timezone normalization. Args: key: The field name to create the datetime index on (e.g., 'timestamp' or 'user.created_at') unique: Whether the index should be unique """ # Generate the column name by replacing dots with underscores and appending '_utc' # This is just for naming consistency column_name = f"{key.replace('.', '_')}_utc" index_sql = f""" CREATE {"UNIQUE " if unique else ""}INDEX IF NOT EXISTS idx_{quote_table_name(self.collection.name)}_{column_name} ON {quote_table_name(self.collection.name)}(datetime(json_extract(data, '{parse_json_path(key)}'))) """ self.collection.db.execute(index_sql)