-
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathfetcher.py
95 lines (75 loc) · 2.6 KB
/
fetcher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import json
import logging
import os
import time
from datetime import datetime
from pathlib import Path
from random import randint
from typing import Dict, Tuple
import openai
from openai import OpenAI
from text_constants import role_desc
LOG = logging.getLogger(__name__)
client = OpenAI(
api_key=os.getenv("OPENAI_API_KEY"),
)
# Function to translate text using OpenAI's Chat API
async def generate_posts_batch(text, override_role=None):
try:
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system",
"content": role_desc if override_role is None else override_role},
{"role": "user", "content": text}
],
temperature=0.75,
)
except openai.RateLimitError:
time.sleep(60)
return await generate_posts_batch(text)
try:
out = json.loads(response.choices[0].message.content)
LOG.info(f"Generated {len(out)} posts")
return out
except Exception as ex:
LOG.info(f"Failed to parse response from OpenAI {ex}")
LOG.info(response.choices[0].message.content)
return []
async def save_batch(name: str, batch: list[Dict]):
if not batch:
return
complete = {"t": time.time(), "posts": batch}
with open(f"posts-{name}.json", "w", encoding="utf-8") as f:
json.dump(complete, f, indent=4)
async def get_next_post(name: str) -> Tuple[str, datetime, float] | None:
# Load the JSON file
filename = f"posts-{name}.json"
filepath = Path(filename)
if not await posts_exist(name):
raise FileNotFoundError(f"The file {filename} does not exist.")
with filepath.open("r") as file:
data = json.load(file)
# Convert the timestamp to a datetime object
timestamp = datetime.fromtimestamp(data["t"])
# Find the post with the highest predicted_reach
posts = data["posts"]
if not posts:
LOG.info("No posts available.")
return None
m_p = len(posts) - 1
highest_post = posts[randint(0, m_p)]
# Remove the post and its "predicted_reach" key from the list
posts.remove(highest_post)
reach = highest_post["predicted_reach"]
highest_post.pop("predicted_reach", None)
# Save the updated JSON back to the file
with filepath.open("w") as file:
json.dump(data, file, indent=4)
return highest_post["post"], timestamp, reach
async def posts_exist(name):
filename = f"posts-{name}.json"
filepath = Path(filename)
return filepath.is_file()
if __name__ == "__main__":
print(get_next_post("ava"))