"""
SQL converters for expression evaluation.
This module contains the SQL tier conversion methods for the ExprEvaluator.
These methods convert MongoDB $expr operators to SQL expressions.
This is designed as a mixin class to be composed into ExprEvaluator.
"""
from __future__ import annotations
import warnings
from typing import TYPE_CHECKING, Any
from ..json_path_utils import build_json_extract_expression, parse_json_path
if TYPE_CHECKING:
# Avoid circular import by using TYPE_CHECKING
from .context import AggregationContext
[docs]
class SqlConvertersMixin:
"""
Mixin class providing SQL conversion methods for expression evaluation.
This class is designed to be composed into ExprEvaluator and provides
all the _convert_* methods for converting MongoDB operators to SQL.
Required attributes from parent class:
- data_column: Name of the JSON data column
- json_function_prefix: 'json' or 'jsonb' based on support
- json_each_function: 'json_each' or 'jsonb_each'
- json_group_array_function: 'json_group_array' or 'jsonb_group_array'
- _jsonb_supported: Boolean indicating JSONB support
- _log2_warned: Boolean tracking if $log2 warning was issued
"""
# Type annotations for simple attributes expected from parent class
# (Properties like json_function_prefix are handled separately)
data_column: str
_jsonb_supported: bool
_log2_warned: bool
_current_context: AggregationContext | None
[docs]
def _convert_expr_to_sql(
self, expr: dict[str, Any]
) -> tuple[str, list[Any]]:
"""
Convert a $expr expression to SQL.
Args:
expr: Expression dictionary
Returns:
Tuple of (SQL expression, parameters)
Raises:
NotImplementedError: If operator is not supported in SQL
ValueError: If expression structure is invalid
"""
if not isinstance(expr, dict) or len(expr) != 1:
raise ValueError("Invalid $expr expression structure")
operator, operands = next(iter(expr.items()))
# Handle different operator types
match operator:
case "$and" | "$or" | "$not" | "$nor":
return self._convert_logical_operator(operator, operands)
case "$gt" | "$gte" | "$lt" | "$lte" | "$eq" | "$ne":
return self._convert_comparison_operator(operator, operands)
case "$cmp":
return self._convert_cmp_operator(operands)
case "$add" | "$subtract" | "$multiply" | "$divide" | "$mod":
return self._convert_arithmetic_operator(operator, operands)
case (
"$pow"
| "$sqrt"
| "$ln"
| "$log"
| "$log10"
| "$log2"
| "$exp"
| "$sigmoid"
):
return self._convert_math_operator(operator, operands)
case "$cond":
return self._convert_cond_operator(operands)
case "$ifNull":
return self._convert_ifNull_operator(operands)
case (
"$size"
| "$in"
| "$isArray"
| "$slice"
| "$indexOfArray"
| "$sum"
| "$avg"
| "$min"
| "$max"
| "$setEquals"
| "$setIntersection"
| "$setUnion"
| "$setDifference"
| "$setIsSubset"
| "$anyElementTrue"
| "$allElementsTrue"
):
return self._convert_array_operator(operator, operands)
case (
"$concat"
| "$toLower"
| "$toUpper"
| "$strLenBytes"
| "$substr"
| "$trim"
| "$ltrim"
| "$rtrim"
| "$indexOfBytes"
| "$regexMatch"
| "$regexFind"
| "$regexFindAll"
| "$split"
| "$replaceAll"
| "$replaceOne"
| "$strLenCP"
| "$indexOfCP"
):
return self._convert_string_operator(operator, operands)
case "$abs" | "$ceil" | "$floor" | "$round" | "$trunc":
return self._convert_math_operator(operator, operands)
case (
"$sin"
| "$cos"
| "$tan"
| "$asin"
| "$acos"
| "$atan"
| "$atan2"
| "$sinh"
| "$cosh"
| "$tanh"
| "$asinh"
| "$acosh"
| "$atanh"
):
return self._convert_trig_operator(operator, operands)
case "$degreesToRadians" | "$radiansToDegrees":
return self._convert_angle_operator(operator, operands)
case (
"$year"
| "$month"
| "$dayOfMonth"
| "$hour"
| "$minute"
| "$second"
| "$dayOfWeek"
| "$dayOfYear"
| "$week"
| "$isoDayOfWeek"
| "$isoWeek"
| "$millisecond"
):
return self._convert_date_operator(operator, operands)
case "$dateAdd" | "$dateSubtract":
return self._convert_date_arithmetic_operator(
operator, operands
)
case "$dateDiff":
return self._convert_date_diff_operator(operands)
case (
"$mergeObjects"
| "$getField"
| "$setField"
| "$unsetField"
| "$objectToArray"
):
return self._convert_object_operator(operator, operands)
case "$let":
return self._convert_let_operator(operands)
case (
"$type"
| "$toString"
| "$toInt"
| "$toDouble"
| "$toLong"
| "$toBool"
| "$toDecimal"
| "$toObjectId"
| "$isNumber"
| "$convert"
):
return self._convert_type_operator(operator, operands)
case "$binarySize" | "$bsonSize":
return self._convert_data_size_operator(operator, operands)
case _:
raise NotImplementedError(
f"Operator {operator} not supported in SQL tier"
)
[docs]
def _convert_logical_operator(
self, operator: str, operands: Any
) -> tuple[str, list[Any]]:
"""Convert logical operators ($and, $or, $not, $nor) to SQL."""
# Normalize operands for $not to handle both formats:
# {$not: {expression}} and {$not: [expression]}
if operator == "$not" and not isinstance(operands, list):
operands = [operands]
if not isinstance(operands, list):
raise ValueError(f"{operator} requires a list of expressions")
if operator == "$not":
if len(operands) != 1:
raise ValueError("$not requires exactly one operand")
inner_sql, inner_params = self._convert_expr_to_sql(operands[0])
return f"NOT ({inner_sql})", inner_params
# $and, $or, $nor
if len(operands) < 2:
raise ValueError(f"{operator} requires at least 2 operands")
sql_parts = []
all_params = []
for operand in operands:
operand_sql, operand_params = self._convert_expr_to_sql(operand)
sql_parts.append(f"({operand_sql})")
all_params.extend(operand_params)
match operator:
case "$and":
sql = " AND ".join(sql_parts)
case "$or":
sql = " OR ".join(sql_parts)
case "$nor":
sql = f"NOT ({' OR '.join(sql_parts)})"
case _:
raise ValueError(f"Unknown logical operator: {operator}")
return sql, all_params
[docs]
def _convert_comparison_operator(
self, operator: str, operands: list[Any]
) -> tuple[str, list[Any]]:
"""Convert comparison operators to SQL."""
if len(operands) != 2:
raise ValueError(f"{operator} requires exactly 2 operands")
left_sql, left_params = self._convert_operand_to_sql(operands[0])
right_sql, right_params = self._convert_operand_to_sql(operands[1])
sql_operator = self._map_comparison_operator(operator)
return (
f"{left_sql} {sql_operator} {right_sql}",
left_params + right_params,
)
[docs]
def _convert_cmp_operator(
self, operands: list[Any]
) -> tuple[str, list[Any]]:
"""Convert $cmp operator to SQL CASE statement."""
if len(operands) != 2:
raise ValueError("$cmp requires exactly 2 operands")
left_sql, left_params = self._convert_operand_to_sql(operands[0])
right_sql, right_params = self._convert_operand_to_sql(operands[1])
sql = f"(CASE WHEN {left_sql} < {right_sql} THEN -1 WHEN {left_sql} > {right_sql} THEN 1 ELSE 0 END)"
return sql, left_params + right_params
[docs]
def _convert_arithmetic_operator(
self, operator: str, operands: list[Any]
) -> tuple[str, list[Any]]:
"""Convert arithmetic operators to SQL."""
if len(operands) < 2:
raise ValueError(f"{operator} requires at least 2 operands")
sql_parts = []
all_params = []
for operand in operands:
operand_sql, operand_params = self._convert_operand_to_sql(operand)
sql_parts.append(operand_sql)
all_params.extend(operand_params)
sql_operator = self._map_arithmetic_operator(operator)
sql = f"({f' {sql_operator} '.join(sql_parts)})"
return sql, all_params
[docs]
def _convert_cond_operator(
self, operands: dict[str, Any]
) -> tuple[str, list[Any]]:
"""Convert $cond operator to SQL CASE statement."""
if not isinstance(operands, dict):
raise ValueError("$cond requires a dictionary")
if "if" not in operands or "then" not in operands:
raise ValueError("$cond requires 'if' and 'then' fields")
condition_sql, condition_params = self._convert_expr_to_sql(
operands["if"]
)
then_sql, then_params = self._convert_operand_to_sql(operands["then"])
if "else" in operands:
else_sql, else_params = self._convert_operand_to_sql(
operands["else"]
)
else:
else_sql, else_params = "NULL", []
sql = f"CASE WHEN {condition_sql} THEN {then_sql} ELSE {else_sql} END"
return sql, condition_params + then_params + else_params
[docs]
def _convert_ifNull_operator(
self, operands: list[Any]
) -> tuple[str, list[Any]]:
"""Convert $ifNull operator to SQL COALESCE."""
if not isinstance(operands, list) or len(operands) != 2:
raise ValueError("$ifNull requires exactly 2 operands")
expr_sql, expr_params = self._convert_operand_to_sql(operands[0])
replacement_sql, replacement_params = self._convert_operand_to_sql(
operands[1]
)
sql = f"COALESCE({expr_sql}, {replacement_sql})"
return sql, expr_params + replacement_params
[docs]
def _convert_array_operator(
self, operator: str, operands: Any
) -> tuple[str, list[Any]]:
"""Convert array operators to SQL."""
# Get the appropriate function names based on SQLite version
json_each = self.json_each_function # type: ignore[attr-defined]
json_group_array = self.json_group_array_function # type: ignore[attr-defined]
# Normalize operands for operators that accept single values
if operator in (
"$size",
"$isArray",
"$sum",
"$avg",
"$min",
"$max",
) and not isinstance(operands, list):
operands = [operands]
match operator:
case "$size":
if len(operands) != 1:
raise ValueError("$size requires exactly 1 operand")
array_sql, array_params = self._convert_operand_to_sql(
operands[0]
)
sql = f"json_array_length({array_sql})"
return sql, array_params
case "$in":
if len(operands) != 2:
raise ValueError("$in requires exactly 2 operands")
value_sql, value_params = self._convert_operand_to_sql(
operands[0]
)
array_sql, array_params = self._convert_operand_to_sql(
operands[1]
)
sql = f"EXISTS (SELECT 1 FROM {json_each}({array_sql}) WHERE value = {value_sql})"
return sql, value_params + array_params
case "$isArray":
if len(operands) != 1:
raise ValueError("$isArray requires exactly 1 operand")
operand = operands[0]
if isinstance(operand, str) and operand.startswith("$"):
field_path = operand[1:]
sql = f"CASE WHEN json_type({self.data_column}, '{parse_json_path(field_path)}') = 'array' THEN json('true') ELSE json('false') END"
return sql, []
else:
value_sql, value_params = self._convert_operand_to_sql(
operand
)
sql = f"CASE WHEN json_type({value_sql}) = 'array' THEN json('true') ELSE json('false') END"
return sql, value_params
case "$sum" | "$avg" | "$min" | "$max":
if len(operands) != 1:
raise ValueError(f"{operator} requires exactly 1 operand")
array_sql, array_params = self._convert_operand_to_sql(
operands[0]
)
sql_agg = operator[1:].upper()
if operator in ("$sum", "$avg"):
sql = f"(SELECT {sql_agg}(value) FROM {json_each}({array_sql}) WHERE typeof(value) IN ('integer', 'real'))"
else:
sql = f"(SELECT {sql_agg}(value) FROM {json_each}({array_sql}))"
return sql, array_params
case "$slice":
if not isinstance(operands, list) or len(operands) < 2:
raise ValueError("$slice requires array and count/position")
array_sql, array_params = self._convert_operand_to_sql(
operands[0]
)
count = operands[1]
skip = operands[2] if len(operands) > 2 else 0
if skip != 0:
if self._jsonb_supported:
sql = f"(SELECT json({json_group_array}(value)) FROM (SELECT value FROM {json_each}({array_sql}) LIMIT {count} OFFSET {skip}))"
else:
sql = f"(SELECT {json_group_array}(value) FROM (SELECT value FROM {json_each}({array_sql}) LIMIT {count} OFFSET {skip}))"
else:
if self._jsonb_supported:
sql = f"(SELECT json({json_group_array}(value)) FROM (SELECT value FROM {json_each}({array_sql}) LIMIT {count}))"
else:
sql = f"(SELECT {json_group_array}(value) FROM (SELECT value FROM {json_each}({array_sql}) LIMIT {count}))"
return sql, array_params
case "$indexOfArray":
if len(operands) != 2:
raise ValueError(
"$indexOfArray requires exactly 2 operands"
)
array_sql, array_params = self._convert_operand_to_sql(
operands[0]
)
value_sql, value_params = self._convert_operand_to_sql(
operands[1]
)
sql = f"(SELECT COALESCE((SELECT key FROM {json_each}({array_sql}) WHERE value = {value_sql} LIMIT 1), -1))"
return sql, array_params + value_params
case (
"$setEquals"
| "$setIntersection"
| "$setUnion"
| "$setDifference"
| "$setIsSubset"
| "$anyElementTrue"
| "$allElementsTrue"
):
return self._convert_set_operator(operator, operands)
case _:
raise NotImplementedError(
f"Array operator {operator} not supported in SQL tier"
)
[docs]
def _convert_set_operator(
self, operator: str, operands: Any
) -> tuple[str, list[Any]]:
"""Convert set operators to SQL using json_each."""
# Get the appropriate json_each function based on SQLite version
json_each = self.json_each_function # type: ignore[attr-defined]
json_group_array = self.json_group_array_function # type: ignore[attr-defined]
# Normalize operands for operators that accept single values
if operator in (
"$anyElementTrue",
"$allElementsTrue",
) and not isinstance(operands, list):
operands = [operands]
match operator:
case "$setEquals":
# NOT EXISTS clauses duplicate ? placeholders causing param mismatch.
raise NotImplementedError(
"Operator $setEquals not supported in SQL tier"
)
case "$setIntersection":
if len(operands) != 2:
raise ValueError(
"$setIntersection requires exactly 2 operands"
)
array1_sql, array1_params = self._convert_operand_to_sql(
operands[0]
)
array2_sql, array2_params = self._convert_operand_to_sql(
operands[1]
)
# SELECT elements from array1 that exist in array2
sql = f"""
(SELECT json({json_group_array}(DISTINCT a1.value))
FROM {json_each}({array1_sql}) AS a1
WHERE EXISTS (SELECT 1 FROM {json_each}({array2_sql}) AS a2 WHERE a2.value = a1.value))
"""
return sql, array1_params + array2_params
case "$setUnion":
if len(operands) != 2:
raise ValueError("$setUnion requires exactly 2 operands")
array1_sql, array1_params = self._convert_operand_to_sql(
operands[0]
)
array2_sql, array2_params = self._convert_operand_to_sql(
operands[1]
)
# SELECT DISTINCT elements from both arrays
sql = f"""
(SELECT json({json_group_array}(DISTINCT value))
FROM (
SELECT value FROM {json_each}({array1_sql})
UNION
SELECT value FROM {json_each}({array2_sql})
))
"""
return sql, array1_params + array2_params
case "$setDifference":
if len(operands) != 2:
raise ValueError(
"$setDifference requires exactly 2 operands"
)
array1_sql, array1_params = self._convert_operand_to_sql(
operands[0]
)
array2_sql, array2_params = self._convert_operand_to_sql(
operands[1]
)
# SELECT elements from array1 that don't exist in array2
sql = f"""
(SELECT json({json_group_array}(a1.value))
FROM {json_each}({array1_sql}) AS a1
WHERE NOT EXISTS (SELECT 1 FROM {json_each}({array2_sql}) AS a2 WHERE a2.value = a1.value))
"""
return sql, array1_params + array2_params
case "$setIsSubset":
if len(operands) != 2:
raise ValueError("$setIsSubset requires exactly 2 operands")
array1_sql, array1_params = self._convert_operand_to_sql(
operands[0]
)
array2_sql, array2_params = self._convert_operand_to_sql(
operands[1]
)
# Check if all elements of array1 exist in array2
sql = f"""
(
NOT EXISTS (
SELECT 1 FROM {json_each}({array1_sql}) AS a1
WHERE NOT EXISTS (SELECT 1 FROM {json_each}({array2_sql}) AS a2 WHERE a2.value = a1.value)
)
)
"""
return sql, array1_params + array2_params
case "$anyElementTrue":
if len(operands) != 1:
raise ValueError(
"$anyElementTrue requires exactly 1 operand"
)
array_sql, array_params = self._convert_operand_to_sql(
operands[0]
)
# Check if any element is truthy (not false, null, or 0)
sql = f"""
(
EXISTS (
SELECT 1 FROM {json_each}({array_sql}) AS a
WHERE a.value IS NOT NULL AND a.value != 0 AND a.value != json('false') AND a.value != json('null')
)
)
"""
return sql, array_params
case "$allElementsTrue":
if len(operands) != 1:
raise ValueError(
"$allElementsTrue requires exactly 1 operand"
)
array_sql, array_params = self._convert_operand_to_sql(
operands[0]
)
# Check if all elements are truthy (no false, null, or 0 elements)
# Empty array returns True (vacuous truth, matching Python's all([]))
sql = f"""
(
NOT EXISTS (
SELECT 1 FROM {json_each}({array_sql}) AS a
WHERE a.value IS NULL OR a.value = 0 OR a.value = json('false') OR a.value = json('null')
)
)
"""
return sql, array_params
case _:
raise NotImplementedError(
f"Set operator {operator} not supported in SQL tier"
)
[docs]
def _build_pattern_with_options(self, regex: str, options: str) -> str:
"""Build regex pattern with inline flags."""
if not options:
return regex
flag_str = ""
for char in options.lower():
if char in "imsx":
flag_str += char
return f"(?{flag_str}){regex}" if flag_str else regex
[docs]
def _convert_string_operator(
self, operator: str, operands: list[Any]
) -> tuple[str, list[Any]]:
"""Convert string operators to SQL."""
match operator:
case "$concat":
if len(operands) < 1:
raise ValueError("$concat requires at least 1 operand")
sql_parts = []
all_params = []
for operand in operands:
operand_sql, operand_params = self._convert_operand_to_sql(
operand
)
sql_parts.append(operand_sql)
all_params.extend(operand_params)
sql = f"({' || '.join(sql_parts)})"
return sql, all_params
case "$toLower":
if len(operands) != 1:
raise ValueError("$toLower requires exactly 1 operand")
value_sql, value_params = self._convert_operand_to_sql(
operands[0]
)
sql = f"lower({value_sql})"
return sql, value_params
case "$toUpper":
if len(operands) != 1:
raise ValueError("$toUpper requires exactly 1 operand")
value_sql, value_params = self._convert_operand_to_sql(
operands[0]
)
sql = f"upper({value_sql})"
return sql, value_params
case "$strLenBytes":
if len(operands) != 1:
raise ValueError("$strLenBytes requires exactly 1 operand")
value_sql, value_params = self._convert_operand_to_sql(
operands[0]
)
sql = f"length({value_sql})"
return sql, value_params
case "$substr":
if len(operands) != 3:
raise ValueError("$substr requires exactly 3 operands")
str_sql, str_params = self._convert_operand_to_sql(operands[0])
start_sql, start_params = self._convert_operand_to_sql(
operands[1]
)
len_sql, len_params = self._convert_operand_to_sql(operands[2])
sql = f"substr({str_sql}, {start_sql} + 1, {len_sql})"
return sql, str_params + start_params + len_params
case "$trim":
if not isinstance(operands, dict) or "input" not in operands:
raise ValueError("$trim requires 'input' field")
input_sql, input_params = self._convert_operand_to_sql(
operands["input"]
)
if "chars" in operands:
chars_sql, chars_params = self._convert_operand_to_sql(
operands["chars"]
)
sql = f"trim({input_sql}, {chars_sql})"
return sql, input_params + chars_params
else:
sql = f"trim({input_sql})"
return sql, input_params
case "$ltrim":
if not isinstance(operands, dict) or "input" not in operands:
raise ValueError("$ltrim requires 'input' field")
input_sql, input_params = self._convert_operand_to_sql(
operands["input"]
)
if "chars" in operands:
chars_sql, chars_params = self._convert_operand_to_sql(
operands["chars"]
)
sql = f"ltrim({input_sql}, {chars_sql})"
return sql, input_params + chars_params
else:
sql = f"ltrim({input_sql})"
return sql, input_params
case "$rtrim":
if not isinstance(operands, dict) or "input" not in operands:
raise ValueError("$rtrim requires 'input' field")
input_sql, input_params = self._convert_operand_to_sql(
operands["input"]
)
if "chars" in operands:
chars_sql, chars_params = self._convert_operand_to_sql(
operands["chars"]
)
sql = f"rtrim({input_sql}, {chars_sql})"
return sql, input_params + chars_params
else:
sql = f"rtrim({input_sql})"
return sql, input_params
case "$indexOfBytes":
if len(operands) < 2:
raise ValueError(
"$indexOfBytes requires string and substring"
)
string_sql, string_params = self._convert_operand_to_sql(
operands[0]
)
substr_sql, substr_params = self._convert_operand_to_sql(
operands[1]
)
sql = f"(instr({string_sql}, {substr_sql}) - 1)"
return sql, string_params + substr_params
case "$strcasecmp":
# Case-insensitive string comparison using SQLite's COLLATE NOCASE
if len(operands) != 2:
raise ValueError("$strcasecmp requires exactly 2 operands")
str1_sql, str1_params = self._convert_operand_to_sql(
operands[0]
)
str2_sql, str2_params = self._convert_operand_to_sql(
operands[1]
)
# Use CASE expression to return -1, 0, or 1
sql = f"""
CASE
WHEN {str1_sql} COLLATE NOCASE < {str2_sql} COLLATE NOCASE THEN -1
WHEN {str1_sql} COLLATE NOCASE > {str2_sql} COLLATE NOCASE THEN 1
ELSE 0
END
"""
return sql, str1_params + str2_params
case "$substrBytes":
# Substring by bytes - SQLite's substr works on characters, not bytes
# For ASCII this is the same, for UTF-8 we need special handling
if len(operands) != 3:
raise ValueError("$substrBytes requires exactly 3 operands")
str_sql, str_params = self._convert_operand_to_sql(operands[0])
start_sql, start_params = self._convert_operand_to_sql(
operands[1]
)
len_sql, len_params = self._convert_operand_to_sql(operands[2])
# Use substr - note this works on characters in SQLite
# For true byte-level operations, would need hex/unescape
sql = f"substr({str_sql}, {start_sql} + 1, {len_sql})"
return sql, str_params + start_params + len_params
case "$regexMatch":
# $regexMatch format: {input, regex, options?}
if not isinstance(operands, dict) or "input" not in operands:
raise ValueError("$regexMatch requires 'input' and 'regex'")
input_sql, input_params = self._convert_operand_to_sql(
operands["input"]
)
regex = operands.get("regex", "")
options = operands.get("options", "")
pattern = self._build_pattern_with_options(regex, options)
sql = f"CASE WHEN {input_sql} REGEXP ? THEN json('true') ELSE json('false') END"
return sql, input_params + [pattern]
case "$regexFind":
# $regexFind format: {input, regex, options?}
if not isinstance(operands, dict) or "input" not in operands:
raise ValueError("$regexFind requires 'input' and 'regex'")
input_sql, input_params = self._convert_operand_to_sql(
operands["input"]
)
regex = operands.get("regex", "")
options = operands.get("options", "")
pattern = self._build_pattern_with_options(regex, options)
sql = f"json(REGEXP_FIND(?, {input_sql}))"
return sql, input_params + [pattern]
case "$regexFindAll":
# $regexFindAll format: {input, regex, options?}
if not isinstance(operands, dict) or "input" not in operands:
raise ValueError(
"$regexFindAll requires 'input' and 'regex'"
)
input_sql, input_params = self._convert_operand_to_sql(
operands["input"]
)
regex = operands.get("regex", "")
options = operands.get("options", "")
pattern = self._build_pattern_with_options(regex, options)
sql = f"json(REGEXP_FIND_ALL(?, {input_sql}))"
return sql, input_params + [pattern]
case "$split":
# Recursive CTE duplicates ? placeholders causing param mismatch.
raise NotImplementedError(
"Operator $split not supported in SQL tier"
)
case "$replaceAll":
# Handle MongoDB dict format: {input, find, replacement}
if isinstance(operands, dict):
string_operand = operands.get("input")
find_operand = operands.get("find")
replace_operand = operands.get("replacement")
else:
# Handle list format
if len(operands) != 3:
raise ValueError(
"$replaceAll requires string, find, and replacement"
)
string_operand = operands[0]
find_operand = operands[1]
replace_operand = operands[2]
string_sql, string_params = self._convert_operand_to_sql(
string_operand
)
# Check if it's a regex replace (MongoDB 4.4+)
# MongoDB doesn't natively support regex in $replaceAll (it uses $replaceOne/$replaceAll for strings)
# but we can support it if the find operand is a regex expression
if isinstance(find_operand, dict) and "$regex" in find_operand:
regex = find_operand["$regex"]
options = find_operand.get("$options", "")
pattern = self._build_pattern_with_options(regex, options)
replace_sql, replace_params = self._convert_operand_to_sql(
replace_operand
)
# count=0 for replaceAll
sql = f"REGEXP_REPLACE({string_sql}, ?, {replace_sql}, 0)"
return sql, string_params + [pattern] + replace_params
find_sql, find_params = self._convert_operand_to_sql(
find_operand
)
replace_sql, replace_params = self._convert_operand_to_sql(
replace_operand
)
sql = f"replace({string_sql}, {find_sql}, {replace_sql})"
return sql, string_params + find_params + replace_params
case "$replaceOne":
# Handle MongoDB dict format: {input, find, replacement}
if isinstance(operands, dict):
string_operand = operands.get("input")
find_operand = operands.get("find")
replace_operand = operands.get("replacement")
else:
if len(operands) != 3:
raise ValueError(
"$replaceOne requires string, find, and replacement"
)
string_operand = operands[0]
find_operand = operands[1]
replace_operand = operands[2]
string_sql, string_params = self._convert_operand_to_sql(
string_operand
)
# Check for regex replace
if isinstance(find_operand, dict) and "$regex" in find_operand:
regex = find_operand["$regex"]
options = find_operand.get("$options", "")
pattern = self._build_pattern_with_options(regex, options)
replace_sql, replace_params = self._convert_operand_to_sql(
replace_operand
)
# count=1 for replaceOne
sql = f"REGEXP_REPLACE({string_sql}, ?, {replace_sql}, 1)"
return sql, string_params + [pattern] + replace_params
find_sql, find_params = self._convert_operand_to_sql(
find_operand
)
replace_sql, replace_params = self._convert_operand_to_sql(
replace_operand
)
# Use instr() and substr() to replace only first occurrence
# Note: string_sql and find_sql are used multiple times, so we
# need to duplicate params for each occurrence
sql = (
f"CASE WHEN instr({string_sql}, {find_sql}) > 0 THEN "
f"substr({string_sql}, 1, instr({string_sql}, {find_sql}) - 1) || "
f"{replace_sql} || "
f"substr({string_sql}, instr({string_sql}, {find_sql}) + length({find_sql})) "
f"ELSE {string_sql} END"
)
# Duplicate params to match SQL order:
# 1. instr(string, find) - string_params + find_params
# 2. instr(string, find) - string_params + find_params
# 3. replace - replace_params
# 4. instr(string, find) - string_params + find_params
# 5. length(find) - find_params
all_params = (
string_params
+ find_params # 1st instr
+ string_params
+ find_params # 2nd instr
+ replace_params # replacement
+ string_params
+ find_params # 3rd instr
+ find_params # length
)
return sql, all_params
case "$strLenCP":
# Normalize operands
if not isinstance(operands, list):
operands = [operands]
if len(operands) != 1:
raise ValueError("$strLenCP requires exactly 1 operand")
string_sql, string_params = self._convert_operand_to_sql(
operands[0]
)
# For BMP characters, length in bytes = length in code points
sql = f"length({string_sql})"
return sql, string_params
case "$indexOfCP":
if len(operands) < 2:
raise ValueError("$indexOfCP requires string and substring")
string_sql, string_params = self._convert_operand_to_sql(
operands[0]
)
substr_sql, substr_params = self._convert_operand_to_sql(
operands[1]
)
# SQLite instr(haystack, needle) returns 1-based index, convert to 0-based
# Note: The haystack comes first, needle second (opposite of MongoDB's order)
sql = f"instr({string_sql}, {substr_sql}) - 1"
return sql, string_params + substr_params
case _:
raise NotImplementedError(
f"String operator {operator} not supported in SQL tier"
)
[docs]
def _convert_math_operator(
self, operator: str, operands: Any
) -> tuple[str, list[Any]]:
"""Convert math operators to SQL."""
# Normalize operands to handle both single values and lists
# MongoDB allows both: {$exp: 1} and {$exp: [1]}
# Note: $pow, $log, $sigmoid, and $round can have multiple operands
if operator not in (
"$pow",
"$log",
"$sigmoid",
"$round",
) and not isinstance(operands, list):
operands = [operands]
match operator:
case "$pow":
# Handle $pow separately (requires 2 operands)
if len(operands) != 2:
raise ValueError("$pow requires exactly 2 operands")
base_sql, base_params = self._convert_operand_to_sql(
operands[0]
)
exp_sql, exp_params = self._convert_operand_to_sql(operands[1])
sql = f"pow({base_sql}, {exp_sql})"
return sql, base_params + exp_params
case "$log":
# $log with custom base requires 2 operands: [number, base]
if len(operands) != 2:
raise ValueError(
"$log requires exactly 2 operands: [number, base]"
)
number_sql, number_params = self._convert_operand_to_sql(
operands[0]
)
base_sql, base_params = self._convert_operand_to_sql(
operands[1]
)
# SQLite: log(base, number)
sql = f"log({base_sql}, {number_sql})"
return sql, number_params + base_params
case "$round":
# $round can have 1 or 2 operands: [number] or [number, precision]
if len(operands) < 1 or len(operands) > 2:
raise ValueError("$round requires 1 or 2 operands")
number_sql, number_params = self._convert_operand_to_sql(
operands[0]
)
if len(operands) == 2:
precision_sql, precision_params = (
self._convert_operand_to_sql(operands[1])
)
sql = f"round({number_sql}, {precision_sql})"
return sql, number_params + precision_params
else:
sql = f"round({number_sql})"
return sql, number_params
case "$sigmoid":
# Sigmoid function: 1 / (1 + e^(-x))
# Handle object format: { $sigmoid: { input: <expr>, onNull: <expr> } }
if isinstance(operands, dict):
input_sql, input_params = self._convert_operand_to_sql(
operands.get("input")
)
on_null_value = operands.get("onNull")
if on_null_value is not None:
on_null_sql, on_null_params = (
self._convert_operand_to_sql(on_null_value)
)
sql = f"(CASE WHEN {input_sql} IS NULL THEN {on_null_sql} ELSE (1.0 / (1.0 + exp(-({input_sql})))) END)"
return sql, input_params + on_null_params
else:
sql = f"(1.0 / (1.0 + exp(-({input_sql}))))"
return sql, input_params
else:
# Simple format: { $sigmoid: <expr> } or { $sigmoid: [<expr>] }
if not isinstance(operands, list):
operands = [operands]
if len(operands) != 1:
raise ValueError("$sigmoid requires exactly 1 operand")
value_sql, value_params = self._convert_operand_to_sql(
operands[0]
)
sql = f"(1.0 / (1.0 + exp(-({value_sql}))))"
return sql, value_params
case _:
# All other math operators require 1 operand
if len(operands) != 1:
raise ValueError(f"{operator} requires exactly 1 operand")
value_sql, value_params = self._convert_operand_to_sql(
operands[0]
)
match operator:
case "$abs":
sql = f"abs({value_sql})"
case "$ceil":
sql = f"ceil({value_sql})"
case "$floor":
sql = f"floor({value_sql})"
case "$round":
sql = f"round({value_sql})"
case "$trunc":
sql = f"cast({value_sql} as integer)"
case "$sqrt":
sql = f"sqrt({value_sql})"
case "$ln":
# Natural logarithm (base e)
sql = f"ln({value_sql})"
case "$log10":
# Base-10 logarithm
sql = f"log10({value_sql})"
case "$log2":
# Base-2 logarithm
# Warn about NeoSQLite extension (not in MongoDB)
if not self._log2_warned:
warnings.warn(
"$log2 is a NeoSQLite extension (not available in MongoDB). "
"For MongoDB compatibility, use { $log: [ <number>, 2 ] } instead.",
UserWarning,
stacklevel=4,
)
self._log2_warned = True
sql = f"log2({value_sql})"
case "$exp":
# Exponential function (e^x)
sql = f"exp({value_sql})"
case _:
raise NotImplementedError(
f"Math operator {operator} not supported in SQL tier"
)
return sql, value_params
[docs]
def _convert_trig_operator(
self, operator: str, operands: Any
) -> tuple[str, list[Any]]:
"""Convert trigonometric and hyperbolic operators to SQL.
Args:
operator: The trig operator ($sin, $cos, etc.)
operands: The operand(s). Can be:
- A single value (string, number) for simple cases like {"$sin": "$angle"}
- A list of values for array format like {"$sin": ["$angle"]}
"""
# Normalize operands to handle both single values and lists
# MongoDB allows both: {$sin: "$angle"} and {$sin: ["$angle"]}
if not isinstance(operands, list):
operands = [operands]
match operator:
case "$atan2":
# Handle $atan2 separately (requires 2 operands)
if len(operands) != 2:
raise ValueError("$atan2 requires exactly 2 operands")
y_sql, y_params = self._convert_operand_to_sql(operands[0])
x_sql, x_params = self._convert_operand_to_sql(operands[1])
sql = f"atan2({y_sql}, {x_sql})"
return sql, y_params + x_params
case _:
# All other trig operators require 1 operand
if len(operands) != 1:
raise ValueError(f"{operator} requires exactly 1 operand")
value_sql, value_params = self._convert_operand_to_sql(
operands[0]
)
# Standard trigonometric functions
match operator:
case "$sin":
sql_func = "sin"
case "$cos":
sql_func = "cos"
case "$tan":
sql_func = "tan"
case "$asin":
sql_func = "asin"
case "$acos":
sql_func = "acos"
case "$atan":
sql_func = "atan"
# Hyperbolic functions
case "$sinh":
sql_func = "sinh"
case "$cosh":
sql_func = "cosh"
case "$tanh":
sql_func = "tanh"
# Inverse hyperbolic functions
case "$asinh":
sql_func = "asinh"
case "$acosh":
sql_func = "acosh"
case "$atanh":
sql_func = "atanh"
case _:
raise NotImplementedError(
f"Trig operator {operator} not supported in SQL tier"
)
sql = f"{sql_func}({value_sql})"
return sql, value_params
[docs]
def _convert_angle_operator(
self, operator: str, operands: Any
) -> tuple[str, list[Any]]:
"""Convert angle conversion operators to SQL."""
# Normalize operands to handle both single values and lists
# MongoDB allows both: {$degreesToRadians: 180} and {$degreesToRadians: [180]}
if not isinstance(operands, list):
operands = [operands]
if len(operands) != 1:
raise ValueError(f"{operator} requires exactly 1 operand")
value_sql, value_params = self._convert_operand_to_sql(operands[0])
match operator:
case "$degreesToRadians":
# radians = degrees * pi() / 180
sql = f"({value_sql} * pi() / 180.0)"
case "$radiansToDegrees":
# degrees = radians * 180 / pi()
sql = f"({value_sql} * 180.0 / pi())"
case _:
raise NotImplementedError(
f"Angle operator {operator} not supported in SQL tier"
)
return sql, value_params
[docs]
def _convert_date_operator(
self, operator: str, operands: Any
) -> tuple[str, list[Any]]:
"""Convert date operators to SQL using strftime."""
# Normalize operands to handle both single values and lists
if not isinstance(operands, list):
operands = [operands]
if len(operands) != 1:
raise ValueError(f"{operator} requires exactly 1 operand")
value_sql, value_params = self._convert_operand_to_sql(operands[0])
# SQLite strftime format codes
match operator:
case "$year":
fmt = "%Y"
case "$month":
fmt = "%m"
case "$dayOfMonth":
fmt = "%d"
case "$hour":
fmt = "%H"
case "$minute":
fmt = "%M"
case "$second":
fmt = "%S"
case "$dayOfWeek":
fmt = "%w"
sql = f"(CAST(strftime('{fmt}', {value_sql}) AS INTEGER) + 1)"
return sql, value_params
case "$dayOfYear":
fmt = "%j"
case "$week":
fmt = "%U"
case "$isoDayOfWeek":
fmt = "%u"
case "$isoWeek":
fmt = "%V"
case "$millisecond":
fmt = "%f"
case _:
raise NotImplementedError(
f"Date operator {operator} not supported in SQL tier"
)
# For numeric results, cast to integer
if operator == "$millisecond":
sql = (
f"cast(strftime('{fmt}', {value_sql}) * 1000 as integer) % 1000"
)
else:
sql = f"cast(strftime('{fmt}', {value_sql}) as integer)"
return sql, value_params
[docs]
def _convert_date_arithmetic_operator(
self, operator: str, operands: list[Any]
) -> tuple[str, list[Any]]:
"""Convert $dateAdd/$dateSubtract operators to SQL.
MongoDB syntax: {$dateAdd: [date, amount, unit]} or
{$dateAdd: {startDate: date, amount: N, unit: "day"}}
SQLite: datetime(date, '+N unit' or '-N unit')
"""
# Handle MongoDB dict format: {startDate, amount, unit}
if isinstance(operands, dict):
operands = [
operands.get("startDate"),
operands.get("amount"),
operands.get("unit", "day"),
]
if len(operands) < 2 or len(operands) > 3:
raise ValueError(
f"{operator} requires 2-3 operands: [date, amount, unit]"
)
date_sql, date_params = self._convert_operand_to_sql(operands[0])
amount = operands[1] # Should be a literal number
unit = operands[2] if len(operands) > 2 else "day" # Default to days
# Validate unit
valid_units = (
"day",
"hour",
"minute",
"second",
"week",
"month",
"year",
)
if not isinstance(unit, str) or unit not in valid_units:
raise ValueError(f"{operator} unit must be one of: {valid_units}")
# Handle year/month specially (SQLite doesn't support directly)
if unit == "year":
amount = amount * 12
unit = "month"
# Determine sign based on operator
sign = "+" if operator == "$dateAdd" else "-"
# Handle week conversion to days
sqlite_unit = unit
if unit == "week":
sqlite_unit = "day"
if isinstance(amount, (int, float)):
amount = amount * 7
# Build the modifier
if isinstance(amount, (int, float)):
modifier = f"'{sign}{amount} {sqlite_unit}s'"
# Use strftime with 'T' separator and 'Z' suffix so
# neosqlite_json_loads recognizes the result as a UTC ISO
# date and converts it back to a timezone-aware datetime
sql = f"strftime('%Y-%m-%dT%H:%M:%SZ', {date_sql}, {modifier})"
return sql, date_params
else:
# Amount is a field reference - need to use CASE or build dynamically
# For simplicity, we'll use printf to build the modifier
amount_sql, amount_params = self._convert_operand_to_sql(
operands[1]
)
if sign == "-":
amount_sql = f"-({amount_sql})"
# Use strftime with 'T' separator and 'Z' suffix so
# neosqlite_json_loads recognizes the result as a UTC ISO
# date and converts it back to a timezone-aware datetime
sql = f"strftime('%Y-%m-%dT%H:%M:%SZ', {date_sql}, printf('%+d {sqlite_unit}s', {amount_sql}))"
return sql, date_params + amount_params
[docs]
def _convert_date_diff_operator(
self, operands: list[Any]
) -> tuple[str, list[Any]]:
"""Convert $dateDiff operator to SQL.
MongoDB syntax: {$dateDiff: [date1, date2, unit]} or
{$dateDiff: {startDate: date1, endDate: date2, unit: "day"}}
SQLite: julianday(date2) - julianday(date1) for days
"""
# Handle MongoDB dict format: {startDate, endDate, unit}
if isinstance(operands, dict):
operands = [
operands.get("startDate"),
operands.get("endDate"),
operands.get("unit", "day"),
]
if len(operands) < 2 or len(operands) > 3:
raise ValueError(
"$dateDiff requires 2-3 operands: [date1, date2, unit]"
)
date1_sql, date1_params = self._convert_operand_to_sql(operands[0])
date2_sql, date2_params = self._convert_operand_to_sql(operands[1])
unit = operands[2] if len(operands) > 2 else "day"
# Validate unit
valid_units = (
"day",
"hour",
"minute",
"second",
"week",
"month",
"year",
)
if not isinstance(unit, str) or unit not in valid_units:
raise ValueError(f"$dateDiff unit must be one of: {valid_units}")
# For month and year, use SQLite strftime to extract components
# and compute the difference directly (julianday-based division
# is inaccurate for month/year units).
if unit in ("month", "year"):
sql = f"""(
(strftime('%Y', {date2_sql}) - strftime('%Y', {date1_sql})) * 12
+ (strftime('%m', {date2_sql}) - strftime('%m', {date1_sql}))
)"""
if unit == "year":
sql = f"cast({sql} / 12 as integer)"
return sql, date2_params + date1_params
# Base calculation: difference in days
sql = f"(julianday({date2_sql}) - julianday({date1_sql}))"
# Convert to requested unit
unit_multipliers = {
"day": 1,
"week": 1.0 / 7,
"hour": 24,
"minute": 24 * 60,
"second": 24 * 60 * 60,
}
multiplier = unit_multipliers.get(unit, 1)
if multiplier != 1:
sql = f"cast({sql} * {multiplier} as integer)"
else:
sql = f"cast({sql} as integer)"
# Params must match placeholder order: date2 first, then date1
return sql, date2_params + date1_params
[docs]
def _convert_object_operator(
self, operator: str, operands: Any
) -> tuple[str, list[Any]]:
"""Convert object operators to SQL.
Note: json_patch() works with both JSON and JSONB data types.
Only json_extract/jsonb_extract, json_set/jsonb_set have JSONB variants.
"""
json_prefix = self.json_function_prefix # type: ignore[attr-defined]
match operator:
case "$mergeObjects":
if not isinstance(operands, list) or len(operands) < 1:
raise ValueError("$mergeObjects requires a list of objects")
sql_parts = []
all_params = []
for obj in operands:
obj_sql, obj_params = self._convert_operand_to_sql(obj)
sql_parts.append(obj_sql)
all_params.extend(obj_params)
# Use json_patch to merge objects (works with both JSON and JSONB)
if len(sql_parts) == 1:
sql = sql_parts[0]
else:
sql = f"json_patch({sql_parts[0]}, {sql_parts[1]})"
for part in sql_parts[2:]:
sql = f"json_patch({sql}, {part})"
return sql, all_params
case "$getField":
if not isinstance(operands, dict) or "field" not in operands:
raise ValueError("$getField requires 'field' specification")
field = operands["field"]
input_val = operands.get("input")
if input_val is not None:
input_sql, input_params = self._convert_operand_to_sql(
input_val
)
else:
input_sql, input_params = self.data_column, []
sql = f"{json_prefix}_extract({input_sql}, '{parse_json_path(field)}')"
return sql, input_params
case "$setField":
if not isinstance(operands, dict):
raise ValueError("$setField requires a dictionary")
field = operands.get("field")
value = operands.get("value")
input_val = operands.get("input")
if field is None:
raise ValueError("$setField requires 'field'")
if input_val is not None:
input_sql, input_params = self._convert_operand_to_sql(
input_val
)
else:
input_sql, input_params = self.data_column, []
value_sql, value_params = self._convert_operand_to_sql(value)
sql = f"{json_prefix}_set({input_sql}, '{parse_json_path(field)}', {value_sql})"
return sql, input_params + value_params
case "$unsetField":
if not isinstance(operands, dict) or "field" not in operands:
raise ValueError(
"$unsetField requires 'field' specification"
)
field = operands["field"]
input_val = operands.get("input")
if input_val is not None:
input_sql, input_params = self._convert_operand_to_sql(
input_val
)
else:
input_sql, input_params = self.data_column, []
# Use json_remove to remove field
sql = f"{json_prefix}_remove({input_sql}, '{parse_json_path(field)}')"
return sql, input_params
case "$objectToArray":
# Convert object to array of {k, v} objects
# Syntax: { $objectToArray: <object> }
sql_input, params = self._convert_operand_to_sql(operands)
json_group_array = self.json_group_array_function # type: ignore[attr-defined]
json_each = self.json_each_function # type: ignore[attr-defined]
# Use a subquery with json_each to build the array
sql = f"(SELECT json({json_group_array}(json_object('k', key, 'v', value))) FROM {json_each}({sql_input}))"
return sql, params
case _:
raise NotImplementedError(
f"Object operator {operator} not supported in SQL tier"
)
[docs]
def _convert_let_operator(self, operands: Any) -> tuple[str, list[Any]]:
"""
Convert $let operator to SQL by inlining variables.
MongoDB syntax: { $let: { vars: { <var1>: <expr1>, ... }, in: <expr> } }
"""
if not isinstance(operands, dict):
raise ValueError("$let requires a dictionary")
vars_spec = operands.get("vars", {})
in_expr = operands.get("in")
if in_expr is None:
raise ValueError("$let requires 'in' expression")
# We need the current context to store variables for inlining
if (
not hasattr(self, "_current_context")
or self._current_context is None
):
# Fallback to Python if no context is available
raise NotImplementedError(
"$let requires an aggregation context in SQL tier"
)
context = self._current_context
# Create a new context for nested scoping
nested_context = context.clone()
for var_name, var_expr in vars_spec.items():
# Evaluate the variable expression to SQL
var_sql, var_params = self._convert_operand_to_sql(var_expr)
# Store the SQL and params in the nested context
# var_name should be prefixed with $$
nested_context.set_variable("$$" + var_name, (var_sql, var_params))
# Now evaluate 'in' using the nested context by temporarily swapping it
old_context = self._current_context
self._current_context = nested_context
try:
return self._convert_operand_to_sql(in_expr)
finally:
self._current_context = old_context
[docs]
def _convert_data_size_operator(
self, operator: str, operands: Any
) -> tuple[str, list[Any]]:
"""Convert $binarySize and $bsonSize operators to SQL."""
if not isinstance(operands, list):
operands = [operands]
if len(operands) != 1:
raise ValueError(f"{operator} requires exactly 1 operand")
value_sql, value_params = self._convert_operand_to_sql(operands[0])
if operator == "$binarySize":
# In NeoSQLite, binary data is stored as base64 in a JSON object:
# {"__neosqlite_binary__": true, "data": "...", "subtype": 0}
# The 'data' field is base64 encoded.
# Use 'json_extract' (not jsonb) to ensure we get a text string
# if the value is extracted from a JSON document.
# If value_sql is a field reference, it might be jsonb_extract.
# We want the text version for base64 length calculation.
text_value_sql = value_sql.replace("jsonb_extract", "json_extract")
# Extract the base64 string if it's a binary object
base64_data = f"CASE WHEN typeof({text_value_sql}) = 'text' AND json_extract({text_value_sql}, '$.__neosqlite_binary__') = 1 THEN json_extract({text_value_sql}, '$.data') ELSE {text_value_sql} END"
# Simple base64 decoded length approximation: (len * 3 / 4)
# We use CAST AS TEXT to ensure we don't have any JSONB weirdness
return (
f"((length(CAST({base64_data} AS TEXT)) * 3) / 4)",
value_params,
)
else: # $bsonSize
# MongoDB $bsonSize returns the size of the document in BSON bytes.
# In NeoSQLite, we return the size of the JSON representation.
# Use json() to ensure we are measuring the serialized string size,
# and octet_length/length to get the byte count.
return f"length(json({value_sql}))", value_params
[docs]
def _get_operator_return_type(self, operator: str) -> str | None:
"""
Infer the BSON return type of a MongoDB operator.
Returns:
BSON type name (e.g., 'number', 'bool', 'string', 'array', 'object')
or None if the return type is ambiguous or unknown.
"""
match operator:
# Operators returning numbers
case (
"$add"
| "$subtract"
| "$multiply"
| "$divide"
| "$mod"
| "$abs"
| "$ceil"
| "$floor"
| "$round"
| "$trunc"
| "$pow"
| "$sqrt"
| "$ln"
| "$log"
| "$log10"
| "$log2"
| "$exp"
| "$sin"
| "$cos"
| "$tan"
| "$asin"
| "$acos"
| "$atan"
| "$atan2"
| "$sinh"
| "$cosh"
| "$tanh"
| "$asinh"
| "$acosh"
| "$atanh"
| "$size"
| "$indexOfArray"
| "$sum"
| "$avg"
| "$min"
| "$max"
| "$strLenBytes"
| "$strLenCP"
| "$indexOfBytes"
| "$indexOfCP"
| "$year"
| "$month"
| "$dayOfMonth"
| "$hour"
| "$minute"
| "$second"
| "$millisecond"
| "$dayOfWeek"
| "$dayOfYear"
| "$week"
| "$isoDayOfWeek"
| "$isoWeek"
| "$dateDiff"
| "$binarySize"
| "$bsonSize"
| "$toInt"
| "$toDouble"
| "$toLong"
| "$toDecimal"
):
return "number"
# Operators returning booleans
case (
"$eq"
| "$ne"
| "$gt"
| "$gte"
| "$lt"
| "$lte"
| "$and"
| "$or"
| "$not"
| "$nor"
| "$in"
| "$isArray"
| "$setEquals"
| "$setIsSubset"
| "$anyElementTrue"
| "$allElementsTrue"
| "$regexMatch"
| "$isNumber"
| "$toBool"
):
return "bool"
# Operators returning strings
case (
"$concat"
| "$toLower"
| "$toUpper"
| "$substr"
| "$substrBytes"
| "$trim"
| "$ltrim"
| "$rtrim"
| "$replaceAll"
| "$replaceOne"
| "$toString"
| "$type"
):
return "string"
# Operators returning arrays
case (
"$slice"
| "$setIntersection"
| "$setUnion"
| "$setDifference"
| "$split"
| "$objectToArray"
):
return "array"
# Operators returning objects
case "$mergeObjects" | "$getField" | "$setField" | "$unsetField":
return "object"
case _:
return None
[docs]
def _get_literal_bson_type(self, value: Any) -> str | None:
"""Get the BSON type name for a literal value."""
if value is None:
return "null"
if isinstance(value, bool):
return "bool"
if isinstance(value, (int, float)):
return "number"
if isinstance(value, str):
# Check if it's a field reference, which is not a literal
if value.startswith("$"):
return None
return "string"
if isinstance(value, list):
return "array"
if isinstance(value, dict):
# Check if it's an expression
if len(value) == 1 and next(iter(value.keys())).startswith("$"):
return None
return "object"
return None
[docs]
def _convert_type_operator(
self, operator: str, operands: Any
) -> tuple[str, list[Any]]:
"""Convert type conversion operators to SQL."""
# Normalize operands to handle both single values and lists
# MongoDB allows both: {$isNumber: "$field"} and {$isNumber: ["$field"]}
if not isinstance(operands, list):
operands = [operands]
if len(operands) != 1:
raise ValueError(f"{operator} requires exactly 1 operand")
operand = operands[0]
value_sql, value_params = self._convert_operand_to_sql(operand)
match operator:
case "$toString":
# Cast to text
sql = f"cast({value_sql} as text)"
case "$toInt":
# Cast to integer, handle non-numeric strings by returning NULL
# SQLite CAST('abc' AS INTEGER) returns 0, we want NULL for compatibility
sql = (
f"CASE WHEN typeof({value_sql}) IN ('integer', 'real') THEN CAST({value_sql} AS INTEGER) "
f"WHEN typeof({value_sql}) = 'text' AND (CAST({value_sql} AS INTEGER) != 0 OR {value_sql} IN ('0', '0.0')) "
f"THEN CAST({value_sql} AS INTEGER) ELSE NULL END"
)
case "$toDouble":
# Cast to real/float, handle non-numeric strings by returning NULL
sql = (
f"CASE WHEN typeof({value_sql}) IN ('integer', 'real') THEN CAST({value_sql} AS REAL) "
f"WHEN typeof({value_sql}) = 'text' AND (CAST({value_sql} AS REAL) != 0.0 OR {value_sql} IN ('0', '0.0')) "
f"THEN CAST({value_sql} AS REAL) ELSE NULL END"
)
case "$toLong":
# SQLite integers are already 64-bit, same as toInt logic
sql = (
f"CASE WHEN typeof({value_sql}) IN ('integer', 'real') THEN CAST({value_sql} AS INTEGER) "
f"WHEN typeof({value_sql}) = 'text' AND (CAST({value_sql} AS INTEGER) != 0 OR {value_sql} IN ('0', '0.0')) "
f"THEN CAST({value_sql} AS INTEGER) ELSE NULL END"
)
case "$toBool":
if isinstance(operand, str) and operand.startswith("$"):
field_path = operand[1:]
from ..json_path_utils import parse_json_path
json_path = parse_json_path(field_path)
type_expr = f"json_type({self.data_column}, '{json_path}')"
sql = (
f"CASE WHEN {type_expr} = 'null' THEN json('false') "
f"WHEN {type_expr} = 'false' THEN json('false') "
f"WHEN {type_expr} = 'true' THEN json('true') "
f"WHEN {type_expr} IN ('integer', 'real') THEN CASE WHEN {value_sql} != 0 THEN json('true') ELSE json('false') END "
f"WHEN {type_expr} = 'text' THEN CASE WHEN length({value_sql}) > 0 THEN json('true') ELSE json('false') END "
f"WHEN {type_expr} IN ('array', 'object') THEN json('true') "
f"ELSE json('false') END"
)
else:
inferred_type = None
if isinstance(operand, dict) and len(operand) == 1:
op_name = next(iter(operand.keys()))
if op_name.startswith("$"):
inferred_type = self._get_operator_return_type(
op_name
)
else:
inferred_type = self._get_literal_bson_type(operand)
if inferred_type == "bool":
sql = f"{value_sql}"
elif inferred_type == "number":
sql = f"CASE WHEN {value_sql} != 0 THEN json('true') ELSE json('false') END"
elif inferred_type == "string":
sql = f"CASE WHEN length({value_sql}) > 0 THEN json('true') ELSE json('false') END"
elif inferred_type in ("array", "object"):
sql = "json('true')"
value_params = []
elif inferred_type == "null":
sql = "json('false')"
value_params = []
else:
sql = (
f"CASE WHEN typeof({value_sql}) = 'text' THEN CASE WHEN length({value_sql}) > 0 THEN json('true') ELSE json('false') END "
f"WHEN typeof({value_sql}) = 'null' THEN json('false') "
f"ELSE CASE WHEN {value_sql} != 0 THEN json('true') ELSE json('false') END END"
)
case "$toDecimal":
# SQLite doesn't have native Decimal128, use REAL
raise NotImplementedError(
"$toDecimal not supported in SQL tier (SQLite lacks Decimal128)"
)
case "$toObjectId":
# Cannot convert to ObjectId in SQL
raise NotImplementedError(
"$toObjectId not supported in SQL tier (use Python fallback)"
)
case "$isNumber":
if isinstance(operand, str) and operand.startswith("$"):
# Direct field reference
field_path = operand[1:]
from ..json_path_utils import parse_json_path
json_path = parse_json_path(field_path)
type_expr = f"json_type({self.data_column}, '{json_path}')"
sql = f"CASE WHEN {type_expr} IN ('integer', 'real') THEN json('true') ELSE json('false') END"
else:
# Computed expression or literal - try to infer type
inferred_type = None
if isinstance(operand, dict) and len(operand) == 1:
op_name = next(iter(operand.keys()))
if op_name.startswith("$"):
inferred_type = self._get_operator_return_type(
op_name
)
else:
inferred_type = self._get_literal_bson_type(operand)
if inferred_type == "number":
sql = "json('true')"
value_params = []
elif inferred_type is not None:
sql = "json('false')"
value_params = []
else:
raise NotImplementedError(
"Ambiguous type for $isNumber in SQL tier"
)
case "$type":
if isinstance(operand, str) and operand.startswith("$"):
# Direct field reference
field_path = operand[1:]
from ..json_path_utils import parse_json_path
json_path = parse_json_path(field_path)
type_expr = f"json_type({self.data_column}, '{json_path}')"
sql = (
f"CASE WHEN {type_expr} = 'null' THEN 'null' "
f"WHEN {type_expr} IN ('true', 'false') THEN 'bool' "
f"WHEN {type_expr} = 'integer' THEN 'int' "
f"WHEN {type_expr} = 'real' THEN 'double' "
f"WHEN {type_expr} = 'text' THEN 'string' "
f"WHEN {type_expr} = 'array' THEN 'array' "
f"WHEN {type_expr} = 'object' THEN "
f" CASE WHEN json_extract({self.data_column}, '{json_path}.__neosqlite_binary__') = 1 THEN 'binData' "
f" WHEN json_extract({self.data_column}, '{json_path}.__neosqlite_objectid__') = 1 THEN 'objectId' "
f" ELSE 'object' END "
f"ELSE 'unknown' END"
)
else:
# Computed expression or literal
inferred_type = None
if isinstance(operand, dict) and len(operand) == 1:
op_name = next(iter(operand.keys()))
if op_name.startswith("$"):
inferred_type = self._get_operator_return_type(
op_name
)
else:
inferred_type = self._get_literal_bson_type(operand)
if inferred_type == "number":
sql = f"CASE WHEN typeof({value_sql}) = 'integer' THEN 'int' ELSE 'double' END"
elif inferred_type == "bool":
sql = "'bool'"
value_params = []
elif inferred_type == "string":
sql = "'string'"
value_params = []
elif inferred_type == "array":
sql = "'array'"
value_params = []
elif inferred_type == "object":
sql = (
f"CASE WHEN typeof({value_sql}) = 'text' THEN "
f" CASE WHEN json_extract({value_sql}, '$.__neosqlite_binary__') = 1 THEN 'binData' "
f" WHEN json_extract({value_sql}, '$.__neosqlite_objectid__') = 1 THEN 'objectId' "
f" ELSE 'object' END "
f"ELSE 'object' END"
)
elif inferred_type == "null":
sql = "'null'"
value_params = []
else:
# Fallback to typeof
sql = (
f"CASE WHEN typeof({value_sql}) = 'null' THEN 'null' "
f"WHEN typeof({value_sql}) = 'integer' THEN 'int' "
f"WHEN typeof({value_sql}) = 'real' THEN 'double' "
f"WHEN typeof({value_sql}) = 'text' THEN 'string' "
f"ELSE 'unknown' END"
)
case "$convert":
# $convert is complex - requires 'to' field specification
# Fall back to Python
raise NotImplementedError(
"$convert not supported in SQL tier (use Python fallback)"
)
case _:
raise NotImplementedError(
f"Type operator {operator} not supported in SQL tier"
)
return sql, value_params
[docs]
def _convert_operand_to_sql(self, operand: Any) -> tuple[str, list[Any]]:
    """
    Turn a single operand into a SQL fragment plus its bound parameters.

    Handles, in this order:
    - Aggregation variables, when an aggregation context is active
    - Field references: "$field" → json_extract/jsonb_extract expression
    - Nested expressions: {"$operator": [...]}
    - Literal lists/dicts, bound as JSON text via json(?)
    - Any other scalar literal, bound as a plain parameter

    Returns:
        Tuple of (SQL expression, parameters)
    """
    from .context import _is_aggregation_variable

    # Aggregation variables are only resolved while a context is set;
    # otherwise the operand falls through to the generic handling below.
    if _is_aggregation_variable(operand):
        active_context = getattr(self, "_current_context", None)
        if active_context is not None:
            return self._handle_aggregation_variable(operand, active_context)  # type: ignore[attr-defined]

    if isinstance(operand, str) and operand.startswith("$"):
        # Field reference: drop the leading "$" and build the extract.
        extract_sql = build_json_extract_expression(
            self.data_column, operand[1:]
        )
        if self._jsonb_supported:
            # The helper emits json_extract; switch to the JSONB variant.
            extract_sql = extract_sql.replace("json_extract", "jsonb_extract")
        return extract_sql, []

    if isinstance(operand, (list, dict)):
        # A one-key dict whose key starts with "$" is a nested expression.
        if isinstance(operand, dict) and len(operand) == 1:
            only_key = next(iter(operand))
            if only_key.startswith("$"):
                return self._convert_expr_to_sql(operand)
        # Anything else is a literal container, passed as JSON text.
        from neosqlite.collection.json_helpers import (
            neosqlite_json_dumps,
        )

        return "json(?)", [neosqlite_json_dumps(operand)]

    # Scalar literal
    return "?", [operand]
[docs]
def _map_comparison_operator(self, op: str) -> str:
"""Map MongoDB comparison operators to SQL."""
mapping = {
"$eq": "=",
"$gt": ">",
"$gte": ">=",
"$lt": "<",
"$lte": "<=",
"$ne": "!=",
}
return mapping.get(op, op)
[docs]
def _map_arithmetic_operator(self, op: str) -> str:
"""Map MongoDB arithmetic operators to SQL."""
mapping = {
"$add": "+",
"$subtract": "-",
"$multiply": "*",
"$divide": "/",
"$mod": "%",
}
return mapping.get(op, op)