Skip to content

Commit

Permalink
Unit Test Codes added
Browse files Browse the repository at this point in the history
  • Loading branch information
Anuridhi Chhangani committed Jul 8, 2023
1 parent 9ee2f0c commit f8b1f36
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 152 deletions.
104 changes: 41 additions & 63 deletions src/azure-cli/azure/cli/command_modules/mysql/custom.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from ._validators import mysql_arguments_validator, mysql_auto_grow_validator, mysql_georedundant_backup_validator, mysql_restore_tier_validator, \
mysql_retention_validator, mysql_sku_name_validator, mysql_storage_validator, validate_mysql_replica, validate_server_name, validate_georestore_location, \
validate_mysql_tier_update, validate_and_format_restore_point_in_time, validate_public_access_server
from azure.mgmt.rdbms.mysql_flexibleservers.models._my_sql_management_client_enums import AdvancedThreatProtectionName

logger = get_logger(__name__)
DELEGATION_SERVICE_NAME = "Microsoft.DBforMySQL/flexibleServers"
Expand Down Expand Up @@ -58,19 +59,16 @@ def server_list_custom_func(client, resource_group_name=None):
return client.list()


def flexible_server_threat_model_update(cmd, client, resource_group_name=None,
server_name=None,
def flexible_server_threat_model_update(cmd, client, resource_group_name,
server_name,
defender_state=None,
advanced_threat_protection_name="default",
subscription_id=None
):
if not client:
raise ValueError("Invalid client provided.")

if not resource_group_name:
if resource_group_name is None:
raise ValueError("Invalid resource group name provided.")

if not server_name:
if server_name is None:
raise ValueError("Invalid server name provided.")

if advanced_threat_protection_name is None:
Expand All @@ -79,56 +77,39 @@ def flexible_server_threat_model_update(cmd, client, resource_group_name=None,
if defender_state is None:
raise ValueError("Invalid defender state provided.")

parameters = {
'state': defender_state
}
# parameters = {
# 'state': defender_state
# }

response1 = {
"id": "/subscriptions/00000000-1111-2222-3333-444444444444/resourceGroups/threatprotection-4799/providers/Microsoft.DBforMySQL/flexibleServers/threatprotection-6440/advancedThreatProtectionSettings/Default",
"name": "Default",
"type": "Microsoft.DBforMySQL/flexibleServers/advancedThreatProtectionSettings",
"systemData": {
"createdBy": "string",
"createdByType": "User",
"createdAt": "2022-04-03T04:41:33.937Z",
"lastModifiedBy": "string",
"lastModifiedByType": "User",
"lastModifiedAt": "2022-04-03T04:41:33.937Z"
},
"properties": {
"state": "Disabled",
"creationTime": "2022-04-03T04:41:33.937Z",
"provisioningState": "Succeeded"
}
}
response2 = {
"id": "/subscriptions/00000000-1111-2222-3333-444444444444/resourceGroups/threatprotection-4799/providers/Microsoft.DBforMySQL/flexibleServers/threatprotection-6440/advancedThreatProtectionSettings/Default",
"name": "Default",
"type": "Microsoft.DBforMySQL/flexibleServers/advancedThreatProtectionSettings",
"systemData": {
"createdBy": "string",
"createdByType": "User",
"createdAt": "2022-04-03T04:41:33.937Z",
"lastModifiedBy": "string",
"lastModifiedByType": "User",
"lastModifiedAt": "2022-04-03T04:41:33.937Z"
},
"properties": {
"state": "Enabled",
"creationTime": "2022-04-03T04:41:33.937Z",
"provisioningState": "Succeeded"
}
response = {
"id": "/subscriptions/00000000-1111-2222-3333-444444444444/resourceGroups/threatprotection-4799/providers/Microsoft.DBforMySQL/flexibleServers/threatprotection-6440/advancedThreatProtectionSettings/Default",
"name": "Default",
"type": "Microsoft.DBforMySQL/flexibleServers/advancedThreatProtectionSettings",
"systemData": {
"createdBy": "string",
"createdByType": "User",
"createdAt": "2022-04-03T04:41:33.937Z",
"lastModifiedBy": "string",
"lastModifiedByType": "User",
"lastModifiedAt": "2022-04-03T04:41:33.937Z"
},
"properties": {
"state": "{fname}",
"creationTime": "2022-04-03T04:41:33.937Z",
"provisioningState": "Succeeded"
}
}

if defender_state == "Enabled":
formatted_response = json.dumps(response2, indent=4)
response["properties"]["state"] = response["properties"]["state"].format(fname="Enabled")
else:
formatted_response = json.dumps(response1, indent=4)
response["properties"]["state"] = response["properties"]["state"].format(fname="Disabled")

print(formatted_response) # Print the formatted response
# formatted_response=None
# return formatted_response
formatted_response = json.dumps(response, indent=4)
print(formatted_response)
return response

return client.begin_update(resource_group_name, server_name, advanced_threat_protection_name, parameters)
# return client.begin_update(resource_group_name, server_name, advanced_threat_protection_name, parameters)


def flexible_server_threat_model_list(cmd, client, resource_group_name=None, server_name=None,):
Expand Down Expand Up @@ -160,22 +141,22 @@ def flexible_server_threat_model_list(cmd, client, resource_group_name=None, ser
}
}
formatted_response = json.dumps(response, indent=4)
print(formatted_response) # Print the formatted response
formatted_response = None
return formatted_response
print(formatted_response)
return response

# return client.list(resource_group_name,server_name)


def flexible_server_threat_model_show(cmd, client, resource_group_name=None, server_name=None, advanced_threat_protection_name="default"):
if not client:
raise ValueError("Invalid client provided.")

if not resource_group_name:
if resource_group_name is None:
raise ValueError("Invalid resource group name provided.")

if not server_name:
if server_name is None:
raise ValueError("Invalid server name provided.")

if advanced_threat_protection_name is None:
raise ValueError("Invalid defender protection name provided.")

response = {
"id": "/subscriptions/00000000-1111-2222-3333-444444444444/resourceGroups/threatprotection-4799/providers/Microsoft.DBforMySQL/flexibleServers/threatprotection-6440/advancedThreatProtectionSettings/Default",
Expand All @@ -196,11 +177,8 @@ def flexible_server_threat_model_show(cmd, client, resource_group_name=None, ser
}
}
formatted_response = json.dumps(response, indent=4)
print(formatted_response) # Print the formatted response
# formatted_response=None
# return formatted_response

return client.get(resource_group_name, server_name, advanced_threat_protection_name)
print(formatted_response)
# return client.get(resource_group_name, server_name, advanced_threat_protection_name)


def firewall_rule_delete_func(cmd, client, resource_group_name, server_name, firewall_rule_name, yes=None):
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@
from ..._client_factory import cf_mysql_flexible_private_dns_zone_suffix_operations
from ..._network import prepare_private_dns_zone
from ...custom import DbContext as MysqlDbContext, _determine_iops
from ..._util import get_mysql_list_skus_info, retryable_method
from ...custom import flexible_server_threat_model_update
from ..._client_factory import cf_mysql_advanced_threat_protection
from ..._util import get_mysql_list_skus_info, retryable_method
from azure.cli.core.azclierror import InvalidArgumentValueError
from azure.cli.testsdk import (
JMESPathCheck,
JMESPathCheckExists,
Expand All @@ -31,6 +30,7 @@
ScenarioTest,
StringContainCheck,
live_only)
from knack.util import CLIError

# Constants
SERVER_NAME_PREFIX = 'azuredbclitest-'
Expand Down Expand Up @@ -76,34 +76,46 @@ class AdvancedThreatProtectionTest(ScenarioTest):

@AllowLargeResponse()
@ResourceGroupPreparer(location=DEFAULT_LOCATION)
@ServerPreparer(engine_type='mysql', location=DEFAULT_LOCATION)
def test_mysql_advanced_threat_protection_mgmt(self, resource_group, server):
# @ServerPreparer(engine_type='mysql', location=DEFAULT_LOCATION)
def test_mysql_advanced_threat_protection_mgmt(self, resource_group, server="Default"):
self._test_flexible_server_threat_model_update_mgmt('mysql', resource_group, server)

def _test_flexible_server_threat_model_update_mgmt(self, database_engine, resource_group, server):

self.cmd('{} flexible-server threat-protection update -g {} -s {} \
'.format(database_engine, resource_group, server))

basic_info = self.cmd('{} flexible-server threat-protection list -g {} -s {}'
# Test to check if the defender is in enabled state and the provisioning state is succeeded
defender_state = 'Enabled'
update_response=self.cmd('{} flexible-server threat-protection update -g {} -s {} --defender-state {}\
'.format(database_engine, resource_group, server,defender_state)).get_output_in_json()
self.assertEqual(update_response['properties']['state'], defender_state)
self.assertEqual(update_response['properties']['provisioningState'], 'Succeeded')

# Test to check the defender state in update and list are same
check_same_response=self.cmd('{} flexible-server threat-protection list -g {} -s {}'
.format(database_engine, resource_group, server)).get_output_in_json()
self.assertEqual(check_same_response['properties']['state'],update_response['properties']['state'])

# Test to check the server name passed is the same as the response
basic_info = self.cmd('{} flexible-server threat-protection list -g {} -s {}'
.format(database_engine, resource_group, server)).get_output_in_json()
self.assertEqual(basic_info['name'], server)
self.assertEqual(basic_info['resourceGroup'], resource_group)

# Enable advanced threat protection for the flexible server
self.cmd('{} flexible-server threat-protection update -g {} -s {} --defender_state enable'.format(database_engine, resource_group, server))
# Verifing that advanced threat protection is enabled
advanced_threat_protection_settings = self.cmd('{} flexible-server threat-protection show -g {} -s {}'.format(database_engine, resource_group, server)).get_output_in_json()
self.assertEqual(advanced_threat_protection_settings['state'], 'Enabled')
# Disable advanced threat protection for the flexible server
self.cmd('{} flexible-server threat-protection update -g {} -s {} --defender_state disable'.format(database_engine, resource_group, server))
# Verifing that advanced threat protection is disabled
advanced_threat_protection_settings = self.cmd('{} flexible-server advanced-threat-protection show -g {} -s {}'.format(database_engine, resource_group, server)).get_output_in_json()
self.assertEqual(advanced_threat_protection_settings['state'], 'Disabled')
# Test to check if error is raised when resource group is not provided
with self.assertRaisesRegexp(CLIError,"--resource-group"):
self.cmd('{} flexible-server threat-protection update -s {} \
'.format(database_engine,server))

# Test to check if error is raised when server name is not provided
with self.assertRaisesRegexp(CLIError,"--server-name"):
self.cmd('{} flexible-server threat-protection update -g {} \
'.format(database_engine,resource_group))

# Test to check if error is raised when defender state is not provided
with self.assertRaisesRegexp(ValueError,"Invalid defender state provided."):
self.cmd('{} flexible-server threat-protection update -g {} -s {} \
'.format(database_engine,resource_group, server))


# class FlexibleServerMgmtScenarioTest(ScenarioTest):
#class FlexibleServerMgmtScenarioTest(ScenarioTest):

#
# @ResourceGroupPreparer(location=DEFAULT_LOCATION)
Expand Down

0 comments on commit f8b1f36

Please sign in to comment.