diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index f4691a679496d..bdaac52c7cc83 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -945,6 +945,36 @@ logging: type: string example: ~ default: "8794" + worker_log_server_ssl_cert: + description: | + Path to SSL certificate file for worker log server. + When both worker_log_server_ssl_cert and worker_log_server_ssl_key are provided, + the worker log server will use HTTPS instead of HTTP for secure log transmission. + This applies to both worker and triggerer log servers. + version_added: 3.0.0 + type: string + example: "/path/to/cert.pem" + default: "" + worker_log_server_ssl_key: + description: | + Path to SSL private key file for worker log server. + When both worker_log_server_ssl_cert and worker_log_server_ssl_key are provided, + the worker log server will use HTTPS instead of HTTP for secure log transmission. + This applies to both worker and triggerer log servers. + version_added: 3.0.0 + type: string + example: "/path/to/key.pem" + default: "" + worker_log_server_ssl_verify: + description: | + Whether to verify SSL certificate when fetching logs from workers and triggerers. + Can be set to "True" to use system CA certificates, "False" to disable verification + (useful for self-signed certificates in development, not recommended for production), + or a path to a CA bundle file for custom certificate authorities. + version_added: 3.0.0 + type: string + example: "True" + default: "True" interleave_timestamp_parser: description: | We must parse timestamps to interleave logs between trigger and task. 
To do so, diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py index c5798c1e49599..fdf93bd2284c7 100644 --- a/airflow/utils/log/file_task_handler.py +++ b/airflow/utils/log/file_task_handler.py @@ -104,10 +104,22 @@ def _fetch_logs_from_service(url, log_relative_path): expiration_time_in_seconds=conf.getint("webserver", "log_request_clock_grace", fallback=30), audience="task-instance-logs", ) + + # SSL verification configuration + ssl_verify_conf = conf.get("logging", "worker_log_server_ssl_verify", fallback="True") + if ssl_verify_conf.lower() in ("true", "1", "yes"): + ssl_verify = True + elif ssl_verify_conf.lower() in ("false", "0", "no"): + ssl_verify = False + else: + # Path to CA bundle file + ssl_verify = ssl_verify_conf + response = requests.get( url, timeout=timeout, headers={"Authorization": signer.generate_signed_token({"filename": log_relative_path})}, + verify=ssl_verify, ) response.encoding = "utf-8" return response @@ -451,9 +463,15 @@ def _get_log_retrieval_url( hostname = ti.hostname config_key = "worker_log_server_port" config_default = 8793 + + # Determine protocol based on SSL configuration + ssl_cert = conf.get("logging", "worker_log_server_ssl_cert", fallback="") + ssl_key = conf.get("logging", "worker_log_server_ssl_key", fallback="") + protocol = "https" if (ssl_cert and ssl_key) else "http" + return ( urljoin( - f"http://{hostname}:{conf.get('logging', config_key, fallback=config_default)}/log/", + f"{protocol}://{hostname}:{conf.get('logging', config_key, fallback=config_default)}/log/", log_relative_path, ), log_relative_path, diff --git a/airflow/utils/serve_logs.py b/airflow/utils/serve_logs.py index 31ef86600da79..2bbaab750c17a 100644 --- a/airflow/utils/serve_logs.py +++ b/airflow/utils/serve_logs.py @@ -180,6 +180,29 @@ def serve_logs(port=None): bind_option = GunicornOption("bind", f"0.0.0.0:{port}") options = [bind_option, GunicornOption("workers", 2)] + + # Configure SSL if certificates 
are provided + ssl_cert = conf.get("logging", "worker_log_server_ssl_cert", fallback="") + ssl_key = conf.get("logging", "worker_log_server_ssl_key", fallback="") + + if ssl_cert and ssl_key: + if not os.path.isfile(ssl_cert): + raise ValueError(f"SSL certificate file does not exist: {ssl_cert}") + if not os.path.isfile(ssl_key): + raise ValueError(f"SSL key file does not exist: {ssl_key}") + + options.append(GunicornOption("certfile", ssl_cert)) + options.append(GunicornOption("keyfile", ssl_key)) + logger.info("Worker log server starting with HTTPS on port %s (cert: %s)", port, ssl_cert) + elif ssl_cert or ssl_key: + logger.warning( + "Both worker_log_server_ssl_cert and worker_log_server_ssl_key must be provided to enable HTTPS. " + "Only one is configured. Starting with HTTP." + ) + logger.info("Worker log server starting with HTTP on port %s", port) + else: + logger.info("Worker log server starting with HTTP on port %s (no SSL configured)", port) + StandaloneGunicornApplication(wsgi_app, options).run() diff --git a/docs/apache-airflow/administration-and-deployment/logging-monitoring/logging-tasks.rst b/docs/apache-airflow/administration-and-deployment/logging-monitoring/logging-tasks.rst index fa68ffca63e97..b9dfb5fe816cb 100644 --- a/docs/apache-airflow/administration-and-deployment/logging-monitoring/logging-tasks.rst +++ b/docs/apache-airflow/administration-and-deployment/logging-monitoring/logging-tasks.rst @@ -182,6 +182,92 @@ Communication between the webserver and the worker is signed with the key specif We are using `Gunicorn <https://gunicorn.org/>`__ as a WSGI server. Its configuration options can be overridden with the ``GUNICORN_CMD_ARGS`` env variable. For details, see `Gunicorn settings <https://docs.gunicorn.org/en/latest/settings.html#settings>`__. +Securing log server with SSL +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +.. versionadded:: 3.0.0 + +By default, the worker and triggerer log servers use HTTP for communication. 
For production deployments where security is a concern, you can enable HTTPS to encrypt log transmission between the webserver and workers/triggerers. + +Enabling HTTPS on log servers +"""""""""""""""""""""""""""""" + +To enable HTTPS, you need to provide both an SSL certificate and a private key. Once both are configured, the log server will automatically use HTTPS instead of HTTP. + +.. code-block:: ini + + [logging] + worker_log_server_ssl_cert = /path/to/certificate.pem + worker_log_server_ssl_key = /path/to/private_key.pem + +This configuration applies to both worker and triggerer log servers. When enabled: + +- Worker log server will serve logs over HTTPS on port 8793 (or the port specified by ``worker_log_server_port``) +- Triggerer log server will serve logs over HTTPS on port 8794 (or the port specified by ``triggerer_log_server_port``) +- The webserver will automatically use ``https://`` URLs when fetching logs from workers and triggerers + +.. note:: + Both ``worker_log_server_ssl_cert`` and ``worker_log_server_ssl_key`` must be provided to enable HTTPS. If only one is configured, the server will fall back to HTTP and log a warning. + +Configuring SSL certificate verification +""""""""""""""""""""""""""""""""""""""""" + +When the webserver fetches logs from workers and triggerers over HTTPS, it verifies the SSL certificate by default. You can control this behavior with the ``worker_log_server_ssl_verify`` option. Set it to exactly one of the following values (the alternatives are shown commented out): + +.. code-block:: ini + + [logging] + # Use system CA certificates (default) + worker_log_server_ssl_verify = True + + # Disable SSL verification (not recommended for production) + # worker_log_server_ssl_verify = False + + # Use custom CA bundle + # worker_log_server_ssl_verify = /path/to/ca-bundle.crt + +.. warning:: + Disabling SSL certificate verification (``worker_log_server_ssl_verify = False``) is not recommended for production environments as it makes the system vulnerable to man-in-the-middle attacks. 
This option should only be used in development environments with self-signed certificates. + +Deployment considerations +"""""""""""""""""""""""""" + +When deploying with SSL-enabled log servers: + +1. **Certificate distribution**: Ensure that SSL certificates and keys are available on all machines running workers and triggerers. + +2. **Configuration synchronization**: The SSL configuration must be consistent across all Airflow components. All workers, triggerers, schedulers, and webservers should use the same ``airflow.cfg`` settings for the ``[logging]`` section. + +3. **Time synchronization**: As with HTTP, ensure that time is synchronized across all machines (e.g., using ntpd) to prevent issues with signed requests and SSL certificate validation. + +4. **Port access**: If you're using a firewall, ensure that the HTTPS ports (8793 and 8794 by default) are accessible from the webserver to the workers and triggerers. + +Example configuration +""""""""""""""""""""" + +Here's a complete example of a production-ready configuration with SSL enabled: + +.. code-block:: ini + + [logging] + # Base log folder + base_log_folder = /opt/airflow/logs + + # Log server ports + worker_log_server_port = 8793 + triggerer_log_server_port = 8794 + + # SSL configuration for log servers + worker_log_server_ssl_cert = /etc/airflow/certs/server.crt + worker_log_server_ssl_key = /etc/airflow/certs/server.key + + # SSL verification (use system CA certificates) + worker_log_server_ssl_verify = True + + [webserver] + # Secret key must be the same on all components + secret_key = your-secret-key-here + Implementing a custom file task handler --------------------------------------- diff --git a/docs/apache-airflow/security/webserver.rst b/docs/apache-airflow/security/webserver.rst index fb570bb9856af..855238eb3aae9 100644 --- a/docs/apache-airflow/security/webserver.rst +++ b/docs/apache-airflow/security/webserver.rst @@ -89,6 +89,23 @@ certs and keys. 
ssl_cert = <path to cert> ssl_cacert = <path to cacert> +Worker and Triggerer Log Server SSL +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +.. versionadded:: 3.0.0 + +The worker and triggerer log servers can be configured to use SSL/HTTPS for secure log transmission. +This encrypts the communication when the webserver fetches logs from workers and triggerers during task execution. + +.. code-block:: ini + + [logging] + worker_log_server_ssl_cert = <path to cert> + worker_log_server_ssl_key = <path to key> + worker_log_server_ssl_verify = True + +For more information about configuring SSL for log servers, see :ref:`serving-worker-trigger-logs`. + Rate limiting ------------- diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py index 80a20bf66e465..d6b28beeca74f 100644 --- a/tests/utils/test_log_handlers.py +++ b/tests/utils/test_log_handlers.py @@ -845,3 +845,124 @@ def test_fetch_logs_from_service_with_cidr_no_proxy(mock_send, monkeypatch): proxies = kwargs["proxies"] assert "http" not in proxies.keys() assert "no" not in proxies.keys() + + +class TestFileTaskHandlerSSL: + """Tests for SSL configuration in FileTaskHandler.""" + + @pytest.mark.parametrize( + "ssl_verify_conf,expected_verify", + [ + ("True", True), # Default: verify enabled + ("true", True), # Case insensitive + ("1", True), # Alternative true value + ("yes", True), # Alternative true value + ("False", False), # Verification disabled + ("false", False), # Case insensitive + ("0", False), # Alternative false value + ("no", False), # Alternative false value + ("/path/to/ca-bundle.crt", "/path/to/ca-bundle.crt"), # Custom CA bundle + ], + ) + @mock.patch("requests.get") + def test_fetch_logs_ssl_verify_configuration(self, mock_get, ssl_verify_conf, expected_verify): + """Test SSL verification configuration in _fetch_logs_from_service.""" + mock_response = mock.Mock() + mock_response.status_code = 200 + mock_response.encoding = "utf-8" + mock_get.return_value = mock_response + + with conf_vars( + { + ("logging", 
"worker_log_server_ssl_verify"): ssl_verify_conf, + ("webserver", "secret_key"): "test_secret_key", + } + ): + _fetch_logs_from_service(log_url, log_location) + + # Verify that requests.get was called with correct verify parameter + mock_get.assert_called_once() + _, kwargs = mock_get.call_args + assert "verify" in kwargs + assert kwargs["verify"] == expected_verify + + @pytest.mark.parametrize( + "ssl_cert,ssl_key,expected_protocol", + [ + ("", "", "http"), # No SSL configured + ("/path/to/cert.pem", "", "http"), # Only cert, no key + ("", "/path/to/key.pem", "http"), # Only key, no cert + ("/path/to/cert.pem", "/path/to/key.pem", "https"), # Both provided + ], + ) + def test_log_retrieval_url_protocol(self, create_task_instance, ssl_cert, ssl_key, expected_protocol): + """Test that correct protocol (http/https) is used based on SSL configuration.""" + ti = create_task_instance( + dag_id="dag_for_testing_ssl_url", + task_id="task_for_testing_ssl_url", + run_type=DagRunType.SCHEDULED, + logical_date=DEFAULT_DATE, + ) + ti.hostname = "test-hostname" + + with conf_vars( + { + ("logging", "worker_log_server_ssl_cert"): ssl_cert, + ("logging", "worker_log_server_ssl_key"): ssl_key, + } + ): + fth = FileTaskHandler("") + url, _ = fth._get_log_retrieval_url(ti, "test/path.log") + + assert url.startswith(f"{expected_protocol}://") + assert "test-hostname" in url + assert ":8793/log/test/path.log" in url + + def test_log_retrieval_url_trigger_with_ssl(self, create_task_instance): + """Test that trigger log URLs use correct protocol with SSL.""" + from airflow.jobs.job import Job + from airflow.jobs.triggerer_job_runner import TriggererJobRunner + from airflow.models.trigger import Trigger + + ti = create_task_instance( + dag_id="dag_for_testing_trigger_ssl", + task_id="task_for_testing_trigger_ssl", + run_type=DagRunType.SCHEDULED, + logical_date=DEFAULT_DATE, + ) + ti.hostname = "test-hostname" + + trigger = Trigger("", {}) + job = Job(TriggererJobRunner.job_type) + 
job.id = 123 + trigger.triggerer_job = job + ti.trigger = trigger + + with conf_vars( + { + ("logging", "worker_log_server_ssl_cert"): "/path/to/cert.pem", + ("logging", "worker_log_server_ssl_key"): "/path/to/key.pem", + } + ): + fth = FileTaskHandler("") + url, path = fth._get_log_retrieval_url(ti, "test/path.log", log_type=LogType.TRIGGER) + + assert url.startswith("https://") + assert ":8794/log/" in url + assert path.endswith(".trigger.123.log") + + @mock.patch("requests.get") + def test_fetch_logs_ssl_verify_default(self, mock_get): + """Test that SSL verification is enabled by default.""" + mock_response = mock.Mock() + mock_response.status_code = 200 + mock_response.encoding = "utf-8" + mock_get.return_value = mock_response + + with conf_vars({("webserver", "secret_key"): "test_secret_key"}): + # Don't set worker_log_server_ssl_verify, use default + _fetch_logs_from_service(log_url, log_location) + + mock_get.assert_called_once() + _, kwargs = mock_get.call_args + assert kwargs["verify"] is True diff --git a/tests/utils/test_serve_logs.py b/tests/utils/test_serve_logs.py index 667ac95db1154..a3b58b55f3110 100644 --- a/tests/utils/test_serve_logs.py +++ b/tests/utils/test_serve_logs.py @@ -19,6 +19,7 @@ from datetime import timedelta from pathlib import Path from typing import TYPE_CHECKING +from unittest import mock import jwt import pytest @@ -236,3 +237,87 @@ def test_missing_claims(self, claim_to_remove: str, client: FlaskClient, secret_ ).status_code == 403 ) + + +class TestServeLogsSSL: + """Tests for SSL configuration in serve_logs.""" + + @pytest.mark.parametrize( + "ssl_cert,ssl_key,expected_https", + [ + ("", "", False), # No SSL configured + ("/path/to/cert.pem", "", False), # Only cert, no key + ("", "/path/to/key.pem", False), # Only key, no cert + ("/path/to/cert.pem", "/path/to/key.pem", True), # Both provided + ], + ) + @mock.patch("airflow.utils.serve_logs.StandaloneGunicornApplication") + 
@mock.patch("airflow.utils.serve_logs.os.path.isfile") + def test_ssl_configuration( + self, mock_isfile, mock_gunicorn_app, ssl_cert, ssl_key, expected_https, tmp_path + ): + """Test that SSL is enabled only when both cert and key are provided.""" + from airflow.utils.serve_logs import serve_logs + + # Mock file existence + mock_isfile.return_value = True + + with conf_vars( + { + ("logging", "worker_log_server_ssl_cert"): ssl_cert, + ("logging", "worker_log_server_ssl_key"): ssl_key, + ("logging", "base_log_folder"): str(tmp_path), + } + ): + serve_logs(port=8793) + + # Get the options passed to Gunicorn + call_args = mock_gunicorn_app.call_args + options_list = call_args[0][1] + options_dict = {opt.key: opt.value for opt in options_list} + + if expected_https: + assert "certfile" in options_dict + assert "keyfile" in options_dict + assert options_dict["certfile"] == ssl_cert + assert options_dict["keyfile"] == ssl_key + else: + assert "certfile" not in options_dict + assert "keyfile" not in options_dict + + @mock.patch("airflow.utils.serve_logs.StandaloneGunicornApplication") + def test_ssl_with_nonexistent_cert(self, mock_gunicorn_app, tmp_path): + """Test that serve_logs raises error when cert file doesn't exist.""" + from airflow.utils.serve_logs import serve_logs + + with conf_vars( + { + ("logging", "worker_log_server_ssl_cert"): "/nonexistent/cert.pem", + ("logging", "worker_log_server_ssl_key"): "/nonexistent/key.pem", + ("logging", "base_log_folder"): str(tmp_path), + } + ): + with pytest.raises(ValueError, match="SSL certificate file does not exist"): + serve_logs(port=8793) + + @mock.patch("airflow.utils.serve_logs.StandaloneGunicornApplication") + @mock.patch("airflow.utils.serve_logs.os.path.isfile") + def test_ssl_with_nonexistent_key(self, mock_isfile, mock_gunicorn_app, tmp_path): + """Test that serve_logs raises error when key file doesn't exist.""" + from airflow.utils.serve_logs import serve_logs + + # Cert exists but key doesn't + def 
isfile_side_effect(path): + return "cert.pem" in path + + mock_isfile.side_effect = isfile_side_effect + + with conf_vars( + { + ("logging", "worker_log_server_ssl_cert"): "/path/to/cert.pem", + ("logging", "worker_log_server_ssl_key"): "/nonexistent/key.pem", + ("logging", "base_log_folder"): str(tmp_path), + } + ): + with pytest.raises(ValueError, match="SSL key file does not exist"): + serve_logs(port=8793)