"""
Python evaluation methods for NeoSQLite $expr operator.
This module contains the PythonEvaluatorsMixin class which provides
all the _evaluate_*_python methods for evaluating MongoDB $expr expressions
in Python as a fallback when SQL evaluation is not possible.
"""
from __future__ import annotations
import calendar
import logging
import math
import random
import re
import warnings
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any
logger = logging.getLogger(__name__)
# Import from sibling modules
from .constants import REMOVE_SENTINEL
from .type_utils import (
_convert_to_bindata,
_convert_to_bool,
_convert_to_bsonbindata,
_convert_to_bsonregex,
_convert_to_date,
_convert_to_decimal,
_convert_to_double,
_convert_to_int,
_convert_to_long,
_convert_to_null,
_convert_to_objectid,
_convert_to_regex,
_convert_to_string,
get_bson_type,
)
if TYPE_CHECKING:
pass
[docs]
class PythonEvaluatorsMixin:
"""
Mixin class providing Python evaluation methods for $expr expressions.
This mixin provides fallback evaluation capabilities when SQL-based
evaluation (Tier 1 and Tier 2) is not possible or when the kill switch
is activated.
"""
# Type annotations for attributes expected from parent class
_log2_warned: bool
[docs]
def evaluate_python(
    self, expr: dict[str, Any], document: dict[str, Any]
) -> bool:
    """
    Evaluate a ``$expr`` expression in Python and reduce it to a boolean.

    The expression is handed to ``_evaluate_expr_python``; whatever that
    returns is collapsed to its truth value so callers always get a bool.

    Args:
        expr: The $expr expression
        document: Document to evaluate against

    Returns:
        Boolean result of expression evaluation
    """
    outcome = self._evaluate_expr_python(expr, document)
    # Non-boolean outcomes (for instance the -1/0/1 produced by $cmp) are
    # interpreted through ordinary truthiness.
    return outcome if isinstance(outcome, bool) else bool(outcome)
[docs]
def _evaluate_expr_python(
self, expr: dict[str, Any], document: dict[str, Any]
) -> Any:
"""Recursively evaluate expression in Python."""
if not isinstance(expr, dict) or len(expr) != 1:
raise ValueError("Invalid $expr expression structure")
operator, operands = next(iter(expr.items()))
# Handle different operator types
match operator:
case "$and" | "$or" | "$not" | "$nor":
return self._evaluate_logical_python(
operator, operands, document
)
case "$gt" | "$gte" | "$lt" | "$lte" | "$eq" | "$ne":
return self._evaluate_comparison_python(
operator, operands, document
)
case "$cmp":
return self._evaluate_cmp_python(operands, document)
case "$add" | "$subtract" | "$multiply" | "$divide" | "$mod":
return self._evaluate_arithmetic_python(
operator, operands, document
)
case "$abs" | "$ceil" | "$floor" | "$round" | "$trunc":
return self._evaluate_math_python(operator, operands, document)
case "$ln" | "$log" | "$log10" | "$log2" | "$exp" | "$sigmoid":
return self._evaluate_math_python(operator, operands, document)
case "$pow":
return self._evaluate_pow_python(operands, document)
case "$sqrt":
return self._evaluate_sqrt_python(operands, document)
case (
"$sin"
| "$cos"
| "$tan"
| "$asin"
| "$acos"
| "$atan"
| "$atan2"
| "$sinh"
| "$cosh"
| "$tanh"
| "$asinh"
| "$acosh"
| "$atanh"
):
return self._evaluate_trig_python(operator, operands, document)
case "$degreesToRadians" | "$radiansToDegrees":
return self._evaluate_angle_python(operator, operands, document)
case "$cond":
return self._evaluate_cond_python(operands, document)
case "$ifNull":
return self._evaluate_ifNull_python(operands, document)
case "$switch":
return self._evaluate_switch_python(operands, document)
case (
"$size"
| "$in"
| "$isArray"
| "$arrayElemAt"
| "$first"
| "$last"
| "$firstN"
| "$lastN"
| "$maxN"
| "$minN"
| "$sortArray"
| "$slice"
| "$indexOfArray"
| "$sum"
| "$avg"
| "$min"
| "$max"
| "$setEquals"
| "$setIntersection"
| "$setUnion"
| "$setDifference"
| "$setIsSubset"
| "$anyElementTrue"
| "$allElementsTrue"
):
return self._evaluate_array_python(operator, operands, document)
case "$meta":
# Handle $meta: "textScore" for FTS relevance scoring
if operands == "textScore":
# Extract stored text score from document
return document.get("_textScore", 0.0)
else:
raise NotImplementedError(
f"$meta with '{operands}' not supported"
)
case "$filter" | "$map" | "$reduce":
return self._evaluate_array_transform_python(
operator, operands, document
)
case (
"$concat"
| "$toLower"
| "$toUpper"
| "$strLenBytes"
| "$substr"
| "$trim"
| "$ltrim"
| "$rtrim"
| "$indexOfBytes"
| "$regexMatch"
| "$regexFind"
| "$regexFindAll"
| "$split"
| "$replaceAll"
| "$replaceOne"
| "$strLenCP"
| "$indexOfCP"
| "$strcasecmp"
| "$substrBytes"
| "$substrCP"
):
return self._evaluate_string_python(
operator, operands, document
)
case (
"$year"
| "$month"
| "$dayOfMonth"
| "$hour"
| "$minute"
| "$second"
| "$dayOfWeek"
| "$dayOfYear"
| "$week"
| "$isoDayOfWeek"
| "$isoWeek"
| "$millisecond"
):
return self._evaluate_date_python(operator, operands, document)
case "$dateAdd" | "$dateSubtract" | "$dateDiff":
return self._evaluate_date_arithmetic_python(
operator, operands, document
)
case (
"$dateFromString"
| "$dateToString"
| "$dateFromParts"
| "$dateToParts"
| "$dateTrunc"
):
return self._evaluate_date_arithmetic_python(
operator, operands, document
)
case (
"$mergeObjects"
| "$getField"
| "$setField"
| "$unsetField"
| "$objectToArray"
| "$let"
| "$literal"
| "$rand"
):
return self._evaluate_object_python(
operator, operands, document
)
case (
"$type"
| "$toString"
| "$toInt"
| "$toDouble"
| "$toBool"
| "$toLong"
| "$toDecimal"
| "$toObjectId"
| "$isNumber"
| "$convert"
):
return self._evaluate_type_python(operator, operands, document)
case "$binarySize" | "$bsonSize":
return self._evaluate_data_size_python(
operator, operands, document
)
case "$literal":
return self._evaluate_literal_python(operands, document)
case "$function":
raise NotImplementedError(
"The '$function' operator is not supported in NeoSQLite. "
"Please use '$expr' with Python expressions, or post-process results in Python."
)
case "$accumulator":
raise NotImplementedError(
"The '$accumulator' operator is not supported in NeoSQLite. "
"Please use built-in accumulators ($sum, $avg, $min, $max, $count, $push, $addToSet, $first, $last), "
"or post-process results in Python."
)
case _:
raise NotImplementedError(
f"Operator {operator} not supported in Python evaluation"
)
[docs]
def _evaluate_logical_python(
self, operator: str, operands: list[Any], document: dict[str, Any]
) -> bool:
"""Evaluate logical operators in Python."""
if operator == "$not":
if len(operands) != 1:
raise ValueError("$not requires exactly one operand")
return not self._evaluate_expr_python(operands[0], document)
results = [self._evaluate_expr_python(op, document) for op in operands]
match operator:
case "$and":
return all(results)
case "$or":
return any(results)
case "$nor":
return not any(results)
case _:
raise ValueError(f"Unknown logical operator: {operator}")
[docs]
def _evaluate_comparison_python(
self, operator: str, operands: list[Any], document: dict[str, Any]
) -> bool:
"""Evaluate comparison operators in Python."""
left = self._evaluate_operand_python(operands[0], document)
right = self._evaluate_operand_python(operands[1], document)
match operator:
case "$eq":
return left == right
case "$ne":
return left != right
case "$gt" | "$gte" | "$lt" | "$lte":
# For ordering comparisons, if any operand is None, return False
# (MongoDB behavior - null values don't participate in ordering)
if left is None or right is None:
return False
return (
left > right
if operator == "$gt"
else (
left >= right
if operator == "$gte"
else (
left < right if operator == "$lt" else left <= right
)
)
)
case _:
raise ValueError(f"Unknown comparison operator: {operator}")
[docs]
def _evaluate_cmp_python(
self, operands: list[Any], document: dict[str, Any]
) -> int:
"""Evaluate $cmp operator in Python."""
if len(operands) != 2:
raise ValueError("$cmp requires exactly 2 operands")
left = self._evaluate_operand_python(operands[0], document)
right = self._evaluate_operand_python(operands[1], document)
if left < right:
return -1
elif left > right:
return 1
else:
return 0
[docs]
def _evaluate_arithmetic_python(
self, operator: str, operands: list[Any], document: dict[str, Any]
) -> float | None:
"""Evaluate arithmetic operators in Python.
Note: In MongoDB, arithmetic operations with null return null.
"""
values = [
self._evaluate_operand_python(op, document) for op in operands
]
# If any operand is None, return None (MongoDB behavior)
if any(v is None for v in values):
return None
match operator:
case "$add":
return sum(values)
case "$subtract":
return values[0] - sum(values[1:])
case "$multiply":
result = 1
for v in values:
result *= v
return result
case "$divide":
result = values[0]
for v in values[1:]:
if v == 0:
return None # Division by zero
result /= v
return result
case "$mod":
if len(values) != 2 or values[1] == 0:
return None
return values[0] % values[1]
case _:
raise ValueError(f"Unknown arithmetic operator: {operator}")
[docs]
def _evaluate_math_python(
self, operator: str, operands: list[Any], document: dict[str, Any]
) -> float | None:
"""Evaluate math operators in Python."""
# Handle $log with custom base separately (requires 2 operands)
if operator == "$log":
if not isinstance(operands, list) or len(operands) != 2:
raise ValueError(
"$log requires exactly 2 operands: [number, base]"
)
number = self._evaluate_operand_python(operands[0], document)
base = self._evaluate_operand_python(operands[1], document)
if number is None or base is None:
return None
if number <= 0 or base <= 1:
return None
return math.log(number, base)
# $sigmoid can be either simple form or object form with onNull
if operator == "$sigmoid":
# Object format: { $sigmoid: { input: <expr>, onNull: <expr> } }
if isinstance(operands, dict):
# Handled in the operator-specific section below
pass
else:
# Simple format: { $sigmoid: <expr> }
operands = (
[operands] if not isinstance(operands, list) else operands
)
# Handle both list and single operand formats for other operators
if operator not in ("$sigmoid", "$round") and not isinstance(
operands, list
):
operands = [operands]
if operator not in ("$sigmoid", "$round") and len(operands) != 1:
raise ValueError(f"{operator} requires exactly 1 operand")
if operator == "$round" and (len(operands) < 1 or len(operands) > 2):
raise ValueError(f"{operator} requires 1 or 2 operands")
if operator not in ("$sigmoid", "$round"):
value = self._evaluate_operand_python(operands[0], document)
elif operator == "$round":
value = self._evaluate_operand_python(operands[0], document)
precision = (
self._evaluate_operand_python(operands[1], document)
if len(operands) > 1
else 0
)
else:
value = (
self._evaluate_operand_python(operands[0], document)
if isinstance(operands, list)
else None
)
match operator:
case "$abs":
return abs(value) if value is not None else None
case "$ceil":
return math.ceil(value) if value is not None else None
case "$floor":
return math.floor(value) if value is not None else None
case "$round":
if value is None:
return None
if precision is None:
precision = 0
return round(value, int(precision))
case "$trunc":
return int(value) if value is not None else None
case "$ln":
# Natural logarithm (base e)
return (
math.log(value) if value is not None and value > 0 else None
)
case "$log10":
# Base-10 logarithm
return (
math.log10(value)
if value is not None and value > 0
else None
)
case "$log2":
# Base-2 logarithm
# Warn about NeoSQLite extension (not in MongoDB)
if not self._log2_warned:
warnings.warn(
"$log2 is a NeoSQLite extension (not available in MongoDB). "
"For MongoDB compatibility, use { $log: [ <number>, 2 ] } instead.",
UserWarning,
stacklevel=4,
)
self._log2_warned = True
return (
math.log2(value)
if value is not None and value > 0
else None
)
case "$exp":
# Exponential function (e^x)
return math.exp(value) if value is not None else None
case "$sigmoid":
# Sigmoid function: 1 / (1 + e^(-x))
# Handle object format: { $sigmoid: { input: <expr>, onNull: <expr> } }
if isinstance(operands, dict):
input_val = self._evaluate_operand_python(
operands.get("input"), document
)
on_null = operands.get("onNull")
if input_val is None:
return self._evaluate_operand_python(on_null, document)
return 1.0 / (1.0 + math.exp(-input_val))
# Simple format: operands is a list [expr] or just expr
if isinstance(operands, list):
if not operands:
return None
input_val = self._evaluate_operand_python(
operands[0], document
)
else:
input_val = self._evaluate_operand_python(
operands, document
)
if input_val is None:
return None
return 1.0 / (1.0 + math.exp(-input_val))
case _:
raise ValueError(f"Unknown math operator: {operator}")
[docs]
def _evaluate_pow_python(
self, operands: list[Any], document: dict[str, Any]
) -> float | None:
"""Evaluate $pow operator in Python."""
if len(operands) != 2:
raise ValueError("$pow requires exactly 2 operands")
base = self._evaluate_operand_python(operands[0], document)
exponent = self._evaluate_operand_python(operands[1], document)
if base is None or exponent is None:
return None
return pow(base, exponent)
[docs]
def _evaluate_sqrt_python(
self, operands: list[Any], document: dict[str, Any]
) -> float | None:
"""Evaluate $sqrt operator in Python."""
# Handle both list and single operand formats
if not isinstance(operands, list):
operands = [operands]
if len(operands) != 1:
raise ValueError("$sqrt requires exactly 1 operand")
value = self._evaluate_operand_python(operands[0], document)
return math.sqrt(value) if value is not None and value >= 0 else None
[docs]
def _evaluate_trig_python(
self, operator: str, operands: list[Any], document: dict[str, Any]
) -> float | None:
"""Evaluate trigonometric operators in Python."""
# Handle both list and single operand formats
if not isinstance(operands, list):
operands = [operands]
# Handle $atan2 separately (requires 2 operands)
if operator == "$atan2":
if len(operands) != 2:
raise ValueError("$atan2 requires exactly 2 operands")
y = self._evaluate_operand_python(operands[0], document)
x = self._evaluate_operand_python(operands[1], document)
if y is None or x is None:
return None
return math.atan2(y, x)
# All other trig operators require 1 operand
if len(operands) != 1:
raise ValueError(f"{operator} requires exactly 1 operand")
value = self._evaluate_operand_python(operands[0], document)
if value is None:
return None
match operator:
case "$sin":
return math.sin(value)
case "$cos":
return math.cos(value)
case "$tan":
return math.tan(value)
case "$asin":
return math.asin(value) if -1 <= value <= 1 else None
case "$acos":
return math.acos(value) if -1 <= value <= 1 else None
case "$atan":
return math.atan(value)
# Hyperbolic functions
case "$sinh":
return math.sinh(value)
case "$cosh":
return math.cosh(value)
case "$tanh":
return math.tanh(value)
# Inverse hyperbolic functions
case "$asinh":
return math.asinh(value)
case "$acosh":
return math.acosh(value) if value >= 1 else None
case "$atanh":
return math.atanh(value) if -1 < value < 1 else None
case _:
raise ValueError(f"Unknown trig operator: {operator}")
[docs]
def _evaluate_angle_python(
self, operator: str, operands: Any, document: dict[str, Any]
) -> float | None:
"""Evaluate angle conversion operators in Python."""
# Handle both list and single operand formats
if not isinstance(operands, list):
ops = [operands]
else:
ops = operands
if len(ops) != 1:
raise ValueError(f"{operator} requires exactly 1 operand")
value = self._evaluate_operand_python(ops[0], document)
if value is None:
return None
match operator:
case "$degreesToRadians":
return math.radians(value)
case "$radiansToDegrees":
return math.degrees(value)
case _:
raise ValueError(f"Unknown angle operator: {operator}")
[docs]
def _evaluate_cond_python(
self, operands: dict[str, Any], document: dict[str, Any]
) -> Any:
"""Evaluate $cond operator in Python."""
if not isinstance(operands, dict):
# Handle array format: [condition, true_case, false_case]
if isinstance(operands, list) and len(operands) == 3:
condition = self._evaluate_expr_python(operands[0], document)
if condition:
return self._evaluate_operand_python(operands[1], document)
else:
return self._evaluate_operand_python(operands[2], document)
raise ValueError("$cond requires a dictionary or 3-element array")
if "if" not in operands or "then" not in operands:
raise ValueError("$cond requires 'if' and 'then' fields")
condition = self._evaluate_expr_python(operands["if"], document)
if condition:
return self._evaluate_operand_python(operands["then"], document)
elif "else" in operands:
return self._evaluate_operand_python(operands["else"], document)
else:
return None
[docs]
def _evaluate_ifNull_python(
self, operands: list[Any], document: dict[str, Any]
) -> Any:
"""Evaluate $ifNull operator in Python."""
if not isinstance(operands, list) or len(operands) != 2:
raise ValueError("$ifNull requires exactly 2 operands")
expr = self._evaluate_operand_python(operands[0], document)
if expr is not None:
return expr
return self._evaluate_operand_python(operands[1], document)
[docs]
def _evaluate_switch_python(
self, operands: dict[str, Any], document: dict[str, Any]
) -> Any:
"""Evaluate $switch operator in Python."""
if not isinstance(operands, dict):
raise ValueError("$switch requires a dictionary")
branches = operands.get("branches", [])
default = operands.get("default")
for branch in branches:
if not isinstance(branch, dict):
continue
case = branch.get("case")
then = branch.get("then")
if case is not None and self._evaluate_expr_python(case, document):
return self._evaluate_operand_python(then, document)
if default is not None:
return self._evaluate_operand_python(default, document)
return None
[docs]
def _evaluate_array_python(
self, operator: str, operands: list[Any], document: dict[str, Any]
) -> Any:
"""Evaluate array operators in Python."""
# Normalize operands for operators that accept single values
if operator in (
"$size",
"$isArray",
"$sum",
"$avg",
"$min",
"$max",
) and not isinstance(operands, list):
operands = [operands]
match operator:
case "$size":
if len(operands) != 1:
raise ValueError("$size requires exactly 1 operand")
array = self._evaluate_operand_python(operands[0], document)
if isinstance(array, list):
return len(array)
return None
case "$in":
if len(operands) != 2:
raise ValueError("$in requires exactly 2 operands")
value = self._evaluate_operand_python(operands[0], document)
array = self._evaluate_operand_python(operands[1], document)
if isinstance(array, list):
return value in array
return False
case "$isArray":
if len(operands) != 1:
raise ValueError("$isArray requires exactly 1 operand")
value = self._evaluate_operand_python(operands[0], document)
return isinstance(value, list)
case "$sum" | "$avg" | "$min" | "$max":
# Handle both list and single operand formats
if not isinstance(operands, list):
array_ops = [operands]
else:
array_ops = operands
if len(array_ops) != 1:
raise ValueError(f"{operator} requires exactly 1 operand")
array = self._evaluate_operand_python(array_ops[0], document)
if not isinstance(array, list):
return 0 if operator == "$sum" else None
# Filter numeric values for sum/avg
nums = [
v
for v in array
if isinstance(v, (int, float)) and not isinstance(v, bool)
]
if not nums:
if operator == "$sum":
return 0
return None
match operator:
case "$sum":
return sum(nums)
case "$avg":
return sum(nums) / len(nums)
case "$min":
return min(array) # min/max work on all types
case "$max":
return max(array)
case _:
return None
case "$arrayElemAt":
if len(operands) != 2:
raise ValueError("$arrayElemAt requires exactly 2 operands")
array = self._evaluate_operand_python(operands[0], document)
index = self._evaluate_operand_python(operands[1], document)
if isinstance(array, list) and isinstance(index, int):
try:
return array[index]
except IndexError:
return None
return None
case "$first":
# Handle both list and single operand formats
if not isinstance(operands, list):
ops = [operands]
else:
ops = operands
if len(ops) != 1:
raise ValueError("$first requires exactly 1 operand")
array = self._evaluate_operand_python(ops[0], document)
if isinstance(array, list) and len(array) > 0:
return array[0]
return None
case "$last":
# Handle both list and single operand formats
if not isinstance(operands, list):
ops = [operands]
else:
ops = operands
if len(ops) != 1:
raise ValueError("$last requires exactly 1 operand")
array = self._evaluate_operand_python(ops[0], document)
if isinstance(array, list) and len(array) > 0:
return array[-1]
return None
case "$firstN":
# Get first N elements from array
# MongoDB syntax: { $firstN: { input: <array>, n: <number> } }
if isinstance(operands, dict):
array_operand = operands.get("input")
n_operand = operands.get("n")
elif isinstance(operands, list) and len(operands) == 2:
array_operand = operands[0]
n_operand = operands[1]
else:
raise ValueError("$firstN requires input array and n count")
array = self._evaluate_operand_python(array_operand, document)
n = self._evaluate_operand_python(n_operand, document)
if not isinstance(array, list) or n is None:
return []
return array[: int(n)]
case "$lastN":
# Get last N elements from array
# MongoDB syntax: { $lastN: { input: <array>, n: <number> } }
if isinstance(operands, dict):
array_operand = operands.get("input")
n_operand = operands.get("n")
elif isinstance(operands, list) and len(operands) == 2:
array_operand = operands[0]
n_operand = operands[1]
else:
raise ValueError("$lastN requires input array and n count")
array = self._evaluate_operand_python(array_operand, document)
n = self._evaluate_operand_python(n_operand, document)
if not isinstance(array, list) or n is None:
return []
n_int = int(n)
if n_int <= 0:
return []
return array[-n_int:] if n_int < len(array) else array
case "$maxN":
# Get maximum N elements from array (sorted descending, take first N)
# MongoDB syntax: { $maxN: { input: <array>, n: <number> } }
if isinstance(operands, dict):
array_operand = operands.get("input")
n_operand = operands.get("n")
elif isinstance(operands, list) and len(operands) == 2:
array_operand = operands[0]
n_operand = operands[1]
else:
raise ValueError("$maxN requires input array and n count")
array = self._evaluate_operand_python(array_operand, document)
n = self._evaluate_operand_python(n_operand, document)
if not isinstance(array, list) or n is None:
return []
# Sort descending and take first N
try:
sorted_array = sorted(array, reverse=True)
return sorted_array[: int(n)]
except (TypeError, ValueError):
return []
case "$minN":
# Get minimum N elements from array (sorted ascending, take first N)
# MongoDB syntax: { $minN: { input: <array>, n: <number> } }
if isinstance(operands, dict):
array_operand = operands.get("input")
n_operand = operands.get("n")
elif isinstance(operands, list) and len(operands) == 2:
array_operand = operands[0]
n_operand = operands[1]
else:
raise ValueError("$minN requires input array and n count")
array = self._evaluate_operand_python(array_operand, document)
n = self._evaluate_operand_python(n_operand, document)
if not isinstance(array, list) or n is None:
return []
# Sort ascending and take first N
try:
sorted_array = sorted(array)
return sorted_array[: int(n)]
except (TypeError, ValueError):
return []
case "$sortArray":
# Sort array elements
# MongoDB syntax: { $sortArray: { input: <array>, sortBy: { <field>: <direction> } } }
if isinstance(operands, dict):
array_operand = operands.get("input")
sort_by = operands.get("sortBy")
elif isinstance(operands, list) and len(operands) >= 1:
array_operand = operands[0]
sort_by = operands[1] if len(operands) > 1 else None
else:
raise ValueError("$sortArray requires input array")
array = self._evaluate_operand_python(array_operand, document)
if not isinstance(array, list):
return []
# If no sortBy specified, sort primitive values
if sort_by is None:
try:
return sorted(array)
except TypeError:
return array
# Sort by field (for array of objects)
if isinstance(sort_by, dict):
# Get first field and direction
sort_field = next(iter(sort_by.keys()))
direction = sort_by[sort_field]
reverse = direction == -1
try:
def sort_key(x: Any) -> Any:
"""
Extract the sort field from a dictionary or return the value.
"""
return (
x.get(sort_field) if isinstance(x, dict) else x
)
return sorted(
array,
key=sort_key, # type: ignore[arg-type]
reverse=reverse,
)
except (TypeError, AttributeError):
return array
return array
case "$slice":
if not isinstance(operands, list) or len(operands) < 2:
raise ValueError("$slice requires array and count/position")
array = self._evaluate_operand_python(operands[0], document)
count = self._evaluate_operand_python(operands[1], document)
if not isinstance(array, list):
return []
if len(operands) >= 3:
skip = self._evaluate_operand_python(operands[2], document)
return array[skip : skip + count]
elif isinstance(count, int) and count < 0:
return array[count:]
else:
return array[:count]
case "$indexOfArray":
if len(operands) != 2:
raise ValueError(
"$indexOfArray requires exactly 2 operands"
)
array = self._evaluate_operand_python(operands[0], document)
value = self._evaluate_operand_python(operands[1], document)
if isinstance(array, list):
try:
return array.index(value)
except ValueError:
return -1
return -1
case "$setEquals":
if len(operands) != 2:
raise ValueError("$setEquals requires exactly 2 operands")
set1 = self._evaluate_operand_python(operands[0], document)
set2 = self._evaluate_operand_python(operands[1], document)
if isinstance(set1, list) and isinstance(set2, list):
return set(set1) == set(set2)
return False
case "$setIntersection":
if len(operands) != 2:
raise ValueError(
"$setIntersection requires exactly 2 operands"
)
set1 = self._evaluate_operand_python(operands[0], document)
set2 = self._evaluate_operand_python(operands[1], document)
if isinstance(set1, list) and isinstance(set2, list):
return list(set(set1) & set(set2))
return []
case "$setUnion":
if len(operands) != 2:
raise ValueError("$setUnion requires exactly 2 operands")
set1 = self._evaluate_operand_python(operands[0], document)
set2 = self._evaluate_operand_python(operands[1], document)
if isinstance(set1, list) and isinstance(set2, list):
return list(set(set1) | set(set2))
return []
case "$setDifference":
if len(operands) != 2:
raise ValueError(
"$setDifference requires exactly 2 operands"
)
set1 = self._evaluate_operand_python(operands[0], document)
set2 = self._evaluate_operand_python(operands[1], document)
if isinstance(set1, list) and isinstance(set2, list):
return list(set(set1) - set(set2))
return []
case "$setIsSubset":
if len(operands) != 2:
raise ValueError("$setIsSubset requires exactly 2 operands")
set1 = self._evaluate_operand_python(operands[0], document)
set2 = self._evaluate_operand_python(operands[1], document)
if isinstance(set1, list) and isinstance(set2, list):
return set(set1).issubset(set(set2))
return False
case "$anyElementTrue":
# Handle both list and single operand formats
if not isinstance(operands, list):
operands = [operands]
if len(operands) != 1:
raise ValueError(
"$anyElementTrue requires exactly 1 operand"
)
array = self._evaluate_operand_python(operands[0], document)
if isinstance(array, list):
return any(array)
return False
case "$allElementsTrue":
# Handle both list and single operand formats
if not isinstance(operands, list):
operands = [operands]
if len(operands) != 1:
raise ValueError(
"$allElementsTrue requires exactly 1 operand"
)
array = self._evaluate_operand_python(operands[0], document)
if isinstance(array, list):
return all(array)
return False
case _:
raise NotImplementedError(
f"Array operator {operator} not supported in Python evaluation"
)
[docs]
def _evaluate_string_python(
self, operator: str, operands: Any, document: dict[str, Any]
) -> Any:
"""Evaluate string operators in Python.
Args:
operator: The string operator ($toUpper, $toLower, etc.)
operands: The operand(s). Can be:
- A single value for simple cases like {"$toUpper": "$field"}
- A list of values for array format
- A dict for operators like $trim, $regexMatch
document: The document to evaluate against
"""
# Normalize operands to handle both single values and lists
# MongoDB allows both: {$toUpper: "$field"} and {$toUpper: ["$field"]}
# But some operators like $trim, $regexMatch, $replaceAll use dict format
if operator in (
"$trim",
"$ltrim",
"$rtrim",
"$regexMatch",
"$regexFind",
"$regexFindAll",
"$replaceAll",
"$replaceOne",
):
# These operators use dict format, don't normalize
pass
elif not isinstance(operands, list):
operands = [operands]
match operator:
case "$concat":
values = [
self._evaluate_operand_python(op, document)
for op in operands
]
return "".join(str(v) if v is not None else "" for v in values)
case "$toLower":
if len(operands) != 1:
raise ValueError("$toLower requires exactly 1 operand")
value = self._evaluate_operand_python(operands[0], document)
return str(value).lower() if value is not None else None
case "$toUpper":
if len(operands) != 1:
raise ValueError("$toUpper requires exactly 1 operand")
value = self._evaluate_operand_python(operands[0], document)
return str(value).upper() if value is not None else None
case "$strLenBytes":
if len(operands) != 1:
raise ValueError("$strLenBytes requires exactly 1 operand")
value = self._evaluate_operand_python(operands[0], document)
return (
len(str(value).encode("utf-8"))
if value is not None
else None
)
case "$substr":
if len(operands) != 3:
raise ValueError("$substr requires exactly 3 operands")
string = self._evaluate_operand_python(operands[0], document)
start = self._evaluate_operand_python(operands[1], document)
length = self._evaluate_operand_python(operands[2], document)
if (
string is not None
and start is not None
and length is not None
):
return str(string)[int(start) : int(start) + int(length)]
return None
case "$trim":
if not isinstance(operands, dict) or "input" not in operands:
raise ValueError("$trim requires 'input' field")
input_val = self._evaluate_operand_python(
operands["input"], document
)
if input_val is None:
return None
chars = operands.get("chars")
if chars is not None:
chars_val = self._evaluate_operand_python(chars, document)
if chars_val is not None:
return str(input_val).strip(str(chars_val))
return str(input_val).strip()
case "$ltrim":
if not isinstance(operands, dict) or "input" not in operands:
raise ValueError("$ltrim requires 'input' field")
input_val = self._evaluate_operand_python(
operands["input"], document
)
if input_val is None:
return None
chars = operands.get("chars")
if chars is not None:
chars_val = self._evaluate_operand_python(chars, document)
if chars_val is not None:
return str(input_val).lstrip(str(chars_val))
return str(input_val).lstrip()
case "$rtrim":
if not isinstance(operands, dict) or "input" not in operands:
raise ValueError("$rtrim requires 'input' field")
input_val = self._evaluate_operand_python(
operands["input"], document
)
if input_val is None:
return None
chars = operands.get("chars")
if chars is not None:
chars_val = self._evaluate_operand_python(chars, document)
if chars_val is not None:
return str(input_val).rstrip(str(chars_val))
return str(input_val).rstrip()
case "$indexOfBytes":
if len(operands) < 2:
raise ValueError(
"$indexOfBytes requires string and substring"
)
string = self._evaluate_operand_python(operands[0], document)
substr = self._evaluate_operand_python(operands[1], document)
if substr is None or string is None:
return -1
idx = str(string).find(str(substr))
return idx
case "$regexMatch":
if not isinstance(operands, dict) or "input" not in operands:
raise ValueError("$regexMatch requires 'input' and 'regex'")
input_val = self._evaluate_operand_python(
operands["input"], document
)
regex = operands.get("regex", "")
options = operands.get("options", "")
if input_val is None:
return False
flags = 0
if "i" in options.lower():
flags |= re.IGNORECASE
if "m" in options.lower():
flags |= re.MULTILINE
if "s" in options.lower():
flags |= re.DOTALL
if "x" in options.lower():
flags |= re.VERBOSE
return bool(re.search(regex, str(input_val), flags))
case "$split":
if len(operands) != 2:
raise ValueError("$split requires string and delimiter")
string = self._evaluate_operand_python(operands[0], document)
delimiter = self._evaluate_operand_python(operands[1], document)
if string is None or delimiter is None:
return []
return str(string).split(str(delimiter))
case "$replaceAll":
# Handle MongoDB dict format: {input, find, replacement}
if isinstance(operands, dict):
string = self._evaluate_operand_python(
operands.get("input"), document
)
find = self._evaluate_operand_python(
operands.get("find"), document
)
replacement = self._evaluate_operand_python(
operands.get("replacement"), document
)
else:
# Handle list format
if len(operands) != 3:
raise ValueError(
"$replaceAll requires string, find, and replacement"
)
string = self._evaluate_operand_python(
operands[0], document
)
find = self._evaluate_operand_python(operands[1], document)
replacement = self._evaluate_operand_python(
operands[2], document
)
if string is None:
return None
return str(string).replace(str(find), str(replacement))
case "$replaceOne":
# Handle MongoDB dict format: {input, find, replacement}
if isinstance(operands, dict):
string = self._evaluate_operand_python(
operands.get("input"), document
)
find = self._evaluate_operand_python(
operands.get("find"), document
)
replacement = self._evaluate_operand_python(
operands.get("replacement"), document
)
else:
if len(operands) != 3:
raise ValueError(
"$replaceOne requires string, find, and replacement"
)
string = self._evaluate_operand_python(
operands[0], document
)
find = self._evaluate_operand_python(operands[1], document)
replacement = self._evaluate_operand_python(
operands[2], document
)
if string is None:
return None
# Replace only first occurrence
return str(string).replace(str(find), str(replacement), 1)
case "$strLenCP":
# String length in code points (Unicode characters)
# Handle both list and single operand formats
if not isinstance(operands, list):
operands = [operands]
if len(operands) != 1:
raise ValueError("$strLenCP requires exactly 1 operand")
value = self._evaluate_operand_python(operands[0], document)
if value is None:
return None
return len(str(value))
case "$substrCP":
# Substring by code points (not implemented - use $substr)
# Handle both list and single operand formats
if not isinstance(operands, list):
operands = [operands]
if len(operands) != 3:
raise ValueError("$substrCP requires exactly 3 operands")
string = self._evaluate_operand_python(operands[0], document)
start = self._evaluate_operand_python(operands[1], document)
length = self._evaluate_operand_python(operands[2], document)
if (
string is not None
and start is not None
and length is not None
):
# For BMP characters, this is the same as $substr
# For full Unicode support, would need proper code point handling
return str(string)[int(start) : int(start) + int(length)]
return None
case "$indexOfCP":
# Find substring by code points
if len(operands) < 2:
raise ValueError("$indexOfCP requires string and substring")
string = self._evaluate_operand_python(operands[0], document)
substr = self._evaluate_operand_python(operands[1], document)
if substr is None or string is None:
return -1
idx = str(string).find(str(substr))
return idx
case "$strcasecmp":
# Case-insensitive string comparison
if len(operands) != 2:
raise ValueError("$strcasecmp requires exactly 2 operands")
str1 = self._evaluate_operand_python(operands[0], document)
str2 = self._evaluate_operand_python(operands[1], document)
if str1 is None or str2 is None:
return None
# Return -1, 0, or 1 like MongoDB
s1 = str(str1).lower()
s2 = str(str2).lower()
if s1 < s2:
return -1
elif s1 > s2:
return 1
else:
return 0
case "$substrBytes":
# Substring by bytes (for UTF-8 encoded strings)
if len(operands) != 3:
raise ValueError("$substrBytes requires exactly 3 operands")
string = self._evaluate_operand_python(operands[0], document)
start = self._evaluate_operand_python(operands[1], document)
length = self._evaluate_operand_python(operands[2], document)
if string is None or start is None or length is None:
return None
# Encode to UTF-8, slice by bytes, decode back
encoded = str(string).encode("utf-8")
sliced = encoded[int(start) : int(start) + int(length)]
try:
return sliced.decode("utf-8")
except UnicodeDecodeError:
# If we cut in the middle of a multi-byte character, return what we can
return sliced.decode("utf-8", errors="ignore")
case "$regexFind":
if not isinstance(operands, dict) or "input" not in operands:
raise ValueError("$regexFind requires 'input' and 'regex'")
input_val = self._evaluate_operand_python(
operands["input"], document
)
regex = operands.get("regex", "")
options = operands.get("options", "")
if input_val is None:
return None
flags = 0
if "i" in options.lower():
flags |= re.IGNORECASE
if "m" in options.lower():
flags |= re.MULTILINE
if "s" in options.lower():
flags |= re.DOTALL
if "x" in options.lower():
flags |= re.VERBOSE
match_result = re.search(regex, str(input_val), flags)
if match_result:
result = {
"match": match_result.group(),
"idx": match_result.start(),
"captures": (
list(match_result.groups())
if match_result.groups()
else []
),
}
return result
return None
case "$regexFindAll":
if not isinstance(operands, dict) or "input" not in operands:
raise ValueError(
"$regexFindAll requires 'input' and 'regex'"
)
input_val = self._evaluate_operand_python(
operands["input"], document
)
regex = operands.get("regex", "")
options = operands.get("options", "")
if input_val is None:
return []
flags = 0
if "i" in options.lower():
flags |= re.IGNORECASE
if "m" in options.lower():
flags |= re.MULTILINE
if "s" in options.lower():
flags |= re.DOTALL
if "x" in options.lower():
flags |= re.VERBOSE
matches = list(re.finditer(regex, str(input_val), flags))
all_results: list[dict[str, Any]] = []
for match_result in matches:
match_obj: dict[str, Any] = {
"match": match_result.group(),
"idx": match_result.start(),
"captures": (
list(match_result.groups())
if match_result.groups()
else []
),
}
all_results.append(match_obj)
return all_results
case _:
raise NotImplementedError(
f"String operator {operator} not supported in Python evaluation"
)
[docs]
def _evaluate_date_python(
self, operator: str, operands: list[Any], document: dict[str, Any]
) -> int | None:
"""Evaluate date operators in Python.
MongoDB compatibility: Date operators require the field to be stored as
BSON Date/datetime type. String dates are NOT automatically converted,
matching MongoDB's behavior.
"""
# Handle both list and single operand formats
if not isinstance(operands, list):
operands = [operands]
if len(operands) != 1:
raise ValueError(f"{operator} requires exactly 1 operand")
value = self._evaluate_operand_python(operands[0], document)
if value is None:
return None
# MongoDB compatibility: Only accept datetime objects, not strings
# MongoDB's $year, $month, etc. fail with "can't convert from BSON type string to Date"
if isinstance(value, datetime):
dt = value
elif isinstance(value, str):
# Reject string dates to match MongoDB behavior
raise ValueError(
f"${operator} requires a date type field, got string. "
"Store dates as datetime objects, not ISO strings."
)
else:
return None
# Extract date components
match operator:
case "$year":
return dt.year
case "$month":
return dt.month
case "$dayOfMonth":
return dt.day
case "$hour":
return dt.hour
case "$minute":
return dt.minute
case "$second":
return dt.second
case "$millisecond":
return dt.microsecond // 1000
case "$dayOfWeek":
# MongoDB uses 1 (Sunday) to 7 (Saturday)
# Python's weekday() returns 0 (Monday) to 6 (Sunday)
return ((dt.weekday() + 1) % 7) + 1
case "$dayOfYear":
return dt.timetuple().tm_yday
case "$week":
# Week of year (0-53)
return int(dt.strftime("%U"))
case "$isoDayOfWeek":
return dt.isocalendar()[2] # 1=Monday
case "$isoWeek":
return dt.isocalendar()[1]
case _:
raise NotImplementedError(
f"Date operator {operator} not supported in Python evaluation"
)
[docs]
def _evaluate_date_arithmetic_python(
self, operator: str, operands: list[Any], document: dict[str, Any]
) -> Any:
"""Evaluate $dateAdd, $dateSubtract, $dateDiff operators in Python."""
match operator:
case "$dateAdd" | "$dateSubtract":
# Handle MongoDB dict format: {startDate, amount, unit}
if isinstance(operands, dict):
operands = [
operands.get("startDate"),
operands.get("amount"),
operands.get("unit", "day"),
]
if len(operands) < 2 or len(operands) > 3:
raise ValueError(
f"{operator} requires 2-3 operands: [date, amount, unit]"
)
value = self._evaluate_operand_python(operands[0], document)
if value is None:
return None
amount = self._evaluate_operand_python(operands[1], document)
if amount is None:
return None
unit = operands[2] if len(operands) > 2 else "day"
# Parse date value
if isinstance(value, str):
try:
dt = datetime.fromisoformat(
value.replace("Z", "+00:00")
)
except ValueError:
return None
elif isinstance(value, datetime):
dt = value
else:
return None
# Create timedelta based on unit
if unit == "year":
# Handle years separately (not supported by timedelta directly)
years = amount if operator == "$dateAdd" else -amount
try:
new_dt = dt.replace(year=dt.year + int(years))
dt = new_dt
except ValueError:
# Handle Feb 29 edge case
new_dt = dt.replace(year=dt.year + int(years), day=28)
dt = new_dt
elif unit == "month":
# Handle months separately
months = amount if operator == "$dateAdd" else -amount
new_month = dt.month + int(months)
new_year = dt.year + (new_month - 1) // 12
new_month = ((new_month - 1) % 12) + 1
try:
dt = dt.replace(year=new_year, month=new_month)
except ValueError:
# Handle day overflow (e.g., Jan 31 + 1 month)
last_day = calendar.monthrange(new_year, new_month)[1]
dt = dt.replace(
year=new_year,
month=new_month,
day=min(dt.day, last_day),
)
else:
# Convert to timedelta
delta_kwargs = {
f"{unit}s": (
amount if operator == "$dateAdd" else -amount
)
}
delta = timedelta(**delta_kwargs)
dt = dt + delta
# Return datetime object (MongoDB compatibility)
return dt
case "$dateDiff":
# Handle MongoDB dict format: {startDate, endDate, unit}
if isinstance(operands, dict):
start_operand = operands.get("startDate")
end_operand = operands.get("endDate")
unit = operands.get("unit", "day")
# Evaluate operands
start = self._evaluate_operand_python(
start_operand, document
)
end = self._evaluate_operand_python(end_operand, document)
else:
if len(operands) < 2:
raise ValueError(
"$dateDiff requires startDate and endDate"
)
start = self._evaluate_operand_python(operands[0], document)
end = self._evaluate_operand_python(operands[1], document)
unit = (
self._evaluate_operand_python(operands[2], document)
if len(operands) > 2
else "day"
)
if start is None or end is None:
return None
# Parse dates
if isinstance(start, str):
try:
start = datetime.fromisoformat(
start.replace("Z", "+00:00")
)
except ValueError:
return None
if isinstance(end, str):
try:
end = datetime.fromisoformat(end.replace("Z", "+00:00"))
except ValueError:
return None
if not isinstance(start, datetime) or not isinstance(
end, datetime
):
return None
# Calculate difference
delta = end - start
match unit:
case "year":
return end.year - start.year
case "month":
return (end.year - start.year) * 12 + (
end.month - start.month
)
case "day":
return delta.days
case "hour":
return int(delta.total_seconds() / 3600)
case "minute":
return int(delta.total_seconds() / 60)
case "second":
return int(delta.total_seconds())
case "millisecond":
return int(delta.total_seconds() * 1000)
case "week":
return delta.days // 7
case _:
return delta.days
case "$dateFromString":
# Handle MongoDB dict format: {dateString, timezone, onError, onNull}
if isinstance(operands, dict):
date_string_operand = operands.get("dateString")
timezone = operands.get("timezone")
on_error = operands.get("onError")
on_null = operands.get("onNull")
# Evaluate the dateString operand
date_string = self._evaluate_operand_python(
date_string_operand, document
)
else:
if len(operands) < 1:
raise ValueError("$dateFromString requires dateString")
date_string = self._evaluate_operand_python(
operands[0], document
)
timezone = (
self._evaluate_operand_python(operands[1], document)
if len(operands) > 1
else None
)
on_error = (
self._evaluate_operand_python(operands[2], document)
if len(operands) > 2
else None
)
on_null = (
self._evaluate_operand_python(operands[3], document)
if len(operands) > 3
else None
)
if date_string is None:
return on_null
try:
# If already a datetime, return it
if isinstance(date_string, datetime):
return date_string
# Parse ISO 8601 date string
if isinstance(date_string, str):
# Handle various ISO 8601 formats
date_string = date_string.replace("Z", "+00:00")
dt = datetime.fromisoformat(date_string)
# Handle timezone if specified
if timezone and dt.tzinfo is None:
# Simple timezone handling (e.g., "+05:30")
try:
from datetime import timezone as tz
if str(timezone).startswith("+") or str(
timezone
).startswith("-"):
tz_str = str(timezone)
hours = int(tz_str[1:3])
minutes = (
int(tz_str[4:6])
if len(tz_str) > 4
else 0
)
offset_seconds = hours * 3600 + minutes * 60
if tz_str[0] == "-":
offset_seconds = -offset_seconds
dt = dt.replace(tzinfo=tz.utc) # Simplified
except (ValueError, TypeError, AttributeError) as e:
logger.debug(
f"Failed to parse timezone in $dateFromString: {e}"
)
pass
return dt
return None
except Exception as e:
logger.debug(f"Failed to evaluate $dateFromString: {e}")
return on_error
case "$dateToString":
# Handle MongoDB dict format: {format, date, timezone}
if isinstance(operands, dict):
fmt = operands.get("format", "%Y-%m-%d")
date_operand = operands.get("date")
timezone = operands.get("timezone")
# Evaluate the date operand
date_val = self._evaluate_operand_python(
date_operand, document
)
else:
if len(operands) < 2:
raise ValueError(
"$dateToString requires format and date"
)
fmt = self._evaluate_operand_python(operands[0], document)
date_val = self._evaluate_operand_python(
operands[1], document
)
timezone = operands[2] if len(operands) > 2 else None
if date_val is None:
return None
# Parse date
if isinstance(date_val, str):
try:
date_val = datetime.fromisoformat(
date_val.replace("Z", "+00:00")
)
except ValueError:
return None
if not isinstance(date_val, datetime):
return None
# Convert MongoDB format to Python strftime format
# MongoDB uses %Y, %m, %d, %H, %M, %S, %L (milliseconds), %Z (timezone)
python_fmt = fmt.replace("%L", "%f")[
:19
] # %f gives microseconds, we'll truncate
result = date_val.strftime(python_fmt)
# Handle milliseconds (%L)
if "%L" in fmt:
ms = date_val.microsecond // 1000
result = result.replace(
str(date_val.microsecond)[:3].zfill(3), str(ms).zfill(3)
)
return result
case "$dateFromParts":
# Handle MongoDB dict format: {year, month, day, hour, minute, second, millisecond, timezone}
if not isinstance(operands, dict):
raise ValueError("$dateFromParts requires a dictionary")
year = self._evaluate_operand_python(
operands.get("year"), document
)
month = (
self._evaluate_operand_python(
operands.get("month"), document
)
or 1
)
day = (
self._evaluate_operand_python(operands.get("day"), document)
or 1
)
hour = (
self._evaluate_operand_python(
operands.get("hour"), document
)
or 0
)
minute = (
self._evaluate_operand_python(
operands.get("minute"), document
)
or 0
)
second = (
self._evaluate_operand_python(
operands.get("second"), document
)
or 0
)
millisecond = (
self._evaluate_operand_python(
operands.get("millisecond"), document
)
or 0
)
timezone = operands.get("timezone")
if year is None:
return None
try:
dt = datetime(
year=int(year),
month=int(month),
day=int(day),
hour=int(hour),
minute=int(minute),
second=int(second),
microsecond=(
int(millisecond) * 1000 if millisecond else 0
),
)
return dt
except (ValueError, TypeError):
return None
case "$dateToParts":
# Handle MongoDB dict format: {date, timezone, unit}
if isinstance(operands, dict):
date_operand = operands.get("date")
timezone = operands.get("timezone")
unit = operands.get("unit")
# Evaluate the date operand
date_val = self._evaluate_operand_python(
date_operand, document
)
else:
if len(operands) < 1:
raise ValueError("$dateToParts requires date")
date_val = self._evaluate_operand_python(
operands[0], document
)
timezone = (
self._evaluate_operand_python(operands[1], document)
if len(operands) > 1
else None
)
unit = (
self._evaluate_operand_python(operands[2], document)
if len(operands) > 2
else None
)
if date_val is None:
return None
# Parse date
if isinstance(date_val, str):
try:
date_val = datetime.fromisoformat(
date_val.replace("Z", "+00:00")
)
except ValueError:
return None
if not isinstance(date_val, datetime):
return None
# Build parts dictionary
parts = {
"year": date_val.year,
"month": date_val.month,
"day": date_val.day,
"hour": date_val.hour,
"minute": date_val.minute,
"second": date_val.second,
"millisecond": date_val.microsecond // 1000,
}
# If unit is specified, only return parts up to that unit
match unit:
case "year":
return {"year": parts["year"]}
case "month":
return {"year": parts["year"], "month": parts["month"]}
case "day":
return {
"year": parts["year"],
"month": parts["month"],
"day": parts["day"],
}
case "hour":
return {
k: v
for k, v in parts.items()
if k in ["year", "month", "day", "hour"]
}
case "minute":
return {
k: v
for k, v in parts.items()
if k in ["year", "month", "day", "hour", "minute"]
}
case "second":
return {
k: v
for k, v in parts.items()
if k
in [
"year",
"month",
"day",
"hour",
"minute",
"second",
]
}
case _:
return parts
case "$dateTrunc":
# Handle MongoDB dict format: {date, unit, startOfWeek}
if isinstance(operands, dict):
date_operand = operands.get("date")
unit = operands.get("unit", "day")
# Evaluate the date operand
date_val = self._evaluate_operand_python(
date_operand, document
)
else:
if len(operands) < 2:
raise ValueError("$dateTrunc requires date and unit")
date_val = self._evaluate_operand_python(
operands[0], document
)
unit = self._evaluate_operand_python(operands[1], document)
if date_val is None:
return None
# Parse date
if isinstance(date_val, str):
try:
date_val = datetime.fromisoformat(
date_val.replace("Z", "+00:00")
)
except ValueError:
return None
if not isinstance(date_val, datetime):
return None
# Truncate based on unit
if unit == "year":
return date_val.replace(
month=1,
day=1,
hour=0,
minute=0,
second=0,
microsecond=0,
)
elif unit == "quarter":
# Round down to start of quarter
quarter_month = ((date_val.month - 1) // 3) * 3 + 1
return date_val.replace(
month=quarter_month,
day=1,
hour=0,
minute=0,
second=0,
microsecond=0,
)
elif unit == "month":
return date_val.replace(
day=1, hour=0, minute=0, second=0, microsecond=0
)
elif unit == "week":
# Round down to start of week (Monday by default)
days_since_monday = date_val.weekday()
return (
date_val - timedelta(days=days_since_monday)
).replace(hour=0, minute=0, second=0, microsecond=0)
elif unit == "day":
return date_val.replace(
hour=0, minute=0, second=0, microsecond=0
)
elif unit == "hour":
return date_val.replace(minute=0, second=0, microsecond=0)
elif unit == "minute":
return date_val.replace(second=0, microsecond=0)
elif unit == "second":
return date_val.replace(microsecond=0)
else:
return date_val
case _:
raise NotImplementedError(
f"Date arithmetic operator {operator} not supported in Python evaluation"
)
[docs]
def _evaluate_object_python(
self, operator: str, operands: Any, document: dict[str, Any]
) -> Any:
"""Evaluate object operators in Python."""
match operator:
case "$mergeObjects":
if not isinstance(operands, list):
raise ValueError("$mergeObjects requires a list of objects")
result: dict[str, Any] = {}
for obj in operands:
obj_val = self._evaluate_operand_python(obj, document)
if isinstance(obj_val, dict):
result |= obj_val
return result
case "$getField":
if not isinstance(operands, dict) or "field" not in operands:
raise ValueError("$getField requires 'field' specification")
field = operands["field"]
input_val = operands.get("input")
if input_val is not None:
obj = self._evaluate_operand_python(input_val, document)
else:
obj = document
if not isinstance(obj, dict):
return None
return obj.get(field)
case "$setField":
if not isinstance(operands, dict):
raise ValueError("$setField requires a dictionary")
field = operands.get("field")
value = operands.get("value")
input_val = operands.get("input")
if field is None:
raise ValueError("$setField requires 'field'")
if input_val is not None:
obj = self._evaluate_operand_python(input_val, document)
else:
obj = dict(document)
if not isinstance(obj, dict):
obj = {}
result = dict(obj)
result[field] = self._evaluate_operand_python(value, document)
return result
case "$unsetField":
if not isinstance(operands, dict) or "field" not in operands:
raise ValueError(
"$unsetField requires 'field' specification"
)
field = operands["field"]
input_val = operands.get("input")
if input_val is not None:
obj = self._evaluate_operand_python(input_val, document)
else:
obj = dict(document)
if not isinstance(obj, dict):
return None
result = dict(obj)
result.pop(field, None)
return result
case "$objectToArray":
# Convert object to array of {k, v} objects
if isinstance(operands, dict):
obj = operands
else:
obj = self._evaluate_operand_python(operands, document)
if not isinstance(obj, dict):
return []
return [{"k": k, "v": v} for k, v in obj.items()]
case "$let":
# MongoDB syntax: { $let: { vars: { <var1>: <expr1>, ... }, in: <expr> } }
if not isinstance(operands, dict):
raise ValueError("$let requires a dictionary")
vars_spec = operands.get("vars", {})
in_expr = operands.get("in")
if in_expr is None:
raise ValueError("$let requires 'in' expression")
# Create new document context with variables
new_context = dict(document)
for var_name, var_expr in vars_spec.items():
var_value = self._evaluate_operand_python(
var_expr, document
)
new_context["$$" + var_name] = var_value
# Evaluate the 'in' expression with new context
return self._evaluate_expr_python(in_expr, new_context)
case "$literal":
# Return the operand as-is without evaluation
return operands
case "$rand":
# Return random number between 0 and 1
return random.random()
case _:
raise NotImplementedError(
f"Object operator {operator} not supported in Python evaluation"
)
[docs]
def _evaluate_data_size_python(
self, operator: str, operands: Any, document: dict[str, Any]
) -> int:
"""Evaluate data size operators ($binarySize, $bsonSize) in Python."""
if not isinstance(operands, list):
operands = [operands]
if len(operands) != 1:
raise ValueError(f"{operator} requires exactly 1 operand")
value = self._evaluate_operand_python(operands[0], document)
if operator == "$binarySize":
if isinstance(value, (bytes, bytearray, memoryview)):
return len(value)
# Binary class is a subclass of bytes, so it's already covered.
# Handle encoded binary objects
if isinstance(value, dict) and value.get("__neosqlite_binary__"):
from ...binary import Binary
try:
bin_val = Binary.decode_from_storage(value)
return len(bin_val)
except Exception as e:
logger.debug(
f"Failed to decode binary for $binarySize: {e}"
)
pass
raise TypeError(
f"$binarySize requires a binary value, got {type(value)}"
)
elif operator == "$bsonSize":
# MongoDB $bsonSize returns the size of the document in BSON bytes.
# In NeoSQLite, we'll return the size of the JSON representation.
import json
# Use simple JSON dump for size calculation (approximates BSON)
try:
# Use a basic approach for now
return len(json.dumps(value).encode("utf-8"))
except Exception as e:
# Fallback to string length for non-serializable objects
logger.debug(
f"Failed to calculate $bsonSize using JSON, falling back to string length: {e}"
)
return len(str(value).encode("utf-8"))
raise NotImplementedError(f"Operator {operator} not supported")
[docs]
def _evaluate_type_python(
self, operator: str, operands: list[Any], document: dict[str, Any]
) -> Any:
"""Evaluate type conversion operators in Python."""
# Handle both list and single operand formats (but not for $convert which needs dict)
if operator != "$convert" and not isinstance(operands, list):
operands = [operands]
match operator:
case "$type":
if len(operands) != 1:
raise ValueError("$type requires exactly 1 operand")
value = self._evaluate_operand_python(operands[0], document)
return get_bson_type(value)
case "$toString":
if len(operands) != 1:
raise ValueError("$toString requires exactly 1 operand")
value = self._evaluate_operand_python(operands[0], document)
return str(value) if value is not None else None
case "$toInt":
if len(operands) != 1:
raise ValueError("$toInt requires exactly 1 operand")
value = self._evaluate_operand_python(operands[0], document)
try:
return int(value) if value is not None else None
except (ValueError, TypeError):
return None
case "$toDouble":
if len(operands) != 1:
raise ValueError("$toDouble requires exactly 1 operand")
value = self._evaluate_operand_python(operands[0], document)
try:
return float(value) if value is not None else None
except (ValueError, TypeError):
return None
case "$toBool":
if len(operands) != 1:
raise ValueError("$toBool requires exactly 1 operand")
value = self._evaluate_operand_python(operands[0], document)
if value is None:
return False
if isinstance(value, bool):
return value
if isinstance(value, (int, float)):
return value != 0
if isinstance(value, str):
return len(value) > 0
return bool(value)
case "$toLong":
if len(operands) != 1:
raise ValueError("$toLong requires exactly 1 operand")
value = self._evaluate_operand_python(operands[0], document)
try:
# Python ints are already 64-bit
return int(value) if value is not None else None
except (ValueError, TypeError):
return None
case "$toDecimal":
if len(operands) != 1:
raise ValueError("$toDecimal requires exactly 1 operand")
value = self._evaluate_operand_python(operands[0], document)
try:
from decimal import Decimal, InvalidOperation
return Decimal(str(value)) if value is not None else None
except (ValueError, TypeError, ImportError, InvalidOperation):
return None
case "$toObjectId":
if len(operands) != 1:
raise ValueError("$toObjectId requires exactly 1 operand")
value = self._evaluate_operand_python(operands[0], document)
if value is None:
return None
# Convert hex string to ObjectId
from neosqlite.objectid import ObjectId
try:
if isinstance(value, str) and len(value) == 24:
return ObjectId(value)
# For other types, try to create from string representation
return ObjectId(str(value))
except Exception as e:
logger.debug(
f"Failed to convert to ObjectId in expression: {e}"
)
return None
case "$isNumber":
# Check if value is a number (int or float, but not bool)
if len(operands) != 1:
raise ValueError("$isNumber requires exactly 1 operand")
value = self._evaluate_operand_python(operands[0], document)
# In Python, bool is a subclass of int, so we need to check for bool first
if isinstance(value, bool):
return False
return isinstance(value, (int, float))
case "$convert":
# $convert is complex - requires 'to' field
if not isinstance(operands, dict):
raise ValueError("$convert requires a dictionary")
input_val = self._evaluate_operand_python(
operands.get("input"), document
)
to_type = operands.get("to")
on_error = operands.get("onError")
on_null = operands.get("onNull")
if input_val is None:
return on_null
# Import required types upfront
from neosqlite.objectid import ObjectId
# Map conversion types to named converter methods
conversion_map = {
"int": _convert_to_int,
"long": _convert_to_long,
"double": _convert_to_double,
"decimal": _convert_to_decimal,
"string": _convert_to_string,
"bool": _convert_to_bool,
"objectId": _convert_to_objectid,
"binData": _convert_to_bindata,
"bsonBinData": _convert_to_bsonbindata,
"regex": _convert_to_regex,
"bsonRegex": _convert_to_bsonregex,
"date": _convert_to_date,
"null": _convert_to_null,
}
try:
converter = conversion_map.get(to_type)
if converter:
return converter(input_val)
return input_val
except Exception as e:
logger.debug(f"Failed to convert type in $convert: {e}")
return on_error
case _:
raise NotImplementedError(
f"Type operator {operator} not supported in Python evaluation"
)
# Type converter wrapper methods for backward compatibility (used by tests)
[docs]
@staticmethod
def _convert_to_int(value: Any) -> Any:
    """
    Convert ``value`` to int via the shared ``type_utils`` converter.

    Thin pass-through kept on the class; the actual conversion rules live
    in the module-level ``_convert_to_int`` imported from ``.type_utils``.
    """
    # Inside a function body the bare name resolves to the module-level
    # import, not to this same-named class attribute.
    converted = _convert_to_int(value)
    return converted
[docs]
@staticmethod
def _convert_to_long(value: Any) -> Any:
    """
    Convert ``value`` to long (64-bit int) via the shared converter.

    Thin pass-through to the module-level ``_convert_to_long`` imported
    from ``.type_utils``; its result is returned unchanged.
    """
    # The bare name below is the module-level import, not this method.
    converted = _convert_to_long(value)
    return converted
[docs]
@staticmethod
def _convert_to_double(value: Any) -> Any:
    """
    Convert ``value`` to double (float) via the shared converter.

    Thin pass-through to the module-level ``_convert_to_double`` imported
    from ``.type_utils``; its result is returned unchanged.
    """
    # The bare name below is the module-level import, not this method.
    converted = _convert_to_double(value)
    return converted
[docs]
@staticmethod
def _convert_to_decimal(value: Any) -> Any:
    """
    Convert ``value`` to the decimal target type via the shared converter.

    Thin pass-through to the module-level ``_convert_to_decimal`` imported
    from ``.type_utils``; its result is returned unchanged.

    NOTE(review): the previous docstring stated the result is a float
    because SQLite lacks Decimal128 -- confirm against ``type_utils``.
    """
    # The bare name below is the module-level import, not this method.
    converted = _convert_to_decimal(value)
    return converted
[docs]
@staticmethod
def _convert_to_string(value: Any) -> Any:
    """
    Convert ``value`` to string via the shared converter.

    Thin pass-through to the module-level ``_convert_to_string`` imported
    from ``.type_utils``; its result is returned unchanged.
    """
    # The bare name below is the module-level import, not this method.
    converted = _convert_to_string(value)
    return converted
[docs]
@staticmethod
def _convert_to_bool(value: Any) -> Any:
    """
    Convert ``value`` to bool via the shared converter.

    Thin pass-through to the module-level ``_convert_to_bool`` imported
    from ``.type_utils``; its result is returned unchanged.
    """
    # The bare name below is the module-level import, not this method.
    converted = _convert_to_bool(value)
    return converted
[docs]
@staticmethod
def _convert_to_objectid(value: Any) -> Any:
    """
    Convert ``value`` to ObjectId via the shared converter.

    Thin pass-through to the module-level ``_convert_to_objectid`` imported
    from ``.type_utils``; its result is returned unchanged.
    """
    # The bare name below is the module-level import, not this method.
    converted = _convert_to_objectid(value)
    return converted
[docs]
@staticmethod
def _convert_to_bindata(value: Any) -> Any:
    """
    Convert ``value`` to Binary (binData) via the shared converter.

    Thin pass-through to the module-level ``_convert_to_bindata`` imported
    from ``.type_utils``; its result is returned unchanged.
    """
    # The bare name below is the module-level import, not this method.
    converted = _convert_to_bindata(value)
    return converted
[docs]
@staticmethod
def _convert_to_bsonbindata(value: Any) -> Any:
    """
    Convert ``value`` to Binary (bsonBinData) via the shared converter.

    Thin pass-through to the module-level ``_convert_to_bsonbindata``
    imported from ``.type_utils``; its result is returned unchanged.
    """
    # The bare name below is the module-level import, not this method.
    converted = _convert_to_bsonbindata(value)
    return converted
[docs]
@staticmethod
def _convert_to_regex(value: Any) -> Any:
    """
    Convert ``value`` to a regex pattern via the shared converter.

    Thin pass-through to the module-level ``_convert_to_regex`` imported
    from ``.type_utils``; its result is returned unchanged.
    """
    # The bare name below is the module-level import, not this method.
    converted = _convert_to_regex(value)
    return converted
[docs]
@staticmethod
def _convert_to_bsonregex(value: Any) -> Any:
    """
    Convert ``value`` to a regex pattern (bsonRegex) via the shared converter.

    Thin pass-through to the module-level ``_convert_to_bsonregex`` imported
    from ``.type_utils``; its result is returned unchanged.
    """
    # The bare name below is the module-level import, not this method.
    converted = _convert_to_bsonregex(value)
    return converted
[docs]
@staticmethod
def _convert_to_date(value: Any) -> Any:
    """
    Convert ``value`` to date via the shared converter.

    Thin pass-through to the module-level ``_convert_to_date`` imported
    from ``.type_utils``; its result is returned unchanged.
    """
    # The bare name below is the module-level import, not this method.
    converted = _convert_to_date(value)
    return converted
[docs]
@staticmethod
def _convert_to_null(value: Any) -> None:
    """
    Convert any value to None via the shared converter.

    Thin pass-through to the module-level ``_convert_to_null`` imported
    from ``.type_utils``; its result is returned unchanged.
    """
    # The bare name below is the module-level import, not this method.
    converted = _convert_to_null(value)
    return converted
[docs]
def _get_bson_type(self, value: Any) -> str:
    """
    Return the BSON type name reported for ``value``.

    Instance-level pass-through to ``get_bson_type`` imported from
    ``.type_utils``; no instance state is read here.
    """
    type_name = get_bson_type(value)
    return type_name
[docs]
def _evaluate_literal_python(
self, operands: Any, document: dict[str, Any]
) -> Any:
"""Evaluate $literal operator in Python."""
# $literal just returns its argument as-is (used to escape special characters)
return self._evaluate_operand_python(operands, document)
[docs]
def _evaluate_operand_python(
self, operand: Any, document: dict[str, Any]
) -> Any:
"""Evaluate an operand in Python context."""
match operand:
case str() if operand.startswith("$"):
# Field reference - navigate document
field_path = operand[1:] # Remove $
# Handle $$variable syntax
if field_path.startswith("$"):
# $$var syntax - check for special variables
var_name = "$" + field_path # Reconstruct $$var
# Check if there's a field path after the variable name
# e.g., $$comment.comment_author should split into var_name="$$comment" and field_parts=["comment_author"]
if "." in var_name:
# Split on first dot: "$$comment.comment_author" -> "$$comment" and "comment_author"
var_name, field_suffix = var_name.split(".", 1)
field_parts = field_suffix.split(".")
else:
field_parts = []
if var_name == "$$REMOVE":
# Special sentinel for field removal in $project
return REMOVE_SENTINEL
if var_name == "$$ROOT" or var_name == "$$CURRENT":
# If not explicitly in document context, the document itself
# is the root/current context
value = document.get(var_name, document)
else:
# Otherwise look up directly in document context
value = document.get(var_name)
# Navigate the field path within the variable value
for key in field_parts:
if isinstance(value, dict):
value = value.get(key)
else:
return None
return value
# Regular field navigation (not a variable)
keys = field_path.split(".")
current: Any | None = document
for key in keys:
if isinstance(current, dict):
current = current.get(key)
else:
return None
return current
case dict():
# Check if it's an expression (single key starting with $) or literal dict
if len(operand) == 1:
key = next(iter(operand.keys()))
if key.startswith("$"):
# Nested expression
return self._evaluate_expr_python(operand, document)
# Otherwise, it's a literal dict (e.g., for $mergeObjects)
return operand
case _:
# Literal value
return operand