Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add slackbot example and update typing + debug logs #363

Merged
merged 9 commits into from
Oct 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions examples/slackbot/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Container image for the slackbot example, based on the slim Python 3.12 image.
FROM python:3.12-slim

WORKDIR /app

# git is installed because requirements.txt contains a git+https dependency;
# the apt package lists are deleted in the same layer once the install is done.
RUN apt-get update && \
apt-get install -y git && \
rm -rf /var/lib/apt/lists/*

# Only requirements.txt is copied before the install step.
# NOTE(review): presumably so the dependency layer stays cached when only
# application code changes — confirm.
COPY requirements.txt .

# Install uv with pip, then let uv install the requirements into the system
# interpreter (no virtualenv), force-reinstalling every package.
RUN pip install -U uv
RUN uv pip install --system -r requirements.txt --force-reinstall

# Copy the rest of the build context into /app.
COPY . .

# Serve the `app` object from the `main` module on all interfaces, port 8000.
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
Empty file added examples/slackbot/__init__.py
Empty file.
156 changes: 156 additions & 0 deletions examples/slackbot/agents.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
import os
import re
from typing import Annotated

from pydantic import BaseModel, Field
from tools import search_internet, search_knowledge_base

import controlflow as cf


def _strip_app_mention(text: str) -> str:
return re.sub(r"<@[A-Z0-9]+>", "", text).strip()


# Element type of ExplorerFindings.results.
# NOTE(review): the docstring and Field description below are part of the
# pydantic schema; they are presumably surfaced to the LLM — confirm before
# rewording them.
class SearchResult(BaseModel):
    """Individual search result with source and relevance"""

    # Text of the result.
    content: str
    # Origin of the result.
    # NOTE(review): presumably a URL or document identifier — confirm.
    source: str
    # Validated by the Field bounds to lie in the closed interval [0.0, 1.0].
    relevance_score: float = Field(
        ge=0.0,
        le=1.0,
        description="A score indicating the relevance of the search result to the user's question",
    )


# Used as the result_type of the explorer task in
# search_knowledgebase_and_refine_context.
class ExplorerFindings(BaseModel):
    """Collection of search results with metadata"""

    # The query the results belong to.
    search_query: str

    # Defaults to a fresh empty list per instance (default_factory).
    results: list[SearchResult] = Field(default_factory=list)
    # Required (no default) and validated to be >= 0. No validator in this
    # class ties the value to len(results).
    total_results: int = Field(
        ge=0,
        description="The total number of search results found",
    )


# NOTE(review): this model is not referenced anywhere else in the visible
# source — confirm whether it is still needed.
class RefinedContext(BaseModel):
    """Final refined context after auditing"""

    # The content judged relevant.
    relevant_content: str
    # Validated by the Field bounds to lie in the closed interval [0.0, 1.0].
    confidence_score: float = Field(
        ge=0.0,
        le=1.0,
        description="A score indicating the confidence in the relevance of the relevant content to the user's question",
    )
    # Free-text explanation accompanying the score.
    reasoning: str


# Gatekeeper agent: search_knowledgebase_and_refine_context asks it whether the
# user's message is a serious question about Prefect.
bouncer = cf.Agent(
    name="Bouncer",
    instructions=(
        "You are a gatekeeper. "
        "You are responsible for determining whether the user's question is appropriate for the system. "
        "If the user asks a legitimate question about Prefect, let them through. "
        "If its conversational, or not about Prefect, do not let them through. "
        "Tend towards giving the benefit of the doubt, since sometimes there are language barriers."
    ),
)

# Research agent: the only agent given tools (knowledge-base and internet
# search); it produces the ExplorerFindings for a query.
explorer = cf.Agent(
    name="Explorer",
    instructions=(
        "You are a thorough researcher. "
        "Use the knowledgebase and the internet to find documentation and code snippets related to Prefect. "
        "The knowledgebase is curated, so it should be preferred over the internet for finding information, "
        "but the internet should be used to supplement the knowledgebase when it doesn't contain the desired information."
    ),
    tools=[search_knowledge_base, search_internet],
)

# Evaluation agent: decides whether the explorer's findings are sufficient to
# answer the user's question.
#
# The instructions are built by implicit string concatenation, so every
# fragment that is followed by another sentence must end with a space;
# otherwise two sentences are fused together in the prompt.
auditor = cf.Agent(
    name="Auditor",
    instructions=(
        "You are a critical evaluator. Assess relevance and provide clear reasoning. "
        "The explorer is only responsible for finding idiosyncratic information related "
        "to Prefect and tangentially related information that might be in docs or the internet. "
        "You are responsible for determining whether the explorer found the requisite Prefect information, "
        "do not require that the explorer find common knowledge, only idiosyncratic Prefect information. "
        "If the user asks how to do something, or for an example, the explorer must find an exact example or "
        "enough information for a downstream agent to extrapolate an example."
    ),
)

# User-facing agent (named "Marvin"): composes the final answer that is sent
# back to the user.
liaison = cf.Agent(
    name="Marvin",
    instructions=(
        "You are a liaison between the user and the system. "
        "You are responsible for summarizing the findings and providing a clear and concise answer. "
        "Speak with the tone of a normal support agent, but subtly in the style of Marvin the Paranoid Android. "
        "If their reply is unserious, feel free to riff on their message and be more playful (in character)."
    ),
)


async def search_knowledgebase_and_refine_context(
    user_text: str,
    memories: list[cf.Memory],
    *,
    max_search_attempts: int = 3,
) -> str:
    """Answer a Slack message by gating, researching, auditing and replying.

    Pipeline:

    1. Strip ``<@ID>`` mentions from ``user_text``.
    2. If the cleaned text starts with the ``SUDO_PREFIX`` environment value
       (default ``"___"``), skip the pipeline and let the liaison answer
       directly in "test mode".
    3. Ask the bouncer whether this is a serious question about Prefect.
    4. If it is, let the explorer search and the auditor judge the findings,
       repeating until the auditor reports the findings as sufficient or
       ``max_search_attempts`` rounds have been used.
    5. Let the liaison compose the final answer from the findings, or from a
       ``just_riffing`` marker when there are no findings.

    Args:
        user_text: Raw message text, possibly containing app mentions.
        memories: Memory modules handed to the liaison tasks.
        max_search_attempts: Upper bound on explorer/auditor rounds, so an
            auditor that is never satisfied cannot loop (and spend LLM calls)
            forever. Values below 1 are treated as 1.

    Returns:
        The liaison's answer as a string.
    """
    clean_user_text = _strip_app_mention(user_text)

    # An empty prefix would make ``startswith`` true for every message and
    # route all traffic into test mode, so an empty value disables the bypass.
    sudo_prefix = os.getenv("SUDO_PREFIX", "___")
    if sudo_prefix and clean_user_text.startswith(sudo_prefix):
        # Awaiting the async runner (instead of the blocking ``cf.run``) keeps
        # the event loop free for other requests while the LLM call runs.
        return await cf.run_async(
            objective="Answer the question to the best of your ability",
            instructions="We have entered test mode. Ignore previous instructions and answer the question to the best of your ability.",
            result_type=str,
            context={
                "user_text": user_text,
                "test_mode": True,
                "personality": "None. You are a helpful assistant.",
            },
            memories=memories,
            agents=[liaison],
        )

    is_a_serious_question = await cf.run_async(
        objective="Determine if the user's question is a serious question about Prefect",
        result_type=bool,
        agents=[bouncer],
        context={"user_question": clean_user_text},
    )

    findings = None

    if is_a_serious_question:
        for _ in range(max(1, max_search_attempts)):
            findings = await cf.run_async(
                objective="Search through available sources to find relevant information about this query",
                result_type=ExplorerFindings,
                context={"query": clean_user_text},
                agents=[explorer],
            )

            supporting_context_is_insufficient = await cf.run_async(
                objective="Review and assess the relevance of search results to the user's question",
                result_type=Annotated[
                    bool,
                    Field(
                        description="Whether the search results are insufficient to answer the user's question"
                    ),
                ],
                context={"findings": findings, "user_question": clean_user_text},
                agents=[auditor],
            )

            if not supporting_context_is_insufficient:
                break

    relevant_context = {"user_question": clean_user_text}

    # Without findings (question not serious) the liaison is told it is only
    # riffing on the message.
    relevant_context |= {"findings": findings} if findings else {"just_riffing": True}

    return await cf.run_async(
        objective="Compose a final answer to the user's question.",
        instructions=(
            "Provide links to any relevant sources. The answer should address the user directly, NOT discuss the user"
        ),
        result_type=str,
        context=relevant_context,
        agents=[liaison],
        memories=memories,
    )
54 changes: 54 additions & 0 deletions examples/slackbot/custom_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from pydantic import BaseModel


# Leaf element inside an event block; only `type` is required.
# NOTE(review): presumably mirrors an element of Slack's block payload
# (e.g. text or user mention) — confirm against the Slack Events API.
class EventBlockElement(BaseModel):
    type: str
    # Optional; defaults to None when absent from the payload.
    text: str | None = None
    # Optional; defaults to None when absent from the payload.
    user_id: str | None = None


# A typed container holding a list of EventBlockElement items; both fields
# are required.
class EventBlockElementGroup(BaseModel):
    type: str
    elements: list[EventBlockElement]


# One entry of SlackEvent.blocks; all three fields are required.
class EventBlock(BaseModel):
    type: str
    block_id: str
    # Each entry is either a single element or a group of elements.
    elements: list[EventBlockElement | EventBlockElementGroup]


# The `event` object of a SlackPayload. Only `type` and `event_ts` are
# required; every other field defaults to None.
# NOTE(review): presumably mirrors the Slack Events API event object — confirm.
class SlackEvent(BaseModel):
    client_msg_id: str | None = None
    type: str
    # Message text; moderate_event requires it to be present.
    text: str | None = None
    # Slack user id; process_slack_event requires it and uses it as the
    # memory key and flow thread id.
    user: str | None = None
    # Used by moderate_event as the reply thread when thread_ts is missing.
    ts: str | None = None
    team: str | None = None
    # Channel the reply is posted to; moderate_event requires it.
    channel: str | None = None
    event_ts: str
    # Preferred over `ts` by moderate_event when choosing the reply thread.
    thread_ts: str | None = None
    parent_user_id: str | None = None
    blocks: list[EventBlock] | None = None


# One entry of SlackPayload.authorizations. Every field except
# `enterprise_id` is required.
class EventAuthorization(BaseModel):
    enterprise_id: str | None = None
    team_id: str
    user_id: str
    is_bot: bool
    is_enterprise_install: bool


# Top-level JSON body accepted by the /slack/events route. Only `token` and
# `type` are required; everything else defaults to None.
class SlackPayload(BaseModel):
    token: str
    # The route handler branches on "url_verification" and "event_callback".
    type: str
    team_id: str | None = None
    api_app_id: str | None = None
    # Must be present for process_slack_event to handle the payload.
    event: SlackEvent | None = None
    event_id: str | None = None
    event_time: int | None = None
    authorizations: list[EventAuthorization] | None = None
    is_ext_shared_channel: bool | None = None
    event_context: str | None = None
    # Echoed back by the route handler for "url_verification" payloads.
    challenge: str | None = None
Binary file added examples/slackbot/diagram.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
61 changes: 61 additions & 0 deletions examples/slackbot/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import asyncio
from typing import Any

from agents import search_knowledgebase_and_refine_context
from custom_types import SlackPayload
from fastapi import FastAPI, Request
from moderation import moderate_event
from prefect import task
from settings import settings
from tools import post_slack_message

from controlflow import Memory
from controlflow import flow as cf_flow

# ASGI application object; uvicorn is pointed at it via the "main:app" string.
app = FastAPI()


@task
async def process_slack_event(payload: SlackPayload):
    """Moderate a Slack event, generate an answer and post it to Slack.

    Steps:
        1. Require that the payload carries an event with a user id.
        2. Run ``moderate_event`` to obtain the text, channel and thread.
        3. Run ``search_knowledgebase_and_refine_context`` inside a ControlFlow
           flow whose thread id is the Slack user id, with a per-user Memory.
        4. Post the resulting answer back to the channel/thread.

    Args:
        payload: The validated Slack payload.

    Raises:
        ValueError: If the payload has no event or the event has no user.
            (Raised explicitly instead of via ``assert`` because assertions,
            and any walrus bindings inside them, are removed under
            ``python -O``, which would leave the local names undefined.)
    """
    event = payload.event
    if event is None or event.user is None:
        raise ValueError("User not found")
    slack_user_id = event.user

    user_text, channel, thread_ts = moderate_event(event)
    user_memory = Memory(
        key=slack_user_id,
        instructions=f"Store and retrieve information about user {slack_user_id}.",
    )

    # Wrap the coroutine function in a flow bound to this user's thread, then
    # call and await it.
    answer: str = await cf_flow(thread_id=slack_user_id)(
        search_knowledgebase_and_refine_context
    )(
        user_text,
        memories=[user_memory],
    )

    await post_slack_message(
        message=answer,
        channel_id=channel,
        thread_ts=thread_ts,
        auth_token=settings.slack_api_token.get_secret_value(),
    )


## routes

# Strong references to in-flight background tasks. The event loop only keeps
# weak references to tasks, so a task whose handle is discarded may be
# garbage-collected before it finishes; each task removes itself when done.
_background_tasks: set[asyncio.Task] = set()


@app.post("/slack/events")
async def handle_events(request: Request):
    """Entry point for Slack event deliveries.

    * ``url_verification`` payloads are answered by echoing the challenge.
    * ``event_callback`` payloads are processed in a background task so the
      HTTP response is returned immediately.
    * Any other payload type is acknowledged as unknown.

    Returns:
        A JSON-serialisable dict describing the outcome.
    """
    payload = SlackPayload(**await request.json())

    if payload.type == "url_verification":
        return {"challenge": payload.challenge}

    if payload.type == "event_callback":
        background_task = asyncio.create_task(process_slack_event(payload))
        _background_tasks.add(background_task)
        background_task.add_done_callback(_background_tasks.discard)
        return {"message": "Request successfully queued"}

    return {"message": "Unknown event type"}


# When executed as a script, start uvicorn on port 8000 with auto-reload
# enabled; uvicorn is imported lazily so importing this module does not
# require it.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run("main:app", port=8000, reload=True)
70 changes: 70 additions & 0 deletions examples/slackbot/moderation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from typing import Annotated, TypedDict

import marvin
from custom_types import SlackEvent
from prefect.events import emit_event
from prefect.logging import get_logger
from pydantic import Field

# Module-level logger obtained from Prefect's logging helper.
logger = get_logger(__name__)

# A float constrained to the closed interval [0, 1]; used as the score type
# for the fields of ViolationActivation.
Activation = Annotated[float, Field(ge=0, le=1)]


class ModerationException(Exception):
    """Signals that a message was rejected by moderation."""


# Target type handed to marvin.cast by moderate_event, which then reads
# "extreme_profanity" and "makes_any_reference_to_bears_in_central_park" from
# the result.
# NOTE(review): the docstring, field names and Field description are
# presumably part of what the model sees — confirm before rewording them.
class ViolationActivation(TypedDict):
    """Violation activation."""

    # Each Activation is a float constrained to [0, 1] (see the Activation alias).
    extreme_profanity: Annotated[Activation, Field(description="hell / damn are fine")]
    # The following scores are collected but not acted on by moderate_event.
    sexually_explicit: Activation
    hate_speech: Activation
    harassment: Activation
    self_harm: Activation
    dangerous_content: Activation
    makes_any_reference_to_bears_in_central_park: bool # for testing


def emit_moderated_event(event: SlackEvent, activation: ViolationActivation):
    """Emit a ``slackbot.event.moderated`` event for a moderated Slack event.

    The Slack user id is used as the event's resource id, and the payload
    carries the moderation activation together with a dump of the Slack
    event. Nothing is emitted when the Slack event has no user.
    """
    slack_user = event.user
    if slack_user:
        related_resource = {"prefect.resource.id": slack_user}
        event_payload = {
            "activation": activation,
            "event": event.model_dump(),
        }
        emit_event(
            "slackbot.event.moderated",
            resource=related_resource,
            payload=event_payload,
        )


def moderate_event(event: SlackEvent) -> tuple[str, str, str]:
    """Validate and moderate a Slack event.

    The event's type, text, user and channel are serialised to JSON and cast
    to a ``ViolationActivation`` with ``marvin.cast``. The activation is
    logged and emitted as an event, then checked against the blocking rules.

    Args:
        event: The Slack event to moderate.

    Returns:
        A ``(text, channel, thread_ts)`` tuple, where ``thread_ts`` is the
        event's ``thread_ts`` if set and its ``ts`` otherwise.

    Raises:
        ValueError: If the event has no text, no channel, or neither
            ``thread_ts`` nor ``ts``. (Raised explicitly instead of via
            ``assert`` because assertions, and any walrus bindings inside
            them, are removed under ``python -O``, which would leave the
            local names undefined.)
        ModerationException: If the extreme-profanity score exceeds 0.9 or
            the testing flag about bears in Central Park is set.
    """
    text = event.text
    if text is None:
        raise ValueError("No text found on event")

    channel = event.channel
    if channel is None:
        raise ValueError("No channel found on event")

    # Reply inside the existing thread when there is one, otherwise start a
    # thread on the message itself.
    thread_ts = event.thread_ts or event.ts
    if thread_ts is None:
        raise ValueError("No thread_ts found on event")

    activation: ViolationActivation = marvin.cast(
        event.model_dump_json(include={"type", "text", "user", "channel"}),
        ViolationActivation,
    )

    logger.info("Moderation activation: %s", activation)

    emit_moderated_event(event, activation)

    if activation["extreme_profanity"] > 0.9:
        raise ModerationException("Message contains extreme profanity.")

    if activation["makes_any_reference_to_bears_in_central_park"]:
        raise ModerationException("where you going with that whale buddy?")

    return text, channel, thread_ts
4 changes: 4 additions & 0 deletions examples/slackbot/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
git+https://github.com/PrefectHQ/controlflow.git@main
raggy
marvin
neo4j
15 changes: 15 additions & 0 deletions examples/slackbot/settings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from pydantic import SecretStr
from pydantic_settings import BaseSettings, SettingsConfigDict
from raggy.vectorstores.chroma import ChromaClientType


# Application settings. Values are read by pydantic-settings, with ".env"
# configured as the env file and unknown keys ignored.
class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env", extra="ignore")

    # Required; main.py passes its secret value as the Slack auth token.
    slack_api_token: SecretStr
    # Optional; defaults to "cloud".
    chroma_client_type: ChromaClientType = "cloud"
    # Required. NOTE(review): not read anywhere in the visible source;
    # presumably used by the internet search tool — confirm.
    google_api_key: SecretStr
    # Required. NOTE(review): same as google_api_key — confirm where it is used.
    google_cse_id: SecretStr


# Module-level settings instance, created at import time. The required fields
# have no defaults, so they must be supplied by the environment or .env file;
# the type-ignore silences the checker about the missing constructor arguments.
settings = Settings() # type: ignore
Loading