Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -945,6 +945,36 @@ logging:
type: string
example: ~
default: "8794"
worker_log_server_ssl_cert:
description: |
Path to SSL certificate file for worker log server.
When both worker_log_server_ssl_cert and worker_log_server_ssl_key are provided,
the worker log server will use HTTPS instead of HTTP for secure log transmission.
This applies to both worker and triggerer log servers.
version_added: 3.0.0
type: string
example: "/path/to/cert.pem"
default: ""
worker_log_server_ssl_key:
description: |
Path to SSL private key file for worker log server.
When both worker_log_server_ssl_cert and worker_log_server_ssl_key are provided,
the worker log server will use HTTPS instead of HTTP for secure log transmission.
This applies to both worker and triggerer log servers.
version_added: 3.0.0
type: string
example: "/path/to/key.pem"
default: ""
worker_log_server_ssl_verify:
description: |
Whether to verify SSL certificate when fetching logs from workers and triggerers.
Can be set to "True" to use system CA certificates, "False" to disable verification
(useful for self-signed certificates in development, not recommended for production),
or a path to a CA bundle file for custom certificate authorities.
version_added: 3.0.0
type: string
example: "True"
default: "True"
interleave_timestamp_parser:
description: |
We must parse timestamps to interleave logs between trigger and task. To do so,
Expand Down
20 changes: 19 additions & 1 deletion airflow/utils/log/file_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,22 @@ def _fetch_logs_from_service(url, log_relative_path):
expiration_time_in_seconds=conf.getint("webserver", "log_request_clock_grace", fallback=30),
audience="task-instance-logs",
)

# SSL verification configuration
ssl_verify_conf = conf.get("logging", "worker_log_server_ssl_verify", fallback="True")
if ssl_verify_conf.lower() in ("true", "1", "yes"):
ssl_verify = True
elif ssl_verify_conf.lower() in ("false", "0", "no"):
ssl_verify = False
else:
# Path to CA bundle file
ssl_verify = ssl_verify_conf

response = requests.get(
url,
timeout=timeout,
headers={"Authorization": signer.generate_signed_token({"filename": log_relative_path})},
verify=ssl_verify,
)
response.encoding = "utf-8"
return response
Expand Down Expand Up @@ -451,9 +463,15 @@ def _get_log_retrieval_url(
hostname = ti.hostname
config_key = "worker_log_server_port"
config_default = 8793

# Determine protocol based on SSL configuration
ssl_cert = conf.get("logging", "worker_log_server_ssl_cert", fallback="")
ssl_key = conf.get("logging", "worker_log_server_ssl_key", fallback="")
protocol = "https" if (ssl_cert and ssl_key) else "http"

return (
urljoin(
f"http://{hostname}:{conf.get('logging', config_key, fallback=config_default)}/log/",
f"{protocol}://{hostname}:{conf.get('logging', config_key, fallback=config_default)}/log/",
log_relative_path,
),
log_relative_path,
Expand Down
23 changes: 23 additions & 0 deletions airflow/utils/serve_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,29 @@ def serve_logs(port=None):
bind_option = GunicornOption("bind", f"0.0.0.0:{port}")

options = [bind_option, GunicornOption("workers", 2)]

# Configure SSL if certificates are provided
ssl_cert = conf.get("logging", "worker_log_server_ssl_cert", fallback="")
ssl_key = conf.get("logging", "worker_log_server_ssl_key", fallback="")

if ssl_cert and ssl_key:
if not os.path.isfile(ssl_cert):
raise ValueError(f"SSL certificate file does not exist: {ssl_cert}")
if not os.path.isfile(ssl_key):
raise ValueError(f"SSL key file does not exist: {ssl_key}")

options.append(GunicornOption("certfile", ssl_cert))
options.append(GunicornOption("keyfile", ssl_key))
logger.info("Worker log server starting with HTTPS on port %s (cert: %s)", port, ssl_cert)
elif ssl_cert or ssl_key:
logger.warning(
"Both worker_log_server_ssl_cert and worker_log_server_ssl_key must be provided to enable HTTPS. "
"Only one is configured. Starting with HTTP."
)
logger.info("Worker log server starting with HTTP on port %s", port)
else:
logger.info("Worker log server starting with HTTP on port %s (no SSL configured)", port)

StandaloneGunicornApplication(wsgi_app, options).run()


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,92 @@ Communication between the webserver and the worker is signed with the key specif

We are using `Gunicorn <https://gunicorn.org/>`__ as a WSGI server. Its configuration options can be overridden with the ``GUNICORN_CMD_ARGS`` env variable. For details, see `Gunicorn settings <https://docs.gunicorn.org/en/latest/settings.html#settings>`__.

Securing log server with SSL
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. versionadded:: 3.0.0

By default, the worker and triggerer log servers use HTTP for communication. For production deployments where security is a concern, you can enable HTTPS to encrypt log transmission between the webserver and workers/triggerers.

Enabling HTTPS on log servers
""""""""""""""""""""""""""""""

To enable HTTPS, you need to provide both an SSL certificate and a private key. Once both are configured, the log server will automatically use HTTPS instead of HTTP.

.. code-block:: ini

[logging]
worker_log_server_ssl_cert = /path/to/certificate.pem
worker_log_server_ssl_key = /path/to/private_key.pem

This configuration applies to both worker and triggerer log servers. When enabled:

- Worker log server will serve logs over HTTPS on port 8793 (or the port specified by ``worker_log_server_port``)
- Triggerer log server will serve logs over HTTPS on port 8794 (or the port specified by ``triggerer_log_server_port``)
- The webserver will automatically use ``https://`` URLs when fetching logs from workers and triggerers

.. note::
Both ``worker_log_server_ssl_cert`` and ``worker_log_server_ssl_key`` must be provided to enable HTTPS. If only one is configured, the server will fall back to HTTP and log a warning.

Configuring SSL certificate verification
"""""""""""""""""""""""""""""""""""""""""

When the webserver fetches logs from workers and triggerers over HTTPS, it verifies the SSL certificate by default. You can control this behavior with the ``worker_log_server_ssl_verify`` option:

.. code-block:: ini

[logging]
# Use system CA certificates (default)
worker_log_server_ssl_verify = True

# Disable SSL verification (not recommended for production)
worker_log_server_ssl_verify = False

# Use custom CA bundle
worker_log_server_ssl_verify = /path/to/ca-bundle.crt

.. warning::
Disabling SSL certificate verification (``worker_log_server_ssl_verify = False``) is not recommended for production environments as it makes the system vulnerable to man-in-the-middle attacks. This option should only be used in development environments with self-signed certificates.

Deployment considerations
""""""""""""""""""""""""""

When deploying with SSL-enabled log servers:

1. **Certificate distribution**: Ensure that SSL certificates and keys are available on all machines running workers and triggerers.

2. **Configuration synchronization**: The SSL configuration must be consistent across all Airflow components. All workers, triggerers, schedulers, and webservers should use the same ``airflow.cfg`` settings for the ``[logging]`` section.

3. **Time synchronization**: As with HTTP, ensure that time is synchronized across all machines (e.g., using ntpd) to prevent issues with signed requests and SSL certificate validation.

4. **Port access**: If you're using a firewall, ensure that the HTTPS ports (8793 and 8794 by default) are accessible from the webserver to the workers and triggerers.

Example configuration
"""""""""""""""""""""

Here's a complete example of a production-ready configuration with SSL enabled:

.. code-block:: ini

[logging]
# Base log folder
base_log_folder = /opt/airflow/logs

# Log server ports
worker_log_server_port = 8793
triggerer_log_server_port = 8794

# SSL configuration for log servers
worker_log_server_ssl_cert = /etc/airflow/certs/server.crt
worker_log_server_ssl_key = /etc/airflow/certs/server.key

# SSL verification (use system CA certificates)
worker_log_server_ssl_verify = True

[webserver]
# Secret key must be the same on all components
secret_key = your-secret-key-here

Implementing a custom file task handler
---------------------------------------

Expand Down
17 changes: 17 additions & 0 deletions docs/apache-airflow/security/webserver.rst
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,23 @@ certs and keys.
ssl_cert = <path to cert>
ssl_cacert = <path to cacert>

Worker and Triggerer Log Server SSL
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. versionadded:: 3.0.0

The worker and triggerer log servers can be configured to use SSL/HTTPS for secure log transmission.
This encrypts the communication when the webserver fetches logs from workers and triggerers during task execution.

.. code-block:: ini

[logging]
worker_log_server_ssl_cert = <path to cert>
worker_log_server_ssl_key = <path to key>
worker_log_server_ssl_verify = True

For more information about configuring SSL for log servers, see :ref:`serving-worker-trigger-logs`.

Rate limiting
-------------

Expand Down
121 changes: 121 additions & 0 deletions tests/utils/test_log_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -845,3 +845,124 @@ def test_fetch_logs_from_service_with_cidr_no_proxy(mock_send, monkeypatch):
proxies = kwargs["proxies"]
assert "http" not in proxies.keys()
assert "no" not in proxies.keys()


class TestFileTaskHandlerSSL:
"""Tests for SSL configuration in FileTaskHandler."""

@pytest.mark.parametrize(
"ssl_verify_conf,expected_verify",
[
("True", True), # Default: verify enabled
("true", True), # Case insensitive
("1", True), # Alternative true value
("yes", True), # Alternative true value
("False", False), # Verification disabled
("false", False), # Case insensitive
("0", False), # Alternative false value
("no", False), # Alternative false value
("/path/to/ca-bundle.crt", "/path/to/ca-bundle.crt"), # Custom CA bundle
],
)
@mock.patch("requests.get")
def test_fetch_logs_ssl_verify_configuration(self, mock_get, ssl_verify_conf, expected_verify):
"""Test SSL verification configuration in _fetch_logs_from_service."""
mock_response = mock.Mock()
mock_response.status_code = 200
mock_response.encoding = "utf-8"
mock_get.return_value = mock_response

with conf_vars(
{
("logging", "worker_log_server_ssl_verify"): ssl_verify_conf,
("webserver", "secret_key"): "test_secret_key",
}
):
_fetch_logs_from_service(log_url, log_location)

# Verify that requests.get was called with correct verify parameter
mock_get.assert_called_once()
_, kwargs = mock_get.call_args
assert "verify" in kwargs
assert kwargs["verify"] == expected_verify

@pytest.mark.parametrize(
"ssl_cert,ssl_key,expected_protocol",
[
("", "", "http"), # No SSL configured
("/path/to/cert.pem", "", "http"), # Only cert, no key
("", "/path/to/key.pem", "http"), # Only key, no cert
("/path/to/cert.pem", "/path/to/key.pem", "https"), # Both provided
],
)
def test_log_retrieval_url_protocol(self, create_task_instance, ssl_cert, ssl_key, expected_protocol):
"""Test that correct protocol (http/https) is used based on SSL configuration."""
ti = create_task_instance(
dag_id="dag_for_testing_ssl_url",
task_id="task_for_testing_ssl_url",
run_type=DagRunType.SCHEDULED,
logical_date=DEFAULT_DATE,
)
ti.hostname = "test-hostname"

with conf_vars(
{
("logging", "worker_log_server_ssl_cert"): ssl_cert,
("logging", "worker_log_server_ssl_key"): ssl_key,
}
):
fth = FileTaskHandler("")
url, _ = fth._get_log_retrieval_url(ti, "test/path.log")

assert url.startswith(f"{expected_protocol}://")
assert "test-hostname" in url
assert ":8793/log/test/path.log" in url

def test_log_retrieval_url_trigger_with_ssl(self, create_task_instance):
"""Test that trigger log URLs use correct protocol with SSL."""
from airflow.jobs.job import Job
from airflow.jobs.triggerer_job_runner import TriggererJobRunner
from airflow.models.trigger import Trigger

ti = create_task_instance(
dag_id="dag_for_testing_trigger_ssl",
task_id="task_for_testing_trigger_ssl",
run_type=DagRunType.SCHEDULED,
logical_date=DEFAULT_DATE,
)
ti.hostname = "test-hostname"

trigger = Trigger("", {})
job = Job(TriggererJobRunner.job_type)
job.id = 123
trigger.triggerer_job = job
ti.trigger = trigger

with conf_vars(
{
("logging", "worker_log_server_ssl_cert"): "/path/to/cert.pem",
("logging", "worker_log_server_ssl_key"): "/path/to/key.pem",
}
):
fth = FileTaskHandler("")
url, path = fth._get_log_retrieval_url(ti, "test/path.log", log_type=LogType.TRIGGER)

assert url.startswith("https://")
assert ":8794/log/" in url
assert path.endswith(".trigger.123.log")

@mock.patch("requests.get")
def test_fetch_logs_ssl_verify_default(self, mock_get):
"""Test that SSL verification is enabled by default."""
mock_response = mock.Mock()
mock_response.status_code = 200
mock_response.encoding = "utf-8"
mock_get.return_value = mock_response

with conf_vars({("webserver", "secret_key"): "test_secret_key"}):
# Don't set worker_log_server_ssl_verify, use default
_fetch_logs_from_service(log_url, log_location)

mock_get.assert_called_once()
_, kwargs = mock_get.call_args
assert kwargs["verify"] is True
Loading