diff --git a/CHANGELOG.md b/CHANGELOG.md
index 96cd8f27..ea42cb83 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,21 +6,22 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
 ## [Unreleased]
-## [0.5.0] - 2023-10-04
+## [0.5.1] - 2023-10-04
 
-### Added
-- `Schema` method `typehints` that returns dict of mypy-compatible typehints.
+### Fixed
+- `select` operates only on consistent states.
 
-### Changed
-- **BREAKING**: renamed `Table` method `dtypes` to `typehints`. It now returns a `dict` of mypy-compatible typehints.
-- **BREAKING**: `Schema.__getitem__` returns a data class `ColumnSchema` containing all related information on particular column.
+## [0.5.0] - 2023-10-04
 
 ### Added
+- `Schema` method `typehints` that returns dict of mypy-compatible typehints.
 - Support for JSON parsing from CSV sources.
 - `restrict` method in `Table` to restrict table universe to the universe of the other table.
 - Better support for postgresql types in the output connector.
 
 ### Changed
+- **BREAKING**: renamed `Table` method `dtypes` to `typehints`. It now returns a `dict` of mypy-compatible typehints.
+- **BREAKING**: `Schema.__getitem__` returns a data class `ColumnSchema` containing all related information on particular column.
 - **BREAKING**: `tuple` reducer used after intervals_over window now sorts values by time.
 - **BREAKING**: expressions used in `select`, `filter`, `flatten`, `with_columns`, `with_id`, `with_id_from` have to have the same universe as the table. Earlier it was possible to use an expression from a superset of a table universe. To use expressions from wider universes, one can use `restrict` on the expression source table.
 - **BREAKING**: `pw.universes.promise_are_equal(t1, t2)` no longer allows to use references from `t1` and `t2` in a single expression. To change the universe of a table, use `with_universe_of`.
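For context on the universe rule mentioned in the 0.5.0 notes above, here is a minimal, hedged sketch of using `restrict` so that an expression from a wider universe can be used in `select`. The table names and contents are hypothetical and not taken from this patch; only the `restrict`/`select` pattern follows the changelog.

```python
import pathway as pw

# Hypothetical input table; only its shape matters for this sketch.
t_all = pw.debug.table_from_markdown(
    """
      | value
    1 | 10
    2 | 20
    3 | 30
    """
)
t_subset = t_all.filter(pw.this.value > 10)

# `t_all` lives on a wider universe than `t_subset`, so its expressions can no
# longer be used directly in `t_subset.select(...)`. Restricting it first gives
# both tables the same universe, as described in the breaking-change note.
t_narrowed = t_all.restrict(t_subset)
result = t_subset.select(doubled=t_narrowed.value * 2)

pw.debug.compute_and_print(result)
```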
diff --git a/Cargo.lock b/Cargo.lock index e86f9e29..7374663e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1457,7 +1457,7 @@ dependencies = [ [[package]] name = "pathway" -version = "0.5.0" +version = "0.5.1" dependencies = [ "arc-swap", "arcstr", diff --git a/Cargo.toml b/Cargo.toml index 1934988f..4951dcca 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pathway" -version = "0.5.0" +version = "0.5.1" edition = "2021" publish = false rust-version = "1.71.0" diff --git a/integration_tests/s3/test_s3_interops.py b/integration_tests/s3/test_s3_interops.py index cc8b1e90..b7ba74e4 100644 --- a/integration_tests/s3/test_s3_interops.py +++ b/integration_tests/s3/test_s3_interops.py @@ -8,6 +8,7 @@ import boto3 import pandas as pd +import pytest import pathway as pw from pathway.internals.monitoring import MonitoringLevel @@ -342,3 +343,124 @@ def on_end(*args, **kwargs): ] * 2 ) + + +def test_s3_alternative_path(tmp_path: pathlib.Path): + input_s3_path = "integration_tests/test_s3_alternative_path/input.csv" + output_path = tmp_path / "output.csv" + model_output_path = tmp_path / "model_output.csv" + + input_contents = "key,value\n1,Hello\n2,World" + + put_aws_object(input_s3_path, input_contents) + write_lines(model_output_path, input_contents) + + table = pw.io.s3_csv.read( + "s3://aws-integrationtest/{}".format(input_s3_path), + aws_s3_settings=pw.io.s3_csv.AwsS3Settings( + access_key="AKIAX67C7K343BP4QUWN", + secret_access_key=os.environ["AWS_S3_SECRET_ACCESS_KEY"], + region="eu-central-1", + ), + value_columns=["key", "value"], + mode="static", + autocommit_duration_ms=1000, + ) + + pw.io.csv.write(table, str(output_path)) + pw.run() + + result = pd.read_csv( + output_path, usecols=["key", "value"], index_col=["key"] + ).sort_index() + expected = pd.read_csv( + model_output_path, usecols=["key", "value"], index_col=["key"] + ).sort_index() + assert result.equals(expected) + + +def test_s3_wrong_path(tmp_path: pathlib.Path): + input_s3_path = "integration_tests/test_s3_wrong_path/input.csv" + output_path = tmp_path / "output.csv" + + table = pw.io.s3_csv.read( + "s3://aws-integrationtest/{}".format(input_s3_path), + aws_s3_settings=pw.io.s3_csv.AwsS3Settings( + access_key="AKIAX67C7K343BP4QUWN", + secret_access_key=os.environ["AWS_S3_SECRET_ACCESS_KEY"], + region="eu-central-1", + ), + value_columns=["key", "value"], + mode="static", + autocommit_duration_ms=1000, + ) + + pw.io.csv.write(table, str(output_path)) + with pytest.raises( + RuntimeError, + match="Creating S3 reader failed: no objects to read", + ): + pw.run() + + +def test_s3_creds_from_profiles(tmp_path: pathlib.Path): + input_s3_path = "integration_tests/test_s3_creds_from_profiles/input.csv" + output_path = tmp_path / "output.csv" + model_output_path = tmp_path / "model_output.csv" + + input_contents = "key,value\n1,Hello\n2,World" + + put_aws_object(input_s3_path, input_contents) + write_lines(model_output_path, input_contents) + + table = pw.io.s3_csv.read( + "s3://aws-integrationtest/{}".format(input_s3_path), + aws_s3_settings=pw.io.s3_csv.AwsS3Settings(region="eu-central-1"), + value_columns=["key", "value"], + mode="static", + autocommit_duration_ms=1000, + ) + + pw.io.csv.write(table, str(output_path)) + pw.run() + + result = pd.read_csv( + output_path, usecols=["key", "value"], index_col=["key"] + ).sort_index() + expected = pd.read_csv( + model_output_path, usecols=["key", "value"], index_col=["key"] + ).sort_index() + assert result.equals(expected) + + +def test_s3_full_autodetect(tmp_path: 
pathlib.Path): + input_s3_path = "integration_tests/test_s3_full_autodetect/input.csv" + output_path = tmp_path / "output.csv" + model_output_path = tmp_path / "model_output.csv" + + input_contents = "key,value\n1,Hello\n2,World" + + put_aws_object(input_s3_path, input_contents) + write_lines(model_output_path, input_contents) + + class InputSchema(pw.Schema): + key: int + value: str + + table = pw.io.s3.read( + "s3://aws-integrationtest/{}".format(input_s3_path), + format="csv", + schema=InputSchema, + mode="static", + ) + + pw.io.csv.write(table, str(output_path)) + pw.run() + + result = pd.read_csv( + output_path, usecols=["key", "value"], index_col=["key"] + ).sort_index() + expected = pd.read_csv( + model_output_path, usecols=["key", "value"], index_col=["key"] + ).sort_index() + assert result.equals(expected) diff --git a/integration_tests/wordcount/base.py b/integration_tests/wordcount/base.py index e8ab0637..1e9f8ea1 100644 --- a/integration_tests/wordcount/base.py +++ b/integration_tests/wordcount/base.py @@ -226,18 +226,19 @@ def get_pw_program_run_time( needs_polling = False finally: if mode == STREAMING_MODE_NAME: - popen.kill() + pw_exit_code = popen.poll() + if not pw_exit_code: + popen.kill() else: pw_exit_code = popen.wait() - if pw_exit_code != 0: - warnings.warn( - f"Warning: pw program terminated with non zero exit code: {pw_exit_code}" - ) - assert ( - n_retries < 3 - ), "Number of retries for S3 reconnection exceeded" - needs_pw_program_launch = True - n_retries += 1 + + if pw_exit_code is not None and pw_exit_code != 0: + warnings.warn( + f"Warning: pw program terminated with non zero exit code: {pw_exit_code}" + ) + assert n_retries < 3, "Number of retries for S3 reconnection exceeded" + needs_pw_program_launch = True + n_retries += 1 return time.time() - time_start diff --git a/integration_tests/wordcount/test_new_data.py b/integration_tests/wordcount/test_new_data.py index 7df2edae..fc1e1641 100644 --- a/integration_tests/wordcount/test_new_data.py +++ b/integration_tests/wordcount/test_new_data.py @@ -14,21 +14,13 @@ ) +@pytest.mark.parametrize("n_cpus", [1, 2, 4]) +@pytest.mark.parametrize("pstorage_type", [S3_STORAGE_NAME, FS_STORAGE_NAME]) @pytest.mark.parametrize( - "n_backfilling_runs,n_cpus,mode,pstorage_type", + "n_backfilling_runs,mode", [ - (3, 1, STREAMING_MODE_NAME, S3_STORAGE_NAME), - (3, 2, STREAMING_MODE_NAME, S3_STORAGE_NAME), - (3, 4, STREAMING_MODE_NAME, S3_STORAGE_NAME), - (3, 1, STATIC_MODE_NAME, S3_STORAGE_NAME), - (3, 2, STATIC_MODE_NAME, S3_STORAGE_NAME), - (3, 4, STATIC_MODE_NAME, S3_STORAGE_NAME), - (3, 1, STREAMING_MODE_NAME, FS_STORAGE_NAME), - (3, 2, STREAMING_MODE_NAME, FS_STORAGE_NAME), - (3, 4, STREAMING_MODE_NAME, FS_STORAGE_NAME), - (3, 1, STATIC_MODE_NAME, FS_STORAGE_NAME), - (3, 2, STATIC_MODE_NAME, FS_STORAGE_NAME), - (3, 4, STATIC_MODE_NAME, FS_STORAGE_NAME), + (3, STREAMING_MODE_NAME), + (3, STATIC_MODE_NAME), ], ) def test_integration_new_data( diff --git a/pyproject.toml b/pyproject.toml index f975efb2..8b0d54ee 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,6 +27,7 @@ dependencies = [ "rich >= 12.6.0", "diskcache >= 5.2.1", "exceptiongroup >= 1.1.3; python_version < '3.11'", + "boto3 >= 1.26.76", ] [project.optional-dependencies] diff --git a/python/pathway/internals/_io_helpers.py b/python/pathway/internals/_io_helpers.py index 0453d154..a6658686 100644 --- a/python/pathway/internals/_io_helpers.py +++ b/python/pathway/internals/_io_helpers.py @@ -2,19 +2,25 @@ from __future__ import annotations +import boto3 + 
from pathway.internals import api from pathway.internals import dtype as dt from pathway.internals import schema from pathway.internals.table import Table from pathway.internals.trace import trace_user_frame +S3_PATH_PREFIX = "s3://" +S3_DEFAULT_REGION = "us-east-1" +S3_LOCATION_FIELD = "LocationConstraint" + class AwsS3Settings: @trace_user_frame def __init__( self, - bucket_name, *, + bucket_name=None, access_key=None, secret_access_key=None, with_path_style=False, @@ -40,6 +46,31 @@ def __init__( endpoint, ) + @classmethod + def new_from_path(cls, s3_path: str): + starts_with_prefix = s3_path.startswith(S3_PATH_PREFIX) + has_extra_chars = len(s3_path) > len(S3_PATH_PREFIX) + if not starts_with_prefix or not has_extra_chars: + raise ValueError("Incorrect S3 path: {}".format(s3_path)) + bucket = s3_path[len(S3_PATH_PREFIX) :].split("/")[0] + + # the crate we use on the Rust-engine side can't detect the location + # of a bucket, so it's done on the Python side + s3_client = boto3.client("s3") + location_response = s3_client.get_bucket_location(Bucket=bucket) + + # Buckets in Region us-east-1 have a LocationConstraint of None + location_constraint = location_response[S3_LOCATION_FIELD] + if location_constraint is None: + region = S3_DEFAULT_REGION + else: + region = location_constraint.split("|")[0] + + return cls( + bucket_name=bucket, + region=region, + ) + def _format_output_value_fields(table: Table) -> list[api.ValueField]: value_fields = [] diff --git a/python/pathway/internals/column.py b/python/pathway/internals/column.py index 281f03c8..43e9fbef 100644 --- a/python/pathway/internals/column.py +++ b/python/pathway/internals/column.py @@ -320,11 +320,6 @@ class TableRestrictedRowwiseContext(RowwiseContext): table: pw.Table -@dataclass(eq=False, frozen=True) -class CopyContext(Context): - """Context used by operators not changing the columns.""" - - @dataclass(eq=False, frozen=True) class GroupedContext(Context): """Context of `table.groupby().reduce() operation.""" diff --git a/python/pathway/internals/decorators.py b/python/pathway/internals/decorators.py index be20f8c6..bbca3e4c 100644 --- a/python/pathway/internals/decorators.py +++ b/python/pathway/internals/decorators.py @@ -23,10 +23,6 @@ def contextualized_operator(func): return _operator_wrapper(func, op.ContextualizedIntermediateOperator) -def non_contextualized_operator(func): - return _operator_wrapper(func, op.NonContextualizedIntermediateOperator) - - def _operator_wrapper(func: Callable, operator_cls: type[op.OperatorFromDef]): fn_spec = function_spec(func) diff --git a/python/pathway/internals/graph_runner/expression_evaluator.py b/python/pathway/internals/graph_runner/expression_evaluator.py index d9a33683..1d45c2fe 100644 --- a/python/pathway/internals/graph_runner/expression_evaluator.py +++ b/python/pathway/internals/graph_runner/expression_evaluator.py @@ -810,12 +810,6 @@ def _dereference(self, expression: expr.ColumnReference): return super()._dereference(expression) -class CopyEvaluator(ExpressionEvaluator, context_type=clmn.CopyContext): - def run(self, output_storage: Storage, *input_storages: Storage) -> api.Table: - [input_storage] = input_storages - return self.state.get_table(input_storage) - - class FilterEvaluator(ExpressionEvaluator, context_type=clmn.FilterContext): context: clmn.FilterContext diff --git a/python/pathway/internals/graph_runner/operator_handler.py b/python/pathway/internals/graph_runner/operator_handler.py index 737b5482..0a7abc7d 100644 --- 
a/python/pathway/internals/graph_runner/operator_handler.py +++ b/python/pathway/internals/graph_runner/operator_handler.py @@ -34,7 +34,6 @@ DebugOperator, InputOperator, IterateOperator, - NonContextualizedIntermediateOperator, Operator, OutputOperator, ) @@ -280,18 +279,6 @@ def _does_not_need_evaluation(self, column) -> bool: return self.state.has_column(column) or self.scope_context.skip_column(column) -class NonContextualizedIntermediateOperatorHandler( - OperatorHandler[NonContextualizedIntermediateOperator], - operator_type=NonContextualizedIntermediateOperator, -): - def _run( - self, - operator: NonContextualizedIntermediateOperator, - output_storages: dict[Table, Storage], - ): - pass - - class DebugOperatorHandler( OperatorHandler[DebugOperator], operator_type=DebugOperator, diff --git a/python/pathway/internals/graph_runner/path_evaluator.py b/python/pathway/internals/graph_runner/path_evaluator.py index e30d6e31..c81ee6da 100644 --- a/python/pathway/internals/graph_runner/path_evaluator.py +++ b/python/pathway/internals/graph_runner/path_evaluator.py @@ -192,7 +192,6 @@ def compute( class NoNewColumnsPathEvaluator( PathEvaluator, context_types=[ - clmn.CopyContext, clmn.FilterContext, clmn.ReindexContext, clmn.IntersectContext, diff --git a/python/pathway/internals/graph_runner/storage_graph.py b/python/pathway/internals/graph_runner/storage_graph.py index 97606774..73d6edae 100644 --- a/python/pathway/internals/graph_runner/storage_graph.py +++ b/python/pathway/internals/graph_runner/storage_graph.py @@ -26,7 +26,6 @@ DebugOperator, InputOperator, IterateOperator, - NonContextualizedIntermediateOperator, Operator, OutputOperator, RowTransformerOperator, @@ -141,13 +140,6 @@ def _compute_relevant_columns( operator, DebugOperator ): column_dependencies = self._compute_column_dependencies_output(operator) - elif isinstance( - operator, - NonContextualizedIntermediateOperator, - ): - column_dependencies = self._compute_column_dependencies_no_new_context( - operator - ) elif isinstance(operator, RowTransformerOperator): column_dependencies = self._compute_column_dependencies_row_transformer( operator, self.scope_context @@ -251,20 +243,6 @@ def _compute_column_dependencies_output( column_dependencies[table_._universe].update(table_._columns.values()) return column_dependencies - def _compute_column_dependencies_no_new_context( - self, - operator: NonContextualizedIntermediateOperator, - ) -> dict[Universe, StableSet[Column]]: - column_dependencies: dict[Universe, StableSet[Column]] = defaultdict(StableSet) - # can't use column.column_dependencies() because these operators don't wrap - # in a new context. Hence, using .column_dependencies() would use context - # of the previous operator. 
- for table_ in operator.output_tables: - column_dependencies[table_._universe].update( - self.column_deps_at_output[operator][table_] - ) - return column_dependencies - def _compute_column_dependencies_row_transformer( self, operator: RowTransformerOperator, @@ -352,8 +330,7 @@ def _compute_storage_paths(self): self._compute_storage_paths_input(operator, storages) elif isinstance(operator, IterateOperator): self._compute_storage_paths_iterate(operator, storages) - elif not isinstance(operator, NonContextualizedIntermediateOperator): - # NonContextualizedIntermediateOperator don't produce new data, skip + else: self._compute_storage_paths_ordinary(operator, storages) self.final_storages = storages diff --git a/python/pathway/internals/operator.py b/python/pathway/internals/operator.py index 1f95b7bb..f3d0f34c 100644 --- a/python/pathway/internals/operator.py +++ b/python/pathway/internals/operator.py @@ -195,10 +195,12 @@ def label(self) -> str: return self.func_spec.func.__name__ -class IntermediateOperator(OperatorFromDef): - """Operator producing tables. It should not be used directly. Use - ContextualizedIntermediateOperator or NonContextualizedIntermediateOperator depending - on your needs. +class ContextualizedIntermediateOperator(OperatorFromDef): + """Operator producing tables with `ColumnWithExpression`s that have not been + evaluated yet. + + `@contextualized_operator` can be used to decorate any function so that + operator will be created and added to the graph whenever such function is called. """ def __init__(self, func_spec, id): @@ -216,28 +218,6 @@ def __call__(self, *args, **kwargs): return result.scalar_or_tuple() -class ContextualizedIntermediateOperator(IntermediateOperator): - """Operator producing tables with `ColumnWithExpression`s that have not been - evaluated yet. - - `@contextualized_operator` can be used to decorate any function so that - operator will be created and added to the graph whenever such function is called. - """ - - pass - - -class NonContextualizedIntermediateOperator(IntermediateOperator): - """Operator producing tables consisting of columns that have been previously - evaluated. - - `@non_contextualized_operator` can be used to decorate any function so - that operator will be created and added to the graph whenever such function is called. 
- """ - - pass - - class DebugOperator(Operator): name: str table: pw.Table diff --git a/python/pathway/internals/table.py b/python/pathway/internals/table.py index 38ec75d1..5948aaf0 100644 --- a/python/pathway/internals/table.py +++ b/python/pathway/internals/table.py @@ -751,9 +751,8 @@ def copy(self) -> Table: False """ - context = clmn.CopyContext(self._universe) columns = { - name: self._wrap_column_in_context(context, column, name) + name: self._wrap_column_in_context(self._context, column, name) for name, column in self._columns.items() } @@ -1491,10 +1490,9 @@ def rename_columns(self, **kwargs: str | expr.ColumnReference) -> Table: for new_name, old_name in mapping.items(): renamed_columns[new_name] = self._columns[old_name] - context = clmn.CopyContext(self._universe) columns_wrapped = { name: self._wrap_column_in_context( - context, column, mapping[name] if name in mapping else name + self._context, column, mapping[name] if name in mapping else name ) for name, column in renamed_columns.items() } @@ -1638,9 +1636,8 @@ def without(self, *columns: str | expr.ColumnReference) -> Table: else: assert isinstance(col, str) new_columns.pop(col) - context = clmn.CopyContext(self._universe) columns_wrapped = { - name: self._wrap_column_in_context(context, column, name) + name: self._wrap_column_in_context(self._context, column, name) for name, column in new_columns.items() } return self._with_same_universe(*columns_wrapped.items()) diff --git a/python/pathway/io/s3/__init__.py b/python/pathway/io/s3/__init__.py index 427b66a1..9e9062a0 100644 --- a/python/pathway/io/s3/__init__.py +++ b/python/pathway/io/s3/__init__.py @@ -19,6 +19,10 @@ internal_connector_mode, ) +S3_PATH_PREFIX = "s3://" +S3_DEFAULT_REGION = "us-east-1" +S3_LOCATION_FIELD = "LocationConstraint" + class DigitalOceanS3Settings: @trace_user_frame @@ -80,9 +84,9 @@ def __init__( @trace_user_frame def read( path: str, - aws_s3_settings: AwsS3Settings, format: str, *, + aws_s3_settings: AwsS3Settings | None = None, schema: type[Schema] | None = None, mode: str = "streaming", csv_settings: CsvParserSettings | None = None, @@ -172,9 +176,14 @@ def read( "Snapshot mode is currently unsupported in S3-like connectors" ) + if aws_s3_settings: + prepared_aws_settings = aws_s3_settings + else: + prepared_aws_settings = AwsS3Settings.new_from_path(path) + data_storage = construct_s3_data_storage( path=path, - rust_engine_s3_settings=aws_s3_settings.settings, + rust_engine_s3_settings=prepared_aws_settings.settings, format=format, mode=internal_mode, csv_settings=csv_settings, diff --git a/python/pathway/io/s3_csv/__init__.py b/python/pathway/io/s3_csv/__init__.py index 18d81a7c..da8019ea 100644 --- a/python/pathway/io/s3_csv/__init__.py +++ b/python/pathway/io/s3_csv/__init__.py @@ -4,29 +4,22 @@ from typing import Any -from pathway.internals import api, datasource from pathway.internals.api import PathwayType -from pathway.internals.decorators import table_from_datasource from pathway.internals.runtime_type_check import runtime_type_check from pathway.internals.schema import Schema from pathway.internals.table import Table from pathway.internals.trace import trace_user_frame -from pathway.io._utils import ( - CsvParserSettings, - check_deprecated_kwargs, - construct_connector_properties, - construct_schema_and_data_format, - internal_connector_mode, -) +from pathway.io._utils import CsvParserSettings, construct_schema_and_data_format from pathway.io.s3 import AwsS3Settings +from pathway.io.s3 import read as s3_read 
@runtime_type_check @trace_user_frame def read( path: str, - aws_s3_settings: AwsS3Settings, *, + aws_s3_settings: AwsS3Settings | None = None, schema: type[Schema] | None = None, csv_settings: CsvParserSettings | None = None, mode: str = "streaming", @@ -145,40 +138,29 @@ def read( ... schema=InputSchema, ... ) """ - internal_mode = internal_connector_mode(mode) - if internal_mode == api.ConnectorMode.STREAMING_WITH_DELETIONS: - raise NotImplementedError( - "Snapshot mode is currently unsupported in S3-like connectors" - ) - check_deprecated_kwargs(kwargs, ["poll_new_objects"]) + # legacy fields are not supported in pw.io.s3 reader, so the + # schema should be constructed here + if not schema: + schema, _ = construct_schema_and_data_format( + "csv", + schema=schema, + csv_settings=csv_settings, + value_columns=value_columns, + primary_key=id_columns, + types=types, + default_values=default_values, + ) - data_storage = api.DataStorage( - storage_type="s3_csv", - path=path, - aws_s3_settings=aws_s3_settings.settings, - csv_parser_settings=csv_settings.api_settings if csv_settings else None, - mode=internal_mode, - persistent_id=persistent_id, - ) - schema, data_format = construct_schema_and_data_format( + return s3_read( + path, format="csv", + aws_s3_settings=aws_s3_settings, schema=schema, - value_columns=value_columns, - primary_key=id_columns, - types=types, - default_values=default_values, - ) - properties = construct_connector_properties( - schema_properties=schema.properties(), - commit_duration_ms=autocommit_duration_ms, - ) - return table_from_datasource( - datasource.GenericDataSource( - datastorage=data_storage, - dataformat=data_format, - connector_properties=properties, - schema=schema, - ), - debug_datasource=datasource.debug_datasource(debug_data), + mode=mode, + csv_settings=csv_settings, + persistent_id=persistent_id, + autocommit_duration_ms=autocommit_duration_ms, + debug_data=debug_data, + **kwargs, ) diff --git a/python/pathway/stdlib/utils/col.py b/python/pathway/stdlib/utils/col.py index 4fac2b2c..1bea8ca8 100644 --- a/python/pathway/stdlib/utils/col.py +++ b/python/pathway/stdlib/utils/col.py @@ -4,6 +4,7 @@ import warnings from collections.abc import Callable, Sequence +from typing import Type, overload import pathway.internals as pw from pathway.internals.runtime_type_check import runtime_type_check @@ -58,21 +59,42 @@ def flatten_column( return input_table.flatten(**kwargs) +@overload +def unpack_col( + column: pw.ColumnReference, *unpacked_columns: pw.ColumnReference | str +) -> pw.Table: + ... + + +@overload +def unpack_col( + column: pw.ColumnReference, + *, + schema: Type[pw.Schema], +) -> pw.Table: + ... + + @runtime_type_check @trace_user_frame def unpack_col( - column: pw.ColumnReference, *unpacked_columns: pw.ColumnReference | str + column: pw.ColumnReference, + *unpacked_columns: pw.ColumnReference | str, + schema: Type[pw.Schema] | None = None, ) -> pw.Table: """Unpacks multiple columns from a single column. + Arguments unpacked_columns and schema are mutually exclusive + Input: - column: Column expression of column containing some sequences - unpacked_columns: list of names of output columns + - schema: Schema of new columns Output: - Table with columns named by "unpacked_columns" argument - Example: + Examples: >>> import pathway as pw >>> t1 = pw.debug.table_from_markdown( @@ -94,10 +116,31 @@ def unpack_col( Alice | 25 | dog Bob | 32 | cat Carole | 28 | dog + >>> class SomeSchema(pw.Schema): + ... name: str + ... age: int + ... 
pet: str + >>> unpack_table = pw.utils.col.unpack_col(t2.user, schema=SomeSchema) + >>> pw.debug.compute_and_print(unpack_table, include_id=False) + name | age | pet + Alice | 25 | dog + Bob | 32 | cat + Carole | 28 | dog """ + + if (schema is None) == (len(unpacked_columns) == 0): + raise ValueError( + "exactly one of the parameters `schema` or `unpacked_columns` must be provided" + ) + + if schema is not None: + unpacked_columns = tuple(schema.column_names()) colrefs = [pw.this[unpacked_column] for unpacked_column in unpacked_columns] kw = {colref.name: column[i] for i, colref in enumerate(colrefs)} - return column.table.select(**kw) + result = column.table.select(**kw) + if schema is not None: + result = result.update_types(**schema.typehints()) + return result # TODO: generalize to apply on groupby: https://github.com/navalgo/IoT-Pathway/issues/1919 diff --git a/python/pathway/tests/test_utils.py b/python/pathway/tests/test_utils.py index 190f517d..ad888efd 100644 --- a/python/pathway/tests/test_utils.py +++ b/python/pathway/tests/test_utils.py @@ -84,6 +84,35 @@ def test_unpack_col(): ) +def test_unpack_col_schema(): + class TestSchema(pw.Schema): + coord1: int + coord2: float + coord3: str + + data = T( + """ + | A | B | C + 1 | 11 | 1.1 | abc + 2 | 12 | 1.2 | def + 3 | 13 | 1.3 | ghi + """ + ) + data = data.select(combined=pw.make_tuple(pw.this.A, pw.this.B, pw.this.C)) + result = unpack_col(data.combined, schema=TestSchema) + assert_table_equality( + result, + T( + """ + | coord1 | coord2 | coord3 + 1 | 11 | 1.1 | abc + 2 | 12 | 1.2 | def + 3 | 13 | 1.3 | ghi + """ + ), + ) + + def test_apply_all_rows(): t1 = T( """ diff --git a/src/connectors/data_storage.rs b/src/connectors/data_storage.rs index a445fbcd..f954e504 100644 --- a/src/connectors/data_storage.rs +++ b/src/connectors/data_storage.rs @@ -184,6 +184,9 @@ pub enum ReadError { #[error("malformed data")] MalformedData, + + #[error("no objects to read")] + NoObjectsToRead, } #[derive(Serialize, Deserialize, Clone, Copy, Debug)] @@ -1439,14 +1442,30 @@ pub struct S3Scanner { } impl S3Scanner { - pub fn new(bucket: S3Bucket, objects_prefix: impl Into) -> Self { - S3Scanner { + pub fn new(bucket: S3Bucket, objects_prefix: impl Into) -> Result { + let objects_prefix = objects_prefix.into(); + + let object_lists = bucket + .list(objects_prefix.clone(), None) + .map_err(|e| ReadError::S3(S3CommandName::ListObjectsV2, e))?; + let mut has_nonempty_list = false; + for list in object_lists { + if !list.contents.is_empty() { + has_nonempty_list = true; + break; + } + } + if !has_nonempty_list { + return Err(ReadError::NoObjectsToRead); + } + + Ok(S3Scanner { bucket, - objects_prefix: objects_prefix.into(), + objects_prefix, current_object: None, processed_objects: HashSet::new(), - } + }) } pub fn stream_object_from_path_and_bucket( @@ -1601,9 +1620,9 @@ impl S3CsvReader { parser_builder: csv::ReaderBuilder, poll_new_objects: bool, persistent_id: Option, - ) -> S3CsvReader { - S3CsvReader { - s3_scanner: S3Scanner::new(bucket, objects_prefix), + ) -> Result { + Ok(S3CsvReader { + s3_scanner: S3Scanner::new(bucket, objects_prefix)?, poll_new_objects, parser_builder, @@ -1612,7 +1631,7 @@ impl S3CsvReader { persistent_id, deferred_read_result: None, total_entries_read: 0, - } + }) } fn stream_next_object(&mut self) -> Result { @@ -1899,9 +1918,9 @@ impl S3GenericReader { poll_new_objects: bool, persistent_id: Option, read_method: ReadMethod, - ) -> S3GenericReader { - S3GenericReader { - s3_scanner: S3Scanner::new(bucket, 
objects_prefix), + ) -> Result { + Ok(S3GenericReader { + s3_scanner: S3Scanner::new(bucket, objects_prefix)?, poll_new_objects, read_method, @@ -1909,7 +1928,7 @@ impl S3GenericReader { persistent_id, total_entries_read: 0, current_bytes_read: 0, - } + }) } fn stream_next_object(&mut self) -> Result { diff --git a/src/engine/dataflow.rs b/src/engine/dataflow.rs index cb21f0fb..6bc21c61 100644 --- a/src/engine/dataflow.rs +++ b/src/engine/dataflow.rs @@ -1559,25 +1559,31 @@ impl DataflowGraphInner { let error_reporter = self.error_reporter.clone(); - let new_values = table.values().map_wrapped_named( - "expression_table::evaluate_expression", - wrapper, - move |(key, values)| { - let args: Vec = column_paths - .iter() - .map(|path| path.extract(&key, &values)) - .collect::>() - .unwrap_with_reporter(&error_reporter); - let new_values = expressions.iter().map(|expression_data| { - let result = expression_data - .expression - .eval(&args) - .unwrap_with_reporter_and_trace(&error_reporter, &expression_data.trace); - result - }); - (key, Value::Tuple(new_values.collect())) - }, - ); + let new_values = table + .values() + .consolidate_nondecreasing() + .map_wrapped_named( + "expression_table::evaluate_expression", + wrapper, + move |(key, values)| { + let args: Vec = column_paths + .iter() + .map(|path| path.extract(&key, &values)) + .collect::>() + .unwrap_with_reporter(&error_reporter); + let new_values = expressions.iter().map(|expression_data| { + let result = expression_data + .expression + .eval(&args) + .unwrap_with_reporter_and_trace( + &error_reporter, + &expression_data.trace, + ); + result + }); + (key, Value::Tuple(new_values.collect())) + }, + ); Ok(self.tables.alloc(Table::from_collection(new_values))) } diff --git a/src/python_api.rs b/src/python_api.rs index 91b0d375..01fb05a4 100644 --- a/src/python_api.rs +++ b/src/python_api.rs @@ -83,6 +83,7 @@ use crate::persistence::config::{ }; use crate::persistence::{ExternalPersistentId, PersistentId}; use crate::pipe::{pipe, ReaderType, WriterType}; +use s3::creds::Credentials as AwsCredentials; mod logging; mod numba; @@ -92,6 +93,7 @@ pub fn with_gil_and_pool(f: impl FnOnce(Python) -> R + Ungil) -> R { Python::with_gil(|py| py.with_pool(f)) } +const S3_PATH_PREFIX: &str = "s3://"; static CONVERT: GILOnceCell = GILOnceCell::new(); fn get_convert_python_module(py: Python<'_>) -> &PyAny { @@ -2938,31 +2940,34 @@ pub fn unsafe_make_pointer(value: KeyImpl) -> Key { #[pyclass(module = "pathway.engine", frozen)] pub struct AwsS3Settings { - bucket_name: String, + bucket_name: Option, region: s3::region::Region, access_key: Option, secret_access_key: Option, with_path_style: bool, + profile: Option, } #[pymethods] impl AwsS3Settings { #[new] #[pyo3(signature = ( - bucket_name, + bucket_name = None, access_key = None, secret_access_key = None, with_path_style = false, region = None, endpoint = None, + profile = None, ))] fn new( - bucket_name: String, + bucket_name: Option, access_key: Option, secret_access_key: Option, with_path_style: bool, region: Option, endpoint: Option, + profile: Option, ) -> PyResult { Ok(AwsS3Settings { bucket_name, @@ -2970,6 +2975,7 @@ impl AwsS3Settings { access_key, secret_access_key, with_path_style, + profile, }) } } @@ -2997,8 +3003,20 @@ impl AwsS3Settings { } impl AwsS3Settings { - fn construct_private_bucket(&self) -> PyResult { - let credentials = s3::creds::Credentials::new( + fn final_bucket_name(&self, deduced_name: Option<&str>) -> PyResult { + if let Some(bucket_name) = &self.bucket_name { + 
Ok(bucket_name.to_string()) + } else if let Some(bucket_name) = deduced_name { + Ok(bucket_name.to_string()) + } else { + Err(PyRuntimeError::new_err( + "bucket_name not specified and isn't in the s3 path", + )) + } + } + + fn construct_private_bucket(&self, deduced_name: Option<&str>) -> PyResult { + let credentials = AwsCredentials::new( Some(&self.access_key.clone().ok_or(PyRuntimeError::new_err( "access key must be specified for a private bucket", ))?), @@ -3018,18 +3036,44 @@ impl AwsS3Settings { PyRuntimeError::new_err(format!("Unable to form credentials to AWS storage: {err}")) })?; - S3Bucket::new(&self.bucket_name, self.region.clone(), credentials).map_err(|err| { + self.construct_bucket_with_credentials(credentials, deduced_name) + } + + fn construct_bucket_with_credentials( + &self, + credentials: AwsCredentials, + deduced_name: Option<&str>, + ) -> PyResult { + S3Bucket::new( + &self.final_bucket_name(deduced_name)?, + self.region.clone(), + credentials, + ) + .map_err(|err| { PyRuntimeError::new_err(format!("Failed to connect to private AWS bucket: {err}")) }) } - fn construct_public_bucket(&self) -> PyResult { - S3Bucket::new_public(&self.bucket_name, self.region.clone()).map_err(|err| { - PyRuntimeError::new_err(format!("Failed to connect to public AWS bucket: {err}")) - }) + fn construct_public_bucket(&self, deduced_name: Option<&str>) -> PyResult { + S3Bucket::new_public(&self.final_bucket_name(deduced_name)?, self.region.clone()).map_err( + |err| PyRuntimeError::new_err(format!("Failed to connect to public AWS bucket: {err}")), + ) } - fn construct_bucket(&self) -> PyResult { + fn deduce_bucket_and_path(s3_path: &str) -> (Option, Option) { + if !s3_path.starts_with(S3_PATH_PREFIX) { + return (None, Some(s3_path.to_string())); + } + let bucket_and_path = &s3_path[S3_PATH_PREFIX.len()..]; + let bucket_and_path_tokenized: Vec<&str> = bucket_and_path.split('/').collect(); + + let bucket = bucket_and_path_tokenized[0]; + let path = bucket_and_path_tokenized[1..].join("/"); + + (Some(bucket.to_string()), Some(path)) + } + + fn construct_bucket(&self, name_override: Option<&str>) -> PyResult { let has_access_key = self.access_key.is_some(); let has_secret_access_key = self.secret_access_key.is_some(); if has_access_key != has_secret_access_key { @@ -3038,9 +3082,20 @@ impl AwsS3Settings { let mut bucket = { if has_access_key && has_secret_access_key { - self.construct_private_bucket()? + self.construct_private_bucket(name_override)? } else { - self.construct_public_bucket()? + let aws_credentials = AwsCredentials::from_sts_env("aws-creds") + .or_else(|_| AwsCredentials::from_env()) + .or_else(|_| AwsCredentials::from_profile(self.profile.as_deref())) + .or_else(|_| AwsCredentials::from_instance_metadata()); + + // first, try to deduce credentials from various sources + if let Ok(credentials) = aws_credentials { + self.construct_bucket_with_credentials(credentials, name_override)? + } else { + // if there are no credentials, treat the bucket as a public + self.construct_public_bucket(name_override)? + } } }; @@ -3468,7 +3523,7 @@ impl DataStorage { let path = self .path .as_ref() - .ok_or_else(|| PyValueError::new_err("For fs storage, path must be specified"))? + .ok_or_else(|| PyValueError::new_err("For fs/s3 storage, path must be specified"))? 
.as_str(); Ok(path) } @@ -3485,6 +3540,7 @@ impl DataStorage { } fn s3_bucket(&self, py: pyo3::Python) -> PyResult { + let (bucket_name, _) = AwsS3Settings::deduce_bucket_and_path(self.path()?); let bucket = self .aws_s3_settings .as_ref() @@ -3492,7 +3548,7 @@ impl DataStorage { PyValueError::new_err("For AWS storage, aws_s3_settings must be specified") })? .borrow(py) - .construct_bucket()?; + .construct_bucket(bucket_name.as_deref())?; Ok(bucket) } @@ -3550,23 +3606,27 @@ impl DataStorage { Ok((Box::new(storage), 1)) } "s3" => { + let (_, deduced_path) = AwsS3Settings::deduce_bucket_and_path(self.path()?); let storage = S3GenericReader::new( self.s3_bucket(py)?, - self.path()?, + deduced_path.unwrap_or(self.path()?.to_string()), self.mode.is_polling_enabled(), self.internal_persistent_id(), self.read_method, - ); + ) + .map_err(|e| PyRuntimeError::new_err(format!("Creating S3 reader failed: {e}")))?; Ok((Box::new(storage), 1)) } "s3_csv" => { + let (_, deduced_path) = AwsS3Settings::deduce_bucket_and_path(self.path()?); let storage = S3CsvReader::new( self.s3_bucket(py)?, - self.path()?, + deduced_path.unwrap_or(self.path()?.to_string()), self.build_csv_parser_settings(py), self.mode.is_polling_enabled(), self.internal_persistent_id(), - ); + ) + .map_err(|e| PyRuntimeError::new_err(format!("Creating S3 reader failed: {e}")))?; Ok((Box::new(storage), 1)) } "csv" => {
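Taken together, the Python and Rust changes above let `pw.io.s3.read` (and the now-delegating `pw.io.s3_csv.read`) run without an explicit `AwsS3Settings`: the bucket is deduced from the `s3://` path, its region is looked up via boto3, and credentials are resolved from the environment, an AWS profile, or instance metadata. A hedged usage sketch follows; the bucket name, object key, and output path are placeholders that mirror `test_s3_full_autodetect` earlier in this patch.

```python
import pathway as pw


class InputSchema(pw.Schema):
    key: int
    value: str


# Placeholder bucket/object names. Credentials and the bucket region are
# auto-detected as described in the patch (boto3 location lookup plus the
# standard AWS credential chain), so no AwsS3Settings object is passed.
table = pw.io.s3.read(
    "s3://example-bucket/some/prefix/input.csv",
    format="csv",
    schema=InputSchema,
    mode="static",
)

pw.io.csv.write(table, "./output.csv")
pw.run()
```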