import logging
import re
from typing import Any
from .exceptions import MalformedQueryException
# Module-level logger named after this module; the operators below use it
# to emit debug messages when a comparison or conversion fails.
logger = logging.getLogger(__name__)
[docs]
def _get_nested_field(field: str, document: dict[str, Any]) -> Any:
    """
    Resolve a (possibly dotted) field path against a document.

    Args:
        field (str): Field name or dot-separated path (e.g., "profile.age").
        document (dict[str, Any]): The document to read the value from.

    Returns:
        Any: The value stored at the path, or None when a step of the path
        is missing or an intermediate value is not a dict.
    """
    if "." in field:
        # Descend one path segment at a time, giving up as soon as a step
        # cannot be followed.
        current: Any = document
        for key in field.split("."):
            if isinstance(current, dict) and key in current:
                current = current.get(key, None)
            else:
                return None
        return current

    # Plain top-level key: a single dictionary lookup is enough.
    return document.get(field, None)
[docs]
def _get_int_value(field: str, document: dict[str, Any]) -> int | None:
    """
    Get a field value and convert it to int, returning None if not possible.

    Args:
        field (str): The document field to get (dot notation supported).
        document (dict[str, Any]): The document to get the value from.

    Returns:
        int | None: The integer value, or None if the field is missing, is a
        boolean, or cannot be converted to an integer.
    """
    doc_value = _get_nested_field(field, document)
    if doc_value is None or isinstance(doc_value, bool):
        # Missing values and booleans are not valid for bitwise operations.
        return None
    try:
        return int(doc_value)
    except (TypeError, ValueError, OverflowError) as e:
        # OverflowError covers int(float("inf")); without it a non-finite
        # field value would propagate out of the bitwise operators.
        logger.debug(f"{e=}")
        return None
[docs]
def _convert_to_bitmask(value: Any) -> int | None:
"""
Convert a value to a bitmask using pattern matching.
Handles:
- int: Direct integer bitmask
- list/tuple: Array of bit positions
- Other iterables: Iterable of bit positions
- Other: Try to convert to int
Args:
value: The value to convert.
Returns:
int | None: The bitmask, or None if conversion fails.
"""
match value:
case int():
return value
case list() | tuple() as items:
try:
bitmask = 0
for bit_pos in items:
bitmask |= 1 << int(bit_pos)
return bitmask
except (TypeError, ValueError) as e:
logger.debug(f"{e=}")
return None
case _ if hasattr(value, "__iter__") and not isinstance(
value, (str, bytes)
):
try:
bitmask = 0
for bit_pos in value:
bitmask |= 1 << int(bit_pos)
return bitmask
except (TypeError, ValueError) as e:
logger.debug(f"{e=}")
return None
case _:
try:
return int(value)
except (TypeError, ValueError) as e:
logger.debug(f"{e=}")
return None
# Query operators
[docs]
def _eq(field: str, value: Any, document: dict[str, Any]) -> bool:
    """
    Compare a field value with a given value using the equals operator.

    MongoDB semantics: {field: {$eq: value}} matches if:
        - The field value equals the value (scalar-to-scalar or array-to-array)
        - The field is an array containing the value as one of its elements
          (this also applies when the value itself is an array)

    Args:
        field (str): The document field to compare.
        value (Any): The value to compare against.
        document (dict[str, Any]): The document to compare the field value from.

    Returns:
        bool: True if the field value equals the given value, False otherwise.
    """
    try:
        doc_value = _get_nested_field(field, document)
        if isinstance(doc_value, list):
            # Array field: match on whole-array equality or on element
            # membership. For a scalar operand the first test is simply
            # False, so membership decides.
            return doc_value == value or value in doc_value
        # Scalar semantics: exact equality
        return doc_value == value
    except (TypeError, AttributeError) as e:
        logger.debug(f"Equality comparison failed for field '{field}': {e}")
        return False
[docs]
def _gt(field: str, value: Any, document: dict[str, Any]) -> bool:
    """
    Evaluate the greater-than operator for a document field.

    For array fields the operator matches when ANY int/float element is
    greater than the value (MongoDB array semantics).

    Args:
        field (str): The document field to compare.
        value (Any): The value to compare against.
        document (dict[str, Any]): The document to compare the field value from.

    Returns:
        bool: True if the field value is greater than the given value,
        False otherwise (including when the values cannot be compared).
    """
    try:
        actual = _get_nested_field(field, document)
        if not isinstance(actual, list):
            return actual > value
        # Array field: the first numeric element that satisfies wins.
        for element in actual:
            if isinstance(element, (int, float)) and element > value:
                return True
        return False
    except TypeError as e:
        logger.debug(f"Operator evaluation failed due to TypeError: {e}")
        return False
[docs]
def _lt(field: str, value: Any, document: dict[str, Any]) -> bool:
    """
    Evaluate the less-than operator for a document field.

    For array fields the operator matches when ANY int/float element is
    less than the value (MongoDB array semantics).

    Args:
        field (str): The document field to compare.
        value (Any): The value to compare against.
        document (dict[str, Any]): The document to compare the field value from.

    Returns:
        bool: True if the field value is less than the given value,
        False otherwise (including when the values cannot be compared).
    """
    try:
        actual = _get_nested_field(field, document)
        if not isinstance(actual, list):
            return actual < value
        # Array field: the first numeric element that satisfies wins.
        for element in actual:
            if isinstance(element, (int, float)) and element < value:
                return True
        return False
    except TypeError as e:
        logger.debug(f"Operator evaluation failed due to TypeError: {e}")
        return False
[docs]
def _gte(field: str, value: Any, document: dict[str, Any]) -> bool:
    """
    Evaluate the greater-than-or-equal operator for a document field.

    For array fields the operator matches when ANY int/float element is
    greater than or equal to the value (MongoDB array semantics).

    Args:
        field (str): The document field to compare.
        value (Any): The value to compare against.
        document (dict[str, Any]): The document to compare the field value from.

    Returns:
        bool: True if the field value is greater than or equal to the given
        value, False otherwise (including when the values cannot be compared).
    """
    try:
        actual = _get_nested_field(field, document)
        if not isinstance(actual, list):
            return actual >= value
        # Array field: the first numeric element that satisfies wins.
        for element in actual:
            if isinstance(element, (int, float)) and element >= value:
                return True
        return False
    except TypeError as e:
        logger.debug(f"Operator evaluation failed due to TypeError: {e}")
        return False
[docs]
def _lte(field: str, value: Any, document: dict[str, Any]) -> bool:
    """
    Evaluate the less-than-or-equal operator for a document field.

    For array fields the operator matches when ANY int/float element is
    less than or equal to the value (MongoDB array semantics).

    Args:
        field (str): The document field to compare.
        value (Any): The value to compare against.
        document (dict[str, Any]): The document to compare the field value from.

    Returns:
        bool: True if the field value is less than or equal to the given
        value, False otherwise (including when the values cannot be compared).
    """
    try:
        actual = _get_nested_field(field, document)
        if not isinstance(actual, list):
            return actual <= value
        # Array field: the first numeric element that satisfies wins.
        for element in actual:
            if isinstance(element, (int, float)) and element <= value:
                return True
        return False
    except TypeError as e:
        logger.debug(f"Operator evaluation failed due to TypeError: {e}")
        return False
[docs]
def _all(field: str, value: list[Any], document: dict[str, Any]) -> bool:
    """
    Check if an array field contains every element of the provided value.

    Args:
        field (str): The document field to compare.
        value (list[Any]): The elements that must all be present in the field.
        document (dict[str, Any]): The document to compare the field value from.

    Returns:
        bool: True if every element of value is present in the array field,
        False otherwise. A field that is missing or not a list is treated as
        an empty array.

    Raises:
        MalformedQueryException: If value is not iterable.
    """
    try:
        required = list(value)
    except TypeError as e:
        logger.debug(
            f"Operator evaluation failed for '$all' field '{field}': {e}"
        )
        raise MalformedQueryException("'$all' must accept an iterable") from e

    doc_value = _get_nested_field(field, document)
    present = doc_value if isinstance(doc_value, list) else []
    # Membership is tested with == on lists rather than through sets, so
    # unhashable elements (nested lists or dicts) on either side compare
    # normally instead of raising TypeError.
    return all(item in present for item in required)
[docs]
def _in(field: str, value: list[Any], document: dict[str, Any]) -> bool:
    """
    Evaluate the $in operator for a document field.

    Args:
        field (str): The document field to compare.
        value (list[Any]): The list of accepted values.
        document (dict[str, Any]): The document to compare the field value from.

    Returns:
        bool: True if the field value (or, for an array field, at least one
        of its elements) is present in the list, False otherwise.

    Raises:
        MalformedQueryException: If value is not a list.
    """
    if not isinstance(value, list):
        raise MalformedQueryException("$in must be followed by an array")

    actual = _get_nested_field(field, document)
    if not isinstance(actual, list):
        # Scalar field: direct membership test.
        return actual in value

    # Array field: a single element found in the list is sufficient.
    for element in actual:
        if element in value:
            return True
    return False
[docs]
def _ne(field: str, value: Any, document: dict[str, Any]) -> bool:
    """
    Compare a field value with a given value using the not equal operator.

    MongoDB semantics: {field: {$ne: value}} matches if:
        - The field value does not equal the value (scalar-to-scalar or
          array-to-array)
        - The field is an array NOT containing the value as an element

    Args:
        field (str): The document field to compare.
        value (Any): The value to compare against.
        document (dict[str, Any]): The document to compare the field value from.

    Returns:
        bool: True if the field value is not equal to the given value, False otherwise.
    """
    doc_value = _get_nested_field(field, document)
    if isinstance(doc_value, list):
        # Array field: the whole array must differ from the value AND the
        # value must not be one of its elements. Without the first test an
        # array operand equal to the field would wrongly count as "not equal".
        return doc_value != value and value not in doc_value
    # Scalar semantics: not equal
    return doc_value != value
[docs]
def _nin(field: str, value: list[Any], document: dict[str, Any]) -> bool:
    """
    Check if a field value is not present in the provided list.

    Args:
        field (str): The document field to compare.
        value (list[Any]): The values to check against.
        document (dict[str, Any]): The document to compare the field value from.

    Returns:
        bool: True if the field value (or, for an array field, every one of
        its elements) is absent from the provided values, False otherwise.

    Raises:
        MalformedQueryException: If value is not iterable.
    """
    try:
        # Materialise once: repeated membership tests against a one-shot
        # iterable would consume it, and both branches below must see the
        # same set of candidates.
        excluded = list(value)
    except TypeError as e:
        logger.debug(
            f"Operator evaluation failed for '$nin' field '{field}': {e}"
        )
        raise MalformedQueryException("'$nin' must accept an iterable") from e

    doc_value = _get_nested_field(field, document)
    if isinstance(doc_value, list):
        # Array field: none of the elements may appear in the excluded values.
        return not any(item in excluded for item in doc_value)
    # Scalar field: the value itself must not appear in the excluded values.
    return doc_value not in excluded
[docs]
def _mod(field: str, value: list[int], document: dict[str, Any]) -> bool:
    """
    Compare a field value with a given value using the modulo operator.

    Args:
        field (str): The document field to compare.
        value (list[int]): The divisor and remainder to compare against.
        document (dict[str, Any]): The document to compare the field value from.

    Returns:
        bool: True if the field value modulo the divisor equals the remainder
        (for an array field: if ANY int/float element does), False otherwise.

    Raises:
        MalformedQueryException: If value is not a [divisor, remainder] pair
            of integer-convertible items, or if the divisor is 0.
    """
    try:
        divisor, remainder = map(int, value)
    except (TypeError, ValueError, OverflowError) as e:
        logger.debug(
            f"Operator evaluation failed for '$mod' field '{field}': {e}"
        )
        raise MalformedQueryException(
            "'$mod' must accept an iterable: [divisor, remainder]"
        ) from e
    if divisor == 0:
        # Reject up front instead of letting ZeroDivisionError escape while
        # documents are being scanned.
        raise MalformedQueryException("'$mod' divisor cannot be 0")

    def _satisfies(candidate: Any) -> bool:
        """Return True if candidate converts to an int with the wanted remainder."""
        try:
            return int(candidate) % divisor == remainder
        except (TypeError, ValueError, OverflowError) as e:
            # OverflowError/ValueError: non-finite floats such as inf or nan.
            logger.debug(f"Modulo comparison failed for field '{field}': {e}")
            return False

    doc_value = _get_nested_field(field, document)
    if isinstance(doc_value, list):
        # Array field: match if ANY numeric element satisfies. Each element
        # is checked on its own, so one unconvertible element cannot hide a
        # later match.
        return any(
            isinstance(x, (int, float)) and _satisfies(x) for x in doc_value
        )
    if doc_value is None:
        return False
    return _satisfies(doc_value)
[docs]
def _exists(field: str, value: bool, document: dict[str, Any]) -> bool:
    """
    Evaluate the $exists operator for a document field.

    A field counts as existing when its key is present, even if the stored
    value is None.

    Args:
        field (str): The document field to check (dot notation supported).
        value (bool): True if the field must exist, False if it must not exist.
        document (dict[str, Any]): The document to check the field in.

    Returns:
        bool: True if the field's presence agrees with value, False otherwise.

    Raises:
        MalformedQueryException: If value is not a boolean.
    """
    if not isinstance(value, bool):
        raise MalformedQueryException("'$exists' must be supplied a boolean")

    if "." not in field:
        # Top-level key: presence must agree with the requested flag.
        return (field in document) == value

    # Dotted path: walk down the nested dicts; the path exists only if every
    # segment can be followed.
    current: Any = document
    found = True
    for part in field.split("."):
        if isinstance(current, dict) and part in current:
            current = current.get(part, None)
        else:
            found = False
            break
    return found == value
[docs]
def _regex(
    field: str, value: Any, document: dict[str, Any], options: str = ""
) -> bool:
    """
    Match a field value against a regular expression.

    Scalar values are converted with str() before matching and a missing
    (None) value is matched as the empty string. For an array field the
    operator matches when ANY element matches.

    Args:
        field (str): The document field to compare.
        value (Any): The regular expression to compare against (str or re.Pattern).
        document (dict[str, Any]): The document to compare the field value from.
        options (str): Optional regex flags (i, m, x, s). Ignored when value
            is already a compiled pattern.

    Returns:
        bool: True if the field value matches the regular expression, False otherwise.
    """
    flags = 0
    if options:
        lowered = options.lower()
        for letter, flag in (
            ("i", re.IGNORECASE),
            ("m", re.MULTILINE),
            ("x", re.VERBOSE),
            ("s", re.DOTALL),
        ):
            if letter in lowered:
                flags |= flag

    try:
        # A pre-compiled pattern carries its own flags; `options` only
        # applies when the pattern is compiled here.
        pattern = (
            value if isinstance(value, re.Pattern) else re.compile(value, flags)
        )
        doc_val = _get_nested_field(field, document)
        # Match arrays element by element rather than against the text of
        # the list's repr.
        candidates = doc_val if isinstance(doc_val, list) else [doc_val]
        return any(
            pattern.search("" if item is None else str(item)) is not None
            for item in candidates
        )
    except (TypeError, re.error) as e:
        logger.debug(f"Regex matching failed for field '{field}': {e}")
        return False
[docs]
def _elemMatch(field: str, value: Any, document: dict[str, Any]) -> bool:
    """
    Check if at least one element of an array field matches the given criteria.

    Args:
        field (str): The document field to compare (dot notation supported).
        value (Any): Either a simple value to match directly, a dictionary of query
            operators (e.g., {"$gte": 90}), or a dictionary of field-value
            pairs for arrays of objects.
        document (dict[str, Any]): The document to compare the field value from.

    Returns:
        bool: True if some array element matches the criteria, False otherwise
        (including when the field is not an array).
    """

    def _has_operator_keys(mapping: dict[Any, Any]) -> bool:
        """Return True if any key of mapping is a "$"-prefixed operator name."""
        return any(isinstance(k, str) and k.startswith("$") for k in mapping)

    def _object_matches(elem: Any) -> bool:
        """Return True if elem is a dict satisfying every field criterion in value."""
        if not isinstance(elem, dict):
            return False
        for key, expected in value.items():
            if isinstance(expected, dict) and _has_operator_keys(expected):
                # Operator criteria such as {"$gt": 80} applied to the
                # element's field value.
                if not _apply_query_operators(
                    expected, _get_nested_field(key, elem)
                ):
                    return False
            elif not _eq(key, expected, elem):
                # Literal field-value pair.
                return False
        return True

    # Resolve the field like the other operators do, so dotted paths work.
    field_val = _get_nested_field(field, document)
    if not isinstance(field_val, list):
        return False

    if not isinstance(value, dict):
        # Simple value, e.g. {"tags": {"$elemMatch": "c"}}: plain equality
        # against each element.
        return any(elem == value for elem in field_val)

    if _has_operator_keys(value):
        # Operators such as {"$gte": 90} are applied to each element itself.
        return any(_apply_query_operators(value, elem) for elem in field_val)

    # Field-value pairs for arrays of objects, e.g.
    # {"scores": {"$elemMatch": {"subject": "math", "score": {"$gt": 80}}}}
    return any(_object_matches(elem) for elem in field_val)
[docs]
def _apply_query_operators(operators: dict[str, Any], value: Any) -> bool:
    """
    Evaluate a set of query operators against a single value.

    Each operator "$name" is resolved to a function called "_name", first in
    this module's globals and then in neosqlite.query_operators, and is run
    against a throwaway document that stores the value under "_temp".

    Args:
        operators: Dictionary of query operators (e.g., {"$gte": 90})
        value: The value to test against

    Returns:
        bool: True if all operators match; False if any operator fails to
        match, cannot be resolved, or raises while being evaluated.

    Raises:
        MalformedQueryException: If "$options" is given without "$regex".
    """
    regex_options = operators.get("$options", "")
    if regex_options and "$regex" not in operators:
        raise MalformedQueryException("Can't use $options without $regex")

    for op, operand in operators.items():
        if op == "$options":
            # Consumed together with $regex below.
            continue

        # Wrap the value so the field-based operator functions can read it.
        temp_doc = {"_temp": value}
        try:
            func_name = f"_{op.replace('$', '')}"
            op_func = globals().get(func_name)
            if op_func is None:
                # Not defined in this module's globals: fall back to the
                # packaged operators module.
                import neosqlite.query_operators as qo

                op_func = getattr(qo, func_name, None)
            if op_func is None:
                # Unknown operator counts as a non-match.
                return False

            # Only $regex receives the extra options argument.
            extra = {"options": regex_options} if op == "$regex" else {}
            if not op_func("_temp", operand, temp_doc, **extra):
                return False
        except Exception as e:
            logger.debug(f"{e=}")
            return False
    return True
[docs]
def _size(field: str, value: int, document: dict[str, Any]) -> bool:
    """
    Evaluate the $size operator for a document field.

    Args:
        field (str): The document field to compare.
        value (int): The expected number of elements.
        document (dict[str, Any]): The document to compare the field value from.

    Returns:
        bool: True if the field is a list whose length equals value,
        False otherwise (including when the field is not a list).
    """
    candidate = _get_nested_field(field, document)
    # Only lists have a size for this operator; anything else never matches.
    return isinstance(candidate, list) and len(candidate) == value
[docs]
def _contains(field: str, value: str, document: dict[str, Any]) -> bool:
    """
    Check if a field value contains a specified substring (case-insensitive).

    Args:
        field (str): The document field to compare (dot notation supported).
        value (str): The substring to compare against.
        document (dict[str, Any]): The document to compare the field value from.

    Returns:
        bool: True if the field value contains the specified substring,
        False otherwise (including when the field is missing or None).
    """
    try:
        # Resolve the field like the other operators do, so dotted paths
        # such as "profile.bio" reach nested values.
        field_val = _get_nested_field(field, document)
        if field_val is None:
            return False
        # Convert both values to strings and do a case-insensitive comparison
        return str(value).lower() in str(field_val).lower()
    except (TypeError, AttributeError) as e:
        logger.debug(
            f"Contains operator evaluation failed for field '{field}': {e}"
        )
        return False
[docs]
def _type(field: str, value: Any, document: dict[str, Any]) -> bool:
    """
    Check if field is of specified type.

    Args:
        field (str): The document field to check.
        value (Any): The type to check against: a numeric type code, a string
            alias, a Python type object, or a list of any of these (the field
            matches if it matches at least one entry).
        document (dict[str, Any]): The document to check the field value from.

    Returns:
        bool: True if the field is of the specified type, False otherwise
        (including for unknown codes/aliases and invalid type specifications).
    """
    doc_value = _get_nested_field(field, document)

    # MongoDB type mapping (numeric codes)
    type_mapping: dict[int, type] = {
        1: float,
        2: str,
        3: dict,
        4: list,
        8: bool,
        10: type(None),
        16: int,
        18: int,
        19: int,
    }
    # MongoDB string type aliases
    string_type_mapping: dict[str, type | tuple[type, ...]] = {
        "double": float,
        "string": str,
        "object": dict,
        "array": list,
        "bool": bool,
        "null": type(None),
        "int": int,
        "long": int,
        "decimal": int,
        "number": (int, float),
    }

    def _matches(spec: Any) -> bool:
        """Return True if doc_value matches one type specification."""
        expected: type | tuple[type, ...] | None
        if isinstance(spec, int):
            expected = type_mapping.get(spec)
        elif isinstance(spec, str):
            expected = string_type_mapping.get(spec.lower())
        else:
            # Python type (or tuple of types) supplied directly: plain
            # isinstance, but an invalid spec is a non-match, not a crash.
            try:
                return isinstance(doc_value, spec)
            except TypeError as e:
                logger.debug(f"Invalid '$type' specification {spec!r}: {e}")
                return False
        if expected is None:
            return False
        if isinstance(doc_value, bool):
            # bool is a subclass of int in Python, so without this guard
            # True/False would match the integer and "number" types.
            return expected is bool
        return isinstance(doc_value, expected)

    if isinstance(value, list):
        # Array of types: match if the field matches any listed type.
        return any(_matches(spec) for spec in value)
    return _matches(value)
[docs]
def _bits_all_clear(field: str, value: Any, document: dict[str, Any]) -> bool:
    """
    Evaluate MongoDB's $bitsAllClear operator for a numeric field.

    Args:
        field (str): The document field to check.
        value (Any): Bitmask as integer, BinData, or array of bit positions.
        document (dict[str, Any]): The document to check.

    Returns:
        bool: True if every bit selected by the mask is 0 in the field value,
        False otherwise (including when either side cannot be converted).
    """
    number = _get_int_value(field, document)
    # The mask is only built when the field yielded a usable integer.
    mask = None if number is None else _convert_to_bitmask(value)
    if number is None or mask is None:
        return False
    # No selected bit may be set, i.e. the AND must be empty.
    return not (number & mask)
[docs]
def _bits_all_set(field: str, value: Any, document: dict[str, Any]) -> bool:
    """
    Evaluate MongoDB's $bitsAllSet operator for a numeric field.

    Args:
        field (str): The document field to check.
        value (Any): Bitmask as integer, BinData, or array of bit positions.
        document (dict[str, Any]): The document to check.

    Returns:
        bool: True if every bit selected by the mask is 1 in the field value,
        False otherwise (including when either side cannot be converted).
    """
    number = _get_int_value(field, document)
    # The mask is only built when the field yielded a usable integer.
    mask = None if number is None else _convert_to_bitmask(value)
    if number is None or mask is None:
        return False
    # All selected bits are set exactly when none of them is missing
    # from the field value.
    return not (mask & ~number)
[docs]
def _bits_any_clear(field: str, value: Any, document: dict[str, Any]) -> bool:
    """
    Evaluate MongoDB's $bitsAnyClear operator for a numeric field.

    Args:
        field (str): The document field to check.
        value (Any): Bitmask as integer, BinData, or array of bit positions.
        document (dict[str, Any]): The document to check.

    Returns:
        bool: True if at least one bit selected by the mask is 0 in the field
        value, False otherwise (including when either side cannot be converted).
    """
    number = _get_int_value(field, document)
    # The mask is only built when the field yielded a usable integer.
    mask = None if number is None else _convert_to_bitmask(value)
    if number is None or mask is None:
        return False
    # Invert the field value: a selected bit that survives the AND was clear.
    return bool(~number & mask)
[docs]
def _bits_any_set(field: str, value: Any, document: dict[str, Any]) -> bool:
    """
    Evaluate MongoDB's $bitsAnySet operator for a numeric field.

    Args:
        field (str): The document field to check.
        value (Any): Bitmask as integer, BinData, or array of bit positions.
        document (dict[str, Any]): The document to check.

    Returns:
        bool: True if at least one bit selected by the mask is 1 in the field
        value, False otherwise (including when either side cannot be converted).
    """
    number = _get_int_value(field, document)
    # The mask is only built when the field yielded a usable integer.
    mask = None if number is None else _convert_to_bitmask(value)
    if number is None or mask is None:
        return False
    # Any overlap between the field value and the mask is a match.
    return bool(number & mask)
# Aliases for operator lookup (to match MongoDB camelCase naming).
# _apply_query_operators resolves an operator such as "$bitsAllClear" to the
# module-level name "_bitsAllClear" (it strips the "$" and prefixes "_"), so
# the snake_case implementations are also bound under those camelCase names.
_bitsAllClear = _bits_all_clear
_bitsAllSet = _bits_all_set
_bitsAnyClear = _bits_any_clear
_bitsAnySet = _bits_any_set