diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 1608cfd0..6958393f 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -4,6 +4,7 @@ CHANGELOG unreleased ========== +* feature: Aiohttp client tracing for aiohttp versions > 3. `PR42 `_. * feature: Use the official middleware pattern for Aiohttp ext. `PR29 `_. * bugfix: SQLAlcemy plugin would cause warning messages with some db connection strings that contained invalid characters for a segment/subsegment name. * bugfix: Aiohttp middleware serialized URL values incorrectly. `PR37 `_ diff --git a/README.md b/README.md index 82e7922b..1bcbbe9a 100644 --- a/README.md +++ b/README.md @@ -176,6 +176,20 @@ app.router.add_get("/", handler) web.run_app(app) ``` +### Trace aiohttp client requests + +Only available using Aiohttp releases greater than 3.X. + +```python +from aws_xray_sdk.ext.aiohttp.client import aws_xray_trace_config + +async def foo(): + trace_config = aws_xray_trace_config() + async with ClientSession(loop=loop, trace_configs=[trace_config]) as session: + async with session.get(url) as resp + await resp.read() +``` + ### Use SQLAlchemy ORM The SQLAlchemy integration requires you to override the Session and Query Classes for SQL Alchemy diff --git a/aws_xray_sdk/core/async_recorder.py b/aws_xray_sdk/core/async_recorder.py index 36bb73d7..74ec2c46 100644 --- a/aws_xray_sdk/core/async_recorder.py +++ b/aws_xray_sdk/core/async_recorder.py @@ -34,7 +34,7 @@ async def wrapper(wrapped, instance, args, kwargs): return wrapper async def record_subsegment_async(self, wrapped, instance, args, kwargs, name, - namespace, meta_processor): + namespace, meta_processor): subsegment = self.begin_subsegment(name, namespace) diff --git a/aws_xray_sdk/ext/aiohttp/client.py b/aws_xray_sdk/ext/aiohttp/client.py new file mode 100644 index 00000000..ed9fbf32 --- /dev/null +++ b/aws_xray_sdk/ext/aiohttp/client.py @@ -0,0 +1,64 @@ +""" +AioHttp Client tracing, only compatible with Aiohttp 3.X versions +""" +import aiohttp +import traceback + +from types import SimpleNamespace + +from aws_xray_sdk.core import xray_recorder +from aws_xray_sdk.core.models import http +from aws_xray_sdk.ext.util import inject_trace_header, strip_url + +# All aiohttp calls will entail outgoing HTTP requests, only in some ad-hoc +# exceptions the namespace will be flip back to local. +REMOTE_NAMESPACE = 'remote' +LOCAL_NAMESPACE = 'local' +LOCAL_EXCEPTIONS = ( + aiohttp.client_exceptions.ClientConnectionError, + # DNS issues + OSError +) + + +async def begin_subsegment(session, trace_config_ctx, params): + name = trace_config_ctx.name if trace_config_ctx.name else strip_url(str(params.url)) + subsegment = xray_recorder.begin_subsegment(name, REMOTE_NAMESPACE) + subsegment.put_http_meta(http.METHOD, params.method) + subsegment.put_http_meta(http.URL, params.url.human_repr()) + inject_trace_header(params.headers, subsegment) + + +async def end_subsegment(session, trace_config_ctx, params): + subsegment = xray_recorder.current_subsegment() + subsegment.put_http_meta(http.STATUS, params.response.status) + xray_recorder.end_subsegment() + + +async def end_subsegment_with_exception(session, trace_config_ctx, params): + subsegment = xray_recorder.current_subsegment() + subsegment.add_exception( + params.exception, + traceback.extract_stack(limit=xray_recorder._max_trace_back) + ) + + if isinstance(params.exception, LOCAL_EXCEPTIONS): + subsegment.namespace = LOCAL_NAMESPACE + + xray_recorder.end_subsegment() + + +def aws_xray_trace_config(name=None): + """ + :param name: name used to identify the subsegment, with None internally the URL will + be used as identifier. + :returns: TraceConfig. + """ + trace_config = aiohttp.TraceConfig( + trace_config_ctx_factory=lambda trace_request_ctx: SimpleNamespace(name=name, + trace_request_ctx=trace_request_ctx) + ) + trace_config.on_request_start.append(begin_subsegment) + trace_config.on_request_end.append(end_subsegment) + trace_config.on_request_exception.append(end_subsegment_with_exception) + return trace_config diff --git a/docs/aws_xray_sdk.ext.aiohttp.rst b/docs/aws_xray_sdk.ext.aiohttp.rst index ebc9b7d5..8a75fc12 100644 --- a/docs/aws_xray_sdk.ext.aiohttp.rst +++ b/docs/aws_xray_sdk.ext.aiohttp.rst @@ -12,6 +12,13 @@ aws\_xray\_sdk.ext.aiohttp.middleware module :undoc-members: :show-inheritance: +aws\_xray\_sdk.ext.aiohttp.client module +-------------------------------------------- + +.. automodule:: aws_xray_sdk.ext.aiohttp.client + :members: + :undoc-members: + :show-inheritance: Module contents --------------- diff --git a/docs/frameworks.rst b/docs/frameworks.rst index e04a7369..4dd5e3c0 100644 --- a/docs/frameworks.rst +++ b/docs/frameworks.rst @@ -84,8 +84,11 @@ To generate segment based on incoming requests, you need to instantiate the X-Ra Flask built-in template rendering will be wrapped into subsegments. You can configure the recorder, see :ref:`Configure Global Recorder ` for more details. -aiohttp Server -============== +Aiohttp +======= + +Server +------ For X-Ray to create a segment based on an incoming request, you need register some middleware with aiohttp. As aiohttp is an asyncronous framework, X-Ray will also need to be configured with an ``AsyncContext`` compared to the default threaded @@ -115,3 +118,17 @@ version.:: There are two things to note from the example above. Firstly a middleware corountine from aws-xray-sdk is provided during the creation of an aiohttp server app. Lastly the ``xray_recorder`` has also been configured with a name and an ``AsyncContext``. See :ref:`Configure Global Recorder ` for more information about configuring the ``xray_recorder``. + +Client +------ + +Since 3.0.0 Aiohttp provides a generic object that allows third packages to gather the different events ocurred during an HTTP call, X-Ray +can be configured to track these requests as subsegments using the `aws_xray_trace_config` function. This will return a valid `TraceConfig` ready to be installed +in any `aiohttp.ClientSession`. The following example shows how it can be used.:: + + from aws_xray_sdk.ext.aiohttp.client import aws_xray_trace_config + + trace_config = aws_xray_trace_config() + async with ClientSession(loop=loop, trace_configs=[trace_config]) as session: + async with session.get(url) as resp + await resp.read() diff --git a/docs/thirdparty.rst b/docs/thirdparty.rst index c13c16bd..2d839ee6 100644 --- a/docs/thirdparty.rst +++ b/docs/thirdparty.rst @@ -80,4 +80,4 @@ Patching httplib httplib is a low-level python module which is used by several third party modules, so by enabling patching to this module you can gain patching of many modules "for free." -Some examples of modules that depend on httplib: requests and httplib2 \ No newline at end of file +Some examples of modules that depend on httplib: requests and httplib2 diff --git a/tests/ext/aiohttp/test_client.py b/tests/ext/aiohttp/test_client.py new file mode 100644 index 00000000..1a0db0b7 --- /dev/null +++ b/tests/ext/aiohttp/test_client.py @@ -0,0 +1,132 @@ +import pytest +from aiohttp import ClientSession + +from aws_xray_sdk.core import xray_recorder +from aws_xray_sdk.core.async_context import AsyncContext +from aws_xray_sdk.ext.util import strip_url +from aws_xray_sdk.ext.aiohttp.client import aws_xray_trace_config +from aws_xray_sdk.ext.aiohttp.client import REMOTE_NAMESPACE, LOCAL_NAMESPACE + + +# httpbin.org is created by the same author of requests to make testing http easy. +BASE_URL = 'httpbin.org' + + +@pytest.fixture(scope='function') +def recorder(loop): + """ + Initiate a recorder and clear it up once has been used. + """ + xray_recorder.configure(service='test', sampling=False, context=AsyncContext(loop=loop)) + xray_recorder.clear_trace_entities() + yield recorder + xray_recorder.clear_trace_entities() + + +async def test_ok(loop, recorder): + xray_recorder.begin_segment('name') + trace_config = aws_xray_trace_config() + status_code = 200 + url = 'http://{}/status/{}?foo=bar'.format(BASE_URL, status_code) + async with ClientSession(loop=loop, trace_configs=[trace_config]) as session: + async with session.get(url): + pass + + subsegment = xray_recorder.current_segment().subsegments[0] + assert subsegment.name == strip_url(url) + assert subsegment.namespace == REMOTE_NAMESPACE + + http_meta = subsegment.http + assert http_meta['request']['url'] == url + assert http_meta['request']['method'] == 'GET' + assert http_meta['response']['status'] == status_code + + +async def test_ok_name(loop, recorder): + xray_recorder.begin_segment('name') + trace_config = aws_xray_trace_config(name='test') + status_code = 200 + url = 'http://{}/status/{}?foo=bar'.format(BASE_URL, status_code) + async with ClientSession(loop=loop, trace_configs=[trace_config]) as session: + async with session.get(url): + pass + + subsegment = xray_recorder.current_segment().subsegments[0] + assert subsegment.name == 'test' + + +async def test_error(loop, recorder): + xray_recorder.begin_segment('name') + trace_config = aws_xray_trace_config() + status_code = 400 + url = 'http://{}/status/{}'.format(BASE_URL, status_code) + async with ClientSession(loop=loop, trace_configs=[trace_config]) as session: + async with session.post(url): + pass + + subsegment = xray_recorder.current_segment().subsegments[0] + assert subsegment.name == url + assert subsegment.error + + http_meta = subsegment.http + assert http_meta['request']['url'] == url + assert http_meta['request']['method'] == 'POST' + assert http_meta['response']['status'] == status_code + + +async def test_throttle(loop, recorder): + xray_recorder.begin_segment('name') + trace_config = aws_xray_trace_config() + status_code = 429 + url = 'http://{}/status/{}'.format(BASE_URL, status_code) + async with ClientSession(loop=loop, trace_configs=[trace_config]) as session: + async with session.head(url): + pass + + subsegment = xray_recorder.current_segment().subsegments[0] + assert subsegment.name == url + assert subsegment.error + assert subsegment.throttle + + http_meta = subsegment.http + assert http_meta['request']['url'] == url + assert http_meta['request']['method'] == 'HEAD' + assert http_meta['response']['status'] == status_code + + +async def test_fault(loop, recorder): + xray_recorder.begin_segment('name') + trace_config = aws_xray_trace_config() + status_code = 500 + url = 'http://{}/status/{}'.format(BASE_URL, status_code) + async with ClientSession(loop=loop, trace_configs=[trace_config]) as session: + async with session.put(url): + pass + + subsegment = xray_recorder.current_segment().subsegments[0] + assert subsegment.name == url + assert subsegment.fault + + http_meta = subsegment.http + assert http_meta['request']['url'] == url + assert http_meta['request']['method'] == 'PUT' + assert http_meta['response']['status'] == status_code + + +async def test_invalid_url(loop, recorder): + xray_recorder.begin_segment('name') + trace_config = aws_xray_trace_config() + async with ClientSession(loop=loop, trace_configs=[trace_config]) as session: + try: + async with session.get('http://doesnt.exist'): + pass + except Exception: + # prevent uncatch exception from breaking test run + pass + + subsegment = xray_recorder.current_segment().subsegments[0] + assert subsegment.namespace == LOCAL_NAMESPACE + assert subsegment.fault + + exception = subsegment.cause['exceptions'][0] + assert exception.type == 'ClientConnectorError' diff --git a/tests/ext/aiohttp/test_aiohttp.py b/tests/ext/aiohttp/test_middleware.py similarity index 95% rename from tests/ext/aiohttp/test_aiohttp.py rename to tests/ext/aiohttp/test_middleware.py index 3b8b9210..c74494bf 100644 --- a/tests/ext/aiohttp/test_aiohttp.py +++ b/tests/ext/aiohttp/test_middleware.py @@ -34,7 +34,7 @@ def pop(self): return None -class TestServer(object): +class ServerTest(object): """ Simple class to hold a copy of the event loop """ @@ -107,7 +107,7 @@ async def test_ok(test_client, loop, recorder): :param loop: Eventloop fixture :param recorder: X-Ray recorder fixture """ - client = await test_client(TestServer.app(loop=loop)) + client = await test_client(ServerTest.app(loop=loop)) resp = await client.get('/') assert resp.status == 200 @@ -131,7 +131,7 @@ async def test_error(test_client, loop, recorder): :param loop: Eventloop fixture :param recorder: X-Ray recorder fixture """ - client = await test_client(TestServer.app(loop=loop)) + client = await test_client(ServerTest.app(loop=loop)) resp = await client.get('/error') assert resp.status == 404 @@ -156,7 +156,7 @@ async def test_exception(test_client, loop, recorder): :param loop: Eventloop fixture :param recorder: X-Ray recorder fixture """ - client = await test_client(TestServer.app(loop=loop)) + client = await test_client(ServerTest.app(loop=loop)) resp = await client.get('/exception') await resp.text() # Need this to trigger Exception @@ -183,7 +183,7 @@ async def test_concurrent(test_client, loop, recorder): :param loop: Eventloop fixture :param recorder: X-Ray recorder fixture """ - client = await test_client(TestServer.app(loop=loop)) + client = await test_client(ServerTest.app(loop=loop)) recorder.emitter = CustomStubbedEmitter() diff --git a/tox.ini b/tox.ini index fb2726f1..7934409b 100644 --- a/tox.ini +++ b/tox.ini @@ -1,6 +1,8 @@ [tox] envlist = py{27,34,35,36} + py36-aiohttp3 + py35-aiohttp3 coverage-report skip_missing_interpreters = True @@ -19,17 +21,37 @@ deps = django >= 1.10, <2.0 pynamodb # Python3.5+ only deps - py{35,36}: aiohttp >= 2.3.0 + py{35,36}: aiohttp >= 2.3.0,<3.0.0 py{35,36}: pytest-aiohttp py{35,36}: aiobotocore commands = py{27,34}: coverage run --source aws_xray_sdk -m py.test tests --ignore tests/ext/aiohttp --ignore tests/ext/aiobotocore --ignore tests/test_async_local_storage.py --ignore tests/test_async_recorder.py - py{35,36}: coverage run --source aws_xray_sdk -m py.test tests + py{35,36}: coverage run --source aws_xray_sdk -m py.test tests --ignore tests/ext/aiohttp/test_client.py setenv = DJANGO_SETTINGS_MODULE = tests.ext.django.app.settings +[testenv:py35-aiohttp3] +deps = + pytest > 3.0.0 + aiohttp >= 3.0.0 + pytest-aiohttp + coverage + +commands = + py{35}: coverage run --source aws_xray_sdk -m py.test tests/ext/aiohttp + +[testenv:py36-aiohttp3] +deps = + pytest > 3.0.0 + aiohttp >= 3.0.0 + pytest-aiohttp + coverage + +commands = + py{36}: coverage run --source aws_xray_sdk -m py.test tests/ext/aiohttp + [testenv:coverage-report] deps = coverage skip_install = true