Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement LangfuseTracer #3398

Merged
merged 2 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 137 additions & 0 deletions src/backend/base/langflow/services/tracing/langfuse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
import os
from typing import TYPE_CHECKING, Any, Dict, Optional
from uuid import UUID
from datetime import datetime

from loguru import logger

from langflow.services.tracing.base import BaseTracer
from langflow.services.tracing.schema import Log

if TYPE_CHECKING:
from langflow.graph.vertex.base import Vertex
from langchain.callbacks.base import BaseCallbackHandler


class LangFuseTracer(BaseTracer):
flow_id: str

def __init__(self, trace_name: str, trace_type: str, project_name: str, trace_id: UUID):
self.project_name = project_name
self.trace_name = trace_name
self.trace_type = trace_type
self.trace_id = trace_id
self.flow_id = trace_name.split(" - ")[-1]
self.last_span = None
self.spans: dict = {}
self._ready: bool = self.setup_langfuse()

@property
def ready(self):
return self._ready

def setup_langfuse(self) -> bool:
try:
from langfuse import Langfuse
from langfuse.callback.langchain import LangchainCallbackHandler

config = self._get_config()
if not all(config.values()):
raise ValueError("Missing Langfuse configuration")

self._client = Langfuse(**config)
self.trace = self._client.trace(id=str(self.trace_id), name=self.flow_id)

config |= {
"trace_name": self.flow_id,
"stateful_client": self.trace,
"update_stateful_client": True,
}
self._callback = LangchainCallbackHandler(**config)

except ImportError:
logger.error("Could not import langfuse. Please install it with `pip install langfuse`.")
return False

except Exception as e:
logger.debug(f"Error setting up LangSmith tracer: {e}")
return False

return True

def add_trace(
self,
trace_id: str,
trace_name: str,
trace_type: str,
inputs: Dict[str, Any],
metadata: Dict[str, Any] | None = None,
vertex: Optional["Vertex"] = None,
):
start_time = datetime.utcnow()
if not self._ready:
return

_metadata: dict = {}
_metadata |= {"trace_type": trace_type} if trace_type else {}
_metadata |= metadata if metadata else {}

_name = trace_name.removesuffix(f" ({trace_id})")
content_span = {
"name": _name,
"input": inputs,
"metadata": _metadata,
"start_time": start_time,
}

if self.last_span:
span = self.last_span.span(**content_span)
else:
span = self.trace.span(**content_span)

self.last_span = span
self.spans[trace_id] = span

def end_trace(
self,
trace_id: str,
trace_name: str,
outputs: Dict[str, Any] | None = None,
error: Exception | None = None,
logs: list[Log | dict] = [],
):
end_time = datetime.utcnow()
if not self._ready:
return

span = self.spans.get(trace_id, None)
if span:
_output: dict = {}
_output |= outputs if outputs else {}
_output |= {"error": str(error)} if error else {}
_output |= {"logs": logs} if logs else {}
content = {"output": _output, "end_time": end_time}
span.update(**content)

def end(
self,
inputs: dict[str, Any],
outputs: Dict[str, Any],
error: Exception | None = None,
metadata: dict[str, Any] | None = None,
):
if not self._ready:
return

self._client.flush()

def get_langchain_callback(self) -> Optional["BaseCallbackHandler"]:
if not self._ready:
return None
return None # self._callback

def _get_config(self):
secret_key = os.getenv("LANGFUSE_SECRET_KEY", None)
public_key = os.getenv("LANGFUSE_PUBLIC_KEY", None)
host = os.getenv("LANGFUSE_HOST", None)
return {"secret_key": secret_key, "public_key": public_key, "host": host}
17 changes: 17 additions & 0 deletions src/backend/base/langflow/services/tracing/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ def _get_langwatch_tracer():
return LangWatchTracer


def _get_langfuse_tracer():
from langflow.services.tracing.langfuse import LangFuseTracer

return LangFuseTracer


class TracingService(Service):
name = "tracing_service"

Expand Down Expand Up @@ -101,6 +107,7 @@ async def initialize_tracers(self):
await self.start()
self._initialize_langsmith_tracer()
self._initialize_langwatch_tracer()
self._initialize_langfuse_tracer()
except Exception as e:
logger.debug(f"Error initializing tracers: {e}")

Expand All @@ -127,6 +134,16 @@ def _initialize_langwatch_tracer(self):
trace_id=self.run_id,
)

def _initialize_langfuse_tracer(self):
self.project_name = os.getenv("LANGCHAIN_PROJECT", "Langflow")
langfuse_tracer = _get_langfuse_tracer()
self._tracers["langfuse"] = langfuse_tracer(
trace_name=self.run_name,
trace_type="chain",
project_name=self.project_name,
trace_id=self.run_id,
)

def set_run_name(self, name: str):
self.run_name = name

Expand Down
Loading