Source code for neosqlite.collection.query_helper.graph_lookup

"""
Python implementation of MongoDB $graphLookup aggregation stage.
"""

from copy import deepcopy
from typing import Any


[docs] def process_graph_lookup( docs_with_context: list[dict[str, Any]], spec: dict[str, Any], collection: Any, evaluator: Any, ) -> list[dict[str, Any]]: """ Python fallback implementation of $graphLookup. Args: docs_with_context: List of documents with context (__doc__, __root__) spec: $graphLookup specification collection: Current collection instance evaluator: ExprEvaluator for expression evaluation Returns: Updated docs_with_context """ from_collection_name = spec.get("from") start_with_expr = spec.get("startWith") connect_from_field = spec.get("connectFromField") connect_to_field = spec.get("connectToField") as_field = spec.get("as") max_depth = spec.get("maxDepth") depth_field = spec.get("depthField") restrict_search = spec.get("restrictSearchWithMatch") if not all( [ from_collection_name, start_with_expr, connect_from_field, connect_to_field, as_field, ] ): return docs_with_context # Get the target collection target_collection = collection.database.get_collection(from_collection_name) for dc in docs_with_context: doc = dc["__doc__"] # 1. Evaluate startWith start_val = evaluator._evaluate_operand_python(start_with_expr, doc) # start_val can be a single value or an array if not isinstance(start_val, list): search_queue = [(start_val, 0)] else: search_queue = [(v, 0) for v in start_val] visited_ids: set[Any] = set() results: list[dict[str, Any]] = [] # 2. Recursive search while search_queue: current_val, depth = search_queue.pop(0) if current_val is None: continue # Find documents in target collection where connectToField == current_val query = {connect_to_field: current_val} if restrict_search: # Merge with restrict_search query = {"$and": [query, restrict_search]} found_docs = list(target_collection.find(query)) for found_doc in found_docs: doc_id = found_doc.get("_id") if doc_id in visited_ids: continue visited_ids.add(doc_id) # Add depth field if requested result_doc = deepcopy(found_doc) if depth_field: result_doc[depth_field] = depth results.append(result_doc) # Check depth limit if max_depth is not None and depth >= max_depth: continue # Get next values to search next_val = target_collection._get_val( found_doc, connect_from_field ) if isinstance(next_val, list): for v in next_val: search_queue.append((v, depth + 1)) else: search_queue.append((next_val, depth + 1)) # 3. Add results to the document collection._set_val(doc, as_field, results) dc["__doc__"] = doc return docs_with_context