Skip to content

Commit

Permalink
add timing info ingestion
Browse files Browse the repository at this point in the history
  • Loading branch information
mdr223 committed Nov 7, 2023
1 parent 913d6ef commit f4e38bf
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 16 deletions.
1 change: 1 addition & 0 deletions .github/workflows/dev-ci-cd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ on:
push:
branches:
- main
- bug/investigate-timing-issues
jobs:
deploy-dev-system:
runs-on: ubuntu-latest
Expand Down
80 changes: 69 additions & 11 deletions A2rchi/interfaces/chat_app/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from A2rchi.utils.config_loader import Config_Loader
from A2rchi.utils.data_manager import DataManager
from A2rchi.utils.env import read_secret
from A2rchi.utils.sql import SQL_INSERT_CONVO, SQL_INSERT_FEEDBACK, SQL_QUERY_CONVO
from A2rchi.utils.sql import SQL_INSERT_CONVO, SQL_INSERT_FEEDBACK, SQL_INSERT_TIMING, SQL_QUERY_CONVO

from datetime import datetime
from pygments import highlight
Expand Down Expand Up @@ -193,7 +193,7 @@ def insert_conversation(self, conversation_id, user_message, a2rchi_message, is_
"""
print(" INFO - entered insert_conversation.")

# parse user message / a2rchi message if not None
# parse user message / a2rchi message
user_sender, user_content, user_msg_ts = user_message
a2rchi_sender, a2rchi_content, a2rchi_msg_ts = a2rchi_message

Expand Down Expand Up @@ -223,18 +223,57 @@ def insert_conversation(self, conversation_id, user_message, a2rchi_message, is_
self.cursor, self.conn = None, None

return message_ids

def insert_timing(self, message_id, timestamps):
"""
Store timing info to understand response profile.
"""
print(" INFO - entered insert_timing.")

# construct insert_tup
insert_tup = (
message_id,
timestamps['client_sent_msg_ts'],
timestamps['server_received_msg_ts'],
timestamps['lock_acquisition_ts'],
timestamps['vectorstore_update_ts'],
timestamps['query_convo_history_ts'],
timestamps['chain_finished_ts'],
timestamps['similarity_search_ts'],
timestamps['a2rchi_message_ts'],
timestamps['insert_convo_ts'],
timestamps['finish_call_ts'],
timestamps['server_response_msg_ts'],
timestamps['server_response_msg_ts'] - timestamps['server_received_msg_ts']
)

# create connection to database
self.conn = psycopg2.connect(**self.pg_config)
self.cursor = self.conn.cursor()
self.cursor.execute(SQL_INSERT_TIMING, insert_tup)
self.conn.commit()

# clean up database connection state
self.cursor.close()
self.conn.close()
self.cursor, self.conn = None, None


def __call__(self, message: List[str], conversation_id: int, is_refresh: bool, msg_ts: datetime, client_msg_ts: float, client_timeout: float):
def __call__(self, message: List[str], conversation_id: int, is_refresh: bool, server_received_msg_ts: datetime, client_msg_ts: float, client_timeout: float):
"""
Execute the chat functionality.
"""
# store timestamps for code profiling information
timestamps = {}

self.lock.acquire()
timestamps['lock_acquisition_ts'] = datetime.now()
try:
# update vector store through data manager; will only do something if new files have been added
print("INFO - acquired lock file update vectorstore")

self.data_manager.update_vectorstore()
timestamps['vectorstore_update_ts'] = datetime.now()

except Exception as e:
print(f"ERROR - {str(e)}")
Expand All @@ -253,6 +292,7 @@ def __call__(self, message: List[str], conversation_id: int, is_refresh: bool, m

# fetch history given conversation_id
history = self.query_conversation_history(conversation_id)
timestamps['query_convo_history_ts'] = datetime.now()

# if this is a chat refresh / message regeneration; remove previous contiguous non-A2rchi message(s)
if is_refresh:
Expand All @@ -261,13 +301,14 @@ def __call__(self, message: List[str], conversation_id: int, is_refresh: bool, m

# guard call to LLM; if timestamp from message is more than timeout secs in the past;
# return error=True and do not generate response as the client will have timed out
if msg_ts.timestamp() - client_msg_ts > client_timeout:
if server_received_msg_ts.timestamp() - client_msg_ts > client_timeout:
return None, None, None, True

# run chain to get result; limit users to 1000 queries per conversation; refreshing browser starts new conversation
if len(history) < QUERY_LIMIT:
full_history = history + [(sender, content)] if not is_refresh else history
result = self.chain(full_history)
timestamps['chain_finished_ts'] = datetime.now()
else:
# for now let's return a timeout error, as returning a different
# error message would require handling new message_ids param. properly
Expand All @@ -281,6 +322,7 @@ def __call__(self, message: List[str], conversation_id: int, is_refresh: bool, m
# - low score means very close (it's a distance between embedding vectors approximated
# by an approximate k-nearest neighbors algorithm called HNSW)
score = self.chain.similarity_search(content)
timestamps['similarity_search_ts'] = datetime.now()

# load the present list of sources
try:
Expand All @@ -305,10 +347,12 @@ def __call__(self, message: List[str], conversation_id: int, is_refresh: bool, m
output = "<p>" + self.format_code_in_text(result["answer"]) + "</p>"

# write user message and A2rchi response to database
user_message = (sender, content, msg_ts)
a2rchi_message = ("A2rchi", output, datetime.now())
timestamps['a2rchi_message_ts'] = datetime.now()
user_message = (sender, content, server_received_msg_ts)
a2rchi_message = ("A2rchi", output, timestamps['a2rchi_message_ts'])

message_ids = self.insert_conversation(conversation_id, user_message, a2rchi_message, is_refresh)
timestamps['insert_convo_ts'] = datetime.now().timestamp()

except Exception as e:
print(f"ERROR - {str(e)}")
Expand All @@ -318,8 +362,10 @@ def __call__(self, message: List[str], conversation_id: int, is_refresh: bool, m
self.cursor.close()
if self.conn is not None:
self.conn.close()

timestamps['finish_call_ts'] = datetime.now().timestamp()

return output, conversation_id, message_ids, False
return output, conversation_id, message_ids, timestamps, False


class FlaskAppWrapper(object):
Expand Down Expand Up @@ -369,25 +415,37 @@ def get_chat_response(self):
discussion ID (either None or an integer)
"""
# compute timestamp at which message was received by server
msg_ts = datetime.now()
server_received_msg_ts = datetime.now()

# get user input and conversation_id from the request
message = request.json.get('last_message')
conversation_id = request.json.get('conversation_id')
is_refresh = request.json.get('is_refresh')
client_msg_ts = request.json.get('client_msg_ts') / 1000
client_sent_msg_ts = request.json.get('client_sent_msg_ts') / 1000
client_timeout = request.json.get('client_timeout') / 1000

# query the chat and return the results.
print(" INFO - Calling the ChatWrapper()")
response, conversation_id, message_ids, error = self.chat(message, conversation_id, is_refresh, msg_ts, client_msg_ts, client_timeout)
response, conversation_id, message_ids, timestamps, error = self.chat(message, conversation_id, is_refresh, server_received_msg_ts, client_sent_msg_ts, client_timeout)

# handle timeout error
if error:
return jsonify({'error': 'client timeout'}), 408

# compute timestamp at which message was returned to client
timestamps['server_response_msg_ts'] = datetime.now()

# store timing info for this message
self.insert_timing(message_ids[-1], timestamps)

# otherwise return A2rchi's response to client
return jsonify({'response': response, 'conversation_id': conversation_id, 'a2rchi_msg_id': message_ids[-1]})
return jsonify({
'response': response,
'conversation_id': conversation_id,
'a2rchi_msg_id': message_ids[-1],
'server_response_msg_ts': timestamps['server_response_msg_ts'].timestamp(),
'final_response_msg_ts': datetime.now().timestamp(),
})

def index(self):
return render_template('index.html')
Expand Down
2 changes: 1 addition & 1 deletion A2rchi/interfaces/chat_app/static/script.js-template
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ const getChatResponse = async (incomingChatDiv, isRefresh=false) => {
last_message: conversation.slice(-1),
conversation_id: conversation_id,
is_refresh: isRefresh,
client_msg_ts: Date.now(),
client_sent_msg_ts: Date.now(),
client_timeout: DEFAULT_TIMEOUT_SECS * 1000
}),
timeout: DEFAULT_TIMEOUT_SECS * 1000
Expand Down
39 changes: 36 additions & 3 deletions A2rchi/utils/sql.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,39 @@
"""SQL queries used by A2rchi"""
SQL_INSERT_CONVO = "INSERT INTO conversations (conversation_id, sender, content, ts) VALUES %s RETURNING message_id;"
SQL_INSERT_CONVO = """INSERT INTO conversations (
conversation_id, sender, content, ts
)
VALUES %s
RETURNING message_id;
"""

SQL_INSERT_FEEDBACK = "INSERT INTO feedback (mid, feedback_ts, feedback, feedback_msg, incorrect, unhelpful, inappropriate) VALUES (%s, %s, %s, %s, %s, %s, %s);"
SQL_INSERT_FEEDBACK = """INSERT INTO feedback (
mid, feedback_ts, feedback, feedback_msg, incorrect, unhelpful, inappropriate
)
VALUES (%s, %s, %s, %s, %s, %s, %s);
"""

SQL_QUERY_CONVO = "SELECT sender, content FROM conversations WHERE conversation_id = %s ORDER BY message_id ASC;"
SQL_QUERY_CONVO = """SELECT (
sender, content
)
FROM conversations
WHERE conversation_id = %s
ORDER BY message_id ASC;
"""

SQL_INSERT_TIMING = """INSERT INTO timing (
mid
client_sent_msg_ts,
server_received_msg_ts,
lock_acquisition_ts,
vectorstore_update_ts,
query_convo_history_ts,
chain_finished_ts,
similarity_search_ts,
a2rchi_message_ts,
insert_convo_ts,
finish_call_ts,
server_response_msg_ts,
msg_duration
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
"""
19 changes: 18 additions & 1 deletion deploy/dev/dev-init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,21 @@ CREATE TABLE IF NOT EXISTS feedback (
inappropriate BOOLEAN,
PRIMARY KEY (mid, feedback_ts),
FOREIGN KEY (mid) REFERENCES conversations(message_id)
);
);
CREATE TABLE IF NOT EXISTS timing (
mid INTEGER NOT NULL,
client_sent_msg_ts TIMESTAMP NOT NULL,
server_received_msg_ts TIMESTAMP NOT NULL,
lock_acquisition_ts TIMESTAMP NOT NULL,
vectorstore_update_ts TIMESTAMP NOT NULL,
query_convo_history_ts TIMESTAMP NOT NULL,
chain_finished_ts TIMESTAMP NOT NULL,
similarity_search_ts TIMESTAMP NOT NULL,
a2rchi_message_ts TIMESTAMP NOT NULL,
insert_convo_ts TIMESTAMP NOT NULL,
finish_call_ts TIMESTAMP NOT NULL,
server_response_msg_ts TIMESTAMP NOT NULL,
msg_duration INTERVAL SECOND NOT NULL,
PRIMARY KEY (mid),
FOREIGN KEY (mid) REFERENCES conversations(message_id)
);

0 comments on commit f4e38bf

Please sign in to comment.