Skip to content

Commit

Permalink
Added safe method for PID file creation
Browse files Browse the repository at this point in the history
  • Loading branch information
OliverWannenwetsch committed Oct 17, 2024
1 parent 0c24fa9 commit e01aa8b
Showing 1 changed file with 27 additions and 1 deletion.
28 changes: 27 additions & 1 deletion providers/src/airflow/providers/edge/cli/edge_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,32 @@ def _get_sysinfo() -> dict:
def _pid_file_path(pid_file: str | None) -> str:
return cli_utils.setup_locations(process=EDGE_WORKER_PROCESS_NAME, pid=pid_file)[0]

def _write_pid_to_pidfile(pid_file_path):
"""Write PID file to disk."""
if pid_file_path.exists():
# Handle existing PID files on disk
logger.info("An existing PID file has been found: %s.", pid_file_path)
pid_stored_in_pid_file = read_pid_from_pidfile(pid_file_path)
if os.getpid() == pid_stored_in_pid_file:
# case 1: PID file writing was triggered before for this instance
logger.info("PID file belongs to this process. File not updated.")
return
else:
# case 2: PID file was written by dead / already running instance
logger.info("PID file does not belong to this process.")
if psutil.pid_exists(pid_stored_in_pid_file):
# case 2a: another instance uses the same path for its PID file
raise SystemExit(
f"The PID file {pid_file_path} contains the PID of another running process. "
"Configuration issue: edge worker instance must use different PID file paths!"
)
else:
# case 2b: previous instance crashed without cleaning up its PID file
logger.info("PID file is orphaned. Cleaning up.")
pid_file_path.unlink()
logger.info("PID file written to %s.", pid_file_path)
write_pid_to_pidfile(pid_file_path)


@dataclass
class _Job:
Expand Down Expand Up @@ -155,7 +181,7 @@ def start(self):
if "404:NOT FOUND" in str(e):
raise SystemExit("Error: API endpoint is not ready, please set [edge] api_enabled=True.")
raise SystemExit(str(e))
write_pid_to_pidfile(self.pid_file_path)
_write_pid_to_pidfile(self.pid_file_path)
signal.signal(signal.SIGINT, _EdgeWorkerCli.signal_handler)
try:
while not _EdgeWorkerCli.drain or self.jobs:
Expand Down

0 comments on commit e01aa8b

Please sign in to comment.