Skip to content

Commit 50222a6

Browse files
committed
celery rabbitmq plugins
1 parent df75fa8 commit 50222a6

File tree

15 files changed

+545
-7
lines changed

15 files changed

+545
-7
lines changed

plugins/PY/pinpointPy/libs/__init__.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,18 @@ def monkey_patch_for_pinpoint(pymongo=True,
3838
sqlalchemy=True,
3939
MySQLdb=True,
4040
MysqlConnector=True,
41-
pyscopg2=True):
41+
pyscopg2=True,
42+
rabbitmq=True,
43+
kombu=True):
4244
__monkey_patch(_pymongo=pymongo, _MySQLdb=MySQLdb, _PyMysql=PyMysql, _pyRedis=pyRedis, _requests=requests,
43-
_urllib=urllib, _sqlalchemy=sqlalchemy, _MysqlConnector=MysqlConnector, _psycopg2=pyscopg2)
45+
_urllib=urllib, _sqlalchemy=sqlalchemy, _MysqlConnector=MysqlConnector, _psycopg2=pyscopg2,
46+
_rabbitmq=rabbitmq, _kombu=kombu)
4447

4548

4649
__all__ = ['monkey_patch_for_pinpoint',
4750
'CeleryWorkerPlugin', 'CeleryCallerPlugin']
48-
__version__ = '0.0.4'
51+
__version__ = '0.0.5'
4952
__author__ = 'liu.mingyi@navercorp.com'
53+
54+
# 0.0.5
55+
# add rabbitmq
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
#!/usr/bin/env python
2+
# -*- coding: UTF-8 -*-
3+
4+
# ------------------------------------------------------------------------------
5+
# Copyright 2020. NAVER Corp. -
6+
# -
7+
# Licensed under the Apache License, Version 2.0 (the "License"); -
8+
# you may not use this file except in compliance with the License. -
9+
# You may obtain a copy of the License at -
10+
# -
11+
# http://www.apache.org/licenses/LICENSE-2.0 -
12+
# -
13+
# Unless required by applicable law or agreed to in writing, software -
14+
# distributed under the License is distributed on an "AS IS" BASIS, -
15+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
16+
# See the License for the specific language governing permissions and -
17+
# limitations under the License. -
18+
# ------------------------------------------------------------------------------
19+
20+
# Created by eeliu at 11/13/2024
21+
22+
from pinpointPy import Common, pinpoint, Defines
23+
import kombu
24+
25+
26+
class KombuPlugin(Common.PinTrace):
27+
28+
def __init__(self, name):
29+
super().__init__(name)
30+
31+
def get_host(self, *args, **kwargs):
32+
message = args[0]
33+
from kombu.messaging import Producer
34+
if isinstance(message, Producer):
35+
return message.connection.host
36+
else:
37+
return "localhost"
38+
39+
def get_routing_exchange(self, *args, **kwargs):
40+
if kwargs:
41+
# return "" if 'routing_key' not in kwargs else kwargs['routing_key'], "" if 'exchange' not in kwargs else kwargs['exchange'].name
42+
routing_key = "" if 'routing_key' not in kwargs else kwargs['routing_key']
43+
exchange = "" if 'exchange' not in kwargs else kwargs['exchange']
44+
45+
if isinstance(exchange, kombu.entity.Exchange):
46+
exchange = exchange.name
47+
return routing_key, exchange
48+
else:
49+
return "", ""
50+
51+
def onBefore(self, parentId, *args, **kwargs):
52+
trace_id, args, kwargs = super().onBefore(parentId, *args, **kwargs)
53+
pinpoint.add_trace_header(
54+
Defines.PP_INTERCEPTOR_NAME, self.getUniqueName(), trace_id)
55+
# fixme treat all type as rabbitmq
56+
pinpoint.add_trace_header(
57+
Defines.PP_SERVER_TYPE, Defines.PP_RABBITMQ_CLIENT, trace_id)
58+
59+
pinpoint.add_trace_header(
60+
Defines.PP_DESTINATION, self.get_host(*args, **kwargs), trace_id)
61+
62+
routing, ex = self.get_routing_exchange(*args, **kwargs)
63+
pinpoint.add_trace_header_v2(
64+
Defines.PP_RABBITMQ_ROUTINGKEY, routing, trace_id)
65+
pinpoint.add_trace_header_v2(
66+
Defines.PP_RABBITMQ_EXCHANGEKEY, ex, trace_id)
67+
68+
return trace_id, args, kwargs
69+
70+
def onEnd(self, traceId, ret):
71+
super().onEnd(traceId, ret)
72+
return ret
73+
74+
def onException(self, traceId, e):
75+
pinpoint.add_exception(str(e), traceId)
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
#!/usr/bin/env python
2+
# -*- coding: UTF-8 -*-
3+
4+
# ------------------------------------------------------------------------------
5+
# Copyright 2020. NAVER Corp. -
6+
# -
7+
# Licensed under the Apache License, Version 2.0 (the "License"); -
8+
# you may not use this file except in compliance with the License. -
9+
# You may obtain a copy of the License at -
10+
# -
11+
# http://www.apache.org/licenses/LICENSE-2.0 -
12+
# -
13+
# Unless required by applicable law or agreed to in writing, software -
14+
# distributed under the License is distributed on an "AS IS" BASIS, -
15+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
16+
# See the License for the specific language governing permissions and -
17+
# limitations under the License. -
18+
# ------------------------------------------------------------------------------
19+
20+
# Created by eeliu at 11/13/2024
21+
22+
from pinpointPy.Interceptor import Interceptor, intercept_once
23+
from pinpointPy import get_logger
24+
from pinpointPy.CommonPlugin import PinpointCommonPlugin
25+
26+
27+
@intercept_once
28+
def monkey_patch():
29+
try:
30+
from kombu.messaging import Producer
31+
from .KombuPlugin import KombuPlugin
32+
Interceptors = [
33+
Interceptor(Producer, 'publish',
34+
KombuPlugin),
35+
36+
]
37+
38+
for interceptor in Interceptors:
39+
interceptor.enable()
40+
41+
except ImportError as e:
42+
get_logger().info(f'exception at {e}')
43+
44+
45+
__all__ = ['monkey_patch']
46+
__version__ = '0.0.1'
47+
__author__ = 'liu.mingyi@navercorp.com'
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
2+
3+
from pinpointPy.libs._kombu import monkey_patch
4+
import unittest
5+
from pinpointPy import PinTransaction, Defines
6+
from pinpointPy.tests import TestCase, GenTestHeader
7+
8+
9+
class Test_Case(TestCase):
10+
11+
@classmethod
12+
def setUpClass(cls):
13+
super().setUpClass()
14+
monkey_patch()
15+
16+
@PinTransaction("testcase", GenTestHeader())
17+
def test_kombu_amqp(self):
18+
# https://github.com/celery/kombu?tab=readme-ov-file#quick-overview
19+
from kombu import Connection, Exchange, Queue
20+
21+
media_exchange = Exchange('media', 'direct', durable=True)
22+
video_queue = Queue(
23+
'video', exchange=media_exchange, routing_key='video')
24+
25+
def process_media(body, message):
26+
print(body)
27+
message.ack()
28+
29+
# connections
30+
with Connection('amqp://user:password@rabbit-mq:5672//') as conn:
31+
32+
# produce
33+
producer = conn.Producer(serializer='json')
34+
producer.publish({'name': '/tmp/lolcat1.avi', 'size': 1301013},
35+
exchange=media_exchange, routing_key='video',
36+
declare=[video_queue])
37+
38+
39+
if __name__ == '__main__':
40+
unittest.main()
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
#!/usr/bin/env python
2+
# -*- coding: UTF-8 -*-
3+
4+
# Created by eeliu at 11/23/24
5+
6+
7+
# ******************************************************************************
8+
# Copyright 2020. NAVER Corp.
9+
#
10+
# Licensed under the Apache License, Version 2.0 (the "License");
11+
# you may not use this file except in compliance with the License.
12+
# You may obtain a copy of the License at
13+
#
14+
# http://www.apache.org/licenses/LICENSE-2.0
15+
#
16+
# Unless required by applicable law or agreed to in writing, software
17+
# distributed under the License is distributed on an "AS IS" BASIS,
18+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19+
# See the License for the specific language governing permissions and
20+
# limitations under the License.
21+
# ******************************************************************************
22+
23+
from pinpointPy import Common, pinpoint, Defines
24+
25+
26+
class Plugin(Common.PinTrace):
27+
28+
def __init__(self, name):
29+
super().__init__(name)
30+
31+
def get_host(self, *args, **kwargs):
32+
inst = args[0]
33+
from amqp import Channel
34+
if isinstance(inst, Channel):
35+
con = inst.connection
36+
return con.host
37+
else:
38+
return "localhost"
39+
40+
def get_routing_exchange(self, *args, **kwargs):
41+
if kwargs:
42+
return "" if 'routing_key' not in kwargs else kwargs['routing_key'], "" if 'exchange' not in kwargs else kwargs['exchange']
43+
else:
44+
return "", ""
45+
46+
def onBefore(self, parentId, *args, **kwargs):
47+
trace_id, args, kwargs = super().onBefore(parentId, *args, **kwargs)
48+
pinpoint.add_trace_header(
49+
Defines.PP_INTERCEPTOR_NAME, self.getUniqueName(), trace_id)
50+
pinpoint.add_trace_header(
51+
Defines.PP_SERVER_TYPE, Defines.PP_RABBITMQ_CLIENT, trace_id)
52+
53+
pinpoint.add_trace_header(
54+
Defines.PP_DESTINATION, self.get_host(*args, **kwargs), trace_id)
55+
56+
routing, ex = self.get_routing_exchange(*args, **kwargs)
57+
pinpoint.add_trace_header_v2(
58+
Defines.PP_RABBITMQ_ROUTINGKEY, routing, trace_id)
59+
pinpoint.add_trace_header_v2(
60+
Defines.PP_RABBITMQ_EXCHANGEKEY, ex, trace_id)
61+
62+
return trace_id, args, kwargs
63+
64+
def onEnd(self, traceId, ret):
65+
super().onEnd(traceId, ret)
66+
return ret
67+
68+
def onException(self, traceId, e):
69+
pinpoint.add_exception(str(e), traceId)
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
#!/usr/bin/env python
2+
# -*- coding: UTF-8 -*-
3+
4+
# ------------------------------------------------------------------------------
5+
# Copyright 2020. NAVER Corp. -
6+
# -
7+
# Licensed under the Apache License, Version 2.0 (the "License"); -
8+
# you may not use this file except in compliance with the License. -
9+
# You may obtain a copy of the License at -
10+
# -
11+
# http://www.apache.org/licenses/LICENSE-2.0 -
12+
# -
13+
# Unless required by applicable law or agreed to in writing, software -
14+
# distributed under the License is distributed on an "AS IS" BASIS, -
15+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -
16+
# See the License for the specific language governing permissions and -
17+
# limitations under the License. -
18+
# ------------------------------------------------------------------------------
19+
20+
# Created by eeliu at 11/13/2024
21+
22+
from pinpointPy.Interceptor import Interceptor, intercept_once
23+
from pinpointPy import get_logger
24+
from pinpointPy.CommonPlugin import PinpointCommonPlugin
25+
26+
27+
@intercept_once
28+
def monkey_patch():
29+
try:
30+
from amqp.channel import Channel
31+
from .Plugins import Plugin
32+
Interceptors = [
33+
Interceptor(Channel, 'basic_publish_confirm',
34+
PinpointCommonPlugin),
35+
Interceptor(Channel, '_basic_publish',
36+
Plugin),
37+
]
38+
39+
for interceptor in Interceptors:
40+
interceptor.enable()
41+
42+
except ImportError as e:
43+
get_logger().info(f'exception at {e}')
44+
45+
46+
__all__ = ['monkey_patch']
47+
__version__ = '0.0.1'
48+
__author__ = 'liu.mingyi@navercorp.com'
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
2+
3+
from pinpointPy.libs._rabbitmq import monkey_patch
4+
import unittest
5+
from pinpointPy import PinTransaction, Defines
6+
from pinpointPy.tests import TestCase, GenTestHeader
7+
8+
9+
class Test_Case(TestCase):
10+
11+
@classmethod
12+
def setUpClass(cls):
13+
super().setUpClass()
14+
monkey_patch()
15+
16+
@PinTransaction("testcase", GenTestHeader())
17+
def test_amqp(self):
18+
import amqp
19+
with amqp.Connection(
20+
'rabbit-mq', exchange='test_exchange', userid='user', password='password',
21+
confirm_publish=True,
22+
) as c:
23+
ch = c.channel()
24+
ch.basic_publish(amqp.Message('Hello World'),
25+
routing_key='test', exchange="")
26+
27+
28+
if __name__ == '__main__':
29+
unittest.main()

plugins/PY/pinpointPy/tests/TestCase.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,10 @@ def GetHeader(self, *args, **kwargs) -> PinHeader:
1212
_header.Host = "127.0.0.1"
1313
_header.RemoteAddr = "127.0.0.1"
1414
_header.ParentType = "1700"
15-
_header.ParentName = "parent"
15+
_header.ParentName = "cd.dev.test.py"
1616
_header.ParentHost = "127.0.0.1"
1717
_header.ParentTid = "abc^452568^23"
18+
_header.ParentSid = 23344
1819
return _header
1920

2021

@@ -23,8 +24,8 @@ class TestCase(unittest.TestCase):
2324
@classmethod
2425
def setUpClass(cls):
2526
use_thread_local_context()
26-
set_agent("cd.dev.test.py", "cd.dev.test.py",
27-
'tcp:localhost:9999', -1, log_level=logging.DEBUG)
27+
set_agent("cd.dev.test.utest", "cd.dev.test.py",
28+
'tcp:localhost:10000', -1, log_level=logging.DEBUG)
2829

2930
def assert_in_pinpoint_context(self):
3031
# get_trace_context().get_parent_id()

plugins/PY/requirements.txt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,6 @@ mysqlclient
1313
grpcio
1414
grpc-interceptor
1515
psycopg2
16-
pinpointPy
16+
pinpointPy
17+
kombu
18+
amqp

0 commit comments

Comments
 (0)