Skip to content

Commit

Permalink
pkg/loader: Fix unstable unit tests (pingcap#818)
Browse files Browse the repository at this point in the history
  • Loading branch information
suzaku authored and IANTHEREAL committed Nov 20, 2019
1 parent f59b37a commit d5169d3
Showing 1 changed file with 18 additions and 15 deletions.
33 changes: 18 additions & 15 deletions pkg/loader/load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ func (s *txnManagerSuite) TestRunTxnManager(c *check.C) {
output := txnManager.run()
// send 5 txns (size 3) to txnManager, the 5th txn should get blocked at cond.Wait()
for i := 0; i < 5; i++ {
inputTxnInTime(c, input, txn, 10*time.Microsecond)
inputTxnInTime(c, input, txn, time.Millisecond)
}
c.Assert(output, check.HasLen, 4)
// Next txn should be blocked
Expand Down Expand Up @@ -494,10 +494,10 @@ func (s *txnManagerSuite) TestCloseLoaderInput(c *check.C) {
txnManager := newTxnManager(1, input)
output := txnManager.run()

inputTxnInTime(c, input, txn, 50*time.Microsecond)
inputTxnInTime(c, input, txn, time.Millisecond)
close(input)

t := outputTxnInTime(c, output, 10*time.Microsecond)
t := outputTxnInTime(c, output, time.Millisecond)
txnManager.pop(t)
c.Assert(t, check.DeepEquals, txn)

Expand Down Expand Up @@ -565,13 +565,13 @@ func (s *runSuite) TestShouldExecuteAllPendingDMLsOnClose(c *check.C) {
}

func (s *runSuite) TestShouldFlushWhenInputIsEmpty(c *check.C) {
var executed []*DML
executed := make(chan []*DML, 2)
origF := fNewBatchManager
fNewBatchManager = func(s *loaderImpl) *batchManager {
return &batchManager{
limit: 1024,
fExecDMLs: func(dmls []*DML) error {
executed = dmls
executed <- dmls
return nil
},
fDMLsSuccessCallback: func(txns ...*Txn) {},
Expand All @@ -597,17 +597,20 @@ func (s *runSuite) TestShouldFlushWhenInputIsEmpty(c *check.C) {
loader.input <- &Txn{DMLs: dmls}
}

for i := 0; i < 3; i++ {
addTxn(i)
}
// Wait 10ms for loader to exhaust the input channel
time.Sleep(10 * time.Millisecond)
c.Assert(executed, check.HasLen, 3)
for i := 0; i < 7; i++ {
addTxn(i)
assertExecuted := func(n int) {
for i := 0; i < n; i++ {
addTxn(i)
}
select {
case dmls := <-executed:
c.Assert(dmls, check.HasLen, n)
case <-time.After(time.Second):
c.Fatal("Timeout waiting to be executed.")
}
}
time.Sleep(10 * time.Millisecond)
c.Assert(executed, check.HasLen, 7)

assertExecuted(3)
assertExecuted(7)
}

type markSuccessesSuite struct{}
Expand Down

0 comments on commit d5169d3

Please sign in to comment.