Skip to content

Commit

Permalink
[Integration][Kubecost][Opencost] Add support for filters (#749)
Browse files Browse the repository at this point in the history
  • Loading branch information
lordsarcastic authored Jul 23, 2024
1 parent acd9544 commit 1879502
Show file tree
Hide file tree
Showing 9 changed files with 331 additions and 83 deletions.
13 changes: 13 additions & 0 deletions integrations/kubecost/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

<!-- towncrier release notes start -->

# Port_Ocean 0.1.59 (2024-07-15)

### Features

- Added support for filter in 'cloud' and 'kubesystem' kind (#749)
- Added separate resource config for 'allocation' v1 and v2 (#749)

### Improvements

- Added separate resource config for 'cloudCost' v1 and v2 (#749)
- Separated 'kubesystem' resource config from 'cloud' (#749)


# port_ocean 0.1.58 (2024-07-15)

### Bug Fixes
Expand Down
42 changes: 28 additions & 14 deletions integrations/kubecost/client.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import typing
from typing import Any

import httpx
from loguru import logger

from integration import KubecostResourceConfig, KubecostSelector
from port_ocean.context.event import event
from port_ocean.utils import http_async_client

from integration import (
CloudCostV1Selector,
CloudCostV2Selector,
KubecostV1Selector,
KubecostV2Selector,
)

KUBECOST_API_VERSION_1 = "v1"


Expand All @@ -17,16 +20,26 @@ def __init__(self, kubecost_host: str, kubecost_api_version: str):
self.kubecost_api_version = kubecost_api_version
self.http_client = http_async_client

def generate_params(self, selector: KubecostSelector) -> dict[str, str]:
def generate_params(
self,
selector: (
CloudCostV1Selector
| CloudCostV2Selector
| KubecostV1Selector
| KubecostV2Selector
),
) -> dict[str, str]:
params = selector.dict(exclude_unset=True, by_alias=True)
params.pop("query")
return params

async def get_kubesystem_cost_allocation(self) -> list[dict[str, Any]]:
async def get_kubesystem_cost_allocation(
self, selector: KubecostV1Selector | KubecostV2Selector
) -> list[dict[str, Any]]:
"""Calls the Kubecost allocation endpoint to return data for cost and usage
https://docs.kubecost.com/apis/apis-overview/api-allocation
"""
selector = typing.cast(KubecostResourceConfig, event.resource_config).selector

params: dict[str, str] = {
"window": selector.window,
**self.generate_params(selector),
Expand All @@ -48,21 +61,22 @@ async def get_kubesystem_cost_allocation(self) -> list[dict[str, Any]]:
logger.error(f"HTTP occurred while fetching kubecost data: {e}")
raise

async def get_cloud_cost_allocation(self) -> list[dict[str, Any]]:
async def get_cloud_cost_allocation(
self, selector: CloudCostV1Selector | CloudCostV2Selector
) -> list[dict[str, Any]]:
"""Calls the Kubecost cloud allocation API. It uses the Aggregate endpoint which returns detailed cloud cost data
https://docs.kubecost.com/apis/apis-overview/cloud-cost-api
"""
selector = typing.cast(KubecostResourceConfig, event.resource_config).selector
url = f"{self.kubecost_host}/model/cloudCost"

if self.kubecost_api_version == KUBECOST_API_VERSION_1:
url = f"{self.kubecost_host}/model/cloudCost/aggregate"

params: dict[str, str] = {
"window": selector.window,
**self.generate_params(selector),
}

if self.kubecost_api_version == KUBECOST_API_VERSION_1:
url = f"{self.kubecost_host}/model/cloudCost/aggregate"
else:
url = f"{self.kubecost_host}/model/cloudCost"

try:
response = await self.http_client.get(
url=url,
Expand Down
216 changes: 201 additions & 15 deletions integrations/kubecost/integration.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
import re
from typing import Literal

from pydantic.fields import Field

from port_ocean.core.handlers.port_app_config.api import APIPortAppConfig
from port_ocean.core.handlers.port_app_config.models import (
PortAppConfig,
ResourceConfig,
Selector,
)
from port_ocean.core.integrations.base import BaseIntegration
from pydantic.fields import Field


class DatePairField(str):
Expand Down Expand Up @@ -56,7 +55,83 @@ def validate(cls, value: str) -> None:
)


class KubecostSelector(Selector):
class CloudCostV1Selector(Selector):
window: (
Literal[
"today",
"week",
"month",
"yesterday",
"lastweek",
"lastmonth",
"30m",
"12h",
"7d",
]
| DatePairField
| UnixtimePairField
) = Field(default="today")
aggregate: AggregationField | None = Field(
description="Field by which to aggregate the results.",
)
filter_invoice_entity_ids: str | None = Field(
alias="filterInvoiceEntityIDs", description="GCP only, filter for projectID"
)
filter_account_ids: str | None = Field(
alias="filterAccountIDs", description="Filter for account"
)
filter_providers: str | None = Field(
alias="filterProviders", description="Filter for cloud service provider"
)
filter_label: str | None = Field(
alias="filterLabel",
description="Filter for a specific label. Does not support filtering for multiple labels at once.",
)
filter_services: str | None = Field(
alias="filterServices",
description="Comma-separated list of services to match; e.g. frontend-one,frontend-two will return results with either of those two services",
)


class CloudCostV2Selector(Selector):
window: (
Literal[
"today",
"week",
"month",
"yesterday",
"lastweek",
"lastmonth",
"30m",
"12h",
"7d",
]
| DatePairField
| UnixtimePairField
) = Field(default="today")
aggregate: AggregationField | None = Field(
description="Field by which to aggregate the results.",
)
accumulate: bool = Field(
default=False,
description="If true, sum the entire range of sets into a single set. Default value is false",
)
offset: int | None = Field(
description="Number of items to skip before starting to collect the result set.",
)
limit: int | None = Field(
description="Maximum number of items to return in the result set.",
)
filter: str | None = Field(
description=(
"Filter results by any category which that can be aggregated by,"
" can support multiple filterable items in the same category in"
" a comma-separated list."
),
)


class KubecostV1Selector(Selector):
window: (
Literal[
"today",
Expand Down Expand Up @@ -154,27 +229,138 @@ class KubecostSelector(Selector):
default=0.0,
description="Floating-point value representing a monthly cost to share with the remaining non-idle, unshared allocations; e.g. 30.42 ($1.00/day == $30.42/month) for the query yesterday (1 day) will split and distribute exactly $1.00 across the allocations. Default is 0.0.",
)
filter_invoice_entity_ids: str | None = Field(
alias="filterInvoiceEntityIDs", description="Filter for account"


class KubecostV2Selector(Selector):
window: (
Literal[
"today",
"week",
"month",
"yesterday",
"lastweek",
"lastmonth",
"30m",
"12h",
"7d",
]
| DatePairField
| UnixtimePairField
) = Field(default="today")
aggregate: AggregationField | None = Field(
description="Field by which to aggregate the results.",
)
filter_account_ids: str | None = Field(
alias="filterAccountIDs", description="GCP only, filter for projectID"
step: DurationField | None = Field(
description="Duration of a single allocation set (e.g., '30m', '2h', '1d'). Default is window.",
)
filter_providers: str | None = Field(
alias="filterProviders", description="Filter for cloud service provider"
accumulate: bool = Field(
default=False,
description="If true, sum the entire range of sets into a single set. Default value is false",
)
filter_label: str | None = Field(
alias="filterLabel",
description="Filter for a specific label. Does not support filtering for multiple labels at once.",
idle: bool = Field(
default=True,
description="If true, include idle cost (i.e. the cost of the un-allocated assets) as its own allocation",
)
external: bool = Field(
default=False,
description="If true, include external, or out-of-cluster costs in each allocation. Default is false.",
)
offset: int | None = Field(
description="Number of items to skip before starting to collect the result set.",
)
limit: int | None = Field(
description="Maximum number of items to return in the result set.",
)
filter: str | None = Field(
description=(
"Filter results by any category which that can be aggregated by,"
" can support multiple filterable items in the same category in"
" a comma-separated list."
),
)
format: Literal["csv", "pdf"] | None = Field(
description="Format of the output. Default is JSON.",
)
cost_metric: Literal["cummulative", "hourly", "daily", "monthly"] = Field(
description="Cost metric format.", default="cummulative", alias="costMetric"
)
share_idle: bool = Field(
alias="shareIdle",
default=False,
description="If true, idle cost is allocated proportionally across all non-idle allocations, per-resource. That is, idle CPU cost is shared with each non-idle allocation's CPU cost, according to the percentage of the total CPU cost represented. Default is false",
)
split_idle: bool = Field(
alias="splitIdle",
default=False,
description="If true, and shareIdle == false, Idle Allocations are created on a per cluster or per node basis rather than being aggregated into a single idle allocation. Default is false",
)
idle_by_node: bool = Field(
alias="idleByNode",
default=False,
description="f true, idle allocations are created on a per node basis. Which will result in different values when shared and more idle allocations when split. Default is false.",
)
include_shared_cost_breakdown: bool = Field(
alias="includeSharedCostBreakdown",
default=True,
description="If true, the cost breakdown for shared costs is included in the response. Default is false.",
)
reconcile: bool = Field(
default=True,
description="If true, pulls data from the Assets cache and corrects prices of Allocations according to their related Assets",
)
share_tenancy_costs: bool = Field(
alias="shareTenancyCosts",
description="If true, share the cost of cluster overhead assets such as cluster management costs and node attached volumes across tenants of those resources.",
default=True,
)
share_namespaces: str | None = Field(
alias="shareNamespaces",
description="Comma-separated list of namespaces to share; e.g. kube-system, kubecost will share the costs of those two namespaces with the remaining non-idle, unshared allocations.",
)
share_labels: str | None = Field(
alias="shareLabels",
description="Comma-separated list of labels to share; e.g. env:staging, app:test will share the costs of those two label values with the remaining non-idle, unshared allocations.",
)
share_cost: float = Field(
alias="shareCost",
default=0.0,
description="Floating-point value representing a monthly cost to share with the remaining non-idle, unshared allocations; e.g. 30.42 ($1.00/day == $30.42/month) for the query yesterday (1 day) will split and distribute exactly $1.00 across the allocations. Default is 0.0.",
)
share_split: Literal["weighted", "even"] = Field(
alias="shareSplit",
default="weighted",
description="Determines how to split shared costs among non-idle, unshared allocations.",
)


class CloudCostV1ResourceConfig(ResourceConfig):
selector: CloudCostV1Selector
kind: Literal["cloud"]


class CloudCostV2ResourceConfig(ResourceConfig):
selector: CloudCostV2Selector
kind: Literal["cloud"]


class KubecostV1ResourceConfig(ResourceConfig):
selector: KubecostV1Selector
kind: Literal["kubesystem"]


class KubecostResourceConfig(ResourceConfig):
selector: KubecostSelector
class KubecostV2ResourceConfig(ResourceConfig):
selector: KubecostV2Selector
kind: Literal["kubesystem"]


class KubecostPortAppConfig(PortAppConfig):
resources: list[KubecostResourceConfig] = Field(default_factory=list) # type: ignore
resources: list[
KubecostV1ResourceConfig
| KubecostV2ResourceConfig
| CloudCostV1ResourceConfig
| CloudCostV2ResourceConfig
| ResourceConfig
] = Field(default_factory=list)


class KubecostIntegration(BaseIntegration):
Expand Down
21 changes: 18 additions & 3 deletions integrations/kubecost/main.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
import typing
from typing import Any

from client import KubeCostClient
from port_ocean.context.event import event
from port_ocean.context.ocean import ocean

from client import KubeCostClient
from integration import (
CloudCostV1ResourceConfig,
CloudCostV2ResourceConfig,
KubecostV1ResourceConfig,
KubecostV2ResourceConfig,
)


def init_client() -> KubeCostClient:
return KubeCostClient(
Expand All @@ -14,14 +23,20 @@ def init_client() -> KubeCostClient:
@ocean.on_resync("kubesystem")
async def on_kubesystem_cost_resync(kind: str) -> list[dict[Any, Any]]:
client = init_client()
data = await client.get_kubesystem_cost_allocation()
selector = typing.cast(
KubecostV1ResourceConfig | KubecostV2ResourceConfig, event.resource_config
).selector
data = await client.get_kubesystem_cost_allocation(selector)
return [value for item in data if item is not None for value in item.values()]


@ocean.on_resync("cloud")
async def on_cloud_cost_resync(kind: str) -> list[dict[Any, Any]]:
client = init_client()
data = await client.get_cloud_cost_allocation()
selector = typing.cast(
CloudCostV1ResourceConfig | CloudCostV2ResourceConfig, event.resource_config
).selector
data = await client.get_cloud_cost_allocation(selector)
return [value for item in data for value in item.get("cloudCosts", {}).values()]


Expand Down
Loading

0 comments on commit 1879502

Please sign in to comment.