Skip to content

Commit

Permalink
harvester: harvest match data and fix logging
Browse files Browse the repository at this point in the history
  • Loading branch information
okbrandon committed Oct 17, 2024
1 parent 2f02f2d commit b5c9831
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 54 deletions.
43 changes: 20 additions & 23 deletions harvester/core/backends/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@
import psycopg2
import logging

# Set up logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

def get_env(key: str) -> str:
    """Return the value of environment variable *key*, or '' when unset/empty.

    Logs at DEBUG when the variable is missing so misconfiguration is visible
    without leaking values for variables that ARE set (e.g. passwords).
    """
    value = os.getenv(key)
    if value:
        return value
    logger.debug(f"Environment variable not found: {key}")
    return ''

def get_users_with_flag(flag: int):
Expand All @@ -19,39 +18,35 @@ def get_users_with_flag(flag: int):
db_user = get_env('POSTGRES_USER')
db_password = get_env('POSTGRES_PASSWORD')
db_name = get_env('POSTGRES_DB')

conn_str = f"host={db_host} port={db_port} user={db_user} password={db_password} dbname={db_name} sslmode=disable"
logging.debug(f"Connecting to database: {conn_str}")

try:
conn = psycopg2.connect(conn_str)
except Exception as err:
logging.error(f"Failed to open database connection: {err}")
logger.error(f"Failed to open database connection: {err}")
return None, err

cursor = conn.cursor()

query = 'SELECT "userID" FROM api_user WHERE (flags & %s) = %s'
logging.debug(f"Executing query: {query} with flag {flag}")

try:
cursor.execute(query, (flag, flag))
rows = cursor.fetchall()
except Exception as err:
logging.error(f"Query execution failed: {err}")
logger.error(f"Query execution failed: {err}")
return None, err

user_ids = [row[0] for row in rows]
logging.debug(f"UserIDs retrieved: {user_ids}")
logger.debug(f"There are {len(user_ids)} users to harvest")

# Remove flag after collecting user IDs
for user_id in user_ids:
query = 'UPDATE api_user SET flags = flags & ~(CAST(%s AS integer)) WHERE "userID" = %s'
try:
cursor.execute(query, (flag, user_id))
logging.debug(f"User flag removed successfully for userID: {user_id}")
except Exception as err:
logging.error(f"Error removing user flag for userID {user_id}: {err}")
logger.error(f"Error removing user flag for userID {user_id}: {err}")

conn.commit()
cursor.close()
Expand All @@ -65,14 +60,12 @@ def get_user_data(user_id: str, table_columns: dict):
db_user = get_env('POSTGRES_USER')
db_password = get_env('POSTGRES_PASSWORD')
db_name = get_env('POSTGRES_DB')

conn_str = f"host={db_host} port={db_port} user={db_user} password={db_password} dbname={db_name} sslmode=disable"
logging.debug(f"Connecting to database for user data: {conn_str}")

try:
conn = psycopg2.connect(conn_str)
except Exception as err:
logging.error(f"Failed to open database connection: {err}")
logger.error(f"Failed to open database connection: {err}")
return None, err

cursor = conn.cursor()
Expand All @@ -81,11 +74,18 @@ def get_user_data(user_id: str, table_columns: dict):
for table, column in table_columns.items():
if ',' in column:
columns = column.split(',')
query = f'SELECT * FROM {table} WHERE "{columns[0]}" = %s OR "{columns[1]}" = %s'

if columns[0].startswith("json:"):
identifier = columns[0].split(':')[1]
first_column = columns[0].split(':')[2]
second_column = columns[1].split(':')[2]
query = f'SELECT * FROM {table} WHERE "{first_column}"->>\'{identifier}\' = %s::text OR "{second_column}"->>\'{identifier}\' = %s::text'
else:
query = f'SELECT * FROM {table} WHERE "{columns[0]}" = %s OR "{columns[1]}" = %s'
else:
query = f'SELECT * FROM {table} WHERE "{column}" = %s'

logging.debug(f"Executing query for user data: {query} with userID {user_id}")
logger.debug(f"Executing query for user data: {query} with userID {user_id}")

try:
if ',' in column:
Expand All @@ -94,22 +94,19 @@ def get_user_data(user_id: str, table_columns: dict):
cursor.execute(query, (user_id,))
rows = cursor.fetchall()
except Exception as err:
logging.error(f"Query execution failed: {err}")
logger.error(f"Query execution failed: {err}")
return None, err

columns = [desc[0] for desc in cursor.description]
logging.debug(f"Columns retrieved: {columns}")

data = [columns] # First row is the column headers
data = [columns] # First row is the column headers
for row in rows:
data.append([str(value) for value in row])
logging.debug(f"Row data retrieved: {row}")

user_data[table] = data
logging.debug(f"User data for table {table} retrieved: {data}")

cursor.close()
conn.close()

logging.debug(f"User data retrieval completed: {user_data}")
logger.debug(f"Completed data extraction for userID {user_id}")
return user_data, None
23 changes: 10 additions & 13 deletions harvester/core/backends/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@
from .database import *

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Tables and columns where the userID is stored
tables_and_columns = {
"api_user": "userID",
"api_purchase": "userID",
#"api_match": "json:id:playerA,json:id:playerB",
"api_match": "json:id:playerA,json:id:playerB",
"api_verificationcode": "userID",
"api_usersettings": "userID",
"api_relationship": "userA,userB",
def harvest_users():
    """Daemon loop: fetch flagged users, export their data to CSV, then ZIP it.

    Runs forever; each cycle clears the harvest flag (1 << 3), dumps every
    flagged user's rows to CSV, packages them into a ZIP, and sleeps 3 minutes.
    Per-user failures are logged and skipped so one bad user cannot stall the
    whole batch.
    """
    while True:
        user_ids, err = get_users_with_flag(1 << 3)
        if err:
            logger.error(f"Error fetching users with flag: {err}")
            # Sleep before retrying: a bare `continue` here busy-loops and
            # hammers the database while it is unreachable.
            time.sleep(3 * 60)
            continue

        for user_id in user_ids:
            user_data, err = get_user_data(user_id, tables_and_columns)
            if err:
                logger.error(f"Error fetching user data for userID {user_id}: {err}")
                continue

            logger.debug(f"User data retrieved for userID: {user_id}")

            err = export_csv(user_id, user_data)
            if err:
                logger.error(f"Error exporting CSV for userID {user_id}: {err}")
                continue

            logger.debug(f"CSV exported successfully for userID: {user_id}")

            err = zip_and_move(user_id)
            if err:
                logger.error(f"Error creating ZIP file for userID {user_id}: {err}")
                continue

            logger.info(f"ZIP export successful for userID: {user_id}")

        logger.debug("Sleeping for 3 minutes before next harvest")
        time.sleep(3 * 60)
30 changes: 12 additions & 18 deletions harvester/core/utils.py
Original file line number Diff line number Diff line change
def zip_and_move(user_id: str):
    """Bundle output/<user_id>_*.csv into /exports/<user_id>.zip.

    Returns None on success, or the exception/error that aborted the export —
    the caller checks the return value truthily, so returning None on failure
    (as the annotation previously implied) would make every failure invisible.
    """
    output_dir = "output"
    export_dir = "/exports"

    logger.debug(f"Creating export directory: {export_dir}")
    os.makedirs(export_dir, exist_ok=True)

    zip_file_name = os.path.join(export_dir, f"{user_id}.zip")
    logger.debug(f"Creating ZIP file: {zip_file_name}")
    try:
        with zipfile.ZipFile(zip_file_name, 'w') as zip_file:
            files = glob.glob(os.path.join(output_dir, f"{user_id}_*.csv"))
            for file in files:
                err = add_file_to_zip(zip_file, file)
                if err is not None:
                    logger.error(f"Failed to add file to ZIP: {file}")
                    # Propagate the failure so the caller can see it.
                    return err
    except Exception as err:
        logger.error(f"Failed to create ZIP file: {zip_file_name}, error: {err}")
        return err

    logger.info(f"ZIP file created successfully: {zip_file_name}")
    return None

def add_file_to_zip(zip_file: zipfile.ZipFile, file_path: str):
    """Add *file_path* to the open ZIP archive under its base name.

    Returns None on success, or the exception on failure (the caller tests
    the result against None). The previous version opened the file into an
    unused handle — ZipFile.write reads the file by path itself, so the extra
    open only duplicated work; any unreadable file still raises inside the
    try and is reported the same way.
    """
    logger.debug(f"Opening file for ZIP: {file_path}")
    try:
        zip_file.write(file_path, os.path.basename(file_path))
    except Exception as err:
        logger.error(f"Failed to open file: {file_path}, error: {err}")
        return err
    return None

def export_csv(user_id: str, user_data: dict):
    """Write each table's rows for *user_id* to output/<user_id>_<table>.csv.

    user_data maps table name -> list of rows, where the first row is the
    column headers. Returns None on success, or the exception that aborted
    the export (the caller checks the return value truthily, so the old
    `-> None` annotation contradicted the `return error` paths). All
    diagnostics go through the module logger — no stray print() debugging.
    """
    output_directory = "output"
    logger.debug(f"Creating output directory: {output_directory}")

    try:
        os.makedirs(output_directory, exist_ok=True)
    except Exception as error:
        logger.error(f"Failed to create output directory: {output_directory}, error: {error}")
        return error

    for table, data in user_data.items():
        file_path = os.path.join(output_directory, f"{user_id}_{table}.csv")
        try:
            # newline='' lets the csv module control row terminators (csv docs).
            with open(file_path, mode='w', newline='') as file:
                writer = csv.writer(file)
                for record in data:
                    try:
                        writer.writerow(record)
                    except Exception as error:
                        logger.error(f"Failed to write record to CSV: {file_path}, record: {record}, error: {error}")
                        return error
            logger.info(f"CSV exported successfully: {file_path}")
        except Exception as error:
            logger.error(f"Failed to create CSV file: {file_path}, error: {error}")
            return error

    return None

0 comments on commit b5c9831

Please sign in to comment.