From 3c6dae15fe99609e2d05a1615d5d407fdf09ea86 Mon Sep 17 00:00:00 2001 From: Abdelilah Essiari Date: Sun, 10 Mar 2024 18:18:32 -0700 Subject: [PATCH] handling sense, chameleon, fabric, better state reconciliation during exceptions --- fabfed/controller/controller.py | 116 +++--------------- fabfed/controller/helper.py | 8 +- fabfed/controller/provider_factory.py | 3 + fabfed/model/state.py | 91 ++++++++++++-- fabfed/provider/api/provider.py | 75 +++++++++-- fabfed/provider/chi/chi_provider.py | 98 +++++++-------- fabfed/provider/cloudlab/cloudlab_provider.py | 87 +++++-------- fabfed/provider/fabric/fabric_node.py | 7 +- fabfed/provider/fabric/fabric_provider.py | 9 +- fabfed/provider/fabric/fabric_slice.py | 26 ++-- fabfed/provider/sense/sense_provider.py | 66 +++++++--- fabfed/util/constants.py | 1 + fabfed/util/node_tester.py | 1 - fabfed/util/state.py | 43 ++++++- tests/test_workflow.py | 2 +- tools/fabfed.py | 22 +--- 16 files changed, 372 insertions(+), 283 deletions(-) diff --git a/fabfed/controller/controller.py b/fabfed/controller/controller.py index 0ea211da..5f0e6e99 100644 --- a/fabfed/controller/controller.py +++ b/fabfed/controller/controller.py @@ -2,7 +2,7 @@ from typing import List, Union, Dict from fabfed.exceptions import ControllerException -from fabfed.model.state import BaseState, ProviderState +from fabfed.model.state import ResourceState, ProviderState from fabfed.util.config import WorkflowConfig from .helper import ControllerResourceListener, partition_layer3_config from fabfed.policy.policy_helper import ProviderPolicy @@ -333,7 +333,7 @@ def apply(self, provider_states: List[ProviderState]): raise ControllerException(exceptions) @staticmethod - def _build_state_map(provider_states: List[ProviderState]) -> Dict[str, List[BaseState]]: + def _build_state_map(provider_states: List[ProviderState]) -> Dict[str, List[ResourceState]]: resource_state_map = dict() for provider_state in provider_states: @@ -414,6 +414,10 @@ def destroy(self, *, 
provider_states: List[ProviderState]): skip_resources.update([external_state.label for external_state in external_states]) exceptions.append(e) + if not remaining_resources: + provider_states.clear() + return + provider_states_copy = provider_states.copy() provider_states.clear() @@ -421,109 +425,17 @@ def destroy(self, *, provider_states: List[ProviderState]): provider_state.node_states.clear() provider_state.network_states.clear() provider_state.service_states.clear() - provider = self.provider_factory.get_provider(label=provider_state.label) - provider_state.failed = provider.failed - - for remaining_resource in remaining_resources: - resource_state = resource_state_map[remaining_resource.label] - - if remaining_resource.provider.label == provider_state.label: - if remaining_resource.is_network: - provider_state.network_states.extend(resource_state) - elif remaining_resource.is_node: - provider_state.node_states.extend(resource_state) - elif remaining_resource.is_service: - provider_state.service_states.extend(resource_state) - if provider_state.node_states or provider_state.network_states or provider_state.service_states: - provider_states.append(provider_state) + if self.provider_factory.has_provider(label=provider_state.label): + provider = self.provider_factory.get_provider(label=provider_state.label) + provider_state.failed = provider.failed - if exceptions: - raise ControllerException(exceptions) + for remaining_resource in [r for r in remaining_resources if r.provider.label == provider_state.label]: + resource_states = resource_state_map[remaining_resource.label] + provider_state.add_all(resource_states) - def delete(self, *, provider_states: List[ProviderState]): - exceptions = [] - resource_state_map = Controller._build_state_map(provider_states) - provider_resource_map = dict() - - for provider_state in provider_states: - key = provider_state.label - provider_resource_map[key] = list() - - temp = self.resources - temp.reverse() - - for resource in 
temp: - if resource.label in resource_state_map: - key = resource.provider.label - external_dependencies = resource.attributes.get(Constants.EXTERNAL_DEPENDENCIES, []) - external_states = [resource_state_map[ed.resource.label] for ed in external_dependencies] - resource.attributes[Constants.EXTERNAL_DEPENDENCY_STATES] = sum(external_states, []) - provider_resource_map[key].append(resource) - resource.attributes[Constants.SAVED_STATES] = resource_state_map[resource.label] - - remaining_resources = list() - skip_resources = set() - - for resource in temp: - if resource.label not in resource_state_map: - continue - - provider_label = resource.provider.label - provider = self.provider_factory.get_provider(label=provider_label) - external_states = resource.attributes[Constants.EXTERNAL_DEPENDENCY_STATES] - - if resource.label in skip_resources: - self.logger.warning(f"Skipping deleting resource: {resource} with {provider_label}") - remaining_resources.append(resource) - skip_resources.update([external_state.label for external_state in external_states]) - continue - - fabric_work_around = False - # TODO: THIS FABRIC SPECIFIC AS WE DON"T SUPPORT SLICE MODIFY API JUST YET - for remaining_resource in remaining_resources: - if provider_label == remaining_resource.provider.label \ - and "@fabric" in remaining_resource.provider.label: - fabric_work_around = True - break - - if fabric_work_around: - self.logger.warning(f"Skipping deleting fabric resource: {resource} with {provider_label}") - remaining_resources.append(resource) - skip_resources.update([external_state.label for external_state in external_states]) - continue - - try: - provider.delete_resource(resource=resource.attributes) - except Exception as e: - self.logger.warning(f"Exception occurred while deleting resource: {e} using {provider_label}") - remaining_resources.append(resource) - skip_resources.update([external_state.label for external_state in external_states]) - exceptions.append(e) - - provider_states_copy 
= provider_states.copy() - provider_states.clear() - - for provider_state in provider_states_copy: - provider_state.node_states.clear() - provider_state.network_states.clear() - provider_state.service_states.clear() - provider = self.provider_factory.get_provider(label=provider_state.label) - provider_state.failed = provider.failed - - for remaining_resource in remaining_resources: - resource_state = resource_state_map[remaining_resource.label] - - if remaining_resource.provider.label == provider_state.label: - if remaining_resource.is_network: - provider_state.network_states.extend(resource_state) - elif remaining_resource.is_node: - provider_state.node_states.extend(resource_state) - elif remaining_resource.is_service: - provider_state.service_states.extend(resource_state) - - if provider_state.node_states or provider_state.network_states or provider_state.service_states: - provider_states.append(provider_state) + if provider_state.states(): + provider_states.append(provider_state) if exceptions: raise ControllerException(exceptions) diff --git a/fabfed/controller/helper.py b/fabfed/controller/helper.py index c547a1e1..ab3abe42 100644 --- a/fabfed/controller/helper.py +++ b/fabfed/controller/helper.py @@ -101,7 +101,9 @@ def find_node_clusters(*, resources): visited_networks.append(peer.label) nodes.extend(find_nodes_related_to_network(network=peer, resources=resources)) visited_nodes.extend([n.label for n in nodes]) - clusters.append(nodes) + + if nodes: + clusters.append(nodes) for net in networks: if net.label not in visited_networks: @@ -111,7 +113,9 @@ def find_node_clusters(*, resources): visited_networks.append(net.label) nodes = find_nodes_related_to_network(network=net, resources=resources) visited_nodes.extend([n.label for n in nodes]) - clusters.append(nodes) + + if nodes: + clusters.append(nodes) nodes = [r for r in resources if r.is_node] diff --git a/fabfed/controller/provider_factory.py b/fabfed/controller/provider_factory.py index 
a220ed7f..84485626 100644 --- a/fabfed/controller/provider_factory.py +++ b/fabfed/controller/provider_factory.py @@ -36,6 +36,9 @@ def init_provider(self, *, type: str, label: str, name: str, attributes, logger) def providers(self) -> List[Provider]: return list(self._providers.values()) + def has_provider(self, *, label: str) -> bool: + return label in self._providers + def get_provider(self, *, label: str) -> Provider: return self._providers[label] diff --git a/fabfed/model/state.py b/fabfed/model/state.py index 0e4970f9..2d4cbb24 100644 --- a/fabfed/model/state.py +++ b/fabfed/model/state.py @@ -4,6 +4,7 @@ from fabfed.util.config_models import Config, ResourceConfig, BaseConfig, ProviderConfig, Dependency, DependencyInfo from fabfed.model import ResolvedDependency +from fabfed.util.constants import Constants class BaseState: @@ -13,19 +14,43 @@ def __init__(self, type: str, label: str, attributes: Dict): self.attributes = attributes -class NetworkState(BaseState): +class ResourceState(BaseState): + def __init__(self, type: str, label: str, attributes: Dict): + super().__init__(type, label, attributes) + self.type = type + self.label = label + self.attributes = attributes + + @property + def name(self) -> str: + return self.attributes['name'] + + @property + def is_node_state(self): + return self.type == Constants.RES_TYPE_NODE + + @property + def is_network_state(self): + return self.type == Constants.RES_TYPE_NETWORK + + @property + def is_service_state(self): + return self.type == Constants.RES_TYPE_SERVICE + + +class NetworkState(ResourceState): def __init__(self, *, label, attributes): - super().__init__("network", label, attributes) + super().__init__(Constants.RES_TYPE_NETWORK, label, attributes) -class NodeState(BaseState): +class NodeState(ResourceState): def __init__(self, *, label, attributes): - super().__init__("node", label, attributes) + super().__init__(Constants.RES_TYPE_NODE, label, attributes) -class ServiceState(BaseState): +class 
ServiceState(ResourceState): def __init__(self, *, label, attributes): - super().__init__("service", label, attributes) + super().__init__(Constants.RES_TYPE_SERVICE, label, attributes) class ProviderState(BaseState): @@ -41,8 +66,57 @@ def __init__(self, label, attributes, network_states: List[NetworkState], self.failed = failed self.creation_details = creation_details - def states(self) -> List[BaseState]: - return self.network_states + self.node_states + self.service_states + def add_if_not_found(self, resource_state: ResourceState): + from typing import cast + assert resource_state.type in [Constants.RES_TYPE_NODE, + Constants.RES_TYPE_NETWORK, + Constants.RES_TYPE_SERVICE] + + def exists(state, states: list): + temp = next(filter(lambda s: s.label == state.label and s.name == state.name, states), None) + + return temp is not None + + if exists(resource_state, self.states()): + return + + if resource_state.is_node_state: + self.node_states.append(cast(NodeState, resource_state)) + elif resource_state.is_network_state: + self.network_states.append(cast(NetworkState, resource_state)) + elif resource_state.is_service_state: + self.service_states.append(cast(ServiceState, resource_state)) + + def add(self, resource_state: ResourceState): + from typing import cast + assert resource_state.type in [Constants.RES_TYPE_NODE, + Constants.RES_TYPE_NETWORK, + Constants.RES_TYPE_SERVICE] + + def exists(state, states: list): + temp = next(filter(lambda s: s.label == state.label and s.name == state.name, states), None) + + return temp is not None + + assert not exists(resource_state, self.states()) + + if resource_state.is_node_state: + self.node_states.append(cast(NodeState, resource_state)) + elif resource_state.is_network_state: + self.network_states.append(cast(NetworkState, resource_state)) + elif resource_state.is_service_state: + self.service_states.append(cast(ServiceState, resource_state)) + + def add_all(self, resource_states: List[ResourceState]): + for 
resource_state in resource_states: + self.add(resource_state) + + def states(self) -> List[ResourceState]: + temp = [] + temp.extend(self.network_states) + temp.extend(self.node_states) + temp.extend(self.service_states) + return temp def number_of_created_resources(self): count = 0 @@ -68,6 +142,7 @@ def number_of_total_resources(self): return count + def provider_constructor(loader: yaml.SafeLoader, node: yaml.nodes.MappingNode) -> ProviderState: return ProviderState(**loader.construct_mapping(node)) diff --git a/fabfed/provider/api/provider.py b/fabfed/provider/api/provider.py index 8d9899cf..9a05b93f 100644 --- a/fabfed/provider/api/provider.py +++ b/fabfed/provider/api/provider.py @@ -29,6 +29,16 @@ def __init__(self, *, type, label, name, logger: logging.Logger, config: dict): self.add_duration = self.create_duration = self.delete_duration = self.init_duration = 0 self._saved_state: ProviderState = Union[ProviderState, None] + self._existing_map: Dict[str, List[str]] = {} + self._added_map: Union[Dict[str, List[str]], None] = None + + @property + def existing_map(self) -> Dict[str, List[str]]: + return self._existing_map + + @property + def added_map(self) -> Dict[str, List[str]]: + return self._added_map @property def resources(self) -> List: @@ -142,6 +152,62 @@ def init(self): def supports_modify(self): return False + def resource_name(self, resource: dict, idx: int = 0): + return f"{self.name}-{resource[Constants.RES_NAME_PREFIX]}-{idx}" + + def add_to_existing_map(self, resource: dict): + label = resource.get(Constants.LABEL) + self.existing_map[label] = [] + + if self.saved_state and resource.get(Constants.LABEL) in self.saved_state.creation_details: + provider_saved_creation_details = self.saved_state.creation_details[label] + + for n in range(0, provider_saved_creation_details['total_count']): + resource_name = self.resource_name(resource, n) + self.existing_map[label].append(resource_name) + + def _compute_added_map(self): + if self.added_map is 
None: + self._added_map = {} + + for net in self.networks: + if net.label not in self.added_map: + self.added_map[net.label] = [] + + self.added_map[net.label].append(net.name) + + for node in self.nodes: + if node.label not in self.added_map: + self.added_map[node.label] = [] + + self.added_map[node.label].append(node.name) + + for service in self.services: + if service.label not in self.added_map: + self.added_map[service.label] = [] + + self.added_map[service.label].append(service.name) + + @property + def modified(self): + if self.saved_state: + self._compute_added_map() + return self._existing_map and self.added_map != self._existing_map + + return False + + def retrieve_attribute_from_saved_state(self, resource, resource_name, attribute): + if self.saved_state: + for state in resource[Constants.SAVED_STATES]: + if state.attributes['name'] == resource_name: + ret = state.attributes.get(attribute) + + if isinstance(ret, list) and len(ret) == 1: + return ret[0] + + return ret + return None + def validate_resource(self, *, resource: dict): label = resource.get(Constants.LABEL) @@ -152,6 +218,7 @@ def validate_resource(self, *, resource: dict): self.creation_details[label]['failed_count'] = 0 self.creation_details[label]['created_count'] = 0 self.creation_details[label]['name_prefix'] = resource[Constants.RES_NAME_PREFIX] + self.add_to_existing_map(resource) import time @@ -249,7 +316,7 @@ def create_resource(self, *, resource: dict): self.logger.warning( f"Adding no longer pending internally {internal_dependency_label} failed using {e2}") - self.logger.info(f"Creating {label} using {self.label}") + self.logger.info(f"Creating {label} using {self.label}: {self._added}") if label in self._added: try: @@ -298,12 +365,6 @@ def cleanup_attrs(attrs): service_states = [ServiceState(label=s.label, attributes=cleanup_attrs(vars(s))) for s in services] pending = [res['label'] for res in self.pending] pending_internal = [res['label'] for res in self.pending_internal] - # # 
nodes = [n for n in self.nodes if n.labelin self.failed] - # for n in nodes: - # self.failed[n.label] = {"phase": "xxx" , 'resource': cleanup_attrs(vars(n))} - # for n in networks: - # self.failed[n.label] = {"phase": "yyy" , 'resource': cleanup_attrs(vars(n))} - return ProviderState(self.label, dict(name=self.name), net_states, node_states, service_states, pending, pending_internal, self.failed, self.creation_details) diff --git a/fabfed/provider/chi/chi_provider.py b/fabfed/provider/chi/chi_provider.py index 9c74987a..681a5d0e 100644 --- a/fabfed/provider/chi/chi_provider.py +++ b/fabfed/provider/chi/chi_provider.py @@ -17,7 +17,6 @@ class ChiProvider(Provider): def __init__(self, *, type, label, name, config: dict[str, str]): super().__init__(type=type, label=label, name=name, logger=logger, config=config) self.helper = None - self.existing_map = {} def setup_environment(self): site = "CHI@UC" @@ -121,20 +120,6 @@ def do_add_resource(self, *, resource: dict): label = resource[Constants.LABEL] rtype = resource[Constants.RES_TYPE] site = resource[Constants.RES_SITE] - self.existing_map[label] = [] - - if self.saved_state and label in self.saved_state.creation_details: - provider_saved_creation_details = self.saved_state.creation_details[label] - - if rtype == Constants.RES_NODES: - for n in range(0, provider_saved_creation_details['total_count']): - node_name = f"{self.name}-{resource.get(Constants.RES_NAME_PREFIX)}{n}" - self.existing_map[label].append(node_name) - else: - assert provider_saved_creation_details['total_count'] == 1 - net_name = f'{self.name}-{resource.get(Constants.RES_NAME_PREFIX)}' - self.existing_map[label].append(net_name) - creation_details = resource[Constants.RES_CREATION_DETAILS] if not creation_details['in_config_file']: @@ -146,13 +131,22 @@ def do_add_resource(self, *, resource: dict): if rtype == Constants.RES_TYPE_NETWORK: layer3 = resource.get(Constants.RES_LAYER3) - stitch_info = resource.get(Constants.RES_STITCH_INFO) - assert 
stitch_info and stitch_info.consumer, f"resource {label} missing stitch provider" - net_name = f'{self.name}-{resource.get(Constants.RES_NAME_PREFIX)}' + from typing import Union + from fabfed.policy.policy_helper import StitchInfo + stitch_info: Union[str, StitchInfo] = resource.get(Constants.RES_STITCH_INFO) + assert stitch_info, f"resource {label} missing stitch info" + + if isinstance(stitch_info, str): + stitch_provider = stitch_info + else: + stitch_provider = stitch_info.consumer + + assert stitch_provider, f"resource {label} missing stitch provider" + net_name = self.resource_name(resource) from fabfed.provider.chi.chi_network import ChiNetwork net = ChiNetwork(label=label, name=net_name, site=site, - layer3=layer3, stitch_provider=stitch_info.consumer, + layer3=layer3, stitch_provider=stitch_provider, project_name=project_name) self._networks.append(net) @@ -168,11 +162,10 @@ def do_add_resource(self, *, resource: dict): node_count = resource[Constants.RES_COUNT] image = resource.get(Constants.RES_IMAGE) - node_name_prefix = resource.get(Constants.RES_NAME_PREFIX) flavor = resource.get(Constants.RES_FLAVOR, DEFAULT_FLAVOR) for n in range(0, node_count): - node_name = f"{self.name}-{node_name_prefix}{n}" + node_name = self.resource_name(resource, n) self.existing_map[label].append(node_name) from fabfed.provider.chi.chi_node import ChiNode @@ -190,30 +183,27 @@ def do_create_resource(self, *, resource: dict): label = resource.get(Constants.LABEL) rtype = resource.get(Constants.RES_TYPE) - if rtype == Constants.RES_TYPE_NETWORK.lower(): - from fabfed.provider.chi.chi_network import ChiNetwork - existing_names = self.existing_map[label] - temp: List[ChiNetwork] = [net for net in self._networks if net.label == label] - added_names = [net.name for net in temp] - net_name = f'{self.name}-{resource.get(Constants.RES_NAME_PREFIX)}' + if rtype == Constants.RES_TYPE_NETWORK: + if self.modified: + net_name = self.resource_name(resource) - if net_name in 
existing_names and net_name not in added_names: - from fabfed.provider.chi.chi_network import ChiNetwork - from ...util.config_models import Config + if net_name in self.existing_map[label] and net_name not in self.added_map.get(label, list()): + from fabfed.provider.chi.chi_network import ChiNetwork + from ...util.config_models import Config - layer3 = Config("", "", {}) - project_name = self.config[CHI_PROJECT_NAME] + layer3 = Config("", "", {}) + project_name = self.config[CHI_PROJECT_NAME] - net = ChiNetwork(label=label, name=net_name, site=site, - layer3=layer3, stitch_provider='', project_name=project_name) - net.delete() + net = ChiNetwork(label=label, name=net_name, site=site, + layer3=layer3, stitch_provider='', project_name=project_name) + net.delete() - self.logger.info(f"Deleted network: {net_name} at site {site}") + self.logger.info(f"Deleted network: {net_name} at site {site}") - if self.resource_listener: - self.resource_listener.on_deleted(source=self, provider=self, resource=net) + if self.resource_listener: + self.resource_listener.on_deleted(source=self, provider=self, resource=net) - return + return net = next(filter(lambda n: n.label == label, self.networks)) net.create() @@ -225,24 +215,23 @@ def do_create_resource(self, *, resource: dict): from fabfed.provider.chi.chi_node import ChiNode temp: List[ChiNode] = [node for node in self._nodes if node.label == label] - names = [node.name for node in temp] - existing_names = self.existing_map[label] - for node_name in existing_names: - key_pair = self.config[CHI_KEY_PAIR] - project_name = self.config[CHI_PROJECT_NAME] + if self.modified: + for node_name in self.existing_map[label]: + key_pair = self.config[CHI_KEY_PAIR] + project_name = self.config[CHI_PROJECT_NAME] - if node_name not in names: - from fabfed.provider.chi.chi_node import ChiNode + if node_name not in self.added_map.get(label, list()): + from fabfed.provider.chi.chi_node import ChiNode - node = ChiNode(label=label, name=node_name, 
image='', site=site, flavor='', - key_pair=key_pair, network='', project_name=project_name) - node.delete() + node = ChiNode(label=label, name=node_name, image='', site=site, flavor='', + key_pair=key_pair, network='', project_name=project_name) + node.delete() - self.logger.info(f"Deleted node: {node_name} at site {site}") + self.logger.info(f"Deleted node: {node_name} at site {site}") - if self.resource_listener: - self.resource_listener.on_deleted(source=self, provider=self, resource=node) + if self.resource_listener: + self.resource_listener.on_deleted(source=self, provider=self, resource=node) for node in temp: node.create() @@ -266,7 +255,7 @@ def do_delete_resource(self, *, resource: dict): rtype = resource.get(Constants.RES_TYPE) if rtype == Constants.RES_TYPE_NETWORK.lower(): - net_name = f'{self.name}-{resource.get(Constants.RES_NAME_PREFIX)}' + net_name = self.resource_name(resource) self.logger.debug(f"Deleting network: {net_name} at site {site}") from fabfed.provider.chi.chi_network import ChiNetwork from ...util.config_models import Config @@ -284,8 +273,7 @@ def do_delete_resource(self, *, resource: dict): node_count = resource.get(Constants.RES_COUNT, 1) for n in range(0, node_count): - node_name_prefix = resource.get(Constants.RES_NAME_PREFIX) - node_name = f"{self.name}-{node_name_prefix}{n}" + node_name = self.resource_name(resource, n) self.logger.debug(f"Deleting node: {node_name} at site {site}") from fabfed.provider.chi.chi_node import ChiNode diff --git a/fabfed/provider/cloudlab/cloudlab_provider.py b/fabfed/provider/cloudlab/cloudlab_provider.py index 42cad603..a392f268 100644 --- a/fabfed/provider/cloudlab/cloudlab_provider.py +++ b/fabfed/provider/cloudlab/cloudlab_provider.py @@ -13,9 +13,8 @@ class CloudlabProvider(Provider): def __init__(self, *, type, label, name, config: dict): super().__init__(type=type, label=label, name=name, logger=logger, config=config) - self.supported_resources = [Constants.RES_TYPE_NETWORK.lower(), 
Constants.RES_TYPE_NODE.lower()] - self.modified = None - self.existing_map = {} + self.supported_resources = [Constants.RES_TYPE_NETWORK, Constants.RES_TYPE_NODE] + self._handled_modify = False def setup_environment(self): for attr in CLOUDLAB_CONF_ATTRS: @@ -110,54 +109,41 @@ def do_validate_resource(self, *, resource: dict): def _get_interfaces(self, resource): interfaces = resource.get(Constants.RES_INTERFACES, list()) - cloudlab_stitch_port = get_stitch_port_for_provider(resource=resource, provider=self.type) if not interfaces: + cloudlab_stitch_port = get_stitch_port_for_provider(resource=resource, provider=self.type) + if 'option' in cloudlab_stitch_port and Constants.RES_INTERFACES in cloudlab_stitch_port['option']: interfaces = cloudlab_stitch_port['option'][Constants.RES_INTERFACES] return interfaces def do_add_resource(self, *, resource: dict): - rtype = resource.get(Constants.RES_TYPE) - label = resource.get(Constants.LABEL) - self.existing_map[label] = [] - - if self.saved_state and label in self.saved_state.creation_details: - provider_saved_creation_details = self.saved_state.creation_details[label] - - if rtype == Constants.RES_NODES: - for n in range(0, provider_saved_creation_details['total_count']): - node_name = f"{self.name}-{resource.get(Constants.RES_NAME_PREFIX)}-{n}" - self.existing_map[label].append(node_name) - else: - assert provider_saved_creation_details['total_count'] == 1 - net_name = f'{self.name}-{resource.get(Constants.RES_NAME_PREFIX)}' - self.existing_map[label].append(net_name) - creation_details = resource[Constants.RES_CREATION_DETAILS] if not creation_details['in_config_file']: return + rtype = resource.get(Constants.RES_TYPE) + label = resource.get(Constants.LABEL) + if rtype == Constants.RES_TYPE_NETWORK: - states = resource[Constants.SAVED_STATES] + net_name = self.resource_name(resource) + interface = self.retrieve_attribute_from_saved_state(resource, net_name, attribute='interface') - if states: - state = states[0] - 
vlan = state.attributes['interface'][0]['vlan'] - interfaces = self._get_interfaces(resource) + assert interface, "expecting a vlan interface in saved state for {label}" + vlan = interface['vlan'] - if isinstance(vlan, str): - vlan = int(vlan) + if isinstance(vlan, str): + vlan = int(vlan) - if interfaces and interfaces[0]['vlan'] != vlan: - self.logger.warning( - f"{self.label} ignoring: {label}'s interface does not match provisioned vlan {vlan}") + interfaces = self._get_interfaces(resource) - resource[Constants.RES_INTERFACES] = [{'vlan': vlan}] + if interfaces and interfaces[0]['vlan'] != vlan: + self.logger.warning( + f"{self.label} ignoring: {label}'s interface does not match provisioned vlan {vlan}") - name_prefix = resource[Constants.RES_NAME_PREFIX] + resource[Constants.RES_INTERFACES] = [{'vlan': vlan}] if rtype == Constants.RES_TYPE_NODE: from .cloudlab_node import CloudlabNode @@ -168,7 +154,7 @@ def do_add_resource(self, *, resource: dict): node_count = resource[Constants.RES_COUNT] for idx in range(0, node_count): - node_name = f'{self.name}-{name_prefix}' + node_name = self.resource_name(resource, idx) node = CloudlabNode(label=label, name=f'{node_name}-{idx}', provider=self, network=net) self._nodes.append(node) self.resource_listener.on_added(source=self, provider=self, resource=node) @@ -176,7 +162,7 @@ def do_add_resource(self, *, resource: dict): from .cloudlab_network import CloudNetwork - net_name = f'{self.name}-{name_prefix}' + net_name = self.resource_name(resource) profile = resource.get(Constants.RES_PROFILE) cloudlab_stitch_port = get_stitch_port_for_provider(resource=resource, provider=self.type) @@ -211,29 +197,13 @@ def do_create_resource(self, *, resource: dict): rtype = resource.get(Constants.RES_TYPE) label = resource.get(Constants.LABEL) - if self.modified is None: - added_map = {} - - for net in self.networks: - if net.label not in added_map: - added_map[net.label] = [] - - added_map[net.label].append(net.name) - - for node in 
self.nodes: - if node.label not in added_map: - added_map[node.label] = [] - - added_map[node.label].append(node.name) - - self.modified = (self.existing_map != added_map) - - if self.modified: - assert rtype == Constants.RES_TYPE_NETWORK + if not self._handled_modify and self.modified: + assert rtype == Constants.RES_TYPE_NETWORK, "cloudlab expects network to be created first" + self._handled_modify = True try: self.logger.info(f"Deleting cloudlab resources ....") - net_name = f'{self.name}-{resource.get(Constants.RES_NAME_PREFIX)}' + net_name = self.resource_name(resource) logger.debug(f"Deleting network: {net_name}") from .cloudlab_network import CloudNetwork @@ -246,19 +216,18 @@ def do_create_resource(self, *, resource: dict): net.delete() self.logger.info(f"Done deleting cloudlab resources ....") - self.modified = False self.resource_listener.on_deleted(source=self, provider=self, resource=net) except Exception as e: self.logger.error(f"Exception deleting cloudlab resources ....", e) - if rtype == Constants.RES_TYPE_NETWORK.lower(): + if rtype == Constants.RES_TYPE_NETWORK: if self.networks: self._networks[0].create() self.resource_listener.on_created(source=self, provider=self, resource=self._networks[0]) return - if rtype == Constants.RES_TYPE_NODE.lower(): + if rtype == Constants.RES_TYPE_NODE: for node in [node for node in self._nodes if node.label == label]: self.logger.debug(f"Creating node: {vars(node)}") node.create() @@ -271,11 +240,11 @@ def do_delete_resource(self, *, resource: dict): assert rtype in self.supported_resources label = resource.get(Constants.LABEL) - if rtype == Constants.RES_TYPE_NODE.lower(): + if rtype == Constants.RES_TYPE_NODE: # DO NOTHING return - net_name = f'{self.name}-{resource.get(Constants.RES_NAME_PREFIX)}' + net_name = self.resource_name(resource) logger.debug(f"Deleting network: {net_name}") from .cloudlab_network import CloudNetwork diff --git a/fabfed/provider/fabric/fabric_node.py 
b/fabfed/provider/fabric/fabric_node.py index 89aa1828..43e2cba5 100644 --- a/fabfed/provider/fabric/fabric_node.py +++ b/fabfed/provider/fabric/fabric_node.py @@ -17,8 +17,8 @@ def __init__(self, *, label, delegate: Delegate, nic_model=None, network_label): logger.info(f" Node {self.name} construtor called ... ") self._delegate = delegate self.nic_model = nic_model - slice_object = delegate.get_slice() - self.slice_name = slice_object.get_name() + self._slice_object = delegate.get_slice() + self.slice_name = self._slice_object.get_name() self.mgmt_ip = delegate.get_management_ip() self.mgmt_ip = str(self.mgmt_ip) if self.mgmt_ip else None self.network_label = network_label @@ -38,6 +38,9 @@ def __init__(self, *, label, delegate: Delegate, nic_model=None, network_label): self.components = [dict(name=c.get_name(), model=c.get_model()) for c in delegate.get_components()] self.addr_list = {} + def handle_networking(self): + slice_object = self._slice_object + if not self.mgmt_ip: logger.warning(f" Node {self.name} has no management ip ") return diff --git a/fabfed/provider/fabric/fabric_provider.py b/fabfed/provider/fabric/fabric_provider.py index 79d304cf..95a13244 100644 --- a/fabfed/provider/fabric/fabric_provider.py +++ b/fabfed/provider/fabric/fabric_provider.py @@ -8,6 +8,7 @@ logger = get_logger() + class FabricProvider(Provider): def __init__(self, *, type, label, name, config: dict): super().__init__(type=type, label=label, name=name, logger=logger, config=config) @@ -15,13 +16,14 @@ def __init__(self, *, type, label, name, config: dict): self.retry = 5 self.slice_init = False + # TODO Should not be needed for fablib 1.6.4 from fabrictestbed_extensions.fablib.constants import Constants as FC from fabfed.util.utils import get_base_dir FC.DEFAULT_FABRIC_CONFIG_DIR = get_base_dir(self.name) FC.DEFAULT_FABRIC_RC = f"{FC.DEFAULT_FABRIC_CONFIG_DIR}/fabric_rc" - def _to_abs_for(self, env_var: str, config:dict): + def _to_abs_for(self, env_var: str, config: dict): 
path = config.get(env_var) from pathlib import Path @@ -33,7 +35,7 @@ def _to_abs_for(self, env_var: str, config:dict): f.read() logger.debug(f"was able to read {path}") except Exception as e: - raise ProviderException(f"{self.name}:Unable to read {path} for {env_var}") + raise ProviderException(f"{self.name}:Unable to read {path} for {env_var}:{e}") try: if path.endswith(".json"): @@ -42,7 +44,7 @@ def _to_abs_for(self, env_var: str, config:dict): json.load(fp) except Exception as e: - raise ProviderException(f"{self.name}:Unable to parse {path} to json for {env_var}") + raise ProviderException(f"{self.name}:Unable to parse {path} to json for {env_var}:{e}") return path @@ -131,4 +133,3 @@ def do_create_resource(self, *, resource: dict): def do_delete_resource(self, *, resource: dict): self._init_slice(True) self.slice.delete_resource(resource=resource) - diff --git a/fabfed/provider/fabric/fabric_slice.py b/fabfed/provider/fabric/fabric_slice.py index 23498f89..dea35928 100644 --- a/fabfed/provider/fabric/fabric_slice.py +++ b/fabfed/provider/fabric/fabric_slice.py @@ -63,11 +63,11 @@ def pending(self): def _add_network(self, resource: dict): label = resource[Constants.LABEL] - name_prefix = resource[Constants.RES_NAME_PREFIX] + net_name = self.provider.resource_name(resource) - if name_prefix in self.existing_networks: - delegate = self.slice_object.get_network(name_prefix) - assert delegate is not None, "expected to find network {name_prefix} in slice {self.name}" + if net_name in self.existing_networks: + delegate = self.slice_object.get_network(net_name) + assert delegate is not None, "expected to find network {net_name} in slice {self.name}" layer3 = resource.get(Constants.RES_LAYER3) peer_layer3 = resource.get(Constants.RES_PEER_LAYER3) peering = resource.get(Constants.RES_PEERING) @@ -87,7 +87,7 @@ def _add_network(self, resource: dict): return - network_builder = NetworkBuilder(label, self.provider, self.slice_object, name_prefix, resource) + 
network_builder = NetworkBuilder(label, self.provider, self.slice_object, net_name, resource) network_builder.handle_facility_port() temp = [] if util.has_resolved_internal_dependencies(resource=resource, attribute='interface'): @@ -108,7 +108,6 @@ def _add_network(self, resource: dict): def _add_node(self, resource: dict): node_count = resource[Constants.RES_COUNT] - name_prefix = resource[Constants.RES_NAME_PREFIX] label = resource[Constants.LABEL] states = resource[Constants.SAVED_STATES] state_map = {} @@ -117,7 +116,7 @@ def _add_node(self, resource: dict): state_map[state.attributes['name']] = state.attributes for i in range(node_count): - name = f"{name_prefix}{i}" + name = self.provider.resource_name(resource, i) if name in self.existing_nodes: delegate = self.slice_object.get_node(name) @@ -204,6 +203,7 @@ def _reload_nodes(self): n.set_network_label(node.network_label) n.set_used_dataplane_ipv4(node.used_dataplane_ipv4()) temp.append(n) + n.handle_networking() self.provider._nodes = temp @@ -369,7 +369,17 @@ def create_resource(self, *, resource: dict): self.slice_object.get_fim_topology().remove_node(name=n) self.slice_modified = True - self.logger.info(f"Done emoving node {n} from slice {self.name}") + self.logger.info(f"Done removing node {n} from slice {self.name}") + + aset = set(self.existing_networks) + bset = {n.name for n in self.networks} + diff = aset - bset + + if diff: + for n in diff: + self.slice_object.get_fim_topology().remove_network_service(name=n) + self.slice_modified = True + self.logger.info(f"Done removing network {n} from slice {self.name}") for network in [net for net in self.networks if net.name in self.existing_networks]: temp = (bset - aset) diff --git a/fabfed/provider/sense/sense_provider.py b/fabfed/provider/sense/sense_provider.py index b52ecc56..f7406ede 100644 --- a/fabfed/provider/sense/sense_provider.py +++ b/fabfed/provider/sense/sense_provider.py @@ -3,7 +3,6 @@ from fabfed.provider.api.provider import Provider from 
fabfed.util.constants import Constants from fabfed.util.utils import get_logger -from . import sense_utils from .sense_exceptions import SenseException logger = get_logger() @@ -12,7 +11,9 @@ class SenseProvider(Provider): def __init__(self, *, type, label, name, config: dict): super().__init__(type=type, label=label, name=name, logger=logger, config=config) - self.supported_resources = [Constants.RES_TYPE_NETWORK.lower(), Constants.RES_TYPE_NODE.lower()] + self.supported_resources = [Constants.RES_TYPE_NETWORK, Constants.RES_TYPE_NODE] + self.initialized = False + self._handled_modify = False def setup_environment(self): from fabfed.util import utils @@ -29,9 +30,6 @@ def setup_environment(self): ) self.config = config[profile] - from .sense_client import init_client - - init_client(self.config) @property def private_key_file_location(self): @@ -62,27 +60,47 @@ def _handle_peering_config(self, resource): return peering + def _init_client(self): + if not self.initialized: + from .sense_client import init_client + + init_client(self.config) + self.initialized = True + def do_add_resource(self, *, resource: dict): + self._init_client() + + creation_details = resource[Constants.RES_CREATION_DETAILS] + + if not creation_details['in_config_file']: + return + label = resource.get(Constants.LABEL) rtype = resource.get(Constants.RES_TYPE) if rtype not in self.supported_resources: raise ResourceTypeNotSupported(f"{rtype} for {label}") - name_prefix = resource.get(Constants.RES_NAME_PREFIX) - - if rtype == Constants.RES_TYPE_NODE.lower(): + if rtype == Constants.RES_TYPE_NODE: from .sense_node import SenseNode import fabfed.provider.api.dependency_util as util + from . 
import sense_utils assert util.has_resolved_internal_dependencies(resource=resource, attribute='network') net = util.get_single_value_for_dependency(resource=resource, attribute='network') profile_uuid = sense_utils.get_profile_uuid(profile=net.profile) vms = sense_utils.get_vms_specs_from_profile(profile_uuid=profile_uuid) - node_name = f'{self.name}-{name_prefix}' + + count = resource[Constants.RES_COUNT] + + if count != len(vms): + from fabfed.exceptions import ProviderException + + raise ProviderException(f'count {count} != sense_vm_specs: {len(vms)}') for idx, vm in enumerate(vms): - node = SenseNode(label=label, name=f'{node_name}-{idx}', network=net.name, spec=vm, provider=self) + node_name = self.resource_name(resource, idx) + node = SenseNode(label=label, name=f'{node_name}', network=net.name, spec=vm, provider=self) self._nodes.append(node) if self.resource_listener: @@ -101,7 +119,7 @@ def do_add_resource(self, *, resource: dict): if not interface.get(Constants.RES_BANDWIDTH): interface[Constants.RES_BANDWIDTH] = resource.get(Constants.RES_BANDWIDTH) - net_name = f'{self.name}-{name_prefix}' + net_name = self.resource_name(resource) profile = resource.get(Constants.RES_PROFILE) if not profile and sense_stitch_port: @@ -125,12 +143,32 @@ def do_add_resource(self, *, resource: dict): self.resource_listener.on_added(source=self, provider=self, resource=net) def do_create_resource(self, *, resource: dict): + assert self.initialized rtype = resource.get(Constants.RES_TYPE) assert rtype in self.supported_resources + label = resource.get(Constants.LABEL) + + if not self._handled_modify and self.modified: + assert rtype == Constants.RES_TYPE_NETWORK, "sense expects network to be created first" + self._handled_modify = True + + self.logger.info(f"Deleting sense resources ....") + + from .sense_network import SenseNetwork + + net_name = self.resource_name(resource) + net = SenseNetwork(label=label, name=net_name, bandwidth=None, profile=None, layer3=None, 
interfaces=None, + peering=None) + + try: + net.delete() + self.logger.warning(f"Deleted sense resources ....") + except Exception as e: + self.logger.error(f"Exception deleting sense resources: {e}") label = resource.get(Constants.LABEL) - if rtype == Constants.RES_TYPE_NODE.lower(): + if rtype == Constants.RES_TYPE_NODE: for node in [node for node in self._nodes if node.label == label]: self.logger.debug(f"Creating node: {vars(node)}") node.create() @@ -156,11 +194,11 @@ def do_delete_resource(self, *, resource: dict): assert rtype in self.supported_resources label = resource.get(Constants.LABEL) - if rtype == Constants.RES_TYPE_NODE.lower(): + if rtype == Constants.RES_TYPE_NODE: # DO NOTHING return - net_name = f'{self.name}-{resource.get(Constants.RES_NAME_PREFIX)}' + net_name = self.resource_name(resource) logger.debug(f"Deleting network: {net_name}") diff --git a/fabfed/util/constants.py b/fabfed/util/constants.py index a62a2da0..df3b28f7 100644 --- a/fabfed/util/constants.py +++ b/fabfed/util/constants.py @@ -105,6 +105,7 @@ class Constants: "dummy": "fabfed.provider.dummy.dummy_provider.DummyProvider" } + RECONCILE_STATES = True RUN_SSH_TESTER = True COPY_TOKENS = False PROVIDER_STATE = 'provider_state' diff --git a/fabfed/util/node_tester.py b/fabfed/util/node_tester.py index 1b4fb4e4..01f880d8 100644 --- a/fabfed/util/node_tester.py +++ b/fabfed/util/node_tester.py @@ -115,7 +115,6 @@ def run_ssh_test(self, *, command='ls -l', retry=5, retry_interval=10): def run_dataplane_test(self, *, command='ping -c 3', retry=3, retry_interval=10): for helper in self.helpers: - print(f"Hello ", helper.label) logger.info(f"SSH executing {command} on Node: {helper.label}") for attempt in range(retry): diff --git a/fabfed/util/state.py b/fabfed/util/state.py index a89698f0..2928f9f5 100644 --- a/fabfed/util/state.py +++ b/fabfed/util/state.py @@ -89,7 +89,7 @@ def dump_plan(*, resources, to_json: bool, summary: bool = False): if not provider_supports_modifiable:
in_config_file = details['in_config_file'] changed = not in_config_file or details['total_count'] != details['created_count'] - provider_resource_map[resource.provider.label] = provider_resource_map[resource.provider.label] or changed + provider_resource_map[resource.provider.label] = provider_resource_map[resource.provider.label] or changed for resource in resources: resource_dict = {} @@ -105,7 +105,7 @@ def dump_plan(*, resources, to_json: bool, summary: bool = False): resource_dict['to_be_deleted'] = 0 else: resource_dict['to_be_created'] = 0 - resource_dict['to_be_deleted'] = details['created_count'] - details['total_count'] + resource_dict['to_be_deleted'] = details['created_count'] - details['total_count'] else: resource_dict['to_be_created'] = 0 resource_dict['to_be_deleted'] = details['created_count'] @@ -354,7 +354,7 @@ def save_meta_data(meta_data: dict, friendly_name: str): raise StateException(f'Exception while saving state at temp file {file_path}:{e}') -def save_states(states: List[ProviderState], friendly_name): +def save_states(states: List[ProviderState], friendly_name: str): import yaml import os from fabfed.model.state import get_dumper @@ -375,6 +375,43 @@ def save_states(states: List[ProviderState], friendly_name): shutil.move(temp_file_path, file_path) +def reconcile_state(provider_state: ProviderState, saved_provider_state: ProviderState): + assert provider_state.number_of_created_resources() != provider_state.number_of_total_resources() + + creation_details = provider_state.creation_details + + for resource_state in saved_provider_state.states(): + resource_label = resource_state.label + + if resource_label in creation_details: + resource_details = creation_details[resource_label] + + if resource_details['total_count'] == resource_details['created_count']: + continue + + provider_state.add_if_not_found(resource_state) + + +def reconcile_states(provider_states: List[ProviderState], friendly_name: str) -> List[ProviderState]: + 
saved_states_map = load_states_as_dict(friendly_name) + if next(filter(lambda s: s.number_of_created_resources() > 0, saved_states_map.values()), None) is None: + return provider_states + + reconciled_states = [] + + for provider_state in provider_states: + if provider_state.label not in saved_states_map: + reconciled_states.append(provider_state) + elif provider_state.number_of_created_resources() == provider_state.number_of_total_resources(): + reconciled_states.append(provider_state) + else: + saved_provider_state = saved_states_map.get(provider_state.label) + reconcile_state(provider_state=provider_state, saved_provider_state=saved_provider_state) + reconciled_states.append(provider_state) + + return reconciled_states + + def save_stats(stats, friendly_name): import yaml import os diff --git a/tests/test_workflow.py b/tests/test_workflow.py index 2475c45c..bdd68e34 100644 --- a/tests/test_workflow.py +++ b/tests/test_workflow.py @@ -61,7 +61,7 @@ def run_destroy_workflow(*, session, config_str) -> List[ProviderState]: controller = Controller(config=config, logger=logger) states = sutil.load_states(session) controller.init(session=session, provider_factory=default_provider_factory, provider_states=states) - controller.delete(provider_states=states) + controller.destroy(provider_states=states) sutil.save_states(states, session) if not states: diff --git a/tools/fabfed.py b/tools/fabfed.py index d5905783..a876fb6f 100644 --- a/tools/fabfed.py +++ b/tools/fabfed.py @@ -7,6 +7,7 @@ from fabfed.util import state as sutil from fabfed.util.config import WorkflowConfig from fabfed.util.stats import FabfedStats, Duration +from fabfed.util.constants import Constants def manage_workflow(args): @@ -57,7 +58,7 @@ def manage_workflow(args): sys.exit(1) states = sutil.load_states(args.session) - have_created_resources = next(filter(lambda s: s.number_of_created_resources() > 0, states), None) + # have_created_resources = next(filter(lambda s: s.number_of_created_resources() > 
0, states), None) try: controller.init(session=args.session, provider_factory=default_provider_factory, provider_states=states) @@ -84,13 +85,11 @@ def manage_workflow(args): sys.exit(1) workflow_failed = False - interrupted = False try: controller.apply(provider_states=states) except KeyboardInterrupt as kie: logger.error(f"Keyboard Interrupt while creating resources ... {kie}") - interrupted = True workflow_failed = True except ControllerException as ce: logger.error(f"Exceptions while creating resources ... {ce}") @@ -105,25 +104,14 @@ def manage_workflow(args): providers_duration += stats.provider_duration.duration states = controller.get_states() - - if interrupted and have_created_resources: - saved_states_map = sutil.load_states_as_dict(args.session) - reconciled_states = [] - - for state in states: - if (state.number_of_failed_resources() + state.number_of_created_resources()) \ - == state.number_of_total_resources(): - reconciled_states.append(state) - else: - reconciled_states.append(saved_states_map.get(state.label, state)) - - states = reconciled_states - nodes, networks, services, pending, failed = utils.get_counters(states=states) if pending or failed: workflow_failed = True + if Constants.RECONCILE_STATES: + states = sutil.reconcile_states(states, args.session) + sutil.save_states(states, args.session) provider_stats = controller.get_stats() workflow_duration = time.time() - start