From a44b756e951921efcae0f30b9a91d7d674a534d0 Mon Sep 17 00:00:00 2001 From: Komal Thareja Date: Tue, 28 Feb 2023 11:16:59 -0500 Subject: [PATCH 1/6] re-read kafka config on container restart --- .../actor/boot/configuration_processor.py | 42 +--- fabric_cf/actor/core/apis/abc_client_actor.py | 9 + .../actor/core/apis/abc_mgmt_client_actor.py | 8 + .../actor/core/apis/abc_mgmt_server_actor.py | 9 + fabric_cf/actor/core/apis/abc_server_actor.py | 8 + .../core/container/remote_actor_cache.py | 196 +++++++----------- fabric_cf/actor/core/core/authority.py | 10 + fabric_cf/actor/core/core/broker.py | 16 ++ fabric_cf/actor/core/core/controller.py | 3 + .../client_actor_management_object_helper.py | 80 ++++--- fabric_cf/actor/core/manage/converter.py | 8 +- .../actor/core/manage/kafka/kafka_broker.py | 3 + .../core/manage/kafka/kafka_controller.py | 3 + .../core/manage/kafka/kafka_server_actor.py | 3 + .../actor/core/manage/local/local_broker.py | 12 ++ .../core/manage/local/local_controller.py | 12 ++ .../core/manage/local/local_server_actor.py | 18 +- .../manage/server_actor_management_object.py | 31 +++ .../actor/core/registry/peer_registry.py | 17 ++ 19 files changed, 305 insertions(+), 183 deletions(-) diff --git a/fabric_cf/actor/boot/configuration_processor.py b/fabric_cf/actor/boot/configuration_processor.py index b27dcf71..2d2535b7 100644 --- a/fabric_cf/actor/boot/configuration_processor.py +++ b/fabric_cf/actor/boot/configuration_processor.py @@ -447,50 +447,30 @@ def process_peer(self, *, peer: Peer): @param peer peer @raises ConfigurationException in case of error """ - from_guid = ID(uid=peer.get_guid()) - from_type = ActorType.get_actor_type_from_string(actor_type=peer.get_type()) - to_guid = self.actor.get_guid() - to_type = self.actor.get_type() - - # We only like peers broker->site and orchestrator->broker - # Reverse the peer if it connects site->broker or broker->orchestrator - - if from_type == ActorType.Authority and to_type == ActorType.Broker: - from_guid, to_guid = to_guid, from_guid - from_type, to_type = to_type, from_type - - if from_type == ActorType.Broker and to_type == ActorType.Orchestrator: - from_guid, to_guid = to_guid, from_guid - from_type, to_type = to_type, from_type - - if from_type == ActorType.Authority and to_type == ActorType.Orchestrator: - from_guid, to_guid = to_guid, from_guid - from_type, to_type = to_type, from_type + peer_guid = ID(uid=peer.get_guid()) + peer_type = ActorType.get_actor_type_from_string(actor_type=peer.get_type()) + actor_guid = self.actor.get_guid() + actor_type = self.actor.get_type() # peers between actors of same type aren't allowed unless the actors are both brokers - if from_type == to_type and from_type != ActorType.Broker: + if peer_type == actor_type and peer_type != ActorType.Broker: raise ConfigurationException( "Invalid peer type: broker can only talk to broker, orchestrator or site authority") container = ManagementUtils.connect(caller=self.actor.get_identity()) - to_mgmt_actor = container.get_actor(guid=to_guid) - self.logger.debug(f"to_mgmt_actor={to_mgmt_actor} to_guid={to_guid}") - if to_mgmt_actor is None and container.get_last_error() is not None: + mgmt_actor = container.get_actor(guid=actor_guid) + self.logger.info(f"Management Actor: {mgmt_actor} === {type(mgmt_actor)}") + if mgmt_actor is None and container.get_last_error() is not None: self.logger.error(container.get_last_error()) - from_mgmt_actor = container.get_actor(guid=from_guid) - self.logger.debug(f"from_mgmt_actor={from_mgmt_actor} from_guid={from_guid}") - if from_mgmt_actor is None and container.get_last_error() is not None: - self.logger.error(container.get_last_error()) self.vertex_to_registry_cache(peer=peer) try: - client = RemoteActorCacheSingleton.get().establish_peer(from_guid=from_guid, - from_mgmt_actor=from_mgmt_actor, - to_guid=to_guid, to_mgmt_actor=to_mgmt_actor) + client = RemoteActorCacheSingleton.get().establish_peer(mgmt_actor=mgmt_actor, peer_guid=peer_guid, + peer_type=peer_type) self.logger.debug(f"Client returned {client}") if client is not None: - self.parse_exports(peer=peer, client=client, mgmt_actor=to_mgmt_actor) + self.parse_exports(peer=peer, client=client, mgmt_actor=mgmt_actor) except Exception as e: raise ConfigurationException(f"Could not process exports from: {peer.get_guid()} to " f"{self.actor.get_guid()}. e= {e}") diff --git a/fabric_cf/actor/core/apis/abc_client_actor.py b/fabric_cf/actor/core/apis/abc_client_actor.py index 928f4c81..197306fb 100644 --- a/fabric_cf/actor/core/apis/abc_client_actor.py +++ b/fabric_cf/actor/core/apis/abc_client_actor.py @@ -63,6 +63,15 @@ def add_broker(self, *, broker: ABCBrokerProxy): @params broker broker to register """ + @abstractmethod + def update_broker(self, *, broker: ABCBrokerProxy): + """ + Registers a broker. If this is the first broker to be registered, it is + set as the default broker. + + @params broker broker to register + """ + @abstractmethod def demand(self, *, rid: ID): """ diff --git a/fabric_cf/actor/core/apis/abc_mgmt_client_actor.py b/fabric_cf/actor/core/apis/abc_mgmt_client_actor.py index 04b6f279..c4197a29 100644 --- a/fabric_cf/actor/core/apis/abc_mgmt_client_actor.py +++ b/fabric_cf/actor/core/apis/abc_mgmt_client_actor.py @@ -107,6 +107,14 @@ def add_broker(self, *, broker: ProxyAvro) -> bool: @return true for sucess; false otherwise """ + @abstractmethod + def update_broker(self, *, broker: ProxyAvro) -> bool: + """ + Update an existing broker. + @param broker broker + @return true for sucess; false otherwise + """ + @abstractmethod def get_broker_query_model(self, *, broker: ID, id_token: str, level: int, graph_format: GraphFormat) -> BrokerQueryModelAvro: diff --git a/fabric_cf/actor/core/apis/abc_mgmt_server_actor.py b/fabric_cf/actor/core/apis/abc_mgmt_server_actor.py index 20b62b4d..6b9aba03 100644 --- a/fabric_cf/actor/core/apis/abc_mgmt_server_actor.py +++ b/fabric_cf/actor/core/apis/abc_mgmt_server_actor.py @@ -104,6 +104,15 @@ def register_client(self, *, client: ClientMng, kafka_topic: str) -> bool: @return true for success; false otherwise """ + @abstractmethod + def update_client(self, *, client: ClientMng, kafka_topic: str) -> bool: + """ + Update a client + @param client client + @param kafka_topic Kafka topic + @return true for success; false otherwise + """ + @abstractmethod def unregister_client(self, *, guid: ID) -> bool: """ diff --git a/fabric_cf/actor/core/apis/abc_server_actor.py b/fabric_cf/actor/core/apis/abc_server_actor.py index 5dfc30cc..2bae1831 100644 --- a/fabric_cf/actor/core/apis/abc_server_actor.py +++ b/fabric_cf/actor/core/apis/abc_server_actor.py @@ -73,6 +73,14 @@ def register_client(self, *, client: Client): @throws Exception in case of error """ + @abstractmethod + def update_client(self, *, client: Client): + """ + Update the specified client. + @param client client to register + @throws Exception in case of error + """ + @abstractmethod def unregister_client(self, *, guid: ID): """ diff --git a/fabric_cf/actor/core/container/remote_actor_cache.py b/fabric_cf/actor/core/container/remote_actor_cache.py index d521029f..5ffb30dd 100644 --- a/fabric_cf/actor/core/container/remote_actor_cache.py +++ b/fabric_cf/actor/core/container/remote_actor_cache.py @@ -39,7 +39,7 @@ if TYPE_CHECKING: from fabric_cf.actor.core.apis.abc_mgmt_actor import ABCMgmtActor - from fabric_cf.actor.core.apis.abc_actor_mixin import ABCActorMixin + from fabric_cf.actor.core.apis.abc_actor_mixin import ABCActorMixin, ActorType class RemoteActorCacheException(Exception): @@ -154,164 +154,116 @@ def get_cache_entry_copy(self, *, guid: ID) -> dict: finally: self.lock.release() - def check_peer(self, *, from_mgmt_actor: ABCMgmtActor, from_guid: ID, to_mgmt_actor: ABCMgmtActor, to_guid: ID): + def check_peer(self, *, mgmt_actor: ABCMgmtActor, peer_guid: ID, peer_type: ActorType): """ Check if a peer is already connected - @param from_mgmt_actor from actor - @param from_guid guid of from actor - @param to_mgmt_actor to actor - @param to_guid guid of to actor + @param mgmt_actor mgmt_actor + @param peer_guid peer_guid + @param peer_type peer_type """ - self.logger.debug("from_mgmt_actor={} from_guid={} to_mgmt_actor={} to_guid={}".format(type(from_mgmt_actor), - from_guid, - type(to_mgmt_actor), - to_guid)) + self.logger.debug(f"Check if Peer {peer_guid}/{peer_type} already exists!") try: # For Broker/AM - if to_mgmt_actor is not None and isinstance(to_mgmt_actor, ABCMgmtServerActor): - clients = to_mgmt_actor.get_clients(guid=from_guid) + if isinstance(mgmt_actor, ABCMgmtServerActor): + self.logger.debug(f"Checking clients") + clients = mgmt_actor.get_clients(guid=peer_guid) + self.logger.debug(f"clients -- {clients} {mgmt_actor.get_last_error()}") if clients is not None: - self.logger.debug("Edge between {} and {} exists (client)".format(from_guid, to_guid)) + self.logger.debug(f"Edge between {mgmt_actor.get_guid()} and {peer_guid} exists (client)") return True # For Orchestrator/Broker - elif from_mgmt_actor is not None and isinstance(from_mgmt_actor, ABCMgmtClientActor): - brokers = from_mgmt_actor.get_brokers(broker=to_guid) + elif isinstance(mgmt_actor, ABCMgmtClientActor): + self.logger.debug(f"Checking brokers") + brokers = mgmt_actor.get_brokers(broker=peer_guid) + self.logger.debug(f"brokers -- {brokers}") if brokers is not None: - self.logger.debug("Edge between {} and {} exists (broker)".format(from_guid, to_guid)) + self.logger.debug(f"Edge between {mgmt_actor.get_guid()} and {peer_guid} exists (broker)") return True except Exception as e: - raise RemoteActorCacheException("Unable to cast actor {} or {} e={}".format(from_guid, to_guid, e)) + raise RemoteActorCacheException(f"Unable to cast actor {mgmt_actor.get_guid()} or {peer_guid} e={e}") - self.logger.debug("Edge between {} and {} does not exist".format(from_guid, to_guid)) + self.logger.debug(f"Edge between {mgmt_actor.get_guid()} and {peer_guid} does not exist") return False - def establish_peer_private(self, *, from_mgmt_actor: ABCMgmtActor, from_guid: ID, to_mgmt_actor: ABCMgmtActor, - to_guid: ID) -> ClientMng: + def establish_peer_private(self, *, mgmt_actor: ABCMgmtActor, peer_guid: ID, update: bool = False) -> ClientMng: """ Establish connection i.e. create either proxies or clients between peer - @param from_mgmt_actor from actor - @param from_guid guid of from actor - @param to_mgmt_actor to actor - @param to_guid guid of to actor + @param mgmt_actor mgmt_actor + @param peer_guid peer_guid + @param peer_type peer_type """ self.logger.debug("establish_peer_private IN") client = None - from_map = self.get_cache_entry_copy(guid=from_guid) - to_map = self.get_cache_entry_copy(guid=to_guid) - - if from_map is None: - raise RemoteActorCacheException("Actor {} does not have a registry cache entry".format(from_guid)) - - if to_map is None: - raise RemoteActorCacheException("Actor {} does not have a registry cache entry".format(to_guid)) - - if from_mgmt_actor is not None: - self.logger.debug("From actor {} is local".format(from_mgmt_actor.get_name())) - - protocol = Constants.PROTOCOL_LOCAL - kafka_topic = None - - if self.actor_location in to_map: - if self.actor_protocol not in to_map: - raise RemoteActorCacheException("Actor {} does not specify communications protocol (local/kafka)". - format(to_map[self.actor_name])) - - protocol = to_map.get(self.actor_protocol, None) - kafka_topic = to_map[self.actor_location] - self.logger.debug("Added To actor location (non-local) {}".format(to_map[self.actor_location])) - - identity = ActorIdentity(name=to_map[self.actor_name], guid=to_guid) - - if kafka_topic is not None and isinstance(from_mgmt_actor, ABCMgmtClientActor): - self.logger.debug("Kafka Topic is available, registering broker proxy") - proxy = ProxyAvro() - proxy.set_protocol(protocol) - proxy.set_guid(str(identity.get_guid())) - proxy.set_name(identity.get_name()) - proxy.set_type(to_map[self.actor_type]) - proxy.set_kafka_topic(kafka_topic) - - try: - if not from_mgmt_actor.add_broker(broker=proxy): - raise RemoteActorCacheException("Could not register broker {}". - format(from_mgmt_actor.get_last_error())) - except Exception as e: - self.logger.error(e) - self.logger.error(traceback.format_exc()) - else: - self.logger.debug("Not adding broker to actor at this time because the remote actor actor " - "kafka topic is not available") - - if to_mgmt_actor is not None and isinstance(to_mgmt_actor, ABCMgmtServerActor): - self.logger.debug("Creating a client for local to actor") - client = ClientMng() - client.set_name(name=from_mgmt_actor.get_name()) - client.set_guid(guid=str(from_mgmt_actor.get_guid())) - try: - to_mgmt_actor.register_client(client=client, kafka_topic=kafka_topic) - except Exception as e: - raise RemoteActorCacheException("Could not register actor: {} as a client of actor: {} e= {}". - format(client.get_name(), to_mgmt_actor.get_name(), e)) - else: - # fromActor is remote: toActor must be local - # no-need to create any proxies - # we only need to register clients - if to_mgmt_actor is None: - raise RemoteActorCacheException("Both peer endpoints are non local actors: {} {}".format( - from_map[self.actor_name], to_map[self.actor_name])) - - if self.actor_guid not in from_map: - raise RemoteActorCacheException("Missing guid for remote actor: {}".format(from_map[self.actor_name])) - - self.logger.debug("From actor was remote, to actor {} is local".format(to_mgmt_actor.get_name())) - if self.actor_location in from_map and isinstance(to_mgmt_actor, ABCMgmtServerActor): - kafka_topic = from_map[self.actor_location] - self.logger.debug("From actor has kafka topic") - self.logger.debug("Creating client for from actor {}".format(from_map[self.actor_name])) - client = ClientMng() - client.set_name(name=from_map[self.actor_name]) - client.set_guid(guid=str(from_map[self.actor_guid])) - try: - to_mgmt_actor.register_client(client=client, kafka_topic=kafka_topic) - except Exception as e: - raise RemoteActorCacheException( - "Could not register actor: {} as a client of actor: {} e= {}".format( - client.get_name(), to_mgmt_actor.get_name(), e)) - else: - self.logger.debug("Not adding client to actor at this time - remote actor topic not available") + cache_entry = self.get_cache_entry_copy(guid=peer_guid) + if cache_entry is None: + raise RemoteActorCacheException(f"Actor {peer_guid} does not have a registry cache entry") + + protocol = cache_entry.get(self.actor_protocol) + kafka_topic = cache_entry.get(self.actor_location) + identity = ActorIdentity(name=cache_entry.get(self.actor_name), guid=peer_guid) + + if kafka_topic is None: + raise RemoteActorCacheException(f"Actor {peer_guid} does not have a kafka topic") + + if isinstance(mgmt_actor, ABCMgmtClientActor): + proxy = ProxyAvro() + proxy.set_protocol(protocol) + proxy.set_guid(str(identity.get_guid())) + proxy.set_name(identity.get_name()) + proxy.set_type(cache_entry.get(self.actor_type)) + proxy.set_kafka_topic(kafka_topic) + + try: + if not update: + if not mgmt_actor.add_broker(broker=proxy): + raise RemoteActorCacheException(f"Could not register broker {peer_guid} " + f"error: {mgmt_actor.get_last_error()}") + else: + if not mgmt_actor.update_broker(broker=proxy): + raise RemoteActorCacheException(f"Could not update broker {peer_guid} " + f"error: {mgmt_actor.get_last_error()}") + except Exception as e: + self.logger.error(e) + self.logger.error(traceback.format_exc()) + elif isinstance(mgmt_actor, ABCMgmtServerActor): + self.logger.debug("Creating a client for local to actor") + client = ClientMng() + client.set_name(name=cache_entry.get(self.actor_name)) + client.set_guid(guid=str(peer_guid)) + try: + mgmt_actor.register_client(client=client, kafka_topic=kafka_topic) + except Exception as e: + raise RemoteActorCacheException(f"Could not register actor: {peer_guid} as a client of " + f"actor: {mgmt_actor} e= {e}") + self.logger.debug("establish_peer_private OUT {}".format(client)) return client - def establish_peer(self, *, from_guid: ID, from_mgmt_actor: ABCMgmtActor, to_guid: ID, - to_mgmt_actor: ABCMgmtActor) -> ClientMng: + def establish_peer(self, *, mgmt_actor: ABCMgmtActor, peer_guid: ID, peer_type: ActorType) -> ClientMng: """ Check if peer exists in cache and if not Establish connection i.e. create either proxies or clients between peer - @param from_mgmt_actor from actor - @param from_guid guid of from actor - @param to_mgmt_actor to actor - @param to_guid guid of to actor + @param mgmt_actor mgmt_actor + @param peer_guid peer_guid + @param peer_type peer_type """ self.logger.debug("establish_peer IN") client = None - if from_guid is None or to_guid is None: + if mgmt_actor is None or peer_guid is None: self.logger.error("Cannot establish peer when either guid is not known") raise RemoteActorCacheException("Cannot establish peer when either guid is not known") try: - if not self.check_peer(from_mgmt_actor=from_mgmt_actor, from_guid=from_guid, - to_mgmt_actor=to_mgmt_actor, to_guid=to_guid): + if not self.check_peer(mgmt_actor=mgmt_actor, peer_guid=peer_guid, peer_type=peer_type): - client = self.establish_peer_private(from_mgmt_actor=from_mgmt_actor, from_guid=from_guid, - to_mgmt_actor=to_mgmt_actor, to_guid=to_guid) + client = self.establish_peer_private(mgmt_actor=mgmt_actor, peer_guid=peer_guid, peer_type=peer_type) - self.check_to_remove_entry(guid=from_guid) - self.check_to_remove_entry(guid=to_guid) + self.check_to_remove_entry(guid=peer_guid) - self.logger.debug("Peer established from {} to {}".format(from_guid, to_guid)) + self.logger.debug(f"Peer established from {mgmt_actor} to {peer_guid}") except Exception as e: self.logger.error(traceback.format_exc()) - self.logger.error("Peer could not be established from {} to {} e:={}".format(from_guid, to_guid, e)) + self.logger.error(f"Peer could not be established from {mgmt_actor} to {peer_guid} e:={e}") self.logger.debug("establish_peer OUT {}".format(client)) return client diff --git a/fabric_cf/actor/core/core/authority.py b/fabric_cf/actor/core/core/authority.py index 205d2c53..b836bddd 100644 --- a/fabric_cf/actor/core/core/authority.py +++ b/fabric_cf/actor/core/core/authority.py @@ -248,6 +248,16 @@ def register_client(self, *, client: Client): db.add_client(client=client) + def update_client(self, *, client: Client): + db = self.plugin.get_database() + + try: + db.get_client(guid=client.get_guid()) + except Exception as e: + self.logger.debug("Client does not exist e:{}".format(e)) + + db.update_client(client=client) + def unregister_client(self, *, guid: ID): db = self.plugin.get_database() db.remove_client(guid=guid) diff --git a/fabric_cf/actor/core/core/broker.py b/fabric_cf/actor/core/core/broker.py index ff8c801e..11925641 100644 --- a/fabric_cf/actor/core/core/broker.py +++ b/fabric_cf/actor/core/core/broker.py @@ -123,6 +123,9 @@ def actor_added(self, *, config: ActorConfig): def add_broker(self, *, broker: ABCBrokerProxy): self.registry.add_broker(broker=broker) + def update_broker(self, *, broker: ABCBrokerProxy): + self.registry.update_broker(broker=broker) + def register_client_slice(self, *, slice_obj: ABCSlice): self.wrapper.register_slice(slice_object=slice_obj) @@ -388,6 +391,19 @@ def register_client(self, *, client: Client): except Exception as e: raise BrokerException(error_code=ExceptionErrorCode.FAILURE, msg=f"client: {client.get_guid()} e: {e}") + def update_client(self, *, client: Client): + database = self.plugin.get_database() + + try: + database.get_client(guid=client.get_guid()) + except Exception as e: + raise BrokerException(error_code=ExceptionErrorCode.NOT_FOUND, msg=f"client: {client.get_guid()} e: {e}") + + try: + database.update_client(client=client) + except Exception as e: + raise BrokerException(error_code=ExceptionErrorCode.FAILURE, msg=f"client: {client.get_guid()} e: {e}") + def unregister_client(self, *, guid: ID): database = self.plugin.get_database() diff --git a/fabric_cf/actor/core/core/controller.py b/fabric_cf/actor/core/core/controller.py index eb761cb5..7bf7904b 100644 --- a/fabric_cf/actor/core/core/controller.py +++ b/fabric_cf/actor/core/core/controller.py @@ -143,6 +143,9 @@ def actor_added(self, *, config: ActorConfig): def add_broker(self, *, broker: ABCBrokerProxy): self.registry.add_broker(broker=broker) + def update_broker(self, *, broker: ABCBrokerProxy): + self.registry.update_broker(broker=broker) + def bid(self): """ Bids for resources as dictated by the plugin bidding policy for the diff --git a/fabric_cf/actor/core/manage/client_actor_management_object_helper.py b/fabric_cf/actor/core/manage/client_actor_management_object_helper.py index 2a9e562b..6e1579eb 100644 --- a/fabric_cf/actor/core/manage/client_actor_management_object_helper.py +++ b/fabric_cf/actor/core/manage/client_actor_management_object_helper.py @@ -42,6 +42,7 @@ from fabric_cf.actor.core.apis.abc_actor_runnable import ABCActorRunnable from fabric_cf.actor.core.apis.abc_controller_reservation import ABCControllerReservation +from fabric_cf.actor.core.apis.abc_reservation_mixin import ABCReservationMixin from fabric_cf.actor.core.common.constants import Constants, ErrorCodes from fabric_cf.actor.core.common.exceptions import ManageException from fabric_cf.actor.core.kernel.reservation_client import ClientReservationFactory @@ -126,6 +127,29 @@ def add_broker(self, *, broker: ProxyAvro, caller: AuthToken) -> ResultAvro: return result + def update_broker(self, *, broker: ProxyAvro, caller: AuthToken) -> ResultAvro: + result = ResultAvro() + + if broker is None or caller is None: + result.set_code(ErrorCodes.ErrorInvalidArguments.value) + result.set_message(ErrorCodes.ErrorInvalidArguments.interpret()) + return result + + try: + proxy = Converter.get_agent_proxy(mng=broker) + if proxy is None: + result.set_code(ErrorCodes.ErrorInvalidArguments.value) + result.set_message(ErrorCodes.ErrorInvalidArguments.interpret()) + else: + self.client.update_broker(broker=proxy) + except Exception as e: + self.logger.error("update_broker {}".format(e)) + result.set_code(ErrorCodes.ErrorInternalError.value) + result.set_message(ErrorCodes.ErrorInternalError.interpret(exception=e)) + result = ManagementObject.set_exception_details(result=result, e=e) + + return result + def get_broker_query_model(self, *, broker: ID, caller: AuthToken, id_token: str, level: int, graph_format: GraphFormat) -> ResultBrokerQueryModelAvro: result = ResultBrokerQueryModelAvro() @@ -320,6 +344,31 @@ def __init__(self, *, actor: ABCClientActor, logger): self.actor = actor self.logger = logger + def __add_predecessors(self, predecessors: List[ReservationPredecessorAvro], + res: ABCControllerReservation): + for pred in predecessors: + if pred.get_reservation_id() is None: + self.logger.warning(f"Predecessor specified for rid={res.get_reservation_id()} " + "but missing reservation id of predecessor") + continue + + predid = ID(uid=pred.get_reservation_id()) + pr = self.actor.get_reservation(rid=predid) + + if pr is None: + self.logger.warning(f"Predecessor for rid={res.get_reservation_id()} with rid={predid} " + f"does not exist. Ignoring it!") + continue + + if not isinstance(pr, ABCControllerReservation): + self.logger.warning(f"Predecessor for rid={res.get_reservation_id()} is not an " + f"IControllerReservation: class={type(pr)}") + continue + + self.logger.debug(f"Setting redeem predecessor on reservation # {res.get_reservation_id()} " + f"pred={pr.get_reservation_id()}") + res.add_redeem_predecessor(reservation=pr) + def run(self): result = ResultAvro() rid = ID(uid=reservation.get_reservation_id()) @@ -332,28 +381,9 @@ def run(self): ManagementUtils.update_reservation(res_obj=r, rsv_mng=reservation) if isinstance(reservation, LeaseReservationAvro): predecessors = reservation.get_redeem_predecessors() - for pred in predecessors: - if pred.get_reservation_id() is None: - self.logger.warning("Redeem predecessor specified for rid={} " - "but missing reservation id of predecessor".format(rid)) - continue - - predid = ID(uid=pred.get_reservation_id()) - pr = self.actor.get_reservation(rid=predid) - - if pr is None: - self.logger.warning("Redeem predecessor for rid={} with rid={} does not exist. " - "Ignoring it!".format(rid, predid)) - continue - - if not isinstance(pr, ABCControllerReservation): - self.logger.warning("Redeem predecessor for rid={} is not an IControllerReservation: " - "class={}".format(rid, type(pr))) - continue - - self.logger.debug("Setting redeem predecessor on reservation # {} pred={}". - format(r.get_reservation_id(), pr.get_reservation_id())) - r.add_redeem_predecessor(reservation=pr) + if predecessors is not None: + self.logger.debug("Processing Redeem predecessors") + self.__add_predecessors(res=r, predecessors=predecessors) try: self.actor.get_plugin().get_database().update_reservation(reservation=r) @@ -397,7 +427,7 @@ def run(self): result.set_message(ErrorCodes.ErrorNoSuchReservation.interpret()) return result - dep_res_list = [] + redeem_dep_res_list = [] if dependencies is not None: for d in dependencies: dep_res = self.actor.get_reservation(rid=ID(uid=d.get_reservation_id())) @@ -405,7 +435,7 @@ def run(self): result.set_code(ErrorCodes.ErrorNoSuchReservation.value) result.set_message(ErrorCodes.ErrorNoSuchReservation.interpret()) return result - dep_res_list.append(dep_res) + redeem_dep_res_list.append(dep_res) rset = ResourceSet() units = r.get_resources().get_units() @@ -424,7 +454,7 @@ def run(self): rset.set_sliver(sliver=sliver) self.actor.extend(rid=r.get_reservation_id(), resources=rset, term=new_term, - dependencies=dep_res_list) + dependencies=redeem_dep_res_list) return result diff --git a/fabric_cf/actor/core/manage/converter.py b/fabric_cf/actor/core/manage/converter.py index 1c91af3f..c543d796 100644 --- a/fabric_cf/actor/core/manage/converter.py +++ b/fabric_cf/actor/core/manage/converter.py @@ -194,14 +194,14 @@ def fill_client(*, client_mng: ClientMng) -> Client: return result @staticmethod - def fill_client_mng(*, client: dict) -> ClientMng: + def fill_client_mng(*, client: Client) -> ClientMng: result = ClientMng() - result.set_name(name=client['clt_name']) - result.set_guid(guid=client['clt_guid']) + result.set_name(name=client.get_name()) + result.set_guid(guid=str(client.get_guid())) return result @staticmethod - def fill_clients(*, client_list: list) -> List[ClientMng]: + def fill_clients(*, client_list: List[Client]) -> List[ClientMng]: result = [] for c in client_list: mng = Converter.fill_client_mng(client=c) diff --git a/fabric_cf/actor/core/manage/kafka/kafka_broker.py b/fabric_cf/actor/core/manage/kafka/kafka_broker.py index bab5fd5d..778475b7 100644 --- a/fabric_cf/actor/core/manage/kafka/kafka_broker.py +++ b/fabric_cf/actor/core/manage/kafka/kafka_broker.py @@ -184,3 +184,6 @@ def clone(self): def add_broker(self, *, broker: ProxyAvro) -> bool: raise ManageException(Constants.NOT_IMPLEMENTED) + + def update_broker(self, *, broker: ProxyAvro) -> bool: + raise ManageException(Constants.NOT_IMPLEMENTED) diff --git a/fabric_cf/actor/core/manage/kafka/kafka_controller.py b/fabric_cf/actor/core/manage/kafka/kafka_controller.py index 70257b90..4f7f4606 100644 --- a/fabric_cf/actor/core/manage/kafka/kafka_controller.py +++ b/fabric_cf/actor/core/manage/kafka/kafka_controller.py @@ -59,6 +59,9 @@ def clone(self): def add_broker(self, *, broker: ProxyAvro) -> bool: raise ManageException(Constants.NOT_IMPLEMENTED) + def update_broker(self, *, broker: ProxyAvro) -> bool: + raise ManageException(Constants.NOT_IMPLEMENTED) + def get_brokers(self, *, broker: ID = None, id_token: str = None) -> List[ProxyAvro]: request = GetActorsRequestAvro() request = self.fill_request_by_id_message(request=request, id_token=id_token, broker_id=broker) diff --git a/fabric_cf/actor/core/manage/kafka/kafka_server_actor.py b/fabric_cf/actor/core/manage/kafka/kafka_server_actor.py index 1feb1e70..a00cee29 100644 --- a/fabric_cf/actor/core/manage/kafka/kafka_server_actor.py +++ b/fabric_cf/actor/core/manage/kafka/kafka_server_actor.py @@ -126,5 +126,8 @@ def get_clients(self, *, guid: ID = None, id_token: str = None) -> List[ClientMn def register_client(self, *, client: ClientMng, kafka_topic: str) -> bool: raise ManageException(Constants.NOT_IMPLEMENTED) + def update_client(self, *, client: ClientMng, kafka_topic: str) -> bool: + raise ManageException(Constants.NOT_IMPLEMENTED) + def unregister_client(self, *, guid: ID) -> bool: raise ManageException(Constants.NOT_IMPLEMENTED) diff --git a/fabric_cf/actor/core/manage/local/local_broker.py b/fabric_cf/actor/core/manage/local/local_broker.py index cfff6e7a..212d61c3 100644 --- a/fabric_cf/actor/core/manage/local/local_broker.py +++ b/fabric_cf/actor/core/manage/local/local_broker.py @@ -69,6 +69,18 @@ def add_broker(self, *, broker: ProxyAvro) -> bool: return False + def update_broker(self, *, broker: ProxyAvro) -> bool: + self.clear_last() + try: + result = self.manager.update_broker(broker=broker, caller=self.auth) + self.last_status = result + + return result.get_code() == 0 + except Exception as e: + self.on_exception(e=e, traceback_str=traceback.format_exc()) + + return False + def get_brokers(self, *, broker: ID = None, id_token: str = None) -> List[ProxyAvro]: self.clear_last() try: diff --git a/fabric_cf/actor/core/manage/local/local_controller.py b/fabric_cf/actor/core/manage/local/local_controller.py index e2f7230a..60da7632 100644 --- a/fabric_cf/actor/core/manage/local/local_controller.py +++ b/fabric_cf/actor/core/manage/local/local_controller.py @@ -71,6 +71,18 @@ def add_broker(self, *, broker: ProxyAvro) -> bool: return False + def update_broker(self, *, broker: ProxyAvro) -> bool: + self.clear_last() + try: + result = self.manager.update_broker(broker=broker, caller=self.auth) + self.last_status = result + + return result.get_code() == 0 + except Exception as e: + self.on_exception(e=e, traceback_str=traceback.format_exc()) + + return False + def get_brokers(self, *, broker: ID = None, id_token: str = None) -> List[ProxyAvro]: self.clear_last() try: diff --git a/fabric_cf/actor/core/manage/local/local_server_actor.py b/fabric_cf/actor/core/manage/local/local_server_actor.py index deb64273..4ca547a4 100644 --- a/fabric_cf/actor/core/manage/local/local_server_actor.py +++ b/fabric_cf/actor/core/manage/local/local_server_actor.py @@ -73,7 +73,7 @@ def get_clients(self, *, guid: ID = None, id_token: str = None) -> List[ClientMn result = self.manager.get_clients(caller=self.auth, guid=guid) self.last_status = result.status if result.status.get_code() == 0: - return result.clients + return result.result except Exception as e: self.on_exception(e=e, traceback_str=traceback.format_exc()) @@ -95,6 +95,22 @@ def register_client(self, *, client: ClientMng, kafka_topic: str) -> bool: return False + def update_client(self, *, client: ClientMng, kafka_topic: str) -> bool: + self.clear_last() + if client is None or kafka_topic is None: + self.last_exception = Exception("Invalid arguments") + return False + + try: + result = self.manager.update_client(client=client, kafka_topic=kafka_topic, caller=self.auth) + self.last_status = result + + return result.get_code() == 0 + except Exception as e: + self.on_exception(e=e, traceback_str=traceback.format_exc()) + + return False + def unregister_client(self, *, guid: ID) -> bool: self.clear_last() if guid is None: diff --git a/fabric_cf/actor/core/manage/server_actor_management_object.py b/fabric_cf/actor/core/manage/server_actor_management_object.py index 5ed0c81a..c715c0a5 100644 --- a/fabric_cf/actor/core/manage/server_actor_management_object.py +++ b/fabric_cf/actor/core/manage/server_actor_management_object.py @@ -227,6 +227,36 @@ def run(self): return result + def update_client(self, *, client: ClientMng, kafka_topic: str, caller: AuthToken) -> ResultAvro: + result = ResultAvro() + + if client is None or kafka_topic is None or caller is None: + result.set_code(ErrorCodes.ErrorInvalidArguments.value) + result.set_message(ErrorCodes.ErrorInvalidArguments.interpret()) + return result + + try: + client_obj = Converter.fill_client(client_mng=client) + client_obj.set_kafka_topic(kafka_topic=kafka_topic) + + class Runner(ABCActorRunnable): + def __init__(self, *, actor: ABCServerActor): + self.actor = actor + + def run(self): + self.actor.update_client(client=client_obj) + return None + + self.actor.execute_on_actor_thread_and_wait(runnable=Runner(actor=self.actor)) + + except Exception as e: + self.logger.error("register_client: {}".format(e)) + result.set_code(ErrorCodes.ErrorInternalError.value) + result.set_message(ErrorCodes.ErrorInternalError.interpret(exception=e)) + result = ManagementObject.set_exception_details(result=result, e=e) + + return result + def get_clients(self, *, caller: AuthToken, guid: ID = None) -> ResultClientMng: result = ResultClientMng() result.status = ResultAvro() @@ -255,6 +285,7 @@ def get_clients(self, *, caller: AuthToken, guid: ID = None) -> ResultClientMng: result.status.set_code(ErrorCodes.ErrorInternalError.value) result.status.set_message(ErrorCodes.ErrorInternalError.interpret(exception=e)) result.status = ManagementObject.set_exception_details(result=result.status, e=e) + return result def unregister_client(self, *, guid: ID, caller: AuthToken) -> ResultAvro: result = ResultAvro() diff --git a/fabric_cf/actor/core/registry/peer_registry.py b/fabric_cf/actor/core/registry/peer_registry.py index 3aab8761..e4345e3e 100644 --- a/fabric_cf/actor/core/registry/peer_registry.py +++ b/fabric_cf/actor/core/registry/peer_registry.py @@ -70,6 +70,23 @@ def add_broker(self, *, broker: ABCBrokerProxy): except Exception as e: self.plugin.get_logger().error("Error while adding broker {}".format(e)) + def update_broker(self, *, broker: ABCBrokerProxy): + try: + self.lock.acquire() + self.brokers[broker.get_identity().get_guid()] = broker + + if self.default_broker is None: + self.default_broker = broker + finally: + self.lock.release() + + try: + self.plugin.get_database().update_broker(broker=broker) + self.logger.info("Added {} as broker".format(broker.get_name())) + except Exception as e: + self.plugin.get_logger().error("Error while adding broker {}".format(e)) + + def get_broker(self, *, guid: ID) -> ABCBrokerProxy: ret_val = None try: From 1504ec7f6fe3bb0033a783803d5d9d8745be4b30 Mon Sep 17 00:00:00 2001 From: Komal Thareja Date: Tue, 28 Feb 2023 11:23:39 -0500 Subject: [PATCH 2/6] update existing clients or brokers --- fabric_cf/actor/core/container/remote_actor_cache.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/fabric_cf/actor/core/container/remote_actor_cache.py b/fabric_cf/actor/core/container/remote_actor_cache.py index 5ffb30dd..18e4cca1 100644 --- a/fabric_cf/actor/core/container/remote_actor_cache.py +++ b/fabric_cf/actor/core/container/remote_actor_cache.py @@ -191,7 +191,7 @@ def establish_peer_private(self, *, mgmt_actor: ABCMgmtActor, peer_guid: ID, upd Establish connection i.e. create either proxies or clients between peer @param mgmt_actor mgmt_actor @param peer_guid peer_guid - @param peer_type peer_type + @param update update """ self.logger.debug("establish_peer_private IN") client = None @@ -253,13 +253,13 @@ def establish_peer(self, *, mgmt_actor: ABCMgmtActor, peer_guid: ID, peer_type: self.logger.error("Cannot establish peer when either guid is not known") raise RemoteActorCacheException("Cannot establish peer when either guid is not known") try: - if not self.check_peer(mgmt_actor=mgmt_actor, peer_guid=peer_guid, peer_type=peer_type): + exists = self.check_peer(mgmt_actor=mgmt_actor, peer_guid=peer_guid, peer_type=peer_type) - client = self.establish_peer_private(mgmt_actor=mgmt_actor, peer_guid=peer_guid, peer_type=peer_type) + client = self.establish_peer_private(mgmt_actor=mgmt_actor, peer_guid=peer_guid, update=exists is not None) - self.check_to_remove_entry(guid=peer_guid) + self.check_to_remove_entry(guid=peer_guid) - self.logger.debug(f"Peer established from {mgmt_actor} to {peer_guid}") + self.logger.debug(f"Peer established from {mgmt_actor} to {peer_guid}") except Exception as e: self.logger.error(traceback.format_exc()) From d3fc5b3aa24c9578728ba147bd5d017fe6c95a51 Mon Sep 17 00:00:00 2001 From: Komal Thareja Date: Tue, 28 Feb 2023 11:30:34 -0500 Subject: [PATCH 3/6] update existing clients --- fabric_cf/actor/core/container/remote_actor_cache.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/fabric_cf/actor/core/container/remote_actor_cache.py b/fabric_cf/actor/core/container/remote_actor_cache.py index 18e4cca1..f509a92b 100644 --- a/fabric_cf/actor/core/container/remote_actor_cache.py +++ b/fabric_cf/actor/core/container/remote_actor_cache.py @@ -232,7 +232,10 @@ def establish_peer_private(self, *, mgmt_actor: ABCMgmtActor, peer_guid: ID, upd client.set_name(name=cache_entry.get(self.actor_name)) client.set_guid(guid=str(peer_guid)) try: - mgmt_actor.register_client(client=client, kafka_topic=kafka_topic) + if not update: + mgmt_actor.register_client(client=client, kafka_topic=kafka_topic) + else: + mgmt_actor.update_client(client=client, kafka_topic=kafka_topic) except Exception as e: raise RemoteActorCacheException(f"Could not register actor: {peer_guid} as a client of " f"actor: {mgmt_actor} e= {e}") From a7efa2e85a0fa309eb6509393eda07fc6c542e17 Mon Sep 17 00:00:00 2001 From: Komal Thareja Date: Tue, 28 Feb 2023 11:38:18 -0500 Subject: [PATCH 4/6] add log for create/update --- fabric_cf/actor/core/container/remote_actor_cache.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/fabric_cf/actor/core/container/remote_actor_cache.py b/fabric_cf/actor/core/container/remote_actor_cache.py index f509a92b..68772041 100644 --- a/fabric_cf/actor/core/container/remote_actor_cache.py +++ b/fabric_cf/actor/core/container/remote_actor_cache.py @@ -216,10 +216,12 @@ def establish_peer_private(self, *, mgmt_actor: ABCMgmtActor, peer_guid: ID, upd try: if not update: + self.logger.debug(f"Creating new proxy: {proxy}") if not mgmt_actor.add_broker(broker=proxy): raise RemoteActorCacheException(f"Could not register broker {peer_guid} " f"error: {mgmt_actor.get_last_error()}") else: + self.logger.debug(f"Updating existing proxy: {proxy}") if not mgmt_actor.update_broker(broker=proxy): raise RemoteActorCacheException(f"Could not update broker {peer_guid} " f"error: {mgmt_actor.get_last_error()}") @@ -227,14 +229,15 @@ def establish_peer_private(self, *, mgmt_actor: ABCMgmtActor, peer_guid: ID, upd self.logger.error(e) self.logger.error(traceback.format_exc()) elif isinstance(mgmt_actor, ABCMgmtServerActor): - self.logger.debug("Creating a client for local to actor") client = ClientMng() client.set_name(name=cache_entry.get(self.actor_name)) client.set_guid(guid=str(peer_guid)) try: if not update: + self.logger.debug(f"Creating new client: {client}") mgmt_actor.register_client(client=client, kafka_topic=kafka_topic) else: + self.logger.debug(f"Updating existing client: {client}") mgmt_actor.update_client(client=client, kafka_topic=kafka_topic) except Exception as e: raise RemoteActorCacheException(f"Could not register actor: {peer_guid} as a client of " @@ -256,9 +259,9 @@ def establish_peer(self, *, mgmt_actor: ABCMgmtActor, peer_guid: ID, peer_type: self.logger.error("Cannot establish peer when either guid is not known") raise RemoteActorCacheException("Cannot establish peer when either guid is not known") try: - exists = self.check_peer(mgmt_actor=mgmt_actor, peer_guid=peer_guid, peer_type=peer_type) + update = self.check_peer(mgmt_actor=mgmt_actor, peer_guid=peer_guid, peer_type=peer_type) - client = self.establish_peer_private(mgmt_actor=mgmt_actor, peer_guid=peer_guid, update=exists is not None) + client = self.establish_peer_private(mgmt_actor=mgmt_actor, peer_guid=peer_guid, update=update) self.check_to_remove_entry(guid=peer_guid) From 306cdda5c92657f38f33daae72105c4912091048 Mon Sep 17 00:00:00 2001 From: Komal Thareja Date: Tue, 28 Feb 2023 11:51:44 -0500 Subject: [PATCH 5/6] add update broker to controller/broker mgmt object --- .../core/apis/abc_client_actor_management_object.py | 9 +++++++++ fabric_cf/actor/core/manage/broker_management_object.py | 3 +++ .../actor/core/manage/controller_management_object.py | 3 +++ 3 files changed, 15 insertions(+) diff --git a/fabric_cf/actor/core/apis/abc_client_actor_management_object.py b/fabric_cf/actor/core/apis/abc_client_actor_management_object.py index 72bcfc3e..796f7c4a 100644 --- a/fabric_cf/actor/core/apis/abc_client_actor_management_object.py +++ b/fabric_cf/actor/core/apis/abc_client_actor_management_object.py @@ -116,6 +116,15 @@ def add_broker(self, *, broker: ProxyAvro, caller: AuthToken) -> ResultAvro: @return success or failure status """ + @abstractmethod + def update_broker(self, *, broker: ProxyAvro, caller: AuthToken) -> ResultAvro: + """ + Update a broker + @param broker: broker_proxy to be added + @param caller: caller + @return success or failure status + """ + @abstractmethod def get_broker_query_model(self, *, broker: ID, caller: AuthToken, id_token: str, level: int, graph_format: GraphFormat) -> ResultBrokerQueryModelAvro: diff --git a/fabric_cf/actor/core/manage/broker_management_object.py b/fabric_cf/actor/core/manage/broker_management_object.py index 34c71316..6290bcaf 100644 --- a/fabric_cf/actor/core/manage/broker_management_object.py +++ b/fabric_cf/actor/core/manage/broker_management_object.py @@ -93,6 +93,9 @@ def get_brokers(self, *, caller: AuthToken, broker_id: ID = None) -> ResultProxy def add_broker(self, *, broker: ProxyAvro, caller: AuthToken) -> ResultAvro: return self.client_helper.add_broker(broker=broker, caller=caller) + def update_broker(self, *, broker: ProxyAvro, caller: AuthToken) -> ResultAvro: + return self.client_helper.update_broker(broker=broker, caller=caller) + def get_broker_query_model(self, *, broker: ID, caller: AuthToken, id_token: str, level: int, graph_format: GraphFormat) -> ResultBrokerQueryModelAvro: return self.client_helper.get_broker_query_model(broker=broker, caller=caller, id_token=id_token, level=level, diff --git a/fabric_cf/actor/core/manage/controller_management_object.py b/fabric_cf/actor/core/manage/controller_management_object.py index 635773b9..ee380ea9 100644 --- a/fabric_cf/actor/core/manage/controller_management_object.py +++ b/fabric_cf/actor/core/manage/controller_management_object.py @@ -97,6 +97,9 @@ def get_brokers(self, *, caller: AuthToken, broker_id: ID = None) -> ResultProxy def add_broker(self, *, broker: ProxyAvro, caller: AuthToken) -> ResultAvro: return self.client_helper.add_broker(broker=broker, caller=caller) + def update_broker(self, *, broker: ProxyAvro, caller: AuthToken) -> ResultAvro: + return self.client_helper.add_broker(broker=broker, caller=caller) + def get_broker_query_model(self, *, broker: ID, caller: AuthToken, id_token: str, level: int, graph_format: GraphFormat) -> ResultBrokerQueryModelAvro: return self.client_helper.get_broker_query_model(broker=broker, caller=caller, id_token=id_token, level=level, From 684f97916ec8a9d6f7bc8441ffda38cd31582604 Mon Sep 17 00:00:00 2001 From: Komal Thareja Date: Tue, 28 Feb 2023 14:17:25 -0500 Subject: [PATCH 6/6] up version, register as client/proxy based on peer type --- fabric_cf/__init__.py | 2 +- .../core/container/remote_actor_cache.py | 38 +++++++++++-------- .../manage/controller_management_object.py | 2 +- .../actor/core/registry/peer_registry.py | 1 - fabric_cf/authority/docker-compose.geni.yml | 2 +- fabric_cf/authority/docker-compose.yml | 2 +- fabric_cf/broker/docker-compose.yml | 2 +- fabric_cf/orchestrator/docker-compose.yml | 2 +- 8 files changed, 28 insertions(+), 23 deletions(-) diff --git a/fabric_cf/__init__.py b/fabric_cf/__init__.py index 7c083080..a3054cd0 100644 --- a/fabric_cf/__init__.py +++ b/fabric_cf/__init__.py @@ -1 +1 @@ -__VERSION__ = "1.4.3" +__VERSION__ = "1.4.4" diff --git a/fabric_cf/actor/core/container/remote_actor_cache.py b/fabric_cf/actor/core/container/remote_actor_cache.py index 68772041..99167ebc 100644 --- a/fabric_cf/actor/core/container/remote_actor_cache.py +++ b/fabric_cf/actor/core/container/remote_actor_cache.py @@ -36,10 +36,11 @@ from fabric_cf.actor.core.manage.messages.client_mng import ClientMng from fabric_mb.message_bus.messages.proxy_avro import ProxyAvro from fabric_cf.actor.core.util.id import ID +from fabric_cf.actor.core.apis.abc_actor_mixin import ActorType if TYPE_CHECKING: from fabric_cf.actor.core.apis.abc_mgmt_actor import ABCMgmtActor - from fabric_cf.actor.core.apis.abc_actor_mixin import ABCActorMixin, ActorType + from fabric_cf.actor.core.apis.abc_actor_mixin import ABCActorMixin class RemoteActorCacheException(Exception): @@ -163,34 +164,38 @@ def check_peer(self, *, mgmt_actor: ABCMgmtActor, peer_guid: ID, peer_type: Acto """ self.logger.debug(f"Check if Peer {peer_guid}/{peer_type} already exists!") try: - # For Broker/AM - if isinstance(mgmt_actor, ABCMgmtServerActor): - self.logger.debug(f"Checking clients") - clients = mgmt_actor.get_clients(guid=peer_guid) - self.logger.debug(f"clients -- {clients} {mgmt_actor.get_last_error()}") - if clients is not None: - self.logger.debug(f"Edge between {mgmt_actor.get_guid()} and {peer_guid} exists (client)") - return True - - # For Orchestrator/Broker - elif isinstance(mgmt_actor, ABCMgmtClientActor): + # For Broker - all AMs are added as proxies + # For Orchestrator - all peers will be added as Proxies + if isinstance(mgmt_actor, ABCMgmtClientActor) and peer_type in [ActorType.Authority, ActorType.Broker]: self.logger.debug(f"Checking brokers") brokers = mgmt_actor.get_brokers(broker=peer_guid) self.logger.debug(f"brokers -- {brokers}") if brokers is not None: self.logger.debug(f"Edge between {mgmt_actor.get_guid()} and {peer_guid} exists (broker)") return True + + # For AM - all peers will be added as clients + # For Broker - orchestrator as client + elif isinstance(mgmt_actor, ABCMgmtServerActor) and peer_type in [ActorType.Orchestrator, ActorType.Broker]: + self.logger.debug(f"Checking clients") + clients = mgmt_actor.get_clients(guid=peer_guid) + self.logger.debug(f"clients -- {clients} {mgmt_actor.get_last_error()}") + if clients is not None: + self.logger.debug(f"Edge between {mgmt_actor.get_guid()} and {peer_guid} exists (client)") + return True except Exception as e: raise RemoteActorCacheException(f"Unable to cast actor {mgmt_actor.get_guid()} or {peer_guid} e={e}") self.logger.debug(f"Edge between {mgmt_actor.get_guid()} and {peer_guid} does not exist") return False - def establish_peer_private(self, *, mgmt_actor: ABCMgmtActor, peer_guid: ID, update: bool = False) -> ClientMng: + def establish_peer_private(self, *, mgmt_actor: ABCMgmtActor, peer_guid: ID, peer_type: ActorType, + update: bool = False) -> ClientMng: """ Establish connection i.e. create either proxies or clients between peer @param mgmt_actor mgmt_actor @param peer_guid peer_guid + @param peer_type peer_type @param update update """ self.logger.debug("establish_peer_private IN") @@ -206,7 +211,7 @@ def establish_peer_private(self, *, mgmt_actor: ABCMgmtActor, peer_guid: ID, upd if kafka_topic is None: raise RemoteActorCacheException(f"Actor {peer_guid} does not have a kafka topic") - if isinstance(mgmt_actor, ABCMgmtClientActor): + if isinstance(mgmt_actor, ABCMgmtClientActor) and peer_type in [ActorType.Authority, ActorType.Broker]: proxy = ProxyAvro() proxy.set_protocol(protocol) proxy.set_guid(str(identity.get_guid())) @@ -228,7 +233,7 @@ def establish_peer_private(self, *, mgmt_actor: ABCMgmtActor, peer_guid: ID, upd except Exception as e: self.logger.error(e) self.logger.error(traceback.format_exc()) - elif isinstance(mgmt_actor, ABCMgmtServerActor): + elif isinstance(mgmt_actor, ABCMgmtServerActor) and peer_type in [ActorType.Orchestrator, ActorType.Broker]: client = ClientMng() client.set_name(name=cache_entry.get(self.actor_name)) client.set_guid(guid=str(peer_guid)) @@ -261,7 +266,8 @@ def establish_peer(self, *, mgmt_actor: ABCMgmtActor, peer_guid: ID, peer_type: try: update = self.check_peer(mgmt_actor=mgmt_actor, peer_guid=peer_guid, peer_type=peer_type) - client = self.establish_peer_private(mgmt_actor=mgmt_actor, peer_guid=peer_guid, update=update) + client = self.establish_peer_private(mgmt_actor=mgmt_actor, peer_guid=peer_guid, peer_type=peer_type, + update=update) self.check_to_remove_entry(guid=peer_guid) diff --git a/fabric_cf/actor/core/manage/controller_management_object.py b/fabric_cf/actor/core/manage/controller_management_object.py index ee380ea9..d1697511 100644 --- a/fabric_cf/actor/core/manage/controller_management_object.py +++ b/fabric_cf/actor/core/manage/controller_management_object.py @@ -98,7 +98,7 @@ def add_broker(self, *, broker: ProxyAvro, caller: AuthToken) -> ResultAvro: return self.client_helper.add_broker(broker=broker, caller=caller) def update_broker(self, *, broker: ProxyAvro, caller: AuthToken) -> ResultAvro: - return self.client_helper.add_broker(broker=broker, caller=caller) + return self.client_helper.update_broker(broker=broker, caller=caller) def get_broker_query_model(self, *, broker: ID, caller: AuthToken, id_token: str, level: int, graph_format: GraphFormat) -> ResultBrokerQueryModelAvro: diff --git a/fabric_cf/actor/core/registry/peer_registry.py b/fabric_cf/actor/core/registry/peer_registry.py index e4345e3e..363719c9 100644 --- a/fabric_cf/actor/core/registry/peer_registry.py +++ b/fabric_cf/actor/core/registry/peer_registry.py @@ -86,7 +86,6 @@ def update_broker(self, *, broker: ABCBrokerProxy): except Exception as e: self.plugin.get_logger().error("Error while adding broker {}".format(e)) - def get_broker(self, *, guid: ID) -> ABCBrokerProxy: ret_val = None try: diff --git a/fabric_cf/authority/docker-compose.geni.yml b/fabric_cf/authority/docker-compose.geni.yml index 69f80b35..e6247d93 100644 --- a/fabric_cf/authority/docker-compose.geni.yml +++ b/fabric_cf/authority/docker-compose.geni.yml @@ -51,7 +51,7 @@ services: network: host context: ../../../ dockerfile: Dockerfile-auth - image: authority:1.4.3 + image: authority:1.4.4 container_name: site1-am restart: always depends_on: diff --git a/fabric_cf/authority/docker-compose.yml b/fabric_cf/authority/docker-compose.yml index fe14ca18..48dfae06 100644 --- a/fabric_cf/authority/docker-compose.yml +++ b/fabric_cf/authority/docker-compose.yml @@ -51,7 +51,7 @@ services: network: host context: ../../../ dockerfile: Dockerfile-auth - image: authority:1.4.3 + image: authority:1.4.4 container_name: site1-am restart: always depends_on: diff --git a/fabric_cf/broker/docker-compose.yml b/fabric_cf/broker/docker-compose.yml index 59ad4439..92124b30 100644 --- a/fabric_cf/broker/docker-compose.yml +++ b/fabric_cf/broker/docker-compose.yml @@ -53,7 +53,7 @@ services: build: context: ../../../ dockerfile: Dockerfile-broker - image: broker:1.4.3 + image: broker:1.4.4 container_name: broker restart: always networks: diff --git a/fabric_cf/orchestrator/docker-compose.yml b/fabric_cf/orchestrator/docker-compose.yml index 4d591ce5..58d200c5 100644 --- a/fabric_cf/orchestrator/docker-compose.yml +++ b/fabric_cf/orchestrator/docker-compose.yml @@ -67,7 +67,7 @@ services: build: context: ../../../ dockerfile: Dockerfile-orchestrator - image: orchestrator:1.4.3 + image: orchestrator:1.4.4 container_name: orchestrator restart: always depends_on: