Skip to content

Commit

Permalink
add e2e vstreamclient test
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Perkins <derek@nozzle.io>
  • Loading branch information
derekperkins committed Jan 10, 2025
1 parent ecb3720 commit f9d7402
Showing 1 changed file with 158 additions and 0 deletions.
158 changes: 158 additions & 0 deletions go/test/endtoend/vreplication/vstreamclient_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package vreplication

import (
"context"
"fmt"
"reflect"
"slices"
"testing"
"time"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
"vitess.io/vitess/go/vt/vstreamclient"
"vitess.io/vitess/go/vt/vtgate/vtgateconn"
)

// Customer is the concrete Go type handed to vstreamclient as the DataType
// for the customer table; the test's flush callback receives rows as
// *Customer values.
//
// NOTE(review): the `vstream` struct tags are interpreted by vstreamclient,
// which is not visible here. Presumably each tag names the source column and
// "-" excludes the field from mapping — confirm against that package.
type Customer struct {
	// ID carries the tag "customer_id".
	ID int64 `vstream:"customer_id"`

	// Email carries the tag "email".
	Email string `vstream:"email"`

	// DeletedAt is tagged "-"; the test's flush callback sets it itself when
	// it sees a delete event.
	DeletedAt time.Time `vstream:"-"`
}

// To run the tests, this currently expects the local example to be running
// ./101_initial_cluster.sh; mysql < ../common/insert_commerce_data.sql; ./201_customer_tablets.sh; ./202_move_tables.sh; ./203_switch_reads.sh; ./204_switch_writes.sh; ./205_clean_commerce.sh; ./301_customer_sharded.sh; ./302_new_shards.sh; ./303_reshard.sh; ./304_switch_reads.sh; ./305_switch_writes.sh; ./306_down_shard_0.sh; ./307_delete_shard_0.sh
func TestVStreamClient(t *testing.T) {
vc = NewVitessCluster(t, nil)
defer vc.TearDown()

require.NotNil(t, vc)
defaultReplicas = 2
defaultRdonly = 0

defaultCell := vc.Cells[vc.CellNames[0]]
vc.AddKeyspace(t, []*Cell{defaultCell}, "product", "0", initialProductVSchema, initialProductSchema, defaultReplicas, defaultRdonly, 100, nil)
verifyClusterHealth(t, vc)
insertInitialData(t)

ctx := context.Background()
conn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort))
if err != nil {
log.Fatal(err)
}
defer conn.Close()

flushCount := 0
gotCustomers := make([]*Customer, 0)

tables := []vstreamclient.TableConfig{{
Keyspace: "customer",
Table: "customer",
MaxRowsPerFlush: 7,
DataType: &Customer{},
FlushFn: func(ctx context.Context, rows []vstreamclient.Row, meta vstreamclient.FlushMeta) error {
flushCount++

fmt.Printf("upserting %d customers\n", len(rows))
for i, row := range rows {
switch {
// delete event
case row.RowChange.After == nil:
customer := row.Data.(*Customer)
customer.DeletedAt = time.Now()

gotCustomers = append(gotCustomers, customer)
fmt.Printf("deleting customer %d: %v\n", i, row)

// insert event
case row.RowChange.Before == nil:
gotCustomers = append(gotCustomers, row.Data.(*Customer))
fmt.Printf("inserting customer %d: %v\n", i, row)

// update event
case row.RowChange.Before != nil:
gotCustomers = append(gotCustomers, row.Data.(*Customer))
fmt.Printf("updating customer %d: %v\n", i, row)
}
}

// a real implementation would do something more meaningful here. For a data warehouse type workload,
// it would probably look like streaming rows into the data warehouse, or for more complex versions,
// write newline delimited json or a parquet file to object storage, then trigger a load job.
return nil
},
}}

t.Run("first vstream run, should succeed", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

vstreamClient, err := vstreamclient.New(ctx, "bob", conn, tables,
vstreamclient.WithMinFlushDuration(500*time.Millisecond),
vstreamclient.WithHeartbeatSeconds(1),
vstreamclient.WithStateTable("commerce", "vstreams"),
vstreamclient.WithEventFunc(func(ctx context.Context, ev *binlogdatapb.VEvent) error {
fmt.Printf("** FIELD EVENT: %v\n", ev)
return nil
}, binlogdatapb.VEventType_FIELD),
)
if err != nil {
t.Fatalf("failed to create VStreamClient: %v", err)
}

err = vstreamClient.Run(ctx)
if err != nil && ctx.Err() == nil {
t.Fatalf("failed to run vstreamclient: %v", err)
}

slices.SortFunc(gotCustomers, func(a, b *Customer) int {
return int(a.ID - b.ID)
})

wantCustomers := []*Customer{
{ID: 1, Email: "alice@domain.com"},
{ID: 2, Email: "bob@domain.com"},
{ID: 3, Email: "charlie@domain.com"},
{ID: 4, Email: "dan@domain.com"},
{ID: 5, Email: "eve@domain.com"},
}

fmt.Printf("got %d customers | flushed %d times\n", len(gotCustomers), flushCount)
if !reflect.DeepEqual(gotCustomers, wantCustomers) {
t.Fatalf("got %d customers, want %d", len(gotCustomers), len(wantCustomers))
}
})

// this should fail because we're going to restart the stream, but with an additional table
t.Run("second vstream run, should fail", func(t *testing.T) {
withAdditionalTable := append(tables, vstreamclient.TableConfig{
Keyspace: "customer",
Table: "corder",
DataType: &Customer{},
})

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

_, err := vstreamclient.New(ctx, "bob", conn, withAdditionalTable,
vstreamclient.WithStateTable("commerce", "vstreams"),
)
if err == nil {
t.Fatalf("expected VStreamClient error, got nil")
} else if err.Error() != "vstreamclient: provided tables do not match stored tables" {
t.Fatalf("expected error 'vstreamclient: provided tables do not match stored tables', got '%v'", err)
}
})
}

// getConn dials the vtgate gRPC endpoint at localhost:15991 and returns the
// connection. A dial error fails the calling test immediately via t.Fatal.
// The caller owns the returned connection; this helper does not close it.
func getConn(t *testing.T, ctx context.Context) *vtgateconn.VTGateConn {
	t.Helper()

	const vtgateAddr = "localhost:15991"

	vtgateConn, dialErr := vtgateconn.Dial(ctx, vtgateAddr)
	if dialErr != nil {
		t.Fatal(dialErr)
	}

	return vtgateConn
}

0 comments on commit f9d7402

Please sign in to comment.