diff --git a/README.rst b/README.rst index faf3565..bbc2419 100644 --- a/README.rst +++ b/README.rst @@ -92,7 +92,8 @@ Insert Data Pandas DataFrame ---------------- -Big fan of Pandas? We too! You can mix SQL and Pandas API together: +Big fan of Pandas? We too! You can mix SQL and Pandas API together. Also you can converting query results to a variety of formats(e.g. Numpy Array, Pandas DataFrame, Polars DataFrame, Arrow Table) by DBAPI. + .. code-block:: python @@ -128,3 +129,18 @@ Big fan of Pandas? We too! You can mix SQL and Pandas API together: df = c.query_dataframe('SELECT * FROM table(test)') print(df) print(df.describe()) + + # Converting query results to a variety of formats with dbapi + with connect('proton://localhost') as conn: + with conn.cursor() as cur: + cur.execute('SELECT * FROM table(test)') + print(cur.df()) # Pandas DataFrame + + cur.execute('SELECT * FROM table(test)') + print(cur.fetchnumpy()) # Numpy Arrays + + cur.execute('SELECT * FROM table(test)') + print(cur.pl()) # Polars DataFrame + + cur.execute('SELECT * FROM table(test)') + print(cur.arrow()) # Arrow Table \ No newline at end of file diff --git a/example/idempotent/idempotent.py b/example/idempotent/idempotent.py new file mode 100644 index 0000000..c363dc6 --- /dev/null +++ b/example/idempotent/idempotent.py @@ -0,0 +1,73 @@ +from proton_driver import connect, Client +from datetime import date +from time import sleep + + +# Create a test stream +def create_test_stream(operator, table_name, table_columns): + operator.execute(f'DROP STREAM IF EXISTS {table_name};') + operator.execute(f'CREATE STREAM {table_name} ({table_columns})') + + +# Use dbapi to implement idempotent insertion +def use_dbapi(): + with connect('proton://localhost') as conn: + with conn.cursor() as cur: + create_test_stream( + cur, + 'test_user', + 'id int32, name string, birthday date' + ) + # Set idempotent_id. + cur.set_settings(dict(idempotent_id='batch1')) + # Insert data into test_user multiple times with the same idempotent_id. # noqa + # The query result should contain only the first inserted data. + data = [ + (123456, 'timeplus', date(2024, 10, 24)), + (789012, 'stream ', date(2023, 10, 24)), + (135790, 'proton ', date(2024, 10, 24)), + (246801, 'database', date(2024, 10, 24)), + ] + # Execute multiple insert operations. + for _ in range(10): + cur.execute( + 'INSERT INTO test_user (id, name, birthday) VALUES', + data + ) + cur.fetchall() + # wait for 3 sec to make sure data available in historical store. + sleep(3) + cur.execute('SELECT count() FROM table(test_user)') + res = cur.fetchall() + # Data is inserted only once,so res == (4,). + print(res) + + +# Use Client to implement idempotent insertion +def use_client(): + cli = Client('localhost', 8463) + create_test_stream(cli, 'test_stream', '`i` int, `v` string') + setting = { + 'idempotent_id': 'batch1' + } + data = [ + (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), + (5, 'e'), (6, 'f'), (7, 'g'), (8, 'h') + ] + # Execute multiple insert operations. + for _ in range(10): + cli.execute( + 'INSERT INTO test_stream (i, v) VALUES', + data, + settings=setting + ) + # wait for 3 sec to make sure data available in historical store. + sleep(3) + res = cli.execute('SELECT count() FROM table(test_stream)') + # Data is inserted only once,so res == (8,). + print(res) + + +if __name__ == "__main__": + use_dbapi() # (4,) + use_client() # (8,) diff --git a/example/pandas/dataframe.py b/example/pandas/dataframe.py index ce1d57f..97d4f2c 100644 --- a/example/pandas/dataframe.py +++ b/example/pandas/dataframe.py @@ -1,7 +1,7 @@ import pandas as pd import time -from proton_driver import client +from proton_driver import client, connect if __name__ == "__main__": c = client.Client(host='127.0.0.1', port=8463) @@ -37,3 +37,22 @@ df = c.query_dataframe('SELECT * FROM table(test)') print(df) print(df.describe()) + + # Converting query results to a variety of formats with dbapi + with connect('proton://localhost') as conn: + with conn.cursor() as cur: + cur.execute('SELECT * FROM table(test)') + print('--------------Pandas DataFrame--------------') + print(cur.df()) + + cur.execute('SELECT * FROM table(test)') + print('----------------Numpy Arrays----------------') + print(cur.fetchnumpy()) + + cur.execute('SELECT * FROM table(test)') + print('--------------Polars DataFrame--------------') + print(cur.pl()) + + cur.execute('SELECT * FROM table(test)') + print('-----------------Arrow Table----------------') + print(cur.arrow()) diff --git a/proton_driver/dbapi/cursor.py b/proton_driver/dbapi/cursor.py index e0912cc..e1630d7 100644 --- a/proton_driver/dbapi/cursor.py +++ b/proton_driver/dbapi/cursor.py @@ -208,6 +208,78 @@ def fetchall(self): self._rows = [] return rv + def df(self): + """ + Fetch all (remaining) rows of a query result, returning them as + a pandas DataFrame. + + :return: Pandas DataFrame of fetched rows. + """ + self._check_query_started() + + import pandas as pd + + rv = pd.DataFrame({ + name: [row[i] for row in self._rows] if name else None + for i, name in enumerate(self._columns) + }) + self._rows = [] + return rv + + def fetchnumpy(self): + """ + Fetch all (remaining) rows of a query result, returning + them as a dictionary of NumPy arrays. + + :return: Dictionary of NumPy arrays of fetched rows. + """ + self._check_query_started() + + import numpy as np + + rv = { + name: np.array([row[i] for row in self._rows]) if name else None + for i, name in enumerate(self._columns) + } + self._rows = [] + return rv + + def pl(self): + """ + Fetch all (remaining) rows of a query result, returning them as + a Polars DataFrame. + + :return: Polars DataFrame of fetched rows. + """ + self._check_query_started() + + import polars as pl + + rv = pl.DataFrame({ + name: [row[i] for row in self._rows] if name else None + for i, name in enumerate(self._columns) + }) + self._rows = [] + return rv + + def arrow(self): + """ + Fetch all (remaining) rows of a query result, returning them as + a Arrow Table. + + :return: Arrow Table of fetched rows. + """ + self._check_query_started() + + import pyarrow as pa + + rv = pa.table({ + name: [row[i] for row in self._rows] if name else None + for i, name in enumerate(self._columns) + }) + self._rows = [] + return rv + def setinputsizes(self, sizes): # Do nothing. pass diff --git a/proton_driver/settings/available.py b/proton_driver/settings/available.py index 1f4343b..de342b1 100644 --- a/proton_driver/settings/available.py +++ b/proton_driver/settings/available.py @@ -402,4 +402,7 @@ 'format_regexp_escaping_rule': SettingString, 'format_regexp_skip_unmatched': SettingBool, 'output_format_enable_streaming': SettingBool, + + 'idempotent_id': SettingString, + 'enable_idempotent_processing': SettingBool, } diff --git a/tests/numpy/test_generic.py b/tests/numpy/test_generic.py index bfa2c58..266297a 100644 --- a/tests/numpy/test_generic.py +++ b/tests/numpy/test_generic.py @@ -8,6 +8,9 @@ from tests.testcase import BaseTestCase from tests.numpy.testcase import NumpyBaseTestCase +from proton_driver import connect +from datetime import datetime +from decimal import Decimal class GenericTestCase(NumpyBaseTestCase): @@ -171,3 +174,110 @@ def test_query_dataframe(self): self.assertEqual( 'Extras for NumPy must be installed', str(e.exception) ) + + +class DataFrameDBAPITestCase(NumpyBaseTestCase): + types = \ + 'a int64, b string, c datetime,' \ + 'd fixed_string(10), e decimal(9, 5), f float64,' \ + 'g low_cardinality(string), h nullable(int32)' + + columns = 'a,b,c,d,e,f,g,h' + data = [ + [ + 123, 'abc', datetime(2024, 5, 20, 12, 11, 10), + 'abcefgcxxx', Decimal('300.42'), 3.402823e12, + '127001', 332 + ], + [ + 456, 'cde', datetime(2024, 6, 21, 12, 13, 50), + '1234567890', Decimal('171.31'), -3.4028235e13, + '127001', None + ], + [ + 789, 'efg', datetime(1998, 7, 22, 12, 30, 10), + 'stream sql', Decimal('894.22'), float('inf'), + '127001', None + ], + ] + + def setUp(self): + super(DataFrameDBAPITestCase, self).setUp() + self.conn = connect('proton://localhost') + self.cur = self.conn.cursor() + self.cur.execute('DROP STREAM IF EXISTS test') + self.cur.execute(f'CREATE STREAM test ({self.types}) ENGINE = Memory') + self.cur.execute( + f'INSERT INTO test ({self.columns}) VALUES', + self.data + ) + self.cur.execute(f'SELECT {self.columns} FROM test') + + def tearDown(self): + super(DataFrameDBAPITestCase, self).tearDown() + self.cur.execute('DROP STREAM test') + + def test_dbapi_fetchnumpy(self): + expect = { + col: np.array([row[i] for row in self.data]) + for i, col in enumerate(self.columns.split(',')) + } + rv = self.cur.fetchnumpy() + for key, value in expect.items(): + self.assertIsNotNone(rv.get(key)) + self.assertarraysEqual(value, rv[key]) + + def test_dbapi_df(self): + expect = pd.DataFrame(self.data, columns=self.columns.split(',')) + df = self.cur.df() + + self.assertIsInstance(df, pd.DataFrame) + self.assertEqual(df.shape, (3, 8)) + self.assertEqual( + [type.name for type in df.dtypes], + ['int64', 'object', 'datetime64[ns]', + 'object', 'object', 'float64', + 'object', 'float64'] + ) + self.assertTrue(expect.equals(df)) + + def test_dbapi_pl(self): + try: + import polars as pl + except ImportError: + self.skipTest('Polars extras are not installed') + + expect = pl.DataFrame({ + col: [row[i] for row in self.data] + for i, col in enumerate(self.columns.split(',')) + }) + + df = self.cur.pl() + self.assertIsInstance(df, pl.DataFrame) + self.assertEqual(df.shape, (3, 8)) + self.assertSequenceEqual( + df.schema.dtypes(), + [pl.Int64, pl.String, pl.Datetime, pl.String, + pl.Decimal, pl.Float64, pl.String, pl.Int64] + ) + self.assertTrue(expect.equals(df)) + + def test_dbapi_arrow(self): + try: + import pyarrow as pa + except ImportError: + self.skipTest('Pyarrow extras are not installed') + + expect = pa.table({ + col: [row[i] for row in self.data] + for i, col in enumerate(self.columns.split(',')) + }) + at = self.cur.arrow() + self.assertEqual(at.shape, (3, 8)) + self.assertSequenceEqual( + at.schema.types, + [pa.int64(), pa.string(), pa.timestamp('us'), + pa.string(), pa.decimal128(5, 2), pa.float64(), + pa.string(), pa.int64()] + ) + self.assertTrue(expect.equals(at)) diff --git a/tests/test_dbapi.py b/tests/test_dbapi.py index 9b54f4c..96cfa48 100644 --- a/tests/test_dbapi.py +++ b/tests/test_dbapi.py @@ -3,7 +3,7 @@ from contextlib import contextmanager import socket from unittest.mock import patch - +from time import sleep from proton_driver import connect from proton_driver.dbapi import ( ProgrammingError, InterfaceError, OperationalError @@ -159,6 +159,27 @@ def test_execute_insert(self): cursor.execute('INSERT INTO test VALUES', [[4]]) self.assertEqual(cursor.rowcount, 1) + def test_idempotent_insert(self): + with self.created_cursor() as cursor: + cursor.execute('CREATE STREAM test (i int, v string)') + data = [ + (123, 'abc'), (456, 'def'), (789, 'ghi'), + (987, 'ihg'), (654, 'fed'), (321, 'cba'), + ] + cursor.set_settings(dict(idempotent_id='batch1')) + for _ in range(10): + cursor.execute( + 'INSERT INTO test (i, v) VALUES', + data + ) + self.assertEqual(cursor.rowcount, 6) + sleep(3) + rv = cursor.execute('SELECT count(*) FROM table(test)') + rv = cursor.fetchall() + self.assertEqual(rv, [(6,)]) + + cursor.execute('DROP STREAM test') + def test_description(self): with self.created_cursor() as cursor: self.assertIsNone(cursor.description) diff --git a/tests/test_insert.py b/tests/test_insert.py index 1bdf290..3fb9a47 100644 --- a/tests/test_insert.py +++ b/tests/test_insert.py @@ -1,5 +1,5 @@ from datetime import date - +from time import sleep from tests.testcase import BaseTestCase from proton_driver import errors from proton_driver.errors import ServerException @@ -148,6 +148,31 @@ def test_insert_return(self): ) self.assertEqual(rv, 5) + def test_idempotent_insert(self): + self.client.execute('CREATE STREAM test (i int, v string)') + + data = [ + (123, 'abc'), (456, 'def'), (789, 'ghi'), + (987, 'ihg'), (654, 'fed'), (321, 'cba'), + ] + + setting = { + 'idempotent_id': 'batch1' + } + + for _ in range(20): + rv = self.client.execute( + 'INSERT INTO test (i, v) VALUES', + data, + settings=setting + ) + self.assertEqual(rv, 6) + sleep(3) + rv = self.client.execute('SELECT count(*) FROM table(test)') + self.assertEqual(rv, [(6, )]) + + self.client.execute('DROP STREAM test') + class InsertColumnarTestCase(BaseTestCase): def test_insert_tuple_ok(self):