Skip to content

Commit

Permalink
Additional filters added to most functions
Browse files Browse the repository at this point in the history
Also upped version number and added parameters to docstrings
  • Loading branch information
JMaynor committed Aug 9, 2024
1 parent 12975b6 commit 3a9f261
Show file tree
Hide file tree
Showing 4 changed files with 269 additions and 14 deletions.
94 changes: 84 additions & 10 deletions clope/snow/dimensions.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import pandas

from clope.snow.connection_handling import _get_snowflake_connection


Expand Down Expand Up @@ -55,13 +54,20 @@ def get_branches() -> pandas.DataFrame:
return df


def get_routes() -> pandas.DataFrame:
def get_routes(branch: int = None) -> pandas.DataFrame:
"""
Get list of routes.
:param branch: Filter by branch key
"""
conn = _get_snowflake_connection()
try:
query = f"SELECT * FROM DIMROUTE_V"
conditions = []
if branch:
conditions.append(f"BRANCHKEY = {branch}")
if conditions:
query += " WHERE " + " AND ".join(conditions)
cur = conn.cursor()
cur.execute(query)
df = cur.fetch_pandas_all()
Expand All @@ -72,18 +78,23 @@ def get_routes() -> pandas.DataFrame:
return df


def get_customers(current: bool = False) -> pandas.DataFrame:
def get_customers(current: bool = False, branch: int = None) -> pandas.DataFrame:
"""
Get list of customers.
Implements SCD Type 2, so use current=True to get only current rows of
information and filter out historical data.
:param current: Filter by current row indicator
:param branch: Filter by branch key
"""
conn = _get_snowflake_connection()
try:
query = f"SELECT * FROM DIMCUSTOMER_V"
conditions = []
if current:
conditions.append("CURRENTROWINDICATOR = 'Current'")
if branch:
conditions.append(f"BRANCHKEY = {branch}")
if conditions:
query += " WHERE " + " AND ".join(conditions)
cur = conn.cursor()
Expand All @@ -96,18 +107,23 @@ def get_customers(current: bool = False) -> pandas.DataFrame:
return df


def get_locations(current: bool = False) -> pandas.DataFrame:
def get_locations(current: bool = False, customer: int = None) -> pandas.DataFrame:
"""
Get list of locations.
Implements SCD Type 2, so use current=True to get only current rows of
information and filter out historical data.
:param current: Filter by current row indicator
:param customer: Filter by customer key
"""
conn = _get_snowflake_connection()
try:
query = f"SELECT * FROM DIMLOCATION_V"
conditions = []
if current:
conditions.append("CURRENTROWINDICATOR = 'Current'")
if customer:
conditions.append(f"CUSTOMERKEY = {customer}")
if conditions:
query += " WHERE " + " AND ".join(conditions)
cur = conn.cursor()
Expand All @@ -120,18 +136,28 @@ def get_locations(current: bool = False) -> pandas.DataFrame:
return df


def get_machines(current: bool = False) -> pandas.DataFrame:
def get_machines(
current: bool = False, location: int = None, route: int = None
) -> pandas.DataFrame:
"""
Get list of machines.
Implements SCD Type 2, so use current=True to get only current rows of
information and filter out historical data.
:param current: Filter by current row indicator
:param location: Filter by location key
:param route: Filter by route key
"""
conn = _get_snowflake_connection()
try:
query = f"SELECT * FROM DIMMACHINE_V"
conditions = []
if current:
conditions.append("CURRENTROWINDICATOR = 'Current'")
if location:
conditions.append(f"LOCATIONKEY = {location}")
if route:
conditions.append(f"ROUTEKEY = {route}")
if conditions:
query += " WHERE " + " AND ".join(conditions)
cur = conn.cursor()
Expand All @@ -144,19 +170,31 @@ def get_machines(current: bool = False) -> pandas.DataFrame:
return df


def get_coils(current: bool = False) -> pandas.DataFrame:
def get_coils(
current: bool = False,
machine: int = None,
item: int = None,
) -> pandas.DataFrame:
"""
Get list of coils. I.E. every coil in every machine planogram.
Quite a lot of data, but tells you which product is where.
Implements SCD Type 2, so use current=True to get only current rows of
information and filter out historical data.
:param current: Filter by current row indicator
:param machine: Filter by machine key
:param item: Filter by item key
"""
conn = _get_snowflake_connection()
try:
query = f"SELECT * FROM DIMMACHINEPLANOGRAMCOIL_V"
conditions = []
if current:
conditions.append("CURRENTROWINDICATOR = 'Current'")
if machine:
conditions.append(f"MACHINEKEY = {machine}")
if item:
conditions.append(f"ITEMKEY = {item}")
if conditions:
query += " WHERE " + " AND ".join(conditions)
cur = conn.cursor()
Expand All @@ -169,18 +207,30 @@ def get_coils(current: bool = False) -> pandas.DataFrame:
return df


def get_micromarkets(current: bool = False) -> pandas.DataFrame:
def get_micromarkets(
current: bool = False,
location: int = None,
route: int = None,
) -> pandas.DataFrame:
"""
Get list of micromarkets.
Implements SCD Type 2, so use current=True to get only current rows of
information and filter out historical data.
:param current: Filter by current row indicator
:param location: Filter by location key
:param route: Filter by route key
"""
conn = _get_snowflake_connection()
try:
query = f"SELECT * FROM DIMMICROMARKET_V"
conditions = []
if current:
conditions.append("CURRENTROWINDICATOR = 'Current'")
if location:
conditions.append(f"LOCATIONKEY = {location}")
if route:
conditions.append(f"ROUTEKEY = {route}")
if conditions:
query += " WHERE " + " AND ".join(conditions)
cur = conn.cursor()
Expand Down Expand Up @@ -234,13 +284,20 @@ def get_items(current: bool = False) -> pandas.DataFrame:
return df


def get_item_packs() -> pandas.DataFrame:
def get_item_packs(item: int = None) -> pandas.DataFrame:
"""
Get list of item packs.
:param item: Filter by item key
"""
conn = _get_snowflake_connection()
try:
query = f"SELECT * FROM DIMITEMPACK_V"
conditions = []
if item:
conditions.append(f"ITEMKEY = {item}")
if conditions:
query += " WHERE " + " AND ".join(conditions)
cur = conn.cursor()
cur.execute(query)
df = cur.fetch_pandas_all()
Expand All @@ -251,13 +308,23 @@ def get_item_packs() -> pandas.DataFrame:
return df


def get_item_pack_barcodes() -> pandas.DataFrame:
def get_item_pack_barcodes(item: int = None, item_pack: int = None) -> pandas.DataFrame:
"""
Get list of item pack barcodes.
:param item: Filter by item key
:param item_pack: Filter by item pack key
"""
conn = _get_snowflake_connection()
try:
query = f"SELECT * FROM DIMITEMPACKBARCODE_V"
conditions = []
if item:
conditions.append(f"ITEMKEY = {item}")
if item_pack:
conditions.append(f"ITEMPACKKEY = {item_pack}")
if conditions:
query += " WHERE " + " AND ".join(conditions)
cur = conn.cursor()
cur.execute(query)
df = cur.fetch_pandas_all()
Expand Down Expand Up @@ -320,13 +387,20 @@ def get_supplier_items() -> pandas.DataFrame:
return df


def get_warehouses() -> pandas.DataFrame:
def get_warehouses(branch: int = None) -> pandas.DataFrame:
"""
Get list of warehouses.
:param branch: Filter by branch key
"""
conn = _get_snowflake_connection()
try:
query = f"SELECT * FROM DIMWAREHOUSE_V"
conditions = []
if branch:
conditions.append(f"BRANCHKEY = {branch}")
if conditions:
query += " WHERE " + " AND ".join(conditions)
cur = conn.cursor()
cur.execute(query)
df = cur.fetch_pandas_all()
Expand Down
Loading

0 comments on commit 3a9f261

Please sign in to comment.