Skip to content

Commit

Permalink
take a swag at implementing OLAP too
Browse files Browse the repository at this point in the history
  • Loading branch information
demmer committed Nov 2, 2023
1 parent 2b47e58 commit cfc9043
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 18 deletions.
26 changes: 9 additions & 17 deletions go/vt/vtgateproxy/mysql_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,14 +203,10 @@ func (ph *proxyHandler) ComQuery(c *mysql.Conn, query string, callback func(*sql
}
}()

/*
XXX/demmer figure out OLAP
if session.Options.Workload == querypb.ExecuteOptions_OLAP {
err := ph.proxy.StreamExecute(ctx, session, query, make(map[string]*querypb.BindVariable), callback)
return mysql.NewSQLErrorFromError(err)
}
*/
if session.SessionPb().Options.Workload == querypb.ExecuteOptions_OLAP {
err := ph.proxy.StreamExecute(ctx, session, query, make(map[string]*querypb.BindVariable), callback)
return mysql.NewSQLErrorFromError(err)
}

result, err := ph.proxy.Execute(ctx, session, query, make(map[string]*querypb.BindVariable))

Expand Down Expand Up @@ -311,18 +307,14 @@ func (ph *proxyHandler) ComStmtExecute(c *mysql.Conn, prepare *mysql.PrepareData
}
}()

/*
XXX/demmer figure out OLAP
if session.Options.Workload == querypb.ExecuteOptions_OLAP {
err := ph.proxy.StreamExecute(ctx, session, prepare.PrepareStmt, prepare.BindVars, callback)
return mysql.NewSQLErrorFromError(err)
}
*/
if session.SessionPb().Options.Workload == querypb.ExecuteOptions_OLAP {
err := ph.proxy.StreamExecute(ctx, session, prepare.PrepareStmt, prepare.BindVars, callback)
return mysql.NewSQLErrorFromError(err)
}

qr, err := ph.proxy.Execute(ctx, session, prepare.PrepareStmt, prepare.BindVars)
if err != nil {
err = mysql.NewSQLErrorFromError(err)
return err
return mysql.NewSQLErrorFromError(err)
}
fillInTxStatusFlags(c, session)

Expand Down
19 changes: 18 additions & 1 deletion go/vt/vtgateproxy/vtgateproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package vtgateproxy
import (
"context"
"flag"
"io"
"time"

"google.golang.org/grpc"
Expand Down Expand Up @@ -104,7 +105,23 @@ func (proxy *VTGateProxy) Execute(ctx context.Context, session *vtgateconn.VTGat
}

func (proxy *VTGateProxy) StreamExecute(ctx context.Context, session *vtgateconn.VTGateSession, sql string, bindVariables map[string]*querypb.BindVariable, callback func(*sqltypes.Result) error) error {
return vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "not implemented")
stream, err := session.StreamExecute(ctx, sql, bindVariables)
if err != nil {
return err
}

for {
qr, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return err
}
callback(qr)
}

return nil
}

func Init() error {
Expand Down

0 comments on commit cfc9043

Please sign in to comment.