From 26be7a9822eebd0f8fd81d331728449841104a9b Mon Sep 17 00:00:00 2001
From: Tobias Windisch
Date: Fri, 15 Dec 2023 21:15:20 +0100
Subject: [PATCH] update docs

---
 docs/source/decorators.rst           | 101 +++++++++++++++-------
 docs/source/index.rst                |   1 +
 docs/source/tutorials/databricks.rst | 123 +++++++++++++++++++++++++++
 luisy/__init__.py                    |   4 +
 4 files changed, 198 insertions(+), 31 deletions(-)
 create mode 100644 docs/source/tutorials/databricks.rst

diff --git a/docs/source/decorators.rst b/docs/source/decorators.rst
index 4602c92..f116660 100644
--- a/docs/source/decorators.rst
+++ b/docs/source/decorators.rst
@@ -11,8 +11,8 @@ Decorators are used for:
 
 * Sorting tasks into `raw/interim/final` directory structure
 
-Outputs
--------
+Local targets
+-------------
 
 Here is a list of decorators for the outfile of a task:
 Generally, tasks with those decorators can persist their output with
@@ -28,8 +28,7 @@ output with `self.input().read()`
 * :py:func:`~luisy.decorators.pickle_output`: For tasks whose output
   is a pickle. May be used to (de)serialize any python object.
 * :py:func:`~luisy.decorators.parquetdir_output`: For tasks whose output
-  is a directory holding parquet files, like the result of an Apache
-  Spark computation.
+  is a directory holding parquet files, like the result of a Spark computation.
 * :py:func:`~luisy.decorators.make_directory_output`: Factory to create
   your own directory output decorator. This method wants you to pass a
   function, which tells luisy how to handle the files in your directory.
@@ -39,56 +38,96 @@ output with `self.input().read()`
 Parquet dir output
 ~~~~~~~~~~~~~~~~~~
 
-In many cases, a pyspark job writes a folder holding multiple
-parquet-files into a Blob-storage. Using the
-:py:func:`~luisy.decorators.parquetdir_output` together with the
-cloud-synchronisation, those files can be automatically downloaded:
-
+When persisting the results of a Spark computation, the preferred
+output format is parquet files. This may look as follows:
 
 .. code-block:: python
 
     import luisy
 
     @luisy.raw
-    @luisy.parquetdir_output
-    class SomePySparkResult(luisy.ExternalTask):
-
-        partition = luigi.Parameter()
-
-        def get_folder_name(self):
-            return f"pyspark_result/Partition=self.partition"
-
-
-This task points to the folder
-:code:`[project_name]/raw/pyspark_result/` in the Blob storage holding
-multiple parquet-files. The output can be used in a subsequent task as
-follows:
-
-.. code-block:: python
-
-    @luisy.interim
-    @luisy.requires(SomePySparkResult)
-    class ProcessedResult(luisy.Task):
-
-        partition = luigi.Parameter()
-
-        def run(self):
-
-            df = self.input().read()
-            # do something
-            self.write(df)
-
-Invoking
-
-.. code-block:: bash
-
-    luisy --module [project_name].[module] ProcessedResult --partition=my_partition --download
-
-will first download the parquet-files locally and then run the
-subsequent task.
+    @luisy.requires(SomeTask)
+    @luisy.parquetdir_output('some_dir/spark_result')
+    class SomePySparkResult(luisy.SparkTask):
+
+        def run(self):
+            df = self.read_input()
+            # do something
+            self.write(df)
+
+If the resulting path needs to be parametrized, the method
+:code:`get_folder_name` needs to be implemented:
+
+.. code-block:: python
+
+    import luisy
+    import luigi
+
+    @luisy.raw
+    @luisy.requires(SomeTask)
+    @luisy.parquetdir_output
+    class SomePySparkResult(luisy.SparkTask):
+
+        partition = luigi.Parameter()
+
+        def run(self):
+            df = self.read_input()
+            # do something
+            self.write(df)
+
+        def get_folder_name(self):
+            return f"some_dir/spark_result/Partition={self.partition}"
+
+
+In some cases, only the results of a Spark pipeline triggered
+externally need to be processed further. These files are external
+inputs to the pipeline, and
+:py:func:`~luisy.decorators.parquetdir_output` together with the
+cloud-synchronisation can be used to download these files automatically
+and process them locally:
+
+.. code-block:: python
+
+    import luisy
+
+    @luisy.raw
+    @luisy.parquetdir_output('some_dir/some_pyspark_output')
+    class SomePySparkResult(luisy.ExternalTask):
+        pass
+
+
+This task points to the folder
+:code:`[project_name]/raw/some_dir/some_pyspark_output/` in the Blob
+storage holding multiple parquet-files.
+
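+The downloaded files can then be processed by a regular local task. A
+minimal sketch along the lines of the example above (task and module
+names are illustrative):
+
+.. code-block:: python
+
+    @luisy.interim
+    @luisy.requires(SomePySparkResult)
+    class ProcessedResult(luisy.Task):
+
+        def run(self):
+            # the synced parquet directory is read into a dataframe
+            df = self.input().read()
+            # do something
+            self.write(df)
+
+Invoking
+
+.. code-block:: bash
+
+    luisy --module [project_name].[module] ProcessedResult --download
+
+will first download the parquet-files locally and then run the
+subsequent task.
+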
+
+Cloud targets
+-------------
+
+The following targets can be used in combination with a
+:py:class:`~luisy.tasks.base.SparkTask`:
+
+* :py:func:`~luisy.decorators.deltatable_output`: For Spark tasks whose
+  output should be saved in a delta table.
+* :py:func:`~luisy.decorators.azure_blob_storage_output`: For Spark tasks whose
+  output should be saved in an Azure Blob Storage.
+
+See :ref:`databricks` for more details on how to use these targets.
+
+Cloud inputs
+------------
+
+To ease the development of cloud pipelines using
+:py:class:`~luisy.tasks.base.SparkTask` that access data already
+persisted in cloud storages, we provide input decorators that render
+the usage of :py:class:`~luisy.tasks.base.ExternalTask` unnecessary:
+
+* :py:func:`~luisy.decorators.deltatable_input`: For Spark tasks that should
+  access data saved in a delta table.
+* :py:func:`~luisy.decorators.azure_blob_storage_input`: For Spark tasks that should
+  access data saved in an Azure Blob Storage.
+
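+For instance, a Spark task that reads from one delta table and writes
+to another needs no :py:class:`~luisy.tasks.base.ExternalTask` at all.
+A minimal sketch (schema, catalog, and table names are illustrative):
+
+.. code-block:: python
+
+    import luisy
+
+    @luisy.deltatable_input(schema='my_schema', catalog='my_catalog', table_name='raw')
+    @luisy.deltatable_output(schema='my_schema', catalog='my_catalog', table_name='cleaned')
+    class CleanData(luisy.SparkTask):
+
+        def run(self):
+            # read the input delta table as a spark DataFrame
+            df = self.input().read()
+            # do something
+            self.write(df)
+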
+See :ref:`databricks` for more details on how to implement pipelines
+using these inputs.
 
 Directory structure
 -------------------
diff --git a/docs/source/index.rst b/docs/source/index.rst
index 59a677d..0730a82 100755
--- a/docs/source/index.rst
+++ b/docs/source/index.rst
@@ -63,6 +63,7 @@ Learn more about luisy in our :doc:`Tutorials <./tutorials/getting_started>`.
    Directory Output
    Up- and Downloading Files
    Trigger reruns by changing code
+   Interact with Databricks
 
 .. toctree::
diff --git a/docs/source/tutorials/databricks.rst b/docs/source/tutorials/databricks.rst
new file mode 100644
index 0000000..d10760c
--- /dev/null
+++ b/docs/source/tutorials/databricks.rst
@@ -0,0 +1,123 @@
+
+.. _databricks:
+
+Interaction with Databricks
+===========================
+
+.. note::
+
+    This is an experimental feature. Expect sharp edges and bugs.
+
+
+Overview
+--------
+
+The preferred way to interact with Databricks objects like a pyspark
+cluster or delta tables is to use :py:mod:`luisy` inside a Databricks
+notebook. It is also possible to connect from a local session using
+:py:mod:`databricks_connect` (see :ref:`databricks-connect`).
+
+
+Example pipeline
+----------------
+
+.. note::
+
+    All tasks have to be implemented in a Python package; only the
+    execution is triggered from a Databricks notebook.
+
+This is how a cloud pipeline may look:
+
+.. code-block:: python
+
+    import luisy
+
+    @luisy.deltatable_input(schema='my_schema', catalog='my_catalog', table_name='raw')
+    @luisy.deltatable_output(schema='my_schema', catalog='my_catalog', table_name='interim')
+    class TaskA(luisy.SparkTask):
+
+        def run(self):
+            df = self.input().read()
+            df = df.drop('c')
+            self.write(df)
+
+
+    @luisy.requires(TaskA)
+    @luisy.deltatable_output(schema='my_schema', catalog='my_catalog', table_name='final')
+    class TaskB(luisy.SparkTask):
+
+        def run(self):
+            df = self.input().read()
+            df = df.withColumn('f', 2*df.a)
+            self.write(df)
+
+
+    @luisy.requires(TaskB)
+    @luisy.final
+    @luisy.pickle_output
+    class TaskC(luisy.SparkTask):
+
+        def run(self):
+            df = self.input().read()
+            self.write(df)
+
+
+Here, :code:`TaskA` and :code:`TaskB` read and write their data from
+and to delta tables and process them with Spark. :code:`TaskC`,
+however, persists its output into a pickle file stored in :code:`dbfs`.
+
+Running a pipeline
+------------------
+
+First, the working directory needs to be set. Here, we can use the
+Databricks file system (:code:`dbfs`), which allows the pipeline to run
+completely in the cloud. The :py:class:`pyspark.SparkContext` is
+automatically propagated to :py:mod:`luisy` from the active session:
+
+.. code-block:: python
+
+    working_dir = "/dbfs/FileStore/my_working_dir"
+    Config().set_param("working_dir", working_dir)
+
+
+A given pipeline can be executed as follows:
+
+.. code-block:: python
+
+    build(SomeTask(), cloud_mode=True)
+
+Here, all :py:class:`~luisy.tasks.base.SparkTask` objects use the
+pyspark cluster of the Databricks instance.
+
+.. _databricks-connect:
+
+Using databricks-connect
+------------------------
+
+Using :py:mod:`databricks-connect`, cloud pipelines can be triggered
+from Python sessions outside of Databricks. A local proxy for the
+remote Spark session of Databricks is created in the local session.
+First, databricks-connect needs to be installed:
+
+.. code-block:: bash
+
+    pip install databricks-connect
+
+Make sure that the version of databricks-connect is compatible with
+the Spark version of the Databricks cluster.
+
+To run the cloud pipelines locally, the following parameters need to
+be set:
+
+.. code-block:: python
+
+    from databricks.connect import DatabricksSession
+
+    spark = DatabricksSession.builder.remote(
+        host="https://adb-<...>.azuredatabricks.net",
+        token="<personal-access-token>",
+        cluster_id="<cluster-id>",
+    ).getOrCreate()
+
+    Config().set_param('spark', spark)
+
+From there, everything works as in a Databricks notebook.
+
+.. note::
+
+    The Unity Catalog needs to be enabled in your Databricks instance.
diff --git a/luisy/__init__.py b/luisy/__init__.py
index 20597cc..3f3cbbc 100644
--- a/luisy/__init__.py
+++ b/luisy/__init__.py
@@ -37,6 +37,10 @@ from luisy.decorators import hdf_output  # noqa
 from luisy.decorators import pickle_output  # noqa
 from luisy.decorators import parquetdir_output  # noqa
+from luisy.decorators import deltatable_output  # noqa
+from luisy.decorators import deltatable_input  # noqa
+from luisy.decorators import azure_blob_storage_output  # noqa
+from luisy.decorators import azure_blob_storage_input  # noqa
 from luigi import Parameter  # noqa
 from luigi import IntParameter  # noqa