Skip to content

Commit

Permalink
SparkContext does not exist in Spark Connect
Browse files Browse the repository at this point in the history
  • Loading branch information
EnricoMi committed Aug 15, 2024
1 parent 50f395d commit 082cc7b
Showing 1 changed file with 1 addition and 5 deletions.
6 changes: 1 addition & 5 deletions python/gresearch/spark/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
try:
from pyspark.sql.connect.dataframe import DataFrame as ConnectDataFrame
from pyspark.sql.connect.readwriter import DataFrameReader as ConnectDataFrameReader
from pyspark.sql.connect.session import SparkContext as ConnectSparkContext
from pyspark.sql.connect.session import SparkSession as ConnectSparkSession
has_connect = True
except ModuleNotFoundError:
Expand All @@ -49,7 +48,7 @@

def _get_jvm(obj: Any) -> JVMView:
# helper method to assert the JVM is accessible and provide a useful error message
if has_connect and isinstance(obj, (ConnectDataFrame, ConnectDataFrameReader, ConnectSparkSession, ConnectSparkContext)):
if has_connect and isinstance(obj, (ConnectDataFrame, ConnectDataFrameReader, ConnectSparkSession)):
raise RuntimeError('This feature is not supported for Spark Connect. Please use a classic Spark client. https://github.com/G-Research/spark-extension#spark-connect-server')
if isinstance(obj, DataFrame):
return _get_jvm(obj._sc)
Expand Down Expand Up @@ -448,7 +447,6 @@ def create_temporary_dir(spark: Union[SparkSession, SparkContext], prefix: str)

if has_connect:
ConnectSparkSession.create_temporary_dir = create_temporary_dir
ConnectSparkContext.create_temporary_dir = create_temporary_dir


def install_pip_package(spark: Union[SparkSession, SparkContext], *package_or_pip_option: str) -> None:
Expand Down Expand Up @@ -486,7 +484,6 @@ def install_pip_package(spark: Union[SparkSession, SparkContext], *package_or_pi

if has_connect:
ConnectSparkSession.install_pip_package = install_pip_package
ConnectSparkContext.install_pip_package = install_pip_package


def install_poetry_project(spark: Union[SparkSession, SparkContext],
Expand Down Expand Up @@ -567,4 +564,3 @@ def build_wheel(project: Path) -> Path:

if has_connect:
ConnectSparkSession.install_poetry_project = install_poetry_project
ConnectSparkContext.install_poetry_project = install_poetry_project

0 comments on commit 082cc7b

Please sign in to comment.