Skip to content

Commit

Permalink
Merge pull request #263 from fabric-testbed/262.re_read_kafka_config
Browse files Browse the repository at this point in the history
262.re read kafka config
  • Loading branch information
kthare10 authored Mar 1, 2023
2 parents 6f1d671 + 684f979 commit c31bac5
Show file tree
Hide file tree
Showing 27 changed files with 339 additions and 191 deletions.
2 changes: 1 addition & 1 deletion fabric_cf/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__VERSION__ = "1.4.3"
__VERSION__ = "1.4.4"
42 changes: 11 additions & 31 deletions fabric_cf/actor/boot/configuration_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -447,50 +447,30 @@ def process_peer(self, *, peer: Peer):
@param peer peer
@raises ConfigurationException in case of error
"""
from_guid = ID(uid=peer.get_guid())
from_type = ActorType.get_actor_type_from_string(actor_type=peer.get_type())
to_guid = self.actor.get_guid()
to_type = self.actor.get_type()

# We only like peers broker->site and orchestrator->broker
# Reverse the peer if it connects site->broker or broker->orchestrator

if from_type == ActorType.Authority and to_type == ActorType.Broker:
from_guid, to_guid = to_guid, from_guid
from_type, to_type = to_type, from_type

if from_type == ActorType.Broker and to_type == ActorType.Orchestrator:
from_guid, to_guid = to_guid, from_guid
from_type, to_type = to_type, from_type

if from_type == ActorType.Authority and to_type == ActorType.Orchestrator:
from_guid, to_guid = to_guid, from_guid
from_type, to_type = to_type, from_type
peer_guid = ID(uid=peer.get_guid())
peer_type = ActorType.get_actor_type_from_string(actor_type=peer.get_type())
actor_guid = self.actor.get_guid()
actor_type = self.actor.get_type()

# peers between actors of same type aren't allowed unless the actors are both brokers
if from_type == to_type and from_type != ActorType.Broker:
if peer_type == actor_type and peer_type != ActorType.Broker:
raise ConfigurationException(
"Invalid peer type: broker can only talk to broker, orchestrator or site authority")

container = ManagementUtils.connect(caller=self.actor.get_identity())
to_mgmt_actor = container.get_actor(guid=to_guid)
self.logger.debug(f"to_mgmt_actor={to_mgmt_actor} to_guid={to_guid}")
if to_mgmt_actor is None and container.get_last_error() is not None:
mgmt_actor = container.get_actor(guid=actor_guid)
self.logger.info(f"Management Actor: {mgmt_actor} === {type(mgmt_actor)}")
if mgmt_actor is None and container.get_last_error() is not None:
self.logger.error(container.get_last_error())
from_mgmt_actor = container.get_actor(guid=from_guid)
self.logger.debug(f"from_mgmt_actor={from_mgmt_actor} from_guid={from_guid}")
if from_mgmt_actor is None and container.get_last_error() is not None:
self.logger.error(container.get_last_error())

self.vertex_to_registry_cache(peer=peer)

try:
client = RemoteActorCacheSingleton.get().establish_peer(from_guid=from_guid,
from_mgmt_actor=from_mgmt_actor,
to_guid=to_guid, to_mgmt_actor=to_mgmt_actor)
client = RemoteActorCacheSingleton.get().establish_peer(mgmt_actor=mgmt_actor, peer_guid=peer_guid,
peer_type=peer_type)
self.logger.debug(f"Client returned {client}")
if client is not None:
self.parse_exports(peer=peer, client=client, mgmt_actor=to_mgmt_actor)
self.parse_exports(peer=peer, client=client, mgmt_actor=mgmt_actor)
except Exception as e:
raise ConfigurationException(f"Could not process exports from: {peer.get_guid()} to "
f"{self.actor.get_guid()}. e= {e}")
Expand Down
9 changes: 9 additions & 0 deletions fabric_cf/actor/core/apis/abc_client_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,15 @@ def add_broker(self, *, broker: ABCBrokerProxy):
@params broker broker to register
"""

@abstractmethod
def update_broker(self, *, broker: ABCBrokerProxy):
"""
Registers a broker. If this is the first broker to be registered, it is
set as the default broker.
@params broker broker to register
"""

@abstractmethod
def demand(self, *, rid: ID):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,15 @@ def add_broker(self, *, broker: ProxyAvro, caller: AuthToken) -> ResultAvro:
@return success or failure status
"""

@abstractmethod
def update_broker(self, *, broker: ProxyAvro, caller: AuthToken) -> ResultAvro:
"""
Update a broker
@param broker: broker_proxy to be added
@param caller: caller
@return success or failure status
"""

@abstractmethod
def get_broker_query_model(self, *, broker: ID, caller: AuthToken, id_token: str,
level: int, graph_format: GraphFormat) -> ResultBrokerQueryModelAvro:
Expand Down
8 changes: 8 additions & 0 deletions fabric_cf/actor/core/apis/abc_mgmt_client_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,14 @@ def add_broker(self, *, broker: ProxyAvro) -> bool:
@return true for sucess; false otherwise
"""

@abstractmethod
def update_broker(self, *, broker: ProxyAvro) -> bool:
"""
Update an existing broker.
@param broker broker
@return true for sucess; false otherwise
"""

@abstractmethod
def get_broker_query_model(self, *, broker: ID, id_token: str, level: int,
graph_format: GraphFormat) -> BrokerQueryModelAvro:
Expand Down
9 changes: 9 additions & 0 deletions fabric_cf/actor/core/apis/abc_mgmt_server_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,15 @@ def register_client(self, *, client: ClientMng, kafka_topic: str) -> bool:
@return true for success; false otherwise
"""

@abstractmethod
def update_client(self, *, client: ClientMng, kafka_topic: str) -> bool:
"""
Update a client
@param client client
@param kafka_topic Kafka topic
@return true for success; false otherwise
"""

@abstractmethod
def unregister_client(self, *, guid: ID) -> bool:
"""
Expand Down
8 changes: 8 additions & 0 deletions fabric_cf/actor/core/apis/abc_server_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@ def register_client(self, *, client: Client):
@throws Exception in case of error
"""

@abstractmethod
def update_client(self, *, client: Client):
"""
Update the specified client.
@param client client to register
@throws Exception in case of error
"""

@abstractmethod
def unregister_client(self, *, guid: ID):
"""
Expand Down
Loading

0 comments on commit c31bac5

Please sign in to comment.