Skip to content

Commit

Permalink
fix: Add Missing Logs (#1609)
Browse files Browse the repository at this point in the history
  • Loading branch information
Pavan-Microsoft authored Jan 2, 2025
1 parent 03f52b3 commit e92eba1
Show file tree
Hide file tree
Showing 21 changed files with 179 additions and 4 deletions.
5 changes: 4 additions & 1 deletion code/backend/batch/batch_push_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,22 @@ def _get_file_name_from_message(message_body) -> str:
)
def batch_push_results(msg: func.QueueMessage) -> None:
message_body = json.loads(msg.get_body().decode("utf-8"))
logger.debug("Process Document Event queue function triggered: %s", message_body)
logger.info("Process Document Event queue function triggered: %s", message_body)

event_type = message_body.get("eventType", "")
# We handle "" in this scenario for backwards compatibility
# This function is primarily triggered by an Event Grid queue message from the blob storage
# However, it can also be triggered using a legacy schema from BatchStartProcessing
if event_type in ("", "Microsoft.Storage.BlobCreated"):
logger.info("Handling 'Blob Created' event with message body: %s", message_body)
_process_document_created_event(message_body)

elif event_type == "Microsoft.Storage.BlobDeleted":
logger.info("Handling 'Blob Deleted' event with message body: %s", message_body)
_process_document_deleted_event(message_body)

else:
logger.exception("Received an unrecognized event type: %s", event_type)
raise NotImplementedError(f"Unknown event type received: {event_type}")


Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import logging
from azure.core.credentials import AzureKeyCredential
from azure.ai.formrecognizer import DocumentAnalysisClient
from azure.identity import DefaultAzureCredential
import html
import traceback
from .env_helper import EnvHelper

logger = logging.getLogger(__name__)


class AzureFormRecognizerClient:
def __init__(self) -> None:
Expand Down Expand Up @@ -75,6 +78,8 @@ def begin_analyze_document_from_url(
model_id = "prebuilt-layout" if use_layout else "prebuilt-read"

try:
logger.info("Method begin_analyze_document_from_url started")
logger.info(f"Model ID selected: {model_id}")
poller = self.document_analysis_client.begin_analyze_document_from_url(
model_id, document_url=source_url
)
Expand Down Expand Up @@ -144,4 +149,7 @@ def begin_analyze_document_from_url(

return page_map
except Exception as e:
logger.exception(f"Exception in begin_analyze_document_from_url: {e}")
raise ValueError(f"Error: {traceback.format_exc()}. Error: {e}")
finally:
logger.info("Method begin_analyze_document_from_url ended")
8 changes: 7 additions & 1 deletion code/backend/batch/utilities/helpers/config/config_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,21 +190,27 @@ def _set_new_config_properties(config: dict, default_config: dict):
@staticmethod
@functools.cache
def get_active_config_or_default():
logger.info("Method get_active_config_or_default started")
env_helper = EnvHelper()
config = ConfigHelper.get_default_config()

if env_helper.LOAD_CONFIG_FROM_BLOB_STORAGE:
logger.info("Loading configuration from Blob Storage")
blob_client = AzureBlobStorageClient(container_name=CONFIG_CONTAINER_NAME)

if blob_client.file_exists(CONFIG_FILE_NAME):
logger.info("Configuration file found in Blob Storage")
default_config = config
config_file = blob_client.download_file(CONFIG_FILE_NAME)
config = json.loads(config_file)

ConfigHelper._set_new_config_properties(config, default_config)
else:
logger.info("Returning default config")
logger.info(
"Configuration file not found in Blob Storage, using default configuration"
)

logger.info("Method get_active_config_or_default ended")
return Config(config)

@staticmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,16 @@ class IntegratedVectorizationEmbedder(EmbedderBase):
def __init__(self, env_helper: EnvHelper):
self.env_helper = env_helper
self.llm_helper: LLMHelper = LLMHelper()
logger.info("Initialized IntegratedVectorizationEmbedder.")

def embed_file(self, source_url: str, file_name: str = None):
logger.info(
f"Starting embed_file for source_url: {source_url}, file_name: {file_name}."
)
self.process_using_integrated_vectorization(source_url=source_url)

def process_using_integrated_vectorization(self, source_url: str):
logger.info(f"Starting integrated vectorization for source_url: {source_url}.")
config = ConfigHelper.get_active_config_or_default()
try:
search_datasource = AzureSearchDatasource(self.env_helper)
Expand All @@ -35,14 +40,20 @@ def process_using_integrated_vectorization(self, source_url: str):
self.env_helper.AZURE_SEARCH_INDEXER_NAME,
skillset_name=search_skillset_result.name,
)
logger.info("Integrated vectorization process completed successfully.")
return indexer_result
except Exception as e:
logger.error(f"Error processing {source_url}: {e}")
raise e

def reprocess_all(self):
logger.info("Starting reprocess_all operation.")
search_indexer = AzureSearchIndexer(self.env_helper)
if search_indexer.indexer_exists(self.env_helper.AZURE_SEARCH_INDEXER_NAME):
logger.info(
f"Running indexer: {self.env_helper.AZURE_SEARCH_INDEXER_NAME}."
)
search_indexer.run_indexer(self.env_helper.AZURE_SEARCH_INDEXER_NAME)
else:
logger.info("Indexer does not exist. Starting full processing.")
self.process_using_integrated_vectorization(source_url="all")
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

class PostgresEmbedder(EmbedderBase):
def __init__(self, blob_client: AzureBlobStorageClient, env_helper: EnvHelper):
logger.info("Initializing PostgresEmbedder.")
self.env_helper = env_helper
self.llm_helper = LLMHelper()
self.azure_postgres_helper = AzurePostgresHelper()
Expand All @@ -33,6 +34,7 @@ def __init__(self, blob_client: AzureBlobStorageClient, env_helper: EnvHelper):
self.embedding_configs[ext] = processor

def embed_file(self, source_url: str, file_name: str):
logger.info(f"Embedding file: {file_name} from source: {source_url}")
file_extension = file_name.split(".")[-1].lower()
embedding_config = self.embedding_configs.get(file_extension)
self.__embed(
Expand All @@ -48,32 +50,42 @@ def embed_file(self, source_url: str, file_name: str):
def __embed(
self, source_url: str, file_extension: str, embedding_config: EmbeddingConfig
):
logger.info(f"Starting embedding process for source: {source_url}")
documents_to_upload: List[SourceDocument] = []
if (
embedding_config.use_advanced_image_processing
and file_extension
in self.config.get_advanced_image_processing_image_types()
):
logger.error(
"Advanced image processing is not supported in PostgresEmbedder."
)
raise NotImplementedError(
"Advanced image processing is not supported in PostgresEmbedder."
)
else:
logger.info(f"Loading documents from source: {source_url}")
documents: List[SourceDocument] = self.document_loading.load(
source_url, embedding_config.loading
)
documents = self.document_chunking.chunk(
documents, embedding_config.chunking
)
logger.info("Chunked into document chunks.")

for document in documents:
documents_to_upload.append(self.__convert_to_search_document(document))

if documents_to_upload:
logger.info(
f"Uploading {len(documents_to_upload)} documents to vector store."
)
self.azure_postgres_helper.create_vector_store(documents_to_upload)
else:
logger.warning("No documents to upload.")

def __convert_to_search_document(self, document: SourceDocument):
logger.info(f"Generating embeddings for document ID: {document.id}")
embedded_content = self.llm_helper.generate_embeddings(document.content)
metadata = {
"id": document.id,
Expand All @@ -84,6 +96,7 @@ def __convert_to_search_document(self, document: SourceDocument):
"offset": document.offset,
"page_number": document.page_number,
}
logger.info(f"Metadata generated for document ID: {document.id}")
return {
"id": document.id,
"content": document.content,
Expand Down
13 changes: 13 additions & 0 deletions code/backend/batch/utilities/helpers/embedders/push_embedder.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

class PushEmbedder(EmbedderBase):
def __init__(self, blob_client: AzureBlobStorageClient, env_helper: EnvHelper):
logger.info("Initializing PushEmbedder")
self.env_helper = env_helper
self.llm_helper = LLMHelper()
self.azure_search_helper = AzureSearchHelper()
Expand All @@ -33,11 +34,14 @@ def __init__(self, blob_client: AzureBlobStorageClient, env_helper: EnvHelper):
self.blob_client = blob_client
self.config = ConfigHelper.get_active_config_or_default()
self.embedding_configs = {}
logger.info("Loading document processors")
for processor in self.config.document_processors:
ext = processor.document_type.lower()
self.embedding_configs[ext] = processor
logger.info("Document processors loaded")

def embed_file(self, source_url: str, file_name: str):
logger.info(f"Embedding file: {file_name} from URL: {source_url}")
file_extension = file_name.split(".")[-1].lower()
embedding_config = self.embedding_configs.get(file_extension)
self.__embed(
Expand All @@ -46,19 +50,22 @@ def embed_file(self, source_url: str, file_name: str):
embedding_config=embedding_config,
)
if file_extension != "url":
logger.info(f"Upserting blob metadata for file: {file_name}")
self.blob_client.upsert_blob_metadata(
file_name, {"embeddings_added": "true"}
)

def __embed(
self, source_url: str, file_extension: str, embedding_config: EmbeddingConfig
):
logger.info(f"Processing embedding for file extension: {file_extension}")
documents_to_upload: List[SourceDocument] = []
if (
embedding_config.use_advanced_image_processing
and file_extension
in self.config.get_advanced_image_processing_image_types()
):
logger.info(f"Using advanced image processing for: {source_url}")
caption = self.__generate_image_caption(source_url)
caption_vector = self.llm_helper.generate_embeddings(caption)

Expand All @@ -69,6 +76,7 @@ def __embed(
)
)
else:
logger.info(f"Loading documents from source: {source_url}")
documents: List[SourceDocument] = self.document_loading.load(
source_url, embedding_config.loading
)
Expand All @@ -81,6 +89,7 @@ def __embed(

# Upload documents (which are chunks) to search index in batches
if documents_to_upload:
logger.info("Uploading documents in batches")
batch_size = self.env_helper.AZURE_SEARCH_DOC_UPLOAD_BATCH_SIZE
search_client = self.azure_search_helper.get_search_client()
for i in range(0, len(documents_to_upload), batch_size):
Expand All @@ -93,6 +102,7 @@ def __embed(
logger.warning("No documents to upload.")

def __generate_image_caption(self, source_url):
logger.info(f"Generating image caption for URL: {source_url}")
model = self.env_helper.AZURE_OPENAI_VISION_MODEL
caption_system_message = """You are an assistant that generates rich descriptions of images.
You need to be accurate in the information you extract and detailed in the descriptons you generate.
Expand All @@ -116,9 +126,11 @@ def __generate_image_caption(self, source_url):

response = self.llm_helper.get_chat_completion(messages, model)
caption = response.choices[0].message.content
logger.info("Caption generation completed")
return caption

def __convert_to_search_document(self, document: SourceDocument):
logger.info(f"Converting document ID {document.id} to search document format")
embedded_content = self.llm_helper.generate_embeddings(document.content)
metadata = {
self.env_helper.AZURE_SEARCH_FIELDS_ID: document.id,
Expand Down Expand Up @@ -151,6 +163,7 @@ def __create_image_document(
content: str,
content_vector: List[float],
):
logger.info(f"Creating image document for source URL: {source_url}")
parsed_url = urlparse(source_url)

file_url = parsed_url.scheme + "://" + parsed_url.netloc + parsed_url.path
Expand Down
1 change: 1 addition & 0 deletions code/backend/batch/utilities/helpers/env_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ def __load_config(self, **kwargs) -> None:
self.SEMENTIC_KERNEL_SYSTEM_PROMPT = os.getenv(
"SEMENTIC_KERNEL_SYSTEM_PROMPT", ""
)
logger.info("Initializing EnvHelper completed")

def is_chat_model(self):
if "gpt-4" in self.AZURE_OPENAI_MODEL_NAME.lower():
Expand Down
6 changes: 6 additions & 0 deletions code/backend/batch/utilities/helpers/llm_helper.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
from openai import AzureOpenAI
from typing import List, Union, cast
from langchain_openai import AzureChatOpenAI, AzureOpenAIEmbeddings
Expand All @@ -10,9 +11,12 @@
from azure.identity import DefaultAzureCredential
from .env_helper import EnvHelper

logger = logging.getLogger(__name__)


class LLMHelper:
def __init__(self):
logger.info("Initializing LLMHelper")
self.env_helper: EnvHelper = EnvHelper()
self.auth_type_keys = self.env_helper.is_auth_type_keys()
self.token_provider = self.env_helper.AZURE_TOKEN_PROVIDER
Expand All @@ -38,6 +42,8 @@ def __init__(self):
)
self.embedding_model = self.env_helper.AZURE_OPENAI_EMBEDDING_MODEL

logger.info("Initializing LLMHelper completed")

def get_llm(self):
if self.auth_type_keys:
return AzureChatOpenAI(
Expand Down
2 changes: 2 additions & 0 deletions code/backend/batch/utilities/orchestrator/lang_chain_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ async def orchestrate(
self, user_message: str, chat_history: List[dict], **kwargs: dict
) -> list[dict]:

logger.info("Method orchestrate of lang_chain_agent started")
# Call Content Safety tool
if self.config.prompts.enable_content_safety:
if response := self.call_content_safety_input(user_message):
Expand Down Expand Up @@ -122,4 +123,5 @@ async def orchestrate(
answer=answer.answer,
source_documents=answer.source_documents,
)
logger.info("Method orchestrate of lang_chain_agent ended")
return messages
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,12 @@ def __init__(self) -> None:
async def orchestrate(
self, user_message: str, chat_history: List[dict], **kwargs: dict
) -> list[dict]:
logger.info("Method orchestrate of open_ai_functions started")
# Call Content Safety tool
if self.config.prompts.enable_content_safety:
logger.info("Content Safety enabled. Checking input message...")
if response := self.call_content_safety_input(user_message):
logger.info("Content Safety check returned a response. Exiting method.")
return response

# Call function to determine route
Expand Down Expand Up @@ -143,6 +146,7 @@ async def orchestrate(
answer = Answer(question=user_message, answer=text)

if answer.answer is None:
logger.info("Answer is None")
answer.answer = "The requested information is not available in the retrieved data. Please try another query or topic."

# Call Content Safety tool
Expand All @@ -156,4 +160,5 @@ async def orchestrate(
answer=answer.answer,
source_documents=answer.source_documents,
)
logger.info("Method orchestrate of open_ai_functions ended")
return messages
Loading

0 comments on commit e92eba1

Please sign in to comment.