diff --git a/.ci/travis-before-install.sh b/.ci/travis-before-install.sh index adf3c98..7b391e3 100755 --- a/.ci/travis-before-install.sh +++ b/.ci/travis-before-install.sh @@ -19,7 +19,8 @@ if [[ "${TRAVIS_OS_NAME}" == "linux" ]]; then echo "deb-src https://packagecloud.io/tarantool/${TARANTOOL_VERSION}/ubuntu/ $release main" | sudo tee -a /etc/apt/sources.list.d/tarantool_${TARANTOOL_VERSION}.list sudo apt-get -qq update sudo apt-get -y install tarantool - sudo tarantoolctl stop example || exit 0 + tarantool -V || exit 1 + sudo tarantoolctl stop example sudo apt-get install pandoc libssl-dev openssl elif [[ "${TRAVIS_OS_NAME}" == "osx" ]]; then brew update diff --git a/.ci/travis-build-wheel.sh b/.ci/travis-build-wheel.sh new file mode 100755 index 0000000..5a51d03 --- /dev/null +++ b/.ci/travis-build-wheel.sh @@ -0,0 +1,49 @@ +#!/usr/bin/env bash + +set -e -x + + +if [[ -z "${TRAVIS_TAG}" || "${BUILD}" != *wheels* ]]; then + # Not a release + exit 0 +fi + +if [ "${TRAVIS_OS_NAME}" == "osx" ]; then + PYENV_ROOT="$HOME/.pyenv" + PATH="$PYENV_ROOT/bin:$PATH" + eval "$(pyenv init -)" +fi + +if [ "${TRAVIS_OS_NAME}" == "linux" ]; then + for pyver in ${RELEASE_PYTHON_VERSIONS}; do + ML_PYTHON_VERSION=$(python3 -c \ + "print('cp{maj}{min}-cp{maj}{min}m'.format( \ + maj='${pyver}'.split('.')[0], \ + min='${pyver}'.split('.')[1]))") + + for arch in x86_64 i686; do + ML_IMAGE="quay.io/pypa/manylinux1_${arch}" + docker pull "${ML_IMAGE}" + docker run --rm \ + -v "${_root}":/io \ + -e "PYMODULE=${PYMODULE}" \ + -e "PYTHON_VERSION=${ML_PYTHON_VERSION}" \ + -e "ASYNCPG_VERSION=${PACKAGE_VERSION}" \ + "${ML_IMAGE}" /io/.ci/build-manylinux-wheels.sh + + _upload_wheels + done + done + +elif [ "${TRAVIS_OS_NAME}" == "osx" ]; then + make clean + python setup.py bdist_wheel + + pip install ${PYMODULE}[test] -f "file:///${_root}/dist" + make -C "${_root}" ASYNCPG_VERSION="${PACKAGE_VERSION}" testinstalled + + _upload_wheels + +else + echo "Cannot build on ${TRAVIS_OS_NAME}." +fi \ No newline at end of file diff --git a/.ci/travis-install.sh b/.ci/travis-install.sh index f372473..b992faa 100755 --- a/.ci/travis-install.sh +++ b/.ci/travis-install.sh @@ -16,8 +16,7 @@ if [[ "${TRAVIS_OS_NAME}" == "osx" ]]; then eval "$(pyenv init -)" fi -pip install --upgrade pip wheel -pip install --upgrade setuptools +pip install --upgrade pip wheel setuptools pip install -r requirements.txt pip install coveralls pip install -e . diff --git a/.travis.yml b/.travis.yml index 69880a1..52a482d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,36 +3,12 @@ env: global: - ENCRYPTION_LABEL: "8db6dbddd29a" - COMMIT_AUTHOR_EMAIL: "igorcoding@gmail.com" + - RELEASE_PYTHON_VERSIONS: "3.5 3.6 3.7" matrix: fast_finish: true include: - # osx - - os: osx - language: generic - env: BUILD=tests PYTHON_VERSION=3.6.0 TARANTOOL_VERSION=1_9 - - - os: osx - language: generic - env: BUILD=tests PYTHON_VERSION=3.6.0 TARANTOOL_VERSION=1_9 - - - os: osx - language: generic - env: BUILD=tests PYTHON_VERSION=3.7.0 TARANTOOL_VERSION=1_9 - - - os: osx - language: generic - env: BUILD=tests PYTHON_VERSION=3.7.0 TARANTOOL_VERSION=1_9 - - # precise python3.5 Tarantool 1.6 - - os: linux - dist: precise - sudo: required - language: python - python: '3.5' - env: BUILD=quicktests TARANTOOL_VERSION=1_6 - # trusty python3.5 Tarantool 1.6 - os: linux dist: trusty @@ -47,7 +23,7 @@ matrix: sudo: required language: python python: '3.5' - env: BUILD=tests TARANTOOL_VERSION=1_7 + env: BUILD=quicktests TARANTOOL_VERSION=1_7 # trusty python3.5 Tarantool 1.9 - os: linux @@ -57,6 +33,22 @@ matrix: python: '3.5' env: BUILD=tests TARANTOOL_VERSION=1_9 + # trusty python3.6 Tarantool 1.6 + - os: linux + dist: trusty + sudo: required + language: python + python: '3.6' + env: BUILD=quicktests TARANTOOL_VERSION=1_6 + + # trusty python3.6 Tarantool 1.7 + - os: linux + dist: trusty + sudo: required + language: python + python: '3.6' + env: BUILD=quicktests TARANTOOL_VERSION=1_7 + # trusty python3.6 Tarantool 1.9 - os: linux dist: trusty @@ -65,6 +57,22 @@ matrix: python: '3.6' env: BUILD=tests TARANTOOL_VERSION=1_9 + # xenial python3.7 Tarantool 1.6 + - os: linux + dist: xenial + sudo: required + language: python + python: '3.7' + env: BUILD=quicktests TARANTOOL_VERSION=1_6 + + # xenial python3.7 Tarantool 1.7 + - os: linux + dist: xenial + sudo: required + language: python + python: '3.7' + env: BUILD=quicktests TARANTOOL_VERSION=1_7 + # xenial python3.7 Tarantool 1.9 - os: linux dist: xenial @@ -73,13 +81,28 @@ matrix: python: '3.7' env: BUILD=tests TARANTOOL_VERSION=1_9 - # trusty python3.6 Tarantool 1.9 + deploy + # deploy with Tarantool 1.9 and all pythons - os: linux - dist: trusty + dist: xenial sudo: required language: python - python: '3.6' - env: BUILD=tests,docs,coverage,release TARANTOOL_VERSION=1_9 + python: '3.7' + env: BUILD=tests,wheel,docs,coverage,release TARANTOOL_VERSION=1_9 + services: + - docker + + # osx + - os: osx + language: generic + env: BUILD=tests,wheel PYTHON_VERSION=3.5.5 TARANTOOL_VERSION=1_9 + + - os: osx + language: generic + env: BUILD=tests,wheel PYTHON_VERSION=3.6.0 TARANTOOL_VERSION=1_9 + + - os: osx + language: generic + env: BUILD=tests,wheel PYTHON_VERSION=3.7.0 TARANTOOL_VERSION=1_9 cache: - pip @@ -93,6 +116,7 @@ install: script: - ".ci/travis-tests.sh" - ".ci/travis-build-docs.sh" + #- ".ci/travis-build-wheel.sh" after_success: - if [[ "${BUILD}" == *coverage* ]]; then coveralls; fi diff --git a/CHANGELOG.md b/CHANGELOG.md index c8598d8..96f8adc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,14 @@ +v0.2.0 + +Changes: +* Improved and simplified connect/reconnect process +* Added ContextManager async with protocol for Connection +* Added `is_fully_connected` property to Connection +* Added disconnect Lock + +Bugs Fixed: +* Auto reconnect misbehaved on double on_connection_lost trigger (#11) + v0.1.13 Changes: diff --git a/asynctnt/__init__.py b/asynctnt/__init__.py index 17644c6..2616669 100644 --- a/asynctnt/__init__.py +++ b/asynctnt/__init__.py @@ -1,4 +1,4 @@ from .connection import Connection, connect from .iproto.protocol import Iterator, Response -__version__ = '0.1.13' +__version__ = '0.2.0' diff --git a/asynctnt/connection.py b/asynctnt/connection.py index 24ebd7f..beb57dc 100644 --- a/asynctnt/connection.py +++ b/asynctnt/connection.py @@ -14,7 +14,6 @@ class ConnectionState(enum.IntEnum): - IDLE = 0 CONNECTING = 1 CONNECTED = 2 RECONNECTING = 3 @@ -29,7 +28,8 @@ class Connection: '_encoding', '_connect_timeout', '_reconnect_timeout', '_request_timeout', '_tuple_as_dict', '_loop', '_state', '_state_prev', '_transport', '_protocol', '_db', - '_disconnect_waiter', '_reconnect_coro', '_connect_lock' + '_disconnect_waiter', '_reconnect_coro', + '_connect_lock', '_disconnect_lock' ) def __init__(self, *, @@ -50,6 +50,25 @@ def __init__(self, *, """ Connection constructor. + To manipulate a Connection instance there are several functions: + + * await connect() - performs connecting, authorization and + schema fetching. + + * await disconnect() - performs disconnection. + + * close() - closes connection (not a coroutine) + + Connection also supports context manager protocol, which connects + on entering and disconnecting on leaving a block. + + So one can simply use it as follows: + + .. code-block:: python + + async with asynctnt.Connection() as conn: + await conn.call('box.info') + :param host: Tarantool host (pass ``unix/`` to connect to unix socket) :param port: @@ -123,11 +142,12 @@ def __init__(self, *, self._protocol = None self._db = _DbMock() - self._state = ConnectionState.IDLE - self._state_prev = ConnectionState.IDLE + self._state = ConnectionState.DISCONNECTED + self._state_prev = ConnectionState.DISCONNECTED self._disconnect_waiter = None self._reconnect_coro = None self._connect_lock = asyncio.Lock(loop=self._loop) + self._disconnect_lock = asyncio.Lock(loop=self._loop) def _set_state(self, new_state): if self._state != new_state: @@ -138,45 +158,41 @@ def _set_state(self, new_state): return True return False - def connection_made(self): - pass - def connection_lost(self, exc): if self._transport: self._transport.close() self._transport = None - if self._reconnect_timeout > 0 \ - and self._state != ConnectionState.DISCONNECTING \ - and self._state != ConnectionState.DISCONNECTED: - if self._state == ConnectionState.RECONNECTING: - return - self._set_state(ConnectionState.DISCONNECTING) + if self._disconnect_waiter: + # disconnect() call happened + self._disconnect_waiter.set_result(True) + return + + # connection lost + + if self._reconnect_timeout > 0: + # should reconnect self._start_reconnect(return_exceptions=False) else: - if self._state != ConnectionState.CONNECTING: - self._set_state(ConnectionState.DISCONNECTED) - if self._disconnect_waiter: - self._disconnect_waiter.set_result(True) - self._disconnect_waiter = None - - def __create_reconnect_coro(self, return_exceptions=False): - if self._reconnect_coro: - self._reconnect_coro.cancel() - self._reconnect_coro = asyncio.ensure_future( - self._connect(return_exceptions=return_exceptions), - loop=self._loop - ) - return self._reconnect_coro + # should not reconnect, close everything + self.close() def _start_reconnect(self, return_exceptions=False): - if self._state == ConnectionState.RECONNECTING: - logger.info('Already in reconnecting state') + if self._state in [ConnectionState.CONNECTING, + ConnectionState.RECONNECTING]: + logger.debug('%s Already reconnecting', self.fingerprint) + return + + if self._reconnect_coro: return logger.info('%s Started reconnecting', self.fingerprint) self._set_state(ConnectionState.RECONNECTING) - self.__create_reconnect_coro(return_exceptions) + + self._reconnect_coro = asyncio.ensure_future( + self._connect(return_exceptions=return_exceptions), + loop=self._loop + ) def protocol_factory(self, connected_fut, cls=protocol.Protocol): return cls(host=self._host, @@ -190,7 +206,7 @@ def protocol_factory(self, connected_fut, cls=protocol.Protocol): encoding=self._encoding, tuple_as_dict=self._tuple_as_dict, connected_fut=connected_fut, - on_connection_made=self.connection_made, + on_connection_made=None, on_connection_lost=self.connection_lost, loop=self._loop) @@ -198,15 +214,13 @@ async def _connect(self, return_exceptions=True): async with self._connect_lock: while True: try: - if self._state == ConnectionState.CONNECTED: - self._reconnect_coro = None - return - ignore_states = { - ConnectionState.CONNECTING, - ConnectionState.DISCONNECTING, + ConnectionState.CONNECTED, + ConnectionState.DISCONNECTING, # disconnect() called } + if self._state in ignore_states: + self._reconnect_coro = None return self._set_state(ConnectionState.CONNECTING) @@ -234,38 +248,31 @@ async def full_connect(): connected_fut), self._host, self._port) - try: - tr, pr = await conn - except: - raise + tr, pr = await conn try: timeout = 0.05 # wait at least something if self._connect_timeout is not None: - timeout = self._connect_timeout/2 + timeout = self._connect_timeout / 2 await asyncio.wait_for( connected_fut, timeout=timeout, loop=self._loop) except asyncio.TimeoutError: + tr.close() continue # try to create connection again except: tr.close() - # ignoring reconnect by on_connection_lost - self._set_state(ConnectionState.DISCONNECTING) raise return tr, pr - try: - tr, pr = await asyncio.wait_for( - full_connect(), - timeout=self._connect_timeout, - loop=self._loop - ) - except: - raise + tr, pr = await asyncio.wait_for( + full_connect(), + timeout=self._connect_timeout, + loop=self._loop + ) logger.info('%s Connected successfully', self.fingerprint) self._set_state(ConnectionState.CONNECTED) @@ -287,23 +294,32 @@ async def full_connect(): if self._reconnect_timeout > 0: await self._wait_reconnect(e) continue + if return_exceptions: self._reconnect_coro = None raise e - else: - logger.exception(e) - if self._reconnect_timeout > 0: - await self._wait_reconnect(e) - continue + + logger.exception(e) + if self._reconnect_timeout > 0: + await self._wait_reconnect(e) + continue + + return # no reconnect, no return_exceptions + except asyncio.CancelledError: + logger.debug("connect is cancelled") + self._reconnect_coro = None + raise except Exception as e: if self._reconnect_timeout > 0: await self._wait_reconnect(e) continue + if return_exceptions: self._reconnect_coro = None raise e - else: - logger.exception(e) + + logger.exception(e) + return # no reconnect, no return_exceptions async def _wait_reconnect(self, exc=None): self._set_state(ConnectionState.RECONNECTING) @@ -320,41 +336,49 @@ async def connect(self): Connect coroutine """ await self._connect(True) + return self async def disconnect(self): """ Disconnect coroutine """ - if self._state in \ - {ConnectionState.DISCONNECTING, ConnectionState.DISCONNECTED}: - return - self._set_state(ConnectionState.DISCONNECTING) - logger.info('%s Disconnecting...', self.fingerprint) - waiter = _create_future(self._loop) - if self._reconnect_coro: - self._reconnect_coro.cancel() - self._reconnect_coro = None + async with self._disconnect_lock: + if self._state == ConnectionState.DISCONNECTED: + return + + self._set_state(ConnectionState.DISCONNECTING) + + logger.info('%s Disconnecting...', self.fingerprint) + if self._reconnect_coro: + self._reconnect_coro.cancel() + self._reconnect_coro = None - if self._transport: - self._disconnect_waiter = waiter - self._transport.close() - self._transport = None - self._protocol = None self._db = _DbMock() - else: - waiter.set_result(True) - self._set_state(ConnectionState.DISCONNECTED) - return await waiter + if self._transport: + self._disconnect_waiter = _create_future(self._loop) + self._transport.close() + self._transport = None + self._protocol = None + + await self._disconnect_waiter + self._disconnect_waiter = None + self._set_state(ConnectionState.DISCONNECTED) + else: + self._transport = None + self._protocol = None + self._disconnect_waiter = None + self._set_state(ConnectionState.DISCONNECTED) def close(self): """ Same as disconnect, but not a coroutine, i.e. it does not wait for disconnect to finish. """ - if self._state in \ - {ConnectionState.DISCONNECTING, ConnectionState.DISCONNECTED}: + + if self._state == ConnectionState.DISCONNECTED: return + self._set_state(ConnectionState.DISCONNECTING) logger.info('%s Disconnecting...', self.fingerprint) @@ -363,11 +387,12 @@ def close(self): self._reconnect_coro = None if self._transport: - self._disconnect_waiter = None self._transport.close() - self._transport = None - self._protocol = None - self._db = _DbMock() + + self._transport = None + self._protocol = None + self._disconnect_waiter = None + self._db = _DbMock() self._set_state(ConnectionState.DISCONNECTED) async def reconnect(self): @@ -376,9 +401,23 @@ async def reconnect(self): Just calls disconnect() and connect() """ await self.disconnect() - self._set_state(ConnectionState.IDLE) await self.connect() + async def __aenter__(self): + """ + Executed on entering the async with section. + Connects to Tarantool instance. + """ + await self.connect() + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + """ + Executed on leaving the async with section. + Disconnects from Tarantool instance. + """ + await self.disconnect() + @property def fingerprint(self): return 'Tarantool[{}:{}]'.format(self._host, self._port) @@ -481,12 +520,22 @@ def state(self): @property def is_connected(self): """ - Check if connection is active + Check if an underlying connection is active """ if self._protocol is None: return False return self._protocol.is_connected() + @property + def is_fully_connected(self): + """ + Check if connection is fully active (performed auth + and schema fetching) + """ + if self._protocol is None: + return False + return self._protocol.is_fully_connected() + @property def schema_id(self): """ diff --git a/asynctnt/iproto/protocol.pxd b/asynctnt/iproto/protocol.pxd index 3510d05..e5647f1 100644 --- a/asynctnt/iproto/protocol.pxd +++ b/asynctnt/iproto/protocol.pxd @@ -46,6 +46,7 @@ cdef class BaseProtocol(CoreProtocol): cdef void _do_fetch_schema(self, object fut) cdef uint64_t next_sync(self) + cdef bint is_closing(self) cdef uint32_t transform_iterator(self, iterator) except * cdef object _new_waiter_for_request(self, Request req, float timeout) diff --git a/asynctnt/iproto/protocol.pyx b/asynctnt/iproto/protocol.pyx index 7b6eb9d..90a900f 100644 --- a/asynctnt/iproto/protocol.pyx +++ b/asynctnt/iproto/protocol.pyx @@ -69,6 +69,7 @@ cdef class BaseProtocol(CoreProtocol): self.connected_fut = connected_fut self.on_connection_made_cb = on_connection_made self.on_connection_lost_cb = on_connection_lost + self._closing = False self._on_request_completed_cb = self._on_request_completed self._on_request_timeout_cb = self._on_request_timeout @@ -262,6 +263,11 @@ cdef class BaseProtocol(CoreProtocol): object key, value Py_ssize_t pos + if self._closing: + return + + self._closing = True + pos = 0 while cpython.dict.PyDict_Next(self._reqs, &pos, &pkey, &pvalue): sync = pkey @@ -284,6 +290,9 @@ cdef class BaseProtocol(CoreProtocol): self._sync += 1 return self._sync + cdef bint is_closing(self): + return self._closing + def _on_request_timeout(self, waiter): cdef Request req diff --git a/asynctnt/iproto/rbuffer.pxd b/asynctnt/iproto/rbuffer.pxd index f27008a..f39da80 100644 --- a/asynctnt/iproto/rbuffer.pxd +++ b/asynctnt/iproto/rbuffer.pxd @@ -3,8 +3,21 @@ from libc.stdint cimport uint32_t, uint64_t, int64_t cimport tnt -cdef inline size_t size_t_max(size_t a, size_t b) -cdef inline uint32_t nearest_power_of_2(uint32_t v) +cdef inline size_t size_t_max(size_t a, size_t b): + if a > b: + return a + return b + + +cdef inline uint32_t nearest_power_of_2(uint32_t v): + v -= 1 + v |= v >> 1 + v |= v >> 2 + v |= v >> 4 + v |= v >> 8 + v |= v >> 16 + v += 1 + return v cdef class ReadBuffer: diff --git a/asynctnt/iproto/rbuffer.pyx b/asynctnt/iproto/rbuffer.pyx index 4da6bd7..ac5c6a5 100644 --- a/asynctnt/iproto/rbuffer.pyx +++ b/asynctnt/iproto/rbuffer.pyx @@ -16,23 +16,6 @@ from libc.math cimport fmax from asynctnt.log import logger -cdef inline size_t size_t_max(size_t a, size_t b): - if a > b: - return a - return b - - -cdef inline uint32_t nearest_power_of_2(uint32_t v): - v -= 1 - v |= v >> 1 - v |= v >> 2 - v |= v >> 4 - v |= v >> 8 - v |= v >> 16 - v += 1 - return v - - @cython.no_gc_clear @cython.final cdef class ReadBuffer: diff --git a/docs/examples.rst b/docs/examples.rst index 3181ea6..58ace7e 100644 --- a/docs/examples.rst +++ b/docs/examples.rst @@ -38,7 +38,6 @@ Python code: values = await conn.select('tester', []) print('Code: {}'.format(values.code)) print('Data: {}'.format(values.body)) - print(values.body2yaml()) # prints as yaml await conn.disconnect() @@ -51,10 +50,6 @@ Stdout: Code: 0 Data: [[1, 'hello1'], [2, 'hello2'], [3, 'hello3'], [4, 'hello4']] - - [1, hello1] - - [2, hello2] - - [3, hello3] - - [4, hello4] Example of using space format information ----------------------------------------- @@ -100,7 +95,6 @@ Python code: values = await conn.select('tester', []) print('Code: {}'.format(values.code)) print('Data: {}'.format(values.body)) - print(values.body2yaml()) # prints as yaml await conn.disconnect() @@ -113,7 +107,21 @@ Stdout (now got dict tuples instead of plain arrays): Code: 0 Data: [{'id': 1, 'text': 'hello1'}, {'id': 2, 'text': 'hello2'}, {'id': 3, 'text': 'hello3'}, {'id': 4, 'text': 'hello4'}] - - {id: 1, text: hello1} - - {id: 2, text: hello2} - - {id: 3, text: hello3} - - {id: 4, text: hello4} + + +Using Connection context manager +-------------------------------- + +.. code:: python + + import asyncio + import asynctnt + + + async def run(): + async with asynctnt.Connection(port=3301) as conn: + res = await conn.call('box.info') + print(res.body) + + loop = asyncio.get_event_loop() + loop.run_until_complete(run()) diff --git a/setup.py b/setup.py index ea4f749..c1864ff 100644 --- a/setup.py +++ b/setup.py @@ -119,9 +119,11 @@ def finalize_options(self): url='https://github.com/igorcoding/asynctnt', license='Apache Software License', classifiers=[ + "Programming Language :: Python :: 3", "Programming Language :: Python :: 3 :: Only", "Programming Language :: Python :: 3.5", "Programming Language :: Python :: 3.6", + "Programming Language :: Python :: 3.7", "Intended Audience :: Developers", "License :: OSI Approved :: Apache Software License", "Topic :: Software Development :: Libraries :: Python Modules", diff --git a/tests/test_connect.py b/tests/test_connect.py index 1eb071a..b5cc8e4 100644 --- a/tests/test_connect.py +++ b/tests/test_connect.py @@ -16,46 +16,96 @@ async def test__connect(self): conn = asynctnt.Connection(host=self.tnt.host, port=self.tnt.port, reconnect_timeout=0, loop=self.loop) - self.assertEqual(conn.state, ConnectionState.IDLE) + self.assertEqual(conn.state, ConnectionState.DISCONNECTED) - await conn.connect() + c = await conn.connect() + self.assertEqual(c, conn) self.assertIsNotNone(conn._transport) self.assertIsNotNone(conn._protocol) self.assertTrue(conn.is_connected) - self.assertTrue(conn._protocol.is_fully_connected()) + self.assertTrue(conn.is_fully_connected) self.assertEqual(conn.state, ConnectionState.CONNECTED) self.assertIsNotNone(conn._protocol.schema) self.assertIsNotNone(conn.version) + + await conn.call('box.info') await conn.disconnect() + async def test__connect_contextmanager(self): + conn = asynctnt.Connection(host=self.tnt.host, port=self.tnt.port, + reconnect_timeout=0, + loop=self.loop) + self.assertEqual(conn.state, ConnectionState.DISCONNECTED) + + async with conn: + self.assertIsNotNone(conn._transport) + self.assertIsNotNone(conn._protocol) + self.assertTrue(conn.is_connected) + self.assertTrue(conn.is_fully_connected) + self.assertEqual(conn.state, ConnectionState.CONNECTED) + self.assertIsNotNone(conn._protocol.schema) + self.assertIsNotNone(conn.version) + + await conn.call('box.info') + + self.assertEqual(conn.state, ConnectionState.DISCONNECTED) + + async def test__connect_contextmanager_connect_inside(self): + conn = asynctnt.Connection(host=self.tnt.host, port=self.tnt.port, + reconnect_timeout=0, + loop=self.loop) + self.assertEqual(conn.state, ConnectionState.DISCONNECTED) + + async with conn: + await conn.connect() + self.assertEqual(conn.state, ConnectionState.CONNECTED) + await conn.call('box.info') + + self.assertEqual(conn.state, ConnectionState.DISCONNECTED) + + async def test__connect_contextmanager_disconnect_inside(self): + conn = asynctnt.Connection(host=self.tnt.host, port=self.tnt.port, + reconnect_timeout=0, + loop=self.loop) + self.assertEqual(conn.state, ConnectionState.DISCONNECTED) + + async with conn: + await conn.disconnect() + self.assertEqual(conn.state, ConnectionState.DISCONNECTED) + + with self.assertRaises(TarantoolNotConnectedError): + await conn.call('box.info') + + self.assertEqual(conn.state, ConnectionState.DISCONNECTED) + async def test__connect_no_schema(self): conn = asynctnt.Connection(host=self.tnt.host, port=self.tnt.port, reconnect_timeout=0, fetch_schema=False, auto_refetch_schema=False, loop=self.loop) - await conn.connect() - self.assertIsNotNone(conn._transport) - self.assertIsNotNone(conn._protocol) - self.assertTrue(conn.is_connected) - self.assertTrue(conn._protocol.is_fully_connected()) - self.assertEqual(conn.state, ConnectionState.CONNECTED) - self.assertIsNotNone(conn._protocol.schema) - await conn.disconnect() + async with conn: + self.assertIsNotNone(conn._transport) + self.assertIsNotNone(conn._protocol) + self.assertTrue(conn.is_connected) + self.assertTrue(conn.is_fully_connected) + self.assertEqual(conn.state, ConnectionState.CONNECTED) + self.assertIsNotNone(conn._protocol.schema) + await conn.call('box.info') async def test__connect_auth(self): conn = asynctnt.Connection(host=self.tnt.host, port=self.tnt.port, username='t1', password='t1', reconnect_timeout=0, loop=self.loop) - await conn.connect() - self.assertIsNotNone(conn._transport) - self.assertIsNotNone(conn._protocol) - self.assertTrue(conn.is_connected) - self.assertTrue(conn._protocol.is_fully_connected()) - self.assertEqual(conn.state, ConnectionState.CONNECTED) - self.assertIsNotNone(conn._protocol.schema) - await conn.disconnect() + async with conn: + self.assertIsNotNone(conn._transport) + self.assertIsNotNone(conn._protocol) + self.assertTrue(conn.is_connected) + self.assertTrue(conn.is_fully_connected) + self.assertEqual(conn.state, ConnectionState.CONNECTED) + self.assertIsNotNone(conn._protocol.schema) + await conn.call('box.info') async def test__connect_auth_no_schema(self): conn = asynctnt.Connection(host=self.tnt.host, port=self.tnt.port, @@ -64,14 +114,14 @@ async def test__connect_auth_no_schema(self): auto_refetch_schema=False, reconnect_timeout=0, loop=self.loop) - await conn.connect() - self.assertIsNotNone(conn._transport) - self.assertIsNotNone(conn._protocol) - self.assertTrue(conn.is_connected) - self.assertTrue(conn._protocol.is_fully_connected()) - self.assertEqual(conn.state, ConnectionState.CONNECTED) - self.assertIsNotNone(conn._protocol.schema) - await conn.disconnect() + async with conn: + self.assertIsNotNone(conn._transport) + self.assertIsNotNone(conn._protocol) + self.assertTrue(conn.is_connected) + self.assertTrue(conn.is_fully_connected) + self.assertEqual(conn.state, ConnectionState.CONNECTED) + self.assertIsNotNone(conn._protocol.schema) + await conn.call('box.info') async def test__disconnect(self): conn = asynctnt.Connection(host=self.tnt.host, port=self.tnt.port, @@ -80,8 +130,24 @@ async def test__disconnect(self): await conn.connect() await conn.disconnect() self.assertFalse(conn.is_connected) + self.assertFalse(conn.is_fully_connected) self.assertEqual(conn.state, ConnectionState.DISCONNECTED) - # self.assertIsNone(conn.schema) + + with self.assertRaises(TarantoolNotConnectedError): + await conn.call('box.info') + + async def test__disconnect_in_request(self): + conn = asynctnt.Connection(host=self.tnt.host, port=self.tnt.port, + reconnect_timeout=0, + loop=self.loop) + await conn.connect() + + coro = self.ensure_future(conn.eval('require "fiber".sleep(2)')) + await self.sleep(0.5) + await conn.disconnect() + + with self.assertRaises(TarantoolNotConnectedError): + await coro async def test__disconnect_auth(self): conn = asynctnt.Connection(host=self.tnt.host, port=self.tnt.port, @@ -91,8 +157,32 @@ async def test__disconnect_auth(self): await conn.connect() await conn.disconnect() self.assertFalse(conn.is_connected) + self.assertFalse(conn.is_fully_connected) self.assertEqual(conn.state, ConnectionState.DISCONNECTED) - # self.assertIsNone(conn.schema) + + with self.assertRaises(TarantoolNotConnectedError): + await conn.call('box.info') + + async def test__disconnect_while_reconnecting(self): + conn = asynctnt.Connection(host=self.tnt.host, port=self.tnt.port, + username='t1', password='t1', + reconnect_timeout=0.1, + loop=self.loop) + try: + await conn.connect() + self.tnt.stop() + await self.sleep(0.5) + + await conn.disconnect() + + self.assertFalse(conn.is_connected) + self.assertFalse(conn.is_fully_connected) + self.assertEqual(conn.state, ConnectionState.DISCONNECTED) + + with self.assertRaises(TarantoolNotConnectedError): + await conn.call('box.info') + finally: + self.tnt.start() async def test__connect_multiple(self): conn = asynctnt.Connection(host=self.tnt.host, port=self.tnt.port, @@ -103,20 +193,25 @@ async def test__connect_multiple(self): await conn.connect() await conn.disconnect() self.assertFalse(conn.is_connected) + self.assertFalse(conn.is_fully_connected) self.assertEqual(conn.state, ConnectionState.DISCONNECTED) + with self.assertRaises(TarantoolNotConnectedError): + await conn.call('box.info') + async def test__connect_cancel(self): conn = asynctnt.Connection(host=self.tnt.host, port=self.tnt.port, fetch_schema=True, reconnect_timeout=0, loop=self.loop) - f = asyncio.ensure_future(conn.connect(), loop=self.loop) - await self.sleep(0.0001) - f.cancel() - with self.assertRaises(asyncio.CancelledError): - await f - - await conn.disconnect() + try: + f = asyncio.ensure_future(conn.connect(), loop=self.loop) + await self.sleep(0.0001) + f.cancel() + with self.assertRaises(asyncio.CancelledError): + await f + finally: + await conn.disconnect() async def test__connect_wait_tnt_started(self): self.tnt.stop() @@ -125,27 +220,29 @@ async def test__connect_wait_tnt_started(self): fetch_schema=True, reconnect_timeout=0.000000001, loop=self.loop) - coro = self.ensure_future(conn.connect()) - await self.sleep(0.3) - self.tnt.start() - await self.sleep(1) - while True: - try: - await coro - break - except TarantoolDatabaseError as e: - if e.code == ErrorCode.ER_NO_SUCH_USER: - # Try again - coro = self.ensure_future(conn.connect()) - continue - raise - - self.assertEqual(conn.state, ConnectionState.CONNECTED) - await conn.disconnect() + try: + coro = self.ensure_future(conn.connect()) + await self.sleep(0.3) + self.tnt.start() + await self.sleep(1) + while True: + try: + await coro + break + except TarantoolDatabaseError as e: + if e.code == ErrorCode.ER_NO_SUCH_USER: + # Try again + coro = self.ensure_future(conn.connect()) + continue + raise + + self.assertEqual(conn.state, ConnectionState.CONNECTED) + + await conn.call('box.info') + finally: + await conn.disconnect() async def test__connect_waiting_for_spaces(self): - self.tnt.stop() - tnt = TarantoolSyncInstance( port=TarantoolSyncInstance.get_random_port(), console_port=TarantoolSyncInstance.get_random_port(), @@ -155,7 +252,7 @@ async def test__connect_waiting_for_spaces(self): tnt.replication_source = ['x:1'] tnt.start(wait=False) - conn = asynctnt.Connection(host=self.tnt.host, port=self.tnt.port, + conn = asynctnt.Connection(host=tnt.host, port=tnt.port, fetch_schema=True, reconnect_timeout=0.1, connect_timeout=10, @@ -179,8 +276,9 @@ async def state_checker(): self.assertTrue(states.get(ConnectionState.CONNECTING, False), 'was in connecting') - self.assertTrue(states.get(ConnectionState.RECONNECTING, False), - 'was in reconnecting') + + with self.assertRaises(TarantoolNotConnectedError): + await conn.call('box.info') finally: tnt.stop() await conn.disconnect() @@ -193,10 +291,10 @@ async def test__connect_tnt_restarted(self): loop=self.loop) await conn.connect() - self.tnt.stop() - self.tnt.start() - await self.sleep(0.5) try: + self.tnt.stop() + self.tnt.start() + await self.sleep(0.5) await conn.ping() except Exception as e: self.fail( @@ -223,50 +321,68 @@ async def test__close(self): await self.sleep(0.1) self.assertEqual(conn.state, ConnectionState.DISCONNECTED) + async def test_disconnect_from_idle(self): + conn = asynctnt.Connection(host=self.tnt.host, port=self.tnt.port, + reconnect_timeout=0, + loop=self.loop) + await conn.disconnect() + self.assertEqual(conn.state, ConnectionState.DISCONNECTED) + async def test_reconnect_from_idle(self): conn = asynctnt.Connection(host=self.tnt.host, port=self.tnt.port, reconnect_timeout=0, loop=self.loop) await conn.reconnect() + try: - self.assertEqual(conn.state, ConnectionState.CONNECTED) - await conn.disconnect() + self.assertEqual(conn.state, ConnectionState.CONNECTED) + await conn.call('box.info') + finally: + await conn.disconnect() async def test_reconnect_after_connect(self): conn = asynctnt.Connection(host=self.tnt.host, port=self.tnt.port, reconnect_timeout=0, loop=self.loop) - await conn.connect() - await conn.reconnect() + try: + await conn.connect() + await conn.reconnect() - self.assertEqual(conn.state, ConnectionState.CONNECTED) - await conn.disconnect() + self.assertEqual(conn.state, ConnectionState.CONNECTED) + await conn.call('box.info') + finally: + await conn.disconnect() async def test_manual_reconnect(self): conn = asynctnt.Connection(host=self.tnt.host, port=self.tnt.port, reconnect_timeout=0, loop=self.loop) - await conn.connect() - await conn.disconnect() - await conn.connect() + try: + await conn.connect() + await conn.disconnect() + await conn.connect() - self.assertEqual(conn.state, ConnectionState.CONNECTED) - await conn.disconnect() + self.assertEqual(conn.state, ConnectionState.CONNECTED) + await conn.call('box.info') + finally: + await conn.disconnect() async def test__connect_connection_lost(self): conn = asynctnt.Connection(host=self.tnt.host, port=self.tnt.port, reconnect_timeout=1/3, loop=self.loop) - await conn.connect() - self.tnt.stop() - await self.sleep(0.5) - self.tnt.start() - await self.sleep(0.5) - - self.assertEqual(conn.state, ConnectionState.CONNECTED) - self.assertTrue(conn.is_connected) + try: + await conn.connect() + self.tnt.stop() + await self.sleep(0.5) + self.tnt.start() + await self.sleep(0.5) - await conn.disconnect() + self.assertEqual(conn.state, ConnectionState.CONNECTED) + self.assertTrue(conn.is_connected) + await conn.call('box.info') + finally: + await conn.disconnect() async def test__connect_tuple_as_dict_invalid(self): with self.assertRaisesRegex( @@ -284,43 +400,68 @@ async def test__connect_from_multiple_coroutines(self): conn = asynctnt.Connection(host=self.tnt.host, port=self.tnt.port, reconnect_timeout=1/3, loop=self.loop) + try: + coros = [] + for _ in range(10): + coros.append(asyncio.ensure_future(conn.connect(), + loop=self.loop)) + + await asyncio.gather(*coros, loop=self.loop) + self.assertEqual(conn.state, ConnectionState.CONNECTED) + self.assertTrue(conn.is_connected) + await conn.call('box.info') + finally: + await conn.disconnect() - coros = [] - for _ in range(10): - coros.append(asyncio.ensure_future(conn.connect(), - loop=self.loop)) + async def test__disconnect_from_multiple_coroutines(self): + conn = asynctnt.Connection(host=self.tnt.host, port=self.tnt.port, + reconnect_timeout=1/3, + loop=self.loop) + try: + await conn.connect() + coros = [] + for _ in range(10): + coros.append(asyncio.ensure_future(conn.disconnect(), + loop=self.loop)) - await asyncio.gather(*coros, loop=self.loop) - self.assertEqual(conn.state, ConnectionState.CONNECTED) - self.assertTrue(conn.is_connected) - await conn.disconnect() + await asyncio.gather(*coros, loop=self.loop) + self.assertEqual(conn.state, ConnectionState.DISCONNECTED) + self.assertFalse(conn.is_connected) + + with self.assertRaises(TarantoolNotConnectedError): + await conn.call('box.info') + finally: + await conn.disconnect() async def test__connect_while_reconnecting(self): conn = asynctnt.Connection(host=self.tnt.host, port=self.tnt.port, reconnect_timeout=1, loop=self.loop) - coros = [] - for _ in range(10): - coros.append(asyncio.ensure_future(conn.connect(), - loop=self.loop)) + try: + coros = [] + for _ in range(10): + coros.append(asyncio.ensure_future(conn.connect(), + loop=self.loop)) - self.tnt.stop() - await self.sleep(0.5) + self.tnt.stop() + await self.sleep(0.5) - connect_coros = asyncio.ensure_future( - asyncio.gather(*coros, loop=self.loop), - loop=self.loop - ) + connect_coros = asyncio.ensure_future( + asyncio.gather(*coros, loop=self.loop), + loop=self.loop + ) - self.tnt.start() - await self.sleep(1) - await connect_coros + self.tnt.start() + await self.sleep(1) + await connect_coros - self.assertEqual(conn.state, ConnectionState.CONNECTED) - self.assertTrue(conn.is_connected) + self.assertEqual(conn.state, ConnectionState.CONNECTED) + self.assertTrue(conn.is_connected) - await conn.disconnect() + await conn.call('box.info') + finally: + await conn.disconnect() async def test__connect_on_tnt_crash_no_reconnect(self): conn = asynctnt.Connection(host=self.tnt.host, port=self.tnt.port, @@ -338,6 +479,8 @@ async def test__connect_on_tnt_crash_no_reconnect(self): self.tnt.start() await asyncio.sleep(1, loop=self.loop) await conn.connect() # this connect should reconnect easily + + await conn.call('box.info') finally: await conn.disconnect() @@ -357,5 +500,7 @@ async def test__connect_on_tnt_crash_with_reconnect(self): self.tnt.start() await asyncio.sleep(1, loop=self.loop) await conn.connect() # this connect should reconnect easily + + await conn.call('box.info') finally: await conn.disconnect()