Skip to content

Commit

Permalink
chore(deps): bring back psycopg2 for risingwave
Browse files Browse the repository at this point in the history
  • Loading branch information
cpcloud committed Jan 17, 2025
1 parent 8af5d97 commit 5c09424
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 50 deletions.
1 change: 1 addition & 0 deletions conda/environment-arm64-flink.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ dependencies:
- pins >=0.8.2
- uv>=0.4.29
- polars >=1,<2
- psycopg2 >= 2.8.4
- psycopg >= 3.2.0
- pyarrow =11.0.0
- pyarrow-tests
Expand Down
1 change: 1 addition & 0 deletions conda/environment-arm64.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ dependencies:
- pins >=0.8.2
- uv>=0.4.29
- polars >=1,<2
- psycopg2 >= 2.8.4
- psycopg >= 3.2.0
- pyarrow >=10.0.1
- pyarrow-tests
Expand Down
1 change: 1 addition & 0 deletions conda/environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ dependencies:
- pip
- uv>=0.4.29
- polars >=1,<2
- psycopg2 >= 2.8.4
- psycopg >= 3.2.0
- pyarrow >=10.0.1
- pyarrow-hotfix >=0.4
Expand Down
76 changes: 31 additions & 45 deletions ibis/backends/risingwave/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
from typing import TYPE_CHECKING, Any
from urllib.parse import unquote_plus

import psycopg2
import sqlglot as sg
import sqlglot.expressions as sge
from psycopg2 import extras

import ibis
import ibis.backends.sql.compilers as sc
Expand All @@ -28,7 +30,6 @@

import pandas as pd
import polars as pl
import psycopg2.extensions
import pyarrow as pa


Expand Down Expand Up @@ -104,49 +105,6 @@ def _from_url(self, url: ParseResult, **kwargs):

return self.connect(**kwargs)

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
    """Materialize an in-memory table as a temporary table in the backend.

    Builds a ``CREATE TEMPORARY TABLE`` statement from ``op.schema`` and
    bulk-loads the rows of ``op.data`` using
    ``psycopg2.extras.execute_batch``.

    Raises
    ------
    IbisTypeError
        If the schema contains any ``null``-typed columns, which this
        backend cannot yet reliably represent.
    """
    from pandas.api.types import is_float_dtype
    from psycopg2.extras import execute_batch

    schema = op.schema
    null_columns = [col for col, dtype in schema.items() if dtype.is_null()]
    if null_columns:
        raise exc.IbisTypeError(
            f"{self.name} cannot yet reliably handle `null` typed columns; "
            f"got null typed columns: {null_columns}"
        )

    quoted = self.compiler.quoted
    table_name = op.name

    ddl = sg.exp.Create(
        kind="TABLE",
        this=sg.exp.Schema(
            this=sg.to_identifier(table_name, quoted=quoted),
            expressions=schema.to_sqlglot(self.dialect),
        ),
        properties=sg.exp.Properties(expressions=[sge.TemporaryProperty()]),
    ).sql(self.dialect)

    frame = op.data.to_frame()
    # NaN compiles to 'NaN'::float, which errors out in non-float columns.
    # pandas silently promotes integer columns containing NaN to float, so
    # recover the logical dtypes via convert_dtypes() and replace NaN with
    # None in every column that is not genuinely float-typed.
    converted = frame.convert_dtypes()
    for column in converted.columns:
        if not is_float_dtype(converted[column]):
            frame[column] = frame[column].replace(float("nan"), None)

    insert_sql = self._build_insert_template(
        table_name, schema=schema, columns=True, placeholder="%s"
    )

    with self.begin() as cur:
        cur.execute(ddl)
        execute_batch(cur, insert_sql, frame.itertuples(index=False), 128)

@contextlib.contextmanager
def begin(self):
con = self.con
Expand Down Expand Up @@ -529,7 +487,6 @@ def do_connect(
year int32
month int32
"""
import psycopg2

self.con = psycopg2.connect(
host=host,
Expand Down Expand Up @@ -683,6 +640,35 @@ def create_table(
name, schema=schema, source=self, namespace=ops.Namespace(database=database)
).to_expr()

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
    """Create a backend table for an in-memory table and load its rows.

    Builds a ``CREATE TABLE`` statement from ``op.schema`` and bulk-inserts
    the rows of ``op.data`` via ``psycopg2.extras.execute_batch``.

    Raises
    ------
    IbisTypeError
        If the schema contains any ``null``-typed columns, which this
        backend cannot yet reliably represent.

    NOTE(review): unlike earlier revisions, this path does not convert NaN
    to None in non-float columns before insertion — confirm that is
    intentional for this backend.
    """
    schema = op.schema
    null_columns = [col for col, dtype in schema.items() if dtype.is_null()]
    if null_columns:
        raise com.IbisTypeError(
            f"{self.name} cannot yet reliably handle `null` typed columns; "
            f"got null typed columns: {null_columns}"
        )

    quoted = self.compiler.quoted
    table_name = op.name

    ddl = sg.exp.Create(
        kind="TABLE",
        this=sg.exp.Schema(
            this=sg.to_identifier(table_name, quoted=quoted),
            expressions=schema.to_sqlglot(self.dialect),
        ),
    ).sql(self.dialect)

    insert_sql = self._build_insert_template(
        table_name, schema=schema, columns=True, placeholder="%s"
    )
    rows = op.data.to_frame().itertuples(index=False)

    with self.begin() as cur:
        cur.execute(ddl)
        extras.execute_batch(cur, insert_sql, rows, 128)

def list_databases(
self, *, like: str | None = None, catalog: str | None = None
) -> list[str]:
Expand Down
3 changes: 1 addition & 2 deletions ibis/backends/tests/test_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
PsycoPg2InternalError,
PsycoPg2ProgrammingError,
PsycoPg2SyntaxError,
PsycoPgInternalError,
PsycoPgSyntaxError,
Py4JJavaError,
PyAthenaDatabaseError,
Expand Down Expand Up @@ -1097,7 +1096,7 @@ def test_array_intersect(con, data):

@builtin_array
@pytest.mark.notimpl(["postgres"], raises=PsycoPgSyntaxError)
@pytest.mark.notimpl(["risingwave"], raises=PsycoPgInternalError)
@pytest.mark.notimpl(["risingwave"], raises=PsycoPg2InternalError)
@pytest.mark.notimpl(
["trino"], reason="inserting maps into structs doesn't work", raises=TrinoUserError
)
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ sqlite = [
"rich>=12.4.4,<14",
]
risingwave = [
"psycopg[binary]>=3.2.0,<4",
"psycopg2>=2.8.4,<3",
"pyarrow>=10.0.1",
"pyarrow-hotfix>=0.4,<1",
"numpy>=1.23.2,<3",
Expand Down
1 change: 1 addition & 0 deletions requirements-dev.txt

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 17 additions & 2 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 5c09424

Please sign in to comment.