diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c1ae8ea..200a2a2 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -30,7 +30,7 @@ jobs: - name: Build wheel and install run: | maturin build --release --out dist - pip install --find-links dist wolfxl + pip install --no-index --find-links dist wolfxl - name: Run tests run: pytest tests/ -v diff --git a/pyproject.toml b/pyproject.toml index 670ad25..15ca853 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,6 +20,9 @@ classifiers = [ "Topic :: Office/Business :: Financial :: Spreadsheet", ] +[project.optional-dependencies] +calc = ["formulas>=1.3.3,<2.0"] + [project.urls] Homepage = "https://github.com/SynthGL/wolfxl" Repository = "https://github.com/SynthGL/wolfxl" @@ -29,9 +32,18 @@ bindings = "pyo3" module-name = "wolfxl._rust" python-source = "python" +[tool.pyright] +pythonVersion = "3.12" +extraPaths = ["python"] + [tool.ruff] line-length = 100 + +[tool.ruff.lint] select = ["E", "F", "I", "N", "W", "UP"] +[tool.pytest.ini_options] +markers = ["slow: marks tests that are sensitive to CI timing"] + [tool.mypy] strict = true diff --git a/python/wolfxl/__init__.py b/python/wolfxl/__init__.py index c268189..c3389be 100644 --- a/python/wolfxl/__init__.py +++ b/python/wolfxl/__init__.py @@ -17,6 +17,8 @@ wb.save("out.xlsx") """ +from __future__ import annotations + import os from wolfxl._rust import __version__ diff --git a/python/wolfxl/_workbook.py b/python/wolfxl/_workbook.py index b000f4e..dca2505 100644 --- a/python/wolfxl/_workbook.py +++ b/python/wolfxl/_workbook.py @@ -8,10 +8,13 @@ from __future__ import annotations import os -from typing import Any +from typing import TYPE_CHECKING, Any from wolfxl._worksheet import Worksheet +if TYPE_CHECKING: + from wolfxl.calc._protocol import RecalcResult + class Workbook: """openpyxl-compatible workbook backed by Rust.""" @@ -23,6 +26,7 @@ def __init__(self) -> None: self._rust_writer: Any = _rust.RustXlsxWriterBook() 
self._rust_reader: Any = None self._rust_patcher: Any = None + self._evaluator: Any = None self._sheet_names: list[str] = ["Sheet"] self._sheets: dict[str, Worksheet] = {} self._sheets["Sheet"] = Worksheet(self, "Sheet") @@ -36,6 +40,7 @@ def _from_reader(cls, path: str) -> Workbook: wb = object.__new__(cls) wb._rust_writer = None wb._rust_patcher = None + wb._evaluator = None wb._rust_reader = _rust.CalamineStyledBook.open(path) names = [str(n) for n in wb._rust_reader.sheet_names()] wb._sheet_names = names @@ -51,6 +56,7 @@ def _from_patcher(cls, path: str) -> Workbook: wb = object.__new__(cls) wb._rust_writer = None + wb._evaluator = None wb._rust_reader = _rust.CalamineStyledBook.open(path) wb._rust_patcher = _rust.XlsxPatcher.open(path) names = [str(n) for n in wb._rust_reader.sheet_names()] @@ -118,6 +124,50 @@ def save(self, filename: str | os.PathLike[str]) -> None: else: raise RuntimeError("save requires write or modify mode") + # ------------------------------------------------------------------ + # Formula evaluation (requires wolfxl.calc) + # ------------------------------------------------------------------ + + def calculate(self) -> dict[str, Any]: + """Evaluate all formulas in the workbook. + + Returns a dict of cell_ref -> computed value for all formula cells. + Requires the ``wolfxl.calc`` module (install via ``pip install wolfxl[calc]``). + + The internal evaluator is cached so that a subsequent + :meth:`recalculate` call can reuse it without rescanning. + """ + from wolfxl.calc._evaluator import WorkbookEvaluator + + ev = WorkbookEvaluator() + ev.load(self) + result = ev.calculate() + self._evaluator = ev # cache for recalculate() + return result + + def recalculate( + self, + perturbations: dict[str, float | int], + tolerance: float = 1e-10, + ) -> RecalcResult: + """Perturb input cells and recompute affected formulas. + + Returns a ``RecalcResult`` describing which cells changed. + Requires the ``wolfxl.calc`` module. 
+ + If :meth:`calculate` was called first, the cached evaluator is + reused (avoiding a full rescan + recalculate). + """ + ev = self._evaluator + if ev is None: + from wolfxl.calc._evaluator import WorkbookEvaluator + + ev = WorkbookEvaluator() + ev.load(self) + ev.calculate() + self._evaluator = ev + return ev.recalculate(perturbations, tolerance) + # ------------------------------------------------------------------ # Context manager + cleanup # ------------------------------------------------------------------ diff --git a/python/wolfxl/calc/__init__.py b/python/wolfxl/calc/__init__.py new file mode 100644 index 0000000..51530cc --- /dev/null +++ b/python/wolfxl/calc/__init__.py @@ -0,0 +1,21 @@ +"""wolfxl.calc - Formula evaluation engine for wolfxl workbooks.""" + +from wolfxl.calc._evaluator import WorkbookEvaluator +from wolfxl.calc._functions import FUNCTION_WHITELIST_V1, FunctionRegistry, is_supported +from wolfxl.calc._graph import DependencyGraph +from wolfxl.calc._parser import FormulaParser, all_references, expand_range +from wolfxl.calc._protocol import CalcEngine, CellDelta, RecalcResult + +__all__ = [ + "CalcEngine", + "CellDelta", + "DependencyGraph", + "FUNCTION_WHITELIST_V1", + "FormulaParser", + "FunctionRegistry", + "RecalcResult", + "WorkbookEvaluator", + "all_references", + "expand_range", + "is_supported", +] diff --git a/python/wolfxl/calc/_evaluator.py b/python/wolfxl/calc/_evaluator.py new file mode 100644 index 0000000..812a6bc --- /dev/null +++ b/python/wolfxl/calc/_evaluator.py @@ -0,0 +1,553 @@ +"""WorkbookEvaluator: recursive expression evaluator for Excel formulas. + +Replaces fragile regex-based dispatch with a proper recursive descent +parser that handles balanced parentheses, operator precedence, and +arbitrarily nested expressions like ``=ROUND(SUM(A1:A5)*IF(B1>0,1.1,1.0),2)``. 
+""" + +from __future__ import annotations + +import logging +import re +from typing import TYPE_CHECKING, Any + +from wolfxl.calc._functions import FunctionRegistry +from wolfxl.calc._graph import DependencyGraph +from wolfxl.calc._parser import expand_range +from wolfxl.calc._protocol import CellDelta, RecalcResult + +if TYPE_CHECKING: + from wolfxl._workbook import Workbook + +logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# Expression parsing helpers +# --------------------------------------------------------------------------- + + +def _find_matching_paren(expr: str, start: int) -> int: + """Index of the ``')'`` matching the ``'('`` at *expr[start]*, or -1.""" + depth = 1 + i = start + 1 + in_string = False + while i < len(expr): + ch = expr[i] + if ch == '"': + in_string = not in_string + elif not in_string: + if ch == '(': + depth += 1 + elif ch == ')': + depth -= 1 + if depth == 0: + return i + i += 1 + return -1 + + +def _match_function_call(expr: str) -> tuple[str, str] | None: + """If *expr* is exactly ``FUNC(balanced_args)``, return ``(name, args_str)``. + + Uses balanced parenthesis matching so ``SUM(A1:A5)*2`` is NOT matched + (there's trailing content after the close-paren). + """ + stripped = expr.strip() + m = re.match(r'^([A-Z][A-Z0-9_.]*)\s*\(', stripped, re.IGNORECASE) + if not m: + return None + open_idx = m.end() - 1 # position of '(' + close_idx = _find_matching_paren(stripped, open_idx) + # The close-paren must be the very last character + if close_idx >= 0 and close_idx == len(stripped) - 1: + return (m.group(1), stripped[open_idx + 1 : close_idx]) + return None + + +def _find_top_level_split(expr: str) -> tuple[str, str, str] | None: + """Find the rightmost lowest-precedence binary operator at paren depth 0. + + Precedence (lowest to highest):: + + 1. comparison (>=, <=, <>, >, <, =) + 2. additive (+, -) + 3. 
multiplicative (*, /) + + Right-to-left scan produces correct left-to-right associativity. + Returns ``(left, op, right)`` or ``None``. + """ + length = len(expr) + + for pass_type in ("cmp", "add", "mul"): + depth = 0 + in_string = False + i = length - 1 + while i > 0: + ch = expr[i] + + # Skip string literals + if ch == '"': + in_string = not in_string + i -= 1 + continue + if in_string: + i -= 1 + continue + + # Track parentheses (inverted for right-to-left) + if ch == ')': + depth += 1 + i -= 1 + continue + if ch == '(': + depth -= 1 + i -= 1 + continue + + if depth != 0: + i -= 1 + continue + + matched_op: str | None = None + op_start = i + + if pass_type == "cmp": + # 2-char comparison operators checked first + if i >= 1 and expr[i - 1 : i + 1] in (">=", "<=", "<>"): + matched_op = expr[i - 1 : i + 1] + op_start = i - 1 + elif ch in ('>', '<'): + matched_op = ch + elif ch == '=' and not (i >= 1 and expr[i - 1] in ('>', '<', '!')): + matched_op = ch + elif pass_type == "add" and ch in ('+', '-'): + matched_op = ch + elif pass_type == "mul" and ch in ('*', '/'): + matched_op = ch + + if matched_op is not None: + # Verify it's a binary operator (not unary prefix) + if op_start <= 0: + i -= 1 + continue + # Check preceding non-space character + j = op_start - 1 + while j >= 0 and expr[j] == ' ': + j -= 1 + if j < 0 or expr[j] in ('(', ',', '+', '-', '*', '/', '>', '<', '='): + i -= 1 + continue + # Skip +/- that are part of scientific notation (e.g. 
2.5e-1) + if matched_op in ('+', '-') and j >= 1 and expr[j] in ('e', 'E'): + pre_e = j - 1 + if pre_e >= 0 and expr[pre_e].isdigit(): + i -= 1 + continue + + left = expr[:op_start].strip() + right = expr[op_start + len(matched_op) :].strip() + if left and right: + return (left, matched_op, right) + + i -= 1 + + return None + + +def _has_top_level_colon(expr: str) -> bool: + """``True`` when *expr* contains ``:`` at paren depth 0 (range ref).""" + depth = 0 + for ch in expr: + if ch == '(': + depth += 1 + elif ch == ')': + depth -= 1 + elif ch == ':' and depth == 0: + return True + return False + + +def _binary_op(left: Any, op: str, right: Any) -> Any: + """Evaluate an arithmetic binary operation.""" + if not isinstance(left, (int, float)) or not isinstance(right, (int, float)): + return None + if op == '+': + return left + right + if op == '-': + return left - right + if op == '*': + return left * right + if op == '/': + return "#DIV/0!" if right == 0 else left / right + return None + + +def _compare(left: Any, right: Any, op: str) -> bool: + """Evaluate a comparison operation. + + Handles both numeric and string comparisons. String comparisons are + case-insensitive (matching Excel behavior). 
+ """ + # Both numeric -> numeric comparison + if isinstance(left, (int, float)) and isinstance(right, (int, float)): + lf, rf = left, right + else: + # Try numeric coercion first + try: + lf = float(left) if not isinstance(left, (int, float)) else left + rf = float(right) if not isinstance(right, (int, float)) else right + except (ValueError, TypeError): + # Fall back to string comparison (case-insensitive, like Excel) + ls = str(left).lower() if left is not None else "" + rs = str(right).lower() if right is not None else "" + if op in ('=', '=='): + return ls == rs + if op in ('<>', '!='): + return ls != rs + if op == '>': + return ls > rs + if op == '<': + return ls < rs + if op == '>=': + return ls >= rs + if op == '<=': + return ls <= rs + return False + if op == '>': + return lf > rf + if op == '<': + return lf < rf + if op == '>=': + return lf >= rf + if op == '<=': + return lf <= rf + if op in ('=', '=='): + return lf == rf + if op in ('<>', '!='): + return lf != rf + return False + + +def _values_differ(a: Any, b: Any, tolerance: float) -> bool: + """Check if two values differ beyond tolerance.""" + if a is None and b is None: + return False + if a is None or b is None: + return True + if isinstance(a, (int, float)) and isinstance(b, (int, float)): + return abs(float(a) - float(b)) > tolerance + return a != b + + +# --------------------------------------------------------------------------- +# Evaluator +# --------------------------------------------------------------------------- + + +class WorkbookEvaluator: + """Evaluates Excel formulas in a wolfxl Workbook. 
+ + Usage:: + + evaluator = WorkbookEvaluator() + evaluator.load(workbook) + results = evaluator.calculate() + recalc = evaluator.recalculate({"Sheet1!A1": 42.0}) + """ + + def __init__(self) -> None: + self._cell_values: dict[str, Any] = {} + self._graph = DependencyGraph() + self._functions = FunctionRegistry() + self._loaded = False + + def load(self, workbook: Workbook) -> None: + """Scan workbook, store cell values, build dependency graph.""" + self._cell_values.clear() + self._graph = DependencyGraph() + + for sheet_name in workbook.sheetnames: + ws = workbook[sheet_name] + for row in ws.iter_rows(values_only=False): + for cell in row: + val = cell.value + cell_ref = f"{sheet_name}!{cell.coordinate}" + if isinstance(val, str) and val.startswith("="): + # Formula cell: store formula string, register in graph + self._cell_values[cell_ref] = val + self._graph.add_formula(cell_ref, val, sheet_name) + elif val is not None: + # Value cell: store the value + self._cell_values[cell_ref] = val + + self._loaded = True + + def calculate(self) -> dict[str, Any]: + """Evaluate all formulas in topological order. + + Returns dict of cell_ref -> computed value for formula cells. 
+ """ + if not self._loaded: + raise RuntimeError("Call load() before calculate()") + + order = self._graph.topological_order() + results: dict[str, Any] = {} + + for cell_ref in order: + formula = self._graph.formulas[cell_ref] + value = self._evaluate_formula(cell_ref, formula) + self._cell_values[cell_ref] = value + results[cell_ref] = value + + return results + + def recalculate( + self, + perturbations: dict[str, float | int], + tolerance: float = 1e-10, + ) -> RecalcResult: + """Perturb input cells and recompute affected formulas.""" + if not self._loaded: + raise RuntimeError("Call load() before recalculate()") + + # Snapshot old values for delta computation + old_values: dict[str, Any] = {} + for cell_ref in self._graph.formulas: + old_values[cell_ref] = self._cell_values.get(cell_ref) + + # Apply perturbations + for cell_ref, value in perturbations.items(): + self._cell_values[cell_ref] = value + + # Find and evaluate affected cells + affected = self._graph.affected_cells(set(perturbations.keys())) + for cell_ref in affected: + formula = self._graph.formulas[cell_ref] + value = self._evaluate_formula(cell_ref, formula) + self._cell_values[cell_ref] = value + + # Build deltas + deltas: list[CellDelta] = [] + propagated = 0 + for cell_ref in affected: + old_val = old_values.get(cell_ref) + new_val = self._cell_values.get(cell_ref) + if _values_differ(old_val, new_val, tolerance): + propagated += 1 + deltas.append(CellDelta( + cell_ref=cell_ref, + old_value=old_val, + new_value=new_val, + formula=self._graph.formulas.get(cell_ref), + )) + + max_depth = self._graph.max_depth(set(perturbations.keys())) + + return RecalcResult( + perturbations=dict(perturbations), + deltas=tuple(deltas), + total_formula_cells=len(self._graph.formulas), + propagated_cells=propagated, + max_chain_depth=max_depth, + ) + + # ------------------------------------------------------------------ + # Formula evaluation (recursive descent) + # 
------------------------------------------------------------------ + + def _evaluate_formula(self, cell_ref: str, formula: str) -> Any: + """Evaluate a single formula string (starting with ``=``).""" + body = formula.strip() + if body.startswith('='): + body = body[1:] + sheet = self._sheet_from_ref(cell_ref) + result = self._eval_expr(body.strip(), sheet) + if result is not None: + return result + logger.debug("Cannot evaluate formula %r in %s", formula, cell_ref) + return None + + def _eval_expr(self, expr: str, sheet: str) -> Any: + """Recursively evaluate an expression (no leading ``=``). + + Dispatch order (first match wins): + + 1. Binary/comparison split at top level (paren-aware, precedence-correct) + 2. Parenthesized sub-expression ``(...)`` + 3. Function call ``FUNC(balanced_args)`` + 4. Unary minus / plus + 5. Numeric literal + 6. String literal + 7. Boolean literal + 8. Cell reference + """ + expr = expr.strip() + if not expr: + return None + + # 1. Binary split (comparison → additive → multiplicative) + split = _find_top_level_split(expr) + if split: + left_str, op, right_str = split + left_val = self._eval_expr(left_str, sheet) + right_val = self._eval_expr(right_str, sheet) + if op in ('+', '-', '*', '/'): + return _binary_op(left_val, op, right_val) + return _compare(left_val, right_val, op) + + # 2. Parenthesized sub-expression: (expr) + if expr.startswith('('): + close = _find_matching_paren(expr, 0) + if close == len(expr) - 1: + return self._eval_expr(expr[1:close], sheet) + + # 3. Function call: FUNC(balanced_args) + func = _match_function_call(expr) + if func: + return self._eval_function(func[0].upper(), func[1], sheet) + + # 4. Unary minus / plus + if expr.startswith('-'): + val = self._eval_expr(expr[1:], sheet) + if isinstance(val, (int, float)): + return -val + return val + if expr.startswith('+'): + return self._eval_expr(expr[1:], sheet) + + # 5. 
Numeric literal (int, float, and scientific notation like 1E3) + try: + num = float(expr) + except ValueError: + pass + else: + # Preserve int for plain integer literals + if re.fullmatch(r'[+-]?\d+', expr): + return int(expr) + return num + + # 6. String literal + if len(expr) >= 2 and expr[0] == '"' and expr[-1] == '"': + return expr[1:-1] + + # 7. Boolean + upper = expr.upper() + if upper == 'TRUE': + return True + if upper == 'FALSE': + return False + + # 8. Cell reference + return self._resolve_cell_ref(expr, sheet) + + # ------------------------------------------------------------------ + # Atom / argument resolution + # ------------------------------------------------------------------ + + def _resolve_cell_ref(self, expr: str, sheet: str) -> Any: + """Resolve a cell reference string to its stored value.""" + clean = expr.strip().replace('$', '') + if '!' in clean: + parts = clean.split('!', 1) + ref_sheet = parts[0].strip("'") + ref = f"{ref_sheet}!{parts[1].upper()}" + else: + ref = f"{sheet}!{clean.upper()}" + return self._cell_values.get(ref) + + def _resolve_range(self, arg: str, sheet: str) -> list[Any]: + """Resolve a range like ``A1:A5`` to a list of cell values.""" + clean = arg.strip().replace('$', '') + if '!' 
not in clean: + range_ref = f"{sheet}!{clean.upper()}" + else: + parts = clean.split('!', 1) + ref_sheet = parts[0].strip("'") + range_ref = f"{ref_sheet}!{parts[1].upper()}" + cells = expand_range(range_ref) + return [self._cell_values.get(c) for c in cells] + + # ------------------------------------------------------------------ + # Function dispatch + # ------------------------------------------------------------------ + + def _eval_function(self, func_name: str, args_str: str, sheet: str) -> Any: + """Evaluate a function call with resolved arguments.""" + func = self._functions.get(func_name) + if func is None: + logger.debug("Unsupported function: %s", func_name) + return None + args = self._parse_function_args(args_str, sheet) + try: + return func(args) + except Exception as e: + logger.debug("Error evaluating %s: %s", func_name, e) + return None + + def _parse_function_args(self, args_str: str, sheet: str) -> list[Any]: + """Split on commas at depth 0 (respecting strings), resolve each argument.""" + args: list[Any] = [] + depth = 0 + in_string = False + current = "" + i = 0 + length = len(args_str) + + while i < length: + ch = args_str[i] + + if ch == '"': + if in_string: + # Handle Excel escaped quote ("") + if i + 1 < length and args_str[i + 1] == '"': + current += '""' + i += 2 + continue + in_string = False + else: + in_string = True + current += ch + elif not in_string: + if ch == '(': + depth += 1 + current += ch + elif ch == ')': + depth -= 1 + current += ch + elif ch == ',' and depth == 0: + args.append(self._resolve_arg(current.strip(), sheet)) + current = "" + else: + current += ch + else: + current += ch + + i += 1 + + if current.strip(): + args.append(self._resolve_arg(current.strip(), sheet)) + + return args + + def _resolve_arg(self, arg: str, sheet: str) -> Any: + """Resolve a single function argument. + + Range references (containing ``:`` at depth 0) return a list of + cell values. Everything else delegates to ``_eval_expr``. 
+ """ + if not arg: + return None + + # Range reference at top level + if _has_top_level_colon(arg) and not arg.startswith('"'): + return self._resolve_range(arg, sheet) + + return self._eval_expr(arg, sheet) + + @staticmethod + def _sheet_from_ref(cell_ref: str) -> str: + """Extract sheet name from a canonical cell reference.""" + if '!' in cell_ref: + return cell_ref.rsplit('!', 1)[0] + return 'Sheet1' diff --git a/python/wolfxl/calc/_functions.py b/python/wolfxl/calc/_functions.py new file mode 100644 index 0000000..9708fc5 --- /dev/null +++ b/python/wolfxl/calc/_functions.py @@ -0,0 +1,264 @@ +"""Function whitelist and builtin implementations for formula evaluation.""" + +from __future__ import annotations + +import math +from typing import Any, Callable + +# --------------------------------------------------------------------------- +# Whitelist: functions the calc engine will attempt to evaluate. +# Organized by category for readability. +# --------------------------------------------------------------------------- + +FUNCTION_WHITELIST_V1: dict[str, str] = { + # Math (10) + "SUM": "math", + "ABS": "math", + "ROUND": "math", + "ROUNDUP": "math", + "ROUNDDOWN": "math", + "INT": "math", + "MOD": "math", + "POWER": "math", + "SQRT": "math", + "SIGN": "math", + # Logic (5) + "IF": "logic", + "AND": "logic", + "OR": "logic", + "NOT": "logic", + "IFERROR": "logic", + # Lookup (6) + "VLOOKUP": "lookup", + "HLOOKUP": "lookup", + "INDEX": "lookup", + "MATCH": "lookup", + "OFFSET": "lookup", + "CHOOSE": "lookup", + # Statistical (6) + "AVERAGE": "statistical", + "COUNT": "statistical", + "COUNTA": "statistical", + "COUNTIF": "statistical", + "MIN": "statistical", + "MAX": "statistical", + # Financial (7) + "PV": "financial", + "FV": "financial", + "PMT": "financial", + "NPV": "financial", + "IRR": "financial", + "SLN": "financial", + "DB": "financial", + # Text (5) + "LEFT": "text", + "RIGHT": "text", + "MID": "text", + "LEN": "text", + "CONCATENATE": "text", +} + + 
+def is_supported(func_name: str) -> bool: + """Check if a function name is in the evaluation whitelist.""" + return func_name.upper() in FUNCTION_WHITELIST_V1 + + +# --------------------------------------------------------------------------- +# Builtin implementations - pure Python, no external deps. +# Each takes a list of resolved argument values. +# --------------------------------------------------------------------------- + + +def _coerce_numeric(values: list[Any]) -> list[float]: + """Flatten and coerce values to floats, skipping None/str/bool.""" + result: list[float] = [] + for v in values: + if isinstance(v, (list, tuple)): + result.extend(_coerce_numeric(list(v))) + elif isinstance(v, bool): + # In Excel, TRUE=1, FALSE=0 in numeric context + result.append(float(v)) + elif isinstance(v, (int, float)): + result.append(float(v)) + # Skip None, str, errors + return result + + +def _builtin_sum(args: list[Any]) -> float: + return sum(_coerce_numeric(args)) + + +def _builtin_abs(args: list[Any]) -> float: + if len(args) != 1: + raise ValueError("ABS requires exactly 1 argument") + nums = _coerce_numeric(args) + if not nums: + raise ValueError("ABS: non-numeric argument") + return abs(nums[0]) + + +def _builtin_round(args: list[Any]) -> float: + if len(args) < 1 or len(args) > 2: + raise ValueError("ROUND requires 1 or 2 arguments") + nums = _coerce_numeric([args[0]]) + if not nums: + raise ValueError("ROUND: non-numeric argument") + digits = int(_coerce_numeric([args[1]])[0]) if len(args) > 1 else 0 + return round(nums[0], digits) + + +def _builtin_roundup(args: list[Any]) -> float: + if len(args) < 1 or len(args) > 2: + raise ValueError("ROUNDUP requires 1 or 2 arguments") + nums = _coerce_numeric([args[0]]) + if not nums: + raise ValueError("ROUNDUP: non-numeric argument") + digits = int(_coerce_numeric([args[1]])[0]) if len(args) > 1 else 0 + if digits == 0: + return float(math.ceil(nums[0])) + factor = 10 ** digits + return math.ceil(nums[0] * factor) / 
factor + + +def _builtin_int(args: list[Any]) -> float: + if len(args) != 1: + raise ValueError("INT requires exactly 1 argument") + nums = _coerce_numeric(args) + if not nums: + raise ValueError("INT: non-numeric argument") + return float(math.floor(nums[0])) + + +def _builtin_if(args: list[Any]) -> Any: + if len(args) < 2 or len(args) > 3: + raise ValueError("IF requires 2 or 3 arguments") + condition = args[0] + # Excel truthy: 0/False/None/"" are falsy + truthy = bool(condition) if not isinstance(condition, (int, float)) else condition != 0 + if truthy: + return args[1] + return args[2] if len(args) > 2 else False + + +def _builtin_iferror(args: list[Any]) -> Any: + if len(args) != 2: + raise ValueError("IFERROR requires exactly 2 arguments") + value = args[0] + # If the value is an error string (e.g., "#DIV/0!"), return the fallback + if isinstance(value, str) and value.startswith("#"): + return args[1] + return value + + +def _builtin_and(args: list[Any]) -> bool: + if not args: + raise ValueError("AND requires at least 1 argument") + for a in args: + if isinstance(a, (list, tuple)): + if not all(bool(x) for x in a if x is not None): + return False + elif not a: + return False + return True + + +def _builtin_or(args: list[Any]) -> bool: + if not args: + raise ValueError("OR requires at least 1 argument") + for a in args: + if isinstance(a, (list, tuple)): + if any(bool(x) for x in a if x is not None): + return True + elif a: + return True + return False + + +def _builtin_not(args: list[Any]) -> bool: + if len(args) != 1: + raise ValueError("NOT requires exactly 1 argument") + return not bool(args[0]) + + +def _builtin_count(args: list[Any]) -> float: + """COUNT - counts numeric values only.""" + return float(len(_coerce_numeric(args))) + + +def _builtin_counta(args: list[Any]) -> float: + """COUNTA - counts non-empty values.""" + count = 0 + for v in args: + if isinstance(v, (list, tuple)): + count += sum(1 for x in v if x is not None) + elif v is not None: + 
count += 1 + return float(count) + + +def _builtin_min(args: list[Any]) -> float: + nums = _coerce_numeric(args) + if not nums: + return 0.0 + return min(nums) + + +def _builtin_max(args: list[Any]) -> float: + nums = _coerce_numeric(args) + if not nums: + return 0.0 + return max(nums) + + +def _builtin_average(args: list[Any]) -> float: + nums = _coerce_numeric(args) + if not nums: + raise ValueError("AVERAGE: no numeric values") + return sum(nums) / len(nums) + + +# --------------------------------------------------------------------------- +# Registry +# --------------------------------------------------------------------------- + +_BUILTINS: dict[str, Callable[[list[Any]], Any]] = { + "SUM": _builtin_sum, + "ABS": _builtin_abs, + "ROUND": _builtin_round, + "ROUNDUP": _builtin_roundup, + "INT": _builtin_int, + "IF": _builtin_if, + "IFERROR": _builtin_iferror, + "AND": _builtin_and, + "OR": _builtin_or, + "NOT": _builtin_not, + "COUNT": _builtin_count, + "COUNTA": _builtin_counta, + "MIN": _builtin_min, + "MAX": _builtin_max, + "AVERAGE": _builtin_average, +} + + +class FunctionRegistry: + """Registry of callable function implementations. + + Starts with builtins and can be extended with custom functions. 
+ """ + + def __init__(self) -> None: + self._functions: dict[str, Callable[[list[Any]], Any]] = dict(_BUILTINS) + + def register(self, name: str, func: Callable[[list[Any]], Any]) -> None: + self._functions[name.upper()] = func + + def get(self, name: str) -> Callable[[list[Any]], Any] | None: + return self._functions.get(name.upper()) + + def has(self, name: str) -> bool: + return name.upper() in self._functions + + @property + def supported_functions(self) -> frozenset[str]: + return frozenset(self._functions.keys()) diff --git a/python/wolfxl/calc/_graph.py b/python/wolfxl/calc/_graph.py new file mode 100644 index 0000000..1fc1fbb --- /dev/null +++ b/python/wolfxl/calc/_graph.py @@ -0,0 +1,140 @@ +"""Dependency graph for formula cells with topological ordering.""" + +from __future__ import annotations + +from collections import deque +from typing import TYPE_CHECKING + +from wolfxl.calc._parser import all_references + +if TYPE_CHECKING: + from wolfxl._workbook import Workbook + + +class DependencyGraph: + """Tracks formula cell dependencies for evaluation ordering. + + All cell references use canonical "SheetName!A1" format. + """ + + __slots__ = ("dependencies", "dependents", "formulas") + + def __init__(self) -> None: + # cell -> set of cells it reads from + self.dependencies: dict[str, set[str]] = {} + # cell -> set of cells that read from it (reverse edges) + self.dependents: dict[str, set[str]] = {} + # cell -> formula string + self.formulas: dict[str, str] = {} + + def add_formula(self, cell_ref: str, formula: str, current_sheet: str) -> None: + """Register a formula cell and its dependencies.""" + self.formulas[cell_ref] = formula + refs = all_references(formula, current_sheet) + + self.dependencies[cell_ref] = set(refs) + + for ref in refs: + if ref not in self.dependents: + self.dependents[ref] = set() + self.dependents[ref].add(cell_ref) + + def topological_order(self) -> list[str]: + """Return formula cells in evaluation order (Kahn's algorithm). 
+ + Raises ValueError if a circular reference is detected. + """ + # Only consider formula cells (sorted for determinism) + formula_cells = set(self.formulas.keys()) + if not formula_cells: + return [] + + # Compute in-degrees within formula cells only + in_degree: dict[str, int] = {} + for cell in formula_cells: + deps = self.dependencies.get(cell, set()) + # Only count deps that are themselves formula cells + in_degree[cell] = len(deps & formula_cells) + + # Start with formula cells that have no formula-cell dependencies + # Sorted to ensure deterministic output across runs (Python hash randomization) + queue: deque[str] = deque(sorted( + cell for cell in formula_cells if in_degree[cell] == 0 + )) + + order: list[str] = [] + while queue: + cell = queue.popleft() + order.append(cell) + # Reduce in-degree for dependent formula cells (sorted for determinism) + for dep in sorted(self.dependents.get(cell, set())): + if dep in formula_cells: + in_degree[dep] -= 1 + if in_degree[dep] == 0: + queue.append(dep) + + if len(order) != len(formula_cells): + missing = formula_cells - set(order) + raise ValueError(f"Circular reference detected involving: {missing}") + + return order + + def affected_cells(self, changed_cells: set[str]) -> list[str]: + """Find all formula cells affected by changes, in evaluation order. + + Uses BFS on the dependents graph, then filters to topological order. 
+ """ + affected: set[str] = set() + queue: deque[str] = deque(changed_cells) + visited: set[str] = set(changed_cells) + + while queue: + cell = queue.popleft() + for dep in self.dependents.get(cell, set()): + if dep not in visited: + visited.add(dep) + queue.append(dep) + if dep in self.formulas: + affected.add(dep) + + # Return in topological order + full_order = self.topological_order() + return [c for c in full_order if c in affected] + + def max_depth(self, roots: set[str]) -> int: + """Longest dependency chain from root cells through formula cells.""" + if not roots: + return 0 + + depth: dict[str, int] = {r: 0 for r in roots} + queue: deque[str] = deque(roots) + max_d = 0 + + while queue: + cell = queue.popleft() + current_depth = depth[cell] + for dep in self.dependents.get(cell, set()): + if dep in self.formulas: + new_depth = current_depth + 1 + if dep not in depth or new_depth > depth[dep]: + depth[dep] = new_depth + max_d = max(max_d, new_depth) + queue.append(dep) + + return max_d + + @classmethod + def from_workbook(cls, workbook: Workbook) -> DependencyGraph: + """Build a dependency graph by scanning all sheets for formula cells.""" + graph = cls() + + for sheet_name in workbook.sheetnames: + ws = workbook[sheet_name] + for row in ws.iter_rows(values_only=False): + for cell in row: + val = cell.value + if isinstance(val, str) and val.startswith("="): + cell_ref = f"{sheet_name}!{cell.coordinate}" + graph.add_formula(cell_ref, val, sheet_name) + + return graph diff --git a/python/wolfxl/calc/_parser.py b/python/wolfxl/calc/_parser.py new file mode 100644 index 0000000..b004761 --- /dev/null +++ b/python/wolfxl/calc/_parser.py @@ -0,0 +1,235 @@ +"""Formula parser: regex-based reference extraction + optional formulas lib.""" + +from __future__ import annotations + +import re +from typing import Any + +from wolfxl._utils import a1_to_rowcol, rowcol_to_a1 + +# --------------------------------------------------------------------------- +# Regex patterns 
# Single cell ref: A1, $A$1, $A1, A$1 (with optional sheet prefix).
# Group 1 captures a quoted sheet name, group 2 an unquoted one.
_SHEET_PREFIX = r"(?:'([^']+)'!|([A-Za-z0-9_]+)!)"
# The lookbehind/lookahead keep the pattern from firing inside longer tokens:
# function names that end in digits (LOG10, ATAN2), identifiers such as Rate1,
# and the exponent of numeric literals such as 1E3. Digits are part of the
# lookahead set so the row number cannot backtrack to a shorter match.
_CELL_REF = r"(?<![A-Za-z0-9_.])\$?([A-Z]{1,3})\$?(\d+)(?![A-Za-z0-9_(])"
_SINGLE_REF_RE = re.compile(
    rf"(?:{_SHEET_PREFIX})?{_CELL_REF}",
    re.IGNORECASE,
)

# Range: A1:B5 (with optional sheet prefix, applied to start only)
_RANGE_REF_RE = re.compile(
    rf"(?:{_SHEET_PREFIX})?{_CELL_REF}\s*:\s*{_CELL_REF}",
    re.IGNORECASE,
)

# Function names: SUM(...), VLOOKUP(...), and one-letter names such as N(...)
_FUNC_RE = re.compile(r"([A-Z][A-Z0-9_.]*)\s*\(", re.IGNORECASE)

# Strings in formulas (to skip refs inside string literals)
_STRING_RE = re.compile(r'"[^"]*"')


def _strip_strings(formula: str) -> str:
    """Remove double-quoted string literals so refs inside quotes aren't matched."""
    return _STRING_RE.sub("", formula)


# ---------------------------------------------------------------------------
# Reference extraction
# ---------------------------------------------------------------------------


def parse_references(formula: str, current_sheet: str = "Sheet1") -> list[str]:
    """Extract all single cell references from a formula.

    Args:
        formula: Formula text, e.g. ``"=Sheet1!A1+$B$2"``.
        current_sheet: Sheet name used for references without a sheet prefix.

    Returns:
        Canonical ``"SheetName!A1"`` strings (no dollar signs, sheet name
        unquoted, column upper-cased) in order of first appearance, without
        duplicates. Cells that are endpoints of a range such as ``A1:B5`` are
        NOT included - use :func:`parse_range_references` for those.
    """
    clean = _strip_strings(formula)

    # Collect range spans first so the endpoints of a range can be skipped.
    range_spans = [m.span() for m in _RANGE_REF_RE.finditer(clean)]

    # A dict keeps insertion order and gives O(1) de-duplication.
    found: dict[str, None] = {}
    for m in _SINGLE_REF_RE.finditer(clean):
        pos = m.start()
        if any(start <= pos < end for start, end in range_spans):
            continue
        sheet = m.group(1) or m.group(2) or current_sheet
        found.setdefault(f"{sheet}!{m.group(3).upper()}{m.group(4)}")
    return list(found)


def parse_range_references(formula: str, current_sheet: str = "Sheet1") -> list[str]:
    """Extract all range references from a formula.

    Args:
        formula: Formula text, e.g. ``"=SUM(A1:B5)"``.
        current_sheet: Sheet name used for ranges without a sheet prefix.

    Returns:
        Canonical ``"SheetName!A1:B5"`` strings in order of first appearance,
        without duplicates.
    """
    clean = _strip_strings(formula)
    found: dict[str, None] = {}
    for m in _RANGE_REF_RE.finditer(clean):
        sheet = m.group(1) or m.group(2) or current_sheet
        start = f"{m.group(3).upper()}{m.group(4)}"
        end = f"{m.group(5).upper()}{m.group(6)}"
        found.setdefault(f"{sheet}!{start}:{end}")
    return list(found)


def parse_functions(formula: str) -> list[str]:
    """Extract the upper-cased names of all functions called in a formula.

    Names are returned in order of first appearance, without duplicates.
    Text inside double-quoted string literals is ignored.
    """
    clean = _strip_strings(formula)
    return list(dict.fromkeys(m.group(1).upper() for m in _FUNC_RE.finditer(clean)))
def all_references(formula: str, current_sheet: str = "Sheet1") -> list[str]:
    """Collect every cell a formula refers to, with ranges fully expanded.

    Stand-alone references come first (in the order ``parse_references``
    yields them), followed by the cells of each range from
    ``parse_range_references`` as expanded by ``expand_range``. Each canonical
    "SheetName!A1" string appears once, at its first position.
    """
    # dict keys give ordered de-duplication; the values are unused.
    ordered: dict[str, None] = dict.fromkeys(parse_references(formula, current_sheet))

    for span in parse_range_references(formula, current_sheet):
        for cell in expand_range(span):
            ordered.setdefault(cell)

    return list(ordered)
@dataclass(frozen=True)
class CellDelta:
    """One cell's before/after values as recorded by a recalculation."""

    # Canonical "SheetName!A1" reference of the cell.
    cell_ref: str
    old_value: float | int | str | bool | None
    new_value: float | int | str | bool | None
    # The formula that produced new_value, when recorded.
    formula: str | None = None


@dataclass(frozen=True)
class RecalcResult:
    """Outcome of a perturbation-driven recalculation."""

    # cell_ref -> new input value
    perturbations: dict[str, float | int]
    # Cells that changed.
    deltas: tuple[CellDelta, ...]
    total_formula_cells: int = 0
    # Formula cells whose value actually changed.
    propagated_cells: int = 0
    # Longest dependency chain from the perturbed inputs.
    max_chain_depth: int = 0

    @property
    def propagation_ratio(self) -> float:
        """Share of formula cells that changed; 0.0 when there are none."""
        total = self.total_formula_cells
        return self.propagated_cells / total if total else 0.0
def _make_sum_chain_workbook() -> wolfxl.Workbook:
    """Build the chain fixture: A1=10, A2=20, A3=SUM(A1:A2), A4=A3*2."""
    book = wolfxl.Workbook()
    sheet = book.active
    # Written top to bottom, inputs before the formulas that use them.
    contents = (
        ("A1", 10),
        ("A2", 20),
        ("A3", "=SUM(A1:A2)"),
        ("A4", "=A3*2"),
    )
    for ref, content in contents:
        sheet[ref] = content
    return book
Caller must delete the temp file.""" + with tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False) as f: + path = f.name + wb.save(path) + return wolfxl.load_workbook(path), path + + +class TestLoadAndCalculate: + def test_sum_chain_write_mode(self) -> None: + wb = _make_sum_chain_workbook() + ev = WorkbookEvaluator() + ev.load(wb) + results = ev.calculate() + assert results["Sheet!A3"] == 30.0 + assert results["Sheet!A4"] == 60.0 + + def test_sum_chain_after_roundtrip(self) -> None: + wb = _make_sum_chain_workbook() + wb2, path = _roundtrip(wb) + try: + ev = WorkbookEvaluator() + ev.load(wb2) + results = ev.calculate() + assert results["Sheet!A3"] == 30.0 + assert results["Sheet!A4"] == 60.0 + finally: + wb2.close() + os.unlink(path) + + def test_if_conditional(self) -> None: + wb = wolfxl.Workbook() + ws = wb.active + ws["A1"] = 100 + ws["B1"] = "=IF(A1>50,A1*2,0)" + ev = WorkbookEvaluator() + ev.load(wb) + results = ev.calculate() + assert results["Sheet!B1"] == 200 + + def test_if_false_branch(self) -> None: + wb = wolfxl.Workbook() + ws = wb.active + ws["A1"] = 10 + ws["B1"] = "=IF(A1>50,A1*2,0)" + ev = WorkbookEvaluator() + ev.load(wb) + results = ev.calculate() + assert results["Sheet!B1"] == 0 + + def test_nested_functions(self) -> None: + wb = wolfxl.Workbook() + ws = wb.active + ws["A1"] = 3 + ws["A2"] = -5 + ws["A3"] = 7 + ws["B1"] = "=SUM(A1:A3)" + ws["B2"] = "=ABS(A2)" + ws["B3"] = "=MAX(B1,B2)" + ev = WorkbookEvaluator() + ev.load(wb) + results = ev.calculate() + assert results["Sheet!B1"] == 5.0 + assert results["Sheet!B2"] == 5.0 + assert results["Sheet!B3"] == 5.0 + + def test_literal_formula(self) -> None: + wb = wolfxl.Workbook() + ws = wb.active + ws["A1"] = "=42" + ev = WorkbookEvaluator() + ev.load(wb) + results = ev.calculate() + assert results["Sheet!A1"] == 42.0 + + def test_direct_ref(self) -> None: + wb = wolfxl.Workbook() + ws = wb.active + ws["A1"] = 100 + ws["B1"] = "=A1" + ev = WorkbookEvaluator() + ev.load(wb) + results = 
class TestCrossSheet:
    """Formulas on one sheet that aggregate cells living on another sheet."""

    def test_cross_sheet_sum(self) -> None:
        """A SUM on "Summary" over a range on the default "Sheet"."""
        book = wolfxl.Workbook()

        data_sheet = book.active
        for ref, amount in (("A1", 100), ("A2", 200)):
            data_sheet[ref] = amount

        summary_sheet = book.create_sheet("Summary")
        summary_sheet["A1"] = "=SUM(Sheet!A1:A2)"

        engine = WorkbookEvaluator()
        engine.load(book)
        computed = engine.calculate()

        assert computed["Summary!A1"] == 300.0
ev.recalculate({"Sheet!A1": 15}) + assert result.propagation_ratio == 0.0 + assert result.propagated_cells == 0 + + def test_mixed_propagation(self) -> None: + """Some formulas, some hardcoded.""" + wb = wolfxl.Workbook() + ws = wb.active + ws["A1"] = 10 + ws["A2"] = 20 + ws["A3"] = "=SUM(A1:A2)" # formula - will propagate + ws["A4"] = 60 # hardcoded - won't propagate + ws["A5"] = "=A3+A4" # formula, depends on A3 (propagates) and A4 (static) + + ev = WorkbookEvaluator() + ev.load(wb) + ev.calculate() + + result = ev.recalculate({"Sheet!A1": 15}) + assert result.propagated_cells == 2 # A3 and A5 changed + assert result.total_formula_cells == 2 + assert result.propagation_ratio == 1.0 + + def test_tolerance(self) -> None: + """Small perturbation within tolerance should show no delta.""" + wb = wolfxl.Workbook() + ws = wb.active + ws["A1"] = 10.0 + ws["A2"] = "=A1" + + ev = WorkbookEvaluator() + ev.load(wb) + ev.calculate() + + # Perturb by exactly 0 (same value) + result = ev.recalculate({"Sheet!A1": 10.0}) + assert result.propagated_cells == 0 + + def test_recalc_result_structure(self) -> None: + wb = _make_sum_chain_workbook() + ev = WorkbookEvaluator() + ev.load(wb) + ev.calculate() + + result = ev.recalculate({"Sheet!A1": 11}) + assert isinstance(result.perturbations, dict) + assert isinstance(result.deltas, tuple) + assert all(isinstance(d, wolfxl.calc.CellDelta) for d in result.deltas) + assert isinstance(result.propagation_ratio, float) + + +class TestDeterminism: + def test_100_rounds_identical(self) -> None: + """Same perturbation 100 times must produce identical results.""" + wb = _make_sum_chain_workbook() + ev = WorkbookEvaluator() + ev.load(wb) + ev.calculate() + + results = [] + for _ in range(100): + # Reset to original values + ev._cell_values["Sheet!A1"] = 10 + ev._cell_values["Sheet!A2"] = 20 + ev.calculate() + r = ev.recalculate({"Sheet!A1": 11}) + results.append(r) + + # All results should be identical + first = results[0] + for r in results[1:]: 
+ assert r.propagated_cells == first.propagated_cells + assert r.total_formula_cells == first.total_formula_cells + assert len(r.deltas) == len(first.deltas) + for d1, d2 in zip(first.deltas, r.deltas): + assert d1.cell_ref == d2.cell_ref + assert d1.new_value == d2.new_value + assert d1.old_value == d2.old_value + + +class TestComplexExpressions: + """Complex nested formulas that the regex-based evaluator couldn't handle.""" + + def test_function_times_number(self) -> None: + """=SUM(A1:A2)*2 — function result as binary operand.""" + wb = wolfxl.Workbook() + ws = wb.active + ws["A1"] = 10 + ws["A2"] = 20 + ws["B1"] = "=SUM(A1:A2)*2" + ev = WorkbookEvaluator() + ev.load(wb) + results = ev.calculate() + assert results["Sheet!B1"] == 60.0 + + def test_number_plus_function(self) -> None: + """=5+SUM(A1:A2) — number + function call.""" + wb = wolfxl.Workbook() + ws = wb.active + ws["A1"] = 10 + ws["A2"] = 20 + ws["B1"] = "=5+SUM(A1:A2)" + ev = WorkbookEvaluator() + ev.load(wb) + results = ev.calculate() + assert results["Sheet!B1"] == 35.0 + + def test_function_minus_function(self) -> None: + """=SUM(A1:A2)-SUM(A3:A4) — two function calls in binary op.""" + wb = wolfxl.Workbook() + ws = wb.active + ws["A1"] = 100 + ws["A2"] = 200 + ws["A3"] = 50 + ws["A4"] = 75 + ws["B1"] = "=SUM(A1:A2)-SUM(A3:A4)" + ev = WorkbookEvaluator() + ev.load(wb) + results = ev.calculate() + assert results["Sheet!B1"] == 175.0 + + def test_round_of_product(self) -> None: + """=ROUND(SUM(A1:A3)*1.1,2) — binary expression inside function arg.""" + wb = wolfxl.Workbook() + ws = wb.active + ws["A1"] = 10 + ws["A2"] = 20 + ws["A3"] = 30 + ws["B1"] = "=ROUND(SUM(A1:A3)*1.1,2)" + ev = WorkbookEvaluator() + ev.load(wb) + results = ev.calculate() + assert results["Sheet!B1"] == 66.0 + + def test_round_sum_times_if(self) -> None: + """=ROUND(SUM(A1:A3)*IF(A4>0,1.1,1.0),2) — the poster-child complex case.""" + wb = wolfxl.Workbook() + ws = wb.active + ws["A1"] = 10 + ws["A2"] = 20 + ws["A3"] = 30 + 
ws["A4"] = 1 + ws["B1"] = "=ROUND(SUM(A1:A3)*IF(A4>0,1.1,1.0),2)" + ev = WorkbookEvaluator() + ev.load(wb) + results = ev.calculate() + assert results["Sheet!B1"] == 66.0 + + def test_if_with_function_condition_and_args(self) -> None: + """=IF(SUM(A1:A3)>50,SUM(A1:A3)*2,0) — functions in all IF positions.""" + wb = wolfxl.Workbook() + ws = wb.active + ws["A1"] = 10 + ws["A2"] = 20 + ws["A3"] = 30 + ws["B1"] = "=IF(SUM(A1:A3)>50,SUM(A1:A3)*2,0)" + ev = WorkbookEvaluator() + ev.load(wb) + results = ev.calculate() + assert results["Sheet!B1"] == 120.0 + + def test_operator_precedence(self) -> None: + """=A1+A2*A3 must respect multiplication-first precedence.""" + wb = wolfxl.Workbook() + ws = wb.active + ws["A1"] = 2 + ws["A2"] = 3 + ws["A3"] = 4 + ws["B1"] = "=A1+A2*A3" + ev = WorkbookEvaluator() + ev.load(wb) + results = ev.calculate() + assert results["Sheet!B1"] == 14.0 # 2+(3*4), not (2+3)*4 + + def test_parenthesized_expression(self) -> None: + """=(A1+A2)*A3 — parens override default precedence.""" + wb = wolfxl.Workbook() + ws = wb.active + ws["A1"] = 2 + ws["A2"] = 3 + ws["A3"] = 4 + ws["B1"] = "=(A1+A2)*A3" + ev = WorkbookEvaluator() + ev.load(wb) + results = ev.calculate() + assert results["Sheet!B1"] == 20.0 # (2+3)*4 + + def test_if_result_times_number(self) -> None: + """=IF(A1>0,A1,0)*2 — function result used in binary operation.""" + wb = wolfxl.Workbook() + ws = wb.active + ws["A1"] = 10 + ws["B1"] = "=IF(A1>0,A1,0)*2" + ev = WorkbookEvaluator() + ev.load(wb) + results = ev.calculate() + assert results["Sheet!B1"] == 20.0 + + def test_comparison_at_top_level(self) -> None: + """=A1>B1 should return a boolean.""" + wb = wolfxl.Workbook() + ws = wb.active + ws["A1"] = 100 + ws["B1"] = 50 + ws["C1"] = "=A1>B1" + ev = WorkbookEvaluator() + ev.load(wb) + results = ev.calculate() + assert results["Sheet!C1"] is True + + def test_multi_term_arithmetic(self) -> None: + """=A1+A2+A3-A4 — three additive ops, left-to-right associativity.""" + wb = 
class TestEdgeCases:
    """Misuse of the evaluator API and degenerate workbooks."""

    def test_load_required_before_calculate(self) -> None:
        """calculate() on a fresh evaluator must point the caller at load()."""
        engine = WorkbookEvaluator()
        with pytest.raises(RuntimeError, match="Call load"):
            engine.calculate()

    def test_load_required_before_recalculate(self) -> None:
        """recalculate() on a fresh evaluator must point the caller at load()."""
        engine = WorkbookEvaluator()
        with pytest.raises(RuntimeError, match="Call load"):
            engine.recalculate({"Sheet1!A1": 1})

    def test_empty_workbook(self) -> None:
        """A workbook without formulas evaluates to an empty mapping."""
        engine = WorkbookEvaluator()
        engine.load(wolfxl.Workbook())
        assert engine.calculate() == {}

    def test_division_by_zero(self) -> None:
        """Dividing by a zero cell yields the Excel error string, not an exception."""
        book = wolfxl.Workbook()
        sheet = book.active
        sheet["A1"] = 10
        sheet["A2"] = 0
        sheet["B1"] = "=A1/A2"

        engine = WorkbookEvaluator()
        engine.load(book)
        computed = engine.calculate()

        assert computed["Sheet!B1"] == "#DIV/0!"
class TestTextComparison:
    """Verify that string comparisons work in formulas (PR review fix)."""

    @staticmethod
    def _evaluate_b1(a1_text: str, b1_formula: str) -> object:
        """Put *a1_text* in A1 and *b1_formula* in B1; return B1's computed value."""
        book = wolfxl.Workbook()
        sheet = book.active
        sheet["A1"] = a1_text
        sheet["B1"] = b1_formula
        engine = WorkbookEvaluator()
        engine.load(book)
        return engine.calculate()["Sheet!B1"]

    def test_string_equality(self) -> None:
        """=IF(A1="OK",1,0) with string value in A1."""
        assert self._evaluate_b1("OK", '=IF(A1="OK",1,0)') == 1

    def test_string_inequality(self) -> None:
        """=IF(A1="OK",1,0) with different string value."""
        assert self._evaluate_b1("FAIL", '=IF(A1="OK",1,0)') == 0

    def test_string_comparison_case_insensitive(self) -> None:
        """Excel string comparisons are case-insensitive."""
        assert self._evaluate_b1("ok", '=IF(A1="OK",1,0)') == 1

    def test_string_not_equal(self) -> None:
        """=IF(A1<>"OK",1,0) with string comparison."""
        assert self._evaluate_b1("FAIL", '=IF(A1<>"OK",1,0)') == 1
class TestScientificNotation:
    """Numeric literals written in scientific notation (PR review fix)."""

    def test_1e3(self) -> None:
        """An upper-case exponent literal added to a cell value."""
        book = wolfxl.Workbook()
        sheet = book.active
        sheet["A1"] = 5
        sheet["B1"] = "=A1+1E3"

        engine = WorkbookEvaluator()
        engine.load(book)

        assert engine.calculate()["Sheet!B1"] == 1005.0

    def test_negative_exponent(self) -> None:
        """A lower-case literal with a negative exponent used as a factor."""
        book = wolfxl.Workbook()
        sheet = book.active
        sheet["A1"] = 100
        sheet["B1"] = "=A1*2.5e-1"

        engine = WorkbookEvaluator()
        engine.load(book)

        assert engine.calculate()["Sheet!B1"] == 25.0
assert reg.get("sum") is reg.get("SUM") + + def test_supported_functions_property(self) -> None: + reg = FunctionRegistry() + funcs = reg.supported_functions + assert isinstance(funcs, frozenset) + assert "SUM" in funcs + + +class TestBuiltinSUM: + def test_basic(self) -> None: + fn = _BUILTINS["SUM"] + assert fn([1, 2, 3]) == 6.0 + + def test_nested_lists(self) -> None: + fn = _BUILTINS["SUM"] + assert fn([[1, 2], [3, 4]]) == 10.0 + + def test_skip_none_and_strings(self) -> None: + fn = _BUILTINS["SUM"] + assert fn([1, None, "text", 3]) == 4.0 + + def test_empty(self) -> None: + fn = _BUILTINS["SUM"] + assert fn([]) == 0.0 + + def test_booleans_coerced(self) -> None: + fn = _BUILTINS["SUM"] + assert fn([True, False, 1]) == 2.0 + + +class TestBuiltinABS: + def test_positive(self) -> None: + assert _BUILTINS["ABS"]([-5]) == 5.0 + + def test_zero(self) -> None: + assert _BUILTINS["ABS"]([0]) == 0.0 + + def test_already_positive(self) -> None: + assert _BUILTINS["ABS"]([3.14]) == 3.14 + + def test_wrong_arity(self) -> None: + with pytest.raises(ValueError, match="exactly 1"): + _BUILTINS["ABS"]([1, 2]) + + +class TestBuiltinROUND: + def test_round_default_digits(self) -> None: + assert _BUILTINS["ROUND"]([3.14159]) == 3.0 + + def test_round_2_digits(self) -> None: + assert _BUILTINS["ROUND"]([3.14159, 2]) == 3.14 + + def test_round_negative_digits(self) -> None: + assert _BUILTINS["ROUND"]([1234, -2]) == 1200.0 + + +class TestBuiltinROUNDUP: + def test_roundup_basic(self) -> None: + assert _BUILTINS["ROUNDUP"]([3.2]) == 4.0 + + def test_roundup_2_digits(self) -> None: + assert _BUILTINS["ROUNDUP"]([3.141, 2]) == 3.15 + + +class TestBuiltinINT: + def test_positive(self) -> None: + assert _BUILTINS["INT"]([3.7]) == 3.0 + + def test_negative(self) -> None: + # Excel INT floors toward negative infinity + assert _BUILTINS["INT"]([-3.2]) == -4.0 + + +class TestBuiltinIF: + def test_true_branch(self) -> None: + assert _BUILTINS["IF"]([True, "yes", "no"]) == "yes" + + def 
test_false_branch(self) -> None: + assert _BUILTINS["IF"]([False, "yes", "no"]) == "no" + + def test_numeric_condition(self) -> None: + assert _BUILTINS["IF"]([1, "yes", "no"]) == "yes" + assert _BUILTINS["IF"]([0, "yes", "no"]) == "no" + + def test_missing_false_branch(self) -> None: + assert _BUILTINS["IF"]([False, "yes"]) is False + + +class TestBuiltinIFERROR: + def test_no_error(self) -> None: + assert _BUILTINS["IFERROR"]([42, 0]) == 42 + + def test_error_string(self) -> None: + assert _BUILTINS["IFERROR"](["#DIV/0!", 0]) == 0 + + def test_ref_error(self) -> None: + assert _BUILTINS["IFERROR"](["#REF!", "fallback"]) == "fallback" + + +class TestBuiltinLogic: + def test_and_all_true(self) -> None: + assert _BUILTINS["AND"]([True, True, 1]) is True + + def test_and_one_false(self) -> None: + assert _BUILTINS["AND"]([True, False]) is False + + def test_or_one_true(self) -> None: + assert _BUILTINS["OR"]([False, True]) is True + + def test_or_all_false(self) -> None: + assert _BUILTINS["OR"]([False, 0, None]) is False + + def test_not(self) -> None: + assert _BUILTINS["NOT"]([True]) is False + assert _BUILTINS["NOT"]([False]) is True + + +class TestBuiltinCounting: + def test_count_numeric(self) -> None: + assert _BUILTINS["COUNT"]([1, "text", None, 3.5, True]) == 3.0 + + def test_counta_non_empty(self) -> None: + assert _BUILTINS["COUNTA"]([1, "text", None, 3.5]) == 3.0 + + def test_count_empty(self) -> None: + assert _BUILTINS["COUNT"]([]) == 0.0 + + +class TestBuiltinMinMax: + def test_min(self) -> None: + assert _BUILTINS["MIN"]([3, 1, 4, 1, 5]) == 1.0 + + def test_max(self) -> None: + assert _BUILTINS["MAX"]([3, 1, 4, 1, 5]) == 5.0 + + def test_min_empty(self) -> None: + assert _BUILTINS["MIN"]([]) == 0.0 + + def test_max_nested(self) -> None: + assert _BUILTINS["MAX"]([[1, 2], [3, 4]]) == 4.0 + + +class TestBuiltinAVERAGE: + def test_basic(self) -> None: + assert _BUILTINS["AVERAGE"]([2, 4, 6]) == 4.0 + + def test_empty_raises(self) -> None: + with 
class TestAddFormula:
    """add_formula() must record edges in both directions of the graph."""

    def test_simple_dependency(self) -> None:
        """One explicit reference creates a dependency and its reverse edge."""
        graph = DependencyGraph()
        graph.add_formula("Sheet1!B1", "=Sheet1!A1+1", "Sheet1")

        assert "Sheet1!A1" in graph.dependencies["Sheet1!B1"]
        assert "Sheet1!B1" in graph.dependents["Sheet1!A1"]

    def test_range_dependency(self) -> None:
        """Every cell of a range argument becomes a dependency."""
        graph = DependencyGraph()
        graph.add_formula("Sheet1!A4", "=SUM(A1:A3)", "Sheet1")

        recorded = graph.dependencies["Sheet1!A4"]
        for expected in ("Sheet1!A1", "Sheet1!A2", "Sheet1!A3"):
            assert expected in recorded

    def test_cross_sheet_dependency(self) -> None:
        """References keep the sheet named in the formula, not the formula's own sheet."""
        graph = DependencyGraph()
        graph.add_formula("IS!B1", "=TB!A1+TB!A2", "IS")

        recorded = graph.dependencies["IS!B1"]
        for expected in ("TB!A1", "TB!A2"):
            assert expected in recorded
"=Sheet1!A1+1", "Sheet1") + g.add_formula("Sheet1!C1", "=Sheet1!A1*2", "Sheet1") + g.add_formula("Sheet1!D1", "=Sheet1!B1+Sheet1!C1", "Sheet1") + order = g.topological_order() + # B1 and C1 must come before D1 + assert order.index("Sheet1!B1") < order.index("Sheet1!D1") + assert order.index("Sheet1!C1") < order.index("Sheet1!D1") + + def test_circular_detection(self) -> None: + g = DependencyGraph() + g.add_formula("Sheet1!A1", "=Sheet1!B1+1", "Sheet1") + g.add_formula("Sheet1!B1", "=Sheet1!A1+1", "Sheet1") + with pytest.raises(ValueError, match="Circular reference"): + g.topological_order() + + def test_multi_sheet_ordering(self) -> None: + """TB!C1 depends on IS!A1 which depends on TB!B1.""" + g = DependencyGraph() + g.add_formula("IS!A1", "=TB!B1*0.1", "IS") + g.add_formula("TB!C1", "=IS!A1+100", "TB") + order = g.topological_order() + assert order.index("IS!A1") < order.index("TB!C1") + + +class TestAffectedCells: + def test_single_change(self) -> None: + """Changing A1 affects B1 which affects C1.""" + g = DependencyGraph() + g.add_formula("Sheet1!B1", "=Sheet1!A1+1", "Sheet1") + g.add_formula("Sheet1!C1", "=Sheet1!B1*2", "Sheet1") + affected = g.affected_cells({"Sheet1!A1"}) + assert affected == ["Sheet1!B1", "Sheet1!C1"] + + def test_diamond_propagation(self) -> None: + g = DependencyGraph() + g.add_formula("Sheet1!B1", "=Sheet1!A1+1", "Sheet1") + g.add_formula("Sheet1!C1", "=Sheet1!A1*2", "Sheet1") + g.add_formula("Sheet1!D1", "=Sheet1!B1+Sheet1!C1", "Sheet1") + affected = g.affected_cells({"Sheet1!A1"}) + # All three formula cells are affected + assert len(affected) == 3 + assert affected[-1] == "Sheet1!D1" + + def test_unrelated_cells_not_affected(self) -> None: + g = DependencyGraph() + g.add_formula("Sheet1!B1", "=Sheet1!A1+1", "Sheet1") + g.add_formula("Sheet1!D1", "=Sheet1!C1*2", "Sheet1") + affected = g.affected_cells({"Sheet1!A1"}) + assert "Sheet1!B1" in affected + assert "Sheet1!D1" not in affected + + def test_change_non_existent_cell(self) -> 
None: + g = DependencyGraph() + g.add_formula("Sheet1!B1", "=Sheet1!A1+1", "Sheet1") + affected = g.affected_cells({"Sheet1!Z99"}) + assert affected == [] + + +class TestMaxDepth: + def test_linear_chain_depth(self) -> None: + g = DependencyGraph() + g.add_formula("Sheet1!B1", "=Sheet1!A1+1", "Sheet1") + g.add_formula("Sheet1!C1", "=Sheet1!B1*2", "Sheet1") + g.add_formula("Sheet1!D1", "=Sheet1!C1+3", "Sheet1") + assert g.max_depth({"Sheet1!A1"}) == 3 + + def test_diamond_depth(self) -> None: + g = DependencyGraph() + g.add_formula("Sheet1!B1", "=Sheet1!A1+1", "Sheet1") + g.add_formula("Sheet1!C1", "=Sheet1!A1*2", "Sheet1") + g.add_formula("Sheet1!D1", "=Sheet1!B1+Sheet1!C1", "Sheet1") + assert g.max_depth({"Sheet1!A1"}) == 2 + + def test_empty_roots(self) -> None: + g = DependencyGraph() + assert g.max_depth(set()) == 0 + + def test_no_dependents(self) -> None: + g = DependencyGraph() + g.add_formula("Sheet1!B1", "=Sheet1!A1+1", "Sheet1") + # A1 has one dependent (B1), depth = 1 + assert g.max_depth({"Sheet1!A1"}) == 1 + # C1 is not referenced by anyone + assert g.max_depth({"Sheet1!C1"}) == 0 diff --git a/tests/test_calc_integration.py b/tests/test_calc_integration.py new file mode 100644 index 0000000..85d77b2 --- /dev/null +++ b/tests/test_calc_integration.py @@ -0,0 +1,321 @@ +"""Integration tests for wolfxl.calc: full roundtrip and Workbook convenience methods.""" + +from __future__ import annotations + +import os +import tempfile +import time + +import pytest +from wolfxl.calc import RecalcResult, WorkbookEvaluator + +import wolfxl + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +FIXTURE_DIR = os.path.join(os.path.dirname(__file__), "fixtures", "calc") + + +def _save_and_reload(wb: wolfxl.Workbook) -> tuple[wolfxl.Workbook, str]: + """Save workbook to temp file and reload in read mode.""" + with 
tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False) as f: + path = f.name + wb.save(path) + return wolfxl.load_workbook(path), path + + +# --------------------------------------------------------------------------- +# Golden workbook builders +# --------------------------------------------------------------------------- + + +def _build_sum_chain() -> wolfxl.Workbook: + """A1=10, A2=20, A3=SUM(A1:A2), A4=A3*2.""" + wb = wolfxl.Workbook() + ws = wb.active + ws["A1"] = 10 + ws["A2"] = 20 + ws["A3"] = "=SUM(A1:A2)" + ws["A4"] = "=A3*2" + return wb + + +def _build_cross_sheet() -> wolfxl.Workbook: + """TB sheet with values, IS sheet with formulas referencing TB.""" + wb = wolfxl.Workbook() + tb = wb.active # "Sheet" renamed to TB conceptually + tb["A1"] = 1000 + tb["A2"] = 2000 + tb["A3"] = 3000 + tb["A4"] = 4000 + summary = wb.create_sheet("Summary") + summary["A1"] = "=SUM(Sheet!A1:A4)" + summary["A2"] = "=AVERAGE(Sheet!A1:A4)" + summary["A3"] = "=Summary!A1-Summary!A2" + return wb + + +def _build_hardcoded() -> wolfxl.Workbook: + """Same values as sum_chain but all hardcoded (no formulas).""" + wb = wolfxl.Workbook() + ws = wb.active + ws["A1"] = 10 + ws["A2"] = 20 + ws["A3"] = 30 # hardcoded + ws["A4"] = 60 # hardcoded + return wb + + +def _build_mixed() -> wolfxl.Workbook: + """Some formulas, some hardcoded values.""" + wb = wolfxl.Workbook() + ws = wb.active + ws["A1"] = 100 + ws["A2"] = 200 + ws["A3"] = "=SUM(A1:A2)" # formula + ws["A4"] = 500 # hardcoded + ws["A5"] = "=A3+A4" # formula using both + return wb + + +def _build_income_statement(num_rows: int = 50) -> wolfxl.Workbook: + """Realistic income statement with many formula rows.""" + wb = wolfxl.Workbook() + ws = wb.active + + # Revenue line items + for i in range(1, num_rows + 1): + ws.cell(row=i, column=1, value=f"Line {i}") + ws.cell(row=i, column=2, value=float(i * 1000)) + + # Column C: formulas referencing B + for i in range(1, num_rows + 1): + ws.cell(row=i, column=3, value=f"=B{i}*1.1") + + # 
Column D: running total + ws.cell(row=1, column=4, value="=C1") + for i in range(2, num_rows + 1): + ws.cell(row=i, column=4, value=f"=D{i-1}+C{i}") + + # Summary rows + summary_row = num_rows + 1 + ws.cell(row=summary_row, column=2, value=f"=SUM(B1:B{num_rows})") + ws.cell(row=summary_row, column=3, value=f"=SUM(C1:C{num_rows})") + ws.cell(row=summary_row, column=4, value=f"=D{num_rows}") + + return wb + + +# --------------------------------------------------------------------------- +# Fixture generation (saved to disk once) +# --------------------------------------------------------------------------- + + +@pytest.fixture(scope="session", autouse=True) +def golden_fixtures() -> None: + """Generate golden .xlsx fixtures for other tests.""" + os.makedirs(FIXTURE_DIR, exist_ok=True) + + builders = { + "sum_chain.xlsx": _build_sum_chain, + "cross_sheet.xlsx": _build_cross_sheet, + "hardcoded.xlsx": _build_hardcoded, + "mixed.xlsx": _build_mixed, + } + + for name, builder in builders.items(): + path = os.path.join(FIXTURE_DIR, name) + if not os.path.exists(path): + wb = builder() + wb.save(path) + + +# --------------------------------------------------------------------------- +# Integration tests: create -> save -> load -> calculate -> verify +# --------------------------------------------------------------------------- + + +class TestRoundtripCalculation: + def test_sum_chain_roundtrip(self) -> None: + wb = _build_sum_chain() + wb2, path = _save_and_reload(wb) + try: + ev = WorkbookEvaluator() + ev.load(wb2) + results = ev.calculate() + assert results["Sheet!A3"] == 30.0 + assert results["Sheet!A4"] == 60.0 + finally: + wb2.close() + os.unlink(path) + + def test_cross_sheet_roundtrip(self) -> None: + wb = _build_cross_sheet() + wb2, path = _save_and_reload(wb) + try: + ev = WorkbookEvaluator() + ev.load(wb2) + results = ev.calculate() + assert results["Summary!A1"] == 10000.0 + assert results["Summary!A2"] == 2500.0 + assert results["Summary!A3"] == 7500.0 + 
finally: + wb2.close() + os.unlink(path) + + +class TestPerturbationDiscrimination: + """The core test: formulas vs hardcoded discrimination.""" + + def test_formulas_propagate(self) -> None: + wb = _build_sum_chain() + ev = WorkbookEvaluator() + ev.load(wb) + ev.calculate() + result = ev.recalculate({"Sheet!A1": 15}) + assert result.propagation_ratio == 1.0 + + def test_hardcoded_no_propagation(self) -> None: + wb = _build_hardcoded() + ev = WorkbookEvaluator() + ev.load(wb) + ev.calculate() + result = ev.recalculate({"Sheet!A1": 15}) + assert result.propagation_ratio == 0.0 + + def test_mixed_intermediate_propagation(self) -> None: + wb = _build_mixed() + ev = WorkbookEvaluator() + ev.load(wb) + ev.calculate() + result = ev.recalculate({"Sheet!A1": 150}) + # A3 and A5 are formulas, both should propagate + assert result.propagated_cells == 2 + assert result.propagation_ratio == 1.0 + + +class TestGoldenFixtures: + """Test against saved .xlsx files.""" + + def test_sum_chain_fixture(self) -> None: + path = os.path.join(FIXTURE_DIR, "sum_chain.xlsx") + wb = wolfxl.load_workbook(path) + ev = WorkbookEvaluator() + ev.load(wb) + results = ev.calculate() + assert results["Sheet!A3"] == 30.0 + assert results["Sheet!A4"] == 60.0 + wb.close() + + def test_hardcoded_fixture(self) -> None: + path = os.path.join(FIXTURE_DIR, "hardcoded.xlsx") + wb = wolfxl.load_workbook(path) + ev = WorkbookEvaluator() + ev.load(wb) + results = ev.calculate() + assert results == {} # No formulas to evaluate + wb.close() + + +class TestWorkbookConvenienceMethods: + def test_calculate(self) -> None: + wb = _build_sum_chain() + results = wb.calculate() + assert results["Sheet!A3"] == 30.0 + assert results["Sheet!A4"] == 60.0 + + def test_recalculate(self) -> None: + wb = _build_sum_chain() + result = wb.recalculate({"Sheet!A1": 15}) + assert isinstance(result, RecalcResult) + assert result.propagation_ratio == 1.0 + + def test_cross_sheet_calculate(self) -> None: + wb = _build_cross_sheet() + 
results = wb.calculate() + assert results["Summary!A1"] == 10000.0 + + +class TestWorkbookCaching: + """Verify the evaluator caching in Workbook.calculate/recalculate.""" + + def test_recalculate_reuses_evaluator_after_calculate(self) -> None: + wb = _build_sum_chain() + wb.calculate() + assert hasattr(wb, '_evaluator') and wb._evaluator is not None + + result = wb.recalculate({"Sheet!A1": 15}) + assert result.propagation_ratio == 1.0 + + def test_recalculate_without_prior_calculate(self) -> None: + """recalculate() still works when calculate() was never called.""" + wb = _build_sum_chain() + result = wb.recalculate({"Sheet!A1": 15}) + assert isinstance(result, RecalcResult) + assert result.propagation_ratio == 1.0 + + def test_cached_evaluator_is_same_object(self) -> None: + wb = _build_sum_chain() + wb.calculate() + ev1 = wb._evaluator + wb.recalculate({"Sheet!A1": 15}) + assert wb._evaluator is ev1 # same object, not recreated + + +class TestDeterminism: + def test_100_rounds_bit_exact(self) -> None: + wb = _build_sum_chain() + ev = WorkbookEvaluator() + ev.load(wb) + ev.calculate() + + baseline = ev.recalculate({"Sheet!A1": 11.0}) + for _ in range(99): + ev._cell_values["Sheet!A1"] = 10 + ev._cell_values["Sheet!A2"] = 20 + ev.calculate() + result = ev.recalculate({"Sheet!A1": 11.0}) + assert result.propagated_cells == baseline.propagated_cells + for d1, d2 in zip(baseline.deltas, result.deltas): + assert d1.new_value == d2.new_value + + +class TestPerformance: + @pytest.mark.slow + def test_500_formula_cells_under_2s(self) -> None: + """calculate() on a 500-formula workbook must complete in <2s. + + Threshold is generous to avoid CI flakiness across platforms. + Local runs typically complete in <100ms. 
+ """ + wb = _build_income_statement(num_rows=250) # 250*2 + 3 = 503 formulas + ev = WorkbookEvaluator() + ev.load(wb) + + start = time.perf_counter() + ev.calculate() + elapsed = time.perf_counter() - start + + assert elapsed < 2.0, f"calculate() took {elapsed:.3f}s (>2s)" + + def test_recalculate_faster_than_full(self) -> None: + """recalculate() on a subset should be faster than full calculate().""" + wb = _build_income_statement(num_rows=250) + ev = WorkbookEvaluator() + ev.load(wb) + + start_full = time.perf_counter() + ev.calculate() + full_time = time.perf_counter() - start_full + + start_recalc = time.perf_counter() + ev.recalculate({"Sheet!B1": 2000.0}) + recalc_time = time.perf_counter() - start_recalc + + # Recalculate should be no slower than full calculate + # (in practice it's faster because it only evaluates affected subset) + assert recalc_time <= full_time * 2, ( + f"recalc {recalc_time:.4f}s vs full {full_time:.4f}s" + ) diff --git a/tests/test_calc_parser.py b/tests/test_calc_parser.py new file mode 100644 index 0000000..119222d --- /dev/null +++ b/tests/test_calc_parser.py @@ -0,0 +1,175 @@ +"""Tests for wolfxl.calc formula parser and reference extraction.""" + +from __future__ import annotations + +import pytest +from wolfxl.calc._parser import ( + FormulaParser, + all_references, + expand_range, + parse_functions, + parse_range_references, + parse_references, +) + + +class TestSingleReferences: + def test_simple_ref(self) -> None: + refs = parse_references("=A1+B2", "Sheet1") + assert refs == ["Sheet1!A1", "Sheet1!B2"] + + def test_dollar_signs_stripped(self) -> None: + refs = parse_references("=$A$1+B$2+$C3", "Sheet1") + assert refs == ["Sheet1!A1", "Sheet1!B2", "Sheet1!C3"] + + def test_cross_sheet_ref(self) -> None: + refs = parse_references("=Sheet2!A1+B2", "Sheet1") + assert refs == ["Sheet2!A1", "Sheet1!B2"] + + def test_quoted_sheet_ref(self) -> None: + refs = parse_references("='Income Statement'!B5+A1", "Sheet1") + assert refs == 
["Income Statement!B5", "Sheet1!A1"] + + def test_no_duplicates(self) -> None: + refs = parse_references("=A1+A1+A1", "Sheet1") + assert refs == ["Sheet1!A1"] + + def test_string_literal_ignored(self) -> None: + refs = parse_references('=A1&"Hello A2"', "Sheet1") + assert refs == ["Sheet1!A1"] + + def test_case_normalized(self) -> None: + refs = parse_references("=a1+b2", "Sheet1") + assert refs == ["Sheet1!A1", "Sheet1!B2"] + + +class TestRangeReferences: + def test_simple_range(self) -> None: + ranges = parse_range_references("=SUM(A1:A5)", "Sheet1") + assert ranges == ["Sheet1!A1:A5"] + + def test_cross_sheet_range(self) -> None: + ranges = parse_range_references("=SUM(TB!B2:B5)", "IS") + assert ranges == ["TB!B2:B5"] + + def test_quoted_sheet_range(self) -> None: + ranges = parse_range_references("=SUM('Trial Balance'!A1:A10)", "Sheet1") + assert ranges == ["Trial Balance!A1:A10"] + + def test_dollar_in_range(self) -> None: + ranges = parse_range_references("=SUM($A$1:$A$5)", "Sheet1") + assert ranges == ["Sheet1!A1:A5"] + + def test_single_refs_not_in_range(self) -> None: + """Single refs inside a range shouldn't appear in parse_references.""" + refs = parse_references("=SUM(A1:A5)+B1", "Sheet1") + # A1 and A5 are part of the range, only B1 is standalone + assert refs == ["Sheet1!B1"] + + +class TestParseRangeSingleRefExclusion: + def test_ref_at_start_of_range_excluded(self) -> None: + """A1 in A1:A5 should not show as a standalone ref.""" + refs = parse_references("=SUM(A1:A5)", "Sheet1") + assert refs == [] + + def test_ref_outside_range_included(self) -> None: + refs = parse_references("=SUM(A1:A5)+C1", "Sheet1") + assert refs == ["Sheet1!C1"] + + +class TestParseFunctions: + def test_simple_function(self) -> None: + funcs = parse_functions("=SUM(A1:A5)") + assert funcs == ["SUM"] + + def test_nested_functions(self) -> None: + funcs = parse_functions("=IF(SUM(A1:A5)>0,ROUND(B1,2),0)") + assert funcs == ["IF", "SUM", "ROUND"] + + def 
test_no_duplicates(self) -> None: + funcs = parse_functions("=SUM(A1:A3)+SUM(B1:B3)") + assert funcs == ["SUM"] + + def test_function_in_string_ignored(self) -> None: + funcs = parse_functions('=A1&"SUM(B1)"') + assert funcs == [] + + +class TestExpandRange: + def test_column_range(self) -> None: + cells = expand_range("A1:A5") + assert cells == ["A1", "A2", "A3", "A4", "A5"] + + def test_row_range(self) -> None: + cells = expand_range("B2:D2") + assert cells == ["B2", "C2", "D2"] + + def test_block_range(self) -> None: + cells = expand_range("A1:B2") + assert cells == ["A1", "B1", "A2", "B2"] + + def test_single_cell_range(self) -> None: + cells = expand_range("A1:A1") + assert cells == ["A1"] + + def test_with_sheet_prefix(self) -> None: + cells = expand_range("Sheet2!A1:A3") + assert cells == ["Sheet2!A1", "Sheet2!A2", "Sheet2!A3"] + + def test_quoted_sheet(self) -> None: + cells = expand_range("'Income Statement'!B1:B3") + assert cells == [ + "Income Statement!B1", + "Income Statement!B2", + "Income Statement!B3", + ] + + def test_dollar_signs_handled(self) -> None: + cells = expand_range("$A$1:$A$3") + assert cells == ["A1", "A2", "A3"] + + def test_reversed_range_normalized(self) -> None: + """A5:A1 should produce same result as A1:A5.""" + cells = expand_range("A5:A1") + assert cells == ["A1", "A2", "A3", "A4", "A5"] + + def test_invalid_range(self) -> None: + with pytest.raises(ValueError, match="Invalid range"): + expand_range("A1") + + +class TestAllReferences: + def test_combines_singles_and_ranges(self) -> None: + refs = all_references("=SUM(A1:A3)+B1", "Sheet1") + # B1 is standalone, A1:A3 expands to A1, A2, A3 + assert "Sheet1!B1" in refs + assert "Sheet1!A1" in refs + assert "Sheet1!A2" in refs + assert "Sheet1!A3" in refs + + def test_no_duplicates_across_types(self) -> None: + refs = all_references("=A1+SUM(A1:A3)", "Sheet1") + # A1 appears as both standalone and in range - should only be listed once + assert refs.count("Sheet1!A1") == 1 + + def 
test_multi_sheet(self) -> None: + refs = all_references("=Sheet1!A1+Sheet2!B1", "Sheet1") + assert "Sheet1!A1" in refs + assert "Sheet2!B1" in refs + + +class TestFormulaParser: + def test_parse_refs(self) -> None: + p = FormulaParser() + refs = p.parse_refs("=SUM(A1:A3)+B1", "Sheet1") + assert "Sheet1!B1" in refs + assert "Sheet1!A1" in refs + + def test_compile_returns_none_without_formulas_lib(self) -> None: + """compile() should return None gracefully when formulas lib is not installed.""" + p = FormulaParser() + result = p.compile("=SUM(A1:A5)") + # May be None if formulas is not installed, or a callable if it is + if result is not None: + assert callable(result)