diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index a34ceb9f11454..09d1ee437a873 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -7122,6 +7122,12 @@ ], "sqlState" : "42601" }, + "WKB_PARSE_ERROR" : { + "message" : [ + "Error parsing WKB." + ], + "sqlState" : "22023" + }, "WRITE_STREAM_NOT_ALLOWED" : { "message" : [ "`writeStream` can be called only on streaming Dataset/DataFrame." diff --git a/sql/api/src/main/scala/org/apache/spark/sql/types/GeographyType.scala b/sql/api/src/main/scala/org/apache/spark/sql/types/GeographyType.scala index d72e5987abebd..4d6ab7e9c8e50 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/types/GeographyType.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/types/GeographyType.scala @@ -179,7 +179,7 @@ object GeographyType extends SpatialType { GeographyType(MIXED_CRS, GEOGRAPHY_DEFAULT_ALGORITHM) /** Returns whether the given SRID is supported. */ - private[types] def isSridSupported(srid: Int): Boolean = { + def isSridSupported(srid: Int): Boolean = { GeographicSpatialReferenceSystemMapper.getStringId(srid) != null } diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml index 67ee98bd9430f..cabe23602fa9a 100644 --- a/sql/catalyst/pom.xml +++ b/sql/catalyst/pom.xml @@ -138,6 +138,12 @@ org.apache.datasketches datasketches-java + + org.locationtech.jts + jts-core + 1.20.0 + compile + target/scala-${scala.binary.version}/classes diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/Geography.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/Geography.java index c46c2368832fe..1c3af8bd83e60 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/Geography.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/Geography.java @@ -16,6 +16,9 @@ */ package org.apache.spark.sql.catalyst.util; +import org.apache.spark.sql.errors.QueryExecutionErrors; +import org.apache.spark.sql.types.GeographyType; +import org.apache.spark.sql.util.GeoWkbReader; import org.apache.spark.unsafe.types.GeographyVal; import java.nio.ByteBuffer; @@ -77,9 +80,17 @@ public Geography copy() { // Returns a Geography object with the specified SRID value by parsing the input WKB. public static Geography fromWkb(byte[] wkb, int srid) { + // Verify that the provided SRID value is geographic. + if (!GeographyType.isSridSupported(srid)) { + throw QueryExecutionErrors.stInvalidSridValueError(srid); + } + // Create a new byte array to hold the Geography representation and populate the header. byte[] bytes = new byte[HEADER_SIZE + wkb.length]; ByteBuffer.wrap(bytes).order(DEFAULT_ENDIANNESS).putInt(srid); - System.arraycopy(wkb, 0, bytes, WKB_OFFSET, wkb.length); + // Parse the provided WKB and copy it into the Geography byte array after the header. + byte[] wkbBytes = GeoWkbReader.readWkb(wkb); + System.arraycopy(wkbBytes, 0, bytes, WKB_OFFSET, wkbBytes.length); + // Finally, create and return the Geography instance. return fromBytes(bytes); } diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/util/GeoWkbReader.java b/sql/catalyst/src/main/java/org/apache/spark/sql/util/GeoWkbReader.java new file mode 100644 index 0000000000000..af7df15cda299 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/util/GeoWkbReader.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.util; + +import org.apache.spark.sql.errors.QueryExecutionErrors; +import org.locationtech.jts.geom.Coordinate; +import org.locationtech.jts.geom.CoordinateXYM; +import org.locationtech.jts.geom.CoordinateXYZM; +import org.locationtech.jts.geom.Geometry; +import org.locationtech.jts.io.ParseException; +import org.locationtech.jts.io.WKBReader; +import org.locationtech.jts.io.WKBWriter; +import org.locationtech.jts.io.ByteOrderValues; + +public class GeoWkbReader { + + /** Common constants for geospatial standard format manipulation. */ + + // The default endianness for in-memory geo objects is Little Endian (NDR). + private static final int DEFAULT_ENDIANNESS = ByteOrderValues.LITTLE_ENDIAN; + + /** Utility methods for standard format parsing using JTS. */ + + // Private helper to determine dimension of geometry. + private static int getDimension(Geometry geometry) { + Coordinate[] coords = geometry.getCoordinates(); + if (coords.length == 0) return 2; + Coordinate c = coords[0]; + if (c instanceof CoordinateXYZM) return 4; + if (c instanceof CoordinateXYM) return 3; + if (!Double.isNaN(c.getZ())) return 3; + return 2; + } + + // WKB parser (private helper method). + private static Geometry parseWkb(byte[] bytes) { + try { + WKBReader reader = new WKBReader(); + return reader.read(bytes); + } catch (ParseException e) { + throw QueryExecutionErrors.wkbParseError(); + } + } + + /** Public geospatial standard format reader API. */ + + // WKB reader. + public static byte[] readWkb(byte[] bytes) { + // Parse the input WKB bytes to a Geometry object. + Geometry geometry = parseWkb(bytes); + // Write the Geometry object back to WKB format with the default endianness. + return new WKBWriter(getDimension(geometry), DEFAULT_ENDIANNESS).write(geometry); + } + +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index 55dcea57ff322..0ec9ba0e08988 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -665,6 +665,21 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE summary = "") } + def stInvalidSridValueError(srid: String): SparkIllegalArgumentException = { + new SparkIllegalArgumentException( + errorClass = "ST_INVALID_SRID_VALUE", + messageParameters = Map("srid" -> srid) + ) + } + + def stInvalidSridValueError(srid: Int): SparkIllegalArgumentException = { + stInvalidSridValueError(srid.toString) + } + + def wkbParseError(): SparkIllegalArgumentException = { + new SparkIllegalArgumentException(errorClass = "WKB_PARSE_ERROR") + } + def withSuggestionIntervalArithmeticOverflowError( suggestedFunc: String, context: QueryContext): ArithmeticException = { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/GeographyExecutionSuite.java b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/GeographyExecutionSuite.java index 078ee2a3dbfbc..fe426fd391046 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/GeographyExecutionSuite.java +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/GeographyExecutionSuite.java @@ -83,10 +83,9 @@ void testDefaultSrid() { /** Tests for Geography WKB parsing. */ @Test - void testFromWkbWithSridRudimentary() { - byte[] wkb = new byte[]{1, 2, 3}; - // Note: This is a rudimentary WKB handling test; actual WKB parsing is not yet implemented. - // Once we implement the appropriate parsing logic, this test should be updated accordingly. + void testFromWkbWithSrid() { + // Test data: WKB representation of LINESTRING EMPTY. + byte[] wkb = new byte[]{0x01, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}; Geography geography = Geography.fromWkb(wkb, 4326); assertNotNull(geography); assertArrayEquals(wkb, geography.toWkb()); @@ -94,16 +93,39 @@ void testFromWkbWithSridRudimentary() { } @Test - void testFromWkbNoSridRudimentary() { - byte[] wkb = new byte[]{1, 2, 3}; - // Note: This is a rudimentary WKB handling test; actual WKB parsing is not yet implemented. - // Once we implement the appropriate parsing logic, this test should be updated accordingly. + void testFromWkbNoSrid() { + // Test data: WKB representation of LINESTRING EMPTY. + byte[] wkb = new byte[]{0x01, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}; Geography geography = Geography.fromWkb(wkb); assertNotNull(geography); assertArrayEquals(wkb, geography.toWkb()); assertEquals(4326, geography.srid()); } + @Test + void testFromInvalidWkb() { + // Test data: invalid WKB byte array. + byte[] wkb = new byte[]{0x01, 0x02, 0x03}; + IllegalArgumentException exception = assertThrows( + IllegalArgumentException.class, + () -> Geography.fromWkb(wkb) + ); + assertTrue(exception.getMessage().startsWith("[WKB_PARSE_ERROR] Error parsing WKB")); + } + + @Test + void testFromInvalidSrid() { + // Test data: WKB representation of LINESTRING EMPTY. + byte[] wkb = new byte[]{0x01, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}; + int srid = 999; + IllegalArgumentException exception = assertThrows( + IllegalArgumentException.class, + () -> Geography.fromWkb(wkb, srid) + ); + assertTrue(exception.getMessage().startsWith("[ST_INVALID_SRID_VALUE]")); + assertTrue(exception.getMessage().contains("value: " + srid + ".")); + } + /** Tests for Geography EWKB parsing. */ @Test diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StUtilsSuite.java b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StUtilsSuite.java index 8ad4d4c36e45c..1ab0841d57e42 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StUtilsSuite.java +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StUtilsSuite.java @@ -34,8 +34,10 @@ class STUtilsSuite { /** Common test data used across multiple tests below. */ - private final byte[] testWkb = new byte[] {0x01, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + private final byte[] testWkbNdr = new byte[] {0x01, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, (byte)0xF0, 0x3F, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x40}; + private final byte[] testWkbXdr = new byte[] {0x00, 0x00, 0x00, 0x00, 0x01, 0x3f, (byte)0xf0, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x40, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}; // A sample Geography byte array for testing purposes, representing a POINT(1 2) with SRID 4326. private final int testGeographySrid = 4326; @@ -52,14 +54,14 @@ class STUtilsSuite { byte[] geogSrid = ByteBuffer.allocate(sridLen).order(end).putInt(testGeographySrid).array(); byte[] geomSrid = ByteBuffer.allocate(sridLen).order(end).putInt(testGeometrySrid).array(); // Initialize GEOGRAPHY. - int wkbLen = testWkb.length; + int wkbLen = testWkbNdr.length; testGeographyBytes = new byte[sridLen + wkbLen]; System.arraycopy(geogSrid, 0, testGeographyBytes, 0, sridLen); - System.arraycopy(testWkb, 0, testGeographyBytes, sridLen, wkbLen); + System.arraycopy(testWkbNdr, 0, testGeographyBytes, sridLen, wkbLen); // Initialize GEOMETRY. testGeometryBytes = new byte[sridLen + wkbLen]; System.arraycopy(geomSrid, 0, testGeometryBytes, 0, sridLen); - System.arraycopy(testWkb, 0, testGeometryBytes, sridLen, wkbLen); + System.arraycopy(testWkbNdr, 0, testGeometryBytes, sridLen, wkbLen); } /** Tests for ST expression utility methods. */ @@ -70,7 +72,7 @@ void testStAsBinaryGeography() { GeographyVal geographyVal = GeographyVal.fromBytes(testGeographyBytes); byte[] geographyWkb = STUtils.stAsBinary(geographyVal); assertNotNull(geographyWkb); - assertArrayEquals(testWkb, geographyWkb); + assertArrayEquals(testWkbNdr, geographyWkb); } @Test @@ -78,13 +80,20 @@ void testStAsBinaryGeometry() { GeometryVal geometryVal = GeometryVal.fromBytes(testGeometryBytes); byte[] geometryWkb = STUtils.stAsBinary(geometryVal); assertNotNull(geometryWkb); - assertArrayEquals(testWkb, geometryWkb); + assertArrayEquals(testWkbNdr, geometryWkb); } // ST_GeogFromWKB @Test - void testStGeogFromWKB() { - GeographyVal geographyVal = STUtils.stGeogFromWKB(testWkb); + void testStGeogFromWKB_NDR() { + GeographyVal geographyVal = STUtils.stGeogFromWKB(testWkbNdr); + assertNotNull(geographyVal); + assertArrayEquals(testGeographyBytes, geographyVal.getBytes()); + } + + @Test + void testStGeogFromWKB_XDR() { + GeographyVal geographyVal = STUtils.stGeogFromWKB(testWkbXdr); assertNotNull(geographyVal); assertArrayEquals(testGeographyBytes, geographyVal.getBytes()); } @@ -92,7 +101,7 @@ void testStGeogFromWKB() { // ST_GeomFromWKB @Test void testStGeomFromWKB() { - GeometryVal geometryVal = STUtils.stGeomFromWKB(testWkb); + GeometryVal geometryVal = STUtils.stGeomFromWKB(testWkbNdr); assertNotNull(geometryVal); assertArrayEquals(testGeometryBytes, geometryVal.getBytes()); } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/GeoWkbReaderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/GeoWkbReaderSuite.scala new file mode 100644 index 0000000000000..2757e58e800b9 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/GeoWkbReaderSuite.scala @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.util + +import org.apache.spark.SparkFunSuite + +class GeoWkbReaderSuite extends SparkFunSuite { + // scalastyle:off line.size.limit + val testData: Seq[(String, String, String)] = Seq( + ( + // POINT EMPTY + "0101000000000000000000f87f000000000000f87f", + "00000000017ff80000000000007ff8000000000000", + "0101000000000000000000f87f000000000000f87f" + ), + ( + // POINT(1 2) + "0101000000000000000000f03f0000000000000040", + "00000000013ff00000000000004000000000000000", + "0101000000000000000000f03f0000000000000040" + ), + ( + // POINTZ(1 2 3) + "01e9030000000000000000f03f00000000000000400000000000000840", + "00000003e93ff000000000000040000000000000004008000000000000", + "0101000080000000000000f03f00000000000000400000000000000840" + ), + ( + // POINTM(1 2 3) + "01d1070000000000000000f03f00000000000000400000000000000840", + "00000007d13ff000000000000040000000000000004008000000000000", + "0101000080000000000000f03f00000000000000400000000000000840" + ), + ( + // POINTZM(1 2 3 4) + "01b90b0000000000000000f03f000000000000004000000000000008400000000000001040", + "0000000bb93ff0000000000000400000000000000040080000000000004010000000000000", + "01010000c0000000000000f03f000000000000004000000000000008400000000000001040" + ), + ( + // LINESTRING EMPTY + "010200000000000000", + "000000000200000000", + "010200000000000000" + ), + ( + // LINESTRING(0 0, 1 0, 1 1, 0 1, 0 0) + "01020000000500000000000000000000000000000000000000000000000000f03f0000000000000000000000000000f03f000000000000f03f0000000000000000000000000000f03f00000000000000000000000000000000", + "000000000200000005000000000000000000000000000000003ff000000000000000000000000000003ff00000000000003ff000000000000000000000000000003ff000000000000000000000000000000000000000000000", + "01020000000500000000000000000000000000000000000000000000000000f03f0000000000000000000000000000f03f000000000000f03f0000000000000000000000000000f03f00000000000000000000000000000000" + ), + ) + // scalastyle:on line.size.limit + + // WKB reader. + test("Read WKB - NDR") { + // NDR encoded geometry inputs. + testData.foreach { case (wkbNDR, _, resultWkb) => + val wkbBytes = wkbNDR.grouped(2).map(Integer.parseInt(_, 16).toByte).toArray + val geometry = GeoWkbReader.readWkb(wkbBytes) + assert(geometry != null) + val resultBytes = resultWkb.grouped(2).map(Integer.parseInt(_, 16).toByte).toArray + assert(resultBytes.sameElements(geometry)) + } + } + + test("Read WKB - XDR") { + // NDR encoded geometry inputs. + testData.foreach { case (_, wkbXDR, resultWkb) => + val wkbBytes = wkbXDR.grouped(2).map(Integer.parseInt(_, 16).toByte).toArray + val geometry = GeoWkbReader.readWkb(wkbBytes) + assert(geometry != null) + val resultBytes = resultWkb.grouped(2).map(Integer.parseInt(_, 16).toByte).toArray + assert(resultBytes.sameElements(geometry)) + } + } + +} diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/nonansi/st-functions.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/nonansi/st-functions.sql.out index fe2dda3f1967b..cdaa76ee4af48 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/nonansi/st-functions.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/nonansi/st-functions.sql.out @@ -22,6 +22,28 @@ InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_d +- LocalRelation [col1#x] +-- !query +DROP TABLE IF EXISTS geoinvaliddata +-- !query analysis +DropTable true, false ++- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.geoinvaliddata + + +-- !query +CREATE TABLE geoinvaliddata(wkb BINARY) USING parquet +-- !query analysis +CreateDataSourceTableCommand `spark_catalog`.`default`.`geoinvaliddata`, false + + +-- !query +INSERT INTO geoinvaliddata VALUES +(X'010203') +-- !query analysis +InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/geoinvaliddata, false, Parquet, [path=file:[not included in comparison]/{warehouse_dir}/geoinvaliddata], Append, `spark_catalog`.`default`.`geoinvaliddata`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in comparison]/{warehouse_dir}/geoinvaliddata), [wkb] ++- Project [col1#x AS wkb#x] + +- LocalRelation [col1#x] + + -- !query SELECT CAST(ST_GeogFromWKB(X'0101000000000000000000f03f0000000000000040') AS STRING) AS result -- !query analysis @@ -80,6 +102,51 @@ Project [hex(st_asbinary(st_geomfromwkb(0x0101000000000000000000F03F000000000000 +- OneRowRelation +-- !query +SELECT ST_GeogFromWKB(NULL) +-- !query analysis +Project [st_geogfromwkb(cast(null as binary)) AS st_geogfromwkb(NULL)#x] ++- OneRowRelation + + +-- !query +SELECT ST_AsBinary(ST_GeogFromWKB(X'0101000000000000000000F03F0000000000000040')) +-- !query analysis +Project [st_asbinary(st_geogfromwkb(0x0101000000000000000000F03F0000000000000040)) AS st_asbinary(st_geogfromwkb(X'0101000000000000000000F03F0000000000000040'))#x] ++- OneRowRelation + + +-- !query +SELECT ST_AsBinary(ST_GeomFromWKB(X'00000000013FF00000000000004000000000000000')) +-- !query analysis +Project [st_asbinary(st_geomfromwkb(0x00000000013FF00000000000004000000000000000)) AS st_asbinary(st_geomfromwkb(X'00000000013FF00000000000004000000000000000'))#x] ++- OneRowRelation + + +-- !query +SELECT ST_AsBinary(ST_GeomFromWKB(X'010203')) +-- !query analysis +Project [st_asbinary(st_geomfromwkb(0x010203)) AS st_asbinary(st_geomfromwkb(X'010203'))#x] ++- OneRowRelation + + +-- !query +SELECT COUNT(*) FROM geodata WHERE wkb IS NOT NULL AND ST_AsBinary(ST_GeogFromWKB(wkb)) IS NULL +-- !query analysis +Aggregate [count(1) AS count(1)#xL] ++- Filter (isnotnull(wkb#x) AND isnull(st_asbinary(st_geogfromwkb(wkb#x)))) + +- SubqueryAlias spark_catalog.default.geodata + +- Relation spark_catalog.default.geodata[wkb#x] parquet + + +-- !query +SELECT ST_AsBinary(ST_GeogFromWKB(wkb)) FROM geoinvaliddata +-- !query analysis +Project [st_asbinary(st_geogfromwkb(wkb#x)) AS st_asbinary(st_geogfromwkb(wkb))#x] ++- SubqueryAlias spark_catalog.default.geoinvaliddata + +- Relation spark_catalog.default.geoinvaliddata[wkb#x] parquet + + -- !query SELECT ST_Srid(NULL) -- !query analysis @@ -124,3 +191,10 @@ DROP TABLE geodata -- !query analysis DropTable false, false +- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.geodata + + +-- !query +DROP TABLE geoinvaliddata +-- !query analysis +DropTable false, false ++- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.geoinvaliddata diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/st-functions.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/st-functions.sql.out index fe2dda3f1967b..cdaa76ee4af48 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/st-functions.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/st-functions.sql.out @@ -22,6 +22,28 @@ InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_d +- LocalRelation [col1#x] +-- !query +DROP TABLE IF EXISTS geoinvaliddata +-- !query analysis +DropTable true, false ++- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.geoinvaliddata + + +-- !query +CREATE TABLE geoinvaliddata(wkb BINARY) USING parquet +-- !query analysis +CreateDataSourceTableCommand `spark_catalog`.`default`.`geoinvaliddata`, false + + +-- !query +INSERT INTO geoinvaliddata VALUES +(X'010203') +-- !query analysis +InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/geoinvaliddata, false, Parquet, [path=file:[not included in comparison]/{warehouse_dir}/geoinvaliddata], Append, `spark_catalog`.`default`.`geoinvaliddata`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in comparison]/{warehouse_dir}/geoinvaliddata), [wkb] ++- Project [col1#x AS wkb#x] + +- LocalRelation [col1#x] + + -- !query SELECT CAST(ST_GeogFromWKB(X'0101000000000000000000f03f0000000000000040') AS STRING) AS result -- !query analysis @@ -80,6 +102,51 @@ Project [hex(st_asbinary(st_geomfromwkb(0x0101000000000000000000F03F000000000000 +- OneRowRelation +-- !query +SELECT ST_GeogFromWKB(NULL) +-- !query analysis +Project [st_geogfromwkb(cast(null as binary)) AS st_geogfromwkb(NULL)#x] ++- OneRowRelation + + +-- !query +SELECT ST_AsBinary(ST_GeogFromWKB(X'0101000000000000000000F03F0000000000000040')) +-- !query analysis +Project [st_asbinary(st_geogfromwkb(0x0101000000000000000000F03F0000000000000040)) AS st_asbinary(st_geogfromwkb(X'0101000000000000000000F03F0000000000000040'))#x] ++- OneRowRelation + + +-- !query +SELECT ST_AsBinary(ST_GeomFromWKB(X'00000000013FF00000000000004000000000000000')) +-- !query analysis +Project [st_asbinary(st_geomfromwkb(0x00000000013FF00000000000004000000000000000)) AS st_asbinary(st_geomfromwkb(X'00000000013FF00000000000004000000000000000'))#x] ++- OneRowRelation + + +-- !query +SELECT ST_AsBinary(ST_GeomFromWKB(X'010203')) +-- !query analysis +Project [st_asbinary(st_geomfromwkb(0x010203)) AS st_asbinary(st_geomfromwkb(X'010203'))#x] ++- OneRowRelation + + +-- !query +SELECT COUNT(*) FROM geodata WHERE wkb IS NOT NULL AND ST_AsBinary(ST_GeogFromWKB(wkb)) IS NULL +-- !query analysis +Aggregate [count(1) AS count(1)#xL] ++- Filter (isnotnull(wkb#x) AND isnull(st_asbinary(st_geogfromwkb(wkb#x)))) + +- SubqueryAlias spark_catalog.default.geodata + +- Relation spark_catalog.default.geodata[wkb#x] parquet + + +-- !query +SELECT ST_AsBinary(ST_GeogFromWKB(wkb)) FROM geoinvaliddata +-- !query analysis +Project [st_asbinary(st_geogfromwkb(wkb#x)) AS st_asbinary(st_geogfromwkb(wkb))#x] ++- SubqueryAlias spark_catalog.default.geoinvaliddata + +- Relation spark_catalog.default.geoinvaliddata[wkb#x] parquet + + -- !query SELECT ST_Srid(NULL) -- !query analysis @@ -124,3 +191,10 @@ DROP TABLE geodata -- !query analysis DropTable false, false +- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.geodata + + +-- !query +DROP TABLE geoinvaliddata +-- !query analysis +DropTable false, false ++- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.geoinvaliddata diff --git a/sql/core/src/test/resources/sql-tests/inputs/st-functions.sql b/sql/core/src/test/resources/sql-tests/inputs/st-functions.sql index dc688e4a89941..0c7fc52570559 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/st-functions.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/st-functions.sql @@ -7,6 +7,14 @@ INSERT INTO geodata VALUES (NULL), (X'0101000000000000000000F03F0000000000000040'); +-- Create a table of invalid WKB values and insert test data. +DROP TABLE IF EXISTS geoinvaliddata; +CREATE TABLE geoinvaliddata(wkb BINARY) USING parquet; +-- See: https://en.wikipedia.org/wiki/Well-known_text_representation_of_geometry#Well-known_binary +-- to understand the formatting/layout of the input Well-Known Binary (WKB) values. +INSERT INTO geoinvaliddata VALUES +(X'010203'); + --- Casting geospatial data types -- GEOGRAPHY and GEOMETRY data types cannot be cast to/from other data types. @@ -19,6 +27,19 @@ SELECT CAST(X'0101000000000000000000f03f0000000000000040' AS GEOMETRY(4326)) AS SELECT hex(ST_AsBinary(ST_GeogFromWKB(X'0101000000000000000000f03f0000000000000040'))) AS result; SELECT hex(ST_AsBinary(ST_GeomFromWKB(X'0101000000000000000000f03f0000000000000040'))) AS result; +---- ST_GeogFromWKB + +-- 1. Driver-level queries. +SELECT ST_GeogFromWKB(NULL); +SELECT ST_AsBinary(ST_GeogFromWKB(X'0101000000000000000000F03F0000000000000040')); -- NDR. +SELECT ST_AsBinary(ST_GeomFromWKB(X'00000000013FF00000000000004000000000000000')); -- XDR. +SELECT ST_AsBinary(ST_GeomFromWKB(X'010203')); -- Invalid WKB. + +-- 2. Table-level queries. +SELECT COUNT(*) FROM geodata WHERE wkb IS NOT NULL AND ST_AsBinary(ST_GeogFromWKB(wkb)) IS NULL; +-- Error handling: invalid SRID. +SELECT ST_AsBinary(ST_GeogFromWKB(wkb)) FROM geoinvaliddata; + ------ ST accessor expressions ---- ST_Srid @@ -34,3 +55,4 @@ SELECT COUNT(*) FROM geodata WHERE ST_Srid(ST_GeomFromWKB(wkb)) <> 0; -- Drop the test table. DROP TABLE geodata; +DROP TABLE geoinvaliddata; diff --git a/sql/core/src/test/resources/sql-tests/results/nonansi/st-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/nonansi/st-functions.sql.out index e75d4ba419e22..0611454811ca6 100644 Binary files a/sql/core/src/test/resources/sql-tests/results/nonansi/st-functions.sql.out and b/sql/core/src/test/resources/sql-tests/results/nonansi/st-functions.sql.out differ diff --git a/sql/core/src/test/resources/sql-tests/results/st-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/st-functions.sql.out index e75d4ba419e22..0611454811ca6 100644 Binary files a/sql/core/src/test/resources/sql-tests/results/st-functions.sql.out and b/sql/core/src/test/resources/sql-tests/results/st-functions.sql.out differ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/STExpressionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/STExpressionsSuite.scala index 1b73617d050e8..618ad46bb34c2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/STExpressionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/STExpressionsSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql +import org.apache.spark.SparkIllegalArgumentException import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.st._ import org.apache.spark.sql.test.SharedSparkSession @@ -33,7 +34,12 @@ class STExpressionsSuite private final val defaultGeometrySrid: Int = ExpressionDefaults.DEFAULT_GEOMETRY_SRID private final val defaultGeometryType: DataType = GeometryType(defaultGeometrySrid) - /** ST reader/writer expressions. */ + // Private helper method to assert the data type of a query result. + private def assertType(query: String, expectedDataType: DataType) = { + assert(sql(query).schema.fields.head.dataType.sameType(expectedDataType)) + } + + /** ST writer expressions. */ test("ST_AsBinary") { // Test data: WKB representation of POINT(1 2). @@ -49,6 +55,69 @@ class STExpressionsSuite checkEvaluation(ST_AsBinary(geometryExpression), wkb) } + /** ST reader expressions. */ + + test("ST_GeogFromWKB - expressions") { + // Test data: WKB representation of POINT(1 2). + val validWkb = Hex.unhex("0101000000000000000000F03F0000000000000040".getBytes()) + val validWkbLiteral = Literal.create(validWkb, BinaryType) + val invalidWkb = Hex.unhex("010203".getBytes()) + val invalidWkbLiteral = Literal.create(invalidWkb, BinaryType) + + // ST_GeogFromWKB with valid WKB. + val geogLitValidWkb = ST_GeogFromWKB(validWkbLiteral) + assert(geogLitValidWkb.dataType.sameType(defaultGeographyType)) + checkEvaluation(ST_AsBinary(geogLitValidWkb), validWkb) + // ST_GeogFromWKB with invalid WKB. + val geogLitInvalidWkb = ST_GeogFromWKB(invalidWkbLiteral) + checkError( + exception = intercept[SparkIllegalArgumentException] { + geogLitInvalidWkb.eval() + }, + condition = "WKB_PARSE_ERROR", + parameters = Map.empty + ) + } + + test("ST_GeogFromWKB - table") { + // Test data: WKB representation of POINT(1 2). + val validWkbString = "0101000000000000000000F03F0000000000000040" + val validWkb = Hex.unhex(validWkbString.getBytes()) + val invalidWkbString = "010203" + + withTable("tbl") { + // Construct the test table. + sql(s"CREATE TABLE tbl (validWkb BINARY, invalidWkb BINARY)") + sql(s"INSERT INTO tbl VALUES (X'$validWkbString', X'$invalidWkbString')") + + // ST_GeogFromWKB with valid WKB column. + val geogCol = "ST_GeogFromWKB(validWkb)" + assertType(s"SELECT $geogCol FROM tbl", GeographyType(4326)) + checkAnswer(sql(s"SELECT ST_AsBinary($geogCol) FROM tbl"), Row(validWkb)) + // ST_GeogFromWKB with valid WKB literal. + val geogLit = s"ST_GeogFromWKB(X'$validWkbString')" + assertType(s"SELECT $geogLit", GeographyType(4326)) + checkAnswer(sql(s"SELECT ST_AsBinary($geogLit) AS wkb"), Row(validWkb)) + + // ST_GeogFromWKB with invalid WKB column. + checkError( + exception = intercept[SparkIllegalArgumentException] { + sql(s"SELECT ST_GeogFromWKB(invalidWkb) FROM tbl").collect() + }, + condition = "WKB_PARSE_ERROR", + parameters = Map.empty + ) + // ST_GeogFromWKB with invalid WKB literal. + checkError( + exception = intercept[SparkIllegalArgumentException] { + sql(s"SELECT ST_GeogFromWKB(X'$invalidWkbString')").collect() + }, + condition = "WKB_PARSE_ERROR", + parameters = Map.empty + ) + } + } + /** ST accessor expressions. */ test("ST_Srid") {