Detect and test Spark Connect server (#247)
Most features are not supported by PySpark with a Spark Connect server.
This adds a readable error message to the package and Spark Connect test capability to the CI.
EnricoMi authored Aug 16, 2024
1 parent 4ba4159 commit 3a046ca
Showing 20 changed files with 384 additions and 65 deletions.
58 changes: 56 additions & 2 deletions .github/actions/test-python/action.yml
@@ -15,6 +15,9 @@ inputs:
spark-compat-version:
description: Spark compatibility version, e.g. 3.4
required: true
hadoop-version:
description: Hadoop version, e.g. 2.7 or 2
required: true
scala-compat-version:
description: Scala compatibility version, e.g. 2.12
required: true
@@ -40,6 +43,26 @@ runs:
name: Binaries-${{ inputs.spark-compat-version }}-${{ inputs.scala-compat-version }}
path: .

- name: Cache Spark Binaries
uses: actions/cache@v4
if: inputs.scala-compat-version == '2.12' && ! contains(inputs.spark-version, '-SNAPSHOT')
with:
path: ~/spark
key: ${{ runner.os }}-spark-binaries-${{ inputs.spark-version }}-${{ inputs.scala-compat-version }}

- name: Setup Spark Binaries
if: inputs.scala-compat-version == '2.12' && ! contains(inputs.spark-version, '-SNAPSHOT')
env:
SPARK_PACKAGE: spark-${{ inputs.spark-version }}/spark-${{ inputs.spark-version }}-bin-hadoop${{ inputs.hadoop-version }}${{ inputs.scala-compat-version == '2.13' && '-scala2.13' || '' }}.tgz
run: |
if [[ ! -e ~/spark ]]
then
wget --progress=dot:giga "https://www.apache.org/dyn/closer.lua/spark/${SPARK_PACKAGE}?action=download" -O - | tar -xzC "${{ runner.temp }}"
archive=$(basename "${SPARK_PACKAGE}") bash -c "mv -v "${{ runner.temp }}/\${archive/%.tgz/}" ~/spark"
fi
echo "SPARK_BIN_HOME=$(cd ~/spark; pwd)" >> $GITHUB_ENV
shell: bash

- name: Cache Maven packages
if: github.event_name != 'merge_group'
uses: actions/cache@v4
@@ -105,14 +128,42 @@ runs:
run: mvn --batch-mode --update-snapshots install -Dspotless.check.skip -DskipTests -Dmaven.test.skip=true -Dgpg.skip
shell: bash

- name: Start Spark Connect
id: spark-connect
if: (inputs.spark-compat-version == '3.4' || inputs.spark-compat-version == '3.5' || startsWith(inputs.spark-compat-version, '4.')) && inputs.scala-compat-version == '2.12' && ! contains(inputs.spark-version, '-SNAPSHOT')
run: |
$SPARK_BIN_HOME/sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_${{ inputs.scala-compat-version }}:${{ inputs.spark-version }}
shell: bash

- name: Python Unit Tests (Spark Connect)
if: steps.spark-connect.outcome == 'success'
env:
PYTHONPATH: python:python/test
TEST_SPARK_CONNECT_SERVER: sc://localhost:15002
run: |
pip install pyspark[connect]
python -m pytest python/test --junit-xml test-results-connect/pytest-$(date +%s.%N)-$RANDOM.xml
shell: bash

- name: Stop Spark Connect
if: always() && steps.spark-connect.outcome == 'success'
run: |
$SPARK_BIN_HOME/sbin/stop-connect-server.sh
echo "::group::Spark Connect server log"
# though started in $SPARK_BIN_HOME/sbin, logs go to $SPARK_HOME/logs
ls -lah $SPARK_HOME/logs || true
cat $SPARK_HOME/logs/spark-*-org.apache.spark.sql.connect.service.SparkConnectServer-*.out || true
echo "::endgroup::"
shell: bash

- name: Python Integration Tests
env:
PYTHONPATH: python:python/test
run: |
find python/test -name 'test*.py' > tests
while read test
do
if ! $SPARK_HOME/bin/spark-submit --master "local[2]" --packages uk.co.gresearch.spark:spark-extension_${{ inputs.scala-compat-version }}:$SPARK_EXTENSION_VERSION "$test" test-results
if ! $SPARK_HOME/bin/spark-submit --master "local[2]" --packages uk.co.gresearch.spark:spark-extension_${{ inputs.scala-compat-version }}:$SPARK_EXTENSION_VERSION "$test" test-results-submit
then
state="fail"
fi
@@ -135,7 +186,10 @@ runs:
uses: actions/upload-artifact@v4
with:
name: Python Test Results (Spark ${{ inputs.spark-version }} Scala ${{ inputs.scala-version }} Python ${{ inputs.python-version }})
path: test-results/*.xml
path: |
test-results/*.xml
test-results-submit/*.xml
test-results-connect/*.xml
branding:
icon: 'check-circle'
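The `TEST_SPARK_CONNECT_SERVER` variable introduced above tells the Python tests where to find the running Connect server. As a rough sketch of what that implies on the test side (the code below is illustrative and not taken from the repository; only `SparkSession.builder.remote` from PySpark ≥ 3.4 and the `sc://localhost:15002` address from the action are given):

```python
# Illustrative only: create a session against the Spark Connect server started
# by the action above, falling back to a classic local session otherwise.
import os

from pyspark.sql import SparkSession

remote = os.environ.get("TEST_SPARK_CONNECT_SERVER")  # e.g. "sc://localhost:15002"

if remote:
    # Spark Connect session, requires `pip install pyspark[connect]` (PySpark >= 3.4)
    spark = SparkSession.builder.remote(remote).getOrCreate()
else:
    # classic session, as used by the spark-submit integration tests
    spark = SparkSession.builder.master("local[2]").getOrCreate()

print(spark.range(3).collect())
```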
7 changes: 7 additions & 0 deletions .github/workflows/test-python.yml
@@ -19,28 +19,34 @@ jobs:
include:
- spark-compat-version: '3.0'
spark-version: '3.0.3'
hadoop-version: '2.7'
scala-compat-version: '2.12'
scala-version: '2.12.10'
python-version: '3.8'
- spark-compat-version: '3.1'
spark-version: '3.1.3'
hadoop-version: '2.7'
scala-compat-version: '2.12'
scala-version: '2.12.10'
python-version: '3.8'
- spark-compat-version: '3.2'
spark-version: '3.2.4'
hadoop-version: '2.7'
scala-compat-version: '2.12'
scala-version: '2.12.15'
- spark-compat-version: '3.3'
spark-version: '3.3.4'
hadoop-version: '3'
scala-compat-version: '2.12'
scala-version: '2.12.15'
- spark-compat-version: '3.4'
spark-version: '3.4.2'
hadoop-version: '3'
scala-compat-version: '2.12'
scala-version: '2.12.17'
- spark-compat-version: '3.5'
spark-version: '3.5.1'
hadoop-version: '3'
scala-compat-version: '2.12'
scala-version: '2.12.18'

@@ -55,4 +61,5 @@ jobs:
scala-version: ${{ matrix.scala-version }}
spark-compat-version: ${{ matrix.spark-compat-version }}
scala-compat-version: ${{ matrix.scala-compat-version }}
hadoop-version: ${{ matrix.hadoop-version }}
python-version: ${{ matrix.python-version }}
2 changes: 2 additions & 0 deletions DIFF.md
@@ -404,6 +404,8 @@ The latter variant is prefixed with `_with_options`.
* `def diff(self: DataFrame, other: DataFrame, *id_columns: str) -> DataFrame`
* `def diffwith(self: DataFrame, other: DataFrame, *id_columns: str) -> DataFrame:`

Note that this feature is not supported in Python when connected with a [Spark Connect server](README.md#spark-connect-server).

## Diff Spark application

There is also a Spark application that can be used to create a diff DataFrame. The application reads two DataFrames
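To put the new DIFF.md note in context, here is a minimal usage sketch for the `diff`/`diffwith` signatures listed above. It assumes a classic (non-Connect) session and the `gresearch.spark.diff` import path; check DIFF.md for the exact import.

```python
# Sketch: diff two DataFrames on an id column (classic session assumed,
# since the note above excludes Spark Connect).
from pyspark.sql import SparkSession
from gresearch.spark.diff import *  # assumed import path, see DIFF.md

spark = SparkSession.builder.master("local[2]").getOrCreate()

left = spark.createDataFrame([(1, "one"), (2, "two"), (3, "three")], ["id", "value"])
right = spark.createDataFrame([(1, "one"), (2, "Two"), (4, "four")], ["id", "value"])

left.diff(right, "id").show()      # one row per id, flagged as inserted / changed / deleted / unchanged
left.diffwith(right, "id").show()  # variant that keeps the left and right rows side by side
```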
2 changes: 2 additions & 0 deletions HISTOGRAM.md
@@ -55,3 +55,5 @@ In Python, call:
import gresearch.spark

df.histogram([100, 200], 'user').orderBy('user')

Note that this feature is not supported in Python when connected with a [Spark Connect server](README.md#spark-connect-server).
4 changes: 4 additions & 0 deletions PARQUET.md
@@ -254,3 +254,7 @@ spark.read.parquet_blocks("/path/to/parquet", parallelism=100)
spark.read.parquet_block_columns("/path/to/parquet", parallelism=100)
spark.read.parquet_partitions("/path/to/parquet", parallelism=100)
```

## Known Issues

Note that this feature is not supported in Python when connected with a [Spark Connect server](README.md#spark-connect-server).
4 changes: 4 additions & 0 deletions PYSPARK-DEPS.md
@@ -129,3 +129,7 @@ Finally, shutdown the example cluster:
```shell
docker compose -f docker-compose.yml down
```

## Known Issues

Note that this feature is not supported in Python when connected with a [Spark Connect server](README.md#spark-connect-server).
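The PYSPARK-DEPS.md feature that the new note restricts looks roughly like this in Python. This is a sketch assuming the `install_pip_package` helper described in that document and a classic session:

```python
# Sketch: install Python dependencies into a running PySpark job via PIP
# (per the note above, not available when connected to a Spark Connect server).
from pyspark.sql import SparkSession

# noinspection PyUnresolvedReferences
from gresearch.spark import *  # assumed to add install_pip_package, see PYSPARK-DEPS.md

spark = SparkSession.builder.master("local[2]").getOrCreate()

# install the packages on the driver and the executors of this job
spark.install_pip_package("pandas==2.0.3", "pyarrow")
```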
28 changes: 20 additions & 8 deletions README.md
@@ -2,27 +2,27 @@

This project provides extensions to the [Apache Spark project](https://spark.apache.org/) in Scala and Python:

**[Diff](DIFF.md):** A `diff` transformation and application for `Dataset`s that computes the differences between
**[Diff](DIFF.md) [<sup>[*]</sup>](#spark-connect-server):** A `diff` transformation and application for `Dataset`s that computes the differences between
two datasets, i.e. which rows to _add_, _delete_ or _change_ to get from one dataset to the other.

**[SortedGroups](GROUPS.md):** A `groupByKey` transformation that groups rows by a key while providing
a **sorted** iterator for each group. Similar to `Dataset.groupByKey.flatMapGroups`, but with order guarantees
for the iterator.

**[Histogram](HISTOGRAM.md):** A `histogram` transformation that computes the histogram DataFrame for a value column.
**[Histogram](HISTOGRAM.md) [<sup>[*]</sup>](#spark-connect-server):** A `histogram` transformation that computes the histogram DataFrame for a value column.

**[Global Row Number](ROW_NUMBER.md):** A `withRowNumbers` transformation that provides the global row number w.r.t.
**[Global Row Number](ROW_NUMBER.md) [<sup>[*]</sup>](#spark-connect-server):** A `withRowNumbers` transformation that provides the global row number w.r.t.
the current order of the Dataset, or any given order. In contrast to the existing SQL function `row_number`, which
requires a window spec, this transformation provides the row number across the entire Dataset without scaling problems.

**[Partitioned Writing](PARTITIONING.md):** The `writePartitionedBy` action writes your `Dataset` partitioned and
efficiently laid out with a single operation.

**[Inspect Parquet files](PARQUET.md):** The structure of Parquet files (the metadata, not the data stored in Parquet) can be inspected similar to [parquet-tools](https://pypi.org/project/parquet-tools/)
**[Inspect Parquet files](PARQUET.md) [<sup>[*]</sup>](#spark-connect-server):** The structure of Parquet files (the metadata, not the data stored in Parquet) can be inspected similar to [parquet-tools](https://pypi.org/project/parquet-tools/)
or [parquet-cli](https://pypi.org/project/parquet-cli/) by reading from a simple Spark data source.
This simplifies identifying why some Parquet files cannot be split by Spark into scalable partitions.

**[Install Python packages into PySpark job](PYSPARK-DEPS.md):** Install Python dependencies via PIP or Poetry programmatically into your running PySpark job (PySpark ≥ 3.1.0):
**[Install Python packages into PySpark job](PYSPARK-DEPS.md) [<sup>[*]</sup>](#spark-connect-server):** Install Python dependencies via PIP or Poetry programmatically into your running PySpark job (PySpark ≥ 3.1.0):

```python
# noinspection PyUnresolvedReferences
@@ -84,7 +84,7 @@ This is a handy way to ensure column names with special characters like dots (`.
**Count null values:** `count_null(e: Column)`: an aggregation function like `count` that counts null values in column `e`.
This is equivalent to calling `count(when(e.isNull, lit(1)))`.

**.Net DateTime.Ticks:** Convert .Net (C#, F#, Visual Basic) `DateTime.Ticks` into Spark timestamps, seconds and nanoseconds.
**.Net DateTime.Ticks[<sup>[*]</sup>](#spark-connect-server):** Convert .Net (C#, F#, Visual Basic) `DateTime.Ticks` into Spark timestamps, seconds and nanoseconds.

<details>
<summary>Available methods:</summary>
@@ -117,7 +117,7 @@ unix_epoch_nanos_to_dotnet_ticks(column_or_name)
```
</details>

**Spark temporary directory**: Create a temporary directory that will be removed on Spark application shutdown.
**Spark temporary directory[<sup>[*]</sup>](#spark-connect-server)**: Create a temporary directory that will be removed on Spark application shutdown.

<details>
<summary>Examples:</summary>
@@ -138,7 +138,7 @@ dir = spark.create_temporary_dir("prefix")
```
</details>

**Spark job description:** Set Spark job description for all Spark jobs within a context.
**Spark job description[<sup>[*]</sup>](#spark-connect-server):** Set Spark job description for all Spark jobs within a context.

<details>
<summary>Examples:</summary>
@@ -306,6 +306,18 @@ on a filesystem where it is accessible by the notebook, and reference that jar f

Check the documentation of your favorite notebook to learn how to add jars to your Spark environment.

## Known issues
### Spark Connect Server

Most features are not supported **in Python** in conjunction with a [Spark Connect server](https://spark.apache.org/docs/latest/spark-connect-overview.html).
This also holds for Databricks Runtime environment 13.x and above. Details can be found [in this blog](https://semyonsinchenko.github.io/ssinchenko/post/how-databricks-14x-breaks-3dparty-compatibility/).

Calling any of those features when connected to a Spark Connect server will raise this error:

This feature is not supported for Spark Connect.

Use a classic connection to a Spark cluster instead.

## Build

You can build this project against different versions of Spark and Scala.
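The error described in the new Known issues section can be reproduced with any of the starred features. Here is a hedged sketch using the `histogram` call from HISTOGRAM.md and the `sc://localhost:15002` address from the CI action (both taken from this commit):

```python
# Sketch: calling an unsupported feature over Spark Connect is expected to fail
# with the readable message quoted above, not with an obscure internal error.
from pyspark.sql import SparkSession

import gresearch.spark  # adds the extension methods to DataFrame

spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()
df = spark.range(300).withColumnRenamed("id", "user")

try:
    df.histogram([100, 200], "user").show()
except Exception as error:
    print(error)  # expected: "This feature is not supported for Spark Connect."
```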
4 changes: 4 additions & 0 deletions ROW_NUMBER.md
@@ -216,3 +216,7 @@ WindowExec: No Partition Defined for Window operation! Moving all data to a sing
```
This warning is unavoidable, because `withRowNumbers` has to pull information about the initial partitions into a single partition.
Fortunately, there are only 12 Bytes per input partition required, so this amount of data usually fits into a single partition and the warning can safely be ignored.

## Known issues

Note that this feature is not supported in Python when connected with a [Spark Connect server](README.md#spark-connect-server).
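For completeness, a sketch of the transformation that the new ROW_NUMBER.md note refers to, assuming a classic session and that the Python method is named `with_row_numbers` (the Scala API is `withRowNumbers`; check ROW_NUMBER.md for the exact Python name):

```python
# Sketch: add a global row number column (classic session assumed,
# since the note above excludes Spark Connect).
from pyspark.sql import SparkSession

import gresearch.spark  # adds the extension methods to DataFrame

spark = SparkSession.builder.master("local[2]").getOrCreate()

df = spark.createDataFrame([("c",), ("a",), ("b",)], ["letter"])
df.with_row_numbers().show()  # assumed method name; numbers rows w.r.t. the current order
```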