Skip to content

Commit

Permalink
[MINOR] Fix and enhance the spark session example
Browse files Browse the repository at this point in the history
  • Loading branch information
haoxins committed Aug 22, 2024
1 parent 072f287 commit 78decf6
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 6 deletions.
15 changes: 10 additions & 5 deletions cmd/spark-connect-example-spark-session/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,13 @@ import (
"github.com/apache/spark-connect-go/v35/spark/sql/utils"
)

var remote = flag.String("remote", "sc://localhost:15002",
"the remote address of Spark Connect server to connect to")
var (
remote = flag.String("remote", "sc://localhost:15002",
"the remote address of Spark Connect server to connect to")

filedir = flag.String("filedir", "file:///tmp/spark-connect-write-example-output.parquet",
"the directory to save the output files")
)

func main() {
flag.Parse()
Expand All @@ -39,7 +44,7 @@ func main() {
}
defer utils.WarnOnError(spark.Stop, func(err error) {})

df, err := spark.Sql(ctx, "select id2 from range(100)")
df, err := spark.Sql(ctx, "select id from range(100)")
if err != nil {
log.Fatalf("Failed: %s", err)
}
Expand Down Expand Up @@ -111,13 +116,13 @@ func main() {

err = df.Writer().Mode("overwrite").
Format("parquet").
Save(ctx, "file:///tmp/spark-connect-write-example-output.parquet")
Save(ctx, *filedir)
if err != nil {
log.Fatalf("Failed: %s", err)
}

df, err = spark.Read().Format("parquet").
Load("file:///tmp/spark-connect-write-example-output.parquet")
Load(*filedir)
if err != nil {
log.Fatalf("Failed: %s", err)
}
Expand Down
4 changes: 3 additions & 1 deletion spark/sql/dataframe.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,11 @@ type DataFrame interface {
Repartition(numPartitions int, columns []string) (DataFrame, error)
// RepartitionByRange re-partitions a data frame by range partition.
RepartitionByRange(numPartitions int, columns []RangePartitionColumn) (DataFrame, error)

// Filter filters the data frame by a column condition.
Filter(condition column.Column) (DataFrame, error)
// FilterByString filters the data frame by a string condition.
FilterByString(condition string) (DataFrame, error)
// Col returns a column by name.
Col(name string) (column.Column, error)
}

Expand Down

0 comments on commit 78decf6

Please sign in to comment.