"""
Unified SQL translation framework for NeoSQLite.
This module provides a unified approach to translating MongoDB-style queries
into SQL statements that can be used both for direct execution and for
temporary table generation.
"""
import logging
from typing import Any
logger = logging.getLogger(__name__)
from .cursor import DESCENDING
from .jsonb_support import (
should_use_json_functions,
)
[docs]
def _empty_result() -> tuple[str, list[Any]]:
"""
Return an empty result tuple for fallback cases.
Returns:
tuple[str, list[Any]]: A tuple containing an empty string and an empty list.
"""
return "", []
[docs]
def _text_search_fallback() -> tuple[None, list[Any]]:
"""
Return a text search result tuple for fallback cases.
Returns:
tuple[None, list[Any]]: A tuple containing None and an empty list.
"""
return None, []
[docs]
def _convert_to_bitmask(value: Any) -> int | None:
"""
Convert a value to a bitmask for bitwise operators.
Args:
value: The value to convert (int, list of bit positions, or iterable)
Returns:
Integer bitmask or None if conversion fails
"""
if isinstance(value, int):
return value
match value:
case list() | tuple():
bitmask = 0
for bit_pos in value:
try:
bitmask |= 1 << int(bit_pos)
except (TypeError, ValueError) as e:
logger.debug(f"{e=}")
return None
return bitmask
case _ if hasattr(value, "__iter__") and not isinstance(
value, (str, bytes)
):
bitmask = 0
try:
for bit_pos in value:
bitmask |= 1 << int(bit_pos)
except (TypeError, ValueError) as e:
logger.debug(f"{e=}")
return None
return bitmask
case _:
try:
return int(value)
except (TypeError, ValueError) as e:
logger.debug(f"{e=}")
return None
[docs]
class SQLFieldAccessor:
"""
Handles field access patterns for different contexts.
This class provides methods to generate appropriate SQL expressions
for accessing fields in different contexts, such as direct table access
or temporary table access.
"""
[docs]
def __init__(
self,
data_column: str = "data",
id_column: str = "id",
jsonb_supported: bool = False,
):
"""
Initialize the SQLFieldAccessor with column names.
Args:
data_column: The name of the column containing JSON data (default: "data")
id_column: The name of the column containing document IDs (default: "id")
jsonb_supported: Whether JSONB functions are supported (default: False)
"""
self.data_column = data_column
self.id_column = id_column
self.jsonb_supported = jsonb_supported
[docs]
def _parse_json_path(self, field: str) -> str:
"""
Convert dot notation with array indexing to JSON path syntax.
Supports:
- Simple fields: "name" -> "$.name"
- Nested fields: "address.street" -> "$.address.street"
- Array indexing: "tags[0]" -> "$.tags[0]"
- Nested array access: "orders.items[2].name" -> "$.orders.items[2].name"
- Complex paths: "a.b[0].c[1].d" -> "$.a.b[0].c[1].d"
Args:
field (str): The field path in dot notation with optional array indices
Returns:
str: Properly formatted JSON path
"""
# Handle special case for _id
if field == "_id":
return field
# Pattern to match field names with optional array indices
# This pattern matches sequences like "field", "field[0]", "field[0][1]", etc.
# Split the field path by dots while preserving array indices
parts = []
current_part = ""
i = 0
while i < len(field):
if field[i] == ".":
if current_part:
parts.append(current_part)
current_part = ""
elif field[i] == "[":
# Find the closing bracket
bracket_end = field.find("]", i)
if bracket_end != -1:
# Add the array index to current part
current_part += field[i : bracket_end + 1]
i = bracket_end
else:
# Malformed array index, treat as regular character
current_part += field[i]
else:
current_part += field[i]
i += 1
# Add the last part
if current_part:
parts.append(current_part)
# Convert each part to JSON path format
json_parts = []
for part in parts:
# Check if part contains array indices
if "[" in part:
# Split field name from array indices
field_name_end = part.find("[")
field_name = part[:field_name_end]
array_indices = part[field_name_end:]
json_parts.append(f"{field_name}{array_indices}")
else:
json_parts.append(part)
return f"$.{'.'.join(json_parts)}"
[docs]
def get_field_access(
self, field: str, context: str = "direct", query: dict | None = None
) -> str:
"""
Generate field access SQL with enhanced JSON path support.
This method generates appropriate SQL expressions for accessing fields
based on the field name and context. For the special "_id" field, it returns
the _id column name for direct access. For other fields, it generates a
json_extract or jsonb_extract expression to access the field from the JSON
data column, with support for complex JSON paths including array indexing.
Args:
field: The field name to access
context: The context for field access (default: "direct")
query: The query being processed (used to determine if text search is needed)
Returns:
SQL expression for accessing the field
"""
if field == "_id":
# Special handling for _id field - access the _id column directly
return "_id"
else:
# Use enhanced JSON path parsing
json_path = self._parse_json_path(field)
# Determine whether to use json_* or jsonb_* functions
use_json = should_use_json_functions(query, self.jsonb_supported)
function_name = "json_extract" if use_json else "jsonb_extract"
return f"{function_name}({self.data_column}, '{json_path}')"
[docs]
class SQLOperatorTranslator:
"""
Translates MongoDB operators to SQL expressions.
This class handles the translation of MongoDB query operators (like $eq, $gt, $in, etc.)
into equivalent SQL expressions. It uses an SQLFieldAccessor to generate appropriate
field access expressions for different contexts.
"""
[docs]
def __init__(
self,
field_accessor: SQLFieldAccessor | None = None,
json_each_function: str | None = None,
):
"""
Initialize the SQLOperatorTranslator with an optional field accessor.
Args:
field_accessor: An SQLFieldAccessor instance to use for field access expressions.
If None, a default SQLFieldAccessor will be created.
json_each_function: The json_each function to use ('jsonb_each' or 'json_each').
If None, defaults to 'json_each'.
"""
self.field_accessor = field_accessor or SQLFieldAccessor()
self._json_each_function = json_each_function or "json_each"
[docs]
def translate_operator(
self, field_access: str, operator: str, value: Any
) -> tuple[str | None, list[Any]]:
"""
Translate a MongoDB operator to SQL.
This method handles the translation of MongoDB query operators into equivalent
SQL expressions. It supports various operators including comparison operators
($eq, $gt, $lt, $gte, $lte, $ne), array operators ($in, $nin), existence checks
($exists), modulo operations ($mod), array size checks ($size), and substring
searches ($contains).
Special handling is included for Binary objects which are serialized using
the compact format for SQL comparisons.
Args:
field_access: The SQL expression for accessing the field
operator: The MongoDB operator ($eq, $gt, etc.)
value: The value to compare against
Returns:
Tuple of (SQL expression, parameters) or (None, []) if unsupported
"""
# default `sql` and `params`
sql: str | None = None
params: list[Any] = []
# Serialize Binary objects for SQL comparisons using compact format
if isinstance(value, bytes) and hasattr(value, "encode_for_storage"):
from .json_helpers import neosqlite_json_dumps_for_sql
value = neosqlite_json_dumps_for_sql(value)
# Check if this is a datetime comparison that should be wrapped with datetime() function
# This is needed when datetime fields are indexed using datetime(json_extract(...)) for timezone normalization
is_datetime_comparison = (
operator in ("$eq", "$gt", "$lt", "$gte", "$lte", "$ne")
and isinstance(value, str)
and self._is_datetime_value(value)
) or (
operator in ("$in", "$nin")
and isinstance(value, (list, tuple))
and len(value) > 0 # Only if there are elements to check
and all(
isinstance(v, str) and self._is_datetime_value(v) for v in value
)
)
# If it's a datetime comparison, wrap both field and value with datetime() to match indexing strategy
if is_datetime_comparison:
# Wrap the field access with datetime() function for consistency with datetime indexes
datetime_field_access = f"datetime({field_access})"
match operator:
case "$eq":
sql = f"{datetime_field_access} = datetime(?)"
params = [value]
case "$gt":
sql = f"{datetime_field_access} > datetime(?)"
params = [value]
case "$lt":
sql = f"{datetime_field_access} < datetime(?)"
params = [value]
case "$gte":
sql = f"{datetime_field_access} >= datetime(?)"
params = [value]
case "$lte":
sql = f"{datetime_field_access} <= datetime(?)"
params = [value]
case "$ne":
sql = f"{datetime_field_access} != datetime(?)"
params = [value]
case "$in":
if isinstance(value, (list, tuple)):
if field_access == "_id":
placeholders = ", ".join(
"datetime(?)" for _ in value
)
sql = f"{datetime_field_access} IN ({placeholders})"
params = list(value)
else:
# For array fields with datetime values, use json_each
placeholders = ", ".join(
"datetime(?)" for _ in value
)
sql = f"EXISTS (SELECT 1 FROM {self._json_each_function}({datetime_field_access}) WHERE json_each.value IN ({placeholders}))"
params = list(value)
case "$nin":
if isinstance(value, (list, tuple)):
if field_access == "_id":
placeholders = ", ".join(
"datetime(?)" for _ in value
)
sql = f"{datetime_field_access} NOT IN ({placeholders})"
params = list(value)
else:
# For array fields with datetime values, use json_each
placeholders = ", ".join(
"datetime(?)" for _ in value
)
sql = f"NOT EXISTS (SELECT 1 FROM {self._json_each_function}({datetime_field_access}) WHERE json_each.value IN ({placeholders}))"
params = list(value)
case _:
# For unsupported operators with datetime values, fall back to regular processing
is_datetime_comparison = False
# Use regular processing below
else:
# Regular processing for non-datetime values
match operator:
case "$eq":
# Array values need Python for correct semantics
if isinstance(value, (list, tuple)):
return None, []
sql = f"{field_access} = ?"
params = [value]
case "$gt":
# Array values need Python for correct semantics
if isinstance(value, (list, tuple)):
return None, []
sql = f"{field_access} > ?"
params = [value]
case "$lt":
# Array values need Python for correct semantics
if isinstance(value, (list, tuple)):
return None, []
sql = f"{field_access} < ?"
params = [value]
case "$gte":
# Array values need Python for correct semantics
if isinstance(value, (list, tuple)):
return None, []
sql = f"{field_access} >= ?"
params = [value]
case "$lte":
# Array values need Python for correct semantics
if isinstance(value, (list, tuple)):
return None, []
sql = f"{field_access} <= ?"
params = [value]
case "$ne":
# Array values need Python for correct semantics
if isinstance(value, (list, tuple)):
return None, []
sql = f"{field_access} != ?"
params = [value]
case "$in":
if isinstance(value, (list, tuple)):
if len(value) == 0:
return None, []
if field_access == "_id":
# For _id field, SQL IN works correctly (scalar comparison)
if len(value) > 0 and all(
isinstance(v, str)
and self._is_datetime_value(v)
for v in value
):
placeholders = ", ".join(
"datetime(?)" for _ in value
)
sql = f"datetime({field_access}) IN ({placeholders})"
else:
placeholders = ", ".join("?" for _ in value)
sql = f"{field_access} IN ({placeholders})"
params = list(value)
else:
return None, []
case "$nin":
if isinstance(value, (list, tuple)):
if len(value) == 0:
return None, []
if field_access == "_id":
# For _id field, SQL NOT IN works correctly (scalar comparison)
if len(value) > 0 and all(
isinstance(v, str)
and self._is_datetime_value(v)
for v in value
):
placeholders = ", ".join(
"datetime(?)" for _ in value
)
sql = f"datetime({field_access}) NOT IN ({placeholders})"
else:
placeholders = ", ".join("?" for _ in value)
sql = f"{field_access} NOT IN ({placeholders})"
params = list(value)
else:
return None, []
case "$all":
if isinstance(value, (list, tuple)):
if len(value) == 0:
return None, []
if field_access != "_id":
return None, []
case "$exists":
# Handle boolean value for $exists
if isinstance(value, bool):
if value:
sql = f"{field_access} IS NOT NULL"
params = []
else:
sql = f"{field_access} IS NULL"
params = []
else:
# Invalid value for $exists, fallback to Python
return None, []
case "$mod":
# Handle [divisor, remainder] array
if isinstance(value, (list, tuple)) and len(value) == 2:
divisor, remainder = value
sql = f"{field_access} % ? = ?"
params = [divisor, remainder]
case "$size":
# Handle array size comparison
if isinstance(value, int):
sql = f"json_array_length({field_access}) = ?"
params = [value]
case "$contains":
# Handle case-insensitive substring search
# Convert value to string to match Python implementation behavior
str_value = str(value)
sql = f"lower({field_access}) LIKE ?"
params = [f"%{str_value.lower()}%"]
case "$bitsAllClear":
# Handle bitwise all clear operator
# Check if all specified bits are clear (0)
bitmask = _convert_to_bitmask(value)
if bitmask is not None:
sql = f"({field_access} & ?) = 0"
params = [bitmask]
case "$bitsAllSet":
# Handle bitwise all set operator
# Check if all specified bits are set (1)
bitmask = _convert_to_bitmask(value)
if bitmask is not None:
sql = f"({field_access} & ?) = ?"
params = [bitmask, bitmask]
case "$bitsAnyClear":
# Handle bitwise any clear operator
# Check if any of the specified bits are clear (0)
bitmask = _convert_to_bitmask(value)
if bitmask is not None:
sql = f"((~{field_access}) & ?) != 0"
params = [bitmask]
case "$bitsAnySet":
# Handle bitwise any set operator
# Check if any of the specified bits are set (1)
bitmask = _convert_to_bitmask(value)
if bitmask is not None:
sql = f"({field_access} & ?) != 0"
params = [bitmask]
case _:
# Unsupported operator
pass
return sql, params
[docs]
def _is_datetime_value(self, value: Any) -> bool:
"""
Check if a value is a datetime string.
Args:
value: Value to check
Returns:
True if value is a datetime string, False otherwise
"""
from .datetime_utils import is_datetime_value
return is_datetime_value(value)
[docs]
class SQLClauseBuilder:
"""
Builds SQL clauses with reusable components.
This class provides methods to build various SQL clauses including WHERE, ORDER BY,
and LIMIT/OFFSET clauses. It uses SQLFieldAccessor for field access and
SQLOperatorTranslator for operator translations to create SQL expressions from
MongoDB-style query specifications.
"""
[docs]
def __init__(
self,
field_accessor: SQLFieldAccessor | None = None,
operator_translator: SQLOperatorTranslator | None = None,
json_each_function: str | None = None,
):
"""
Initialize the SQLClauseBuilder with optional field accessor and operator translator.
Args:
field_accessor: An SQLFieldAccessor instance to use for field access expressions.
If None, a default SQLFieldAccessor will be created.
operator_translator: An SQLOperatorTranslator instance to use for operator translations.
If None, a default SQLOperatorTranslator will be created.
json_each_function: The json_each function to use ('jsonb_each' or 'json_each').
If provided and operator_translator is None, a new SQLOperatorTranslator
will be created with this function.
"""
self.field_accessor = field_accessor or SQLFieldAccessor()
if operator_translator is not None:
self.operator_translator = operator_translator
elif json_each_function is not None:
self.operator_translator = SQLOperatorTranslator(
field_accessor, json_each_function
)
else:
self.operator_translator = SQLOperatorTranslator()
[docs]
def _build_logical_condition(
self,
operator: str,
conditions: list[dict[str, Any]],
context: str = "direct",
) -> tuple[str | None, list[Any]]:
"""
Build a logical condition ($and, $or, $nor, $not).
This method handles the construction of SQL expressions for MongoDB logical operators.
It recursively processes nested conditions and builds appropriate SQL expressions
with proper parentheses grouping.
Args:
operator: The logical operator ($and, $or, $nor)
conditions: List of condition dictionaries
context: The context for field access (default: "direct")
Returns:
Tuple of (SQL expression, parameters) or (None, []) if unsupported
"""
# default `sql` and `params`
sql: str | None = None
params: list[Any] = []
if not isinstance(conditions, list):
return sql, params
clauses: list[str] = []
for condition in conditions:
if isinstance(condition, dict):
# Recursively build condition
clause, clause_params = self.build_where_clause(
condition, context, is_nested=True
)
if clause is None:
# Unsupported condition - return None to signal Python fallback
return None, []
elif clause: # Only add non-empty clauses
# Remove "WHERE " prefix if present
if clause.startswith("WHERE "):
clause = clause[6:]
clauses.append(f"({clause})")
params.extend(clause_params)
else:
# Non-dict condition - return None to signal Python fallback
return None, []
if not clauses:
return _empty_result()
match operator:
case "$and":
sql = " AND ".join(clauses)
case "$or":
sql = " OR ".join(clauses)
case "$nor":
sql = "NOT (" + " OR ".join(clauses) + ")"
case _:
# Unsupported logical operator
pass
return sql, params
[docs]
def build_where_clause(
self,
query: dict[str, Any],
context: str = "direct",
is_nested: bool = False,
query_param: dict | None = None,
) -> tuple[str | None, list[Any]]:
"""
Build a WHERE clause from a MongoDB-style query.
This method translates a MongoDB-style query specification into an SQL WHERE clause.
It handles both simple field conditions and complex logical operators ($and, $or, $nor, $not).
The method recursively processes nested conditions and properly groups them with parentheses.
Args:
query: The MongoDB-style query
context: The context for field access (default: "direct")
is_nested: Whether this is a nested condition within a logical operator (default: False)
query_param: The original query being processed (used to determine if text search is needed)
Returns:
Tuple of (WHERE clause, parameters) or (None, []) if unsupported
"""
# default `sql` and `params`
sql: str | None = None
params: list[Any] = []
# default `clauses`
clauses: list[str] = []
for field, value in query.items():
if field == "$where":
raise NotImplementedError(
"The '$where' operator (JavaScript) is not supported in NeoSQLite. "
"Please use the '$expr' operator for field-to-field comparisons, "
"which is fully compatible with MongoDB and highly optimized in NeoSQLite."
)
if field == "$function":
raise NotImplementedError(
"The '$function' operator is not supported in NeoSQLite. "
"Please use '$expr' with Python expressions, or post-process results in Python."
)
if field == "$accumulator":
raise NotImplementedError(
"The '$accumulator' operator is not supported in NeoSQLite. "
"Please use built-in accumulators ($sum, $avg, $min, $max, $count, $push, $addToSet, $first, $last), "
"or post-process results in Python."
)
if field in ("$and", "$or", "$nor"):
# Handle logical operators directly
sql, clause_params = self._build_logical_condition(
field, value, context
)
if sql is None:
return None, [] # Unsupported condition, fallback to Python
else: # Only add if not empty
clauses.append(sql)
params.extend(clause_params)
elif field == "$not":
# Handle $not logical operator (applies to single condition)
if isinstance(value, dict):
not_clause, not_params = self.build_where_clause(
value, context, is_nested=True, query_param=query_param
)
if not_clause is None:
return (
None,
[],
) # Unsupported condition, fallback to Python
if not_clause:
# Remove "WHERE " prefix if present
if not_clause.startswith("WHERE "):
not_clause = not_clause[6:]
clauses.append(f"NOT ({not_clause})")
params.extend(not_params)
else:
return _empty_result() # Empty condition
else:
return _empty_result() # Invalid format for $not
else:
# Regular field condition
# Get field access expression
field_access = self.field_accessor.get_field_access(
field, context, query_param
)
if isinstance(value, dict):
# Handle query operators like $eq, $gt, $lt, etc.
for operator, op_val in value.items():
sql, clause_params = (
self.operator_translator.translate_operator(
field_access, operator, op_val
)
)
if sql is None:
# Unsupported operator, fallback to Python
return None, []
clauses.append(sql)
params.extend(clause_params)
else:
# Simple equality check
clauses.append(f"{field_access} = ?")
params.append(value)
if not clauses:
return _empty_result()
# Build the final WHERE clause
where_clause = "WHERE " + " AND ".join(clauses)
if is_nested:
# Remove "WHERE " prefix for nested conditions
where_clause = where_clause[6:]
return where_clause, params
[docs]
def build_order_by_clause(
self, sort_spec: dict[str, Any], context: str = "direct"
) -> str:
"""
Build an ORDER BY clause from a sort specification.
This method translates a MongoDB-style sort specification into an SQL ORDER BY clause.
It handles multiple sort fields with their respective sort directions (ascending or descending).
Args:
sort_spec: The sort specification mapping field names to sort directions
context: The context for field access (default: "direct")
Returns:
ORDER BY clause as a string
"""
if not sort_spec:
return ""
order_parts = []
for field, direction in sort_spec.items():
field_access = self.field_accessor.get_field_access(field, context)
order_parts.append(
f"{field_access} {'DESC' if direction == DESCENDING else 'ASC'}"
)
return "ORDER BY " + ", ".join(order_parts)
[docs]
def build_limit_offset_clause(
self, limit_value: int | None = None, skip_value: int = 0
) -> str:
"""
Build LIMIT and OFFSET clauses.
This method constructs SQL LIMIT and OFFSET clauses based on the provided limit and skip values.
It handles the special case where SQLite requires a LIMIT clause when using OFFSET.
Args:
limit_value: The limit value (default: None)
skip_value: The skip value (default: 0)
Returns:
LIMIT and OFFSET clauses as a string
"""
limit_clause = ""
if limit_value is not None:
if skip_value > 0:
limit_clause = f"LIMIT {limit_value} OFFSET {skip_value}"
else:
limit_clause = f"LIMIT {limit_value}"
elif skip_value > 0:
# SQLite requires LIMIT when using OFFSET
limit_clause = f"LIMIT -1 OFFSET {skip_value}"
return limit_clause
[docs]
class SQLTranslator:
"""
Unified SQL translator that can be used for both direct SQL generation
and temporary table generation.
This class provides a high-level interface for translating MongoDB-style query
operations into SQL clauses. It integrates SQLFieldAccessor, SQLOperatorTranslator,
and SQLClauseBuilder to provide comprehensive SQL translation capabilities.
"""
[docs]
def __init__(
self,
table_name: str | None = None,
data_column: str = "data",
id_column: str = "id",
jsonb_supported: bool = False,
json_each_function: str | None = None,
):
"""
Initialize the SQLTranslator with table and column names.
Args:
table_name: The name of the table to query (default: "collection")
data_column: The name of the column containing JSON data (default: "data")
id_column: The name of the column containing document IDs (default: "id")
jsonb_supported: Whether JSONB functions are supported (default: False)
json_each_function: The json_each function to use ('jsonb_each' or 'json_each').
If None, defaults to 'json_each'.
"""
self.table_name = table_name or "collection"
self.data_column = data_column
self.id_column = id_column
self.jsonb_supported = jsonb_supported
self._json_each_function = json_each_function or "json_each"
# Initialize components
self.field_accessor = SQLFieldAccessor(
data_column, id_column, jsonb_supported
)
self.operator_translator = SQLOperatorTranslator(
self.field_accessor, self._json_each_function
)
self.clause_builder = SQLClauseBuilder(
self.field_accessor, self.operator_translator
)
[docs]
def translate_match(
self, match_spec: dict[str, Any], context: str = "direct"
) -> tuple[str | None, list[Any]]:
"""
Translate a $match stage to SQL WHERE clause.
This method translates a MongoDB $match specification into an SQL WHERE clause.
It handles special cases like text search queries by returning None to indicate
that fallback to Python implementation is required. For regular queries, it delegates
to the SQLClauseBuilder's build_where_clause method.
Args:
match_spec: The $match specification
context: The context for field access (default: "direct")
Returns:
Tuple of (WHERE clause, parameters) or (None, []) for text search
"""
# Handle text search queries separately
if not isinstance(match_spec, dict):
return _text_search_fallback() # Fallback for non-dict queries
if "$text" in match_spec:
return (
_text_search_fallback()
) # Special handling required, return None to fallback
# Check for nested $text operators in logical operators
if self._contains_text_operator(match_spec):
return (
_text_search_fallback()
) # Special handling required, return None to fallback
# Pass the query to the clause builder so it can be used for field access decisions
return self.clause_builder.build_where_clause(
match_spec, context, query_param=match_spec
)
[docs]
def _contains_text_operator(self, query: dict[str, Any]) -> bool:
"""
Check if a query contains any $text operators, including nested in logical operators.
This method delegates to the centralized _contains_text_operator function
to ensure consistent text search detection across all NeoSQLite components.
Args:
query: The query to check
Returns:
True if the query contains $text operators, False otherwise
"""
from .jsonb_support import _contains_text_operator
return _contains_text_operator(query)
[docs]
def translate_sort(
self, sort_spec: dict[str, Any], context: str = "direct"
) -> str:
"""
Translate a $sort stage to SQL ORDER BY clause.
This method translates a MongoDB $sort specification into an SQL ORDER BY clause.
It delegates to the SQLClauseBuilder's build_order_by_clause method to perform
the actual translation, handling multiple sort fields with their respective
sort directions (ascending or descending).
Args:
sort_spec: The $sort specification
context: The context for field access (default: "direct")
Returns:
ORDER BY clause
"""
return self.clause_builder.build_order_by_clause(sort_spec, context)
[docs]
def translate_skip_limit(
self, limit_value: int | None = None, skip_value: int = 0
) -> str:
"""
Translate $skip and $limit stages to SQL LIMIT/OFFSET clauses.
This method translates MongoDB $skip and $limit specifications into SQL LIMIT and OFFSET clauses.
It delegates to the SQLClauseBuilder's build_limit_offset_clause method to perform the actual
translation, handling both limit and skip values appropriately for SQLite syntax.
Args:
limit_value: The limit value (default: None)
skip_value: The skip value (default: 0)
Returns:
LIMIT and OFFSET clauses
"""
return self.clause_builder.build_limit_offset_clause(
limit_value, skip_value
)
[docs]
def translate_field_access(
self, field: str, context: str = "direct"
) -> str:
"""
Translate field access for a given field and context.
This method generates the appropriate SQL expression for accessing a field
based on the field name and context. It delegates to the SQLFieldAccessor's
get_field_access method to perform the actual translation, handling special
cases like the "_id" field which maps to the ID column, and regular fields
which use json_extract expressions.
Args:
field: The field name
context: The context for field access (default: "direct")
Returns:
SQL expression for accessing the field
"""
return self.field_accessor.get_field_access(field, context)
[docs]
def translate_sort_skip_limit(
self,
sort_spec: dict[str, Any] | None,
skip_value: int = 0,
limit_value: int | None = None,
context: str = "direct",
) -> tuple[str, str, str]:
"""
Translate sort/skip/limit stages to SQL clauses.
This method translates MongoDB sort, skip, and limit specifications into their
corresponding SQL clauses. It combines the functionality of translate_sort and
translate_skip_limit to generate ORDER BY, LIMIT, and OFFSET clauses in a single call.
Args:
sort_spec: The $sort specification (default: None)
skip_value: The skip value (default: 0)
limit_value: The limit value (default: None)
context: The context for field access (default: "direct")
Returns:
Tuple of (ORDER BY clause, LIMIT clause, OFFSET clause)
"""
# Build ORDER BY clause
order_by = self.translate_sort(sort_spec, context) if sort_spec else ""
# Build LIMIT/OFFSET clause
limit_offset = self.translate_skip_limit(limit_value, skip_value)
return order_by, limit_offset, ""