# Source code for neosqlite.query_operators

import logging
import re
from typing import Any

from .exceptions import MalformedQueryException

logger = logging.getLogger(__name__)


[docs] def _get_nested_field(field: str, document: dict[str, Any]) -> Any: """ Get a nested field value from a document using dot notation. Args: field (str): The field path using dot notation (e.g., "profile.age"). document (dict[str, Any]): The document to get the field value from. Returns: Any: The field value, or None if the field doesn't exist. """ if "." not in field: return document.get(field, None) # Handle nested fields doc_value: Any = document for path in field.split("."): if not isinstance(doc_value, dict) or path not in doc_value: return None doc_value = doc_value.get(path, None) return doc_value
def _get_int_value(field: str, document: dict[str, Any]) -> int | None:
    """
    Get a field value and convert it to int, returning None if not possible.

    Args:
        field (str): The document field to get (dot notation supported).
        document (dict[str, Any]): The document to get the value from.

    Returns:
        int | None: The integer value, or None if the field is missing, is a
            boolean, or cannot be converted to an integer.
    """
    doc_value = _get_nested_field(field, document)

    # bool is a subclass of int, so it has to be rejected explicitly:
    # booleans are not valid operands for bitwise operations.
    if doc_value is None or isinstance(doc_value, bool):
        return None

    try:
        return int(doc_value)
    except (TypeError, ValueError, OverflowError) as e:
        # TypeError: unsupported type; ValueError: NaN or non-numeric string;
        # OverflowError: int(float("inf")) / int(float("-inf")).
        logger.debug(f"{e=}")
        return None
[docs] def _convert_to_bitmask(value: Any) -> int | None: """ Convert a value to a bitmask using pattern matching. Handles: - int: Direct integer bitmask - list/tuple: Array of bit positions - Other iterables: Iterable of bit positions - Other: Try to convert to int Args: value: The value to convert. Returns: int | None: The bitmask, or None if conversion fails. """ match value: case int(): return value case list() | tuple() as items: try: bitmask = 0 for bit_pos in items: bitmask |= 1 << int(bit_pos) return bitmask except (TypeError, ValueError) as e: logger.debug(f"{e=}") return None case _ if hasattr(value, "__iter__") and not isinstance( value, (str, bytes) ): try: bitmask = 0 for bit_pos in value: bitmask |= 1 << int(bit_pos) return bitmask except (TypeError, ValueError) as e: logger.debug(f"{e=}") return None case _: try: return int(value) except (TypeError, ValueError) as e: logger.debug(f"{e=}") return None
# Query operators
def _eq(field: str, value: Any, document: dict[str, Any]) -> bool:
    """
    Evaluate the ``$eq`` operator for a field.

    A document matches when:
        - the field is not an array and equals ``value``;
        - the field and ``value`` are both arrays and are equal;
        - the field is an array and ``value`` (a non-array) is one of its
          elements.

    Args:
        field (str): The document field to compare (dot notation supported).
        value (Any): The value to compare against.
        document (dict[str, Any]): The document to read the field from.

    Returns:
        bool: True on a match, False otherwise (including when the comparison
            raises TypeError or AttributeError).
    """
    try:
        current = _get_nested_field(field, document)

        # Non-array field: plain equality.
        if not isinstance(current, list):
            return current == value

        # Array field vs. array operand: the arrays must be identical.
        if isinstance(value, list):
            return current == value

        # Array field vs. scalar operand: membership test.
        return value in current
    except (TypeError, AttributeError) as e:
        logger.debug(f"Equality comparison failed for field '{field}': {e}")
        return False
def _gt(field: str, value: Any, document: dict[str, Any]) -> bool:
    """
    Evaluate the ``$gt`` (greater than) operator for a field.

    For array fields the document matches as soon as one numeric
    (int or float) element is greater than ``value``; non-numeric elements
    are skipped.

    Args:
        field (str): The document field to compare (dot notation supported).
        value (Any): The value to compare against.
        document (dict[str, Any]): The document to read the field from.

    Returns:
        bool: True if the field (or one of its numeric elements) is greater
            than ``value``; False otherwise or when the operands are not
            comparable.
    """
    try:
        current = _get_nested_field(field, document)
        if not isinstance(current, list):
            return current > value

        # Array field: the first numeric element that satisfies the
        # condition is enough.
        for element in current:
            if isinstance(element, (int, float)) and element > value:
                return True
        return False
    except TypeError as e:
        logger.debug(f"Operator evaluation failed due to TypeError: {e}")
        return False
def _lt(field: str, value: Any, document: dict[str, Any]) -> bool:
    """
    Evaluate the ``$lt`` (less than) operator for a field.

    For array fields the document matches as soon as one numeric
    (int or float) element is less than ``value``; non-numeric elements
    are skipped.

    Args:
        field (str): The document field to compare (dot notation supported).
        value (Any): The value to compare against.
        document (dict[str, Any]): The document to read the field from.

    Returns:
        bool: True if the field (or one of its numeric elements) is less
            than ``value``; False otherwise or when the operands are not
            comparable.
    """
    try:
        current = _get_nested_field(field, document)
        if not isinstance(current, list):
            return current < value

        # Array field: the first numeric element that satisfies the
        # condition is enough.
        for element in current:
            if isinstance(element, (int, float)) and element < value:
                return True
        return False
    except TypeError as e:
        logger.debug(f"Operator evaluation failed due to TypeError: {e}")
        return False
def _gte(field: str, value: Any, document: dict[str, Any]) -> bool:
    """
    Evaluate the ``$gte`` (greater than or equal) operator for a field.

    For array fields the document matches as soon as one numeric
    (int or float) element is greater than or equal to ``value``;
    non-numeric elements are skipped.

    Args:
        field (str): The document field to compare (dot notation supported).
        value (Any): The value to compare against.
        document (dict[str, Any]): The document to read the field from.

    Returns:
        bool: True if the field (or one of its numeric elements) is greater
            than or equal to ``value``; False otherwise or when the operands
            are not comparable.
    """
    try:
        current = _get_nested_field(field, document)
        if not isinstance(current, list):
            return current >= value

        # Array field: the first numeric element that satisfies the
        # condition is enough.
        for element in current:
            if isinstance(element, (int, float)) and element >= value:
                return True
        return False
    except TypeError as e:
        logger.debug(f"Operator evaluation failed due to TypeError: {e}")
        return False
def _lte(field: str, value: Any, document: dict[str, Any]) -> bool:
    """
    Evaluate the ``$lte`` (less than or equal) operator for a field.

    For array fields the document matches as soon as one numeric
    (int or float) element is less than or equal to ``value``;
    non-numeric elements are skipped.

    Args:
        field (str): The document field to compare (dot notation supported).
        value (Any): The value to compare against.
        document (dict[str, Any]): The document to read the field from.

    Returns:
        bool: True if the field (or one of its numeric elements) is less
            than or equal to ``value``; False otherwise or when the operands
            are not comparable.
    """
    try:
        current = _get_nested_field(field, document)
        if not isinstance(current, list):
            return current <= value

        # Array field: the first numeric element that satisfies the
        # condition is enough.
        for element in current:
            if isinstance(element, (int, float)) and element <= value:
                return True
        return False
    except TypeError as e:
        logger.debug(f"Operator evaluation failed due to TypeError: {e}")
        return False
def _all(field: str, value: list[Any], document: dict[str, Any]) -> bool:
    """
    Check that an array field contains every element of ``value``.

    Membership is tested by equality rather than by hashing, so operands
    and document elements may be unhashable (dicts, nested lists).

    Args:
        field (str): The document field to compare (dot notation supported).
        value (list[Any]): The elements that must all be present.
        document (dict[str, Any]): The document to read the field from.

    Returns:
        bool: True if every element of ``value`` is present in the array
            field. A field that is missing or not an array is treated as an
            empty array, so it only matches an empty ``value``.

    Raises:
        MalformedQueryException: If ``value`` is not iterable.
    """
    try:
        required = list(value)
    except TypeError as e:
        logger.debug(
            f"Operator evaluation failed for '$all' field '{field}': {e}"
        )
        raise MalformedQueryException("'$all' must accept an iterable") from e

    try:
        doc_value = _get_nested_field(field, document)
        # Non-array (or missing) fields behave like an empty array.
        present = doc_value if isinstance(doc_value, list) else []
        return all(item in present for item in required)
    except TypeError as e:
        logger.debug(f"Operator evaluation failed due to TypeError: {e}")
        return False
def _in(field: str, value: list[Any], document: dict[str, Any]) -> bool:
    """
    Evaluate the ``$in`` operator for a field.

    Args:
        field (str): The document field to compare (dot notation supported).
        value (list[Any]): The list of accepted values.
        document (dict[str, Any]): The document to read the field from.

    Returns:
        bool: True if the field value is in ``value`` or, for an array
            field, if at least one of its elements is in ``value``.

    Raises:
        MalformedQueryException: If ``value`` is not a list.
    """
    if not isinstance(value, list):
        raise MalformedQueryException("$in must be followed by an array")

    current = _get_nested_field(field, document)

    # Treat a scalar field as a one-element array so both shapes share the
    # same "any element is accepted" test.
    candidates = current if isinstance(current, list) else [current]
    return any(candidate in value for candidate in candidates)
def _ne(field: str, value: Any, document: dict[str, Any]) -> bool:
    """
    Compare a field value with a given value using the not equal operator.

    MongoDB semantics: {field: {$ne: value}} matches if:
        - The field is not an array and does not equal the value
        - The field is an array that neither equals the value (array-to-array)
          nor contains the value as one of its elements

    Args:
        field (str): The document field to compare (dot notation supported).
        value (Any): The value to compare against.
        document (dict[str, Any]): The document to read the field from.

    Returns:
        bool: True if the field value is not equal to the given value,
            False otherwise.
    """
    doc_value = _get_nested_field(field, document)

    if isinstance(doc_value, list):
        # Array-to-array: identical arrays are equal, so $ne must not match.
        # (A bare membership test would report [1, 2] as "not equal" to
        # [1, 2] because the array does not contain itself.)
        if isinstance(value, list) and doc_value == value:
            return False
        # Otherwise the array must not contain the value as an element.
        return value not in doc_value

    # Scalar semantics: not equal
    return doc_value != value
def _nin(field: str, value: list[Any], document: dict[str, Any]) -> bool:
    """
    Check that a field value is not present in the provided collection.

    The operand is materialised into a list once, so one-shot iterables
    (generators, iterators) are not exhausted after the first membership
    test and both the scalar and the array branch see the same elements.

    Args:
        field (str): The document field to compare (dot notation supported).
        value (list[Any]): The values that must not occur.
        document (dict[str, Any]): The document to read the field from.

    Returns:
        bool: True if the field value (or, for an array field, every one of
            its elements) is absent from ``value``; False otherwise.

    Raises:
        MalformedQueryException: If ``value`` is not iterable.
    """
    try:
        excluded = list(value)
    except TypeError as e:
        logger.debug(
            f"Operator evaluation failed for '$nin' field '{field}': {e}"
        )
        raise MalformedQueryException("'$nin' must accept an iterable") from e

    doc_value = _get_nested_field(field, document)

    # Array field: none of its elements may be in the excluded values.
    if isinstance(doc_value, list):
        return not any(item in excluded for item in doc_value)

    # Scalar (or missing) field: the value itself must not be excluded.
    return doc_value not in excluded
def _mod(field: str, value: list[int], document: dict[str, Any]) -> bool:
    """
    Compare a field value with a given value using the modulo operator.

    Args:
        field (str): The document field to compare (dot notation supported).
        value (list[int]): ``[divisor, remainder]``.
        document (dict[str, Any]): The document to read the field from.

    Returns:
        bool: True if the field value modulo the divisor equals the
            remainder. For array fields, True if any int/float element
            satisfies the condition. False if the field is missing or cannot
            be converted to an integer.

    Raises:
        MalformedQueryException: If ``value`` is not an iterable of exactly
            two integer-convertible items, or if the divisor is 0.
    """
    try:
        divisor, remainder = map(int, value)
    except (TypeError, ValueError, OverflowError) as e:
        logger.debug(
            f"Operator evaluation failed for '$mod' field '{field}': {e}"
        )
        raise MalformedQueryException(
            "'$mod' must accept an iterable: [divisor, remainder]"
        ) from e

    # Reject a zero divisor up front instead of letting ZeroDivisionError
    # escape from the evaluation below.
    if divisor == 0:
        raise MalformedQueryException("'$mod' divisor cannot be 0")

    try:
        doc_value = _get_nested_field(field, document)

        # Array field: match if ANY numeric element satisfies the condition.
        if isinstance(doc_value, list):
            return any(
                isinstance(x, (int, float)) and int(x) % divisor == remainder
                for x in doc_value
            )

        if doc_value is None:
            return False

        return int(doc_value) % divisor == remainder
    except (TypeError, ValueError, OverflowError) as e:
        # OverflowError: int() of an infinite float; ValueError: NaN or a
        # non-numeric string.
        logger.debug(f"Modulo comparison failed for field '{field}': {e}")
        return False
[docs] def _exists(field: str, value: bool, document: dict[str, Any]) -> bool: """ Check if a field exists in the document. Args: field (str): The document field to check. value (bool): True if the field must exist, False if it must not exist. document (dict[str, Any]): The document to check the field in. Returns: bool: True if the field exists (if value is True), or does not exist (if value is False), False otherwise. """ if not isinstance(value, bool): raise MalformedQueryException("'$exists' must be supplied a boolean") # Handle nested fields if "." in field: doc_value: Any = document field_parts = field.split(".") for i, path in enumerate(field_parts): if not isinstance(doc_value, dict) or path not in doc_value: # Field doesn't exist return not value if i == len(field_parts) - 1: # We've reached the final field return value doc_value = doc_value.get(path, None) # Should be unreachable as the loop returns for the last element, # but required for static type checking. return not value else: return (field in document) if value else (field not in document)
def _regex(
    field: str, value: Any, document: dict[str, Any], options: str = ""
) -> bool:
    """
    Match a field value against a regular expression.

    The field value is converted with ``str()`` before matching; a missing
    (None) field is matched as the empty string.

    Args:
        field (str): The document field to compare (dot notation supported).
        value (Any): The regular expression (str or compiled re.Pattern).
        document (dict[str, Any]): The document to read the field from.
        options (str): Optional regex flags (i, m, x, s), case-insensitive.
            They are applied to string patterns and are also merged into the
            flags of an already compiled pattern.

    Returns:
        bool: True if the pattern is found in the field value, False
            otherwise (including when the pattern is invalid).
    """
    flags = 0
    if options:
        requested = options.lower()
        for letter, flag in (
            ("i", re.IGNORECASE),
            ("m", re.MULTILINE),
            ("x", re.VERBOSE),
            ("s", re.DOTALL),
        ):
            if letter in requested:
                flags |= flag

    try:
        doc_val = _get_nested_field(field, document)
        text = "" if doc_val is None else str(doc_val)

        if isinstance(value, re.Pattern):
            # A compiled pattern carries its own flags and Pattern.search()
            # takes no flags argument, so recompile to honour $options.
            if flags:
                value = re.compile(value.pattern, value.flags | flags)
            return value.search(text) is not None

        return re.search(value, text, flags) is not None
    except (TypeError, ValueError, re.error) as e:
        # ValueError: re.compile() rejects incompatible flag combinations.
        logger.debug(f"Regex matching failed for field '{field}': {e}")
        return False
def _elemMatch(field: str, value: Any, document: dict[str, Any]) -> bool:
    """
    Check whether at least one element of an array field matches ``value``.

    Args:
        field (str): The document field to compare (dot notation supported,
            like the other operators in this module).
        value (Any): One of:
            - a dictionary of query operators (e.g., {"$gte": 90}) applied to
              each array element;
            - a dictionary of field criteria for arrays of objects (e.g.,
              {"subject": "math", "score": {"$gt": 80}}), all of which must
              hold for the same element;
            - any other value, compared to each element for equality.
        document (dict[str, Any]): The document to read the field from.

    Returns:
        bool: True if some array element satisfies the criteria; False
            otherwise, or if the field is not an array.
    """
    field_val = _get_nested_field(field, document)
    if not isinstance(field_val, list):
        return False

    def _is_operator_doc(candidate: Any) -> bool:
        # A dict counts as an operator document when any key starts with "$".
        return isinstance(candidate, dict) and any(
            isinstance(key, str) and key.startswith("$") for key in candidate
        )

    # Simple value, e.g. {"tags": {"$elemMatch": "c"}}: plain equality.
    if not isinstance(value, dict):
        return any(elem == value for elem in field_val)

    # Operator document, e.g. {"$gte": 90}: apply it to each element.
    if _is_operator_doc(value):
        return any(_apply_query_operators(value, elem) for elem in field_val)

    def _matches_all_criteria(elem: Any) -> bool:
        # Field criteria only make sense for object (dict) elements.
        if not isinstance(elem, dict):
            return False
        for key, expected in value.items():
            if _is_operator_doc(expected):
                # Operators such as {"$gt": 80} on a field of the element.
                if not _apply_query_operators(
                    expected, _get_nested_field(key, elem)
                ):
                    return False
            elif not _eq(key, expected, elem):
                # Literal field-value pair.
                return False
        return True

    # Field criteria, e.g. {"subject": "math", "score": {"$gt": 80}}.
    return any(_matches_all_criteria(elem) for elem in field_val)
[docs] def _apply_query_operators(operators: dict[str, Any], value: Any) -> bool: """ Apply query operators to a single value. Args: operators: Dictionary of query operators (e.g., {"$gte": 90}) value: The value to test against Returns: bool: True if all operators match, False otherwise """ # Extract $options for $regex if present options = operators.get("$options", "") if options and "$regex" not in operators: raise MalformedQueryException("Can't use $options without $regex") for op, operand in operators.items(): if op == "$options": # $options is handled together with $regex continue # Create a temporary document with the value temp_doc = {"_temp": value} # Get the operator function try: op_func = globals().get(f"_{op.replace('$', '')}") if op_func is None: # Try to import from the module import neosqlite.query_operators as qo op_func = getattr(qo, f"_{op.replace('$', '')}", None) if op_func is None: # Operator not found, return False return False # Call the operator function if op == "$regex": if not op_func("_temp", operand, temp_doc, options=options): return False else: if not op_func("_temp", operand, temp_doc): return False except Exception as e: logger.debug(f"{e=}") return False return True
def _size(field: str, value: int, document: dict[str, Any]) -> bool:
    """
    Evaluate the ``$size`` operator for a field.

    Args:
        field (str): The document field to compare (dot notation supported).
        value (int): The required number of elements.
        document (dict[str, Any]): The document to read the field from.

    Returns:
        bool: True if the field is an array whose length equals ``value``;
            False otherwise, including when the field is not an array.
    """
    candidate = _get_nested_field(field, document)
    # Only arrays have a size; anything else never matches.
    return isinstance(candidate, list) and len(candidate) == value
def _contains(field: str, value: str, document: dict[str, Any]) -> bool:
    """
    Check if a field value contains a specified substring.

    Both operands are converted with ``str()`` and compared
    case-insensitively.

    Args:
        field (str): The document field to compare (dot notation supported,
            like the other operators in this module).
        value (str): The substring to look for.
        document (dict[str, Any]): The document to read the field from.

    Returns:
        bool: True if the field value contains the substring; False
            otherwise, or if the field is missing or None.
    """
    try:
        # Resolve dotted paths so nested fields behave like top-level ones.
        field_val = _get_nested_field(field, document)
        if field_val is None:
            return False
        # Convert both values to strings and do a case-insensitive comparison
        return str(value).lower() in str(field_val).lower()
    except (TypeError, AttributeError) as e:
        logger.debug(
            f"Contains operator evaluation failed for field '{field}': {e}"
        )
        return False
def _type(field: str, value: Any, document: dict[str, Any]) -> bool:
    """
    Check if a field is of the specified type.

    Args:
        field (str): The document field to check (dot notation supported).
        value (Any): The type to check against: a numeric type code, a
            string alias (case-insensitive), a Python type object, or a
            list/tuple of any of these (matches if any entry matches).
        document (dict[str, Any]): The document to read the field from.

    Returns:
        bool: True if the field is of the specified type; False otherwise,
            including for unknown codes/aliases and invalid operands.
            Booleans only match the boolean type, never "int", "long",
            "decimal" or "number".
    """
    # An array of types matches when any single entry matches.
    if isinstance(value, (list, tuple)):
        return any(_type(field, entry, document) for entry in value)

    doc_value = _get_nested_field(field, document)

    # MongoDB type mapping (numeric codes)
    type_mapping: dict[int, type] = {
        1: float,
        2: str,
        3: dict,
        4: list,
        8: bool,
        10: type(None),
        16: int,
        18: int,
        19: int,
    }

    # MongoDB string type aliases
    string_type_mapping: dict[str, type | tuple[type, ...]] = {
        "double": float,
        "string": str,
        "object": dict,
        "array": list,
        "bool": bool,
        "null": type(None),
        "int": int,
        "long": int,
        "decimal": int,
        "number": (int, float),
    }

    expected_type: type | tuple[type, ...] | None
    if isinstance(value, bool):
        # True/False are not type codes; without this check True would be
        # looked up as the integer 1 (double).
        return False
    elif isinstance(value, int):
        expected_type = type_mapping.get(value)
    elif isinstance(value, str):
        expected_type = string_type_mapping.get(value.lower())
    else:
        # Python type object supplied directly: plain isinstance semantics.
        try:
            return isinstance(doc_value, value)
        except TypeError as e:
            logger.debug(f"{e=}")
            return False

    if expected_type is None:
        return False

    # bool is a subclass of int, so isinstance(True, int) is True; keep
    # booleans from matching the integer / number aliases.
    if isinstance(doc_value, bool):
        return expected_type is bool

    return isinstance(doc_value, expected_type)
def _bits_all_clear(field: str, value: Any, document: dict[str, Any]) -> bool:
    """
    Evaluate the MongoDB ``$bitsAllClear`` operator.

    Args:
        field (str): The document field to check.
        value (Any): Bitmask as an integer, or a collection of bit positions.
        document (dict[str, Any]): The document to check.

    Returns:
        bool: True if every bit of the mask is 0 in the field value; False
            otherwise, or if either side cannot be turned into an integer.
    """
    number = _get_int_value(field, document)
    # Only build the mask when the field itself is usable.
    mask = None if number is None else _convert_to_bitmask(value)
    if mask is None:
        return False

    # No overlap between value and mask means all requested bits are clear.
    return not (number & mask)
def _bits_all_set(field: str, value: Any, document: dict[str, Any]) -> bool:
    """
    Evaluate the MongoDB ``$bitsAllSet`` operator.

    Args:
        field (str): The document field to check.
        value (Any): Bitmask as an integer, or a collection of bit positions.
        document (dict[str, Any]): The document to check.

    Returns:
        bool: True if every bit of the mask is 1 in the field value; False
            otherwise, or if either side cannot be turned into an integer.
    """
    number = _get_int_value(field, document)
    # Only build the mask when the field itself is usable.
    mask = None if number is None else _convert_to_bitmask(value)
    if mask is None:
        return False

    # Masking the value must give back the whole mask.
    return (number & mask) == mask
def _bits_any_clear(field: str, value: Any, document: dict[str, Any]) -> bool:
    """
    Evaluate the MongoDB ``$bitsAnyClear`` operator.

    Args:
        field (str): The document field to check.
        value (Any): Bitmask as an integer, or a collection of bit positions.
        document (dict[str, Any]): The document to check.

    Returns:
        bool: True if at least one bit of the mask is 0 in the field value;
            False otherwise, or if either side cannot be turned into an
            integer.
    """
    number = _get_int_value(field, document)
    # Only build the mask when the field itself is usable.
    mask = None if number is None else _convert_to_bitmask(value)
    if mask is None:
        return False

    # If masking does not reproduce the full mask, some requested bit is 0.
    return (number & mask) != mask
def _bits_any_set(field: str, value: Any, document: dict[str, Any]) -> bool:
    """
    Evaluate the MongoDB ``$bitsAnySet`` operator.

    Args:
        field (str): The document field to check.
        value (Any): Bitmask as an integer, or a collection of bit positions.
        document (dict[str, Any]): The document to check.

    Returns:
        bool: True if at least one bit of the mask is 1 in the field value;
            False otherwise, or if either side cannot be turned into an
            integer.
    """
    number = _get_int_value(field, document)
    # Only build the mask when the field itself is usable.
    mask = None if number is None else _convert_to_bitmask(value)
    if mask is None:
        return False

    # Any overlap between value and mask means a requested bit is set.
    return bool(number & mask)
# Aliases for operator lookup (to match MongoDB camelCase naming) _bitsAllClear = _bits_all_clear _bitsAllSet = _bits_all_set _bitsAnyClear = _bits_any_clear _bitsAnySet = _bits_any_set