forked from mastermindML/mastermind
-
Notifications
You must be signed in to change notification settings - Fork 0
/
MASTERMIND.py
160 lines (129 loc) · 4.92 KB
/
MASTERMIND.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# Auto-Configuration for config.json
import os
import json
def auto_configure():
config_path = 'config.json'
default_agents = ['SimpleCoder.py', 'autonomize.py']
if not os.path.exists(config_path):
config = {'agents': default_agents}
with open(config_path, 'w') as config_file:
json.dump(config, config_file)
logging.info('config.json created with default agents: SimpleCoder.py, autonomize.py')
else:
logging.info('config.json already exists. Skipping auto-configuration.')
# Calling auto-configuration function during the initialization
auto_configure()
# MASTERMIND
import logging
import json
import threading
from typing import Dict, Type, Union, List
from abc import ABC, abstractmethod
import os
import psutil
# Initialize logging
logging.basicConfig(level=logging.INFO)
# Agent Interface that all agents will implement
class AgentInterface(ABC):
@abstractmethod
def initialize(self):
pass
@abstractmethod
def execute(self):
pass
@abstractmethod
def get_data(self):
pass
@abstractmethod
def shutdown(self):
pass
# MASTERMIND Class
class MASTERMIND:
def __init__(self):
self.agent_store: Dict[str, AgentInterface] = {}
self.data_store: Dict[str, Union[str, Dict]] = {}
self.load_config()
def load_config(self):
try:
with open("config.json", "r") as f:
self.config = json.load(f)
except Exception as e:
logging.error(f"Could not load config: {e}")
def load_agent(self, agent_name: str, agent_class: Type[AgentInterface]):
# Security Check
if not self.validate_agent(agent_name):
logging.error(f"Agent {agent_name} failed the security validation.")
return
try:
agent_instance = agent_class()
agent_instance.initialize()
self.agent_store[agent_name] = agent_instance
except Exception as e:
logging.error(f"Failed to load agent {agent_name}: {e}")
def unload_agent(self, agent_name: str):
try:
agent_instance = self.agent_store.pop(agent_name)
agent_instance.shutdown()
except KeyError:
logging.error(f"Agent {agent_name} not found.")
except Exception as e:
logging.error(f"Failed to unload agent {agent_name}: {e}")
def execute_agents(self):
threads = []
for agent_name, agent_instance in self.agent_store.items():
thread = threading.Thread(target=self.execute_single_agent, args=(agent_name, agent_instance,))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
def execute_single_agent(self, agent_name: str, agent_instance: AgentInterface):
try:
agent_instance.execute()
agent_data = agent_instance.get_data()
self.accumulate_data(agent_name, agent_data)
except Exception as e:
logging.error(f"Failed to execute agent {agent_name}: {e}")
def accumulate_data(self, agent_name: str, data: Union[str, Dict]):
# Data Validation
if not self.validate_data(data):
logging.error(f"Data from agent {agent_name} failed the validation check.")
return
self.data_store[agent_name] = data
def get_data(self, agent_name: str):
return self.data_store.get(agent_name, "Data not found.")
def validate_agent(self, agent_name: str) -> bool:
# For now, a simple validation to check if the agent is in the allowed list
return agent_name in self.config.get("allowed_agents", [])
def validate_data(self, data: Union[str, Dict]) -> bool:
# Placeholder for more complex data validation
return True
def monitor_resources(self):
# Simple resource monitoring using psutil
cpu_percent = psutil.cpu_percent()
memory_info = psutil.virtual_memory()
logging.info(f"CPU Usage: {cpu_percent}%")
logging.info(f"Memory Usage: {memory_info.percent}%")
# Save data store to JSON file
def save_data_store(mastermind_instance: MASTERMIND):
try:
with open("data_store.json", "w") as f:
json.dump(mastermind_instance.data_store, f)
except Exception as e:
logging.error(f"Failed to save data store: {e}")
# Example of a simple agent that implements the AgentInterface
class SimpleAgent(AgentInterface):
def initialize(self):
self.data = "Initialized"
def execute(self):
self.data = "Executed"
def get_data(self):
return self.data
def shutdown(self):
self.data = "Shutdown"
# Test the MASTERMIND class with a simple agent
if __name__ == "__main__":
mastermind = MASTERMIND()
mastermind.load_agent("SimpleAgent", SimpleAgent)
mastermind.execute_agents()
save_data_store(mastermind)
mastermind.monitor_resources()