Skip to content

Commit 058b06d

Browse files
committed
chore: add bidirectional streaming API to SpannerLib
Adds support for using a bidirectional gRPC stream for sending requests and receiving responses from SpannerLib. Also adds support for this to the .NET wrapper and modifies all tests to run using both the previous server-streaming API and the new bidirectional streaming API. The latter is the default for the .NET wrapper, and support for the server-streaming API might be removed in a future version.
1 parent 1da9a1c commit 058b06d

32 files changed

+8540
-870
lines changed

.github/workflows/spanner-lib-tests.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,9 @@ jobs:
145145
working-directory: spannerlib/wrappers/spannerlib-dotnet/spannerlib-dotnet-tests
146146
run: |
147147
dotnet --version
148+
if [ "$RUNNER_OS" == "Linux" ]; then
149+
export SKIP_SHARED_LIB_TESTS=true
150+
fi
148151
# Retry the tests 3 times before the step actually fails.
149152
# We do this because the tests are executed using both the internal gRPC server and using a shared
150153
# native library. The latter is not really supported due to an incompatibility between the .NET and Go

spannerlib/api/rows.go

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func NextResultSet(ctx context.Context, poolId, connId, rowsId int64) (*spannerp
7070
// as it allows the library to re-use the encoding buffer.
7171
// TODO: Add an encoder function as input argument, instead of hardcoding protobuf encoding here.
7272
func NextEncoded(ctx context.Context, poolId, connId, rowsId int64) ([]byte, error) {
73-
_, bytes, err := next(ctx, poolId, connId, rowsId, true)
73+
_, bytes, err := next(ctx, poolId, connId, rowsId /*marshalResult=*/, true /*resetBuffer=*/, false)
7474
if err != nil {
7575
return nil, err
7676
}
@@ -79,7 +79,20 @@ func NextEncoded(ctx context.Context, poolId, connId, rowsId int64) ([]byte, err
7979

8080
// Next returns the next row as a protobuf ListValue.
8181
func Next(ctx context.Context, poolId, connId, rowsId int64) (*structpb.ListValue, error) {
82-
values, _, err := next(ctx, poolId, connId, rowsId, false)
82+
return nextWithBufferOption(ctx, poolId, connId, rowsId /*resetBuffer=*/, true)
83+
}
84+
85+
// NextBuffered returns the next row as a protobuf ListValue.
86+
// The same buffer is used to construct the ListValue for each call. This means that this function is only
87+
// safe to call if the result that is returned is copied into a different data structure before the next call
88+
// to this function. Safe use of this function for example includes calls that request the next row and then
89+
// serialize this result into a byte slice or send it as a gRPC message (which also serializes the result).
90+
func NextBuffered(ctx context.Context, poolId, connId, rowsId int64) (*structpb.ListValue, error) {
91+
return nextWithBufferOption(ctx, poolId, connId, rowsId /*resetBuffer=*/, false)
92+
}
93+
94+
func nextWithBufferOption(ctx context.Context, poolId, connId, rowsId int64, resetBuffer bool) (*structpb.ListValue, error) {
95+
values, _, err := next(ctx, poolId, connId, rowsId /*marshalResult=*/, false, resetBuffer)
8396
if err != nil {
8497
return nil, err
8598
}
@@ -90,7 +103,7 @@ func Next(ctx context.Context, poolId, connId, rowsId int64) (*structpb.ListValu
90103
// The row is returned as a protobuf ListValue if marshalResult==false.
91104
// The row is returned as a byte slice if marshalResult==true.
92105
// TODO: Add generics to the function and add input arguments for encoding instead of hardcoding it.
93-
func next(ctx context.Context, poolId, connId, rowsId int64, marshalResult bool) (*structpb.ListValue, []byte, error) {
106+
func next(ctx context.Context, poolId, connId, rowsId int64, marshalResult, resetBuffer bool) (*structpb.ListValue, []byte, error) {
94107
rows, err := findRows(poolId, connId, rowsId)
95108
if err != nil {
96109
return nil, nil, err
@@ -99,6 +112,9 @@ func next(ctx context.Context, poolId, connId, rowsId int64, marshalResult bool)
99112
if err != nil {
100113
return nil, nil, err
101114
}
115+
if resetBuffer {
116+
rows.buffer = nil
117+
}
102118
if !marshalResult || values == nil {
103119
return values, nil, nil
104120
}

0 commit comments

Comments
 (0)