Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -7122,6 +7122,12 @@
],
"sqlState" : "42601"
},
"WKB_PARSE_ERROR" : {
"message" : [
"Error parsing WKB."
],
"sqlState" : "22023"
},
"WRITE_STREAM_NOT_ALLOWED" : {
"message" : [
"`writeStream` can be called only on streaming Dataset/DataFrame."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ object GeographyType extends SpatialType {
GeographyType(MIXED_CRS, GEOGRAPHY_DEFAULT_ALGORITHM)

/** Returns whether the given SRID is supported. */
private[types] def isSridSupported(srid: Int): Boolean = {
def isSridSupported(srid: Int): Boolean = {
GeographicSpatialReferenceSystemMapper.getStringId(srid) != null
}

Expand Down
6 changes: 6 additions & 0 deletions sql/catalyst/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,12 @@
<groupId>org.apache.datasketches</groupId>
<artifactId>datasketches-java</artifactId>
</dependency>
<dependency>
<groupId>org.locationtech.jts</groupId>
<artifactId>jts-core</artifactId>
<version>1.20.0</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
*/
package org.apache.spark.sql.catalyst.util;

import org.apache.spark.sql.errors.QueryExecutionErrors;
import org.apache.spark.sql.types.GeographyType;
import org.apache.spark.sql.util.GeoWkbReader;
import org.apache.spark.unsafe.types.GeographyVal;

import java.nio.ByteBuffer;
Expand Down Expand Up @@ -77,9 +80,17 @@ public Geography copy() {

// Returns a Geography object with the specified SRID value by parsing the input WKB.
// The SRID is checked with GeographyType.isSridSupported first; an unsupported value is reported
// through QueryExecutionErrors.stInvalidSridValueError. The WKB is then passed through
// GeoWkbReader.readWkb, and the bytes it returns (not the raw input) are what get stored.
public static Geography fromWkb(byte[] wkb, int srid) {
  // Verify that the provided SRID value is geographic.
  if (!GeographyType.isSridSupported(srid)) {
    throw QueryExecutionErrors.stInvalidSridValueError(srid);
  }
  // Parse the provided WKB before allocating anything: the buffer must be sized from the bytes
  // returned by the reader, because nothing guarantees that they are as long as the input.
  byte[] wkbBytes = GeoWkbReader.readWkb(wkb);
  // Create a new byte array to hold the Geography representation and populate the header.
  byte[] bytes = new byte[HEADER_SIZE + wkbBytes.length];
  ByteBuffer.wrap(bytes).order(DEFAULT_ENDIANNESS).putInt(srid);
  // Copy the parsed WKB into the Geography byte array after the header.
  System.arraycopy(wkbBytes, 0, bytes, WKB_OFFSET, wkbBytes.length);
  // Finally, create and return the Geography instance.
  return fromBytes(bytes);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.util;

import org.apache.spark.sql.errors.QueryExecutionErrors;
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.geom.CoordinateXYM;
import org.locationtech.jts.geom.CoordinateXYZM;
import org.locationtech.jts.geom.Geometry;
import org.locationtech.jts.io.ParseException;
import org.locationtech.jts.io.WKBReader;
import org.locationtech.jts.io.WKBWriter;
import org.locationtech.jts.io.ByteOrderValues;

/**
 * Reader for the WKB geospatial standard format, implemented on top of JTS.
 *
 * The input bytes are parsed into a JTS Geometry and then written back out as WKB using the
 * default endianness, so the bytes returned by {@link #readWkb(byte[])} come from the JTS
 * writer rather than being a copy of the input.
 */
public class GeoWkbReader {

  /** Common constants for geospatial standard format manipulation. */

  // The default endianness for in-memory geo objects is Little Endian (NDR).
  private static final int DEFAULT_ENDIANNESS = ByteOrderValues.LITTLE_ENDIAN;

  /** Utility methods for standard format parsing using JTS. */

  // Private helper to determine dimension of geometry. The decision is based on a single
  // coordinate of the geometry; a geometry without any coordinate is reported as 2-dimensional.
  private static int getDimension(Geometry geometry) {
    // Only one vertex is inspected, so ask JTS for a single coordinate instead of materializing
    // the full coordinate array. JTS returns null here when the geometry is empty.
    Coordinate c = geometry.getCoordinate();
    if (c == null) return 2;
    if (c instanceof CoordinateXYZM) return 4;
    // NOTE(review): an XYM coordinate yields the same dimension (3) as an XYZ one, so the writer
    // is not told that the third ordinate is a measure - confirm that this is intended.
    if (c instanceof CoordinateXYM) return 3;
    if (!Double.isNaN(c.getZ())) return 3;
    return 2;
  }

  // WKB parser (private helper method). A JTS ParseException is translated into the error
  // built by QueryExecutionErrors.wkbParseError.
  private static Geometry parseWkb(byte[] bytes) {
    try {
      WKBReader reader = new WKBReader();
      return reader.read(bytes);
    } catch (ParseException e) {
      // NOTE(review): wkbParseError takes no arguments, so the details of the original
      // ParseException are not propagated to the caller.
      throw QueryExecutionErrors.wkbParseError();
    }
  }

  /** Public geospatial standard format reader API. */

  // WKB reader. Parses the input and returns it re-encoded as WKB in the default endianness.
  public static byte[] readWkb(byte[] bytes) {
    // Parse the input WKB bytes to a Geometry object.
    Geometry geometry = parseWkb(bytes);
    // Write the Geometry object back to WKB format with the default endianness.
    return new WKBWriter(getDimension(geometry), DEFAULT_ENDIANNESS).write(geometry);
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,21 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
summary = "")
}

/**
 * Builds the exception for the `ST_INVALID_SRID_VALUE` error class.
 *
 * @param srid the offending SRID value, as a string; passed as the `srid` message parameter
 * @return the exception to be thrown by the caller
 */
def stInvalidSridValueError(srid: String): SparkIllegalArgumentException = {
  val params = Map("srid" -> srid)
  new SparkIllegalArgumentException(
    errorClass = "ST_INVALID_SRID_VALUE",
    messageParameters = params)
}

/**
 * Integer convenience overload: renders `srid` in decimal and delegates to the overload that
 * takes the SRID as a string.
 */
def stInvalidSridValueError(srid: Int): SparkIllegalArgumentException = {
  val sridText = Integer.toString(srid)
  stInvalidSridValueError(sridText)
}

/**
 * Builds the exception for the `WKB_PARSE_ERROR` error class. It takes no arguments, so no
 * message parameters and no cause are attached to the resulting exception.
 */
def wkbParseError(): SparkIllegalArgumentException = {
new SparkIllegalArgumentException(errorClass = "WKB_PARSE_ERROR")
}

def withSuggestionIntervalArithmeticOverflowError(
suggestedFunc: String,
context: QueryContext): ArithmeticException = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,27 +83,49 @@ void testDefaultSrid() {
/** Tests for Geography WKB parsing. */

@Test
void testFromWkbWithSridRudimentary() {
byte[] wkb = new byte[]{1, 2, 3};
// Note: This is a rudimentary WKB handling test; actual WKB parsing is not yet implemented.
// Once we implement the appropriate parsing logic, this test should be updated accordingly.
void testFromWkbWithSrid() {
// Test data: WKB representation of LINESTRING EMPTY.
byte[] wkb = new byte[]{0x01, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
Geography geography = Geography.fromWkb(wkb, 4326);
assertNotNull(geography);
assertArrayEquals(wkb, geography.toWkb());
assertEquals(4326, geography.srid());
}

@Test
void testFromWkbNoSridRudimentary() {
byte[] wkb = new byte[]{1, 2, 3};
// Note: This is a rudimentary WKB handling test; actual WKB parsing is not yet implemented.
// Once we implement the appropriate parsing logic, this test should be updated accordingly.
void testFromWkbNoSrid() {
// Test data: WKB representation of LINESTRING EMPTY.
byte[] wkb = new byte[]{0x01, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
Geography geography = Geography.fromWkb(wkb);
assertNotNull(geography);
assertArrayEquals(wkb, geography.toWkb());
assertEquals(4326, geography.srid());
}

// Parsing an invalid WKB must fail with the WKB_PARSE_ERROR error.
@Test
void testFromInvalidWkb() {
  // Test data: invalid WKB byte array.
  byte[] invalidWkb = new byte[]{0x01, 0x02, 0x03};
  IllegalArgumentException thrown =
    assertThrows(IllegalArgumentException.class, () -> Geography.fromWkb(invalidWkb));
  String message = thrown.getMessage();
  assertTrue(message.startsWith("[WKB_PARSE_ERROR] Error parsing WKB"));
}

// A valid WKB combined with an unsupported SRID must fail with ST_INVALID_SRID_VALUE.
@Test
void testFromInvalidSrid() {
  // Test data: WKB representation of LINESTRING EMPTY.
  byte[] emptyLineString = new byte[]{0x01, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
  int badSrid = 999;
  IllegalArgumentException thrown = assertThrows(
    IllegalArgumentException.class,
    () -> Geography.fromWkb(emptyLineString, badSrid)
  );
  // The message must carry both the error class and the rejected value.
  String message = thrown.getMessage();
  assertTrue(message.startsWith("[ST_INVALID_SRID_VALUE]"));
  assertTrue(message.contains("value: " + badSrid + "."));
}

/** Tests for Geography EWKB parsing. */

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ class STUtilsSuite {

/** Common test data used across multiple tests below. */

private final byte[] testWkb = new byte[] {0x01, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
private final byte[] testWkbNdr = new byte[] {0x01, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, (byte)0xF0, 0x3F, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x40};
private final byte[] testWkbXdr = new byte[] {0x00, 0x00, 0x00, 0x00, 0x01, 0x3f, (byte)0xf0,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x40, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};

// A sample Geography byte array for testing purposes, representing a POINT(1 2) with SRID 4326.
private final int testGeographySrid = 4326;
Expand All @@ -52,14 +54,14 @@ class STUtilsSuite {
byte[] geogSrid = ByteBuffer.allocate(sridLen).order(end).putInt(testGeographySrid).array();
byte[] geomSrid = ByteBuffer.allocate(sridLen).order(end).putInt(testGeometrySrid).array();
// Initialize GEOGRAPHY.
int wkbLen = testWkb.length;
int wkbLen = testWkbNdr.length;
testGeographyBytes = new byte[sridLen + wkbLen];
System.arraycopy(geogSrid, 0, testGeographyBytes, 0, sridLen);
System.arraycopy(testWkb, 0, testGeographyBytes, sridLen, wkbLen);
System.arraycopy(testWkbNdr, 0, testGeographyBytes, sridLen, wkbLen);
// Initialize GEOMETRY.
testGeometryBytes = new byte[sridLen + wkbLen];
System.arraycopy(geomSrid, 0, testGeometryBytes, 0, sridLen);
System.arraycopy(testWkb, 0, testGeometryBytes, sridLen, wkbLen);
System.arraycopy(testWkbNdr, 0, testGeometryBytes, sridLen, wkbLen);
}

/** Tests for ST expression utility methods. */
Expand All @@ -70,29 +72,36 @@ void testStAsBinaryGeography() {
GeographyVal geographyVal = GeographyVal.fromBytes(testGeographyBytes);
byte[] geographyWkb = STUtils.stAsBinary(geographyVal);
assertNotNull(geographyWkb);
assertArrayEquals(testWkb, geographyWkb);
assertArrayEquals(testWkbNdr, geographyWkb);
}

@Test
void testStAsBinaryGeometry() {
GeometryVal geometryVal = GeometryVal.fromBytes(testGeometryBytes);
byte[] geometryWkb = STUtils.stAsBinary(geometryVal);
assertNotNull(geometryWkb);
assertArrayEquals(testWkb, geometryWkb);
assertArrayEquals(testWkbNdr, geometryWkb);
}

// ST_GeogFromWKB
@Test
void testStGeogFromWKB() {
GeographyVal geographyVal = STUtils.stGeogFromWKB(testWkb);
void testStGeogFromWKB_NDR() {
GeographyVal geographyVal = STUtils.stGeogFromWKB(testWkbNdr);
assertNotNull(geographyVal);
assertArrayEquals(testGeographyBytes, geographyVal.getBytes());
}

// The XDR test input must produce exactly the bytes held in testGeographyBytes.
@Test
void testStGeogFromWKB_XDR() {
  GeographyVal parsed = STUtils.stGeogFromWKB(testWkbXdr);
  assertNotNull(parsed);
  byte[] actualBytes = parsed.getBytes();
  assertArrayEquals(testGeographyBytes, actualBytes);
}

// ST_GeomFromWKB
@Test
void testStGeomFromWKB() {
GeometryVal geometryVal = STUtils.stGeomFromWKB(testWkb);
GeometryVal geometryVal = STUtils.stGeomFromWKB(testWkbNdr);
assertNotNull(geometryVal);
assertArrayEquals(testGeometryBytes, geometryVal.getBytes());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.util

import org.apache.spark.SparkFunSuite

/**
 * Tests for [[GeoWkbReader.readWkb]]: every test vector is read once from its NDR encoding and
 * once from its XDR encoding, and both reads must produce the same expected bytes.
 */
class GeoWkbReaderSuite extends SparkFunSuite {
  // Test vectors as hex strings: (NDR encoded input, XDR encoded input, expected output).
  // NOTE(review): the expected output for POINTM(1 2 3) is identical to the one for
  // POINTZ(1 2 3) - confirm that this is the intended behavior.
  // scalastyle:off line.size.limit
  val testData: Seq[(String, String, String)] = Seq(
    (
      // POINT EMPTY
      "0101000000000000000000f87f000000000000f87f",
      "00000000017ff80000000000007ff8000000000000",
      "0101000000000000000000f87f000000000000f87f"
    ),
    (
      // POINT(1 2)
      "0101000000000000000000f03f0000000000000040",
      "00000000013ff00000000000004000000000000000",
      "0101000000000000000000f03f0000000000000040"
    ),
    (
      // POINTZ(1 2 3)
      "01e9030000000000000000f03f00000000000000400000000000000840",
      "00000003e93ff000000000000040000000000000004008000000000000",
      "0101000080000000000000f03f00000000000000400000000000000840"
    ),
    (
      // POINTM(1 2 3)
      "01d1070000000000000000f03f00000000000000400000000000000840",
      "00000007d13ff000000000000040000000000000004008000000000000",
      "0101000080000000000000f03f00000000000000400000000000000840"
    ),
    (
      // POINTZM(1 2 3 4)
      "01b90b0000000000000000f03f000000000000004000000000000008400000000000001040",
      "0000000bb93ff0000000000000400000000000000040080000000000004010000000000000",
      "01010000c0000000000000f03f000000000000004000000000000008400000000000001040"
    ),
    (
      // LINESTRING EMPTY
      "010200000000000000",
      "000000000200000000",
      "010200000000000000"
    ),
    (
      // LINESTRING(0 0, 1 0, 1 1, 0 1, 0 0)
      "01020000000500000000000000000000000000000000000000000000000000f03f0000000000000000000000000000f03f000000000000f03f0000000000000000000000000000f03f00000000000000000000000000000000",
      "000000000200000005000000000000000000000000000000003ff000000000000000000000000000003ff00000000000003ff000000000000000000000000000003ff000000000000000000000000000000000000000000000",
      "01020000000500000000000000000000000000000000000000000000000000f03f0000000000000000000000000000f03f000000000000f03f0000000000000000000000000000f03f00000000000000000000000000000000"
    )
  )
  // scalastyle:on line.size.limit

  /** Decodes a hex string (two characters per byte) into the bytes it represents. */
  private def hexToBytes(hex: String): Array[Byte] =
    hex.grouped(2).map(Integer.parseInt(_, 16).toByte).toArray

  /** Renders bytes as a lowercase hex string; only used to build failure messages. */
  private def bytesToHex(bytes: Array[Byte]): String =
    bytes.map(b => f"$b%02x").mkString

  /** Reads `inputHex` with the WKB reader and checks that the result equals `expectedHex`. */
  private def checkReadWkb(inputHex: String, expectedHex: String): Unit = {
    val actual = GeoWkbReader.readWkb(hexToBytes(inputHex))
    assert(actual != null)
    val expected = hexToBytes(expectedHex)
    assert(
      expected.sameElements(actual),
      s"Reading WKB $inputHex produced ${bytesToHex(actual)}, expected $expectedHex")
  }

  // WKB reader.
  test("Read WKB - NDR") {
    // NDR encoded geometry inputs.
    testData.foreach { case (wkbNDR, _, resultWkb) => checkReadWkb(wkbNDR, resultWkb) }
  }

  test("Read WKB - XDR") {
    // XDR encoded geometry inputs.
    testData.foreach { case (_, wkbXDR, resultWkb) => checkReadWkb(wkbXDR, resultWkb) }
  }

}
Loading