From bdb7291898666ddbfb87d50689e36ba947bc912d Mon Sep 17 00:00:00 2001 From: Gwang Il Kang Date: Mon, 17 Jun 2024 04:49:11 +0900 Subject: [PATCH] etc: support latest plugin framework Signed-off-by: Gwangil Kang --- Dockerfile | 17 +- src/plugin/__init__.py | 3 + src/plugin/conf/__init__.py | 0 src/plugin/conf/cost_conf.py | 3 + src/plugin/connector/__init__.py | 2 + src/plugin/connector/bigquery_connector.py | 52 +++++ src/plugin/main.py | 157 +++++++++++++++ src/plugin/manager/__init__.py | 3 + src/plugin/manager/cost_manager.py | 212 +++++++++++++++++++++ src/plugin/manager/data_source_manager.py | 44 +++++ src/plugin/manager/job_manager.py | 116 +++++++++++ src/setup.py | 2 +- 12 files changed, 601 insertions(+), 10 deletions(-) create mode 100644 src/plugin/__init__.py create mode 100644 src/plugin/conf/__init__.py create mode 100644 src/plugin/conf/cost_conf.py create mode 100644 src/plugin/connector/__init__.py create mode 100644 src/plugin/connector/bigquery_connector.py create mode 100644 src/plugin/main.py create mode 100644 src/plugin/manager/__init__.py create mode 100644 src/plugin/manager/cost_manager.py create mode 100644 src/plugin/manager/data_source_manager.py create mode 100644 src/plugin/manager/job_manager.py diff --git a/Dockerfile b/Dockerfile index 219b7f6..4deae4b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,7 +1,7 @@ -FROM cloudforet/python-core:1 +FROM cloudforet/python-core:2.0 ENV PYTHONUNBUFFERED 1 -ENV CLOUDONE_PORT 50051 +ENV SPACEONE_PORT 50051 ENV SERVER_TYPE grpc ENV PKG_DIR /tmp/pkg ENV SRC_DIR /tmp/src @@ -9,17 +9,16 @@ ENV SRC_DIR /tmp/src RUN apt update && apt upgrade -y COPY pkg/*.txt ${PKG_DIR}/ + RUN pip install --upgrade pip && \ - pip install --upgrade --use-deprecated=legacy-resolver -r ${PKG_DIR}/pip_requirements.txt && \ - pip install --upgrade spaceone-api + pip install --upgrade -r ${PKG_DIR}/pip_requirements.txt && \ + pip install --upgrade --pre spaceone-core spaceone-api spaceone-cost-analysis==2.0.dev120 COPY src ${SRC_DIR} - WORKDIR ${SRC_DIR} -RUN python3 setup.py install && \ - rm -rf /tmp/* +RUN python3 setup.py install && rm -rf /tmp/* -EXPOSE ${CLOUDONE_PORT} +EXPOSE ${SPACEONE_PORT} ENTRYPOINT ["spaceone"] -CMD ["grpc", "spaceone.cost_analysis"] +CMD ["run", "plugin-server", "plugin"] diff --git a/src/plugin/__init__.py b/src/plugin/__init__.py new file mode 100644 index 0000000..8f5eb37 --- /dev/null +++ b/src/plugin/__init__.py @@ -0,0 +1,3 @@ +from cloudforet.cost_analysis.manager.data_source_manager import DataSourceManager +from cloudforet.cost_analysis.manager.job_manager import JobManager +from cloudforet.cost_analysis.manager.cost_manager import CostManager diff --git a/src/plugin/conf/__init__.py b/src/plugin/conf/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/plugin/conf/cost_conf.py b/src/plugin/conf/cost_conf.py new file mode 100644 index 0000000..d20ba7f --- /dev/null +++ b/src/plugin/conf/cost_conf.py @@ -0,0 +1,3 @@ +DEFAULT_BILLING_DATASET = 'spaceone_billing_data' +BIGQUERY_TABLE_PREFIX = 'gcp_billing_export_v1' +SECRET_TYPE_DEFAULT = 'MANUAL' \ No newline at end of file diff --git a/src/plugin/connector/__init__.py b/src/plugin/connector/__init__.py new file mode 100644 index 0000000..d55e674 --- /dev/null +++ b/src/plugin/connector/__init__.py @@ -0,0 +1,2 @@ +from cloudforet.cost_analysis.connector.bigquery_connector import BigqueryConnector +from cloudforet.cost_analysis.connector.cloud_billing_connector import CloudBillingConnector diff --git a/src/plugin/connector/bigquery_connector.py b/src/plugin/connector/bigquery_connector.py new file mode 100644 index 0000000..9b14622 --- /dev/null +++ b/src/plugin/connector/bigquery_connector.py @@ -0,0 +1,52 @@ +import logging +import google.oauth2.service_account +import pandas_gbq +from googleapiclient.discovery import build + +from spaceone.core.connector import BaseConnector +from cloudforet.cost_analysis.error import * + +_LOGGER = logging.getLogger(__name__) + +REQUIRED_SECRET_KEYS = ["project_id", "private_key", "token_uri", "client_email"] + + +class BigqueryConnector(BaseConnector): + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.project_id = None + self.credentials = None + self.google_client = None + + def create_session(self, options: dict, secret_data: dict, schema: str): + self._check_secret_data(secret_data) + self.project_id = secret_data['project_id'] + + self.credentials = google.oauth2.service_account.Credentials.from_service_account_info(secret_data) + self.google_client = build('bigquery', 'v2', credentials=self.credentials) + + def list_tables(self, billing_export_project_id, dataset_id, **query): + table_list = [] + + query.update({'projectId': billing_export_project_id, + 'datasetId': dataset_id}) + + request = self.google_client.tables().list(**query) + while request is not None: + response = request.execute() + for table in response.get('tables', []): + table_list.append(table) + request = self.google_client.tables().list_next(previous_request=request, previous_response=response) + + return table_list + + def read_df_from_bigquery(self, query): + return pandas_gbq.read_gbq(query, project_id=self.project_id, credentials=self.credentials) + + @staticmethod + def _check_secret_data(secret_data): + missing_keys = [key for key in REQUIRED_SECRET_KEYS if key not in secret_data] + if missing_keys: + for key in missing_keys: + raise ERROR_REQUIRED_PARAMETER(key=f"secret_data.{key}") diff --git a/src/plugin/main.py b/src/plugin/main.py new file mode 100644 index 0000000..3f6c31f --- /dev/null +++ b/src/plugin/main.py @@ -0,0 +1,157 @@ +from typing import Generator +from spaceone.cost_analysis.plugin.data_source.lib.server import DataSourcePluginServer +from .manager.data_source_manager import DataSourceManager +from .manager.job_manager import JobManager +from .manager.cost_manager import CostManager + +app = DataSourcePluginServer() + + +@app.route("DataSource.init") +def data_source_init(params: dict) -> dict: + """init plugin by options + + Args: + params (DataSourceInitRequest): { + 'options': 'dict', # Required + 'domain_id': 'str' # Required + } + + Returns: + PluginResponse: { + 'metadata': 'dict' + } + """ + options = params["options"] + + data_source_mgr = DataSourceManager() + return data_source_mgr.init_response(options) + + +@app.route("DataSource.verify") +def data_source_verify(params: dict) -> None: + """Verifying data source plugin + + Args: + params (CollectorVerifyRequest): { + 'options': 'dict', # Required + 'secret_data': 'dict', # Required + 'schema': 'str', + 'domain_id': 'str' # Required + } + + Returns: + None + """ + + options = params["options"] + secret_data = params["secret_data"] + domain_id = params.get("domain_id") + schema = params.get("schema") + + data_source_mgr = DataSourceManager() + data_source_mgr.verify_plugin(options, secret_data, domain_id, schema) + + +@app.route("Job.get_tasks") +def job_get_tasks(params: dict) -> dict: + """Get job tasks + + Args: + params (JobGetTaskRequest): { + 'options': 'dict', # Required + 'secret_data': 'dict', # Required + 'schema': 'str', + 'start': 'str', + 'last_synchronized_at': 'datetime', + 'domain_id': 'str' # Required + } + + Returns: + TasksResponse: { + 'tasks': 'list', + 'changed': 'list' + } + + """ + + domain_id = params["domain_id"] + options = params["options"] + secret_data = params["secret_data"] + + schema = params.get("schema") + start = params.get("start") + last_synchronized_at = params.get("last_synchronized_at") + + job_mgr = JobManager() + return job_mgr.get_tasks( + domain_id, options, secret_data, schema, start, last_synchronized_at + ) + + +@app.route("Cost.get_data") +def cost_get_data(params: dict) -> Generator[dict, None, None]: + """Get external cost data + + Args: + params (CostGetDataRequest): { + 'options': 'dict', # Required + 'secret_data': 'dict', # Required + 'schema': 'str', + 'task_options': 'dict', + 'domain_id': 'str' # Required + } + + Returns: + Generator[ResourceResponse, None, None] + { + 'cost': 'float', + 'usage_quantity': 'float', + 'usage_unit': 'str', + 'provider': 'str', + 'region_code': 'str', + 'product': 'str', + 'usage_type': 'str', + 'resource': 'str', + 'tags': 'dict' + 'additional_info': 'dict' + 'data': 'dict' + 'billed_date': 'str' + } + """ + + options = params["options"] + secret_data = params["secret_data"] + + task_options = params.get("task_options", {}) + schema = params.get("schema") + + cost_mgr = CostManager() + return cost_mgr.get_data(options, secret_data, task_options, schema) + + +@app.route("Cost.get_linked_accounts") +def cost_get_linked_accounts(params: dict) -> dict: + """ get linked accounts + + Args: + params: (CostGetLinkedAccountsRequest): { + 'options': 'dict' + 'schema': 'str' + 'secret_data': 'dict' + 'domain_id': 'str' + } + + Returns: + { + 'account_id': 'str' + 'name': 'str' + } + """ + options = params["options"] + secret_data = params["secret_data"] + + schema = params.get("schema") + + cost_mgr = CostManager() + return cost_mgr.get_linked_accounts(options, secret_data, schema) diff --git a/src/plugin/manager/__init__.py b/src/plugin/manager/__init__.py new file mode 100644 index 0000000..8f5eb37 --- /dev/null +++ b/src/plugin/manager/__init__.py @@ -0,0 +1,3 @@ +from cloudforet.cost_analysis.manager.data_source_manager import DataSourceManager +from cloudforet.cost_analysis.manager.job_manager import JobManager +from cloudforet.cost_analysis.manager.cost_manager import CostManager diff --git a/src/plugin/manager/cost_manager.py b/src/plugin/manager/cost_manager.py new file mode 100644 index 0000000..fb80108 --- /dev/null +++ b/src/plugin/manager/cost_manager.py @@ -0,0 +1,212 @@ +import logging +from typing import Generator, Union +from datetime import datetime, timedelta + +from spaceone.core import utils +from spaceone.core.manager import BaseManager +from spaceone.core.error import * + +from ..conf.cost_conf import BIGQUERY_TABLE_PREFIX +from ..connector.bigquery_connector import BigqueryConnector + +_LOGGER = logging.getLogger('spaceone') + +REQUIRED_TASK_OPTIONS = ["start", "billing_export_project_id", "billing_dataset_id", "billing_account_id"] +REQUIRED_OPTIONS = ["billing_export_project_id", "billing_dataset_id", "billing_account_id"] +EXCLUSIVE_PRODUCT = ['Invoice'] + +class CostManager(BaseManager): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.bigquery_connector = BigqueryConnector() + self.billing_export_project_id = None + self.billing_dataset = None + self.billing_table = None + + def get_linked_accounts(self, options: dict, secret_data: dict, schema: str) -> dict: + + linked_accounts = [] + self.bigquery_connector.create_session(options, secret_data, schema) + self._check_options(options) + + self.billing_export_project_id = options['billing_export_project_id'] + self.billing_dataset = options['billing_dataset_id'] + billing_account_id = options['billing_account_id'] + + self.billing_table = f'{BIGQUERY_TABLE_PREFIX}_{billing_account_id.replace("-", "_")}' + self._validate_table_exists() + + start_month = self._get_start_month() + + query = self._create_linked_accounts_google_sql(start_month) + response_stream = self.bigquery_connector.read_df_from_bigquery(query) + for index, row in response_stream.iterrows(): + _LOGGER.debug(f'[get_linked_accounts] row: {row}]') + if row.id is not None: + linked_accounts.append({ + 'account_id': row.id, + 'name': row.project_name + }) + + return {'results': linked_accounts} + + def get_data( + self, options: dict, secret_data: dict, task_options: dict, schema: str = None + ) -> Generator[dict, None, None]: + self.bigquery_connector.create_session(options, secret_data, schema) + self._check_task_options(task_options) + + start = task_options['start'] + self.billing_export_project_id = task_options['billing_export_project_id'] + self.billing_dataset = task_options['billing_dataset_id'] + billing_account_id = task_options['billing_account_id'] + self.target_project_id = task_options['project_id'] + + self.billing_table = f'{BIGQUERY_TABLE_PREFIX}_{billing_account_id.replace("-", "_")}' + self._validate_table_exists() + + _LOGGER.debug(f'[get_data] task_options: {task_options} / start: {start})') + + query = self._create_google_sql(start) + response_stream = self.bigquery_connector.read_df_from_bigquery(query) + for index, row in response_stream.iterrows(): + yield self._make_cost_data(row) + + yield {"results": []} + + def _make_cost_data(self, row) -> dict: + """ Source Data Model (DataFrame) + class CostSummaryItem(DataFrame): + billed_at: str + billing_account_id: str + sku_description: str + id: str + name: str + region_code: str + currency_conversion_rate: float + pricing_unit: str + month: str + cost_type: str + labels: str(list of dict) + cost: float + usage_quantity: float + """ + costs_data = [] + + try: + if row.product not in EXCLUSIVE_PRODUCT: + data = { + 'cost': row.cost, + 'usage_quantity': row.usage_quantity, + 'provider': 'google_cloud', + 'product': row.description, + 'region_code': row.region_code, + 'usage_type': row.sku_description, + 'usage_unit': row.pricing_unit, + 'billed_date': self._change_datetime_to_string(row.billed_at), + 'additional_info': { + 'Project ID': row.id, + 'Project Name': row.project_name, + 'Billing Account ID': row.billing_account_id, + 'Cost Type': row.cost_type, + 'Invoice Month': row.month, + }, + 'tags': {} + } + + if labels := eval(row.labels): + for label_object in labels: + data['tags'][label_object['key']] = label_object['value'] + + costs_data.append(data) + + except Exception as e: + _LOGGER.error(f'[_make_cost_data] make data error: {e}', exc_info=True) + raise e + + return {"results": costs_data} + + @staticmethod + def _check_task_options(task_options): + missing_keys = [key for key in REQUIRED_TASK_OPTIONS if key not in task_options] + if missing_keys: + for key in missing_keys: + raise ERROR_REQUIRED_PARAMETER(key=f"task_options.{key}") + + @staticmethod + def _check_options(options): + missing_keys = [key for key in REQUIRED_OPTIONS if key not in options] + if missing_keys: + for key in missing_keys: + raise ERROR_REQUIRED_PARAMETER(key=f"options.{key}") + + def _validate_table_exists(self): + bigquery_tables_info = self.bigquery_connector.list_tables(self.billing_export_project_id, self.billing_dataset) + bigquery_table_names = [table_info["tableReference"]["tableId"] for table_info in bigquery_tables_info] + + if self.billing_table not in bigquery_table_names: + raise ERROR_REQUIRED_PARAMETER(key=f"not found table {bigquery_table_names}") + + def _create_google_sql(self, start): + where_condition = f""" + WHERE usage_start_time >= TIMESTAMP('{start}-01') + """ + if self.target_project_id != '*': + where_condition += f" AND project.id = '{self.target_project_id}'" + + query = f""" + SELECT + timestamp_trunc(usage_start_time, DAY) as billed_at, + billing_account_id, + service.description, + sku.description as sku_description, + project.id, + project.name as project_name, + IFNULL((location.region), 'global') as region_code, + usage.pricing_unit, + invoice.month, + cost_type, + TO_JSON_STRING(labels) as labels, + + SUM(cost) + + SUM(IFNULL((SELECT SUM(c.amount) + FROM UNNEST(credits) c), 0)) + AS cost, + + SUM(usage.amount_in_pricing_units) as usage_quantity, + FROM `{self.billing_export_project_id}.{self.billing_dataset}.{self.billing_table}` + {where_condition} + GROUP BY 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11 + ORDER BY billed_at desc + ; + """ + return query + + def _create_linked_accounts_google_sql(self, start): + where_condition = f""" + WHERE usage_start_time >= TIMESTAMP('{start}-01') + """ + + query = f""" + SELECT + distinct project.id, project.name as project_name + FROM `{self.billing_export_project_id}.{self.billing_dataset}.{self.billing_table}` + {where_condition} + ; + """ + return query + + @staticmethod + def _change_datetime_to_string(date_time): + return str(date_time.strftime("%Y-%m-%d")) + + @staticmethod + def _get_start_month(): + start_time: datetime = datetime.utcnow() - timedelta(days=365) + start_time = start_time.replace(day=1) + + start_time = start_time.replace( + hour=0, minute=0, second=0, microsecond=0, tzinfo=None + ) + + return start_time.strftime("%Y-%m") \ No newline at end of file diff --git a/src/plugin/manager/data_source_manager.py b/src/plugin/manager/data_source_manager.py new file mode 100644 index 0000000..731a476 --- /dev/null +++ b/src/plugin/manager/data_source_manager.py @@ -0,0 +1,44 @@ +import logging + +from spaceone.core.manager import BaseManager +from ..connector.bigquery_connector import BigqueryConnector + +_LOGGER = logging.getLogger('spaceone') + + +class DataSourceManager(BaseManager): + + @staticmethod + def init_response(options: dict) -> dict: + metadata = { + "currency": "KRW", + "supported_secret_types": ["MANUAL"], + "use_account_routing": False, + "data_source_rules": [ + { + "name": "match_service_account", + "conditions_policy": "ALWAYS", + "actions": { + "match_service_account": { + "source": "additional_info.Project ID", + "target": "data.project_id", + } + }, + "options": {"stop_processing": True}, + } + ], + } + if options.get("use_account_routing", False): + metadata["use_account_routing"] = True + if account_match_key := options.get("account_match_key", "additional_info.Project ID"): + metadata["account_match_key"] = account_match_key + + return {"metadata": metadata} + + @staticmethod + def verify_plugin( + options: dict, secret_data: dict, domain_id: str, schema: str = None + ) -> None: + + bigquery_connector = BigqueryConnector() + bigquery_connector.create_session(options, secret_data, schema) diff --git a/src/plugin/manager/job_manager.py b/src/plugin/manager/job_manager.py new file mode 100644 index 0000000..7e4b04d --- /dev/null +++ b/src/plugin/manager/job_manager.py @@ -0,0 +1,116 @@ +import logging +from datetime import datetime, timedelta + +from spaceone.core.error import * +from spaceone.core.manager import BaseManager + +from ..conf.cost_conf import BIGQUERY_TABLE_PREFIX +from ..connector.bigquery_connector import BigqueryConnector + + +_LOGGER = logging.getLogger('spaceone') + +REQUIRED_OPTIONS = ["billing_export_project_id", "billing_dataset_id", "billing_account_id"] + + +class JobManager(BaseManager): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.bigquery_connector = BigqueryConnector() + self.billing_export_project_id = None + self.billing_dataset = None + self.billing_table = None + + def get_tasks( + self, + domain_id: str, + options: dict, + secret_data: dict, + schema: str = None, + start: str = None, + last_synchronized_at: datetime = None) -> dict: + + self.bigquery_connector.create_session(options, secret_data, schema) + self._check_options(options) + + self.billing_export_project_id = options['billing_export_project_id'] + self.billing_dataset = options['billing_dataset_id'] + billing_account_id = options['billing_account_id'] + + self.billing_table = f'{BIGQUERY_TABLE_PREFIX}_{billing_account_id.replace("-", "_")}' + self._validate_table_exists() + + tasks = [] + changed = [] + + start_month = self._get_start_month(start, last_synchronized_at) + + query = self._create_google_sql(start_month) + response_stream = self.bigquery_connector.read_df_from_bigquery(query) + + for index, row in response_stream.iterrows(): + tasks.append( + { + "task_options": { + "start": start_month, + "compartment_id": row.id + } + } + ) + + changed.append({"start": start_month}) + + return {"tasks": tasks, "changed": changed} + + def _get_start_month(self, start, last_synchronized_at=None): + if start: + start_time: datetime = self._parse_start_time(start) + elif last_synchronized_at: + start_time: datetime = last_synchronized_at - timedelta(days=7) + start_time = start_time.replace(day=1) + else: + start_time: datetime = datetime.utcnow() - timedelta(days=365) + start_time = start_time.replace(day=1) + + start_time = start_time.replace( + hour=0, minute=0, second=0, microsecond=0, tzinfo=None + ) + + return start_time.strftime("%Y-%m") + + @staticmethod + def _parse_start_time(start_str): + date_format = "%Y-%m" + + try: + return datetime.strptime(start_str, date_format) + except Exception as e: + raise ERROR_INVALID_PARAMETER_TYPE(key="start", type=date_format) + + @staticmethod + def _check_options(options): + missing_keys = [key for key in REQUIRED_OPTIONS if key not in options] + if missing_keys: + for key in missing_keys: + raise ERROR_REQUIRED_PARAMETER(key=f"options.{key}") + + def _validate_table_exists(self): + bigquery_tables_info = self.bigquery_connector.list_tables(self.billing_export_project_id, self.billing_dataset) + bigquery_table_names = [table_info["tableReference"]["tableId"] for table_info in bigquery_tables_info] + + if self.billing_table not in bigquery_table_names: + raise ERROR_REQUIRED_PARAMETER(key=f"not found table {bigquery_table_names}") + + def _create_google_sql(self, start): + where_condition = f""" + WHERE usage_start_time >= TIMESTAMP('{start}-01') + """ + + query = f""" + SELECT + distinct project.id + FROM `{self.billing_export_project_id}.{self.billing_dataset}.{self.billing_table}` + {where_condition} + ; + """ + return query diff --git a/src/setup.py b/src/setup.py index d8adad2..330742d 100644 --- a/src/setup.py +++ b/src/setup.py @@ -32,7 +32,7 @@ packages=find_packages(), install_requires=[ 'spaceone-api', - 'schematics', + 'spaceone-cost-analysis', 'google-api-python-client', 'pandas-gbq', 'tqdm'