Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions cohesivenet/api/vns3/firewall_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2162,7 +2162,7 @@ def get_firewall_fwsets(api_client, **kwargs): # noqa: E501


def post_create_firewall_fwset(
api_client, name=None, type=None, entries=None, description=None, **kwargs
api_client, name=None, type=None, entries=None, target=None, description=None, **kwargs
): # noqa: E501
"""post_create_firewall_fwset # noqa: E501

Expand All @@ -2173,13 +2173,14 @@ def post_create_firewall_fwset(
>>> response = await api.post_create_firewall_fwset("fwset1")

:param name str: FWset name (required)
:param type str: FWset type. One of net, list, service or port (required)
:param type str: FWset type. One of net, list, service, port, or clientpack_tag_group (required)
:param description:
:param entries list[str|dict]: list of entries. can be list of strings or list of entry dicts:
{
entry: str,
comment: str
}
:param target str: target for clientpack_tag_group fwset type
:param async_req bool: execute request asynchronously
:param _return_http_data_only: response data without head status code
and headers
Expand All @@ -2195,7 +2196,7 @@ def post_create_firewall_fwset(

local_var_params = dict(locals(), **kwargs)

request_params = ["name", "type", "description", "entries"]
request_params = ["name", "type", "description", "entries", "target"]

collection_formats = {}

Expand All @@ -2210,6 +2211,8 @@ def post_create_firewall_fwset(

body_params = {}
for param in [p for p in request_params if local_var_params.get(p) is not None]:
if param == "entries" and local_var_params.get("type") == "clientpack_tag_group":
continue # Skip entries for clientpack_tag_group type
body_params[param] = local_var_params[param]

# HTTP header `Accept`
Expand Down Expand Up @@ -2246,7 +2249,6 @@ def post_create_firewall_fwset(
collection_formats=collection_formats,
)


def get_firewall_fwset(api_client, fwset_name, **kwargs): # noqa: E501
"""get_firewall_fwset # noqa: E501

Expand Down
151 changes: 150 additions & 1 deletion cohesivenet/macros/firewall.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,154 @@
from cohesivenet import util, Logger, VNS3Client, CohesiveSDKException
from cohesivenet import util, Logger, VNS3Client, CohesiveSDKException, VNS3Client

def create_firewall_fwsets(clients, fwset_rules):
results = {}
for client in clients:
successes = []
errors = []

existing_fwsets = client.firewall.get_firewall_fwsets().response
existing_fwset_names = {fwset['name'] for fwset in existing_fwsets}

for fwset in fwset_rules:
name = fwset['name']
fwset_type = fwset['type']

if fwset_type == "clientpack_tag_group":
target = fwset['target']
fwset_data = {"target": target}
else:
entries = [{"entry": val} for val in fwset['entries']]
fwset_data = {"entries": entries}

if 'sync' in fwset:
fwset_data['sync'] = fwset['sync']

if name in existing_fwset_names:
# Update existing fwset
try:
client.firewall.put_update_firewall_fwset(
fwset_name=name,
**fwset_data
)
successes.append(f"Successfully updated fwset '{name}'")
except Exception as e:
errors.append(f"Error updating fwset '{name}': {str(e)}")
else:
# Create new fwset
try:
client.firewall.post_create_firewall_fwset(
name=name,
type=fwset_type,
**fwset_data
)
successes.append(f"Successfully created fwset '{name}'")
except Exception as e:
errors.append(f"Error creating fwset '{name}': {str(e)}")

# Delete fwsets that are not in the input list
for existing_name in existing_fwset_names:
if existing_name not in [fwset['name'] for fwset in fwset_rules]:
try:
client.firewall.delete_firewall_fwset(fwset_name=existing_name)
successes.append(f"Successfully deleted fwset '{existing_name}'")
except Exception as e:
errors.append(f"Error deleting fwset '{existing_name}': {str(e)}")

results[client.host_uri] = (successes, errors)

return results

def create_firewall_policies(clients, firewall_rules, state={}):
"""Create a group of firewall rules for multiple clients.

Arguments:
clients {List[VNS3Client]} - List of VNS3Client instances
firewall_rules {List[dict]} - [{
'position': int,
'rule': str
}, ...]

Keyword Arguments:
state {dict} - State to format rules with. (can call client.state for each client)

Returns:
dict: A dictionary with client host_uri as keys and tuples of successes and errors lists as values
"""
results = {}
for client in clients:
successes, errors = [], []
Logger.debug(
"Checking firewall policy for client.",
host=client.host_uri,
rule_count=len(firewall_rules),
)

# Fetch current firewall rules from the client
current_rules_response = client.firewall.get_firewall_rules()
if hasattr(current_rules_response, 'response') and current_rules_response.response:
current_rules = [rule['rule'].strip() for rule in current_rules_response.response if 'rule' in rule]
else:
current_rules = []
errors.append(f"Failed to fetch current firewall rules for {client.host_uri}")

# Format desired rules
desired_rules = []
for rule_args in firewall_rules:
rule, err = util.format_string(rule_args["rule"], state)
if err:
errors.append(err)
continue

rule_args.update(rule=rule)
desired_rules.append(rule_args)

# Compare desired rules with current rules
rules_to_add = []
rules_to_delete = []
for current_rule in current_rules:
if current_rule not in [r['rule'] for r in desired_rules]:
rules_to_delete.append(current_rule)

for desired_rule in desired_rules:
if desired_rule['rule'] not in current_rules:
rules_to_add.append(desired_rule)

# Delete rules that are not defined in firewall_rules
for rule_to_delete in rules_to_delete:
response = client.firewall.delete_firewall_rule_by_rule(rule=rule_to_delete)
if hasattr(response, 'status') and response.status == 200: # Assuming 'status' attribute and success code 200
successes.append(f'Rule "{rule_to_delete}" deleted')
else:
errors.append(f'Error deleting rule "{rule_to_delete}"')

# Add only the rules that are not already present
for rule_args in rules_to_add:
response = client.firewall.post_create_firewall_rule(rule=rule_args['rule'], position=rule_args['position'])
if hasattr(response, 'status') and response.status == 200: # Assuming 'status' attribute and success code 200
successes.append(f'Rule "{rule_args["rule"]}" inserted at position {rule_args["position"]}')
else:
errors.append(f'Error inserting rule "{rule_args["rule"]}" at position {rule_args["position"]}')

# Identify and delete duplicate rules
current_rules_response = client.firewall.get_firewall_rules()
if hasattr(current_rules_response, 'response') and current_rules_response.response:
current_rules = current_rules_response.response
seen_rules = set()
for rule in current_rules:
if 'rule' in rule:
rule_str = rule['rule'].strip()
if rule_str in seen_rules:
response = client.firewall.delete_firewall_rule_by_position(position=rule['position'])
if hasattr(response, 'status') and response.status == 200: # Assuming 'status' attribute and success code 200
successes.append(f'Duplicate rule "{rule_str}" at position {rule["position"]} deleted')
else:
errors.append(f'Error deleting duplicate rule "{rule_str}" at position {rule["position"]}')
else:
seen_rules.add(rule_str)

results[client.host_uri] = (successes, errors)

return results

def create_firewall_policy(client: VNS3Client, firewall_rules, state={}):
"""Create group of firewall rules
Expand Down
26 changes: 26 additions & 0 deletions cohesivenet/macros/overlay_network.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,32 @@
from cohesivenet import util, VNS3Client


def add_clientpack_tags(client: VNS3Client, clientpack_tags):
"""Add tags to clientpacks.

Arguments:
client {VNS3Client} - The client to use for API calls.
clientpack_tags {List[dict]} - List of tags to add to clientpacks. Each tag should be a dictionary with
'clientpack_name', 'key', and 'value' keys.

Returns:
dict: A dictionary with clientpack names as keys and the result of the tag addition operation as values.
"""
results = {}
for tag_info in clientpack_tags:
clientpack_name = tag_info['clientpack_name']
key = tag_info['key']
value = tag_info['value']

try:
response = client.overlay_network.post_create_clientpack_tag(clientpack_name, key, value)
results[clientpack_name] = {'success': True, 'response': response}
except Exception as e:
results[clientpack_name] = {'success': False, 'error': str(e)}

return results


def segment_overlay_clients(
client: VNS3Client, groups=None, number_groups=None, group_ratios=None
):
Expand Down
Loading