-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
214 lines (176 loc) · 8.44 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
import json
from loguru import logger
from sqlalchemy import create_engine, Column, String, Boolean, Integer, DateTime, Text, JSON, text # Added JSON type
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy.exc import NoResultFound, MultipleResultsFound, IntegrityError
from dotenv import load_dotenv
from uuid import uuid5, NAMESPACE_DNS
from datetime import datetime, timezone
import os
import feedparser
from confluent_kafka import Producer
# Load environment variables
load_dotenv()
# PostgreSQL database connection details from environment
POSTGRES_USER = os.getenv("POSTGRES_USER")
POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD")
POSTGRES_DB = os.getenv("POSTGRES_DB")
POSTGRES_HOST = os.getenv("POSTGRES_HOST")
POSTGRES_PORT = os.getenv("POSTGRES_PORT")
SOURCEID = os.getenv("SOURCEID")
KAFKA_BOOTSTRAP_SERVERS = os.getenv("KAFKA_BOOTSTRAP_SERVERS")
# Set up the database URI and SQLAlchemy engine
DATABASE_URL = f"postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DB}"
engine = create_engine(DATABASE_URL, echo=os.getenv("POSTGRES_ECHO", "false").lower() == "true",
pool_size=int(os.getenv("POSTGRES_POOL_SIZE", "5")))
SessionLocal = sessionmaker(bind=engine)
# Define SQLAlchemy models
Base = declarative_base()
class Source(Base):
__tablename__ = 'sources'
sourceid = Column(String, primary_key=True)
enabled = Column(Boolean, nullable=False)
sourcetype = Column(String, nullable=False)
sourcename = Column(String, nullable=False)
sourcelocation = Column(String, nullable=False)
lastinterrogation = Column(DateTime, default=datetime.now(timezone.utc))
created = Column(DateTime, default=datetime.now(timezone.utc))
updated = Column(DateTime, default=datetime.now(timezone.utc))
numprocessed = Column(Integer, default=0)
articleelement = Column(JSON) # New JSON column for articleelement
class CollectedArtefact(Base):
__tablename__ = 'collected_artefacts'
artefactid = Column(String, primary_key=True)
description = Column(Text, nullable=False)
sourceid = Column(String, nullable=False) # Foreign key reference to Source
locator = Column(Text, nullable=False) # Stores the RSS entry link
created = Column(DateTime, default=datetime.now(timezone.utc))
descriptionlang = Column(String, nullable=True)
rawcontentlang = Column(String, nullable=True)
# Setup Kafka Producer
kafka_producer = Producer({'bootstrap.servers': KAFKA_BOOTSTRAP_SERVERS})
# Log configuration
logger.add("application.log", rotation="500 MB")
# Function to connect to the database and retrieve the source entry
def get_source_by_id(session, source_id):
try:
source = session.query(Source).filter(Source.sourceid == source_id).one()
return source
except NoResultFound:
logger.error("No source found with sourceid: {}", source_id)
exit(1)
except MultipleResultsFound:
logger.error("Multiple sources found with sourceid: {}", source_id)
exit(1)
# Function to create a unique UUID based on RSS feed entry details
def create_unique_uid(title, link, description, pubDate):
unique_str = f"{title}-{link}-{description}-{pubDate}"
return str(uuid5(NAMESPACE_DNS, unique_str))
# Function to post to Kafka topic with JSON message
def post_to_kafka(topic, key, message):
kafka_producer.produce(topic, key=key, value=json.dumps(message))
kafka_producer.flush()
# Main application logic
def main():
with SessionLocal() as session:
# Get source information by SOURCEID
source = get_source_by_id(session, SOURCEID)
# Check if the source is enabled
if not source.enabled:
logger.info("Source with sourceid {} is disabled. Exiting.", SOURCEID)
exit(0)
# Parse RSS feed from sourcelocation
feed = feedparser.parse(source.sourcelocation)
if not feed.entries:
logger.warning("No entries found in the RSS feed at {}", source.sourcelocation)
return
# Insert RSS feed entries into collected_artefacts and track duplicates
num_entries = 0
duplicates = []
for entry in feed.entries:
uid = create_unique_uid(entry.title, entry.link, entry.description, entry.published)
# Check if the uid already exists in collected_artefacts for this sourceid
existing_artefact = session.query(CollectedArtefact).filter_by(artefactid=uid,
sourceid=source.sourceid).first()
if existing_artefact:
logger.warning(
"Artefact with uid {} and sourceid {} already exists. Skipping this entry.",
uid, source.sourceid
)
continue # Skip to the next entry if the artefact already exists
artefact_description = f"{source.sourcetype} from {source.sourcename} - {entry.title}"
lang = detect_language_with_langdetect(entry.description)
artefact = CollectedArtefact(
artefactid=uid,
description=artefact_description,
sourceid=source.sourceid,
locator=entry.link,
descriptionlang=lang[0]
)
try:
session.add(artefact)
session.commit()
num_entries += 1
# Prepare and send a Kafka message for the collected_artefact entry
artefact_message = {
"artefactid": uid,
"sourcetype": source.sourcetype,
"description": artefact_description,
"sourceid": source.sourceid,
"locator": entry.link,
"created": datetime.now(timezone.utc).isoformat(),
"articleelement": source.articleelement,
"descriptionlang": lang[0]
}
post_to_kafka("collected_artefacts_rss_pre_scrape", uid, artefact_message)
logger.info("Kafka message sent to 'collected_artefacts' with key '{}': {}", uid, artefact_message)
except IntegrityError:
session.rollback()
logger.warning(
"Duplicate entry detected with artefactid {}. Description: '{}', SourceID: '{}', Locator: '{}'",
uid, artefact_description, source.sourceid, entry.link
)
duplicates.append({
"artefactid": uid,
"description": artefact_description,
"sourceid": source.sourceid,
"locator": entry.link
})
except Exception as e:
logger.error("Unexpected error occurred: {}", str(e))
session.rollback()
# Update numprocessed and lastinterrogation in sources table
source.numprocessed = num_entries
source.lastinterrogation = datetime.now(timezone.utc)
session.commit()
# Prepare Kafka message with all source attributes and duplicates list if any
kafka_key = f"{SOURCEID}_{datetime.now(timezone.utc).date()}"
kafka_message = {
"sourceid": source.sourceid,
"enabled": source.enabled,
"sourcetype": source.sourcetype,
"sourcename": source.sourcename,
"sourcelocation": source.sourcelocation,
"articleelement": source.articleelement,
"lastinterrogation": source.lastinterrogation.isoformat() if source.lastinterrogation else None,
"created": source.created.isoformat() if source.created else None,
"updated": source.updated.isoformat() if source.updated else None,
"numprocessed": source.numprocessed,
"timestamp": datetime.now(timezone.utc).isoformat(),
"duplicates": duplicates
}
# Post JSON message to Kafka with key
post_to_kafka("sources", kafka_key, kafka_message)
logger.info("Kafka message sent to 'sources' with key '{}': {}", kafka_key, kafka_message)
logger.info("Application run complete.")
def detect_language_with_langdetect(line):
from langdetect import detect_langs
try:
langs = detect_langs(line)
for item in langs:
# The first one returned is usually the one that has the highest probability
return item.lang, item.prob
except:
return "err", 0.0
if __name__ == "__main__":
main()