forked from odtp-org/odtp-component-client
-
Notifications
You must be signed in to change notification settings - Fork 0
/
logger.py
307 lines (223 loc) · 9.66 KB
/
logger.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
#!/usr/bin/env python
from pymongo import MongoClient, errors
from bson import ObjectId
from datetime import datetime, timezone
import os
import time
import json
#################################################################################
# Testing Class MongoManager with v.0.2.0 Schema version of ID.
# Copied from odtp. Once the repo is public we can hide it
class MongoManager:
    """CRUD helper around the ODTP MongoDB database (schema v0.2.0).

    Copied from odtp; once that repo is public this copy can be removed.
    All ``add_*``/``append_*`` methods return the inserted document's id
    (an ObjectId) and maintain the cross-collection back-references the
    ODTP schema expects.
    """

    def __init__(self, mongodbUrl, db_name):
        """Connect to the MongoDB server and select the working database.

        Args:
            mongodbUrl: full MongoDB connection string.
            db_name: name of the database to operate on.
        """
        self.client = MongoClient(mongodbUrl)
        self.db = self.client[db_name]

    def add_user(self, user_data):
        """Insert a user document and return its id."""
        users_collection = self.db["users"]
        return users_collection.insert_one(user_data).inserted_id

    def add_component(self, component_data):
        """Insert a component document and return its id."""
        components_collection = self.db["components"]
        return components_collection.insert_one(component_data).inserted_id

    def add_version(self, componentId, version_data):
        """Insert a version linked to ``componentId`` and return its id.

        Note: mutates ``version_data`` in place by setting ``componentId``.
        """
        versions_collection = self.db["versions"]
        version_data["componentId"] = componentId
        return versions_collection.insert_one(version_data).inserted_id

    def add_digital_twin(self, userRef, digital_twin_data):
        """Insert a digital twin owned by ``userRef`` and return its id.

        Also pushes the new twin's id onto the owning user's
        ``digitalTwins`` array so the reference is navigable both ways.
        """
        digital_twins_collection = self.db["digitalTwins"]
        digital_twin_data["userRef"] = userRef
        digital_twin_id = digital_twins_collection.insert_one(digital_twin_data).inserted_id
        # Add digital twin reference to user
        self.db.users.update_one(
            {"_id": userRef},
            {"$push": {"digitalTwins": digital_twin_id}}
        )
        return digital_twin_id

    def add_logs(self, log_data_list):
        """Bulk-insert log documents; return the list of inserted ids.

        Guards against an empty input: pymongo's ``insert_many`` raises
        ``InvalidOperation`` when handed an empty document list, and the
        caller (the log-tailing loop) frequently polls with no new lines.
        """
        if not log_data_list:
            return []
        logs_collection = self.db["logs"]
        return logs_collection.insert_many(log_data_list).inserted_ids

    def append_execution(self, digital_twin_id, execution_data):
        """Insert an execution for a digital twin and return its id.

        Note: mutates ``execution_data`` in place (sets ``digitalTwinRef``)
        and pushes the execution id onto the twin's ``executions`` array.
        """
        executions_collection = self.db["executions"]
        execution_data["digitalTwinRef"] = digital_twin_id
        execution_id = executions_collection.insert_one(execution_data).inserted_id
        # Update digital twin with execution reference
        self.db.digitalTwins.update_one(
            {"_id": digital_twin_id},
            {"$push": {"executions": execution_id}}
        )
        return execution_id

    def append_step(self, execution_id, step_data):
        """Insert a step for an execution and return its id.

        Note: mutates ``step_data`` in place (sets ``executionRef``) and
        pushes the step id onto the execution's ``steps`` array.
        """
        steps_collection = self.db["steps"]
        step_data["executionRef"] = execution_id
        step_id = steps_collection.insert_one(step_data).inserted_id
        # Update execution with step reference
        self.db.executions.update_one(
            {"_id": execution_id},
            {"$push": {"steps": step_id}}
        )
        return step_id

    def add_output(self, step_id, output_data):
        """Insert an output for a step and return its id.

        Sets the step's ``output`` field to the new id. NOTE(review):
        unlike ``append_step``/``append_execution`` this wraps ``step_id``
        in ``ObjectId`` for the update — callers appear to pass string ids
        here; confirm against call sites before unifying.
        """
        output_collection = self.db["outputs"]
        output_data["stepRef"] = step_id
        # TODO: Make its own function. Taking out user_id
        #output_data["access_control"]["authorized_users"] = user_id
        output_id = output_collection.insert_one(output_data).inserted_id
        # Update steps with execution reference
        self.db.steps.update_one(
            {"_id": ObjectId(step_id)},  # Specify the document to update
            {"$set": {"output": output_id}}  # Use $set to replace the value of a field
        )
        return output_id

    def append_log(self, step_id, log_data):
        """Push a single log entry onto a step's embedded ``logs`` array."""
        steps_collection = self.db["steps"]
        steps_collection.update_one(
            {"_id": ObjectId(step_id)},
            {"$push": {"logs": log_data}}
        )

    def append_logs(self, step_id, log_data_list):
        """Push many log entries onto a step's embedded ``logs`` array."""
        steps_collection = self.db["steps"]
        steps_collection.update_one(
            {"_id": ObjectId(step_id)},
            {"$push": {"logs": {"$each": log_data_list}}}
        )

    def update_result(self, result_id, output_id):
        """Append an output reference to a result and stamp its update time.

        Combined into a single ``update_one`` (previously two round-trips):
        ``$push`` and ``$set`` target different fields, so one update is
        both valid and atomic.
        """
        results_collection = self.db["results"]
        results_collection.update_one(
            {"_id": ObjectId(result_id)},
            {
                "$push": {"output": output_id},
                "$set": {"updated_at": datetime.now(timezone.utc)},
            }
        )

    def update_end_time(self, step_id):
        """Set a step's ``end_timestamp`` to the current UTC time."""
        steps_collection = self.db["steps"]
        steps_collection.update_one(
            {"_id": ObjectId(step_id)},
            {"$set": {"end_timestamp": datetime.now(timezone.utc)}}
        )

    ######### Get methods
    def get_all_collections_as_dict(self):
        """
        Retrieve all documents in all collections as a dictionary.
        """
        all_data = {}
        for collection_name in self.db.list_collection_names():
            cursor = self.db[collection_name].find()
            all_data[collection_name] = [doc for doc in cursor]
        return all_data

    def get_all_collections_as_json_string(self):
        """
        Retrieve all documents in all collections as a JSON-formatted string.
        """
        all_data = self.get_all_collections_as_dict()
        return json.dumps(all_data, indent=2, default=str)  # default=str to handle datetime and ObjectId

    def print_all_collections_as_json(self):
        """
        Print all documents in all collections in JSON format.
        """
        for collection_name in self.db.list_collection_names():
            print(f"Collection: {collection_name}")
            cursor = self.db[collection_name].find()
            for doc in cursor:
                print(json.dumps(doc, indent=2, default=str))  # default=str is added to handle datetime and ObjectId
            print("-" * 50)  # separator line between collections

    def export_all_collections_as_json(self, filename):
        """
        Save all documents in all collections as a JSON file.
        """
        all_data = self.get_all_collections_as_dict()
        with open(filename, 'w') as json_file:
            json.dump(all_data, json_file, indent=2, default=str)  # default=str to handle datetime and ObjectId

    ######################################
    # USER METHOD
    def get_all_users(self):
        """Return all user documents with ``_id`` stringified."""
        cursor = self.db.users.find({})
        users = []
        for doc in cursor:
            doc["_id"] = str(doc["_id"])  # Convert ObjectId to string
            users.append(doc)
        return users

    def get_digital_twins_by_user_id(self, user_id_str):
        """Return the digital twins owned by the given user id string.

        Each returned document carries ``_id`` (stringified), ``userRef``,
        and at most the first entry of ``executions``.
        """
        # Convert user_id string to ObjectId
        user_id = ObjectId(user_id_str)
        # Bug fix: the previous projection used "executions[0].timestamp",
        # which is not valid MongoDB dot notation (and was listed twice).
        # $slice limits the executions array to its first element instead.
        cursor = self.db.digitalTwins.find(
            {"userRef": user_id},
            {"_id": 1, "userRef": 1, "executions": {"$slice": 1}},
        )
        digital_twins = []
        for doc in cursor:
            doc["_id"] = str(doc["_id"])  # Convert ObjectId to string for pandas compatibility
            digital_twins.append(doc)
        return digital_twins

    def print_logs_by_indices(self, twin_index, execution_index, step_index):
        """Return the embedded logs list at the given twin/execution/step
        indices, or ``None`` (after printing a message) when absent.
        """
        # Skip to the digital twin specified by the given index and retrieve it
        digital_twin = self.db.digitalTwins.find().sort("_id", 1).skip(twin_index).limit(1).next()
        try:
            # Navigate to the logs using the given execution index
            logs = digital_twin["executions"][execution_index]["steps"][step_index]["logs"]
        except (IndexError, KeyError):
            print(f"No logs found for execution {execution_index} of digital twin {twin_index}.")
            # Bug fix: previously fell through to ``return logs`` and raised
            # UnboundLocalError when the lookup failed.
            return None
        return logs

    ######################################
    # Closing & Deleting
    ######################################
    def close(self):
        """Close the underlying MongoDB client connection."""
        self.client.close()

    def deleteAll(self):
        """Drop every collection in the database. Irreversible."""
        db_odtp = self.db
        # Get a list of all collections in the database
        collections = db_odtp.list_collection_names()
        # Drop each collection
        for collection in collections:
            db_odtp.drop_collection(collection)
########### LogReader
class LogReader:
    """Incremental reader that returns only lines appended since last read.

    Tracks a byte offset into the file so repeated calls never re-deliver
    the same content.
    """

    def __init__(self, log_file):
        # Path to the file being tailed; reading starts at offset 0.
        self.log_file = log_file
        self.last_position = 0

    def read_from_last_position(self):
        """Return the (stripped) lines added since the previous call."""
        with open(self.log_file, 'r') as handle:
            # Resume where the previous read stopped.
            handle.seek(self.last_position)
            new_lines = [raw_line.strip() for raw_line in handle]
            # Remember the new end-of-file offset for the next call.
            self.last_position = handle.tell()
        return new_lines
############ Main Method
# This method will push the log to an existing execution step
def main(delay=2):
    """Tail the component log file and push new lines into MongoDB.

    Polls ``/odtp/odtp-logs/log.txt``, inserting every new line as a log
    document referencing the step id from the ``ODTP_STEP_ID`` env var
    (Mongo server taken from ``ODTP_MONGO_SERVER``). When the sentinel
    line ``--- ODTP COMPONENT ENDING ---`` appears, the step's end time
    is stamped and the loop exits.

    Args:
        delay: polling interval in seconds between reads of the log file.
            (Bug fix: this parameter was previously ignored in favor of a
            hard-coded 0.2 s sleep.)
    """
    print("LOGGER ACTIVATED")
    print("##################################")
    ### Create Entry
    MONGO_URL = os.getenv("ODTP_MONGO_SERVER")
    step_id = os.getenv("ODTP_STEP_ID")
    db_name = "odtp"
    log_reader = LogReader('/odtp/odtp-logs/log.txt')
    # Active until it finds "--- ODTP COMPONENT ENDING ---"
    ending_detected = False
    while not ending_detected:
        logs = log_reader.read_from_last_position()
        newLogList = [
            {
                "stepRef": step_id,
                "timestamp": datetime.now(timezone.utc),
                "logstring": log,
            }
            for log in logs
        ]
        # Only open a connection when there is something to write:
        # insert_many raises on an empty document list.
        if newLogList:
            dbManager = MongoManager(MONGO_URL, db_name)
            # _ = dbManager.append_logs(step_id, newLogList)
            _ = dbManager.add_logs(newLogList)
            dbManager.close()
        # Bug fix: previously only the LAST line read was compared against
        # the sentinel (and the comparison raised NameError when no lines
        # had been read yet). Scan every new line instead.
        if any(log == "--- ODTP COMPONENT ENDING ---" for log in logs):
            dbManager = MongoManager(MONGO_URL, db_name)
            dbManager.update_end_time(step_id)
            dbManager.close()
            ending_detected = True
        time.sleep(delay)


if __name__ == '__main__':
    main(delay=0.5)