#61 Added support for Enel Distribuzione (Italian grid authority) (#62)
patrickvorgers authored Oct 19, 2024
1 parent 6827fa8 commit 444808e
Showing 4 changed files with 3,286 additions and 0 deletions.
357 changes: 357 additions & 0 deletions Datasources/Enel Distribuzione/EnelDistribuzioneDataPrepare.py
@@ -0,0 +1,357 @@
import datetime
import glob
import json
import math
import os
import sys
from collections import namedtuple
from typing import List

import pandas as pd

# DataFilter named tuple definition
# column: The name of the column on which the filter should be applied
# value: The value to filter on (regular expressions can be used)
# equal: Boolean value indicating whether the filter should be inclusive or exclusive (True/False)
DataFilter = namedtuple("DataFilter", ["column", "value", "equal"])
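# Example (hypothetical column and value, for illustration only):
# DataFilter("Tipo", "Consumo", True) would keep only the rows whose
# "Tipo" column matches the regular expression "Consumo".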

# OutputFileDefinition named tuple definition
# outputFileName: The name of the output file
# valueColumnName: The name of the column holding the value
# dataFilters: A list of DataFilters (see the DataFilter definition above)
# recalculate: Boolean value indicating whether the data should be recalculated,
# because the source is not an increasing value
OutputFileDefinition = namedtuple(
"OutputFileDefinition",
["outputFileName", "valueColumnName", "dataFilters", "recalculate"],
)
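# Example: the definition used below in outputFiles,
# OutputFileDefinition("elec_feed_in_tariff_1_high_resolution.csv", "_Value", [], True),
# writes the "_Value" column, recalculated into an ever-increasing total, without any filtering.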

# ---------------------------------------------------------------------------------------------------------------------
# TEMPLATE SETUP
# ---------------------------------------------------------------------------------------------------------------------

# Name of the energy provider
energyProviderName = "Enel Distribuzione"

# Inputfile(s): filename extension
inputFileNameExtension = ".csv"
# Inputfile(s): Name of the column containing the date of the reading.
# Use this in case date and time are combined in one field.
inputFileDateColumnName = "Giorno"
# Inputfile(s): Name of the column containing the time of the reading.
# Leave empty in case date and time are combined in one field.
inputFileTimeColumnName = "_Time"
# Inputfile(s): Date/time format used in the data column.
# Combine the format of the date and time in case date and time are two separate fields.
inputFileDateTimeColumnFormat = "%d/%m/%Y %H:%M"
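# Example: with the columns above, a "Giorno" value of "19/10/2024" and a "_Time"
# value of "00:15" are combined and parsed as "19/10/2024 00:15" (illustrative date).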
# Inputfile(s): Data separator used in the .csv input file
inputFileDataSeperator = ";"
# Inputfile(s): Decimal token being used in the input file
inputFileDataDecimal = ","
# Inputfile(s): Number of header rows in the input file
inputFileNumHeaderRows = 0
# Inputfile(s): Number of footer rows in the input file
inputFileNumFooterRows = 0
# Inputfile(s): Json path of the records (only needed for json files)
# Example: inputFileJsonPath: List[str] = ['energy', 'values']
inputFileJsonPath: List[str] = []
# Inputfile(s): Name or index of the excel sheet (only needed for excel files containing multiple sheets,
# leave at 0 for the first sheet)
inputFileExcelSheetName = 0

# Name used for the temporary date/time field.
# This normally needs no change; change it only if it conflicts with an existing column.
dateTimeColumnName = "_DateTime"

# List of one or more output file definitions
outputFiles = [
OutputFileDefinition(
"elec_feed_in_tariff_1_high_resolution.csv",
"_Value",
[],
True,
),
]


# Prepare the input data (before date/time manipulation)
def customPrepareDataPre(dataFrame: pd.DataFrame) -> pd.DataFrame:
df_clean = dataFrame.copy()

# Extract the 'from' part of each column header (before the hyphen)
# If the column doesn't contain a hyphen, leave it as is
df_clean.columns = [
time.split("-")[0] if "-" in time else time for time in df_clean.columns
]
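    # e.g. an export header such as "00:00-00:15" (assumed quarter-hour range)
    # becomes "00:00", while "Giorno" (no hyphen) is left untouched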

# Melt the DataFrame to create 'Date', 'Time', and 'Value' columns
df_melted = pd.melt(
df_clean, id_vars=["Giorno"], var_name="_Time", value_name="_Value"
)

return df_melted


# Prepare the input data (after date/time manipulation)
def customPrepareDataPost(dataFrame: pd.DataFrame) -> pd.DataFrame:
# Default no manipulation, add code if needed
return dataFrame


# ---------------------------------------------------------------------------------------------------------------------

# Template version number
versionNumber = "1.5.0"


# Prepare the input data
def prepareData(dataFrame: pd.DataFrame) -> pd.DataFrame:
print("Preparing data")

# Handle any custom dataframe manipulation (Pre)
dataFrame = customPrepareDataPre(dataFrame)

# Check if we have to combine a date and time field
if inputFileTimeColumnName != "":
        # Note that the format changes in case the column was parsed as a date.
        # For Excel, change the type of the cell to text or adjust the format accordingly;
        # use print(dataFrame) to inspect the format that is actually used.
dataFrame[dateTimeColumnName] = pd.to_datetime(
dataFrame[inputFileDateColumnName].astype(str)
+ " "
+ dataFrame[inputFileTimeColumnName].astype(str),
format=inputFileDateTimeColumnFormat,
utc=True,
)
else:
dataFrame[dateTimeColumnName] = pd.to_datetime(
dataFrame[inputFileDateColumnName],
format=inputFileDateTimeColumnFormat,
utc=True,
)
# Remove the timezone (if it exists)
dataFrame[dateTimeColumnName] = dataFrame[dateTimeColumnName].dt.tz_localize(None)

    # Select only rows within the supported date range
    df = dataFrame.loc[
        (
            dataFrame[dateTimeColumnName]
            >= datetime.datetime.strptime("01-01-1970", "%d-%m-%Y")
        )
        & (
            dataFrame[dateTimeColumnName]
            <= datetime.datetime.strptime("31-12-2099", "%d-%m-%Y")
        )
    ].copy()  # copy so the in-place sort below does not operate on a slice view

# Make sure that the data is correctly sorted
df.sort_values(by=dateTimeColumnName, ascending=True, inplace=True)

# Transform the date into unix timestamp for Home-Assistant
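    # (pandas stores datetimes as nanoseconds since the epoch; dividing by 1e9 yields seconds)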
df[dateTimeColumnName] = (
df[dateTimeColumnName].astype("int64") / 1000000000
).astype("int64")

# Handle any custom dataframe manipulation (Post)
df = customPrepareDataPost(df)

return df


# Filter the data based on the provided dataFilter(s)
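# Example (hypothetical filter): DataFilter("Giorno", "^01/", True) would keep only
# the readings whose date starts with day "01", since the value is matched as a
# regular expression through str.contains below.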
def filterData(dataFrame: pd.DataFrame, filters: List[DataFilter]) -> pd.DataFrame:
df = dataFrame
# Iterate all the provided filters
for dataFilter in filters:
# Determine the subset based on the provided filter (regular expression)
series = (
df[dataFilter.column].astype(str).str.contains(dataFilter.value, regex=True)
)

# Validate whether the data is included or excluded
if not dataFilter.equal:
series = ~series

df = df[series]

return df


# Recalculate the data so that the value increases
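# Example: interval readings [0.5, 0.3, NaN, 0.2] become the running
# total [0.5, 0.8, 0.8, 1.0]; NaN values are treated as 0.0.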
def recalculateData(dataFrame: pd.DataFrame, dataColumnName: str) -> pd.DataFrame:
df = dataFrame

# Make the value column increasing (skip first row)
previousRowIndex = -1
for index, _ in df.iterrows():
# Check if the current row contains a valid value
if math.isnan(df.at[index, dataColumnName]):
df.at[index, dataColumnName] = 0.0

if previousRowIndex > -1:
# Add the value of the previous row to the current row
df.at[index, dataColumnName] = round(
df.at[index, dataColumnName] + df.at[previousRowIndex, dataColumnName],
3,
)
previousRowIndex = index

return df


# Generate the datafile which can be imported
def generateImportDataFile(
dataFrame: pd.DataFrame,
outputFile: str,
dataColumnName: str,
filters: list[DataFilter],
recalculate: bool,
):
# Check if the column exists
if dataColumnName in dataFrame.columns:
print("Creating file: " + outputFile)
dataFrameFiltered = filterData(dataFrame, filters)

# Check if we have to recalculate the data
if recalculate:
dataFrameFiltered = recalculateData(dataFrameFiltered, dataColumnName)

# Select only the needed data
dataFrameFiltered = dataFrameFiltered.filter(
[dateTimeColumnName, dataColumnName]
)

# Create the output file
dataFrameFiltered.to_csv(
outputFile, sep=",", decimal=".", header=False, index=False
)
else:
print(
"Could not create file: "
+ outputFile
+ " because column: "
+ dataColumnName
+ " does not exist"
)


# Read the inputfile
def readInputFile(inputFileName: str) -> pd.DataFrame:
# Read the specified file
print("Loading data: " + inputFileName)

# Check if we have a supported extension
if inputFileNameExtension == ".csv":
# Read the CSV file
df = pd.read_csv(
inputFileName,
sep=inputFileDataSeperator,
decimal=inputFileDataDecimal,
skiprows=inputFileNumHeaderRows,
skipfooter=inputFileNumFooterRows,
index_col=False,
engine="python",
)
elif (inputFileNameExtension == ".xlsx") or (inputFileNameExtension == ".xls"):
# Read the XLSX/XLS file
df = pd.read_excel(
inputFileName,
sheet_name=inputFileExcelSheetName,
decimal=inputFileDataDecimal,
skiprows=inputFileNumHeaderRows,
skipfooter=inputFileNumFooterRows,
)
elif inputFileNameExtension == ".json":
        # Read the JSON file (close the file handle after loading)
        with open(inputFileName) as jsonFile:
            jsonData = json.load(jsonFile)
        df = pd.json_normalize(jsonData, record_path=inputFileJsonPath)
else:
raise Exception("Unsupported extension: " + inputFileNameExtension)

return df


# Check if all the provided files have the correct extension
def correctFileExtensions(fileNames: list[str]) -> bool:
# Check all filenames for the right extension
for fileName in fileNames:
_, fileNameExtension = os.path.splitext(fileName)
if fileNameExtension != inputFileNameExtension:
return False
return True


# Generate the datafiles which can be imported
def generateImportDataFiles(inputFileNames: str):
# Find the file(s)
fileNames = glob.glob(inputFileNames)
if len(fileNames) > 0:
print("Found files based on: " + inputFileNames)

# Check if all the found files are of the correct type
if correctFileExtensions(fileNames):
# Read all the found files and concat the data
dataFrame = pd.concat(
map(readInputFile, fileNames), ignore_index=True, sort=True
)

# Prepare the data
dataFrame = prepareData(dataFrame)

# Create the output files
for outputFile in outputFiles:
generateImportDataFile(
dataFrame,
outputFile.outputFileName,
outputFile.valueColumnName,
outputFile.dataFilters,
outputFile.recalculate,
)

print("Done")
else:
print("Only " + inputFileNameExtension + " datafiles are allowed")
else:
print("No files found based on : " + inputFileNames)


# Validate that the script is started from the command prompt
if __name__ == "__main__":
print(energyProviderName + " Data Prepare")
print("")
print(
"This python script prepares "
+ energyProviderName
+ " data for import into Home Assistant."
)
    print(
        "The files will be prepared in the current directory; any previous files will be overwritten!"
    )
print("")
if len(sys.argv) == 2:
if (
input("Are you sure you want to continue [Y/N]?: ").lower().strip()[:1]
== "y"
):
generateImportDataFiles(sys.argv[1])
else:
print(energyProviderName + "PrepareData usage:")
print(
energyProviderName
+ "PrepareData <"
+ energyProviderName
+ " "
+ inputFileNameExtension
+ " filename (wildcard)>"
)
print()
        print(
            "Enclose the path/filename in quotes when wildcards are used on Linux-based systems."
        )
print(
"Example: "
+ energyProviderName
+ 'PrepareData "*'
+ inputFileNameExtension
+ '"'
)
18 changes: 18 additions & 0 deletions Datasources/Enel Distribuzione/README.md
@@ -0,0 +1,18 @@
# Energy provider: Enel Distribuzione

Enel Distribuzione, part of the Italian grid authority, allows users to export their electricity consumption data, which can be processed and imported into Home Assistant.

**Data provided**
- Electricity consumption - Tariff 1 - High resolution (15-minute interval) - kWh

**Tooling needed**
- Python 3
- Pandas python library ```pip install pandas```

**How-to**
- Export your electricity consumption data from the Enel Distribuzione website (Italian grid authority).
- Download the ```EnelDistribuzioneDataPrepare.py``` script and place it in the same directory as the exported Enel Distribuzione data.
- Execute the Python script by providing the name of the exported file as a parameter. Example:
```python EnelDistribuzioneDataPrepare.py ExportData_*.csv```.
The script will generate the necessary files for importing the data into Home Assistant (see the sample below).
- Follow the steps in the overall Home Assistant import guide for integrating the data into your setup.
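
**Sample data (illustrative)**

The script assumes a ```;```-separated export with a ```Giorno``` date column, one column per quarter-hour range, and ```,``` as decimal token. A minimal sketch of such an input file (values are made up, not real export data):

```
Giorno;00:00-00:15;00:15-00:30;00:30-00:45
01/10/2024;0,125;0,100;0,075
02/10/2024;0,110;0,095;0,080
```

The generated ```elec_feed_in_tariff_1_high_resolution.csv``` contains headerless rows with a Unix timestamp and a running total, e.g. ```1727740800,0.125``` for the first reading above.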