From 78b4a2754a799fd95d6b367f54598b30087a66b6 Mon Sep 17 00:00:00 2001
From: Tobias Windisch
Date: Mon, 25 Dec 2023 20:14:12 +0100
Subject: [PATCH] update documentation

Signed-off-by: Tobias Windisch
---
 docs/source/decorators.rst                  |   4 +-
 docs/source/index.rst                       |   2 +-
 docs/source/tutorials/databricks.rst        | 123 -------------
 docs/source/tutorials/spark.rst             | 191 ++++++++++++++++++++
 tests/{test_databricks.py => test_spark.py} |   0
 5 files changed, 194 insertions(+), 126 deletions(-)
 delete mode 100644 docs/source/tutorials/databricks.rst
 create mode 100644 docs/source/tutorials/spark.rst
 rename tests/{test_databricks.py => test_spark.py} (100%)

diff --git a/docs/source/decorators.rst b/docs/source/decorators.rst
index f116660..d85f137 100644
--- a/docs/source/decorators.rst
+++ b/docs/source/decorators.rst
@@ -111,7 +111,7 @@ The following targets can be used in combination with a
 * :py:func:`~luisy.decorators.azure_blob_storage_output`: For spark tasks
   whose output should be saved in a azure blob storage.
 
-See :ref:`databricks` for more details on how to use these targets.
+See :ref:`pyspark` for more details on how to use these targets.
 
 Cloud inputs
 ------------
@@ -126,7 +126,7 @@ cloud storages, we provide input decorators that render the usage of
 * :py:func:`~luisy.decorators.azure_blob_storage_input`: For spark tasks
   whose output should be saved in a azure blob storage.
 
-See :ref:`databricks` for more details on how to implement pipelines
+See :ref:`pyspark` for more details on how to implement pipelines
 using these inputs.
 
 Directory structure
diff --git a/docs/source/index.rst b/docs/source/index.rst
index 0730a82..9c4aa02 100755
--- a/docs/source/index.rst
+++ b/docs/source/index.rst
@@ -63,7 +63,7 @@ Learn more about luisy in our :doc:`Tutorials <./tutorials/getting_started>`.
    Directory Output
    Up- and Downloading Files
    Trigger reruns by changing code
-   Interact with Databricks
+   Interaction with PySpark
 
 
 .. toctree::
diff --git a/docs/source/tutorials/databricks.rst b/docs/source/tutorials/databricks.rst
deleted file mode 100644
index 54fd127..0000000
--- a/docs/source/tutorials/databricks.rst
+++ /dev/null
@@ -1,123 +0,0 @@
-
-.. _databricks:
-
-Interaction with Databricks
-===========================
-
-.. note::
-
-    This is an experimental feature. Expect sharp edges and bugs.
-
-
-Overview
---------
-
-The prefered way to interact with databricks objects like a pyspark
-cluster or delta tables is by using it in a databricks notebook. Its
-also possible to connect from a local session using
-:py:mod:`databricks_connect` (see :ref:`databricks-connect`).
-
-
-Example pipeline
-----------------
-
-.. note::
-    All task have to be implemented in a python
-    package, only execution can be done via a databricks notebook.
-
-This is how a cloud pipeline may looks like:
-
-.. code-block:: python
-
-    import luisy
-
-    @luisy.deltatable_input(schema='my_schema', catalog='my_catalog', table_name='raw')
-    @luisy.deltatable_output(schema='my_schema', catalog='my_catalog', table_name='interim')
-    class TaskA(SparkTask):
-
-        def run(self):
-            df = self.input().read()
-            df = df.drop('c')
-            self.write(df)
-
-
-    @luisy.requires(TaskA)
-    @luisy.deltatable_output(schema='my_schema', catalog='my_catalog', table_name='final')
-    class TaskB(SparkTask):
-
-        def run(self):
-            df = self.input().read()
-            df = df.withColumn('f', 2*df.a)
-            self.write(df)
-
-    @luisy.requires(TaskB)
-    @luisy.final
-    @luisy.pickle_output
-    class TaskC(SparkTask):
-        def run(self):
-            df = self.input().read()
-            self.write(df)
-
-
-Here, :code:`TaskA` and :code:`TaskB` read and write their data from
-and to delta tables and process them with spark. :code:`TaskC`,
-however, persists its output into a pickle file stored in dbfs.
-
-Running a pipeline
-------------------
-
-First, the working dir needs to be set. Here, we can use databricks
-file system (:code:`dbfs`) allowing to run the pipeline completely in
-the cloud. The :py:class:`pyspark.SparkContext()` is automatically
-propagated to :py:mod:`luisy` from the active session:
-
-.. code-block:: python
-
-    working_dir = "/dbfs/FileStore/my_working_dir"
-    Config().set_param("working_dir", working_dir)
-
-
-A given pipeline can be executed as follows:
-
-.. code-block:: python
-
-    build(SomeTask(), cloud_mode=True)
-
-Here, all :py:class:`~luisy.tasks.base.SparkTask` objects use the
-pyspark cluster of the databricks instance.
-
-.. _databricks-connect:
-
-Using databricks connect
-------------------------
-
-Using :py:mod:`databricks-connect`, cloud pipelines can be triggered
-from python sessions outside of databricks. There, a local proxy for the remote spark
-session from databricks is created in the local spark. First,
-databricks connect needs to be installed.
-
-.. code-block:: bash
-
-    pip install databricks-connect
-
-Make sure that the version of databricks-connect is compatible with
-the spark version in the databricks cluster.
-
-To run the cloud pipelines locally, the following parameters need to
-be set:
-
-.. code-block:: python
-
-    spark = DatabricksSession.builder.remote(
-        host="https://adb-<...>.azuredatabricks.net",
-        token="",
-        cluster_id=",
-    ).getOrCreate()
-
-    Config().set_param('spark', spark)
-
-From there, everything works as in a databricks notebook.
-
-.. note::
-
-    The unity catalog needs to be enabled in your databricks instance.
diff --git a/docs/source/tutorials/spark.rst b/docs/source/tutorials/spark.rst
new file mode 100644
index 0000000..eccb1e1
--- /dev/null
+++ b/docs/source/tutorials/spark.rst
@@ -0,0 +1,191 @@
+
+.. _pyspark:
+
+Interaction with PySpark
+========================
+
+.. note::
+
+    This is an experimental feature. Expect sharp edges and bugs.
+
+
+Overview
+--------
+
+In this tutorial, we show how tasks can be executed on a
+Spark cluster. The key difference between a :py:class:`~luisy.tasks.base.Task` and a
+:py:class:`~luisy.tasks.base.SparkTask` is that the objects holding the data are
+not :py:class:`pandas.DataFrame` but :py:class:`pyspark.sql.DataFrame`
+objects.
+
+Generally, a :py:class:`~luisy.tasks.base.SparkTask` creates a
+:py:class:`pyspark.sql.DataFrame` from the output files of its input and,
+when saving to a :py:class:`luisy.targets.CloudTarget`, the respective
+spark method is used. Here, the user has to make sure that the spark
+cluster has the required permissions to read from and write to the
+respective locations. Whenever a :py:class:`~luisy.tasks.base.SparkTask` writes to or reads from a
+:py:class:`luisy.targets.LocalTarget`, a serialization into a single
+:py:class:`pandas.DataFrame` takes place and the user has to make sure
+that the data fits into memory.
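+
+To make the memory caveat concrete, the following is a minimal,
+hypothetical :py:mod:`pyspark` sketch (independent of :py:mod:`luisy`) of
+the kind of conversion such a serialization implies:
+
+.. code-block:: python
+
+    from pyspark.sql import SparkSession
+
+    spark = SparkSession.builder.getOrCreate()
+
+    # A small distributed DataFrame; the column names are made up for
+    # this illustration.
+    spark_df = spark.createDataFrame([(1, 2.0), (2, 3.5)], schema=['a', 'b'])
+
+    # Collecting into pandas pulls all rows to the driver, so the full
+    # dataset has to fit into the driver's memory.
+    pandas_df = spark_df.toPandas()
+
+    # The reverse direction distributes the in-memory data again.
+    spark_df_again = spark.createDataFrame(pandas_df)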
+
+
+Example pipeline
+----------------
+
+This is what a :py:mod:`spark` pipeline may look like:
+
+.. code-block:: python
+
+    import luisy
+
+    @luisy.deltatable_input(schema='my_schema', catalog='my_catalog', table_name='raw')
+    @luisy.deltatable_output(schema='my_schema', catalog='my_catalog', table_name='interim')
+    class TaskA(SparkTask):
+
+        def run(self):
+            df = self.input().read()
+            df = df.drop('c')
+            self.write(df)
+
+
+    @luisy.requires(TaskA)
+    @luisy.deltatable_output(schema='my_schema', catalog='my_catalog', table_name='final')
+    class TaskB(SparkTask):
+
+        def run(self):
+            df = self.input().read()
+            df = df.withColumn('f', 2*df.a)
+            self.write(df)
+
+    @luisy.requires(TaskB)
+    @luisy.final
+    @luisy.pickle_output
+    class TaskC(SparkTask):
+        def run(self):
+            df = self.input().read()
+            self.write(df)
+
+
+Here, :code:`TaskA` and :code:`TaskB` read and write their data from
+and to delta tables and process them with spark. :code:`TaskC`,
+however, persists its output into a pickle file, which requires
+:py:mod:`luisy` to serialize all the data to a
+:py:class:`pandas.DataFrame` beforehand.
+
+Running a pipeline
+------------------
+
+If the pipeline should be executed within an active python session,
+it can be run as follows:
+
+.. code-block:: python
+
+    from luisy.cli import build
+
+    build(TaskC(), cloud_mode=True)
+
+In this case, the :py:class:`pyspark.SparkContext` is automatically
+propagated to :py:mod:`luisy` from the active session. Alternatively,
+if a specific spark context has to be used, it needs to
+be attached to the :py:class:`~luisy.config.Config` first as follows:
+
+.. code-block:: python
+
+    Config().set_param('spark', some_predefined_spark_context)
+
+
+For instance, this could be a spark session created via spark
+connect:
+
+.. code-block:: python
+
+    spark = SparkSession.builder.remote("sc://my-cluster:15002").getOrCreate()
+    Config().set_param('spark', spark)
+
+Be aware that all :py:class:`~luisy.targets.LocalTarget` objects point to
+locations on the file system of the python session in which
+:py:mod:`luisy` runs.
+
+.. _databricks:
+
+Using databricks
+----------------
+
+A convenient way to interact with pyspark clusters is to use the
+databricks abstraction through a databricks notebook. It is
+also possible to connect from a local session using
+:py:mod:`databricks_connect` (see :ref:`databricks-connect`).
+
+.. note::
+
+    When using :py:mod:`luisy` in a databricks cluster, additional
+    charges may be incurred by the user. The amount of these expenses
+    depends, among other things, on the cloud provider and the cluster
+    configuration. :py:mod:`luisy` has no influence on the generated costs
+    and we recommend monitoring cloud costs closely.
+
+
+.. note::
+    The tasks themselves cannot be implemented within the notebook; they need
+    to be implemented in a standalone python package or module. Only the
+    execution can be triggered from a databricks notebook (see the sketch
+    below).
+
+
+Initial configuration
+~~~~~~~~~~~~~~~~~~~~~
+
+When using :py:mod:`luisy` within a databricks cluster, the databricks file
+system (:code:`dbfs`) can be used as the local file system, allowing the
+pipeline to run completely in the cloud, even for
+non-:py:class:`~luisy.tasks.base.SparkTask` tasks:
+
+.. code-block:: python
+
+    working_dir = "/dbfs/FileStore/my_working_dir"
+    Config().set_param("working_dir", working_dir)
+
+
+A given pipeline can be executed as follows:
+
+.. code-block:: python
+
+    build(SomeTask(), cloud_mode=True)
+
+Here, all :py:class:`~luisy.tasks.base.SparkTask` objects use the
+pyspark cluster of the databricks instance.
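+
+Putting this together, a notebook cell that triggers the example pipeline
+from above could look as follows. The package name :code:`my_project` is
+a hypothetical placeholder for the python package that actually contains
+the task definitions and that is installed on the cluster:
+
+.. code-block:: python
+
+    # Hypothetical notebook cell; `my_project` stands in for the package
+    # holding the task definitions.
+    from luisy.cli import build
+    from luisy.config import Config
+    from my_project.tasks import TaskC
+
+    # Store local targets on the databricks file system.
+    Config().set_param("working_dir", "/dbfs/FileStore/my_working_dir")
+
+    # Run the pipeline on the cluster of the databricks instance.
+    build(TaskC(), cloud_mode=True)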
+
+.. _databricks-connect:
+
+Trigger from remote
+~~~~~~~~~~~~~~~~~~~
+
+Using :py:mod:`databricks-connect`, cloud pipelines can be triggered
+from python sessions outside of databricks. There, a local proxy for the
+remote spark session of databricks is created in the local python session.
+First, databricks-connect needs to be installed:
+
+.. code-block:: bash
+
+    pip install databricks-connect
+
+Make sure that the version of databricks-connect is compatible with
+the spark version in the databricks cluster.
+
+To run the cloud pipelines locally, the following parameters need to
+be set:
+
+.. code-block:: python
+
+    spark = DatabricksSession.builder.remote(
+        host="https://adb-<...>.azuredatabricks.net",
+        token="",
+        cluster_id="",
+    ).getOrCreate()
+
+    Config().set_param('spark', spark)
+
+.. note::
+
+    The unity catalog needs to be enabled in your databricks instance.
diff --git a/tests/test_databricks.py b/tests/test_spark.py
similarity index 100%
rename from tests/test_databricks.py
rename to tests/test_spark.py