Skip to content

Commit

Permalink
Move replication connection to C level.
Browse files Browse the repository at this point in the history
  • Loading branch information
a1exsh committed Oct 27, 2015
1 parent 433fb95 commit fbcf99a
Show file tree
Hide file tree
Showing 9 changed files with 296 additions and 53 deletions.
3 changes: 2 additions & 1 deletion lib/extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@
from psycopg2._psycopg import ISQLQuote, Notify, Diagnostics, Column

from psycopg2._psycopg import QueryCanceledError, TransactionRollbackError
from psycopg2._psycopg import ReplicationCursor, ReplicationMessage
from psycopg2._psycopg import REPLICATION_PHYSICAL, REPLICATION_LOGICAL
from psycopg2._psycopg import ReplicationConnection, ReplicationCursor, ReplicationMessage

try:
from psycopg2._psycopg import set_wait_callback, get_wait_callback
Expand Down
57 changes: 11 additions & 46 deletions lib/extras.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
from psycopg2 import extensions as _ext
from psycopg2.extensions import cursor as _cursor
from psycopg2.extensions import connection as _connection
from psycopg2.extensions import REPLICATION_PHYSICAL, REPLICATION_LOGICAL
from psycopg2.extensions import ReplicationConnection as _replicationConnection
from psycopg2.extensions import ReplicationCursor as _replicationCursor
from psycopg2.extensions import ReplicationMessage
from psycopg2.extensions import adapt as _A, quote_ident
Expand Down Expand Up @@ -439,65 +441,28 @@ def callproc(self, procname, vars=None):
return LoggingCursor.callproc(self, procname, vars)


"""Replication connection types."""
REPLICATION_LOGICAL = "LOGICAL"
REPLICATION_PHYSICAL = "PHYSICAL"


class ReplicationConnectionBase(_connection):
class ReplicationConnectionBase(_replicationConnection):
"""
Base class for Logical and Physical replication connection
classes. Uses `ReplicationCursor` automatically.
"""

def __init__(self, *args, **kwargs):
"""
Initializes a replication connection by adding appropriate
parameters to the provided DSN and tweaking the connection
attributes.
"""

# replication_type is set in subclasses
if self.replication_type == REPLICATION_LOGICAL:
replication = 'database'

elif self.replication_type == REPLICATION_PHYSICAL:
replication = 'true'

else:
raise psycopg2.ProgrammingError("unrecognized replication type: %s" % self.replication_type)

items = _ext.parse_dsn(args[0])

# we add an appropriate replication keyword parameter, unless
# user has specified one explicitly in the DSN
items.setdefault('replication', replication)

dsn = " ".join(["%s=%s" % (k, psycopg2._param_escape(str(v)))
for (k, v) in items.iteritems()])

args = [dsn] + list(args[1:]) # async is the possible 2nd arg
super(ReplicationConnectionBase, self).__init__(*args, **kwargs)

# prevent auto-issued BEGIN statements
if not self.async:
self.autocommit = True

if self.cursor_factory is None:
self.cursor_factory = ReplicationCursor
self.cursor_factory = ReplicationCursor


class LogicalReplicationConnection(ReplicationConnectionBase):

def __init__(self, *args, **kwargs):
self.replication_type = REPLICATION_LOGICAL
kwargs['replication_type'] = REPLICATION_LOGICAL
super(LogicalReplicationConnection, self).__init__(*args, **kwargs)


class PhysicalReplicationConnection(ReplicationConnectionBase):

def __init__(self, *args, **kwargs):
self.replication_type = REPLICATION_PHYSICAL
kwargs['replication_type'] = REPLICATION_PHYSICAL
super(PhysicalReplicationConnection, self).__init__(*args, **kwargs)


Expand Down Expand Up @@ -528,16 +493,16 @@ def create_replication_slot(self, slot_name, slot_type=None, output_plugin=None)
if output_plugin is None:
raise psycopg2.ProgrammingError("output plugin name is required to create logical replication slot")

command += "%s %s" % (slot_type, quote_ident(output_plugin, self))
command += "LOGICAL %s" % quote_ident(output_plugin, self)

elif slot_type == REPLICATION_PHYSICAL:
if output_plugin is not None:
raise psycopg2.ProgrammingError("cannot specify output plugin name when creating physical replication slot")

command += slot_type
command += "PHYSICAL"

else:
raise psycopg2.ProgrammingError("unrecognized replication type: %s" % slot_type)
raise psycopg2.ProgrammingError("unrecognized replication type: %s" % repr(slot_type))

self.execute(command)

Expand All @@ -562,15 +527,15 @@ def start_replication(self, slot_name=None, slot_type=None, start_lsn=0,
else:
raise psycopg2.ProgrammingError("slot name is required for logical replication")

command += "%s " % slot_type
command += "LOGICAL "

elif slot_type == REPLICATION_PHYSICAL:
if slot_name:
command += "SLOT %s " % quote_ident(slot_name, self)
# don't add "PHYSICAL", before 9.4 it was just START_REPLICATION XXX/XXX

else:
raise psycopg2.ProgrammingError("unrecognized replication type: %s" % slot_type)
raise psycopg2.ProgrammingError("unrecognized replication type: %s" % repr(slot_type))

if type(start_lsn) is str:
lsn = start_lsn.split('/')
Expand Down
1 change: 1 addition & 0 deletions psycopg/psycopg.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ typedef struct connectionObject connectionObject;
typedef struct replicationMessageObject replicationMessageObject;

/* some utility functions */
HIDDEN PyObject *parse_arg(int pos, char *name, PyObject *defval, PyObject *args, PyObject *kwargs);
HIDDEN PyObject *psyco_parse_args(PyObject *self, PyObject *args, PyObject *kwargs);
HIDDEN PyObject *psyco_parse_dsn(PyObject *self, PyObject *args, PyObject *kwargs);
HIDDEN PyObject *psyco_make_dsn(PyObject *self, PyObject *args, PyObject *kwargs);
Expand Down
12 changes: 11 additions & 1 deletion psycopg/psycopgmodule.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

#include "psycopg/connection.h"
#include "psycopg/cursor.h"
#include "psycopg/replication_connection.h"
#include "psycopg/replication_cursor.h"
#include "psycopg/replication_message.h"
#include "psycopg/green.h"
Expand Down Expand Up @@ -74,7 +75,7 @@ HIDDEN PyObject *psyco_DescriptionType = NULL;


/* finds a keyword or positional arg (pops it from kwargs if found there) */
static PyObject *
PyObject *
parse_arg(int pos, char *name, PyObject *defval, PyObject *args, PyObject *kwargs)
{
Py_ssize_t nargs = PyTuple_GET_SIZE(args);
Expand Down Expand Up @@ -1114,6 +1115,9 @@ INIT_MODULE(_psycopg)(void)
Py_TYPE(&cursorType) = &PyType_Type;
if (PyType_Ready(&cursorType) == -1) goto exit;

Py_TYPE(&replicationConnectionType) = &PyType_Type;
if (PyType_Ready(&replicationConnectionType) == -1) goto exit;

Py_TYPE(&replicationCursorType) = &PyType_Type;
if (PyType_Ready(&replicationCursorType) == -1) goto exit;

Expand Down Expand Up @@ -1237,13 +1241,16 @@ INIT_MODULE(_psycopg)(void)
PyModule_AddStringConstant(module, "__version__", PSYCOPG_VERSION);
PyModule_AddStringConstant(module, "__doc__", "psycopg PostgreSQL driver");
PyModule_AddIntConstant(module, "__libpq_version__", PG_VERSION_NUM);
PyModule_AddIntMacro(module, REPLICATION_PHYSICAL);
PyModule_AddIntMacro(module, REPLICATION_LOGICAL);
PyModule_AddObject(module, "apilevel", Text_FromUTF8(APILEVEL));
PyModule_AddObject(module, "threadsafety", PyInt_FromLong(THREADSAFETY));
PyModule_AddObject(module, "paramstyle", Text_FromUTF8(PARAMSTYLE));

/* put new types in module dictionary */
PyModule_AddObject(module, "connection", (PyObject*)&connectionType);
PyModule_AddObject(module, "cursor", (PyObject*)&cursorType);
PyModule_AddObject(module, "ReplicationConnection", (PyObject*)&replicationConnectionType);
PyModule_AddObject(module, "ReplicationCursor", (PyObject*)&replicationCursorType);
PyModule_AddObject(module, "ReplicationMessage", (PyObject*)&replicationMessageType);
PyModule_AddObject(module, "ISQLQuote", (PyObject*)&isqlquoteType);
Expand Down Expand Up @@ -1285,6 +1292,9 @@ INIT_MODULE(_psycopg)(void)
if (0 != psyco_errors_init()) { goto exit; }
psyco_errors_fill(dict);

replicationPhysicalConst = PyDict_GetItemString(dict, "REPLICATION_PHYSICAL");
replicationLogicalConst = PyDict_GetItemString(dict, "REPLICATION_LOGICAL");

Dprintf("initpsycopg: module initialization complete");

exit:
Expand Down
53 changes: 53 additions & 0 deletions psycopg/replication_connection.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/* replication_connection.h - definition for the psycopg replication connection type
*
* Copyright (C) 2015 Daniele Varrazzo <daniele.varrazzo@gmail.com>
*
* This file is part of psycopg.
*
* psycopg2 is free software: you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* In addition, as a special exception, the copyright holders give
* permission to link this program with the OpenSSL library (or with
* modified versions of OpenSSL that use the same license as OpenSSL),
* and distribute linked combinations including the two.
*
* You must obey the GNU Lesser General Public License in all respects for
* all of the code used other than OpenSSL.
*
* psycopg2 is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
* License for more details.
*/

#ifndef PSYCOPG_REPLICATION_CONNECTION_H
#define PSYCOPG_REPLICATION_CONNECTION_H 1

#include "psycopg/connection.h"

#ifdef __cplusplus
extern "C" {
#endif

extern HIDDEN PyTypeObject replicationConnectionType;

typedef struct replicationConnectionObject {
connectionObject conn;

long int type;
} replicationConnectionObject;

#define REPLICATION_PHYSICAL 1
#define REPLICATION_LOGICAL 2

extern HIDDEN PyObject *replicationPhysicalConst;
extern HIDDEN PyObject *replicationLogicalConst;

#ifdef __cplusplus
}
#endif

#endif /* !defined(PSYCOPG_REPLICATION_CONNECTION_H) */
Loading

0 comments on commit fbcf99a

Please sign in to comment.