Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
__pycache__/
*.pyc
.storage/
.DS_Store
.vscode/
160 changes: 160 additions & 0 deletions __init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
"""The YNAB Custom integration."""

import logging
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.typing import ConfigType
from homeassistant.helpers import entity_registry as er
import re

from .const import DOMAIN, CONF_UPDATE_INTERVAL, DEFAULT_UPDATE_INTERVAL, CONF_INCLUDE_CLOSED_ACCOUNTS, CONF_INCLUDE_HIDDEN_CATEGORIES, DEFAULT_INCLUDE_CLOSED_ACCOUNTS, DEFAULT_INCLUDE_HIDDEN_CATEGORIES, CONF_SELECTED_ACCOUNTS, CONF_SELECTED_CATEGORIES
from .coordinator import YNABDataUpdateCoordinator

_LOGGER = logging.getLogger(__name__)

def sanitize_budget_name(budget_name: str) -> str:
"""Sanitize the budget name to create a valid Home Assistant entity ID."""
# Replace spaces with underscores and remove any special characters
sanitized_name = re.sub(r'[^a-zA-Z0-9_]', '', budget_name.replace(" ", "_"))
return sanitized_name

async def async_migrate_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Migrate config entry from version 1 to version 2."""
_LOGGER.info(f"Migrating YNAB config entry {entry.entry_id} from version {entry.version} to version 2")
_LOGGER.debug(f"Entry data before migration: {entry.data}")
_LOGGER.debug(f"Entry options before migration: {entry.options}")

try:
# Create new data and options dictionaries
new_data = dict(entry.data)
new_options = dict(entry.options) if entry.options else {}

# Migrate update_interval from data to options if it exists in data
if CONF_UPDATE_INTERVAL in entry.data:
new_options.setdefault(CONF_UPDATE_INTERVAL, entry.data[CONF_UPDATE_INTERVAL])
# Remove from data since it should be in options
new_data.pop(CONF_UPDATE_INTERVAL, None)
else:
# Ensure update_interval exists in options with default
new_options.setdefault(CONF_UPDATE_INTERVAL, DEFAULT_UPDATE_INTERVAL)

# Add new checkbox options with safe defaults
new_options.setdefault(CONF_INCLUDE_CLOSED_ACCOUNTS, DEFAULT_INCLUDE_CLOSED_ACCOUNTS)
new_options.setdefault(CONF_INCLUDE_HIDDEN_CATEGORIES, DEFAULT_INCLUDE_HIDDEN_CATEGORIES)

# Update the config entry
hass.config_entries.async_update_entry(
entry,
data=new_data,
options=new_options,
version=2
)

_LOGGER.info(f"Successfully migrated YNAB config entry {entry.entry_id} to version 2")
_LOGGER.debug(f"Entry data after migration: {new_data}")
_LOGGER.debug(f"Entry options after migration: {new_options}")
return True

except Exception as e:
_LOGGER.exception(f"Failed to migrate YNAB config entry {entry.entry_id}: {e}")
return False

async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up YNAB Custom from a config entry."""
_LOGGER.debug(f"Setting up YNAB integration {entry.entry_id}")

budget_id = entry.data.get("budget_id")
budget_name = entry.data.get("budget_name")

if not budget_id or not budget_name:
_LOGGER.error("Missing budget_id or budget_name in config entry.")
return False

# Sanitize the budget name to avoid issues with special characters or spaces
sanitized_budget_name = sanitize_budget_name(budget_name)
coordinator = YNABDataUpdateCoordinator(hass, entry, budget_id, sanitized_budget_name)

# Load persistent data before first refresh
await coordinator.async_load_persistent_data()

await coordinator.async_config_entry_first_refresh() # Ensure the first update occurs
hass.async_create_task(coordinator.async_refresh()) # 🔹 Manually force an update on startup

if DOMAIN not in hass.data:
hass.data[DOMAIN] = {}

hass.data[DOMAIN][entry.entry_id] = coordinator
await hass.config_entries.async_forward_entry_setups(entry, ["sensor", "number"])



# Set up options update listener
entry.async_on_unload(entry.add_update_listener(async_update_options))

_LOGGER.info(f"YNAB Custom integration for {sanitized_budget_name} successfully loaded.")
return True


async def async_update_options(hass: HomeAssistant, entry: ConfigEntry) -> None:
"""Update options."""

# Get the coordinator to check what changed
coordinator = hass.data.get(DOMAIN, {}).get(entry.entry_id)
if coordinator:
old_accounts = set(coordinator.selected_accounts)
old_categories = set(coordinator.selected_categories)
new_accounts = set(entry.data.get(CONF_SELECTED_ACCOUNTS, []))
new_categories = set(entry.data.get(CONF_SELECTED_CATEGORIES, []))

_LOGGER.debug(f"Options update - OLD accounts: {old_accounts}, NEW accounts: {new_accounts}")
_LOGGER.debug(f"Options update - OLD categories: {old_categories}, NEW categories: {new_categories}")

if old_accounts != new_accounts or old_categories != new_categories:
_LOGGER.info(f"Account/category selections changed - doing full unload/reload cycle")

# Force a complete unload/reload cycle to ensure clean state
unload_result = await hass.config_entries.async_unload(entry.entry_id)
setup_result = await hass.config_entries.async_setup(entry.entry_id)

_LOGGER.info(f"Unload/reload cycle complete - Unload: {unload_result}, Setup: {setup_result}")
return
else:
_LOGGER.debug(f"Only settings changed, doing simple reload")
else:
_LOGGER.warning(f"No coordinator found for {entry.entry_id}")

# Simple reload for non-selection changes
await hass.config_entries.async_reload(entry.entry_id)
_LOGGER.debug(f"Simple reload complete")


async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload a config entry."""

# Clean up entity registry entries for this config entry
entity_registry = er.async_get(hass)
entities_to_remove = []

for entity_entry in entity_registry.entities.values():
if entity_entry.config_entry_id == entry.entry_id:
entities_to_remove.append(entity_entry.entity_id)

# Remove entities from registry
removed_entities = []
for entity_id in entities_to_remove:
entity_registry.async_remove(entity_id)
removed_entities.append(entity_id)

coordinator = hass.data[DOMAIN].pop(entry.entry_id, None)

if coordinator:
await coordinator.async_shutdown()

result = await hass.config_entries.async_unload_platforms(entry, ["sensor"])

if removed_entities:
_LOGGER.debug(f"Unload complete for {entry.entry_id} - removed {len(removed_entities)} entities")
else:
_LOGGER.debug(f"Unload complete for {entry.entry_id}")

return result
166 changes: 166 additions & 0 deletions api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
"""YNAB API Handler."""

import aiohttp
import logging
from datetime import datetime, timedelta
from collections import deque

_LOGGER = logging.getLogger(__name__)

class YNABApi:
"""YNAB API class for handling requests."""

BASE_URL = "https://api.youneedabudget.com/v1"

def __init__(self, access_token: str, shared_tracking: dict = None):
"""Initialize the API client."""
self.access_token = access_token
self.headers = {"Authorization": f"Bearer {access_token}"}

# Use shared request tracking if provided, otherwise create local tracking
if shared_tracking:
self.request_timestamps = shared_tracking["request_timestamps"]
self._shared_tracking = shared_tracking # Keep reference for shared counter
else:
# Fallback to local tracking (for backwards compatibility)
self.request_timestamps = deque()
self._shared_tracking = None

def _track_request(self):
"""Track a new API request with timestamp."""
now = datetime.now()
self.request_timestamps.append(now)

# Update shared total counter (cumulative across all instances)
if self._shared_tracking:
self._shared_tracking["total_requests"] += 1 # Increment shared counter
else:
# Fallback for local tracking
if not hasattr(self, 'total_requests'):
self.total_requests = 0
self.total_requests += 1

# Clean old requests (older than 1 hour)
cutoff_time = now - timedelta(hours=1)
cleaned_count = 0
while self.request_timestamps and self.request_timestamps[0] < cutoff_time:
self.request_timestamps.popleft()
cleaned_count += 1


def get_rate_limit_info(self):
"""Get current rate limit status and statistics."""
now = datetime.now()

# Clean old requests first
cutoff_time = now - timedelta(hours=1)
while self.request_timestamps and self.request_timestamps[0] < cutoff_time:
self.request_timestamps.popleft()

requests_this_hour = len(self.request_timestamps)
estimated_remaining = max(0, 200 - requests_this_hour)

# Calculate when the rate limit resets (1 hour from oldest request)
if self.request_timestamps:
oldest_request = self.request_timestamps[0]
rate_limit_resets_at = oldest_request + timedelta(hours=1)
else:
rate_limit_resets_at = now + timedelta(hours=1)

return {
"requests_made_total": self._shared_tracking["total_requests"] if self._shared_tracking else getattr(self, 'total_requests', 0),
"requests_this_hour": requests_this_hour,
"estimated_remaining": estimated_remaining,
"rate_limit_resets_at": rate_limit_resets_at.strftime("%I:%M %p"),
}

async def get_budgets(self):
"""Fetch available budgets."""
url = f"{self.BASE_URL}/budgets"
_LOGGER.debug("Fetching budgets from URL: %s", url)
return await self._get(url)

async def get_budget(self, budget_id: str):
"""Fetch full details for a specific budget."""
if not budget_id or budget_id == "budgets":
_LOGGER.error("Invalid budget_id before API call: %s", budget_id)
return {}

url = f"{self.BASE_URL}/budgets/{budget_id}"
_LOGGER.debug("Fetching budget from URL: %s", url)
return await self._get(url)

async def get_accounts(self, budget_id: str):
"""Fetch accounts for a specific budget."""
if not budget_id or budget_id == "budgets":
_LOGGER.error("Invalid budget_id before accounts API call: %s", budget_id)
return {}

url = f"{self.BASE_URL}/budgets/{budget_id}/accounts"
_LOGGER.debug("Fetching accounts from URL: %s", url)
return await self._get(url)

async def get_categories(self, budget_id: str):
"""Fetch categories for a specific budget."""
if not budget_id or budget_id == "budgets":
_LOGGER.error("Invalid budget_id before categories API call: %s", budget_id)
return {}

url = f"{self.BASE_URL}/budgets/{budget_id}/categories"
_LOGGER.debug("Fetching categories from URL: %s", url)
return await self._get(url)

async def get_monthly_summary(self, budget_id: str, current_month: str):
"""Fetch the latest monthly summary for a specific budget."""
if not budget_id or budget_id == "budgets":
_LOGGER.error("Invalid budget_id before months API call: %s", budget_id)
return {}

# Form the URL to query the specific month
url = f"{self.BASE_URL}/budgets/{budget_id}/months/{current_month}" # Include the full date with day
_LOGGER.debug(f"Requesting URL: {url}") # Log the full URL being requested

# Fetch the data - let 429 errors propagate to coordinator
response = await self._get(url)

# Log the response data
_LOGGER.debug(f"Response for {url}: {response}")

# If response contains data, return it
if response:
return response
else:
_LOGGER.warning(f"No data found for {current_month} in budget: {budget_id}")
return {}

async def get_transactions(self, budget_id: str):
"""Fetch recent transactions for a specific budget."""
if not budget_id or budget_id == "budgets":
_LOGGER.error("Invalid budget_id before transactions API call: %s", budget_id)
return {}

url = f"{self.BASE_URL}/budgets/{budget_id}/transactions"
_LOGGER.debug("Fetching transactions from URL: %s", url)
return await self._get(url)

async def _get(self, url: str):
"""Generic GET request handler."""
# Track this request
self._track_request()


async with aiohttp.ClientSession() as session:
async with session.get(url, headers=self.headers) as response:
if response.status == 200:
# If the response is successful, return the data
data = await response.json()
return data.get("data", {})
else:
# Log if the request fails
_LOGGER.error("YNAB API error: %s - URL: %s", response.status, url)

# Raise exception for 429 errors so coordinator can catch them
if response.status == 429:
raise Exception(f"429 - Too Many Requests: {url}")

return {}
Loading