Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

W-16726816: Add check-invalid-namespaces.py script #1011

Merged
merged 4 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions carbonj.service/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,19 @@ RUN yum update -y && \
yum install -y https://mirror.stream.centos.org/9-stream/AppStream/x86_64/os/Packages/sysstat-12.5.4-7.el9.x86_64.rpm && \
yum install -y https://repo.almalinux.org/almalinux/9/extras/x86_64/os/Packages/epel-release-9-5.el9.noarch.rpm && \
yum install -y https://dl.fedoraproject.org/pub/epel/9/Everything/x86_64/Packages/p/perl-URI-Encode-1.1.1-17.el9.noarch.rpm && \
yum install -y https://rpmfind.net/linux/centos-stream/9-stream/BaseOS/x86_64/os/Packages/snappy-1.1.8-8.el9.x86_64.rpm && \
yum update -y && \
yum install -y zulu17-jdk python3 perl-Data-Dumper && \
yum clean all && \
dnf update && \
dnf -y install chkconfig && \
dnf clean all && \
pip3 install --upgrade pip && \
pip3 install aiohttp && \
chkconfig crond on && \
systemctl enable crond.service && \
mkdir -p /app/bin/ && \
wget https://github.com/krallin/tini/releases/download/v0.18.0/tini && \
wget https://github.com/krallin/tini/releases/download/v0.19.0/tini && \
mv tini /sbin/tini && \
chmod +x /sbin/tini

Expand All @@ -48,7 +50,8 @@ COPY ${DEPENDENCY}/BOOT-INF/classes /app
COPY ${DEPENDENCY}/entrypoint.sh ${DEPENDENCY}/onOutOfMemoryError.sh ${DEPENDENCY}/logCleanup.sh \
${DEPENDENCY}/deletemetrics.py ${DEPENDENCY}/disklog.sh ${DEPENDENCY}/fdlog.sh ${DEPENDENCY}/iolog.sh \
${DEPENDENCY}/reportGcMetrics.sh ${DEPENDENCY}/reportRocksDbMetrics.sh ${DEPENDENCY}/requestlog-stats.pl \
${DEPENDENCY}/whisper.py ${DEPENDENCY}/cj-load.py /app/bin/
${DEPENDENCY}/whisper.py ${DEPENDENCY}/cj-load.py ${DEPENDENCY}/check-invalid-namespaces.py \
${DEPENDENCY}/delete-invalid-namespaces.py /app/bin/
RUN chmod ugo+x /app/bin/*
# add java options file
# default configs
Expand Down
187 changes: 187 additions & 0 deletions carbonj.service/src/main/docker/files/check-invalid-namespaces.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
#!/usr/bin/env python3

import os
import logging
import subprocess
import time
import glob
import argparse


logger = logging.getLogger()


def setup_logging():
    """Configure the module's root logger: level comes from the LOGLEVEL
    environment variable (default INFO), output goes to a stream handler
    with a uniform ``[LEVEL] timestamp - file:line - message`` format.
    """
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter(
        '[%(levelname)s] %(asctime)s - %(filename)s:%(lineno)s - %(message)s'))
    logger.setLevel(level=os.environ.get("LOGLEVEL", "INFO"))
    logger.addHandler(handler)


def get_key_value(line):
    """Parse one ldb scan output line.

    ldb prints ``<key> ==> <value>``; returns ``(key, value)`` with both
    sides stripped. A line without the ``==>`` separator yields the
    stripped line as the key and ``None`` as the value.
    """
    if '==>' not in line:
        return line.strip(), None
    key, value = line.split('==>')
    return key.strip(), value.strip()


def run_command(command, retry=1):
    """Run *command* (an argv list) and return its stripped combined output.

    The command is attempted up to *retry* times with stderr folded into
    stdout. Each failure is logged; we back off one second between attempts
    (but not after the last one — the original slept even when no retry
    remained). Returns '' if every attempt fails.
    """
    for attempt in range(1, retry + 1):
        try:
            output = subprocess.check_output(command, text=True, stderr=subprocess.STDOUT)
            return output.strip()
        except subprocess.CalledProcessError as e:
            logger.error(f"Error: {e}")
            if attempt < retry:  # no point sleeping after the final attempt
                time.sleep(1)
    return ''


def check_id_in_db(db_name, key):
    """Return True if *key* exists in the RocksDB database *db_name*.

    Uses the ldb tool to seek to the first key >= *key*; RocksDB's seek
    semantics mean an exact prefix match on the first returned key proves
    the ID is present in that database.
    """
    command = ['/root/ldb', 'scan', f"--db=/data/carbonj-data/{db_name}", f"--from={key}", '--max_keys=1', '--hex', '--no_value']
    # Log before executing (original logged after, so a hung/failed scan
    # never showed which command was being run).
    logger.info('Running command: %s', command)
    line = run_command(command, 3)
    if line == '' or len(line) < len(key):
        return False
    real_key = line[:len(key)]
    if real_key != key:
        logger.info('Mismatched real key %s', real_key)
        return False
    return True


def parse_args(argv=None):
    """Parse command-line options for the invalid-ID check.

    argv: optional argument list; defaults to sys.argv[1:] (argparse's own
    default), so existing no-argument call sites behave as before while the
    parser becomes testable.
    """
    parser = argparse.ArgumentParser(description='Process RocksDB invalid ID check.')
    parser.add_argument('--common-prefix', dest='common_prefix', help='Check IDs with the common prefix', default='')
    parser.add_argument('--skip-prefix', dest='skip_prefix', help='Comma separated prefixes to skip', default='')
    parser.add_argument('--no-dump', dest='no_dump', help='Whether to dump RocksDB or not', action='store_true')
    # default is a real int, not the string '10' argparse had to coerce
    parser.add_argument('--check-count', dest='check_count', type=int, help='The number of invalid checks', default=10)
    return parser.parse_args(argv)


setup_logging()


class RocksDbKeyDumper:
    """Dumps the keys/values of a carbonj RocksDB (via the external ldb tool)
    into chunk files under /data, then scans them for metric IDs that exist
    in none of the three data databases (30m2y, 5m7d, 60s24h). Invalid
    names are appended to /data/invalid_namespaces.
    """

    def __init__(self, db_name, args):
        # db_name: RocksDB directory name under /data/carbonj-data (e.g. 'index-id').
        self.db_name = db_name
        # Suffix counter for the per-chunk scan output files.
        self.file_number = 0
        self.no_dump = args.no_dump
        self.common_prefix = args.common_prefix
        # Normalize so prefix matching always compares whole dot-separated parts.
        if not self.common_prefix.endswith('.'):
            self.common_prefix += '.'
        # Number of dot-separated parts used when grouping names by prefix.
        # NOTE(review): an empty --common-prefix becomes '.', which makes
        # prefix_parts == 2 — confirm that is the intended grouping depth.
        self.prefix_parts = len(self.common_prefix.split('.'))
        self.skip_prefixes = args.skip_prefix.split(',')
        # After this many invalid hits in one group, the whole group is skipped.
        self.check_count = args.check_count

    def dump(self, include_value=True):
        """Dump all keys (and optionally values) of the database to
        /data/<db_name>_scan.<n> files, up to 100M entries per ldb
        invocation, resuming each chunk from the last key of the previous one.
        """
        if self.no_dump:
            logger.info('Skipping dump')
            return
        self.cleanup_files()
        last_key = ''
        while True:
            if last_key == '':
                command = ['/root/ldb', 'scan', f"--db=/data/carbonj-data/{self.db_name}", '--max_keys=100000000', '--hex']
            else:
                # Resume from the last key seen; ldb's --from is inclusive, so
                # each chunk's first line repeats the previous chunk's tail.
                command = ['/root/ldb', 'scan', f"--db=/data/carbonj-data/{self.db_name}", f"--from={last_key}", '--max_keys=100000000', '--hex']
            if not include_value:
                command.append('--no_value')
            logger.info('Running command: %s', command)
            self.file_number += 1
            start_time = time.time()
            file = f"/data/{self.db_name}_scan.{self.file_number}"
            with open(file, 'w') as f:
                try:
                    # NOTE(review): subprocess.run without check=True never raises
                    # CalledProcessError, so the except below is dead code —
                    # confirm whether check=True was intended here.
                    subprocess.run(command, stdout=f)
                except subprocess.CalledProcessError as e:
                    logger.error(f"Error: {e}")
                finally:
                    elapsed_time = time.time() - start_time
                    logger.info(f"Elapsed time to dump {self.db_name} keys to file {file} : {elapsed_time}")
            # The dump is exhausted once a chunk ends on the same key it
            # started from (ldb returned nothing new past --from).
            command = ['tail', '-1', file]
            line = run_command(command)
            key, value = get_key_value(line)
            if key == last_key:
                break
            last_key = key

    def check_invalid_ids(self):
        """Scan the dumped files: each line's value is a hex-encoded metric
        name; an ID is invalid when its key appears in none of the three data
        databases. Invalid names are appended to /data/invalid_namespaces.
        """
        invalid_id_count = 0
        # NOTE(review): stop is never set to True anywhere in this class, so
        # the early break below is dead — confirm whether a stop condition
        # (e.g. on check_count) was lost.
        stop = False
        checked_prefixes = []
        prefix = ''
        files = sorted(glob.glob(f"/data/{self.db_name}_scan.*"))
        # buffering=1 -> line-buffered, so findings survive an interrupted run.
        with open('/data/invalid_namespaces', 'a', buffering=1) as wf:
            for file in files:
                if stop:
                    break
                logger.info('Scanning IDs in file %s', file)
                with open(file, 'r') as f:
                    while True:
                        line = f.readline()
                        if not line:
                            break
                        key, value = get_key_value(line)
                        # [2:] drops the value's 2-char prefix — presumably the
                        # '0x' of ldb --hex output; verify against ldb's format.
                        name = bytes.fromhex(value[2:]).decode('utf-8')
                        first = name.split('.')[0]
                        # Skip root check
                        if name == 'root':
                            continue
                        if self.common_prefix != '.':
                            # Restrict the scan to names under the requested
                            # prefix and check each prefix group only once.
                            if not name.startswith(self.common_prefix):
                                continue
                            prefix = '.'.join(name.split('.')[:self.prefix_parts])
                            if prefix in checked_prefixes:
                                continue
                        else:
                            # No common prefix: group by the first name part instead.
                            if first in self.skip_prefixes:
                                continue

                        logger.info('Checking namespace %s', name)
                        valid_id = False
                        for db_name in ['30m2y', '5m7d', '60s24h']:
                            if check_id_in_db(db_name, key):
                                valid_id = True
                                break
                        if not valid_id:
                            wf.write(f"{name}\n")
                            invalid_id_count += 1
                            # After check_count invalid hits, assume the whole
                            # group is dead and stop checking it individually.
                            if invalid_id_count >= self.check_count:
                                invalid_id_count = 0
                                if self.common_prefix == '.':
                                    self.skip_prefixes.append(first)
                                else:
                                    checked_prefixes.append(prefix)
                        else:
                            # One valid ID proves the group is live; skip the rest.
                            if self.common_prefix == '.':
                                self.skip_prefixes.append(first)
                            else:
                                checked_prefixes.append(prefix)

    def cleanup_files(self):
        """Remove previous scan chunk files and the invalid_namespaces report
        so a fresh run starts clean; missing files are logged and ignored."""
        files = glob.glob(f"/data/{self.db_name}_scan.*")
        files.append('/data/invalid_namespaces')
        for file in files:
            try:
                os.remove(file)
            except Exception as e:
                logger.error(f"Error: {e}")


class Main:
    """Entry point: dump the index-id RocksDB and report invalid namespaces.

    The original defined a no-op ``__init__`` containing a bare ``return``;
    the implicit default constructor is equivalent, so it was dropped.
    """

    def execute(self):
        """Parse CLI args, dump index-id keys to chunk files, then scan for
        IDs missing from every data database."""
        args = parse_args()
        dumper = RocksDbKeyDumper('index-id', args)
        dumper.dump()
        dumper.check_invalid_ids()


# Documentation https://salesforce.quip.com/gO3FAhYxHpQX
peterzxu-crm marked this conversation as resolved.
Show resolved Hide resolved
# Run only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    Main().execute()
82 changes: 82 additions & 0 deletions carbonj.service/src/main/docker/files/delete-invalid-namespaces.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
#!/usr/bin/env python3

import os
import logging
import argparse
import requests


logger = logging.getLogger()


def setup_logging():
    """Configure the module's root logger: level comes from the LOGLEVEL
    environment variable (default INFO), output goes to a stream handler
    with a uniform ``[LEVEL] timestamp - file:line - message`` format.
    """
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter(
        '[%(levelname)s] %(asctime)s - %(filename)s:%(lineno)s - %(message)s'))
    logger.setLevel(level=os.environ.get("LOGLEVEL", "INFO"))
    logger.addHandler(handler)


def parse_args(argv=None):
    """Parse command-line options for the invalid-namespace cleaner.

    argv: optional argument list; defaults to sys.argv[1:] (argparse's own
    default), so existing no-argument call sites behave as before while the
    parser becomes testable.
    """
    parser = argparse.ArgumentParser(description='Delete RocksDB invalid namespaces.')
    parser.add_argument('--shard', dest='shard', help='The shard ID', required=True)
    parser.add_argument('--replica', dest='replica', help='The replica ID', required=True)
    parser.add_argument('--invalid-namespaces-file', dest='invalid_namespaces_file', help='Invalid namespaces file', required=True)
    # default is a real int, not the string '0' argparse had to coerce
    parser.add_argument('--prefix-parts', dest='prefix_parts', type=int, help='The number of dot separated parts as prefix', default=0)
    parser.add_argument('--dry-run', dest='dry_run', help='Whether to dryrun', action='store_true')
    return parser.parse_args(argv)


setup_logging()


class InvalidNamespacesCleaner:
    """Reads a file of invalid namespaces and deletes each distinct one via
    the carbonj delete REST API on a specific shard/replica."""

    def __init__(self, args):
        self.shard = args.shard
        self.replica = args.replica
        self.invalid_namespaces_file = args.invalid_namespaces_file
        # When > 0, collapse each namespace to its first N dot-separated parts
        # and delete at that coarser level instead of per-leaf.
        self.prefix_parts = args.prefix_parts
        self.dry_run = args.dry_run

    def clean(self):
        """Walk the input file line by line and delete each namespace once:
        blank lines are skipped and names are de-duplicated after prefix
        collapsing. With --dry-run only the log lines are emitted."""
        # set instead of list: membership checks stay O(1) on large inputs
        deleted_namespaces = set()
        with open(self.invalid_namespaces_file, 'r') as f:
            for raw_line in f:
                namespace = raw_line.strip()
                if not namespace:
                    continue
                if self.prefix_parts > 0:
                    namespace = '.'.join(namespace.split('.')[:self.prefix_parts])
                if namespace in deleted_namespaces:
                    continue
                logger.info(f"Start cleaning namespace {namespace} with shard: {self.shard} on replica: {self.replica}")
                deleted_namespaces.add(namespace)
                if not self.dry_run:
                    self.delete_namespace(namespace)
                logger.info(f"Done with cleaning namespace {namespace} with shard: {self.shard} on replica: {self.replica}")

    def delete_namespace(self, namespace):
        """Issue the HTTP DELETE for *namespace* against this shard/replica's
        in-cluster carbonj service and log the outcome."""
        url = f"http://carbonj-p{self.shard}-{self.replica}.carbonj-p{self.shard}.carbonj.svc.cluster.local:2001/_dw/rest/carbonj/metrics/deleteAPI/{namespace}?exclude=&delete=true"
        # NOTE(review): no timeout is set, so a hung server blocks forever;
        # consider requests.delete(url, timeout=...) — left unchanged because
        # delete jobs may legitimately run long.
        response = requests.delete(url)
        if response.status_code == 200:
            logger.info(response.json())
        else:
            logger.error(f"Failed to delete namespace {namespace} with HTTP Status code {response.status_code}")


class Main:
    """Entry point: parse CLI args and run the invalid-namespace cleaner.

    The original defined a no-op ``__init__`` containing a bare ``return``;
    the implicit default constructor is equivalent, so it was dropped.
    """

    def execute(self):
        """Build the cleaner from the command line and run it."""
        args = parse_args()
        cleaner = InvalidNamespacesCleaner(args)
        cleaner.clean()


# Documentation https://salesforce.quip.com/gO3FAhYxHpQX
# Run only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    Main().execute()
2 changes: 1 addition & 1 deletion carbonj.service/src/main/docker/files/deletemetrics.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#! /usr/bin/env python3.6
#! /usr/bin/env python3
#
# Copyright (c) 2018, salesforce.com, inc.
# All rights reserved.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -999,9 +999,7 @@ private Metric insert( String key )
{
DatabaseMetrics.invalidLeafMetricsReceived.mark();
cleanupAfterAbandonedInsert( leafEntry, paths, i + 1 );
String msg =
String.format( "Cannot create metric with name [%s] because [%s] is already a leaf", key,
entryKey );
String msg = String.format( "Cannot create metric with name [%s] because [%s] is already a leaf with ID [%d]", key, entryKey, e.getId());
if (invalidLeafMetricsReceivedLogQuota.allow()) {
log.error(msg);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void doNotAllowNewNodesForLeaf() {
index.createLeafMetric( "a.b.c.d" );
fail("Expected exception");
} catch(RuntimeException e) {
assertEquals("Cannot create metric with name [a.b.c.d] because [a.b.c] is already a leaf",
assertEquals("Cannot create metric with name [a.b.c.d] because [a.b.c] is already a leaf with ID [2]",
e.getMessage());
}
// 2. verify we didn't leave any partial nodes behind
Expand All @@ -98,7 +98,7 @@ public void doNotAllowNewNodesForLeaf2() {
index.createLeafMetric( "a.b.c.d.e.f" );
fail("Expected exception");
} catch(RuntimeException e) {
assertEquals("Cannot create metric with name [a.b.c.d.e.f] because [a.b.c] is already a leaf",
assertEquals("Cannot create metric with name [a.b.c.d.e.f] because [a.b.c] is already a leaf with ID [2]",
e.getMessage());
}

Expand Down
Loading