Skip to content

Commit

Permalink
Support fot Aiohttp Client tracing, aiohttp> 3.0.0 (#42)
Browse files Browse the repository at this point in the history
* Support fot Aiohttp Client tracing, aiohttp> 3.0.0

New `aws_xray_trace_config()` function to retrieve a
`aiohttp.TraceConfig` object ready to be used in any
`ClientSession`, once the Aiohttp Client session is instantiated with
this trace config all sampled HTTP calls will be traced as subsegements and
the data properly uploaded.

* Removed print

* Aiohttp3 dependencie greter or equal than 3.0.0

* Force namespace to remote, use local for local exceptions

* Added new feature to CHANGELOG

* Added py35 as valid environment for aiohttp3

* Added Aiohttp client trace config usage in the README
  • Loading branch information
pfreixes authored and haotianw465 committed Mar 20, 2018
1 parent c354f6a commit 9ff8539
Show file tree
Hide file tree
Showing 10 changed files with 268 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ CHANGELOG

unreleased
==========
* feature: Aiohttp client tracing for aiohttp versions > 3. `PR42 <https://github.com/aws/aws-xray-sdk-python/pull/42>`_.
* feature: Use the official middleware pattern for Aiohttp ext. `PR29 <https://github.com/aws/aws-xray-sdk-python/pull/29>`_.
* bugfix: SQLAlcemy plugin would cause warning messages with some db connection strings that contained invalid characters for a segment/subsegment name.
* bugfix: Aiohttp middleware serialized URL values incorrectly. `PR37 <https://github.com/aws/aws-xray-sdk-python/pull/37>`_
Expand Down
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,20 @@ app.router.add_get("/", handler)
web.run_app(app)
```

### Trace aiohttp client requests

Only available using Aiohttp releases greater than 3.X.

```python
from aws_xray_sdk.ext.aiohttp.client import aws_xray_trace_config

async def foo():
trace_config = aws_xray_trace_config()
async with ClientSession(loop=loop, trace_configs=[trace_config]) as session:
async with session.get(url) as resp
await resp.read()
```

### Use SQLAlchemy ORM
The SQLAlchemy integration requires you to override the Session and Query Classes for SQL Alchemy

Expand Down
2 changes: 1 addition & 1 deletion aws_xray_sdk/core/async_recorder.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ async def wrapper(wrapped, instance, args, kwargs):
return wrapper

async def record_subsegment_async(self, wrapped, instance, args, kwargs, name,
namespace, meta_processor):
namespace, meta_processor):

subsegment = self.begin_subsegment(name, namespace)

Expand Down
64 changes: 64 additions & 0 deletions aws_xray_sdk/ext/aiohttp/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"""
AioHttp Client tracing, only compatible with Aiohttp 3.X versions
"""
import aiohttp
import traceback

from types import SimpleNamespace

from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core.models import http
from aws_xray_sdk.ext.util import inject_trace_header, strip_url

# All aiohttp calls will entail outgoing HTTP requests, only in some ad-hoc
# exceptions the namespace will be flip back to local.
REMOTE_NAMESPACE = 'remote'
LOCAL_NAMESPACE = 'local'
LOCAL_EXCEPTIONS = (
aiohttp.client_exceptions.ClientConnectionError,
# DNS issues
OSError
)


async def begin_subsegment(session, trace_config_ctx, params):
name = trace_config_ctx.name if trace_config_ctx.name else strip_url(str(params.url))
subsegment = xray_recorder.begin_subsegment(name, REMOTE_NAMESPACE)
subsegment.put_http_meta(http.METHOD, params.method)
subsegment.put_http_meta(http.URL, params.url.human_repr())
inject_trace_header(params.headers, subsegment)


async def end_subsegment(session, trace_config_ctx, params):
subsegment = xray_recorder.current_subsegment()
subsegment.put_http_meta(http.STATUS, params.response.status)
xray_recorder.end_subsegment()


async def end_subsegment_with_exception(session, trace_config_ctx, params):
subsegment = xray_recorder.current_subsegment()
subsegment.add_exception(
params.exception,
traceback.extract_stack(limit=xray_recorder._max_trace_back)
)

if isinstance(params.exception, LOCAL_EXCEPTIONS):
subsegment.namespace = LOCAL_NAMESPACE

xray_recorder.end_subsegment()


def aws_xray_trace_config(name=None):
"""
:param name: name used to identify the subsegment, with None internally the URL will
be used as identifier.
:returns: TraceConfig.
"""
trace_config = aiohttp.TraceConfig(
trace_config_ctx_factory=lambda trace_request_ctx: SimpleNamespace(name=name,
trace_request_ctx=trace_request_ctx)
)
trace_config.on_request_start.append(begin_subsegment)
trace_config.on_request_end.append(end_subsegment)
trace_config.on_request_exception.append(end_subsegment_with_exception)
return trace_config
7 changes: 7 additions & 0 deletions docs/aws_xray_sdk.ext.aiohttp.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ aws\_xray\_sdk.ext.aiohttp.middleware module
:undoc-members:
:show-inheritance:

aws\_xray\_sdk.ext.aiohttp.client module
--------------------------------------------

.. automodule:: aws_xray_sdk.ext.aiohttp.client
:members:
:undoc-members:
:show-inheritance:

Module contents
---------------
Expand Down
21 changes: 19 additions & 2 deletions docs/frameworks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,11 @@ To generate segment based on incoming requests, you need to instantiate the X-Ra
Flask built-in template rendering will be wrapped into subsegments.
You can configure the recorder, see :ref:`Configure Global Recorder <configurations>` for more details.

aiohttp Server
==============
Aiohttp
=======

Server
------

For X-Ray to create a segment based on an incoming request, you need register some middleware with aiohttp. As aiohttp
is an asyncronous framework, X-Ray will also need to be configured with an ``AsyncContext`` compared to the default threaded
Expand Down Expand Up @@ -115,3 +118,17 @@ version.::
There are two things to note from the example above. Firstly a middleware corountine from aws-xray-sdk is provided during the creation
of an aiohttp server app. Lastly the ``xray_recorder`` has also been configured with a name and an ``AsyncContext``. See
:ref:`Configure Global Recorder <configurations>` for more information about configuring the ``xray_recorder``.

Client
------

Since 3.0.0 Aiohttp provides a generic object that allows third packages to gather the different events ocurred during an HTTP call, X-Ray
can be configured to track these requests as subsegments using the `aws_xray_trace_config` function. This will return a valid `TraceConfig` ready to be installed
in any `aiohttp.ClientSession`. The following example shows how it can be used.::

from aws_xray_sdk.ext.aiohttp.client import aws_xray_trace_config

trace_config = aws_xray_trace_config()
async with ClientSession(loop=loop, trace_configs=[trace_config]) as session:
async with session.get(url) as resp
await resp.read()
2 changes: 1 addition & 1 deletion docs/thirdparty.rst
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,4 @@ Patching httplib

httplib is a low-level python module which is used by several third party modules, so
by enabling patching to this module you can gain patching of many modules "for free."
Some examples of modules that depend on httplib: requests and httplib2
Some examples of modules that depend on httplib: requests and httplib2
132 changes: 132 additions & 0 deletions tests/ext/aiohttp/test_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
import pytest
from aiohttp import ClientSession

from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core.async_context import AsyncContext
from aws_xray_sdk.ext.util import strip_url
from aws_xray_sdk.ext.aiohttp.client import aws_xray_trace_config
from aws_xray_sdk.ext.aiohttp.client import REMOTE_NAMESPACE, LOCAL_NAMESPACE


# httpbin.org is created by the same author of requests to make testing http easy.
BASE_URL = 'httpbin.org'


@pytest.fixture(scope='function')
def recorder(loop):
"""
Initiate a recorder and clear it up once has been used.
"""
xray_recorder.configure(service='test', sampling=False, context=AsyncContext(loop=loop))
xray_recorder.clear_trace_entities()
yield recorder
xray_recorder.clear_trace_entities()


async def test_ok(loop, recorder):
xray_recorder.begin_segment('name')
trace_config = aws_xray_trace_config()
status_code = 200
url = 'http://{}/status/{}?foo=bar'.format(BASE_URL, status_code)
async with ClientSession(loop=loop, trace_configs=[trace_config]) as session:
async with session.get(url):
pass

subsegment = xray_recorder.current_segment().subsegments[0]
assert subsegment.name == strip_url(url)
assert subsegment.namespace == REMOTE_NAMESPACE

http_meta = subsegment.http
assert http_meta['request']['url'] == url
assert http_meta['request']['method'] == 'GET'
assert http_meta['response']['status'] == status_code


async def test_ok_name(loop, recorder):
xray_recorder.begin_segment('name')
trace_config = aws_xray_trace_config(name='test')
status_code = 200
url = 'http://{}/status/{}?foo=bar'.format(BASE_URL, status_code)
async with ClientSession(loop=loop, trace_configs=[trace_config]) as session:
async with session.get(url):
pass

subsegment = xray_recorder.current_segment().subsegments[0]
assert subsegment.name == 'test'


async def test_error(loop, recorder):
xray_recorder.begin_segment('name')
trace_config = aws_xray_trace_config()
status_code = 400
url = 'http://{}/status/{}'.format(BASE_URL, status_code)
async with ClientSession(loop=loop, trace_configs=[trace_config]) as session:
async with session.post(url):
pass

subsegment = xray_recorder.current_segment().subsegments[0]
assert subsegment.name == url
assert subsegment.error

http_meta = subsegment.http
assert http_meta['request']['url'] == url
assert http_meta['request']['method'] == 'POST'
assert http_meta['response']['status'] == status_code


async def test_throttle(loop, recorder):
xray_recorder.begin_segment('name')
trace_config = aws_xray_trace_config()
status_code = 429
url = 'http://{}/status/{}'.format(BASE_URL, status_code)
async with ClientSession(loop=loop, trace_configs=[trace_config]) as session:
async with session.head(url):
pass

subsegment = xray_recorder.current_segment().subsegments[0]
assert subsegment.name == url
assert subsegment.error
assert subsegment.throttle

http_meta = subsegment.http
assert http_meta['request']['url'] == url
assert http_meta['request']['method'] == 'HEAD'
assert http_meta['response']['status'] == status_code


async def test_fault(loop, recorder):
xray_recorder.begin_segment('name')
trace_config = aws_xray_trace_config()
status_code = 500
url = 'http://{}/status/{}'.format(BASE_URL, status_code)
async with ClientSession(loop=loop, trace_configs=[trace_config]) as session:
async with session.put(url):
pass

subsegment = xray_recorder.current_segment().subsegments[0]
assert subsegment.name == url
assert subsegment.fault

http_meta = subsegment.http
assert http_meta['request']['url'] == url
assert http_meta['request']['method'] == 'PUT'
assert http_meta['response']['status'] == status_code


async def test_invalid_url(loop, recorder):
xray_recorder.begin_segment('name')
trace_config = aws_xray_trace_config()
async with ClientSession(loop=loop, trace_configs=[trace_config]) as session:
try:
async with session.get('http://doesnt.exist'):
pass
except Exception:
# prevent uncatch exception from breaking test run
pass

subsegment = xray_recorder.current_segment().subsegments[0]
assert subsegment.namespace == LOCAL_NAMESPACE
assert subsegment.fault

exception = subsegment.cause['exceptions'][0]
assert exception.type == 'ClientConnectorError'
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def pop(self):
return None


class TestServer(object):
class ServerTest(object):
"""
Simple class to hold a copy of the event loop
"""
Expand Down Expand Up @@ -107,7 +107,7 @@ async def test_ok(test_client, loop, recorder):
:param loop: Eventloop fixture
:param recorder: X-Ray recorder fixture
"""
client = await test_client(TestServer.app(loop=loop))
client = await test_client(ServerTest.app(loop=loop))

resp = await client.get('/')
assert resp.status == 200
Expand All @@ -131,7 +131,7 @@ async def test_error(test_client, loop, recorder):
:param loop: Eventloop fixture
:param recorder: X-Ray recorder fixture
"""
client = await test_client(TestServer.app(loop=loop))
client = await test_client(ServerTest.app(loop=loop))

resp = await client.get('/error')
assert resp.status == 404
Expand All @@ -156,7 +156,7 @@ async def test_exception(test_client, loop, recorder):
:param loop: Eventloop fixture
:param recorder: X-Ray recorder fixture
"""
client = await test_client(TestServer.app(loop=loop))
client = await test_client(ServerTest.app(loop=loop))

resp = await client.get('/exception')
await resp.text() # Need this to trigger Exception
Expand All @@ -183,7 +183,7 @@ async def test_concurrent(test_client, loop, recorder):
:param loop: Eventloop fixture
:param recorder: X-Ray recorder fixture
"""
client = await test_client(TestServer.app(loop=loop))
client = await test_client(ServerTest.app(loop=loop))

recorder.emitter = CustomStubbedEmitter()

Expand Down
26 changes: 24 additions & 2 deletions tox.ini
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
[tox]
envlist =
py{27,34,35,36}
py36-aiohttp3
py35-aiohttp3
coverage-report

skip_missing_interpreters = True
Expand All @@ -19,17 +21,37 @@ deps =
django >= 1.10, <2.0
pynamodb
# Python3.5+ only deps
py{35,36}: aiohttp >= 2.3.0
py{35,36}: aiohttp >= 2.3.0,<3.0.0
py{35,36}: pytest-aiohttp
py{35,36}: aiobotocore

commands =
py{27,34}: coverage run --source aws_xray_sdk -m py.test tests --ignore tests/ext/aiohttp --ignore tests/ext/aiobotocore --ignore tests/test_async_local_storage.py --ignore tests/test_async_recorder.py
py{35,36}: coverage run --source aws_xray_sdk -m py.test tests
py{35,36}: coverage run --source aws_xray_sdk -m py.test tests --ignore tests/ext/aiohttp/test_client.py

setenv =
DJANGO_SETTINGS_MODULE = tests.ext.django.app.settings

[testenv:py35-aiohttp3]
deps =
pytest > 3.0.0
aiohttp >= 3.0.0
pytest-aiohttp
coverage

commands =
py{35}: coverage run --source aws_xray_sdk -m py.test tests/ext/aiohttp

[testenv:py36-aiohttp3]
deps =
pytest > 3.0.0
aiohttp >= 3.0.0
pytest-aiohttp
coverage

commands =
py{36}: coverage run --source aws_xray_sdk -m py.test tests/ext/aiohttp

[testenv:coverage-report]
deps = coverage
skip_install = true
Expand Down

0 comments on commit 9ff8539

Please sign in to comment.