From 1af00dd5aac65d078858e96db85dce471f7dc314 Mon Sep 17 00:00:00 2001 From: Zachary Smith Date: Wed, 13 Mar 2019 23:53:19 +0000 Subject: [PATCH] Bugfix, Enabled HDFS HA by default, other output beautification. --- v6/scripts/deploy_on_oci.py | 124 +++++++++++++++++++++++------------- 1 file changed, 81 insertions(+), 43 deletions(-) diff --git a/v6/scripts/deploy_on_oci.py b/v6/scripts/deploy_on_oci.py index b85fcff..39b6d6b 100644 --- a/v6/scripts/deploy_on_oci.py +++ b/v6/scripts/deploy_on_oci.py @@ -330,6 +330,34 @@ def wait_for_active_cluster_service_commands(active_command): print('Exception waiting for active commands: {}'.format(e)) +def wait_for_active_service_commands(active_command, service_name): + """ + Wait until Cloudera Cluster Service finishes running active_command + :param active_command: Descriptive of what should be running - this just waits if any task is detected running + :return: + """ + view = 'summary' + wait_status = '[*' + done = '0' + + while done == '0': + sys.stdout.write('\r%s - Waiting: %s' % (active_command, wait_status)) + try: + api_response = services_api.list_active_commands(cluster_name, service_name, view=view) + if not api_response.items: + if debug == 'True': + pprint(api_response) + done = '1' + sys.stdout.write(']\n') + break + else: + sys.stdout.flush() + time.sleep(10) + wait_status = wait_status + '*' + except ApiException as e: + print('Exception waiting for active service commands: {}'.format(e)) + + def wait_for_active_mgmt_commands(active_command): """ Wait until Cloudera Manager finishes running mgmt active_command @@ -820,7 +848,7 @@ def monitor_parcel(parcel_product, parcel_version, target_stage): if parcel.state.errors: raise Exception(str(parcel.state.errors)) - sys.stdout.write("\rParcel %s progress %s: %s / %s" % (parcel_product, parcel.stage, parcel.state.progress, + sys.stdout.write("\r\tParcel %s progress %s: %s / %s" % (parcel_product, parcel.stage, parcel.state.progress, parcel.state.total_progress)) time.sleep(5) sys.stdout.flush() @@ -832,20 +860,21 @@ def monitor_parcel(parcel_product, parcel_version, target_stage): if parcel.product == parcel_product: parcel_version = parcel.version - print("Starting Parcel Download for %s - %s" % (parcel_product, parcel_version)) + print("\tStarting Parcel Download for %s - %s" % (parcel_product, parcel_version)) parcel_api.start_download_command(cluster_name, parcel_product, parcel_version) target_stage = 'DOWNLOADED' monitor_parcel(parcel_product, parcel_version, target_stage) - print("\n%s parcel %s version %s on cluster %s" % (target_stage, parcel_product, parcel_version, cluster_name)) - print("Starting Distribution for %s - %s" % (parcel_product, parcel_version)) + print("\n\t%s parcel %s version %s on cluster %s" % (target_stage, parcel_product, parcel_version, cluster_name)) + print("\tStarting Distribution for %s - %s" % (parcel_product, parcel_version)) parcel_api.start_distribution_command(cluster_name, parcel_product, parcel_version) target_stage = 'DISTRIBUTED' monitor_parcel(parcel_product, parcel_version, target_stage) - print("\n%s parcel %s version %s on cluster %s" % (target_stage, parcel_product, parcel_version, cluster_name)) - print("Activating Parcel %s" % parcel_product) + print("\n\t%s parcel %s version %s on cluster %s" % (target_stage, parcel_product, parcel_version, cluster_name)) + print("\tActivating Parcel %s" % parcel_product) parcel_api.activate_command(cluster_name, parcel_product, parcel_version) target_stage = 'ACTIVATED' monitor_parcel(parcel_product, parcel_version, target_stage) + print('\n\n') def get_parcel_status(parcel_product): @@ -1242,7 +1271,7 @@ def push_rcg_config(config): namenode_java_heapsize, namenode_log_dir, dfs_namenode_servicerpc_address] for config in nn_config_list: push_rcg_config(config) - create_role(rcg, rcg_roletype, service, snn_host_id, nn_hostname, 1) + create_role(rcg, rcg_roletype, service, nn_host_id, nn_hostname, 1) if rcg == 'HDFS-DATANODE-BASE': print('-->Updating RCG: %s' % rcg) @@ -1296,7 +1325,7 @@ def push_rcg_config(config): failover_controller_log_dir = [cm_client.ApiConfig(name='failover_controller_log_dir', value=LOG_DIR + '/hadoop-hdfs')] push_rcg_config(failover_controller_log_dir) - create_role(rcg, rcg_roletype, service, cm_host_id, cm_hostname, 1) + # create_role(rcg, rcg_roletype, service, snn_host_id, snn_hostname, 1) if rcg == 'HDFS-HTTPFS-BASE': print('-->Updating RCG: %s' % rcg) @@ -1326,7 +1355,10 @@ def push_rcg_config(config): rcg_roletype = 'JOURNALNODE' dfs_journalnode_edits_dir = [cm_client.ApiConfig(name='dfs_journalnode_edits_dir', value='/data/dfs/jn')] + journalnode_log_dir = [cm_client.ApiConfig(name='journalnode_log_dir', + value=LOG_DIR + '/hadoop-hdfs')] push_rcg_config(dfs_journalnode_edits_dir) + push_rcg_config(journalnode_log_dir) create_role(rcg, rcg_roletype, service, nn_host_id, nn_hostname, 1) create_role(rcg, rcg_roletype, service, snn_host_id, snn_hostname, 2) create_role(rcg, rcg_roletype, service, cm_host_id, cm_hostname, 3) @@ -1954,7 +1986,7 @@ def mgmt_role_commands(action): pprint(api_response) except ApiException as e: print('Exception running MgmtRoleCommandsResourceApi->restart_command {}\n'.format(e)) - active_command = action + ' ' + role + active_command = '\t' + action + ' ' + role wait_for_active_mgmt_commands(active_command) if action == 'start_command': @@ -1967,7 +1999,7 @@ def mgmt_role_commands(action): pprint(api_response) except ApiException as e: print('Exception running MgmtRoleCommandsResourceApi->start_command {}\n'.format(e)) - active_command = action + ' ' + role + active_command = '\t' + action + ' ' + role wait_for_active_mgmt_commands(active_command) if action == 'stop_command': @@ -1980,7 +2012,7 @@ def mgmt_role_commands(action): pprint(api_response) except ApiException as e: print('Exception running MgmtRoleCommandsResourceApi->stop_command {}\n'.format(e)) - active_command = action + ' ' + role + active_command = '\t' + action + ' ' + role wait_for_active_mgmt_commands(active_command) if action == 'jmap_dump': @@ -1993,7 +2025,7 @@ def mgmt_role_commands(action): pprint(api_response) except ApiException as e: print('Exception running MgmtRoleCommandsResourceApi->jmap_dump {}\n'.format(e)) - active_command = action + ' ' + role + active_command = '\t' + action + ' ' + role wait_for_active_mgmt_commands(active_command) if action == 'jmap_histo': @@ -2006,7 +2038,7 @@ def mgmt_role_commands(action): pprint(api_response) except ApiException as e: print('Exception running MgmtRoleCommandsResourceApi->jmap_history {}\n'.format(e)) - active_command = action + ' ' + role + active_command = '\t' + action + ' ' + role wait_for_active_mgmt_commands(active_command) if action == 'jstack': @@ -2019,7 +2051,7 @@ def mgmt_role_commands(action): pprint(api_response) except ApiException as e: print('Exception running MgmtRoleCommandsResourceApi->jstack {}\n'.format(e)) - active_command = action + ' ' + role + active_command = '\t' + action + ' ' + role wait_for_active_mgmt_commands(active_command) if action == 'lsof': @@ -2032,7 +2064,7 @@ def mgmt_role_commands(action): pprint(api_response) except ApiException as e: print('Exception running MgmtRoleCommandsResourceApi->lsof {}\n'.format(e)) - active_command = action + ' ' + role + active_command = '\t' + action + ' ' + role wait_for_active_mgmt_commands(active_command) @@ -2094,7 +2126,7 @@ def mgmt_service(action): pprint(api_response) except ApiException as e: print('Exception calling MgmtServiceResourceApi -> start_command {}\n'.format(e)) - active_command = 'MGMT ' + action + active_command = '\tMGMT ' + action wait_for_active_mgmt_commands(active_command) if action == 'restart_command': @@ -2104,7 +2136,7 @@ def mgmt_service(action): pprint(api_response) except ApiException as e: print('Exception calling MgmtServiceResourceApi -> restart_command {}\n'.format(e)) - active_command = 'MGMT ' + action + active_command = '\tMGMT ' + action wait_for_active_mgmt_commands(active_command) if action == 'stop_command': @@ -2114,7 +2146,7 @@ def mgmt_service(action): pprint(api_response) except ApiException as e: print('Exception calling MgmtServiceResourceApi -> stop_command {}\n'.format(e)) - active_command = 'MGMT ' + action + active_command = '\tMGMT ' + action wait_for_active_mgmt_commands(active_command) if action == 'auto_assign_roles': @@ -2124,7 +2156,7 @@ def mgmt_service(action): pprint(api_response) except ApiException as e: print('Exception calling MgmtServiceResourceApi -> auto_assign_roles {}\n'.format(e)) - active_command = 'MGMT ' + action + active_command = '\tMGMT ' + action wait_for_active_mgmt_commands(active_command) if action == 'auto_configure_roles': @@ -2134,7 +2166,7 @@ def mgmt_service(action): pprint(api_response) except ApiException as e: print('Exception calling MgmtServiceResourceApi -> auto_configure_roles {}\n'.format(e)) - active_command = 'MGMT ' + action + active_command = '\tMGMT ' + action wait_for_active_mgmt_commands(active_command) if action == 'delete_cms': @@ -2144,7 +2176,7 @@ def mgmt_service(action): pprint(api_response) except ApiException as e: print('Exception calling MgmtServiceResourceApi -> delete_cms {}\n'.format(e)) - active_command = 'MGMT ' + action + active_command = '\tMGMT ' + action wait_for_active_mgmt_commands(active_command) if action == 'enter_maintenance_mode': @@ -2154,7 +2186,7 @@ def mgmt_service(action): pprint(api_response) except ApiException as e: print('Exception calling MgmtServiceResourceApi -> enter_maintenance_mode {}\n'.format(e)) - active_command = 'MGMT ' + action + active_command = '\tMGMT ' + action wait_for_active_mgmt_commands(active_command) if action == 'exit_maintenance_mode': @@ -2164,7 +2196,7 @@ def mgmt_service(action): pprint(api_response) except ApiException as e: print('Exception calling MgmtServiceResourceApi -> exit_maintenance_mode {}\n'.format(e)) - active_command = 'MGMT ' + action + active_command = '\tMGMT ' + action wait_for_active_mgmt_commands(active_command) if action == 'auto_configure_roles': @@ -2174,7 +2206,7 @@ def mgmt_service(action): pprint(api_response) except ApiException as e: print('Exception calling MgmtServiceResourceApi -> auto_configure {}\n'.format(e)) - active_command = 'MGMT ' + action + active_command = '\tMGMT ' + action wait_for_active_mgmt_commands(active_command) @@ -2379,10 +2411,12 @@ def update_license(): """ try: with open(license_file) as l: - body = l.read() + license_data = l.read() + body = license_file try: api_response = cloudera_manager_api.update_license(body=body) if debug == 'True': + print('License File Content: \n%s' % license_data) pprint(api_response) except ApiException as e: print('Exception calling Cloudera Manager Resource API -> update_license {}'.format(e)) @@ -2392,15 +2426,16 @@ def update_license(): begin_trial() -def hdfs_enable_nn_ha(nn_hostname, snn_hostname, snn_host_id): +def hdfs_enable_nn_ha(snn_host_id): """ Enable High Availability (HA) with Automatic Failover for an HDFS NameNode. :return: """ - body = cm_client.ApiEnableNnHaArguments(active_nn_name=nn_hostname, standby_nn_name=snn_hostname, - standby_nn_host_id=snn_host_id) + body = cm_client.ApiEnableNnHaArguments(active_nn_name='HDFS-NAMENODE-1', + standby_nn_host_id=snn_host_id, + nameservice='HDFS-HA-%s' % cluster_name) try: - api_response = services_api.hdfs_enable_nn_ha_command(cluster_name, 'HDFS', body=body) + api_response = services_api.hdfs_enable_nn_ha_command(cluster_name=cluster_name, service_name='HDFS', body=body) if debug == 'True': pprint(api_response) except ApiException as e: @@ -2512,11 +2547,10 @@ def build_cloudera_cluster(): install_success = 'False' while install_success == 'False': install_hosts() - active_command = 'Host Agents Installing' - wait_for_active_cluster_commands(active_command) + wait_for_active_cluster_commands('Host Agents Installing') print('->Host Installation Complete') add_hosts_to_cluster(cluster_host_list) - active_command = 'Hosts Adding to Cluster ' + cluster_name + active_command = '\tHosts Adding to Cluster ' + cluster_name wait_for_active_cluster_commands(active_command) if host_install_failure == 'True': print('->Host SCM Agent Install Failure Detected, trying again') @@ -2568,10 +2602,15 @@ def build_cloudera_cluster(): api_response = clusters_api.first_run(cluster_name) if debug == 'True': pprint(api_response) + active_command = '\tFirst Run on ' + cluster_name + wait_for_active_cluster_commands(active_command) + time.sleep(5) except ApiException as e: print('Exception calling ClustersResourceApi -> first_run {}\n'.format(e)) - active_command = 'First Run on ' + cluster_name - wait_for_active_cluster_commands(active_command) + sys.exit() + print('->Enabling HDFS HA') + hdfs_enable_nn_ha(snn_host_id) + wait_for_active_service_commands('\tEnable HDFS HA', 'HDFS') if secure_cluster == 'True': pass else: @@ -2586,10 +2625,10 @@ def enable_kerberos(): :return: """ config_mgmt_for_kerberos() - wait_for_active_mgmt_commands('Deploying MGMT Kerberos Configuration') + wait_for_active_mgmt_commands('\tDeploying MGMT Kerberos Configuration') print('-->Import KDC Admin Credentials') import_admin_credentials() - wait_for_active_cluster_commands('Importing KDC Account Management Credentials') + wait_for_active_cluster_commands('\tImporting KDC Account Management Credentials') # Set Service Configurations # SOLR solr_security_authentication = [cm_client.ApiConfig(name='solr_security_authentication', value='kerberos')] @@ -2620,21 +2659,21 @@ def enable_kerberos(): update_service_config(service_name='ZOOKEEPER', api_config_items=quorum_auth_enable_sasl) print('-->Stop Cluster Services') cluster_action('stop_command') - wait_for_active_cluster_service_commands('Stopping Cluster Services') + wait_for_active_cluster_service_commands('\tStopping Cluster Services') mgmt_service('stop_command') print('-->Configure Cluster for Kerberos') configure_for_kerberos() - wait_for_active_cluster_service_commands('Configuring for Kerberos') + wait_for_active_cluster_service_commands('\tConfiguring for Kerberos') print('-->Deploy Cluster Kerberos Configuration') cluster_action('deploy_cluster_client_config') - wait_for_active_cluster_service_commands('Deploying Cluster Kerberos Client Config') + wait_for_active_cluster_service_commands('\tDeploying Cluster Kerberos Client Config') print('-->Start Cluster Services') cluster_action('start_command') - wait_for_active_cluster_service_commands('Starting Cluster Services') + wait_for_active_cluster_service_commands('\tStarting Cluster Services') mgmt_service('start_command') print('-->Deploy Client Configuration') cluster_action('deploy_client_config') - wait_for_active_cluster_commands('Deploy Cluster Client Config') + wait_for_active_cluster_commands('\tDeploy Cluster Client Config') if debug == 'True': print('->Listing Kerberos Principals') get_kerberos_principals() @@ -2658,7 +2697,7 @@ def enable_kerberos(): build_api_endpoints(admin_user_name, admin_password) print('-->Stop Cluster Services') cluster_action('stop_command') - wait_for_active_cluster_service_commands('Stopping Cluster Services') + wait_for_active_cluster_service_commands('\tStopping Cluster Services') delete_cluster() print('%s Delete issued' % cluster_name) exit(0) @@ -2714,7 +2753,6 @@ def enable_kerberos(): pprint(cluster_host_list) print('->Full Deployment Follows') get_deployment_full() - else: print('Cluster Check returned null: %s' % cluster_exists) else: