diff --git a/Pipfile b/Pipfile index d2c7f2d..8460d4f 100644 --- a/Pipfile +++ b/Pipfile @@ -9,6 +9,8 @@ flake8 = "*" pytest-cov = "*" autopep8 = "*" codacy-coverage = "*" +jupyter = "*" +notebook = "*" [packages] simpy = "*" diff --git a/tests/simulator/__init__.py b/simulator/__init__.py similarity index 100% rename from tests/simulator/__init__.py rename to simulator/__init__.py diff --git a/simulator/custom_error.py b/simulator/custom_error.py new file mode 100644 index 0000000..38c6687 --- /dev/null +++ b/simulator/custom_error.py @@ -0,0 +1,8 @@ + +class Error(Exception): + pass + +class RegistrationError(Error): + + def __init__(self, message): + self.message = message \ No newline at end of file diff --git a/simulator/dds_run.py b/simulator/dds_run.py new file mode 100644 index 0000000..be3d143 --- /dev/null +++ b/simulator/dds_run.py @@ -0,0 +1,51 @@ +import simpy +import logging +import peer +import network +import driver +import processor + +""" +Run app. +Peer control, duration and others details. + +""" +# Configuração do root logger +console_handler = logging.StreamHandler() +console_handler.setLevel(logging.INFO) +handlers = [console_handler] +logging.basicConfig(level = logging.INFO, + format = '[%(levelname)10s] [%(module)12s] %(message)s', + handlers = handlers +) + +NUM_PEERS = 1 +SIM_DURATION = 1000 + +# create env +env = simpy.Environment() + +# network +net = network.Network(env,2) + +#create peers + +nodes = [] + +teste = env.timeout(200) + + +proc_0 = processor.Processor(env, 0, 3) +dri_0 = driver.Driver(net, proc_0) +peer_0 = peer.Peer(dri_0, 0) +env.process(dri_0.run()) +env.process(peer_0.wait_then_publish_message('TEST', 'Hello World!', 100)) + +proc_1 = processor.Processor(env, 1, 3) +dri_1 = driver.Driver(net, proc_1) +peer_1 = peer.Peer(dri_1, 1) +env.process(dri_1.run()) +env.process(peer_1.dds_read_test()) + + +env.run(until=SIM_DURATION) diff --git a/simulator/driver.py b/simulator/driver.py index 890b56c..e949a1c 100644 --- a/simulator/driver.py +++ b/simulator/driver.py @@ -6,7 +6,7 @@ class Driver: def __init__(self, network, processor): - self.async_events = simpy.Store(network.env) + self.processing_queue = simpy.Store(network.env) self.network = network self.env = network.env self.processor = processor @@ -18,40 +18,52 @@ def __init__(self, network, processor): 'on_disconnect': [] } self.keep_alive_interval = 20 + self.async_calls = simpy.Store(network.env) def run(self): - if self.address is None: - for z in self.connect(): - yield z - self.env.process(self.send_keepalive()) # A partir de agora, mandamos um sinal de vida constantemente - + for z in self.connect(): + yield z + self.env.process(self.execute_stored_calls()) + while True: - event = yield self.async_events.get() + event = yield self.processing_queue.get() for z in self.issue_event(event[0], event[1]): if z: yield z - for z in self.issue_event(event[0], 'on_advertise'): - if z: - yield z + # for z in self.issue_event(event[0], 'on_advertise'): + # if z: + # yield z def connect(self): - for z in self.network.register(self): - yield z - for z in self.issue_event('on_connect', self.address): - yield z + while True: + try: + for z in self.network.register(self): + yield z + for z in self.issue_event('on_connect', self.address): + yield z + break # Se chegarmos aqui, código completado com sucesso, saímos do loop + except ConnectionError as err: + print(err.message) + yield self.env.timeout(1) + + def fetch_peer_list(self): + return self.network.send_addresses(self) + + def disconnect(self): + self.address = None def advertise(self, msg): - msg = 'ADV-'+str(msg) - return self.network.send_broadcast(self.address, msg) + for z in self.network.send_broadcast(self.address, msg): + yield z - def recieve (self, msg_envelope): + def receive(self, msg_envelope): logging.info(str(self.env.now) + ' :: ' + '{} received from {}: {}'.format( - msg_envelope[1], msg_envelope[0], msg_envelope[2])) + msg_envelope[1], msg_envelope[0], str(msg_envelope[2]))) event = ['on_message', msg_envelope] - self.async_events.put(event) + self.processing_queue.put(event) - def send (self, to_addr , msg): + def send(self, to_addr , msg): return self.network.send_unicast(self.address, to_addr, msg) def register_handler (self, method, event='on_message'): @@ -65,5 +77,30 @@ def issue_event (self, event, value=None): yield z def send_keepalive(self): + while True: yield self.env.timeout(self.keep_alive_interval) self.network.renew(self.address) + + def get_time(self): + return self.env.now + + # Coloca uma função na lista de processamento, que será executada + # em ordem. + def async_function_call(self, call_info): + self.async_calls.put(call_info) + + def execute_stored_calls(self): + # TODO: Colocar bloco em função separada. + while True: + function_call = yield self.async_calls.get() + function_name = function_call[0] + # TODO: Mudar implementação para dictionary depois. + if function_name == 'send': + to_addr = function_call[1] + msg = function_call[2] + for z in self.send(to_addr, msg): + yield z + elif function_name == 'advertise': + msg = function_call[1] + for z in self.advertise(msg): + yield z \ No newline at end of file diff --git a/simulator/mockup.txt b/simulator/mockup.txt new file mode 100644 index 0000000..f2cf479 --- /dev/null +++ b/simulator/mockup.txt @@ -0,0 +1,50 @@ +import simpy + +import customdds + +# Neste exemplo, o único tipo de dado que será usado é string, mas a especificação DDS permite a definição de tipos personalizados. +# De fato, a definição e registro de um tipo de dado são ações necessárias numa implementação real. +# Aqui, tentei escrever como um cliente faria uso desse API. Ainda é necessário implementar de fato. Mesmo assim, fica claro que... +# ... o uso do API simplifica bastante o trabalho do usuário, que não precisa se preocupar com o funcionamento da rede e detalhes de transmissão. +# Só pra enfatizar, isto é uma simplificação EXTREMA do DDS, mais precisamente, da implementação OpenDDS. Eu tentei capturar o que eu compreendi sobre ele. + +class ApplicationMockup: + + def __init__(self, domainID): + domainParticipantFactory = customdds.DomainParticipantFactoryImpl.get_instance() # Singleton + # Criamos participante escolhendo uma ID para o domínio + self.participant = domainParticipantFactory.create_participant(domainID) + + def run(self): + # Criamos um tópico + topic_name = "Test" + topic = self.participant.create_topic(topic_name) + # Criamos um publisher + # Dentro de um publisher, é necessário haver uma referência ao DomainParticipant que o criou + publisher = self.participant.create_publisher() + # Criamos um DataWriter com o tópico + writer = publisher.create_datawriter(topic) + + # Podemos esperar que haja pelo menos um subscriber antes de enviar a mensagem.. + # + # Prosseguindo para a escrita + + message_content = "This is a test message" + # Digamos que previamente tenhamos criado um tipo 'Message', de forma que possamos encapsular o conteúdo da mensagem em si... + # ... assim como outras metainformações. + # Dados existem em uma instância, a qual é uma entidade, tendo, assim, um identificador único no domínio. + msg_id = 63 + new_msg = (msg_id, message_content) + # O resultado do método a seguir deve ser o envio da mensagem para todos os subscribers do tópico + writer.write(new_msg) + + # Para o estabelecimento de um subscriber, o código seria bem parecido, exceto que usaríamos um DataReader para ler as mensagens com métodos 'read()' e 'take()'. + + + +# Algumas notas adicionais: +# - No OpenDDS, pelo que entendi, existe um singleton denominado 'TheServiceParticipant' responsável pelo controle das entidades, etc. Esse 'Service' permite que... +# ... haja acesso aos domínios, tópicos, participantes, publishers, subscribers, instâncias de dados, ... ou seja, todas as entidades. +# - O uso de políticas QoS servem para configurar o API, de certa forma impondo exigências no funcionamento do serviço. Uma lista dessas políticas está presente na especificação. +# ... Decidi omitir o uso de QoS policies para simplificar, mas, numa implementação real, elas são indispensáveis. +# - Além do DDS, parece-me ser necessário mais uma camada de abstração. No OpenDDS, eles usam um API chamado TAO. \ No newline at end of file diff --git a/simulator/network.py b/simulator/network.py index 0ef0c12..1b180e1 100644 --- a/simulator/network.py +++ b/simulator/network.py @@ -17,13 +17,13 @@ def __init__(self, env, latency, max_hosts = 100): self.default_lease_time = 40 self.addr_list = [{'node':None, 'lease':0} for i in range(self.max_hosts)] # O índice da lista serve como 'IP' self.node_list = [] # Usamos esta lista para fazer broadcasts e checar empréstimos - self.env.process(self.dhcp()) def register(self, node_driver): with self.channel.request() as rec: yield rec if self.full_capacity: logging.warning(str(self.env.now) + ' :: ' + 'Could not register node: Network at full capacity') + raise ConnectionError("Network at full capacity") else: curr_address = self.next_available_address logging.info(str(self.env.now) + ' :: ' + 'connecting {}'.format(curr_address)) @@ -39,42 +39,44 @@ def register(self, node_driver): self.find_next_available() yield self.timeout(self.latency) + def confirm_peer(self, address): + if address < 0 or address >= self.max_hosts: + raise RuntimeError(f"Invalid address: {address}") + elif self.addr_list[address]['node'] is None: + raise RuntimeError(f"Address not registered: {address}") + # Substituir por um erro personalizado depois? + # Talvez expandir verificação para que veja se o endereço realmente corresponde.. + # .. ao driver que enviou mensagem + # Talvez adicionar mensagem de debug? + + # Adicionar try.. except nas funções de unicast e broadcast, para lidar com erros def send_unicast(self, from_addr, to_addr, msg): + self.confirm_peer(from_addr) + self.confirm_peer(to_addr) logging.info(str(self.env.now) + ' :: ' + 'network sending unicast {} => {}'.format(from_addr, to_addr)) - if(to_addr <= 0): - print('{} address not found (msg from {})'.format( - to_addr, from_addr)) - yield self.env.timeout(0) - else: + with self.channel.request() as rec: + yield rec msg_envelope = [from_addr, to_addr, msg] - with self.channel.request() as rec: - yield rec - node = self.addr_list[to_addr]['node'] - if node: - node.recieve(msg_envelope) - yield self.env.timeout(self.latency) - print(msg_envelope) - else: - print('{} address not found (msg from {})'.format( - to_addr, from_addr)) + node = self.addr_list[to_addr]['node'] + node.receive(msg_envelope) + yield self.env.timeout(self.latency) def send_broadcast(self, from_addr, msg): - logging.info(str(self.env.now) + ' :: ' + 'Message Broadcast from {} - {}'.format(from_addr,msg)) + self.confirm_peer(from_addr) + logging.info(str(self.env.now) + ' :: ' + 'Message Broadcast from {} - {}'.format(from_addr, str(msg))) with self.node_list_access.request(priority=0) as nl_access: yield nl_access for addr in range(len(self.node_list)): to_addr = self.node_list[addr]['address'] - msg_envelope2 = [from_addr, to_addr, msg] + if to_addr == from_addr: + continue + msg_envelope = [from_addr, to_addr, msg] with self.channel.request() as rec: yield rec node = self.node_list[addr]['node'] - if node: # Checando se o nodo ainda faz parte da rede - node.recieve(msg_envelope2) - yield self.env.timeout(self.latency) - logging.info(str(self.env.now) + ' :: ' + 'Broadcast:'+ str(msg_envelope2)) - else: - logging.info(str(self.env.now) + ' :: ' + '{} address not found (msg from {})'.format( - to_addr, from_addr)) + node.receive(msg_envelope) + yield self.env.timeout(self.latency) + #logging.info(str(self.env.now) + ' :: ' + 'Broadcast:'+ str(msg_envelope)) def find_next_available(self): addr = (self.next_available_address + 1) % self.max_hosts @@ -112,8 +114,15 @@ def end_lease(self, index): self.full_capacity = False logging.info(str(self.env.now) + ' :: ' + 'Lease ended for address {}'.format(address)) + def send_addresses(self, driver): + addr_list = [] + for peer in self.node_list: + if peer['address'] is not driver.address: + addr_list.append(peer['address']) + return addr_list + def dhcp(self): while True: for z in self.check_lease(): yield z - yield self.env.timeout(1) + yield self.env.timeout(1) \ No newline at end of file diff --git a/simulator/peer.py b/simulator/peer.py index 92484f8..c1257e0 100644 --- a/simulator/peer.py +++ b/simulator/peer.py @@ -1,11 +1,13 @@ import logging +from simple_dds import * """ -Class peer for create all the base stack for peer +Simulates the behavior of a peer in a network. +Uses driver object to interface with the network. Definir as caracteristicas do nodo basicas - ID , Recursos - Qualidade -riar um canal broadcast onde o peer precisa se conectar +Criar um canal broadcast onde o peer precisa se conectar E criar os links de comunicacao unicast agente tem (ref de um canal broad) @@ -23,6 +25,7 @@ def __init__(self, driver, id): self.driver.register_handler(self.on_advertise, 'on_advertise') self.driver.register_handler(self.on_disconnect, 'on_disconnect') self.name = 'peer_{}'.format(id) + self.latest_read_msg = 0 def on_message (self, msg): logging.info(str(self.driver.env.now) + ' :: ' + '{} received msg: {}'.format(self.name, msg)) @@ -39,12 +42,9 @@ def on_disconnect (self): def on_advertise (self, msg): for z in self.driver.advertise(msg): yield z - - - - - - - + def read_new_message(self, subscriber): + self.latest_read_msg = subscriber.read() + + diff --git a/simulator/run.py b/simulator/run.py index ea5cb31..32d0383 100644 --- a/simulator/run.py +++ b/simulator/run.py @@ -20,8 +20,7 @@ ) NUM_PEERS = 2 -SIM_DURATION = 1000 - +SIM_DURATION = 300 # create env env = simpy.Environment() diff --git a/simulator/simple_dds/__init__.py b/simulator/simple_dds/__init__.py new file mode 100644 index 0000000..c2e948e --- /dev/null +++ b/simulator/simple_dds/__init__.py @@ -0,0 +1,2 @@ +__all__ = ["entity", "dds_service", "domain_participant", "publisher", "subscriber", +"data_object", "topic"] \ No newline at end of file diff --git a/simulator/simple_dds/data_object.py b/simulator/simple_dds/data_object.py new file mode 100644 index 0000000..b7e5d7d --- /dev/null +++ b/simulator/simple_dds/data_object.py @@ -0,0 +1,16 @@ +from simple_dds import entity + +class Data_Object(entity.Entity): + + def __init__(self, publisher, topic, data): + super(Data_Object, self).__init__() + self.publisher = publisher + self.topic = topic + self.content = data + self.creation_time = self.publisher.participant.service.driver.get_time() + + def __str__(self): + return str(self.content) + + def get_topic_name(self): + return self.topic.get_name() \ No newline at end of file diff --git a/simulator/simple_dds/dds_service.py b/simulator/simple_dds/dds_service.py new file mode 100644 index 0000000..037f54e --- /dev/null +++ b/simulator/simple_dds/dds_service.py @@ -0,0 +1,204 @@ +# TODO +# - Não pedir todos os objetos-dado na criação do DDS Service; Fazer isso na.. +# .. criação de subscriber, e somente dados do tópico específico. + +import logging +from threading import Lock +from singleton import Singleton +from simple_dds import entity + +class UniqueHandleController(metaclass=Singleton): + + def __init__(self): + self.next_available_handle = 1 + self.lock = Lock() + + def generate_handle(self): + handle = None + with self.lock: + handle = self.next_available_handle + self.next_available_handle += 1 + return handle + +class DDS_Service(entity.Entity): + + def __init__(self, driver): + self.handle_controller = UniqueHandleController() + self.instance_handle = self.handle_controller.generate_handle() + self.driver = driver + self.peer_list = [] + self.handles = {} # Handle: Entity + self.participants = {} # Handle: Participant + self.local_participants = {} + self.topics = {} # Topic name: Topic + self.data_objects = {} # Handle: Data object + self.message_handlers = {} + + self._add_message_handler_methods() + self._attach_msg_reception_handler_to_driver() + self._discover_peers() + self._request_full_domain_data() + + def set_instance_handle(self, handle): + raise RuntimeError("DDS Service's handle cannot be changed.") + + def get_instance_handle(self): + return self.instance_handle + + def _send_local_modification(self, type_name, data): + change = (type_name, data) + self._send_to_all_peers(change) + + def _send_to_all_peers(self, msg): + self.driver.async_function_call(['advertise', msg]) + + def assign_handle(self, entity): + handle = self.handle_controller.generate_handle() + entity.set_instance_handle(handle) + self.handles[handle] = entity + + def add_participant(self, participant): + self.assign_handle(participant) + handle = participant.get_instance_handle() + self.participants[handle] = participant + self.local_participants[handle] = participant + self._send_local_modification('NEW_PARTICIPANT', participant) + + def add_topic(self, topic): + self.assign_handle(topic) + topic_key = topic.get_name() + self.topics[topic_key] = topic + self._send_local_modification('NEW_TOPIC', topic) + + def add_data_object(self, data_object): + self.assign_handle(data_object) + handle = data_object.get_instance_handle() + self.data_objects[handle] = data_object + self._attach_data_object_to_topic(data_object) + self._send_local_modification('NEW_DATA', data_object) + + def announce_new_publisher(self, new_publisher): + self._send_local_modification('NEW_PUBLISHER', new_publisher) + + def topic_exists(self, topic_name): + return topic_name in self.topics + + def get_topic(self, topic_name): + if self.topic_exists(topic_name): + return self.topics[topic_name] + else: # Como lidar com isto? + pass + + def _erase_topic_from_domain(self, topic): + # Deleta tópico e todos os dados associados a ele. + pass + + def _discover_peers(self): + self.peer_list = self.driver.fetch_peer_list() + + def _add_message_handler_methods(self): + self.message_handlers['NEW_PARTICIPANT'] = self._append_remote_participant + self.message_handlers['NEW_TOPIC'] = self._append_remote_topic + self.message_handlers['NEW_DATA'] = self._append_data_object + self.message_handlers['SEND_ALL_DATA'] = self._send_full_domain_data + self.message_handlers['ALL_DATA'] = self._receive_full_domain_data + + def _append_remote_participant(self, r_participant): + handle = r_participant.get_instance_handle() + if handle not in self.participants and handle not in self.handles: + self.handles[handle] = r_participant + self.participants[handle] = r_participant + + def _append_remote_topic(self, r_topic): + topic_name = r_topic.get_name() + if not self.topic_exists(topic_name): + handle = r_topic.get_instance_handle() + self.handles[handle] = r_topic + self.topics[topic_name] = r_topic + else: + self._resolve_topic_conflict(r_topic) + + def _resolve_topic_conflict(self, topic): + # TODO: Completar este método. + # O tópico com a instance handle menor tem prioridade. + # Caso o serviço local tenha prioridade, é necessário informar os outros nodos. + pass + + def _append_data_object(self, new_data): + handle = new_data.get_instance_handle() + self.data_objects[handle] = new_data + if handle not in self.handles: + self.handles[handle] = new_data + self._send_data_object_to_all_participants(new_data) + self._attach_data_object_to_topic(new_data) + + # TODO: Ainda falta completar. + def _notify_subscribers_of_new_publisher(self, new_publisher): + for subscriber in self.participants.subscribers.values(): + topic_name = new_publisher.get_topic().get_name() + pass + + def _send_data_object_to_all_participants(self, data_object): + for participant in self.local_participants.values(): + participant.update_all_subscribers(data_object) + + def _attach_data_object_to_topic(self, data_object): + topic_name = data_object.get_topic_name() + if self.topic_exists(topic_name): + self.topics[topic_name].attach_data_object(data_object) + + def _send_full_domain_data(self, to_address): + local_data = [] + for participant in self.participants.values(): + packet = ('NEW_PARTICIPANT', participant) + local_data.append(packet) + for topic in self.topics.values(): + packet = ('NEW_TOPIC', topic) + local_data.append(packet) + for data_object in self.data_objects.values(): + packet = ('NEW_DATA', data_object) + local_data.append(packet) + msg = ('ALL_DATA', local_data) + self.driver.async_function_call(['send', to_address, msg]) + + def _receive_full_domain_data(self, r_data): + for element in r_data: + self._interpret_data(element) + + def _unpack_data(self, msg): + # Formato esperado da mensagem: + # [0] Remetente; [1] Destinatário; [2] Mensagem em si + data = msg[2] + self._interpret_data(data) + yield self.driver.env.timeout(0) + + def _interpret_data(self, data): + # Presumimos que os dados estejam em uma 2-tupla, sendo o primeiro elemento.. + # .. uma string descrevendo o pedido, o segundo elemento os dados em si + if data[0] not in self.message_handlers: + pass + #logging.warning(str(self.driver.get_time()) + ' :: ' + f'DDS Service (Handle {str(self.instance_handle)}): Invalid request: {str(data[1])}') + else: + self.message_handlers[data[0]](data[1]) + + def _attach_msg_reception_handler_to_driver(self): + self.driver.register_handler(self._receive_incoming_data, 'on_message') + + def _receive_incoming_data(self, msg): + logging.info(str(self.driver.get_time()) + ' :: ' + f'Data received by DDS Service, handle {str(self.instance_handle)}') + for z in self._unpack_data(msg): + yield z + + def _request_full_domain_data(self): + request_msg = ('SEND_ALL_DATA', self.driver.address) + self._send_to_all_peers(request_msg) + + def retrieve_all_data_objects(self): + return self.data_objects.values() + + def retrieve_filtered_data_objects(self, topic_name): + data = [] + for element in self.data_objects.values(): + if element.get_topic_name() == topic_name: + data.append(element) + return data \ No newline at end of file diff --git a/simulator/simple_dds/domain_participant.py b/simulator/simple_dds/domain_participant.py new file mode 100644 index 0000000..887a085 --- /dev/null +++ b/simulator/simple_dds/domain_participant.py @@ -0,0 +1,61 @@ +from simple_dds import entity +from simple_dds import topic +from simple_dds import publisher +from simple_dds import subscriber + +class Domain_Participant(entity.Entity): + + def __init__(self, dds_service): + super(Domain_Participant, self).__init__() + self.service = dds_service + self.publishers = {} + self.subscribers = {} + self.topics = {} + self.service.add_participant(self) + + def create_topic(self, topic_name): + if self.service.topic_exists(topic_name): + logging.warning(f'{topic_name} already exists.') + return None + else: + new_topic = topic.Topic(topic_name, self) + self.service.add_topic(new_topic) + self.topics[topic_name] = new_topic + return new_topic + + def delete_topic(self, topic): + # Pré-condição: tópico deve ter sido criado por este participante + pass + + def find_topic(self, topic_name): + return self.service.get_topic(topic_name) + + def create_publisher(self, topic): + new_publisher = publisher.Publisher(self, topic) + self.service.assign_handle(new_publisher) + handle = new_publisher.get_instance_handle() + self.publishers[handle] = new_publisher + self.service.announce_new_publisher(new_publisher) + return new_publisher + + def delete_publisher(self, publisher): + pass + + def create_subscriber(self, topic, listener=None): + data = self.service.retrieve_filtered_data_objects(topic.get_name()) + new_subscriber = subscriber.Subscriber(self, topic, data, listener) + self.service.assign_handle(new_subscriber) + handle = new_subscriber.get_instance_handle() + self.subscribers[handle] = new_subscriber + return new_subscriber + + def delete_subscriber(self, subscriber): + pass + + def get_discovered_participants(self): + pass + # Usar o service + + def update_all_subscribers(self, data_object): + for subscriber in self.subscribers.values(): + subscriber.receive_data(data_object) \ No newline at end of file diff --git a/simulator/simple_dds/entity.py b/simulator/simple_dds/entity.py new file mode 100644 index 0000000..a9dced0 --- /dev/null +++ b/simulator/simple_dds/entity.py @@ -0,0 +1,15 @@ +class Entity: + + def __init__(self): + self.instance_handle = 0 + + def set_instance_handle(self, handle): + if self.instance_handle != 0: + raise RuntimeError("DDS Instance already has a handle.") + else: + self.instance_handle = handle + + def get_instance_handle(self): + if self.instance_handle == 0: + raise RuntimeError("DDS Instance has not been assigned a handle") + return self.instance_handle \ No newline at end of file diff --git a/simulator/simple_dds/publisher.py b/simulator/simple_dds/publisher.py new file mode 100644 index 0000000..13997f3 --- /dev/null +++ b/simulator/simple_dds/publisher.py @@ -0,0 +1,16 @@ +from simple_dds import entity +from simple_dds import data_object + +class Publisher(entity.Entity): + + def __init__(self, participant, topic): + super(Publisher, self).__init__() + self.participant = participant + self.topic = topic + + def get_topic(self): + return self.topic + + def write(self, data): + new_data = data_object.Data_Object(self, self.topic, data) + self.participant.service.add_data_object(new_data) \ No newline at end of file diff --git a/simulator/simple_dds/subscriber.py b/simulator/simple_dds/subscriber.py new file mode 100644 index 0000000..3051d01 --- /dev/null +++ b/simulator/simple_dds/subscriber.py @@ -0,0 +1,32 @@ +import logging +from queue import * + +from simple_dds import entity + +class Subscriber(entity.Entity): + + def __init__(self, participant, topic, data_objects, listener_method=None): + super(Subscriber, self).__init__() + self.participant = participant + self.topic = topic + self.available_data = Queue() + for element in data_objects: + self.available_data.put(element) + self.listener = listener_method + + def get_topic(self): + return self.topic + + def receive_data(self, data_object): + if data_object.get_topic_name() == self.topic.get_name(): + self.available_data.put(data_object) + if self.listener != None: + self.listener(self) + + def read(self): + try: + data_object = self.available_data.get(block=False) + return data_object + except Empty: + logging.debug('No data objects available') + return None \ No newline at end of file diff --git a/simulator/simple_dds/topic.py b/simulator/simple_dds/topic.py new file mode 100644 index 0000000..f9868b8 --- /dev/null +++ b/simulator/simple_dds/topic.py @@ -0,0 +1,27 @@ +from simple_dds import entity + +class Topic(entity.Entity): + + def __init__(self, topic_name, participant): + super(Topic, self).__init__() + self.name = topic_name + self.participant = participant + self.publishers = [] + self.subscribers = [] + self.data_objects = {} + self.creation_time = self.participant.service.driver.get_time() + self.last_modified = self.creation_time + + def get_name(self): + return self.name + + def attach_data_object(self, data_object): + if data_object.get_topic_name() == self.name: + handle = data_object.get_instance_handle() + self.data_objects[handle] = data_object + self.last_modified = self.participant.service.driver.get_time() + + def can_be_deleted(self): + no_pubs = len(self.publishers) == 0 + no_subs = len(self.subscribers) == 0 + return no_pubs and no_subs \ No newline at end of file diff --git a/simulator/singleton.py b/simulator/singleton.py new file mode 100644 index 0000000..372d71c --- /dev/null +++ b/simulator/singleton.py @@ -0,0 +1,13 @@ +# Referências +# - https://stackoverflow.com/questions/6760685/creating-a-singleton-in-python +# - https://stackoverflow.com/questions/100003/what-are-metaclasses-in-python + +class Singleton(type): + _instances = {} + def __call__(cls, *args, **kwargs): + if cls not in cls._instances: + cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs) + return cls._instances[cls] + +class ExampleSingleton(metaclass=Singleton): + pass \ No newline at end of file diff --git a/tests/simulator/test_dds.py b/tests/simulator/test_dds.py new file mode 100644 index 0000000..9b14a4d --- /dev/null +++ b/tests/simulator/test_dds.py @@ -0,0 +1,112 @@ +import pytest +import sys, os +myPath = os.path.dirname(os.path.abspath(__file__)) +sys.path.insert(0, myPath + '/../../simulator/') + +import simpy +import random +from network import Network +from processor import Processor +from driver import Driver +from peer import Peer +from simple_dds import * + +@pytest.fixture +def environment_and_network(): + network_latency = 2 + max_peers = 300 + env = simpy.Environment() + net = Network(env, network_latency, max_peers) + return (env, net) + +@pytest.fixture +def subscriber_number(): + # Set Number: + return 15 + +def test_simple_publication_two_peers(environment_and_network): + env, net = environment_and_network + proc_latency = 3 + random.seed() + message = random.randrange(1000) + topic_name = random.randrange(1000) + wait_before_publication = 100 + wait_before_reading = 150 + simulation_time = 300 + container = None + + publishing_peer = initialize_peer(env, net, 0, 0, proc_latency) + subscribing_peer = initialize_peer(env, net, 1, 1, proc_latency) + publication = wait_then_publish_message(publishing_peer, topic_name, message, wait_before_publication) + reading = wait_then_read_message(subscribing_peer, topic_name, message, wait_before_reading) + add_process_to_simulation(env, publication) + add_process_to_simulation(env, reading) + env.run(until=simulation_time) + container = str(subscribing_peer.latest_read_msg) + assert container == str(message) + +def test_simple_publication_to_multiple_peers(environment_and_network, subscriber_number): + env, net = environment_and_network + proc_latency = 3 + random.seed() + message = 'test message' + topic_name = 'test topic' + wait_before_publication = 100 + wait_before_reading = 1000 + simulation_time = 900000 + subscriber_id = 1 + subscribers = [] + received_msg = None + + publishing_peer = initialize_peer(env, net, 0, 0, proc_latency) + publication = wait_then_publish_message(publishing_peer, topic_name, message, wait_before_publication) + add_process_to_simulation(env, publication) + for i in range(subscriber_number): + subscriber = initialize_peer(env, net, 0, i, proc_latency) + reading = set_up_subscription(subscriber, topic_name, message, wait_before_reading) + add_process_to_simulation(env, reading) + subscribers.append(subscriber) + + env.run(until=simulation_time) + for i, subscriber in enumerate(subscribers): + print(i) + received_msg = str(subscriber.latest_read_msg) + assert received_msg == str(message) + +def initialize_peer(environment, network, proc_id, peer_id, proc_latency): + proc = Processor(environment, proc_id, proc_latency) + dri = Driver(network, proc) + peer = Peer(dri, peer_id) + environment.process(dri.run()) + return peer + +def add_process_to_simulation(environment, method): + environment.process(method) + +def set_up_subscription(peer, topic_name, message, wait_time=100): + yield peer.driver.env.timeout(wait_time) + the_service = dds_service.DDS_Service(peer.driver) + participant = domain_participant.Domain_Participant(the_service) + topic = participant.create_topic(topic_name) + # read_new_message é o método 'listener' + sub = participant.create_subscriber(topic, peer.read_new_message) + +# TODO: O nome não é adequado: faz mais do que publicar mensagem, antes cria objetos.. +# .. necessários. É preciso mudar depois. +def wait_then_publish_message(peer, topic_name, message, wait_time=100): + yield peer.driver.env.timeout(wait_time) + the_service = dds_service.DDS_Service(peer.driver) + participant = domain_participant.Domain_Participant(the_service) + topic = participant.create_topic(topic_name) + pub = participant.create_publisher(topic) + pub.write(message) + +def wait_then_read_message(peer, topic_name, message, wait_time=100): + yield peer.driver.env.timeout(wait_time) + the_service = dds_service.DDS_Service(peer.driver) + participant = domain_participant.Domain_Participant(the_service) + topic = participant.create_topic(topic_name) + sub = participant.create_subscriber(topic) + # Atenção à linha a seguir. Talvez seja necessário alterar o valor mais tarde. + yield peer.driver.env.timeout(17) # Tempo para recebimento de mensagens de outros peers contendo dados do domínio. + peer.latest_read_msg = sub.read() \ No newline at end of file diff --git a/tests/simulator/test_initial_connection.py b/tests/simulator/test_initial_connection.py index 014c7ac..29aad0c 100644 --- a/tests/simulator/test_initial_connection.py +++ b/tests/simulator/test_initial_connection.py @@ -1,10 +1,13 @@ import pytest import simpy +import sys, os +myPath = os.path.dirname(os.path.abspath(__file__)) +sys.path.insert(0, myPath + '/../../simulator/') -from simulator.network import Network -from simulator.processor import Processor -from simulator.driver import Driver -from simulator.peer import Peer +from network import Network +from processor import Processor +from driver import Driver +from peer import Peer def test_connection(): @@ -35,9 +38,9 @@ def test_timeout_keep_alive(): proc = Processor(env, 0, 3) dri = Driver(net, proc) new_peer = Peer(dri, 0) - + #env.process(dri.disconnect()) env.run(until=50) - - assert dri.address == None + + assert dri.address == None \ No newline at end of file