Skip to content

Commit

Permalink
[Fetch Migration] Suppress exit code for graceful termination (SIGTER…
Browse files Browse the repository at this point in the history
…M) (opensearch-project#478)

The orchestrator now suppresses exit code 143 if it is returned by the migration monitor, since this is a result of a workaround for opensearch-project/data-prepper#3141 .
Unit tests have been added to cover this scenario. This change also ensures that a return_code is always returned by the function, and updates some stale comments.

Signed-off-by: Kartik Ganesh <gkart@amazon.com>
  • Loading branch information
kartg authored Jan 23, 2024
1 parent bf48fad commit f7b10a9
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 9 deletions.
9 changes: 7 additions & 2 deletions FetchMigration/python/fetch_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def write_inline_target_host(pipeline_file_path: str, inline_target_host: str):
yaml.safe_dump(pipeline_yaml, pipeline_file)


def run(params: FetchOrchestratorParams) -> Optional[int]:
def run(params: FetchOrchestratorParams) -> int:
# This is expected to be a base64 encoded string
inline_pipeline = __get_env_string("INLINE_PIPELINE")
inline_target_host = __get_env_string("INLINE_TARGET_HOST")
Expand All @@ -93,6 +93,7 @@ def run(params: FetchOrchestratorParams) -> Optional[int]:
report=True, dryrun=params.is_dry_run)
logging.info("Running metadata migration...\n")
metadata_migration_result = metadata_migration.run(metadata_migration_params)
return_code: int = 0
if metadata_migration_result.target_doc_count == 0:
logging.warning("Target document count is zero, skipping data migration...")
elif len(metadata_migration_result.migration_indices) > 0 and not params.is_only_metadata_migration():
Expand All @@ -103,8 +104,12 @@ def run(params: FetchOrchestratorParams) -> Optional[int]:
migration_monitor_params = MigrationMonitorParams(metadata_migration_result.target_doc_count,
params.get_local_endpoint())
logging.info("Starting migration monitor...\n")
return migration_monitor.run(migration_monitor_params, proc)
return_code = migration_monitor.run(migration_monitor_params, proc)
# Suppress non-zero return code for graceful termination (SIGTERM)
if return_code == 143:
return_code = 0
logging.info("Fetch Migration workflow concluded\n")
return return_code


if __name__ == '__main__': # pragma no cover
Expand Down
52 changes: 45 additions & 7 deletions FetchMigration/python/tests/test_fetch_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def tearDown(self) -> None:
def test_orchestrator_run(self, mock_metadata_migration: MagicMock, mock_subprocess: MagicMock,
mock_monitor: MagicMock):
fetch_params = FetchOrchestratorParams("test_dp_path", "test_pipeline_file")
# Setup mock pre-migration
# Setup mock metadata migration
expected_metadata_migration_input = \
MetadataMigrationParams(fetch_params.pipeline_file_path,
fetch_params.data_prepper_path + "/pipelines/pipeline.yaml",
Expand All @@ -63,13 +63,15 @@ def test_orchestrator_run(self, mock_metadata_migration: MagicMock, mock_subproc
@mock.patch.dict(os.environ, {}, clear=True)
def test_orchestrator_no_migration(self, mock_metadata_migration: MagicMock, mock_subprocess: MagicMock,
mock_monitor: MagicMock):
# Setup empty result from pre-migration
# Setup empty result from metadata migration
mock_metadata_migration.return_value = MetadataMigrationResult()
orchestrator.run(FetchOrchestratorParams("test", "test"))
result = orchestrator.run(FetchOrchestratorParams("test", "test"))
mock_metadata_migration.assert_called_once_with(ANY)
# Subsequent steps should not be called
mock_subprocess.assert_not_called()
mock_monitor.assert_not_called()
# Expect successful exit code
self.assertEqual(0, result)

@patch('migration_monitor.run')
@patch('subprocess.Popen')
Expand All @@ -79,18 +81,20 @@ def test_orchestrator_no_migration(self, mock_metadata_migration: MagicMock, moc
def test_orchestrator_run_create_only(self, mock_metadata_migration: MagicMock, mock_subprocess: MagicMock,
mock_monitor: MagicMock):
fetch_params = FetchOrchestratorParams("test_dp_path", "test_pipeline_file", create_only=True)
# Setup mock pre-migration
# Setup mock metadata migration
expected_metadata_migration_input = \
MetadataMigrationParams(fetch_params.pipeline_file_path,
fetch_params.data_prepper_path + "/pipelines/pipeline.yaml",
report=True)
test_result = MetadataMigrationResult(10, {"index1", "index2"})
mock_metadata_migration.return_value = test_result
# Run test
orchestrator.run(fetch_params)
result = orchestrator.run(fetch_params)
mock_metadata_migration.assert_called_once_with(expected_metadata_migration_input)
mock_subprocess.assert_not_called()
mock_monitor.assert_not_called()
# Expect successful exit code
self.assertEqual(0, result)

@patch('migration_monitor.run')
@patch('subprocess.Popen')
Expand All @@ -100,18 +104,52 @@ def test_orchestrator_run_create_only(self, mock_metadata_migration: MagicMock,
def test_orchestrator_dryrun(self, mock_metadata_migration: MagicMock, mock_subprocess: MagicMock,
mock_monitor: MagicMock):
fetch_params = FetchOrchestratorParams("test_dp_path", "test_pipeline_file", dryrun=True)
# Setup mock pre-migration
# Setup mock metadata migration
expected_metadata_migration_input = \
MetadataMigrationParams(fetch_params.pipeline_file_path,
fetch_params.data_prepper_path + "/pipelines/pipeline.yaml",
report=True, dryrun=True)
test_result = MetadataMigrationResult(10, {"index1", "index2"})
mock_metadata_migration.return_value = test_result
# Run test
orchestrator.run(fetch_params)
result = orchestrator.run(fetch_params)
mock_metadata_migration.assert_called_once_with(expected_metadata_migration_input)
mock_subprocess.assert_not_called()
mock_monitor.assert_not_called()
# Expect successful exit code
self.assertEqual(0, result)

@patch('migration_monitor.run')
@patch('subprocess.Popen')
@patch('metadata_migration.run')
# Note that mock objects are passed bottom-up from the patch order above
@mock.patch.dict(os.environ, {}, clear=True)
def test_orchestrator_suppressed_exit_code(self, mock_metadata_migration: MagicMock, mock_subprocess: MagicMock,
mock_monitor: MagicMock):
# Setup mock metadata migration
test_result = MetadataMigrationResult(10, {"index1", "index2"})
mock_metadata_migration.return_value = test_result
# Set up graceful termination exit code
mock_monitor.return_value = 143
result = orchestrator.run(FetchOrchestratorParams("test_dp_path", "test_pipeline_file"))
# Expect non-zero exit code to be suppressed
self.assertEqual(0, result)

@patch('migration_monitor.run')
@patch('subprocess.Popen')
@patch('metadata_migration.run')
# Note that mock objects are passed bottom-up from the patch order above
@mock.patch.dict(os.environ, {}, clear=True)
def test_orchestrator_nonzero_exit_code(self, mock_metadata_migration: MagicMock, mock_subprocess: MagicMock,
mock_monitor: MagicMock):
# Setup mock metadata migration
test_result = MetadataMigrationResult(10, {"index1", "index2"})
mock_metadata_migration.return_value = test_result
sigkill_exit_code: int = 137
mock_monitor.return_value = sigkill_exit_code
result = orchestrator.run(FetchOrchestratorParams("test_dp_path", "test_pipeline_file"))
# Expect non-zero exit code to be returned
self.assertEqual(sigkill_exit_code, result)

@patch('fetch_orchestrator.write_inline_target_host')
@patch('metadata_migration.run')
Expand Down

0 comments on commit f7b10a9

Please sign in to comment.