diff --git a/.gitignore b/.gitignore index 0c6bf2e..bed3fd8 100644 --- a/.gitignore +++ b/.gitignore @@ -133,3 +133,7 @@ Pipfile.lock .vscode/ .gitignore .vscode/settings.json + +# Logs from simulator program +simulator/logs/ + diff --git a/simulator/demo.py b/simulator/demo.py new file mode 100644 index 0000000..6c3bddc --- /dev/null +++ b/simulator/demo.py @@ -0,0 +1,109 @@ +import simpy +import logging +import peer +import network +import driver +import processor +from simple_pubsub import * + +def publisher(peer, topic_name, count, msg = None): + environment = peer.driver.env + yield environment.timeout(20) # Espera para que o driver esteja pronto + service = pubsub_service.PS_Service(peer.driver) + node = domain_participant.Domain_Participant(service) + topic = node.create_topic(topic_name) + pub = node.create_publisher(topic) + for i in range(count): + pub.write(str(topic_name) + ": Message #" + str(i+1) + ': ' + str(msg)) + yield environment.timeout(2) + +def subscriber(peer, topic_name): + environment = peer.driver.env + yield environment.timeout(100) + service = pubsub_service.PS_Service(peer.driver) + node = domain_participant.Domain_Participant(service) + topic = node.create_topic(topic_name) + sub = node.create_subscriber(topic, sub_callback) + yield environment.timeout(0) + +def sub_callback(subscriber): + msg = subscriber.read() + #print("Received: " + str(msg)) + +def subscriber2(peer, topic_name): + environment = peer.driver.env + yield environment.timeout(100) + service = pubsub_service.PS_Service(peer.driver) + node = domain_participant.Domain_Participant(service) + topic = node.create_topic(topic_name) + sub = node.create_subscriber(topic, sub_callback2) + yield environment.timeout(0) + +def sub_callback2(subscriber): + msg = subscriber.read() + #print("Received: " + str(msg)) + +# Configuração do root logger +console_handler = logging.StreamHandler() +console_handler.setLevel(logging.INFO) +handlers = [console_handler] +logging.basicConfig(level = logging.INFO, + format = '[%(levelname)s] [%(module)10s] %(message)s', + handlers = handlers +) + +NUM_PEERS = 2 +SIM_DURATION = 5000 + +# create env +env = simpy.Environment() + +# network +net = network.Network(env,2) + +#create peers + +nodes = [] + +topic_name = 'demo' + +# Setting up publisher +proc_0 = processor.Processor(env, 0, 3) +dri_0 = driver.Driver(net, proc_0) +peer_0 = peer.Peer(dri_0, 0) +env.process(dri_0.run()) +env.process(publisher(peer_0, topic_name, 100, '1st pub')) + +# Setting up subscriber + +proc_1 = processor.Processor(env, 1, 3) +dri_1 = driver.Driver(net, proc_1) +peer_1 = peer.Peer(dri_1, 1) +env.process(dri_1.run()) +env.process(subscriber(peer_1, topic_name)) + +# Setting up publisher that'll have no subscribers + +proc_2 = processor.Processor(env, 2, 3) +dri_2 = driver.Driver(net, proc_2) +peer_2 = peer.Peer(dri_2, 2) +env.process(dri_2.run()) +env.process(publisher(peer_2, 'nosub', 100)) + +# Setting up 2nd publisher to topic_name + +proc_3 = processor.Processor(env, 3, 3) +dri_3 = driver.Driver(net, proc_3) +peer_3 = peer.Peer(dri_3, 3) +env.process(dri_3.run()) +env.process(publisher(peer_3, topic_name, 100, '2nd pub')) + +# Setting up 2nd subscriber + +proc_4 = processor.Processor(env, 4, 3) +dri_4 = driver.Driver(net, proc_4) +peer_4 = peer.Peer(dri_4, 4) +env.process(dri_4.run()) +env.process(subscriber2(peer_4, topic_name)) + +env.run(until=SIM_DURATION) diff --git a/simulator/driver.py b/simulator/driver.py index e949a1c..3dd6154 100644 --- a/simulator/driver.py +++ b/simulator/driver.py @@ -52,6 +52,9 @@ def fetch_peer_list(self): def disconnect(self): self.address = None + def is_connected(self): + return self.address is not None + def advertise(self, msg): for z in self.network.send_broadcast(self.address, msg): yield z @@ -98,9 +101,7 @@ def execute_stored_calls(self): if function_name == 'send': to_addr = function_call[1] msg = function_call[2] - for z in self.send(to_addr, msg): - yield z + self.env.process(self.send(to_addr, msg)) elif function_name == 'advertise': msg = function_call[1] - for z in self.advertise(msg): - yield z \ No newline at end of file + self.env.process(self.advertise(msg)) \ No newline at end of file diff --git a/simulator/latency_report.py b/simulator/latency_report.py new file mode 100644 index 0000000..dd28965 --- /dev/null +++ b/simulator/latency_report.py @@ -0,0 +1,252 @@ +import simpy +import logging +import peer +import network +import driver +import processor +import pandas +from datetime import datetime +from simple_pubsub import * + +# Configuração do root logger +console_handler = logging.StreamHandler() +console_handler.setLevel(logging.INFO) +handlers = [console_handler] +logging.basicConfig(level = logging.INFO, + format = '[%(levelname)s] [%(module)15s] %(message)s', + handlers = handlers +) + +class PublisherForLatencyTest: + + def __init__(self, peer, topic_name, count, delay): + self.peer = peer + self.id = peer.id + self.topic_name = topic_name + self.count = count + self.delay = delay + 50 + + def publish(self): + yield self.peer.driver.env.timeout(self.delay) + for z in self.wait_for_connection(): + yield z + service = pubsub_service.PS_Service(self.peer.driver) + participant = domain_participant.Domain_Participant(service) + topic = participant.create_topic(self.topic_name) + pub = participant.create_publisher(topic) + for i in range(self.count): + time = self.peer.driver.get_time() + msg = (self.id, i, time) + pub.write(msg) + yield self.peer.driver.env.timeout(1) + + def wait_for_connection(self): + while (self.peer.driver.is_connected() is False): + yield self.peer.driver.env.timeout(1) + +class SubscriberForLatencyTest: + + def __init__(self, peer, topic_name, delay): + self.peer = peer + self.id = peer.id + self.topic_name = topic_name + self.latency_list = {} + self.delay = delay + + def subscribe(self): + yield self.peer.driver.env.timeout(self.delay) + for z in self.wait_for_connection(): + yield z + service = pubsub_service.PS_Service(self.peer.driver) + participant = domain_participant.Domain_Participant(service) + topic = participant.create_topic(self.topic_name) + sub = participant.create_subscriber(topic, self.get_latency) + + def wait_for_connection(self): + while (self.peer.driver.is_connected() is False): + yield self.peer.driver.env.timeout(1) + + def get_latency(self, subscriber): + msg = subscriber.read().get_raw_data() + publisher_id = str(msg[0]) + msg_num = str(msg[1]) + send_time = msg[2] + now = self.peer.driver.get_time() + latency = now - send_time + self.store_in_latency_list(publisher_id, msg_num, latency) + + def store_in_latency_list(self, publisher_id, msg_num, latency): + if publisher_id not in self.latency_list: + self.latency_list[publisher_id] = {} + self.latency_list[publisher_id][msg_num] = latency + +class FileWriterForLatencyReport: + + def __init__(self): + self.current_time = str(datetime.now()).replace(':', '_') + self.filename = 'latency_test_' + self.current_time + '.txt' + self.subscriber_list = None + self.publisher_count = None + self.msg_count = None + + def get_simulation_data(self, subscriber_list, publisher_count, msg_count): + self.subscriber_list = subscriber_list + self.publisher_count = publisher_count + self.msg_count = msg_count + + def write(self): + with open(self.filename, 'a') as f: + f.write('--- Latency Test Report - ' + str(datetime.now()) + '\n\n') + f.write('Publishers: ' + str(self.publisher_count) + '\n') + f.write('Subscribers: ' + str(len(self.subscriber_list)) + '\n') + f.write('Messages per Publisher: ' + str(self.msg_count) + '\n\n') + + for i, subscriber in enumerate(self.subscriber_list): + f.write('> Subscriber #' + str(i) + '\n') + for peer_id, msg_dict in subscriber.latency_list.items(): + f.write(' > Messages from Peer #' + peer_id + ' :\n') + for msg_num, latency in msg_dict.items(): + f.write(' #' + msg_num + ': ' + str(latency) + '\n') + f.write('\n') + f.write('\n') + +class SimulationDataFrameCreator: + + def __init__(self): + self.latency_data = [] + + def add_single_simulation_data (self, subscriber_list, publisher_number, subscriber_number, msg_num): + latency_values = [] + for subscriber in subscriber_list: + for msg_dict in subscriber.latency_list.values(): + for latency in msg_dict.values(): + latency_values.append(latency) + latency_series = pandas.Series(data=latency_values) + lowest_latency = latency_series.min() + highest_latency = latency_series.max() + avg_latency = latency_series.mean() + latency_standard_deviation = latency_series.std() + simulation_data = ( + publisher_number, + subscriber_number, + msg_num, + avg_latency, + highest_latency, + lowest_latency, + latency_standard_deviation + ) + self.latency_data.append(simulation_data) + return + + def save_to_csv(self): + print(len(self.latency_data)) + df = pandas.DataFrame(data=self.latency_data, columns=[ + 'Publishers', 'Subscribers', 'Msg Count', 'Avg Latency', + 'Highest Latency', 'Lowest Latency', 'Standard Deviation']) + #pandas.set_option('display.max_columns', 500) + time = str(datetime.now()).replace(':', '_') + df.to_csv('latency_report_' + time + '.csv' , index=False) + +class LatencyTest: + + def __init__(self, publisher_number, subscriber_number, network_latency, processor_latency, max_peers, sim_duration): + # if (publisher_number is 0 or subscriber_number is 0): + # raise RuntimeError("Invalid number of peers") + self.env = simpy.Environment() + self.net = network.Network(self.env, network_latency, max_peers) + self.full_connection_time_estimate = max_peers * network_latency + self.network_latency = network_latency + self.processor_latency = processor_latency + self.duration = sim_duration + self.node_count = 0 + self.topic_name = "latency_test" + self.msg_quantity = 1 + self.publishers = [] + self.subscribers = [] + self.setup_publishers(publisher_number) + self.setup_subscribers(subscriber_number) + + def generate_simulation_variables(self): + pub_range = 100 + sub_range = 100 + # Casos com até 100 publishers e 100 subscribers + for i in range(pub_range): + for j in range(sub_range): + pubs = i + 1 + subs = j + 1 + yield (pubs, subs) + + def reset_simulation(self, publisher_number, subscriber_number, max_peers): + # if (publisher_number is 0 or subscriber_number is 0): + # raise RuntimeError("Invalid number of peers") + self.env = simpy.Environment() + self.net = network.Network(self.env, self.network_latency, max_peers) + self.full_connection_time_estimate = max_peers * self.network_latency + self.node_count = 0 + self.topic_name = "latency_test" + self.publishers = [] + self.subscribers = [] + self.setup_publishers(publisher_number) + self.setup_subscribers(subscriber_number) + + def run_simulations_and_gather_data(self): + dfcreator = SimulationDataFrameCreator() + fwriter = FileWriterForLatencyReport() + for vars in self.generate_simulation_variables(): + publisher_number = vars[0] + subscriber_number = vars[1] + max_peers = publisher_number + subscriber_number + self.reset_simulation(publisher_number, subscriber_number, max_peers) + self.env.run() + self.env.run(until=self.duration) + fwriter.get_simulation_data(self.subscribers, len(self.publishers), self.msg_quantity) + fwriter.write() + dfcreator.add_single_simulation_data( + self.subscribers, len(self.publishers), len(self.subscribers), self.msg_quantity) + dfcreator.save_to_csv() + + def setup_simulation_objects(self): + proc = processor.Processor(self.env, self.node_count, self.processor_latency) + dri = driver.Driver(self.net, proc) + new_peer = peer.Peer(dri, self.node_count) + return (dri, new_peer) + + def setup_publishers(self, publisher_number): + for i in range(publisher_number): + driver, peer = self.setup_simulation_objects() + publisher = PublisherForLatencyTest(peer, self.topic_name, self.msg_quantity, self.full_connection_time_estimate) + self.publishers.append(publisher) + self.env.process(driver.run()) + self.env.process(publisher.publish()) + self.node_count += 1 + + def setup_subscribers(self, subscriber_number): + for i in range(subscriber_number): + driver, peer = self.setup_simulation_objects() + subscriber = SubscriberForLatencyTest(peer, self.topic_name, self.full_connection_time_estimate) + self.subscribers.append(subscriber) + self.env.process(driver.run()) + self.env.process(subscriber.subscribe()) + self.node_count += 1 + + def run(self): + self.env.run(until=self.duration) + fwriter = FileWriterForLatencyReport() + fwriter.get_simulation_data(self.subscribers, len(self.publishers), self.msg_quantity) + fwriter.write() + dfcreator = SimulationDataFrameCreator() + dfcreator.add_single_simulation_data( + self.subscribers, len(self.publishers), len(self.subscribers), self.msg_quantity) + dfcreator.save_to_csv() + + +num_pub = 2 +num_sub = 2 +max_peers = num_pub + num_sub +net_latency = 2 +proc_latency = 3 +sim_duration = 1000000 + +latency_test = LatencyTest(num_pub, num_sub, net_latency, proc_latency, max_peers, sim_duration) +latency_test.run() +#latency_test.run_simulations_and_gather_data() \ No newline at end of file diff --git a/simulator/network.py b/simulator/network.py index 1b180e1..beb00f8 100644 --- a/simulator/network.py +++ b/simulator/network.py @@ -64,19 +64,17 @@ def send_unicast(self, from_addr, to_addr, msg): def send_broadcast(self, from_addr, msg): self.confirm_peer(from_addr) logging.info(str(self.env.now) + ' :: ' + 'Message Broadcast from {} - {}'.format(from_addr, str(msg))) - with self.node_list_access.request(priority=0) as nl_access: - yield nl_access - for addr in range(len(self.node_list)): - to_addr = self.node_list[addr]['address'] - if to_addr == from_addr: - continue - msg_envelope = [from_addr, to_addr, msg] - with self.channel.request() as rec: - yield rec - node = self.node_list[addr]['node'] - node.receive(msg_envelope) - yield self.env.timeout(self.latency) - #logging.info(str(self.env.now) + ' :: ' + 'Broadcast:'+ str(msg_envelope)) + for addr in range(len(self.node_list)): + to_addr = self.node_list[addr]['address'] + if to_addr == from_addr: + continue + msg_envelope = [from_addr, to_addr, msg] + with self.channel.request() as rec: + yield rec + node = self.node_list[addr]['node'] + node.receive(msg_envelope) + yield self.env.timeout(self.latency) + #logging.info(str(self.env.now) + ' :: ' + 'Broadcast:'+ str(msg_envelope)) def find_next_available(self): addr = (self.next_available_address + 1) % self.max_hosts diff --git a/simulator/peer.py b/simulator/peer.py index c1257e0..9a62e5d 100644 --- a/simulator/peer.py +++ b/simulator/peer.py @@ -1,5 +1,4 @@ import logging -from simple_dds import * """ Simulates the behavior of a peer in a network. @@ -19,13 +18,13 @@ class Peer: def __init__(self, driver, id): + self.id = id self.driver = driver self.driver.register_handler(self.on_message) self.driver.register_handler(self.on_connect, 'on_connect') self.driver.register_handler(self.on_advertise, 'on_advertise') self.driver.register_handler(self.on_disconnect, 'on_disconnect') self.name = 'peer_{}'.format(id) - self.latest_read_msg = 0 def on_message (self, msg): logging.info(str(self.driver.env.now) + ' :: ' + '{} received msg: {}'.format(self.name, msg)) @@ -41,10 +40,4 @@ def on_disconnect (self): def on_advertise (self, msg): for z in self.driver.advertise(msg): - yield z - - def read_new_message(self, subscriber): - self.latest_read_msg = subscriber.read() - - - + yield z \ No newline at end of file diff --git a/simulator/simple_dds/__init__.py b/simulator/simple_dds/__init__.py deleted file mode 100644 index c2e948e..0000000 --- a/simulator/simple_dds/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -__all__ = ["entity", "dds_service", "domain_participant", "publisher", "subscriber", -"data_object", "topic"] \ No newline at end of file diff --git a/simulator/simple_dds/topic.py b/simulator/simple_dds/topic.py deleted file mode 100644 index f9868b8..0000000 --- a/simulator/simple_dds/topic.py +++ /dev/null @@ -1,27 +0,0 @@ -from simple_dds import entity - -class Topic(entity.Entity): - - def __init__(self, topic_name, participant): - super(Topic, self).__init__() - self.name = topic_name - self.participant = participant - self.publishers = [] - self.subscribers = [] - self.data_objects = {} - self.creation_time = self.participant.service.driver.get_time() - self.last_modified = self.creation_time - - def get_name(self): - return self.name - - def attach_data_object(self, data_object): - if data_object.get_topic_name() == self.name: - handle = data_object.get_instance_handle() - self.data_objects[handle] = data_object - self.last_modified = self.participant.service.driver.get_time() - - def can_be_deleted(self): - no_pubs = len(self.publishers) == 0 - no_subs = len(self.subscribers) == 0 - return no_pubs and no_subs \ No newline at end of file diff --git a/simulator/simple_pubsub/__init__.py b/simulator/simple_pubsub/__init__.py new file mode 100644 index 0000000..7b740bb --- /dev/null +++ b/simulator/simple_pubsub/__init__.py @@ -0,0 +1,2 @@ +__all__ = ["entity", "pubsub_service", "domain_participant", "publisher", "subscriber", +"data_object", "topic"] \ No newline at end of file diff --git a/simulator/simple_dds/data_object.py b/simulator/simple_pubsub/data_object.py similarity index 75% rename from simulator/simple_dds/data_object.py rename to simulator/simple_pubsub/data_object.py index b7e5d7d..57c7ba5 100644 --- a/simulator/simple_dds/data_object.py +++ b/simulator/simple_pubsub/data_object.py @@ -1,4 +1,4 @@ -from simple_dds import entity +from simple_pubsub import entity class Data_Object(entity.Entity): @@ -13,4 +13,7 @@ def __str__(self): return str(self.content) def get_topic_name(self): - return self.topic.get_name() \ No newline at end of file + return self.topic.get_name() + + def get_raw_data(self): + return self.content \ No newline at end of file diff --git a/simulator/simple_dds/domain_participant.py b/simulator/simple_pubsub/domain_participant.py similarity index 77% rename from simulator/simple_dds/domain_participant.py rename to simulator/simple_pubsub/domain_participant.py index 887a085..2669f0c 100644 --- a/simulator/simple_dds/domain_participant.py +++ b/simulator/simple_pubsub/domain_participant.py @@ -1,13 +1,13 @@ -from simple_dds import entity -from simple_dds import topic -from simple_dds import publisher -from simple_dds import subscriber +from simple_pubsub import entity +from simple_pubsub import topic +from simple_pubsub import publisher +from simple_pubsub import subscriber class Domain_Participant(entity.Entity): - def __init__(self, dds_service): + def __init__(self, ps_service): super(Domain_Participant, self).__init__() - self.service = dds_service + self.service = ps_service self.publishers = {} self.subscribers = {} self.topics = {} @@ -18,6 +18,7 @@ def create_topic(self, topic_name): logging.warning(f'{topic_name} already exists.') return None else: + current_time = self.service.driver.get_time() new_topic = topic.Topic(topic_name, self) self.service.add_topic(new_topic) self.topics[topic_name] = new_topic @@ -35,18 +36,18 @@ def create_publisher(self, topic): self.service.assign_handle(new_publisher) handle = new_publisher.get_instance_handle() self.publishers[handle] = new_publisher - self.service.announce_new_publisher(new_publisher) return new_publisher def delete_publisher(self, publisher): pass def create_subscriber(self, topic, listener=None): - data = self.service.retrieve_filtered_data_objects(topic.get_name()) - new_subscriber = subscriber.Subscriber(self, topic, data, listener) + new_subscriber = subscriber.Subscriber(self, topic, listener) self.service.assign_handle(new_subscriber) handle = new_subscriber.get_instance_handle() self.subscribers[handle] = new_subscriber + #self.service.notify_remote_participants_of_new_subscriber(new_subscriber) + topic.attach_local_subscriber(topic.get_name(), new_subscriber) return new_subscriber def delete_subscriber(self, subscriber): diff --git a/simulator/simple_dds/entity.py b/simulator/simple_pubsub/entity.py similarity index 68% rename from simulator/simple_dds/entity.py rename to simulator/simple_pubsub/entity.py index a9dced0..9ea7a77 100644 --- a/simulator/simple_dds/entity.py +++ b/simulator/simple_pubsub/entity.py @@ -5,11 +5,11 @@ def __init__(self): def set_instance_handle(self, handle): if self.instance_handle != 0: - raise RuntimeError("DDS Instance already has a handle.") + raise RuntimeError("PS Instance already has a handle.") else: self.instance_handle = handle def get_instance_handle(self): if self.instance_handle == 0: - raise RuntimeError("DDS Instance has not been assigned a handle") + raise RuntimeError("PS Instance has not been assigned a handle") return self.instance_handle \ No newline at end of file diff --git a/simulator/simple_dds/publisher.py b/simulator/simple_pubsub/publisher.py similarity index 57% rename from simulator/simple_dds/publisher.py rename to simulator/simple_pubsub/publisher.py index 13997f3..7e44406 100644 --- a/simulator/simple_dds/publisher.py +++ b/simulator/simple_pubsub/publisher.py @@ -1,5 +1,5 @@ -from simple_dds import entity -from simple_dds import data_object +from simple_pubsub import entity +from simple_pubsub import data_object class Publisher(entity.Entity): @@ -7,10 +7,14 @@ def __init__(self, participant, topic): super(Publisher, self).__init__() self.participant = participant self.topic = topic + self.subscriptions = 0 def get_topic(self): return self.topic def write(self, data): new_data = data_object.Data_Object(self, self.topic, data) - self.participant.service.add_data_object(new_data) \ No newline at end of file + self.participant.service.add_data_object(new_data) + + def get_subscriber_count(self): + return self.topic.get_subscription_count() \ No newline at end of file diff --git a/simulator/simple_dds/dds_service.py b/simulator/simple_pubsub/pubsub_service.py similarity index 52% rename from simulator/simple_dds/dds_service.py rename to simulator/simple_pubsub/pubsub_service.py index 037f54e..6c90e0b 100644 --- a/simulator/simple_dds/dds_service.py +++ b/simulator/simple_pubsub/pubsub_service.py @@ -1,11 +1,12 @@ # TODO -# - Não pedir todos os objetos-dado na criação do DDS Service; Fazer isso na.. +# - Não pedir todos os objetos-dado na criação do PS Service; Fazer isso na.. # .. criação de subscriber, e somente dados do tópico específico. import logging +import copy from threading import Lock from singleton import Singleton -from simple_dds import entity +from simple_pubsub import entity class UniqueHandleController(metaclass=Singleton): @@ -20,16 +21,15 @@ def generate_handle(self): self.next_available_handle += 1 return handle -class DDS_Service(entity.Entity): +class PS_Service(entity.Entity): def __init__(self, driver): self.handle_controller = UniqueHandleController() self.instance_handle = self.handle_controller.generate_handle() self.driver = driver self.peer_list = [] - self.handles = {} # Handle: Entity - self.participants = {} # Handle: Participant - self.local_participants = {} + self.local_participants = {} # Handle: Participant + self.remote_participants = {} # Handle: IP self.topics = {} # Topic name: Topic self.data_objects = {} # Handle: Data object self.message_handlers = {} @@ -37,10 +37,10 @@ def __init__(self, driver): self._add_message_handler_methods() self._attach_msg_reception_handler_to_driver() self._discover_peers() - self._request_full_domain_data() + #self._request_full_domain_data() def set_instance_handle(self, handle): - raise RuntimeError("DDS Service's handle cannot be changed.") + raise RuntimeError("PS Service's handle cannot be changed.") def get_instance_handle(self): return self.instance_handle @@ -55,20 +55,19 @@ def _send_to_all_peers(self, msg): def assign_handle(self, entity): handle = self.handle_controller.generate_handle() entity.set_instance_handle(handle) - self.handles[handle] = entity def add_participant(self, participant): self.assign_handle(participant) handle = participant.get_instance_handle() - self.participants[handle] = participant self.local_participants[handle] = participant - self._send_local_modification('NEW_PARTICIPANT', participant) + local_address = self.driver.address + self._send_local_modification('NEW_PARTICIPANT', (handle, local_address)) def add_topic(self, topic): self.assign_handle(topic) topic_key = topic.get_name() self.topics[topic_key] = topic - self._send_local_modification('NEW_TOPIC', topic) + #self._send_local_modification('NEW_TOPIC', topic) def add_data_object(self, data_object): self.assign_handle(data_object) @@ -77,8 +76,8 @@ def add_data_object(self, data_object): self._attach_data_object_to_topic(data_object) self._send_local_modification('NEW_DATA', data_object) - def announce_new_publisher(self, new_publisher): - self._send_local_modification('NEW_PUBLISHER', new_publisher) + def announce_new_subscriber(self, new_subscriver): + self._send_local_modification('NEW_SUBSCRIBER', new_subscriber) def topic_exists(self, topic_name): return topic_name in self.topics @@ -86,8 +85,8 @@ def topic_exists(self, topic_name): def get_topic(self, topic_name): if self.topic_exists(topic_name): return self.topics[topic_name] - else: # Como lidar com isto? - pass + else: + return None def _erase_topic_from_domain(self, topic): # Deleta tópico e todos os dados associados a ele. @@ -98,69 +97,105 @@ def _discover_peers(self): def _add_message_handler_methods(self): self.message_handlers['NEW_PARTICIPANT'] = self._append_remote_participant - self.message_handlers['NEW_TOPIC'] = self._append_remote_topic + # self.message_handlers['NEW_TOPIC'] = self._append_remote_topic self.message_handlers['NEW_DATA'] = self._append_data_object self.message_handlers['SEND_ALL_DATA'] = self._send_full_domain_data self.message_handlers['ALL_DATA'] = self._receive_full_domain_data - - def _append_remote_participant(self, r_participant): - handle = r_participant.get_instance_handle() - if handle not in self.participants and handle not in self.handles: - self.handles[handle] = r_participant - self.participants[handle] = r_participant - - def _append_remote_topic(self, r_topic): - topic_name = r_topic.get_name() - if not self.topic_exists(topic_name): - handle = r_topic.get_instance_handle() - self.handles[handle] = r_topic - self.topics[topic_name] = r_topic - else: - self._resolve_topic_conflict(r_topic) - - def _resolve_topic_conflict(self, topic): - # TODO: Completar este método. - # O tópico com a instance handle menor tem prioridade. - # Caso o serviço local tenha prioridade, é necessário informar os outros nodos. - pass + self.message_handlers['NEW_SUBSCRIBER'] = self._acknowledge_new_remote_subscriber + self.message_handlers['CONFIRM_SUBSCRIBER'] = self.confirm_subscriber_existence + + # Espera uma 2-tupla de handle e IP. + def _append_remote_participant(self, r_participant_info): + handle = r_participant_info[0] + remote_ip = r_participant_info[1] + if handle not in self.remote_participants: + self.remote_participants[handle] = remote_ip + + # def _append_remote_topic(self, r_topic): + # topic_name = r_topic.get_name() + # if not self.topic_exists(topic_name): + # handle = r_topic.get_instance_handle() + # self.topics[topic_name] = r_topic + # else: + # self._resolve_topic_conflict(r_topic) + + # def _resolve_topic_conflict(self, topic): + # # TODO: Completar este método. + # # O tópico com a instance handle menor tem prioridade. + # # Caso o serviço local tenha prioridade, é necessário informar os outros nodos. + # pass def _append_data_object(self, new_data): handle = new_data.get_instance_handle() self.data_objects[handle] = new_data - if handle not in self.handles: - self.handles[handle] = new_data self._send_data_object_to_all_participants(new_data) self._attach_data_object_to_topic(new_data) - # TODO: Ainda falta completar. - def _notify_subscribers_of_new_publisher(self, new_publisher): - for subscriber in self.participants.subscribers.values(): - topic_name = new_publisher.get_topic().get_name() - pass + def notify_remote_participants_of_new_subscriber(self, subscriber): + topic_name = subscriber.get_topic().get_name() + local_address = self.driver.address + handle = subscriber.get_instance_handle() + subscriber_info = (topic_name, handle, local_address) + msg = ('NEW_SUBSCRIBER', subscriber_info) + self._send_to_all_peers(msg) + + def _acknowledge_new_remote_subscriber(self, new_subscriber_info): + topic_name = new_subscriber_info[0] + handle = new_subscriber_info[1] + address = new_subscriber_info[2] + if self.topic_exists(topic_name): + self.topics[topic_name].attach_remote_subscriber(topic_name, handle, address) + + def assert_remote_subscriber_liveliness(self, topic_name, sub_dictionary): + local_address = self.driver.address + for handle, to_address in sub_dictionary.items(): + info = (topic_name, handle, local_address) + msg = ('CONFIRM_SUBSCRIBER', info) + self.driver.async_function_call(['send', to_address, msg]) + + def confirm_subscriber_existence(self, subscriber_info): + topic_name = subscriber_info[0] + sub_handle = subscriber_info[1] + remote_address = subscriber_info[2] + for participant in self.local_participants.values(): + if sub_handle in participant.subscribers: + subscriber = participant.subscribers[sub_handle] + self.confirm_subscription_to_remote_participant(subscriber, remote_address) + + def confirm_subscription_to_remote_participant(self, subscriber, to_address): + topic_name = subscriber.get_topic().get_name() + local_address = self.driver.address + handle = subscriber.get_instance_handle() + subscriber_info = (topic_name, handle, local_address) + msg = ('NEW_SUBSCRIBER', subscriber_info) + self.driver.async_function_call(['send', to_address, msg]) def _send_data_object_to_all_participants(self, data_object): - for participant in self.local_participants.values(): - participant.update_all_subscribers(data_object) + topic_name = data_object.get_topic_name() + if self.topic_exists(topic_name): + for participant in self.local_participants.values(): + participant.update_all_subscribers(data_object) def _attach_data_object_to_topic(self, data_object): topic_name = data_object.get_topic_name() if self.topic_exists(topic_name): - self.topics[topic_name].attach_data_object(data_object) + current_time = self.driver.get_time() + self.topics[topic_name].attach_data_object(data_object, current_time) def _send_full_domain_data(self, to_address): local_data = [] - for participant in self.participants.values(): - packet = ('NEW_PARTICIPANT', participant) - local_data.append(packet) - for topic in self.topics.values(): - packet = ('NEW_TOPIC', topic) - local_data.append(packet) - for data_object in self.data_objects.values(): - packet = ('NEW_DATA', data_object) + for participant in self.local_participants.values(): + local_address = self.driver.address + packet = ('NEW_PARTICIPANT', (participant.get_instance_handle(), local_address)) local_data.append(packet) + # No momento, não queremos enviar todos os dados para todos os nodos. + # + # for data_object in self.data_objects.values(): + # packet = ('NEW_DATA', data_object) + # local_data.append(packet) msg = ('ALL_DATA', local_data) self.driver.async_function_call(['send', to_address, msg]) - + def _receive_full_domain_data(self, r_data): for element in r_data: self._interpret_data(element) @@ -177,15 +212,15 @@ def _interpret_data(self, data): # .. uma string descrevendo o pedido, o segundo elemento os dados em si if data[0] not in self.message_handlers: pass - #logging.warning(str(self.driver.get_time()) + ' :: ' + f'DDS Service (Handle {str(self.instance_handle)}): Invalid request: {str(data[1])}') + #logging.warning(str(self.driver.get_time()) + ' :: ' + f'PSS Service (Handle {str(self.instance_handle)}): Invalid request: {str(data[1])}') else: self.message_handlers[data[0]](data[1]) def _attach_msg_reception_handler_to_driver(self): - self.driver.register_handler(self._receive_incoming_data, 'on_message') + self.driver.register_handler(self.receive_incoming_data, 'on_message') - def _receive_incoming_data(self, msg): - logging.info(str(self.driver.get_time()) + ' :: ' + f'Data received by DDS Service, handle {str(self.instance_handle)}') + def receive_incoming_data(self, msg): + logging.info(str(self.driver.get_time()) + ' :: ' + f'Data received by PS Service, handle {str(self.instance_handle)}') for z in self._unpack_data(msg): yield z diff --git a/simulator/simple_dds/subscriber.py b/simulator/simple_pubsub/subscriber.py similarity index 72% rename from simulator/simple_dds/subscriber.py rename to simulator/simple_pubsub/subscriber.py index 3051d01..e3e27e8 100644 --- a/simulator/simple_dds/subscriber.py +++ b/simulator/simple_pubsub/subscriber.py @@ -1,17 +1,15 @@ import logging from queue import * -from simple_dds import entity +from simple_pubsub import entity class Subscriber(entity.Entity): - def __init__(self, participant, topic, data_objects, listener_method=None): + def __init__(self, participant, topic, listener_method=None): super(Subscriber, self).__init__() self.participant = participant self.topic = topic self.available_data = Queue() - for element in data_objects: - self.available_data.put(element) self.listener = listener_method def get_topic(self): @@ -20,8 +18,8 @@ def get_topic(self): def receive_data(self, data_object): if data_object.get_topic_name() == self.topic.get_name(): self.available_data.put(data_object) - if self.listener != None: - self.listener(self) + if self.listener != None: + self.listener(self) def read(self): try: diff --git a/simulator/simple_pubsub/topic.py b/simulator/simple_pubsub/topic.py new file mode 100644 index 0000000..6b16982 --- /dev/null +++ b/simulator/simple_pubsub/topic.py @@ -0,0 +1,51 @@ +from simple_pubsub import entity +import copy + +class Topic(entity.Entity): + + def __init__(self, topic_name, participant): + super(Topic, self).__init__() + self.name = topic_name + self.participant = participant + self.publishers = {} + self.remote_subscribers = {} # Handle: IP + self.local_subscribers = {} + self.data_objects = {} + self.creation_time = self.participant.service.driver.get_time() + self.last_modified = self.creation_time + self.subscriber_total = 0 + self.local_subscriber_total = 0 + + def get_name(self): + return self.name + + def attach_data_object(self, data_object, current_time): + if data_object.get_topic_name() == self.name: + handle = data_object.get_instance_handle() + self.data_objects[handle] = data_object + self.last_modified = self.participant.service.driver.get_time() + + def attach_remote_subscriber(self, topic_name, subscriber_handle, network_address): + if topic_name is self.name and subscriber_handle not in self.remote_subscribers: + self.remote_subscribers[subscriber_handle] = network_address + self.subscriber_total += 1 + + def attach_local_subscriber(self, topic_name, subscriber): + subscriber_handle = subscriber.get_instance_handle() + if topic_name is self.name and subscriber_handle not in self.local_subscribers: + self.local_subscribers[subscriber_handle] = subscriber + self.subscriber_total +=1 + self.local_subscriber_total += 1 + + def get_subscription_count(self): + return self.subscriber_total + + def update_subscription_count(self): + self.subscriber_total = self.local_subscriber_total # Presumimos que somente subscribers locais continuam "vivos". + sub_dictionary = dict(self.remote_subscribers) + self.remote_subscribers.clear() + # O número de subscribers não é atualizado imediatamente, de forma que + # se lermos o número de subscribers agora, teríamos uma quantidade possivelmente menor que + # o número real. + self.participant.service.assert_remote_subscriber_liveliness(self.name, sub_dictionary) + diff --git a/tests/simulator/test_dds.py b/tests/simulator/test_dds.py deleted file mode 100644 index 9b14a4d..0000000 --- a/tests/simulator/test_dds.py +++ /dev/null @@ -1,112 +0,0 @@ -import pytest -import sys, os -myPath = os.path.dirname(os.path.abspath(__file__)) -sys.path.insert(0, myPath + '/../../simulator/') - -import simpy -import random -from network import Network -from processor import Processor -from driver import Driver -from peer import Peer -from simple_dds import * - -@pytest.fixture -def environment_and_network(): - network_latency = 2 - max_peers = 300 - env = simpy.Environment() - net = Network(env, network_latency, max_peers) - return (env, net) - -@pytest.fixture -def subscriber_number(): - # Set Number: - return 15 - -def test_simple_publication_two_peers(environment_and_network): - env, net = environment_and_network - proc_latency = 3 - random.seed() - message = random.randrange(1000) - topic_name = random.randrange(1000) - wait_before_publication = 100 - wait_before_reading = 150 - simulation_time = 300 - container = None - - publishing_peer = initialize_peer(env, net, 0, 0, proc_latency) - subscribing_peer = initialize_peer(env, net, 1, 1, proc_latency) - publication = wait_then_publish_message(publishing_peer, topic_name, message, wait_before_publication) - reading = wait_then_read_message(subscribing_peer, topic_name, message, wait_before_reading) - add_process_to_simulation(env, publication) - add_process_to_simulation(env, reading) - env.run(until=simulation_time) - container = str(subscribing_peer.latest_read_msg) - assert container == str(message) - -def test_simple_publication_to_multiple_peers(environment_and_network, subscriber_number): - env, net = environment_and_network - proc_latency = 3 - random.seed() - message = 'test message' - topic_name = 'test topic' - wait_before_publication = 100 - wait_before_reading = 1000 - simulation_time = 900000 - subscriber_id = 1 - subscribers = [] - received_msg = None - - publishing_peer = initialize_peer(env, net, 0, 0, proc_latency) - publication = wait_then_publish_message(publishing_peer, topic_name, message, wait_before_publication) - add_process_to_simulation(env, publication) - for i in range(subscriber_number): - subscriber = initialize_peer(env, net, 0, i, proc_latency) - reading = set_up_subscription(subscriber, topic_name, message, wait_before_reading) - add_process_to_simulation(env, reading) - subscribers.append(subscriber) - - env.run(until=simulation_time) - for i, subscriber in enumerate(subscribers): - print(i) - received_msg = str(subscriber.latest_read_msg) - assert received_msg == str(message) - -def initialize_peer(environment, network, proc_id, peer_id, proc_latency): - proc = Processor(environment, proc_id, proc_latency) - dri = Driver(network, proc) - peer = Peer(dri, peer_id) - environment.process(dri.run()) - return peer - -def add_process_to_simulation(environment, method): - environment.process(method) - -def set_up_subscription(peer, topic_name, message, wait_time=100): - yield peer.driver.env.timeout(wait_time) - the_service = dds_service.DDS_Service(peer.driver) - participant = domain_participant.Domain_Participant(the_service) - topic = participant.create_topic(topic_name) - # read_new_message é o método 'listener' - sub = participant.create_subscriber(topic, peer.read_new_message) - -# TODO: O nome não é adequado: faz mais do que publicar mensagem, antes cria objetos.. -# .. necessários. É preciso mudar depois. -def wait_then_publish_message(peer, topic_name, message, wait_time=100): - yield peer.driver.env.timeout(wait_time) - the_service = dds_service.DDS_Service(peer.driver) - participant = domain_participant.Domain_Participant(the_service) - topic = participant.create_topic(topic_name) - pub = participant.create_publisher(topic) - pub.write(message) - -def wait_then_read_message(peer, topic_name, message, wait_time=100): - yield peer.driver.env.timeout(wait_time) - the_service = dds_service.DDS_Service(peer.driver) - participant = domain_participant.Domain_Participant(the_service) - topic = participant.create_topic(topic_name) - sub = participant.create_subscriber(topic) - # Atenção à linha a seguir. Talvez seja necessário alterar o valor mais tarde. - yield peer.driver.env.timeout(17) # Tempo para recebimento de mensagens de outros peers contendo dados do domínio. - peer.latest_read_msg = sub.read() \ No newline at end of file diff --git a/tests/simulator/test_pubsub.py b/tests/simulator/test_pubsub.py new file mode 100644 index 0000000..25e6f16 --- /dev/null +++ b/tests/simulator/test_pubsub.py @@ -0,0 +1,185 @@ +import pytest +import sys, os +myPath = os.path.dirname(os.path.abspath(__file__)) +sys.path.insert(0, myPath + '/../../simulator/') + +import simpy +import random +from network import Network +from processor import Processor +from driver import Driver +from peer import Peer +from simple_pubsub import * + +@pytest.fixture +def environment_and_network(): + network_latency = 2 + max_peers = 300 + env = simpy.Environment() + net = Network(env, network_latency, max_peers) + return (env, net) + +@pytest.fixture +def subscriber_number(): + # Set Number: + return 200 + +class MsgReceptionTestApp: + + def __init__(self, peer): + self.peer = peer + self.latest_read_msg = None + + # Store message + def callback(self, subscriber): + self.latest_read_msg = subscriber.read() + +class SubscriptionCountTestApp: + + def __init__(self, peer, publisher = None): + self.peer = peer + self.publisher = publisher + + def subscription_count(self): + return self.publisher.get_subscriber_count() + +def test_simple_publication_two_peers(environment_and_network): + env, net = environment_and_network + proc_latency = 3 + random.seed() + message = random.randrange(1000) + topic_name = random.randrange(1000) + wait_before_publication = 100 + wait_before_subscription = 50 + simulation_time = 300 + container = None + + publishing_peer = initialize_peer(env, net, 0, 0, proc_latency) + subscribing_peer = initialize_peer(env, net, 1, 1, proc_latency) + app = MsgReceptionTestApp(subscribing_peer) + publication = wait_then_publish_message(publishing_peer, topic_name, message, wait_before_publication) + reading = set_up_subscription(app, topic_name, wait_before_subscription) + add_process_to_simulation(env, publication) + add_process_to_simulation(env, reading) + env.run(until=simulation_time) + container = str(app.latest_read_msg) + assert container == str(message) + +# TODO: Elaborar uma fórmula para determinar automaticamente tempos de espera e de simulação, levando... +# .. em consideração os tempos de latência dados. +def test_simple_publication_to_multiple_peers(environment_and_network, subscriber_number): + env, net = environment_and_network + proc_latency = 3 + random.seed() + message = 'test message' + topic_name = 'test topic' + wait_before_publication = 500 + wait_before_subscription = 50 + simulation_time = 100000 + subscriber_id = 1 + subscribers = [] + received_msg = None + + publishing_peer = initialize_peer(env, net, 0, 0, proc_latency) + publication = wait_then_publish_message(publishing_peer, topic_name, message, wait_before_publication) + add_process_to_simulation(env, publication) + for i in range(subscriber_number): + subscribing_peer = initialize_peer(env, net, 0, i, proc_latency) + sub_app = MsgReceptionTestApp(subscribing_peer) + reading = set_up_subscription(sub_app, topic_name, wait_before_subscription) + add_process_to_simulation(env, reading) + subscribers.append(sub_app) + + env.run(until=simulation_time) + for i, subscriber in enumerate(subscribers): + print(i) + received_msg = str(subscriber.latest_read_msg) + assert received_msg == str(message) + +# TODO: Refazer lógica da contagem de assinantes, depois trazer o teste de volta +# def test_get_subscription_count(environment_and_network, subscriber_number): +# env, net = environment_and_network +# proc_latency = 3 +# random.seed() +# message = 'test message' +# topic_name = 'test topic' +# wait_before_publication = 500 +# wait_before_subscription = 50 +# simulation_time = 3000 +# subscriber_id = 1 +# subscribers = [] +# received_msg = None + +# publishing_peer = initialize_peer(env, net, 0, 0, proc_latency) +# publisher_app = SubscriptionCountTestApp(publishing_peer) +# publication = set_up_publisher(publisher_app, topic_name, message, wait_before_publication) +# add_process_to_simulation(env, publication) +# for i in range(subscriber_number): +# subscribing_peer = initialize_peer(env, net, 0, i, proc_latency) +# sub_app = MsgReceptionTestApp(subscribing_peer) +# reading = set_up_subscription(sub_app, topic_name, wait_before_subscription) +# add_process_to_simulation(env, reading) +# subscribers.append(sub_app) + +# env.run(until=simulation_time) +# sub_count = publisher_app.subscription_count() +# assert sub_count == subscriber_number + +# TODO: Adicionar teste mostrando que subscribers não recebem mensagens de tópicos que não +# sejam os seus. + +def initialize_peer(environment, network, proc_id, peer_id, proc_latency): + proc = Processor(environment, proc_id, proc_latency) + dri = Driver(network, proc) + peer = Peer(dri, peer_id) + environment.process(dri.run()) + return peer + +def add_process_to_simulation(environment, method): + environment.process(method) + +def set_up_subscription(application, topic_name, wait_time=100): + yield application.peer.driver.env.timeout(wait_time) + the_service = pubsub_service.PS_Service(application.peer.driver) + participant = domain_participant.Domain_Participant(the_service) + topic = participant.create_topic(topic_name) + sub = participant.create_subscriber(topic, application.callback) + +# TODO: O nome não é adequado: faz mais do que publicar mensagem, antes cria objetos.. +# .. necessários. É preciso mudar depois. +def wait_then_publish_message(peer, topic_name, message, wait_time=100): + yield peer.driver.env.timeout(wait_time) + the_service = pubsub_service.PS_Service(peer.driver) + participant = domain_participant.Domain_Participant(the_service) + topic = participant.create_topic(topic_name) + pub = participant.create_publisher(topic) + pub.write(message) + +def wait_then_publish_message_multiple_times(peer, topic_name, message, wait_time=100, count=100): + yield peer.driver.env.timeout(wait_time) + the_service = pubsub_service.PS_Service(peer.driver) + participant = domain_participant.Domain_Participant(the_service) + topic = participant.create_topic(topic_name) + pub = participant.create_publisher(topic) + for i in range(count): + pub.write(message) + yield peer.driver.env.timeout(1) + +def set_up_publisher(application, topic_name, message, wait_time=100): + yield application.peer.driver.env.timeout(wait_time) + the_service = pubsub_service.PS_Service(application.peer.driver) + participant = domain_participant.Domain_Participant(the_service) + topic = participant.create_topic(topic_name) + pub = participant.create_publisher(topic) + application.publisher = pub + pub.write(message) + +def wait_then_read_message(peer, topic_name, message, wait_time=100): + yield peer.driver.env.timeout(wait_time) + the_service = pubsub_service.PS_Service(peer.driver) + participant = domain_participant.Domain_Participant(the_service) + topic = participant.create_topic(topic_name) + sub = participant.create_subscriber(topic) + # Atenção à linha a seguir. Talvez seja necessário alterar o valor mais tarde. + yield peer.driver.env.timeout(17) # Tempo para recebimento de mensagens de outros peers contendo dados do domínio. + peer.latest_read_msg = sub.read() \ No newline at end of file