Skip to content

Commit

Permalink
feat: add support for asyncpg
Browse files Browse the repository at this point in the history
  • Loading branch information
enocom committed Jan 9, 2024
1 parent a956f9f commit c5b89a2
Show file tree
Hide file tree
Showing 4 changed files with 263 additions and 2 deletions.
127 changes: 126 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,18 @@ Currently supported drivers are:

You can install this library with `pip install`:

### pg8000

```sh
pip install "google-cloud-alloydb-connector[pg8000]"
```

### asyncpg

```sh
pip install "google-cloud-alloydb-connector[asyncpg]"
```

### APIs and Services

This package requires the following to connect successfully:
Expand Down Expand Up @@ -70,7 +78,7 @@ This package provides several functions for authorizing and encrypting
connections. These functions are used with your database driver to connect to
your AlloyDB instance.

AlloyDB supports network connectivity through private, internal IP addresses only.
AlloyDB supports network connectivity through private, internal IP addresses only.
This package must be run in an environment that is connected to the
[VPC Network][vpc] that hosts your AlloyDB private IP address.

Expand Down Expand Up @@ -202,6 +210,123 @@ with pool.connect() as db_conn:
print(row)
```

### Async Driver Usage

The AlloyDB Connector is compatible with [asyncio][] to improve the speed and
efficiency of database connections through concurrency. You can use all
non-asyncio drivers through the `Connector.connect_async` function, in addition
to the following asyncio database drivers:

- [asyncpg](https://magicstack.github.io/asyncpg)

[asyncio]: https://docs.python.org/3/library/asyncio.html

```python
import asyncpg

import sqlalchemy
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine

from google.cloud.alloydb.connector import Connector

async def init_connection_pool(connector: Connector) -> AsyncEngine:
# initialize Connector object for connections to AlloyDB
async def getconn() -> asyncpg.Connection:
conn: asyncpg.Connection = await connector.connect_async(
"projects/<YOUR_PROJECT>/locations/<YOUR_REGION>/clusters/<YOUR_CLUSTER>/instances/<YOUR_INSTANCE>",
"asyncpg",
user="my-user",
password="my-password",
db="my-db-name"
# ... additional database driver args
)
return conn

# The AlloyDB Python Connector can be used along with SQLAlchemy using the
# 'async_creator' argument to 'create_async_engine'
pool = create_async_engine(
"postgresql+asyncpg://",
async_creator=getconn,
)
return pool

async def main():
loop = asyncio.get_running_loop()
connector = Connector(loop=loop)

# initialize connection pool
pool = await init_connection_pool(connector)

# example query
async with pool.connect() as conn:
await conn.execute(sqlalchemy.text("SELECT NOW()"))

# close Connector
await connector.close_async()

# dispose of connection pool
await pool.dispose()
```

For more details on additional arguments with an `asyncpg.Connection`, please
visit the [official documentation][asyncpg-docs].


[asyncpg-docs]: https://magicstack.github.io/asyncpg/current/api/index.html

### Async Context Manager

The `Connector` also may be used as an async context manager, removing the need
for explicit calls to `connector.close_async()` to cleanup resources.

**Note:** This alternative requires that the running event loop be
passed in as the `loop` argument to `Connector()`.

```python
import asyncio
import asyncpg

import sqlalchemy
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine

from google.cloud.alloydb.connector import Connector

async def init_connection_pool(connector: Connector) -> AsyncEngine:
# initialize Connector object for connections to AlloyDB
async def getconn() -> asyncpg.Connection:
conn: asyncpg.Connection = await connector.connect_async(
"projects/<YOUR_PROJECT>/locations/<YOUR_REGION>/clusters/<YOUR_CLUSTER>/instances/<YOUR_INSTANCE>",
"asyncpg",
user="my-user",
password="my-password",
db="my-db-name"
# ... additional database driver args
)
return conn

# The AlloyDB Python Connector can be used along with SQLAlchemy using the
# 'async_creator' argument to 'create_async_engine'
pool = create_async_engine(
"postgresql+asyncpg://",
async_creator=getconn,
)
return pool

async def main():
# initialize Connector object for connections to AlloyDB
loop = asyncio.get_running_loop()
async with Connector(loop=loop) as connector:
# initialize connection pool
pool = await init_connection_pool(connector)

# example query
async with pool.connect() as conn:
await conn.execute(sqlalchemy.text("SELECT NOW()"))

# dispose of connection pool
await pool.dispose()
```

## Support policy

### Major version lifecycle
Expand Down
64 changes: 64 additions & 0 deletions google/cloud/alloydb/connector/asyncpg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"""
Copyright 2023 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import ssl
from typing import Any, TYPE_CHECKING

SERVER_PROXY_PORT = 5433

if TYPE_CHECKING:
import asyncpg


async def connect(
ip_address: str, ctx: ssl.SSLContext, **kwargs: Any
) -> "asyncpg.Connection":
"""Helper function to create an asyncpg DB-API connection object.
:type ip_address: str
:param ip_address: A string containing an IP address for the AlloyDB
instance.
:type ctx: ssl.SSLContext
:param ctx: An SSLContext object created from the AlloyDB server CA
cert and ephemeral cert.
:type kwargs: Any
:param kwargs: Keyword arguments for establishing asyncpg connection
object to AlloyDB instance.
:rtype: asyncpg.Connection
:returns: An asyncpg.Connection object to an AlloyDB instance.
"""
try:
import asyncpg
except ImportError:
raise ImportError(
'Unable to import module "asyncpg." Please install and try again.'
)
user = kwargs.pop("user")
db = kwargs.pop("db")
passwd = kwargs.pop("password", None)

return await asyncpg.connect(
user=user,
database=db,
password=passwd,
host=ip_address,
port=SERVER_PROXY_PORT,
ssl=ctx,
direct_tls=True,
**kwargs,
)
24 changes: 23 additions & 1 deletion google/cloud/alloydb/connector/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@
from google.cloud.alloydb.connector.client import AlloyDBClient
from google.cloud.alloydb.connector.instance import Instance
import google.cloud.alloydb.connector.pg8000 as pg8000
import google.cloud.alloydb.connector.asyncpg as asyncpg
from google.cloud.alloydb.connector.utils import generate_keys

if TYPE_CHECKING:
from google.auth.credentials import Credentials

ASYNC_DRIVERS = ["asyncpg"]


class Connector:
"""A class to configure and create connections to Cloud SQL instances.
Expand All @@ -51,9 +54,10 @@ def __init__(
credentials: Optional[Credentials] = None,
quota_project: Optional[str] = None,
alloydb_api_endpoint: str = "https://alloydb.googleapis.com",
loop: asyncio.AbstractEventLoop = None
) -> None:
# create event loop and start it in background thread
self._loop: asyncio.AbstractEventLoop = asyncio.new_event_loop()
self._loop: asyncio.AbstractEventLoop = loop or asyncio.new_event_loop()
self._thread = Thread(target=self._loop.run_forever, daemon=True)
self._thread.start()
self._instances: Dict[str, Instance] = {}
Expand Down Expand Up @@ -131,6 +135,7 @@ async def connect_async(self, instance_uri: str, driver: str, **kwargs: Any) ->

connect_func = {
"pg8000": pg8000.connect,
"asyncpg": asyncpg.connect,
}
# only accept supported database drivers
try:
Expand All @@ -149,6 +154,10 @@ async def connect_async(self, instance_uri: str, driver: str, **kwargs: Any) ->

# synchronous drivers are blocking and run using executor
try:
# async drivers are unblocking and can be awaited directly
if driver in ASYNC_DRIVERS:
return await connector(ip_address, context, **kwargs)

connect_partial = partial(connector, ip_address, context, **kwargs)
return await self._loop.run_in_executor(None, connect_partial)
except Exception:
Expand All @@ -169,6 +178,19 @@ def __exit__(
"""Exit context manager by closing Connector"""
self.close()

async def __aenter__(self) -> Any:
"""Enter async context manager by returning Connector object"""
return self

async def __aexit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
"""Exit async context manager by closing Connector"""
await self.close_async()

def close(self) -> None:
"""Close Connector by stopping tasks and releasing resources."""
close_future = asyncio.run_coroutine_threadsafe(
Expand Down
50 changes: 50 additions & 0 deletions tests/system/test_asyncpg_connection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import os

import asyncpg
import pytest
import sqlalchemy
from sqlalchemy.ext.asyncio import create_async_engine

from google.cloud.alloydb.connector import Connector


@pytest.mark.asyncio
async def test_connection_with_asyncpg() -> None:
async def getconn() -> asyncpg.Connection:
loop = asyncio.get_running_loop()
# initialize Connector object for connections to Cloud SQL
async with Connector(loop=loop) as connector:
conn: asyncpg.Connection = await connector.connect_async(
os.environ["ALLOYDB_INSTANCE_URI"],
"asyncpg",
user=os.environ["ALLOYDB_USER"],
password=os.environ["ALLOYDB_PASS"],
db=os.environ["ALLOYDB_DB"],
)
return conn

# create SQLAlchemy connection pool
pool = create_async_engine(
"postgresql+asyncpg://",
async_creator=getconn,
execution_options={"isolation_level": "AUTOCOMMIT"},
)
async with pool.connect() as conn:
res = (await conn.execute(sqlalchemy.text("SELECT 1"))).fetchone()
res = res[0]
assert res == 1

0 comments on commit c5b89a2

Please sign in to comment.