data cleaning pipeline
priyaananthasankar committed May 25, 2019
1 parent 0a5b2a1 commit 00bb265
Showing 28 changed files with 1,221 additions and 57 deletions.
35 changes: 35 additions & 0 deletions CleanTrigger2/__init__.py
@@ -0,0 +1,35 @@
import logging
import json
import azure.functions as func
from . import clean as cleaning_service

def main(req: func.HttpRequest) -> func.HttpResponse:
req_body = req.get_json()

if is_validation_event(req_body):
return func.HttpResponse(validate_eg(req_body))

elif is_blob_created_event(req_body):
result = cleaning_service.clean(req_body)

        if result == "Success":
            return func.HttpResponse("Successfully cleaned data", status_code=200)
        else:
            return func.HttpResponse("Bad Request", status_code=400)

    else: # ignore other event types, but still acknowledge the delivery
        return func.HttpResponse("Ignored event", status_code=200)

# Check for validation event from event grid
def is_validation_event(req_body):
    return bool(req_body) and req_body[0].get('eventType') == "Microsoft.EventGrid.SubscriptionValidationEvent"

# If blob created event, then true
def is_blob_created_event(req_body):
    return bool(req_body) and req_body[0].get('eventType') == "Microsoft.Storage.BlobCreated"

# Respond to event grid webhook validation event
def validate_eg(req_body):
result = {}
result['validationResponse'] = req_body[0]['data']['validationCode']
return json.dumps(result)
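
For reference, a minimal sketch of the subscription-validation payload this trigger handles, following the standard Event Grid handshake shape (the code value below is made up):

# Hypothetical validation event as Event Grid would POST it:
sample_validation_event = [{
    "eventType": "Microsoft.EventGrid.SubscriptionValidationEvent",
    "data": {"validationCode": "512d38b6-c7b8-40c8-89fe-f46f9e9622b6"}
}]
# validate_eg(sample_validation_event) returns:
# '{"validationResponse": "512d38b6-c7b8-40c8-89fe-f46f9e9622b6"}'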
37 changes: 37 additions & 0 deletions CleanTrigger2/clean.py
@@ -0,0 +1,37 @@
import logging
import os
import pandas as pd
from azure.storage.blob import BlockBlobService
from io import StringIO

blob_account_name = os.getenv("BlobAccountName")
blob_account_key = os.getenv("BlobAccountKey")
block_blob_service = BlockBlobService(account_name=blob_account_name,
account_key=blob_account_key)
out_blob_container_name = os.getenv("C2")

def clean(req_body):
blob_obj,filename = extract_blob_props(req_body[0]['data']['url'])
df = pd.read_csv(StringIO(blob_obj.content))
result = clean_blob(df, filename)
return result

def extract_blob_props(url):
blob_file_name = url.rsplit('/',1)[-1]
in_container_name = url.rsplit('/',2)[-2]
readblob = block_blob_service.get_blob_to_text(in_container_name,blob_file_name)
return readblob, blob_file_name

def clean_blob(df, blob_file_name):
    # group by names and item and sum the units and price
    df1 = df.groupby(["names", "item"], as_index=False)[["units", "price"]].sum()

    # keep only rows for the 'binder' item
    df2 = df1[df1["item"] == "binder"]
outcsv = df2.to_csv(index=False)

    cleaned_blob_file_name = "cleaned_" + blob_file_name
block_blob_service.create_blob_from_text(out_blob_container_name, cleaned_blob_file_name, outcsv)
return "Success"

20 changes: 20 additions & 0 deletions CleanTrigger2/function.json
@@ -0,0 +1,20 @@
{
"scriptFile": "__init__.py",
"bindings": [
{
"authLevel": "anonymous",
"type": "httpTrigger",
"direction": "in",
"name": "req",
"methods": [
"get",
"post"
]
},
{
"type": "http",
"direction": "out",
"name": "$return"
}
]
}
3 changes: 3 additions & 0 deletions CleanTrigger2/sample.dat
@@ -0,0 +1,3 @@
{
"name": "Azure"
}
88 changes: 31 additions & 57 deletions README.md
@@ -1,57 +1,31 @@
# Project Name

(short, 1-3 sentenced, description of the project)

## Features

This project framework provides the following features:

* Feature 1
* Feature 2
* ...

## Getting Started

### Prerequisites

(ideally very short, if any)

- OS
- Library version
- ...

### Installation

(ideally very short)

- npm install [package name]
- mvn install
- ...

### Quickstart
(Add steps to get up and running quickly)

1. git clone [repository clone url]
2. cd [respository name]
3. ...


## Demo

A demo app is included to show how to use the project.

To run the demo, follow these steps:

(Add steps to start up the demo)

1.
2.
3.

## Resources

(Any additional resources or related projects)

- Link to supporting information
- Link to similar sample
- ...
# Blob - Event Grid Subscription - Python Function Pattern

This sample demonstrates a common blob storage to Event Grid to Python Functions architecture. It represents a typical machine learning pipeline: raw data uploaded to a blob container fires an event into Azure Event Grid, which delivers it to a Python function; the function uses pandas to clean the data and other libraries to preprocess it for ML training/inference.

# Azure Resources

- Azure Function V2 on Linux Consumption Plan with two HttpTriggers
- Azure Storage V2 with two containers, raw and cleaned
- Azure Event Grid Subscription from Blob Storage with advanced subject filters

# Steps to get this sample working in your subscription

## Prerequisites
- Install the latest Azure CLI
- Install Azure Functions Core Tools and the other prerequisites for running functions locally

## Steps (Note: TODO, convert these steps into a script; a sketch follows the list)
- az login
- Provide names for the parameters in azuredeploy.parameters.json. If a name is not globally unique, the deployment can fail (TODO: autogenerate names to avoid conflicts)
- Create a resource group in Azure
- az group deployment create -g <Resource Group Name> --template-file azure-deploy-linux-app-plan.json
- Deploy your function app with func azure functionapp publish <functionappname>, using the app name from the previous step
- az group deployment create -g <Resource Group Name> --template-file azure-deploy-event-grid-subscription.json
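
Until that TODO is done, the steps can be strung together roughly like this (a sketch; the resource group, region, and app names are placeholders to substitute):

az login
az group create -n <ResourceGroupName> -l <region>
az group deployment create -g <ResourceGroupName> --template-file azure-deploy-linux-app-plan.json
func azure functionapp publish <functionappname>
az group deployment create -g <ResourceGroupName> --template-file azure-deploy-event-grid-subscription.json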

# Test Sample

- Drop a csv file into the "raw" container in your deployed Azure Storage (an example upload command follows this list)
- Check the "cleaned" container in your deployed Azure Storage for the same file, prefixed as cleaned_<filename>
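
One way to upload a test file from the CLI (a sketch; account name, key, and file name are placeholders):

az storage blob upload --account-name <storageName> --account-key <key> --container-name raw --name sales.csv --file ./sales.csv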


18 changes: 18 additions & 0 deletions Reconcile/__init__.py
@@ -0,0 +1,18 @@
import logging
import json
import azure.functions as func
from . import clean as cleaning_service

def main(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Python HTTP trigger function processed a request.')
    try:
        req_body = req.get_json()
        f1_url = req_body.get('file_1_url')
        f2_url = req_body.get('file_2_url')
        batch_id = req_body.get('batchId')
    except (ValueError, AttributeError): # body missing or not valid JSON
        return func.HttpResponse("Bad Request", status_code=400)

    # Reject requests that parse but omit a required field
    if not all([f1_url, f2_url, batch_id]):
        return func.HttpResponse("Bad Request", status_code=400)

    result = cleaning_service.clean(f1_url, f2_url, batch_id)
    return func.HttpResponse(result, status_code=200)
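
A hypothetical local test call for this trigger, assuming the requests package and the default route derived from the function name (URLs and batch id are illustrative):

import requests

payload = {
    "file_1_url": "https://<account>.blob.core.windows.net/c1/cleaned_file1.csv",
    "file_2_url": "https://<account>.blob.core.windows.net/c2/cleaned_file2.csv",
    "batchId": "batch-001",
}
resp = requests.post("http://localhost:7071/api/Reconcile", json=payload)
print(resp.status_code, resp.text)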
42 changes: 42 additions & 0 deletions Reconcile/clean.py
@@ -0,0 +1,42 @@
import logging
import os
import pandas as pd
from azure.storage.blob import BlockBlobService
from io import StringIO
from . import fetch_blob as fetching_service

blob_account_name = os.getenv("BlobAccountName")
blob_account_key = os.getenv("BlobAccountKey")
block_blob_service = BlockBlobService(account_name=blob_account_name,
account_key=blob_account_key)
out_blob_container_name = os.getenv("FINAL")

# Clean blob flow from event grid events
# This function will call all the other functions in clean.py

def clean(file_1_url, file_2_url, batch_id):
    f1_container = file_1_url.rsplit('/', 2)[-2]
    f2_container = file_2_url.rsplit('/', 2)[-2]
    f2_df, f1_df = fetch_blobs(batch_id, f2_container, f1_container)
    result = final_reconciliation(f2_df, f1_df, batch_id)
    return result

def fetch_blobs(batch_id,file_2_container_name,file_1_container_name):
# Create container & blob dictionary with helper function
blob_dict = fetching_service.blob_to_dict(batch_id,file_2_container_name,file_1_container_name)

# Create F1 DF
filter_string = 'c1'
f1_df = fetching_service.blob_dict_to_df(blob_dict, filter_string)

# Create F2 df
filter_string = 'c2'
f2_df = fetching_service.blob_dict_to_df(blob_dict, filter_string)
return f2_df, f1_df

def final_reconciliation(f2_df, f1_df,batch_id):
outcsv = f2_df.to_csv(index=False)
cleaned_blob_file_name = "reconciled_" + batch_id
block_blob_service.create_blob_from_text(out_blob_container_name, cleaned_blob_file_name, outcsv)
return "Success"
48 changes: 48 additions & 0 deletions Reconcile/fetch_blob.py
@@ -0,0 +1,48 @@
import logging
import os
import pandas as pd
from azure.storage.blob import BlockBlobService
from io import StringIO

blob_account_name = os.getenv("BlobAccountName")
blob_account_key = os.getenv("BlobAccountKey")
block_blob_service = BlockBlobService(account_name=blob_account_name,
account_key=blob_account_key)

def blob_dict_to_df(my_ordered_dict, filter_string):
    # keep only the container whose name contains the filter string
    filtered_dict = {k: v for k, v in my_ordered_dict.items() if filter_string in k}
    logging.info(filtered_dict)
    container_key, latest_file = next(iter(filtered_dict.items()))
    blobstring = block_blob_service.get_blob_to_text(container_key, latest_file).content
    df = pd.read_csv(StringIO(blobstring), dtype=str)
    return df

def blob_to_dict(batchId, *args):
    # container names arrive as positional args
    container_list = list(args)
    logging.info(container_list)

    # Collect the "cleaned" blob name from each container. The SDK returns
    # a generator, so materialize it once before iterating; logging
    # list(generator) and then looping over the same generator would
    # exhaust it and leave file_names empty.
    file_names = []
    for container in container_list:
        blobs = list(block_blob_service.list_blobs(container))
        logging.info(blobs)
        for blob in blobs:
            if "cleaned" in blob.name:
                file_names.append(blob.name)

    # Merge the two lists into a container -> blob name dictionary
    container_file_dict = dict(zip(container_list, file_names))
    return container_file_dict
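
For example, assuming containers c1 and c2 each hold exactly one blob whose name contains "cleaned" (names below are made up):

# blob_to_dict("batch-001", "c1", "c2")
# -> {"c1": "cleaned_file1.csv", "c2": "cleaned_file2.csv"}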
20 changes: 20 additions & 0 deletions Reconcile/function.json
@@ -0,0 +1,20 @@
{
"scriptFile": "__init__.py",
"bindings": [
{
"authLevel": "anonymous",
"type": "httpTrigger",
"direction": "in",
"name": "req",
"methods": [
"get",
"post"
]
},
{
"type": "http",
"direction": "out",
"name": "$return"
}
]
}
3 changes: 3 additions & 0 deletions Reconcile/sample.dat
@@ -0,0 +1,3 @@
{
"name": "Azure"
}
63 changes: 63 additions & 0 deletions azure-deploy-event-grid-subscription.json
@@ -0,0 +1,63 @@
{
"$schema": "https://schema.management.azure.com/schemas/2015-01-01/deploymentTemplate.json#",
"contentVersion": "1.0.0.0",
"parameters": {
"eventSubName1": {
"type": "string",
"defaultValue": "subToStorage1",
"metadata": {
"description": "Provide a name for the Event Grid subscription."
}
},
"eventSubName2": {
"type": "string",
"defaultValue": "subToStorage2",
"metadata": {
"description": "Provide a name for the Event Grid subscription."
}
},
"endpoint1": {
"type": "string",
"metadata": {
"description": "Provide the URL for the WebHook to receive events. Create your own endpoint for events."
}
},
"storageName": {
"type": "string",
"defaultValue": "203014767teststorage",
"metadata": {
"description": "Provide a name for the Event Grid subscription."
}
}
},
"resources": [
{
"type": "Microsoft.Storage/storageAccounts/providers/eventSubscriptions",
"name": "[concat(parameters('storageName'), '/Microsoft.EventGrid/', parameters('eventSubName1'))]",
"apiVersion": "2018-01-01",
"properties": {
"destination": {
"endpointType": "WebHook",
"properties": {
"endpointUrl": "[parameters('endpoint1')]"
}
},
"filter": {
"subjectBeginsWith": "",
"subjectEndsWith": "",
"isSubjectCaseSensitive": false,
"includedEventTypes": [
"All"
],
"advancedFilters": [
{
"operatorType": "StringContains",
"key": "Subject",
"values": ["raw"]
}
]
}
}
}
]
}
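
The endpoint1 parameter is the HTTP endpoint of the CleanTrigger2 function; with the anonymous trigger defined above it would look something like this (the app name is a placeholder):

az group deployment create -g <ResourceGroupName> --template-file azure-deploy-event-grid-subscription.json --parameters endpoint1="https://<functionappname>.azurewebsites.net/api/CleanTrigger2"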