Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Supporting dataframe and update idempotent query example #57

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ Insert Data

Pandas DataFrame
----------------
Big fan of Pandas? We too! You can mix SQL and Pandas API together:
Big fan of Pandas? We too! You can mix SQL and Pandas API together. Also you can converting query results to a variety of formats(e.g. Numpy Array, Pandas DataFrame, Polars DataFrame, Arrow Table) by DBAPI.


.. code-block:: python

Expand Down Expand Up @@ -128,3 +129,18 @@ Big fan of Pandas? We too! You can mix SQL and Pandas API together:
df = c.query_dataframe('SELECT * FROM table(test)')
print(df)
print(df.describe())

# Converting query results to a variety of formats with dbapi
with connect('proton://localhost') as conn:
with conn.cursor() as cur:
cur.execute('SELECT * FROM table(test)')
print(cur.df()) # Pandas DataFrame

cur.execute('SELECT * FROM table(test)')
print(cur.fetchnumpy()) # Numpy Arrays

cur.execute('SELECT * FROM table(test)')
print(cur.pl()) # Polars DataFrame

cur.execute('SELECT * FROM table(test)')
print(cur.arrow()) # Arrow Table
73 changes: 73 additions & 0 deletions example/idempotent/idempotent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
from proton_driver import connect, Client
from datetime import date
from time import sleep


# Create a test stream
def create_test_stream(operator, table_name, table_columns):
operator.execute(f'DROP STREAM IF EXISTS {table_name};')
operator.execute(f'CREATE STREAM {table_name} ({table_columns})')


# Use dbapi to implement idempotent insertion
def use_dbapi():
with connect('proton://localhost') as conn:
with conn.cursor() as cur:
create_test_stream(
cur,
'test_user',
'id int32, name string, birthday date'
)
# Set idempotent_id.
cur.set_settings(dict(idempotent_id='batch1'))
# Insert data into test_user multiple times with the same idempotent_id. # noqa
# The query result should contain only the first inserted data.
data = [
(123456, 'timeplus', date(2024, 10, 24)),
(789012, 'stream ', date(2023, 10, 24)),
(135790, 'proton ', date(2024, 10, 24)),
(246801, 'database', date(2024, 10, 24)),
]
# Execute multiple insert operations.
for _ in range(10):
cur.execute(
'INSERT INTO test_user (id, name, birthday) VALUES',
data
)
cur.fetchall()
# wait for 3 sec to make sure data available in historical store.
sleep(3)
cur.execute('SELECT count() FROM table(test_user)')
res = cur.fetchall()
# Data is inserted only once,so res == (4,).
print(res)


# Use Client to implement idempotent insertion
def use_client():
cli = Client('localhost', 8463)
create_test_stream(cli, 'test_stream', '`i` int, `v` string')
setting = {
'idempotent_id': 'batch1'
}
data = [
(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'),
(5, 'e'), (6, 'f'), (7, 'g'), (8, 'h')
]
# Execute multiple insert operations.
for _ in range(10):
cli.execute(
'INSERT INTO test_stream (i, v) VALUES',
data,
settings=setting
)
# wait for 3 sec to make sure data available in historical store.
sleep(3)
res = cli.execute('SELECT count() FROM table(test_stream)')
# Data is inserted only once,so res == (8,).
print(res)


if __name__ == "__main__":
use_dbapi() # (4,)
use_client() # (8,)
21 changes: 20 additions & 1 deletion example/pandas/dataframe.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import pandas as pd
import time

from proton_driver import client
from proton_driver import client, connect

if __name__ == "__main__":
c = client.Client(host='127.0.0.1', port=8463)
Expand Down Expand Up @@ -37,3 +37,22 @@
df = c.query_dataframe('SELECT * FROM table(test)')
print(df)
print(df.describe())

# Converting query results to a variety of formats with dbapi
with connect('proton://localhost') as conn:
with conn.cursor() as cur:
cur.execute('SELECT * FROM table(test)')
print('--------------Pandas DataFrame--------------')
print(cur.df())

cur.execute('SELECT * FROM table(test)')
print('----------------Numpy Arrays----------------')
print(cur.fetchnumpy())

cur.execute('SELECT * FROM table(test)')
print('--------------Polars DataFrame--------------')
print(cur.pl())

cur.execute('SELECT * FROM table(test)')
print('-----------------Arrow Table----------------')
print(cur.arrow())
72 changes: 72 additions & 0 deletions proton_driver/dbapi/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,78 @@ def fetchall(self):
self._rows = []
return rv

def df(self):
"""
Fetch all (remaining) rows of a query result, returning them as
a pandas DataFrame.

:return: Pandas DataFrame of fetched rows.
"""
self._check_query_started()

import pandas as pd

rv = pd.DataFrame({
name: [row[i] for row in self._rows] if name else None
for i, name in enumerate(self._columns)
})
self._rows = []
return rv

def fetchnumpy(self):
"""
Fetch all (remaining) rows of a query result, returning
them as a dictionary of NumPy arrays.

:return: Dictionary of NumPy arrays of fetched rows.
"""
self._check_query_started()

import numpy as np

rv = {
name: np.array([row[i] for row in self._rows]) if name else None
for i, name in enumerate(self._columns)
}
self._rows = []
return rv

def pl(self):
"""
Fetch all (remaining) rows of a query result, returning them as
a Polars DataFrame.

:return: Polars DataFrame of fetched rows.
"""
self._check_query_started()

import polars as pl

rv = pl.DataFrame({
name: [row[i] for row in self._rows] if name else None
for i, name in enumerate(self._columns)
})
self._rows = []
return rv

def arrow(self):
"""
Fetch all (remaining) rows of a query result, returning them as
a Arrow Table.

:return: Arrow Table of fetched rows.
"""
self._check_query_started()

import pyarrow as pa

rv = pa.table({
name: [row[i] for row in self._rows] if name else None
for i, name in enumerate(self._columns)
})
self._rows = []
return rv

def setinputsizes(self, sizes):
# Do nothing.
pass
Expand Down
3 changes: 3 additions & 0 deletions proton_driver/settings/available.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,4 +402,7 @@
'format_regexp_escaping_rule': SettingString,
'format_regexp_skip_unmatched': SettingBool,
'output_format_enable_streaming': SettingBool,

'idempotent_id': SettingString,
'enable_idempotent_processing': SettingBool,
}
110 changes: 110 additions & 0 deletions tests/numpy/test_generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@

from tests.testcase import BaseTestCase
from tests.numpy.testcase import NumpyBaseTestCase
from proton_driver import connect
from datetime import datetime
from decimal import Decimal


class GenericTestCase(NumpyBaseTestCase):
Expand Down Expand Up @@ -171,3 +174,110 @@ def test_query_dataframe(self):
self.assertEqual(
'Extras for NumPy must be installed', str(e.exception)
)


class DataFrameDBAPITestCase(NumpyBaseTestCase):
types = \
'a int64, b string, c datetime,' \
'd fixed_string(10), e decimal(9, 5), f float64,' \
'g low_cardinality(string), h nullable(int32)'

columns = 'a,b,c,d,e,f,g,h'
data = [
[
123, 'abc', datetime(2024, 5, 20, 12, 11, 10),
'abcefgcxxx', Decimal('300.42'), 3.402823e12,
'127001', 332
],
[
456, 'cde', datetime(2024, 6, 21, 12, 13, 50),
'1234567890', Decimal('171.31'), -3.4028235e13,
'127001', None
],
[
789, 'efg', datetime(1998, 7, 22, 12, 30, 10),
'stream sql', Decimal('894.22'), float('inf'),
'127001', None
],
]

def setUp(self):
super(DataFrameDBAPITestCase, self).setUp()
self.conn = connect('proton://localhost')
self.cur = self.conn.cursor()
self.cur.execute('DROP STREAM IF EXISTS test')
self.cur.execute(f'CREATE STREAM test ({self.types}) ENGINE = Memory')
self.cur.execute(
f'INSERT INTO test ({self.columns}) VALUES',
self.data
)
self.cur.execute(f'SELECT {self.columns} FROM test')

def tearDown(self):
super(DataFrameDBAPITestCase, self).tearDown()
self.cur.execute('DROP STREAM test')

def test_dbapi_fetchnumpy(self):
expect = {
col: np.array([row[i] for row in self.data])
for i, col in enumerate(self.columns.split(','))
}
rv = self.cur.fetchnumpy()
for key, value in expect.items():
self.assertIsNotNone(rv.get(key))
self.assertarraysEqual(value, rv[key])

def test_dbapi_df(self):
expect = pd.DataFrame(self.data, columns=self.columns.split(','))
df = self.cur.df()

self.assertIsInstance(df, pd.DataFrame)
self.assertEqual(df.shape, (3, 8))
self.assertEqual(
[type.name for type in df.dtypes],
['int64', 'object', 'datetime64[ns]',
'object', 'object', 'float64',
'object', 'float64']
)
self.assertTrue(expect.equals(df))

def test_dbapi_pl(self):
try:
import polars as pl
except ImportError:
self.skipTest('Polars extras are not installed')

expect = pl.DataFrame({
col: [row[i] for row in self.data]
for i, col in enumerate(self.columns.split(','))
})

df = self.cur.pl()
self.assertIsInstance(df, pl.DataFrame)
self.assertEqual(df.shape, (3, 8))
self.assertSequenceEqual(
df.schema.dtypes(),
[pl.Int64, pl.String, pl.Datetime, pl.String,
pl.Decimal, pl.Float64, pl.String, pl.Int64]
)
self.assertTrue(expect.equals(df))

def test_dbapi_arrow(self):
try:
import pyarrow as pa
except ImportError:
self.skipTest('Pyarrow extras are not installed')

expect = pa.table({
col: [row[i] for row in self.data]
for i, col in enumerate(self.columns.split(','))
})
at = self.cur.arrow()
self.assertEqual(at.shape, (3, 8))
self.assertSequenceEqual(
at.schema.types,
[pa.int64(), pa.string(), pa.timestamp('us'),
pa.string(), pa.decimal128(5, 2), pa.float64(),
pa.string(), pa.int64()]
)
self.assertTrue(expect.equals(at))
23 changes: 22 additions & 1 deletion tests/test_dbapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from contextlib import contextmanager
import socket
from unittest.mock import patch

from time import sleep
from proton_driver import connect
from proton_driver.dbapi import (
ProgrammingError, InterfaceError, OperationalError
Expand Down Expand Up @@ -159,6 +159,27 @@ def test_execute_insert(self):
cursor.execute('INSERT INTO test VALUES', [[4]])
self.assertEqual(cursor.rowcount, 1)

def test_idempotent_insert(self):
with self.created_cursor() as cursor:
cursor.execute('CREATE STREAM test (i int, v string)')
data = [
(123, 'abc'), (456, 'def'), (789, 'ghi'),
(987, 'ihg'), (654, 'fed'), (321, 'cba'),
]
cursor.set_settings(dict(idempotent_id='batch1'))
for _ in range(10):
cursor.execute(
'INSERT INTO test (i, v) VALUES',
data
)
self.assertEqual(cursor.rowcount, 6)
sleep(3)
rv = cursor.execute('SELECT count(*) FROM table(test)')
rv = cursor.fetchall()
self.assertEqual(rv, [(6,)])

cursor.execute('DROP STREAM test')

def test_description(self):
with self.created_cursor() as cursor:
self.assertIsNone(cursor.description)
Expand Down
Loading