-
Notifications
You must be signed in to change notification settings - Fork 23
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feat add dd and otel celery tasks #409
Changes from all commits
04b7a9a
806c9db
7ad32d1
02ed081
422967d
d13419d
fae9b2e
0ddf891
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -50,6 +50,16 @@ def record_exception(self): | |
Record the exception that is currently being handled. | ||
""" | ||
|
||
@abstractmethod | ||
def initialize_celery_monitoring(self, *args, **kwargs): | ||
""" | ||
Instrument celery to be monitored by the monitoring service. | ||
|
||
Optional kwargs: | ||
worker_process_init - required for open telemetry to integrate to the clery signal. | ||
Import from from celery.signals. | ||
""" | ||
|
||
|
||
class NewRelicBackend(TelemetryBackend): | ||
""" | ||
|
@@ -77,14 +87,17 @@ def record_exception(self): | |
# https://docs.newrelic.com/docs/apm/agents/python-agent/python-agent-api/recordexception-python-agent-api/ | ||
newrelic.agent.record_exception() | ||
|
||
def initialize_celery_monitoring(self, *args, **kwargs): | ||
pass | ||
|
||
|
||
class OpenTelemetryBackend(TelemetryBackend): | ||
""" | ||
Send telemetry via OpenTelemetry. | ||
|
||
Requirements to use: | ||
|
||
- Install `opentelemetry-api` Python package | ||
- Install `opentelemetry-api` and `opentelemetry-instrumentation-celery` Python packages. | ||
- Configure and initialize OpenTelemetry | ||
|
||
API reference: https://opentelemetry-python.readthedocs.io/en/latest/ | ||
|
@@ -93,7 +106,9 @@ class OpenTelemetryBackend(TelemetryBackend): | |
def __init__(self): | ||
# If import fails, the backend won't be used. | ||
from opentelemetry import trace | ||
from opentelemetry.instrumentation.celery import CeleryInstrumentor | ||
self.otel_trace = trace | ||
self.instrumentor = CeleryInstrumentor | ||
|
||
def set_attribute(self, key, value): | ||
# Sets the value on the current span, not necessarily the root | ||
|
@@ -103,6 +118,17 @@ def set_attribute(self, key, value): | |
def record_exception(self): | ||
self.otel_trace.get_current_span().record_exception(sys.exc_info()[1]) | ||
|
||
def initialize_celery_monitoring(self, *args, **kwargs): | ||
worker_process_init = kwargs.get('worker_process_init', None) | ||
if worker_process_init is not None: | ||
@worker_process_init.connect(weak=False) | ||
def init_celery_tracing(*args, **kwargs): | ||
self.instrumentor().instrument() | ||
else: | ||
raise Exception( | ||
"the worker_process_init celery signal must be provided for OpenTelemetry to monitor celery tasks." | ||
) | ||
|
||
|
||
class DatadogBackend(TelemetryBackend): | ||
""" | ||
|
@@ -118,8 +144,9 @@ class DatadogBackend(TelemetryBackend): | |
# pylint: disable=import-outside-toplevel | ||
def __init__(self): | ||
# If import fails, the backend won't be used. | ||
from ddtrace import tracer | ||
from ddtrace import patch, tracer | ||
self.dd_tracer = tracer | ||
self.patch = patch | ||
|
||
def set_attribute(self, key, value): | ||
if root_span := self.dd_tracer.current_root_span(): | ||
|
@@ -129,6 +156,9 @@ def record_exception(self): | |
if span := self.dd_tracer.current_span(): | ||
span.set_traceback() | ||
|
||
def initialize_celery_monitoring(self, *args, **kwargs): | ||
self.patch(celery=True) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It looks like ddtrace patches Celery by default: https://github.com/DataDog/dd-trace-py/blob/01fbf9127180794392cc7dbe16acd610087dacf6/ddtrace/_monkey.py#L33 -- were you seeing otherwise? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was just going by these docs: https://ddtrace.readthedocs.io/en/stable/integrations.html#celery. Separately: In those docs I see that trace headers are off by default. We should find out if that is what we want from DD? Maybe long running tasks would kill our traces? Not sure. @connorhaugh: Can you task/track this? |
||
|
||
|
||
# We're using an lru_cache instead of assigning the result to a variable on | ||
# module load. With the default settings (pointing to a TelemetryBackend | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If OpenTelemetry needs it, then it will have to be passed to all the backends, because there will only be one call site for the
initialize_celery_monitoring
API call we expose. However, it looks like it doesn't need to be passed in at all—this is just a signal, so we couldfrom celery.signals import worker_process_init
.For that matter, it might make the most sense to instead say that the caller should do that, like so:
We could also just leave the OpenTelemetry side of this unimplemented for now, since we won't be able to give it the testing it deserves (in production).