diff --git a/components/api-server/src/bin/api_server.rs b/components/api-server/src/bin/api_server.rs index 739dc1e0c9..b365f6cd83 100644 --- a/components/api-server/src/bin/api_server.rs +++ b/components/api-server/src/bin/api_server.rs @@ -1,11 +1,7 @@ use anyhow::Context; use clap::Parser; use clp_rust_utils::{clp_config::package, serde::yaml}; -use tracing_appender::{ - non_blocking::WorkerGuard, - rolling::{RollingFileAppender, Rotation}, -}; -use tracing_subscriber::{self, fmt::writer::MakeWriterExt}; +use tracing_subscriber; #[derive(Parser)] #[command(version, about = "API Server for CLP.")] @@ -42,13 +38,8 @@ fn read_config_and_credentials( Ok((config, credentials)) } -fn set_up_logging() -> anyhow::Result { - let logs_directory = - std::env::var("CLP_LOGS_DIR").context("Expect `CLP_LOGS_DIR` environment variable.")?; - let logs_directory = std::path::Path::new(logs_directory.as_str()); - let file_appender = - RollingFileAppender::new(Rotation::HOURLY, logs_directory, "api_server.log"); - let (non_blocking_writer, guard) = tracing_appender::non_blocking(file_appender); +fn set_up_logging() { + // Logs to stdout only - Docker's fluentd driver captures and forwards to Fluent Bit tracing_subscriber::fmt() .event_format( tracing_subscriber::fmt::format() @@ -60,9 +51,8 @@ fn set_up_logging() -> anyhow::Result { ) .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) .with_ansi(false) - .with_writer(std::io::stdout.and(non_blocking_writer)) + .with_writer(std::io::stdout) .init(); - Ok(guard) } async fn shutdown_signal() { @@ -81,7 +71,7 @@ async fn main() -> anyhow::Result<()> { let args = Args::parse(); let (config, credentials) = read_config_and_credentials(&args)?; - let _guard = set_up_logging()?; + set_up_logging(); let addr = format!( "{}:{}", diff --git a/components/clp-mcp-server/clp_mcp_server/clp_mcp_server.py b/components/clp-mcp-server/clp_mcp_server/clp_mcp_server.py index 413ff9f620..03148509fd 100644 --- 
a/components/clp-mcp-server/clp_mcp_server/clp_mcp_server.py +++ b/components/clp-mcp-server/clp_mcp_server/clp_mcp_server.py @@ -1,15 +1,13 @@ """CLP MCP Server entry point.""" import ipaddress -import logging import os import socket import sys -from pathlib import Path import click from clp_py_utils.clp_config import ClpConfig, MCP_SERVER_COMPONENT_NAME -from clp_py_utils.clp_logging import get_logger, get_logging_formatter, set_logging_level +from clp_py_utils.clp_logging import get_logger, set_logging_level from clp_py_utils.core import read_yaml_config_file from pydantic import ValidationError @@ -38,11 +36,7 @@ def main(host: str, port: int, config_path: Path) -> int: :param config_path: The path to server's configuration file. :return: Exit code (0 for success, non-zero for failure). """ - # Setup logging to file - log_file_path = Path(os.getenv("CLP_LOGS_DIR")) / "mcp_server.log" - logging_file_handler = logging.FileHandler(filename=log_file_path, encoding="utf-8") - logging_file_handler.setFormatter(get_logging_formatter()) - logger.addHandler(logging_file_handler) + # Set logging level from environment set_logging_level(logger, os.getenv("CLP_LOGGING_LEVEL")) exit_code = 0 diff --git a/components/clp-package-utils/clp_package_utils/controller.py b/components/clp-package-utils/clp_package_utils/controller.py index 31e6b93d75..ecab3950a9 100644 --- a/components/clp-package-utils/clp_package_utils/controller.py +++ b/components/clp-package-utils/clp_package_utils/controller.py @@ -32,6 +32,7 @@ DatabaseEngine, DB_COMPONENT_NAME, DeploymentType, + FLUENT_BIT_COMPONENT_NAME, GARBAGE_COLLECTOR_COMPONENT_NAME, LOG_INGESTOR_COMPONENT_NAME, MCP_SERVER_COMPONENT_NAME, @@ -711,6 +712,7 @@ def _set_up_env_for_webui(self, container_clp_config: ClpConfig) -> EnvVarsDict: "LogViewerDir": str(container_webui_dir / "yscope-log-viewer"), "StreamTargetUncompressedSize": self._clp_config.stream_output.target_uncompressed_size, "ClpQueryEngine": 
self._clp_config.package.query_engine, + "OperationalLogsDir": "/var/log/clp", } stream_storage = self._clp_config.stream_output.storage @@ -852,6 +854,28 @@ def _set_up_env_for_garbage_collector(self) -> EnvVarsDict: return env_vars + def _set_up_env_for_fluent_bit(self) -> EnvVarsDict: + """ + Sets up environment variables for the Fluent Bit component. + + Fluent Bit receives logs from Docker's fluentd logging driver and writes + them to the logs directory for hot tier access. + + :return: Dictionary of environment variables necessary to launch the component. + """ + component_name = FLUENT_BIT_COMPONENT_NAME + logger.info(f"Setting up environment for {component_name}...") + + env_vars = EnvVarsDict() + + # Paths + fluent_bit_conf_dir = self._conf_dir / "fluent-bit" + env_vars |= { + "CLP_FLUENT_BIT_CONF_DIR_HOST": str(fluent_bit_conf_dir), + } + + return env_vars + def _read_and_update_settings_json( self, settings_file_path: pathlib.Path, updates: dict[str, Any] ) -> dict[str, Any]: @@ -997,6 +1021,7 @@ def set_up_env(self) -> None: env_vars |= self._set_up_env_for_webui(container_clp_config) env_vars |= self._set_up_env_for_mcp_server() env_vars |= self._set_up_env_for_garbage_collector() + env_vars |= self._set_up_env_for_fluent_bit() # Write the environment variables to the `.env` file. 
with open(f"{self._clp_home}/.env", "w") as env_file: diff --git a/components/clp-py-utils/clp_py_utils/clp_config.py b/components/clp-py-utils/clp_py_utils/clp_config.py index 4edb94b4f0..88658be9a6 100644 --- a/components/clp-py-utils/clp_py_utils/clp_config.py +++ b/components/clp-py-utils/clp_py_utils/clp_config.py @@ -45,6 +45,7 @@ WEBUI_COMPONENT_NAME = "webui" MCP_SERVER_COMPONENT_NAME = "mcp_server" GARBAGE_COLLECTOR_COMPONENT_NAME = "garbage_collector" +FLUENT_BIT_COMPONENT_NAME = "fluent_bit" # Action names ARCHIVE_MANAGER_ACTION_NAME = "archive_manager" diff --git a/components/job-orchestration/job_orchestration/executor/compress/compression_task.py b/components/job-orchestration/job_orchestration/executor/compress/compression_task.py index 89f9965301..fd849a285c 100644 --- a/components/job-orchestration/job_orchestration/executor/compress/compression_task.py +++ b/components/job-orchestration/job_orchestration/executor/compress/compression_task.py @@ -347,7 +347,6 @@ def run_clp( worker_config: WorkerConfig, clp_config: ClpIoConfig, clp_home: pathlib.Path, - logs_dir: pathlib.Path, job_id: int, task_id: int, tag_ids: list[int], @@ -362,7 +361,6 @@ def run_clp( :param worker_config: WorkerConfig :param clp_config: ClpIoConfig :param clp_home: - :param logs_dir: :param job_id: :param task_id: :param tag_ids: @@ -444,17 +442,22 @@ def cleanup_temporary_files(): if converted_inputs_dir is not None: shutil.rmtree(converted_inputs_dir) - # Open stderr log file - stderr_log_path = logs_dir / f"{instance_id_str}-stderr.log" - stderr_log_file = open(stderr_log_path, "w") + def log_stderr(stderr_data: bytes, process_name: str): + """Log stderr content through the logger (captured by Docker's fluentd driver).""" + stderr_text = stderr_data.decode("utf-8").strip() + if stderr_text: + for line in stderr_text.split("\n"): + logger.info(f"[{process_name} stderr] {line}") conversion_return_code = 0 if conversion_cmd is not None: logger.debug("Execute log-converter with 
command: %s", conversion_cmd) conversion_proc = subprocess.Popen( - conversion_cmd, stdout=subprocess.DEVNULL, stderr=stderr_log_file, env=conversion_env + conversion_cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, env=conversion_env ) - conversion_return_code = conversion_proc.wait() + _, stderr_data = conversion_proc.communicate() + conversion_return_code = conversion_proc.returncode + log_stderr(stderr_data, "log-converter") if conversion_return_code != 0: cleanup_temporary_files() @@ -464,16 +467,15 @@ def cleanup_temporary_files(): worker_output = { "total_uncompressed_size": 0, "total_compressed_size": 0, - "error_message": f"Check logs in {stderr_log_path}", + "error_message": "See worker logs (captured by Fluent Bit)", } - stderr_log_file.close() return CompressionTaskStatus.FAILED, worker_output # Start compression logger.debug("Compressing...") compression_successful = False proc = subprocess.Popen( - compression_cmd, stdout=subprocess.PIPE, stderr=stderr_log_file, env=compression_env + compression_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=compression_env ) # Compute the total amount of data compressed @@ -550,14 +552,16 @@ def cleanup_temporary_files(): ) try: - subprocess.run( + result = subprocess.run( indexer_cmd, stdout=subprocess.DEVNULL, - stderr=stderr_log_file, + stderr=subprocess.PIPE, check=True, env=indexer_env, ) - except subprocess.CalledProcessError: + log_stderr(result.stderr, "indexer") + except subprocess.CalledProcessError as e: + log_stderr(e.stderr if e.stderr else b"", "indexer") logger.exception("Failed to index archive.") if enable_s3_write: @@ -565,8 +569,10 @@ def cleanup_temporary_files(): last_archive_stats = stats - # Wait for compression to finish + # Wait for compression to finish and capture stderr + compression_stderr = proc.stderr.read() return_code = proc.wait() + log_stderr(compression_stderr, "compression") if 0 != return_code: logger.error(f"Failed to compress, return_code={return_code!s}") @@ 
-576,9 +582,6 @@ def cleanup_temporary_files(): logger.debug("Compressed.") - # Close stderr log file - stderr_log_file.close() - worker_output = { "total_uncompressed_size": total_uncompressed_size, "total_compressed_size": total_compressed_size, @@ -588,7 +591,7 @@ def cleanup_temporary_files(): return CompressionTaskStatus.SUCCEEDED, worker_output error_msgs = [] if compression_successful is False: - error_msgs.append(f"See logs {stderr_log_path}") + error_msgs.append("See worker logs (captured by Fluent Bit)") if s3_error is not None: error_msgs.append(s3_error) worker_output["error_message"] = "\n".join(error_msgs) @@ -606,10 +609,8 @@ def compression_entry_point( ): clp_home = pathlib.Path(os.getenv("CLP_HOME")) - # Set logging level - logs_dir = pathlib.Path(os.getenv("CLP_LOGS_DIR")) - clp_logging_level = str(os.getenv("CLP_LOGGING_LEVEL")) - set_logging_level(logger, clp_logging_level) + # Set logging level from environment + set_logging_level(logger, os.getenv("CLP_LOGGING_LEVEL")) # Load configuration try: @@ -637,7 +638,6 @@ def compression_entry_point( worker_config, clp_io_config, clp_home, - logs_dir, job_id, task_id, tag_ids, diff --git a/components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py b/components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py index 2ac1214b7c..15507f221e 100644 --- a/components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py +++ b/components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py @@ -191,10 +191,8 @@ def extract_stream( ) -> dict[str, Any]: task_name = "Stream Extraction" - # Setup logging to file - clp_logs_dir = Path(os.getenv("CLP_LOGS_DIR")) - clp_logging_level = os.getenv("CLP_LOGGING_LEVEL") - set_logging_level(logger, clp_logging_level) + # Set logging level from environment + set_logging_level(logger, os.getenv("CLP_LOGGING_LEVEL")) logger.info(f"Started {task_name} task for job {job_id}") @@ -242,7 
+240,6 @@ def extract_stream( task_results, task_stdout_str = run_query_task( sql_adapter=sql_adapter, logger=logger, - clp_logs_dir=clp_logs_dir, task_command=task_command, env_vars=core_clp_env_vars, task_name=task_name, @@ -289,7 +286,7 @@ def extract_stream( if upload_error: task_results.status = QueryTaskStatus.FAILED - task_results.error_log_path = str(os.getenv("CLP_WORKER_LOG_PATH")) + task_results.error_log_path = "See worker logs (captured by Fluent Bit)" else: logger.info("Finished uploading streams.") diff --git a/components/job-orchestration/job_orchestration/executor/query/fs_search_task.py b/components/job-orchestration/job_orchestration/executor/query/fs_search_task.py index 052f10cca9..7c4310d28f 100644 --- a/components/job-orchestration/job_orchestration/executor/query/fs_search_task.py +++ b/components/job-orchestration/job_orchestration/executor/query/fs_search_task.py @@ -199,10 +199,8 @@ def search( ) -> dict[str, Any]: task_name = "search" - # Setup logging to file - clp_logs_dir = Path(os.getenv("CLP_LOGS_DIR")) - clp_logging_level = os.getenv("CLP_LOGGING_LEVEL") - set_logging_level(logger, clp_logging_level) + # Set logging level from environment + set_logging_level(logger, os.getenv("CLP_LOGGING_LEVEL")) logger.info(f"Started {task_name} task for job {job_id}") @@ -242,7 +240,6 @@ def search( task_results, _ = run_query_task( sql_adapter=sql_adapter, logger=logger, - clp_logs_dir=clp_logs_dir, task_command=task_command, env_vars=core_clp_env_vars, task_name=task_name, @@ -268,7 +265,7 @@ def search( except Exception as err: logger.error(f"Failed to upload query results {dest_path}: {err}") task_results.status = QueryTaskStatus.FAILED - task_results.error_log_path = str(os.getenv("CLP_WORKER_LOG_PATH")) + task_results.error_log_path = "See worker logs (captured by Fluent Bit)" src_file.unlink() diff --git a/components/job-orchestration/job_orchestration/executor/query/utils.py 
b/components/job-orchestration/job_orchestration/executor/query/utils.py index 8505d7f594..5938de269b 100644 --- a/components/job-orchestration/job_orchestration/executor/query/utils.py +++ b/components/job-orchestration/job_orchestration/executor/query/utils.py @@ -3,9 +3,7 @@ import signal import subprocess import sys -from contextlib import closing from logging import Logger -from pathlib import Path from typing import Any from clp_py_utils.clp_config import QUERY_TASKS_TABLE_NAME @@ -14,12 +12,6 @@ from job_orchestration.scheduler.scheduler_data import QueryTaskResult, QueryTaskStatus -def get_task_log_file_path(clp_logs_dir: Path, job_id: str, task_id: int) -> Path: - worker_logs_dir = clp_logs_dir / job_id - worker_logs_dir.mkdir(exist_ok=True, parents=True) - return worker_logs_dir / f"{task_id}-clo.log" - - def report_task_failure( sql_adapter: SqlAdapter, task_id: int, @@ -42,7 +34,6 @@ def report_task_failure( def run_query_task( sql_adapter: SqlAdapter, logger: Logger, - clp_logs_dir: Path, task_command: list[str], env_vars: dict[str, str] | None, task_name: str, @@ -50,9 +41,6 @@ def run_query_task( task_id: int, start_time: datetime.datetime, ) -> tuple[QueryTaskResult, str]: - clo_log_path = get_task_log_file_path(clp_logs_dir, job_id, task_id) - clo_log_file = open(clo_log_path, "w") - task_status = QueryTaskStatus.RUNNING update_query_task_metadata( sql_adapter, task_id, dict(status=task_status, start_time=start_time) @@ -64,7 +52,7 @@ def run_query_task( preexec_fn=os.setpgrp, close_fds=True, stdout=subprocess.PIPE, - stderr=clo_log_file, + stderr=subprocess.PIPE, env=env_vars, ) @@ -86,8 +74,15 @@ def sigterm_handler(_signo, _stack_frame): logger.info(f"Waiting for {task_name} to finish") # `communicate` is equivalent to `wait` in this case, but avoids deadlocks when piping to # stdout/stderr. 
- stdout_data, _ = task_proc.communicate() + stdout_data, stderr_data = task_proc.communicate() return_code = task_proc.returncode + + # Log stderr content (captured by Docker's fluentd driver) + stderr_text = stderr_data.decode("utf-8").strip() + if stderr_text: + for line in stderr_text.split("\n"): + logger.info(f"[{task_name} stderr] {line}") + if 0 != return_code: task_status = QueryTaskStatus.FAILED logger.error( @@ -97,7 +92,6 @@ def sigterm_handler(_signo, _stack_frame): task_status = QueryTaskStatus.SUCCEEDED logger.info(f"{task_name} task {task_id} completed for job {job_id}") - clo_log_file.close() duration = (datetime.datetime.now() - start_time).total_seconds() update_query_task_metadata( @@ -111,7 +105,7 @@ def sigterm_handler(_signo, _stack_frame): ) if QueryTaskStatus.FAILED == task_status: - task_result.error_log_path = str(clo_log_path) + task_result.error_log_path = "See worker logs (captured by Fluent Bit)" return task_result, stdout_data.decode("utf-8") diff --git a/components/job-orchestration/job_orchestration/garbage_collector/archive_garbage_collector.py b/components/job-orchestration/job_orchestration/garbage_collector/archive_garbage_collector.py index c8dbcac0b4..dca4e7d627 100644 --- a/components/job-orchestration/job_orchestration/garbage_collector/archive_garbage_collector.py +++ b/components/job-orchestration/job_orchestration/garbage_collector/archive_garbage_collector.py @@ -192,10 +192,8 @@ def _collect_and_sweep_expired_archives( raise ValueError(f"Unsupported Storage engine: {storage_engine}.") -async def archive_garbage_collector( - clp_config: ClpConfig, log_directory: pathlib.Path, logging_level: str -) -> None: - configure_logger(logger, logging_level, log_directory, ARCHIVE_GARBAGE_COLLECTOR_NAME) +async def archive_garbage_collector(clp_config: ClpConfig, logging_level: str) -> None: + configure_logger(logger, logging_level) archive_output_config = clp_config.archive_output storage_engine = 
clp_config.package.storage_engine diff --git a/components/job-orchestration/job_orchestration/garbage_collector/garbage_collector.py b/components/job-orchestration/job_orchestration/garbage_collector/garbage_collector.py index 007459315b..4c0ecf1eee 100644 --- a/components/job-orchestration/job_orchestration/garbage_collector/garbage_collector.py +++ b/components/job-orchestration/job_orchestration/garbage_collector/garbage_collector.py @@ -35,10 +35,9 @@ async def main(argv: list[str]) -> int: args_parser.add_argument("--config", "-c", required=True, help="CLP configuration file.") parsed_args = args_parser.parse_args(argv[1:]) - # Setup logging to file - logs_directory = Path(os.getenv("CLP_LOGS_DIR")) + # Set logging level from environment logging_level = os.getenv("CLP_LOGGING_LEVEL") - configure_logger(logger, logging_level, logs_directory, GARBAGE_COLLECTOR_COMPONENT_NAME) + configure_logger(logger, logging_level) # Load configuration config_path = Path(parsed_args.config) @@ -71,9 +70,7 @@ async def main(argv: list[str]) -> int: continue logger.info(f"Creating {gc_name} with retention period = {retention_period} minutes") gc_tasks.append( - asyncio.create_task( - task_method(clp_config, logs_directory, logging_level), name=gc_name - ) + asyncio.create_task(task_method(clp_config, logging_level), name=gc_name) ) # Poll and report any task that finished unexpectedly diff --git a/components/job-orchestration/job_orchestration/garbage_collector/search_result_garbage_collector.py b/components/job-orchestration/job_orchestration/garbage_collector/search_result_garbage_collector.py index da38224b0e..2cc379d09f 100644 --- a/components/job-orchestration/job_orchestration/garbage_collector/search_result_garbage_collector.py +++ b/components/job-orchestration/job_orchestration/garbage_collector/search_result_garbage_collector.py @@ -70,10 +70,8 @@ def _collect_and_sweep_expired_search_results( logger.debug("No search results matched the expiry criteria.") -async def 
search_result_garbage_collector( - clp_config: ClpConfig, log_directory: pathlib.Path, logging_level: str -) -> None: - configure_logger(logger, logging_level, log_directory, SEARCH_RESULT_GARBAGE_COLLECTOR_NAME) +async def search_result_garbage_collector(clp_config: ClpConfig, logging_level: str) -> None: + configure_logger(logger, logging_level) sweep_interval_secs = clp_config.garbage_collector.sweep_interval.search_result * MIN_TO_SECONDS diff --git a/components/job-orchestration/job_orchestration/garbage_collector/utils.py b/components/job-orchestration/job_orchestration/garbage_collector/utils.py index 1305d546ef..4c76b395aa 100644 --- a/components/job-orchestration/job_orchestration/garbage_collector/utils.py +++ b/components/job-orchestration/job_orchestration/garbage_collector/utils.py @@ -18,13 +18,8 @@ from job_orchestration.garbage_collector.constants import MIN_TO_SECONDS -def configure_logger( - logger: logging.Logger, logging_level: str, log_directory: pathlib.Path, handler_name: str -): - log_file = log_directory / f"{handler_name}.log" - logging_file_handler = logging.FileHandler(filename=log_file, encoding="utf-8") - logging_file_handler.setFormatter(get_logging_formatter()) - logger.addHandler(logging_file_handler) +def configure_logger(logger: logging.Logger, logging_level: str): + """Configure the logging level for a logger (logs go to stdout, captured by Docker).""" set_logging_level(logger, logging_level) diff --git a/components/job-orchestration/job_orchestration/reducer/reducer.py b/components/job-orchestration/job_orchestration/reducer/reducer.py index 2bd6fe3f6a..9559f933a1 100644 --- a/components/job-orchestration/job_orchestration/reducer/reducer.py +++ b/components/job-orchestration/job_orchestration/reducer/reducer.py @@ -1,14 +1,13 @@ #!/usr/bin/env python3 import argparse -import logging import os import subprocess import sys from pathlib import Path from clp_py_utils.clp_config import ClpConfig -from clp_py_utils.clp_logging import 
get_logger, get_logging_formatter, set_logging_level +from clp_py_utils.clp_logging import get_logger, set_logging_level from clp_py_utils.core import read_yaml_config_file from pydantic import ValidationError @@ -29,14 +28,7 @@ def main(argv: list[str]) -> int: parsed_args = args_parser.parse_args(argv[1:]) - # Setup logging to file - logs_dir = Path(os.getenv("CLP_LOGS_DIR")) - log_file = Path(os.getenv("CLP_LOGS_DIR")) / "reducer.log" - logging_file_handler = logging.FileHandler(filename=log_file, encoding="utf-8") - logging_file_handler.setFormatter(get_logging_formatter()) - logger.addHandler(logging_file_handler) - - # Update logging level based on config + # Set logging level from environment set_logging_level(logger, os.getenv("CLP_LOGGING_LEVEL")) # Load configuration @@ -68,15 +60,11 @@ def main(argv: list[str]) -> int: for i in range(concurrency): reducer_instance_cmd = reducer_cmd + [str(clp_config.reducer.base_port + i)] - log_file_path = logs_dir / ("reducer-" + str(i) + ".log") - log_file = open(log_file_path, "a") - + # Let subprocess inherit stdout/stderr - Docker captures these via fluentd driver reducers.append( subprocess.Popen( reducer_instance_cmd, close_fds=True, - stdout=log_file, - stderr=log_file, ) ) @@ -93,9 +81,6 @@ def main(argv: list[str]) -> int: logger.error("All reducers terminated") - logger.removeHandler(logging_file_handler) - logging_file_handler.close() - return 0 diff --git a/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py b/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py index ba5a077a19..8bb068d27b 100644 --- a/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py +++ b/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py @@ -426,13 +426,7 @@ def main(argv) -> int | None: args_parser.add_argument("--config", "-c", required=True, help="CLP configuration file.") args 
= args_parser.parse_args(argv[1:]) - # Setup logging - log_file = Path(os.getenv("CLP_LOGS_DIR")) / "compression_scheduler.log" - logging_file_handler = logging.FileHandler(filename=log_file, encoding="utf-8") - logging_file_handler.setFormatter(get_logging_formatter()) - logger.addHandler(logging_file_handler) - - # Update logging level based on config + # Set logging level from environment set_logging_level(logger, os.getenv("CLP_LOGGING_LEVEL")) # Register the SIGTERM handler diff --git a/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py b/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py index a8be65dd62..2df2d970c1 100644 --- a/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py +++ b/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py @@ -1147,13 +1147,7 @@ async def main(argv: list[str]) -> int: parsed_args = args_parser.parse_args(argv[1:]) - # Setup logging to file - log_file = Path(os.getenv("CLP_LOGS_DIR")) / "query_scheduler.log" - logging_file_handler = logging.FileHandler(filename=log_file, encoding="utf-8") - logging_file_handler.setFormatter(get_logging_formatter()) - logger.addHandler(logging_file_handler) - - # Update logging level based on config + # Set logging level from environment set_logging_level(logger, os.getenv("CLP_LOGGING_LEVEL")) # Load configuration diff --git a/components/log-ingestor/src/bin/log_ingestor.rs b/components/log-ingestor/src/bin/log_ingestor.rs index 355ee80928..c71b588930 100644 --- a/components/log-ingestor/src/bin/log_ingestor.rs +++ b/components/log-ingestor/src/bin/log_ingestor.rs @@ -2,8 +2,7 @@ use anyhow::Context; use clap::Parser; use clp_rust_utils::{clp_config::package, serde::yaml}; use log_ingestor::{ingestion_job_manager::IngestionJobManagerState, routes::create_router}; -use tracing_appender::rolling::{RollingFileAppender, Rotation}; -use tracing_subscriber::{self, 
fmt::writer::MakeWriterExt}; +use tracing_subscriber; #[derive(Parser)] #[command(version, about = "log-ingestor for CLP.")] @@ -43,13 +42,8 @@ fn read_config_and_credentials( Ok((config, credentials)) } -fn set_up_logging() -> anyhow::Result { - let logs_directory = - std::env::var("CLP_LOGS_DIR").context("Expect `CLP_LOGS_DIR` environment variable.")?; - let logs_directory = std::path::Path::new(logs_directory.as_str()); - let file_appender = - RollingFileAppender::new(Rotation::HOURLY, logs_directory, "log_ingestor.log"); - let (non_blocking_writer, guard) = tracing_appender::non_blocking(file_appender); +fn set_up_logging() { + // Logs to stdout only - Docker's fluentd driver captures and forwards to Fluent Bit tracing_subscriber::fmt() .event_format( tracing_subscriber::fmt::format() @@ -61,9 +55,8 @@ fn set_up_logging() -> anyhow::Result anyhow::Result<()> { let args = Args::parse(); let (config, credentials) = read_config_and_credentials(&args)?; - let _guard = set_up_logging()?; + set_up_logging(); let addr = format!("{}:{}", args.host, args.port); let listener = tokio::net::TcpListener::bind(&addr) diff --git a/components/package-template/src/etc/fluent-bit/filters.conf b/components/package-template/src/etc/fluent-bit/filters.conf new file mode 100644 index 0000000000..0f76825759 --- /dev/null +++ b/components/package-template/src/etc/fluent-bit/filters.conf @@ -0,0 +1,8 @@ +# Filter configuration for CLP operational logging + +# Add common metadata to all logs +[FILTER] + Name modify + Match clp.* + Add clp_deployment clp-package + Add source fluentd-driver diff --git a/components/package-template/src/etc/fluent-bit/fluent-bit.conf b/components/package-template/src/etc/fluent-bit/fluent-bit.conf new file mode 100644 index 0000000000..7324c317b1 --- /dev/null +++ b/components/package-template/src/etc/fluent-bit/fluent-bit.conf @@ -0,0 +1,20 @@ +# Fluent Bit configuration for CLP operational logging +# This configuration collects logs from CLP components 
and stores them in organized file storage. + +[SERVICE] + Daemon Off + Flush 5 + Log_Level ${FLUENT_BIT_LOG_LEVEL} + Parsers_File /fluent-bit/etc/parsers.conf + HTTP_Server On + HTTP_Listen 0.0.0.0 + HTTP_Port 2020 + Health_Check On + storage.path /var/log/clp-operational/buffer + storage.sync normal + storage.checksum off + storage.backlog.mem_limit 5M + +@INCLUDE inputs.conf +@INCLUDE filters.conf +@INCLUDE outputs.conf diff --git a/components/package-template/src/etc/fluent-bit/inputs.conf b/components/package-template/src/etc/fluent-bit/inputs.conf new file mode 100644 index 0000000000..48afe14ffd --- /dev/null +++ b/components/package-template/src/etc/fluent-bit/inputs.conf @@ -0,0 +1,9 @@ +# Input configuration for CLP operational logging +# Receives logs from Docker's fluentd logging driver + +[INPUT] + Name forward + Listen 0.0.0.0 + Port 24224 + Buffer_Chunk_Size 1M + Buffer_Max_Size 6M diff --git a/components/package-template/src/etc/fluent-bit/outputs.conf b/components/package-template/src/etc/fluent-bit/outputs.conf new file mode 100644 index 0000000000..f6f68b8d2f --- /dev/null +++ b/components/package-template/src/etc/fluent-bit/outputs.conf @@ -0,0 +1,69 @@ +# Output configuration for CLP operational logging +# Hot tier: Write to organized file storage for real-time access + +# First-party services +[OUTPUT] + Name file + Match clp.compression-scheduler + Path /var/log/clp/compression_scheduler + Mkdir On + +[OUTPUT] + Name file + Match clp.compression-worker + Path /var/log/clp/compression_worker + Mkdir On + +[OUTPUT] + Name file + Match clp.spider-compression-worker + Path /var/log/clp/spider_compression_worker + Mkdir On + +[OUTPUT] + Name file + Match clp.query-scheduler + Path /var/log/clp/query_scheduler + Mkdir On + +[OUTPUT] + Name file + Match clp.query-worker + Path /var/log/clp/query_worker + Mkdir On + +[OUTPUT] + Name file + Match clp.reducer + Path /var/log/clp/reducer + Mkdir On + +[OUTPUT] + Name file + Match clp.garbage-collector + 
Path /var/log/clp/garbage_collector + Mkdir On + +[OUTPUT] + Name file + Match clp.webui + Path /var/log/clp/webui + Mkdir On + +[OUTPUT] + Name file + Match clp.api-server + Path /var/log/clp/api_server + Mkdir On + +[OUTPUT] + Name file + Match clp.log-ingestor + Path /var/log/clp/log_ingestor + Mkdir On + +[OUTPUT] + Name file + Match clp.mcp-server + Path /var/log/clp/mcp_server + Mkdir On diff --git a/components/package-template/src/etc/fluent-bit/parsers.conf b/components/package-template/src/etc/fluent-bit/parsers.conf new file mode 100644 index 0000000000..9b38f9c1c6 --- /dev/null +++ b/components/package-template/src/etc/fluent-bit/parsers.conf @@ -0,0 +1,11 @@ +# Parser configuration for CLP operational logging +# Note: Most logs arrive as JSON from Docker's fluentd logging driver. +# These parsers are provided for optional post-processing. + +# Docker JSON log format (default from fluentd driver) +[PARSER] + Name docker + Format json + Time_Key time + Time_Format %Y-%m-%dT%H:%M:%S.%L + Time_Keep On diff --git a/components/webui/client/src/api/operational-logs/index.ts b/components/webui/client/src/api/operational-logs/index.ts new file mode 100644 index 0000000000..4030a27a21 --- /dev/null +++ b/components/webui/client/src/api/operational-logs/index.ts @@ -0,0 +1,58 @@ +import { + ComponentsListing, + LogContentResponse, + LogFilesListing, +} from "@webui/common/schemas/operational-logs"; +import axios from "axios"; + + +/** + * Lists available components with operational logs. + * + * @return Promise resolving to list of components + */ +const listComponents = async (): Promise => { + const {data} = await axios.get("/api/operational-logs/components"); + + return data; +}; + +/** + * Lists log files, optionally filtered by component. 
+ * + * @param component Optional component name to filter by + * @return Promise resolving to list of log files + */ +const listLogFiles = async (component?: string): Promise => { + const {data} = await axios.get("/api/operational-logs/files", { + params: {component}, + }); + + return data; +}; + +/** + * Reads log content from a file. + * + * @param path The path to the log file + * @param offset Line offset to start from + * @param limit Maximum number of lines to return + * @return Promise resolving to log content + */ +const readLogContent = async ( + path: string, + offset = 0, + limit = 100 +): Promise => { + const {data} = await axios.get("/api/operational-logs/content", { + params: {path, offset, limit}, + }); + + return data; +}; + +export { + listComponents, + listLogFiles, + readLogContent, +}; diff --git a/components/webui/client/src/components/Layout/MainLayout.module.css b/components/webui/client/src/components/Layout/MainLayout.module.css index 1cebf0aaad..61d63063e8 100644 --- a/components/webui/client/src/components/Layout/MainLayout.module.css +++ b/components/webui/client/src/components/Layout/MainLayout.module.css @@ -2,6 +2,20 @@ min-height: 100vh; } +.mainLayout :global(.ant-layout-sider) { + position: sticky; + top: 0; + height: 100vh; + overflow-y: auto; +} + +.siderContent { + display: flex; + flex-direction: column; + justify-content: space-between; + min-height: calc(100vh - 48px); /* Fill viewport height minus collapse trigger */ +} + .siderLogoContainer { display: flex; justify-content: center; diff --git a/components/webui/client/src/components/Layout/MainLayout.tsx b/components/webui/client/src/components/Layout/MainLayout.tsx index 39510c9a18..c53904b7cd 100644 --- a/components/webui/client/src/components/Layout/MainLayout.tsx +++ b/components/webui/client/src/components/Layout/MainLayout.tsx @@ -5,6 +5,7 @@ import { } from "react-router"; import { + FileTextOutlined, SearchOutlined, UploadOutlined, } from "@ant-design/icons"; @@ 
-21,11 +22,15 @@ const {Sider} = Layout; type MenuItem = Required["items"][number]; -const SIDEBAR_MENU_ITEMS: MenuItem[] = [ +const SIDEBAR_MAIN_MENU_ITEMS: MenuItem[] = [ {label: Ingest, key: "/ingest", icon: }, {label: Search, key: "/search", icon: }, ]; +const SIDEBAR_BOTTOM_MENU_ITEMS: MenuItem[] = [ + {label: Logs, key: "/logs", icon: }, +]; + /** * The main layout of web ui. * @@ -45,15 +50,22 @@ const MainLayout = () => { setCollapsed(value); }} > -
- {"CLP +
+
+
+ {"CLP +
+ +
+
- diff --git a/components/webui/client/src/components/VirtualTable/index.tsx b/components/webui/client/src/components/VirtualTable/index.tsx index 45276b6449..da5c7f3340 100644 --- a/components/webui/client/src/components/VirtualTable/index.tsx +++ b/components/webui/client/src/components/VirtualTable/index.tsx @@ -3,7 +3,7 @@ import React, { useRef, } from "react"; -import {Table} from "antd"; +import {Empty, Table} from "antd"; import { SCROLL_INCREMENT, @@ -11,6 +11,26 @@ import { type VirtualTableProps, } from "./typings"; +interface FullHeightEmptyProps { + height: number; +} + +/** + * Empty component that fills the specified height. + */ +const FullHeightEmpty = ({height}: FullHeightEmptyProps) => ( +
+ +
+); + /** * Virtual table that supports keyboard navigation. @@ -20,6 +40,8 @@ import { * @return */ const VirtualTable = >({ + scroll, + locale, ...tableProps }: VirtualTableProps) => { const containerRef = useRef(null); @@ -70,6 +92,17 @@ const VirtualTable = >({ e.preventDefault(); }, []); + // Use scroll.y height for the empty state if available + const emptyHeight = "number" === typeof scroll?.y ? scroll.y : undefined; + + // Build locale with full-height empty state + const fullHeightLocale = emptyHeight ? + { + ...locale, + emptyText: , + } : + locale; + return (
>({ > virtual={true} - {...tableProps}/> + {...tableProps} + {...(scroll && {scroll})} + {...(fullHeightLocale && {locale: fullHeightLocale})}/>
); }; diff --git a/components/webui/client/src/pages/OperationalLogsPage/index.module.css b/components/webui/client/src/pages/OperationalLogsPage/index.module.css new file mode 100644 index 0000000000..aad67e3158 --- /dev/null +++ b/components/webui/client/src/pages/OperationalLogsPage/index.module.css @@ -0,0 +1,70 @@ +.operationalLogsPage { + display: flex; + flex-direction: column; + height: 100%; + padding: 16px; + gap: 16px; +} + +.header { + display: flex; + flex-direction: row; + gap: 16px; + align-items: center; + flex-wrap: wrap; +} + +.pageTitle { + margin: 0 !important; + margin-right: auto !important; +} + +.componentSelect { + min-width: 200px; +} + +.fileSelect { + min-width: 300px; +} + +.refreshButton { +} + +.tableContainer { + flex: 1; + min-height: 0; +} + +.logTimestamp { + font-family: monospace; + white-space: nowrap; + color: #888; +} + +.logSource { + font-family: monospace; + padding: 2px 6px; + border-radius: 4px; + background-color: #e6f7ff; + color: #1890ff; +} + +.logSourceStderr { + background-color: #fff1f0; + color: #ff4d4f; +} + +.logMessage { + font-family: monospace; + white-space: pre-wrap; + word-break: break-word; +} + +.emptyState { + display: flex; + flex-direction: column; + align-items: center; + justify-content: center; + height: 300px; + color: #888; +} diff --git a/components/webui/client/src/pages/OperationalLogsPage/index.tsx b/components/webui/client/src/pages/OperationalLogsPage/index.tsx new file mode 100644 index 0000000000..672a3b6764 --- /dev/null +++ b/components/webui/client/src/pages/OperationalLogsPage/index.tsx @@ -0,0 +1,295 @@ +import { + useEffect, + useMemo, + useRef, + useState, +} from "react"; + +import {ReloadOutlined} from "@ant-design/icons"; +import {useQuery} from "@tanstack/react-query"; +import { + Button, + Empty, + Select, + Spin, + Typography, +} from "antd"; +import type {ColumnsType} from "antd/es/table"; + +import { + listComponents, + listLogFiles, + readLogContent, +} from 
"../../api/operational-logs"; +import VirtualTable from "../../components/VirtualTable"; +import styles from "./index.module.css"; + + +const {Text, Title} = Typography; + +// Padding accounts for: page bottom padding (16px) + table header (~39px) + buffer +const TABLE_BOTTOM_PADDING = 60; + +interface LogTableEntry { + key: string; + timestamp: number; + source: string; + log: string; +} + +/** + * Formats a Unix timestamp to a readable date/time string. + * + * @param timestamp Unix timestamp in seconds + * @return Formatted date/time string + */ +const formatTimestamp = (timestamp: number): string => { + const date = new Date(timestamp * 1000); + return date.toISOString().replace("T", " ") + .slice(0, 23); +}; + +/** + * Operational Logs page for viewing CLP component logs. + * + * @return + */ +const OperationalLogsPage = () => { + const [selectedComponent, setSelectedComponent] = useState(undefined); + const [selectedFile, setSelectedFile] = useState(undefined); + const [tableHeight, setTableHeight] = useState(400); + const tableContainerRef = useRef(null); + + // Calculate table height based on available space + useEffect(() => { + const updateHeight = () => { + if (tableContainerRef.current) { + const {top} = tableContainerRef.current.getBoundingClientRect(); + const availableHeight = window.innerHeight - top - TABLE_BOTTOM_PADDING; + setTableHeight(Math.max(200, availableHeight)); + } + }; + + updateHeight(); + window.addEventListener("resize", updateHeight); + + return () => { + window.removeEventListener("resize", updateHeight); + }; + }, []); + + // Fetch available components + const { + data: components = [], + isLoading: isLoadingComponents, + refetch: refetchComponents, + } = useQuery({ + queryKey: ["operational-logs", + "components"], + queryFn: listComponents, + }); + + // Fetch log files for selected component + const { + data: logFiles = [], + isLoading: isLoadingFiles, + refetch: refetchFiles, + } = useQuery({ + queryKey: ["operational-logs", + 
"files", + selectedComponent], + queryFn: () => listLogFiles(selectedComponent), + enabled: true, + }); + + // Fetch log content for selected file + const { + data: logContent, + isLoading: isLoadingContent, + refetch: refetchContent, + } = useQuery({ + queryKey: ["operational-logs", + "content", + selectedFile], + queryFn: () => { + if ("undefined" === typeof selectedFile) { + return Promise.resolve({entries: [], totalLines: 0, hasMore: false}); + } + + return readLogContent(selectedFile, 0, 1000); + }, + enabled: "undefined" !== typeof selectedFile, + }); + + // Auto-select first file when component changes + useEffect(() => { + if (0 < logFiles.length) { + const relevantFiles = "undefined" !== typeof selectedComponent ? + logFiles.filter((f) => f.component === selectedComponent) : + logFiles; + + if (0 < relevantFiles.length) { + setSelectedFile(relevantFiles[0]?.path); + } + } + }, [logFiles, + selectedComponent]); + + // Create component options + const componentOptions = useMemo(() => { + return components.map((c) => ({ + label: `${c.name}${c.hasLogs ? + "" : + " (no logs)"}`, + value: c.name, + disabled: false === c.hasLogs, + })); + }, [components]); + + // Create file options + const fileOptions = useMemo(() => { + const relevantFiles = "undefined" !== typeof selectedComponent ? 
+ logFiles.filter((f) => f.component === selectedComponent) : + logFiles; + + return relevantFiles.map((f) => ({ + label: `${f.component}/${f.filename}`, + value: f.path, + })); + }, [logFiles, + selectedComponent]); + + // Create table data + const tableData = useMemo((): LogTableEntry[] => { + if ("undefined" === typeof logContent) { + return []; + } + + return logContent.entries.map((entry, index) => ({ + key: `${entry.timestamp}-${index}`, + timestamp: entry.timestamp, + source: entry.source, + log: entry.log, + })); + }, [logContent]); + + // Define table columns + const columns: ColumnsType = [ + { + title: "Timestamp", + dataIndex: "timestamp", + key: "timestamp", + width: 200, + render: (timestamp: number) => ( + + {formatTimestamp(timestamp)} + + ), + }, + { + title: "Source", + dataIndex: "source", + key: "source", + width: 80, + render: (source: string) => ( + + {source} + + ), + }, + { + title: "Message", + dataIndex: "log", + key: "log", + render: (log: string) => ( + + {log} + + ), + }, + ]; + + const handleRefresh = () => { + refetchComponents(); + refetchFiles(); + if ("undefined" !== typeof selectedFile) { + refetchContent(); + } + }; + + const isLoading = isLoadingComponents || isLoadingFiles || isLoadingContent; + + return ( +
+
+ + {"undefined" !== typeof selectedFile ? + `Log entries (${logContent?.totalLines ?? 0} total)` : + "Operational Logs"} + + + +
+ +
+ {isLoading ? + ( +
+ +
+ ) : + 0 === tableData.length ? + ( + + ) : + ( + + className={styles["logTable"] ?? ""} + columns={columns} + dataSource={tableData} + pagination={false} + scroll={{y: tableHeight}} + size={"small"}/> + )} +
+
+ ); +}; + +export default OperationalLogsPage; diff --git a/components/webui/client/src/pages/SearchPage/SearchResults/SearchResultsTable/index.module.css b/components/webui/client/src/pages/SearchPage/SearchResults/SearchResultsTable/index.module.css new file mode 100644 index 0000000000..64a78a2729 --- /dev/null +++ b/components/webui/client/src/pages/SearchPage/SearchResults/SearchResultsTable/index.module.css @@ -0,0 +1,4 @@ +.tableContainer { + display: flex; + flex-direction: column; +} diff --git a/components/webui/client/src/pages/SearchPage/SearchResults/SearchResultsTable/index.tsx b/components/webui/client/src/pages/SearchPage/SearchResults/SearchResultsTable/index.tsx index 71820701a9..9ccbe22811 100644 --- a/components/webui/client/src/pages/SearchPage/SearchResults/SearchResultsTable/index.tsx +++ b/components/webui/client/src/pages/SearchPage/SearchResults/SearchResultsTable/index.tsx @@ -8,6 +8,7 @@ import {CLP_QUERY_ENGINES} from "@webui/common/config"; import {SETTINGS_QUERY_ENGINE} from "../../../../config"; import usePrestoSearchState from "../../SearchState/Presto"; +import styles from "./index.module.css"; import SearchResultsVirtualTable from "./Native/SearchResultsVirtualTable"; import PrestoResultsVirtualTable from "./Presto/PrestoResultsVirtualTable"; import {TABLE_BOTTOM_PADDING} from "./typings"; @@ -45,7 +46,8 @@ const SearchResultsTable = () => { return (
0 ? tableHeight : undefined}} > {CLP_QUERY_ENGINES.PRESTO === SETTINGS_QUERY_ENGINE ? ( diff --git a/components/webui/client/src/pages/SearchPage/SearchResults/SearchResultsTable/typings.ts b/components/webui/client/src/pages/SearchPage/SearchResults/SearchResultsTable/typings.ts index 940970bd09..521dea25ad 100644 --- a/components/webui/client/src/pages/SearchPage/SearchResults/SearchResultsTable/typings.ts +++ b/components/webui/client/src/pages/SearchPage/SearchResults/SearchResultsTable/typings.ts @@ -1,7 +1,7 @@ /** * Padding for the table to the bottom of the page. */ -const TABLE_BOTTOM_PADDING = 95; +const TABLE_BOTTOM_PADDING = 24; /** * The maximum number of results to retrieve for a search. diff --git a/components/webui/client/src/router.tsx b/components/webui/client/src/router.tsx index babeb1f411..03a3f894c2 100644 --- a/components/webui/client/src/router.tsx +++ b/components/webui/client/src/router.tsx @@ -6,6 +6,7 @@ import { import MainLayout from "./components/Layout/MainLayout"; import IngestPage from "./pages/IngestPage"; import QueryStatus from "./pages/LogViewerLoadingPage/QueryStatus"; +import OperationalLogsPage from "./pages/OperationalLogsPage"; import SearchPage from "./pages/SearchPage"; @@ -22,6 +23,7 @@ const router = createBrowserRouter([ }, {path: "ingest", Component: IngestPage}, {path: "search", Component: SearchPage}, + {path: "logs", Component: OperationalLogsPage}, ], }, { diff --git a/components/webui/common/src/schemas/operational-logs.ts b/components/webui/common/src/schemas/operational-logs.ts new file mode 100644 index 0000000000..164d4caa3d --- /dev/null +++ b/components/webui/common/src/schemas/operational-logs.ts @@ -0,0 +1,130 @@ +import { + Static, + Type, +} from "@sinclair/typebox"; + +import {StringSchema} from "./common.js"; + + +/** + * Known CLP component names for operational logs. 
+ */ +const CLP_COMPONENTS = [ + "api_server", + "compression_scheduler", + "compression_worker", + "database", + "garbage_collector", + "log_ingestor", + "mcp_server", + "query_scheduler", + "query_worker", + "queue", + "reducer", + "redis", + "results_cache", + "webui", +] as const; + +const ComponentNameSchema = Type.Union( + CLP_COMPONENTS.map((c) => Type.Literal(c)) +); + +type ComponentName = Static; + +/** + * Schema for a component's log file metadata. + */ +const LogFileMetadataSchema = Type.Object({ + component: StringSchema, + filename: StringSchema, + path: StringSchema, + sizeBytes: Type.Number(), + modifiedTime: Type.Number(), +}); + +type LogFileMetadata = Static; + +/** + * Schema for listing available log files. + */ +const LogFilesListingSchema = Type.Array(LogFileMetadataSchema); + +type LogFilesListing = Static; + +/** + * Schema for listing components request. + */ +const ListComponentsRequestSchema = Type.Object({}); + +/** + * Schema for listing components response. + */ +const ComponentsListingSchema = Type.Array(Type.Object({ + name: StringSchema, + hasLogs: Type.Boolean(), +})); + +type ComponentsListing = Static; + +/** + * Schema for listing log files request. + */ +const ListLogFilesRequestSchema = Type.Object({ + component: Type.Optional(Type.String()), +}); + +/** + * Schema for reading log content request. + */ +const ReadLogContentRequestSchema = Type.Object({ + path: StringSchema, + offset: Type.Optional(Type.Number({minimum: 0})), + limit: Type.Optional(Type.Number({minimum: 1, maximum: 10000})), +}); + +/** + * Schema for a parsed log entry from Fluent Bit output. + */ +const LogEntrySchema = Type.Object({ + timestamp: Type.Number(), + source: Type.String(), + log: Type.String(), + containerId: Type.Optional(Type.String()), + containerName: Type.Optional(Type.String()), + clpDeployment: Type.Optional(Type.String()), +}); + +type LogEntry = Static; + +/** + * Schema for log content response. 
+ */ +const LogContentResponseSchema = Type.Object({ + entries: Type.Array(LogEntrySchema), + totalLines: Type.Number(), + hasMore: Type.Boolean(), +}); + +type LogContentResponse = Static; + +export { + CLP_COMPONENTS, + ComponentNameSchema, + ComponentsListingSchema, + ListComponentsRequestSchema, + ListLogFilesRequestSchema, + LogContentResponseSchema, + LogEntrySchema, + LogFileMetadataSchema, + LogFilesListingSchema, + ReadLogContentRequestSchema, +}; +export type { + ComponentName, + ComponentsListing, + LogContentResponse, + LogEntry, + LogFileMetadata, + LogFilesListing, +}; diff --git a/components/webui/server/settings.json b/components/webui/server/settings.json index da8b8094b5..839c87f4ac 100644 --- a/components/webui/server/settings.json +++ b/components/webui/server/settings.json @@ -29,5 +29,7 @@ "ClpQueryEngine": "clp-s", "ClpStorageEngine": "clp-s", "PrestoHost": "localhost", - "PrestoPort": 8889 + "PrestoPort": 8889, + + "OperationalLogsDir": "/var/log/clp" } diff --git a/components/webui/server/src/routes/api/operational-logs/index.ts b/components/webui/server/src/routes/api/operational-logs/index.ts new file mode 100644 index 0000000000..77118992f0 --- /dev/null +++ b/components/webui/server/src/routes/api/operational-logs/index.ts @@ -0,0 +1,280 @@ +import {FastifyPluginAsyncTypebox} from "@fastify/type-provider-typebox"; +import { + ComponentsListingSchema, + ListComponentsRequestSchema, + ListLogFilesRequestSchema, + LogContentResponseSchema, + LogFilesListingSchema, + ReadLogContentRequestSchema, +} from "@webui/common/schemas/operational-logs"; +import fs from "fs/promises"; +import {constants} from "http2"; +import path from "path"; + +import settings from "../../../../settings.json" with {type: "json"}; + + +/** + * Parses a Fluent Bit log line. 
+ * + * Format: `{tag}: [{timestamp}, {json_data}]` + * + * @param line The raw log line + * @return Parsed log entry or null if parsing fails + */ +const parseFluentBitLogLine = (line: string): { + timestamp: number; + source: string; + log: string; + containerId?: string; + containerName?: string; + clpDeployment?: string; +} | null => { + // Match pattern: tag: [timestamp, {json}] + const match = line.match(/^[^:]+:\s*\[(\d+(?:\.\d+)?),\s*(.+)\]$/); + if (null === match) { + // Try to parse as plain text log line + return { + timestamp: Date.now() / 1000, + source: "stdout", + log: line, + }; + } + + const timestamp = parseFloat(match[1] ?? "0"); + const jsonStr = match[2]; + + if ("undefined" === typeof jsonStr) { + return null; + } + + try { + const data = JSON.parse(jsonStr) as { + source?: string; + log?: string; + container_id?: string; + container_name?: string; + clp_deployment?: string; + }; + + const result: { + timestamp: number; + source: string; + log: string; + containerId?: string; + containerName?: string; + clpDeployment?: string; + } = { + timestamp: timestamp, + source: data.source ?? "stdout", + log: data.log ?? "", + }; + + if ("undefined" !== typeof data.container_id) { + result.containerId = data.container_id; + } + if ("undefined" !== typeof data.container_name) { + result.containerName = data.container_name; + } + if ("undefined" !== typeof data.clp_deployment) { + result.clpDeployment = data.clp_deployment; + } + + return result; + } catch { + return null; + } +}; + +/** + * Operational logs API routes. + * + * @param fastify + */ +const plugin: FastifyPluginAsyncTypebox = async (fastify) => { + const operationalLogsDir = (settings as {OperationalLogsDir?: string}).OperationalLogsDir ?? + "/var/log/clp"; + + /** + * Lists available components with logs. 
+ */ + fastify.get( + "/components", + { + schema: { + querystring: ListComponentsRequestSchema, + response: { + [constants.HTTP_STATUS_OK]: ComponentsListingSchema, + }, + tags: ["Operational Logs"], + }, + }, + async (request, reply) => { + try { + await fs.access(operationalLogsDir); + } catch { + return await reply.notFound(`Operational logs directory not found: ${operationalLogsDir}`); + } + + try { + const entries = await fs.readdir(operationalLogsDir, {withFileTypes: true}); + const components = await Promise.all( + entries + .filter((entry) => entry.isDirectory()) + .map(async (entry) => { + const componentDir = path.join(operationalLogsDir, entry.name); + const files = await fs.readdir(componentDir); + return { + name: entry.name, + hasLogs: files.length > 0, + }; + }) + ); + + return components.sort((a, b) => a.name.localeCompare(b.name)); + } catch (e: unknown) { + const errMsg = "Unable to list components"; + request.log.error(e, errMsg); + + return reply.internalServerError(errMsg); + } + } + ); + + /** + * Lists log files for a component or all components. 
+ */ + fastify.get( + "/files", + { + schema: { + querystring: ListLogFilesRequestSchema, + response: { + [constants.HTTP_STATUS_OK]: LogFilesListingSchema, + }, + tags: ["Operational Logs"], + }, + }, + async (request, reply) => { + const {component} = request.query; + + try { + await fs.access(operationalLogsDir); + } catch { + return await reply.notFound(`Operational logs directory not found: ${operationalLogsDir}`); + } + + try { + const logFiles: Array<{ + component: string; + filename: string; + path: string; + sizeBytes: number; + modifiedTime: number; + }> = []; + + // Get list of component directories to scan + let componentDirs: string[]; + if ("undefined" !== typeof component && "" !== component) { + componentDirs = [component]; + } else { + const entries = await fs.readdir(operationalLogsDir, {withFileTypes: true}); + componentDirs = entries + .filter((entry) => entry.isDirectory()) + .map((entry) => entry.name); + } + + // Scan each component directory for log files + for (const componentName of componentDirs) { + const componentDir = path.join(operationalLogsDir, componentName); + + try { + const files = await fs.readdir(componentDir); + for (const filename of files) { + const filePath = path.join(componentDir, filename); + const stats = await fs.stat(filePath); + + if (stats.isFile()) { + logFiles.push({ + component: componentName, + filename: filename, + path: filePath, + sizeBytes: stats.size, + modifiedTime: stats.mtimeMs, + }); + } + } + } catch { + // Skip directories that don't exist or can't be read + } + } + + // Sort by modified time descending (newest first) + logFiles.sort((a, b) => b.modifiedTime - a.modifiedTime); + + return logFiles; + } catch (e: unknown) { + const errMsg = "Unable to list log files"; + request.log.error(e, errMsg); + + return reply.internalServerError(errMsg); + } + } + ); + + /** + * Reads and parses log content from a file. 
+ */ + fastify.get( + "/content", + { + schema: { + querystring: ReadLogContentRequestSchema, + response: { + [constants.HTTP_STATUS_OK]: LogContentResponseSchema, + }, + tags: ["Operational Logs"], + }, + }, + async (request, reply) => { + const {path: requestedPath, offset = 0, limit = 100} = request.query; + + // Security: Ensure path is within operational logs directory + const normalizedPath = path.normalize(requestedPath); + if (false === normalizedPath.startsWith(operationalLogsDir)) { + return await reply.badRequest("Invalid log file path"); + } + + try { + await fs.access(normalizedPath); + } catch { + return await reply.notFound(`Log file not found: ${normalizedPath}`); + } + + try { + const content = await fs.readFile(normalizedPath, "utf-8"); + const lines = content.split("\n").filter((line) => line.trim().length > 0); + const totalLines = lines.length; + + // Apply offset and limit + const selectedLines = lines.slice(offset, offset + limit); + const entries = selectedLines + .map(parseFluentBitLogLine) + .filter((entry): entry is NonNullable => null !== entry); + + return { + entries: entries, + totalLines: totalLines, + hasMore: offset + limit < totalLines, + }; + } catch (e: unknown) { + const errMsg = "Unable to read log file"; + request.log.error(e, errMsg); + + return reply.internalServerError(errMsg); + } + } + ); +}; + +export default plugin; diff --git a/docs/src/user-docs/guides-multi-host.md b/docs/src/user-docs/guides-multi-host.md index e9e5ecd436..88e7f7243b 100755 --- a/docs/src/user-docs/guides-multi-host.md +++ b/docs/src/user-docs/guides-multi-host.md @@ -319,6 +319,49 @@ To view logs for a specific service: docker compose --project-name clp-package- logs -f ``` +## Operational logging + +CLP uses [Fluent Bit][fluent-bit] to collect and aggregate operational logs from all services. Logs +are written to `var/log//` on each host. 
+ +### Current limitations in multi-host deployments + +:::{warning} +In multi-host deployments, operational logs are currently stored locally on each host. This means: + +* Each host runs its own Fluent Bit instance that collects logs from containers on that host. +* Logs are written to the local filesystem (`var/log/<component>/`) and are not automatically + aggregated across hosts. +* To view logs from a specific host, you must access that host directly. +::: + +### Workarounds + +Until centralized log aggregation is implemented, you can: + +1. **Use a shared filesystem**: Mount `var/log/` to a shared filesystem (e.g., NFS, SeaweedFS) so + all hosts write logs to the same location. Note that you may need to include the hostname in the + log path to avoid conflicts. + +2. **Use `docker compose logs`**: View real-time logs for a service using: + + ```bash + docker compose --project-name clp-package-<instance-id> logs -f <service> + ``` + +3. **Manual log collection**: Periodically copy logs from each host to a central location for + analysis. + +### Future improvements + +A future release will add support for shipping logs to S3 or other centralized storage, enabling: + +* Centralized log aggregation across all hosts +* Log viewing through the webui regardless of which host generated the logs +* Long-term log retention with tiered storage (hot/warm/cold) + +[fluent-bit]: https://fluentbit.io/ + ## Setting up SeaweedFS The instructions below are for running a simple SeaweedFS cluster on a set of hosts.
For other use diff --git a/tools/deployment/package-helm/Chart.yaml b/tools/deployment/package-helm/Chart.yaml index 1da0d1598e..d26da895a4 100644 --- a/tools/deployment/package-helm/Chart.yaml +++ b/tools/deployment/package-helm/Chart.yaml @@ -1,6 +1,6 @@ apiVersion: "v2" name: "clp" -version: "0.1.2-dev.7" +version: "0.1.2-dev.8" description: "A Helm chart for CLP's (Compressed Log Processor) package deployment" type: "application" appVersion: "0.7.1-dev" diff --git a/tools/deployment/package-helm/templates/NOTES.txt b/tools/deployment/package-helm/templates/NOTES.txt new file mode 100644 index 0000000000..97a261da8a --- /dev/null +++ b/tools/deployment/package-helm/templates/NOTES.txt @@ -0,0 +1 @@ +TODO: This should be filled with usage instructions. \ No newline at end of file diff --git a/tools/deployment/package-helm/templates/api-server-deployment.yaml b/tools/deployment/package-helm/templates/api-server-deployment.yaml new file mode 100644 index 0000000000..26da76486f --- /dev/null +++ b/tools/deployment/package-helm/templates/api-server-deployment.yaml @@ -0,0 +1,107 @@ +{{- if .Values.clpConfig.api_server }} +apiVersion: "apps/v1" +kind: "Deployment" +metadata: + name: {{ include "clp.fullname" . }}-api-server + labels: + {{- include "clp.labels" . | nindent 4 }} + app.kubernetes.io/component: "api-server" +spec: + replicas: 1 + selector: + matchLabels: + {{- include "clp.selectorLabels" . | nindent 6 }} + app.kubernetes.io/component: "api-server" + template: + metadata: + labels: + {{- include "clp.labels" . | nindent 8 }} + app.kubernetes.io/component: "api-server" + spec: + serviceAccountName: {{ include "clp.fullname" . }}-job-watcher + terminationGracePeriodSeconds: 10 + securityContext: + runAsUser: {{ .Values.securityContext.firstParty.uid }} + runAsGroup: {{ .Values.securityContext.firstParty.gid }} + fsGroup: {{ .Values.securityContext.firstParty.gid }} + initContainers: + - {{- include "clp.waitFor" (dict + "root" . 
+ "type" "job" + "name" "db-table-creator" + ) | nindent 10 }} + - {{- include "clp.waitFor" (dict + "root" . + "type" "job" + "name" "results-cache-indices-creator" + ) | nindent 10 }} + containers: + - name: "api-server" + image: "{{ include "clp.image.ref" . }}" + imagePullPolicy: "{{ .Values.image.clpPackage.pullPolicy }}" + env: + - name: "CLP_DB_PASS" + valueFrom: + secretKeyRef: + name: {{ include "clp.fullname" . }}-database + key: "password" + - name: "CLP_DB_USER" + valueFrom: + secretKeyRef: + name: {{ include "clp.fullname" . }}-database + key: "username" + - name: "CLP_LOGS_DIR" + value: "/var/log/api_server" + - name: "RUST_LOG" + value: "INFO" + ports: + - name: "api-server" + containerPort: 3001 + volumeMounts: + - name: {{ include "clp.volumeName" (dict + "component_category" "api-server" + "name" "logs" + ) | quote }} + mountPath: "/var/log/api_server" + - name: "config" + mountPath: "/etc/clp-config.yaml" + subPath: "clp-config.yaml" + readOnly: true + {{- if eq .Values.clpConfig.stream_output.storage.type "fs" }} + - name: {{ include "clp.volumeName" (dict + "component_category" "shared-data" + "name" "streams" + ) | quote }} + mountPath: "/var/data/streams" + {{- end }} + command: [ + "/opt/clp/bin/api_server", + "--host", "0.0.0.0", + "--port", "3001", + "--config", "/etc/clp-config.yaml" + ] + readinessProbe: + {{- include "clp.readinessProbeTimings" . | nindent 12 }} + httpGet: &api-server-health-check + path: "/health" + port: 3001 + livenessProbe: + {{- include "clp.livenessProbeTimings" . | nindent 12 }} + httpGet: *api-server-health-check + volumes: + - {{- include "clp.pvcVolume" (dict + "root" . + "component_category" "api-server" + "name" "logs" + ) | nindent 10 }} + - name: "config" + configMap: + name: {{ include "clp.fullname" . }}-config + {{- if eq .Values.clpConfig.stream_output.storage.type "fs" }} + - {{- include "clp.pvcVolume" (dict + "root" . 
+ "component_category" "shared-data" + "name" "streams" + ) | nindent 10 }} + {{- end }} +{{- end }} diff --git a/tools/deployment/package-helm/templates/api-server-logs-pv.yaml b/tools/deployment/package-helm/templates/api-server-logs-pv.yaml new file mode 100644 index 0000000000..89fd30dda8 --- /dev/null +++ b/tools/deployment/package-helm/templates/api-server-logs-pv.yaml @@ -0,0 +1,11 @@ +{{- if .Values.clpConfig.api_server }} +{{- include "clp.createLocalPv" (dict + "root" . + "component_category" "api-server" + "name" "logs" + "nodeRole" "control-plane" + "capacity" "5Gi" + "accessModes" (list "ReadWriteOnce") + "hostPath" (printf "%s/api_server" .Values.clpConfig.logs_directory) +) }} +{{- end }} diff --git a/tools/deployment/package-helm/templates/api-server-logs-pvc.yaml b/tools/deployment/package-helm/templates/api-server-logs-pvc.yaml new file mode 100644 index 0000000000..d9429b6dad --- /dev/null +++ b/tools/deployment/package-helm/templates/api-server-logs-pvc.yaml @@ -0,0 +1,9 @@ +{{- if .Values.clpConfig.api_server }} +{{- include "clp.createPvc" (dict + "root" . + "component_category" "api-server" + "name" "logs" + "capacity" "5Gi" + "accessModes" (list "ReadWriteOnce") +) }} +{{- end }} diff --git a/tools/deployment/package-helm/templates/api-server-service.yaml b/tools/deployment/package-helm/templates/api-server-service.yaml new file mode 100644 index 0000000000..0aed0e7efa --- /dev/null +++ b/tools/deployment/package-helm/templates/api-server-service.yaml @@ -0,0 +1,18 @@ +{{- if .Values.clpConfig.api_server }} +apiVersion: "v1" +kind: "Service" +metadata: + name: {{ include "clp.fullname" . }}-api-server + labels: + {{- include "clp.labels" . | nindent 4 }} + app.kubernetes.io/component: "api-server" +spec: + type: "NodePort" + selector: + {{- include "clp.selectorLabels" . 
| nindent 4 }} + app.kubernetes.io/component: "api-server" + ports: + - port: 3001 + targetPort: "api-server" + nodePort: {{ .Values.clpConfig.api_server.port }} +{{- end }} diff --git a/tools/deployment/package-helm/templates/compression-scheduler-deployment.yaml b/tools/deployment/package-helm/templates/compression-scheduler-deployment.yaml index f58e9594b8..37a86ca019 100644 --- a/tools/deployment/package-helm/templates/compression-scheduler-deployment.yaml +++ b/tools/deployment/package-helm/templates/compression-scheduler-deployment.yaml @@ -66,11 +66,6 @@ spec: - name: "PYTHONPATH" value: "/opt/clp/lib/python3/site-packages" volumeMounts: - - {{- include "clp.logsInputVolumeMount" . | nindent 14 }} - - name: "config" - mountPath: "/etc/clp-config.yaml" - subPath: "clp-config.yaml" - readOnly: true - name: {{ include "clp.volumeName" (dict "component_category" "compression-scheduler" "name" "logs" @@ -81,13 +76,24 @@ spec: "name" "user-logs" ) | quote }} mountPath: "/var/log/user" + - name: "config" + mountPath: "/etc/clp-config.yaml" + subPath: "clp-config.yaml" + readOnly: true + {{- if .Values.clpConfig.aws_config_directory }} + - name: "aws-config" + mountPath: "/opt/clp/.aws" + readOnly: true + {{- end }} + {{- if eq .Values.clpConfig.logs_input.type "fs" }} + - {{- include "clp.logsInputVolumeMount" . | nindent 14 }} + {{- end }} command: [ "python3", "-u", "-m", "job_orchestration.scheduler.compress.compression_scheduler", "--config", "/etc/clp-config.yaml" ] volumes: - - {{- include "clp.logsInputVolume" . | nindent 10 }} - {{- include "clp.pvcVolume" (dict "root" . "component_category" "compression-scheduler" @@ -101,3 +107,12 @@ spec: - name: "config" configMap: name: {{ include "clp.fullname" . }}-config + {{- with .Values.clpConfig.aws_config_directory }} + - name: "aws-config" + hostPath: + path: {{ . | quote }} + type: "Directory" + {{- end }} + {{- if eq .Values.clpConfig.logs_input.type "fs" }} + - {{- include "clp.logsInputVolume" . 
| nindent 10 }} + {{- end }} diff --git a/tools/deployment/package-helm/templates/compression-worker-deployment.yaml b/tools/deployment/package-helm/templates/compression-worker-deployment.yaml index 70a6165154..d8d3308353 100644 --- a/tools/deployment/package-helm/templates/compression-worker-deployment.yaml +++ b/tools/deployment/package-helm/templates/compression-worker-deployment.yaml @@ -45,7 +45,6 @@ spec: - name: "PYTHONPATH" value: "/opt/clp/lib/python3/site-packages" volumeMounts: - - {{- include "clp.logsInputVolumeMount" . | nindent 14 }} - name: {{ include "clp.volumeName" (dict "component_category" "compression-worker" "name" "logs" @@ -60,11 +59,28 @@ spec: mountPath: "/etc/clp-config.yaml" subPath: "clp-config.yaml" readOnly: true + {{- if eq .Values.clpConfig.archive_output.storage.type "fs" }} - name: {{ include "clp.volumeName" (dict "component_category" "shared-data" "name" "archives" ) | quote }} mountPath: "/var/data/archives" + {{- end }} + {{- if .Values.clpConfig.aws_config_directory }} + - name: "aws-config" + mountPath: "/opt/clp/.aws" + readOnly: true + {{- end }} + {{- if eq .Values.clpConfig.logs_input.type "fs" }} + - {{- include "clp.logsInputVolumeMount" . | nindent 14 }} + {{- end }} + {{- if eq .Values.clpConfig.archive_output.storage.type "s3" }} + - name: {{ include "clp.volumeName" (dict + "component_category" "compression-worker" + "name" "staged-archives" + ) | quote }} + mountPath: "/var/data/staged-archives" + {{- end }} command: [ "python3", "-u", "/opt/clp/lib/python3/site-packages/bin/celery", @@ -77,7 +93,6 @@ spec: "-n", "compression-worker" ] volumes: - - {{- include "clp.logsInputVolume" . | nindent 10 }} - {{- include "clp.pvcVolume" (dict "root" . "component_category" "compression-worker" @@ -88,11 +103,29 @@ spec: "component_category" "compression-worker" "name" "tmp" ) | nindent 10 }} + - name: "config" + configMap: + name: {{ include "clp.fullname" . 
}}-config + {{- if eq .Values.clpConfig.archive_output.storage.type "fs" }} - {{- include "clp.pvcVolume" (dict "root" . "component_category" "shared-data" "name" "archives" ) | nindent 10 }} - - name: "config" - configMap: - name: {{ include "clp.fullname" . }}-config + {{- end }} + {{- with .Values.clpConfig.aws_config_directory }} + - name: "aws-config" + hostPath: + path: {{ . | quote }} + type: "Directory" + {{- end }} + {{- if eq .Values.clpConfig.logs_input.type "fs" }} + - {{- include "clp.logsInputVolume" . | nindent 10 }} + {{- end }} + {{- if eq .Values.clpConfig.archive_output.storage.type "s3" }} + - {{- include "clp.pvcVolume" (dict + "root" . + "component_category" "compression-worker" + "name" "staged-archives" + ) | nindent 10 }} + {{- end }} diff --git a/tools/deployment/package-helm/templates/compression-worker-staged-archives-pv.yaml b/tools/deployment/package-helm/templates/compression-worker-staged-archives-pv.yaml new file mode 100644 index 0000000000..c00226b797 --- /dev/null +++ b/tools/deployment/package-helm/templates/compression-worker-staged-archives-pv.yaml @@ -0,0 +1,11 @@ +{{- if eq .Values.clpConfig.archive_output.storage.type "s3" }} +{{- include "clp.createLocalPv" (dict + "root" . + "component_category" "compression-worker" + "name" "staged-archives" + "nodeRole" "control-plane" + "capacity" "20Gi" + "accessModes" (list "ReadWriteOnce") + "hostPath" (printf "%s/staged-archives" .Values.clpConfig.data_directory) +) }} +{{- end }} diff --git a/tools/deployment/package-helm/templates/compression-worker-staged-archives-pvc.yaml b/tools/deployment/package-helm/templates/compression-worker-staged-archives-pvc.yaml new file mode 100644 index 0000000000..b8a9367236 --- /dev/null +++ b/tools/deployment/package-helm/templates/compression-worker-staged-archives-pvc.yaml @@ -0,0 +1,9 @@ +{{- if eq .Values.clpConfig.archive_output.storage.type "s3" }} +{{- include "clp.createPvc" (dict + "root" . 
+ "component_category" "compression-worker" + "name" "staged-archives" + "capacity" "20Gi" + "accessModes" (list "ReadWriteOnce") +) }} +{{- end }} diff --git a/tools/deployment/package-helm/templates/configmap.yaml b/tools/deployment/package-helm/templates/configmap.yaml index f61296931f..09562afcbf 100644 --- a/tools/deployment/package-helm/templates/configmap.yaml +++ b/tools/deployment/package-helm/templates/configmap.yaml @@ -6,78 +6,198 @@ metadata: {{- include "clp.labels" . | nindent 4 }} data: clp-config.yaml: | + {{- with .Values.clpConfig.api_server }} + api_server: + default_max_num_query_results: {{ .default_max_num_query_results | int }} + host: "localhost" + port: 3001 + query_job_polling: + initial_backoff_ms: {{ .query_job_polling.initial_backoff_ms | int }} + max_backoff_ms: {{ .query_job_polling.max_backoff_ms | int }} + {{- else }} + api_server: null + {{- end }} + {{- with .Values.clpConfig.archive_output }} archive_output: - compression_level: {{ .Values.clpConfig.archive_output.compression_level }} + compression_level: {{ .compression_level | int }} + {{ with .retention_period }} + retention_period: {{ . 
| int }} + {{ else }} + retention_period: null + {{ end }} storage: - directory: "/var/data/archives" + {{- if eq .storage.type "fs" }} type: "fs" - target_archive_size: {{ .Values.clpConfig.archive_output.target_archive_size | int }} - target_dictionaries_size: {{ .Values.clpConfig.archive_output.target_dictionaries_size - | int }} - target_encoded_file_size: {{ .Values.clpConfig.archive_output.target_encoded_file_size - | int }} - target_segment_size: {{ .Values.clpConfig.archive_output.target_segment_size - | int }} + directory: "/var/data/archives" + {{- else }} + type: "s3" + staging_directory: "/var/data/staged-archives" + {{- with .storage.s3_config }} + s3_config: + endpoint_url: {{ .endpoint_url | default "null" }} + region_code: {{ .region_code | quote }} + bucket: {{ .bucket | quote }} + key_prefix: {{ .key_prefix | quote }} + {{- with .aws_authentication }} + aws_authentication: + type: {{ .type | quote }} + {{- if eq .type "credentials" }} + credentials: + access_key_id: {{ .credentials.access_key_id | quote }} + secret_access_key: {{ .credentials.secret_access_key | quote }} + {{- end }} + {{- if eq .type "profile" }} + profile: {{ .profile | quote }} + {{- end }} + {{- end }}{{/* with .aws_authentication */}} + {{- end }}{{/* with .storage.s3_config */}} + {{- end }}{{/* if eq .storage.type "fs" */}} + target_archive_size: {{ .target_archive_size | int }} + target_dictionaries_size: {{ .target_dictionaries_size | int }} + target_encoded_file_size: {{ .target_encoded_file_size | int }} + target_segment_size: {{ .target_segment_size | int }} + {{- end }}{{/* with .Values.clpConfig.archive_output */}} + {{- with .Values.clpConfig.aws_config_directory }} + aws_config_directory: {{ . 
| quote }} + {{- else }} + aws_config_directory: null + {{- end }} compression_scheduler: jobs_poll_delay: {{ .Values.clpConfig.compression_scheduler.jobs_poll_delay }} logging_level: {{ .Values.clpConfig.compression_scheduler.logging_level | quote }} max_concurrent_tasks_per_job: {{ - .Values.clpConfig.compression_scheduler.max_concurrent_tasks_per_job }} + .Values.clpConfig.compression_scheduler.max_concurrent_tasks_per_job | int }} + type: {{ .Values.clpConfig.compression_scheduler.type | quote }} compression_worker: logging_level: {{ .Values.clpConfig.compression_worker.logging_level | quote }} data_directory: "/var/data" database: auto_commit: false compress: true - host: {{ include "clp.fullname" . }}-database - name: {{ .Values.clpConfig.database.name | quote }} + host: "{{ include "clp.fullname" . }}-database" + names: + clp: {{ .Values.clpConfig.database.names.clp | quote }} port: 3306 ssl_cert: null type: {{ .Values.clpConfig.database.type | quote }} + garbage_collector: + logging_level: {{ .Values.clpConfig.garbage_collector.logging_level | quote }} + sweep_interval: + archive: {{ .Values.clpConfig.garbage_collector.sweep_interval.archive | int }} + search_result: {{ .Values.clpConfig.garbage_collector.sweep_interval.search_result | int }} logs_directory: "/var/log" + {{- with .Values.clpConfig.logs_input }} logs_input: - directory: "/mnt/logs" + {{- if eq .type "fs" }} type: "fs" + directory: "/mnt/logs" + {{- else }} + type: "s3" + {{- with .aws_authentication }} + aws_authentication: + type: {{ .type | quote }} + {{- if eq .type "credentials" }} + credentials: + access_key_id: {{ .credentials.access_key_id | quote }} + secret_access_key: {{ .credentials.secret_access_key | quote }} + {{- end }} + {{- if eq .type "profile" }} + profile: {{ .profile | quote }} + {{- end }} + {{- end }}{{/* with .aws_authentication */}} + {{- end }}{{/* if eq .type "fs" */}} + {{- end }}{{/* with .Values.clpConfig.logs_input */}} + {{- with .Values.clpConfig.log_ingestor 
}} + log_ingestor: + buffer_flush_threshold: {{ .buffer_flush_threshold | int }} + buffer_flush_timeout: {{ .buffer_flush_timeout | int }} + channel_capacity: {{ .channel_capacity | int }} + host: "localhost" + logging_level: {{ .logging_level | quote }} + port: 3002 + {{- else }} + log_ingestor: null + {{- end }} + {{- with .Values.clpConfig.mcp_server }} + mcp_server: + host: "localhost" + logging_level: {{ .logging_level | quote }} + port: 8000 + {{- else }} + mcp_server: null + {{- end }} package: query_engine: {{ .Values.clpConfig.package.query_engine | quote }} storage_engine: {{ .Values.clpConfig.package.storage_engine | quote }} + presto: {{ .Values.clpConfig.presto }} query_scheduler: - host: {{ include "clp.fullname" . }}-query-scheduler + host: "{{ include "clp.fullname" . }}-query-scheduler" jobs_poll_delay: {{ .Values.clpConfig.query_scheduler.jobs_poll_delay }} logging_level: {{ .Values.clpConfig.query_scheduler.logging_level | quote }} num_archives_to_search_per_sub_job: {{ - .Values.clpConfig.query_scheduler.num_archives_to_search_per_sub_job }} + .Values.clpConfig.query_scheduler.num_archives_to_search_per_sub_job | int }} port: 7000 query_worker: logging_level: {{ .Values.clpConfig.query_worker.logging_level | quote }} queue: - host: {{ include "clp.fullname" . }}-queue + host: "{{ include "clp.fullname" . }}-queue" port: 5672 redis: - compression_backend_database: {{ .Values.clpConfig.redis.compression_backend_database }} - host: {{ include "clp.fullname" . }}-redis + compression_backend_database: {{ .Values.clpConfig.redis.compression_backend_database | int }} + host: "{{ include "clp.fullname" . }}-redis" port: 6379 - query_backend_database: {{ .Values.clpConfig.redis.query_backend_database }} + query_backend_database: {{ .Values.clpConfig.redis.query_backend_database | int }} reducer: base_port: 14009 - host: {{ include "clp.fullname" . }}-reducer + host: "{{ include "clp.fullname" . 
}}-reducer" logging_level: {{ .Values.clpConfig.reducer.logging_level | quote }} - upsert_interval: {{ .Values.clpConfig.reducer.upsert_interval }} + upsert_interval: {{ .Values.clpConfig.reducer.upsert_interval | int }} results_cache: db_name: {{ .Values.clpConfig.results_cache.db_name | quote }} - host: {{ include "clp.fullname" . }}-results-cache + host: "{{ include "clp.fullname" . }}-results-cache" port: 27017 + {{ with .Values.clpConfig.results_cache.retention_period }} + retention_period: {{ . | int }} + {{ else }} + retention_period: null + {{ end }} stream_collection_name: {{ .Values.clpConfig.results_cache.stream_collection_name | quote }} + {{- with .Values.clpConfig.stream_output }} stream_output: storage: - directory: "/var/data/streams" + {{- if eq .storage.type "fs" }} type: "fs" - target_uncompressed_size: {{ .Values.clpConfig.stream_output.target_uncompressed_size | int }} + directory: "/var/data/streams" + {{- else }} + type: "s3" + staging_directory: "/var/data/staged-streams" + {{- with .storage.s3_config }} + s3_config: + endpoint_url: {{ .endpoint_url | default "null" }} + region_code: {{ .region_code | quote }} + bucket: {{ .bucket | quote }} + key_prefix: {{ .key_prefix | quote }} + {{- with .aws_authentication }} + aws_authentication: + type: {{ .type | quote }} + {{- if eq .type "credentials" }} + credentials: + access_key_id: {{ .credentials.access_key_id | quote }} + secret_access_key: {{ .credentials.secret_access_key | quote }} + {{- end }} + {{- if eq .type "profile" }} + profile: {{ .profile | quote }} + {{- end }} + {{- end }}{{/* with .aws_authentication */}} + {{- end }}{{/* with .storage.s3_config */}} + {{- end }}{{/* if eq .storage.type "fs" */}} + target_uncompressed_size: {{ .target_uncompressed_size | int }} + {{- end }}{{/* with .Values.clpConfig.stream_output */}} tmp_directory: "/var/tmp" webui: host: "localhost" port: 4000 - rate_limit: {{ .Values.clpConfig.webui.rate_limit }} + rate_limit: {{ 
.Values.clpConfig.webui.rate_limit | int }} results_metadata_collection_name: {{ .Values.clpConfig.webui.results_metadata_collection_name | quote }} @@ -114,7 +234,11 @@ data: "ClpStorageEngine": {{ .Values.clpConfig.package.storage_engine | quote }}, "ClpQueryEngine": {{ .Values.clpConfig.package.query_engine | quote }}, "LogsInputType": {{ .Values.clpConfig.logs_input.type | quote }}, + {{- if eq .Values.clpConfig.logs_input.type "fs" }} "LogsInputRootDir": "/mnt/logs", + {{- else }} + "LogsInputRootDir": null, + {{- end }} "MongoDbSearchResultsMetadataCollectionName": {{ .Values.clpConfig.webui.results_metadata_collection_name | quote }}, "SqlDbClpArchivesTableName": "clp_archives", @@ -128,7 +252,7 @@ data: { "SqlDbHost": "{{ include "clp.fullname" . }}-database", "SqlDbPort": 3306, - "SqlDbName": {{ .Values.clpConfig.database.name | quote }}, + "SqlDbName": {{ .Values.clpConfig.database.names.clp | quote }}, "SqlDbQueryJobsTableName": "query_jobs", "SqlDbCompressionJobsTableName": "compression_jobs", "MongoDbHost": "{{ include "clp.fullname" . 
}}-results-cache", @@ -140,14 +264,31 @@ data: {{ .Values.clpConfig.results_cache.stream_collection_name | quote }}, "ClientDir": "/opt/clp/var/www/webui/client", "LogViewerDir": "/opt/clp/var/www/webui/yscope-log-viewer", + {{- if eq .Values.clpConfig.logs_input.type "fs" }} "LogsInputRootDir": "/mnt/logs", + {{- else }} + "LogsInputRootDir": null, + {{- end }} + {{- with .Values.clpConfig.stream_output.storage }} + {{- if eq .type "fs" }} "StreamFilesDir": "/var/data/streams", - "StreamTargetUncompressedSize": - {{ .Values.clpConfig.stream_output.target_uncompressed_size | int }}, "StreamFilesS3Region": null, "StreamFilesS3PathPrefix": null, "StreamFilesS3Profile": null, - "ArchiveOutputCompressionLevel": {{ .Values.clpConfig.archive_output.compression_level }}, + {{- else }} + "StreamFilesDir": null, + "StreamFilesS3Region": {{ .s3_config.region_code | quote }}, + "StreamFilesS3PathPrefix": "{{ .s3_config.bucket }}/{{ .s3_config.key_prefix }}", + {{- if eq .s3_config.aws_authentication.type "profile" }} + "StreamFilesS3Profile": {{ .s3_config.aws_authentication.profile | quote }}, + {{- else }} + "StreamFilesS3Profile": null, + {{- end }} + {{- end }}{{/* if eq .type "fs" */}} + {{- end }}{{/* with .Values.clpConfig.stream_output.storage */}} + "StreamTargetUncompressedSize": + {{ .Values.clpConfig.stream_output.target_uncompressed_size | int }}, + "ArchiveOutputCompressionLevel": {{ .Values.clpConfig.archive_output.compression_level | int }}, "ArchiveOutputTargetArchiveSize": {{ .Values.clpConfig.archive_output.target_archive_size | int }}, "ArchiveOutputTargetDictionariesSize": @@ -158,6 +299,20 @@ data: {{ .Values.clpConfig.archive_output.target_segment_size | int }}, "ClpQueryEngine": {{ .Values.clpConfig.package.query_engine | quote }}, "ClpStorageEngine": {{ .Values.clpConfig.package.storage_engine | quote }}, + {{- with .Values.clpConfig.presto }} + "PrestoHost": {{ .host | quote }}, + "PrestoPort": {{ .port | int }}, + {{- else }} "PrestoHost": null, - 
"PrestoPort": null + "PrestoPort": null, + {{- end }} + {{- if .Values.clpConfig.fluent_bit }} + {{- if .Values.clpConfig.fluent_bit.enabled }} + "OperationalLogsDir": "/var/log/clp" + {{- else }} + "OperationalLogsDir": null + {{- end }} + {{- else }} + "OperationalLogsDir": null + {{- end }} } diff --git a/tools/deployment/package-helm/templates/database-statefulset.yaml b/tools/deployment/package-helm/templates/database-statefulset.yaml index 8e67cee934..4c510c57c8 100644 --- a/tools/deployment/package-helm/templates/database-statefulset.yaml +++ b/tools/deployment/package-helm/templates/database-statefulset.yaml @@ -33,7 +33,7 @@ spec: imagePullPolicy: "Always" env: - name: "MYSQL_DATABASE" - value: {{ .Values.clpConfig.database.name | quote }} + value: {{ .Values.clpConfig.database.names.clp | quote }} - name: "MYSQL_USER" valueFrom: secretKeyRef: diff --git a/tools/deployment/package-helm/templates/fluent-bit-clusterrole.yaml b/tools/deployment/package-helm/templates/fluent-bit-clusterrole.yaml new file mode 100644 index 0000000000..3d0ac38864 --- /dev/null +++ b/tools/deployment/package-helm/templates/fluent-bit-clusterrole.yaml @@ -0,0 +1,20 @@ +{{- if .Values.clpConfig.fluent_bit }} +{{- if .Values.clpConfig.fluent_bit.enabled }} +apiVersion: "rbac.authorization.k8s.io/v1" +kind: "ClusterRole" +metadata: + name: {{ include "clp.fullname" . }}-fluent-bit + labels: + {{- include "clp.labels" . 
| nindent 4 }} + app.kubernetes.io/component: "fluent-bit" +rules: + - apiGroups: [""] + resources: + - "namespaces" + - "pods" + verbs: + - "get" + - "list" + - "watch" +{{- end }} +{{- end }} diff --git a/tools/deployment/package-helm/templates/fluent-bit-clusterrolebinding.yaml b/tools/deployment/package-helm/templates/fluent-bit-clusterrolebinding.yaml new file mode 100644 index 0000000000..341761e16c --- /dev/null +++ b/tools/deployment/package-helm/templates/fluent-bit-clusterrolebinding.yaml @@ -0,0 +1,19 @@ +{{- if .Values.clpConfig.fluent_bit }} +{{- if .Values.clpConfig.fluent_bit.enabled }} +apiVersion: "rbac.authorization.k8s.io/v1" +kind: "ClusterRoleBinding" +metadata: + name: {{ include "clp.fullname" . }}-fluent-bit + labels: + {{- include "clp.labels" . | nindent 4 }} + app.kubernetes.io/component: "fluent-bit" +roleRef: + apiGroup: "rbac.authorization.k8s.io" + kind: "ClusterRole" + name: {{ include "clp.fullname" . }}-fluent-bit +subjects: + - kind: "ServiceAccount" + name: {{ include "clp.fullname" . }}-fluent-bit + namespace: {{ .Release.Namespace }} +{{- end }} +{{- end }} diff --git a/tools/deployment/package-helm/templates/fluent-bit-configmap.yaml b/tools/deployment/package-helm/templates/fluent-bit-configmap.yaml new file mode 100644 index 0000000000..2519ba6217 --- /dev/null +++ b/tools/deployment/package-helm/templates/fluent-bit-configmap.yaml @@ -0,0 +1,287 @@ +{{- if .Values.clpConfig.fluent_bit }} +{{- if .Values.clpConfig.fluent_bit.enabled }} +apiVersion: "v1" +kind: "ConfigMap" +metadata: + name: {{ include "clp.fullname" . }}-fluent-bit-config + labels: + {{- include "clp.labels" . 
| nindent 4 }} + app.kubernetes.io/component: "fluent-bit" +data: + fluent-bit.conf: | + [SERVICE] + Daemon Off + Flush {{ .Values.clpConfig.fluent_bit.flush_interval | default 5 }} + Log_Level ${FLUENT_BIT_LOG_LEVEL} + Parsers_File /fluent-bit/etc/parsers.conf + HTTP_Server On + HTTP_Listen 0.0.0.0 + HTTP_Port 2020 + Health_Check On + storage.path /var/log/clp-operational/buffer + storage.sync normal + storage.checksum off + storage.backlog.mem_limit 5M + + @INCLUDE inputs.conf + @INCLUDE filters.conf + @INCLUDE outputs.conf + + inputs.conf: | + # Kubernetes container logs (stdout/stderr captured by container runtime) + [INPUT] + Name tail + Tag kube.* + Path /var/log/containers/*{{ include "clp.fullname" . }}*.log + Parser cri + Mem_Buf_Limit 5MB + Skip_Long_Lines On + Refresh_Interval {{ .Values.clpConfig.fluent_bit.refresh_interval | default 10 }} + DB /var/log/clp-operational/buffer/kube-tail.db + + # Forward input for receiving logs from other Fluent Bit instances + [INPUT] + Name forward + Listen 0.0.0.0 + Port 24224 + + filters.conf: | + # Kubernetes metadata enrichment + [FILTER] + Name kubernetes + Match kube.* + Kube_URL https://kubernetes.default.svc:443 + Kube_CA_File /var/run/secrets/kubernetes.io/serviceaccount/ca.crt + Kube_Token_File /var/run/secrets/kubernetes.io/serviceaccount/token + Merge_Log On + Keep_Log Off + K8S-Logging.Parser On + K8S-Logging.Exclude On + Labels Off + Annotations Off + + # Extract component name from Kubernetes labels + [FILTER] + Name rewrite_tag + Match kube.* + Rule $kubernetes['labels']['app.kubernetes.io/component'] ^(.+)$ component.$1 false + + # Add common metadata + [FILTER] + Name modify + Match * + Add clp_deployment {{ include "clp.fullname" . 
}} + Add node ${NODE_NAME} + + # Nest record under 'log' key for consistent structure + [FILTER] + Name nest + Match component.* + Operation nest + Wildcard * + Nest_under body + Remove_prefix body_ + + outputs.conf: | + # Hot tier: Write to organized file storage for real-time access + [OUTPUT] + Name file + Match component.* + Path /var/log/clp-operational/hot + Mkdir On + Format out_file + Template {time} {component} {log} + + # Write all logs to daily rotating files organized by component + [OUTPUT] + Name file + Match component.compression-scheduler + Path /var/log/clp-operational/hot/compression_scheduler + Mkdir On + + [OUTPUT] + Name file + Match component.compression-worker + Path /var/log/clp-operational/hot/compression_worker + Mkdir On + + [OUTPUT] + Name file + Match component.query-scheduler + Path /var/log/clp-operational/hot/query_scheduler + Mkdir On + + [OUTPUT] + Name file + Match component.query-worker + Path /var/log/clp-operational/hot/query_worker + Mkdir On + + [OUTPUT] + Name file + Match component.reducer + Path /var/log/clp-operational/hot/reducer + Mkdir On + + [OUTPUT] + Name file + Match component.webui + Path /var/log/clp-operational/hot/webui + Mkdir On + + [OUTPUT] + Name file + Match component.api-server + Path /var/log/clp-operational/hot/api_server + Mkdir On + + [OUTPUT] + Name file + Match component.log-ingestor + Path /var/log/clp-operational/hot/log_ingestor + Mkdir On + + [OUTPUT] + Name file + Match component.mcp-server + Path /var/log/clp-operational/hot/mcp_server + Mkdir On + + [OUTPUT] + Name file + Match component.garbage-collector + Path /var/log/clp-operational/hot/garbage_collector + Mkdir On + + [OUTPUT] + Name file + Match component.database + Path /var/log/clp-operational/hot/database + Mkdir On + + [OUTPUT] + Name file + Match component.queue + Path /var/log/clp-operational/hot/queue + Mkdir On + + [OUTPUT] + Name file + Match component.redis + Path /var/log/clp-operational/hot/redis + Mkdir On + + [OUTPUT] + 
Name file + Match component.results-cache + Path /var/log/clp-operational/hot/results_cache + Mkdir On + + {{- if .Values.clpConfig.fluent_bit.s3_output }} + {{- if .Values.clpConfig.fluent_bit.s3_output.enabled }} + # Cold tier: Upload to S3 for archival with CLP compression + [OUTPUT] + Name s3 + Match component.* + bucket {{ .Values.clpConfig.fluent_bit.s3_output.bucket }} + region {{ .Values.clpConfig.fluent_bit.s3_output.region }} + {{- if .Values.clpConfig.fluent_bit.s3_output.endpoint }} + endpoint {{ .Values.clpConfig.fluent_bit.s3_output.endpoint }} + {{- end }} + s3_key_format /{{ .Values.clpConfig.fluent_bit.s3_output.key_prefix | default + "operational-logs" }}/$TAG[1]/%Y/%m/%d/%H/$UUID.log + total_file_size {{ .Values.clpConfig.fluent_bit.s3_output.total_file_size | default "10M" }} + upload_timeout {{ .Values.clpConfig.fluent_bit.s3_output.upload_timeout | default "5m" }} + use_put_object On + compression gzip + store_dir /var/log/clp-operational/s3-buffer + static_file_path Off + {{- if eq .Values.clpConfig.fluent_bit.s3_output.auth_type "profile" }} + {{- if .Values.clpConfig.fluent_bit.s3_output.profile }} + profile {{ .Values.clpConfig.fluent_bit.s3_output.profile }} + {{- end }} + {{- end }} + {{- end }} + {{- end }} + + # Debug output (can be enabled for troubleshooting) + {{- if .Values.clpConfig.fluent_bit.debug_output }} + [OUTPUT] + Name stdout + Match * + Format json_lines + {{- end }} + + parsers.conf: | + # CRI log format (containerd/CRI-O) + [PARSER] + Name cri + Format regex + Regex ^(?