Skip to content

Commit

Permalink
[SPARK-48951] Adding column and functions packages
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
This patch provides additional base capabilities that are needed to parallelize development more by adding very skeleton behavior for the `Column` abstraction in Spark.

This allows the users to use the following APIs:

```
df, _ := spark.Sql("select * from range(100)")
col, _ := df.Col("id")
df, _ := df.Filter(col.Gt(functions.Lit(50))
df.Show(ctx, 100, false)
```

### Why are the changes needed?
Compatibility

### Does this PR introduce _any_ user-facing change?
Adds the necessary public API for `Column` and `functions`.

### How was this patch tested?
Added new tests.

Closes #35 from grundprinzip/plans_and_exprs.

Authored-by: Martin Grund <martin.grund@databricks.com>
Signed-off-by: Martin Grund <martin.grund@databricks.com>
  • Loading branch information
grundprinzip committed Jul 19, 2024
1 parent 3123425 commit 53797bd
Show file tree
Hide file tree
Showing 17 changed files with 735 additions and 37 deletions.
31 changes: 30 additions & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,33 @@ This requires the following tools to be present in your PATH:

1. Java for checking license headers
2. `gofumpt` for formatting Go code
3. `golangci-lint` for linting Go code
3. `golangci-lint` for linting Go code


### How to write tests

Please make sure that you have proper testing for the new code your adding. As part of the
code base we started to add mocks that allow you to simulate a lot of the necessary API
and don't require a running Spark instance.

`mock.ProtoClient` is a mock implementation of the `SparkConnectService_ExecutePlanClient`
interface which is the server-side stream of messages coming as a response from the server.

`testutils.NewConnectServiceClientMock` will create a mock client that implements the
`SparkConnectServiceClient` interface.

The combination of these two mocks allows you to test the client side of the code without
having to connect to Spark.

### What to contribute

We welcome contributions of all kinds to the `spark-connect-go` project. Some examples of
contributions are providing implementations of functionality that is missing in the Go
implementation. Some examples are, but are not limited to:

* Adding an existing feature of the DataFrame API in Golang.
* Adding support for a builtin function in the Spark API in Golang.
* Improving error handling in the client.

If you are unsure about whether a contribution is a good fit, feel free to open an issue
in the Apache Spark Jira.
36 changes: 35 additions & 1 deletion cmd/spark-connect-example-spark-session/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"flag"
"log"

"github.com/apache/spark-connect-go/v35/spark/sql/functions"

"github.com/apache/spark-connect-go/v35/spark/sql"
"github.com/apache/spark-connect-go/v35/spark/sql/utils"
)
Expand All @@ -37,7 +39,39 @@ func main() {
}
defer utils.WarnOnError(spark.Stop, func(err error) {})

df, err := spark.Sql(ctx, "select 'apple' as word, 123 as count union all select 'orange' as word, 456 as count")
//df, err := spark.Sql(ctx, "select * from range(100)")
//if err != nil {
// log.Fatalf("Failed: %s", err)
//}
//
//df, _ = df.FilterByString("id < 10")
//err = df.Show(ctx, 100, false)
//if err != nil {
// log.Fatalf("Failed: %s", err)
//}
//
//df, err = spark.Sql(ctx, "select * from range(100)")
//if err != nil {
// log.Fatalf("Failed: %s", err)
//}
//
//df, _ = df.Filter(functions.Col("id").Lt(functions.Expr("10")))
//err = df.Show(ctx, 100, false)
//if err != nil {
// log.Fatalf("Failed: %s", err)
//}

df, _ := spark.Sql(ctx, "select * from range(100)")
df, err = df.Filter(functions.Col("id").Lt(functions.Lit(20)))
if err != nil {
log.Fatalf("Failed: %s", err)
}
err = df.Show(ctx, 100, false)
if err != nil {
log.Fatalf("Failed: %s", err)
}

df, err = spark.Sql(ctx, "select 'apple' as word, 123 as count union all select 'orange' as word, 456 as count")
if err != nil {
log.Fatalf("Failed: %s", err)
}
Expand Down
6 changes: 3 additions & 3 deletions quick-start.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
In your Go project `go.mod` file, add `spark-connect-go` library:
```
require (
github.com/apache/spark-connect-go/v1 master
github.com/apache/spark-connect-go/v35 master
)
```

Expand Down Expand Up @@ -113,9 +113,9 @@ func main() {

## Start Spark Connect Server (Driver)

Download a Spark distribution (3.4.0+), unzip the folder, run command:
Download a Spark distribution (3.5.0+), unzip the folder, run command:
```
sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.4.0
sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.5.0
```

## Run Spark Connect Client Application
Expand Down
5 changes: 1 addition & 4 deletions spark/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,7 @@ func (c *ExecutePlanClient) ToTable() (*types.StructType, arrow.Table, error) {
case *proto.ExecutePlanResponse_ResultComplete_:
c.done = true
default:
fmt.Printf("Received unsupported response ")
//return nil, nil, &sparkerrors.UnsupportedResponseTypeError{
// ResponseType: x,
//}
// Explicitly ignore messages that we cannot process at the moment.
}
}

Expand Down
1 change: 1 addition & 0 deletions spark/client/testutils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type connectServiceClient struct {

func (c *connectServiceClient) ExecutePlan(ctx context.Context, in *proto.ExecutePlanRequest, opts ...grpc.CallOption) (proto.SparkConnectService_ExecutePlanClient, error) {
if c.expectedExecutePlanRequest != nil {
// Check that the plans in both requests are identical
assert.Equal(c.t, c.expectedExecutePlanRequest, in)
}
return c.executePlanClient, c.err
Expand Down
1 change: 1 addition & 0 deletions spark/sparkerrors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ var (
ReadError = errorType(errors.New("read error"))
ExecutionError = errorType(errors.New("execution error"))
InvalidInputError = errorType(errors.New("invalid input"))
InvalidPlanError = errorType(errors.New("invalid plan"))
)

type UnsupportedResponseTypeError struct {
Expand Down
65 changes: 65 additions & 0 deletions spark/sql/column/column.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package column

import proto "github.com/apache/spark-connect-go/v35/internal/generated"

type Column struct {
expr Expression
}

func (c *Column) ToPlan() (*proto.Expression, error) {
return c.expr.ToPlan()
}

func (c Column) Lt(other Column) Column {
return NewColumn(NewUnresolvedFunction("<", []Expression{c.expr, other.expr}, false))
}

func (c Column) Le(other Column) Column {
return NewColumn(NewUnresolvedFunction("<=", []Expression{c.expr, other.expr}, false))
}

func (c Column) Gt(other Column) Column {
return NewColumn(NewUnresolvedFunction(">", []Expression{c.expr, other.expr}, false))
}

func (c Column) Ge(other Column) Column {
return NewColumn(NewUnresolvedFunction(">=", []Expression{c.expr, other.expr}, false))
}

func (c Column) Eq(other Column) Column {
return NewColumn(NewUnresolvedFunction("==", []Expression{c.expr, other.expr}, false))
}

func (c Column) Neq(other Column) Column {
cmp := NewUnresolvedFunction("==", []Expression{c.expr, other.expr}, false)
return NewColumn(NewUnresolvedFunction("not", []Expression{cmp}, false))
}

func (c Column) Mul(other Column) Column {
return NewColumn(NewUnresolvedFunction("*", []Expression{c.expr, other.expr}, false))
}

func (c Column) Div(other Column) Column {
return NewColumn(NewUnresolvedFunction("/", []Expression{c.expr, other.expr}, false))
}

func NewColumn(expr Expression) Column {
return Column{
expr: expr,
}
}
83 changes: 83 additions & 0 deletions spark/sql/column/column_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package column

import (
"testing"

proto "github.com/apache/spark-connect-go/v35/internal/generated"
"github.com/stretchr/testify/assert"
)

func TestColumnFunctions(t *testing.T) {
col1 := NewColumn(NewColumnReference("col1"))
col2 := NewColumn(NewColumnReference("col2"))

tests := []struct {
name string
arg Column
want *proto.Expression
}{
{
name: "TestNewUnresolvedFunction",
arg: NewColumn(NewUnresolvedFunction("id", nil, false)),
want: &proto.Expression{
ExprType: &proto.Expression_UnresolvedFunction_{
UnresolvedFunction: &proto.Expression_UnresolvedFunction{
FunctionName: "id",
IsDistinct: false,
},
},
},
},
{
name: "TestComparison",
arg: col1.Lt(col2),
want: &proto.Expression{
ExprType: &proto.Expression_UnresolvedFunction_{
UnresolvedFunction: &proto.Expression_UnresolvedFunction{
FunctionName: "<",
IsDistinct: false,
Arguments: []*proto.Expression{
{
ExprType: &proto.Expression_UnresolvedAttribute_{
UnresolvedAttribute: &proto.Expression_UnresolvedAttribute{
UnparsedIdentifier: "col1",
},
},
},
{
ExprType: &proto.Expression_UnresolvedAttribute_{
UnresolvedAttribute: &proto.Expression_UnresolvedAttribute{
UnparsedIdentifier: "col2",
},
},
},
},
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := tt.arg.ToPlan()
assert.NoError(t, err)
expected := tt.want
assert.Equalf(t, expected, got, "Input: %v", tt.arg.expr.DebugString())
})
}
}
Loading

0 comments on commit 53797bd

Please sign in to comment.