Final cleanups (#307)
* Added ingestedimages in xlsx

* removed all content entities scenario in process images

* removed all content entities scenario in process images_2

* Added functionality to accept files too in slack collector

* Handling jira attachments

* Final changes

* Comment out traverser

* increment setup version

---------

Co-authored-by: ngupta10 <ngupta10.slb@gmail.com>
Ansh5461 and ngupta10 committed May 13, 2024
1 parent d110c6c commit 162fc22
Showing 9 changed files with 738 additions and 61 deletions.
22 changes: 22 additions & 0 deletions querent/collectors/jira/jira_collector.py
@@ -1,5 +1,6 @@
import json
from typing import AsyncGenerator
import requests

from querent.collectors.collector_base import Collector
from querent.collectors.collector_factory import CollectorFactory
@@ -88,6 +89,20 @@ async def poll(self) -> AsyncGenerator[CollectedBytes, None]:
data=None, file=f"jira_issue_{issue.key}.json.jira", eof=True, doc_source=f"jira://{self.config.jira_server}/{self.config.jira_project}"
)

if hasattr(issue.fields, 'attachment') and isinstance(issue.fields.attachment, list):
for attachment in issue.fields.attachment:
try:
attachment_bytes = await self.download_attachment(attachment.content, self.auth)
yield CollectedBytes(
data=attachment_bytes, file=f"jira_attachment_{attachment.filename}", doc_source=f"jira://{self.config.jira_server}/{self.config.jira_project}"
)
yield CollectedBytes(
data=None, file=f"jira_attachment_{attachment.filename}", eof=True, doc_source=f"jira://{self.config.jira_server}/{self.config.jira_project}"
)
except Exception as e:
self.logger.error(f"Error downloading attachment {attachment.filename}: {str(e)}")


except common_errors.ConnectionError as e:
self.logger.error(f"Error polling Jira issues: {e}")
raise # Re-raise ConnectionError without adding additional information
@@ -98,6 +113,13 @@ async def poll(self) -> AsyncGenerator[CollectedBytes, None]:
finally:
await self.disconnect()

async def download_attachment(self, attachment_url, auth):
response = requests.get(attachment_url, auth=auth)
if response.status_code == 200:
return response.content
else:
self.logger.error("Failed to download file: HTTP {}".format(response.status_code))


class JiraCollectorFactory(CollectorFactory):
def backend(self) -> CollectorBackend:
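The new download_attachment coroutine wraps a blocking requests.get, so each attachment download stalls the event loop that drives poll. A minimal non-blocking sketch, assuming Python 3.9+ (for asyncio.to_thread) and the same self.auth and self.logger members used above:

import asyncio
import requests

async def download_attachment(self, attachment_url, auth):
    # Run the blocking HTTP call in a worker thread so poll() stays responsive.
    def _fetch():
        response = requests.get(attachment_url, auth=auth, timeout=30)
        response.raise_for_status()  # surface 4xx/5xx as exceptions
        return response.content

    try:
        return await asyncio.to_thread(_fetch)
    except requests.RequestException as e:
        self.logger.error(f"Failed to download attachment: {e}")
        return None
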
33 changes: 28 additions & 5 deletions querent/collectors/slack/slack_collector.py
@@ -1,3 +1,5 @@
import base64
import requests
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
from typing import AsyncGenerator
@@ -60,11 +62,22 @@ async def poll(self) -> AsyncGenerator[CollectedBytes, None]:
if response["ok"]:
messages = response["messages"]
for message in messages:
yield CollectedBytes(
file=f"slack://{self.channel}.slack",
data=bytes(message["text"] + "\n\n", "utf-8"),
doc_source = f"slack://{self.channel}"
)
if 'files' in message: # Check if the message contains files
for file in message['files']:
file_url = file['url_private']
# fetch_file_bytes retrieves the file contents via its authenticated URL
file_bytes = await self.fetch_file_bytes(file_url)
yield CollectedBytes(
file=f"slack://{self.channel}/{file['name']}",
data=file_bytes,
doc_source=f"slack://{self.channel}"
)
else:
yield CollectedBytes(
file=f"slack://{self.channel}.slack",
data=bytes(message["text"] + "\n\n", "utf-8"),
doc_source = f"slack://{self.channel}"
)

if not response["has_more"]:
yield CollectedBytes(
@@ -86,6 +99,16 @@ async def poll(self) -> AsyncGenerator[CollectedBytes, None]:
raise common_errors.SlackApiError(
f"Slack API Error: {exc.response['error']}"
) from exc

async def fetch_file_bytes(self, url):
"""Fetch image bytes directly without downloading the image to disk."""
headers = {'Authorization': f'Bearer {self.client.token}'}
response = requests.get(url, headers=headers)
if response.status_code == 200:
return response.content
else:
print(f"Failed to fetch image, status code: {response.status_code}")
return None


class SlackCollectorFactory(CollectorFactory):
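A side effect of the new if 'files' in message ... else split: a message that carries both text and attachments takes the files branch only, so its text is never emitted. If both should be collected (an assumption about the intended behavior), a sketch of the loop without the else, using the same CollectedBytes fields as above:

for message in messages:
    # Emit the message text whether or not files are attached.
    if message.get("text"):
        yield CollectedBytes(
            file=f"slack://{self.channel}.slack",
            data=bytes(message["text"] + "\n\n", "utf-8"),
            doc_source=f"slack://{self.channel}",
        )
    for file in message.get("files", []):
        file_bytes = await self.fetch_file_bytes(file["url_private"])
        if file_bytes is not None:  # fetch_file_bytes returns None on HTTP errors
            yield CollectedBytes(
                file=f"slack://{self.channel}/{file['name']}",
                data=file_bytes,
                doc_source=f"slack://{self.channel}",
            )
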
5 changes: 3 additions & 2 deletions querent/core/transformers/bert_ner_opensourcellm.py
@@ -140,7 +140,7 @@ async def process_images(self, data: IngestedImages):
if len(doc_entity_pairs_ocr) >= 1:
results = doc_entity_pairs_ocr
elif len(doc_entity_pairs_ocr) == 0:
if content:
if content and len(entity_ocr) >=1:
if self.fixed_entities:
content = self.entity_context_extractor.find_entity_sentences(content)
(_, doc_entity_pairs) = self.ner_llm_instance.get_entity_pairs(isConfinedSearch= self.isConfinedSearch,
@@ -150,7 +150,8 @@ async def process_images(self, data: IngestedImages):
if len(doc_entity_pairs) > 0 and len(entity_ocr) >=1:
results = [self.ner_llm_instance.filter_matching_entities(doc_entity_pairs, entity_ocr)]
elif len(doc_entity_pairs) > 0 and len(entity_ocr) == 0:
results = doc_entity_pairs
# results = doc_entity_pairs
pass
else:
return
if len(results) > 0:
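Taken together, the two edits gate image results on OCR output: pairs extracted from the OCR text itself win outright, pairs from the surrounding text are kept only after filtering against at least one OCR entity, and anything else is now dropped rather than passed through. A condensed sketch of that decision; the function and parameter names are illustrative stand-ins for the instance calls above:

def select_image_results(pairs_from_ocr, pairs_from_text, entity_ocr, filter_matching):
    # Pairs found directly in the OCR text take priority.
    if len(pairs_from_ocr) >= 1:
        return pairs_from_ocr
    # Text-derived pairs count only when OCR itself surfaced entities.
    if len(pairs_from_text) > 0 and len(entity_ocr) >= 1:
        return [filter_matching(pairs_from_text, entity_ocr)]
    # No OCR-grounded evidence: skip this image.
    return None
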
94 changes: 69 additions & 25 deletions querent/ingestors/xlsx/xlsx_ingestor.py
@@ -1,7 +1,13 @@
import base64
from typing import List, AsyncGenerator
import io
import uuid
import pandas as pd
import openpyxl
from PIL import Image
import pytesseract

from querent.common.types.ingested_images import IngestedImages
from querent.ingestors.ingestor_factory import IngestorFactory
from querent.ingestors.base_ingestor import BaseIngestor
from querent.processors.async_processor import AsyncProcessor
@@ -46,14 +52,9 @@ async def ingest(
current_file = chunk_bytes.file
elif current_file != chunk_bytes.file:
async for data in self.extract_and_process_xlsx(
CollectedBytes(file=current_file, data=collected_bytes)
CollectedBytes(file=current_file, data=collected_bytes), chunk_bytes.doc_source
):
yield IngestedTokens(
file=current_file,
data=[data],
error=None,
doc_source=chunk_bytes.doc_source
)
yield data
yield IngestedTokens(
file=current_file,
data=None,
@@ -67,37 +68,80 @@ yield IngestedTokens(
yield IngestedTokens(file=current_file, data=None, error=f"Exception: {e}", doc_source=chunk_bytes.doc_source)
finally:
async for data in self.extract_and_process_xlsx(
CollectedBytes(file=current_file, data=collected_bytes)
CollectedBytes(file=current_file, data=collected_bytes), chunk_bytes.doc_source
):
yield IngestedTokens(
file=current_file,
data=[data],
error=None,
doc_source=chunk_bytes.doc_source
)
yield data
yield IngestedTokens(file=current_file, data=None, error=None, doc_source=chunk_bytes.doc_source)

async def extract_and_process_xlsx(
self, collected_bytes: CollectedBytes
self, collected_bytes: CollectedBytes, doc_source
) -> AsyncGenerator[pd.DataFrame, None]:
df = await self.extract_text_from_xlsx(collected_bytes)
processed_text = await self.process_data(df.to_string())
yield processed_text
async for data in self.extract_text_from_xlsx(collected_bytes, doc_source):
yield data

async def extract_text_from_xlsx(
self, collected_bytes: CollectedBytes
) -> pd.DataFrame:
excel_buffer = io.BytesIO(collected_bytes.data)
dataframe = pd.read_excel(excel_buffer)
return dataframe
self, collected_bytes: CollectedBytes, doc_source):
try:
excel_buffer = io.BytesIO(collected_bytes.data)
workbook = openpyxl.load_workbook(excel_buffer, data_only=True)

for sheet in workbook:
df = pd.read_excel(excel_buffer, sheet_name=sheet.title)
df['Sheet'] = sheet.title
df = df.to_string()
processed_data = await self.process_data(df)


print(f"Processing sheet: {sheet.title}")
for image in sheet._images:
img = image._data()
ocr_text = await self.get_ocr_from_image(image=img)
if not ocr_text:
continue

yield IngestedImages(
file=collected_bytes.file,
image=str(base64.b64encode(img)),
image_name=f"{str(uuid.uuid4())}.jpg",
page_num=sheet.title,
text=processed_data,
coordinates=None,
ocr_text=[ocr_text],
doc_source=doc_source,
)

# Yield the processed sheet text as tokens
yield IngestedTokens(
file=collected_bytes.file,
data=processed_data,
error=None,
doc_source=doc_source
)

except Exception as e:
self.logger.error("Exception-{e}")

async def get_ocr_from_image(self, image):
"""Implement this to return ocr text of the image"""
try:
image = Image.open(io.BytesIO(image))

image_status = await self.analyze_image(image)
if not image_status:
return
text = pytesseract.image_to_string(image)
except Exception as e:
self.logger.error("Exception-{e}")
raise e
return str(text).encode("utf-8").decode("utf-8")

async def process_data(self, text: str) -> str:
if self.processors is None or len(self.processors) == 0:
return text
return [text]
try:
processed_data = text
for processor in self.processors:
processed_data = await processor.process_text(processed_data)
return processed_data
return [processed_data]
except Exception as e:
self.logger.error(f"Error while processing text: {e}")
2 changes: 1 addition & 1 deletion setup.py
@@ -79,7 +79,7 @@

setup(
name="querent",
version="3.1.0",
version="3.1.1",
author="Querent AI",
description="The Asynchronous Data Dynamo and Graph Neural Network Catalyst",
long_description=long_description,
47 changes: 21 additions & 26 deletions tests/traverser/traverser.py
@@ -48,9 +48,9 @@

# def fetch_triples(self, knowledge):
# subject, relationship, obj = knowledge.split('-')
# subject = subject.replace('_', ' ')
# obj = obj.replace('_', ' ')
# relationship = relationship.replace('_',' ')
# subject = subject
# obj = obj
# relationship = relationship
# query = """
# MATCH (n)-[r]->(m)
# WHERE type(r) = $relationship AND n.name = $subject AND m.name = $object
@@ -176,44 +176,38 @@
# # Initialize Neo4j connection
# neo4j_conn = Neo4jConnection(neo4j_uri, neo4j_user, neo4j_password)
# input_data = {
# "session_id": "30b0815cf52a4b6ebadbe3fe6ab868f9",
# "session_id": "647b459710c54881b464a1b20d5a40f0",
# "query": "How does the geological variation within the Eagle Ford Shale affect the production outcomes of different wells?",
# "insights": [
# {
# "document": "Decline curve analysis of shale oil production_ The case of Eagle Ford.docx",
# "source": "azure://testfiles/",
# "knowledge": "the_eagle_ford_shale_play-covers-texas",
# "sentence": "21 uppsala university, master thesis in energy systems engineering, linnea lund figure 7. geographic extent of the eagle ford shale play, covering about 20 counties in the state of texas, u.s. the green, yellow and red fields represent the occurrence of oil, wet gas and dry gas respectively.",
# "tags": "the eagle ford shale play, texas, covers"
# "knowledge": "carbonate-has_image-hydraulic_fracturing",
# "sentence": "4.1 geology the shale formation of eagle ford is of the late cretaceous era, roughly 90 million years old. it has a high carbonate content, up to 70%, which makes it brittle and facilitates hydraulic fracturing (texas rrc, 2014). during the cretaceous time the tectonic movements caused the land masses in the south-east, in the direction of the mexican gulf, to be pressed down.",
# "tags": "carbonate, hydraulic_fracturing, has_image"
# },
# {
# "document": "Decline curve analysis of shale oil production_ The case of Eagle Ford.docx",
# "source": "azure://testfiles/",
# "knowledge": "eagle_ford_shale-located_in-texas_usa",
# "sentence": "eagle ford shale - an early look at ultimate recovery. presented at the spe annual technical conference and exhibition, society of petroleum engineers, san antonio, texas usa. swint, b., bakhsh, n., 2013.",
# "tags": "eagle ford shale, texas usa, located in"
# "knowledge": "depth-has_image-eagle_ford_shale",
# "sentence": "underground slope of the geological layers in texas. the depth of the eagle ford shale varies from the surface to more than 4 km underground. source: eagleford.org (2014)",
# "tags": "depth, eagle_ford_shale, has_image"
# },
# {
# "document": "Decline curve analysis of shale oil production_ The case of Eagle Ford.docx",
# "source": "azure://testfiles/",
# "knowledge": "4_1_geology-is_of-the_late_cretaceous_era",
# "sentence": "the formation is about 80 km wide and 650 km long with varying depth (texas rrc, 2014). 4.1 geology the shale formation of eagle ford is of the late cretaceous era, roughly 90 million years old. it has a high carbonate content, up to 70%, which makes it brittle and facilitates hydraulic fracturing (texas rrc, 2014).",
# "tags": "4 1 geology, the late cretaceous era, is of"
# },
# {
# "document": "Reservoir Pressure Mapping from Well-Test Data_ An Eagle Ford Example.docx",
# "source": "azure://testfiles/",
# "knowledge": "this_low_permeability_source_rock_reservoir-developed_using-horizontal_wells",
# "sentence": "we apply our methodology to the eagle ford play of south texas. like other unconven-tional plays, this low-permeability source-rock reservoir is developed using horizontal wells. in our study area, the first eagle ford completion forms were filed in 2010 with the most forms filed in 2014 and 2016 (fig. 2).",
# "tags": "this low permeability source rock reservoir, horizontal wells, developed using"
# "knowledge": "depth-has_image-eagle_ford_shale",
# "sentence": " \n\nt\n\neagle ford shale play, wo\nwestern gulf basin, :\n~~ south texas __.... inal\n\n \n \n \n \n \n\nmies\na sara\ndeg 2\n\nmap dato 'may 29, 2010\n\n \n \n\n \n\n \n \n \n\n \n\neagle ford producing wells (hpd)\n+ ot\n+ cas\neagle ford petroleum windows (ptrohawk, eos, d})\non\nwot gastcondonsate\nry gas\ntop eagle ford subsea depth structure, ft(petrohawk)\neagle ford shale ticknass,ft(eog)\niii one for sat: ausin chak outeropstnris)\n\n \n\n \n\n \n\n \n\n \n\n \n\n \n\f",
# "tags": "depth, eagle_ford_shale, has_image"
# },
# {
# "document": "Decline curve analysis of shale oil production_ The case of Eagle Ford.docx",
# "source": "azure://testfiles/",
# "knowledge": "a_high_carbonate_content-facilitates-hydraulic_fracturing",
# "sentence": "4.1 geology the shale formation of eagle ford is of the late cretaceous era, roughly 90 million years old. it has a high carbonate content, up to 70%, which makes it brittle and facilitates hydraulic fracturing (texas rrc, 2014). during the cretaceous time the tectonic movements caused the land masses in the south-east, in the direction of the mexican gulf, to be pressed down.",
# "tags": "a high carbonate content, hydraulic fracturing, facilitates"
# }]
# "knowledge": "permeability-has_image-porosity",
# "sentence": "the wells are located in two counties in different parts of the eagle ford region and there is big chance other parameters than the api gravity differ between the counties. such parameters could be permeability, porosity, brittleness (ability to induce fractures) and other geological parameters. if the porosity is higher more water will be used in the hydro-fracturing and more of fracturing water would stay in the reservoir.",
# "tags": "permeability, porosity, has_image"
# }
# ]
# }

# # Extract tags
@@ -232,12 +226,13 @@
# WHERE n.name IN $tags AND m.name IN $tags
# RETURN n, r, m
# """
# # subgraph_data = neo4j_conn.extract_subgraph_to_csv(query1, tags, output_file)
# # print(subgraph_data)
# subgraph_data = neo4j_conn.extract_subgraph_to_csv(query1, tags, output_file)
# print(subgraph_data)

# knowledge_items = [insight['knowledge'] for insight in input_data['insights']]
# nodes = set()
# for knowledge in knowledge_items:
# print("Knowledge----", knowledge)
# nodes.update(neo4j_conn.fetch_triples(knowledge))

# print("Nodes for graph query:", nodes)
