PyWren is an open source project whose goals are massively scaling the execution of Python code and its dependencies on serverless computing platforms and monitoring the results. PyWren delivers the user’s code into the serverless platform without requiring knowledge of how functions are invoked and run.
PyWren provides great value for a variety of use cases, such as processing data in object storage, running embarrassingly parallel compute jobs (e.g. Monte Carlo simulations), enriching data with additional attributes, and many more.
This repository is based on the PyWren main branch and adapted for IBM Cloud Functions and IBM Cloud Object Storage. PyWren for IBM Cloud is based on Docker images, and we also extended PyWren to execute a reduce function, which now enables PyWren to run complete map-reduce flows. In extending PyWren to work with IBM Cloud Object Storage, we also added a partition discovery component that allows PyWren to process large amounts of data stored in IBM Cloud Object Storage. See the changelog for more details.
This document describes the steps to use PyWren-IBM-Cloud over IBM Cloud Functions and IBM Cloud Object Storage (IBM COS).
IBM Academic Initiative is a special program that allows a free trial of IBM Cloud for academic institutions. This program is provided for students and faculty members, and allows up to 12 months of free usage. You can register your university email and get a free of charge account.
- Initial requirements
- PyWren setup
- Configuration
- Verify installation
- How to use PyWren for IBM Cloud
- Using PyWren to process data from IBM Cloud Object Storage
- PyWren on IBM Watson Studio and Jupyter notebooks
- Additional resources
- IBM Cloud Functions account, as described here. Make sure you can run an end-to-end example with Python.
- IBM Cloud Object Storage account
- Python 3.5, Python 3.6 or Python 3.7
Install PyWren from the PyPI repository:

```
pip3 install pywren-ibm-cloud
```
Installation for developers can be found here.
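For reference, a common developer-mode install pattern looks like the following; the repository URL is an assumption, not taken from this document:

```
git clone https://github.com/pywren/pywren-ibm-cloud.git
cd pywren-ibm-cloud
pip3 install -e .
```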
Configure the PyWren client with access details to your IBM Cloud Object Storage (COS) account and to your IBM Cloud Functions account.

Access details for IBM Cloud Functions can be obtained here. Details on your IBM Cloud Object Storage account can be obtained from the "Service credentials" page in the UI of your COS account. More details on service credentials can be found here.
There are two options to configure PyWren:
The first option is to copy `config/default_config.yaml.template` into `~/.pywren_config`, then edit `~/.pywren_config` and configure the following entries:
```yaml
pywren:
    storage_bucket: <BUCKET_NAME>

ibm_cf:
    # Region endpoint example: https://us-east.functions.cloud.ibm.com
    endpoint    : <REGION_ENDPOINT>  # make sure to use https:// as prefix
    namespace   : <NAMESPACE>
    api_key     : <API_KEY>

ibm_cos:
    # Region endpoint example: https://s3.us-east.cloud-object-storage.appdomain.cloud
    endpoint   : <REGION_ENDPOINT>  # make sure to use https:// as prefix
    # API key is the preferred authentication method for IBM COS
    api_key    : <API_KEY>
    # alternatively, you may use HMAC authentication
    # access_key : <ACCESS_KEY>
    # secret_key : <SECRET_KEY>
```
You can choose a different name for the config file or keep it in a different folder. In that case, make sure you set the environment variable:

```
PYWREN_CONFIG_FILE=<LOCATION OF THE CONFIG FILE>
```
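For example, on Linux or macOS you could export the variable in your shell before running PyWren (the path below is hypothetical):

```
export PYWREN_CONFIG_FILE=/home/myuser/pywren_config.yaml
```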
The second option is to pass all the configuration details as part of the PyWren invocation at runtime. All you need is to create a Python dictionary with the keys and values, for example:
```python
config = {'pywren' : {'storage_bucket' : 'BUCKET_NAME'},

          'ibm_cf':  {'endpoint': 'HOST',
                      'namespace': 'NAMESPACE',
                      'api_key': 'API_KEY'},

          'ibm_cos': {'endpoint': 'REGION_ENDPOINT',
                      'api_key': 'API_KEY'}}
```
You can find more configuration keys here.
Using a configuration file, you can obtain a PyWren executor with:

```python
import pywren_ibm_cloud as pywren
pw = pywren.ibm_cf_executor()
```
Having the configuration in a Python dictionary, you can provide it to PyWren as follows:

```python
import pywren_ibm_cloud as pywren
pw = pywren.ibm_cf_executor(config=config)
```
The runtime is the environment where your functions are executed. In IBM-PyWren, runtimes are based on Docker images. By default, it includes three different runtimes that allow running functions in Python 3.5, Python 3.6, and Python 3.7 environments.
Runtime name | Python version | Packages included |
---|---|---|
ibmfunctions/pywren:3.5 | 3.5 | |
ibmfunctions/action-python-v3.6 | 3.6 | list of packages |
ibmfunctions/action-python-v3.7 | 3.7 | list of packages |
IBM-PyWren automatically deploys the default runtime in the first execution, based on the Python version that you are using.
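If you want to use one of the listed runtimes explicitly, a minimal sketch, assuming the executor accepts a `runtime` parameter naming the Docker image (parameter name assumed, not shown in this document):

```python
import pywren_ibm_cloud as pywren

# Assumed parameter: select a specific runtime image instead of the default
pw = pywren.ibm_cf_executor(runtime='ibmfunctions/action-python-v3.6')
```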
By default, it uses 256 MB as the runtime memory size. However, you can change it in the config file or when you obtain the executor, for example:
```python
import pywren_ibm_cloud as pywren
pw = pywren.ibm_cf_executor(runtime_memory=128)
```
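If you prefer setting it in the config file instead, a hedged sketch, assuming the memory is set with a `runtime_memory` key under the `pywren` section (key name assumed, not shown in this document):

```yaml
pywren:
    storage_bucket: <BUCKET_NAME>
    runtime_memory: 128  # assumed key name
```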
You can also build custom runtimes with the libraries that your functions depend on. More information about runtimes is available here.
To test that all is working, use the command:
```
python -m pywren_ibm_cloud.tests
```
Notice that if you didn't set a local PyWren config file, you need to provide it as a JSON file path with the `-c <CONFIG>` flag. Alternatively, for debugging purposes, you can run specific tests with the `-f <TESTNAME>` flag. Use the `--help` flag to get more information about the test script.
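For example, a typical invocation combining both flags might look like this (the config path and test name are hypothetical):

```
python -m pywren_ibm_cloud.tests -c ~/my_pywren_config.json -f test_map
```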
- Single function execution example.

  ```python
  import pywren_ibm_cloud as pywren

  def my_function(x):
      return x + 7

  pw = pywren.ibm_cf_executor()
  pw.call_async(my_function, 3)
  result = pw.get_result()
  ```
- Multiple function execution (Map).

  To run multiple functions in parallel, the executor contains a method called `map()`, which applies a function to a list of data in the cloud. The `map()` method will launch one function for each entry of the list. To get the results of a `map()` call, use the `get_result()` method. The results are returned within an ordered list, where each element of the list is the result of one invocation. For example, in the next code PyWren will launch one function for each value within `iterdata`:

  ```python
  import pywren_ibm_cloud as pywren

  iterdata = [1, 2, 3, 4]

  def my_map_function(x):
      return x + 7

  pw = pywren.ibm_cf_executor()
  pw.map(my_map_function, iterdata)
  result = pw.get_result()
  ```

  and `result` will be: `[8, 9, 10, 11]`
- Multiple function execution with reduce (map-reduce).

  PyWren allows running a reduce function over the results of the map. The `map_reduce()` method waits until it gets the results from all the map functions, and then launches the reduce function. By default, the reduce method waits locally to get all the results. This approach does not consume CPU time in Cloud Functions, but it has the tradeoff of greater data transfers, because it has to download all the results and then upload them again for processing with the reduce function. After calling `map_reduce()`, it is possible to get the result from it by calling the `get_result()` method.

  ```python
  import pywren_ibm_cloud as pywren

  iterdata = [1, 2, 3, 4]

  def my_map_function(x):
      return x + 7

  def my_reduce_function(results):
      total = 0
      for map_result in results:
          total = total + map_result
      return total

  pw = pywren.ibm_cf_executor()
  pw.map_reduce(my_map_function, iterdata, my_reduce_function)
  result = pw.get_result()
  ```

  In this example the `result` will be `38`.

  By default the reducer waits locally for the results, and then launches the `reduce()` function in the cloud. You can change this behaviour and make the reducer wait remotely for the results by setting the `reducer_wait_local` parameter of the `map_reduce()` method to `False`:

  ```python
  pw.map_reduce(my_map_function, iterdata, my_reduce_function, reducer_wait_local=False)
  ```
PyWren for IBM Cloud has a built-in method for processing data objects from IBM Cloud Object Storage.

We designed a partitioner within the `map_reduce()` method which is configurable by specifying the size of the chunk. The input to the partitioner may be either a list of data objects, a list of URLs, or the entire bucket itself. The partitioner is activated inside PyWren and is responsible for splitting the objects into smaller chunks. It executes one `my_map_function` for each object chunk, and when all executions are completed, the partitioner executes the `my_reduce_function`. The reduce function will wait for all the partial results before processing them.

In the parameters of `my_map_function` you must specify a parameter called `data_stream`. This variable provides access to the data stream of the object.

The `map_reduce()` method has different signatures, as shown in the following examples.
```python
import pywren_ibm_cloud as pywren

iterdata = ['bucket1/object1', 'bucket1/object2', 'bucket1/object3']

def my_map_function(key, data_stream):
    partial_interesting_data = []
    for line in data_stream:
        # Do some process over each line
        partial_interesting_data.append(line)
    return partial_interesting_data

def my_reduce_function(results):
    final_result = []
    for partial_interesting_data in results:
        # Do some process over the partial results
        final_result.extend(partial_interesting_data)
    return final_result

chunk_size = 4*1024**2  # 4 MB

pw = pywren.ibm_cf_executor()
pw.map_reduce(my_map_function, iterdata, my_reduce_function, chunk_size)
result = pw.get_result()
```
| method signature | comments |
|---|---|
| `pw.map_reduce(my_map_function, iterdata, my_reduce_function, chunk_size)` | `iterdata` contains a list of objects in the format `bucket_name/object_name` |
| `my_map_function(key, data_stream)` | `key` is an entry from `iterdata` that is assigned to the invocation |
Commonly, a dataset may contain hundreds or thousands of files, so the previous approach, where you have to specify each object one by one, is not well suited in this case. With this next `map_reduce()` signature you can instead specify the name of the bucket which contains all the objects of the dataset.
```python
import pywren_ibm_cloud as pywren

bucket_name = 'my_data_bucket'

def my_map_function(bucket, key, data_stream, ibm_cos):
    partial_interesting_data = []
    for line in data_stream:
        # Do some process over each line
        partial_interesting_data.append(line)
    return partial_interesting_data

def my_reduce_function(results):
    final_result = []
    for partial_interesting_data in results:
        # Do some process over the partial results
        final_result.extend(partial_interesting_data)
    return final_result

chunk_size = 4*1024**2  # 4 MB

pw = pywren.ibm_cf_executor()
pw.map_reduce(my_map_function, bucket_name, my_reduce_function, chunk_size)
result = pw.get_result()
```
- If `chunk_size=None`, the partitioner's granularity is a single object, as in the sketch below.
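A minimal sketch of that mode, reusing the functions from the bucket example above:

```python
# chunk_size=None: the partitioner launches one map function per object
pw = pywren.ibm_cf_executor()
pw.map_reduce(my_map_function, bucket_name, my_reduce_function, chunk_size=None)
result = pw.get_result()
```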
| method signature | comments |
|---|---|
| `pw.map_reduce(my_map_function, bucket_name, my_reduce_function, chunk_size)` | `bucket_name` contains the name of the bucket |
| `my_map_function(bucket, key, data_stream, ibm_cos)` | `key` is a data object from `bucket` that is assigned to the invocation. `ibm_cos` is an optional parameter which provides a `boto3_client` (see here) |
```python
import pywren_ibm_cloud as pywren

iterdata = ['http://myurl/myobject1', 'http://myurl/myobject2']

def my_map_function(url, data_stream):
    partial_interesting_data = []
    for line in data_stream:
        # Do some process over each line
        partial_interesting_data.append(line)
    return partial_interesting_data

def my_reduce_function(results):
    final_result = []
    for partial_interesting_data in results:
        # Do some process over the partial results
        final_result.extend(partial_interesting_data)
    return final_result

chunk_size = 4*1024**2  # 4 MB

pw = pywren.ibm_cf_executor()
pw.map_reduce(my_map_function, iterdata, my_reduce_function, chunk_size)
result = pw.get_result()
```
| method signature | comments |
|---|---|
| `pw.map_reduce(my_map_function, iterdata, my_reduce_function, chunk_size)` | `iterdata` contains a list of URLs in the format `http://myurl/myobject.data` |
| `my_map_function(url, data_stream)` | `url` is an entry from `iterdata` that is assigned to the invocation |
By default there will be one reducer for all the objects. If you need one reducer for each object, you must set the parameter `reducer_one_per_object=True` in the `map_reduce()` method:

```python
pw.map_reduce(my_map_function, bucket_name, my_reduce_function,
              chunk_size, reducer_one_per_object=True)
```
Any map function can receive an `ibm_cos` parameter, which is a `boto3_client`. This allows you to access your IBM COS account from within any map function, for example:
```python
import pywren_ibm_cloud as pywren

iterdata = [1, 2, 3, 4]

def my_map_function(x, ibm_cos):
    data_object = ibm_cos.get_object(Bucket='mybucket', Key='mydata.data')
    # Do some process over the object
    return x + 7

pw = pywren.ibm_cf_executor()
pw.map(my_map_function, iterdata)
result = pw.get_result()
```
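As with a standard boto3 S3 client, the object's payload is typically read from the response's `Body` stream; a hedged sketch, assuming `ibm_cos` mirrors the boto3 interface:

```python
# boto3-style access: get_object() returns a dict whose 'Body' is a stream
data_object = ibm_cos.get_object(Bucket='mybucket', Key='mydata.data')
content = data_object['Body'].read()  # raw bytes of the object
```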
You can use IBM-PyWren inside IBM Watson Studio or Jupyter notebooks in order to execute parallel data analytics using IBM Cloud Functions.

As the current IBM Watson Studio runtimes do not contain the PyWren package, you need to install it. Add these lines at the beginning of the notebook:
```python
import sys
try:
    import pywren_ibm_cloud as pywren
except ImportError:
    !{sys.executable} -m pip install pywren-ibm-cloud
    import pywren_ibm_cloud as pywren
```
The installation also accepts a specific PyWren version as an input parameter, for example:

```
!{sys.executable} -m pip install -U pywren-ibm-cloud==1.0.15
```
Once installed, you can use IBM-PyWren as usual inside a notebook. Don't forget the configuration:
```python
import pywren_ibm_cloud as pywren

iterdata = [1, 2, 3, 4]

def my_map_function(x):
    return x + 7

pw = pywren.ibm_cf_executor()
pw.map(my_map_function, iterdata)
result = pw.get_result()
```
- Ants, serverless computing, and simplified data processing
- Speed up data pre-processing with PyWren in deep learning
- Predicting the future with Monte Carlo simulations over IBM Cloud Functions
- Process large data sets at massive scale with PyWren over IBM Cloud Functions
- PyWren for IBM Cloud on CODAIT
- Industrial project in Technion on PyWren-IBM
- Serverless data analytics in the IBM Cloud - Proceedings of the 19th International Middleware Conference (Industry)
This work has been supported by the EU H2020 project CLASS, contract #780622.