Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions tests/common/helpers/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
import shutil
import signal
import tempfile
import threading
import time
import traceback
from multiprocessing import Process, Manager, Pipe, TimeoutError
from multiprocessing.pool import ThreadPool
from ansible.executor.process.worker import WorkerProcess

from psutil import wait_procs

Expand All @@ -17,6 +19,50 @@
logger = logging.getLogger(__name__)


def patch_ansible_worker_process():
"""Patch AnsibleWorkerProcess to avoid logging deadlock after fork."""

def start(self):
self._save_stdin()
try:
return super(WorkerProcess, self).start()
finally:
self._new_stdin.close()

WorkerProcess.start = start


# NOTE: https://github.com/google/python-atfork/blob/main/atfork/stdlib_fixer.py
# This is to avoid any deadlock issues with logging module after fork.
_forked_handlers = set()
_forked_handlers_lock = threading.Lock()
os.register_at_fork(before=logging._acquireLock,
after_in_parent=logging._releaseLock,
after_in_child=logging._releaseLock)


def fix_logging_handler_fork_lock():
"""Prevent logging handlers from deadlocking after fork."""
# Collect all loggers including root
loggers = [logging.getLogger()] + list(logging.Logger.manager.loggerDict.values())
handlers = set()
for logger in loggers:
if hasattr(logger, 'handlers'):
handlers.update(logger.handlers)
for handler in handlers:
new_handlers = []
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new_handlers list is created inside the lock but only used outside - move the new_handlers = [] declaration outside the for handler in handlers: loop to avoid unnecessary recreations

Suggested change
new_handlers = []
new_handlers = []
for handler in handlers:
Prompt To Fix With AI
This is a comment left during a code review.
Path: tests/common/helpers/parallel.py
Line: 53:53

Comment:
`new_handlers` list is created inside the lock but only used outside - move the `new_handlers = []` declaration outside the `for handler in handlers:` loop to avoid unnecessary recreations

```suggestion
    new_handlers = []
    for handler in handlers:
```

How can I resolve this? If you propose a fix, please make it concise.

with _forked_handlers_lock:
if handler not in _forked_handlers and handler.lock is not None:
os.register_at_fork(before=handler.lock.acquire,
after_in_parent=handler.lock.release,
after_in_child=handler.lock.release)
new_handlers.append(handler)
_forked_handlers.add(handler)

if new_handlers:
logging.debug("Add handler %s to forked handlers list", new_handlers)


class SonicProcess(Process):
"""
Wrapper class around multiprocessing.Process that would capture the exception thrown if the Process throws
Expand Down Expand Up @@ -136,6 +182,10 @@ def force_terminate(workers, init_result):
) if timeout else None
failed_processes = {}

# Before spawning the child process, ensure current thread is
# holding the logging handler locks to avoid deadlock in child process.
fix_logging_handler_fork_lock()

while tasks_done < total_tasks:
# If execution time of processes exceeds timeout, need to force
# terminate them all.
Expand Down
6 changes: 6 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@
from tests.common.utilities import InterruptableThread
from tests.common.plugins.ptfadapter.dummy_testutils import DummyTestUtils
from tests.common.helpers.multi_thread_utils import SafeThreadPoolExecutor
from tests.common.helpers.parallel import patch_ansible_worker_process
from tests.common.helpers.parallel import fix_logging_handler_fork_lock

import tests.common.gnmi_setup as gnmi_setup

Expand Down Expand Up @@ -113,6 +115,10 @@
'tests.common.fixtures.duthost_utils')


patch_ansible_worker_process()
fix_logging_handler_fork_lock()


def pytest_addoption(parser):
parser.addoption("--testbed", action="store", default=None, help="testbed name")
parser.addoption("--testbed_file", action="store", default=None, help="testbed file name")
Expand Down