Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split MockContext into Standalone and multi-group #42

Merged
merged 10 commits into from
Oct 24, 2023
231 changes: 182 additions & 49 deletions exasol_udf_mock_python/mock_context.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import List, Tuple, Iterator
from typing import List, Tuple, Iterator, Iterable, Any, Optional, Union
from functools import wraps

import pandas as pd

Expand All @@ -8,50 +9,190 @@
from exasol_udf_mock_python.udf_context import UDFContext


def check_context(f):
"""
Decorator checking that a MockContext object has valid current group context.
Raises a RuntimeError if this is not the case.
"""
@wraps(f)
def wrapper(self, *args, **kwargs):
if self.no_context:
raise RuntimeError('Calling UDFContext interface when the current group context '
'is invalid is disallowed')
return f(self, *args, **kwargs)

return wrapper


def validate_emit(row: Tuple, columns: List[Column]):
"""
Validates that a data row to be emitted corresponds to the definition of the output columns.
The number of elements in the row should match the number of columns and the type of each
element should match the type of the correspondent column. Raises a ValueError if the first
condition is false or a TypeError if the second condition is false.

:param row: Data row
:param columns: Column definition.
"""
if len(row) != len(columns):
raise ValueError(f"row {row} has not the same number of values as columns are defined")
for i, column in enumerate(columns):
if row[i] is not None and not isinstance(row[i], column.type):
raise TypeError(f"Value {row[i]} ({type(row[i])}) at position {i} is not a {column.type}")


class MockContext(UDFContext):
"""
ahsimb marked this conversation as resolved.
Show resolved Hide resolved
Implementation of generic UDF Mock Context interface for a SET UDF with groups.
This class allows iterating over groups. The functionality of the UDF Context are applicable
for the current input group.

Call `next_group` to iterate over groups. The `output_groups` property provides the emit
output for all groups iterated so far including the output for the current group.

Calling any function of the UDFContext interface when the group iterator has passed the end
or before the first call to the `next_group` is illegal and will cause a RuntimeException.
"""

def __init__(self, input_groups: Iterator[Group], metadata: MockMetaData):
"""
:param input_groups: Input groups. Each group object should contain input rows for the group.

:param metadata: The mock metadata object.
"""

self._input_groups = input_groups
self._output_groups = []
self._input_group = None # type: Group
self._output_group_list = None # type: List
self._output_group = None # type: Group
self._iter = None # type: Iterator[Tuple]
self._len = None # type: int
self._metadata = metadata
self._name_position_map = \
{column.name: position
for position, column
in enumerate(metadata.input_columns)}
""" Mock context for the current group """
self._current_context: Optional[StandaloneMockContext] = None
ahsimb marked this conversation as resolved.
Show resolved Hide resolved
""" Output for all groups """
self._previous_output: List[Group] = []
ahsimb marked this conversation as resolved.
Show resolved Hide resolved

@property
def no_context(self) -> bool:
"""Returns True if the current group context is invalid"""
return self._current_context is None

def _next_group(self):
def next_group(self) -> bool:
"""
Moves group iterator to the next group.
Returns False if the iterator gets beyond the last group. Returns True otherwise.
"""

# Save output of the current group
if self._current_context is not None:
self._previous_output.append(Group(self._current_context.output))
self._current_context = None

# Try get to the next input group
try:
self._input_group = next(self._input_groups)
input_group = next(self._input_groups)
except StopIteration as e:
self._data = None
self._output_group_list = None
self._output_group = None
self._input_group = None
self._iter = None
self._len = None
return False
self._len = len(self._input_group)
if self._len == 0:
self._data = None
self._output_group_list = None
self._output_group = None
self._input_group = None
self._iter = None
self._len = None
raise RuntimeError("Empty input groups are not allowd")
self._output_group_list = []
self._output_group = Group(self._output_group_list)
self._output_groups.append(self._output_group)
self._iter = iter(self._input_group)
self.next()
if len(input_group) == 0:
raise RuntimeError("Empty input groups are not allowed")

ahsimb marked this conversation as resolved.
Show resolved Hide resolved
# Create Mock Context for the new input group
self._current_context = StandaloneMockContext(input_group, self._metadata)
return True

def _is_positive_integer(self, value):
@property
def output_groups(self):
"""
Output of all groups including the current one.
"""
if self._current_context is None:
return self._previous_output
else:
groups = list(self._previous_output)
groups.append(Group(self._current_context.output))
return groups

@check_context
def __getattr__(self, name):
return getattr(self._current_context, name)

@check_context
def get_dataframe(self, num_rows: Union[str, int], start_col: int = 0) -> Optional[pd.DataFrame]:
return self._current_context.get_dataframe(num_rows, start_col)
ahsimb marked this conversation as resolved.
Show resolved Hide resolved

@check_context
def next(self, reset: bool = False) -> bool:
return self._current_context.next(reset)
ahsimb marked this conversation as resolved.
Show resolved Hide resolved

@check_context
def size(self) -> int:
return self._current_context.size()
ahsimb marked this conversation as resolved.
Show resolved Hide resolved

@check_context
def reset(self) -> None:
self._current_context.reset()
ahsimb marked this conversation as resolved.
Show resolved Hide resolved

@check_context
def emit(self, *args) -> None:
self._current_context.emit(*args)


def get_scalar_input(inp: Any) -> Iterable[Tuple[Any, ...]]:
"""
Figures out if the SCALAR parameters are provided as a scalar value or a tuple
and also if there is a wrapping container around.
Unless the parameters are already in a wrapping container returns parameters as a tuple wrapped
into a one-item list, e.g [(param1[, param2, ...)]. Otherwise, returns the original input.

:param inp: Input parameters.
"""

if isinstance(inp, Iterable) and not isinstance(inp, str):
row1 = next(iter(inp))
if isinstance(row1, Iterable) and not isinstance(row1, str):
return inp
else:
return [inp]
else:
return [(inp,)]


class StandaloneMockContext(UDFContext):
"""
Implementation of generic UDF Mock Context interface a SCALAR UDF or a SET UDF with no groups.

For Emit UDFs the output in the form of the list of tuples can be
accessed by reading the `output` property.
"""

def __init__(self, inp: Any, metadata: MockMetaData):
"""
:param inp: Input rows for a SET UDF or parameters for a SCALAR one.
In the former case the input object must be an iterable of rows. This, for example,
can be a Group object. It must implement the __len__ method. Each data row must be
an indexable container, e.g. a tuple.
In the SCALAR case the input can be a scalar value, or tuple. This can also be wrapped
in an iterable container, similar to the SET case.

:param metadata: The mock metadata object.
"""
if metadata.input_type.upper() == 'SCALAR':
self._input = get_scalar_input(inp)
else:
self._input = inp
self._metadata = metadata
self._data: Optional[Any] = None
self._iter: Optional[Iterator[Tuple[Any, ...]]] = None
self._name_position_map = \
{column.name: position
for position, column
in enumerate(metadata.input_columns)}
self._output = []
self.next(reset=True)

@property
def output(self) -> List[Tuple[Any, ...]]:
"""Emitted output so far"""
return self._output

@staticmethod
def _is_positive_integer(value):
return value is not None and isinstance(value, int) and value > 0

def get_dataframe(self, num_rows='all', start_col=0):
Expand Down Expand Up @@ -80,26 +221,26 @@ def get_dataframe(self, num_rows='all', start_col=0):
return df

def __getattr__(self, name):
return self._data[self._name_position_map[name]]
return None if self._data is None else self._data[self._name_position_map[name]]

def next(self, reset: bool = False):
if reset:
if self._iter is None or reset:
self.reset()
else:
try:
new_data = next(self._iter)
self._data = new_data
self._validate_tuples(self._data, self._metadata.input_columns)
validate_emit(self._data, self._metadata.input_columns)
return True
except StopIteration as e:
self._data = None
return False

def size(self):
return self._len
return len(self._input)

def reset(self):
self._iter = iter(self._input_group)
self._iter = iter(self._input)
self.next()

def emit(self, *args):
Expand All @@ -108,13 +249,5 @@ def emit(self, *args):
else:
tuples = [args]
for row in tuples:
self._validate_tuples(row, self._metadata.output_columns)
self._output_group_list.extend(tuples)
return

def _validate_tuples(self, row: Tuple, columns: List[Column]):
if len(row) != len(columns):
raise Exception(f"row {row} has not the same number of values as columns are defined")
for i, column in enumerate(columns):
if row[i] is not None and not isinstance(row[i], column.type):
raise TypeError(f"Value {row[i]} ({type(row[i])}) at position {i} is not a {column.type}")
validate_emit(row, self._metadata.output_columns)
self._output.extend(tuples)
4 changes: 2 additions & 2 deletions exasol_udf_mock_python/udf_mock_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@


def _loop_groups(ctx:MockContext, exa:MockExaEnvironment, runfunc:Callable):
while ctx._next_group():
while ctx.next_group():
_wrapped_run(ctx, exa, runfunc)


Expand Down Expand Up @@ -77,4 +77,4 @@ def run(self,
finally:
if "cleanup" in exec_globals:
self._exec_cleanup(exec_globals)
return ctx._output_groups
return ctx.output_groups
69 changes: 69 additions & 0 deletions tests/test_mock_context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import pytest
import pandas as pd

from exasol_udf_mock_python.group import Group
from exasol_udf_mock_python.mock_context import MockContext
from tests.test_mock_context_standalone import meta_set_emits


@pytest.fixture
def context_set_emits(meta_set_emits):
pets = Group([(1, 'cat'), (2, 'dog')])
bugs = Group([(3, 'ant'), (4, 'bee'), (5, 'beetle')])
groups = [pets, bugs]
return MockContext(iter(groups), meta_set_emits)


def test_scroll(context_set_emits):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Nicoretti suggested that we implement a generator which returns the column values and we collect these values in a list and compare this list at the end with an assert. Furthermore, we shouldn't test internal stuff like _current.
And, I thought about _next_group and _output_groups, I think, we can make them public, because the UDF will use the Interface UDFContext and not the MockContext.

def generator(ctx):
  while True:
    if not ctx._next_group():
      break
    while True:
          yield ctx.t2
          if not ctx.next():
            break

result=list(generator(context_set_emits))
assert result==['cat', 'dog', ...]

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't want to put both groups in one stream. We could create a jagged array instead. But I am not in favour of having an extra piece of code in a test. I think testing should be straightforward if possible.

I should perhaps remove the checks of "private" variables, e.g. _current_context. On that note, I think we should make the _next_group and the _output_groups public. That would require some changes upstream, e.g. in UDFMockExecutor. Would you agree?

Copy link
Collaborator

@tkilias tkilias Oct 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree with making _next_group and _output_groups public, because the class is anyway usually used through the Executor

and yes please remove _current_context and so on

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding the remaining issue, let me think. about it.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That with the jagged array sounds ok to me.

groups = []
while context_set_emits.next_group():
group = [context_set_emits.t2]
while context_set_emits.next():
group.append(context_set_emits.t2)
groups.append(group)
assert groups == [['cat', 'dog'], ['ant', 'bee', 'beetle']]


def test_output_groups(context_set_emits):
context_set_emits.next_group()
context_set_emits.emit(1, 'cat')
context_set_emits.emit(2, 'dog')
context_set_emits.next_group()
context_set_emits.emit(3, 'ant')
context_set_emits.emit(4, 'bee')
context_set_emits.emit(5, 'beetle')
context_set_emits.next_group()
assert len(context_set_emits.output_groups) == 2
assert context_set_emits.output_groups[0] == Group([(1, 'cat'), (2, 'dog')])
assert context_set_emits.output_groups[1] == Group([(3, 'ant'), (4, 'bee'), (5, 'beetle')])


def test_output_groups_partial(context_set_emits):
context_set_emits.next_group()
context_set_emits.emit(1, 'cat')
context_set_emits.emit(2, 'dog')
context_set_emits.next_group()
context_set_emits.emit(3, 'ant')
context_set_emits.emit(4, 'bee')
assert len(context_set_emits.output_groups) == 2
assert context_set_emits.output_groups[0] == Group([(1, 'cat'), (2, 'dog')])
assert context_set_emits.output_groups[1] == Group([(3, 'ant'), (4, 'bee')])


def test_no_context_exception(context_set_emits):

for _ in range(3):
context_set_emits.next_group()

with pytest.raises(RuntimeError):
_ = context_set_emits.t2
with pytest.raises(RuntimeError):
_ = context_set_emits.get_dataframe()
with pytest.raises(RuntimeError):
context_set_emits.next()
with pytest.raises(RuntimeError):
_ = context_set_emits.size()
with pytest.raises(RuntimeError):
context_set_emits.reset()
with pytest.raises(RuntimeError):
context_set_emits.emit(1, 'cat')
Loading
Loading