Skip to content

Commit

Permalink
[SPARK-43429][CONNECT] Deflake SparkSessionSuite
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
This PR tries to fix flakiness in the `SparkSessionSuite.active session in multiple threads` test. There was a chance that modification could happen before the other thread could check the state. This PR decouples modifcations from checks.

### Why are the changes needed?
Flaky tests are no bueno.

### Does this PR introduce _any_ user-facing change?
Yes.

### How was this patch tested?
It is a test.

Closes apache#42406 from hvanhovell/SPARK-43429-deflake.

Authored-by: Herman van Hovell <herman@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
  • Loading branch information
hvanhovell authored and HyukjinKwon committed Aug 9, 2023
1 parent 0b757d3 commit 27c5a1f
Showing 1 changed file with 32 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -171,42 +171,74 @@ class SparkSessionSuite extends ConnectFunSuite {

try {
val script1 = execute { phaser =>
// Step 0 - check initial state
phaser.arriveAndAwaitAdvance()
assert(SparkSession.getDefaultSession.contains(session1))
assert(SparkSession.getActiveSession.contains(session2))

// Step 1 - new active session in script 2
phaser.arriveAndAwaitAdvance()

// Step2 - script 1 is unchanged, script 2 has new active session
phaser.arriveAndAwaitAdvance()
assert(SparkSession.getDefaultSession.contains(session1))
assert(SparkSession.getActiveSession.contains(session2))

// Step 3 - close session 1, no more default session in both scripts
phaser.arriveAndAwaitAdvance()
session1.close()

// Step 4 - no default session, same active session.
phaser.arriveAndAwaitAdvance()
assert(SparkSession.getDefaultSession.isEmpty)
assert(SparkSession.getActiveSession.contains(session2))

// Step 5 - clear active session in script 1
phaser.arriveAndAwaitAdvance()
SparkSession.clearActiveSession()

// Step 6 - no default/no active session in script 1, script2 unchanged.
phaser.arriveAndAwaitAdvance()
assert(SparkSession.getDefaultSession.isEmpty)
assert(SparkSession.getActiveSession.isEmpty)

// Step 7 - close active session in script2
phaser.arriveAndAwaitAdvance()
}
val script2 = execute { phaser =>
// Step 0 - check initial state
phaser.arriveAndAwaitAdvance()
assert(SparkSession.getDefaultSession.contains(session1))
assert(SparkSession.getActiveSession.contains(session2))

// Step 1 - new active session in script 2
phaser.arriveAndAwaitAdvance()
SparkSession.clearActiveSession()
val internalSession = SparkSession.builder().remote(connectionString3).getOrCreate()

// Step2 - script 1 is unchanged, script 2 has new active session
phaser.arriveAndAwaitAdvance()
assert(SparkSession.getDefaultSession.contains(session1))
assert(SparkSession.getActiveSession.contains(internalSession))

// Step 3 - close session 1, no more default session in both scripts
phaser.arriveAndAwaitAdvance()

// Step 4 - no default session, same active session.
phaser.arriveAndAwaitAdvance()
assert(SparkSession.getDefaultSession.isEmpty)
assert(SparkSession.getActiveSession.contains(internalSession))

// Step 5 - clear active session in script 1
phaser.arriveAndAwaitAdvance()

// Step 6 - no default/no active session in script 1, script2 unchanged.
phaser.arriveAndAwaitAdvance()
assert(SparkSession.getDefaultSession.isEmpty)
assert(SparkSession.getActiveSession.contains(internalSession))

// Step 7 - close active session in script2
phaser.arriveAndAwaitAdvance()
internalSession.close()
assert(SparkSession.getActiveSession.isEmpty)
}
Expand Down

0 comments on commit 27c5a1f

Please sign in to comment.