Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
169 commits
Select commit Hold shift + click to select a range
aa64df9
feat(deployment): Add k8s Helm chart for CLP package deployment.
junhaoliao Nov 14, 2025
b13eadb
Add reducer service and deployment to Helm chart.
junhaoliao Nov 16, 2025
c0c9d02
Replace hostPath volumes with PersistentVolumeClaims in Helm chart; f…
junhaoliao Nov 16, 2025
8bfd5e1
update test.sh
junhaoliao Nov 16, 2025
a3a1356
update test.sh
junhaoliao Nov 16, 2025
37000f6
Update readiness probe timeoutSeconds to 1.
junhaoliao Nov 16, 2025
34b3dca
feat(deployment): Add deployments for MCP server, API server, and gar…
junhaoliao Nov 16, 2025
a266427
feat(deployment): Add liveness and readiness probes to API server dep…
junhaoliao Nov 16, 2025
a1afc7f
Simplify service templates with conditional NodePort configuration.
junhaoliao Nov 16, 2025
e3d78a4
Add named ports in Helm templates for standardization.
junhaoliao Nov 16, 2025
f679f38
fix nodePort 's value
junhaoliao Nov 16, 2025
3108eca
fix(helm): Update default port values in configmap for query schedule…
junhaoliao Nov 16, 2025
597de93
remove comment
junhaoliao Nov 16, 2025
9c68929
Rename `user` to `username` across all components
junhaoliao Nov 16, 2025
1f6a5f0
refactor(helm): Split secrets into component-specific files; remove r…
junhaoliao Nov 16, 2025
89af695
Replace `clp-package` references with `clp` across templates.
junhaoliao Nov 16, 2025
ca7d820
split yamls
junhaoliao Nov 16, 2025
0b8ef72
exclude helm directory from yaml lint
junhaoliao Nov 16, 2025
9b5b53b
fix templates
junhaoliao Nov 16, 2025
c1dccd6
fix(helm): Update maintainer name in Chart.yaml.
junhaoliao Nov 16, 2025
236c024
fix(helm): Set default tag to "nightly" for clpPackage.
junhaoliao Nov 16, 2025
5aa26cd
Add Helm linting and toolchain support
junhaoliao Nov 16, 2025
2ab3e74
fix(helm): Rename `user` to `username` in query-worker-deployment tem…
junhaoliao Nov 16, 2025
2ac391b
refactor(helm): Rename PVC and PV files; add tmp-pvc definition.
junhaoliao Nov 16, 2025
f1f3a54
refactor(helm): Remove localPathBase; redefine directory paths for st…
junhaoliao Nov 16, 2025
832eb19
refactor(helm): Consolidate PersistentVolume templates into `createLo…
junhaoliao Nov 16, 2025
d2b4671
refactor(helm): Rename PVC and PV template files for consistency.
junhaoliao Nov 17, 2025
8985355
refactor(helm): Simplify PV and secret object names for consistency.
junhaoliao Nov 17, 2025
75c0101
refactor(helm): Consolidate PVC definitions into `createPvc` and stre…
junhaoliao Nov 17, 2025
a47741a
Merge branch 'main' into helm
junhaoliao Nov 17, 2025
1476b71
Add user-logs PVC and PV definitions.
junhaoliao Nov 17, 2025
ad82e25
fix(helm): Remove unnecessary image tag override in test script.
junhaoliao Nov 17, 2025
29fe3c0
feat(helm): Add conditional database image selection based on configu…
junhaoliao Nov 17, 2025
c5bb582
feat(helm): Add persistentVolumeClaimRetentionPolicy to statefulsets.
junhaoliao Nov 17, 2025
db9cef9
feat(helm): Add ttlSecondsAfterFinished to job templates.
junhaoliao Nov 17, 2025
1503c49
feat(deployment): set up initial Helm chart for CLP package:
junhaoliao Nov 17, 2025
2b8052f
chore(deployment): remove pod watch from test script.
junhaoliao Nov 17, 2025
0b4c4fd
chore(deployment): update chart-testing config to set target branch.
junhaoliao Nov 17, 2025
f12557d
quote string
junhaoliao Nov 17, 2025
5713eb3
fix missing yamale
junhaoliao Nov 17, 2025
fa2522e
quote string
junhaoliao Nov 17, 2025
8d176fe
fix missing yamale
junhaoliao Nov 17, 2025
e1fe436
chore(ci): Set fetch-depth to 0 in clp-lint workflow.
junhaoliao Nov 17, 2025
e4c0f22
fix(deployment): Adjust readinessProbe timeout and failure threshold.
junhaoliao Nov 17, 2025
649848d
Merge branch 'main' into helm-setup
junhaoliao Nov 17, 2025
83770ef
Merge branch 'main' into helm-setup
kirkrodrigues Nov 18, 2025
7fe32b7
chore(ci): Add comment explaining fetch-depth requirement in clp-lint…
junhaoliao Nov 21, 2025
129cc17
Merge branch 'main' into helm-setup
junhaoliao Nov 21, 2025
6c89ed6
Reformat database Job initContainer command for readability.
junhaoliao Nov 21, 2025
e40d239
Update database livenessProbe to include authentication parameters.
junhaoliao Nov 21, 2025
f17264e
alphabetize
junhaoliao Nov 21, 2025
f51b97e
Adjust probe timings for readiness and liveness checks.
junhaoliao Nov 21, 2025
8ed32a7
add url to maintainers - Apply suggestions from code review
junhaoliao Nov 21, 2025
45674e2
Standardize version field placement in Chart.yaml.
junhaoliao Nov 21, 2025
2ad9b0a
Adjust livenessProbe periodSeconds to 30.
junhaoliao Nov 21, 2025
5d5bd27
Swap liveness and readiness probe configurations in database-stateful…
junhaoliao Nov 21, 2025
fb2d287
Update all `.yml` extensions to `.yaml` for consistency
junhaoliao Nov 22, 2025
5dd3ab4
Merge branch 'helm-setup' into helm
junhaoliao Nov 25, 2025
a155ba3
Update configuration file extensions from .yml to .yaml for consistency
junhaoliao Nov 25, 2025
be9afb3
Refactor liveness and readiness probes across multiple deployment fil…
junhaoliao Nov 25, 2025
762acb9
Merge branch 'main' into helm
junhaoliao Nov 25, 2025
387ad2f
Add workerConcurrency setting to values.yaml
junhaoliao Nov 25, 2025
d2a4357
Set replicas to 1 for compression and query workers
junhaoliao Nov 25, 2025
d4f3c81
upgrade yamale to 6.1.0 - Apply suggestions from code review
junhaoliao Nov 26, 2025
1515d0d
Add chart-testing SHA256 checksum for Linux ARM64 architecture in too…
junhaoliao Nov 26, 2025
dd35d68
Remove redundant chart schema and lint validation check settings beca…
junhaoliao Nov 26, 2025
fe69119
Merge remote-tracking branch 'refs/remotes/origin/main' into helm-setup
junhaoliao Nov 26, 2025
5296f31
Remove the unneeded `chmod u+x` from the helm env source script - App…
junhaoliao Nov 26, 2025
2c41251
use chart-yaml-schema from the downloaded toolchain
junhaoliao Nov 26, 2025
0931136
Remove chart_schema.yaml from lint.yaml
junhaoliao Nov 26, 2025
dd70f5c
Assign `nodeRole` for database persistent volumes to target control-p…
junhaoliao Nov 27, 2025
ea06268
Improve waiting prompts - Apply suggestions from code review
junhaoliao Nov 27, 2025
ba5d4df
update helm chart description - Apply suggestions from code review
junhaoliao Nov 27, 2025
8b44dd6
update docs - Apply suggestions from code review
junhaoliao Nov 27, 2025
9ff1611
Rename `allowSbinScripts` to `allowHostAccessForSbinScripts` for clea…
junhaoliao Nov 27, 2025
0140971
update test script shebang, update TODO comment, set eu pipefail - Ap…
junhaoliao Nov 27, 2025
44bd1af
Make image pull policy configurable for db-table-creator job in Helm …
junhaoliao Nov 27, 2025
72013ed
Add JSDoc-style comments for helper templates in Helm chart
junhaoliao Nov 27, 2025
550e7a9
Merge branch 'main' into helm-setup
junhaoliao Nov 27, 2025
1452795
improve docs - Apply suggestions from code review
junhaoliao Nov 27, 2025
480c14b
Remove unused `createPvc` helper definition from Helm templates
junhaoliao Nov 27, 2025
4142856
Update helm toolchain output directory to use `G_HELM_TOOLCHAIN_DIR`
junhaoliao Nov 27, 2025
036222f
Remove `ct.yaml` and inline configuration in `lint.yaml` for clarity
junhaoliao Nov 27, 2025
e258b3d
Merge branch 'main' into helm-setup
junhaoliao Nov 27, 2025
bda53c1
Merge branch 'main' into helm-setup
junhaoliao Nov 27, 2025
c7789f8
Merge branch 'helm-setup' into helm
junhaoliao Nov 28, 2025
476526c
Merge branch 'main' into helm
junhaoliao Nov 28, 2025
bfa2ea3
update values.yaml with the latest clp-config.yaml
junhaoliao Nov 28, 2025
93fcbfe
Assign `nodeRole` to persistent volumes for control-plane targeting.
junhaoliao Nov 28, 2025
7a1fff4
add back helper to create PersistentVolumeClaim templates.
junhaoliao Nov 28, 2025
ba6abb4
fix(helm): Rename values flag for sbin script access in results cache…
junhaoliao Nov 28, 2025
b25b243
refactor(helm): Update initContainer command syntax for better readab…
junhaoliao Nov 28, 2025
4ef25b5
refactor(helm): Use configurable image pull policy for deployments.
junhaoliao Nov 28, 2025
0ca3173
revert db port
junhaoliao Nov 28, 2025
e57243b
Merge branch 'main' into helm
junhaoliao Nov 29, 2025
399373f
refactor(helm): Remove redundant host field from configuration values.
junhaoliao Nov 29, 2025
0bb706b
remove empty line
junhaoliao Nov 29, 2025
7f90a85
Update image tag to 'main' and pull policy to 'Always' in values.yaml.
junhaoliao Nov 29, 2025
39fdcd6
feat(deployment): Add redis, queue (RabbitMQ), and results cache (Mon…
junhaoliao Nov 29, 2025
72024e4
chore(deployment): Bump Helm chart version to 0.1.1.
junhaoliao Nov 29, 2025
4fb6dd4
add empty newline at EOF
junhaoliao Nov 29, 2025
1f4df0c
docs(deployment): Reorder parameters in Helm helper function document…
junhaoliao Nov 29, 2025
2bbccca
Merge branch 'dbs' into helm
junhaoliao Nov 29, 2025
683f03b
fix(helm): Simplify syntax for redis livenessProbe exec configuration.
junhaoliao Nov 29, 2025
abc02ca
Merge branch 'main' into helm
junhaoliao Dec 3, 2025
1b93ccc
Merge branch 'main' into helm
junhaoliao Dec 8, 2025
33ea69a
Update port names for services and deployments.
junhaoliao Dec 8, 2025
7a52e6c
chore(helm): Bump chart version to 0.1.2-dev.3
junhaoliao Dec 8, 2025
abb1cf8
feat(helm): Add `max_concurrent_tasks_per_job` parameter to compressi…
junhaoliao Dec 8, 2025
3bf2d94
Update configmap webui settings
junhaoliao Dec 8, 2025
0195c05
Make `StreamTargetUncompressedSize` configurable via values file
junhaoliao Dec 8, 2025
af2a04b
avoid encoding u64 as float
junhaoliao Dec 8, 2025
0a8707d
feat(helm): Add conditional storage_engine configuration to configmap
junhaoliao Dec 8, 2025
eeb0fc2
fix(helm): Update reducer base_port to a fixed value in configmap and…
junhaoliao Dec 8, 2025
696ea80
fix(helm): Replace hardcoded sample file copy with dynamic download a…
junhaoliao Dec 8, 2025
356f336
remove compression-scheduler-service.yaml
junhaoliao Dec 8, 2025
7bd5558
Download and extract sample datasets in the background during test setup
junhaoliao Dec 8, 2025
ddb98c0
feat(test): Add hostPath mount for /home in test setup
junhaoliao Dec 8, 2025
c0d6a5a
fxi: make Web UI rate limit configurable in Helm chart
junhaoliao Dec 9, 2025
7d6cf4f
feat(helm): Add support for root database credentials in Helm templates.
junhaoliao Dec 10, 2025
8f695d9
feat(helm): Refactor init containers to use shared `clp.waitFor` help…
junhaoliao Dec 12, 2025
b231898
feat(helm): Refactor PersistentVolume and PersistentVolumeClaim templ…
junhaoliao Dec 12, 2025
44b0f17
feat(helm): Add shared helpers for Celery environment variables and i…
junhaoliao Dec 12, 2025
a9e6d09
feat(helm): Update hostPath in PersistentVolume templates for consist…
junhaoliao Dec 12, 2025
9279032
fix(test): Correct temp directory structure and ensure Redis database…
junhaoliao Dec 12, 2025
2d92641
Merge branch 'main' into helm
junhaoliao Dec 12, 2025
8580c11
refactor(helm): Reorder volumeMounts in StatefulSet templates for con…
junhaoliao Dec 12, 2025
1c26692
refactor(helm): Update templates to replace `component` with `compone…
junhaoliao Dec 14, 2025
a0cbb98
refactor(helm): Standardize indentation and list formatting in templa…
junhaoliao Dec 14, 2025
e44f8b9
Merge branch 'main' into helm
junhaoliao Dec 14, 2025
7447430
Merge branch 'main' into helm
junhaoliao Dec 16, 2025
8beb8e0
feat(helm): Add S3 support for storage configurations and AWS authent…
junhaoliao Dec 18, 2025
e9868f4
Merge branch 'main' into helm
junhaoliao Dec 18, 2025
41f0348
refactor(helm): Reorganize volumeMounts and environment variables for…
junhaoliao Dec 19, 2025
3363229
feat(helm): Add log_ingestor configuration for S3 log input support
junhaoliao Dec 19, 2025
17e2fc2
feat(helm): Add `type` configuration to compression_scheduler for tas…
junhaoliao Dec 19, 2025
2300cc5
feat(helm): Add log-ingestor deployment, service, and volume configur…
junhaoliao Dec 19, 2025
0e34c78
refactor(helm): Replace `database.name` with `database.names.clp`
junhaoliao Dec 19, 2025
50906ea
fix(helm): Update S3 profile condition to include storage type valida…
junhaoliao Dec 19, 2025
748c176
feat(helm): Add mcp_server configuration support in configmap template
junhaoliao Dec 19, 2025
b807445
chore(helm): Update api-server RUST_LOG environment variable to use "…
junhaoliao Dec 19, 2025
25c9003
feat(helm): Add readiness and liveness probes to query-scheduler depl…
junhaoliao Dec 19, 2025
e3a34e9
fix(helm): Correct indentation for logsInputVolumeMount in webui-depl…
junhaoliao Dec 19, 2025
dc878b9
feat(helm): Add S3 storage type support for query-worker staged strea…
junhaoliao Dec 19, 2025
bdcfb5e
feat(helm): Add Presto configuration support in values.yaml and confi…
junhaoliao Dec 19, 2025
6ad5757
feat(helm): Add conditional checks for api-server and garbage-collect…
junhaoliao Dec 19, 2025
77743e7
fix(helm): Remove unused `port` field from query_scheduler configurat…
junhaoliao Dec 19, 2025
4e0cd58
feat(helm): Add support for container port 30302 in query-scheduler d…
junhaoliao Dec 19, 2025
43da85a
fix(helm): Add missing log_ingestor directory to test script
junhaoliao Dec 19, 2025
ffcd412
fix(helm): Simplify S3 credentials configuration in webui-deployment …
junhaoliao Dec 19, 2025
aee79bc
fix(helm): Correct placement of readiness and liveness probes in quer…
junhaoliao Dec 19, 2025
a9cdedd
fix(helm): Refactor configmap template to improve conditional handlin…
junhaoliao Dec 19, 2025
3a5968a
fix(helm): Use `with` for aws_config_directory in webui-deployment te…
junhaoliao Dec 19, 2025
753d402
fix(helm): Replace `if` with `with` for aws_config_directory in deplo…
junhaoliao Dec 19, 2025
896cb31
fix(helm): Update accessModes to ReadWriteOnce in PVC and PV template…
junhaoliao Dec 19, 2025
5c81b0f
fix(helm): Add missing quotes for host fields in configmap template t…
junhaoliao Dec 19, 2025
f9a507d
fix(helm): Remove S3-specific PVC/PV and volume mounts for staged-str…
junhaoliao Dec 19, 2025
ee248a1
fix(helm): Add `| quote` and handle null case for aws_config_director…
junhaoliao Dec 19, 2025
be70bd3
fix(helm): Add conditional for logsInputVolumeMount in webui-deployme…
junhaoliao Dec 19, 2025
c346734
fix(helm): Add conditional for logsInputVolume in compression-schedul…
junhaoliao Dec 19, 2025
80251dc
fix(helm): Correct placement of logsInputVolume and pvcVolume conditi…
junhaoliao Dec 19, 2025
82dc4d7
fix(helm): Add int casting for numerical values in configmap template…
junhaoliao Dec 19, 2025
8709443
fix(helm): Adjust volume mounts and conditionals for staged archives …
junhaoliao Dec 19, 2025
7443e4e
fix(helm): Correct placement of config volume mounts across deploymen…
junhaoliao Dec 19, 2025
b32613b
chore(helm): Bump chart version to 0.1.2-dev.8
junhaoliao Dec 19, 2025
3b411a7
fix(helm): Handle null case for log_ingestor and update default clpCo…
junhaoliao Dec 19, 2025
f427226
WIP - feat(helm): Add Fluent Bit configuration for operational loggin…
junhaoliao Jan 5, 2026
6546cf0
Add operational logs feature with API integration and UI display
junhaoliao Jan 8, 2026
64ca1d7
Add missing input configuration for Docker Fluentd logging driver
junhaoliao Jan 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 5 additions & 15 deletions components/api-server/src/bin/api_server.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
use anyhow::Context;
use clap::Parser;
use clp_rust_utils::{clp_config::package, serde::yaml};
use tracing_appender::{
non_blocking::WorkerGuard,
rolling::{RollingFileAppender, Rotation},
};
use tracing_subscriber::{self, fmt::writer::MakeWriterExt};
use tracing_subscriber;

#[derive(Parser)]
#[command(version, about = "API Server for CLP.")]
Expand Down Expand Up @@ -42,13 +38,8 @@ fn read_config_and_credentials(
Ok((config, credentials))
}

fn set_up_logging() -> anyhow::Result<WorkerGuard> {
let logs_directory =
std::env::var("CLP_LOGS_DIR").context("Expect `CLP_LOGS_DIR` environment variable.")?;
let logs_directory = std::path::Path::new(logs_directory.as_str());
let file_appender =
RollingFileAppender::new(Rotation::HOURLY, logs_directory, "api_server.log");
let (non_blocking_writer, guard) = tracing_appender::non_blocking(file_appender);
fn set_up_logging() {
// Logs to stdout only - Docker's fluentd driver captures and forwards to Fluent Bit
tracing_subscriber::fmt()
.event_format(
tracing_subscriber::fmt::format()
Expand All @@ -60,9 +51,8 @@ fn set_up_logging() -> anyhow::Result<WorkerGuard> {
)
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.with_ansi(false)
.with_writer(std::io::stdout.and(non_blocking_writer))
.with_writer(std::io::stdout)
.init();
Ok(guard)
}

async fn shutdown_signal() {
Expand All @@ -81,7 +71,7 @@ async fn main() -> anyhow::Result<()> {
let args = Args::parse();

let (config, credentials) = read_config_and_credentials(&args)?;
let _guard = set_up_logging()?;
set_up_logging();

let addr = format!(
"{}:{}",
Expand Down
10 changes: 2 additions & 8 deletions components/clp-mcp-server/clp_mcp_server/clp_mcp_server.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
"""CLP MCP Server entry point."""

import ipaddress
import logging
import os
import socket
import sys
from pathlib import Path

import click
from clp_py_utils.clp_config import ClpConfig, MCP_SERVER_COMPONENT_NAME
from clp_py_utils.clp_logging import get_logger, get_logging_formatter, set_logging_level
from clp_py_utils.clp_logging import get_logger, set_logging_level
from clp_py_utils.core import read_yaml_config_file
from pydantic import ValidationError

Expand Down Expand Up @@ -38,11 +36,7 @@ def main(host: str, port: int, config_path: Path) -> int:
:param config_path: The path to server's configuration file.
:return: Exit code (0 for success, non-zero for failure).
"""
# Setup logging to file
log_file_path = Path(os.getenv("CLP_LOGS_DIR")) / "mcp_server.log"
logging_file_handler = logging.FileHandler(filename=log_file_path, encoding="utf-8")
logging_file_handler.setFormatter(get_logging_formatter())
logger.addHandler(logging_file_handler)
# Set logging level from environment
set_logging_level(logger, os.getenv("CLP_LOGGING_LEVEL"))

exit_code = 0
Expand Down
25 changes: 25 additions & 0 deletions components/clp-package-utils/clp_package_utils/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
DatabaseEngine,
DB_COMPONENT_NAME,
DeploymentType,
FLUENT_BIT_COMPONENT_NAME,
GARBAGE_COLLECTOR_COMPONENT_NAME,
LOG_INGESTOR_COMPONENT_NAME,
MCP_SERVER_COMPONENT_NAME,
Expand Down Expand Up @@ -711,6 +712,7 @@ def _set_up_env_for_webui(self, container_clp_config: ClpConfig) -> EnvVarsDict:
"LogViewerDir": str(container_webui_dir / "yscope-log-viewer"),
"StreamTargetUncompressedSize": self._clp_config.stream_output.target_uncompressed_size,
"ClpQueryEngine": self._clp_config.package.query_engine,
"OperationalLogsDir": "/var/log/clp",
}

stream_storage = self._clp_config.stream_output.storage
Expand Down Expand Up @@ -852,6 +854,28 @@ def _set_up_env_for_garbage_collector(self) -> EnvVarsDict:

return env_vars

def _set_up_env_for_fluent_bit(self) -> EnvVarsDict:
"""
Sets up environment variables for the Fluent Bit component.

Fluent Bit receives logs from Docker's fluentd logging driver and writes
them to the logs directory for hot tier access.

:return: Dictionary of environment variables necessary to launch the component.
"""
component_name = FLUENT_BIT_COMPONENT_NAME
logger.info(f"Setting up environment for {component_name}...")

env_vars = EnvVarsDict()

# Paths
fluent_bit_conf_dir = self._conf_dir / "fluent-bit"
env_vars |= {
"CLP_FLUENT_BIT_CONF_DIR_HOST": str(fluent_bit_conf_dir),
}

return env_vars

def _read_and_update_settings_json(
self, settings_file_path: pathlib.Path, updates: dict[str, Any]
) -> dict[str, Any]:
Expand Down Expand Up @@ -997,6 +1021,7 @@ def set_up_env(self) -> None:
env_vars |= self._set_up_env_for_webui(container_clp_config)
env_vars |= self._set_up_env_for_mcp_server()
env_vars |= self._set_up_env_for_garbage_collector()
env_vars |= self._set_up_env_for_fluent_bit()

# Write the environment variables to the `.env` file.
with open(f"{self._clp_home}/.env", "w") as env_file:
Expand Down
1 change: 1 addition & 0 deletions components/clp-py-utils/clp_py_utils/clp_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
WEBUI_COMPONENT_NAME = "webui"
MCP_SERVER_COMPONENT_NAME = "mcp_server"
GARBAGE_COLLECTOR_COMPONENT_NAME = "garbage_collector"
FLUENT_BIT_COMPONENT_NAME = "fluent_bit"

# Action names
ARCHIVE_MANAGER_ACTION_NAME = "archive_manager"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,6 @@ def run_clp(
worker_config: WorkerConfig,
clp_config: ClpIoConfig,
clp_home: pathlib.Path,
logs_dir: pathlib.Path,
job_id: int,
task_id: int,
tag_ids: list[int],
Expand All @@ -362,7 +361,6 @@ def run_clp(
:param worker_config: WorkerConfig
:param clp_config: ClpIoConfig
:param clp_home:
:param logs_dir:
:param job_id:
:param task_id:
:param tag_ids:
Expand Down Expand Up @@ -444,17 +442,22 @@ def cleanup_temporary_files():
if converted_inputs_dir is not None:
shutil.rmtree(converted_inputs_dir)

# Open stderr log file
stderr_log_path = logs_dir / f"{instance_id_str}-stderr.log"
stderr_log_file = open(stderr_log_path, "w")
def log_stderr(stderr_data: bytes, process_name: str):
"""Log stderr content through the logger (captured by Docker's fluentd driver)."""
stderr_text = stderr_data.decode("utf-8").strip()
if stderr_text:
for line in stderr_text.split("\n"):
logger.info(f"[{process_name} stderr] {line}")

conversion_return_code = 0
if conversion_cmd is not None:
logger.debug("Execute log-converter with command: %s", conversion_cmd)
conversion_proc = subprocess.Popen(
conversion_cmd, stdout=subprocess.DEVNULL, stderr=stderr_log_file, env=conversion_env
conversion_cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, env=conversion_env
)
conversion_return_code = conversion_proc.wait()
_, stderr_data = conversion_proc.communicate()
conversion_return_code = conversion_proc.returncode
log_stderr(stderr_data, "log-converter")

if conversion_return_code != 0:
cleanup_temporary_files()
Expand All @@ -464,16 +467,15 @@ def cleanup_temporary_files():
worker_output = {
"total_uncompressed_size": 0,
"total_compressed_size": 0,
"error_message": f"Check logs in {stderr_log_path}",
"error_message": "See worker logs (captured by Fluent Bit)",
}
stderr_log_file.close()
return CompressionTaskStatus.FAILED, worker_output

# Start compression
logger.debug("Compressing...")
compression_successful = False
proc = subprocess.Popen(
compression_cmd, stdout=subprocess.PIPE, stderr=stderr_log_file, env=compression_env
compression_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=compression_env
)

# Compute the total amount of data compressed
Expand Down Expand Up @@ -550,23 +552,27 @@ def cleanup_temporary_files():
)

try:
subprocess.run(
result = subprocess.run(
indexer_cmd,
stdout=subprocess.DEVNULL,
stderr=stderr_log_file,
stderr=subprocess.PIPE,
check=True,
env=indexer_env,
)
except subprocess.CalledProcessError:
log_stderr(result.stderr, "indexer")
except subprocess.CalledProcessError as e:
log_stderr(e.stderr if e.stderr else b"", "indexer")
logger.exception("Failed to index archive.")

if enable_s3_write:
archive_path.unlink()

last_archive_stats = stats

# Wait for compression to finish
# Wait for compression to finish and capture stderr
compression_stderr = proc.stderr.read()
return_code = proc.wait()
log_stderr(compression_stderr, "compression")

if 0 != return_code:
logger.error(f"Failed to compress, return_code={return_code!s}")
Expand All @@ -576,9 +582,6 @@ def cleanup_temporary_files():

logger.debug("Compressed.")

# Close stderr log file
stderr_log_file.close()

worker_output = {
"total_uncompressed_size": total_uncompressed_size,
"total_compressed_size": total_compressed_size,
Expand All @@ -588,7 +591,7 @@ def cleanup_temporary_files():
return CompressionTaskStatus.SUCCEEDED, worker_output
error_msgs = []
if compression_successful is False:
error_msgs.append(f"See logs {stderr_log_path}")
error_msgs.append("See worker logs (captured by Fluent Bit)")
if s3_error is not None:
error_msgs.append(s3_error)
worker_output["error_message"] = "\n".join(error_msgs)
Expand All @@ -606,10 +609,8 @@ def compression_entry_point(
):
clp_home = pathlib.Path(os.getenv("CLP_HOME"))

# Set logging level
logs_dir = pathlib.Path(os.getenv("CLP_LOGS_DIR"))
clp_logging_level = str(os.getenv("CLP_LOGGING_LEVEL"))
set_logging_level(logger, clp_logging_level)
# Set logging level from environment
set_logging_level(logger, os.getenv("CLP_LOGGING_LEVEL"))

# Load configuration
try:
Expand Down Expand Up @@ -637,7 +638,6 @@ def compression_entry_point(
worker_config,
clp_io_config,
clp_home,
logs_dir,
job_id,
task_id,
tag_ids,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,8 @@ def extract_stream(
) -> dict[str, Any]:
task_name = "Stream Extraction"

# Setup logging to file
clp_logs_dir = Path(os.getenv("CLP_LOGS_DIR"))
clp_logging_level = os.getenv("CLP_LOGGING_LEVEL")
set_logging_level(logger, clp_logging_level)
# Set logging level from environment
set_logging_level(logger, os.getenv("CLP_LOGGING_LEVEL"))

logger.info(f"Started {task_name} task for job {job_id}")

Expand Down Expand Up @@ -242,7 +240,6 @@ def extract_stream(
task_results, task_stdout_str = run_query_task(
sql_adapter=sql_adapter,
logger=logger,
clp_logs_dir=clp_logs_dir,
task_command=task_command,
env_vars=core_clp_env_vars,
task_name=task_name,
Expand Down Expand Up @@ -289,7 +286,7 @@ def extract_stream(

if upload_error:
task_results.status = QueryTaskStatus.FAILED
task_results.error_log_path = str(os.getenv("CLP_WORKER_LOG_PATH"))
task_results.error_log_path = "See worker logs (captured by Fluent Bit)"
else:
logger.info("Finished uploading streams.")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,8 @@ def search(
) -> dict[str, Any]:
task_name = "search"

# Setup logging to file
clp_logs_dir = Path(os.getenv("CLP_LOGS_DIR"))
clp_logging_level = os.getenv("CLP_LOGGING_LEVEL")
set_logging_level(logger, clp_logging_level)
# Set logging level from environment
set_logging_level(logger, os.getenv("CLP_LOGGING_LEVEL"))

logger.info(f"Started {task_name} task for job {job_id}")

Expand Down Expand Up @@ -242,7 +240,6 @@ def search(
task_results, _ = run_query_task(
sql_adapter=sql_adapter,
logger=logger,
clp_logs_dir=clp_logs_dir,
task_command=task_command,
env_vars=core_clp_env_vars,
task_name=task_name,
Expand All @@ -268,7 +265,7 @@ def search(
except Exception as err:
logger.error(f"Failed to upload query results {dest_path}: {err}")
task_results.status = QueryTaskStatus.FAILED
task_results.error_log_path = str(os.getenv("CLP_WORKER_LOG_PATH"))
task_results.error_log_path = "See worker logs (captured by Fluent Bit)"

src_file.unlink()

Expand Down
Loading