Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ flake8 = "*"
pytest-cov = "*"
autopep8 = "*"
codacy-coverage = "*"
jupyter = "*"
notebook = "*"

[packages]
simpy = "*"
Expand Down
File renamed without changes.
8 changes: 8 additions & 0 deletions simulator/custom_error.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@

class Error(Exception):
pass

class RegistrationError(Error):

def __init__(self, message):
self.message = message
51 changes: 51 additions & 0 deletions simulator/dds_run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import simpy
import logging
import peer
import network
import driver
import processor

"""
Run app.
Peer control, duration and others details.

"""
# Configuração do root logger
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
handlers = [console_handler]
logging.basicConfig(level = logging.INFO,
format = '[%(levelname)10s] [%(module)12s] %(message)s',
handlers = handlers
)

NUM_PEERS = 1
SIM_DURATION = 1000

# create env
env = simpy.Environment()

# network
net = network.Network(env,2)

#create peers

nodes = []

teste = env.timeout(200)


proc_0 = processor.Processor(env, 0, 3)
dri_0 = driver.Driver(net, proc_0)
peer_0 = peer.Peer(dri_0, 0)
env.process(dri_0.run())
env.process(peer_0.wait_then_publish_message('TEST', 'Hello World!', 100))

proc_1 = processor.Processor(env, 1, 3)
dri_1 = driver.Driver(net, proc_1)
peer_1 = peer.Peer(dri_1, 1)
env.process(dri_1.run())
env.process(peer_1.dds_read_test())


env.run(until=SIM_DURATION)
77 changes: 57 additions & 20 deletions simulator/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
class Driver:

def __init__(self, network, processor):
self.async_events = simpy.Store(network.env)
self.processing_queue = simpy.Store(network.env)
self.network = network
self.env = network.env
self.processor = processor
Expand All @@ -18,40 +18,52 @@ def __init__(self, network, processor):
'on_disconnect': []
}
self.keep_alive_interval = 20
self.async_calls = simpy.Store(network.env)

def run(self):
if self.address is None:
for z in self.connect():
yield z
self.env.process(self.send_keepalive()) # A partir de agora, mandamos um sinal de vida constantemente

for z in self.connect():
yield z
self.env.process(self.execute_stored_calls())

while True:
event = yield self.async_events.get()
event = yield self.processing_queue.get()
for z in self.issue_event(event[0], event[1]):
if z:
yield z
for z in self.issue_event(event[0], 'on_advertise'):
if z:
yield z
# for z in self.issue_event(event[0], 'on_advertise'):
# if z:
# yield z

def connect(self):
for z in self.network.register(self):
yield z
for z in self.issue_event('on_connect', self.address):
yield z
while True:
try:
for z in self.network.register(self):
yield z
for z in self.issue_event('on_connect', self.address):
yield z
break # Se chegarmos aqui, código completado com sucesso, saímos do loop
except ConnectionError as err:
print(err.message)
yield self.env.timeout(1)

def fetch_peer_list(self):
return self.network.send_addresses(self)

def disconnect(self):
self.address = None

def advertise(self, msg):
msg = 'ADV-'+str(msg)
return self.network.send_broadcast(self.address, msg)
for z in self.network.send_broadcast(self.address, msg):
yield z

def recieve (self, msg_envelope):
def receive(self, msg_envelope):
logging.info(str(self.env.now) + ' :: ' + '{} received from {}: {}'.format(
msg_envelope[1], msg_envelope[0], msg_envelope[2]))
msg_envelope[1], msg_envelope[0], str(msg_envelope[2])))

event = ['on_message', msg_envelope]
self.async_events.put(event)
self.processing_queue.put(event)

def send (self, to_addr , msg):
def send(self, to_addr , msg):
return self.network.send_unicast(self.address, to_addr, msg)

def register_handler (self, method, event='on_message'):
Expand All @@ -65,5 +77,30 @@ def issue_event (self, event, value=None):
yield z

def send_keepalive(self):
while True:
yield self.env.timeout(self.keep_alive_interval)
self.network.renew(self.address)

def get_time(self):
return self.env.now

# Coloca uma função na lista de processamento, que será executada
# em ordem.
def async_function_call(self, call_info):
self.async_calls.put(call_info)

def execute_stored_calls(self):
# TODO: Colocar bloco em função separada.
while True:
function_call = yield self.async_calls.get()
function_name = function_call[0]
# TODO: Mudar implementação para dictionary depois.
if function_name == 'send':
to_addr = function_call[1]
msg = function_call[2]
for z in self.send(to_addr, msg):
yield z
elif function_name == 'advertise':
msg = function_call[1]
for z in self.advertise(msg):
yield z
50 changes: 50 additions & 0 deletions simulator/mockup.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import simpy

import customdds

# Neste exemplo, o único tipo de dado que será usado é string, mas a especificação DDS permite a definição de tipos personalizados.
# De fato, a definição e registro de um tipo de dado são ações necessárias numa implementação real.
# Aqui, tentei escrever como um cliente faria uso desse API. Ainda é necessário implementar de fato. Mesmo assim, fica claro que...
# ... o uso do API simplifica bastante o trabalho do usuário, que não precisa se preocupar com o funcionamento da rede e detalhes de transmissão.
# Só pra enfatizar, isto é uma simplificação EXTREMA do DDS, mais precisamente, da implementação OpenDDS. Eu tentei capturar o que eu compreendi sobre ele.

class ApplicationMockup:

def __init__(self, domainID):
domainParticipantFactory = customdds.DomainParticipantFactoryImpl.get_instance() # Singleton
# Criamos participante escolhendo uma ID para o domínio
self.participant = domainParticipantFactory.create_participant(domainID)

def run(self):
# Criamos um tópico
topic_name = "Test"
topic = self.participant.create_topic(topic_name)
# Criamos um publisher
# Dentro de um publisher, é necessário haver uma referência ao DomainParticipant que o criou
publisher = self.participant.create_publisher()
# Criamos um DataWriter com o tópico
writer = publisher.create_datawriter(topic)

# Podemos esperar que haja pelo menos um subscriber antes de enviar a mensagem..
# <Código de espera aqui>
# Prosseguindo para a escrita

message_content = "This is a test message"
# Digamos que previamente tenhamos criado um tipo 'Message', de forma que possamos encapsular o conteúdo da mensagem em si...
# ... assim como outras metainformações.
# Dados existem em uma instância, a qual é uma entidade, tendo, assim, um identificador único no domínio.
msg_id = 63
new_msg = (msg_id, message_content)
# O resultado do método a seguir deve ser o envio da mensagem para todos os subscribers do tópico
writer.write(new_msg)

# Para o estabelecimento de um subscriber, o código seria bem parecido, exceto que usaríamos um DataReader para ler as mensagens com métodos 'read()' e 'take()'.



# Algumas notas adicionais:
# - No OpenDDS, pelo que entendi, existe um singleton denominado 'TheServiceParticipant' responsável pelo controle das entidades, etc. Esse 'Service' permite que...
# ... haja acesso aos domínios, tópicos, participantes, publishers, subscribers, instâncias de dados, ... ou seja, todas as entidades.
# - O uso de políticas QoS servem para configurar o API, de certa forma impondo exigências no funcionamento do serviço. Uma lista dessas políticas está presente na especificação.
# ... Decidi omitir o uso de QoS policies para simplificar, mas, numa implementação real, elas são indispensáveis.
# - Além do DDS, parece-me ser necessário mais uma camada de abstração. No OpenDDS, eles usam um API chamado TAO.
61 changes: 35 additions & 26 deletions simulator/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ def __init__(self, env, latency, max_hosts = 100):
self.default_lease_time = 40
self.addr_list = [{'node':None, 'lease':0} for i in range(self.max_hosts)] # O índice da lista serve como 'IP'
self.node_list = [] # Usamos esta lista para fazer broadcasts e checar empréstimos
self.env.process(self.dhcp())

def register(self, node_driver):
with self.channel.request() as rec:
yield rec
if self.full_capacity:
logging.warning(str(self.env.now) + ' :: ' + 'Could not register node: Network at full capacity')
raise ConnectionError("Network at full capacity")
else:
curr_address = self.next_available_address
logging.info(str(self.env.now) + ' :: ' + 'connecting {}'.format(curr_address))
Expand All @@ -39,42 +39,44 @@ def register(self, node_driver):
self.find_next_available()
yield self.timeout(self.latency)

def confirm_peer(self, address):
if address < 0 or address >= self.max_hosts:
raise RuntimeError(f"Invalid address: {address}")
elif self.addr_list[address]['node'] is None:
raise RuntimeError(f"Address not registered: {address}")
# Substituir por um erro personalizado depois?
# Talvez expandir verificação para que veja se o endereço realmente corresponde..
# .. ao driver que enviou mensagem
# Talvez adicionar mensagem de debug?

# Adicionar try.. except nas funções de unicast e broadcast, para lidar com erros
def send_unicast(self, from_addr, to_addr, msg):
self.confirm_peer(from_addr)
self.confirm_peer(to_addr)
logging.info(str(self.env.now) + ' :: ' + 'network sending unicast {} => {}'.format(from_addr, to_addr))
if(to_addr <= 0):
print('{} address not found (msg from {})'.format(
to_addr, from_addr))
yield self.env.timeout(0)
else:
with self.channel.request() as rec:
yield rec
msg_envelope = [from_addr, to_addr, msg]
with self.channel.request() as rec:
yield rec
node = self.addr_list[to_addr]['node']
if node:
node.recieve(msg_envelope)
yield self.env.timeout(self.latency)
print(msg_envelope)
else:
print('{} address not found (msg from {})'.format(
to_addr, from_addr))
node = self.addr_list[to_addr]['node']
node.receive(msg_envelope)
yield self.env.timeout(self.latency)

def send_broadcast(self, from_addr, msg):
logging.info(str(self.env.now) + ' :: ' + 'Message Broadcast from {} - {}'.format(from_addr,msg))
self.confirm_peer(from_addr)
logging.info(str(self.env.now) + ' :: ' + 'Message Broadcast from {} - {}'.format(from_addr, str(msg)))
with self.node_list_access.request(priority=0) as nl_access:
yield nl_access
for addr in range(len(self.node_list)):
to_addr = self.node_list[addr]['address']
msg_envelope2 = [from_addr, to_addr, msg]
if to_addr == from_addr:
continue
msg_envelope = [from_addr, to_addr, msg]
with self.channel.request() as rec:
yield rec
node = self.node_list[addr]['node']
if node: # Checando se o nodo ainda faz parte da rede
node.recieve(msg_envelope2)
yield self.env.timeout(self.latency)
logging.info(str(self.env.now) + ' :: ' + 'Broadcast:'+ str(msg_envelope2))
else:
logging.info(str(self.env.now) + ' :: ' + '{} address not found (msg from {})'.format(
to_addr, from_addr))
node.receive(msg_envelope)
yield self.env.timeout(self.latency)
#logging.info(str(self.env.now) + ' :: ' + 'Broadcast:'+ str(msg_envelope))

def find_next_available(self):
addr = (self.next_available_address + 1) % self.max_hosts
Expand Down Expand Up @@ -112,8 +114,15 @@ def end_lease(self, index):
self.full_capacity = False
logging.info(str(self.env.now) + ' :: ' + 'Lease ended for address {}'.format(address))

def send_addresses(self, driver):
addr_list = []
for peer in self.node_list:
if peer['address'] is not driver.address:
addr_list.append(peer['address'])
return addr_list

def dhcp(self):
while True:
for z in self.check_lease():
yield z
yield self.env.timeout(1)
yield self.env.timeout(1)
18 changes: 9 additions & 9 deletions simulator/peer.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import logging
from simple_dds import *

"""
Class peer for create all the base stack for peer
Simulates the behavior of a peer in a network.
Uses driver object to interface with the network.

Definir as caracteristicas do nodo basicas - ID , Recursos - Qualidade

riar um canal broadcast onde o peer precisa se conectar
Criar um canal broadcast onde o peer precisa se conectar
E criar os links de comunicacao unicast

agente tem (ref de um canal broad)
Expand All @@ -23,6 +25,7 @@ def __init__(self, driver, id):
self.driver.register_handler(self.on_advertise, 'on_advertise')
self.driver.register_handler(self.on_disconnect, 'on_disconnect')
self.name = 'peer_{}'.format(id)
self.latest_read_msg = 0

def on_message (self, msg):
logging.info(str(self.driver.env.now) + ' :: ' + '{} received msg: {}'.format(self.name, msg))
Expand All @@ -39,12 +42,9 @@ def on_disconnect (self):
def on_advertise (self, msg):
for z in self.driver.advertise(msg):
yield z








def read_new_message(self, subscriber):
self.latest_read_msg = subscriber.read()



3 changes: 1 addition & 2 deletions simulator/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@
)

NUM_PEERS = 2
SIM_DURATION = 1000

SIM_DURATION = 300

# create env
env = simpy.Environment()
Expand Down
2 changes: 2 additions & 0 deletions simulator/simple_dds/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
__all__ = ["entity", "dds_service", "domain_participant", "publisher", "subscriber",
"data_object", "topic"]
Loading