[BUG][Spark] DELETE destination only supports Delta sources #3643

Open · 2 of 8 tasks
tonkolviktor opened this issue Sep 4, 2024 · 0 comments
Labels
bug Something isn't working


Bug

Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

Describe the problem

I have a Delta table (screenshot of the table omitted).

I want to call the delete operation. When I use the Python API it fails, but the equivalent SQL DELETE works (screenshot omitted):

---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
Cell In[18], line 1
----> 1 deltaTable.delete((F.col("GLOBAL_ID").isin(['test']))) 

File /usr/local/lib/python3.10/dist-packages/delta/tables.py:106, in DeltaTable.delete(self, condition)
    104     self._jdt.delete()
    105 else:
--> 106     self._jdt.delete(DeltaTable._condition_to_jcolumn(condition))

File /usr/local/lib/python3.10/dist-packages/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, "_detach"):

File /usr/local/lib/python3.10/dist-packages/pyspark/errors/exceptions/captured.py:175, in capture_sql_exception.<locals>.deco(*a, **kw)
    171 converted = convert_exception(e.java_exception)
    172 if not isinstance(converted, UnknownException):
    173     # Hide where the exception came from that shows a non-Pythonic
    174     # JVM exception message.
--> 175     raise converted from None
    176 else:
    177     raise

AnalysisException: DELETE destination only supports Delta sources.
Some(DeleteFromTable GLOBAL_ID#223 IN (test)
+- SubqueryAlias spark_catalog.silver.YAK
   +- Project [FIRMA#38, LAGER#39, VON_AN_LAGER#40, BESCHAFFUNGSART#41, BELEGART#42, UNTERBELEGART#43, KENNZEICHEN_STORNO#44, BESTANDSSTATUS#45, AKSTB2#46, KUNDE_LIEFERANT#47, KUNDE#48, KUNDE_STATISTIK#49, KUNDENNR_KREDIT#50, KONTO_ZAHLUNG#51, KUNDENNUMMER_LIEFERANT_FIBU#52, HAUPTNUMMER_KUNDE_LIEFERANT#53, VERBAND_ZUGEORDNET#54, LIEFERANT#55, LIEFERANTENNUMMER_FRACHT#56, FRACHTSTATUS#57, ORIGINAL_POSITION#58, ORIGINAL_SET_POSITION#59, BELEGNUMMER#60, BELEGNUMMER_LIEFERSCHEIN#61, ... 162 more fields]
      +- Relation spark_catalog.silver.yak[FIRMA#38,LAGER#39,VON_AN_LAGER#40,BESCHAFFUNGSART#41,BELEGART#42,UNTERBELEGART#43,KENNZEICHEN_STORNO#44,BESTANDSSTATUS#45,AKSTB2#46,KUNDE_LIEFERANT#47,KUNDE#48,KUNDE_STATISTIK#49,KUNDENNR_KREDIT#50,KONTO_ZAHLUNG#51,KUNDENNUMMER_LIEFERANT_FIBU#52,HAUPTNUMMER_KUNDE_LIEFERANT#53,VERBAND_ZUGEORDNET#54,LIEFERANT#55,LIEFERANTENNUMMER_FRACHT#56,FRACHTSTATUS#57,ORIGINAL_POSITION#58,ORIGINAL_SET_POSITION#59,BELEGNUMMER#60,BELEGNUMMER_LIEFERSCHEIN#61,... 162 more fields] parquet
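
For comparison, the SQL path that reportedly succeeds looks roughly like this (a sketch; the table name is taken from the query plan above and the filter value is illustrative):

spark.sql("DELETE FROM silver.YAK WHERE GLOBAL_ID IN ('test')")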

Steps to reproduce

Get Spark 3.4.3 (git revision 1eb558c3a6f) built for Hadoop 3.3.4
Get Delta 2.4.0
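
A session setup consistent with the Ivy log below would look roughly like this (a sketch, not the exact launch used; the package coordinate matches the resolved artifact, and the two spark.sql settings are the ones Delta's documentation requires for SQL and catalog support):

from pyspark.sql import SparkSession

# Sketch: Delta-enabled session for Spark 3.4.3 + Delta 2.4.0.
# Without the DeltaCatalog setting, metastore tables may not be
# resolved as Delta sources by the API.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)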

Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-2c58c979-8fcb-4371-b930-681b6429a110;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.4.0 in central
	found io.delta#delta-storage;2.4.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 135ms :: artifacts dl 6ms
	:: modules in use:
	io.delta#delta-core_2.12;2.4.0 from central in [default]
	io.delta#delta-storage;2.4.0 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   0   |   0   |   0   ||   3   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-2c58c979-8fcb-4371-b930-681b6429a110
	confs: [default]
	0 artifacts copied, 3 already retrieved (0kB/5ms)
24/09/04 15:19:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/04 15:19:01 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/09/04 15:19:01 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.

Create a table similar to the one above. Something like this then fails:

from delta.tables import DeltaTable
from pyspark.sql import functions as F
deltaTable = DeltaTable.forName(spark, 'silver.' + TN)
deltaTable.delete(F.col("GLOBAL_ID").isin(['test']))

Observed results

For some reason Delta treats the table as a plain Parquet source when it is resolved through the Python API (note the `Relation ... parquet` node in the plan above).
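
One way to check what the catalog actually records for this table (a hypothetical diagnostic, not part of the original report; table name as in the plan above):

# The metastore's recorded provider; a Delta table should report "delta".
spark.sql("DESCRIBE EXTENDED silver.YAK").filter("col_name = 'Provider'").show(truncate=False)

# Delta's DESCRIBE DETAIL reports the format as Delta sees it.
spark.sql("DESCRIBE DETAIL silver.YAK").select("format").show()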

Expected results

The API delete should succeed, just as the SQL DELETE does.

Further details

Please note that if I create a fresh Delta table like this, the API delete works (screenshot omitted):

from delta.tables import DeltaTable
from pyspark.sql import functions as F

df = spark.createDataFrame([(1, 2, 3), (4, 5, 6), (7, 8, 9)])
df.write.format('delta').save('/tmp/temp')
spark.sql('create table temp using delta location "/tmp/temp"')
spark.sql('delete from temp where _1 in (1, 4)')

deltaTable = DeltaTable.forName(spark, 'temp')
deltaTable.delete((F.col("_1").isin([1, 2])) & (F.col('_2') > 4))
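
A possible workaround sketch (not from the original report): resolve the table by its storage path rather than its catalog name, which bypasses the metastore's provider metadata. The path below is hypothetical.

# Hypothetical location; substitute the table's actual path.
deltaTable = DeltaTable.forPath(spark, '/path/to/silver/yak')
deltaTable.delete(F.col("GLOBAL_ID").isin(['test']))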

Please note that we have just migrated from Spark 3.3.* and Delta 2.3.*, where this worked.

Environment information

  • Delta Lake version: 2.4.*
  • Spark version: 3.4.3
  • Scala version:

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

  • Yes. I can contribute a fix for this bug independently.
  • Yes. I would be willing to contribute a fix for this bug with guidance from the Delta Lake community.
  • No. I cannot contribute a bug fix at this time.