diff --git a/tools/mig-test/datastream/run_migration_test.rb b/tools/mig-test/datastream/run_migration_test.rb index 61ae868ce0..a02b79b509 100644 --- a/tools/mig-test/datastream/run_migration_test.rb +++ b/tools/mig-test/datastream/run_migration_test.rb @@ -19,6 +19,7 @@ require 'pathname' require 'securerandom' +WAITING_SECONDS = 20 FLINK_HOME = ENV['FLINK_HOME'] throw 'Unspecified `FLINK_HOME` environment variable.' if FLINK_HOME.nil? FLINK_HOME = Pathname.new(FLINK_HOME).realpath @@ -58,7 +59,7 @@ def test_migration_chore(from_version, to_version) random_string_1 = SecureRandom.hex(8) put_mystery_data random_string_1 - sleep 5 + sleep WAITING_SECONDS ensure_mystery_data random_string_1 puts `#{FLINK_HOME}/bin/flink stop --savepointPath #{Dir.pwd}/savepoints #{old_job_id}` @@ -69,7 +70,7 @@ def test_migration_chore(from_version, to_version) puts "Submitted job at #{to_version} as #{new_job_id}" random_string_2 = SecureRandom.hex(8) put_mystery_data random_string_2 - sleep 10 + sleep WAITING_SECONDS ensure_mystery_data random_string_2 puts `#{FLINK_HOME}/bin/flink cancel #{new_job_id}` true @@ -130,6 +131,7 @@ def test_migration(from_version, to_version) rescue LoadError puts 'Test summary: ', printable_result end +puts "✅ - Compatible, ❌ - Not compatible, ❓ - Target version doesn't support `--from-savepoint`" if @failures.filter { |_, new_version| new_version == version_list.last }.any? abort 'Some migration to snapshot version tests failed.' diff --git a/tools/mig-test/run_migration_test.rb b/tools/mig-test/run_migration_test.rb index d3f8788035..5bb31cf308 100644 --- a/tools/mig-test/run_migration_test.rb +++ b/tools/mig-test/run_migration_test.rb @@ -19,6 +19,7 @@ require 'pathname' require 'securerandom' +WAITING_SECONDS = 20 FLINK_HOME = ENV['FLINK_HOME'] throw 'Unspecified `FLINK_HOME` environment variable.' if FLINK_HOME.nil? FLINK_HOME = Pathname.new(FLINK_HOME).realpath @@ -70,7 +71,7 @@ def test_migration_chore(from_version, to_version) # Verify if data sync works random_string_1 = SecureRandom.hex(8) put_mystery_data random_string_1 - sleep 5 + sleep WAITING_SECONDS ensure_mystery_data random_string_1 # Stop current job and create a savepoint @@ -90,7 +91,7 @@ def test_migration_chore(from_version, to_version) puts "Submitted job at #{to_version} as #{new_job_id}" random_string_2 = SecureRandom.hex(8) put_mystery_data random_string_2 - sleep 10 + sleep WAITING_SECONDS ensure_mystery_data random_string_2 puts `#{FLINK_HOME}/bin/flink cancel #{new_job_id}` true