neosqlite.collection.temporary_table_aggregation module

Simplified temporary table aggregation pipeline implementation for NeoSQLite. This focuses on the core concept: using temporary tables to process complex pipelines that the current implementation can’t optimize with a single SQL query.

neosqlite.collection.temporary_table_aggregation._sanitize_params(params: list[Any] | None) → list[Any] | None

Sanitize SQL parameters by converting ObjectId instances to strings.

SQLite doesn’t know how to bind ObjectId types, so we convert them to strings.

Parameters:

params – List of parameters or None

Returns:

Sanitized parameters with ObjectId converted to strings
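The following is a minimal sketch of this conversion. It assumes that an ObjectId renders its hex string through `str()`. The `ObjectId` class below is a hypothetical stand-in so that the snippet runs without NeoSQLite installed.

```python
from __future__ import annotations

from typing import Any


class ObjectId:
    """Hypothetical stand-in for NeoSQLite's ObjectId (illustration only)."""

    def __init__(self, hex_id: str) -> None:
        self._hex = hex_id

    def __str__(self) -> str:
        return self._hex


def sanitize_params(params: list[Any] | None) -> list[Any] | None:
    # None passes through so callers can hand the result straight to execute().
    if params is None:
        return None
    # Replace every ObjectId with its string form; leave other values alone.
    return [str(p) if isinstance(p, ObjectId) else p for p in params]


print(sanitize_params([ObjectId("65a1f0c2"), 42, "x"]))  # ['65a1f0c2', 42, 'x']
```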

neosqlite.collection.temporary_table_aggregation._json_extract_field_with_objectid_support(json_function_prefix: str, field_name: str, is_local_field: bool = True) → str

Generate SQL expression to extract a field value with ObjectId support.

When a field contains an ObjectId (stored as {"__neosqlite_objectid__":true,"id":"…"}), this extracts just the ID string instead of the full JSON object.

Parameters:
  • json_function_prefix – The JSON function prefix (json or jsonb)

  • field_name – The field name to extract

  • is_local_field – Whether this is a local field (True) or foreign field (False)

Returns:

SQL expression string
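Here is one way to build such an expression: a `CASE` that unwraps the ObjectId wrapper and otherwise returns the plain value. This is a sketch under assumptions. The exact SQL that the real helper generates, the `data` column name, and the helper name `objectid_aware_extract` are all illustrative, and the `is_local_field` distinction is not modelled.

```python
import json
import sqlite3


def objectid_aware_extract(prefix: str, field: str) -> str:
    # Hypothetical re-creation: unwrap {"__neosqlite_objectid__": true, "id": ...}
    # to its "id" string, otherwise return the plain extracted value.
    # `field` is interpolated into SQL, so it must come from trusted input.
    path = f"$.{field}"
    return (
        f"CASE WHEN {prefix}_extract(data, '{path}.__neosqlite_objectid__') = 1 "
        f"THEN {prefix}_extract(data, '{path}.id') "
        f"ELSE {prefix}_extract(data, '{path}') END"
    )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE docs (id INTEGER PRIMARY KEY, data TEXT)")
conn.executemany(
    "INSERT INTO docs (data) VALUES (?)",
    [
        (json.dumps({"owner": {"__neosqlite_objectid__": True, "id": "65a1f0c2"}}),),
        (json.dumps({"owner": "plain-string"}),),
    ],
)
expr = objectid_aware_extract("json", "owner")
values = [row[0] for row in conn.execute(f"SELECT {expr} FROM docs ORDER BY id")]
print(values)  # ['65a1f0c2', 'plain-string']
```

SQLite's `json_extract` returns `1` for JSON `true` and `NULL` for a path that does not exist. As a result, non-wrapped values fall through to the `ELSE` branch.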

class neosqlite.collection.temporary_table_aggregation.DeterministicTempTableManager(pipeline_id: str)

Bases: object

Manager for deterministic temporary table names.

This class generates unique but deterministic temporary table names based on pipeline stages and a pipeline ID. It ensures that the same pipeline stage will always generate the same table name within the same pipeline execution, which is useful for caching and optimization purposes.

__init__(pipeline_id: str)

Initialize the DeterministicTempTableManager with a pipeline ID for generating unique table names.

Parameters:

pipeline_id (str) – A unique identifier for the pipeline, used to ensure table names are deterministic and unique across different pipeline executions.

make_temp_table_name(stage: dict[str, Any], name_suffix: str = '') → str

Generate a deterministic temporary table name based on the pipeline stage and pipeline ID.

This method creates a unique but deterministic name for a temporary table by:

  1. Creating a canonical representation of the stage

  2. Hashing the stage to create a short, unique suffix

  3. Combining the pipeline ID, stage type, and hash to form a base name

  4. Ensuring uniqueness by tracking name usage within the pipeline

Parameters:
  • stage (dict[str, Any]) – The pipeline stage dictionary used to generate the table name

  • name_suffix (str, optional) – An additional suffix to append to the table name. Defaults to "".

Returns:

A deterministic temporary table name unique to this stage and pipeline

Return type:

str
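The four naming steps above can be sketched as follows. The class name `TempTableNamer`, the use of SHA-256 truncated to 8 hex characters, and the `temp_<pipeline>_<stage>_<hash>` layout are assumptions made for illustration. They are not necessarily what `DeterministicTempTableManager` produces.

```python
import hashlib
import json
from typing import Any


class TempTableNamer:
    """Hypothetical sketch of deterministic temp-table naming."""

    def __init__(self, pipeline_id: str) -> None:
        self.pipeline_id = pipeline_id
        self._uses: dict[str, int] = {}

    def make_name(self, stage: dict[str, Any], name_suffix: str = "") -> str:
        # 1. Canonical form: sorted keys make {"a": 1, "b": 2} == {"b": 2, "a": 1}.
        canonical = json.dumps(stage, sort_keys=True, default=str)
        # 2. Short, stable hash of that canonical form.
        digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:8]
        # 3. Base name from pipeline ID, stage type (e.g. "match") and hash.
        stage_type = next(iter(stage), "stage").lstrip("$")
        base = f"temp_{self.pipeline_id}_{stage_type}_{digest}{name_suffix}"
        # 4. A repeated identical stage gets a counter so names stay unique.
        count = self._uses.get(base, 0)
        self._uses[base] = count + 1
        return base if count == 0 else f"{base}_{count}"


a = TempTableNamer("p1").make_name({"$match": {"x": 1}})
b = TempTableNamer("p1").make_name({"$match": {"x": 1}})
print(a == b)  # True: same stage + same pipeline ID -> same name
```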

neosqlite.collection.temporary_table_aggregation.aggregation_pipeline_context(db_connection, pipeline_id: str | None = None)

Context manager for temporary aggregation tables with automatic cleanup.

This context manager provides a clean and safe way to work with temporary tables during aggregation pipeline processing. It handles:

  1. Creating a savepoint for atomicity of the entire pipeline

  2. Generating deterministic temporary table names

  3. Providing a function to create temporary tables with proper naming

  4. Automatic cleanup of all temporary tables and savepoint on exit

The context manager supports both new deterministic naming (using stage dictionaries) and backward compatibility (using string suffixes) for temporary tables.

Parameters:
  • db_connection – The database connection object

  • pipeline_id (str | None) – A unique identifier for the pipeline. If None, a default ID is generated for backward compatibility.

Yields:

Callable

A function to create temporary tables with the signature:

create_temp_table(stage_or_suffix, query, params=None, name_suffix="")

Where:

  • stage_or_suffix: Either a stage dict (new approach) or a string (backward compatibility)

  • query: The SQL query to populate the temporary table

  • params: Optional parameters for the SQL query

  • name_suffix: Optional suffix for backward compatibility naming

Raises:

Exception – Any exception that occurs during pipeline processing is re-raised after cleanup operations
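The lifecycle above (savepoint, table factory, cleanup, re-raise) can be sketched with the standard `sqlite3` and `contextlib` modules. This is a simplified illustration, not the module's code. `pipeline_context` is a hypothetical name, and it accepts only string suffixes, not stage dicts.

```python
from __future__ import annotations

import sqlite3
from contextlib import contextmanager
from typing import Any, Callable, Iterator


@contextmanager
def pipeline_context(
    conn: sqlite3.Connection, pipeline_id: str = "default"
) -> Iterator[Callable[..., str]]:
    created: list[str] = []
    savepoint = f"agg_{pipeline_id}"
    # 1. Savepoint: everything the pipeline does can be undone as one unit.
    conn.execute(f"SAVEPOINT {savepoint}")

    def create_temp(suffix: str, query: str, params: list[Any] | None = None) -> str:
        # 2./3. Deterministic name, then materialise the query into it.
        name = f"temp_{pipeline_id}_{suffix}"
        conn.execute(f"CREATE TEMP TABLE {name} AS {query}", params or [])
        created.append(name)
        return name

    try:
        yield create_temp
    except Exception:
        # Undo the pipeline's work, then let the error propagate.
        conn.execute(f"ROLLBACK TO {savepoint}")
        raise
    finally:
        # 4. Cleanup runs on success and on failure.
        for name in created:
            conn.execute(f"DROP TABLE IF EXISTS temp.{name}")
        conn.execute(f"RELEASE {savepoint}")


conn = sqlite3.connect(":memory:", isolation_level=None)  # manage transactions manually
conn.execute("CREATE TABLE docs (id INTEGER PRIMARY KEY, data TEXT)")
conn.execute("""INSERT INTO docs (data) VALUES ('{"a": 1}')""")

with pipeline_context(conn, "p1") as create_temp:
    table = create_temp("match", "SELECT id, data FROM docs WHERE json_extract(data, '$.a') = ?", [1])
    print(conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0])  # 1
```

After `ROLLBACK TO`, the savepoint is still on the stack. That is why the unconditional `RELEASE` in `finally` is valid on both paths.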

class neosqlite.collection.temporary_table_aggregation.TemporaryTableAggregationProcessor(collection, query_engine=None)

Bases: object

Processor for aggregation pipelines using temporary tables.

__init__(collection, query_engine=None)

Initialize the TemporaryTableAggregationProcessor with a collection.

Parameters:
  • collection – The NeoSQLite collection to process aggregation pipelines on. This collection provides the database connection and document loading functionality needed for pipeline processing.

  • query_engine – Optional QueryEngine instance for accessing helpers. If not provided, text search in match stages will use simplified processing.

process_pipeline(pipeline: list[dict[str, Any]], is_count: bool = False, count_field: str | None = None, batch_size: int = 101) → list[dict[str, Any]]

Process an aggregation pipeline using temporary tables for intermediate results.

This method implements a temporary table approach for processing complex aggregation pipelines that cannot be optimized into a single SQL query by the current NeoSQLite implementation. It works by:

  1. Generating a deterministic pipeline ID based on the pipeline content

  2. Using the aggregation_pipeline_context for atomicity and cleanup

  3. Creating temporary tables for each stage or group of compatible stages

  4. Processing pipeline stages in an optimized order (grouping compatible stages)

  5. Returning the final results from the last temporary table

The supported pipeline stages include:

  • $match: For filtering documents

  • $unwind: For deconstructing array fields

  • $lookup: For joining documents from different collections

  • $sort, $skip, $limit: For sorting and pagination

  • $addFields: For adding fields to documents

  • $count: For counting documents (optimized to use SQL COUNT)

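Parameters:
  • pipeline (list[dict[str, Any]]) – A list of aggregation pipeline stages to process

  • is_count (bool) – If True, return a single count document instead of all documents

  • count_field (str | None) – The field name for the count if is_count is True

  • batch_size (int) – Batch size for fetching results from temporary tables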

Returns:

A list of result documents after processing the pipeline

Return type:

list[dict[str, Any]]

Raises:

NotImplementedError – If the pipeline contains unsupported stages
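Step 4 above, "grouping compatible stages", can be sketched like this. The rules are assumptions made for illustration. `$sort`, `$skip` and `$limit` are merged only in that order, because `$limit` followed by `$sort` is not equivalent to `ORDER BY ... LIMIT`. Consecutive `$unwind` stages travel together. `group_stages` is a hypothetical helper.

```python
from typing import Any

# Rank order in which $sort, $skip and $limit collapse into one
# "ORDER BY ... LIMIT ... OFFSET" without changing the result.
_PAGINATION_RANK = {"$sort": 0, "$skip": 1, "$limit": 2}


def stage_name(stage: dict[str, Any]) -> str:
    return next(iter(stage))


def group_stages(pipeline: list[dict[str, Any]]) -> list[list[dict[str, Any]]]:
    groups: list[list[dict[str, Any]]] = []
    for stage in pipeline:
        name = stage_name(stage)
        if groups:
            last = stage_name(groups[-1][-1])
            # Only merge $sort -> $skip -> $limit in that order: $limit before
            # $sort, for example, must stay a separate step.
            if (
                name in _PAGINATION_RANK
                and last in _PAGINATION_RANK
                and _PAGINATION_RANK[name] > _PAGINATION_RANK[last]
            ):
                groups[-1].append(stage)
                continue
            # Consecutive $unwind stages travel together (then run one by one).
            if name == "$unwind" and last == "$unwind":
                groups[-1].append(stage)
                continue
        groups.append([stage])
    return groups


pipeline = [
    {"$match": {"status": "A"}},
    {"$sort": {"age": 1}},
    {"$skip": 2},
    {"$limit": 5},
    {"$unwind": "$tags"},
    {"$unwind": "$tags.parts"},
]
print([len(g) for g in group_stages(pipeline)])  # [1, 3, 2]
```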

_process_match_stage(create_temp: Callable, current_table: str, match_spec: dict[str, Any]) → str

Process a $match stage using temporary tables.

This method creates a temporary table that contains only documents matching the specified criteria. It translates the MongoDB-style match specification into SQL WHERE conditions using json_extract for field access.

The method supports these match operators:

  • $eq, $gt, $lt, $gte, $lte: Comparison operators

  • $in, $nin: Array membership operators

  • $ne: Not equal operator

  • $text: Text search operator (handled with special logic for unwound elements)

For the special _id field, it uses the table’s id column directly rather than json_extract.

Parameters:
  • create_temp (Callable) – Function to create temporary tables

  • current_table (str) – Name of the current temporary table containing input data

  • match_spec (dict[str, Any]) – The $match stage specification

Returns:

Name of the newly created temporary table with matched documents

Return type:

str
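The following sketch shows how a match spec might translate into a parameterised `WHERE` clause, with `_id` mapped to the `id` column as described above. It is a simplified illustration with a hypothetical helper name. MongoDB's `$ne`/`$nin` also match documents where the field is missing, whereas SQL comparisons against `NULL` do not, and this sketch does not reconcile that difference. `$text` is not handled.

```python
import json
import sqlite3
from typing import Any

# Comparison operators that map one-to-one onto SQL.
_COMPARISONS = {"$eq": "=", "$ne": "!=", "$gt": ">", "$gte": ">=", "$lt": "<", "$lte": "<="}


def match_to_where(match_spec: dict[str, Any]) -> tuple[str, list[Any]]:
    clauses: list[str] = []
    params: list[Any] = []
    for field, condition in match_spec.items():
        # _id lives in the id column; everything else is read from the JSON.
        column = "id" if field == "_id" else f"json_extract(data, '$.{field}')"
        # A bare value such as {"age": 30} is shorthand for {"age": {"$eq": 30}}.
        ops = condition if isinstance(condition, dict) else {"$eq": condition}
        for op, value in ops.items():
            if op in _COMPARISONS:
                clauses.append(f"{column} {_COMPARISONS[op]} ?")
                params.append(value)
            elif op in ("$in", "$nin"):
                placeholders = ", ".join("?" for _ in value)
                negation = "NOT " if op == "$nin" else ""
                clauses.append(f"{column} {negation}IN ({placeholders})")
                params.extend(value)
            else:
                raise NotImplementedError(f"operator {op} not covered by this sketch")
    return (" AND ".join(clauses) or "1"), params


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE docs (id INTEGER PRIMARY KEY, data TEXT)")
conn.executemany(
    "INSERT INTO docs (data) VALUES (?)",
    [(json.dumps(d),) for d in ({"age": 30, "tag": "a"}, {"age": 20, "tag": "b"}, {"age": 40, "tag": "c"})],
)
where, params = match_to_where({"age": {"$gte": 25}, "tag": {"$in": ["a", "b"]}})
print(conn.execute(f"SELECT id FROM docs WHERE {where}", params).fetchall())  # [(1,)]
```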

_process_unwind_stages(create_temp: Callable, current_table: str, unwind_specs: list[Any]) → str

Process one or more consecutive $unwind stages using temporary tables.

This method handles the $unwind stage which deconstructs an array field from input documents to output a document for each element. It can process either a single unwind stage or multiple consecutive unwind stages.

For a single unwind, it uses SQLite's json_each function to expand the array into separate rows. Multiple consecutive unwinds are processed sequentially, one at a time, rather than all together, because a combined approach doesn't work for nested arrays whose paths depend on a previous unwind.

The method validates that the target field is an array, so only documents with array fields are unwound. It also handles the special _id field, should it ever be unwound (an unusual case).

Supports these $unwind options:

  • path: The array field to unwind (required)

  • preserveNullAndEmptyArrays: If true, includes documents where the array is missing, null, or empty

  • includeArrayIndex: If specified, includes the array index in the output

Parameters:
  • create_temp (Callable) – Function to create temporary tables

  • current_table (str) – Name of the current temporary table containing input data

  • unwind_specs (list[Any]) – List of $unwind stage specifications to process consecutively

Returns:

Name of the newly created temporary table with unwound documents

Return type:

str

Raises:

ValueError – If an invalid unwind specification is encountered
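The core of a single `{$unwind: "$tags"}` can be expressed as one `json_each` join, as in this sketch. The table and column names are illustrative. `preserveNullAndEmptyArrays` would need a `LEFT JOIN` variant and is not shown. `includeArrayIndex` could be taken from `json_each`'s `key` column.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE docs (id INTEGER PRIMARY KEY, data TEXT)")
conn.executemany(
    "INSERT INTO docs (data) VALUES (?)",
    [
        (json.dumps({"name": "a", "tags": ["x", "y"]}),),
        (json.dumps({"name": "b", "tags": "not-an-array"}),),
    ],
)

# {$unwind: "$tags"}: json_each yields one row per element, and json_set
# replaces the array with that element. The json_type check keeps only
# documents whose "tags" really is an array (json_each on a scalar would
# otherwise yield the scalar itself as a single row).
UNWIND_SQL = """
SELECT docs.id AS id, json_set(docs.data, '$.tags', je.value) AS data
FROM docs, json_each(docs.data, '$.tags') AS je
WHERE json_type(docs.data, '$.tags') = 'array'
ORDER BY docs.id, je.key
"""
unwound = [json.loads(data) for _, data in conn.execute(UNWIND_SQL)]
print(unwound)  # [{'name': 'a', 'tags': 'x'}, {'name': 'a', 'tags': 'y'}]
```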

_create_lookup_hash_table(from_collection: str, foreign_field: str | None, pipeline: list[dict[str, Any]] | None = None) → tuple[str, str]

Create a hash table (temp table with index) from a foreign collection for efficient hash join lookup.

This implements an O(n+m) hash join instead of an O(n*m) correlated subquery.

Parameters:
  • from_collection – The collection to build hash table from

  • foreign_field – The field to use as join key (None for _id)

  • pipeline – Optional pipeline to run on foreign collection first

Returns:

Tuple of (hash_table_name, join_key_column)

_create_lookup_hash_table_fallback(hash_table_name: str, from_collection: str, foreign_field: str, join_key: str) → None

Fallback method to create hash table by reading documents one by one.

Used when the SQL approach fails due to malformed JSON in the data column. This method skips corrupted documents gracefully.

Parameters:
  • hash_table_name – Name of the hash table to create

  • from_collection – Source collection name

  • foreign_field – Field to use as join key

  • join_key – Name of the join key column

_estimate_collection_size(collection_name: str) → int

Estimate the size of a collection in bytes.

Uses SQLite’s table statistics to estimate size.

Parameters:

collection_name – Name of the collection to estimate

Returns:

Estimated size in bytes

_get_available_memory() → int

Get available memory for hash join operations.

Returns:

Available memory in bytes (estimated from SQLite cache or system)

_should_use_hash_join(from_collection: str, pipeline: list[dict[str, Any]] | None = None) → bool

Decide whether to use hash join or correlated subquery for $lookup.

Uses memory estimation to decide:

  • If the foreign collection's estimated size is below 30% of available memory: use a hash join (faster)

  • Otherwise: use a correlated subquery (lower memory, slower)

Parameters:
  • from_collection – The foreign collection name

  • pipeline – Optional pipeline to run on foreign collection first

Returns:

True if should use hash join, False for correlated subquery
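The 30% rule above reduces to a single comparison. Here it is sketched with integer arithmetic so the threshold boundary is exact. In the real method, the two inputs would come from `_estimate_collection_size` and `_get_available_memory`. The function name and the "unknown memory means no hash join" fallback are assumptions.

```python
def should_use_hash_join(estimated_bytes: int, available_bytes: int, threshold_percent: int = 30) -> bool:
    # Integer arithmetic avoids float rounding at the threshold boundary.
    if available_bytes <= 0:
        return False  # unknown memory: take the low-memory path (assumption)
    return estimated_bytes * 100 < threshold_percent * available_bytes


print(should_use_hash_join(20_000_000, 100_000_000))  # True  (20% < 30%)
print(should_use_hash_join(50_000_000, 100_000_000))  # False (50% >= 30%)
```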

_extract_field_value(doc: dict[str, Any], field: str) → Any

Extract field value from document, supporting dot notation.
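A dot-notation lookup might look like the following sketch. Treating numeric segments as array indices follows MongoDB's dot notation and is an assumption about this helper. A missing path and an explicit `None` value are not distinguished.

```python
from typing import Any


def extract_field_value(doc: dict[str, Any], field: str) -> Any:
    current: Any = doc
    for part in field.split("."):
        if isinstance(current, dict):
            current = current.get(part)
        elif isinstance(current, list) and part.isdigit():
            # Numeric segments index into arrays, as in "items.0.sku".
            index = int(part)
            current = current[index] if index < len(current) else None
        else:
            return None  # path runs through a scalar or missing value
        if current is None:
            return None
    return current


doc = {"user": {"address": {"city": "Oslo"}}, "items": [{"sku": "a1"}, {"sku": "b2"}]}
print(extract_field_value(doc, "user.address.city"))  # Oslo
print(extract_field_value(doc, "items.1.sku"))  # b2
```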

_process_lookup_stage(create_temp: Callable, current_table: str, lookup_spec: dict[str, Any]) → str

Process a $lookup stage using hash join for O(n+m) complexity.

This method implements the $lookup aggregation stage, which performs a left outer join to another collection in the same database. It uses an optimized hash join approach:

  1. Creates a temporary table with an index on the foreign field (hash table)

  2. Uses a single JOIN query instead of a correlated subquery

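When the foreign collection is small enough (see _should_use_hash_join), this replaces the O(n*m) correlated subquery approach, which remains available as a low-memory fallback.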

Parameters:
  • create_temp (Callable) – Function to create temporary tables

  • current_table (str) – Name of the current temporary table containing input data

  • lookup_spec (dict[str, Any]) – The $lookup stage specification containing:

      ◦ "from": The name of the collection to join with

      ◦ "localField": The field from the input documents

      ◦ "foreignField": The field from the documents of the "from" collection

      ◦ "as": The name of the new array field to add to the matching documents

      ◦ "pipeline": Optional pipeline to run on the foreign collection

Returns:

Name of the newly created temporary table with lookup results added

Return type:

str
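The two steps above can be sketched as an indexed temp table followed by one `LEFT JOIN ... GROUP BY`. All table, column, and field names here are illustrative. Note that SQLite indexes are B-trees, so each probe costs O(log m) rather than the O(1) of an in-memory hash table. The `FILTER` clause on the aggregate requires SQLite 3.30 or later.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, data TEXT)")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, data TEXT)")
conn.executemany("INSERT INTO orders (data) VALUES (?)",
                 [(json.dumps({"user": "u1"}),), (json.dumps({"user": "u9"}),)])
conn.execute("INSERT INTO users (data) VALUES (?)", (json.dumps({"uid": "u1", "name": "Ann"}),))

# Step 1: materialise the foreign side keyed on foreignField ("uid") and index it.
conn.execute("""
CREATE TEMP TABLE lookup_keys AS
SELECT json_extract(data, '$.uid') AS join_key, data FROM users
""")
conn.execute("CREATE INDEX lookup_keys_idx ON lookup_keys (join_key)")

# Step 2: a single LEFT JOIN + GROUP BY builds the "as" array. The FILTER
# clause keeps unmatched orders at [] instead of [null].
LOOKUP_SQL = """
SELECT o.id,
       json_set(o.data, '$.user_docs',
                json_group_array(json(k.data)) FILTER (WHERE k.join_key IS NOT NULL)) AS data
FROM orders AS o
LEFT JOIN lookup_keys AS k ON k.join_key = json_extract(o.data, '$.user')
GROUP BY o.id
ORDER BY o.id
"""
joined = [json.loads(data) for _, data in conn.execute(LOOKUP_SQL)]
print(joined)
# [{'user': 'u1', 'user_docs': [{'uid': 'u1', 'name': 'Ann'}]}, {'user': 'u9', 'user_docs': []}]
```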

_process_lookup_correlated_subquery(create_temp: Callable, current_table: str, lookup_spec: dict[str, Any]) → str

Process $lookup using correlated subquery (O(n*m) but low memory).

This is the fallback when the foreign collection is too large for hash join.

Parameters:
  • create_temp – Function to create temporary tables

  • current_table – Current temp table name

  • lookup_spec – The $lookup specification

Returns:

Name of the new temporary table

_process_lookup_hash_join(create_temp: Callable, current_table: str, lookup_spec: dict[str, Any]) → str

Process $lookup using hash join (O(n+m) but uses more memory).

Parameters:
  • create_temp – Function to create temporary tables

  • current_table – Current temp table name

  • lookup_spec – The $lookup specification

Returns:

Name of the new temporary table

_process_sort_skip_limit_stage(create_temp: Callable, current_table: str, sort_spec: dict[str, Any] | None, skip_value: int = 0, limit_value: int | None = None) → str

Process sort/skip/limit stages using temporary tables.

This method handles the $sort, $skip, and $limit aggregation stages, which can be used individually or in combination. It creates a temporary table with the results sorted and/or paginated according to the specifications.

The method supports sorting on both regular fields (using json_extract) and the special _id field (using the id column directly). It handles ascending and descending sort orders, as well as skip and limit operations with proper OFFSET and LIMIT clauses in the SQL query.

When multiple sort/skip/limit stages are consecutive in a pipeline, they are processed together in a single operation for efficiency.

Parameters:
  • create_temp (Callable) – Function to create temporary tables

  • current_table (str) – Name of the current temporary table containing input data

  • sort_spec (dict[str, Any] | None) – The $sort stage specification, mapping field names to sort directions (1 for ascending, -1 for descending)

  • skip_value (int) – The number of documents to skip (from $skip stage)

  • limit_value (int | None) – The maximum number of documents to return (from $limit stage)

Returns:

Name of the newly created temporary table with sorted/skipped/limited results

Return type:

str
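The following is a sketch of the SQL such a combined sort/skip/limit step might emit. It is an illustration with a hypothetical helper name, not the method's exact output. One SQLite detail matters here: `OFFSET` is only valid after a `LIMIT`, so a skip without a limit uses `LIMIT -1`, which SQLite treats as "no limit".

```python
from __future__ import annotations

import json
import sqlite3


def sort_skip_limit_sql(table: str, sort_spec: dict[str, int] | None,
                        skip: int = 0, limit: int | None = None) -> str:
    sql = f"SELECT id, data FROM {table}"
    if sort_spec:
        terms = []
        for field, direction in sort_spec.items():
            # _id sorts on the id column; other fields via json_extract.
            column = "id" if field == "_id" else f"json_extract(data, '$.{field}')"
            terms.append(f"{column} {'DESC' if direction == -1 else 'ASC'}")
        sql += " ORDER BY " + ", ".join(terms)
    if limit is not None or skip:
        # SQLite only accepts OFFSET after LIMIT; LIMIT -1 means "no limit".
        sql += f" LIMIT {limit if limit is not None else -1}"
        if skip:
            sql += f" OFFSET {skip}"
    return sql


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, data TEXT)")
conn.executemany("INSERT INTO t (data) VALUES (?)", [(json.dumps({"age": a}),) for a in (30, 10, 20, 40)])
ids = [row[0] for row in conn.execute(sort_skip_limit_sql("t", {"age": 1}, skip=1, limit=2))]
print(ids)  # [3, 1]: ages 20 and 30 after skipping age 10
```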

_process_add_fields_stage(create_temp: Callable, current_table: str, add_fields_spec: dict[str, Any]) → str

Process an $addFields stage using temporary tables.

This method implements the $addFields aggregation stage which adds new fields to documents. It uses SQLite’s json_set function to add fields to the JSON data.

Supports:

  • Simple field copying: {"newField": "$existingField"}

  • $replaceOne: {$replaceOne: {input: "$text", find: "old", replacement: "new"}}

  • Literal values: {"field": "value"}

Parameters:
  • create_temp (Callable) – Function to create temporary tables

  • current_table (str) – Name of the current temporary table containing input data

  • add_fields_spec (dict[str, Any]) – The $addFields stage specification mapping new field names to field references, expressions, or literal values

Returns:

Name of the newly created temporary table with added fields

Return type:

str
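The first and third forms listed above (field copy and literal) map onto a single `json_set` call, as in this sketch. The helper name is hypothetical and `$replaceOne` is left out. Literals are bound as parameters, and the sketch only covers strings and numbers: a Python `bool` would be stored as `1`/`0`, not JSON `true`/`false`.

```python
import json
import sqlite3
from typing import Any


def add_fields_sql(table: str, spec: dict[str, Any]) -> tuple[str, list[Any]]:
    if not spec:
        return f"SELECT id, data FROM {table}", []
    args: list[str] = []
    params: list[Any] = []
    for new_field, source in spec.items():
        args.append(f"'$.{new_field}'")
        if isinstance(source, str) and source.startswith("$"):
            # Field copy: {"newField": "$existingField"}.
            args.append(f"json_extract(data, '$.{source[1:]}')")
        else:
            # Literal value, bound as a parameter (strings and numbers only here).
            args.append("?")
            params.append(source)
    return f"SELECT id, json_set(data, {', '.join(args)}) AS data FROM {table}", params


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE docs (id INTEGER PRIMARY KEY, data TEXT)")
conn.execute("INSERT INTO docs (data) VALUES (?)", (json.dumps({"name": "Ann", "age": 30}),))
sql, params = add_fields_sql("docs", {"years": "$age", "status": "active"})
print(json.loads(conn.execute(sql, params).fetchone()[1]))
# {'name': 'Ann', 'age': 30, 'years': 30, 'status': 'active'}
```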

_process_add_fields_stage_python_hybrid(create_temp: Callable, current_table: str, add_fields_spec: dict[str, Any]) → str

Process $addFields stage with complex expressions using Python hybrid approach.

This method loads documents from the current temp table, applies the $addFields expressions in Python (using ExprEvaluator), and creates a new temp table. This allows us to stay in Tier 2 (temp tables) while still supporting complex expressions like $filter, $map, $reduce, etc.

Parameters:
  • create_temp (Callable) – Function to create temporary tables

  • current_table (str) – Name of the current temporary table

  • add_fields_spec (dict[str, Any]) – The $addFields stage specification

Returns:

Name of the newly created temporary table with added fields

Return type:

str

_is_expression(expr: Any) → bool

Check if an expression is a complex expression (not a simple field reference or literal).

_process_project_stage(create_temp: Callable, current_table: str, project_spec: dict[str, Any]) → str

Process a $project stage using temporary tables.

This method implements the $project aggregation stage, which reshapes each document in the stream by adding new fields, removing existing fields, or renaming fields. It reconstructs a unified data column using json_object / jsonb_object so that downstream stages (especially FTS5 text search via json_tree) continue to work without modification.

Supports:

  • Simple inclusion: {"field": 1}

  • Exclusion: {"field": 0}

  • Field references: {"alias": "$some.path"}

  • Expression projections: {"alias": {$concat: [...]}}

  • _id inclusion/exclusion

Parameters:
  • create_temp (Callable) – Function to create temporary tables

  • current_table (str) – Name of the current temporary table

  • project_spec (dict[str, Any]) – The $project stage specification

Returns:

Name of the newly created temporary table

Return type:

str

_process_project_exclusion(create_temp: Callable, current_table: str, project_spec: dict[str, Any], include_id: bool) → str

Handle exclusion-mode projection by removing fields via json_remove.

_process_project_inclusion(create_temp: Callable, current_table: str, project_spec: dict[str, Any], include_id_default: bool) → str

Handle inclusion-mode projection by reconstructing data via json_object.

Handles:

  • Simple inclusion: {"field": 1}

  • Field references: {"alias": "$some.path"}

  • Expression projections: {"alias": {$concat: [...]}}
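The two projection modes can be sketched side by side: `json_remove` for exclusion and `json_object` for inclusion. This is a simplified, hypothetical helper. It covers only top-level fields, always carries `id` through, and, unlike MongoDB, would emit `null` for an included field that is missing.

```python
import json
import sqlite3


def project_sql(table: str, spec: dict[str, int]) -> str:
    # _id handling is left out: the id column is always carried through.
    fields = {name: flag for name, flag in spec.items() if name != "_id"}
    if not fields:
        return f"SELECT id, data FROM {table}"
    modes = set(fields.values())
    if modes == {0}:
        # Exclusion mode: strip the listed paths, keep everything else.
        paths = ", ".join(f"'$.{name}'" for name in fields)
        return f"SELECT id, json_remove(data, {paths}) AS data FROM {table}"
    if modes == {1}:
        # Inclusion mode: rebuild a fresh object with only the listed fields.
        pairs = ", ".join(f"'{name}', json_extract(data, '$.{name}')" for name in fields)
        return f"SELECT id, json_object({pairs}) AS data FROM {table}"
    raise ValueError("cannot mix inclusion and exclusion in one $project")


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE docs (id INTEGER PRIMARY KEY, data TEXT)")
conn.execute("INSERT INTO docs (data) VALUES (?)", (json.dumps({"a": 1, "b": 2, "c": 3}),))
for spec in ({"a": 1, "c": 1}, {"b": 0}):
    print(json.loads(conn.execute(project_sql("docs", spec)).fetchone()[1]))  # {'a': 1, 'c': 3}
```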

_generate_text_score_sql() → str

Generate SQL for $meta: “textScore” using stored BM25 score.

During $text search stages, the FTS5 BM25 relevance score is captured and stored in the document’s JSON data as _textScore. This method extracts that score for use in $project/$addFields stages.

Returns:

SQL expression that returns the BM25 relevance score (positive value)

_process_replace_root_stage(create_temp: Callable, current_table: str, replace_spec: Any) → str

Process a $replaceRoot or $replaceWith stage using temporary tables.

This method implements the $replaceRoot/$replaceWith aggregation stage which replaces the root document with a specified field or expression.

MongoDB syntax:

    {$replaceRoot: {newRoot: "$field"}}
    {$replaceWith: "$field"}

Parameters:
  • create_temp (Callable) – Function to create temporary tables

  • current_table (str) – Name of the current temporary table containing input data

  • replace_spec (Any) – The replace specification (field path or expression)

Returns:

Name of the newly created temporary table with replaced root documents

Return type:

str

_process_group_stage(create_temp: Callable, current_table: str, group_spec: dict[str, Any]) → str

Process a $group stage using temporary tables.

This method implements the $group aggregation stage which groups documents by a specified key and performs accumulator operations.

Supports these accumulators in the SQL tier:

  • $sum, $avg, $min, $max: Standard SQL aggregators

  • $count: COUNT(*)

  • $first, $last: Using subqueries (requires no preceding $sort; see the limitation below)

  • $addToSet: Using json_group_array(DISTINCT …)

  • $push: Using json_group_array(…)

  • Expression keys: Using SQLTranslator for expression evaluation

Limitation: $first/$last with a preceding $sort stage fall back to Python for correctness, because the current implementation uses correlated subqueries that don't preserve sort order across groups.

Parameters:
  • create_temp (Callable) – Function to create temporary tables

  • current_table (str) – Name of the current temporary table containing input data

  • group_spec (dict[str, Any]) – The $group stage specification

Returns:

Name of the newly created temporary table with grouped results

Return type:

str
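Several of the SQL-tier accumulators listed above map onto one grouped query, as this sketch shows. It is an illustration of the mapping, not the method's generated SQL. `$first`/`$last` are omitted. Element order inside `json_group_array` is not guaranteed without an ordering, so the check sorts the pushed values.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE docs (id INTEGER PRIMARY KEY, data TEXT)")
conn.executemany(
    "INSERT INTO docs (data) VALUES (?)",
    [(json.dumps(d),) for d in (
        {"cat": "a", "qty": 2, "tag": "x"},
        {"cat": "a", "qty": 3, "tag": "x"},
        {"cat": "b", "qty": 5, "tag": "y"},
    )],
)

# {$group: {_id: "$cat", total: {$sum: "$qty"}, n: {$count: {}},
#           tags: {$addToSet: "$tag"}, qtys: {$push: "$qty"}}}
GROUP_SQL = """
SELECT json_object(
         '_id',   json_extract(data, '$.cat'),
         'total', SUM(json_extract(data, '$.qty')),
         'n',     COUNT(*),
         'tags',  json_group_array(DISTINCT json_extract(data, '$.tag')),
         'qtys',  json_group_array(json_extract(data, '$.qty'))
       ) AS result
FROM docs
GROUP BY json_extract(data, '$.cat')
ORDER BY json_extract(data, '$.cat')
"""
groups = [json.loads(row[0]) for row in conn.execute(GROUP_SQL)]
print(groups[0]["_id"], groups[0]["total"], groups[0]["n"], groups[0]["tags"])  # a 5 2 ['x']
```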

_id_to_json_object_args(select_parts: list[str]) → str

Convert SELECT parts to json_object arguments.

Parameters:

select_parts – List of SELECT column expressions (e.g., ["expr1 AS field1", "expr2 AS field2"])

Returns:

Comma-separated list of ‘key’, value pairs for json_object
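The conversion can be sketched with a regular expression that splits on the last `AS`, so that expressions containing `CAST(... AS ...)` survive intact. This is an illustrative helper and may differ from the real parsing.

```python
import re

# Greedy (.*) makes "AS" bind to the last occurrence, so
# "CAST(x AS TEXT) AS t" splits into ("CAST(x AS TEXT)", "t").
_ALIAS_RE = re.compile(r"^(.*)\s+AS\s+(\w+)$", re.IGNORECASE | re.DOTALL)


def id_to_json_object_args(select_parts: list[str]) -> str:
    pairs = []
    for part in select_parts:
        match = _ALIAS_RE.match(part.strip())
        if match is None:
            raise ValueError(f"expected '<expr> AS <name>', got {part!r}")
        expr, name = match.groups()
        pairs.append(f"'{name}', {expr}")
    return ", ".join(pairs)


print(id_to_json_object_args(["expr1 AS field1", "expr2 AS field2"]))
# 'field1', expr1, 'field2', expr2
```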

_get_results_from_table(table_name: str, is_count: bool = False, count_field: str | None = None, batch_size: int = 101) → list[dict[str, Any]]

Get results from a temporary table.

This method retrieves all documents from a temporary table and converts them back into their Python dictionary representation using the collection’s document loading mechanism.

For $count optimization, if is_count is True, it returns a single document with the count from the table using SQL COUNT(*) instead of loading all documents.

Parameters:
  • table_name (str) – Name of the temporary table to retrieve results from

  • is_count (bool) – If True, return count document instead of all documents

  • count_field (str | None) – The field name for the count if is_count is True

  • batch_size (int) – Batch size for fetching results from the temporary table

Returns:

List of documents retrieved from the temporary table, with each document represented as a dictionary

Return type:

list[dict[str, Any]]

Apply Python-based text search to a document.

This method uses the unified_text_search function to determine if a document matches a given search term. It’s used as a fallback when text search cannot be efficiently handled with SQL queries, particularly in cases involving unwound elements or complex text search operations.

Parameters:
  • document (dict[str, Any]) – The document to search in

  • search_term (str) – The term to search for

Returns:

True if the document matches the text search, False otherwise

Return type:

bool

_batch_insert_documents(table_name: str, documents: list[tuple]) → None

Insert multiple documents into a temporary table efficiently.

This method provides an optimized way to insert multiple documents into a temporary table by using a single INSERT statement with multiple value sets. It’s used primarily in the text search processing where documents need to be filtered and inserted into a result table.

Parameters:
  • table_name (str) – The name of the table to insert into

  • documents (list[tuple]) – List of (id, data) tuples to insert
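A multi-row `INSERT ... VALUES (?, ?), (?, ?), ...` is sketched below, chunked to stay under SQLite's bound-parameter limit. That limit defaulted to 999 before SQLite 3.32.0. The chunk size of 400 and the `(id, data)` column names are assumptions. `executemany` would be the simpler alternative.

```python
import sqlite3


def batch_insert(conn: sqlite3.Connection, table: str, documents: list[tuple], chunk_size: int = 400) -> None:
    # Each row binds 2 parameters; 400 rows = 800 parameters, under the
    # 999-parameter default limit of older SQLite builds.
    for start in range(0, len(documents), chunk_size):
        chunk = documents[start:start + chunk_size]
        placeholders = ", ".join("(?, ?)" for _ in chunk)
        flat = [value for row in chunk for value in row]
        conn.execute(f"INSERT INTO {table} (id, data) VALUES {placeholders}", flat)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TEMP TABLE results (id INTEGER, data TEXT)")
batch_insert(conn, "results", [(i, '{"n": %d}' % i) for i in range(1000)])
print(conn.execute("SELECT COUNT(*) FROM results").fetchone()[0])  # 1000
```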

_process_text_search_stage(create_temp: Callable, current_table: str, match_spec: dict[str, Any]) → str

Process a $text search stage using FTS5 on temporary table.

This method creates an FTS5 virtual table on the temporary data and uses SQLite’s FTS5 for efficient text search. The tokenizer configuration is detected from the existing FTS index on the collection to ensure consistent behavior.

Note

When $text is used after $unwind (or other stages that create temp tables), the search operates on the unwound elements in the temp table, not on the original collection documents. This differs from MongoDB’s semantics where $text always uses the collection-level text index on original documents.

Parameters:
  • create_temp (Callable) – Function to create temporary tables

  • current_table (str) – Name of the current temporary table containing input data

  • match_spec (dict[str, Any]) – The $match stage specification containing the $text operator with a $search term

Returns:

Name of the newly created temporary table with text search results

Return type:

str

Raises:

ValueError – If the $text operator specification is invalid or the search term is not a string

_detect_fts_tokenizer() → str

Detect the tokenizer configuration from existing FTS indexes on the collection.

Returns:

The tokenizer clause for FTS5 (e.g., ", tokenize=porter" or "")

Return type:

str
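SQLite keeps the original `CREATE VIRTUAL TABLE` text in the `sql` column of `sqlite_master`. A tokenizer clause can therefore be recovered from that text, as in this sketch. How NeoSQLite names and locates its FTS tables is not described here, so the sketch works on the SQL string directly. The regex is an illustrative assumption.

```python
from __future__ import annotations

import re

# Matches tokenize=porter, tokenize='porter unicode61' or tokenize="trigram".
_TOKENIZE_RE = re.compile(r"""tokenize\s*=\s*('[^']*'|"[^"]*"|\w+)""", re.IGNORECASE)


def tokenizer_clause(create_sql: str | None) -> str:
    if not create_sql:
        return ""
    match = _TOKENIZE_RE.search(create_sql)
    return f", tokenize={match.group(1)}" if match else ""


print(tokenizer_clause("CREATE VIRTUAL TABLE docs_fts USING fts5(content, tokenize=porter)"))
# , tokenize=porter
```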

_process_bucket_stage(create_temp, current_table, bucket_spec)

Process $bucket stage - groups documents by boundaries.

MongoDB syntax:

    {
      $bucket: {
        groupBy: <expression>,
        boundaries: [<lowerbound1>, <lowerbound2>, …],
        default: <literal>,  // optional
        output: { <output1>: { <$accumulator expression> }, … }
      }
    }
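Bucket assignment can be expressed as a SQL `CASE` over half-open ranges `[boundaries[i], boundaries[i+1])`, each labelled by its lower bound as in MongoDB. The sketch below does this with a hypothetical helper. It assumes at least two boundaries. Without `default`, MongoDB raises an error for out-of-range values, whereas this sketch yields `NULL`.

```python
import json
import sqlite3
from typing import Any

_NO_DEFAULT = object()


def bucket_case_sql(expr: str, boundaries: list[Any], default: Any = _NO_DEFAULT) -> tuple[str, list[Any]]:
    # Buckets are half-open: [boundaries[i], boundaries[i + 1]), labelled by
    # their lower bound, as in MongoDB's $bucket.
    whens: list[str] = []
    params: list[Any] = []
    for lower, upper in zip(boundaries, boundaries[1:]):
        whens.append(f"WHEN {expr} >= ? AND {expr} < ? THEN ?")
        params.extend([lower, upper, lower])
    if default is _NO_DEFAULT:
        # Simplification: MongoDB raises an error here; this sketch yields NULL.
        return f"CASE {' '.join(whens)} ELSE NULL END", params
    params.append(default)
    return f"CASE {' '.join(whens)} ELSE ? END", params


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE docs (id INTEGER PRIMARY KEY, data TEXT)")
conn.executemany("INSERT INTO docs (data) VALUES (?)", [(json.dumps({"v": v}),) for v in (1, 5, 12, 25)])
case_sql, params = bucket_case_sql("json_extract(data, '$.v')", [0, 10, 20], default="other")
rows = conn.execute(
    f"SELECT bucket, COUNT(*) FROM (SELECT {case_sql} AS bucket FROM docs) GROUP BY bucket ORDER BY bucket",
    params,
).fetchall()
print(rows)  # [(0, 2), (10, 1), ('other', 1)]
```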

_build_group_by_expr(group_by)

Build SQL expression for groupBy field.

_process_bucket_auto_stage(create_temp, current_table, bucket_auto_spec)

Process $bucketAuto stage - auto-sized buckets.

MongoDB syntax:

    {
      $bucketAuto: {
        groupBy: <expression>,
        buckets: <number>,
        output: { <output1>: { <$accumulator expression> }, … },
        granularity: <string>  // optional
      }
    }

_process_densify_stage(create_temp, current_table, densify_spec)

Process $densify stage - fills in missing values in a sequence.

MongoDB syntax:

    {
      $densify: {
        field: <field_name>,
        range: {
          step: <number>,
          bounds: [<lower>, <upper>]
        },
        partitionBy: <expression>  // optional
      }
    }
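The heart of `$densify` is working out which grid values between the bounds have no document yet. This sketch assumes MongoDB's explicit-bounds behaviour (lower bound inclusive, upper bound exclusive) and uses integers to avoid float drift. `partitionBy` and date units are omitted. Each returned value would become a new document `{field: value}`. The helper name is hypothetical.

```python
def densify_missing(existing: list[int], step: int, lower: int, upper: int) -> list[int]:
    """Grid values in [lower, upper) that no document has yet."""
    if step <= 0:
        raise ValueError("step must be positive")
    present = set(existing)
    missing = []
    value = lower
    while value < upper:
        if value not in present:
            missing.append(value)
        value += step
    return missing


print(densify_missing([0, 2, 6], step=2, lower=0, upper=10))  # [4, 8]
```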

_process_facet_stage(create_temp, current_table, facet_spec)

Process $facet stage - processes multiple sub-pipelines and combines results.

MongoDB syntax:

    {
      $facet: {
        <output_field1>: [<pipeline stages>],
        <output_field2>: [<pipeline stages>],
        …
      }
    }

This method:

  1. Extracts input documents from the current temp table

  2. For each sub-pipeline, executes it using normal aggregation (Tier 1/2/3)

  3. Combines all results into a single document with facet fields

  4. Returns a temp table containing that combined result

_process_union_with_stage(create_temp, current_table, union_spec)

Process $unionWith stage - combines documents from another collection.

MongoDB syntax:

    {
      $unionWith: {
        coll: <collection_name>,
        pipeline: [<pipeline stages>]  // optional
      }
    }
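In SQL terms, `$unionWith` is a `UNION ALL` into a new temp table. `UNION ALL`, rather than `UNION`, is used because `$unionWith` does not de-duplicate. In this sketch, the collection names are illustrative and the `id` column is dropped, since ids from two collections can collide. How NeoSQLite reconciles ids, and how it applies the optional `pipeline` to the other collection, is not shown.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
for name, docs in {"sales_2023": [{"amt": 1}], "sales_2024": [{"amt": 2}, {"amt": 1}]}.items():
    conn.execute(f"CREATE TABLE {name} (id INTEGER PRIMARY KEY, data TEXT)")
    conn.executemany(f"INSERT INTO {name} (data) VALUES (?)", [(json.dumps(d),) for d in docs])

# A pipeline over sales_2023 followed by {$unionWith: {coll: "sales_2024"}}.
# UNION ALL keeps the duplicate {"amt": 1}, since $unionWith does not de-duplicate.
conn.execute("""
CREATE TEMP TABLE union_result AS
SELECT data FROM sales_2023
UNION ALL
SELECT data FROM sales_2024
""")
amounts = sorted(json.loads(data)["amt"] for (data,) in conn.execute("SELECT data FROM union_result"))
print(amounts)  # [1, 1, 2]
```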

_process_merge_stage(create_temp, current_table, merge_spec)

Process $merge stage - writes results to a collection.

MongoDB syntax:

    {
      $merge: {
        into: <collection_name>,
        on: <field>,  // optional
        whenMatched: <action>,  // optional
        whenNotMatched: <action>  // optional
      }
    }

_process_redact_stage(create_temp, current_table, redact_spec)

Process $redact stage - field-level redaction based on conditions.

MongoDB syntax:

    {
      $redact: {
        $cond: {
          if: <condition>,
          then: <level>,
          else: <level>
        }
      }
    }

Levels:

  • $$DESCEND: Include the field and process sub-fields

  • $$PRUNE: Exclude the field

  • $$KEEP: Include the field as-is

_process_set_window_fields_stage(create_temp: Callable[[dict[str, Any], str, list[Any]], str], current_table: str, spec: dict[str, Any]) → str

Process $setWindowFields stage.

_map_window_operator_to_sql(op_name: str, op_val: Any) → tuple[str | None, str, list[Any]]

Map MongoDB window operator to SQL function and operand.

_build_window_frame_sql(window_spec: dict[str, Any] | None) → str

Build SQL window frame clause (ROWS BETWEEN …).
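Here is one way to map a MongoDB document-based window such as `{"documents": ["unbounded", "current"]}` onto a SQL `ROWS BETWEEN` clause. This is an assumed mapping with hypothetical helper names. Range-based windows (`"range"` with `unit`) are not handled. SQL window functions require SQLite 3.25 or later.

```python
from __future__ import annotations

import sqlite3
from typing import Any


def _bound(value: str | int, is_lower: bool) -> str:
    if value == "unbounded":
        return "UNBOUNDED PRECEDING" if is_lower else "UNBOUNDED FOLLOWING"
    if value == "current" or value == 0:
        return "CURRENT ROW"
    if isinstance(value, int):
        # Negative offsets look backwards, positive offsets look forwards.
        return f"{-value} PRECEDING" if value < 0 else f"{value} FOLLOWING"
    raise ValueError(f"unsupported window bound: {value!r}")


def build_window_frame_sql(window_spec: dict[str, Any] | None) -> str:
    # Only document-based windows are handled; "range" windows are not.
    if not window_spec or "documents" not in window_spec:
        return ""
    lower, upper = window_spec["documents"]
    return f"ROWS BETWEEN {_bound(lower, True)} AND {_bound(upper, False)}"


frame = build_window_frame_sql({"documents": ["unbounded", "current"]})
conn = sqlite3.connect(":memory:")
running = [row[0] for row in conn.execute(
    f"SELECT SUM(x) OVER (ORDER BY x {frame}) FROM "
    "(SELECT 1 AS x UNION ALL SELECT 2 UNION ALL SELECT 3) ORDER BY x"
)]
print(frame)    # ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
print(running)  # [1, 3, 6]
```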

_process_graph_lookup_stage(create_temp: Callable[[dict[str, Any], str, list[Any]], str], current_table: str, spec: dict[str, Any]) → str

Process $graphLookup stage.

_process_fill_stage(create_temp: Callable[[dict[str, Any], str, list[Any]], str], current_table: str, spec: dict[str, Any]) → str

Process $fill stage.

neosqlite.collection.temporary_table_aggregation.can_process_with_temporary_tables(pipeline: list[dict[str, Any]]) → bool

Determine if a pipeline can be processed with temporary tables.

This function checks if all stages in an aggregation pipeline are supported by the temporary table processing approach. It verifies that each stage in the pipeline is one of the supported stage types.

Additionally, it handles special cases for text search operations:

  • Pure text search operations are supported with hybrid processing

  • Text search with simple unwind operations is supported (uses Python text search on temp tables)

  • Complex nested unwinds (multiple unwinds or dotted paths) fall back to Python

Parameters:

pipeline (list[dict[str, Any]]) – List of aggregation pipeline stages to check

Returns:

True if all stages in the pipeline are supported and can be processed with temporary tables, False otherwise

Return type:

bool
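The checks described above can be sketched as a stage whitelist plus the text-search special case. `SUPPORTED_STAGES` below is a hypothetical subset chosen for illustration. The real module's list may differ, and it likely includes more of the stages documented on this page.

```python
from typing import Any

# Hypothetical subset of stage names; the real module's list may differ.
SUPPORTED_STAGES = frozenset({
    "$match", "$unwind", "$lookup", "$sort", "$skip", "$limit",
    "$addFields", "$project", "$group", "$count", "$replaceRoot",
})


def contains_text(spec: Any) -> bool:
    # Look for $text anywhere, including inside $and / $or lists.
    if isinstance(spec, dict):
        return "$text" in spec or any(contains_text(v) for v in spec.values())
    if isinstance(spec, list):
        return any(contains_text(v) for v in spec)
    return False


def can_process(pipeline: list[dict[str, Any]]) -> bool:
    if any(len(stage) != 1 or next(iter(stage)) not in SUPPORTED_STAGES for stage in pipeline):
        return False
    has_text = any(contains_text(stage.get("$match")) for stage in pipeline)
    unwinds = [stage["$unwind"] for stage in pipeline if "$unwind" in stage]
    paths = [u if isinstance(u, str) else u.get("path", "") for u in unwinds]
    nested = len(unwinds) > 1 or any("." in p.lstrip("$") for p in paths)
    # Text search combined with nested unwinds is left to the Python tier.
    return not (has_text and nested)


print(can_process([{"$unwind": "$tags"}, {"$match": {"$text": {"$search": "hi"}}}]))  # True
```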

Check if a match specification contains text search operations.

This function delegates to the centralized _contains_text_operator function to ensure consistent text search detection across all NeoSQLite components.

Parameters:

match_spec (dict[str, Any]) – The match specification to check for text search operations

Returns:

True if the match specification contains text search operations, False otherwise

Return type:

bool

neosqlite.collection.temporary_table_aggregation.execute_2nd_tier_aggregation(query_engine, pipeline: list[dict[str, Any]], batch_size: int = 101) → list[dict[str, Any]]

Execute aggregation pipeline using temporary table approach for complex pipelines.

This function is designed to be called as the second tier in a three-tier processing system:

  1. First tier (QueryEngine): Try existing SQL optimization for simple pipelines

  2. Second tier (this function): Try the temporary table approach for complex pipelines

  3. Third tier (QueryEngine): Fall back to the Python implementation for unsupported operations

This function focuses specifically on processing complex pipelines that the current NeoSQLite SQL optimization cannot handle efficiently, using temporary tables for better performance.

Parameters:
  • query_engine – The NeoSQLite QueryEngine instance to use for processing

  • pipeline (list[dict[str, Any]]) – List of aggregation pipeline stages to process

  • batch_size (int) – Batch size for fetching results from temporary tables

Returns:

List of result documents after processing the pipeline

Return type:

list[dict[str, Any]]
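The three-tier control flow can be sketched as a chain of callables, where each tier returns `None` to hand the pipeline on to the next. The tier functions here are hypothetical placeholders. In NeoSQLite, the first and third tiers live in QueryEngine, and the "return None to defer" convention is an assumption made for this sketch.

```python
from typing import Any, Callable, Optional, Sequence

Pipeline = list[dict[str, Any]]
Docs = list[dict[str, Any]]
# A tier returns None to say "I can't handle this pipeline, try the next one".
Tier = Callable[[Pipeline], Optional[Docs]]


def run_tiers(pipeline: Pipeline, tiers: Sequence[Tier]) -> Docs:
    for tier in tiers:
        result = tier(pipeline)
        if result is not None:  # an empty list is still a valid answer
            return result
    raise NotImplementedError("no tier could process the pipeline")


calls: list[str] = []


def sql_tier(pipeline: Pipeline) -> Optional[Docs]:
    calls.append("sql")
    return None  # pretend a single SQL query can't express this pipeline


def temp_table_tier(pipeline: Pipeline) -> Optional[Docs]:
    calls.append("temp")
    return [{"stages": len(pipeline)}]


def python_tier(pipeline: Pipeline) -> Optional[Docs]:
    calls.append("python")
    return []


print(run_tiers([{"$match": {}}, {"$unwind": "$x"}], [sql_tier, temp_table_tier, python_tier]))
# [{'stages': 2}]
print(calls)  # ['sql', 'temp']
```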