pysqlsync helps you synchronize your target database or data warehouse with a data source, making efficient use of Python language elements and database drivers (client libraries). The library package employs Python data-classes (decorated with @dataclass
) to define database tables and generate CREATE
, DROP
, INSERT
, MERGE
and DELETE
SQL statements. Commands against the database driver are executed with the asynchronous paradigm (async
and await
). This can result in an order of magnitude speed increase over traditional methods such as SQLAlchemy when data is inserted or upserted (merged) into, or deleted from a table.
- Formation. Create an initializer SQL script (e.g. ANSI, PostgreSQL or MySQL) from a set of Python
@dataclass
definitions. - Discovery. Create a traversable Python object hierarchy (e.g.
Namespace
,Table
,Column
andConstraint
objects) from a database catalog (usinginformation_schema
orpg_catalog
). - Schema synchronization. Emit SQL statements to mutate the database schema from a source state to a desired target state using
CREATE
,DROP
andALTER
. - Data import. Efficiently insert data from a list of objects or tuples of simple types into a database table using
INSERT
,MERGE
andDELETE
with multiple arguments, and lists of tuples or collections of objects as input.
First, define the table structure with a standard Python data-class (including dependent data types):
class WorkflowState(enum.Enum):
active = "active"
inactive = "inactive"
deleted = "deleted"
@dataclasses.dataclass
class UserTable:
id: PrimaryKey[int]
updated_at: datetime
workflow_state: WorkflowState
uuid: uuid.UUID
name: str
short_name: Annotated[str, MaxLength(255)]
homepage_url: Optional[str] = None
The data-class can be defined statically in code, or generated dynamically from input (with dataclasses.make_dataclass
). Fields can be required or nullable (represented in Python as None
). All basic data types are supported, including integers (of various widths), floating-point numbers, strings (of fixed or variable length), timestamps (datetime.datetime
in Python), UUIDs (uuid.UUID
in Python), enumerations (represented in Python as enum.Enum
), etc. list[...]
is supported as a collection type, and composite types (data-classes without a primary key) are also permitted.
Next, instantiate a database engine, open a connection, create the database structure (with a CREATE TABLE
statement), and populate the database with initial data (with SQL INSERT
or COPY
):
engine = get_dialect("postgresql")
parameters = ConnectionParameters(
host="localhost",
port=5432,
username="levente.hunyadi",
password=None,
database="levente.hunyadi",
)
options = GeneratorOptions(
enum_mode=EnumMode.RELATION, namespaces={tables: "example"}
)
data = [
UserTable(
id=1,
updated_at=datetime.now(),
workflow_state=WorkflowState.active,
uuid=uuid.uuid4(),
name="Laura Twenty-Four",
short_name="Laura",
)
]
async with engine.create_connection(parameters, options) as conn:
await conn.create_objects([UserTable])
await conn.insert_data(UserTable, data)
Let's assume the database structure changes. With the help of an Explorer
instance, discover the objects in the database, and create/drop objects to match the state as captured in the specified Python module:
async with engine.create_connection(parameters, options) as conn:
await engine.create_explorer(conn).synchronize(module=tables)
Finally, keep the target database content synchronized with data from the source (with the equivalent of SQL MERGE
):
data = [
UserTable(
id=2,
updated_at=datetime.now(),
workflow_state=WorkflowState.active,
uuid=uuid.uuid4(),
name="Zeta Twelve",
short_name="Zeta",
)
]
async with engine.create_connection(parameters, options) as conn:
await engine.create_explorer(conn).synchronize(module=tables)
await conn.upsert_data(UserTable, data)
In order to boost efficiency, you can insert (or update) data directly from a list of tuples:
field_names = ["id", "uuid", "name", "short_name", "workflow_state", "updated_at"]
field_types = [int, uuid.UUID, str, str, str, datetime]
rows = [
(1, uuid.uuid4(), "Laura Twenty-Four", "Laura", "active", datetime.now()),
(2, uuid.uuid4(), "Zeta Twelve", "Zeta", "inactive", datetime.now()),
]
async with engine.create_connection(parameters, options) as conn:
await engine.create_explorer(conn).synchronize(module=tables)
table = conn.get_table(UserTable)
await conn.upsert_rows(
table,
field_names=tuple(field_names),
field_types=tuple(field_types),
records=rows,
)
pysqlsync supports two modes of operation: structure synchronization and data synchronization.
When performing structure synchronization, pysqlsync morphs a database source state into a desired target state. The source state is typically obtained with reflection (e.g. extracting metadata from information_schema
or pg_catalog
). The target state is defined as a set of Python data-classes either statically or dynamically (e.g. based on a JSON Schema). Comparing source and target state, pysqlsync creates a transformation script (predominantly ANSI SQL with vendor-specific extensions such as comments), and runs the script against an asynchronous client such as asyncpg
or aiomysql
. This script creates new schemas, structure and enumeration types, tables, columns, constraints, etc. whenever the target state contains items that the source state lacks, and drops database objects when the opposite is true. When there are matching objects (based on qualified name), the object is mutated (e.g. the data type of a column is changed).
Once database structure has been morphed into the desired state, data synchronization helps keep a local database state in sync with a remote database state. pysqlsync implements insert and upsert functions to handle lists of tuples of data. Each tuple corresponds to a table row, and each tuple member is a column entry. Data in each column may have to be transformed to be suitable for the database dialect, e.g. MySQL would have to transform a UUID
into a BINARY(16)
, represented in Python as bytes
because it has no uuid
type. These transformation functions are derived in advance such that there is minimum CPU load to process a record (one of possibly billions). Loading data from network/disk to memory may use an efficient parser implementation such as tsv2py, which significantly speeds up parse time for composite types such as datetime
or UUID
with the help of SIMD CPU instructions.
Formation is the process of generating a series of SQL statements from a collection of Python data-class definitions. Several formation modes are supported.
EnumMode
determines how Python enumeration types (subclasses of enum.Enum
) are converted into database object types. Possible options for the target of an enumeration type are:
- a SQL
ENUM
type created withCREATE TYPE ... AS ENUM ( ... )
(PostgreSQL), or - an inline SQL
ENUM
definition, e.g.ENUM('a', 'b', 'c')
(MySQL and Oracle), or - a SQL data type corresponding to the enumeration value type, and a
CHECK
constraint on the column to block invalid values, or - a foreign/primary key relation (reference constraint), coupled with a lookup table, in which the lookup table consists of an identity column acting as the primary key and a unique column storing the enumeration values.
Extensible enumerations (declared with E | str
or E | str | None
where E
is a subclasses of enum.Enum
) are always expanded into a relation (i.e. separate table) regardless of EnumMode
.
StructMode
determines how to convert composite types that are not mapped into SQL tables. A data-class type may be converted into
- a composite SQL type with
CREATE TYPE ... AS ( ... )
(PostgreSQL), or - the SQL
json
type (PostgreSQL), or - a text type, e.g.
varchar
, which holds the data as serialized JSON.
ArrayMode
determines how to treat sequence types, such as Python lists, i.e. whether to represent them as
- a SQL
array
type (PostgreSQL), or - a JSON array stored in a column with the SQL data type
json
(PostgreSQL), or - a serialized JSON string stored in a text column type, e.g.
varchar
.
GeneratorOptions
allows you to pass a mapping between module types and SQL schema identifiers. Objects in each Python module would be transformed into SQL objects in the corresponding SQL schema (namespace).
- PostgreSQL (with asyncpg)
- Microsoft SQL Server (with pyodbc)
- Oracle (with oracledb)
- MySQL (with aiomysql)
When calling str()
on Python objects such as Table
, Column
or Constraint
, the library returns an ANSI-compliant representation.
pysqlsync features several Python classes that correspond to database objects:
Catalog
captures a database state of possibly several namespaces.Namespace
corresponds to a database schema in engines that feature schemas (e.g. PostgreSQL).Table
represents a database table with a primary key, several columns and constraints.Column
objects declare data type and nullability, and a possibly hold a default value.StructType
andStructMember
represent composite types in engines that support them (e.g. PostgreSQL).EnumType
captures enumeration types for engines that represent them as their own type (e.g. PostgreSQL).
The collection of database objects represents a current structure, which may morph into a target structure. Some of the objects may be vendor-specific, e.g. MySQL has its own MySQLColumn
type and PostgreSQL has its own PostgreSQLTable
type.
All objects are identified either with a local ID (e.g. namespaces and columns) or a qualified ID (e.g. tables). When the database engine lacks namespace support, qualified IDs map to a prefix, e.g. canvas.accounts
(table accounts
in namespace canvas
) becomes canvas__accounts
(a table with no namespace).
pysqlsync comes with several database dialects shipped with the library. However, it is possible to create and register new dialects that behave the same way as built-in dialects. In terms of capabilities, there are no differences between built-in and user-defined dialects.
If you are about to write integration for a new database dialect, it is recommended that you take one of the existing dialects (e.g. PostgreSQL, Microsoft SQL Server, Oracle or MySQL), and use it as a template. For more information, explore the folder pysqlsync/dialect
.