Skip to content

Commit

Permalink
rca about oms without obcluster (oceanbase#608)
Browse files Browse the repository at this point in the history
* rca about oms without obcluster

* rca about oms without obcluster

* rca about oms without obcluster
  • Loading branch information
wayyoungboy authored Dec 9, 2024
1 parent e052cbc commit 43858a4
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 46 deletions.
5 changes: 4 additions & 1 deletion core.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ def set_context(self, handler_name, namespace, config):
cluster_config=config.get_ob_cluster_config,
obproxy_config=config.get_obproxy_config,
ocp_config=config.get_ocp_config,
oms_config=config.get_oms_config,
cmd=self.cmds,
options=self.options,
stdio=self.stdio,
Expand All @@ -148,6 +149,7 @@ def set_context_skip_cluster_conn(self, handler_name, namespace, config):
cluster_config=config.get_ob_cluster_config,
obproxy_config=config.get_obproxy_config,
ocp_config=config.get_ocp_config,
oms_config=config.get_oms_config,
cmd=self.cmds,
options=self.options,
stdio=self.stdio,
Expand Down Expand Up @@ -516,7 +518,8 @@ def rca_run(self, opts):
self._call_stdio('error', 'No such custum config')
return ObdiagResult(ObdiagResult.INPUT_ERROR_CODE, error_data='No such custum config')
else:
self.update_obcluster_nodes(config)
if config.get_ob_cluster_config.get("db_host") is not None and config.get_ob_cluster_config.get("servers") is not None:
self.update_obcluster_nodes(config)
self.set_context('rca_run', 'rca_run', config)
try:
handler = RCAHandler(self.context)
Expand Down
2 changes: 1 addition & 1 deletion diag_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

from __future__ import absolute_import, division, print_function
from common.tool import Util, StringUtils

import os
import sys
import textwrap
Expand Down Expand Up @@ -291,6 +290,7 @@ def do_command(self):
ret = self._do_command(obdiag)
exit_code = 0
except Exception as e:
ROOT_IO.exception(e)
ROOT_IO.error('command failed. Please contact OceanBase community. e: {0}'.format(e))
ret = ObdiagResult(code=ObdiagResult.SERVER_ERROR_CODE, error_data="command failed. Please contact OceanBase community. e: {0}".format(e))
exit_code = 1
Expand Down
87 changes: 45 additions & 42 deletions handler/gather/gather_component_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
@file: gather_component_log.py
@desc:
"""
import copy
import datetime
import os
import tarfile
Expand Down Expand Up @@ -216,51 +217,52 @@ def handle(self):
try:
if not self.result.is_success():
return self.result

# run on every node
node_threads = []
self.gather_tuples = []
tasks = []
self.stdio.start_loading("gather start")
semaphore = mp.Semaphore(self.thread_nums)
for node in self.nodes:
new_context = self.context
new_context.stdio = self.stdio.sub_io()
# use Process must delete ssh_client, and GatherLogOnNode will rebuild it.
if "ssh_client" in node or "ssher" in node:
clear_node = node
if "ssh_client" in node:
del clear_node["ssh_client"]
if "ssher" in node:
del clear_node["ssher"]
tasks.append(GatherLogOnNode(new_context, clear_node, self.gather_log_conf_dict, semaphore))
else:
tasks.append(GatherLogOnNode(new_context, node, self.gather_log_conf_dict, semaphore))
file_queue = []
result_list = mp.Queue()
for task in tasks:
semaphore.acquire()
file_thread = mp.Process(target=task.handle, args=(result_list,))
file_thread.start()
file_queue.append(file_thread)
for file_thread in file_queue:
file_thread.join()
for _ in range(result_list.qsize()):
self.gather_tuples.append(result_list.get())
self.stdio.verbose("gather_tuples: {0}".format(self.gather_tuples))
self.stdio.stop_loading("succeed")
# save result
summary_tuples = self.__get_overall_summary(self.gather_tuples)
self.stdio.print(summary_tuples)
with open(os.path.join(self.store_dir, "result_summary.txt"), 'a', encoding='utf-8') as fileobj:
fileobj.write(summary_tuples.get_string())
self.stdio.stop_loading("succeed")
try:
semaphore = mp.Semaphore(self.thread_nums)
for node in self.nodes:
new_context = self.context
new_context.stdio = self.stdio.sub_io()
# use Process must delete ssh_client, and GatherLogOnNode will rebuild it.
if "ssh_client" in node or "ssher" in node:
clear_node = copy.deepcopy(node)
if "ssh_client" in node:
del clear_node["ssh_client"]
if "ssher" in node:
del clear_node["ssher"]
tasks.append(GatherLogOnNode(new_context, clear_node, self.gather_log_conf_dict, semaphore))
else:
tasks.append(GatherLogOnNode(new_context, node, self.gather_log_conf_dict, semaphore))
file_queue = []
result_list = mp.Queue()
for task in tasks:
semaphore.acquire()
file_thread = mp.Process(target=task.handle, args=(result_list,))
file_thread.start()
file_queue.append(file_thread)
for file_thread in file_queue:
file_thread.join()
for _ in range(result_list.qsize()):
self.gather_tuples.append(result_list.get())
self.stdio.verbose("gather_tuples: {0}".format(self.gather_tuples))
summary_tuples = self.__get_overall_summary(self.gather_tuples)
self.stdio.print(summary_tuples)
with open(os.path.join(self.store_dir, "result_summary.txt"), 'a', encoding='utf-8') as fileobj:
fileobj.write(summary_tuples.get_string())
except Exception as e:
self.stdio.verbose("gather log error: {0}".format(e))
finally:
self.stdio.stop_loading("succeed")

last_info = "For result details, please run cmd \033[32m' cat {0} '\033[0m\n".format(os.path.join(self.store_dir, "result_summary.txt"))
self.stdio.print(last_info)
try:
if self.redact and len(self.redact) > 0:
self.stdio.start_loading("gather redact start")
if self.redact and len(self.redact) > 0:
self.stdio.start_loading("gather redact start")
try:
self.stdio.verbose("redact_option is {0}".format(self.redact))
redact_dir = "{0}_redact".format(self.store_dir)
self.redact_dir = redact_dir
Expand All @@ -270,12 +272,13 @@ def handle(self):
redact.redact_files(self.redact, all_files)
self.stdio.print("redact success the log save on {0}".format(self.redact_dir))
self.__delete_all_files_in_tar()
self.stdio.stop_loading("succeed")
return ObdiagResult(ObdiagResult.SUCCESS_CODE, data={"store_dir": redact_dir, "redact_dir": self.redact_dir})
except Exception as e:
self.stdio.verbose(traceback.format_exc())
self.stdio.error("redact failed {0}".format(e))
return ObdiagResult(ObdiagResult.SERVER_ERROR_CODE, error_data="redact failed {0}".format(e))
except Exception as e:
self.stdio.exception(e)
self.stdio.error("redact failed {0}".format(e))
return ObdiagResult(ObdiagResult.SERVER_ERROR_CODE, error_data="redact failed {0}".format(e))
finally:
self.stdio.stop_loading("succeed")
return ObdiagResult(ObdiagResult.SUCCESS_CODE, data={"store_dir": self.store_dir})
except Exception as e:
self.stdio.verbose(traceback.format_exc())
Expand Down
9 changes: 7 additions & 2 deletions handler/rca/rca_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def __init__(self, context):

# build ob_connector
try:
if self.ob_cluster is not None:
if self.ob_cluster.get("db_host") is not None:
ob_connector = OBConnector(
context=self.context,
ip=self.ob_cluster.get("db_host"),
Expand Down Expand Up @@ -292,7 +292,12 @@ def __init__(self, context):
self.save_path = self.context.get_variable("store_dir")
self.rca_report_type = Util.get_option(self.context.options, 'report_type')
self.scene = Util.get_option(self.context.options, "scene")
self.version = get_version_by_type(self.context, "observer")
self.version = "unknown"
try:
if self.context.get_variable("ob_cluster").get("db_host") is not None or len(self.context.cluster_config.get("servers")) > 0:
self.version = get_version_by_type(self.context, "observer")
except Exception as e:
self.stdio.warn("rca get obcluster version fail. if the scene need not it, skip it")

def set_save_path(self, save_path):
self.save_path = os.path.expanduser(save_path)
Expand Down
File renamed without changes.

0 comments on commit 43858a4

Please sign in to comment.