Feature datasource standalone #81
Conversation
@@ -31,6 +32,9 @@ private[simba] class ShapeType extends UserDefinedType[Shape] {
    s match {
      case o: Shape =>
        new GenericArrayData(ShapeSerializer.serialize(o))
      case g: JTSPolygon => // An ugly hack here
@Skyprophet I ran into a serialization problem here. I've wrapped all elements in ShapefileRdd with a Simba Shape class, but it still gives me a JTSPolygon serialization issue, so I added this hack to fix it. If the following three lines are commented out, I get a MatchError; here is the error stack:
scala.MatchError: POLYGON ((123.37969855694418 -10.691464934618121, 123.25330435203686 -10.73496681414191, 123.22308124558195 -10.829817485613273, 122.82189151450935 -10.91975208822901, 122.8071467465544 -10.780668259096789, 123.04320974794253 -10.729391677999741, 123.19637927774315 -10.592360700189607, 123.3108896398211 -10.592360700189607, 123.21251782973363 -10.579743286815225, 123.37764455941812 -10.430828466175731, 123.42540000189905 -10.47154163142446, 123.36986871164088 -10.543871687163643, 123.44227212443457 -10.495456031192182, 123.3439003143471 -10.586051993502416, 123.42613357244409 -10.62397759068006, 123.37969855694418 -10.691466080822098, 123.37969855694418 -10.691464934618121, 123.37969855694418 -10.691464934618121)) (of class com.vividsolutions.jts.geom.Polygon)
at edu.utah.cs.simba.ShapeType.serialize(ShapeType.scala:32)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$UDTConverter.toCatalystImpl(CatalystTypeConverters.scala:142)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:401)
at org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1$$anonfun$apply$2.apply(ExistingRDD.scala:59)
at org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1$$anonfun$apply$2.apply(ExistingRDD.scala:56)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Do you have any ideas about the UDT serialization problem here?
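For readers following the thread, here is a minimal sketch of what the hack amounts to. It is not the literal patch: only the case line appears in the diff above, and the Polygon(g) wrapping call is an assumption based on the apply functions mentioned later in this conversation.

```scala
// Sketch of the match inside ShapeType.serialize. A raw JTS Polygon that
// escaped wrapping falls through the original single-case match and raises
// the MatchError shown above; the extra case catches it and wraps it in a
// Simba shape first. Polygon(g) is an assumed wrapper, not confirmed code.
s match {
  case o: Shape =>
    new GenericArrayData(ShapeSerializer.serialize(o))
  case g: JTSPolygon => // the "ugly hack"
    new GenericArrayData(ShapeSerializer.serialize(Polygon(g)))
}
```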
Show me where you construct the Shape object in the data source.
val gf = new GeometryFactory()
val shapes = ShapeFile.Parser(path)(gf)
val shapefileRdd = sqlContext.sparkContext
  .parallelize(shapes).map(_.g match {
@Skyprophet Here. Every _.g is a JTSGeometry, and it's transformed into a Simba Shape with an apply function in the following lines.
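A hedged sketch of that conversion step, under the assumption that Simba's shape companion objects (Point, Polygon, ...) expose apply methods over JTS geometries as described; the exact signatures are not taken from the PR:

```scala
import com.vividsolutions.jts.geom.{Point => JTSPoint, Polygon => JTSPolygon}

// Each parsed record's .g field is a JTS Geometry; match on its concrete
// type and wrap it into the corresponding Simba Shape.
val shapefileRdd = sqlContext.sparkContext
  .parallelize(shapes)
  .map(_.g match {
    case p: JTSPoint   => Point(Array(p.getX, p.getY)) // assumed Simba apply
    case g: JTSPolygon => Polygon(g)                   // assumed Simba apply
    // remaining JTS geometry types would be handled with similar cases
  })
```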
Provide it an explicit schema and try again.
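A minimal sketch of this suggestion, reusing the schema definition quoted later in the thread; the Row wrapping of shapefileRdd is assumed for illustration:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType}

// Hand Spark the UDT-bearing schema explicitly instead of relying on
// reflection-based inference, which does not know about ShapeType.
val explicitSchema = StructType(StructField("shape", ShapeType, true) :: Nil)
val df = sqlContext.createDataFrame(shapefileRdd.map(Row(_)), explicitSchema)
```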
Can we replace the datatypes in Simba with the UDTs of Spark 2.1? I am asking since Magellan replaced all its datatypes with Spark 2.1 UDTs. If UDTs are applied, does the performance improve?
Quoted review context, in src/main/scala/edu/utah/cs/simba/execution/datasources/ShapefileRelation.scala:
+import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
+import org.apache.spark.sql.types.{StructField, StructType}
+
+class ShapefileRelation(path: String)(@transient val sqlContext: SQLContext)
+  extends BaseRelation with PrunedFilteredScan {
+
+  override val schema = {
+    StructType(StructField("shape", ShapeType, true) :: Nil)
+  }
+
+  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
+    val indices = requiredColumns.map(attr => schema.fieldIndex(attr))
+    val gf = new GeometryFactory()
+    val shapes = ShapeFile.Parser(path)(gf)
+    val shapefileRdd = sqlContext.sparkContext
+      .parallelize(shapes).map(_.g match {
The problem comes from
@merlintang Simba has used UDTs for its spatial datatypes since it was transplanted to the standalone branch; it is an easy and feasible way to support spatial datatypes. Further performance evaluation can be conducted if needed :)
@gfl94 Could you write some unit tests using the new datasources?
@gfl94 Can you merge this feature into the 2.1 branch as well? I think this feature is very important.
@Skyprophet Unit tests added, please check :)
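For reference, a hypothetical shape for such a test; the suite name, format string, and fixture path are all assumptions, not the tests actually added in this PR:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.scalatest.FunSuite

class ShapefileRelationSuite extends FunSuite {
  test("shapefile datasource exposes a single shape column") {
    // Local Spark context for a self-contained test (Spark 1.6-era API).
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("shapefile-test"))
    val sqlContext = new SQLContext(sc)
    try {
      val df = sqlContext.read
        .format("edu.utah.cs.simba.execution.datasources") // assumed DefaultSource package
        .load("src/test/resources/sample.shp")             // assumed test fixture
      assert(df.schema.fieldNames.sameElements(Array("shape")))
      assert(df.count() > 0)
    } finally {
      sc.stop()
    }
  }
}
```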
@gfl94 Try to import these to the 2.1 branch as well.
OSM, GeoJSON, and Shapefile datasource support for Simba.
Resolves #80.
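A hedged usage sketch of what this PR adds; the format short names below ("shapefile", "geojson", "osm") are assumptions based on the PR title, not identifiers confirmed by the merged code:

```scala
// Read each of the new spatial datasources into a DataFrame whose single
// "shape" column carries Simba's ShapeType UDT.
val countries = sqlContext.read.format("shapefile").load("/data/countries.shp")
val cities    = sqlContext.read.format("geojson").load("/data/cities.json")
val roads     = sqlContext.read.format("osm").load("/data/roads.osm")
countries.printSchema() // the schema should show the single "shape" column
```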