def _write_pid_to_pidfile(pid_file_path):
    """
    Write the PID file of this process, taking care of an already existing file.

    An existing file is handled depending on the PID stored in it:

    * it is the PID of this process: nothing is written, the file is kept as is;
    * it is the PID of another running process: the start is aborted;
    * anything else: the file is considered orphaned and is replaced.

    :param pid_file_path: Location of the PID file. Both ``str`` and
        ``os.PathLike`` values are accepted.
    :raises SystemExit: If the PID file contains the PID of another running process.
    """
    # os.path.exists / os.unlink are used instead of Path methods so that a plain
    # string path (which is what _pid_file_path is annotated to return) works too.
    if os.path.exists(pid_file_path):
        # Handle existing PID files on disk
        logger.info("An existing PID file has been found: %s.", pid_file_path)
        pid_stored_in_pid_file = read_pid_from_pidfile(pid_file_path)
        if pid_stored_in_pid_file == os.getpid():
            # case 1: PID file writing was triggered before for this instance
            logger.info("PID file belongs to this process. File not updated.")
            return

        # case 2: PID file was written by dead / already running instance
        logger.info("PID file does not belong to this process.")
        # NOTE(review): read_pid_from_pidfile presumably yields None when the file
        # content is not a valid PID - confirm. Such a file is treated as orphaned
        # instead of handing None to psutil.
        if pid_stored_in_pid_file is not None and psutil.pid_exists(pid_stored_in_pid_file):
            # case 2a: another instance uses the same path for its PID file
            raise SystemExit(
                f"The PID file {pid_file_path} contains the PID of another running process. "
                "Configuration issue: edge worker instance must use different PID file paths!"
            )

        # case 2b: previous instance crashed without cleaning up its PID file
        logger.info("PID file is orphaned. Cleaning up.")
        os.unlink(pid_file_path)

    write_pid_to_pidfile(pid_file_path)
    # Log only after the write, so the message is never emitted for a failed write.
    logger.info("PID file written to %s.", pid_file_path)