neosqlite.collection.temporary_table_aggregation module
Simplified temporary table aggregation pipeline implementation for NeoSQLite. This focuses on the core concept: using temporary tables to process complex pipelines that the current implementation can’t optimize with a single SQL query.
- neosqlite.collection.temporary_table_aggregation._sanitize_params(params: list[Any] | None) → list[Any] | None
Sanitize SQL parameters by converting ObjectId instances to strings.
SQLite doesn’t know how to bind ObjectId types, so we convert them to strings.
- Parameters:
params – List of parameters or None
- Returns:
Sanitized parameters with ObjectId converted to strings
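A minimal sketch of that conversion, assuming an ObjectId's string form is its hex ID. The `ObjectId` class and `sanitize_params` function below are hypothetical stand-ins, not NeoSQLite's own code.

```python
from typing import Any, Optional


class ObjectId:
    """Hypothetical stand-in for NeoSQLite's ObjectId; only str() matters here."""

    def __init__(self, hex_id: str) -> None:
        self.hex_id = hex_id

    def __str__(self) -> str:
        return self.hex_id


def sanitize_params(params: Optional[list[Any]]) -> Optional[list[Any]]:
    # None means "no parameters" and is returned unchanged.
    if params is None:
        return None
    # Convert ObjectId values to strings; every other value is left untouched.
    return [str(p) if isinstance(p, ObjectId) else p for p in params]
```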
- neosqlite.collection.temporary_table_aggregation._json_extract_field_with_objectid_support(json_function_prefix: str, field_name: str, is_local_field: bool = True) → str
Generate SQL expression to extract a field value with ObjectId support.
When a field contains an ObjectId (stored as {"__neosqlite_objectid__": true, "id": "…"}), this extracts just the ID string instead of the full JSON object.
- Parameters:
json_function_prefix – The JSON function prefix (json or jsonb)
field_name – The field name to extract
is_local_field – Whether this is a local field (True) or foreign field (False)
- Returns:
SQL expression string
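A sketch of the kind of expression such a helper could emit, assuming the marker layout shown above and a table whose JSON lives in a `data` column. `objectid_aware_extract` and its `column` parameter are hypothetical, and the sketch ignores `is_local_field`.

```python
import json
import sqlite3


def objectid_aware_extract(prefix: str, field: str, column: str = "data") -> str:
    """Return SQL that yields the ObjectId's "id" string when the field holds the
    ObjectId marker object, and the field's plain value otherwise."""
    marker = f"'$.{field}.__neosqlite_objectid__'"
    return (
        f"CASE WHEN {prefix}_extract({column}, {marker}) = 1 "
        f"THEN {prefix}_extract({column}, '$.{field}.id') "
        f"ELSE {prefix}_extract({column}, '$.{field}') END"
    )


# Usage: one ObjectId-marked reference and one plain string reference.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE docs (id INTEGER PRIMARY KEY, data TEXT)")
conn.executemany(
    "INSERT INTO docs (data) VALUES (?)",
    [
        (json.dumps({"ref": {"__neosqlite_objectid__": True, "id": "abc123"}}),),
        (json.dumps({"ref": "plain"}),),
    ],
)
refs = [row[0] for row in conn.execute(
    f"SELECT {objectid_aware_extract('json', 'ref')} FROM docs ORDER BY id")]
```

JSON `true` comes back from `json_extract` as the integer 1, which is why the marker test compares against 1.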
- class neosqlite.collection.temporary_table_aggregation.DeterministicTempTableManager(pipeline_id: str)
Bases: object
Manager for deterministic temporary table names.
This class generates unique but deterministic temporary table names based on pipeline stages and a pipeline ID. It ensures that the same pipeline stage will always generate the same table name within the same pipeline execution, which is useful for caching and optimization purposes.
- __init__(pipeline_id: str)
Initialize the DeterministicTempTableManager with a pipeline ID for generating unique table names.
- Parameters:
pipeline_id (str) – A unique identifier for the pipeline, used to ensure table names are deterministic and unique across different pipeline executions.
- make_temp_table_name(stage: dict[str, Any], name_suffix: str = '') → str
Generate a deterministic temporary table name based on the pipeline stage and pipeline ID.
This method creates a unique but deterministic name for a temporary table by:
1. Creating a canonical representation of the stage
2. Hashing the stage to create a short, unique suffix
3. Combining the pipeline ID, stage type, and hash to form a base name
4. Ensuring uniqueness by tracking name usage within the pipeline
- Parameters:
stage (dict[str, Any]) – The pipeline stage dictionary used to generate the table name
name_suffix (str, optional) – An additional suffix to append to the table name. Defaults to "".
- Returns:
A deterministic temporary table name unique to this stage and pipeline
- Return type:
str
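The four steps above can be sketched as follows. This is an illustration under assumptions (SHA-256 truncated to 8 hex characters, a `temp_` prefix, and a numeric counter for repeated stages); `TempTableNamer` is a hypothetical class, not the real `DeterministicTempTableManager`.

```python
import hashlib
import json
from typing import Any


class TempTableNamer:
    """Hypothetical sketch of deterministic temp-table naming."""

    def __init__(self, pipeline_id: str) -> None:
        self.pipeline_id = pipeline_id
        self.name_counts: dict[str, int] = {}

    def make_name(self, stage: dict[str, Any], name_suffix: str = "") -> str:
        # 1. Canonical form: sort_keys makes key order irrelevant (recursively).
        canonical = json.dumps(stage, sort_keys=True, default=str)
        # 2. Short hash of the canonical stage.
        digest = hashlib.sha256(canonical.encode()).hexdigest()[:8]
        # 3. Base name from pipeline ID, stage type (e.g. "match"), and hash.
        stage_type = next(iter(stage)).lstrip("$")
        base = f"temp_{self.pipeline_id}_{stage_type}_{digest}{name_suffix}"
        # 4. A repeated identical stage gets a counter so names never collide.
        count = self.name_counts.get(base, 0)
        self.name_counts[base] = count + 1
        return base if count == 0 else f"{base}_{count}"


# Usage: the same stage, written with different key order, in two managers.
first, second = TempTableNamer("p1"), TempTableNamer("p1")
name = first.make_name({"$match": {"x": 1, "y": 2}})
same_in_second = second.make_name({"$match": {"y": 2, "x": 1}})
repeat = first.make_name({"$match": {"x": 1, "y": 2}})
```

Because the names are interpolated into SQL, a pipeline ID must itself be a safe identifier fragment (for example, hex digits).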
- neosqlite.collection.temporary_table_aggregation.aggregation_pipeline_context(db_connection, pipeline_id: str | None = None)
Context manager for temporary aggregation tables with automatic cleanup.
This context manager provides a clean and safe way to work with temporary tables during aggregation pipeline processing. It handles:
Creating a savepoint for atomicity of the entire pipeline
Generating deterministic temporary table names
Providing a function to create temporary tables with proper naming
Automatic cleanup of all temporary tables and savepoint on exit
The context manager supports both new deterministic naming (using stage dictionaries) and backward compatibility (using string suffixes) for temporary tables.
- Parameters:
db_connection – The database connection object
pipeline_id (str | None) – A unique identifier for the pipeline. If None, a default ID is generated for backward compatibility.
- Yields:
Callable –
A function to create temporary tables with the signature:
create_temp_table(stage_or_suffix, query, params=None, name_suffix="")
Where:
- stage_or_suffix: Either a stage dict (new approach) or a string (backward compatibility)
- query: The SQL query to populate the temporary table
- params: Optional parameters for the SQL query
- name_suffix: Optional suffix for backward-compatible naming
- Raises:
Exception – Any exception that occurs during pipeline processing is re-raised after cleanup operations
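A simplified sketch of the savepoint-plus-cleanup pattern using Python's `sqlite3` module. It assumes an autocommit connection (`isolation_level=None`) and supports only string suffixes. `temp_table_pipeline` and the `temp_<pipeline_id>_<suffix>` naming scheme are hypothetical.

```python
import sqlite3
import uuid
from contextlib import contextmanager
from typing import Any, Callable, Iterator, Optional


@contextmanager
def temp_table_pipeline(
    conn: sqlite3.Connection, pipeline_id: Optional[str] = None
) -> Iterator[Callable[..., str]]:
    pipeline_id = pipeline_id or uuid.uuid4().hex[:8]
    savepoint = f"agg_{pipeline_id}"
    created: list[str] = []
    conn.execute(f"SAVEPOINT {savepoint}")

    def create_temp(suffix: str, query: str, params: Optional[list[Any]] = None) -> str:
        name = f"temp_{pipeline_id}_{suffix}"
        conn.execute(f"CREATE TEMP TABLE {name} AS {query}", params or [])
        created.append(name)
        return name

    try:
        yield create_temp
    except Exception:
        # Undo everything done inside the pipeline, then let the error propagate.
        conn.execute(f"ROLLBACK TO SAVEPOINT {savepoint}")
        raise
    finally:
        # Drop tables in reverse creation order; IF EXISTS covers rolled-back ones.
        for name in reversed(created):
            conn.execute(f"DROP TABLE IF EXISTS {name}")
        conn.execute(f"RELEASE SAVEPOINT {savepoint}")


# Usage: filter a table inside the context; the temp table is gone afterwards.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE docs (id INTEGER PRIMARY KEY, data TEXT)")
conn.executemany("INSERT INTO docs (data) VALUES (?)", [('{"a": 1}',), ('{"a": 2}',)])
with temp_table_pipeline(conn, "demo") as create_temp:
    matched = create_temp(
        "match", "SELECT id, data FROM docs WHERE json_extract(data, '$.a') > ?", [1]
    )
    matched_count = conn.execute(f"SELECT COUNT(*) FROM {matched}").fetchone()[0]
leftover = conn.execute(
    "SELECT COUNT(*) FROM sqlite_temp_master WHERE type = 'table'"
).fetchone()[0]
```

Releasing the outermost savepoint commits it, so the connection is back in autocommit mode once the block exits.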
- class neosqlite.collection.temporary_table_aggregation.TemporaryTableAggregationProcessor(collection, query_engine=None)
Bases: object
Processor for aggregation pipelines using temporary tables.
- __init__(collection, query_engine=None)
Initialize the TemporaryTableAggregationProcessor with a collection.
- Parameters:
collection – The NeoSQLite collection to process aggregation pipelines on. This collection provides the database connection and document loading functionality needed for pipeline processing.
query_engine – Optional QueryEngine instance for accessing helpers. If not provided, text search in match stages will use simplified processing.
- process_pipeline(pipeline: list[dict[str, Any]], is_count: bool = False, count_field: str | None = None, batch_size: int = 101) → list[dict[str, Any]]
Process an aggregation pipeline using temporary tables for intermediate results.
This method implements a temporary table approach for processing complex aggregation pipelines that cannot be optimized into a single SQL query by the current NeoSQLite implementation. It works by:
Generating a deterministic pipeline ID based on the pipeline content
Using the aggregation_pipeline_context for atomicity and cleanup
Creating temporary tables for each stage or group of compatible stages
Processing pipeline stages in an optimized order (grouping compatible stages)
Returning the final results from the last temporary table
The method supports these pipeline stages:
- $match: For filtering documents
- $unwind: For deconstructing array fields
- $lookup: For joining documents from different collections
- $sort, $skip, $limit: For sorting and pagination
- $addFields: For adding fields to documents
- $count: For counting documents (optimized to use SQL COUNT)
- Parameters:
pipeline (list[dict[str, Any]]) – A list of aggregation pipeline stages to process
- Returns:
A list of result documents after processing the pipeline
- Return type:
list[dict[str, Any]]
- Raises:
NotImplementedError – If the pipeline contains unsupported stages
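A sketch of two of the steps listed above, the content-derived pipeline ID and the grouping of compatible stages. It rests on assumptions: SHA-256 over canonical JSON, and a merge rule that folds a `$sort`, `$skip`, `$limit` run only when the stages appear in that order (a `$limit` followed by a `$sort` is not equivalent to one `ORDER BY ... LIMIT` query). `make_pipeline_id` and `group_stages` are hypothetical names.

```python
import hashlib
import json
from typing import Any

# Position of each paging stage; a run is merged only while positions increase.
PAGING_ORDER = {"$sort": 0, "$skip": 1, "$limit": 2}


def make_pipeline_id(pipeline: list[dict[str, Any]]) -> str:
    canonical = json.dumps(pipeline, sort_keys=True, default=str)
    return hashlib.sha256(canonical.encode()).hexdigest()[:12]


def group_stages(pipeline: list[dict[str, Any]]) -> list[tuple[str, list[dict[str, Any]]]]:
    groups: list[tuple[str, list[dict[str, Any]]]] = []
    for stage in pipeline:
        op = next(iter(stage))
        kind = "paging" if op in PAGING_ORDER else op
        if groups and groups[-1][0] == kind:
            last_op = next(iter(groups[-1][1][-1]))
            # Consecutive $unwind stages share one group; paging stages only in order.
            if kind == "$unwind" or (
                kind == "paging" and PAGING_ORDER[op] > PAGING_ORDER[last_op]
            ):
                groups[-1][1].append(stage)
                continue
        groups.append((kind, [stage]))
    return groups


# Usage: a trailing $sort after $limit must start a new group.
sample = [
    {"$match": {"a": 1}}, {"$unwind": "$x"}, {"$unwind": "$x.y"},
    {"$sort": {"a": 1}}, {"$skip": 5}, {"$limit": 10}, {"$sort": {"b": -1}},
]
grouped = group_stages(sample)
```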
- _process_match_stage(create_temp: Callable, current_table: str, match_spec: dict[str, Any]) → str
Process a $match stage using temporary tables.
This method creates a temporary table that contains only documents matching the specified criteria. It translates the MongoDB-style match specification into SQL WHERE conditions using json_extract for field access.
The method supports these match operators:
- $eq, $gt, $lt, $gte, $lte: Comparison operators
- $in, $nin: Array membership operators
- $ne: Not-equal operator
- $text: Text search operator (handled with special logic for unwound elements)
For the special _id field, it uses the table’s id column directly rather than json_extract.
- Parameters:
create_temp (Callable) – Function to create temporary tables
current_table (str) – Name of the current temporary table containing input data
match_spec (dict[str, Any]) – The $match stage specification
- Returns:
Name of the newly created temporary table with matched documents
- Return type:
str
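A minimal sketch of the translation for the comparison and membership operators listed above (not `$text`), assuming documents live in a `data` JSON column next to an `id` column. `match_to_where` is hypothetical. One semantic gap to keep in mind: in SQL, `NULL != x` and `NULL NOT IN (...)` are not true, so this sketch drops documents missing the field under `$ne`/`$nin`, whereas MongoDB keeps them.

```python
import json
import sqlite3
from typing import Any

COMPARISONS = {"$eq": "=", "$ne": "!=", "$gt": ">", "$gte": ">=", "$lt": "<", "$lte": "<="}


def match_to_where(spec: dict[str, Any]) -> tuple[str, list[Any]]:
    clauses: list[str] = []
    params: list[Any] = []
    for field, cond in spec.items():
        # _id maps to the table's id column; other fields go through json_extract.
        # Field names are interpolated, so real code must validate them first.
        col = "id" if field == "_id" else f"json_extract(data, '$.{field}')"
        if not isinstance(cond, dict):
            cond = {"$eq": cond}  # {"a": 5} is shorthand for {"a": {"$eq": 5}}
        for op, value in cond.items():
            if op in COMPARISONS:
                clauses.append(f"{col} {COMPARISONS[op]} ?")
                params.append(value)
            elif op in ("$in", "$nin"):
                placeholders = ", ".join("?" for _ in value)
                negation = "NOT " if op == "$nin" else ""
                clauses.append(f"{col} {negation}IN ({placeholders})")
                params.extend(value)
            else:
                raise NotImplementedError(f"operator {op} is outside this sketch")
    return " AND ".join(clauses) or "1", params


# Usage against a small in-memory table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE docs (id INTEGER PRIMARY KEY, data TEXT)")
conn.executemany(
    "INSERT INTO docs (data) VALUES (?)",
    [(json.dumps(d),) for d in ({"a": 1, "tag": "x"}, {"a": 5, "tag": "y"}, {"a": 9, "tag": "z"})],
)
where, params = match_to_where({"a": {"$gte": 5}, "tag": {"$nin": ["z"]}})
matched_ids = [row[0] for row in conn.execute(f"SELECT id FROM docs WHERE {where}", params)]
```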
- _process_unwind_stages(create_temp: Callable, current_table: str, unwind_specs: list[Any]) → str
Process one or more consecutive $unwind stages using temporary tables.
This method handles the $unwind stage which deconstructs an array field from input documents to output a document for each element. It can process either a single unwind stage or multiple consecutive unwind stages.
For a single unwind, it uses SQLite’s json_each function to expand the array into separate rows. For multiple consecutive unwinds, it processes them sequentially (one at a time) rather than trying to process them all together, which doesn’t work for nested arrays that depend on previous unwind operations.
The method validates array fields, so only documents whose target field is an array are processed. It also handles the special _id field, should it ever be unwound (which would be unusual).
Supports these $unwind options:
- path: The array field to unwind (required)
- preserveNullAndEmptyArrays: If true, includes documents where the array is missing, null, or empty
- includeArrayIndex: If specified, includes the array index in the output
- Parameters:
create_temp (Callable) – Function to create temporary tables
current_table (str) – Name of the current temporary table containing input data
unwind_specs (list[Any]) – List of $unwind stage specifications to process consecutively
- Returns:
Name of the newly created temporary table with unwound documents
- Return type:
str
- Raises:
ValueError – If an invalid unwind specification is encountered
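A sketch of a single-field unwind with `json_each`, covering both options listed above. It assumes an `id`/`data` table layout; `unwind_sql` is a hypothetical helper. Each element is re-read with `json_extract(t.data, je.fullkey)` (SQLite's `fullkey` is the path from the document root even when `json_each` is given a starting path), so nested objects stay JSON rather than becoming quoted strings.

```python
import json
import sqlite3
from typing import Optional


def unwind_sql(table: str, field: str, preserve: bool = False,
               index_field: Optional[str] = None) -> str:
    path = f"'$.{field}'"
    element = "json_extract(t.data, je.fullkey)"
    # je.id is NULL only for LEFT JOIN rows with no element (missing or empty array).
    data = (f"CASE WHEN je.id IS NULL THEN t.data "
            f"ELSE json_set(t.data, {path}, {element}) END")
    if index_field:
        data = f"json_set({data}, '$.{index_field}', je.key)"
    if preserve:
        # LEFT JOIN keeps documents whose array is missing or empty.
        return (f"SELECT t.id, {data} AS data FROM {table} t "
                f"LEFT JOIN json_each(t.data, {path}) je ORDER BY t.id, je.key")
    return (f"SELECT t.id, {data} AS data FROM {table} t, json_each(t.data, {path}) je "
            f"WHERE json_type(t.data, {path}) = 'array' ORDER BY t.id, je.key")


# Usage: an array, an empty array, and a missing field.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE docs (id INTEGER PRIMARY KEY, data TEXT)")
conn.executemany(
    "INSERT INTO docs (data) VALUES (?)",
    [(json.dumps(d),) for d in ({"tags": ["x", "y"]}, {"tags": []}, {"name": "none"})],
)
unwound = [json.loads(d) for _, d in conn.execute(unwind_sql("docs", "tags"))]
preserved = [
    (doc_id, json.loads(d))
    for doc_id, d in conn.execute(
        unwind_sql("docs", "tags", preserve=True, index_field="idx"))
]
```

Consecutive unwinds would apply this step once per path, feeding each result table into the next, which matches the sequential processing described above.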
- _create_lookup_hash_table(from_collection: str, foreign_field: str | None, pipeline: list[dict[str, Any]] | None = None) → tuple[str, str]
Create a hash table (temp table with index) from a foreign collection for efficient hash join lookup.
This implements an O(n+m) hash join instead of an O(n*m) correlated subquery.
- Parameters:
from_collection – The collection to build hash table from
foreign_field – The field to use as join key (None for _id)
pipeline – Optional pipeline to run on foreign collection first
- Returns:
Tuple of (hash_table_name, join_key_column)
- _create_lookup_hash_table_fallback(hash_table_name: str, from_collection: str, foreign_field: str, join_key: str) → None
Fallback method to create hash table by reading documents one by one.
Used when the SQL approach fails due to malformed JSON in the data column. This method skips corrupted documents gracefully.
- Parameters:
hash_table_name – Name of the hash table to create
from_collection – Source collection name
foreign_field – Field to use as join key
join_key – Name of the join key column
- _estimate_collection_size(collection_name: str) → int
Estimate the size of a collection in bytes.
Uses SQLite’s table statistics to estimate size.
- Parameters:
collection_name – Name of the collection to estimate
- Returns:
Estimated size in bytes
- _get_available_memory() → int
Get available memory for hash join operations.
- Returns:
Available memory in bytes (estimated from SQLite cache or system)
- _should_use_hash_join(from_collection: str, pipeline: list[dict[str, Any]] | None = None) → bool
Decide whether to use hash join or correlated subquery for $lookup.
Uses memory estimation to decide:
- If the foreign collection estimate is < 30% of available memory: use hash join (faster)
- Otherwise: use correlated subquery (lower memory, slower)
- Parameters:
from_collection – The foreign collection name
pipeline – Optional pipeline to run on foreign collection first
- Returns:
True if should use hash join, False for correlated subquery
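A sketch of the 30% rule with deliberately rough estimators: collection size as the total length of the stored JSON text, and available memory as SQLite's page-cache budget from `PRAGMA cache_size` (positive values are pages, negative values are KiB). These estimators and all function names below are assumptions; the real helpers may also consult system memory.

```python
import sqlite3


def estimate_collection_bytes(conn: sqlite3.Connection, table: str) -> int:
    # Rough size estimate: total characters of JSON text stored in `data`.
    (total,) = conn.execute(
        f"SELECT COALESCE(SUM(LENGTH(data)), 0) FROM {table}"
    ).fetchone()
    return int(total)


def cache_budget_bytes(conn: sqlite3.Connection) -> int:
    # PRAGMA cache_size: positive = number of pages, negative = size in KiB.
    (cache_size,) = conn.execute("PRAGMA cache_size").fetchone()
    if cache_size < 0:
        return -cache_size * 1024
    (page_size,) = conn.execute("PRAGMA page_size").fetchone()
    return cache_size * page_size


def should_use_hash_join(estimated: int, available: int, threshold_pct: int = 30) -> bool:
    # Integer arithmetic avoids float rounding at the boundary (0.3 * 100 != 30).
    return estimated * 100 < threshold_pct * available


# Usage: a one-document foreign collection against a 2000 KiB cache.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE foreign_docs (id INTEGER PRIMARY KEY, data TEXT)")
conn.execute("INSERT INTO foreign_docs (data) VALUES (?)", ('{"k": 1}',))
conn.execute("PRAGMA cache_size = -2000")
use_hash = should_use_hash_join(
    estimate_collection_bytes(conn, "foreign_docs"), cache_budget_bytes(conn)
)
```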
- _extract_field_value(doc: dict[str, Any], field: str) → Any
Extract field value from document, supporting dot notation.
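A minimal sketch of dot-notation lookup. Treating numeric segments as list indexes is an assumption, and in this sketch a missing path and a stored null both yield None; `extract_field_value` is a hypothetical stand-in.

```python
from typing import Any


def extract_field_value(doc: dict[str, Any], field: str) -> Any:
    value: Any = doc
    for part in field.split("."):
        if isinstance(value, dict) and part in value:
            value = value[part]
        elif isinstance(value, list) and part.isdigit() and int(part) < len(value):
            value = value[int(part)]  # numeric segment indexes into a list
        else:
            return None  # any missing segment ends the walk
    return value
```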
- _process_lookup_stage(create_temp: Callable, current_table: str, lookup_spec: dict[str, Any]) → str
Process a $lookup stage using hash join for O(n+m) complexity.
This method implements the $lookup aggregation stage, which performs a left outer join to another collection in the same database. It uses an optimized hash join approach:
1. Creates a temporary table with an index on the foreign field (hash table)
2. Uses a single JOIN query instead of a correlated subquery
This replaces the previous correlated subquery approach which was O(n*m).
- Parameters:
create_temp (Callable) – Function to create temporary tables
current_table (str) – Name of the current temporary table containing input data
lookup_spec (dict[str, Any]) – The $lookup stage specification containing:
- "from": The name of the collection to join with
- "localField": The field from the input documents
- "foreignField": The field from the documents of the "from" collection
- "as": The name of the new array field to add to the matching documents
- "pipeline": Optional pipeline to run on the foreign collection
- Returns:
Name of the newly created temporary table with lookup results added
- Return type:
str
Process $lookup using correlated subquery (O(n*m) but low memory).
This is the fallback when the foreign collection is too large for hash join.
- Parameters:
create_temp – Function to create temporary tables
current_table – Current temp table name
lookup_spec – The $lookup specification
- Returns:
Name of the new temporary table
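A sketch of the correlated-subquery strategy, assuming `id`/`data` tables and plain field names; `lookup_correlated_sql` is hypothetical. The outer `json(...)` makes `json_set` embed the subquery's result as a JSON array (a subquery result is not guaranteed to keep its JSON marking).

```python
import json
import sqlite3


def lookup_correlated_sql(local: str, foreign: str, local_field: str,
                          foreign_field: str, as_field: str) -> str:
    # One subquery per local row; without an index each one scans `foreign`.
    subquery = (
        f"SELECT json_group_array(json(f.data)) FROM {foreign} f "
        f"WHERE json_extract(f.data, '$.{foreign_field}') = "
        f"json_extract(l.data, '$.{local_field}')"
    )
    return (f"SELECT l.id, json_set(l.data, '$.{as_field}', json(({subquery}))) "
            f"AS data FROM {local} l ORDER BY l.id")


# Usage: two orders, one of which has two matching items.
conn = sqlite3.connect(":memory:")
for name in ("orders", "items"):
    conn.execute(f"CREATE TABLE {name} (id INTEGER PRIMARY KEY, data TEXT)")
conn.executemany("INSERT INTO orders (data) VALUES (?)",
                 [(json.dumps({"sku": s}),) for s in ("a", "zz")])
conn.executemany("INSERT INTO items (data) VALUES (?)",
                 [(json.dumps({"code": "a", "qty": q}),) for q in (1, 2)])
orders = [json.loads(d) for _, d in conn.execute(
    lookup_correlated_sql("orders", "items", "sku", "code", "matches"))]
```

An order with no match gets an empty array, because `json_group_array` over zero rows returns `[]`.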
- _process_lookup_hash_join(create_temp: Callable, current_table: str, lookup_spec: dict[str, Any]) → str
Process $lookup using hash join (O(n+m) but uses more memory).
- Parameters:
create_temp – Function to create temporary tables
current_table – Current temp table name
lookup_spec – The $lookup specification
- Returns:
Name of the new temporary table
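A sketch of the build-and-probe pattern, assuming the same `id`/`data` layout; `lookup_hash_join` and the `lookup_hash` table name are hypothetical. A SQLite index is a B-tree, so each probe costs O(log m) rather than O(1). The gain over the correlated form comes from extracting the foreign key once and letting the join use the index.

```python
import json
import sqlite3
from typing import Any


def lookup_hash_join(conn: sqlite3.Connection, local: str, foreign: str,
                     local_field: str, foreign_field: str, as_field: str,
                     hash_table: str = "lookup_hash") -> list[tuple[int, dict[str, Any]]]:
    # Build: extract the join key once per foreign document, then index it.
    conn.execute(f"DROP TABLE IF EXISTS {hash_table}")
    conn.execute(
        f"CREATE TEMP TABLE {hash_table} AS SELECT "
        f"json_extract(data, '$.{foreign_field}') AS join_key, data FROM {foreign}"
    )
    conn.execute(f"CREATE INDEX {hash_table}_key ON {hash_table}(join_key)")
    # Probe: one LEFT JOIN + GROUP BY; unmatched rows get an empty array.
    rows = conn.execute(
        f"SELECT l.id, json_set(l.data, '$.{as_field}', json(CASE "
        f"WHEN COUNT(h.join_key) = 0 THEN '[]' "
        f"ELSE json_group_array(json(h.data)) END)) AS data "
        f"FROM {local} l LEFT JOIN {hash_table} h "
        f"ON h.join_key = json_extract(l.data, '$.{local_field}') "
        f"GROUP BY l.id ORDER BY l.id"
    ).fetchall()
    return [(row_id, json.loads(data)) for row_id, data in rows]


# Usage: the same orders/items data as a correlated lookup would use.
conn = sqlite3.connect(":memory:")
for name in ("orders", "items"):
    conn.execute(f"CREATE TABLE {name} (id INTEGER PRIMARY KEY, data TEXT)")
conn.executemany("INSERT INTO orders (data) VALUES (?)",
                 [(json.dumps({"sku": s}),) for s in ("a", "zz")])
conn.executemany("INSERT INTO items (data) VALUES (?)",
                 [(json.dumps({"code": "a", "qty": q}),) for q in (1, 2)])
joined = lookup_hash_join(conn, "orders", "items", "sku", "code", "matches")
```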
- _process_sort_skip_limit_stage(create_temp: Callable, current_table: str, sort_spec: dict[str, Any] | None, skip_value: int = 0, limit_value: int | None = None) → str
Process sort/skip/limit stages using temporary tables.
This method handles the $sort, $skip, and $limit aggregation stages, which can be used individually or in combination. It creates a temporary table with the results sorted and/or paginated according to the specifications.
The method supports sorting on both regular fields (using json_extract) and the special _id field (using the id column directly). It handles ascending and descending sort orders, as well as skip and limit operations with proper OFFSET and LIMIT clauses in the SQL query.
When multiple sort/skip/limit stages are consecutive in a pipeline, they are processed together in a single operation for efficiency.
- Parameters:
create_temp (Callable) – Function to create temporary tables
current_table (str) – Name of the current temporary table containing input data
sort_spec (dict[str, Any] | None) – The $sort stage specification, mapping field names to sort directions (1 for ascending, -1 for descending)
skip_value (int) – The number of documents to skip (from $skip stage)
limit_value (int | None) – The maximum number of documents to return (from $limit stage)
- Returns:
Name of the newly created temporary table with sorted/skipped/limited results
- Return type:
str
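A sketch of folding sort, skip, and limit into one query, assuming the `id`/`data` layout; `sort_skip_limit_sql` is hypothetical. A SQLite detail worth knowing is that `OFFSET` is only valid after `LIMIT`, so a skip without a limit needs `LIMIT -1` (no limit).

```python
import json
import sqlite3
from typing import Optional


def sort_skip_limit_sql(table: str, sort_spec: Optional[dict[str, int]],
                        skip: int = 0, limit: Optional[int] = None) -> str:
    order_terms = []
    for field, direction in (sort_spec or {}).items():
        column = "id" if field == "_id" else f"json_extract(data, '$.{field}')"
        order_terms.append(f"{column} {'DESC' if direction == -1 else 'ASC'}")
    sql = f"SELECT id, data FROM {table}"
    if order_terms:
        sql += " ORDER BY " + ", ".join(order_terms)
    if limit is not None or skip:
        # SQLite accepts OFFSET only after LIMIT; LIMIT -1 means "no limit".
        sql += f" LIMIT {-1 if limit is None else int(limit)}"
        if skip:
            sql += f" OFFSET {int(skip)}"
    return sql


# Usage: sort descending on "a", skip one document, keep one.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE docs (id INTEGER PRIMARY KEY, data TEXT)")
conn.executemany("INSERT INTO docs (data) VALUES (?)",
                 [(json.dumps({"a": a}),) for a in (3, 1, 2)])
page = [json.loads(d)["a"] for _, d in conn.execute(
    sort_skip_limit_sql("docs", {"a": -1}, skip=1, limit=1))]
```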
- _process_add_fields_stage(create_temp: Callable, current_table: str, add_fields_spec: dict[str, Any]) → str
Process an $addFields stage using temporary tables.
This method implements the $addFields aggregation stage which adds new fields to documents. It uses SQLite’s json_set function to add fields to the JSON data.
Supports:
- Simple field copying: {"newField": "$existingField"}
- $replaceOne: {$replaceOne: {input: "$text", find: "old", replacement: "new"}}
- Literal values: {"field": "value"}
- Parameters:
create_temp (Callable) – Function to create temporary tables
current_table (str) – Name of the current temporary table containing input data
add_fields_spec (dict[str, Any]) – The $addFields stage specification mapping new field names to source field paths, expressions, or literal values
- Returns:
Name of the newly created temporary table with added fields
- Return type:
str
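A sketch of the `json_set` approach for field copies and literals (not `$replaceOne`), assuming the `id`/`data` layout; `add_fields_sql` is hypothetical. Literals are bound as JSON text and wrapped in `json()`, so booleans, null, and nested values keep their JSON types instead of being stored as plain strings.

```python
import json
import sqlite3
from typing import Any


def add_fields_sql(table: str, spec: dict[str, Any]) -> tuple[str, list[Any]]:
    set_args: list[str] = []
    params: list[Any] = []
    for name, expr in spec.items():
        if isinstance(expr, str) and expr.startswith("$"):
            # "$field" copies an existing field; json_extract keeps nested JSON intact.
            set_args.append(f"'$.{name}', json_extract(data, '$.{expr[1:]}')")
        elif isinstance(expr, dict) and any(str(k).startswith("$") for k in expr):
            raise NotImplementedError("operator expressions are outside this sketch")
        else:
            # Literal: bind its JSON text and parse it back with json().
            set_args.append(f"'$.{name}', json(?)")
            params.append(json.dumps(expr))
    if not set_args:
        return f"SELECT id, data FROM {table}", []
    return f"SELECT id, json_set(data, {', '.join(set_args)}) AS data FROM {table}", params


# Usage: copy "a", add a boolean flag and a string label.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE docs (id INTEGER PRIMARY KEY, data TEXT)")
conn.execute("INSERT INTO docs (data) VALUES (?)", (json.dumps({"a": 1, "name": "x"}),))
sql, params = add_fields_sql("docs", {"copy": "$a", "flag": True, "label": "lit"})
updated = json.loads(conn.execute(sql, params).fetchone()[1])
```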
- _process_add_fields_stage_python_hybrid(create_temp: Callable, current_table: str, add_fields_spec: dict[str, Any]) → str
Process $addFields stage with complex expressions using Python hybrid approach.
This method loads documents from the current temp table, applies the $addFields expressions in Python (using ExprEvaluator), and creates a new temp table. This allows us to stay in Tier 2 (temp tables) while still supporting complex expressions like $filter, $map, $reduce, etc.
- Parameters:
create_temp (Callable) – Function to create temporary tables
current_table (str) – Name of the current temporary table
add_fields_spec (dict[str, Any]) – The $addFields stage specification
- Returns:
Name of the newly created temporary table with added fields
- Return type:
str
- _is_expression(expr: Any) → bool
Check if an expression is a complex expression (not a simple field reference or literal).
- _process_project_stage(create_temp: Callable, current_table: str, project_spec: dict[str, Any]) → str
Process a $project stage using temporary tables.
This method implements the $project aggregation stage, which reshapes each document in the stream by adding new fields, removing existing fields, or renaming fields. It reconstructs a unified data column using json_object/jsonb_object so that downstream stages (especially FTS5 text search via json_tree) continue to work without modification.
Supports:
- Simple inclusion: {"field": 1}
- Exclusion: {"field": 0}
- Field references: {"alias": "$some.path"}
- Expression projections: {"alias": {$concat: [...]}}
- _id inclusion/exclusion
- Parameters:
create_temp (Callable) – Function to create temporary tables
current_table (str) – Name of the current temporary table
project_spec (dict[str, Any]) – The $project stage specification
- Returns:
Name of the newly created temporary table
- Return type:
str
- _process_project_exclusion(create_temp: Callable, current_table: str, project_spec: dict[str, Any], include_id: bool) → str
Handle exclusion-mode projection by removing fields via json_remove.
- _process_project_inclusion(create_temp: Callable, current_table: str, project_spec: dict[str, Any], include_id_default: bool) → str
Handle inclusion-mode projection by reconstructing data via json_object.
Handles:
- Simple inclusion: {"field": 1}
- Field references: {"alias": "$some.path"}
- Expression projections: {"alias": {$concat: [...]}}
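A sketch of both projection modes, assuming the `id`/`data` layout and ignoring `_id` handling and expression projections; both function names are hypothetical. Inclusion rebuilds the document with `json_object`, and exclusion strips paths with `json_remove`. One difference from MongoDB: an included field that is missing becomes `null` here instead of being omitted.

```python
import json
import sqlite3
from typing import Any


def project_inclusion_sql(table: str, spec: dict[str, Any]) -> str:
    pairs = []
    for name, value in spec.items():
        if isinstance(value, str) and value.startswith("$"):
            source = value[1:]  # {"alias": "$some.path"}
        elif value == 1:  # also matches True
            source = name  # {"field": 1}
        else:
            raise NotImplementedError("expression projections are outside this sketch")
        # Names are interpolated; real code must validate or quote them.
        pairs.append(f"'{name}', json_extract(data, '$.{source}')")
    return f"SELECT id, json_object({', '.join(pairs)}) AS data FROM {table}"


def project_exclusion_sql(table: str, spec: dict[str, Any]) -> str:
    args = ["data"] + [f"'$.{name}'" for name, value in spec.items() if value in (0, False)]
    return f"SELECT id, json_remove({', '.join(args)}) AS data FROM {table}"


# Usage on one nested document.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE docs (id INTEGER PRIMARY KEY, data TEXT)")
conn.execute("INSERT INTO docs (data) VALUES (?)",
             (json.dumps({"name": "n", "meta": {"city": "c"}, "secret": 1}),))
included = json.loads(conn.execute(
    project_inclusion_sql("docs", {"name": 1, "city": "$meta.city"})).fetchone()[1])
excluded = json.loads(conn.execute(
    project_exclusion_sql("docs", {"secret": 0})).fetchone()[1])
```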
- _generate_text_score_sql() → str
Generate SQL for $meta: "textScore" using the stored BM25 score.
During $text search stages, the FTS5 BM25 relevance score is captured and stored in the document’s JSON data as _textScore. This method extracts that score for use in $project/$addFields stages.
- Returns:
SQL expression that returns the BM25 relevance score (positive value)
- _process_replace_root_stage(create_temp: Callable, current_table: str, replace_spec: Any) → str
Process a $replaceRoot or $replaceWith stage using temporary tables.
This method implements the $replaceRoot/$replaceWith aggregation stage which replaces the root document with a specified field or expression.
- MongoDB syntax:
{$replaceRoot: {newRoot: "$field"}}
{$replaceWith: "$field"}
- Parameters:
create_temp (Callable) – Function to create temporary tables
current_table (str) – Name of the current temporary table containing input data
replace_spec (Any) – The replace specification (field path or expression)
- Returns:
Name of the newly created temporary table with replaced root documents
- Return type:
str
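A sketch for the simple `"$field"` form only, assuming the `id`/`data` layout; `replace_root_sql` is hypothetical. MongoDB raises an error when the new root is not a document. This sketch instead filters such rows out, which is a deliberate simplification.

```python
import json
import sqlite3


def replace_root_sql(table: str, new_root: str) -> str:
    if not (isinstance(new_root, str) and new_root.startswith("$")):
        raise NotImplementedError("only '$field' paths are covered by this sketch")
    path = f"'$.{new_root[1:]}'"
    # Keep only rows whose new root is a JSON object.
    return (f"SELECT id, json_extract(data, {path}) AS data FROM {table} "
            f"WHERE json_type(data, {path}) = 'object'")


# Usage: one document with an embedded profile, one without.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE docs (id INTEGER PRIMARY KEY, data TEXT)")
conn.executemany("INSERT INTO docs (data) VALUES (?)",
                 [(json.dumps(d),) for d in ({"profile": {"name": "a"}}, {"other": 1})])
roots = [json.loads(d) for _, d in conn.execute(replace_root_sql("docs", "$profile"))]
```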
- _process_group_stage(create_temp: Callable, current_table: str, group_spec: dict[str, Any]) → str
Process a $group stage using temporary tables.
This method implements the $group aggregation stage which groups documents by a specified key and performs accumulator operations.
Supports these accumulators in the SQL tier:
- $sum, $avg, $min, $max: Standard SQL aggregators
- $count: COUNT(*)
- $first, $last: Using subqueries (LIMITATION: requires no preceding $sort)
- $addToSet: Using json_group_array(DISTINCT …)
- $push: Using json_group_array(…)
- Expression keys: Using SQLTranslator for expression evaluation
Limitation: $first/$last with a preceding $sort stage falls back to Python for correctness, because the current implementation uses correlated subqueries that don't preserve sort order across groups.
- Parameters:
create_temp (Callable) – Function to create temporary tables
current_table (str) – Name of the current temporary table containing input data
group_spec (dict[str, Any]) – The $group stage specification
- Returns:
Name of the newly created temporary table with grouped results
- Return type:
str
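A sketch of the SQL-tier accumulators for a single-field group key, assuming the `id`/`data` layout; `group_sql` and the `ACCUMULATORS` table are hypothetical, and `$first`/`$last` and expression keys are left out. `COALESCE` makes `$sum` over missing values return 0 as MongoDB does, since SQL `SUM` of only NULLs is NULL.

```python
import json
import sqlite3
from typing import Any

# SQL templates for the simple accumulators; {v} is the accumulated value.
ACCUMULATORS = {
    "$sum": "COALESCE(SUM({v}), 0)",
    "$avg": "AVG({v})",
    "$min": "MIN({v})",
    "$max": "MAX({v})",
    "$push": "json_group_array({v})",
    "$addToSet": "json_group_array(DISTINCT {v})",
}


def group_sql(table: str, key_field: str, outputs: dict[str, dict[str, Any]]) -> str:
    key = f"json_extract(data, '$.{key_field}')"
    pairs = [f"'_id', {key}"]
    for out_name, accumulator in outputs.items():
        [(op, arg)] = accumulator.items()
        if isinstance(arg, str) and arg.startswith("$"):
            value = f"json_extract(data, '$.{arg[1:]}')"
        elif isinstance(arg, (int, float)) and not isinstance(arg, bool):
            value = repr(arg)  # e.g. {"$sum": 1} counts documents
        else:
            raise NotImplementedError(f"argument {arg!r} is outside this sketch")
        if op not in ACCUMULATORS:
            raise NotImplementedError(f"{op} is outside this sketch")
        pairs.append(f"'{out_name}', {ACCUMULATORS[op].format(v=value)}")
    return (f"SELECT json_object({', '.join(pairs)}) FROM {table} "
            f"GROUP BY {key} ORDER BY {key}")


# Usage: total, count, and distinct values per category.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE docs (id INTEGER PRIMARY KEY, data TEXT)")
conn.executemany("INSERT INTO docs (data) VALUES (?)",
                 [(json.dumps({"cat": c, "n": n}),) for c, n in (("a", 1), ("a", 2), ("b", 5), ("a", 2))])
groups = [json.loads(row[0]) for row in conn.execute(group_sql(
    "docs", "cat",
    {"total": {"$sum": "$n"}, "count": {"$sum": 1}, "uniq": {"$addToSet": "$n"}}))]
```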
- _id_to_json_object_args(select_parts: list[str]) → str
Convert SELECT parts to json_object arguments.
- Parameters:
select_parts – List of SELECT column expressions (e.g., ["expr1 AS field1", "expr2 AS field2"])
- Returns:
Comma-separated list of 'key', value pairs for json_object
- _get_results_from_table(table_name: str, is_count: bool = False, count_field: str | None = None, batch_size: int = 101) → list[dict[str, Any]]
Get results from a temporary table.
This method retrieves all documents from a temporary table and converts them back into their Python dictionary representation using the collection’s document loading mechanism.
For $count optimization, if is_count is True, it returns a single document with the count from the table using SQL COUNT(*) instead of loading all documents.
- Parameters:
table_name (str) – Name of the temporary table to retrieve results from
is_count (bool) – If True, return count document instead of all documents
count_field (str | None) – The field name for the count if is_count is True
- Returns:
List of documents retrieved from the temporary table, with each document represented as a dictionary
- Return type:
list[dict[str, Any]]
- _matches_text_search(document: dict[str, Any], search_term: str) → bool
Apply Python-based text search to a document.
This method uses the unified_text_search function to determine if a document matches a given search term. It’s used as a fallback when text search cannot be efficiently handled with SQL queries, particularly in cases involving unwound elements or complex text search operations.
- Parameters:
document (dict[str, Any]) – The document to search in
search_term (str) – The term to search for
- Returns:
True if the document matches the text search, False otherwise
- Return type:
bool
- _batch_insert_documents(table_name: str, documents: list[tuple]) → None
Insert multiple documents into a temporary table efficiently.
This method provides an optimized way to insert multiple documents into a temporary table by using a single INSERT statement with multiple value sets. It’s used primarily in the text search processing where documents need to be filtered and inserted into a result table.
- Parameters:
table_name (str) – The name of the table to insert into
documents (list[tuple]) – List of (id, data) tuples to insert
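A sketch of multi-row `INSERT ... VALUES (?, ?), (?, ?), ...` batching. SQLite caps the number of bound parameters per statement (`SQLITE_MAX_VARIABLE_NUMBER` defaults to 999 before 3.32.0 and 32766 since), so the rows are chunked. The chunk size and `batch_insert` name are assumptions.

```python
import json
import sqlite3

# Hypothetical chunk size: 400 rows x 2 parameters stays under the old 999 cap.
ROWS_PER_STATEMENT = 400


def batch_insert(conn: sqlite3.Connection, table: str, documents: list[tuple]) -> None:
    for start in range(0, len(documents), ROWS_PER_STATEMENT):
        chunk = documents[start:start + ROWS_PER_STATEMENT]
        placeholders = ", ".join("(?, ?)" for _ in chunk)
        flat = [value for row in chunk for value in row]  # (id, data) pairs, flattened
        conn.execute(f"INSERT INTO {table} (id, data) VALUES {placeholders}", flat)


# Usage: 1000 documents go in as three statements (400 + 400 + 200 rows).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TEMP TABLE results (id INTEGER, data TEXT)")
batch_insert(conn, "results", [(i, json.dumps({"n": i})) for i in range(1000)])
summary = conn.execute("SELECT COUNT(*), MAX(id) FROM results").fetchone()
```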
- _process_text_search_stage(create_temp: Callable, current_table: str, match_spec: dict[str, Any]) → str
Process a $text search stage using FTS5 on temporary table.
This method creates an FTS5 virtual table on the temporary data and uses SQLite’s FTS5 for efficient text search. The tokenizer configuration is detected from the existing FTS index on the collection to ensure consistent behavior.
Note
When $text is used after $unwind (or other stages that create temp tables), the search operates on the unwound elements in the temp table, not on the original collection documents. This differs from MongoDB’s semantics where $text always uses the collection-level text index on original documents.
- Parameters:
create_temp (Callable) – Function to create temporary tables
current_table (str) – Name of the current temporary table containing input data
match_spec (dict[str, Any]) – The $match stage specification containing the $text operator with a $search term
- Returns:
Name of the newly created temporary table with text search results
- Return type:
str
- Raises:
ValueError – If the $text operator specification is invalid or the search term is not a string
- _detect_fts_tokenizer() → str
Detect the tokenizer configuration from existing FTS indexes on the collection.
- Returns:
The tokenizer clause for FTS5 (e.g., ", tokenize=porter" or "")
- Return type:
str
- _process_bucket_stage(create_temp, current_table, bucket_spec)
Process $bucket stage - groups documents by boundaries.
MongoDB syntax:
{
  $bucket: {
    groupBy: <expression>,
    boundaries: [<lowerbound1>, <lowerbound2>, …],
    default: <literal>,    // optional
    output: { <output1>: { <$accumulator expression> }, … }
  }
}
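A sketch of bucket assignment with a SQL `CASE`, covering only the default `count` output. It assumes a `"$field"` groupBy and the `id`/`data` layout; `bucket_sql` and `bucket_counts` are hypothetical. As in MongoDB, each bucket is the half-open range [lower, upper) labelled by its lower bound. Unlike MongoDB, out-of-range values without a `default` land in a NULL bucket here instead of raising an error.

```python
import sqlite3
from typing import Any


def bucket_sql(table: str, group_by: str, boundaries: list[Any],
               default: Any = None) -> tuple[str, list[Any]]:
    if len(boundaries) < 2 or list(boundaries) != sorted(boundaries):
        raise ValueError("boundaries need at least two values in ascending order")
    value = f"json_extract(data, '$.{group_by[1:]}')"
    whens, params = [], []
    for lower, upper in zip(boundaries, boundaries[1:]):
        # Each bucket covers [lower, upper) and is labelled by its lower bound.
        whens.append(f"WHEN {value} >= ? AND {value} < ? THEN ?")
        params += [lower, upper, lower]
    params.append(default)  # out-of-range or missing values go here
    inner = f"SELECT CASE {' '.join(whens)} ELSE ? END AS bucket_id FROM {table}"
    sql = (f"SELECT bucket_id, COUNT(*) AS count FROM ({inner}) "
           f"GROUP BY bucket_id ORDER BY bucket_id")
    return sql, params


def bucket_counts(conn: sqlite3.Connection, table: str, group_by: str,
                  boundaries: list[Any], default: Any = None) -> list[tuple]:
    sql, params = bucket_sql(table, group_by, boundaries, default)
    return conn.execute(sql, params).fetchall()


# Usage: four values against boundaries [0, 10, 20].
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE docs (id INTEGER PRIMARY KEY, data TEXT)")
conn.executemany("INSERT INTO docs (data) VALUES (?)",
                 [(f'{{"n": {n}}}',) for n in (1, 5, 12, 30)])
buckets = bucket_counts(conn, "docs", "$n", [0, 10, 20], default="other")
```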
- _process_bucket_auto_stage(create_temp, current_table, bucket_auto_spec)
Process $bucketAuto stage - auto-sized buckets.
MongoDB syntax:
{
  $bucketAuto: {
    groupBy: <expression>,
    buckets: <number>,
    output: { <output1>: { <$accumulator expression> }, … },
    granularity: <string>    // optional
  }
}
- _process_densify_stage(create_temp, current_table, densify_spec)
Process $densify stage - fills in missing values in a sequence.
MongoDB syntax:
{
  $densify: {
    field: <field_name>,
    range: {
      step: <number>,
      bounds: [<lower>, <upper>]
    },
    partitionBy: <expression>    // optional
  }
}
- _process_facet_stage(create_temp, current_table, facet_spec)
Process $facet stage - processes multiple sub-pipelines and combines results.
MongoDB syntax:
{
  $facet: {
    <output_field1>: [<pipeline stages>],
    <output_field2>: [<pipeline stages>],
    …
  }
}
This method:
1. Extracts input documents from the current temp table
2. For each sub-pipeline, executes it using normal aggregation (Tier 1/2/3)
3. Combines all results into a single document with facet fields
4. Returns a temp table containing that combined result
- _process_union_with_stage(create_temp, current_table, union_spec)
Process $unionWith stage - combines documents from another collection.
MongoDB syntax:
{
  $unionWith: {
    coll: <collection_name>,
    pipeline: [<pipeline stages>]    // optional
  }
}
- _process_merge_stage(create_temp, current_table, merge_spec)
Process $merge stage - writes results to a collection.
MongoDB syntax:
{
  $merge: {
    into: <collection_name>,
    on: <field>,                 // optional
    whenMatched: <action>,       // optional
    whenNotMatched: <action>     // optional
  }
}
- _process_redact_stage(create_temp, current_table, redact_spec)
Process $redact stage - field-level redaction based on conditions.
MongoDB syntax:
{
  $redact: {
    $cond: {
      if: <condition>,
      then: <level>,
      else: <level>
    }
  }
}
Levels:
- $$DESCEND: Include the field and process sub-fields
- $$PRUNE: Exclude the field
- $$KEEP: Include the field as-is
- _process_set_window_fields_stage(create_temp: Callable[[dict[str, Any], str, list[Any]], str], current_table: str, spec: dict[str, Any]) → str
Process $setWindowFields stage.
- _map_window_operator_to_sql(op_name: str, op_val: Any) → tuple[str | None, str, list[Any]]
Map MongoDB window operator to SQL function and operand.
- _build_window_frame_sql(window_spec: dict[str, Any] | None) → str
Build SQL window frame clause (ROWS BETWEEN …).
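A sketch of mapping MongoDB's `documents` window bounds ("unbounded", "current", or integer offsets relative to the current document) onto a SQL `ROWS BETWEEN` clause. `range` windows are not covered. `frame_bound`, `build_window_frame_sql`, and `running_totals` are hypothetical names, and the usage example needs SQLite 3.25+ for window functions.

```python
import sqlite3
from typing import Any, Optional


def frame_bound(bound: Any, is_lower: bool) -> str:
    if bound == "unbounded":
        return "UNBOUNDED PRECEDING" if is_lower else "UNBOUNDED FOLLOWING"
    if bound == "current":
        return "CURRENT ROW"
    if isinstance(bound, int) and not isinstance(bound, bool):
        if bound == 0:
            return "CURRENT ROW"
        # Negative offsets look backwards, positive offsets look forwards.
        return f"{-bound} PRECEDING" if bound < 0 else f"{bound} FOLLOWING"
    raise ValueError(f"unsupported window bound: {bound!r}")


def build_window_frame_sql(window: Optional[dict[str, Any]]) -> str:
    if not window:
        return ""  # no frame clause: SQLite's default frame applies
    if "documents" not in window:
        raise NotImplementedError("only 'documents' windows are covered here")
    lower, upper = window["documents"]
    return f"ROWS BETWEEN {frame_bound(lower, True)} AND {frame_bound(upper, False)}"


def running_totals(values: list[int]) -> list[int]:
    # Usage example: cumulative sum in ascending order of n.
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE t (n INTEGER)")
    conn.executemany("INSERT INTO t VALUES (?)", [(v,) for v in values])
    frame = build_window_frame_sql({"documents": ["unbounded", "current"]})
    rows = conn.execute(f"SELECT SUM(n) OVER (ORDER BY n {frame}) FROM t ORDER BY n")
    return [row[0] for row in rows]
```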
- neosqlite.collection.temporary_table_aggregation.can_process_with_temporary_tables(pipeline: list[dict[str, Any]]) → bool
Determine if a pipeline can be processed with temporary tables.
This function checks if all stages in an aggregation pipeline are supported by the temporary table processing approach. It verifies that each stage in the pipeline is one of the supported stage types.
Additionally, it handles special cases for text search operations:
- Pure text search operations are supported with hybrid processing
- Text search with simple unwind operations is supported (uses Python text search on temp tables)
- Complex nested unwinds (multiple unwinds or dotted paths) fall back to Python
- Parameters:
pipeline (list[dict[str, Any]]) – List of aggregation pipeline stages to check
- Returns:
True if all stages in the pipeline are supported and can be processed with temporary tables, False otherwise
- Return type:
bool
- neosqlite.collection.temporary_table_aggregation._contains_text_search(match_spec: dict[str, Any]) → bool
Check if a match specification contains text search operations.
This function delegates to the centralized _contains_text_operator function to ensure consistent text search detection across all NeoSQLite components.
- Parameters:
match_spec (dict[str, Any]) – The match specification to check for text search operations
- Returns:
True if the match specification contains text search operations, False otherwise
- Return type:
bool
- neosqlite.collection.temporary_table_aggregation.execute_2nd_tier_aggregation(query_engine, pipeline: list[dict[str, Any]], batch_size: int = 101) → list[dict[str, Any]]
Execute aggregation pipeline using temporary table approach for complex pipelines.
This function is designed to be called as the second tier in a three-tier processing system:
1. First tier (QueryEngine): Try existing SQL optimization for simple pipelines
2. Second tier (this function): Try the temporary table approach for complex pipelines
3. Third tier (QueryEngine): Fall back to the Python implementation for unsupported operations
This function focuses specifically on processing complex pipelines that the current NeoSQLite SQL optimization cannot handle efficiently, using temporary tables for better performance.
- Parameters:
query_engine – The NeoSQLite QueryEngine instance to use for processing
pipeline (list[dict[str, Any]]) – List of aggregation pipeline stages to process
batch_size (int) – Batch size for fetching results from temporary tables
- Returns:
List of result documents after processing the pipeline
- Return type:
list[dict[str, Any]]
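A sketch of the three-tier dispatch described above. It assumes each tier signals "cannot handle this pipeline" by raising NotImplementedError, which process_pipeline documents for unsupported stages. `run_with_fallback` and the tier functions are hypothetical.

```python
from typing import Any, Callable

Pipeline = list[dict[str, Any]]
Tier = Callable[[Pipeline], list[dict[str, Any]]]


def run_with_fallback(pipeline: Pipeline,
                      tiers: list[tuple[str, Tier]]) -> tuple[str, list[dict[str, Any]]]:
    for name, tier in tiers:
        try:
            return name, tier(pipeline)
        except NotImplementedError:
            continue  # this tier declined; hand the pipeline to the next one
    raise NotImplementedError("no tier could process the pipeline")


# Usage with two hypothetical tiers.
def sql_tier(pipeline: Pipeline) -> list[dict[str, Any]]:
    raise NotImplementedError  # pretend the single-query optimizer declines


def temp_table_tier(pipeline: Pipeline) -> list[dict[str, Any]]:
    return [{"stages": len(pipeline)}]


tier_used, results = run_with_fallback(
    [{"$match": {}}], [("sql", sql_tier), ("temp_tables", temp_table_tier)]
)
```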