Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

T6364: CGNAT drop hard limit that allows only one translation rule #3483

Merged
merged 2 commits into from
May 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions interface-definitions/nat_cgnat.xml.in
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@
<validator name="ipv4-host"/>
<validator name="ipv4-range"/>
</constraint>
<multi/>
</properties>
</leafNode>
</children>
Expand Down
110 changes: 66 additions & 44 deletions src/conf_mode/nat_cgnat.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,11 +189,6 @@ def verify(config):
if 'rule' not in config:
raise ConfigError(f'Rule must be defined!')

# As PoC allow only one rule for CGNAT translations
# one internal pool and one external pool
if len(config['rule']) > 1:
raise ConfigError(f'Only one rule is allowed for translations!')

for pool in ('external', 'internal'):
if pool not in config['pool']:
raise ConfigError(f'{pool} pool must be defined!')
Expand All @@ -208,6 +203,8 @@ def verify(config):
internal_pools_query = "keys(pool.internal)"
internal_pools: list = jmespath.search(internal_pools_query, config)

used_external_pools = {}
used_internal_pools = {}
for rule, rule_config in config['rule'].items():
if 'source' not in rule_config:
raise ConfigError(f'Rule "{rule}" source pool must be defined!')
Expand All @@ -217,57 +214,82 @@ def verify(config):
if 'translation' not in rule_config:
raise ConfigError(f'Rule "{rule}" translation pool must be defined!')

# Check if pool exists
internal_pool = rule_config['source']['pool']
if internal_pool not in internal_pools:
raise ConfigError(f'Internal pool "{internal_pool}" does not exist!')

external_pool = rule_config['translation']['pool']
if external_pool not in external_pools:
raise ConfigError(f'External pool "{external_pool}" does not exist!')

# Check pool duplication in different rules
if external_pool in used_external_pools:
raise ConfigError(
f'External pool "{external_pool}" is already used in rule '
f'{used_external_pools[external_pool]} and cannot be used in '
f'rule {rule}!'
)

if internal_pool in used_internal_pools:
raise ConfigError(
f'Internal pool "{internal_pool}" is already used in rule '
f'{used_internal_pools[internal_pool]} and cannot be used in '
f'rule {rule}!'
)

used_external_pools[external_pool] = rule
used_internal_pools[internal_pool] = rule


def generate(config):
if not config:
return None
# first external pool as we allow only one as PoC
ext_pool_name = jmespath.search("rule.*.translation | [0]", config).get('pool')
int_pool_name = jmespath.search("rule.*.source | [0]", config).get('pool')
ext_query = f'pool.external."{ext_pool_name}".range | keys(@)'
int_query = f'pool.internal."{int_pool_name}".range'
external_ranges = jmespath.search(ext_query, config)
internal_ranges = [jmespath.search(int_query, config)]

external_list_count = []
external_list_hosts = []
internal_list_count = []
internal_list_hosts = []
for ext_range in external_ranges:
# External hosts count
e_count = IPOperations(ext_range).get_ips_count()
external_list_count.append(e_count)
# External hosts list
e_hosts = IPOperations(ext_range).convert_prefix_to_list_ips()
external_list_hosts.extend(e_hosts)
for int_range in internal_ranges:
# Internal hosts count
i_count = IPOperations(int_range).get_ips_count()
internal_list_count.append(i_count)
# Internal hosts list
i_hosts = IPOperations(int_range).convert_prefix_to_list_ips()
internal_list_hosts.extend(i_hosts)

external_host_count = sum(external_list_count)
internal_host_count = sum(internal_list_count)
ports_per_user = int(
jmespath.search(f'pool.external."{ext_pool_name}".per_user_limit.port', config)
)
external_port_range: str = jmespath.search(
f'pool.external."{ext_pool_name}".external_port_range', config
)

proto_maps, other_maps = generate_port_rules(
external_list_hosts, internal_list_hosts, ports_per_user, external_port_range
)
proto_maps = []
other_maps = []

for rule, rule_config in config['rule'].items():
ext_pool_name: str = rule_config['translation']['pool']
int_pool_name: str = rule_config['source']['pool']

external_ranges: list = [range for range in config['pool']['external'][ext_pool_name]['range']]
internal_ranges: list = [range for range in config['pool']['internal'][int_pool_name]['range']]
external_list_hosts_count = []
external_list_hosts = []
internal_list_hosts_count = []
internal_list_hosts = []

for ext_range in external_ranges:
# External hosts count
e_count = IPOperations(ext_range).get_ips_count()
external_list_hosts_count.append(e_count)
# External hosts list
e_hosts = IPOperations(ext_range).convert_prefix_to_list_ips()
external_list_hosts.extend(e_hosts)

for int_range in internal_ranges:
# Internal hosts count
i_count = IPOperations(int_range).get_ips_count()
internal_list_hosts_count.append(i_count)
# Internal hosts list
i_hosts = IPOperations(int_range).convert_prefix_to_list_ips()
internal_list_hosts.extend(i_hosts)

external_host_count = sum(external_list_hosts_count)
internal_host_count = sum(internal_list_hosts_count)
ports_per_user = int(
jmespath.search(f'pool.external."{ext_pool_name}".per_user_limit.port', config)
)
external_port_range: str = jmespath.search(
f'pool.external."{ext_pool_name}".external_port_range', config
)

rule_proto_maps, rule_other_maps = generate_port_rules(
external_list_hosts, internal_list_hosts, ports_per_user, external_port_range
)

proto_maps.extend(rule_proto_maps)
other_maps.extend(rule_other_maps)

config['proto_map_elements'] = ', '.join(proto_maps)
config['other_map_elements'] = ', '.join(other_maps)
Expand Down
Loading