Source code for neosqlite.collection.query_helper.schema_validator

"""
Native JSON Schema validator for NeoSQLite.
Provides MongoDB-compatible $jsonSchema evaluation.
"""

import logging
import re
from typing import Any

from ...binary import Binary
from ...objectid import ObjectId

logger = logging.getLogger(__name__)


def matches_json_schema(
    document: dict[str, Any], schema: dict[str, Any]
) -> bool:
    """
    Report whether a document satisfies a JSON Schema.

    This is the public entry point; the work is delegated to the
    recursive node validator, starting at the document root.

    Args:
        document: The document to validate.
        schema: The JSON Schema specification to validate against.

    Returns:
        True if the document matches the schema, False otherwise.
    """
    is_valid = _validate_node(document, schema)
    return is_valid
def _validate_node(data: Any, schema: Any) -> bool:
    """Recursively validate a data node against a schema node.

    Only the keywords handled below are evaluated; any other key in
    ``schema`` is ignored. Constraints that target a particular kind of
    value (numeric bounds, string length/pattern, array size/items,
    object required/properties) are skipped when ``data`` is not of that
    kind.

    Args:
        data: The value to validate (a document, or any nested value).
        schema: The schema node. Anything that is not a dict imposes no
            constraint and always matches.

    Returns:
        True if ``data`` satisfies every recognised constraint in
        ``schema``, False as soon as one constraint fails.
    """
    if not isinstance(schema, dict):
        return True

    # Check type/bsonType ("bsonType" takes precedence when both exist)
    if "bsonType" in schema:
        if not _check_type(data, schema["bsonType"]):
            return False
    elif "type" in schema:
        if not _check_type(data, schema["type"]):
            return False

    if isinstance(data, dict):
        # Check required fields
        if "required" in schema:
            for field in schema["required"]:
                if field not in data:
                    return False

        # Check properties; fields absent from the document are skipped
        if "properties" in schema:
            for field, prop_schema in schema["properties"].items():
                if field in data:
                    if not _validate_node(data[field], prop_schema):
                        return False

    # Check minimum/maximum.
    # bool is a subclass of int in Python, but a boolean is not a number
    # for schema purposes, so numeric bounds must not apply to it.
    if isinstance(data, (int, float)) and not isinstance(data, bool):
        if "minimum" in schema and data < schema["minimum"]:
            return False
        if "maximum" in schema and data > schema["maximum"]:
            return False
        if (
            "exclusiveMinimum" in schema
            and data <= schema["exclusiveMinimum"]
        ):
            return False
        if (
            "exclusiveMaximum" in schema
            and data >= schema["exclusiveMaximum"]
        ):
            return False

    # Check string constraints
    if isinstance(data, str):
        if "minLength" in schema and len(data) < schema["minLength"]:
            return False
        if "maxLength" in schema and len(data) > schema["maxLength"]:
            return False
        if "pattern" in schema:
            # re.search: the pattern may match anywhere in the string
            if not re.search(schema["pattern"], data):
                return False

    # Check array constraints
    if isinstance(data, list):
        if "minItems" in schema and len(data) < schema["minItems"]:
            return False
        if "maxItems" in schema and len(data) > schema["maxItems"]:
            return False
        if "items" in schema:
            for item in data:
                if not _validate_node(item, schema["items"]):
                    return False

    # Check enum
    if "enum" in schema:
        if data not in schema["enum"]:
            return False

    # Check logical combinations
    if "anyOf" in schema:
        if not any(_validate_node(data, sub) for sub in schema["anyOf"]):
            return False

    if "allOf" in schema:
        if not all(_validate_node(data, sub) for sub in schema["allOf"]):
            return False

    if "oneOf" in schema:
        # Exactly one sub-schema must match, so every one is evaluated
        matches = [_validate_node(data, sub) for sub in schema["oneOf"]]
        if matches.count(True) != 1:
            return False

    if "not" in schema:
        if _validate_node(data, schema["not"]):
            return False

    return True
def _check_type(data: Any, type_spec: Any) -> bool:
    """Tell whether ``data`` matches a type name or any of a list of them.

    Args:
        data: The value whose type is being checked.
        type_spec: Either a single type specifier, or a list of
            specifiers of which at least one must match.

    Returns:
        True if ``data`` matches the specifier (or, for a list, at least
        one entry); False otherwise. An empty list never matches.
    """
    # Scalar specifier: hand it straight to the single-type check.
    if not isinstance(type_spec, list):
        return _check_single_type(data, type_spec)

    # List specifier: succeed on the first alternative that matches.
    for candidate in type_spec:
        if _check_single_type(data, candidate):
            return True
    return False
[docs] def _check_single_type(data: Any, t: str) -> bool: """Check a single type string (supports both JSON Schema and BSON types).""" # Map MongoDB/BSON types to Python types match t: case "string": return isinstance(data, str) case "number": return isinstance(data, (int, float)) case "integer" | "int" | "long": # In Python, we treat large ints as long return isinstance(data, int) and not isinstance(data, bool) case "double" | "decimal": return isinstance(data, (float, int)) case "object": return isinstance(data, dict) case "array": return isinstance(data, list) case "bool" | "boolean": return isinstance(data, bool) case "null": return data is None case "objectId": if isinstance(data, ObjectId): return True if isinstance(data, dict) and "__neosqlite_objectid__" in data: return True if isinstance(data, str) and len(data) == 24: try: ObjectId(data) return True except ValueError as e: logger.debug( f"Invalid ObjectId hex string in schema validation: {e}" ) return False return False case "binData": return isinstance(data, (Binary, bytes)) case "date": # We handle datetime objects in NeoSQLite from datetime import datetime return isinstance(data, datetime) case _: return True # Unknown type, default to True or handle more?