release 1.4.6 (#395)
## [1.4.6](https://github.com/quintoandar/butterfree/releases/tag/1.4.6)

### Fixed
* [MLOP-2519] avoid configuring logger at lib level ([#393](https://github.com/quintoandar/butterfree/pull/393))
* Rollback to latest stable release ([#391](https://github.com/quintoandar/butterfree/pull/391))

[MLOP-2519]:
https://quintoandar.atlassian.net/browse/MLOP-2519?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ

---------

Co-authored-by: Mayara Moromisato <44944954+moromimay@users.noreply.github.com>
Co-authored-by: hmeretti <hmeretti@gmail.com>
Co-authored-by: Henrique Camargo <henriquecamargo@spf1lt-pj000560.ldap.quintoandar.com.br>
Co-authored-by: AlvaroMarquesAndrade <45604858+AlvaroMarquesAndrade@users.noreply.github.com>
Co-authored-by: Igor Gustavo Hoelscher <19557581+roelschr@users.noreply.github.com>
Co-authored-by: Mayara Moromisato <may.alveslima@gmail.com>
Co-authored-by: Felipe Victorino Caputo <13631451+fvcaputo@users.noreply.github.com>
Co-authored-by: Rodrigo Martins de Oliveira <allrod5@users.noreply.github.com>
Co-authored-by: Gabriel Brandão <37742275+GaBrandao@users.noreply.github.com>
Co-authored-by: Jay Vala <24193355+jdvala@users.noreply.github.com>
Co-authored-by: Lucas Fonseca <lucas.fonseca@quintoandar.com.br>
Co-authored-by: Lucas Cardozo <lucasecardozo@gmail.com>
Co-authored-by: Ralph Rassweiler <ralphrass@gmail.com>
Co-authored-by: João Albuquerque <albjoaov@gmail.com>
Co-authored-by: Fernando Barbosa <fernando.cardoso@quintoandar.com.br>
Co-authored-by: João Albuquerque <joao.albuquerque@quintoandar.com.br>
Co-authored-by: Lucas Cardozo <8867239+lecardozo@users.noreply.github.com>
18 people authored Jan 7, 2025
1 parent 583b647 commit 0e32e65
Showing 16 changed files with 61 additions and 169 deletions.
30 changes: 0 additions & 30 deletions .checklist.yaml

This file was deleted.

17 changes: 0 additions & 17 deletions .github/workflows/skip_lint.yml

This file was deleted.

6 changes: 5 additions & 1 deletion CHANGELOG.md
@@ -3,7 +3,11 @@ All notable changes to this project will be documented in this file.

Preferably use **Added**, **Changed**, **Removed** and **Fixed** topics in each release or unreleased log for a better organization.

## [Unreleased]
## [1.4.6](https://github.com/quintoandar/butterfree/releases/tag/1.4.6)

### Fixed
* [MLOP-2519] avoid configuring logger at lib level ([#393](https://github.com/quintoandar/butterfree/pull/393))
* Rollback to latest stable release ([#391](https://github.com/quintoandar/butterfree/pull/391))

## [1.4.5](https://github.com/quintoandar/butterfree/releases/tag/1.4.5)
* Rollback repartitions ([#386](https://github.com/quintoandar/butterfree/pull/386))
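The headline fix (MLOP-2519) is about where logging gets configured: library modules should only create loggers, while handler and level setup stays with the application. A minimal sketch of that pattern — the module function and path below are illustrative, not taken from the diff — could look like this:

```python
import logging

# Inside a library module: create a logger but do not configure it.
# No handlers, no levels, no basicConfig -- the application decides.
logger = logging.getLogger(__name__)


def load_data(path: str) -> None:
    # Output only appears if the running application configured logging.
    logger.info("loading data from %s", path)


if __name__ == "__main__":
    # Only an entry point (CLI, script, service) sets up handlers and levels.
    logging.basicConfig(level=logging.INFO)
    load_data("s3://example-bucket/data.parquet")
```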
18 changes: 4 additions & 14 deletions butterfree/_cli/migrate.py
@@ -1,26 +1,26 @@
import datetime
import importlib
import inspect
import logging
import os
import pkgutil
import sys
from typing import Set, Type
from typing import Set

import boto3
import setuptools
import typer
from botocore.exceptions import ClientError

from butterfree.configs import environment
from butterfree.configs.logger import __logger
from butterfree.migrations.database_migration import ALLOWED_DATABASE
from butterfree.pipelines import FeatureSetPipeline

app = typer.Typer(
help="Apply the automatic migrations in a database.", no_args_is_help=True
)

logger = __logger("migrate", True)
logger = logging.getLogger(__name__)


def __find_modules(path: str) -> Set[str]:
@@ -90,18 +90,8 @@ def __fs_objects(path: str) -> Set[FeatureSetPipeline]:

instances.add(value)

def create_instance(cls: Type[FeatureSetPipeline]) -> FeatureSetPipeline:
sig = inspect.signature(cls.__init__)
parameters = sig.parameters

if "run_date" in parameters:
run_date = datetime.datetime.today().strftime("%Y-%m-%d")
return cls(run_date)

return cls()

logger.info("Creating instances...")
return set(create_instance(value) for value in instances) # type: ignore
return set(value() for value in instances) # type: ignore


PATH = typer.Argument(
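With the create_instance helper gone, the migrate CLI now instantiates every discovered pipeline with a bare call (value()), so a pipeline that used to receive today's date as run_date has to default it itself. A hypothetical sketch of that adjustment — the class and attribute names are invented for illustration:

```python
import datetime
from typing import Optional


class MyPipeline:
    """Hypothetical pipeline that used to rely on the CLI injecting run_date."""

    def __init__(self, run_date: Optional[str] = None) -> None:
        # The migrate CLI now calls MyPipeline() with no arguments, so the
        # class has to provide its own default run date.
        self.run_date = run_date or datetime.datetime.today().strftime("%Y-%m-%d")


pipeline = MyPipeline()  # what the CLI effectively does after this change
print(pipeline.run_date)
```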
6 changes: 6 additions & 0 deletions butterfree/clients/cassandra_client.py
@@ -1,5 +1,6 @@
"""CassandraClient entity."""

import logging
from ssl import CERT_REQUIRED, PROTOCOL_TLSv1
from typing import Dict, List, Optional, Union

@@ -23,6 +24,11 @@
EMPTY_STRING_HOST_ERROR = "The value of Cassandra host is empty. Please fill correctly with your endpoints" # noqa: E501
GENERIC_INVALID_HOST_ERROR = "The Cassandra host must be a valid string, a string that represents a list or list of strings" # noqa: E501

logger = logging.getLogger(__name__)

EMPTY_STRING_HOST_ERROR = "The value of Cassandra host is empty. Please fill correctly with your endpoints" # noqa: E501
GENERIC_INVALID_HOST_ERROR = "The Cassandra host must be a valid string, a string that represents a list or list of strings" # noqa: E501


class CassandraColumn(TypedDict):
"""Type for cassandra columns.
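Because these module loggers carry no handlers of their own, an application that wants to see Butterfree's log output has to opt in. One possible setup — the handler, format, and level choices here are assumptions, not something the library prescribes:

```python
import logging

# Configure the package-level logger once in the application; module
# loggers such as "butterfree.clients.cassandra_client" propagate to it.
handler = logging.StreamHandler()
handler.setFormatter(
    logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
)

butterfree_logger = logging.getLogger("butterfree")
butterfree_logger.addHandler(handler)
butterfree_logger.setLevel(logging.INFO)
```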
14 changes: 4 additions & 10 deletions butterfree/extract/source.py
@@ -3,7 +3,6 @@
from typing import List, Optional

from pyspark.sql import DataFrame
from pyspark.storagelevel import StorageLevel

from butterfree.clients import SparkClient
from butterfree.extract.readers.reader import Reader
@@ -96,21 +95,16 @@ def construct(
DataFrame with the query result against all readers.
"""
# Step 1: Build temporary views for each reader
for reader in self.readers:
reader.build(client=client, start_date=start_date, end_date=end_date)
reader.build(
client=client, start_date=start_date, end_date=end_date
) # create temporary views for each reader

# Step 2: Execute SQL query on the combined readers
dataframe = client.sql(self.query)

# Step 3: Cache the dataframe if necessary, using memory and disk storage
if not dataframe.isStreaming and self.eager_evaluation:
# Persist to ensure the DataFrame is stored in mem and disk (if necessary)
dataframe.persist(StorageLevel.MEMORY_AND_DISK)
# Trigger the cache/persist operation by performing an action
dataframe.count()
dataframe.cache().count()

# Step 4: Run post-processing hooks on the dataframe
post_hook_df = self.run_post_hooks(dataframe)

return post_hook_df
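The caching change in Source.construct is behavior-preserving in practice: for DataFrames, cache() defaults to the MEMORY_AND_DISK storage level, and the count() is still what forces materialization. A standalone sketch comparing the two forms — the session setup and toy data are made up for illustration:

```python
from pyspark.sql import SparkSession
from pyspark.storagelevel import StorageLevel

spark = SparkSession.builder.master("local[1]").appName("cache-demo").getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])

# Old form: explicit storage level, then an action to materialize the cache.
df.persist(StorageLevel.MEMORY_AND_DISK)
df.count()
df.unpersist()

# New form: DataFrame.cache() already defaults to MEMORY_AND_DISK, so this
# is the same behavior with less code; count() still triggers the caching.
df.cache().count()

spark.stop()
```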
5 changes: 3 additions & 2 deletions butterfree/load/writers/delta_writer.py
@@ -1,10 +1,11 @@
import logging

from delta.tables import DeltaTable
from pyspark.sql.dataframe import DataFrame

from butterfree.clients import SparkClient
from butterfree.configs.logger import __logger

logger = __logger("delta_writer", True)
logger = logging.getLogger(__name__)


class DeltaWriter:
20 changes: 2 additions & 18 deletions butterfree/migrations/database_migration/cassandra_migration.py
@@ -78,9 +78,6 @@ def _get_alter_table_add_query(self, columns: List[Diff], table_name: str) -> st
def _get_alter_column_type_query(self, column: Diff, table_name: str) -> str:
"""Creates CQL statement to alter columns' types.
In Cassandra 3.4.x to 3.11.x alter type is not allowed.
This method creates a temp column to comply.
Args:
columns: list of Diff objects with ALTER_TYPE kind.
table_name: table name.
@@ -89,23 +86,10 @@ def _get_alter_column_type_query(self, column: Diff, table_name: str) -> str:
Alter column type query.
"""
temp_column_name = f"{column.column}_temp"

add_temp_column_query = (
f"ALTER TABLE {table_name} ADD {temp_column_name} {column.value};"
)
copy_data_to_temp_query = (
f"UPDATE {table_name} SET {temp_column_name} = {column.column};"
)

drop_old_column_query = f"ALTER TABLE {table_name} DROP {column.column};"
rename_temp_column_query = (
f"ALTER TABLE {table_name} RENAME {temp_column_name} TO {column.column};"
)
parsed_columns = self._get_parsed_columns([column])

return (
f"{add_temp_column_query} {copy_data_to_temp_query} "
f"{drop_old_column_query} {rename_temp_column_query};"
f"ALTER TABLE {table_name} ALTER {parsed_columns.replace(' ', ' TYPE ')};"
)

@staticmethod
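The rewritten _get_alter_column_type_query drops the four-statement temp-column workaround in favor of a single ALTER ... TYPE statement. Assuming _get_parsed_columns returns a "name type" string for the single column — which is what the replace(' ', ' TYPE ') relies on — the generated CQL looks like the sketch below (table and column names are invented):

```python
table_name = "feature_store.user_features"  # illustrative
parsed_columns = "account_balance double"   # assumed "name type" output

query = f"ALTER TABLE {table_name} ALTER {parsed_columns.replace(' ', ' TYPE ')};"
print(query)
# ALTER TABLE feature_store.user_features ALTER account_balance TYPE double;
```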
@@ -1,16 +1,16 @@
"""Migration entity."""

import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum, auto
from typing import Any, Dict, List, Optional, Set

from butterfree.clients import AbstractClient
from butterfree.configs.logger import __logger
from butterfree.load.writers.writer import Writer
from butterfree.transform import FeatureSet

logger = __logger("database_migrate", True)
logger = logging.getLogger(__name__)


@dataclass
31 changes: 8 additions & 23 deletions butterfree/pipelines/feature_set_pipeline.py
@@ -2,8 +2,6 @@

from typing import List, Optional

from pyspark.storagelevel import StorageLevel

from butterfree.clients import SparkClient
from butterfree.dataframe_service import repartition_sort_df
from butterfree.extract import Source
@@ -211,48 +209,35 @@
soon. Use only if strictly necessary.
"""
# Step 1: Construct input dataframe from the source.
dataframe = self.source.construct(
client=self.spark_client,
start_date=self.feature_set.define_start_date(start_date),
end_date=end_date,
)

# Step 2: Repartition and sort if required, avoid if not necessary.
if partition_by:
order_by = order_by or partition_by
current_partitions = dataframe.rdd.getNumPartitions()
optimal_partitions = num_processors or current_partitions
if current_partitions != optimal_partitions:
dataframe = repartition_sort_df(
dataframe, partition_by, order_by, num_processors
)

# Step 3: Construct the feature set dataframe using defined transformations.
transformed_dataframe = self.feature_set.construct(
dataframe = repartition_sort_df(
dataframe, partition_by, order_by, num_processors
)

dataframe = self.feature_set.construct(
dataframe=dataframe,
client=self.spark_client,
start_date=start_date,
end_date=end_date,
num_processors=num_processors,
)

if transformed_dataframe.storageLevel != StorageLevel(
False, False, False, False, 1
):
dataframe.unpersist() # Clear the data from the cache (disk and memory)

# Step 4: Load the data into the configured sink.
self.sink.flush(
dataframe=transformed_dataframe,
dataframe=dataframe,
feature_set=self.feature_set,
spark_client=self.spark_client,
)

# Step 5: Validate the output if not streaming and data volume is reasonable.
if not transformed_dataframe.isStreaming:
if not dataframe.isStreaming:
self.sink.validate(
dataframe=transformed_dataframe,
dataframe=dataframe,
feature_set=self.feature_set,
spark_client=self.spark_client,
)
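The slimmed-down run() keeps the public parameters visible in the diff (start_date, end_date, partition_by, order_by, num_processors) and simply drops the intermediate persist/unpersist bookkeeping. A hedged usage sketch — it assumes pipeline is an already-built FeatureSetPipeline, and the dates and column names are illustrative:

```python
# `pipeline` is assumed to be an already-configured FeatureSetPipeline
# (Source + FeatureSet + Sink); values below are placeholders.
pipeline.run(
    start_date="2025-01-01",
    end_date="2025-01-07",
    partition_by=["id"],          # triggers repartition_sort_df
    order_by=["timestamp"],       # falls back to partition_by when omitted
    num_processors=10,
)
```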
