diff --git a/.github/workflows/dev-ci-cd.yaml b/.github/workflows/dev-ci-cd.yaml index 0bde2784..ac06b0ce 100644 --- a/.github/workflows/dev-ci-cd.yaml +++ b/.github/workflows/dev-ci-cd.yaml @@ -4,6 +4,7 @@ on: push: branches: - main + - bug/investigate-timing-issues jobs: deploy-dev-system: runs-on: ubuntu-latest diff --git a/A2rchi/interfaces/chat_app/app.py b/A2rchi/interfaces/chat_app/app.py index 451f3a9e..59ee38e8 100644 --- a/A2rchi/interfaces/chat_app/app.py +++ b/A2rchi/interfaces/chat_app/app.py @@ -2,7 +2,7 @@ from A2rchi.utils.config_loader import Config_Loader from A2rchi.utils.data_manager import DataManager from A2rchi.utils.env import read_secret -from A2rchi.utils.sql import SQL_INSERT_CONVO, SQL_INSERT_FEEDBACK, SQL_QUERY_CONVO +from A2rchi.utils.sql import SQL_INSERT_CONVO, SQL_INSERT_FEEDBACK, SQL_INSERT_TIMING, SQL_QUERY_CONVO from datetime import datetime from pygments import highlight @@ -193,7 +193,7 @@ def insert_conversation(self, conversation_id, user_message, a2rchi_message, is_ """ print(" INFO - entered insert_conversation.") - # parse user message / a2rchi message if not None + # parse user message / a2rchi message user_sender, user_content, user_msg_ts = user_message a2rchi_sender, a2rchi_content, a2rchi_msg_ts = a2rchi_message @@ -223,18 +223,57 @@ def insert_conversation(self, conversation_id, user_message, a2rchi_message, is_ self.cursor, self.conn = None, None return message_ids + + def insert_timing(self, message_id, timestamps): + """ + Store timing info to understand response profile. + """ + print(" INFO - entered insert_timing.") + + # construct insert_tup + insert_tup = ( + message_id, + timestamps['client_sent_msg_ts'], + timestamps['server_received_msg_ts'], + timestamps['lock_acquisition_ts'], + timestamps['vectorstore_update_ts'], + timestamps['query_convo_history_ts'], + timestamps['chain_finished_ts'], + timestamps['similarity_search_ts'], + timestamps['a2rchi_message_ts'], + timestamps['insert_convo_ts'], + timestamps['finish_call_ts'], + timestamps['server_response_msg_ts'], + timestamps['server_response_msg_ts'] - timestamps['server_received_msg_ts'] + ) + + # create connection to database + self.conn = psycopg2.connect(**self.pg_config) + self.cursor = self.conn.cursor() + self.cursor.execute(SQL_INSERT_TIMING, insert_tup) + self.conn.commit() + + # clean up database connection state + self.cursor.close() + self.conn.close() + self.cursor, self.conn = None, None - def __call__(self, message: List[str], conversation_id: int, is_refresh: bool, msg_ts: datetime, client_msg_ts: float, client_timeout: float): + def __call__(self, message: List[str], conversation_id: int, is_refresh: bool, server_received_msg_ts: datetime, client_msg_ts: float, client_timeout: float): """ Execute the chat functionality. """ + # store timestamps for code profiling information + timestamps = {} + self.lock.acquire() + timestamps['lock_acquisition_ts'] = datetime.now() try: # update vector store through data manager; will only do something if new files have been added print("INFO - acquired lock file update vectorstore") self.data_manager.update_vectorstore() + timestamps['vectorstore_update_ts'] = datetime.now() except Exception as e: print(f"ERROR - {str(e)}") @@ -253,6 +292,7 @@ def __call__(self, message: List[str], conversation_id: int, is_refresh: bool, m # fetch history given conversation_id history = self.query_conversation_history(conversation_id) + timestamps['query_convo_history_ts'] = datetime.now() # if this is a chat refresh / message regeneration; remove previous contiguous non-A2rchi message(s) if is_refresh: @@ -261,13 +301,14 @@ def __call__(self, message: List[str], conversation_id: int, is_refresh: bool, m # guard call to LLM; if timestamp from message is more than timeout secs in the past; # return error=True and do not generate response as the client will have timed out - if msg_ts.timestamp() - client_msg_ts > client_timeout: + if server_received_msg_ts.timestamp() - client_msg_ts > client_timeout: return None, None, None, True # run chain to get result; limit users to 1000 queries per conversation; refreshing browser starts new conversation if len(history) < QUERY_LIMIT: full_history = history + [(sender, content)] if not is_refresh else history result = self.chain(full_history) + timestamps['chain_finished_ts'] = datetime.now() else: # for now let's return a timeout error, as returning a different # error message would require handling new message_ids param. properly @@ -281,6 +322,7 @@ def __call__(self, message: List[str], conversation_id: int, is_refresh: bool, m # - low score means very close (it's a distance between embedding vectors approximated # by an approximate k-nearest neighbors algorithm called HNSW) score = self.chain.similarity_search(content) + timestamps['similarity_search_ts'] = datetime.now() # load the present list of sources try: @@ -305,10 +347,12 @@ def __call__(self, message: List[str], conversation_id: int, is_refresh: bool, m output = "
" + self.format_code_in_text(result["answer"]) + "
" # write user message and A2rchi response to database - user_message = (sender, content, msg_ts) - a2rchi_message = ("A2rchi", output, datetime.now()) + timestamps['a2rchi_message_ts'] = datetime.now() + user_message = (sender, content, server_received_msg_ts) + a2rchi_message = ("A2rchi", output, timestamps['a2rchi_message_ts']) message_ids = self.insert_conversation(conversation_id, user_message, a2rchi_message, is_refresh) + timestamps['insert_convo_ts'] = datetime.now().timestamp() except Exception as e: print(f"ERROR - {str(e)}") @@ -318,8 +362,10 @@ def __call__(self, message: List[str], conversation_id: int, is_refresh: bool, m self.cursor.close() if self.conn is not None: self.conn.close() + + timestamps['finish_call_ts'] = datetime.now().timestamp() - return output, conversation_id, message_ids, False + return output, conversation_id, message_ids, timestamps, False class FlaskAppWrapper(object): @@ -369,25 +415,37 @@ def get_chat_response(self): discussion ID (either None or an integer) """ # compute timestamp at which message was received by server - msg_ts = datetime.now() + server_received_msg_ts = datetime.now() # get user input and conversation_id from the request message = request.json.get('last_message') conversation_id = request.json.get('conversation_id') is_refresh = request.json.get('is_refresh') - client_msg_ts = request.json.get('client_msg_ts') / 1000 + client_sent_msg_ts = request.json.get('client_sent_msg_ts') / 1000 client_timeout = request.json.get('client_timeout') / 1000 # query the chat and return the results. print(" INFO - Calling the ChatWrapper()") - response, conversation_id, message_ids, error = self.chat(message, conversation_id, is_refresh, msg_ts, client_msg_ts, client_timeout) + response, conversation_id, message_ids, timestamps, error = self.chat(message, conversation_id, is_refresh, server_received_msg_ts, client_sent_msg_ts, client_timeout) # handle timeout error if error: return jsonify({'error': 'client timeout'}), 408 + # compute timestamp at which message was returned to client + timestamps['server_response_msg_ts'] = datetime.now() + + # store timing info for this message + self.insert_timing(message_ids[-1], timestamps) + # otherwise return A2rchi's response to client - return jsonify({'response': response, 'conversation_id': conversation_id, 'a2rchi_msg_id': message_ids[-1]}) + return jsonify({ + 'response': response, + 'conversation_id': conversation_id, + 'a2rchi_msg_id': message_ids[-1], + 'server_response_msg_ts': timestamps['server_response_msg_ts'].timestamp(), + 'final_response_msg_ts': datetime.now().timestamp(), + }) def index(self): return render_template('index.html') diff --git a/A2rchi/interfaces/chat_app/static/script.js-template b/A2rchi/interfaces/chat_app/static/script.js-template index 43c88630..27eb10a8 100644 --- a/A2rchi/interfaces/chat_app/static/script.js-template +++ b/A2rchi/interfaces/chat_app/static/script.js-template @@ -100,7 +100,7 @@ const getChatResponse = async (incomingChatDiv, isRefresh=false) => { last_message: conversation.slice(-1), conversation_id: conversation_id, is_refresh: isRefresh, - client_msg_ts: Date.now(), + client_sent_msg_ts: Date.now(), client_timeout: DEFAULT_TIMEOUT_SECS * 1000 }), timeout: DEFAULT_TIMEOUT_SECS * 1000 diff --git a/A2rchi/utils/sql.py b/A2rchi/utils/sql.py index c2b780db..1d6a2e41 100644 --- a/A2rchi/utils/sql.py +++ b/A2rchi/utils/sql.py @@ -1,6 +1,39 @@ """SQL queries used by A2rchi""" -SQL_INSERT_CONVO = "INSERT INTO conversations (conversation_id, sender, content, ts) VALUES %s RETURNING message_id;" +SQL_INSERT_CONVO = """INSERT INTO conversations ( + conversation_id, sender, content, ts +) +VALUES %s +RETURNING message_id; +""" -SQL_INSERT_FEEDBACK = "INSERT INTO feedback (mid, feedback_ts, feedback, feedback_msg, incorrect, unhelpful, inappropriate) VALUES (%s, %s, %s, %s, %s, %s, %s);" +SQL_INSERT_FEEDBACK = """INSERT INTO feedback ( + mid, feedback_ts, feedback, feedback_msg, incorrect, unhelpful, inappropriate +) +VALUES (%s, %s, %s, %s, %s, %s, %s); +""" -SQL_QUERY_CONVO = "SELECT sender, content FROM conversations WHERE conversation_id = %s ORDER BY message_id ASC;" +SQL_QUERY_CONVO = """SELECT ( + sender, content +) +FROM conversations +WHERE conversation_id = %s +ORDER BY message_id ASC; +""" + +SQL_INSERT_TIMING = """INSERT INTO timing ( + mid + client_sent_msg_ts, + server_received_msg_ts, + lock_acquisition_ts, + vectorstore_update_ts, + query_convo_history_ts, + chain_finished_ts, + similarity_search_ts, + a2rchi_message_ts, + insert_convo_ts, + finish_call_ts, + server_response_msg_ts, + msg_duration +) +VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s); +""" diff --git a/deploy/dev/dev-init.sql b/deploy/dev/dev-init.sql index 90d49c61..291b7f30 100644 --- a/deploy/dev/dev-init.sql +++ b/deploy/dev/dev-init.sql @@ -16,4 +16,21 @@ CREATE TABLE IF NOT EXISTS feedback ( inappropriate BOOLEAN, PRIMARY KEY (mid, feedback_ts), FOREIGN KEY (mid) REFERENCES conversations(message_id) -); \ No newline at end of file +); +CREATE TABLE IF NOT EXISTS timing ( + mid INTEGER NOT NULL, + client_sent_msg_ts TIMESTAMP NOT NULL, + server_received_msg_ts TIMESTAMP NOT NULL, + lock_acquisition_ts TIMESTAMP NOT NULL, + vectorstore_update_ts TIMESTAMP NOT NULL, + query_convo_history_ts TIMESTAMP NOT NULL, + chain_finished_ts TIMESTAMP NOT NULL, + similarity_search_ts TIMESTAMP NOT NULL, + a2rchi_message_ts TIMESTAMP NOT NULL, + insert_convo_ts TIMESTAMP NOT NULL, + finish_call_ts TIMESTAMP NOT NULL, + server_response_msg_ts TIMESTAMP NOT NULL, + msg_duration INTERVAL SECOND NOT NULL, + PRIMARY KEY (mid), + FOREIGN KEY (mid) REFERENCES conversations(message_id) +);