Skip to content

Commit

Permalink
[enh] add asyncio plugins
Browse files Browse the repository at this point in the history
- standalone-py
- py-photonix testapps
- fastapi example
- pinpointpy plugins and testcase
  • Loading branch information
eeliu committed Nov 6, 2024
1 parent 2b4d5eb commit f47c785
Show file tree
Hide file tree
Showing 24 changed files with 578 additions and 88 deletions.
3 changes: 1 addition & 2 deletions common/include/common.h.in
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,7 @@ int pinpoint_trace_is_root(NodeID);
* @return int 0 : means oK
* -1: exception found, check the log
*/
DEPRECATED(
"use pinpoint_end_trace. if you need no span missing, set pinpoint_set_agent with `timeout_ms`")
DEPRECATED("use pinpoint_end_trace. if you need all span, set pinpoint_set_agent with `timeout_ms`")
int pinpoint_force_end_trace(NodeID, int32_t timeout);

/**
Expand Down
3 changes: 1 addition & 2 deletions common/include/pinpoint/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,7 @@ int pinpoint_trace_is_root(NodeID);
* @return int 0 : means oK
* -1: exception found, check the log
*/
DEPRECATED(
"use pinpoint_end_trace. if you need no span missing, set pinpoint_set_agent with `timeout_ms`")
DEPRECATED("use pinpoint_end_trace. if you need all span, set pinpoint_set_agent with `timeout_ms`")
int pinpoint_force_end_trace(NodeID, int32_t timeout);

/**
Expand Down
100 changes: 44 additions & 56 deletions plugins/PY/pinpointPy/Common.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,20 @@
from pinpointPy import Defines, pinpoint, get_logger
from pinpointPy.TraceContext import get_trace_context
from functools import wraps
import random

import warnings
import functools


def deprecated(reason: str):
def decorator(func):
@functools.wraps(func)
def deprecated_func(*args, **kwargs):
warnings.warn(f"{func.__name__} is deprecated. Reason: {reason}",
category=DeprecationWarning, stacklevel=2)
return func(*args, **kwargs)
return deprecated_func
return decorator


class Trace:
Expand Down Expand Up @@ -115,6 +128,32 @@ def getUniqueName(self):
return self.name


class AsyncPinTrace(PinTrace):

def __call__(self, func):
self.func_name = func.__name__

@wraps(func)
async def pinpointTrace(*args, **kwargs):
ret = None
# avoiding variable missing
# use and return
sampled, parentId, nArgs, nKwargs = self._isSample(*args, **kwargs)
if not sampled:
return await func(*nArgs, **nKwargs)
traceId, nArgs, nKwargs = self.onBefore(
parentId, *nArgs, **nKwargs)
try:
ret = await func(*nArgs, **nKwargs)
return ret
except Exception as e:
self.onException(traceId, e)
raise e
finally:
self.onEnd(traceId, ret)
return pinpointTrace


class TraceIdObject:
def __init__(self, id: int) -> None:
self.traceId = id
Expand Down Expand Up @@ -183,7 +222,7 @@ class PinTransaction(PinTrace):
def __init__(self, name: str, userGenHeaderCb: GenPinHeader):
"""pinpointPy user entry point
Example:
Example:
```
from pinpointPy.Common import GenPinHeader, PinHeader, PinTransaction
Expand All @@ -199,7 +238,7 @@ def run(msg):
```
Args:
name (str): entry points name(showing pinpoint)
userGenHeaderCb (GenPinHeader): This helps getting header from current function
userGenHeaderCb (GenPinHeader): This helps getting header from current function
"""
super().__init__(name)
self.name: str = name
Expand Down Expand Up @@ -280,56 +319,5 @@ def enable_experiment_plugins(async_plugins: bool = True):
thread_patch()
from pinpointPy.libs._process import monkey_patch as process_patch
process_patch()


class HookTargetPlugins(PinTrace):
def onBefore(self, parentId: int, *args, **kwargs):

traceId, args, kwargs = super().onBefore(parentId, *args, **kwargs)
pinpoint.add_trace_header(
Defines.PP_INTERCEPTOR_NAME, self.getUniqueName(), traceId)
pinpoint.add_trace_header(
Defines.PP_SERVER_TYPE, Defines.P_INVOCATION_CALL_TYPE, traceId)

async_id = random.randint(0, 9999)
pinpoint.add_trace_header(
Defines.PP_ASYNC_CALL_ID, f'{async_id}', traceId)

sequence_id = pinpoint.get_sequence_id(traceId)

tid = pinpoint.get_context(Defines.PP_TRANSCATION_ID, traceId)
seq_id = pinpoint.get_context(Defines.PP_SPAN_ID, traceId)
app_name = pinpoint.get_context(Defines.PP_APP_NAME, traceId)
app_id = pinpoint.get_context(Defines.PP_APP_ID, traceId)

origin_target = kwargs['target']

def pp_new_entry_func(*args, **kwargs):
# start trace
thread_trace_id = pinpoint.with_trace(0)
self.setCurrentTraceNodeId(thread_trace_id)
pinpoint.add_trace_header(
Defines.PP_APP_NAME, app_name, thread_trace_id)
pinpoint.add_trace_header(
Defines.PP_APP_ID, app_id, thread_trace_id)

pinpoint.add_trace_header(
Defines.PP_SPAN_ID, seq_id, thread_trace_id)
pinpoint.add_trace_header(
Defines.PP_TRANSCATION_ID, tid, thread_trace_id)
pinpoint.add_trace_header(
Defines.PP_SERVER_TYPE, Defines.PYTHON, thread_trace_id)
pinpoint.set_async_context(
thread_trace_id, async_id, sequence_id)

if callable(origin_target):
origin_target(*args, **kwargs)

pinpoint.end_trace(thread_trace_id)

kwargs['target'] = pp_new_entry_func
return traceId, args, kwargs

def onEnd(self, traceId, ret):
super().onEnd(traceId, ret)
return ret
from pinpointPy.libs._asyncio import monkey_patch as asyncio_patch
asyncio_patch()
90 changes: 90 additions & 0 deletions plugins/PY/pinpointPy/CommonPlugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@


from pinpointPy import Common, Defines, pinpoint
import random


class PinpointCommonPlugin(Common.PinTrace):
Expand All @@ -36,3 +37,92 @@ def onEnd(self, traceId, ret):
def onException(self, traceId, e):
pinpoint.add_exception(str(e), traceId)
raise e


class AsyncCommonPlugin(Common.AsyncPinTrace):

# -> tuple[int, Any, dict[str, Any]]:
def onBefore(self, parentId, *args, **kwargs):
traceId, args, kwargs = super().onBefore(parentId, *args, **kwargs)
pinpoint.add_trace_header(
Defines.PP_INTERCEPTOR_NAME, self.getUniqueName(), traceId)
pinpoint.add_trace_header(
Defines.PP_SERVER_TYPE, Defines.PP_METHOD_CALL, traceId)
return traceId, args, kwargs

def onEnd(self, traceId, ret):
super().onEnd(traceId, ret)

def onException(self, traceId, e):
pinpoint.add_exception(str(e), traceId)


class HookTargetPlugins(Common.PinTrace):
def onBefore(self, parentId: int, *args, **kwargs):

traceId, args, kwargs = super().onBefore(parentId, *args, **kwargs)
pinpoint.add_trace_header(
Defines.PP_INTERCEPTOR_NAME, self.getUniqueName(), traceId)
pinpoint.add_trace_header(
Defines.PP_SERVER_TYPE, Defines.P_INVOCATION_CALL_TYPE, traceId)

async_id = random.randint(0, 9999)
pinpoint.add_trace_header(
Defines.PP_ASYNC_CALL_ID, f'{async_id}', traceId)

sequence_id = pinpoint.get_sequence_id(traceId)

tid = pinpoint.get_context(Defines.PP_TRANSCATION_ID, traceId)
seq_id = pinpoint.get_context(Defines.PP_SPAN_ID, traceId)
app_name = pinpoint.get_context(Defines.PP_APP_NAME, traceId)
app_id = pinpoint.get_context(Defines.PP_APP_ID, traceId)

if 'target' in kwargs:
origin_target = kwargs['target']

def pp_new_entry_func(*args, **kwargs):
# start trace
thread_trace_id = pinpoint.with_trace(0)
self.setCurrentTraceNodeId(thread_trace_id)
pinpoint.add_trace_header(
Defines.PP_APP_NAME, app_name, thread_trace_id)
pinpoint.add_context(
Defines.PP_APP_NAME, app_name, thread_trace_id)

pinpoint.add_trace_header(
Defines.PP_APP_ID, app_id, thread_trace_id)
pinpoint.add_context(
Defines.PP_APP_ID, app_id, thread_trace_id)

pinpoint.add_trace_header(
Defines.PP_SPAN_ID, seq_id, thread_trace_id)
pinpoint.add_context(
Defines.PP_SPAN_ID, seq_id, thread_trace_id)

pinpoint.add_trace_header(
Defines.PP_TRANSCATION_ID, tid, thread_trace_id)
pinpoint.add_context(
Defines.PP_TRANSCATION_ID, tid, thread_trace_id)

pinpoint.add_trace_header(
Defines.PP_SERVER_TYPE, Defines.PYTHON, thread_trace_id)
pinpoint.set_async_context(
thread_trace_id, async_id, sequence_id)

if callable(origin_target):
# todo add
@PinpointCommonPlugin(origin_target.__name__)
def call_origin_target(*args, **kwargs):
origin_target(*args, **kwargs)

call_origin_target(*args, **kwargs)

pinpoint.end_trace(thread_trace_id)

kwargs['target'] = pp_new_entry_func

return traceId, args, kwargs

def onEnd(self, traceId, ret):
super().onEnd(traceId, ret)
return ret
14 changes: 9 additions & 5 deletions plugins/PY/pinpointPy/Fastapi/AsyCommon.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,24 @@
from pinpointPy.TraceContext import TraceContext
from pinpointPy.Common import PinTrace


class AsyncTraceContext (TraceContext):
def get_parent_id(self):
id = context.get('_pinpoint_id_', 0)
if id == 0:
try:
id = context.get('_pinpoint_id_', 0)
if id == 0:
return False, -1
else:
return True, id
except Exception:
return False, -1
else:
return True, id

def set_parent_id(self, id: int):
context['_pinpoint_id_'] = id


class AsyncPinTrace(PinTrace):

def __call__(self, func):
self.func_name = func.__name__

Expand Down
2 changes: 2 additions & 0 deletions plugins/PY/pinpointPy/Fastapi/AsyCommonPlugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

from pinpointPy.Fastapi.AsyCommon import AsyncPinTrace
from pinpointPy import Defines, pinpoint
from pinpointPy.Common import deprecated


@deprecated("please use AsyncCommonPlugin in pinpointPy.CommonPlugin eg: `from pinpointPy.CommonPlugin import AsyncCommonPlugin`")
class CommonPlugin(AsyncPinTrace):

# -> tuple[int, Any, dict[str, Any]]:
Expand Down
1 change: 1 addition & 0 deletions plugins/PY/pinpointPy/Fastapi/AsyRequestPlugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def onBefore(self, parentId, *args, **kwargs):
pinpoint.add_trace_header(
Defines.PP_APP_ID, pinpoint.app_id(), traceId)
pinpoint.add_context(Defines.PP_APP_NAME, pinpoint.app_name(), traceId)
pinpoint.add_context(Defines.PP_APP_ID, pinpoint.app_id(), traceId)
###############################################################
request = args[0]

Expand Down
6 changes: 5 additions & 1 deletion plugins/PY/pinpointPy/Fastapi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ def use_starlette_context():
set_trace_context(new_trace_context=AsyncTraceContext())


__version__ = '0.0.3'
__version__ = '0.0.4'
__author__ = 'liu.mingyi@navercorp.com'
__all__ = ['async_monkey_patch_for_pinpoint', 'asyn_monkey_patch_for_pinpoint', 'use_starlette_context', 'PinPointMiddleWare',
'CommonPlugin', 'PinTransaction', 'PinHeader', 'GenPinHeader', 'PinStarlettePlugin']

# 0.0.4
# Changes
# Append appid
18 changes: 18 additions & 0 deletions plugins/PY/pinpointPy/TraceContext.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

from pinpointPy.pinpoint import get_logger
import threading
from contextvars import ContextVar

_local_id = threading.local()

Expand Down Expand Up @@ -63,3 +64,20 @@ def get_parent_id(self):
def set_parent_id(self, id: int):
global _local_id
_local_id._pinpoint_id_ = id


class asyncio_local_context(TraceContext):

def __init__(self):
self.request_id = ContextVar(
'_pinpoint_id_', default=0)

def get_parent_id(self):
id = self.request_id.get()
if id > 0:
return True, id
else:
return False, -1

def set_parent_id(self, id: int):
self.request_id.set(id)
13 changes: 11 additions & 2 deletions plugins/PY/pinpointPy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

from pinpointPy.libs import monkey_patch_for_pinpoint
from pinpointPy.pinpoint import set_agent, app_id, app_name, gen_tid, get_logger
from pinpointPy.TraceContext import set_trace_context, thread_local_context
from pinpointPy.TraceContext import set_trace_context, thread_local_context, asyncio_local_context
from pinpointPy.Common import PinTransaction, GenPinHeader, PinHeader, enable_experiment_plugins


Expand All @@ -29,7 +29,16 @@ def use_thread_local_context():
set_trace_context(thread_local_context())


__all__ = ['monkey_patch_for_pinpoint', 'use_thread_local_context'
def use_asyncio_local_context():
get_logger().debug("use_asyncio_local_context")
set_trace_context(asyncio_local_context())


__all__ = ['monkey_patch_for_pinpoint', 'use_thread_local_context', 'use_asyncio_local_context',
'set_agent', 'app_id', 'app_name', 'gen_tid', 'get_logger', 'PinTransaction', 'GenPinHeader', 'PinHeader', 'enable_experiment_plugins']
__version__ = "1.4.0"
__author__ = 'liu.mingyi@navercorp.com'

# 1.4.0
# Changes
# - use_asyncio_local_context
Loading

0 comments on commit f47c785

Please sign in to comment.