-
Notifications
You must be signed in to change notification settings - Fork 562
feat(integrations): Add tracing to DramatiqIntegration #4571
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
edee89d
5562a9c
863d16b
5674a62
9764f19
7156ac4
39e3a50
a41b477
4746db6
a5e9ad0
1357bea
39d3532
db239dc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,18 +1,31 @@ | ||
import json | ||
|
||
import sentry_sdk | ||
from sentry_sdk.integrations import Integration | ||
from sentry_sdk.consts import OP, SPANSTATUS | ||
from sentry_sdk.api import continue_trace, get_baggage, get_traceparent | ||
from sentry_sdk.integrations import Integration, DidNotEnable | ||
from sentry_sdk.integrations._wsgi_common import request_body_within_bounds | ||
from sentry_sdk.tracing import ( | ||
BAGGAGE_HEADER_NAME, | ||
SENTRY_TRACE_HEADER_NAME, | ||
TransactionSource, | ||
) | ||
from sentry_sdk.utils import ( | ||
AnnotatedValue, | ||
capture_internal_exceptions, | ||
event_from_exception, | ||
) | ||
from typing import TypeVar | ||
|
||
R = TypeVar("R") | ||
|
||
from dramatiq.broker import Broker # type: ignore | ||
from dramatiq.message import Message # type: ignore | ||
from dramatiq.middleware import Middleware, default_middleware # type: ignore | ||
from dramatiq.errors import Retry # type: ignore | ||
try: | ||
from dramatiq.broker import Broker | ||
from dramatiq.middleware import Middleware, default_middleware | ||
from dramatiq.errors import Retry | ||
from dramatiq.message import Message | ||
except ImportError: | ||
raise DidNotEnable("Dramatiq is not installed") | ||
|
||
from typing import TYPE_CHECKING | ||
|
||
|
@@ -34,10 +47,12 @@ class DramatiqIntegration(Integration): | |
""" | ||
|
||
identifier = "dramatiq" | ||
origin = f"auto.queue.{identifier}" | ||
|
||
@staticmethod | ||
def setup_once(): | ||
# type: () -> None | ||
|
||
_patch_dramatiq_broker() | ||
|
||
|
||
|
@@ -85,50 +100,93 @@ class SentryMiddleware(Middleware): # type: ignore[misc] | |
DramatiqIntegration. | ||
""" | ||
|
||
def before_process_message(self, broker, message): | ||
# type: (Broker, Message) -> None | ||
SENTRY_HEADERS_NAME = "_sentry_headers" | ||
|
||
def before_enqueue(self, broker, message, delay): | ||
# type: (Broker, Message[R], int) -> None | ||
integration = sentry_sdk.get_client().get_integration(DramatiqIntegration) | ||
if integration is None: | ||
return | ||
|
||
message._scope_manager = sentry_sdk.new_scope() | ||
message._scope_manager.__enter__() | ||
message.options[self.SENTRY_HEADERS_NAME] = { | ||
BAGGAGE_HEADER_NAME: get_baggage(), | ||
SENTRY_TRACE_HEADER_NAME: get_traceparent(), | ||
} | ||
|
||
def before_process_message(self, broker, message): | ||
# type: (Broker, Message[R]) -> None | ||
integration = sentry_sdk.get_client().get_integration(DramatiqIntegration) | ||
if integration is None: | ||
return | ||
|
||
scope = sentry_sdk.get_current_scope() | ||
scope.set_transaction_name(message.actor_name) | ||
message._scope_manager = sentry_sdk.isolation_scope() | ||
scope = message._scope_manager.__enter__() | ||
scope.clear_breadcrumbs() | ||
scope.set_extra("dramatiq_message_id", message.message_id) | ||
sentrivana marked this conversation as resolved.
Show resolved
Hide resolved
|
||
scope.add_event_processor(_make_message_event_processor(message, integration)) | ||
|
||
sentry_headers = message.options.get(self.SENTRY_HEADERS_NAME) or {} | ||
if "retries" in message.options: | ||
# start new trace in case of retrying | ||
sentry_headers = {} | ||
|
||
transaction = continue_trace( | ||
sentry_headers, | ||
name=message.actor_name, | ||
op=OP.QUEUE_TASK_DRAMATIQ, | ||
source=TransactionSource.TASK, | ||
origin=DramatiqIntegration.origin, | ||
) | ||
transaction.set_status(SPANSTATUS.OK) | ||
sentry_sdk.start_transaction( | ||
transaction, | ||
name=message.actor_name, | ||
op=OP.QUEUE_TASK_DRAMATIQ, | ||
source=TransactionSource.TASK, | ||
) | ||
transaction.__enter__() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bug: Transaction Initialization RedundancyThe transaction initialization in |
||
|
||
def after_process_message(self, broker, message, *, result=None, exception=None): | ||
# type: (Broker, Message, Any, Optional[Any], Optional[Exception]) -> None | ||
# type: (Broker, Message[R], Optional[Any], Optional[Exception]) -> None | ||
integration = sentry_sdk.get_client().get_integration(DramatiqIntegration) | ||
if integration is None: | ||
return | ||
|
||
actor = broker.get_actor(message.actor_name) | ||
throws = message.options.get("throws") or actor.options.get("throws") | ||
|
||
try: | ||
if ( | ||
exception is not None | ||
and not (throws and isinstance(exception, throws)) | ||
and not isinstance(exception, Retry) | ||
): | ||
event, hint = event_from_exception( | ||
exception, | ||
client_options=sentry_sdk.get_client().options, | ||
mechanism={ | ||
"type": DramatiqIntegration.identifier, | ||
"handled": False, | ||
}, | ||
) | ||
sentry_sdk.capture_event(event, hint=hint) | ||
finally: | ||
message._scope_manager.__exit__(None, None, None) | ||
scope_manager = message._scope_manager | ||
transaction = sentry_sdk.get_current_scope().transaction | ||
if not transaction: | ||
return None | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
is_event_capture_required = ( | ||
exception is not None | ||
and not (throws and isinstance(exception, throws)) | ||
and not isinstance(exception, Retry) | ||
) | ||
if not is_event_capture_required: | ||
# normal transaction finish | ||
transaction.__exit__(None, None, None) | ||
scope_manager.__exit__(None, None, None) | ||
return | ||
|
||
event, hint = event_from_exception( | ||
exception, # type: ignore[arg-type] | ||
client_options=sentry_sdk.get_client().options, | ||
mechanism={ | ||
"type": DramatiqIntegration.identifier, | ||
"handled": False, | ||
}, | ||
) | ||
sentry_sdk.capture_event(event, hint=hint) | ||
# transaction error | ||
transaction.__exit__(type(exception), exception, None) | ||
scope_manager.__exit__(type(exception), exception, None) | ||
|
||
|
||
def _make_message_event_processor(message, integration): | ||
# type: (Message, DramatiqIntegration) -> Callable[[Event, Hint], Optional[Event]] | ||
# type: (Message[R], DramatiqIntegration) -> Callable[[Event, Hint], Optional[Event]] | ||
|
||
def inner(event, hint): | ||
# type: (Event, Hint) -> Optional[Event] | ||
|
@@ -142,7 +200,7 @@ def inner(event, hint): | |
|
||
class DramatiqMessageExtractor: | ||
def __init__(self, message): | ||
# type: (Message) -> None | ||
# type: (Message[R]) -> None | ||
self.message_data = dict(message.asdict()) | ||
|
||
def content_length(self): | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We still need some sort of scope management in order to make sure the data we collect about tasks is isolated.
The general rule of thumb is: if you start a transaction, you should start it in a new isolation scope. See for example huey.
So we should start an isolation scope right after the initial
if integration is None: return
check withEverything that we do on the
scope
later in the function can stay, but it should be done on the isolation scope, not current scope as before.And finally, we need to
__exit__
the saved scope inafter_process_message
withmessage._scope_manager.__exit__(None, None, None)
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
but please recheck it)