-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathRAGengine.py
More file actions
234 lines (195 loc) · 8.15 KB
/
RAGengine.py
File metadata and controls
234 lines (195 loc) · 8.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
import os
import re
import asyncio
from dotenv import load_dotenv
from fastembed import TextEmbedding
# --- LANGCHAIN IMPORTS ---
from langchain_groq import ChatGroq
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.output_parsers import StrOutputParser
# Load environment configuration via python-dotenv before the os.getenv() reads below.
load_dotenv()
# API key passed to ChatGroq in RuleSearchEngine.__init__; None when the variable is unset.
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
# Index name used by the $vectorSearch stage in RuleSearchEngine.search; None when unset.
VECTOR_INDEX_NAME = os.getenv("VECTOR_INDEX_NAME")
# Model identifier passed to fastembed's TextEmbedding.
MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"
class RuleSearchEngine:
    """Async retrieval-augmented question answering over the "rules" collection.

    Retrieval tries, in order: an explicit rule-number lookup, an explicit
    page-number lookup, and finally a vector similarity search. Query
    rewriting, answers and chat titles are produced through the Groq chat
    model configured in ``__init__``.
    """

    # "rule 4.2" -> "4.2". Only digit groups separated by single dots are
    # captured, so sentence punctuation after the number ("rule 4.2.") is not
    # swallowed into the rule number.
    _RULE_PATTERN = re.compile(r"rule\s+(\d+(?:\.\d+)*)", re.IGNORECASE)
    # "page 12" -> "12".
    _PAGE_PATTERN = re.compile(r"page\s+(\d+)", re.IGNORECASE)
    # Baseline candidate-pool size for the $vectorSearch stage.
    _NUM_CANDIDATES = 50
    # Maximum length of the fallback chat title before an ellipsis is added.
    _TITLE_FALLBACK_LENGTH = 30
    # Context handed to the LLM when retrieval returned nothing.
    _NO_CONTEXT_MESSAGE = "No specific rules found in the database."

    def __init__(self, db, verbose=False):
        """
        Initialize the Async Rule Search Engine

        Args:
            db: The Motor database instance (from database.py)
            verbose: Print debug logs
        """
        self.verbose = verbose
        self.db = db
        self.collection = self.db["rules"]  # Reuses the Motor collection

        # Load Embedding Model (CPU bound, stays sync)
        if self.verbose:
            print("Loading embedding model...")
        self.embedding_model = TextEmbedding(model_name=MODEL_NAME)

        # Initialize Groq LLM
        if self.verbose:
            print("Initializing Groq...")
        self.llm = ChatGroq(
            temperature=0,
            model_name="llama-3.1-8b-instant",
            api_key=GROQ_API_KEY
        )

    async def search(self, query: str, limit: int = 3) -> list:
        """Retrieve documents relevant to ``query``.

        Args:
            query (str): Free-text query; may mention "rule <number>" or
                "page <number>" to trigger an exact lookup.
            limit (int): Maximum number of vector-search hits. Exact
                rule/page lookups return every matching document.

        Returns:
            list: Matching documents. Vector-search failures are reported
            when ``verbose`` is set and yield an empty list.
        """
        # A. Rule-number lookup
        rule_number = self._extract_rule_number(query)
        if rule_number is not None:
            # Matches the rule itself ("4.2") and its sub-rules ("4.2.1"),
            # but not a different rule sharing the prefix ("4.21").
            rule_regex = rf"^{re.escape(rule_number)}($|\.)"
            cursor = self.collection.find({
                "metadata.rule_number": {"$regex": rule_regex}
            })
            return await cursor.to_list(length=None)

        # B. Page-number lookup
        page_match = self._PAGE_PATTERN.search(query)
        if page_match:
            page_number = int(page_match.group(1))
            cursor = self.collection.find({"metadata.page_number": page_number})
            return await cursor.to_list(length=None)

        # C. Vector search
        # Embeddings are CPU bound, so run them in a thread to avoid blocking the loop
        query_vector = await asyncio.to_thread(self._embed_query, query)
        pipeline = self._build_vector_pipeline(query_vector, limit)
        try:
            cursor = self.collection.aggregate(pipeline)
            return await cursor.to_list(length=None)
        except Exception as e:
            if self.verbose:
                print(f"Error in vector search: {e}")
            return []

    @classmethod
    def _extract_rule_number(cls, query: str):
        """Return the rule number mentioned in ``query`` (e.g. "4.2"), or None."""
        found = cls._RULE_PATTERN.search(query)
        return found.group(1) if found else None

    @classmethod
    def _build_vector_pipeline(cls, query_vector: list, limit: int) -> list:
        """Build the aggregation pipeline for a vector similarity search."""
        return [
            {
                "$vectorSearch": {
                    "index": VECTOR_INDEX_NAME,
                    "path": "embedding",
                    "queryVector": query_vector,
                    # The candidate pool may not be smaller than the number
                    # of results requested, so grow it with large limits.
                    "numCandidates": max(cls._NUM_CANDIDATES, limit),
                    "limit": limit
                }
            },
            {
                "$project": {
                    "_id": 1,
                    "text_content": 1,
                    "metadata": 1,
                    "score": {"$meta": "vectorSearchScore"}
                }
            }
        ]

    def _embed_query(self, query):
        """Helper to run synchronous embedding in a thread"""
        # embed() yields one vector per input text; only one text is passed.
        first_vector = next(iter(self.embedding_model.embed([query])))
        return first_vector.tolist()

    async def contextualize_query(self, user_input: str, chat_history: list) -> str:
        """
        Uses LLM to rewrite the user query based on chat history.

        Args:
            user_input (str): The user's question
            chat_history (list): List of HumanMessage/AIMessage objects

        Returns:
            str: Contextualized standalone question (``user_input`` unchanged
            when there is no history, without calling the LLM)
        """
        if not chat_history:
            return user_input

        contextualize_q_system_prompt = (
            "Given a chat history and the latest user question "
            "which might reference context in the chat history, "
            "formulate a standalone question which can be understood "
            "without the chat history. Do NOT answer the question, "
            "just reformulate it if needed and otherwise return it as is."
        )
        prompt = ChatPromptTemplate.from_messages([
            ("system", contextualize_q_system_prompt),
            MessagesPlaceholder(variable_name="chat_history"),
            ("human", "{input}"),
        ])
        chain = prompt | self.llm | StrOutputParser()

        # Use ainvoke for Async LLM call
        return await chain.ainvoke({
            "chat_history": chat_history,
            "input": user_input
        })

    @classmethod
    def _format_context(cls, documents: list) -> str:
        """Render retrieved documents as the CONTEXT section of the QA prompt."""
        if not documents:
            return cls._NO_CONTEXT_MESSAGE

        entries = []
        for doc in documents:
            # Tolerate documents whose fields are present but null.
            meta = doc.get('metadata') or {}
            rule = meta.get('rule_number', 'N/A')
            page = meta.get('page_number', 'N/A')
            content = (doc.get('text_content') or '').strip()
            entries.append(f"[Source: Rule {rule}, Page {page}]: {content}\n\n")
        return "".join(entries)

    async def generate_answer(self, user_query: str, documents: list, chat_history: list) -> tuple:
        """
        Generates the final answer using Groq + Retrieved Documents

        Args:
            user_query (str): The user's question
            documents (list): Retrieved documents from search
            chat_history (list): List of HumanMessage/AIMessage objects

        Returns:
            tuple: (answer, context_text)
        """
        context_text = self._format_context(documents)

        qa_system_prompt = (
            "You are an assistant for question-answering tasks. "
            "Use the following pieces of retrieved context to answer "
            "the question. If you don't know the answer, say that you "
            "don't know. "
            "ALWAYS cite the specific Rule Number or Page Number associated "
            "with the information in your answer.\n\n"
            "CONTEXT:\n{context}"
        )
        prompt = ChatPromptTemplate.from_messages([
            ("system", qa_system_prompt),
            MessagesPlaceholder(variable_name="chat_history"),
            ("human", "{input}"),
        ])
        chain = prompt | self.llm | StrOutputParser()

        # Use ainvoke for Async LLM call
        response = await chain.ainvoke({
            "chat_history": chat_history,
            "context": context_text,
            "input": user_query
        })
        return response, context_text

    async def ask(self, user_input: str, chat_history: list = None) -> str:
        """
        Main method to ask a question and get an answer.
        Handles contextualization, search, and answer generation.
        Now STATELESS - caller manages chat history.

        Args:
            user_input (str): The user's question
            chat_history (list): List of HumanMessage/AIMessage objects (optional)

        Returns:
            str: The AI's answer
        """
        if chat_history is None:
            chat_history = []

        # 1. Contextualize (Async)
        standalone_query = await self.contextualize_query(user_input, chat_history)

        # 2. Retrieve Data (Async) - retrieval uses the rewritten, standalone query
        docs = await self.search(standalone_query)

        # 3. Generate Answer (Async) - the prompt gets the original wording plus the history
        answer, _ = await self.generate_answer(user_input, docs, chat_history)
        return answer

    @classmethod
    def _fallback_title(cls, first_message: str) -> str:
        """Derive a title from the message itself, adding "..." only if it was cut."""
        if len(first_message) <= cls._TITLE_FALLBACK_LENGTH:
            return first_message
        return first_message[:cls._TITLE_FALLBACK_LENGTH] + "..."

    async def generate_chat_title(self, first_message: str) -> str:
        """
        Generates a short, concise title (3-5 words) for the chat session
        based on the user's first message.

        Falls back to the (possibly truncated) first message when the LLM call
        fails or returns an empty title.
        """
        title_system_prompt = (
            "You are a helpful assistant. Generate a very short, concise "
            "title (maximum 5 words) for a chat session that starts with "
            "the following message. Do not use quotes or markdown."
        )
        prompt = ChatPromptTemplate.from_messages([
            ("system", title_system_prompt),
            ("human", "{input}")
        ])
        chain = prompt | self.llm | StrOutputParser()

        fallback = self._fallback_title(first_message)
        try:
            title = await chain.ainvoke({"input": first_message})
            cleaned = title.strip()
        except Exception as e:
            # Best effort: a title is cosmetic, so an LLM failure must not propagate.
            if self.verbose:
                print(f"Error generating chat title: {e}")
            return fallback
        return cleaned or fallback