Skip to content
This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

Commit

Permalink
Bugfix, Enabled HDFS HA by default, other output beautification.
Browse files Browse the repository at this point in the history
  • Loading branch information
smithzc committed Mar 13, 2019
1 parent 8cc5daf commit 1af00dd
Showing 1 changed file with 81 additions and 43 deletions.
124 changes: 81 additions & 43 deletions v6/scripts/deploy_on_oci.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,34 @@ def wait_for_active_cluster_service_commands(active_command):
print('Exception waiting for active commands: {}'.format(e))


def wait_for_active_service_commands(active_command, service_name):
    """
    Wait until the given Cloudera cluster service has no active commands running
    :param active_command: Descriptive of what should be running - this just waits if any task is detected running
    :param service_name: Name of the cluster service whose active commands are polled (e.g. 'HDFS')
    :return:
    """
    view = 'summary'
    poll_interval = 10  # seconds between polls of the active command list
    wait_status = '[*'

    while True:
        # Redraw the progress line in place; flush so it is visible before the API call blocks.
        sys.stdout.write('\r%s - Waiting: %s' % (active_command, wait_status))
        sys.stdout.flush()
        try:
            api_response = services_api.list_active_commands(cluster_name, service_name, view=view)
        except ApiException as e:
            print('Exception waiting for active service commands: {}'.format(e))
            # Back off before retrying so a persistent API error does not become a tight loop.
            time.sleep(poll_interval)
            continue

        if not api_response.items:
            # No active commands are reported for the service - close the progress bar and stop waiting.
            if debug == 'True':
                pprint(api_response)
            sys.stdout.write(']\n')
            break

        time.sleep(poll_interval)
        wait_status = wait_status + '*'


def wait_for_active_mgmt_commands(active_command):
"""
Wait until Cloudera Manager finishes running mgmt active_command
Expand Down Expand Up @@ -820,7 +848,7 @@ def monitor_parcel(parcel_product, parcel_version, target_stage):
if parcel.state.errors:
raise Exception(str(parcel.state.errors))

sys.stdout.write("\rParcel %s progress %s: %s / %s" % (parcel_product, parcel.stage, parcel.state.progress,
sys.stdout.write("\r\tParcel %s progress %s: %s / %s" % (parcel_product, parcel.stage, parcel.state.progress,
parcel.state.total_progress))
time.sleep(5)
sys.stdout.flush()
Expand All @@ -832,20 +860,21 @@ def monitor_parcel(parcel_product, parcel_version, target_stage):
if parcel.product == parcel_product:
parcel_version = parcel.version

print("Starting Parcel Download for %s - %s" % (parcel_product, parcel_version))
print("\tStarting Parcel Download for %s - %s" % (parcel_product, parcel_version))
parcel_api.start_download_command(cluster_name, parcel_product, parcel_version)
target_stage = 'DOWNLOADED'
monitor_parcel(parcel_product, parcel_version, target_stage)
print("\n%s parcel %s version %s on cluster %s" % (target_stage, parcel_product, parcel_version, cluster_name))
print("Starting Distribution for %s - %s" % (parcel_product, parcel_version))
print("\n\t%s parcel %s version %s on cluster %s" % (target_stage, parcel_product, parcel_version, cluster_name))
print("\tStarting Distribution for %s - %s" % (parcel_product, parcel_version))
parcel_api.start_distribution_command(cluster_name, parcel_product, parcel_version)
target_stage = 'DISTRIBUTED'
monitor_parcel(parcel_product, parcel_version, target_stage)
print("\n%s parcel %s version %s on cluster %s" % (target_stage, parcel_product, parcel_version, cluster_name))
print("Activating Parcel %s" % parcel_product)
print("\n\t%s parcel %s version %s on cluster %s" % (target_stage, parcel_product, parcel_version, cluster_name))
print("\tActivating Parcel %s" % parcel_product)
parcel_api.activate_command(cluster_name, parcel_product, parcel_version)
target_stage = 'ACTIVATED'
monitor_parcel(parcel_product, parcel_version, target_stage)
print('\n\n')


def get_parcel_status(parcel_product):
Expand Down Expand Up @@ -1242,7 +1271,7 @@ def push_rcg_config(config):
namenode_java_heapsize, namenode_log_dir, dfs_namenode_servicerpc_address]
for config in nn_config_list:
push_rcg_config(config)
create_role(rcg, rcg_roletype, service, snn_host_id, nn_hostname, 1)
create_role(rcg, rcg_roletype, service, nn_host_id, nn_hostname, 1)

if rcg == 'HDFS-DATANODE-BASE':
print('-->Updating RCG: %s' % rcg)
Expand Down Expand Up @@ -1296,7 +1325,7 @@ def push_rcg_config(config):
failover_controller_log_dir = [cm_client.ApiConfig(name='failover_controller_log_dir',
value=LOG_DIR + '/hadoop-hdfs')]
push_rcg_config(failover_controller_log_dir)
create_role(rcg, rcg_roletype, service, cm_host_id, cm_hostname, 1)
# create_role(rcg, rcg_roletype, service, snn_host_id, snn_hostname, 1)

if rcg == 'HDFS-HTTPFS-BASE':
print('-->Updating RCG: %s' % rcg)
Expand Down Expand Up @@ -1326,7 +1355,10 @@ def push_rcg_config(config):
rcg_roletype = 'JOURNALNODE'
dfs_journalnode_edits_dir = [cm_client.ApiConfig(name='dfs_journalnode_edits_dir',
value='/data/dfs/jn')]
journalnode_log_dir = [cm_client.ApiConfig(name='journalnode_log_dir',
value=LOG_DIR + '/hadoop-hdfs')]
push_rcg_config(dfs_journalnode_edits_dir)
push_rcg_config(journalnode_log_dir)
create_role(rcg, rcg_roletype, service, nn_host_id, nn_hostname, 1)
create_role(rcg, rcg_roletype, service, snn_host_id, snn_hostname, 2)
create_role(rcg, rcg_roletype, service, cm_host_id, cm_hostname, 3)
Expand Down Expand Up @@ -1954,7 +1986,7 @@ def mgmt_role_commands(action):
pprint(api_response)
except ApiException as e:
print('Exception running MgmtRoleCommandsResourceApi->restart_command {}\n'.format(e))
active_command = action + ' ' + role
active_command = '\t' + action + ' ' + role
wait_for_active_mgmt_commands(active_command)

if action == 'start_command':
Expand All @@ -1967,7 +1999,7 @@ def mgmt_role_commands(action):
pprint(api_response)
except ApiException as e:
print('Exception running MgmtRoleCommandsResourceApi->start_command {}\n'.format(e))
active_command = action + ' ' + role
active_command = '\t' + action + ' ' + role
wait_for_active_mgmt_commands(active_command)

if action == 'stop_command':
Expand All @@ -1980,7 +2012,7 @@ def mgmt_role_commands(action):
pprint(api_response)
except ApiException as e:
print('Exception running MgmtRoleCommandsResourceApi->stop_command {}\n'.format(e))
active_command = action + ' ' + role
active_command = '\t' + action + ' ' + role
wait_for_active_mgmt_commands(active_command)

if action == 'jmap_dump':
Expand All @@ -1993,7 +2025,7 @@ def mgmt_role_commands(action):
pprint(api_response)
except ApiException as e:
print('Exception running MgmtRoleCommandsResourceApi->jmap_dump {}\n'.format(e))
active_command = action + ' ' + role
active_command = '\t' + action + ' ' + role
wait_for_active_mgmt_commands(active_command)

if action == 'jmap_histo':
Expand All @@ -2006,7 +2038,7 @@ def mgmt_role_commands(action):
pprint(api_response)
except ApiException as e:
print('Exception running MgmtRoleCommandsResourceApi->jmap_history {}\n'.format(e))
active_command = action + ' ' + role
active_command = '\t' + action + ' ' + role
wait_for_active_mgmt_commands(active_command)

if action == 'jstack':
Expand All @@ -2019,7 +2051,7 @@ def mgmt_role_commands(action):
pprint(api_response)
except ApiException as e:
print('Exception running MgmtRoleCommandsResourceApi->jstack {}\n'.format(e))
active_command = action + ' ' + role
active_command = '\t' + action + ' ' + role
wait_for_active_mgmt_commands(active_command)

if action == 'lsof':
Expand All @@ -2032,7 +2064,7 @@ def mgmt_role_commands(action):
pprint(api_response)
except ApiException as e:
print('Exception running MgmtRoleCommandsResourceApi->lsof {}\n'.format(e))
active_command = action + ' ' + role
active_command = '\t' + action + ' ' + role
wait_for_active_mgmt_commands(active_command)


Expand Down Expand Up @@ -2094,7 +2126,7 @@ def mgmt_service(action):
pprint(api_response)
except ApiException as e:
print('Exception calling MgmtServiceResourceApi -> start_command {}\n'.format(e))
active_command = 'MGMT ' + action
active_command = '\tMGMT ' + action
wait_for_active_mgmt_commands(active_command)

if action == 'restart_command':
Expand All @@ -2104,7 +2136,7 @@ def mgmt_service(action):
pprint(api_response)
except ApiException as e:
print('Exception calling MgmtServiceResourceApi -> restart_command {}\n'.format(e))
active_command = 'MGMT ' + action
active_command = '\tMGMT ' + action
wait_for_active_mgmt_commands(active_command)

if action == 'stop_command':
Expand All @@ -2114,7 +2146,7 @@ def mgmt_service(action):
pprint(api_response)
except ApiException as e:
print('Exception calling MgmtServiceResourceApi -> stop_command {}\n'.format(e))
active_command = 'MGMT ' + action
active_command = '\tMGMT ' + action
wait_for_active_mgmt_commands(active_command)

if action == 'auto_assign_roles':
Expand All @@ -2124,7 +2156,7 @@ def mgmt_service(action):
pprint(api_response)
except ApiException as e:
print('Exception calling MgmtServiceResourceApi -> auto_assign_roles {}\n'.format(e))
active_command = 'MGMT ' + action
active_command = '\tMGMT ' + action
wait_for_active_mgmt_commands(active_command)

if action == 'auto_configure_roles':
Expand All @@ -2134,7 +2166,7 @@ def mgmt_service(action):
pprint(api_response)
except ApiException as e:
print('Exception calling MgmtServiceResourceApi -> auto_configure_roles {}\n'.format(e))
active_command = 'MGMT ' + action
active_command = '\tMGMT ' + action
wait_for_active_mgmt_commands(active_command)

if action == 'delete_cms':
Expand All @@ -2144,7 +2176,7 @@ def mgmt_service(action):
pprint(api_response)
except ApiException as e:
print('Exception calling MgmtServiceResourceApi -> delete_cms {}\n'.format(e))
active_command = 'MGMT ' + action
active_command = '\tMGMT ' + action
wait_for_active_mgmt_commands(active_command)

if action == 'enter_maintenance_mode':
Expand All @@ -2154,7 +2186,7 @@ def mgmt_service(action):
pprint(api_response)
except ApiException as e:
print('Exception calling MgmtServiceResourceApi -> enter_maintenance_mode {}\n'.format(e))
active_command = 'MGMT ' + action
active_command = '\tMGMT ' + action
wait_for_active_mgmt_commands(active_command)

if action == 'exit_maintenance_mode':
Expand All @@ -2164,7 +2196,7 @@ def mgmt_service(action):
pprint(api_response)
except ApiException as e:
print('Exception calling MgmtServiceResourceApi -> exit_maintenance_mode {}\n'.format(e))
active_command = 'MGMT ' + action
active_command = '\tMGMT ' + action
wait_for_active_mgmt_commands(active_command)

if action == 'auto_configure_roles':
Expand All @@ -2174,7 +2206,7 @@ def mgmt_service(action):
pprint(api_response)
except ApiException as e:
print('Exception calling MgmtServiceResourceApi -> auto_configure {}\n'.format(e))
active_command = 'MGMT ' + action
active_command = '\tMGMT ' + action
wait_for_active_mgmt_commands(active_command)


Expand Down Expand Up @@ -2379,10 +2411,12 @@ def update_license():
"""
try:
with open(license_file) as l:
body = l.read()
license_data = l.read()
body = license_file
try:
api_response = cloudera_manager_api.update_license(body=body)
if debug == 'True':
print('License File Content: \n%s' % license_data)
pprint(api_response)
except ApiException as e:
print('Exception calling Cloudera Manager Resource API -> update_license {}'.format(e))
Expand All @@ -2392,15 +2426,16 @@ def update_license():
begin_trial()


def hdfs_enable_nn_ha(nn_hostname, snn_hostname, snn_host_id):
def hdfs_enable_nn_ha(snn_host_id):
"""
Enable High Availability (HA) with Automatic Failover for an HDFS NameNode.
:return:
"""
body = cm_client.ApiEnableNnHaArguments(active_nn_name=nn_hostname, standby_nn_name=snn_hostname,
standby_nn_host_id=snn_host_id)
body = cm_client.ApiEnableNnHaArguments(active_nn_name='HDFS-NAMENODE-1',
standby_nn_host_id=snn_host_id,
nameservice='HDFS-HA-%s' % cluster_name)
try:
api_response = services_api.hdfs_enable_nn_ha_command(cluster_name, 'HDFS', body=body)
api_response = services_api.hdfs_enable_nn_ha_command(cluster_name=cluster_name, service_name='HDFS', body=body)
if debug == 'True':
pprint(api_response)
except ApiException as e:
Expand Down Expand Up @@ -2512,11 +2547,10 @@ def build_cloudera_cluster():
install_success = 'False'
while install_success == 'False':
install_hosts()
active_command = 'Host Agents Installing'
wait_for_active_cluster_commands(active_command)
wait_for_active_cluster_commands('Host Agents Installing')
print('->Host Installation Complete')
add_hosts_to_cluster(cluster_host_list)
active_command = 'Hosts Adding to Cluster ' + cluster_name
active_command = '\tHosts Adding to Cluster ' + cluster_name
wait_for_active_cluster_commands(active_command)
if host_install_failure == 'True':
print('->Host SCM Agent Install Failure Detected, trying again')
Expand Down Expand Up @@ -2568,10 +2602,15 @@ def build_cloudera_cluster():
api_response = clusters_api.first_run(cluster_name)
if debug == 'True':
pprint(api_response)
active_command = '\tFirst Run on ' + cluster_name
wait_for_active_cluster_commands(active_command)
time.sleep(5)
except ApiException as e:
print('Exception calling ClustersResourceApi -> first_run {}\n'.format(e))
active_command = 'First Run on ' + cluster_name
wait_for_active_cluster_commands(active_command)
sys.exit()
print('->Enabling HDFS HA')
hdfs_enable_nn_ha(snn_host_id)
wait_for_active_service_commands('\tEnable HDFS HA', 'HDFS')
if secure_cluster == 'True':
pass
else:
Expand All @@ -2586,10 +2625,10 @@ def enable_kerberos():
:return:
"""
config_mgmt_for_kerberos()
wait_for_active_mgmt_commands('Deploying MGMT Kerberos Configuration')
wait_for_active_mgmt_commands('\tDeploying MGMT Kerberos Configuration')
print('-->Import KDC Admin Credentials')
import_admin_credentials()
wait_for_active_cluster_commands('Importing KDC Account Management Credentials')
wait_for_active_cluster_commands('\tImporting KDC Account Management Credentials')
# Set Service Configurations
# SOLR
solr_security_authentication = [cm_client.ApiConfig(name='solr_security_authentication', value='kerberos')]
Expand Down Expand Up @@ -2620,21 +2659,21 @@ def enable_kerberos():
update_service_config(service_name='ZOOKEEPER', api_config_items=quorum_auth_enable_sasl)
print('-->Stop Cluster Services')
cluster_action('stop_command')
wait_for_active_cluster_service_commands('Stopping Cluster Services')
wait_for_active_cluster_service_commands('\tStopping Cluster Services')
mgmt_service('stop_command')
print('-->Configure Cluster for Kerberos')
configure_for_kerberos()
wait_for_active_cluster_service_commands('Configuring for Kerberos')
wait_for_active_cluster_service_commands('\tConfiguring for Kerberos')
print('-->Deploy Cluster Kerberos Configuration')
cluster_action('deploy_cluster_client_config')
wait_for_active_cluster_service_commands('Deploying Cluster Kerberos Client Config')
wait_for_active_cluster_service_commands('\tDeploying Cluster Kerberos Client Config')
print('-->Start Cluster Services')
cluster_action('start_command')
wait_for_active_cluster_service_commands('Starting Cluster Services')
wait_for_active_cluster_service_commands('\tStarting Cluster Services')
mgmt_service('start_command')
print('-->Deploy Client Configuration')
cluster_action('deploy_client_config')
wait_for_active_cluster_commands('Deploy Cluster Client Config')
wait_for_active_cluster_commands('\tDeploy Cluster Client Config')
if debug == 'True':
print('->Listing Kerberos Principals')
get_kerberos_principals()
Expand All @@ -2658,7 +2697,7 @@ def enable_kerberos():
build_api_endpoints(admin_user_name, admin_password)
print('-->Stop Cluster Services')
cluster_action('stop_command')
wait_for_active_cluster_service_commands('Stopping Cluster Services')
wait_for_active_cluster_service_commands('\tStopping Cluster Services')
delete_cluster()
print('%s Delete issued' % cluster_name)
exit(0)
Expand Down Expand Up @@ -2714,7 +2753,6 @@ def enable_kerberos():
pprint(cluster_host_list)
print('->Full Deployment Follows')
get_deployment_full()

else:
print('Cluster Check returned null: %s' % cluster_exists)
else:
Expand Down

0 comments on commit 1af00dd

Please sign in to comment.