"""
Query Builder Mixin for NeoSQLite.
This module contains the QueryBuilderMixin class, which provides methods for
building SQL queries from MongoDB-like query specifications.
"""
import logging
import re
from typing import TYPE_CHECKING, Any
from ... import query_operators
from ...exceptions import MalformedQueryException
from ...sql_utils import quote_table_name
from ..expr_evaluator import ExprEvaluator
from ..json_helpers import neosqlite_json_dumps_for_sql
from ..json_path_utils import parse_json_path
from ..text_search import unified_text_search
from ..type_correction import normalize_id_query_for_db
logger = logging.getLogger(__name__)
if TYPE_CHECKING:
from .. import Collection
[docs]
class QueryBuilderMixin:
"""
A mixin class that provides query building capabilities.
This mixin assumes it will be used with a class that has:
- self.collection (with db and name attributes)
- self._jsonb_supported
- self._json_function_prefix
- self._build_expr_where_clause method (for handling $expr queries)
"""
collection: "Collection"
_jsonb_supported: bool
_json_function_prefix: str
_build_expr_where_clause: Any
[docs]
def _is_text_search_query(self, query: dict[str, Any]) -> bool:
"""
Check if the query is a text search query (contains $text operator).
Args:
query: The query to check.
Returns:
True if the query is a text search query, False otherwise.
"""
return "$text" in query
[docs]
def _build_text_search_query(
self, query: dict[str, Any]
) -> tuple[str, list[Any], list[str]] | None:
"""
Builds a SQL query for text search using FTS5.
Args:
query: A dictionary representing the text search query with $text operator.
Returns:
tuple[str, list[Any], list[str]] | None: A tuple containing the SQL WHERE clause,
a list of parameters, and an empty list of
tables to clean up, or None.
"""
if "$text" not in query:
return None
text_query = query["$text"]
if not isinstance(text_query, dict) or "$search" not in text_query:
return None
search_term = text_query["$search"]
if not isinstance(search_term, str):
return None
# Find FTS tables for this collection
cursor = self.collection.db.execute(
"SELECT name FROM sqlite_master WHERE type = 'table' AND name LIKE ?",
(f"{quote_table_name(self.collection.name)}_%_fts",),
)
fts_tables = cursor.fetchall()
if not fts_tables:
return None
# Build UNION query to search across ALL FTS tables
subqueries = []
params = []
for (fts_table_name,) in fts_tables:
# Extract field name from FTS table name (collection_field_fts -> field)
index_name = fts_table_name[
len(f"{quote_table_name(self.collection.name)}_") : -4
] # Remove collection_ prefix and _fts suffix
# Add subquery for this FTS table
subqueries.append(
f"SELECT rowid FROM {fts_table_name} WHERE {index_name} MATCH ?"
)
params.append(search_term.lower())
# Combine all subqueries with UNION to get documents matching in ANY FTS index
union_query = " UNION ".join(subqueries)
# Build the FTS query
where_clause = f"""
WHERE id IN ({union_query})
"""
return where_clause, params, []
[docs]
def _build_other_fields_clause(
self, query: dict[str, Any], expr: dict[str, Any]
) -> tuple[str, list[Any]] | None:
"""
Build WHERE clause for non-$expr fields.
Args:
query: Full query dictionary
expr: The $expr expression
Returns:
Tuple of (WHERE clause, parameters) or None for Python fallback
"""
clauses: list[str] = []
params: list[Any] = []
for field, value in query.items():
if field == "$expr":
continue
if field in ("$and", "$or", "$nor", "$not"):
return None
field_result = self._build_field_clause(field, value)
if field_result is None:
return None
field_clause, field_params = field_result
clauses.append(field_clause)
params.extend(field_params)
if clauses:
return " AND ".join(clauses), params
return "", params
[docs]
def _categorize_ids(
self, ids: list[Any]
) -> tuple[list[int], list[str]] | None:
"""
Categorize a list of ID values into integer IDs and string IDs.
Args:
ids: List of ID values (ints, ObjectIds, or strings)
Returns:
Tuple of (int_ids, string_ids) or None if unsupported types found
"""
from ...objectid import ObjectId
int_ids: list[int] = []
string_ids: list[str] = []
for item in ids:
if isinstance(item, int):
int_ids.append(item)
elif isinstance(item, ObjectId):
string_ids.append(str(item))
elif isinstance(item, str):
# Check if it's a valid ObjectId string
try:
obj_id = ObjectId(item)
string_ids.append(str(obj_id))
except ValueError:
# Try as integer
try:
int_id = int(item)
int_ids.append(int_id)
except ValueError:
string_ids.append(item)
else:
return None # Unsupported type
return int_ids, string_ids
[docs]
def _build_id_operator_clause(
self, value: dict
) -> tuple[str, list[Any]] | None:
"""
Build a WHERE clause for _id field with operators like $in, $nin, $ne, etc.
Args:
value: Dictionary containing operators like $in, $nin, $ne, etc.
Returns:
Tuple of (SQL clause, parameters) or None for Python fallback
"""
table = quote_table_name(self.collection.name)
clauses: list[str] = []
params: list[Any] = []
for op, op_val in value.items():
if op == "$in":
if not isinstance(op_val, list) or len(op_val) == 0:
return None
result = self._categorize_ids(op_val)
if result is None:
return None
int_ids, string_ids = result
# Build IN clauses for both id and _id columns
if int_ids and string_ids:
int_ph = ", ".join("?" for _ in int_ids)
str_ph = ", ".join("?" for _ in string_ids)
clauses.append(
f"({table}.id IN ({int_ph}) OR {table}._id IN ({str_ph}))"
)
params.extend(int_ids)
params.extend(string_ids)
elif int_ids:
ph = ", ".join("?" for _ in int_ids)
clauses.append(f"{table}.id IN ({ph})")
params.extend(int_ids)
elif string_ids:
ph = ", ".join("?" for _ in string_ids)
clauses.append(f"{table}._id IN ({ph})")
params.extend(string_ids)
elif op == "$nin":
if not isinstance(op_val, list) or len(op_val) == 0:
return None
result = self._categorize_ids(op_val)
if result is None:
return None
int_ids, string_ids = result
# Build NOT IN clauses - must exclude from BOTH columns
if int_ids and string_ids:
int_ph = ", ".join("?" for _ in int_ids)
str_ph = ", ".join("?" for _ in string_ids)
clauses.append(
f"({table}.id NOT IN ({int_ph}) AND {table}._id NOT IN ({str_ph}))"
)
params.extend(int_ids)
params.extend(string_ids)
elif int_ids:
ph = ", ".join("?" for _ in int_ids)
clauses.append(f"{table}.id NOT IN ({ph})")
params.extend(int_ids)
elif string_ids:
ph = ", ".join("?" for _ in string_ids)
clauses.append(f"{table}._id NOT IN ({ph})")
params.extend(string_ids)
elif op == "$ne":
# For $ne, we must exclude the value from BOTH columns
# to be strictly correct with the dual-column approach.
# An int value could match id (as int) or _id (as string).
int_id, str_id = self._categorize_id_value(op_val)
if int_id is not None:
clauses.append(f"{table}.id != ?")
params.append(int_id)
# Also exclude the string representation in _id column
clauses.append(f"{table}._id != ?")
params.append(str(int_id))
if str_id is not None:
clauses.append(f"{table}._id != ?")
params.append(str_id)
# Try to also exclude as int in id column
try:
int_val = int(str_id)
clauses.append(f"{table}.id != ?")
params.append(int_val)
except ValueError:
pass # Not convertible to int, skip
if int_id is None and str_id is None:
return None # Unsupported type
elif op in ("$gt", "$gte", "$lt", "$lte"):
# Support range queries on _id
# - int values target the id column
# - ObjectId/string values target the _id column (lexicographic comparison)
int_id, str_id = self._categorize_id_value(op_val)
op_map = {"$gt": ">", "$gte": ">=", "$lt": "<", "$lte": "<="}
sql_op = op_map[op]
if int_id is not None:
clauses.append(f"{table}.id {sql_op} ?")
params.append(int_id)
elif str_id is not None:
clauses.append(f"{table}._id {sql_op} ?")
params.append(str_id)
else:
return None # Unsupported type for range queries
else:
return None # Fall back to Python for unsupported operators
if clauses:
return " AND ".join(clauses), params
return None # Fall back to Python if no clauses generated
[docs]
def _categorize_id_value(self, value: Any) -> tuple[int | None, str | None]:
"""
Categorize a single ID value into either an int ID or a string ID.
Returns:
Tuple of (int_value, string_value) where only one is non-None.
"""
from ...objectid import ObjectId
if isinstance(value, int):
return value, None
elif isinstance(value, ObjectId):
return None, str(value)
elif isinstance(value, str):
# Try as ObjectId first
try:
obj_id = ObjectId(value)
return None, str(obj_id)
except ValueError:
# Try as integer
try:
int_id = int(value)
return int_id, None
except ValueError:
# Treat as plain string _id
return None, value
return None, None
[docs]
def _build_field_clause(
self, field: str, value: Any
) -> tuple[str, list[Any]] | None:
"""
Build a WHERE clause for a single field.
Args:
field: Field name
value: Field value or operator dict
Returns:
Tuple of (SQL clause, parameters) or None for Python fallback
"""
from ...objectid import ObjectId
if field == "$jsonSchema":
return None
if field == "_id":
# Handle _id field specially
if isinstance(value, dict):
# Handle operator-based queries on _id
return self._build_id_operator_clause(value)
elif isinstance(value, ObjectId):
return f"{quote_table_name(self.collection.name)}._id = ?", [
str(value)
]
elif isinstance(value, str) and len(value) == 24:
try:
obj_id = ObjectId(value)
return (
f"{quote_table_name(self.collection.name)}._id = ?",
[str(obj_id)],
)
except ValueError:
try:
int_id = int(value)
return (
f"{quote_table_name(self.collection.name)}.id = ?",
[int_id],
)
except ValueError:
return (
f"{quote_table_name(self.collection.name)}._id = ?",
[value],
)
elif isinstance(value, int):
return f"{quote_table_name(self.collection.name)}.id = ?", [
value
]
else:
return f"{quote_table_name(self.collection.name)}._id = ?", [
value
]
else:
# Handle regular fields with json_extract/jsonb_extract
# Use the correct function based on JSONB support
json_path = f"'{parse_json_path(field)}'"
if isinstance(value, dict):
# Handle operators like $eq, $gt, etc.
# Extract $options for $regex if present
options = value.get("$options", "")
clause, params = self._build_operator_clause(
json_path, value, regex_options=options
)
if clause is None:
return None
return f"{clause}", params
else:
# Simple equality
if isinstance(value, re.Pattern):
return None # Fall back to Python for regex objects
extract_expr = (
f"{self._json_function_prefix}_extract(data, {json_path})"
)
return f"{extract_expr} = ?", [value]
[docs]
def _build_simple_where_clause(
self,
query: dict[str, Any],
) -> tuple[str, list[Any], list[str]] | None:
"""
Builds a SQL WHERE clause for simple queries that can be handled with json_extract.
This method constructs a SQL WHERE clause based on the query provided.
It handles simple equality checks and query operators like $eq, $gt, $lt,
etc. for fields stored in JSON data. For more complex queries, it returns
None, indicating that a Python-based method should be used instead.
When the force fallback flag is set, this method returns None to force
Python-based processing for benchmarking and debugging purposes.
Args:
query (dict[str, Any]): A dictionary representing the query criteria.
Returns:
tuple[str, list[Any], list[str]] | None: A tuple containing the SQL WHERE clause,
a list of parameters, and a list of
temporary tables to clean up, or None.
"""
# Apply type correction to handle cases where users query 'id' with ObjectId
# or other common type mismatches
query = normalize_id_query_for_db(query)
# Check force fallback flag
from .utils import get_force_fallback
if get_force_fallback():
return None # Force fallback to Python implementation
# Handle text search queries separately
if self._is_text_search_query(query):
return self._build_text_search_query(query)
# Handle $expr queries
if "$expr" in query:
return self._build_expr_where_clause(query)
if "$jsonSchema" in query:
return None
if "$where" in query:
raise NotImplementedError(
"The '$where' operator (JavaScript) is not supported in NeoSQLite. "
"Please use the '$expr' operator for field-to-field comparisons, "
"which is fully compatible with MongoDB and highly optimized in NeoSQLite."
)
if "$function" in query:
raise NotImplementedError(
"The '$function' operator is not supported in NeoSQLite. "
"Please use '$expr' with Python expressions, or post-process results in Python."
)
if "$accumulator" in query:
raise NotImplementedError(
"The '$accumulator' operator is not supported in NeoSQLite. "
"Please use built-in accumulators ($sum, $avg, $min, $max, $count, $push, $addToSet, $first, $last), "
"or post-process results in Python."
)
clauses: list[str] = []
params: list[Any] = []
for field, value in query.items():
# Handle logical operators by falling back to Python processing
# This is more robust than trying to build complex SQL queries
if field in ("$and", "$or", "$nor", "$not"):
return (
None # Fall back to Python processing for logical operators
)
elif field == "_id":
# Handle _id field specially since it's now stored in the dedicated _id column for new records
# For backward compatibility, we need to check both the _id column and the auto-increment id column
from ...objectid import ObjectId
# Handle operator-based queries on _id using the dedicated method
if isinstance(value, dict):
result = self._build_id_operator_clause(value)
if result is None:
return None
clause, field_params = result
clauses.append(clause)
params.extend(field_params)
# Convert the value to the appropriate format for storage
elif isinstance(value, ObjectId):
param_value = str(value)
# Query the _id column
clauses.append(
f"{quote_table_name(self.collection.name)}._id = ?"
)
params.append(param_value)
elif isinstance(value, str) and len(value) == 24:
try:
# Validate if it's a valid ObjectId string
obj_id = ObjectId(value)
param_value = str(obj_id)
# Query the _id column
clauses.append(
f"{quote_table_name(self.collection.name)}._id = ?"
)
params.append(param_value)
except ValueError:
# If not a valid ObjectId string, it might be an integer id
try:
int_id = int(
value
) # Try to parse as integer for backward compatibility
clauses.append(
f"{quote_table_name(self.collection.name)}.id = ?"
)
params.append(int_id)
except ValueError:
# If not a valid integer, treat as a string _id
clauses.append(
f"{quote_table_name(self.collection.name)}._id = ?"
)
params.append(value)
elif isinstance(value, int):
# Query the auto-increment id column
clauses.append(
f"{quote_table_name(self.collection.name)}.id = ?"
)
params.append(value)
else:
# For other types, query the _id column
clauses.append(
f"{quote_table_name(self.collection.name)}._id = ?"
)
params.append(value)
else:
# Handle regular fields with json_extract
field_result = self._build_field_clause(field, value)
if field_result is None:
return None # Fall back to Python processing
field_clause, field_params = field_result
if field_clause: # Only add non-empty clauses
clauses.append(field_clause)
params.extend(field_params)
if clauses:
return "WHERE " + " AND ".join(clauses), params, []
return "", params, []
[docs]
def _build_sort_clause(
self,
sort: dict[str, int] | None,
collation: dict[str, Any] | None = None,
) -> str:
"""
Builds a SQL ORDER BY clause from a sort dictionary.
Args:
sort: A dictionary mapping fields to sort directions (1 for ASC, -1 for DESC).
collation: Optional collation settings for case-insensitive sorting.
Returns:
A SQL ORDER BY clause string (including 'ORDER BY' prefix), or empty string.
"""
if not sort:
return ""
clauses = []
# Get collation settings
collate_clause = ""
if collation:
strength = collation.get("strength", 3)
case_level = collation.get("caseLevel", False)
if strength <= 2 or not case_level:
collate_clause = " COLLATE NOCASE"
for field, direction in sort.items():
if field == "_id":
order_field = f"{quote_table_name(self.collection.name)}._id"
else:
json_path = f"'{parse_json_path(field)}'"
order_field = (
f"{self._json_function_prefix}_extract(data, {json_path})"
)
order_dir = "ASC" if direction == 1 else "DESC"
clauses.append(f"{order_field}{collate_clause} {order_dir}")
if clauses:
return " ORDER BY " + ", ".join(clauses)
return ""
[docs]
def _build_operator_clause(
self,
json_path: str,
operators: dict[str, Any],
is_datetime_indexed: bool = False,
regex_options: str = "",
) -> tuple[str | None, list[Any]]:
"""
Builds a SQL clause for query operators.
This method constructs a SQL clause based on the provided operators for
a specific JSON path. It handles various operators like $eq, $gt, $lt, etc.,
and returns a tuple containing the SQL clause and a list of parameters.
If an unsupported operator is encountered, it returns None, indicating
that a fallback to Python processing is needed.
Args:
json_path (str): The JSON path to extract the value from.
operators (dict[str, Any]): A dictionary of operators and their values.
is_datetime_indexed (bool): Whether the field has a datetime index that requires timezone normalization.
Returns:
tuple[str | None, list[Any]]: A tuple containing the SQL clause and
parameters. If the operator is unsupported,
returns (None, []).
"""
clauses = []
params = []
for op, op_val in operators.items():
# Serialize Binary objects for SQL comparisons using compact format
if isinstance(op_val, bytes) and hasattr(
op_val, "encode_for_storage"
):
op_val = neosqlite_json_dumps_for_sql(op_val)
match op:
case "$eq":
# Array values need Python for correct semantics
if isinstance(op_val, (list, tuple)):
return None, []
if is_datetime_indexed:
clauses.append(
f"{self._json_function_prefix}_extract(data, {json_path}) = datetime(?)"
)
params.append(op_val)
else:
clauses.append(
f"{self._json_function_prefix}_extract(data, {json_path}) = ?"
)
params.append(op_val)
case "$gt":
# Array values need Python for correct semantics
if isinstance(op_val, (list, tuple)):
return None, []
if is_datetime_indexed:
clauses.append(
f"{self._json_function_prefix}_extract(data, {json_path}) > datetime(?)"
)
params.append(op_val)
else:
clauses.append(
f"{self._json_function_prefix}_extract(data, {json_path}) > ?"
)
params.append(op_val)
case "$lt":
# Array values need Python for correct semantics
if isinstance(op_val, (list, tuple)):
return None, []
if is_datetime_indexed:
clauses.append(
f"{self._json_function_prefix}_extract(data, {json_path}) < datetime(?)"
)
params.append(op_val)
else:
clauses.append(
f"{self._json_function_prefix}_extract(data, {json_path}) < ?"
)
params.append(op_val)
case "$gte":
if isinstance(op_val, (list, tuple)):
return None, []
if is_datetime_indexed:
clauses.append(
f"{self._json_function_prefix}_extract(data, {json_path}) >= datetime(?)"
)
params.append(op_val)
else:
clauses.append(
f"{self._json_function_prefix}_extract(data, {json_path}) >= ?"
)
params.append(op_val)
case "$lte":
if isinstance(op_val, (list, tuple)):
return None, []
if is_datetime_indexed:
clauses.append(
f"{self._json_function_prefix}_extract(data, {json_path}) <= datetime(?)"
)
params.append(op_val)
else:
clauses.append(
f"{self._json_function_prefix}_extract(data, {json_path}) <= ?"
)
params.append(op_val)
case "$ne":
# Array values need Python for correct semantics
if isinstance(op_val, (list, tuple)):
return None, []
if is_datetime_indexed:
clauses.append(
f"{self._json_function_prefix}_extract(data, {json_path}) != datetime(?)"
)
params.append(op_val)
else:
clauses.append(
f"{self._json_function_prefix}_extract(data, {json_path}) != ?"
)
params.append(op_val)
case "$in":
json_each_func = getattr(
self, "_json_each_function", "json_each"
)
if isinstance(op_val, (list, tuple)):
placeholders = ", ".join("?" for _ in op_val)
if json_path == "value":
clauses.append(f"value IN ({placeholders})")
else:
clauses.append(
f"EXISTS (SELECT 1 FROM {json_each_func}(data, {json_path}) AS json_each WHERE json_each.value IN ({placeholders}))"
)
params.extend(op_val)
else:
return None, []
case "$nin":
json_each_func = getattr(
self, "_json_each_function", "json_each"
)
if isinstance(op_val, (list, tuple)):
placeholders = ", ".join("?" for _ in op_val)
if json_path == "value":
clauses.append(f"value NOT IN ({placeholders})")
else:
clauses.append(
f"NOT EXISTS (SELECT 1 FROM {json_each_func}(data, {json_path}) AS json_each WHERE json_each.value IN ({placeholders}))"
)
params.extend(op_val)
else:
return None, []
case "$all":
json_each_func = getattr(
self, "_json_each_function", "json_each"
)
if isinstance(op_val, (list, tuple)):
if len(op_val) == 0:
return None, []
for v in op_val:
clauses.append(
f"EXISTS (SELECT 1 FROM {json_each_func}(data, {json_path}) AS json_each WHERE json_each.value = ?)"
)
params.append(v)
else:
return None, []
case "$exists":
# Handle boolean value for $exists
if isinstance(op_val, bool):
if op_val:
clauses.append(
f"{self._json_function_prefix}_extract(data, {json_path}) IS NOT NULL"
)
else:
clauses.append(
f"{self._json_function_prefix}_extract(data, {json_path}) IS NULL"
)
else:
# Invalid value for $exists, fallback to Python
return None, []
case "$mod":
# Handle [divisor, remainder] array
if isinstance(op_val, (list, tuple)) and len(op_val) == 2:
divisor, remainder = op_val
clauses.append(
f"json_type(data, {json_path}) IN ('integer', 'real') AND "
f"{self._json_function_prefix}_extract(data, {json_path}) % ? = ?"
)
params.extend([divisor, remainder])
else:
# Invalid format for $mod, fallback to Python
return None, []
case "$size":
# Handle array size comparison
if isinstance(op_val, int):
clauses.append(
f"json_array_length({self._json_function_prefix}_extract(data, {json_path})) = ?"
)
params.append(op_val)
else:
# Invalid value for $size, fallback to Python
return None, []
case "$contains":
# Handle case-insensitive substring search
if isinstance(op_val, str):
clauses.append(
f"lower({self._json_function_prefix}_extract(data, {json_path})) LIKE ?"
)
params.append(f"%{op_val.lower()}%")
else:
# Invalid value for $contains, fallback to Python
return None, []
case "$elemMatch":
# Determine the json_each function to use
json_each_func = getattr(
self, "_json_each_function", "json_each"
)
# Build the inner WHERE clause for the subquery
inner_clauses = []
inner_params = []
if isinstance(op_val, dict):
has_operators = any(
k.startswith("$") for k in op_val.keys()
)
if has_operators:
# Case: {"field": {"$elemMatch": {"$gt": 10}}}
# Use "value" as path which will be retargeted below
c, p = self._build_operator_clause("value", op_val)
if c is None:
return None, []
# Retarget: replace json_extract(data, value) with value
# The _build_operator_clause uses f"{self._json_function_prefix}_extract(data, value)"
c = c.replace(
f"{self._json_function_prefix}_extract(data, value)",
"value",
)
# Also fix json_each(data, value) -> json_each(data, "value")
# This handles $in and $nin inside $elemMatch
c = c.replace(
"json_each(data, value)",
'json_each(data, "value")',
)
inner_clauses.append(c)
inner_params.extend(p)
else:
# Case: {"field": {"$elemMatch": {"k": "v"}}}
for sub_field, sub_val in op_val.items():
if isinstance(sub_val, dict):
# Operator on subfield
c, p = self._build_operator_clause(
f"'{parse_json_path(sub_field)}'",
sub_val,
)
else:
# Equality on subfield
c = (
f"{self._json_function_prefix}_extract(data, "
f"'{parse_json_path(sub_field)}') = ?"
)
p = [sub_val]
if c is None:
return None, []
# Retarget from 'data' to 'value' (the row from json_each)
c = c.replace(
f"{self._json_function_prefix}_extract(data,",
f"{self._json_function_prefix}_extract(value,",
)
inner_clauses.append(c)
inner_params.extend(p)
else:
# Case: {"field": {"$elemMatch": "value"}}
# For simple values in an array
inner_clauses.append("value = ?")
inner_params.append(op_val)
if not inner_clauses:
return None, []
where_inner = " AND ".join(inner_clauses)
# EXISTS (SELECT 1 FROM json_each(data, '$.field') WHERE <inner_where>)
# Use json_type check to ensure it only matches arrays (MongoDB semantics)
clauses.append(
f"json_type(data, {json_path}) = 'array' AND "
f"EXISTS (SELECT 1 FROM {json_each_func}(data, {json_path}) AS json_each WHERE {where_inner})"
)
params.extend(inner_params)
case "$regex":
# Handle regex with optional options
if not isinstance(op_val, str):
return None, []
# Build pattern with inline regex flags for SQLite REGEXP
# Convert MongoDB options to Python inline regex flags
if regex_options:
flag_str = ""
if "i" in regex_options.lower():
flag_str += "i"
if "m" in regex_options.lower():
flag_str += "m"
if "s" in regex_options.lower():
flag_str += "s"
if "x" in regex_options.lower():
flag_str += "x"
pattern = f"(?{flag_str}){op_val}"
else:
pattern = op_val
clauses.append(
f"{self._json_function_prefix}_extract(data, {json_path}) REGEXP ?"
)
params.append(pattern)
case _:
# Unsupported operator, fallback to Python
return None, []
if not clauses:
return None, []
# Combine all clauses with AND
combined_clause = " AND ".join(clauses)
return combined_clause, params
[docs]
def _search_in_value(self, value: Any, search_term: str) -> bool:
"""
Recursively search for a term in a value (string, dict, or list).
Args:
value: The value to search in
search_term: The term to search for
Returns:
bool: True if the search term is found, False otherwise
"""
match value:
case str():
return search_term.lower() in value.lower()
case dict():
return any(
self._search_in_value(v, search_term)
for v in value.values()
)
case list():
return any(
self._search_in_value(elem, search_term) for elem in value
)
case _:
return False
[docs]
def _apply_query(
self,
query: dict[str, Any],
document: dict[str, Any],
) -> bool:
"""
Applies a query to a document to determine if it matches the query criteria.
Handles logical operators ($and, $or, $nor, $not) and nested field paths.
Processes both simple equality checks and complex query operators.
Args:
query (dict[str, Any]): A dictionary representing the query criteria.
document (dict[str, Any]): The document to apply the query to.
Returns:
bool: True if the document matches the query, False otherwise.
"""
if document is None:
return False
matches: list[bool] = []
def reapply(q: dict[str, Any]) -> bool:
"""
Recursively apply the query to the document to determine if it matches
the query criteria.
Args:
q (dict[str, Any]): The query to apply.
document (dict[str, Any]): The document to apply the query to.
Returns:
bool: True if the document matches the query, False otherwise.
"""
return self._apply_query(q, document)
for field, value in query.items():
match field:
case "$expr":
# Handle $expr operator in Python fallback
evaluator = ExprEvaluator(
data_column="data", db_connection=self.collection.db
)
result = evaluator._evaluate_expr_python(value, document)
matches.append(bool(result))
case "$gt" | "$lt" | "$gte" | "$lte" | "$eq" | "$ne" | "$cmp":
# Handle direct comparison expressions (without $expr wrapper)
# These are expressions like {"$gt": [{"$sin": "$angle"}, 0.5]}
# Check if value is an array (expression form) vs dict (field operator form)
if isinstance(value, list) and len(value) == 2:
# This is a direct expression, not a field operator
evaluator = ExprEvaluator(
data_column="data", db_connection=self.collection.db
)
result = evaluator._evaluate_expr_python(
query, document
)
matches.append(bool(result))
break # Direct expression is the entire query
# Otherwise, fall through to normal field operator handling
case "$text":
# Handle $text operator in Python fallback
text_match = False
if isinstance(value, dict) and "$search" in value:
search_term = value["$search"]
if isinstance(search_term, str):
# Find FTS tables for this collection to determine which fields are indexed
cursor = self.collection.db.execute(
"SELECT name FROM sqlite_master WHERE type = 'table' AND name LIKE ?",
(
f"{quote_table_name(self.collection.name)}_%_fts",
),
)
fts_tables = cursor.fetchall()
# Check each FTS-indexed field for matches
if fts_tables:
for fts_table in fts_tables:
fts_table_name = fts_table[0]
index_name = fts_table_name[
len(
f"{quote_table_name(self.collection.name)}_"
) : -4
]
field_name = index_name.replace("_", ".")
try:
field_value = self.collection._get_val(
document, field_name
)
except (AttributeError, TypeError) as e:
logger.debug(
f"Failed to get field '{field_name}' for FTS matching: {e}"
)
continue
if field_value and isinstance(
field_value, str
):
if (
search_term.lower()
in field_value.lower()
):
text_match = True
break
elif isinstance(field_value, list):
for elem in field_value:
if (
isinstance(elem, str)
and search_term.lower()
in elem.lower()
):
text_match = True
break
elif isinstance(
elem, dict
) and self._search_in_value(
elem, search_term
):
text_match = True
break
if text_match:
break
else:
# No FTS indexes, search all fields
text_match = unified_text_search(
document, search_term
)
matches.append(text_match)
case "$and":
matches.append(all(map(reapply, value)))
case "$or":
matches.append(any(map(reapply, value)))
case "$nor":
matches.append(not any(map(reapply, value)))
case "$not":
matches.append(not self._apply_query(value, document))
case "$jsonSchema":
from .schema_validator import matches_json_schema
matches.append(matches_json_schema(document, value))
case _:
if isinstance(value, dict):
# Extract $options for $regex if present
options = value.get("$options", "")
if options and "$regex" not in value:
raise MalformedQueryException(
"Can't use $options without $regex"
)
for operator, arg in value.items():
if operator == "$options":
# $options is handled together with $regex
continue
fn = self._get_operator_fn(operator)
# Call operator function, passing options if it's $regex
if operator == "$regex":
if not fn(
field, arg, document, options=options
):
matches.append(False)
break
else:
if not fn(field, arg, document):
matches.append(False)
break
else:
matches.append(True)
else:
doc_value: dict[str, Any] | None = document
if doc_value and field in doc_value:
doc_value = doc_value.get(field, None)
else:
for path in field.split("."):
if not isinstance(doc_value, dict):
break
doc_value = doc_value.get(path, None)
if isinstance(value, re.Pattern):
if doc_value is None or not value.search(
str(doc_value)
):
matches.append(False)
elif value != doc_value:
matches.append(False)
return all(matches)
[docs]
def _get_operator_fn(self, op: str) -> Any:
"""
Retrieve the function associated with the given operator from the
query_operators module.
Args:
op (str): The operator string, which should start with a '$' prefix.
Returns:
Any: The function corresponding to the operator.
Raises:
MalformedQueryException: If the operator does not start with '$'.
MalformedQueryException: If the operator is not currently implemented.
"""
if not op.startswith("$"):
raise MalformedQueryException(
f"Operator '{op}' is not a valid query operation"
)
try:
return getattr(query_operators, op.replace("$", "_"))
except AttributeError:
raise MalformedQueryException(
f"Operator '{op}' is not currently implemented"
)