diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
index 300aef18..64c77075 100644
--- a/.github/workflows/release.yml
+++ b/.github/workflows/release.yml
@@ -145,6 +145,10 @@ jobs:
with:
python-version: ${{ matrix.python-version }}
+ - name: Create dir for wheels
+ run: |
+ mkdir wheels
+
- uses: actions/download-artifact@master
with:
name: pathway-x86-x64
@@ -157,7 +161,7 @@ jobs:
rm -rf "${ENV_NAME}"
python -m venv "${ENV_NAME}"
source "${ENV_NAME}/bin/activate"
- WHEEL=(public/pathway/target/wheels/pathway-*.whl)
+ WHEEL=(./wheels/pathway-*.whl)
pip install --prefer-binary "${WHEEL}[tests]"
# --confcutdir anything below to avoid picking REPO_TOP_DIR/conftest.py
python -m pytest --confcutdir "${ENV_NAME}" --doctest-modules --pyargs pathway
@@ -177,7 +181,7 @@ jobs:
runs-on: ${{ matrix.os }}
timeout-minutes: 15
steps:
- - name: create dir for wheels
+ - name: Create dir for wheels
run: |
mkdir wheels
@@ -202,7 +206,7 @@ jobs:
rm -rf "${ENV_NAME}"
python"${{ matrix.python-version }}" -m venv "${ENV_NAME}"
source "${ENV_NAME}/bin/activate"
- WHEEL=(public/pathway/target/wheels/pathway-*.whl)
+ WHEEL=(./wheels/pathway-*.whl)
pip install --prefer-binary "${WHEEL}[tests]"
# --confcutdir anything below to avoid picking REPO_TOP_DIR/conftest.py
python -m pytest --confcutdir "${ENV_NAME}" --doctest-modules --pyargs pathway
@@ -235,7 +239,7 @@ jobs:
aws-region: ${{ secrets.ARTIFACT_AWS_REGION }}
role-duration-seconds: 1200
- - name: create dir for wheels
+ - name: Create dir for wheels
run: |
mkdir wheels
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 02ad4272..50888c2e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,20 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
+## [0.4.0] - 2023-09-21
+
+### Added
+- Support for JSON data format, including `pw.Json` type.
+- Methods `as_int()`, `as_float()`, `as_str()`, `as_bool()` to convert values from `Json`.
+
+### Changed
+- Methods `get()` and `[]` now support accessing elements in `Json`.
+- Function `pw.assert_table_has_schema` for asserting that a given table has the same schema as the one given as an argument.
+- **BREAKING**: `ix` and `ix_ref` operations are now standalone transformations of `pw.Table` into `pw.Table`. Most usages remain the same, but sometimes the user needs to provide a context (e.g. when using them inside `join` or `groupby` operations). `ix` and `ix_ref` are temporarily broken inside temporal joins.
+
+### Fixed
+- Fixed a bug where new-style optional types (e.g. `int | None`) were translated to `Any` dtype.
+
## [0.3.4] - 2023-09-18
### Fixed
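
For orientation, the 0.4.0 entries above combine into usage along these lines — a sketch assembled from the doctests added later in this diff:

```python
import pathway as pw
import pandas as pd

class InputSchema(pw.Schema):
    data: dict  # dict annotations map to the new JSON dtype (pw.Json values)

df = pd.DataFrame(data={"data": [{"value": 1}, {"value": 2}]})
table = pw.debug.table_from_pandas(df, schema=InputSchema)

# get()/[] now index into Json; as_int()/as_float()/as_str()/as_bool() convert.
result = table.select(result=pw.this.data.get("value").as_int())
pw.debug.compute_and_print(result, include_id=False)
```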
diff --git a/Cargo.lock b/Cargo.lock
index e9a91bc8..7df41194 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -92,9 +92,9 @@ checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9"
[[package]]
name = "async-compression"
-version = "0.4.2"
+version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d495b6dc0184693324491a5ac05f559acc97bf937ab31d7a1c33dd0016be6d2b"
+checksum = "bb42b2197bf15ccb092b62c74515dbd8b86d0effd934795f6687c93b6e679a2c"
dependencies = [
"flate2",
"futures-core",
@@ -231,9 +231,9 @@ dependencies = [
[[package]]
name = "bumpalo"
-version = "3.13.0"
+version = "3.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a3e2c3daef883ecc1b5d58c15adae93470a91d425f3532ba1695849656af3fc1"
+checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec"
[[package]]
name = "byteorder"
@@ -264,9 +264,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chrono"
-version = "0.4.30"
+version = "0.4.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "defd4e7873dbddba6c7c91e199c7fcb946abc4a6a4ac3195400bcfb01b5de877"
+checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38"
dependencies = [
"android-tzdata",
"iana-time-zone",
@@ -1094,9 +1094,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "libc"
-version = "0.2.147"
+version = "0.2.148"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3"
+checksum = "9cdc71e17332e86d2e1d38c1f99edcb6288ee11b815fb1a4b049eaa2114d369b"
[[package]]
name = "libz-sys"
@@ -1446,7 +1446,7 @@ dependencies = [
[[package]]
name = "pathway"
-version = "0.3.4"
+version = "0.4.0"
dependencies = [
"arc-swap",
"arcstr",
@@ -1633,9 +1633,9 @@ dependencies = [
[[package]]
name = "proc-macro2"
-version = "1.0.66"
+version = "1.0.67"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "18fb31db3f9bddb2ea821cde30a9f70117e3f119938b5ee630b7403aa6e2ead9"
+checksum = "3d433d9f1a3e8c1263d9456598b16fec66f4acc9a74dacffd35c7bb09b3a1328"
dependencies = [
"unicode-ident",
]
@@ -1995,9 +1995,9 @@ dependencies = [
[[package]]
name = "rustix"
-version = "0.38.12"
+version = "0.38.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bdf14a7a466ce88b5eac3da815b53aefc208ce7e74d1c263aabb04d88c4abeb1"
+checksum = "d7db8590df6dfcd144d22afd1b83b36c21a18d7cbc1dc4bb5295a8712e9eb662"
dependencies = [
"bitflags 2.4.0",
"errno",
@@ -2093,9 +2093,9 @@ dependencies = [
[[package]]
name = "serde_json"
-version = "1.0.106"
+version = "1.0.107"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2cc66a619ed80bf7a0f6b17dd063a84b88f6dea1813737cf469aef1d081142c2"
+checksum = "6b420ce6e3d8bd882e9b243c6eed35dbc9a6110c9769e74b584e0d68d1f20c65"
dependencies = [
"itoa",
"ryu",
@@ -2525,9 +2525,9 @@ checksum = "6af6ae20167a9ece4bcb41af5b80f8a1f1df981f6391189ce00fd257af04126a"
[[package]]
name = "typenum"
-version = "1.16.0"
+version = "1.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba"
+checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825"
[[package]]
name = "unicode-bidi"
@@ -2537,9 +2537,9 @@ checksum = "92888ba5573ff080736b3648696b70cafad7d250551175acbaa4e0385b3e1460"
[[package]]
name = "unicode-ident"
-version = "1.0.11"
+version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "301abaae475aa91687eb82514b328ab47a211a533026cb25fc3e519b86adfc3c"
+checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b"
[[package]]
name = "unicode-normalization"
diff --git a/Cargo.toml b/Cargo.toml
index 12546056..d73ff644 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "pathway"
-version = "0.3.4"
+version = "0.4.0"
edition = "2021"
publish = false
rust-version = "1.71.0"
@@ -22,7 +22,7 @@ arcstr = { version = "1.1.5", default-features = false, features = ["serde", "st
base32 = "0.4.0"
bincode = "1.3.3"
cfg-if = "1.0.0"
-chrono = { version = "0.4.30", features = ["std", "clock"], default-features = false }
+chrono = { version = "0.4.31", features = ["std", "clock"], default-features = false }
chrono-tz = "0.8.3"
crossbeam-channel = "0.5.8"
csv = "1.2.2"
diff --git a/python/pathway/__init__.py b/python/pathway/__init__.py
index 20ed6fca..34658dde 100644
--- a/python/pathway/__init__.py
+++ b/python/pathway/__init__.py
@@ -31,6 +31,7 @@
Joinable,
JoinMode,
JoinResult,
+ Json,
MonitoringLevel,
Pointer,
Schema,
@@ -42,6 +43,7 @@
apply,
apply_async,
apply_with_type,
+ assert_table_has_schema,
asynchronous,
attribute,
cast,
@@ -148,6 +150,8 @@
"DURATION",
"unwrap",
"SchemaProperties",
+ "assert_table_has_schema",
+ "Json",
]
diff --git a/python/pathway/debug/__init__.py b/python/pathway/debug/__init__.py
index 527c063f..5e332560 100644
--- a/python/pathway/debug/__init__.py
+++ b/python/pathway/debug/__init__.py
@@ -10,7 +10,7 @@
import pandas as pd
-from pathway.internals import api, parse_graph
+from pathway.internals import Json, api, parse_graph
from pathway.internals.datasource import PandasDataSource
from pathway.internals.decorators import table_from_datasource
from pathway.internals.graph_runner import GraphRunner
@@ -37,7 +37,10 @@ def table_to_dicts(table: Table):
@functools.total_ordering
class _NoneAwareComparisonWrapper:
def __init__(self, inner):
- self.inner = inner
+ if isinstance(inner, dict | Json):
+ self.inner = str(inner)
+ else:
+ self.inner = inner
def __eq__(self, other):
if not isinstance(other, _NoneAwareComparisonWrapper):
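
Context for the `_NoneAwareComparisonWrapper` change above: plain dicts do not define an ordering in Python, so sorting debug output containing Json/dict cells would fail; comparing their string representations restores a total order. A minimal illustration:

```python
# dicts are not orderable, so sorting rows containing them raises:
try:
    sorted([{"b": 2}, {"a": 1}])
except TypeError as e:
    print(e)  # '<' not supported between instances of 'dict' and 'dict'

# falling back to the string form, as the wrapper now does, works:
print(sorted([{"b": 2}, {"a": 1}], key=str))  # [{'a': 1}, {'b': 2}]
```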
diff --git a/python/pathway/dt.py b/python/pathway/dt.py
index ce5d6afe..57959434 100644
--- a/python/pathway/dt.py
+++ b/python/pathway/dt.py
@@ -7,6 +7,7 @@
DURATION,
FLOAT,
INT,
+ JSON,
POINTER,
STR,
Array,
@@ -26,6 +27,7 @@
"DURATION",
"FLOAT",
"INT",
+ "JSON",
"POINTER",
"STR",
"Array",
diff --git a/python/pathway/engine.pyi b/python/pathway/engine.pyi
index db120334..30dc0f18 100644
--- a/python/pathway/engine.pyi
+++ b/python/pathway/engine.pyi
@@ -29,6 +29,7 @@ class PathwayType(Enum):
DATE_TIME_UTC: PathwayType
DURATION: PathwayType
ARRAY: PathwayType
+ JSON: PathwayType
class ConnectorMode(Enum):
STATIC: ConnectorMode
@@ -52,15 +53,15 @@ class Trace:
@dataclasses.dataclass(frozen=True)
class EvalProperties:
- trace: Optional[Trace] = None
dtype: Optional[DType] = None
- append_only: Optional[bool] = False
+ trace: Optional[Trace] = None
+ append_only: bool = False
@dataclasses.dataclass(frozen=True)
class ConnectorProperties:
commit_duration_ms: Optional[int] = None
unsafe_trusted_ids: Optional[bool] = False
- append_only: Optional[bool] = False
+ append_only: bool = False
class Column:
"""A Column holds data and conceptually is a Dict[Universe elems, dt]
@@ -165,6 +166,9 @@ class Expression:
def cast_optional(
expr: Expression, source_type: PathwayType, target_type: PathwayType
) -> Optional[Expression]: ...
+ def convert_optional(
+ expr: Expression, source_type: PathwayType, target_type: PathwayType
+ ) -> Optional[Expression]: ...
@staticmethod
def if_else(if_: Expression, then: Expression, else_: Expression) -> Expression: ...
@staticmethod
@@ -276,6 +280,12 @@ class Expression:
expr: Expression, index: Expression
) -> Expression: ...
@staticmethod
+ def json_get_item_checked(
+ expr: Expression, index: Expression, default: Expression
+ ) -> Expression: ...
+ @staticmethod
+ def json_get_item_unchecked(expr: Expression, index: Expression) -> Expression: ...
+ @staticmethod
def unwrap(expr: Expression) -> Expression: ...
@staticmethod
def to_string(expr: Expression) -> Expression: ...
@@ -459,9 +469,6 @@ class Grouper:
def input_column(self, column: Column) -> Column: ...
def count_column(self) -> Column: ...
def reducer_column(self, reducer: Reducer, column: Column) -> Column: ...
- def reducer_ix_column(
- self, reducer: Reducer, ixer: Ixer, input_column: Column
- ) -> Column: ...
class Concat:
@property
diff --git a/python/pathway/internals/__init__.py b/python/pathway/internals/__init__.py
index fd745471..2c59ec45 100644
--- a/python/pathway/internals/__init__.py
+++ b/python/pathway/internals/__init__.py
@@ -8,6 +8,7 @@
apply,
apply_async,
apply_with_type,
+ assert_table_has_schema,
cast,
coalesce,
declare_type,
@@ -31,14 +32,13 @@
from pathway.internals.expression import (
ColumnExpression,
ColumnExpressionOrValue,
- ColumnIxExpression,
ColumnReference,
ReducerExpression,
- ReducerIxExpression,
)
from pathway.internals.groupby import GroupedJoinResult, GroupedTable
from pathway.internals.join import FilteredJoinResult, Joinable, JoinResult
from pathway.internals.join_mode import JoinMode
+from pathway.internals.json import Json
from pathway.internals.monitoring import MonitoringLevel
from pathway.internals.operator import iterate_universe
from pathway.internals.row_transformer import ClassArg
@@ -104,8 +104,6 @@
"run",
"run_all",
"numba_apply",
- "ColumnIxExpression",
- "ReducerIxExpression",
"__version__",
"universes",
"asynchronous",
@@ -118,4 +116,6 @@
"DURATION",
"unwrap",
"SchemaProperties",
+ "assert_table_has_schema",
+ "Json",
]
diff --git a/python/pathway/internals/api.py b/python/pathway/internals/api.py
index da902b96..a306df52 100644
--- a/python/pathway/internals/api.py
+++ b/python/pathway/internals/api.py
@@ -32,7 +32,9 @@
# XXX: engine calls return BasePointer, not Pointer
class Pointer(BasePointer, Generic[TSchema]):
"""Pointer to row type.
+
Example:
+
>>> import pathway as pw
>>> t1 = pw.debug.parse_to_table('''
... age | owner | pet
@@ -109,4 +111,5 @@ def denumpify(x):
dt.DURATION: PathwayType.DURATION,
dt.Array(): PathwayType.ARRAY,
dt.ANY: PathwayType.ANY,
+ dt.JSON: PathwayType.JSON,
}
diff --git a/python/pathway/internals/arg_handlers.py b/python/pathway/internals/arg_handlers.py
index c1185ae9..03a4a166 100644
--- a/python/pathway/internals/arg_handlers.py
+++ b/python/pathway/internals/arg_handlers.py
@@ -137,7 +137,7 @@ def reduce_args_handler(self, *args, **kwargs):
def select_args_handler(self, *args, **kwargs):
for arg in args:
- if not isinstance(arg, expr.ColumnRefOrIxExpression):
+ if not isinstance(arg, expr.ColumnReference):
if isinstance(arg, str):
raise ValueError(
f"Expected a ColumnReference, found a string. Did you mean this.{arg} instead of {repr(arg)}?"
diff --git a/python/pathway/internals/column.py b/python/pathway/internals/column.py
index 2303f9e9..05ec4f7c 100644
--- a/python/pathway/internals/column.py
+++ b/python/pathway/internals/column.py
@@ -2,16 +2,19 @@
from __future__ import annotations
-from abc import ABC
+from abc import ABC, abstractmethod
from dataclasses import dataclass, field
+from functools import cached_property
from itertools import chain
from types import EllipsisType
-from typing import TYPE_CHECKING, Dict, Iterable, Optional, Tuple
+from typing import TYPE_CHECKING, ClassVar, Dict, Iterable, Optional, Tuple, Type
import pathway.internals as pw
+from pathway.internals import column_properties as cp
from pathway.internals import dtype as dt
from pathway.internals import trace
-from pathway.internals.expression import ColumnExpression, ColumnRefOrIxExpression
+from pathway.internals.dtype import DType
+from pathway.internals.expression import ColumnExpression, ColumnReference
from pathway.internals.helpers import SetOnceProperty, StableSet
if TYPE_CHECKING:
@@ -52,14 +55,12 @@ def table(self) -> Table:
class Column(ABC):
- dtype: dt.DType
universe: Universe
lineage: SetOnceProperty[ColumnLineage] = SetOnceProperty()
"""Lateinit by operator."""
- def __init__(self, dtype: dt.DType, universe: Universe) -> None:
+ def __init__(self, universe: Universe) -> None:
super().__init__()
- self.dtype = dtype
self.universe = universe
self._trace = trace.Trace.from_traceback()
@@ -73,11 +74,30 @@ def trace(self) -> trace.Trace:
else:
return self._trace
+ @property
+ @abstractmethod
+ def properties(self) -> cp.ColumnProperties:
+ ...
+
+ @property
+ def dtype(self) -> DType:
+ return self.properties.dtype
+
class MaterializedColumn(Column):
"""Column not requiring evaluation."""
- pass
+ def __init__(
+ self,
+ universe: Universe,
+ properties: cp.ColumnProperties,
+ ):
+ super().__init__(universe)
+ self._properties = properties
+
+ @property
+ def properties(self) -> cp.ColumnProperties:
+ return self._properties
class MethodColumn(MaterializedColumn):
@@ -91,17 +111,30 @@ class ColumnWithContext(Column, ABC):
context: Context
- def __init__(self, dtype: dt.DType, context: Context, universe: Universe):
- super().__init__(dtype, universe)
+ def __init__(self, context: Context, universe: Universe):
+ super().__init__(universe)
self.context = context
def column_dependencies(self) -> StableSet[Column]:
return super().column_dependencies() | self.context.column_dependencies()
+ @cached_property
+ def properties(self) -> cp.ColumnProperties:
+ return self.context.column_properties(self)
+
+ @cached_property
+ @abstractmethod
+ def context_dtype(self) -> DType:
+ ...
+
class IdColumn(ColumnWithContext):
def __init__(self, context: Context) -> None:
- super().__init__(dt.POINTER, context, context.universe)
+ super().__init__(context, context.universe)
+
+ @cached_property
+ def context_dtype(self) -> DType:
+ return dt.POINTER
class ColumnWithExpression(ColumnWithContext):
@@ -117,8 +150,7 @@ def __init__(
expression: ColumnExpression,
lineage: Optional[Lineage] = None,
):
- dtype = context.expression_type(expression)
- super().__init__(dtype, context, universe)
+ super().__init__(context, universe)
self.expression = expression
if lineage is not None:
self.lineage = lineage
@@ -129,15 +161,19 @@ def dereference(self) -> Column:
def column_dependencies(self) -> StableSet[Column]:
return super().column_dependencies() | self.expression._column_dependencies()
+ @cached_property
+ def context_dtype(self) -> DType:
+ return self.context.expression_type(self.expression)
+
class ColumnWithReference(ColumnWithExpression):
- expression: ColumnRefOrIxExpression
+ expression: ColumnReference
def __init__(
self,
context: Context,
universe: Universe,
- expression: ColumnRefOrIxExpression,
+ expression: ColumnReference,
lineage: Optional[Lineage] = None,
):
super().__init__(context, universe, expression, lineage)
@@ -178,6 +214,8 @@ class Context:
universe: Universe
"""Resulting universe."""
+ _column_properties_evaluator: ClassVar[Type[cp.ColumnPropertiesEvaluator]]
+
def columns_to_eval(self) -> Iterable[Column]:
return []
@@ -185,9 +223,7 @@ def column_dependencies(self) -> StableSet[Column]:
deps = (col.column_dependencies() for col in self.columns_to_eval())
return StableSet.union(*deps)
- def reference_column_dependencies(
- self, ref: ColumnRefOrIxExpression
- ) -> StableSet[Column]:
+ def reference_column_dependencies(self, ref: ColumnReference) -> StableSet[Column]:
return StableSet()
def _get_type_interpreter(self):
@@ -205,14 +241,32 @@ def expression_with_type(self, expression: ColumnExpression) -> ColumnExpression
expression, state=TypeInterpreterState()
)
+ def column_properties(self, column: ColumnWithContext) -> cp.ColumnProperties:
+ return self._column_properties_evaluator().eval(column)
+
+ def __init_subclass__(
+ cls,
+ /,
+ column_properties_evaluator: Type[
+ cp.ColumnPropertiesEvaluator
+ ] = cp.DefaultPropsEvaluator,
+ **kwargs,
+ ) -> None:
+ super().__init_subclass__(**kwargs)
+ cls._column_properties_evaluator = column_properties_evaluator
+
@dataclass(eq=False, frozen=True)
-class RowwiseContext(Context):
+class RowwiseContext(
+ Context, column_properties_evaluator=cp.PreserveDependenciesPropsEvaluator
+):
"""Context for basic expressions."""
+ ...
+
@dataclass(eq=False, frozen=True)
-class TableRestrictedRowwiseContext(Context):
+class TableRestrictedRowwiseContext(RowwiseContext):
"""Restricts expression to specific table."""
table: pw.Table
@@ -237,7 +291,9 @@ def columns_to_eval(self) -> Iterable[Column]:
@dataclass(eq=False, frozen=True)
-class FilterContext(Context):
+class FilterContext(
+ Context, column_properties_evaluator=cp.PreserveDependenciesPropsEvaluator
+):
"""Context of `table.filter() operation."""
filtering_column: ColumnWithExpression
@@ -257,6 +313,18 @@ def columns_to_eval(self) -> Iterable[Column]:
return [self.reindex_column]
+@dataclass(eq=False, frozen=True)
+class IxContext(Context):
+ """Context of `table.ix() operation."""
+
+ orig_universe: Universe
+ key_column: ColumnWithExpression
+ optional: bool
+
+ def columns_to_eval(self) -> Iterable[Column]:
+ return [self.key_column]
+
+
@dataclass(eq=False, frozen=True)
class IntersectContext(Context):
"""Context of `table.intersect() operation."""
@@ -294,9 +362,7 @@ class UpdateRowsContext(Context):
def __post_init__(self):
assert len(self.union_universes) > 0
- def reference_column_dependencies(
- self, ref: ColumnRefOrIxExpression
- ) -> StableSet[Column]:
+ def reference_column_dependencies(self, ref: ColumnReference) -> StableSet[Column]:
return StableSet([self.updates[ref.name]])
@@ -310,17 +376,17 @@ class ConcatUnsafeContext(Context):
def __post_init__(self):
assert len(self.union_universes) > 0
- def reference_column_dependencies(
- self, ref: ColumnRefOrIxExpression
- ) -> StableSet[Column]:
+ def reference_column_dependencies(self, ref: ColumnReference) -> StableSet[Column]:
return StableSet([update[ref.name] for update in self.updates])
@dataclass(eq=False, frozen=True)
-class PromiseSameUniverseContext(Context):
+class PromiseSameUniverseContext(
+ Context, column_properties_evaluator=cp.PreserveDependenciesPropsEvaluator
+):
"""Context of table.unsafe_promise_same_universe_as() operation."""
- pass
+ ...
@dataclass(eq=True, frozen=True)
diff --git a/python/pathway/internals/column_properties.py b/python/pathway/internals/column_properties.py
new file mode 100644
index 00000000..f748889a
--- /dev/null
+++ b/python/pathway/internals/column_properties.py
@@ -0,0 +1,44 @@
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+from dataclasses import dataclass
+from typing import TYPE_CHECKING, Any
+
+from pathway.internals import dtype as dt
+
+if TYPE_CHECKING:
+ import pathway.internals.column as clmn
+
+
+@dataclass(frozen=True)
+class ColumnProperties:
+ dtype: dt.DType
+ append_only: bool = False
+
+
+class ColumnPropertiesEvaluator(ABC):
+ def eval(self, column: clmn.ColumnWithContext) -> ColumnProperties:
+ return ColumnProperties(
+ dtype=column.context_dtype, append_only=self._append_only(column)
+ )
+
+ @abstractmethod
+ def _append_only(self, column: clmn.ColumnWithContext) -> bool:
+ ...
+
+
+class DefaultPropsEvaluator(ColumnPropertiesEvaluator):
+ def _append_only(self, column: clmn.ColumnWithContext) -> bool:
+ return False
+
+
+class PreserveDependenciesPropsEvaluator(ColumnPropertiesEvaluator):
+ def _append_only(self, column: clmn.ColumnWithContext):
+ return self._has_property(column, "append_only", True)
+
+ def _has_property(self, column: clmn.ColumnWithContext, name: str, value: Any):
+ return all(
+ getattr(col.properties, name) == value
+ for col in column.column_dependencies()
+ if col != column
+ )
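
A self-contained sketch of the propagation rule that `PreserveDependenciesPropsEvaluator` implements — a derived column stays append-only only when all of its dependencies are (the `Props`/`FakeColumn` names are illustrative stand-ins for the real column classes):

```python
from dataclasses import dataclass
from typing import Tuple

@dataclass(frozen=True)
class Props:
    dtype: str
    append_only: bool = False

@dataclass
class FakeColumn:
    props: Props
    deps: Tuple["FakeColumn", ...] = ()

def preserve_append_only(column: FakeColumn) -> bool:
    # append_only holds iff every dependency (other than the column itself) has it
    return all(dep.props.append_only for dep in column.deps if dep is not column)

src = FakeColumn(Props("int", append_only=True))
derived = FakeColumn(Props("int"), deps=(src,))
print(preserve_append_only(derived))                            # True
print(preserve_append_only(FakeColumn(Props("int"), deps=(derived,))))  # False
```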
diff --git a/python/pathway/internals/common.py b/python/pathway/internals/common.py
index ef7a2a10..08c1251a 100644
--- a/python/pathway/internals/common.py
+++ b/python/pathway/internals/common.py
@@ -3,12 +3,12 @@
from __future__ import annotations
import functools
-from typing import Any, Callable, Optional, Union, overload
+from typing import Any, Callable, Optional, Type, Union, overload
from pathway.internals import dtype as dt
from pathway.internals import expression as expr
from pathway.internals import operator as op
-from pathway.internals import table
+from pathway.internals import schema, table
from pathway.internals.asynchronous import (
AsyncRetryStrategy,
CacheStrategy,
@@ -557,3 +557,42 @@ def unwrap(col: expr.ColumnExpressionOrValue) -> expr.ColumnExpression:
9
"""
return expr.UnwrapExpression(col)
+
+
+def assert_table_has_schema(
+ table: table.Table,
+ schema: Type[schema.Schema],
+ *,
+ allow_superset: bool = False,
+ ignore_primary_keys: bool = True,
+) -> None:
+ """
+ Asserts that the schema of the table is equivalent to the schema given as an argument.
+
+ Args:
+ table: Table whose schema is being checked.
+ schema: Schema that the table is expected to have.
+ allow_superset: if True, the columns of the table can be a superset of columns
+ in schema.
+ ignore_primary_keys: if True, the assert won't check whether table and schema
+ have the same primary keys. The default value is True.
+
+ Example:
+
+ >>> import pathway as pw
+ >>> t1 = pw.debug.parse_to_table('''
+ ... age | owner | pet
+ ... 10 | Alice | dog
+ ... 9 | Bob | dog
+ ... 8 | Alice | cat
+ ... 7 | Bob | dog
+ ... ''')
+ >>> t2 = t1.select(pw.this.owner, age = pw.cast(float, pw.this.age))
+ >>> schema = pw.schema_builder(
+ ... {"age": pw.column_definition(dtype=float), "owner": pw.column_definition(dtype=str)}
+ ... )
+ >>> pw.assert_table_has_schema(t2, schema)
+ """
+ table.schema.assert_equal_to(
+ schema, allow_superset=allow_superset, ignore_primary_keys=ignore_primary_keys
+ )
diff --git a/python/pathway/internals/desugaring.py b/python/pathway/internals/desugaring.py
index 0cdf4208..ee8f15bb 100644
--- a/python/pathway/internals/desugaring.py
+++ b/python/pathway/internals/desugaring.py
@@ -4,17 +4,7 @@
from abc import abstractmethod
from functools import wraps
-from typing import (
- TYPE_CHECKING,
- Any,
- Dict,
- Iterable,
- List,
- Mapping,
- Tuple,
- TypeVar,
- cast,
-)
+from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Mapping, Tuple, TypeVar
from pathway.internals import expression as expr
from pathway.internals.expression_visitor import IdentityTransform
@@ -39,8 +29,26 @@ def __init__(self, substitution: Dict[thisclass.ThisMetaclass, table.Joinable]):
def eval_column_val(
self, expression: expr.ColumnReference, **kwargs
) -> expr.ColumnReference:
- table = self._desugar_table(expression.table)
- return table[expression.name]
+ from pathway.internals import thisclass
+
+ # Below we want to break the moment we reach the fix-point.
+ # Desugaring is hard, since it can desugar a delayed-op-on-this to an identical delayed-op-on-this
+ # (due to interval join desugaring pw.this to pw.this, which can break desugaring of delayed ix),
+ # and delayed ops are hard to compare.
+ # But any nontrivial step reduces the `depth` of delayed ops, or lands us in the `pw.Table` world,
+ # where comparisons make sense.
+ while True:
+ expression = expression.table[expression.name] # magic desugaring slices
+ prev = expression.table
+ table = self._desugar_table(expression.table)
+ expression = table[expression.name]
+ if prev == table or (
+ isinstance(prev, thisclass.ThisMetaclass)
+ and isinstance(table, thisclass.ThisMetaclass)
+ and prev._delay_depth() == table._delay_depth()
+ ):
+ break
+ return expression
def eval_pointer(
self, expression: expr.PointerExpression, **kwargs
@@ -48,21 +56,20 @@ def eval_pointer(
args = [self.eval_expression(arg, **kwargs) for arg in expression._args]
optional = expression._optional
desugared_table = self._desugar_table(expression._table)
- from pathway.internals import table
+ from pathway.internals.table import Table
- return expr.PointerExpression(
- cast(table.Table, desugared_table), *args, optional=optional
- )
+ assert isinstance(desugared_table, Table)
+
+ return expr.PointerExpression(desugared_table, *args, optional=optional)
def _desugar_table(
self, table: table.Joinable | thisclass.ThisMetaclass
) -> table.Joinable:
from pathway.internals import thisclass
- if isinstance(table, thisclass.ThisMetaclass):
- return table._eval_substitution(self.substitution)
- else:
+ if not isinstance(table, thisclass.ThisMetaclass):
return table
+ return table._eval_substitution(self.substitution)
class SubstitutionDesugaring(DesugaringTransform):
@@ -110,19 +117,6 @@ def eval_column_val( # type: ignore[override]
else:
return super().eval_column_val(expression, **kwargs)
- def eval_ix(
- self, expression: expr.ColumnIxExpression, **kwargs
- ) -> expr.ColumnIxExpression:
- column_expression = super().eval_column_val(
- expression._column_expression, **kwargs
- )
- keys_expression = self.eval_expression(expression._keys_expression, **kwargs)
- return expr.ColumnIxExpression(
- column_expression=column_expression,
- keys_expression=keys_expression,
- optional=expression._optional,
- )
-
def eval_require(
self, expression: expr.RequireExpression, **kwargs
) -> expr.RequireExpression:
@@ -201,14 +195,6 @@ def eval_reducer(
]
return expr.ReducerExpression(expression._reducer, *args)
- def eval_reducer_ix(
- self, expression: expr.ReducerIxExpression, **kwargs
- ) -> expr.ReducerIxExpression:
- select_desugar = TableSelectDesugaring(self.table_like._joinable_to_group)
- arg = cast(expr.ColumnIxExpression, expression._args[0])
- arg_ix = select_desugar.eval_ix(arg, **kwargs)
- return expr.ReducerIxExpression(expression._reducer, arg_ix)
-
ColExprT = TypeVar("ColExprT", bound=expr.ColumnExpression)
@@ -260,15 +246,12 @@ def _desugar_this_kwargs(
def combine_args_kwargs(
- args: Iterable[expr.ColumnReference | expr.ColumnIxExpression],
+ args: Iterable[expr.ColumnReference],
kwargs: Mapping[str, Any],
) -> Dict[str, expr.ColumnExpression]:
all_args = {}
def add(name, expression):
- from pathway.internals import table
-
- assert not isinstance(expression, table.Table)
if name in all_args:
raise ValueError(f"Duplicate expression value given for {name}")
if name == "id":
diff --git a/python/pathway/internals/dtype.py b/python/pathway/internals/dtype.py
index 711a2b87..684c154d 100644
--- a/python/pathway/internals/dtype.py
+++ b/python/pathway/internals/dtype.py
@@ -3,6 +3,7 @@
from __future__ import annotations
import collections
+import types
import typing
from abc import ABC
from types import EllipsisType
@@ -11,7 +12,7 @@
import numpy as np
import pandas as pd
-from pathway.internals import api
+from pathway.internals import api, json
if typing.TYPE_CHECKING:
from pathway.internals.schema import Schema
@@ -44,6 +45,12 @@ def __class_getitem__(cls, args):
else:
return cls(args) # type: ignore[call-arg]
+ def equivalent_to(self, other: DType) -> bool:
+ return dtype_equivalence(self, other)
+
+ def is_subclass_of(self, other: DType) -> bool:
+ return dtype_issubclass(self, other)
+
class _SimpleDType(DType):
wrapped: type
@@ -233,6 +240,23 @@ def __new__(cls, *args: DType | EllipsisType) -> Tuple | List: # type: ignore[m
return cls._cached_new(args)
+class Json(DType):
+ def __new__(cls) -> Json:
+ return cls._cached_new()
+
+ def _set_args(self):
+ pass
+
+ def __repr__(self) -> str:
+ return "Json"
+
+ def to_engine(self) -> api.PathwayType:
+ return api.PathwayType.JSON
+
+
+JSON: DType = Json()
+
+
class List(DType):
wrapped: DType
@@ -315,6 +339,7 @@ def wrap(input_type) -> DType:
assert input_type != Callable
assert input_type != Array
assert input_type != List
+ assert input_type != Json
if isinstance(input_type, DType):
return input_type
if input_type == _NoneType:
@@ -347,7 +372,7 @@ def wrap(input_type) -> DType:
assert isinstance(ret_type, DType), type(ret_type)
return Callable(callable_args, ret_type)
elif (
- typing.get_origin(input_type) is typing.Union
+ typing.get_origin(input_type) in (typing.Union, types.UnionType)
and len(typing.get_args(input_type)) == 2
and isinstance(None, typing.get_args(input_type)[1])
):
@@ -356,6 +381,12 @@ def wrap(input_type) -> DType:
return Optional(arg)
elif input_type in [list, tuple, typing.List, typing.Tuple]:
return ANY_TUPLE
+ elif (
+ input_type == json.Json
+ or input_type == dict
+ or typing.get_origin(input_type) == dict
+ ):
+ return JSON
elif typing.get_origin(input_type) == list:
args = get_args(input_type)
(arg,) = args
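
The `types.UnionType` addition above is the fix for the changelog's optional-type bug: on Python 3.10+, `int | None` has a different origin than `typing.Optional[int]`, so the old `typing.Union`-only check never matched it. This is directly checkable:

```python
import types
import typing

print(typing.get_origin(typing.Optional[int]))  # typing.Union
print(typing.get_origin(int | None))            # <class 'types.UnionType'>

# both spellings expose the same args, so one membership test covers both:
assert typing.get_args(int | None) == (int, type(None))
assert typing.get_origin(int | None) in (typing.Union, types.UnionType)
```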
diff --git a/python/pathway/internals/expression.py b/python/pathway/internals/expression.py
index 5be3a600..7c459d4a 100644
--- a/python/pathway/internals/expression.py
+++ b/python/pathway/internals/expression.py
@@ -3,7 +3,7 @@
from __future__ import annotations
import dataclasses
-from abc import ABC, abstractmethod
+from abc import ABC
from functools import lru_cache
from typing import (
TYPE_CHECKING,
@@ -201,9 +201,15 @@ def is_not_none(self) -> IsNotNoneExpression:
# will not do the right thing.
__iter__ = None
- def __getitem__(self, index: Union[ColumnExpression, int]) -> ColumnExpression:
- """Extracts element at `index` from an object. The object has to be a Tuple.
- Errors if no element is present at `index`.
+ def __getitem__(self, index: ColumnExpression | int | str) -> ColumnExpression:
+ """Extracts element at `index` from an object. The object has to be a Tuple or Json.
+
+ The index can be an `int` for a Tuple and an `int` or `str` for Json.
+ For Tuples, a negative index can be used to access elements from the end.
+
+ If no element is present at `index`:
+ - returns `json(null)` for Json
+ - raises an error for Tuple
Args:
index: Position to extract element at.
@@ -227,19 +233,22 @@ def __getitem__(self, index: Union[ColumnExpression, int]) -> ColumnExpression:
4 | 1 | 4
7 | 3 | 3
"""
- return SequenceGetExpression(self, index, check_if_exists=False)
+ return GetExpression(self, index, check_if_exists=False)
def get(
self,
- index: Union[ColumnExpression, int],
+ index: ColumnExpression | int | str,
default: ColumnExpressionOrValue = None,
) -> ColumnExpression:
- """Extracts element at `index` from an object. The object has to be a Tuple.
+ """Extracts element at `index` from an object. The object has to be a Tuple or Json.
If no element is present at `index`, it returns value specified by a `default` parameter.
+ The index can be an `int` for a Tuple and an `int` or `str` for Json.
+ For Tuples, a negative index can be used to access elements from the end.
+
Args:
index: Position to extract element at.
- default: Value returned when no element is at position `index`.
+ default: Value returned when no element is at position `index`. Defaults to None.
Example:
@@ -265,7 +274,7 @@ def get(
2 | | | 100
3 | | 3 | 3
"""
- return SequenceGetExpression(self, index, default, check_if_exists=True)
+ return GetExpression(self, index, default, check_if_exists=True)
@property
def dt(self) -> DateTimeNamespace:
@@ -327,16 +336,96 @@ def to_string(self) -> MethodCallExpression:
self,
)
+ def as_int(self) -> ConvertExpression:
+ """Converts value to an int or None if not possible.
+ Currently works for Json columns only.
+
+ Example:
+
+ >>> import pathway as pw
+ >>> import pandas as pd
+ >>> class InputSchema(pw.Schema):
+ ... data: dict
+ >>> dt = pd.DataFrame(data={"data": [{"value": 1}, {"value": 2}]})
+ >>> table = pw.debug.table_from_pandas(dt, schema=InputSchema)
+ >>> result = table.select(result=pw.this.data.get("value").as_int())
+ >>> pw.debug.compute_and_print(result, include_id=False)
+ result
+ 1
+ 2
+ """
+ return ConvertExpression(dt.INT, self)
+
+ def as_float(self) -> ConvertExpression:
+ """Converts value to a float or None if not possible.
+ Currently works for Json columns only.
+
+ Example:
+
+ >>> import pathway as pw
+ >>> import pandas as pd
+ >>> class InputSchema(pw.Schema):
+ ... data: dict
+ >>> dt = pd.DataFrame(data={"data": [{"value": 1.5}, {"value": 3.14}]})
+ >>> table = pw.debug.table_from_pandas(dt, schema=InputSchema)
+ >>> result = table.select(result=pw.this.data.get("value").as_float())
+ >>> pw.debug.compute_and_print(result, include_id=False)
+ result
+ 1.5
+ 3.14
+ """
+ return ConvertExpression(dt.FLOAT, self)
+
+ def as_str(self) -> ConvertExpression:
+ """Converts value to a string or None if not possible.
+ Currently works for Json columns only.
+
+ Example:
+
+ >>> import pathway as pw
+ >>> import pandas as pd
+ >>> class InputSchema(pw.Schema):
+ ... data: dict
+ >>> dt = pd.DataFrame(data={"data": [{"value": "dog"}, {"value": "cat"}]})
+ >>> table = pw.debug.table_from_pandas(dt, schema=InputSchema)
+ >>> result = table.select(result=pw.this.data.get("value").as_str())
+ >>> pw.debug.compute_and_print(result, include_id=False)
+ result
+ cat
+ dog
+ """
+ return ConvertExpression(dt.STR, self)
+
+ def as_bool(self) -> ConvertExpression:
+ """Converts value to a bool or None if not possible.
+ Currently works for Json columns only.
+
+ Example:
+
+ >>> import pathway as pw
+ >>> import pandas as pd
+ >>> class InputSchema(pw.Schema):
+ ... data: dict
+ >>> dt = pd.DataFrame(data={"data": [{"value": True}, {"value": False}]})
+ >>> table = pw.debug.table_from_pandas(dt, schema=InputSchema)
+ >>> result = table.select(result=pw.this.data.get("value").as_bool())
+ >>> pw.debug.compute_and_print(result, include_id=False)
+ result
+ False
+ True
+ """
+ return ConvertExpression(dt.BOOL, self)
+
ColumnExpressionOrValue = Union[ColumnExpression, Value]
class ColumnCallExpression(ColumnExpression):
_args: Tuple[ColumnExpression, ...]
- _col_expr: ColumnRefOrIxExpression
+ _col_expr: ColumnReference
def __init__(
- self, col_expr: ColumnRefOrIxExpression, args: Iterable[ColumnExpressionOrValue]
+ self, col_expr: ColumnReference, args: Iterable[ColumnExpressionOrValue]
):
super().__init__()
self._col_expr = col_expr
@@ -353,46 +442,7 @@ def __init__(self, val: Value):
self._deps = []
-class ColumnRefOrIxExpression(ColumnExpression):
- _column: Column
-
- def __init__(self, column: Column):
- super().__init__()
- self._column = column
-
- def __call__(self, *args) -> ColumnExpression:
- return ColumnCallExpression(self, args)
-
- @property
- def _column_with_expression_cls(self) -> Type[ColumnWithExpression]:
- from pathway.internals.column import ColumnWithReference
-
- return ColumnWithReference
-
- @property
- @abstractmethod
- def name(self) -> str:
- ...
-
-
-@dataclasses.dataclass(frozen=True)
-class InternalColRef:
- _table: Table
- _name: str
-
- def to_colref(self) -> ColumnReference:
- return self._table[self._name]
-
- def to_original(self) -> InternalColRef:
- return self.to_colref()._to_original_internal()
-
- def to_column(self) -> Column:
- if self._name == "id":
- return self._table._id_column
- return self._table._columns[self._name]
-
-
-class ColumnReference(ColumnRefOrIxExpression):
+class ColumnReference(ColumnExpression):
"""Reference to the column.
Inherits from ColumnExpression.
@@ -412,11 +462,13 @@ class ColumnReference(ColumnRefOrIxExpression):
True
"""
+ _column: Column
_table: Table
_name: str
def __init__(self, *, column: Column, table: Table, name: str):
- super().__init__(column)
+ super().__init__()
+ self._column = column
self._table = table
self._name = name
self._deps = []
@@ -477,36 +529,31 @@ def _dependencies_above_reducer(self) -> helpers.StableSet[InternalColRef]:
def _dependencies_below_reducer(self) -> helpers.StableSet[InternalColRef]:
return helpers.StableSet()
+ def __call__(self, *args) -> ColumnExpression:
+ return ColumnCallExpression(self, args)
-class ColumnIxExpression(ColumnRefOrIxExpression):
- _keys_expression: ColumnExpression
- _column_expression: ColumnReference
- _optional: bool
+ @property
+ def _column_with_expression_cls(self) -> Type[ColumnWithExpression]:
+ from pathway.internals.column import ColumnWithReference
- def __init__(
- self,
- *,
- keys_expression: ColumnExpression,
- column_expression: ColumnReference,
- optional: bool,
- ):
- super().__init__(column_expression._column)
- self._keys_expression = keys_expression
- self._column_expression = column_expression
- self._optional = optional
- self._deps = [self._keys_expression, self._column_expression]
+ return ColumnWithReference
- @lru_cache
- def _dependencies_above_reducer(self) -> helpers.StableSet[InternalColRef]:
- return self._keys_expression._dependencies_above_reducer()
- @lru_cache
- def _dependencies_below_reducer(self) -> helpers.StableSet[InternalColRef]:
- return self._keys_expression._dependencies_below_reducer()
+@dataclasses.dataclass(frozen=True)
+class InternalColRef:
+ _table: Table
+ _name: str
- @property
- def name(self) -> str:
- return self._column_expression.name
+ def to_colref(self) -> ColumnReference:
+ return self._table[self._name]
+
+ def to_original(self) -> InternalColRef:
+ return self.to_colref()._to_original_internal()
+
+ def to_column(self) -> Column:
+ if self._name == "id":
+ return self._table._id_column
+ return self._table._columns[self._name]
class ColumnBinaryOpExpression(ColumnExpression):
@@ -559,11 +606,6 @@ def _dependencies_below_reducer(self) -> helpers.StableSet[InternalColRef]:
)
-class ReducerIxExpression(ReducerExpression):
- def __init__(self, reducer: UnaryReducer, arg: ColumnIxExpression):
- super().__init__(reducer, arg)
-
-
class CountExpression(ColumnExpression):
def __init__(self):
super().__init__()
@@ -632,7 +674,7 @@ def __init__(
class CastExpression(ColumnExpression):
- _return_type: Any
+ _return_type: dt.DType
_expr: ColumnExpression
def __init__(self, return_type: Any, expr: ColumnExpressionOrValue):
@@ -642,6 +684,17 @@ def __init__(self, return_type: Any, expr: ColumnExpressionOrValue):
self._deps = [self._expr]
+class ConvertExpression(ColumnExpression):
+ _return_type: dt.DType
+ _expr: ColumnExpression
+
+ def __init__(self, return_type: dt.DType, expr: ColumnExpressionOrValue):
+ super().__init__()
+ self._return_type = dt.Optional(return_type)
+ self._expr = _wrap_arg(expr)
+ self._deps = [self._expr]
+
+
class DeclareTypeExpression(ColumnExpression):
_return_type: Any
_expr: ColumnExpression
@@ -745,17 +798,17 @@ def __init__(self, *args: ColumnExpressionOrValue):
self._deps = self._args
-class SequenceGetExpression(ColumnExpression):
+class GetExpression(ColumnExpression):
_object: ColumnExpression
_index: ColumnExpression
_default: ColumnExpression
_check_if_exists: bool
- _const_index: Optional[int]
+ _const_index: Optional[int | str]
def __init__(
self,
object: ColumnExpression,
- index: Union[ColumnExpression, int],
+ index: ColumnExpression | int | str,
default: ColumnExpressionOrValue = None,
check_if_exists=True,
) -> None:
@@ -765,7 +818,7 @@ def __init__(
self._default = _wrap_arg(default)
self._check_if_exists = check_if_exists
if isinstance(self._index, ColumnConstExpression) and isinstance(
- self._index._val, int
+ self._index._val, (int, str)
):
self._const_index = self._index._val
else:
@@ -862,7 +915,7 @@ def _wrap_arg(arg: ColumnExpressionOrValue) -> ColumnExpression:
def smart_name(arg: ColumnExpression) -> Optional[str]:
from pathway.internals.reducers import _any, _unique
- if isinstance(arg, ColumnRefOrIxExpression):
+ if isinstance(arg, ColumnReference):
return arg.name
if isinstance(arg, ReducerExpression) and arg._reducer in [_unique, _any]:
r_args = arg._args
diff --git a/python/pathway/internals/expression_printer.py b/python/pathway/internals/expression_printer.py
index 2566d38d..3299ed29 100644
--- a/python/pathway/internals/expression_printer.py
+++ b/python/pathway/internals/expression_printer.py
@@ -6,8 +6,10 @@
from collections import defaultdict
from typing import TYPE_CHECKING, Dict, Iterable
+from pathway.internals import dtype as dt
+from pathway.internals import expression as expr
+
if TYPE_CHECKING:
- import pathway.internals.expression as expr
from pathway.internals.table import Table
from pathway.internals.trace import Trace
@@ -33,7 +35,12 @@ def print_table_infos(self):
)
def eval_column_val(self, expression: expr.ColumnReference):
- return f"
.{expression._name}"
+ from pathway.internals.thisclass import ThisMetaclass
+
+ if isinstance(expression._table, ThisMetaclass):
+ return f"{expression._table}.{expression._name}"
+ else:
+ return f".{expression._name}"
def eval_unary_op(self, expression: expr.ColumnUnaryOpExpression):
symbol = getattr(expression._operator, "_symbol", expression._operator.__name__)
@@ -54,9 +61,6 @@ def eval_reducer(self, expression: expr.ReducerExpression):
name = expression._reducer.name
return f"pathway.reducers.{name}({args})"
- def eval_reducer_ix(self, expression: expr.ReducerIxExpression):
- return self.eval_reducer(expression)
-
def eval_count(self, expression: expr.CountExpression):
return "pathway.reducers.count()"
@@ -81,14 +85,6 @@ def eval_pointer(self, expression: expr.PointerExpression):
args = self._eval_args_kwargs(expression._args, kwargs)
return f".pointer_from({args})"
- def eval_ix(self, expression: expr.ColumnIxExpression):
- args = [self.eval_expression(expression._keys_expression)]
- if expression._optional:
- args.append("optional=True")
- args_joined = ", ".join(args)
- name = expression._column_expression._name
- return f".ix({args_joined}).{name}"
-
def eval_call(self, expression: expr.ColumnCallExpression):
args = self._eval_args_kwargs(expression._args)
return self.eval_expression(expression._col_expr) + f"({args})"
@@ -97,6 +93,11 @@ def eval_cast(self, expression: expr.CastExpression):
uexpr = self.eval_expression(expression._expr)
return f"pathway.cast({_type_name(expression._return_type)}, {uexpr})"
+ def eval_convert(self, expression: expr.ConvertExpression):
+ uexpr = self.eval_expression(expression._expr)
+ dtype = dt.unoptionalize(expression._return_type)
+ return f"pathway.as_{_type_name(dtype).lower()}({uexpr})"
+
def eval_declare(self, expression: expr.DeclareTypeExpression):
uexpr = self.eval_expression(expression._expr)
return f"pathway.declare_type({_type_name(expression._return_type)}, {uexpr})"
@@ -147,9 +148,10 @@ def eval_make_tuple(self, expression: expr.MakeTupleExpression):
args = self._eval_args_kwargs(expression._args)
return f"pathway.make_tuple({args})"
- def eval_sequence_get(self, expression: expr.SequenceGetExpression):
+ def eval_get(self, expression: expr.GetExpression):
object = self.eval_expression(expression._object)
args = [expression._index]
+
if expression._check_if_exists:
args += [expression._default]
args_formatted = self._eval_args_kwargs(args)
diff --git a/python/pathway/internals/expression_visitor.py b/python/pathway/internals/expression_visitor.py
index b9cacad0..6e1af64c 100644
--- a/python/pathway/internals/expression_visitor.py
+++ b/python/pathway/internals/expression_visitor.py
@@ -3,10 +3,13 @@
from __future__ import annotations
from abc import ABC, abstractmethod
-from typing import Callable, Dict, Type, TypeVar, cast
+from typing import TYPE_CHECKING, Any, Callable, Dict, List, Type, TypeVar
from pathway.internals import expression as expr
+if TYPE_CHECKING:
+ from pathway.internals.table import Table
+
class ExpressionVisitor(ABC):
def eval_expression(self, expression, **kwargs):
@@ -15,14 +18,13 @@ def eval_expression(self, expression, **kwargs):
expr.ColumnUnaryOpExpression: self.eval_unary_op,
expr.ColumnBinaryOpExpression: self.eval_binary_op,
expr.ReducerExpression: self.eval_reducer,
- expr.ReducerIxExpression: self.eval_reducer_ix,
expr.CountExpression: self.eval_count,
expr.ApplyExpression: self.eval_apply,
expr.ColumnConstExpression: self.eval_const,
- expr.ColumnIxExpression: self.eval_ix,
expr.ColumnCallExpression: self.eval_call,
expr.PointerExpression: self.eval_pointer,
expr.CastExpression: self.eval_cast,
+ expr.ConvertExpression: self.eval_convert,
expr.DeclareTypeExpression: self.eval_declare,
expr.CoalesceExpression: self.eval_coalesce,
expr.RequireExpression: self.eval_require,
@@ -30,7 +32,7 @@ def eval_expression(self, expression, **kwargs):
expr.NumbaApplyExpression: self.eval_numbaapply,
expr.AsyncApplyExpression: self.eval_async_apply,
expr.MakeTupleExpression: self.eval_make_tuple,
- expr.SequenceGetExpression: self.eval_sequence_get,
+ expr.GetExpression: self.eval_get,
expr.MethodCallExpression: self.eval_method_call,
expr.IsNotNoneExpression: self.eval_not_none,
expr.IsNoneExpression: self.eval_none,
@@ -60,10 +62,6 @@ def eval_const(self, expression: expr.ColumnConstExpression):
def eval_reducer(self, expression: expr.ReducerExpression):
...
- @abstractmethod
- def eval_reducer_ix(self, expression: expr.ReducerIxExpression):
- ...
-
@abstractmethod
def eval_count(self, expression: expr.CountExpression):
...
@@ -85,15 +83,15 @@ def eval_pointer(self, expression: expr.PointerExpression):
...
@abstractmethod
- def eval_ix(self, expression: expr.ColumnIxExpression):
+ def eval_call(self, expression: expr.ColumnCallExpression):
...
@abstractmethod
- def eval_call(self, expression: expr.ColumnCallExpression):
+ def eval_cast(self, expression: expr.CastExpression):
...
@abstractmethod
- def eval_cast(self, expression: expr.CastExpression):
+ def eval_convert(self, expression: expr.ConvertExpression):
...
@abstractmethod
@@ -125,7 +123,7 @@ def eval_make_tuple(self, expression: expr.MakeTupleExpression):
...
@abstractmethod
- def eval_sequence_get(self, expression: expr.SequenceGetExpression):
+ def eval_get(self, expression: expr.GetExpression):
...
@abstractmethod
@@ -181,14 +179,6 @@ def eval_reducer(
args = [self.eval_expression(arg, **kwargs) for arg in expression._args]
return expr.ReducerExpression(expression._reducer, *args)
- def eval_reducer_ix(
- self, expression: expr.ReducerIxExpression, **kwargs
- ) -> expr.ReducerIxExpression:
- [arg] = [self.eval_expression(arg, **kwargs) for arg in expression._args]
- return expr.ReducerIxExpression(
- expression._reducer, cast(expr.ColumnIxExpression, arg)
- )
-
def eval_count(
self, expression: expr.CountExpression, **kwargs
) -> expr.CountExpression:
@@ -237,25 +227,12 @@ def eval_pointer(
optional = expression._optional
return expr.PointerExpression(expression._table, *expr_args, optional=optional)
- def eval_ix(
- self, expression: expr.ColumnIxExpression, **kwargs
- ) -> expr.ColumnIxExpression:
- column_expression = self.eval_expression(
- expression._column_expression, **kwargs
- )
- keys_expression = self.eval_expression(expression._keys_expression, **kwargs)
- return expr.ColumnIxExpression(
- column_expression=column_expression,
- keys_expression=keys_expression,
- optional=expression._optional,
- )
-
def eval_call(
self, expression: expr.ColumnCallExpression, **kwargs
) -> expr.ColumnCallExpression:
expr_args = [self.eval_expression(arg, **kwargs) for arg in expression._args]
col_expr = self.eval_expression(expression._col_expr, **kwargs)
- assert isinstance(col_expr, expr.ColumnRefOrIxExpression)
+ assert isinstance(col_expr, expr.ColumnReference)
return expr.ColumnCallExpression(col_expr=col_expr, args=expr_args)
def eval_cast(
@@ -264,6 +241,12 @@ def eval_cast(
result = self.eval_expression(expression._expr, **kwargs)
return expr.CastExpression(return_type=expression._return_type, expr=result)
+ def eval_convert(
+ self, expression: expr.ConvertExpression, **kwargs
+ ) -> expr.ConvertExpression:
+ result = self.eval_expression(expression._expr, **kwargs)
+ return expr.ConvertExpression(return_type=expression._return_type, expr=result)
+
def eval_declare(
self, expression: expr.DeclareTypeExpression, **kwargs
) -> expr.DeclareTypeExpression:
@@ -312,10 +295,8 @@ def eval_make_tuple(
expr_args = [self.eval_expression(arg, **kwargs) for arg in expression._args]
return expr.MakeTupleExpression(*expr_args)
- def eval_sequence_get(
- self, expression: expr.SequenceGetExpression, **kwargs
- ) -> expr.SequenceGetExpression:
- return expr.SequenceGetExpression(
+ def eval_get(self, expression: expr.GetExpression, **kwargs) -> expr.GetExpression:
+ return expr.GetExpression(
self.eval_expression(expression._object, **kwargs),
self.eval_expression(expression._index, **kwargs),
self.eval_expression(expression._default, **kwargs),
@@ -337,3 +318,20 @@ def eval_unwrap(
) -> expr.UnwrapExpression:
result = self.eval_expression(expression._expr, **kwargs)
return expr.UnwrapExpression(expr=result)
+
+
+class TableCollector(IdentityTransform):
+ table_list: List[Table]
+
+ def __init__(self):
+ self.table_list = []
+
+ def eval_column_val(self, expression: expr.ColumnReference, **kwargs: Any):
+ self.table_list.append(expression.table)
+ return super().eval_column_val(expression, **kwargs)
+
+
+def collect_tables(expression: expr.ColumnExpression) -> List[Table]:
+ collector = TableCollector()
+ collector.eval_expression(expression)
+ return collector.table_list
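
A quick sketch of how the new `collect_tables` helper might be used (assuming a small debug table):

```python
import pathway as pw
from pathway.internals.expression_visitor import collect_tables

t = pw.debug.parse_to_table('''
a | b
1 | 2
''')
tables = collect_tables(t.a + t.b)
print(len(tables))  # 2 -- one entry per column reference in the expression
```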
diff --git a/python/pathway/internals/graph_runner/expression_evaluator.py b/python/pathway/internals/graph_runner/expression_evaluator.py
index b93b2e9f..58a6e2cd 100644
--- a/python/pathway/internals/graph_runner/expression_evaluator.py
+++ b/python/pathway/internals/graph_runner/expression_evaluator.py
@@ -5,17 +5,7 @@
import warnings
from abc import ABC, abstractmethod
from functools import cached_property
-from typing import (
- TYPE_CHECKING,
- Callable,
- ClassVar,
- Dict,
- List,
- Optional,
- Tuple,
- Type,
- cast,
-)
+from typing import TYPE_CHECKING, Callable, ClassVar, Dict, List, Optional, Tuple, Type
import pathway.internals.column as clmn
import pathway.internals.expression as expr
@@ -29,6 +19,7 @@
get_binary_expression,
get_binary_operators_mapping_optionals,
get_cast_operators_mapping,
+ get_convert_operators_mapping,
get_unary_expression,
tuple_handling_operators,
)
@@ -37,8 +28,13 @@
from pathway.internals.graph_runner.state import ScopeState
-def column_eval_properties(column: clmn.ColumnWithContext) -> api.EvalProperties:
- return api.EvalProperties(dtype=column.dtype, trace=column.trace.to_engine())
+def column_eval_properties(column: clmn.Column) -> api.EvalProperties:
+ props = column.properties
+ return api.EvalProperties(
+ trace=column.trace.to_engine(),
+ dtype=props.dtype,
+ append_only=props.append_only,
+ )
class ExpressionEvaluator(ABC):
@@ -358,7 +354,34 @@ def eval_cast(
return result_expression
raise TypeError(
- f"Pathway doesn't support type conversion from {source_type} to {target_type}."
+ f"Pathway doesn't support casting {source_type} to {target_type}."
+ )
+
+ def eval_convert(
+ self,
+ expression: expr.ConvertExpression,
+ eval_state: Optional[RowwiseEvalState] = None,
+ ):
+ arg = self.eval_expression(expression._expr, eval_state=eval_state)
+ source_type = expression._expr._dtype
+ target_type = expression._return_type
+
+ if (
+ dt.dtype_equivalence(target_type, source_type)
+ or dt.dtype_equivalence(dt.Optional(source_type), target_type)
+ or (source_type == dt.NONE and isinstance(target_type, dt.Optional))
+ or target_type == dt.ANY
+ ):
+ return arg
+ if (
+ result_expression := get_convert_operators_mapping(
+ arg, source_type, target_type
+ )
+ ) is not None:
+ return result_expression
+
+ raise TypeError(
+ f"Pathway doesn't support converting {source_type} to {target_type}."
)
def eval_declare(
@@ -458,13 +481,6 @@ def eval_reducer(
):
raise RuntimeError("RowwiseEvaluator encountered ReducerExpression")
- def eval_reducer_ix(
- self,
- expression: expr.ReducerIxExpression,
- eval_state: Optional[RowwiseEvalState] = None,
- ):
- raise RuntimeError("RowwiseEvaluator encountered ReducerIxExpression")
-
def eval_count(
self,
expression: expr.CountExpression,
@@ -472,22 +488,6 @@ def eval_count(
):
raise RuntimeError("RowwiseEvaluator encountered CountExpression")
- def eval_ix(
- self,
- expression: expr.ColumnIxExpression,
- eval_state: Optional[RowwiseEvalState] = None,
- ):
- keys_col: api.Column = self._column_from_expression(expression._keys_expression)
- colref = expression._column_expression
-
- input_universe = self.state.get_universe(colref.table._universe)
- ixer = self.scope.ix(
- keys_col, input_universe, strict=True, optional=expression._optional
- )
-
- input_column = self.state.get_column(expression._column)
- return self.eval_dependency(ixer.ix_column(input_column), eval_state=eval_state)
-
def eval_pointer(
self,
expression: expr.PointerExpression,
@@ -513,18 +513,27 @@ def eval_make_tuple(
]
return api.Expression.make_tuple(*expressions)
- def eval_sequence_get(
+ def eval_get(
self,
- expression: expr.SequenceGetExpression,
+ expression: expr.GetExpression,
eval_state: Optional[RowwiseEvalState] = None,
):
object = self.eval_expression(expression._object, eval_state=eval_state)
index = self.eval_expression(expression._index, eval_state=eval_state)
default = self.eval_expression(expression._default, eval_state=eval_state)
- if expression._check_if_exists:
- return api.Expression.sequence_get_item_checked(object, index, default)
+ object_dtype = expression._object._dtype
+
+ if object_dtype == dt.JSON:
+ if expression._check_if_exists:
+ return api.Expression.json_get_item_checked(object, index, default)
+ else:
+ return api.Expression.json_get_item_unchecked(object, index)
else:
- return api.Expression.sequence_get_item_unchecked(object, index)
+ assert not object_dtype.equivalent_to(dt.Optional(dt.JSON))
+ if expression._check_if_exists:
+ return api.Expression.sequence_get_item_checked(object, index, default)
+ else:
+ return api.Expression.sequence_get_item_unchecked(object, index)
def eval_method_call(
self,
@@ -665,6 +674,27 @@ def output_universe(self) -> api.Universe:
return self.scope.reindex_universe(self.reindexing_column)
+class IxEvaluator(ExpressionEvaluator, context_type=clmn.IxContext):
+ context: clmn.IxContext
+ key_column: api.Column
+ input_universe: api.Universe
+ ixer: api.Ixer
+
+ def _initialize_from_context(self):
+ self.key_column = self.state.get_column(self.context.key_column)
+ self.input_universe = self.state.get_universe(self.context.orig_universe)
+ self.ixer = self.scope.ix(
+ self.key_column,
+ self.input_universe,
+ strict=True,
+ optional=self.context.optional,
+ )
+
+ def eval(self, column: clmn.ColumnWithExpression) -> api.Column:
+ column_to_ix = self.state.get_column(column.dereference())
+ return self.ixer.ix_column(column_to_ix)
+
+
class PromiseSameUniverseEvaluator(
ExpressionEvaluator, context_type=clmn.PromiseSameUniverseContext
):
@@ -792,36 +822,6 @@ def eval_reducer(
column = self.grouper.reducer_column(engine_reducer, arg_column)
return self.eval_dependency(column, eval_state=eval_state)
- def eval_reducer_ix(
- self,
- expression: expr.ReducerIxExpression,
- eval_state: Optional[RowwiseEvalState] = None,
- ):
- [arg] = expression._args
- engine_reducer = expression._reducer.engine_reducer(arg._dtype)
- rowwise_context = self.context.table._context
- column = self.context.table._eval(arg, rowwise_context)
- rowwise = RowwiseEvaluator(
- rowwise_context, self.scope, self.state, self.scope_context
- )
-
- ix_expr: expr.ColumnIxExpression = cast(
- expr.ColumnIxExpression, column.expression
- )
-
- keys_col: api.Column = rowwise._column_from_expression(ix_expr._keys_expression)
-
- input_column = rowwise.state.get_column(ix_expr._column)
-
- ixer = rowwise.scope.ix(
- keys_col, input_column.universe, strict=True, optional=ix_expr._optional
- )
-
- result_column = self.grouper.reducer_ix_column(
- engine_reducer, ixer, input_column
- )
- return self.eval_dependency(result_column, eval_state=eval_state)
-
@cached_property
def output_universe(self) -> api.Universe:
return self.grouper.universe
diff --git a/python/pathway/internals/groupby.py b/python/pathway/internals/groupby.py
index 88ce9ea3..8a6de193 100644
--- a/python/pathway/internals/groupby.py
+++ b/python/pathway/internals/groupby.py
@@ -5,7 +5,7 @@
import itertools
from abc import abstractmethod
from functools import lru_cache
-from typing import TYPE_CHECKING, Dict, Iterable, Optional, Tuple, cast
+from typing import TYPE_CHECKING, Dict, Iterable, Iterator, Optional, Tuple, cast
from pathway.internals.trace import trace_user_frame
@@ -53,6 +53,18 @@ def reduce(
def _operator_dependencies(self) -> StableSet[table.Table]:
...
+ def __getattr__(self, name):
+ return getattr(self._joinable_to_group, name)
+
+ def __getitem__(self, name):
+ return self._joinable_to_group[name]
+
+ def keys(self):
+ return self._joinable_to_group.keys()
+
+ def __iter__(self) -> Iterator[expr.ColumnReference]:
+ return iter(self._joinable_to_group)
+
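
With `GroupedJoinable` now forwarding attribute access, item access, and iteration to the underlying joinable, `pw.this` can be combined with the standalone `ix` inside `reduce`. A minimal sketch of the pattern, mirroring the new `test_groupby_ix_this` further down in this diff:

```python
import pathway as pw

left = pw.debug.table_from_markdown(
    """
    pet | owner | age
    dog | Alice | 10
    dog | Bob   | 9
    cat | Alice | 8
    cat | Bob   | 7
    """
)

# For every pet group, take the maximal age and the owner of the row
# realizing that maximum.
res = left.groupby(left.pet).reduce(
    age=pw.reducers.max(pw.this.age),
    owner=pw.this.ix(pw.reducers.argmax(pw.this.age)).owner,
)
pw.debug.compute_and_print(res, include_id=False)
```
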
class GroupedTable(GroupedJoinable, OperatorInput):
"""Result of a groupby operation on a Table.
@@ -83,7 +95,7 @@ def __init__(
grouping_columns: Tuple[expr.InternalColRef, ...],
set_id: bool = False,
):
- super().__init__(Universe(), {thisclass.this: table}, table)
+ super().__init__(Universe(), {thisclass.this: self}, table)
self._grouping_columns = StableSet(grouping_columns)
self._context = clmn.GroupedContext(
table=table,
diff --git a/python/pathway/internals/ix.py b/python/pathway/internals/ix.py
deleted file mode 100644
index 4ae0208b..00000000
--- a/python/pathway/internals/ix.py
+++ /dev/null
@@ -1,83 +0,0 @@
-# Copyright © 2023 Pathway
-
-from __future__ import annotations
-
-from typing import TYPE_CHECKING, Iterator
-
-import pathway.internals.expression as expr
-
-if TYPE_CHECKING:
- from pathway.internals.table import Table
-
-
-class IxIndexer:
- """An object that when indexed by some column returns a table with rows specified by that column.
- Accepts both `[]` and `()` indexing.
-
- Example:
-
- >>> import pathway as pw
- >>> t_animals = pw.debug.parse_to_table('''
- ... | epithet | genus
- ... 1 | upupa | epops
- ... 2 | acherontia | atropos
- ... 3 | bubo | scandiacus
- ... 4 | dynastes | hercules
- ... ''')
- >>> t_birds = pw.debug.parse_to_table('''
- ... | desc
- ... 2 | hoopoe
- ... 4 | owl
- ... ''')
- >>> ret = t_birds.select(t_birds.desc, latin=t_animals.ix[t_birds.id].genus)
- >>> pw.debug.compute_and_print(ret, include_id=False)
- desc | latin
- hoopoe | atropos
- owl | hercules
- """
-
- _table: Table
-
- def __init__(self, table):
- self._table = table
-
- def __getitem__(self, keys_expression: expr.ColumnExpression) -> IxAppliedIndexer:
- return IxAppliedIndexer(self._table, keys_expression, False)
-
- def __call__(
- self, keys_expression: expr.ColumnExpression, *, optional: bool = False
- ) -> IxAppliedIndexer:
- return IxAppliedIndexer(self._table, keys_expression, optional)
-
-
-class IxAppliedIndexer:
- _table: Table
- _keys_expression: expr.ColumnExpression
- _optional: bool
-
- def __init__(self, table, keys_expression, optional):
- self._table = table
- self._keys_expression = keys_expression
- self._optional = optional
-
- def _get_colref_by_name(self, name, exception_type) -> expr.ColumnIxExpression:
- expression = getattr(self._table, name)
- if not isinstance(expression, expr.ColumnReference):
- raise exception_type("f{name} is not a column")
- return expr.ColumnIxExpression(
- keys_expression=self._keys_expression,
- column_expression=expression,
- optional=self._optional,
- )
-
- def __getattr__(self, name: str) -> expr.ColumnIxExpression:
- return self._get_colref_by_name(name, AttributeError)
-
- def __getitem__(self, name: str) -> expr.ColumnIxExpression:
- return self._get_colref_by_name(name, KeyError)
-
- def keys(self):
- return self._table.keys()
-
- def __iter__(self) -> Iterator[expr.ColumnIxExpression]:
- return (self[name] for name in self.keys())
diff --git a/python/pathway/internals/json.py b/python/pathway/internals/json.py
new file mode 100644
index 00000000..815dcea1
--- /dev/null
+++ b/python/pathway/internals/json.py
@@ -0,0 +1,70 @@
+# Copyright © 2023 Pathway
+
+from __future__ import annotations
+
+import json
+from dataclasses import dataclass
+from functools import cached_property
+from typing import Any, ClassVar
+
+
+class _JsonEncoder(json.JSONEncoder):
+ def default(self, obj):
+ if isinstance(obj, Json):
+ return obj.value
+ return super().default(obj)
+
+
+@dataclass(frozen=True, eq=True)
+class Json:
+ """Represents Json value.
+
+ Example:
+ >>> import pathway as pw
+ >>> t1 = pw.debug.parse_to_table('''
+ ... a | b | c
+ ... True | 2 | manul
+ ... ''')
+ >>> @pw.udf
+ ... def to_json(val) -> pw.Json:
+ ... return pw.Json(val)
+ >>> result = t1.select(**{c: to_json(pw.this[c]) for c in t1.column_names()})
+ >>> pw.debug.compute_and_print(result, include_id=False)
+ a | b | c
+ true | 2 | "manul"
+ """
+
+ NULL: ClassVar[Json]
+
+ _value: JsonValue
+
+ def __str__(self) -> str:
+ return json.dumps(self.value)
+
+ def __repr__(self) -> str:
+ return f"pw.Json({self.value!r})"
+
+ @cached_property
+ def value(self) -> JsonValue:
+ if isinstance(self._value, Json):
+ return self._value.value
+ else:
+ return self._value
+
+ @staticmethod
+ def parse(value: str | bytes | bytearray) -> Json:
+ return Json(json.loads(value))
+
+ @staticmethod
+ def dumps(obj: Any) -> str:
+ return json.dumps(obj, cls=_JsonEncoder)
+
+
+JsonValue = (
+ int | float | str | bool | list["JsonValue"] | dict[str, "JsonValue"] | None | Json
+)
+
+
+Json.NULL = Json(None)
+
+__all__ = ["Json"]
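
For reference, a quick sketch of the new wrapper's behavior, assuming `Json` is re-exported as `pw.Json` (as the tests added below use):

```python
import pathway as pw

j = pw.Json.parse('{"a": [1, 2, 3], "b": null}')
assert j.value == {"a": [1, 2, 3], "b": None}

# dumps() goes through the custom encoder above, so nested pw.Json
# values serialize transparently instead of raising TypeError.
assert pw.Json.dumps({"wrapped": pw.Json(1)}) == '{"wrapped": 1}'

# Json(None) is the canonical JSON null.
assert pw.Json.NULL == pw.Json(None)
```
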
diff --git a/python/pathway/internals/operator_mapping.py b/python/pathway/internals/operator_mapping.py
index 0358dce5..7d7ea197 100644
--- a/python/pathway/internals/operator_mapping.py
+++ b/python/pathway/internals/operator_mapping.py
@@ -230,6 +230,27 @@ def get_cast_operators_mapping(
return expression if expression is not None else default
+def get_convert_operators_mapping(
+ expr: api.Expression, source_type: dt.DType, target_type: dt.DType
+) -> Optional[api.Expression]:
+ source_type_engine = _TYPES_TO_ENGINE_MAPPING.get(
+ dt.normalize_dtype(dt.unoptionalize(source_type))
+ )
+ target_type_engine = _TYPES_TO_ENGINE_MAPPING.get(
+ dt.normalize_dtype(dt.unoptionalize(target_type))
+ )
+ assert (
+ source_type_engine is not None and target_type_engine is not None
+ ), "invalid pathway type"
+
+ expression = api.Expression.convert_optional(
+ expr,
+ source_type_engine,
+ target_type_engine,
+ )
+ return expression
+
+
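
This engine-level `convert_optional` call backs the new value-conversion methods (`as_int()`, `as_float()`, `as_str()`, `as_bool()`) whose reprs are tested further down. A hedged usage sketch, building a JSON column via a UDF; the exact set of accepted input dtypes is an assumption here:

```python
import pathway as pw

@pw.udf
def to_json(val) -> pw.Json:
    return pw.Json(val)

t = pw.debug.table_from_markdown(
    """
    a
    1
    """
)
payload = t.select(data=to_json(pw.this.a))

# Convert the JSON payload back into typed values.
result = payload.select(
    as_int=pw.this.data.as_int(),
    as_float=pw.this.data.as_float(),
)
```
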
def common_dtype_in_binary_operator(
left_dtype: dt.DType, right_dtype: dt.DType
) -> Optional[dt.DType]:
diff --git a/python/pathway/internals/reducers.py b/python/pathway/internals/reducers.py
index 7d3a8019..3963ed5c 100644
--- a/python/pathway/internals/reducers.py
+++ b/python/pathway/internals/reducers.py
@@ -107,8 +107,6 @@ def return_type(self, arg_type: dt.DType) -> dt.DType:
def _generate_unary_reducer(reducer: UnaryReducer):
def wrapper(arg: expr.ColumnExpression) -> expr.ReducerExpression:
- if isinstance(arg, expr.ColumnIxExpression):
- return expr.ReducerIxExpression(reducer, arg)
return expr.ReducerExpression(reducer, arg)
return wrapper
diff --git a/python/pathway/internals/row_transformer.py b/python/pathway/internals/row_transformer.py
index c4fe7d0c..88bd44c5 100644
--- a/python/pathway/internals/row_transformer.py
+++ b/python/pathway/internals/row_transformer.py
@@ -13,6 +13,7 @@
from pathway.internals import parse_graph, schema
from pathway.internals.api import BasePointer, Pointer, ref_scalar
from pathway.internals.column import MaterializedColumn, MethodColumn
+from pathway.internals.column_properties import ColumnProperties
from pathway.internals.schema import Schema, schema_from_types
from pathway.internals.shadows import inspect
@@ -259,7 +260,7 @@ class AbstractOutputAttribute(ToBeComputedAttribute, ABC):
is_output = True
def to_output_column(self, universe: Universe):
- return MaterializedColumn(self.dtype, universe)
+ return MaterializedColumn(universe, ColumnProperties(dtype=self.dtype))
class Attribute(ToBeComputedAttribute):
@@ -288,7 +289,7 @@ def to_transformer_column(
return tt.MethodTransformerColumn(self, operator)
def to_output_column(self, universe: Universe):
- return MethodColumn(self.dtype, universe)
+ return MethodColumn(universe, ColumnProperties(dtype=self.dtype))
@cached_property
def dtype(self) -> dt.DType:
diff --git a/python/pathway/internals/schema.py b/python/pathway/internals/schema.py
index f38a1137..40b486aa 100644
--- a/python/pathway/internals/schema.py
+++ b/python/pathway/internals/schema.py
@@ -24,6 +24,7 @@
from pathway.internals import dtype as dt
from pathway.internals import trace
+from pathway.internals.column_properties import ColumnProperties
from pathway.internals.runtime_type_check import runtime_type_check
if TYPE_CHECKING:
@@ -158,27 +159,32 @@ def _create_column_definitions(schema: SchemaMetaclass):
columns = {}
for name, annotation in annotations.items():
- coldtype = dt.wrap(annotation)
- column = fields.pop(name, column_definition(dtype=coldtype))
+ col_dtype = dt.wrap(annotation)
+ column = fields.pop(name, column_definition(dtype=col_dtype))
if not isinstance(column, ColumnDefinition):
raise ValueError(
f"`{name}` should be a column definition, found {type(column)}"
)
- name = column.name or name
+ dtype = column.dtype
+ if dtype is None:
+ dtype = col_dtype
- if column.dtype is None:
- column = dataclasses.replace(column, dtype=coldtype)
- column = dataclasses.replace(column, dtype=dt.wrap(column.dtype))
- if coldtype != column.dtype:
- print(coldtype)
- print(column.dtype)
+ if col_dtype != dtype:
raise TypeError(
f"type annotation of column `{name}` does not match column definition"
)
- columns[name] = column
+ name = column.name or name
+
+ columns[name] = ColumnSchema(
+ primary_key=column.primary_key,
+ default_value=column.default_value,
+ dtype=dt.wrap(dtype),
+ name=name,
+ append_only=schema.__properties__.append_only,
+ )
if fields:
names = ", ".join(fields.keys())
@@ -193,7 +199,7 @@ class SchemaProperties:
class SchemaMetaclass(type):
- __columns__: Dict[str, ColumnDefinition]
+ __columns__: Dict[str, ColumnSchema]
__properties__: SchemaProperties
__types__: Dict[str, dt.DType]
@@ -201,8 +207,8 @@ class SchemaMetaclass(type):
def __init__(self, *args, append_only=False, **kwargs) -> None:
super().__init__(*args, **kwargs)
- self.__columns__ = _create_column_definitions(self)
self.__properties__ = SchemaProperties(append_only=append_only)
+ self.__columns__ = _create_column_definitions(self)
self.__types__ = {
name: cast(dt.DType, column.dtype)
for name, column in self.__columns__.items()
@@ -214,12 +220,16 @@ def __or__(self, other: Type[Schema]) -> Type[Schema]: # type: ignore
def properties(self) -> SchemaProperties:
return self.__properties__
- def columns(self) -> Dict[str, ColumnDefinition]:
+ def columns(self) -> Dict[str, ColumnSchema]:
return dict(self.__columns__)
def column_names(self) -> list[str]:
return list(self.keys())
+ def column_properties(self, name: str) -> ColumnProperties:
+ column = self.__columns__[name]
+ return ColumnProperties(dtype=column.dtype, append_only=column.append_only)
+
def primary_key_columns(self) -> Optional[list[str]]:
# There is a distinction between an empty set of columns denoting
# the primary key and None. If any (including empty) set of keys is provided,
@@ -249,7 +259,9 @@ def values(self) -> ValuesView[Any]:
return self.__types__.values()
def update_types(self, **kwargs) -> Type[Schema]:
- columns: Dict[str, ColumnDefinition] = dict(self.__columns__)
+ columns: Dict[str, ColumnDefinition] = {
+ col.name: col.to_definition() for col in self.__columns__.values()
+ }
for name, dtype in kwargs.items():
if name not in columns:
raise ValueError(
@@ -300,6 +312,42 @@ def __eq__(self, other: object) -> bool:
def __hash__(self) -> int:
return hash(self._as_tuple())
+ def assert_equal_to(
+ self,
+ other: Type[Schema],
+ *,
+ allow_superset: bool = False,
+ ignore_primary_keys: bool = True,
+ ) -> None:
+ self_dict = self.as_dict()
+ other_dict = other.as_dict()
+
+ # Check if self has all columns of other
+ missing_columns = other_dict.keys() - self_dict.keys()
+ if missing_columns:
+ raise AssertionError(f"schema does not have columns {missing_columns}")
+
+ # Check if types of columns are the same
+ for col in other_dict:
+ assert other_dict[col] == self_dict[col], (
+ f"type of column {col} does not match - its type is {self_dict[col]} in {self.__name__}",
+ f" and {other_dict[col]} in {other.__name__}",
+ )
+
+ # When allow_superset=False, check that self does not have extra columns
+ if not allow_superset and self_dict.keys() > other_dict.keys():
+ extra_columns = self_dict.keys() - other_dict.keys()
+ raise AssertionError(
+ f"there are extra columns: {extra_columns} which are not present in the provided schema"
+ )
+
+ # Check whether primary keys are the same
+ if not ignore_primary_keys:
+ assert self.primary_key_columns() == other.primary_key_columns(), (
+ f"primary keys in the schemas do not match - they are {self.primary_key_columns()} in {self.__name__}",
+ f" and {other.primary_key_columns()} in {other.__name__}",
+ )
+
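
`assert_equal_to` is the schema-level check behind the new `pw.assert_table_has_schema` helper exercised in the tests below. A minimal sketch of calling it directly on schema classes (the method lives on the metaclass, so it is invoked on the class itself):

```python
import pathway as pw

class Full(pw.Schema):
    a: int
    b: float

class Subset(pw.Schema):
    a: int

# Passes: Full has every column of Subset, with matching types.
Full.assert_equal_to(Subset, allow_superset=True)

try:
    # Fails: Full carries the extra column `b` and supersets are not allowed.
    Full.assert_equal_to(Subset, allow_superset=False)
except AssertionError as e:
    print(e)
```
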
class Schema(metaclass=SchemaMetaclass):
"""Base class to inherit from when creating schemas.
@@ -355,6 +403,26 @@ def __repr__(self):
_no_default_value_marker = _Undefined()
+@dataclass(frozen=True)
+class ColumnSchema:
+ primary_key: bool
+ default_value: Optional[Any]
+ dtype: dt.DType
+ name: str
+ append_only: bool
+
+ def has_default_value(self) -> bool:
+ return self.default_value != _no_default_value_marker
+
+ def to_definition(self) -> ColumnDefinition:
+ return ColumnDefinition(
+ primary_key=self.primary_key,
+ default_value=self.default_value,
+ dtype=self.dtype,
+ name=self.name,
+ )
+
+
@dataclass(frozen=True)
class ColumnDefinition:
primary_key: bool = False
@@ -362,9 +430,6 @@ class ColumnDefinition:
dtype: Optional[dt.DType] = dt.ANY
name: Optional[str] = None
- def has_default_value(self) -> bool:
- return self.default_value != _no_default_value_marker
-
def __post_init__(self):
assert self.dtype is None or isinstance(self.dtype, dt.DType)
diff --git a/python/pathway/internals/sql.py b/python/pathway/internals/sql.py
index 1d8a2f26..d9d9c7c2 100644
--- a/python/pathway/internals/sql.py
+++ b/python/pathway/internals/sql.py
@@ -35,12 +35,6 @@ def eval_reducer(
self.contains_reducers = True
return super().eval_reducer(expression, **kwargs)
- def eval_reducer_ix(
- self, expression: expr.ReducerIxExpression, **kwargs
- ) -> expr.ReducerIxExpression:
- self.contains_reducers = True
- return super().eval_reducer_ix(expression, **kwargs)
-
def eval_count(
self, expression: expr.CountExpression, **kwargs
) -> expr.CountExpression:
diff --git a/python/pathway/internals/table.py b/python/pathway/internals/table.py
index bef07109..02d96d5a 100644
--- a/python/pathway/internals/table.py
+++ b/python/pathway/internals/table.py
@@ -30,6 +30,7 @@
reduce_args_handler,
select_args_handler,
)
+from pathway.internals.column_properties import ColumnProperties
from pathway.internals.decorators import (
contextualized_operator,
empty_from_schema,
@@ -37,8 +38,8 @@
table_to_datasink,
)
from pathway.internals.desugaring import combine_args_kwargs, desugar
+from pathway.internals.expression_visitor import collect_tables
from pathway.internals.helpers import SetOnceProperty, StableSet
-from pathway.internals.ix import IxIndexer
from pathway.internals.join import Joinable
from pathway.internals.operator import DebugOperator, OutputHandle
from pathway.internals.operator_input import OperatorInput
@@ -784,12 +785,21 @@ def reduce(
"""
return self.groupby().reduce(*args, **kwargs)
- @property
- def ix(self):
- """Return an object that indexed by `[column]` returns new table reindexed.
+ @trace_user_frame
+ def ix(
+ self, expression: expr.ColumnExpression, *, optional: bool = False, context=None
+ ) -> Table:
+ """Reindexes the table using expression values as keys. Uses keys from context, or tries to infer
+ proper context from the expression.
+ If optional is True, then None in expression values result in None values in the result columns.
+ Missing values in table keys result in RuntimeError.
+
+ Context can be anything that allows for `select` or `reduce`, or `pathway.this` construct
+ (latter results in returning a delayed operation, and should be only used when using `ix` inside
+ join().select() or groupby().reduce() sequence).
Returns:
- Indexer: an object that when indexed by some column returns a table with rows specified by that column.
+ Reindexed table with the same set of columns.
Example:
@@ -812,7 +822,76 @@ def ix(self):
hoopoe | atropos
owl | hercules
"""
- return IxIndexer(self)
+
+ if context is None:
+ all_tables = collect_tables(expression)
+ if len(all_tables) == 0:
+ context = thisclass.this
+ elif all(tab == all_tables[0] for tab in all_tables):
+ context = all_tables[0]
+ if context is None:
+ for tab in all_tables:
+ if not isinstance(tab, Table):
+ raise ValueError("Table expected here.")
+ if len(all_tables) == 0:
+ raise ValueError("Const value provided.")
+ context = all_tables[0]
+ for tab in all_tables:
+ assert context._universe.is_equal_to(tab._universe)
+ if isinstance(context, thisclass.ThisMetaclass):
+ return context._delayed_op(
+ lambda table: self.ix(
+ expression=expression, optional=optional, context=table
+ ),
+ qualname=f"{self}.ix(...)",
+ name="ix",
+ )
+ if isinstance(context, groupby.GroupedJoinable):
+ key_col = context.reduce(tmp=expression).tmp
+ else:
+ key_col = context.select(tmp=expression).tmp
+ key_dtype = eval_type(key_col)
+ if (
+ optional and not dt.dtype_issubclass(key_dtype, dt.Optional(dt.POINTER))
+ ) or (not optional and not isinstance(key_dtype, dt.Pointer)):
+ raise TypeError(
+ f"Pathway supports indexing with Pointer type only. The type used was {key_dtype}."
+ )
+ ret = self._ix(key_col, optional)
+ if optional and isinstance(key_dtype, dt.Optional):
+ return ret.update_types(
+ **{name: dt.Optional(ret.schema[name]) for name in ret.keys()}
+ )
+ else:
+ return ret
+
+ def restrict(self, other: Table) -> Table:
+ assert other._universe.is_subset_of(self._universe)
+ return other.select(*[colref for colref in self])
+
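
`restrict` narrows `self` to the universe of `other`, which must be a subset of `self`'s universe. A small sketch:

```python
import pathway as pw

t_all = pw.debug.table_from_markdown(
    """
      | v
    1 | 10
    2 | 20
    3 | 30
    """
)
t_subset = t_all.filter(pw.this.v > 15)

# Same columns as t_all, kept only on the rows present in t_subset.
restricted = t_all.restrict(t_subset)
```
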
+ @contextualized_operator
+ def _ix(
+ self,
+ key_expression: expr.ColumnReference,
+ optional: bool,
+ ) -> Table:
+ key_universe_table = key_expression._table
+ universe = key_universe_table._universe
+ key_column = key_universe_table._eval(key_expression)
+
+ context = clmn.IxContext(universe, self._universe, key_column, optional)
+
+ columns = {
+ name: self._wrap_column_in_context(context, column, name)
+ for name, column in self._columns.items()
+ }
+
+ return Table(
+ columns=columns,
+ universe=universe,
+ pk_columns=self._pk_columns,
+ id_column=clmn.IdColumn(context),
+ )
def __lshift__(self, other: Table) -> Table:
"""Alias to update_cells method.
@@ -1111,6 +1190,7 @@ def _update_rows(self, other: Table) -> Table:
return ret
@trace_user_frame
+ @desugar
def with_columns(self, *args: expr.ColumnReference, **kwargs: Any) -> Table:
"""Updates columns of `self`, according to args and kwargs.
See `table.select` specification for evaluation of args and kwargs.
@@ -1600,7 +1680,10 @@ def flatten(self, *args: expr.ColumnReference, **kwargs: Any) -> Table:
universe = Universe()
flatten_result_column = clmn.MaterializedColumn(
- clmn.FlattenContext.get_flatten_column_dtype(flatten_column), universe
+ universe,
+ ColumnProperties(
+ dtype=clmn.FlattenContext.get_flatten_column_dtype(flatten_column),
+ ),
)
context = clmn.FlattenContext(
universe=universe,
@@ -1637,8 +1720,12 @@ def _sort_experimental(
) -> Table:
if not isinstance(instance, expr.ColumnExpression):
instance = expr.ColumnConstExpression(instance)
- prev_column = clmn.MaterializedColumn(dt.Optional(dt.POINTER), self._universe)
- next_column = clmn.MaterializedColumn(dt.Optional(dt.POINTER), self._universe)
+ prev_column = clmn.MaterializedColumn(
+ self._universe, ColumnProperties(dtype=dt.Optional(dt.POINTER))
+ )
+ next_column = clmn.MaterializedColumn(
+ self._universe, ColumnProperties(dtype=dt.Optional(dt.POINTER))
+ )
context = clmn.SortingContext(
self._universe,
self._eval(key),
@@ -1722,8 +1809,11 @@ def _eval(
def _from_schema(cls, schema: Type[Schema]) -> Table:
universe = Universe()
columns = {
- name: clmn.MaterializedColumn(type, universe)
- for name, type in schema.as_dict().items()
+ name: clmn.MaterializedColumn(
+ universe,
+ schema.column_properties(name),
+ )
+ for name in schema.column_names()
}
return cls(columns=columns, universe=universe, pk_columns={}, schema=schema)
@@ -1761,7 +1851,7 @@ def to(self, sink: DataSink) -> None:
def _materialize(self, universe: Universe):
columns = {
- name: clmn.MaterializedColumn(column.dtype, universe)
+ name: clmn.MaterializedColumn(universe, column.properties)
for (name, column) in self._columns.items()
}
return Table(
@@ -1795,8 +1885,18 @@ def pointer_from(self, *args: Any, optional=False):
@runtime_type_check
@trace_user_frame
- def ix_ref(self, *args: expr.ColumnExpressionOrValue, optional: bool = False):
- """Returns a row, indexed by its primary keys. Several columns can be used as index.
+ def ix_ref(
+ self, *args: expr.ColumnExpressionOrValue, optional: bool = False, context=None
+ ):
+ """Reindexes the table using expressions as primary keys.
+ Uses keys from context, or tries to infer proper context from the expression.
+ If optional is True, then None in expression values result in None values in the result columns.
+ Missing values in table keys result in RuntimeError.
+
+ Context can be anything that allows for `select` or `reduce`, or `pathway.this` construct
+ (latter results in returning a delayed operation, and should be only used when using `ix` inside
+ join().select() or groupby().reduce() sequence).
+
Args:
args: Column references.
@@ -1834,7 +1934,7 @@ def ix_ref(self, *args: expr.ColumnExpressionOrValue, optional: bool = False):
... David | cat
... ''')
>>> t2 = t1.groupby(pw.this.pet).reduce(pw.this.pet, count=pw.reducers.count())
- >>> t3 = t1.select(*pw.this, new_value=t2.ix_ref(pw.this.pet).count)
+ >>> t3 = t1.select(*pw.this, new_value=t2.ix_ref(t1.pet).count)
>>> pw.debug.compute_and_print(t3, include_id=False)
name | pet | new_value
Alice | dog | 1
@@ -1853,7 +1953,7 @@ def ix_ref(self, *args: expr.ColumnExpressionOrValue, optional: bool = False):
... David | cat
... ''')
>>> t2 = t1.reduce(count=pw.reducers.count())
- >>> t3 = t1.select(*pw.this, new_value=t2.ix_ref().count)
+ >>> t3 = t1.select(*pw.this, new_value=t2.ix_ref(context=t1).count)
>>> pw.debug.compute_and_print(t3, include_id=False)
name | pet | new_value
Alice | dog | 4
@@ -1861,9 +1961,11 @@ def ix_ref(self, *args: expr.ColumnExpressionOrValue, optional: bool = False):
Carole | cat | 4
David | cat | 4
"""
- # or maybe ref should stay as a wrapper for a tuple, so that t.ix(ref(*args)) is t.ix(t.pointer_from(*args))
- # but we disallow select(foo=ref(bar)) and require pointer_from there?
- return self.ix(self.pointer_from(*args, optional=optional), optional=optional)
+ return self.ix(
+ self.pointer_from(*args, optional=optional),
+ optional=optional,
+ context=context,
+ )
def _subtables(self) -> StableSet[Table]:
return StableSet([self])
diff --git a/python/pathway/internals/table_slice.py b/python/pathway/internals/table_slice.py
index 0205ca0d..77835625 100644
--- a/python/pathway/internals/table_slice.py
+++ b/python/pathway/internals/table_slice.py
@@ -118,6 +118,22 @@ def with_prefix(self, prefix: str) -> TableSlice:
def with_suffix(self, suffix: str) -> TableSlice:
return self.rename({name: name + suffix for name in self.keys()})
+ @trace_user_frame
+ def ix(self, expression, *, optional: bool = False, context=None) -> TableSlice:
+ new_table = self._table.ix(expression, optional=optional, context=context)
+ return TableSlice(
+ {name: new_table[colref._name] for name, colref in self._mapping.items()},
+ new_table,
+ )
+
+ @trace_user_frame
+ def ix_ref(self, *args, optional: bool = False, context=None) -> TableSlice:
+ new_table = self._table.ix_ref(*args, optional=optional, context=context)
+ return TableSlice(
+ {name: new_table[colref._name] for name, colref in self._mapping.items()},
+ new_table,
+ )
+
@property
def slice(self):
return self
diff --git a/python/pathway/internals/thisclass.py b/python/pathway/internals/thisclass.py
index 5d7ec76a..41ffe293 100644
--- a/python/pathway/internals/thisclass.py
+++ b/python/pathway/internals/thisclass.py
@@ -12,8 +12,6 @@
import itertools
-from pathway.internals.ix import IxIndexer
-
KEY_GUARD = "__pathway_kwargs_hack"
_key_guard_counter = itertools.count()
@@ -59,6 +57,12 @@ def with_prefix(self, *args, **kwargs):
def with_suffix(self, *args, **kwargs):
return self._create_mock("with_suffix", args, kwargs)
+ def ix(self, *args, **kwargs):
+ return self._create_mock("ix", args, kwargs)
+
+ def ix_ref(self, *args, **kwargs):
+ return self._create_mock("ix_ref", args, kwargs)
+
@property
def slice(self):
return self
@@ -106,13 +110,6 @@ def keys(self):
def __call__(self, *args, **kwargs):
raise TypeError("You cannot instantiate `this` class.")
- @property
- def ix(self):
- return IxIndexer(self)
-
- def ix_ref(self, *args: expr.ColumnExpression):
- return self.ix(self.pointer_from(*args))
-
def pointer_from(self, *args: Any, optional=False):
return expr.PointerExpression(self, *args, optional=optional) # type: ignore
@@ -133,21 +130,49 @@ def _eval_substitution(
def _create_mock(self, name, args, kwargs) -> ThisMetaclass:
raise NotImplementedError
+ @classmethod
+ def _delayed_op(self, op, *, name=None, qualname=None):
+ raise NotImplementedError
+
+ @classmethod
+ def _delay_depth(self):
+ raise NotImplementedError
+
class _those(metaclass=ThisMetaclass):
@classmethod
def _eval_table(self, table: Joinable) -> Joinable:
return table
+ @classmethod
+ def _delay_depth(self):
+ return 0
+
@classmethod
def _create_mock(self, name, args, kwargs):
+ ret = self._delayed_op(
+ lambda table: getattr(table.slice, name)(*args, **kwargs),
+ qualname=f"{self.__qualname__}.{name}(...)",
+ name=name,
+ )
+
+ return ret
+
+ @classmethod
+ def _delayed_op(self, op, *, name=None, qualname=None):
class subclass(self): # type: ignore[valid-type,misc]
@classmethod
def _eval_table(cls, table):
- return getattr(super()._eval_table(table).slice, name)(*args, **kwargs)
+ return op(super()._eval_table(table))
+
+ @classmethod
+ def _delay_depth(self):
+ return super()._delay_depth() + 1
- subclass.__qualname__ = self.__qualname__ + "." + name + "(...)"
- subclass.__name__ = name
+ if name is not None:
+ subclass.__name__ = name
+ if qualname is not None:
+ subclass.__qualname__ = qualname
return subclass
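
The `_delayed_op` machinery is what makes `context=pw.this` work: the `ix` is recorded as a mock and only evaluated once the enclosing `groupby().reduce()` or `join().select()` supplies a concrete table. A sketch mirroring the updated `test_argmin_argmax_tie` below:

```python
import pathway as pw

table = pw.debug.table_from_markdown(
    """
    name  | age
    Alice | 10
    Bob   | 9
    """
)

# context=pw.this defers the ix until reduce() knows the grouped table.
res = table.groupby(table.age).reduce(
    table.age,
    oldest=table.ix(pw.reducers.argmax(table.age), context=pw.this).name,
)
```
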
diff --git a/python/pathway/internals/type_interpreter.py b/python/pathway/internals/type_interpreter.py
index 6f44e53b..68163fe0 100644
--- a/python/pathway/internals/type_interpreter.py
+++ b/python/pathway/internals/type_interpreter.py
@@ -58,6 +58,9 @@ def _eval_column_val(
expression: expr.ColumnReference,
state: Optional[TypeInterpreterState] = None,
) -> dt.DType:
+ from pathway.internals.table import Table
+
+ assert isinstance(expression._table, Table)
dtype = expression._column.dtype
assert state is not None
assert isinstance(dtype, dt.DType)
@@ -194,15 +197,6 @@ def _eval_reducer(
[arg] = expression._args
return expression._reducer.return_type(arg._dtype)
- def eval_reducer_ix(
- self,
- expression: expr.ReducerIxExpression,
- state: Optional[TypeInterpreterState] = None,
- **kwargs,
- ) -> expr.ReducerIxExpression:
- expression = super().eval_reducer_ix(expression, state=state, **kwargs)
- return _wrap(expression, self._eval_reducer(expression))
-
def eval_count(
self,
expression: expr.CountExpression,
@@ -257,29 +251,6 @@ def eval_call(
assert dt.dtype_issubclass(arg_dtype, arg_annot)
return _wrap(expression, ret_type)
- def eval_ix(
- self,
- expression: expr.ColumnIxExpression,
- state: Optional[TypeInterpreterState] = None,
- **kwargs,
- ) -> expr.ColumnIxExpression:
- expression = super().eval_ix(expression, state=state, **kwargs)
- key_dtype = expression._keys_expression._dtype
- if expression._optional:
- if not dt.dtype_issubclass(key_dtype, dt.Optional(dt.POINTER)):
- raise TypeError(
- f"Pathway supports indexing with Pointer type only. The type used was {key_dtype}."
- )
- if not isinstance(key_dtype, dt.Optional):
- return _wrap(expression, expression._column.dtype)
- return _wrap(expression, dt.Optional(expression._column.dtype))
- else:
- if not isinstance(key_dtype, dt.Pointer):
- raise TypeError(
- f"Pathway supports indexing with Pointer type only. The type used was {key_dtype}."
- )
- return _wrap(expression, expression._column.dtype)
-
def eval_pointer(
self,
expression: expr.PointerExpression,
@@ -304,6 +275,15 @@ def eval_cast(
expression = super().eval_cast(expression, state=state, **kwargs)
return _wrap(expression, expression._return_type)
+ def eval_convert(
+ self,
+ expression: expr.ConvertExpression,
+ state: Optional[TypeInterpreterState] = None,
+ **kwargs,
+ ) -> expr.ConvertExpression:
+ expression = super().eval_convert(expression, state=state, **kwargs)
+ return _wrap(expression, expression._return_type)
+
def eval_declare(
self,
expression: expr.DeclareTypeExpression,
@@ -403,79 +383,109 @@ def eval_make_tuple(
dtypes = tuple(arg._dtype for arg in expression._args)
return _wrap(expression, dt.Tuple(*dtypes))
- def eval_sequence_get(
+ def _eval_json_get(
+ self,
+ expression: expr.GetExpression,
+ state: Optional[TypeInterpreterState] = None,
+ **kwargs,
+ ) -> expr.GetExpression:
+ return _wrap(expression, dt.JSON)
+
+ def eval_get(
self,
- expression: expr.SequenceGetExpression,
+ expression: expr.GetExpression,
state: Optional[TypeInterpreterState] = None,
**kwargs,
- ) -> expr.SequenceGetExpression:
- expression = super().eval_sequence_get(expression, state=state, **kwargs)
+ ) -> expr.GetExpression:
+ expression = super().eval_get(expression, state=state, **kwargs)
object_dtype = expression._object._dtype
index_dtype = expression._index._dtype
default_dtype = expression._default._dtype
- if not isinstance(object_dtype, (dt.Tuple, dt.List)) and object_dtype not in [
- dt.ANY,
- dt.Array(),
- ]:
- raise TypeError(f"Object in {expression!r} has to be a sequence.")
- if index_dtype != dt.INT:
- raise TypeError(f"Index in {expression!r} has to be an int.")
-
- if object_dtype == dt.Array():
- warnings.warn(
- f"Object in {expression!r} is of type numpy.ndarray but its number of"
- + " dimensions is not known. Pathway cannot determine the return type"
- + " and will set Any as the return type. Please use "
- + "pathway.declare_type to set the correct return type."
- )
- return _wrap(expression, dt.ANY)
- if object_dtype == dt.ANY:
- return _wrap(expression, dt.ANY)
-
- if isinstance(object_dtype, dt.List):
- if expression._check_if_exists:
- return _wrap(expression, dt.Optional(object_dtype.wrapped))
+ if object_dtype == dt.JSON:
+ # json
+ if not default_dtype.is_subclass_of(dt.Optional(dt.JSON)):
+ raise TypeError(
+ f"Default must be of type {dt.Optional(dt.JSON)}, found {default_dtype}."
+ )
+ if not expression._check_if_exists or default_dtype == dt.JSON:
+ return _wrap(expression, dt.JSON)
else:
- return _wrap(expression, object_dtype.wrapped)
- assert isinstance(object_dtype, dt.Tuple)
- if object_dtype == dt.ANY_TUPLE:
- return _wrap(expression, dt.ANY)
-
- assert not isinstance(object_dtype.args, EllipsisType)
- dtypes = object_dtype.args
+ return _wrap(expression, dt.Optional(dt.JSON))
+ elif object_dtype.equivalent_to(dt.Optional(dt.JSON)):
+ # optional json
+ raise TypeError(f"Cannot get from {dt.Optional(dt.JSON)}.")
+ else:
+ # sequence
+ if not isinstance(
+ object_dtype, (dt.Tuple, dt.List)
+ ) and object_dtype not in [
+ dt.ANY,
+ dt.Array(),
+ ]:
+ raise TypeError(
+ f"Object in {expression!r} has to be a JSON or sequence."
+ )
+ if index_dtype != dt.INT:
+ raise TypeError(f"Index in {expression!r} has to be an int.")
- if (
- expression._const_index is None
- ): # no specified position, index is an Expression
- assert isinstance(dtypes[0], dt.DType)
- return_dtype = dtypes[0]
- for dtype in dtypes[1:]:
- if isinstance(dtype, dt.DType):
- return_dtype = dt.types_lca(return_dtype, dtype)
- if expression._check_if_exists:
- return_dtype = dt.types_lca(return_dtype, default_dtype)
- return _wrap(expression, return_dtype)
-
- try:
- try_ret = dtypes[expression._const_index]
- return _wrap(expression, try_ret)
- except IndexError:
- message = (
- f"Index {expression._const_index} out of range for a tuple of"
- + f" type {object_dtype}."
- )
- if expression._check_if_exists:
- expression_info = get_expression_info(expression)
+ if object_dtype == dt.Array():
warnings.warn(
- message
- + " It refers to the following expression:\n"
- + expression_info
- + "Consider using just the default value without .get()."
+ f"Object in {expression!r} is of type numpy.ndarray but its number of"
+ + " dimensions is not known. Pathway cannot determine the return type"
+ + " and will set Any as the return type. Please use "
+ + "pathway.declare_type to set the correct return type."
)
- return _wrap(expression, default_dtype)
- else:
- raise IndexError(message)
+ return _wrap(expression, dt.ANY)
+ if object_dtype == dt.ANY:
+ return _wrap(expression, dt.ANY)
+
+ if isinstance(object_dtype, dt.List):
+ if expression._check_if_exists:
+ return _wrap(expression, dt.Optional(object_dtype.wrapped))
+ else:
+ return _wrap(expression, object_dtype.wrapped)
+ assert isinstance(object_dtype, dt.Tuple)
+ if object_dtype == dt.ANY_TUPLE:
+ return _wrap(expression, dt.ANY)
+
+ assert not isinstance(object_dtype.args, EllipsisType)
+ dtypes = object_dtype.args
+
+ if (
+ expression._const_index is None
+ ): # no specified position, index is an Expression
+ assert isinstance(dtypes[0], dt.DType)
+ return_dtype = dtypes[0]
+ for dtype in dtypes[1:]:
+ if isinstance(dtype, dt.DType):
+ return_dtype = dt.types_lca(return_dtype, dtype)
+ if expression._check_if_exists:
+ return_dtype = dt.types_lca(return_dtype, default_dtype)
+ return _wrap(expression, return_dtype)
+
+ if not isinstance(expression._const_index, int):
+ raise IndexError("Index n")
+
+ try:
+ try_ret = dtypes[expression._const_index]
+ return _wrap(expression, try_ret)
+ except IndexError:
+ message = (
+ f"Index {expression._const_index} out of range for a tuple of"
+ + f" type {object_dtype}."
+ )
+ if expression._check_if_exists:
+ expression_info = get_expression_info(expression)
+ warnings.warn(
+ message
+ + " It refers to the following expression:\n"
+ + expression_info
+ + "Consider using just the default value without .get()."
+ )
+ return _wrap(expression, default_dtype)
+ else:
+ raise IndexError(message)
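
Behaviorally, the unified `eval_get` means `.get()` on a JSON column is typed `JSON | None` unless the default is itself JSON, while `.get()` on an already-optional JSON value is rejected outright (see `test_json_get_none` below). A sketch, using a UDF to build the JSON column:

```python
import pathway as pw

@pw.udf
def wrap(val) -> pw.Json:
    return pw.Json({"value": val})

t = pw.debug.table_from_markdown(
    """
    a
    1
    """
)
records = t.select(data=wrap(pw.this.a))

result = records.select(
    # No default: the result dtype widens to JSON | None.
    maybe=pw.this.data.get("value"),
    # JSON default: the result dtype stays JSON.
    always=pw.this.data.get("value", default={"fallback": 0}),
)
# Chaining another .get() onto `maybe` would raise TypeError, since
# getting from an optional JSON is rejected by the type interpreter.
```
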
def eval_method_call(
self,
diff --git a/python/pathway/internals/utils/convert.py b/python/pathway/internals/utils/convert.py
index dad8f20b..7c1d34e7 100644
--- a/python/pathway/internals/utils/convert.py
+++ b/python/pathway/internals/utils/convert.py
@@ -1,11 +1,13 @@
# Copyright © 2023 Pathway
from datetime import datetime, timedelta
-from typing import Tuple
+from typing import Any, Tuple
import pandas as pd
from dateutil import tz
+from pathway.internals import json
+
MICROSECOND = timedelta(microseconds=1)
@@ -46,3 +48,18 @@ def _pd_timestamp_from_utc_ns(timestamp: int) -> pd.Timestamp:
def _pd_timedelta_from_ns(duration: int) -> pd.Timedelta:
"""Accepts duration in ns"""
return pd.Timedelta(duration)
+
+
+def _parse_to_json(value: str) -> json.Json:
+ """Parse string to value wrapped in pw.Json"""
+ return json.Json.parse(value)
+
+
+def _value_to_json(value: json.JsonValue) -> json.Json:
+ """Returns value wrapped in pw.Json"""
+ return json.Json(value)
+
+
+def _json_dumps(obj: Any) -> str:
+ """Serialize obj as a JSON formatted string."""
+ return json.Json.dumps(obj)
diff --git a/python/pathway/io/_utils.py b/python/pathway/io/_utils.py
index 9b143a28..7d8f0700 100644
--- a/python/pathway/io/_utils.py
+++ b/python/pathway/io/_utils.py
@@ -42,6 +42,7 @@
PathwayType.DATE_TIME_UTC: dt.DATE_TIME_UTC,
PathwayType.DURATION: dt.DURATION,
PathwayType.ARRAY: np.ndarray,
+ PathwayType.JSON: dict,
}
SUPPORTED_INPUT_FORMATS: Set[str] = set(
diff --git a/python/pathway/io/elasticsearch/__init__.py b/python/pathway/io/elasticsearch/__init__.py
index 1c653703..3718d2f5 100644
--- a/python/pathway/io/elasticsearch/__init__.py
+++ b/python/pathway/io/elasticsearch/__init__.py
@@ -74,8 +74,14 @@ def write(table: Table, host: str, auth: ElasticSearchAuth, index_name: str) ->
Now suppose we want to send a Pathway table pets to this local instance of
Elasticsearch.
+
>>> import pathway as pw
- >>> pets = pw.debug.parse_to_table("age owner pet \\n 1 10 Alice dog \\n 2 9 Bob cat \\n 3 8 Alice cat")
+ >>> pets = pw.debug.parse_to_table('''
+ ... age | owner | pet
+ ... 10 | Alice | dog
+ ... 9 | Bob | cat
+ ... 8 | Alice | cat
+ ... ''')
It can be done as follows:
@@ -86,7 +92,7 @@ def write(table: Table, host: str, auth: ElasticSearchAuth, index_name: str) ->
... index_name="animals",
... )
- All the updates of table ```pets`` will be indexed to "animals" as well.
+ All updates of the table "pets" will be indexed to "animals" as well.
"""
data_storage = api.DataStorage(
diff --git a/python/pathway/stdlib/graphs/louvain_communities/impl.py b/python/pathway/stdlib/graphs/louvain_communities/impl.py
index aa0b5a8b..6bd1f7c9 100644
--- a/python/pathway/stdlib/graphs/louvain_communities/impl.py
+++ b/python/pathway/stdlib/graphs/louvain_communities/impl.py
@@ -97,13 +97,11 @@ def _propose_clusters(
# aggregate the gains for adjacent clusters, adjust for the self loops
# self loops are counted with weight 0.5, as they were created via graph contraction,
# which counted each loop twice
- aggregated_gain = vertex_cluster_edges.groupby(
- vertex_cluster_edges.u, vertex_cluster_edges.vc
- ).reduce(
- vertex_cluster_edges.u,
- vertex_cluster_edges.vc,
- gain=pw.reducers.sum(vertex_cluster_edges.weight)
- + self_loop_contribution.ix(vertex_cluster_edges.u).contr / 2,
+ aggregated_gain = vertex_cluster_edges.groupby(pw.this.u, pw.this.vc).reduce(
+ pw.this.u,
+ pw.this.vc,
+ gain=pw.reducers.sum(pw.this.weight)
+ + self_loop_contribution.ix(pw.this.u).contr / 2,
)
def louvain_gain(gain, degree, penalty, total):
diff --git a/python/pathway/stdlib/ml/smart_table_ops/_fuzzy_join.py b/python/pathway/stdlib/ml/smart_table_ops/_fuzzy_join.py
index c6a6f771..b72af30e 100644
--- a/python/pathway/stdlib/ml/smart_table_ops/_fuzzy_join.py
+++ b/python/pathway/stdlib/ml/smart_table_ops/_fuzzy_join.py
@@ -7,7 +7,6 @@
from typing import Any, Callable, Optional
import pathway.internals as pw
-from pathway.internals.fingerprints import fingerprint
from pathway.internals.helpers import StableSet
@@ -372,7 +371,7 @@ def _normalize_weight(cnt: float, normalization_type: int) -> float:
).select(
weight=edges_left_light.weight
* edges_right_light.weight
- * features_normalized.ix(edges_left_light.feature).weight,
+ * features_normalized.ix(pw.this.feature).weight,
left=edges_left_light.node,
right=edges_right_light.node,
)
@@ -405,16 +404,19 @@ def _normalize_weight(cnt: float, normalization_type: int) -> float:
)
)
- def weight_to_pseudoweight(weight: float, left_id: Any, right_id: Any) -> Any:
- return (weight, fingerprint(sorted([left_id, right_id]), format="i64"))
+ def weight_to_pseudoweight(weight, left_id, right_id):
+ return pw.if_else(
+ left_id < right_id,
+ pw.make_tuple(weight, left_id, right_id),
+ pw.make_tuple(weight, right_id, left_id),
+ )
node_node = (
pw.Table.concat_reindex(node_node_light, node_node_heavy)
.groupby(pw.this.left, pw.this.right)
.reduce(pw.this.left, pw.this.right, weight=pw.reducers.sum(pw.this.weight))
.with_columns(
- weight=pw.apply(
- weight_to_pseudoweight,
+ weight=weight_to_pseudoweight(
pw.this.weight,
pw.this.left,
pw.this.right,
@@ -438,7 +440,7 @@ def weight_to_pseudoweight(weight: float, left_id: Any, right_id: Any) -> Any:
node_node = node_node.filter(node_node.left < node_node.right)
node_node = node_node.with_columns(
- weight=pw.apply_with_type(lambda x: x[0], float, node_node.weight),
+ weight=pw.this.weight[0],
)
if by_hand_match is not None:
diff --git a/python/pathway/stdlib/temporal/_interval_join.py b/python/pathway/stdlib/temporal/_interval_join.py
index 114bfe58..cb798052 100644
--- a/python/pathway/stdlib/temporal/_interval_join.py
+++ b/python/pathway/stdlib/temporal/_interval_join.py
@@ -172,6 +172,10 @@ def _interval_join(
"upper_bound": (interval.upper_bound, IntervalType),
}
)
+ if left == right:
+ raise ValueError(
+ "Cannot join table with itself. Use .copy() as one of the arguments of the join."
+ )
if interval.lower_bound == interval.upper_bound:
raise ValueError(
"The difference between lower_bound and upper_bound has to be positive in the Table.interval_join().\n"
diff --git a/python/pathway/stdlib/temporal/_window.py b/python/pathway/stdlib/temporal/_window.py
index 9a3ef1b4..07c98b7d 100644
--- a/python/pathway/stdlib/temporal/_window.py
+++ b/python/pathway/stdlib/temporal/_window.py
@@ -451,11 +451,17 @@ def _apply(
}
)
+ if self.at.table == table:
+ at_table = self.at.table.copy()
+ at = at_table[self.at.name]
+ else:
+ at_table = self.at.table
+ at = self.at
return (
temporal.interval_join(
- self.at.table,
+ at_table,
table,
- self.at,
+ at,
key,
temporal.interval(self.lower_bound, self.upper_bound),
)
diff --git a/python/pathway/stdlib/utils/col.py b/python/pathway/stdlib/utils/col.py
index 08716e91..dac0ab17 100644
--- a/python/pathway/stdlib/utils/col.py
+++ b/python/pathway/stdlib/utils/col.py
@@ -255,8 +255,9 @@ def groupby_reduce_majority(
)
res = counts.groupby(counts[column_group_name]).reduce(
counts[column_group_name],
- majority=counts.ix(pw.reducers.argmax(counts._pw_special_count))[
- column_val_name
- ],
+ majority=counts.ix(
+ pw.reducers.argmax(counts._pw_special_count), context=pw.this
+ )[column_val_name],
)
+
return res
diff --git a/python/pathway/tests/temporal/test_interval_joins.py b/python/pathway/tests/temporal/test_interval_joins.py
index feb43aec..961143ce 100644
--- a/python/pathway/tests/temporal/test_interval_joins.py
+++ b/python/pathway/tests/temporal/test_interval_joins.py
@@ -946,6 +946,7 @@ def test_interval_join_coalesce() -> None:
assert_table_equality_wo_index(res, expected)
+@pytest.mark.xfail(reason="Ix and temporal_joins do not mix.")
def test_interval_join_left_ix() -> None:
t1 = pw.debug.table_from_markdown(
"""
@@ -978,7 +979,9 @@ def test_interval_join_left_ix() -> None:
"""
)
join_result = t1.interval_join_left(t2, t1.t, t2.t, pw.temporal.interval(-2, 1))
- res = join_result.select(left_t=t1.t, right_t=t2.t, other=t2.ix[t1.id].t)
+ res = join_result.select(
+ left_t=t1.t, right_t=t2.t, other=t2.ix(t1.id, context=pw.this).t
+ )
assert_table_equality_wo_index(
res,
expected,
@@ -1181,3 +1184,18 @@ def test_interval_joins_typing_on():
pw.temporal.interval(-1, 2),
left_table.col == right_table.col,
)
+
+
+def test_errors_on_equal_tables():
+ t1 = T(
+ """
+ | a | t
+ 0 | 1 | -1
+ """
+ )
+
+ with pytest.raises(
+ ValueError,
+ match="Cannot join table with itself. Use .copy\(\) as one of the arguments of the join.", # noqa
+ ):
+ t1.interval_join(t1, t1.t, t1.t, pw.temporal.interval(-2, 0))
diff --git a/python/pathway/tests/temporal/test_window_joins.py b/python/pathway/tests/temporal/test_window_joins.py
index 23155c27..77c6aa19 100644
--- a/python/pathway/tests/temporal/test_window_joins.py
+++ b/python/pathway/tests/temporal/test_window_joins.py
@@ -345,6 +345,7 @@ def test_window_join_with_time_expressions() -> None:
assert_table_equality_wo_index(res, expected)
+@pytest.mark.xfail(reason="Ix and joins do not mix.")
def test_window_left_join_ix() -> None:
t1 = T(
"""
@@ -389,7 +390,7 @@ def test_window_left_join_ix() -> None:
"""
)
join_result = t1.window_join_left(t2, t1.t, t2.t, pw.temporal.sliding(1, 2))
- res = join_result.select(x=t1.t, y=t2.t, other=t2.ix[t1.id].t)
+ res = join_result.select(x=t1.t, y=t2.t, other=t2.ix(t1.id).t)
assert_table_equality_wo_index(res, expected)
diff --git a/python/pathway/tests/temporal/test_windows.py b/python/pathway/tests/temporal/test_windows.py
index ab77284b..8667e69e 100644
--- a/python/pathway/tests/temporal/test_windows.py
+++ b/python/pathway/tests/temporal/test_windows.py
@@ -714,3 +714,33 @@ def test_intervals_over_with_shard():
),
)
assert_table_equality_wo_index(result, expected)
+
+
+def test_intervals_over_works_on_same_table():
+ t = T(
+ """
+ | t
+ 1 | 1
+ 2 | 2
+ 3 | 3
+ 4 | 4
+ 5 | 5
+ """
+ )
+ result = pw.temporal.windowby(
+ t,
+ t.t,
+ window=pw.temporal.intervals_over(at=t.t, lower_bound=-2, upper_bound=0),
+ ).reduce(pw.this._pw_window_location, v=pw.reducers.sorted_tuple(pw.this.t))
+
+ df = pd.DataFrame(
+ {
+ "_pw_window_location": [1, 2, 3, 4, 5],
+ "v": [(1,), (1, 2), (1, 2, 3), (2, 3, 4), (3, 4, 5)],
+ }
+ )
+ expected = pw.debug.table_from_pandas(
+ df,
+ schema=pw.schema_from_types(_pw_window_location=int, v=typing.List[int]),
+ )
+ assert_table_equality_wo_index(result, expected)
diff --git a/python/pathway/tests/test_column_properties.py b/python/pathway/tests/test_column_properties.py
new file mode 100644
index 00000000..83f78e77
--- /dev/null
+++ b/python/pathway/tests/test_column_properties.py
@@ -0,0 +1,70 @@
+import pathway.internals as pw
+from pathway.internals import dtype as dt
+from pathway.internals.column_properties import ColumnProperties
+from pathway.tests.utils import T
+
+
+def assert_col_props(expr: pw.ColumnReference, properties: ColumnProperties):
+ assert expr._column.properties == properties
+
+
+def test_preserve_dependency_properties():
+ input1 = T(
+ """
+ | a
+ 1 | 42
+ """
+ )
+ input2 = T(
+ """
+ | b
+ 1 | 42
+ """,
+ ).with_universe_of(input1)
+ input3 = T(
+ """
+ | c
+ 1 | 42
+ """,
+ schema=pw.schema_builder(
+ {"c": pw.column_definition(dtype=int)},
+ properties=pw.SchemaProperties(append_only=False),
+ ),
+ ).with_universe_of(input1)
+
+ result = input1.select(a=input1.a, b=input1.a + input2.b, c=input1.a + input3.c)
+
+ assert_col_props(result.a, ColumnProperties(dtype=dt.INT, append_only=True))
+ assert_col_props(result.b, ColumnProperties(dtype=dt.INT, append_only=True))
+ assert_col_props(result.c, ColumnProperties(dtype=dt.INT, append_only=False))
+
+
+def test_preserve_context_dependency_properties():
+ input1 = T(
+ """
+ | a
+ 1 | 42
+ """
+ )
+ input2 = T(
+ """
+ | b
+ 1 | 42
+ """,
+ ).with_universe_of(input1)
+ input3 = T(
+ """
+ | c
+ 1 | 42
+ """,
+ schema=pw.schema_builder(
+ {"c": pw.column_definition(dtype=int)},
+ properties=pw.SchemaProperties(append_only=False),
+ ),
+ ).with_universe_of(input1)
+
+ res1 = input1.filter(pw.this.a == input2.b)
+ res2 = input1.filter(pw.this.a == input3.c)
+
+ assert_col_props(res1.a, ColumnProperties(dtype=dt.INT, append_only=True))
+ assert_col_props(res2.a, ColumnProperties(dtype=dt.INT, append_only=False))
diff --git a/python/pathway/tests/test_common.py b/python/pathway/tests/test_common.py
index d9c7736c..8df34cec 100644
--- a/python/pathway/tests/test_common.py
+++ b/python/pathway/tests/test_common.py
@@ -18,6 +18,7 @@
from pathway.debug import table_from_pandas, table_to_pandas
from pathway.internals import api
from pathway.internals import dtype as dt
+from pathway.internals.decorators import empty_from_schema
from pathway.internals.expression import NumbaApplyExpression
from pathway.tests.utils import (
T,
@@ -555,9 +556,8 @@ def test_indexing_single_value_groupby():
grouped_table = indexed_table.groupby(pw.this.colB).reduce(
pw.this.colB, sum=pw.reducers.sum(pw.this.colA)
)
- returned = indexed_table.select(
- *pw.this, sum=grouped_table.ix_ref(pw.this.colB).sum
- )
+ returned = indexed_table + grouped_table.ix_ref(indexed_table.colB)[["sum"]]
+
expected = T(
"""
colA | colB | sum
@@ -583,10 +583,11 @@ def test_indexing_single_value_groupby_hardcoded_value():
grouped_table = indexed_table.groupby(pw.this.colB).reduce(
pw.this.colB, sum=pw.reducers.sum(pw.this.colA)
)
- returned = indexed_table.select(*pw.this, sum_A=grouped_table.ix_ref("A").sum)
+ returned = indexed_table + grouped_table.ix_ref("A", context=indexed_table)[["sum"]]
+ returned2 = indexed_table.select(*pw.this, sum=grouped_table.ix_ref("A").sum)
expected = T(
"""
- colA | colB | sum_A
+ colA | colB | sum
10 | A | 30
20 | A | 30
30 | B | 30
@@ -594,6 +595,7 @@ def test_indexing_single_value_groupby_hardcoded_value():
"""
)
assert_table_equality_wo_index(returned, expected)
+ assert_table_equality(returned, returned2)
def test_indexing_two_values_groupby():
@@ -613,8 +615,9 @@ def test_indexing_two_values_groupby():
grouped_table = indexed_table.groupby(pw.this.colB, pw.this.colC).reduce(
pw.this.colB, pw.this.colC, sum=pw.reducers.sum(pw.this.colA)
)
- returned = indexed_table.select(
- *pw.this, sum=grouped_table.ix_ref(pw.this.colB, pw.this.colC).sum
+ returned = (
+ indexed_table
+ + grouped_table.ix_ref(indexed_table.colB, indexed_table.colC)[["sum"]]
)
expected = T(
"""
@@ -663,7 +666,7 @@ def test_ixref_optional():
)
returned = indexer.select(
*pw.this,
- sum=grouped_table.ix_ref(pw.this.refB, pw.this.refC, optional=True).sum,
+ sum=grouped_table.ix_ref(indexer.refB, indexer.refC, optional=True).sum,
)
expected = T(
"""
@@ -697,7 +700,8 @@ def test_indexing_two_values_groupby_hardcoded_values():
"""
)
returned = tested_table.select(
- *pw.this, new_value=indexed_table.ix_ref(10, "A").colA
+ *pw.this,
+ new_value=indexed_table.ix_ref(10, "A").colA,
)
expected = T(
"""
@@ -1378,9 +1382,7 @@ def collatz_step(x: float) -> float:
def test_rows_fixpoint():
def min_id_remove(iterated: pw.Table):
min_id_table = iterated.reduce(min_id=pw.reducers.min(iterated.id))
- iterated = iterated.filter(
- iterated.id != min_id_table.ix(min_id_table.pointer_from()).min_id
- )
+ iterated = iterated.filter(iterated.id != min_id_table.ix_ref().min_id)
return dict(iterated=iterated)
ret = pw.iterate(
@@ -2522,6 +2524,31 @@ def test_ix_none_in_source():
assert_table_equality(res, expected)
+def test_ix_no_select():
+ input = T(
+ """
+ | foo | bar
+ 1 | 1 | 4
+ 2 | 1 | 5
+ 3 | 2 | 6
+ """
+ ).with_columns(foo=pw.this.pointer_from(pw.this.foo))
+
+ result = input.ix(input.foo)[["bar"]]
+
+ assert_table_equality(
+ result,
+ T(
+ """
+ | bar
+ 1 | 4
+ 2 | 4
+ 3 | 5
+ """
+ ),
+ )
+
+
def test_ix_self_select():
input = T(
"""
@@ -2532,7 +2559,7 @@ def test_ix_self_select():
"""
).with_columns(foo=pw.this.pointer_from(pw.this.foo))
- result = input.select(result=input.ix(input.foo).bar)
+ result = input.select(result=input.ix(pw.this.foo).bar)
assert_table_equality(
result,
@@ -2547,6 +2574,34 @@ def test_ix_self_select():
)
+def test_groupby_ix_this():
+ left = T(
+ """
+ pet | owner | age
+ dog | Alice | 10
+ dog | Bob | 9
+ cat | Alice | 8
+ cat | Bob | 7
+ """
+ )
+
+ res = left.groupby(left.pet).reduce(
+ age=pw.reducers.max(pw.this.age),
+ owner=pw.this.ix(pw.reducers.argmax(pw.this.age)).owner,
+ )
+
+ assert_table_equality_wo_index(
+ res,
+ T(
+ """
+ age | owner
+ 10 | Alice
+ 8 | Alice
+ """
+ ),
+ )
+
+
def test_groupby_simplest():
left = T(
"""
@@ -2980,8 +3035,8 @@ def test_argmin_argmax_tie():
res = table.groupby(table.age).reduce(
table.age,
- min=table.ix(pw.reducers.argmin(table.age)).name,
- max=table.ix(pw.reducers.argmax(table.age)).name,
+ min=table.ix(pw.reducers.argmin(table.age), context=pw.this).name,
+ max=table.ix(pw.reducers.argmax(table.age), context=pw.this).name,
)
expected = T(
@@ -3508,7 +3563,7 @@ def test_groupby_ix():
).with_columns(grouper=pw.this.pointer_from(pw.this.grouper))
res = tab.groupby(id=tab.grouper).reduce(
col=pw.reducers.argmax(tab.val),
- output=tab.ix(pw.reducers.argmax(tab.val)).output,
+ output=tab.ix(pw.reducers.argmax(tab.val), context=pw.this).output,
)
expected = T(
"""
@@ -3571,10 +3626,10 @@ def test_join_ix():
)
ret = left.join(right, left.a == right.id, id=left.id).select(
- col=right.ix(left.a).b
+ col=right.ix(left.a, context=pw.this).b
)
- ret3 = right.having(left.a).select(col=right.ix(left.a).b)
+ ret3 = right.having(left.a).select(col=right.ix(left.a, context=pw.this).b)
# below is the desugared version of above computation
# it works, and it's magic
@@ -3972,8 +4027,8 @@ def test_multiple_having():
assert_table_equality_wo_index(
indexed_table.having(indexer1.key, indexer2.key).select(
- col1=indexed_table.ix(indexer1.key).col,
- col2=indexed_table.ix(indexer2.key).col,
+ col1=indexed_table.ix(indexer1.key, context=pw.this).col,
+ col2=indexed_table.ix(indexer2.key, context=pw.this).col,
),
T(
"""
@@ -5438,3 +5493,135 @@ def test_tuple_reducer_consistency():
"""
),
)
+
+
+@pytest.mark.parametrize(
+ "table_schema, schema, allow_superset, ignore_primary_keys",
+ [
+ (
+ {"col_a": pw.column_definition(dtype=int)},
+ {"col_a": pw.column_definition(dtype=int)},
+ False,
+ False,
+ ),
+ (
+ {
+ "col_a": pw.column_definition(dtype=int),
+ "col_b": pw.column_definition(dtype=float),
+ },
+ {"col_a": pw.column_definition(dtype=int)},
+ True,
+ False,
+ ),
+ (
+ {
+ "col_a": pw.column_definition(dtype=int, primary_key=True),
+ "col_b": pw.column_definition(dtype=float),
+ },
+ {"col_a": pw.column_definition(dtype=int, primary_key=True)},
+ True,
+ False,
+ ),
+ (
+ {
+ "col_a": pw.column_definition(dtype=int, primary_key=True),
+ "col_b": pw.column_definition(dtype=float),
+ },
+ {"col_a": pw.column_definition(dtype=int)},
+ True,
+ True,
+ ),
+ ],
+)
+def test_assert_table_has_schema_passes(
+ table_schema, schema, allow_superset, ignore_primary_keys
+):
+ table_schema = pw.schema_builder(table_schema)
+ table = empty_from_schema(table_schema)
+ schema = pw.schema_builder(schema)
+
+ pw.assert_table_has_schema(
+ table,
+ schema,
+ allow_superset=allow_superset,
+ ignore_primary_keys=ignore_primary_keys,
+ )
+
+
+@pytest.mark.parametrize(
+ "table_schema, schema, allow_superset, ignore_primary_keys",
+ [
+ (
+ {"col_a": pw.column_definition(dtype=int)},
+ {"col_a": pw.column_definition(dtype=float)},
+ False,
+ False,
+ ),
+ (
+ {
+ "col_a": pw.column_definition(dtype=int),
+ "col_b": pw.column_definition(dtype=float),
+ },
+ {"col_a": pw.column_definition(dtype=int)},
+ False,
+ False,
+ ),
+ (
+ {"col_a": pw.column_definition(dtype=int, primary_key=True)},
+ {
+ "col_a": pw.column_definition(dtype=int, primary_key=True),
+ "col_b": pw.column_definition(dtype=float),
+ },
+ True,
+ False,
+ ),
+ (
+ {
+ "col_a": pw.column_definition(dtype=int, primary_key=True),
+ "col_b": pw.column_definition(dtype=float),
+ },
+ {"col_a": pw.column_definition(dtype=int)},
+ True,
+ False,
+ ),
+ ],
+)
+def test_assert_table_has_schema_fails(
+ table_schema, schema, allow_superset, ignore_primary_keys
+):
+ table_schema = pw.schema_builder(table_schema)
+ table = empty_from_schema(table_schema)
+ schema = pw.schema_builder(schema)
+
+ with pytest.raises(AssertionError):
+ pw.assert_table_has_schema(
+ table,
+ schema,
+ allow_superset=allow_superset,
+ ignore_primary_keys=ignore_primary_keys,
+ )
+
+
+def test_assert_table_has_schema_default_arguments():
+ table_schema = pw.schema_builder(
+ {
+ "col_a": pw.column_definition(dtype=int, primary_key=True),
+ "col_b": pw.column_definition(dtype=float),
+ }
+ )
+ table = empty_from_schema(table_schema)
+
+ # checking ignore_primary_keys argument
+ schema = pw.schema_from_types(col_a=int, col_b=float)
+ pw.assert_table_has_schema(
+ table,
+ schema,
+ )
+
+ # checking allow_superset argument
+ schema = pw.schema_from_types(col_a=int, col_b=float, col_c=str)
+ with pytest.raises(AssertionError):
+ pw.assert_table_has_schema(
+ table,
+ schema,
+ )
diff --git a/python/pathway/tests/test_expression_repr.py b/python/pathway/tests/test_expression_repr.py
index 86293cd1..63c6f877 100644
--- a/python/pathway/tests/test_expression_repr.py
+++ b/python/pathway/tests/test_expression_repr.py
@@ -17,16 +17,6 @@ def test_column_reference():
assert repr(t.pet) == ".pet"
-def test_column_ix():
- t = T(
- """
- pet | owner | age
- 1 | Alice | 10
- """
- )
- assert repr(t.ix[t.age].pet) == ".ix(.age).pet"
-
-
def test_column_binary_op():
t = T(
"""
@@ -128,6 +118,19 @@ def test_cast():
assert repr(pw.cast(float, t.pet)) == "pathway.cast(FLOAT, .pet)"


+def test_convert():
+ t = T(
+ """
+ | pet
+ 1 | foo
+ """
+ )
+ assert repr(t.pet.as_int()) == "pathway.as_int(.pet)"
+ assert repr(t.pet.as_float()) == "pathway.as_float(.pet)"
+ assert repr(t.pet.as_str()) == "pathway.as_str(.pet)"
+ assert repr(t.pet.as_bool()) == "pathway.as_bool(.pet)"
+
+
def test_declare_type():
t = T(
"""
diff --git a/python/pathway/tests/test_fuzzy_join.py b/python/pathway/tests/test_fuzzy_join.py
index d916d712..c50f5192 100644
--- a/python/pathway/tests/test_fuzzy_join.py
+++ b/python/pathway/tests/test_fuzzy_join.py
@@ -151,13 +151,7 @@ def test_fuzzy_match_many_to_many():
T(
"""
right | left | weight
- 14 | 4 | 0.03125
- 10 | 0 | 0.03125
- 15 | 7 | 0.03125
- 18 | 1 | 0.03125
- 13 | 3 | 0.03125
- 19 | 6 | 0.03125
- 17 | 9 | 0.03125
+ 19 | 9 | 0.03125
""",
),
)
@@ -228,8 +222,6 @@ def test_fuzzy_self_match_many_to_many():
T(
"""
right |left | weight
- 4 | 0 | 0.0625
- 2 | 1 | 0.0625
9 | 8 | 0.0625
""",
),
@@ -369,6 +361,7 @@ def test_smart_large_heavy_light():
expected = T(
"""
left | right | weight
+ b | b | 0.333333
ab | ab | 0.404762
""",
)
diff --git a/python/pathway/tests/test_json.py b/python/pathway/tests/test_json.py
new file mode 100644
index 00000000..597a9907
--- /dev/null
+++ b/python/pathway/tests/test_json.py
@@ -0,0 +1,487 @@
+# Copyright © 2023 Pathway
+
+from __future__ import annotations
+
+import re
+from typing import Any
+
+import pandas as pd
+import pytest
+
+import pathway as pw
+from pathway import dt
+from pathway.debug import table_from_pandas
+from pathway.tests.utils import T, assert_table_equality, run_all
+
+
+def _json_table_from_list(data):
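+ # Test helper: streams `data` through a Python connector, with an int primary key
+ # and every payload column declared as dt.JSON.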
+ class _JsonSubject(pw.io.python.ConnectorSubject):
+ def __init__(self, data: list[dict[str, Any]]) -> None:
+ super().__init__()
+ self.data = data
+
+ def run(self) -> None:
+ for key, row in enumerate(self.data):
+ self.next_json({"key": key + 1, **row})
+
+ schema = pw.schema_builder(
+ columns={
+ "key": pw.column_definition(dtype=int, primary_key=True),
+ **{name: pw.column_definition(dtype=dt.JSON) for name in data[0]},
+ }
+ )
+
+ return pw.io.python.read(_JsonSubject(data), schema=schema).without(pw.this.key)
+
+
+def _json_table(**kwargs) -> pw.Table:
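+ # Transposes column-wise kwargs into row dicts: _json_table(a=[1, 2]) gives rows {"a": 1}, {"a": 2}.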
+ return _json_table_from_list([dict(zip(kwargs, v)) for v in zip(*kwargs.values())])
+
+
+def _optional_json_table(**kwargs) -> pw.Table:
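+ # Maps JSON nulls (pw.Json(None)) to real Python None, making each column Optional.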
+ @pw.udf
+ def filter_null(col: pw.Json) -> pw.Json | None:
+ if col == pw.Json(None):
+ return None
+ return col
+
+ table = _json_table(**kwargs)
+
+ return table.select(**{name: filter_null(pw.this[name]) for name in kwargs})
+
+
+def test_json_get_simple():
+ input = _json_table(data=[{"value": 1}, {"value": 2}])
+
+ result = input.select(ret=pw.this.data.get("value"))
+
+ assert_table_equality(
+ _optional_json_table(ret=[1, 2]),
+ result,
+ )
+
+
+def test_json_get_none():
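+ # get() yields Optional(JSON), so chaining a second get() is rejected at type-check time.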
+ input = _json_table(data=[{}])
+
+ with pytest.raises(
+ TypeError, match=re.escape(f"Cannot get from {dt.Optional(dt.JSON)}.")
+ ):
+ input.select(result=pw.this.data.get("a").get("b"))
+
+
+def test_json_get_default():
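+ # The default applies when "a" is absent or the payload is not an object;
+ # an explicit null stored under "a" is returned as-is.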
+ input = _json_table(
+ data=[
+ {"a": {"b": 1}},
+ {"a": {"b": None}},
+ {"a": {}},
+ {"a": [1, 2, 3]},
+ {"a": 42},
+ {"a": None},
+ {},
+ [1, 2, 3],
+ None,
+ 1,
+ "foo",
+ ]
+ )
+
+ result = input.select(result=pw.this.data.get("a", default={"b": 42}))
+
+ assert_table_equality(
+ _json_table(
+ result=[
+ {"b": 1},
+ {"b": None},
+ {},
+ [1, 2, 3],
+ 42,
+ None,
+ {"b": 42},
+ {"b": 42},
+ {"b": 42},
+ {"b": 42},
+ {"b": 42},
+ ]
+ ),
+ result,
+ )
+
+
+def test_json_get_wo_default():
+ input = _json_table(
+ data=[
+ {"a": {"b": 1}},
+ ]
+ )
+
+ with pytest.raises(
+ TypeError, match=re.escape(f"Cannot get from {dt.Optional(dt.JSON)}.")
+ ):
+ input.select(result=pw.this.data.get("a").get("b"))
+
+
+def test_json_dict_get_int_index():
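+ # An integer index on a JSON object matches nothing, so the result is null.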
+ input = _json_table(
+ data=[
+ {"a": 1},
+ ]
+ )
+
+ result = input.select(result=pw.this.data.get(1))
+
+ assert_table_equality(
+ T(
+ """
+ | result
+ 1 |
+ """
+ ).update_types(result=dt.Optional(dt.JSON)),
+ result,
+ )
+
+
+def test_json_array_get_str_index():
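+ # A string key on a JSON array likewise matches nothing and yields null.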
+ input = _json_table(
+ data=[
+ {"a": [1, 2, 3]},
+ ]
+ )
+
+ result = input.select(result=pw.this.data["a"].get("foo"))
+
+ assert_table_equality(
+ T(
+ """
+ | result
+ 1 |
+ """
+ ).update_types(result=dt.Optional(dt.JSON)),
+ result,
+ )
+
+
+def test_json_get_wrong_default():
+ input = _json_table(
+ data=[
+ {"a": {"b": 1}},
+ ]
+ )
+
+ with pytest.raises(
+ TypeError,
+ match=re.escape(f"Default must be of type {dt.Optional(dt.JSON)}, found INT."),
+ ):
+ input.select(result=pw.this.data.get("a", 42).get("b"))
+
+
+def test_json_get_item():
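+ # Bracket access is null-tolerant: any missing or non-object step yields JSON null.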
+ input = _json_table(
+ data=[
+ {"a": {"b": 1}},
+ {"a": {"b": None}},
+ {},
+ {"a": {}},
+ {"a": [1, 2, 3]},
+ {"a": 42},
+ {"a": None},
+ ]
+ )
+
+ result = input.select(result=pw.this.data["a"]["b"])
+
+ assert_table_equality(
+ _json_table(result=[1, None, None, None, None, None, None]),
+ result,
+ )
+
+
+def test_json_get_array_index():
+ input = _json_table(
+ index=[0, 1, 2],
+ data=[{"value": [1, 2, 3]}, {"value": [4, 5, 6]}, {"value": [7, 8, 9]}],
+ )
+
+ result = input.select(result=pw.this.data["value"][pw.this.index.as_int()])
+
+ assert_table_equality(
+ _json_table(result=[1, 5, 9]),
+ result,
+ )
+
+
+@pytest.mark.parametrize("index", [-1, -4, 3])
+def test_json_get_array_index_out_of_bounds(index):
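+ # Negative indices do not wrap around; any index outside the array yields JSON null.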
+ input = _json_table(data=[{"value": [0, 1, 2]}])
+
+ result = input.select(result=pw.this.data["value"][index])
+
+ assert_table_equality(
+ _json_table(result=[None]),
+ result,
+ )
+
+
+def test_json_get_item_optional_json():
+ input = _json_table(data=[{}])
+
+ with pytest.raises(
+ TypeError,
+ match=re.escape(f"Cannot get from {dt.Optional(dt.JSON)}."),
+ ):
+ input.select(result=pw.this.data.get("a")["b"])
+
+
+@pytest.mark.parametrize(
+ "from_,to_,method",
+ [
+ (
+ [{"value": 42}, {"value": -1}, {"value": None}, {}],
+ [42, -1, None, None],
+ pw.ColumnExpression.as_int,
+ ),
+ (
+ [
+ {"value": 1.5},
+ {"value": 10},
+ {"value": 0},
+ {"value": -1},
+ {"value": 2**32 + 1},
+ {"value": 2**45 + 1},
+ {"value": None},
+ {},
+ ],
+ [1.5, 10.0, 0.0, -1.0, float(2**32 + 1), float(2**45 + 1), None, None],
+ pw.ColumnExpression.as_float,
+ ),
+ (
+ [{"value": "foo"}, {"value": "42"}, {"value": "true"}, {"value": None}, {}],
+ ["foo", "42", "true", None, None],
+ pw.ColumnExpression.as_str,
+ ),
+ (
+ [{"value": True}, {"value": False}, {"value": None}, {}],
+ [True, False, None, None],
+ pw.ColumnExpression.as_bool,
+ ),
+ ],
+)
+def test_json_as_type(from_, to_, method):
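+ # A JSON null or a missing field converts to None; otherwise the value must match the target type.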
+ to_dtype = type(to_[0])
+
+ input = _json_table(data=from_)
+
+ result = input.select(result=method(pw.this.data.get("value")))
+
+ expected = table_from_pandas(
+ pd.DataFrame({"key": list(range(1, len(to_) + 1)), "result": to_}),
+ schema=pw.schema_builder(
+ columns={
+ "key": pw.column_definition(primary_key=True, dtype=int),
+ "result": pw.column_definition(dtype=dt.Optional(to_dtype)),
+ }
+ ),
+ ).without(pw.this.key)
+
+ assert_table_equality(result, expected)
+
+
+@pytest.mark.parametrize(
+ "value",
+ ["42", "foo", 1.6, True],
+)
+def test_json_as_int_wrong_values(value):
+ input = _json_table(data=[{"value": value}])
+
+ input.select(result=pw.this.data.get("value").as_int())
+
+ with pytest.raises(ValueError):
+ run_all()
+
+
+@pytest.mark.parametrize(
+ "value",
+ ["42", "foo", True],
+)
+def test_json_as_float_wrong_values(value):
+ input = _json_table(data=[{"value": value}])
+
+ input.select(result=pw.this.data.get("value").as_float())
+
+ with pytest.raises(ValueError):
+ run_all()
+
+
+@pytest.mark.parametrize(
+ "value",
+ [1, 1.6, True],
+)
+def test_json_as_str_wrong_values(value):
+ input = _json_table(data=[{"value": value}])
+
+ input.select(result=pw.this.data.get("value").as_str())
+
+ with pytest.raises(ValueError):
+ run_all()
+
+
+@pytest.mark.parametrize(
+ "value",
+ [1, 0, 1.6, "1", "0", "true", "True"],
+)
+def test_json_as_bool_wrong_values(value):
+ input = _json_table(data=[{"value": value}])
+
+ input.select(result=pw.this.data.get("value").as_bool())
+
+ with pytest.raises(ValueError):
+ run_all()
+
+
+def test_json_input():
+ table = _json_table_from_list(
+ [
+ {
+ "a": {"value": 1},
+ "b": 2,
+ "c": 1.5,
+ "d": True,
+ "e": "foo",
+ "f": [1, 2, 3],
+ }
+ ]
+ )
+
+ result = table.select(
+ a=pw.this.a["value"].as_int(),
+ b=pw.this.b.as_int(),
+ c=pw.this.c.as_float(),
+ d=pw.this.d.as_bool(),
+ e=pw.this.e.as_str(),
+ f=pw.this.f[1].as_int(),
+ )
+
+ assert_table_equality(
+ T(
+ """
+ | a | b | c | d | e | f
+ 1 | 1 | 2 | 1.5 | True | foo | 2
+ """
+ ).update_types(
+ a=dt.Optional(int),
+ b=dt.Optional(int),
+ c=dt.Optional(float),
+ d=dt.Optional(bool),
+ e=dt.Optional(str),
+ f=dt.Optional(int),
+ ),
+ result,
+ )
+
+
+def test_json_apply():
+ table = _json_table(a=[1, 2, 3])
+
+ @pw.udf
+ def map(a: pw.Json) -> int:
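+ # JSON column values arrive in UDFs as pw.Json; .value unwraps the underlying object.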
+ assert isinstance(a.value, int)
+ return a.value + 1
+
+ result = table.select(ret=map(**table))
+
+ assert_table_equality(
+ T(
+ """
+ | ret
+ 1 | 2
+ 2 | 3
+ 3 | 4
+ """
+ ),
+ result,
+ )
+
+
+def test_json_type():
+ table = _json_table(
+ a=[{"value": 1}], b=[2], c=[1.5], d=[True], e=["foo"], f=[[1, 2, 3]]
+ )
+
+ @pw.udf
+ def assert_types(**kwargs) -> bool:
+ return all(isinstance(arg, pw.Json) for arg in kwargs.values())
+
+ result = table.select(ret=assert_types(**table))
+
+ assert_table_equality(
+ T(
+ """
+ | ret
+ 1 | True
+ """
+ ),
+ result,
+ )
+
+
+def test_json_recursive():
+ table = T(
+ """
+ | value
+ 1 | 1
+ 2 | 2
+ 3 | 3
+ """
+ )
+
+ @pw.udf
+ def wrap(value: int) -> pw.Json:
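+ # Repeated pw.Json wrapping collapses: the stored value remains a plain int.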
+ j = pw.Json(pw.Json(pw.Json(value)))
+ assert isinstance(j.value, int)
+ return j
+
+ result = table.select(ret=wrap(pw.this.value).as_int())
+
+ assert_table_equality(
+ T(
+ """
+ | ret
+ 1 | 1
+ 2 | 2
+ 3 | 3
+ """
+ ).update_types(ret=dt.Optional(int)),
+ result,
+ )
+
+
+def test_json_nested():
+ table = T(
+ """
+ | value
+ 1 | 1
+ 2 | 2
+ 3 | 3
+ """
+ )
+
+ @pw.udf
+ def wrap(value: int) -> pw.Json:
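+ # Elements of the wrapped list stay pw.Json instances; get(0) retrieves one for conversion.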
+ j = pw.Json([pw.Json(value)])
+ assert isinstance(j.value[0].value, int) # type:ignore
+ return j
+
+ result = table.select(ret=wrap(pw.this.value).get(0).as_int())
+
+ assert_table_equality(
+ T(
+ """
+ | ret
+ 1 | 1
+ 2 | 2
+ 3 | 3
+ """
+ ).update_types(ret=dt.Optional(int)),
+ result,
+ )
diff --git a/python/pathway/tests/test_schema.py b/python/pathway/tests/test_schema.py
index e2105f17..f8ce8762 100644
--- a/python/pathway/tests/test_schema.py
+++ b/python/pathway/tests/test_schema.py
@@ -123,3 +123,12 @@ class Same(pw.Schema):
assert A == Same
assert A == schema_from_builder
assert A == compat_schema
+
+
+def test_schema_canonical_json():
+ class A(pw.Schema):
+ a: dict
+ b: pw.Json
+
+ assert A.columns()["a"].dtype == pw.dt.JSON
+ assert A.columns()["b"].dtype == pw.dt.JSON
diff --git a/python/pathway/tests/test_utils.py b/python/pathway/tests/test_utils.py
index ebe26dba..190f517d 100644
--- a/python/pathway/tests/test_utils.py
+++ b/python/pathway/tests/test_utils.py
@@ -45,7 +45,7 @@ def test_ix_sanitize():
)
t_indexer = t_indexer.select(indexer=t_animals.pointer_from(pw.this.indexer))
ret = t_animals.having(t_indexer.indexer).select(
- genus=t_animals.ix(t_indexer.indexer).genus
+ genus=t_animals.ix(t_indexer.indexer, context=pw.this).genus
)
expected = T(
"""
diff --git a/src/connectors/data_format.rs b/src/connectors/data_format.rs
index fd20f562..a5ddea6c 100644
--- a/src/connectors/data_format.rs
+++ b/src/connectors/data_format.rs
@@ -611,6 +611,7 @@ fn serialize_value_to_json(value: &Value) -> Result<JsonValue> {
Value::DateTimeNaive(dt) => Ok(json!(dt.to_string())),
Value::DateTimeUtc(dt) => Ok(json!(dt.to_string())),
Value::Duration(d) => Ok(json!(d.nanoseconds())),
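+ // Json payloads pass through unchanged: dereference the handle and clone the inner value.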
+ Value::Json(j) => Ok((**j).clone()),
}
}
@@ -623,24 +624,26 @@ fn values_by_names_from_json(
) -> Result<Vec<Value>, ParseError> {
let mut parsed_values = Vec::with_capacity(field_names.len());
for value_field in field_names {
- let default_value = {
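+ // Track the declared column type alongside the default so Json columns can bypass parsing.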
+ let (default_value, dtype) = {
if let Some(schema_item) = schema.get(value_field) {
if let Some(default) = &schema_item.default {
- Some(default)
+ (Some(default), schema_item.type_)
} else {
- None
+ (None, schema_item.type_)
}
} else {
- None
+ (None, Type::Any)
}
};
let value = if let Some(path) = column_paths.get(value_field) {
if let Some(value) = payload.pointer(path) {
- parse_value_from_json(value).ok_or(ParseError::FailedToParseFromJson(
- value_field.to_string(),
- value.clone(),
- ))?
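+ // Columns declared as Type::Json keep the raw serde_json value; other types go through the typed parser.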
+ match dtype {
+ Type::Json => Value::from(value.clone()),
+ _ => parse_value_from_json(value).ok_or_else(|| {
+ ParseError::FailedToParseFromJson(value_field.to_string(), value.clone())
+ })?,
+ }
} else if let Some(default) = default_value {
default.clone()
} else if field_absence_is_error {
@@ -656,12 +659,15 @@ fn values_by_names_from_json(
let value_specified_in_json = payload.get(value_field).is_some();
if value_specified_in_json {
- parse_value_from_json(&payload[&value_field]).ok_or(
- ParseError::FailedToParseFromJson(
- value_field.to_string(),
- payload[&value_field].clone(),
- ),
- )?
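+ // Same rule for top-level field lookups: Json columns take the raw value as-is.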
+ match dtype {
+ Type::Json => Value::from(payload[&value_field].clone()),
+ _ => parse_value_from_json(&payload[&value_field]).ok_or_else(|| {
+ ParseError::FailedToParseFromJson(
+ value_field.to_string(),
+ payload[&value_field].clone(),
+ )
+ })?,
+ }
} else if let Some(default) = default_value {
default.clone()
} else if field_absence_is_error {
diff --git a/src/connectors/data_storage.rs b/src/connectors/data_storage.rs
index f485ceee..a777120f 100644
--- a/src/connectors/data_storage.rs
+++ b/src/connectors/data_storage.rs
@@ -1343,6 +1343,7 @@ impl PsqlSerializer for Value {
Value::DateTimeNaive(date_time) => date_time.to_string(),
Value::DateTimeUtc(date_time) => date_time.to_string(),
Value::Duration(duration) => duration.nanoseconds().to_string(),
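+ // Json values are rendered via their string representation for the Postgres writer.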
+ Value::Json(json) => json.to_string(),
}
}
}
diff --git a/src/engine/dataflow.rs b/src/engine/dataflow.rs
index 06ab286d..25158d36 100644
--- a/src/engine/dataflow.rs
+++ b/src/engine/dataflow.rs
@@ -544,7 +544,6 @@ struct Ixer {
none_keys: Option<Collection<S, Key>>,
ix_key_policy: IxKeyPolicy,
values_to_keys_arranged: OnceCell<ArrangedByKey<S, Key, Key>>,
- keys_to_values_arranged: OnceCell<ArrangedByKey<S, Key, Key>>,
}
impl Ixer {
@@ -562,7 +561,6 @@ impl Ixer {
none_keys,
ix_key_policy,
values_to_keys_arranged: OnceCell::new(),
- keys_to_values_arranged: OnceCell::new(),
}
}
@@ -573,11 +571,6 @@ impl Ixer {
.arrange()
})
}
-
- fn keys_to_values_arranged(&self) -> &ArrangedByKey<S, Key, Key> {
- self.keys_to_values_arranged
- .get_or_init(|| self.keys_values.arrange())
- }
}
struct Joiner {
left_universe: UniverseHandle,
@@ -851,7 +844,6 @@ struct DataflowGraphInner {
ixers_cache: HashMap<(ColumnHandle, UniverseHandle, IxKeyPolicy), IxerHandle>,
groupers_cache: HashMap<(Vec<ColumnHandle>, Vec<ColumnHandle>, UniverseHandle), GrouperHandle>,
groupers_id_cache: HashMap<(ColumnHandle, Vec<ColumnHandle>, UniverseHandle), GrouperHandle>,
- groupers_ixers_cache: HashMap<(GrouperHandle, IxerHandle), ArrangedByKey<S, Key, Key>>,
concat_cache: HashMap<Vec<UniverseHandle>, ConcatHandle>,
joiners_cache: HashMap<
(
@@ -1291,7 +1283,6 @@ impl DataflowGraphInner {
ixers_cache: HashMap::new(),
groupers_cache: HashMap::new(),
groupers_id_cache: HashMap::new(),
- groupers_ixers_cache: HashMap::new(),
concat_cache: HashMap::new(),
joiners_cache: HashMap::new(),
ignore_asserts,
@@ -2303,79 +2294,6 @@ impl DataflowGraphInner {
Ok(new_column_handle)
}
- fn grouper_reducer_column_ix(
- &mut self,
- grouper_handle: GrouperHandle,
- reducer: Reducer,
- ixer_handle: IxerHandle,
- column_handle: ColumnHandle,
- ) -> Result<ColumnHandle> {
- let dataflow_reducer = Self::create_dataflow_reducer(reducer);
-
- let ixer = self
- .ixers
- .get(ixer_handle)
- .ok_or(Error::InvalidIxerHandle)?;
- assert!(ixer.ix_key_policy != IxKeyPolicy::SkipMissing);
-
- let column = self
- .columns
- .get(column_handle)
- .ok_or(Error::InvalidColumnHandle)?;
- if column.universe != ixer.input_universe {
- return Err(Error::UniverseMismatch);
- }
-
- let grouper = self
- .groupers
- .get(grouper_handle)
- .ok_or(Error::InvalidGrouperHandle)?;
-
- let new_source_key_to_result_key = if let Some(val) = self
- .groupers_ixers_cache
- .get(&(grouper_handle, ixer_handle))
- {
- val.clone()
- } else {
- let new_source_key_to_result_key = ixer
- .keys_to_values_arranged()
- .join_core(
- grouper.source_key_to_result_key(),
- |_source_key, col_key, result_key| once((*col_key, *result_key)),
- )
- .arrange_named("grouper_reducer_column_ix::new_source_key_to_result_key");
- self.groupers_ixers_cache.insert(
- (grouper_handle, ixer_handle),
- new_source_key_to_result_key.clone(),
- );
- new_source_key_to_result_key
- };
-
- if !self.ignore_asserts {
- self.assert_collections_same_size(
- &ixer.keys_values,
- &new_source_key_to_result_key.as_collection(|_k, _v| ()),
- );
- };
-
- let new_values = dataflow_reducer.reduce(
- self,
- &new_source_key_to_result_key,
- column.values_arranged(),
- );
-
- grouper.result_keys.get_or_init(|| {
- new_values.map_named("grouper_reducer_column_ix::result_keys", |(key, _value)| {
- key
- })
- });
- let result_universe = self.grouper_universe(grouper_handle)?;
- let new_column_handle = self
- .columns
- .alloc(Column::from_collection(result_universe, new_values));
- Ok(new_column_handle)
- }
-
fn ix(
&mut self,
key_column_handle: ColumnHandle,
@@ -4038,21 +3956,6 @@ impl Graph for InnerDataflowGraph {
.grouper_reducer_column(grouper_handle, reducer, column_handle)
}
- fn grouper_reducer_column_ix(
- &self,
- grouper_handle: GrouperHandle,
- reducer: Reducer,
- ixer_handle: IxerHandle,
- column_handle: ColumnHandle,
- ) -> Result<ColumnHandle> {
- self.0.borrow_mut().grouper_reducer_column_ix(
- grouper_handle,
- reducer,
- ixer_handle,
- column_handle,
- )
- }
-
fn ix(
&self,
key_column_handle: ColumnHandle,
@@ -4510,21 +4413,6 @@ impl<S: MaybeTotalScope<MaybeTotalTimestamp = u64>> Graph for OuterDataflowGraph<S>
.grouper_reducer_column(grouper_handle, reducer, column_handle)
}
- fn grouper_reducer_column_ix(
- &self,
- grouper_handle: GrouperHandle,
- reducer: Reducer,
- ixer_handle: IxerHandle,
- column_handle: ColumnHandle,
- ) -> Result<ColumnHandle> {
- self.0.borrow_mut().grouper_reducer_column_ix(
- grouper_handle,
- reducer,
- ixer_handle,
- column_handle,
- )
- }
-
fn ix(
&self,
key_column_handle: ColumnHandle,
diff --git a/src/engine/expression.rs b/src/engine/expression.rs
index 7a9319aa..16a79c95 100644
--- a/src/engine/expression.rs
+++ b/src/engine/expression.rs
@@ -15,7 +15,7 @@ use smallvec::SmallVec;
use super::error::{DynError, DynResult};
use super::time::{DateTime, DateTimeNaive, DateTimeUtc, Duration};
-use super::{Error, Key, Value};
+use super::{Error, Key, Type, Value};
use crate::mat_mul::mat_mul;
#[derive(Debug)]
@@ -97,6 +97,8 @@ pub enum AnyExpression {
MakeTuple(Expressions),
TupleGetItemChecked(Arc<Expression>, Arc<Expression>, Arc<Expression>),
TupleGetItemUnchecked(Arc<Expression>, Arc<Expression>),
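+ // JSON expressions: item lookup with an optional default, and conversion to Optional scalar types.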
+ JsonGetItem(Arc<Expression>, Arc<Expression>, Arc<Expression>),
+ JsonToOptional(Arc<Expression>, Type),
ParseStringToInt(Arc<Expression>, bool),
ParseStringToFloat(Arc<Expression>, bool),
ParseStringToBool(Arc<Expression>, Vec<String>, Vec<String>, bool),
@@ -347,6 +349,29 @@ fn get_array_element(
}
}
+fn get_json_item(
+ expr: &Arc<Expression>,
+ index: &Arc<Expression>,
+ values: &[Value],
+) -> DynResult<Value>