diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml index f58db6bc..12309ac4 100644 --- a/.github/workflows/integration.yml +++ b/.github/workflows/integration.yml @@ -16,17 +16,16 @@ env: on: [push, pull_request] jobs: - metrics: - name: "[metrics] handler: ceilometer-metrics, collectd-metrics; application: prometheus" - runs-on: ubuntu-20.04 + collectd-metrics-bridge: + name: "[metrics] transport: socket(sg-bridge); handler: collectd-metrics; application: prometheus" + runs-on: ubuntu-22.04 env: - QDR_CHANNEL_CEILOMTR: ceilometer/metering.sample - QDR_CHANNEL_COLLECTD: collectd/metrics + QDR_CHANNEL: collectd/metrics BRIDGE_SOCKET: /tmp/sg-bridge/test-socket PROMETHEUS_IMAGE: prom/prometheus:latest steps: - name: Checkout code - uses: actions/checkout@v2 + uses: actions/checkout@v3 - name: Prepare environment run: | mkdir -p /opt/stack/ @@ -39,6 +38,13 @@ jobs: sudo rm -rf /etc/mysql sudo dpkg -l | grep -i mysql sudo apt-get clean + - name: Prepare environment for postgres-server installation + run: | + sudo apt remove postgresql-client-common + sudo apt install postgresql-client-common=238 + sudo apt install postgresql-common + sudo python -m pip install --upgrade pip + sudo python -m pip install --upgrade virtualenv - name: Install devstack run: | SOURCE=$(pwd) @@ -58,17 +64,111 @@ jobs: run: | echo "${GITHUB_REF#refs/heads/}" git ls-remote --exit-code --heads https://github.com/infrawatch/sg-bridge.git "$(echo ${GITHUB_REF#refs/heads/})" - - name: Start sg-bridge for collectd from container image + - name: Start sg-bridge from container image if: steps.bridge_branch.outcome != 'success' run: | docker run --name=sgbridge --network host $BRIDGE_VOLUME -d \ - $BRIDGE_IMAGE --amqp_url amqp://localhost:5666/$QDR_CHANNEL_COLLECTD \ + $BRIDGE_IMAGE --amqp_url amqp://localhost:5666/$QDR_CHANNEL \ --gw_unix=$BRIDGE_SOCKET - - name: Start sg-bridge for collectd with same branch + - name: Start sg-bridge from same branch if: steps.bridge_branch.outcome == 'success' run: | docker run --name=sgbridge --network host $BRIDGE_VOLUME -d -uroot \ - -e GITHUB_REF -e BRIDGE_SOCKET -e QDR_CHANNEL_COLLECTD -e OPSTOOLS_REPO \ + -e GITHUB_REF -e BRIDGE_SOCKET -e QDR_CHANNEL -e OPSTOOLS_REPO \ + --workdir=$(dirname $BRIDGE_SOCKET) \ + $TEST_IMAGE bash $PROJECT_ROOT/ci/integration/metrics/run_bridge.sh + - name: Install collectd + run: | + sudo apt-get install collectd + sudo systemctl stop collectd && sudo systemctl disable collectd + sudo cp ci/integration/metrics/collectd/collectd.conf /etc/collectd/collectd.conf + sudo touch /var/log/collectd.log && sudo chmod a+rw /var/log/collectd.log + sudo collectd -C ci/integration/metrics/collectd/collectd.conf + - name: Run sg-core to process metrics + run: | + docker run --name=sgcore -d -uroot --network host $BRIDGE_VOLUME -e OPSTOOLS_REPO \ + --volume ${{ github.workspace }}:$PROJECT_ROOT:z --workdir $PROJECT_ROOT \ + $TEST_IMAGE bash $PROJECT_ROOT/ci/integration/metrics/collectd/run_sg.sh + - name: Run Prometheus to store metrics + run: | + docker run --name=prometheus -d --network host \ + --volume ${{ github.workspace }}/ci/integration/metrics/prometheus.yml:/etc/prometheus/prometheus.yml:ro \ + $PROMETHEUS_IMAGE + - name: Debug output + run: | + sleep 360 + echo "=========================== qdr =========================" && \ + docker exec qdr qdstat -b 127.0.0.1:5666 -a + docker logs qdr + echo "========================= sg-core =======================" && \ + docker logs sgcore + echo "======================== prometheus =====================" && \ + docker logs prometheus + - name: Validate metrics processing + run: | + docker run --name=validate -uroot --network host \ + --volume ${{ github.workspace }}:$PROJECT_ROOT:z --workdir $PROJECT_ROOT \ + $TEST_IMAGE bash $PROJECT_ROOT/ci/integration/metrics/collectd/run_validation.sh +#------------------------------------------------------------------------------- + ceilometer-metrics-bridge: + name: "[metrics] transport: socket(sg-bridge); handler: ceilometer-metrics; application: prometheus" + runs-on: ubuntu-22.04 + env: + QDR_CHANNEL: anycast/ceilometer/metering.sample + BRIDGE_SOCKET: /tmp/sg-bridge/test-socket + PROMETHEUS_IMAGE: prom/prometheus:latest + steps: + - name: Checkout code + uses: actions/checkout@v3 + - name: Prepare environment + run: | + mkdir -p /opt/stack/ + sudo setfacl -Rdm u::7,g::0,o:0 /opt/stack + - name: Prepare environment for mysql-server installation # https://stackoverflow.com/a/66026366 + run: | + sudo apt-get -f install -o Dpkg::Options::="--force-overwrite" + sudo apt-get purge mysql\* + sudo rm -rf /var/lib/mysql + sudo rm -rf /etc/mysql + sudo dpkg -l | grep -i mysql + sudo apt-get clean + - name: Prepare environment for postgres-server installation + run: | + sudo apt remove postgresql-client-common + sudo apt install postgresql-client-common=238 + sudo apt install postgresql-common + sudo python -m pip install --upgrade pip + sudo python -m pip install --upgrade virtualenv + - name: Install devstack + run: | + SOURCE=$(pwd) + git clone http://github.com/openstack/devstack /opt/stack/devstack + pushd /opt/stack/devstack + cp $SOURCE/ci/integration/metrics/local.conf . + sudo apt-get update + ./stack.sh + popd + # start message bus services + - name: Start QDR service + run: | + docker run --name=qdr $QDR_VOLUME $QDR_PORT -d $QDR_IMAGE + - name: Check if sg-bridge repository has same topic branch + id: bridge_branch + continue-on-error: true + run: | + echo "${GITHUB_REF#refs/heads/}" + git ls-remote --exit-code --heads https://github.com/infrawatch/sg-bridge.git "$(echo ${GITHUB_REF#refs/heads/})" + - name: Start sg-bridge from container image + if: steps.bridge_branch.outcome != 'success' + run: | + docker run --name=sgbridge --network host $BRIDGE_VOLUME -d \ + $BRIDGE_IMAGE --amqp_url amqp://localhost:5666/$QDR_CHANNEL \ + --gw_unix=$BRIDGE_SOCKET + - name: Start sg-bridge from same branch + if: steps.bridge_branch.outcome == 'success' + run: | + docker run --name=sgbridge --network host $BRIDGE_VOLUME -d -uroot \ + -e GITHUB_REF -e BRIDGE_SOCKET -e QDR_CHANNEL -e OPSTOOLS_REPO \ --workdir=$(dirname $BRIDGE_SOCKET) \ $TEST_IMAGE bash $PROJECT_ROOT/ci/integration/metrics/run_bridge.sh - name: Set Ceilometer pipelines to QDR output and restart notification agent @@ -78,22 +178,15 @@ jobs: echo pseudo_vhost=true | crudini --merge /etc/ceilometer/ceilometer.conf oslo_messaging_amqp echo rpc_address_prefix="" | crudini --merge /etc/ceilometer/ceilometer.conf oslo_messaging_amqp echo notify_address_prefix="" | crudini --merge /etc/ceilometer/ceilometer.conf oslo_messaging_amqp - cp ci/integration/metrics/*pipeline.yaml /etc/ceilometer/. + cp ci/integration/metrics/ceilometer/bridge/*pipeline.yaml /etc/ceilometer/. cat /etc/ceilometer/* sudo pip install pyngus sudo systemctl restart devstack@ceilometer-anotification.service - - name: Install collectd - run: | - sudo apt-get install collectd - sudo systemctl stop collectd && sudo systemctl disable collectd - sudo cp ci/integration/metrics/collectd.conf /etc/collectd/collectd.conf - sudo touch /var/log/collectd.log && sudo chmod a+rw /var/log/collectd.log - sudo collectd -C ci/integration/metrics/collectd.conf - name: Run sg-core to process metrics run: | docker run --name=sgcore -d -uroot --network host $BRIDGE_VOLUME -e OPSTOOLS_REPO \ --volume ${{ github.workspace }}:$PROJECT_ROOT:z --workdir $PROJECT_ROOT \ - $TEST_IMAGE bash $PROJECT_ROOT/ci/integration/metrics/run_sg.sh + $TEST_IMAGE bash $PROJECT_ROOT/ci/integration/metrics/ceilometer/bridge/run_sg.sh - name: Run Prometheus to store metrics run: | docker run --name=prometheus -d --network host \ @@ -104,23 +197,94 @@ jobs: sleep 360 echo "=========================== qdr =========================" && \ docker exec qdr qdstat -b 127.0.0.1:5666 -a + docker logs qdr echo "========================= sg-core =======================" && \ docker logs sgcore - echo "======================== collectd =======================" && \ - cat /var/log/collectd.log echo "========================= ceilometer ====================" && \ sudo journalctl -xu devstack@ceilometer-anotification.service + echo "======================== prometheus =====================" && \ + docker logs prometheus + - name: Validate metrics processing + run: | + docker run --name=validate -uroot --network host \ + --volume ${{ github.workspace }}:$PROJECT_ROOT:z --workdir $PROJECT_ROOT \ + $TEST_IMAGE bash $PROJECT_ROOT/ci/integration/metrics/ceilometer/run_validation.sh +#------------------------------------------------------------------------------- + ceilometer-metrics-tcp: + name: "[metrics] transport: socket(tcp); handler: ceilometer-metrics; application: prometheus" + runs-on: ubuntu-22.04 + env: + PROMETHEUS_IMAGE: prom/prometheus:latest + steps: + - name: Checkout code + uses: actions/checkout@v3 + - name: Prepare environment + run: | + mkdir -p /opt/stack/ + sudo setfacl -Rdm u::7,g::0,o:0 /opt/stack + - name: Prepare environment for mysql-server installation # https://stackoverflow.com/a/66026366 + run: | + sudo apt-get -f install -o Dpkg::Options::="--force-overwrite" + sudo apt-get purge mysql\* + sudo rm -rf /var/lib/mysql + sudo rm -rf /etc/mysql + sudo dpkg -l | grep -i mysql + sudo apt-get clean + - name: Prepare environment for postgres-server installation + run: | + sudo apt remove postgresql-client-common + sudo apt install postgresql-client-common=238 + sudo apt install postgresql-common + sudo python -m pip install --upgrade pip + sudo python -m pip install --upgrade virtualenv + - name: Install devstack + run: | + SOURCE=$(pwd) + git clone http://github.com/openstack/devstack /opt/stack/devstack + pushd /opt/stack/devstack + cp $SOURCE/ci/integration/metrics/local.conf . + sudo apt-get update + ./stack.sh + popd + - name: Set Ceilometer pipelines to TCP output and restart notification agent + run: | + sudo apt-get install -y crudini + echo addressing_mode="dynamic" | crudini --merge /etc/ceilometer/ceilometer.conf oslo_messaging_amqp + echo pseudo_vhost=true | crudini --merge /etc/ceilometer/ceilometer.conf oslo_messaging_amqp + echo rpc_address_prefix="" | crudini --merge /etc/ceilometer/ceilometer.conf oslo_messaging_amqp + echo notify_address_prefix="" | crudini --merge /etc/ceilometer/ceilometer.conf oslo_messaging_amqp + cp ci/integration/metrics/ceilometer/tcp/*pipeline.yaml /etc/ceilometer/. + cat /etc/ceilometer/* + sudo pip install pyngus + sudo systemctl restart devstack@ceilometer-anotification.service + - name: Run sg-core to process metrics + run: | + docker run --name=sgcore -d -uroot --network host $BRIDGE_VOLUME -e OPSTOOLS_REPO \ + --volume ${{ github.workspace }}:$PROJECT_ROOT:z --workdir $PROJECT_ROOT \ + $TEST_IMAGE bash $PROJECT_ROOT/ci/integration/metrics/ceilometer/tcp/run_sg.sh + - name: Run Prometheus to store metrics + run: | + docker run --name=prometheus -d --network host \ + --volume ${{ github.workspace }}/ci/integration/metrics/prometheus.yml:/etc/prometheus/prometheus.yml:ro \ + $PROMETHEUS_IMAGE + - name: Debug output + run: | + sleep 360 echo "========================= sg-core =======================" && \ + docker logs sgcore + echo "========================= ceilometer ====================" && \ + sudo journalctl -xu devstack@ceilometer-anotification.service + echo "======================== prometheus =====================" && \ docker logs prometheus - name: Validate metrics processing run: | docker run --name=validate -uroot --network host \ --volume ${{ github.workspace }}:$PROJECT_ROOT:z --workdir $PROJECT_ROOT \ - $TEST_IMAGE bash $PROJECT_ROOT/ci/integration/metrics/run_validation.sh + $TEST_IMAGE bash $PROJECT_ROOT/ci/integration/metrics/ceilometer/run_validation.sh #------------------------------------------------------------------------------- logging: name: "[logging] handler: logs; application: elasticsearch, loki" - runs-on: ubuntu-20.04 + runs-on: ubuntu-22.04 env: BRIDGE_SOCKET: /tmp/sg-bridge/test-socket @@ -135,7 +299,7 @@ jobs: RSYSLOG_VOLUME: "--volume ${{ github.workspace }}/ci/service_configs/rsyslog/rsyslog_config.conf:/etc/rsyslog.d/integration.conf:z" steps: - name: Checkout code - uses: actions/checkout@v2 + uses: actions/checkout@v3 # start data store services - name: Start Elasticsearch service run: | @@ -171,7 +335,7 @@ jobs: $RSYSLOG_IMAGE bash $PROJECT_ROOT/ci/integration/logging/run_rsyslog.sh - name: Wait for services to start successfuly run: | - timeout=180 + timeout=240 echo "======================= rsyslog =======================" rsyslog_wait=0 while [[ $(docker exec qdr qdstat -b 127.0.0.1:5666 -a | grep rsyslog/logs | awk '{print $8}') -le 0 ]] diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 9403fbd1..8cdbe3c3 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -23,7 +23,7 @@ jobs: steps: - uses: actions/setup-go@v3 with: - go-version: '1.18' + go-version: '1.19' - uses: actions/checkout@v3 #- name: download libraries # run: go mod download @@ -78,7 +78,7 @@ jobs: - uses: actions/checkout@v2 - uses: actions/setup-go@v2 with: - go-version: '1.18' + go-version: '1.19' - name: Verify image builds run: | docker build --tag infrawatch/sg-core:latest --file build/Dockerfile . diff --git a/build.sh b/build.sh index 943c206b..997b1571 100755 --- a/build.sh +++ b/build.sh @@ -14,6 +14,7 @@ base=$(pwd) GOCMD=${GOCMD:-"go"} PLUGIN_DIR=${PLUGIN_DIR:-"/tmp/plugins/"} CONTAINER_BUILD=${CONTAINER_BUILD:-false} +BUILD_ARGS=${BUILD_ARGS:-''} PRODUCTION_BUILD=${PRODUCTION_BUILD:-false} if $PRODUCTION_BUILD; then @@ -60,7 +61,7 @@ build_plugins() { search_list "$(basename $i)" OMIT_TRANSPORTS if [ $? -ne 1 ]; then echo "building $(basename $i).so" - $GOCMD build -o "$PLUGIN_DIR$(basename $i).so" -buildmode=plugin + $GOCMD build $BUILD_ARGS -o "$PLUGIN_DIR$(basename $i).so" -buildmode=plugin fi done @@ -71,7 +72,7 @@ build_plugins() { search_list "$(basename $i)" OMIT_HANDLERS if [ $? -ne 1 ]; then echo "building $(basename $i).so" - $GOCMD build -o "$PLUGIN_DIR$(basename $i).so" -buildmode=plugin + $GOCMD build $BUILD_ARGS -o "$PLUGIN_DIR$(basename $i).so" -buildmode=plugin fi done @@ -82,7 +83,7 @@ build_plugins() { search_list "$(basename $i)" OMIT_APPLICATIONS if [ $? -ne 1 ]; then echo "building $(basename $i).so" - $GOCMD build -o "$PLUGIN_DIR$(basename $i).so" -buildmode=plugin + $GOCMD build $BUILD_ARGS -o "$PLUGIN_DIR$(basename $i).so" -buildmode=plugin fi done } @@ -92,9 +93,9 @@ build_core() { cd "$base" if $CONTAINER_BUILD; then echo "building sg-core for container" - $GOCMD build -o /tmp/sg-core cmd/*.go + $GOCMD build $BUILD_ARGS -o /tmp/sg-core cmd/*.go else - $GOCMD build -o sg-core cmd/*.go + $GOCMD build $BUILD_ARGS -o sg-core cmd/*.go fi } diff --git a/build/Dockerfile b/build/Dockerfile index c722eb29..946bf85f 100644 --- a/build/Dockerfile +++ b/build/Dockerfile @@ -10,7 +10,7 @@ COPY . $D/ COPY build/repos/opstools.repo /etc/yum.repos.d/opstools.repo RUN dnf install golang git qpid-proton-c-devel -y --setopt=tsflags=nodocs -RUN go install golang.org/dl/go1.18@latest && /go/bin/go1.18 download && PRODUCTION_BUILD=false CONTAINER_BUILD=true GOCMD=/go/bin/go1.18 ./build.sh +RUN go install golang.org/dl/go1.19@latest && /go/bin/go1.19 download && PRODUCTION_BUILD=false CONTAINER_BUILD=true GOCMD=/go/bin/go1.19 ./build.sh # --- end build, create smart gateway layer --- FROM registry.access.redhat.com/ubi8-minimal:latest diff --git a/ci/integration/logging/run_sg.sh b/ci/integration/logging/run_sg.sh index bf0abcf6..c9c825a6 100644 --- a/ci/integration/logging/run_sg.sh +++ b/ci/integration/logging/run_sg.sh @@ -13,11 +13,11 @@ dnf install -y git golang gcc make qpid-proton-c-devel export GOBIN=$GOPATH/bin export PATH=$PATH:$GOBIN -go install golang.org/dl/go1.18@latest -go1.18 download +go install golang.org/dl/go1.19@latest +go1.19 download # install sg-core and start sg-core mkdir -p /usr/lib64/sg-core -PLUGIN_DIR=/usr/lib64/sg-core/ GOCMD=go1.18 ./build.sh +PLUGIN_DIR=/usr/lib64/sg-core/ GOCMD=go1.19 BUILD_ARGS=-buildvcs=false ./build.sh ./sg-core -config ./ci/integration/logging/sg_config.yaml diff --git a/ci/integration/metrics/event_pipeline.yaml b/ci/integration/metrics/ceilometer/bridge/event_pipeline.yaml similarity index 100% rename from ci/integration/metrics/event_pipeline.yaml rename to ci/integration/metrics/ceilometer/bridge/event_pipeline.yaml diff --git a/ci/integration/metrics/pipeline.yaml b/ci/integration/metrics/ceilometer/bridge/pipeline.yaml similarity index 100% rename from ci/integration/metrics/pipeline.yaml rename to ci/integration/metrics/ceilometer/bridge/pipeline.yaml diff --git a/ci/integration/metrics/ceilometer/bridge/run_sg.sh b/ci/integration/metrics/ceilometer/bridge/run_sg.sh new file mode 100644 index 00000000..a6bcb071 --- /dev/null +++ b/ci/integration/metrics/ceilometer/bridge/run_sg.sh @@ -0,0 +1,23 @@ +#!/bin/env bash +# CI script for UBI8 job +# purpose: spawn sg-core to process messages sent by rsyslog + +set -ex + +# enable required repo(s) +curl -o /etc/yum.repos.d/CentOS-OpsTools.repo $OPSTOOLS_REPO +sed -i 's/gpgcheck=1/gpgcheck=0/g' /etc/yum.repos.d/CentOS-OpsTools.repo + +dnf install -y git golang gcc make qpid-proton-c-devel + +export GOBIN=$GOPATH/bin +export PATH=$PATH:$GOBIN + +go install golang.org/dl/go1.19@latest +go1.19 download + +# install sg-core and start sg-core +mkdir -p /usr/lib64/sg-core +PLUGIN_DIR=/usr/lib64/sg-core/ GOCMD=go1.19 BUILD_ARGS=-buildvcs=false ./build.sh + +./sg-core -config ./ci/integration/metrics/ceilometer/bridge/sg_config.yaml diff --git a/ci/integration/metrics/sg_config.yaml b/ci/integration/metrics/ceilometer/bridge/sg_config.yaml similarity index 63% rename from ci/integration/metrics/sg_config.yaml rename to ci/integration/metrics/ceilometer/bridge/sg_config.yaml index f2b1fd4d..db3fdab3 100644 --- a/ci/integration/metrics/sg_config.yaml +++ b/ci/integration/metrics/ceilometer/bridge/sg_config.yaml @@ -7,12 +7,6 @@ transports: - name: socket config: path: /tmp/sg-bridge/test-socket - handlers: - - name: collectd-metrics - - name: amqp1 - config: - uri: amqp://127.0.0.1:5666 - channel: anycast/ceilometer/metering.sample handlers: - name: ceilometer-metrics diff --git a/ci/integration/metrics/ceilometer/run_validation.sh b/ci/integration/metrics/ceilometer/run_validation.sh new file mode 100644 index 00000000..0c9b5724 --- /dev/null +++ b/ci/integration/metrics/ceilometer/run_validation.sh @@ -0,0 +1,30 @@ +#!/bin/env bash +# CI script for UBI8 job +# purpose: verify the expected metric data is scraped by Prometheus + +set -ex + +dnf install -y jq hostname + +PROMETHEUS_URL=http://127.0.0.1:9090 +METRICS=$(curl -s "$PROMETHEUS_URL/api/v1/label/__name__/values" | jq -r .data) + +######################### gather ceilometer data ######################### +ceilo_found="" +for item in $METRICS; do + if [[ $item == \"ceilometer_* ]]; then + if [[ -z "$ceilo_found" ]]; then + ceilo_found=$item + else + ceilo_found="$ceilo_found, $item" + fi + fi +done + +############################### validate ############################### +echo "Ceilometer metrics stored: $ceilo_found" + +if [[ -z "$ceilo_found" ]] ; then + echo "Missing expected metrics data" + exit 1 +fi diff --git a/ci/integration/metrics/ceilometer/tcp/event_pipeline.yaml b/ci/integration/metrics/ceilometer/tcp/event_pipeline.yaml new file mode 100644 index 00000000..f05c696a --- /dev/null +++ b/ci/integration/metrics/ceilometer/tcp/event_pipeline.yaml @@ -0,0 +1,13 @@ +--- +sources: + - name: event_source + events: + - "*" + sinks: + - event_sink +sinks: + - name: event_sink + transformers: + triggers: + publishers: + - tcp://127.0.0.1:4242 diff --git a/ci/integration/metrics/ceilometer/tcp/pipeline.yaml b/ci/integration/metrics/ceilometer/tcp/pipeline.yaml new file mode 100644 index 00000000..9527c24e --- /dev/null +++ b/ci/integration/metrics/ceilometer/tcp/pipeline.yaml @@ -0,0 +1,11 @@ +--- +sources: + - name: meter_source + meters: + - "*" + sinks: + - meter_sink +sinks: + - name: meter_sink + publishers: + - tcp://127.0.0.1:4242 diff --git a/ci/integration/metrics/ceilometer/tcp/run_sg.sh b/ci/integration/metrics/ceilometer/tcp/run_sg.sh new file mode 100644 index 00000000..9ab3da08 --- /dev/null +++ b/ci/integration/metrics/ceilometer/tcp/run_sg.sh @@ -0,0 +1,23 @@ +#!/bin/env bash +# CI script for UBI8 job +# purpose: spawn sg-core to process messages sent by rsyslog + +set -ex + +# enable required repo(s) +curl -o /etc/yum.repos.d/CentOS-OpsTools.repo $OPSTOOLS_REPO +sed -i 's/gpgcheck=1/gpgcheck=0/g' /etc/yum.repos.d/CentOS-OpsTools.repo + +dnf install -y git golang gcc make qpid-proton-c-devel + +export GOBIN=$GOPATH/bin +export PATH=$PATH:$GOBIN + +go install golang.org/dl/go1.19@latest +go1.19 download + +# install sg-core and start sg-core +mkdir -p /usr/lib64/sg-core +PLUGIN_DIR=/usr/lib64/sg-core/ GOCMD=go1.19 BUILD_ARGS=-buildvcs=false ./build.sh + +./sg-core -config ./ci/integration/metrics/ceilometer/tcp/sg_config.yaml diff --git a/ci/integration/metrics/ceilometer/tcp/sg_config.yaml b/ci/integration/metrics/ceilometer/tcp/sg_config.yaml new file mode 100644 index 00000000..dd9e8aec --- /dev/null +++ b/ci/integration/metrics/ceilometer/tcp/sg_config.yaml @@ -0,0 +1,20 @@ +--- + +pluginDir: /usr/lib64/sg-core +logLevel: debug + +transports: + - name: socket + config: + type: tcp + socketaddr: 127.0.0.1:4242 + handlers: + - name: ceilometer-metrics + config: + source: tcp + +applications: + - name: prometheus + config: + host: 0.0.0.0 + port: 3000 diff --git a/ci/integration/metrics/collectd.conf b/ci/integration/metrics/collectd/collectd.conf similarity index 100% rename from ci/integration/metrics/collectd.conf rename to ci/integration/metrics/collectd/collectd.conf diff --git a/ci/integration/metrics/run_sg.sh b/ci/integration/metrics/collectd/run_sg.sh similarity index 68% rename from ci/integration/metrics/run_sg.sh rename to ci/integration/metrics/collectd/run_sg.sh index cab949d5..7e186cd0 100644 --- a/ci/integration/metrics/run_sg.sh +++ b/ci/integration/metrics/collectd/run_sg.sh @@ -13,11 +13,11 @@ dnf install -y git golang gcc make qpid-proton-c-devel export GOBIN=$GOPATH/bin export PATH=$PATH:$GOBIN -go install golang.org/dl/go1.18@latest -go1.18 download +go install golang.org/dl/go1.19@latest +go1.19 download # install sg-core and start sg-core mkdir -p /usr/lib64/sg-core -PLUGIN_DIR=/usr/lib64/sg-core/ GOCMD=go1.18 ./build.sh +PLUGIN_DIR=/usr/lib64/sg-core/ GOCMD=go1.19 BUILD_ARGS=-buildvcs=false ./build.sh -./sg-core -config ./ci/integration/metrics/sg_config.yaml +./sg-core -config ./ci/integration/metrics/collectd/sg_config.yaml diff --git a/ci/integration/metrics/run_validation.sh b/ci/integration/metrics/collectd/run_validation.sh similarity index 65% rename from ci/integration/metrics/run_validation.sh rename to ci/integration/metrics/collectd/run_validation.sh index 1e453719..d13ccc21 100644 --- a/ci/integration/metrics/run_validation.sh +++ b/ci/integration/metrics/collectd/run_validation.sh @@ -9,18 +9,6 @@ dnf install -y jq hostname PROMETHEUS_URL=http://127.0.0.1:9090 METRICS=$(curl -s "$PROMETHEUS_URL/api/v1/label/__name__/values" | jq -r .data) -######################### gather ceilometer data ######################### -ceilo_found="" -for item in $METRICS; do - if [[ $item == \"ceilometer_* ]]; then - if [[ -z "$ceilo_found" ]]; then - ceilo_found=$item - else - ceilo_found="$ceilo_found, $item" - fi - fi -done - ######################### gather collectd data ######################### collectd_found="" for item in $METRICS; do @@ -34,10 +22,9 @@ for item in $METRICS; do done ############################### validate ############################### -echo "Ceilometer metrics stored: $ceilo_found" echo "Collectd metrics stored: $collectd_found" -if [[ -z "$ceilo_found" ]] || [[ -z "$collectd_found" ]]; then +if [[ -z "$collectd_found" ]]; then echo "Missing expected metrics data" exit 1 fi diff --git a/ci/integration/metrics/collectd/sg_config.yaml b/ci/integration/metrics/collectd/sg_config.yaml new file mode 100644 index 00000000..0516a4d6 --- /dev/null +++ b/ci/integration/metrics/collectd/sg_config.yaml @@ -0,0 +1,17 @@ +--- + +pluginDir: /usr/lib64/sg-core +logLevel: debug + +transports: + - name: socket + config: + path: /tmp/sg-bridge/test-socket + handlers: + - name: collectd-metrics + +applications: + - name: prometheus + config: + host: 0.0.0.0 + port: 3000 diff --git a/ci/integration/metrics/local.conf b/ci/integration/metrics/local.conf index b696c197..c6f02d56 100644 --- a/ci/integration/metrics/local.conf +++ b/ci/integration/metrics/local.conf @@ -8,6 +8,7 @@ REDIS_PASSWORD=$ADMIN_PASSWORD enable_plugin ceilometer https://opendev.org/openstack/ceilometer.git CEILOMETER_BACKEND=none +CEILOMETER_PIPELINE_INTERVAL=60 enable_service ceilometer-acompute ceilometer-acentral ceilometer-anotification disable_service horizon diff --git a/ci/integration/metrics/run_bridge.sh b/ci/integration/metrics/run_bridge.sh index bb79e043..fa0e0b68 100644 --- a/ci/integration/metrics/run_bridge.sh +++ b/ci/integration/metrics/run_bridge.sh @@ -4,8 +4,7 @@ set -ex -CHANNEL=$QDR_CHANNEL_CEILOMTR -CHANNEL=${CHANNEL:-$QDR_CHANNEL_COLLECTD} +CHANNEL=$QDR_CHANNEL # enable required repo(s) curl -o /etc/yum.repos.d/CentOS-OpsTools.repo $OPSTOOLS_REPO diff --git a/ci/unit/run_tests.sh b/ci/unit/run_tests.sh index fb985982..904d7113 100644 --- a/ci/unit/run_tests.sh +++ b/ci/unit/run_tests.sh @@ -14,8 +14,8 @@ yum install -y git golang gcc make glibc-langpack-en qpid-proton-c-devel export GOBIN=$GOPATH/bin export PATH=$PATH:$GOBIN -go install golang.org/dl/go1.18@latest -go1.18 download +go install golang.org/dl/go1.19@latest +go1.19 download -go1.18 test -v -coverprofile=profile.cov ./... +go1.19 test -v -coverprofile=profile.cov ./... diff --git a/cmd/manager/manager.go b/cmd/manager/manager.go index 80debc5c..b81515d5 100644 --- a/cmd/manager/manager.go +++ b/cmd/manager/manager.go @@ -163,7 +163,7 @@ func SetTransportHandlers(name string, handlerBlocks []struct { err = h.Config(configBlob) if err != nil { - return err + return errors.Wrapf(err, "failed configuring handler plugin '%s'", block.Name) } handlers[name] = append(handlers[name], h) diff --git a/devstack/override-defaults b/devstack/override-defaults new file mode 100644 index 00000000..e69de29b diff --git a/devstack/plugin.sh b/devstack/plugin.sh new file mode 100755 index 00000000..f2734547 --- /dev/null +++ b/devstack/plugin.sh @@ -0,0 +1,51 @@ +function preinstall_sg-core { + install_package $SG_CORE_CONTAINER_EXECUTABLE +} + +function install_sg-core { + $SG_CORE_CONTAINER_EXECUTABLE pull $SG_CORE_CONTAINER_IMAGE +} + +function configure_sg-core { + sudo mkdir -p `dirname $SG_CORE_CONF` + sudo cp $SG_CORE_DIR/devstack/sg-core.conf.yaml $SG_CORE_CONF +} + +function init_sg-core { + $SG_CORE_CONTAINER_EXECUTABLE run -v $SG_CORE_CONF:/etc/sg-core.conf.yaml --network host --name sg-core -d $SG_CORE_CONTAINER_IMAGE +} + +# check for service enabled +if is_service_enabled sg-core; then + + if [[ "$1" == "stack" && "$2" == "pre-install" ]]; then + # Set up system services + echo_summary "Configuring system services sg-core" + preinstall_sg-core + + elif [[ "$1" == "stack" && "$2" == "install" ]]; then + # Perform installation of service source + echo_summary "Installing sg-core" + install_sg-core + + elif [[ "$1" == "stack" && "$2" == "post-config" ]]; then + # Configure after the other layer 1 and 2 services have been configured + echo_summary "Configuring sg-core" + configure_sg-core + + elif [[ "$1" == "stack" && "$2" == "extra" ]]; then + # Initialize and start the sg-core service + echo_summary "Initializing sg-core" + init_sg-core + fi + + if [[ "$1" == "unstack" ]]; then + $SG_CORE_CONTAINER_EXECUTABLE stop sg-core + $SG_CORE_CONTAINER_EXECUTABLE rm sg-core + fi + + if [[ "$1" == "clean" ]]; then + $SG_CORE_CONTAINER_EXECUTABLE rmi sg-core:latest + fi +fi + diff --git a/devstack/settings b/devstack/settings new file mode 100644 index 00000000..3e00b4c4 --- /dev/null +++ b/devstack/settings @@ -0,0 +1,14 @@ +define_plugin sg-core +plugin_requires sg-core ceilometer +enable_service sg-core +enable_service ceilometer + +SG_CORE_DIR=$DEST/sg-core +SG_CORE_CONF_DIR=/etc/sg-core +SG_CORE_CONF=$SG_CORE_CONF_DIR/sg-core.conf.yaml + +SG_CORE_CONTAINER_REPOSITORY=${SG_CORE_CONTAINER_REPOSITORY:-quay.io/infrawatch/sg-core} +SG_CORE_CONTAINER_TAG=${SG_CORE_CONTAINER_TAG:-latest} +SG_CORE_CONTAINER_IMAGE=$SG_CORE_CONTAINER_REPOSITORY:$SG_CORE_CONTAINER_TAG + +SG_CORE_CONTAINER_EXECUTABLE="podman" diff --git a/devstack/sg-core.conf.yaml b/devstack/sg-core.conf.yaml new file mode 100644 index 00000000..dd9e8aec --- /dev/null +++ b/devstack/sg-core.conf.yaml @@ -0,0 +1,20 @@ +--- + +pluginDir: /usr/lib64/sg-core +logLevel: debug + +transports: + - name: socket + config: + type: tcp + socketaddr: 127.0.0.1:4242 + handlers: + - name: ceilometer-metrics + config: + source: tcp + +applications: + - name: prometheus + config: + host: 0.0.0.0 + port: 3000 diff --git a/generator/amqp_snd_th.c b/generator/amqp_snd_th.c index 7ecd420b..3953c68d 100644 --- a/generator/amqp_snd_th.c +++ b/generator/amqp_snd_th.c @@ -70,32 +70,78 @@ char *RSYSLOG_MSG1 = "{\"@timestamp\":\""; char *RSYSLOG_MSG2 = "\", \"host\":\""; char *RSYSLOG_MSG3 = "\", \"severity\":\"5\", \"facility\":\"user\", \"tag\":\"tag1\", \"source\":\"some-source\", \"message\":\"a log message from generator'\", \"file\":\"\", \"cloud\": \"cloud1\", \"region\": \"some-region\"}"; +char *CEIL_MSG1 = + "{\"request\": {\"oslo.version\": \"2.0\", \"oslo.message\": \"{\\\"message_id\\\": \\\"111c1c6e-21b8-4113-1a21-d10121214113\\\", \\\"publisher_id\\\": \\\"telemetry.publisher.somethingk.cloud.internal\\\", \\\"event_type\\\": \\\"metering\\\", \\\"priority\\\": \\\"SAMPLE\\\", \\\"payload\\\": ["; +char *CEIL_MSG2 = + "{\\\"source\\\": \\\"openstack\\\", \\\"counter_name\\\": \\\"some_counter_name\\\", \\\"counter_type\\\": \\\"delta\\\", \\\"counter_unit\\\": \\\"user\\\", \\\"counter_volume\\\": 1, \\\"user_id\\\": \\\"11118c1fa1d019019b118c1901e41151\\\", \\\"project_id\\\": \\\"None\\\", \\\"resource_id\\\": \\\"161b1cd1a6d1491e9b11811918e41151\\\", \\\"timestamp\\\": \\\""; +char *CEIL_MSG3 = + "\\\", \\\"resource_metadata\\\": {\\\"host\\\": \\\"compute-0.redhat.local\\\", \\\"flavor_id\\\": \\\"71cd0af1-afd3-4ee4-b918-cec05bf89578\\\", \\\"flavor_name\\\": \\\"m1.tiny\\\", \\\"display_name\\\": \\\"new-instance\\\", \\\"image_ref\\\": \\\"45333e02-643d-4f4f-a817-065060753983\\\", \\\"launched_at\\\": \\\"2020-09-14T16:12:49.839122\\\", \\\"created_at\\\": \\\"2020-09-14 16:12:39+00:00\\\"}, \\\"message_id\\\": \\\"22a22d22-0292-12e2-8232-c2a2e02d52a5\\\", \\\"monotonic_time\\\": \\\"None\\\", \\\"message_signature\\\": \\\"6322324324323b2d32832932132432c32732e32e323d2f3732d32e3232c32323\\\"}"; +char *CEIL_MSG4 = "], \\\"timestamp\\\": \\\""; +char *CEIL_MSG5 = "\\\"}\"}, \"context\": {}}"; + + inline static char *msg_cpy(char *p, char *end, char *msg, int msg_len) { - long remain = end - p + 1; - if ( (p+msg_len) > end) { - p = '\0'; - return (char *)NULL; - } - p = memccpy(p, msg, '\0', remain); - return --p; + long remain = end - p + 1; + if ( (p+msg_len) > end) { + p = '\0'; + return (char *)NULL; + } + p = memccpy(p, msg, '\0', remain); + return --p; +} + +static char *build_ceil_mesg(app_data_t *app, char *time_buf) { + char *end = &app->MSG_BUFFER[sizeof(app->MSG_BUFFER) - 1]; + char *p = app->MSG_BUFFER; + + if ( ( p = msg_cpy(p, end, CEIL_MSG1, sizeof(CEIL_MSG1) ) ) == NULL ) + return NULL; + + for (int i = 0; i < app->num_cd_per_mesg;) { + if ( ( p = msg_cpy(p, end, CEIL_MSG2, sizeof(CEIL_MSG2) ) ) == NULL ) + return NULL; + if ( ( p = msg_cpy(p, end, time_buf, sizeof(time_buf) ) ) == NULL ) + return NULL; + if ( ( p = msg_cpy(p, end, CEIL_MSG3, sizeof(CEIL_MSG3) ) ) == NULL ) + return NULL; + + if (++i < app->num_cd_per_mesg) { + *p++ = ','; + } + + app->curr_host++; + if (app->curr_host == (app->host_list_len - 1)) + app->curr_host = 0; + } + + if ( ( p = msg_cpy(p, end, CEIL_MSG4, sizeof(CEIL_MSG4) ) ) == NULL ) + return NULL; + if ( ( p = msg_cpy(p, end, time_buf, sizeof(time_buf) ) ) == NULL ) + return NULL; + if ( ( p = msg_cpy(p, end, CEIL_MSG5, sizeof(CEIL_MSG5) ) ) == NULL ) + return NULL; + + *p = '\0'; + + return app->MSG_BUFFER; } static char *build_log_mesg(app_data_t *app, char *time_buf) { - char *end = &app->MSG_BUFFER[sizeof(app->MSG_BUFFER) - 1]; + char *end = &app->MSG_BUFFER[sizeof(app->MSG_BUFFER) - 1]; char *p = app->MSG_BUFFER; for (int i = 0; i < app->num_cd_per_mesg;) { - char *hostname = app->host_list[app->curr_host].hostname; + char *hostname = app->host_list[app->curr_host].hostname; if ( ( p = msg_cpy(p, end, RSYSLOG_MSG1, sizeof(RSYSLOG_MSG1) ) ) == NULL ) - return NULL; + return NULL; if ( ( p = msg_cpy(p, end, time_buf, sizeof(time_buf) ) ) == NULL ) - return NULL; + return NULL; if ( ( p = msg_cpy(p, end, RSYSLOG_MSG2, sizeof(RSYSLOG_MSG2) ) ) == NULL ) - return NULL; + return NULL; if ( ( p = msg_cpy(p, end, hostname, strlen(hostname) ) ) == NULL ) - return NULL; + return NULL; if ( ( p = msg_cpy(p, end, RSYSLOG_MSG3, sizeof(RSYSLOG_MSG3) ) ) == NULL ) - return NULL; + return NULL; app->curr_host++; if (app->curr_host == (app->host_list_len - 1)) app->curr_host = 0; @@ -108,7 +154,7 @@ static char *build_log_mesg(app_data_t *app, char *time_buf) { static char *build_metric_mesg(app_data_t *app, char *time_buf) { - char *end = &app->MSG_BUFFER[sizeof(app->MSG_BUFFER) - 1]; + char *end = &app->MSG_BUFFER[sizeof(app->MSG_BUFFER) - 1]; char *p = app->MSG_BUFFER; char val_buff[20]; @@ -116,27 +162,27 @@ static char *build_metric_mesg(app_data_t *app, char *time_buf) { for (int i = 0; i < app->num_cd_per_mesg;) { sprintf(val_buff, "%ld", app->host_list[app->curr_host].count++); - char *hostname = app->host_list[app->curr_host].hostname; - char *metric = app->host_list[app->curr_host].metric; + char *hostname = app->host_list[app->curr_host].hostname; + char *metric = app->host_list[app->curr_host].metric; if ( ( p = msg_cpy(p, end, CD_MSG1, sizeof(CD_MSG1) ) ) == NULL ) - return NULL; + return NULL; if ( ( p = msg_cpy(p, end, val_buff, sizeof(val_buff) ) ) == NULL ) - return NULL; + return NULL; if ( ( p = msg_cpy(p, end, CD_MSG2, sizeof(CD_MSG2) ) ) == NULL ) - return NULL; + return NULL; if ( ( p = msg_cpy(p, end, time_buf, sizeof(time_buf) ) ) == NULL ) - return NULL; + return NULL; if ( ( p = msg_cpy(p, end, CD_MSG3, sizeof(CD_MSG3) ) ) == NULL ) - return NULL; + return NULL; if ( ( p = msg_cpy(p, end, hostname, strlen(hostname) ) ) == NULL ) - return NULL; + return NULL; if ( ( p = msg_cpy(p, end, CD_MSG4, sizeof(CD_MSG4) ) ) == NULL ) - return NULL; + return NULL; if ( ( p = msg_cpy(p, end, metric, strlen(metric) ) ) == NULL ) - return NULL; + return NULL; if ( ( p = msg_cpy(p, end, CD_MSG5, sizeof(CD_MSG5) ) ) == NULL ) - return NULL; + return NULL; if (++i < app->num_cd_per_mesg) { *p++ = ','; @@ -156,14 +202,18 @@ static char *build_metric_mesg(app_data_t *app, char *time_buf) { static void gen_mesg(pn_rwbytes_t *buf, app_data_t *app, char *time_buf) { if (app->logs) { buf->start = build_log_mesg(app, time_buf); - } else { + } else if (app->ceilometer) { + buf->start = build_ceil_mesg(app, time_buf); + } else if (app->collectd){ buf->start = build_metric_mesg(app, time_buf); - } + } else { + buf->start = NULL; + } - if (buf->start != NULL) - buf->size = strlen(buf->start); - else - buf->size = 0; + if (buf->start != NULL) + buf->size = strlen(buf->start); + else + buf->size = 0; } /* Create a message with a map { "sequence" : number } encode it and return the @@ -254,7 +304,7 @@ static bool send_burst(app_data_t *app, pn_event_t *event) { break; } } - + if (app->sleep_usec) usleep(app->sleep_usec); diff --git a/generator/gen.c b/generator/gen.c index d51d3da1..a88d2979 100644 --- a/generator/gen.c +++ b/generator/gen.c @@ -36,7 +36,7 @@ extern int batch_count; static void usage(void) { fprintf(stdout, "%s: gen [OPTIONS] amqp_ip amqp_port\n\n" - "Generate Collectd or Rsyslog traffic on AMQP...\n\n" + "Generate Collectd, Ceilometer or Rsyslog traffic on AMQP...\n\n" "positional args:\n" " amqp_ip ip address of QDR\n" " amqp_port port number of the QDR\n" @@ -45,13 +45,13 @@ static void usage(void) { " -i container_id should be unique (defaults to sa-RND)\n" " -a amqp_address AMQP address for endpoint (defaults to collectd/telemetry)\n" " -c count message count to stop (defaults to 0 for continuous)\n" - " -n cd_per_mesg number of collectd or rsyslog messages per AMQP message (defaults to 1)\n" + " -n cd_per_mesg number of messages per AMQP message (defaults to 1)\n" " -o num_hosts number of hosts to simulate (defaults to 1)\n" " -m messages_hosts number of metrics or logs per hosts to simulate (defaults to 100)\n" " -t num_threads number of independent send pthreads (defaults to 1)\n" " -b burst_size maximum number of AMQP msgs to send per credit interval (defaults to # of credits)\n" " -s sleep_usec number of usec to sleep per credit interval (defaults to 0 for no sleep)\n" - " -l generate rsyslog log messages (generates collectd metrics by default)\n" + " -g message_type choose between collectd, ceilometer or rsyslog message type (generates collectd metrics by default)\n" " -v verbose, print extra info (additional -v increases verbosity)\n" " -h show help\n\n" "\n", @@ -106,10 +106,12 @@ int main(int argc, char **argv) { app.num_metrics = 100; app.presettled = false; app.logs = false; + app.ceilometer = false; + app.collectd = true; int num_threads = 1; - while ((opt = getopt(argc, argv, "i:a:c:hvb:s:n:o:m:pt:l")) != -1) { + while ((opt = getopt(argc, argv, "i:a:c:hvb:s:n:o:m:pt:g:")) != -1) { switch (opt) { case 'i': sprintf(cid_buf, optarg); @@ -147,8 +149,17 @@ int main(int argc, char **argv) { case 'n': app.num_cd_per_mesg = atoi(optarg); break; - case 'l': - app.logs = true; + case 'g': + if (!strcmp(optarg, "collectd")) { + app.collectd = true; + } else if (!strcmp(optarg, "ceilometer")) { + app.ceilometer = true; + } else if (!strcmp(optarg, "rsyslog")) { + app.logs = true; + } else { + usage(); + return 1; + } break; default: usage(); diff --git a/generator/gen.h b/generator/gen.h index 26f42225..5f57dffd 100644 --- a/generator/gen.h +++ b/generator/gen.h @@ -28,6 +28,8 @@ typedef struct { int num_metrics; int metrics_per_second; int logs; + int ceilometer; + int collectd; pthread_t amqp_snd_th; diff --git a/go.mod b/go.mod index 98aaf499..2d2f4825 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/infrawatch/sg-core -go 1.18 +go 1.19 require ( collectd.org v0.5.0 @@ -11,15 +11,37 @@ require ( github.com/infrawatch/apputils v0.0.0-20210809211320-3573b2937d14 github.com/json-iterator/go v1.1.12 github.com/pkg/errors v0.9.1 - github.com/prometheus/client_golang v1.11.0 + github.com/prometheus/client_golang v1.11.1 github.com/stretchr/testify v1.6.1 - golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect - google.golang.org/protobuf v1.27.1 // indirect + github.com/vmihailenco/msgpack/v5 v5.3.5 gopkg.in/errgo.v2 v2.1.0 gopkg.in/go-playground/assert.v1 v1.2.1 gopkg.in/go-playground/validator.v9 v9.31.0 gopkg.in/yaml.v2 v2.4.0 - gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b + gopkg.in/yaml.v3 v3.0.0 +) + +require ( + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/go-ini/ini v1.62.0 // indirect + github.com/go-playground/locales v0.13.0 // indirect + github.com/go-playground/universal-translator v0.17.0 // indirect + github.com/golang/protobuf v1.5.2 // indirect + github.com/leodido/go-urn v1.2.1 // indirect + github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/prometheus/client_model v0.2.0 // indirect + github.com/prometheus/common v0.29.0 // indirect + github.com/prometheus/procfs v0.6.0 // indirect + github.com/smartystreets/goconvey v1.7.2 // indirect + github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect + golang.org/x/sys v0.1.0 // indirect + google.golang.org/protobuf v1.27.1 // indirect + gopkg.in/ini.v1 v1.63.2 // indirect ) require ( diff --git a/go.sum b/go.sum index d5215054..1f0c38a8 100644 --- a/go.sum +++ b/go.sum @@ -215,8 +215,9 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M= -github.com/prometheus/client_golang v1.11.0 h1:HNkLOAEQMIDv/K+04rukrLx6ch7msSRwf3/SASFAGtQ= github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0= +github.com/prometheus/client_golang v1.11.1 h1:+4eQaD7vAZ6DsfsxB15hbE0odUjGI5ARs9yskGu1v4s= +github.com/prometheus/client_golang v1.11.1/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= @@ -248,6 +249,10 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/vmihailenco/msgpack/v5 v5.3.5 h1:5gO0H1iULLWGhs2H5tbAHIZTV8/cYafcFOr9znI5mJU= +github.com/vmihailenco/msgpack/v5 v5.3.5/go.mod h1:7xyJ9e+0+9SaZT0Wt1RGleJXzli6Q/V5KbhBonMG9jc= +github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= +github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -373,8 +378,8 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c h1:F1jZWGFhYfh0Ci55sIpILtKKK8p3i2/krTr0H1rg74I= -golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U= +golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -531,8 +536,8 @@ gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= -gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0 h1:hjy8E9ON/egN1tAYqKb61G10WtihqetD4sz2H+8nIeA= +gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/plugins/handler/ceilometer-metrics/main.go b/plugins/handler/ceilometer-metrics/main.go index d0018ad8..73459195 100644 --- a/plugins/handler/ceilometer-metrics/main.go +++ b/plugins/handler/ceilometer-metrics/main.go @@ -1,12 +1,15 @@ package main import ( + "bytes" "context" "errors" + "fmt" "strings" "time" "github.com/infrawatch/sg-core/pkg/bus" + "github.com/infrawatch/sg-core/pkg/config" "github.com/infrawatch/sg-core/pkg/data" "github.com/infrawatch/sg-core/pkg/handler" "github.com/infrawatch/sg-core/plugins/handler/ceilometer-metrics/pkg/ceilometer" @@ -29,6 +32,14 @@ type ceilometerMetricHandler struct { totalMetricsDecoded uint64 totalDecodeErrors uint64 totalMessagesReceived uint64 + config ceilometerConfig +} + +// The tcp and udp ceilometer publishers send the data in a message pack format. +// The messaging ceilometer publisher sends the data in a JSON format. +// That's the reason why we need to know the source. +type ceilometerConfig struct { + Source string `yaml:"source"` } func (c *ceilometerMetricHandler) Run(ctx context.Context, mpf bus.MetricPublishFunc, epf bus.EventPublishFunc) { @@ -71,7 +82,18 @@ func (c *ceilometerMetricHandler) Run(ctx context.Context, mpf bus.MetricPublish func (c *ceilometerMetricHandler) Handle(blob []byte, reportErrs bool, mpf bus.MetricPublishFunc, epf bus.EventPublishFunc) error { c.totalMessagesReceived++ - msg, err := c.ceilo.ParseInputJSON(blob) + var msg *ceilometer.Message + var err error + switch c.config.Source { + case "tcp": + fallthrough + case "udp": + msg, err = c.ceilo.ParseInputMsgPack(blob) + case "unix": + fallthrough + default: + msg, err = c.ceilo.ParseInputJSON(blob) + } if err != nil { return err } @@ -162,8 +184,8 @@ func genName(cNameShards []string) string { } func genLabels(m ceilometer.Metric, publisher string, cNameShards []string) ([]string, []string) { - labelKeys := make([]string, 8) // TODO: set to persistent var - labelVals := make([]string, 8) + labelKeys := make([]string, 12) // TODO: set to persistent var + labelVals := make([]string, 12) plugin := cNameShards[0] pluginVal := m.ResourceID if len(cNameShards) > 2 { @@ -201,6 +223,24 @@ func genLabels(m ceilometer.Metric, publisher string, cNameShards []string) ([]s index++ } + if m.ProjectName != "" { + labelKeys[index] = "project_name" + labelVals[index] = m.ProjectName + index++ + } + + if m.UserID != "" { + labelKeys[index] = "user" + labelVals[index] = m.UserID + index++ + } + + if m.UserName != "" { + labelKeys[index] = "user_name" + labelVals[index] = m.UserName + index++ + } + if m.CounterUnit != "" { labelKeys[index] = "unit" labelVals[index] = m.CounterUnit @@ -219,6 +259,25 @@ func genLabels(m ceilometer.Metric, publisher string, cNameShards []string) ([]s index++ } + if m.ResourceMetadata.DisplayName != "" { + labelKeys[index] = "resource_name" + labelVals[index] = m.ResourceMetadata.DisplayName + // index++ + } + + if m.ResourceMetadata.Name != "" { + labelKeys[index] = "resource_name" + if labelVals[index] != "" { + // Use the ":" delimiter when DisplayName is not None + labelVals[index] = labelVals[index] + ":" + m.ResourceMetadata.Name + } else { + labelVals[index] = m.ResourceMetadata.Name + } + } + if labelVals[index] != "" { + index++ + } + return labelKeys[:index], labelVals[:index] } @@ -227,6 +286,20 @@ func (c *ceilometerMetricHandler) Identify() string { } func (c *ceilometerMetricHandler) Config(blob []byte) error { + c.config = ceilometerConfig{ + Source: "unix", + } + err := config.ParseConfig(bytes.NewReader(blob), &c.config) + if err != nil { + return err + } + + c.config.Source = strings.ToLower(c.config.Source) + + if c.config.Source != "unix" && c.config.Source != "tcp" && c.config.Source != "udp" { + return fmt.Errorf("incorrect source, should be either \"unix\", \"tcp\" or \"udp\", received: %s", + c.config.Source) + } return nil } diff --git a/plugins/handler/ceilometer-metrics/main_test.go b/plugins/handler/ceilometer-metrics/main_test.go index c4a66225..e698f921 100644 --- a/plugins/handler/ceilometer-metrics/main_test.go +++ b/plugins/handler/ceilometer-metrics/main_test.go @@ -17,6 +17,16 @@ var ( metricsUT []data.Metric ) +var expectedMsgpackMetric = data.Metric{ + Name: "ceilometer_test_name_0_0_82", + Time: 0, + Type: data.UNTYPED, + Interval: 100 * time.Second, + Value: 0, + LabelKeys: []string{"test_name_0_0_82", "publisher", "type", "counter", "project", "project_name", "user", "user_name", "unit", "resource", "resource_name"}, + LabelVals: []string{"test_resource_id", "localhost.localdomain", "test_name_0_0_82", "test_name_0_0_82", "test_project_id_0", "test_project_name_0", "test_user_id", "test_user_name", "test_unit", "test_resource_id", "test_display_name:test_name"}, +} + // CeilometerMetricTemplate holds correct parsings for comparing against parsed results type CeilometerMetricTestTemplate struct { TestInput jsoniter.RawMessage `json:"testInput"` @@ -48,8 +58,12 @@ func MetricReceive(name string, mTime float64, mType data.MetricType, interval t }) } -func TestCeilometerIncoming(t *testing.T) { +func TestCeilometerIncomingJSON(t *testing.T) { plugin := New() + err := plugin.Config([]byte{}) + if err != nil { + t.Errorf("failed configuring ceilometer handler plugin: %s", err.Error()) + } testData, err := os.ReadFile("messages/metric-tests.json") if err != nil { @@ -81,6 +95,26 @@ func TestCeilometerIncoming(t *testing.T) { assert.Equal(t, expMetric, metricsUT[index]) } } +func TestCeilometerIncomingMsgpack(t *testing.T) { + plugin := New() + err := plugin.Config([]byte("source: tcp")) + if err != nil { + t.Errorf("failed configuring ceilometer handler plugin: %s", err.Error()) + } + + testData, err := os.ReadFile("messages/msgpack-test.msgpack") + if err != nil { + t.Errorf("failed loading test data: %s", err.Error()) + } + + metricsUT = []data.Metric{} + err = plugin.Handle(testData, false, MetricReceive, EventReceive) + if err != nil { + t.Error(err) + } + + assert.Equal(t, expectedMsgpackMetric, metricsUT[0]) +} func TestGenLabelsSizes(t *testing.T) { t.Run("un-exhaustive labels", func(t *testing.T) { @@ -93,7 +127,9 @@ func TestGenLabelsSizes(t *testing.T) { CounterUnit: "GB", CounterVolume: 2, UserID: "user_id", + UserName: "user_name", ProjectID: "db3fce7b7aeb4109bb2794f9337e68fa", + ProjectName: "test_project", ResourceID: "ed8102c3-923a-4f5a-9a24-d59afc174755", Timestamp: "2021-03-30T15:20:19.891893", } @@ -110,8 +146,8 @@ func TestGenLabelsSizes(t *testing.T) { } } - // should have 7 labels - assert.Equal(t, len(labelKeys), 7) + // should have 10 labels + assert.Equal(t, len(labelKeys), 10) }) t.Run("exhaustive labels", func(t *testing.T) { @@ -122,7 +158,9 @@ func TestGenLabelsSizes(t *testing.T) { CounterUnit: "GB", CounterVolume: 2, UserID: "user_id", + UserName: "user_name", ProjectID: "db3fce7b7aeb4109bb2794f9337e68fa", + ProjectName: "test_project", ResourceID: "ed8102c3-923a-4f5a-9a24-d59afc174755", Timestamp: "2021-03-30T15:20:19.891893", ResourceMetadata: ceilometer.Metadata{ @@ -136,8 +174,8 @@ func TestGenLabelsSizes(t *testing.T) { assert.Equal(t, len(labelKeys), len(labelVals)) fmt.Println(labelKeys) - // should have 8 labels - assert.Equal(t, len(labelKeys), 8) + // should have 11 labels + assert.Equal(t, len(labelKeys), 11) }) diff --git a/plugins/handler/ceilometer-metrics/messages/metric-tests.json b/plugins/handler/ceilometer-metrics/messages/metric-tests.json index 0bdc0bcd..155bd847 100644 --- a/plugins/handler/ceilometer-metrics/messages/metric-tests.json +++ b/plugins/handler/ceilometer-metrics/messages/metric-tests.json @@ -4,7 +4,7 @@ "testInput": { "request": { "oslo.version": "2.0", - "oslo.message": "{\"message_id\": \"37f64423-db31-4cfb-8c9d-06f9c0fad04a\", \"publisher_id\": \"telemetry.publisher.controller-0.redhat.local\", \"event_type\": \"metering\", \"priority\": \"SAMPLE\", \"payload\": [{\"source\": \"openstack\", \"counter_name\": \"disk.ephemeral.size\", \"counter_type\": \"gauge\", \"counter_unit\": \"GB\", \"counter_volume\": 0, \"user_id\": \"3ee72fcd74aa4439bb07fa69f1bc7169\", \"project_id\": \"e56191ef77744c599dbcecae6af176bb\", \"resource_id\": \"d8bd99b6-6fd8-4c02-a2e3-efbf596df636\", \"timestamp\": \"2020-09-14T16:12:49.939250+00:00\", \"resource_metadata\": {\"host\": \"compute-0.redhat.local\", \"flavor_id\": \"71cd0af1-afd3-4ee4-b918-cec05bf89578\", \"flavor_name\": \"m1.tiny\", \"display_name\": \"new-instance\", \"image_ref\": \"45333e02-643d-4f4f-a817-065060753983\", \"launched_at\": \"2020-09-14T16:12:49.839122\", \"created_at\": \"2020-09-14 16:12:39+00:00\"}, \"message_id\": \"22a54880-f6a5-11ea-b0f2-525400971e97\", \"monotonic_time\": null, \"message_signature\": \"be55d63bd5d876a62ab52824104128eedfa0619386e8569e326ccef4dcf0d9db\"}, {\"source\": \"openstack\", \"counter_name\": \"disk.root.size\", \"counter_type\": \"gauge\", \"counter_unit\": \"GB\", \"counter_volume\": 1, \"user_id\": \"3ee72fcd74aa4439bb07fa69f1bc7169\", \"project_id\": \"e56191ef77744c599dbcecae6af176bb\", \"resource_id\": \"d8bd99b6-6fd8-4c02-a2e3-efbf596df636\", \"timestamp\": \"2020-09-14T16:12:49.939250+00:00\", \"resource_metadata\": {\"host\": \"compute-0.redhat.local\", \"flavor_id\": \"71cd0af1-afd3-4ee4-b918-cec05bf89578\", \"flavor_name\": \"m1.tiny\", \"display_name\": \"new-instance\", \"image_ref\": \"45333e02-643d-4f4f-a817-065060753983\", \"launched_at\": \"2020-09-14T16:12:49.839122\", \"created_at\": \"2020-09-14 16:12:39+00:00\"}, \"message_id\": \"22a55c80-f6a5-11ea-b0f2-525400971e97\", \"monotonic_time\": null, \"message_signature\": \"bc0b987d71fe9f0d5d902347f22a0a20b2d975344b4c948572cae4dae553e960\"}, {\"source\": \"openstack\", \"counter_name\": \"compute.instance.booting.time\", \"counter_type\": \"gauge\", \"counter_unit\": \"sec\", \"counter_volume\": 10.839122, \"user_id\": null, \"project_id\": \"e56191ef77744c599dbcecae6af176bb\", \"resource_id\": \"d8bd99b6-6fd8-4c02-a2e3-efbf596df636\", \"timestamp\": \"2020-09-14T16:12:49.939250+00:00\", \"resource_metadata\": {\"host\": \"compute-0.redhat.local\", \"flavor_id\": \"71cd0af1-afd3-4ee4-b918-cec05bf89578\", \"flavor_name\": \"m1.tiny\", \"display_name\": \"new-instance\", \"image_ref\": \"45333e02-643d-4f4f-a817-065060753983\", \"launched_at\": \"2020-09-14T16:12:49.839122\", \"created_at\": \"2020-09-14 16:12:39+00:00\"}, \"message_id\": \"22a574d6-f6a5-11ea-b0f2-525400971e97\", \"monotonic_time\": null, \"message_signature\": \"fd7f1e2fdb34b7beb836d0ead178289f7c36f39bcd68acfd0719848667c58a13\"}, {\"source\": \"openstack\", \"counter_name\": \"vcpus\", \"counter_type\": \"gauge\", \"counter_unit\": \"vcpu\", \"counter_volume\": 2, \"user_id\": \"3ee72fcd74aa4439bb07fa69f1bc7169\", \"project_id\": \"e56191ef77744c599dbcecae6af176bb\", \"resource_id\": \"d8bd99b6-6fd8-4c02-a2e3-efbf596df636\", \"timestamp\": \"2020-09-14T16:12:49.939250+00:00\", \"resource_metadata\": {\"host\": \"compute-0.redhat.local\", \"flavor_id\": \"71cd0af1-afd3-4ee4-b918-cec05bf89578\", \"flavor_name\": \"m1.tiny\", \"display_name\": \"new-instance\", \"image_ref\": \"45333e02-643d-4f4f-a817-065060753983\", \"launched_at\": \"2020-09-14T16:12:49.839122\", \"created_at\": \"2020-09-14 16:12:39+00:00\"}, \"message_id\": \"22a5821e-f6a5-11ea-b0f2-525400971e97\", \"monotonic_time\": null, \"message_signature\": \"d3f107c2ef6bb1b06e1a9975d1f1ff0bdc51432adff39403db2a1f6a9773b99d\"}, {\"source\": \"openstack\", \"counter_name\": \"memory\", \"counter_type\": \"gauge\", \"counter_unit\": \"MB\", \"counter_volume\": 512, \"user_id\": \"3ee72fcd74aa4439bb07fa69f1bc7169\", \"project_id\": \"e56191ef77744c599dbcecae6af176bb\", \"resource_id\": \"d8bd99b6-6fd8-4c02-a2e3-efbf596df636\", \"timestamp\": \"2020-09-14T16:12:49.939250+00:00\", \"resource_metadata\": {\"host\": \"compute-0.redhat.local\", \"flavor_id\": \"71cd0af1-afd3-4ee4-b918-cec05bf89578\", \"flavor_name\": \"m1.tiny\", \"display_name\": \"new-instance\", \"image_ref\": \"45333e02-643d-4f4f-a817-065060753983\", \"launched_at\": \"2020-09-14T16:12:49.839122\", \"created_at\": \"2020-09-14 16:12:39+00:00\"}, \"message_id\": \"22a591dc-f6a5-11ea-b0f2-525400971e97\", \"monotonic_time\": null, \"message_signature\": \"9dcf78e3cd43e7fcd66cda5cf33511150e79a086a634bd9d087bb567e4985980\"}], \"timestamp\": \"2020-09-14 16:12:49.954128\"}" + "oslo.message": "{\"message_id\": \"37f64423-db31-4cfb-8c9d-06f9c0fad04a\", \"publisher_id\": \"telemetry.publisher.controller-0.redhat.local\", \"event_type\": \"metering\", \"priority\": \"SAMPLE\", \"payload\": [{\"source\": \"openstack\", \"counter_name\": \"disk.ephemeral.size\", \"counter_type\": \"gauge\", \"counter_unit\": \"GB\", \"counter_volume\": 0, \"user_id\": \"3ee72fcd74aa4439bb07fa69f1bc7169\", \"project_id\": \"e56191ef77744c599dbcecae6af176bb\", \"user_name\": \"test_user\", \"project_name\": \"test_project\", \"resource_id\": \"d8bd99b6-6fd8-4c02-a2e3-efbf596df636\", \"timestamp\": \"2020-09-14T16:12:49.939250+00:00\", \"resource_metadata\": {\"host\": \"compute-0.redhat.local\", \"flavor_id\": \"71cd0af1-afd3-4ee4-b918-cec05bf89578\", \"flavor_name\": \"m1.tiny\", \"display_name\": \"new-instance\", \"name\": \"instance-0000001\", \"image_ref\": \"45333e02-643d-4f4f-a817-065060753983\", \"launched_at\": \"2020-09-14T16:12:49.839122\", \"created_at\": \"2020-09-14 16:12:39+00:00\"}, \"message_id\": \"22a54880-f6a5-11ea-b0f2-525400971e97\", \"monotonic_time\": null, \"message_signature\": \"be55d63bd5d876a62ab52824104128eedfa0619386e8569e326ccef4dcf0d9db\"}, {\"source\": \"openstack\", \"counter_name\": \"disk.root.size\", \"counter_type\": \"gauge\", \"counter_unit\": \"GB\", \"counter_volume\": 1, \"user_id\": \"3ee72fcd74aa4439bb07fa69f1bc7169\", \"project_id\": \"e56191ef77744c599dbcecae6af176bb\", \"user_name\": \"test_user\", \"project_name\": \"test_project\", \"resource_id\": \"d8bd99b6-6fd8-4c02-a2e3-efbf596df636\", \"timestamp\": \"2020-09-14T16:12:49.939250+00:00\", \"resource_metadata\": {\"host\": \"compute-0.redhat.local\", \"flavor_id\": \"71cd0af1-afd3-4ee4-b918-cec05bf89578\", \"flavor_name\": \"m1.tiny\", \"display_name\": \"new-instance\", \"name\": \"instance-0000001\", \"image_ref\": \"45333e02-643d-4f4f-a817-065060753983\", \"launched_at\": \"2020-09-14T16:12:49.839122\", \"created_at\": \"2020-09-14 16:12:39+00:00\"}, \"message_id\": \"22a55c80-f6a5-11ea-b0f2-525400971e97\", \"monotonic_time\": null, \"message_signature\": \"bc0b987d71fe9f0d5d902347f22a0a20b2d975344b4c948572cae4dae553e960\"}, {\"source\": \"openstack\", \"counter_name\": \"compute.instance.booting.time\", \"counter_type\": \"gauge\", \"counter_unit\": \"sec\", \"counter_volume\": 10.839122, \"user_id\": null, \"project_id\": \"e56191ef77744c599dbcecae6af176bb\", \"user_name\": null, \"project_name\": \"test_project\", \"resource_id\": \"d8bd99b6-6fd8-4c02-a2e3-efbf596df636\", \"timestamp\": \"2020-09-14T16:12:49.939250+00:00\", \"resource_metadata\": {\"host\": \"compute-0.redhat.local\", \"flavor_id\": \"71cd0af1-afd3-4ee4-b918-cec05bf89578\", \"flavor_name\": \"m1.tiny\", \"display_name\": \"new-instance\", \"name\": \"instance-0000001\", \"image_ref\": \"45333e02-643d-4f4f-a817-065060753983\", \"launched_at\": \"2020-09-14T16:12:49.839122\", \"created_at\": \"2020-09-14 16:12:39+00:00\"}, \"message_id\": \"22a574d6-f6a5-11ea-b0f2-525400971e97\", \"monotonic_time\": null, \"message_signature\": \"fd7f1e2fdb34b7beb836d0ead178289f7c36f39bcd68acfd0719848667c58a13\"}, {\"source\": \"openstack\", \"counter_name\": \"vcpus\", \"counter_type\": \"gauge\", \"counter_unit\": \"vcpu\", \"counter_volume\": 2, \"user_id\": \"3ee72fcd74aa4439bb07fa69f1bc7169\", \"project_id\": \"e56191ef77744c599dbcecae6af176bb\", \"user_name\": \"test_user\", \"project_name\": \"test_project\", \"resource_id\": \"d8bd99b6-6fd8-4c02-a2e3-efbf596df636\", \"timestamp\": \"2020-09-14T16:12:49.939250+00:00\", \"resource_metadata\": {\"host\": \"compute-0.redhat.local\", \"flavor_id\": \"71cd0af1-afd3-4ee4-b918-cec05bf89578\", \"flavor_name\": \"m1.tiny\", \"display_name\": \"new-instance\", \"image_ref\": \"45333e02-643d-4f4f-a817-065060753983\", \"launched_at\": \"2020-09-14T16:12:49.839122\", \"created_at\": \"2020-09-14 16:12:39+00:00\"}, \"message_id\": \"22a5821e-f6a5-11ea-b0f2-525400971e97\", \"monotonic_time\": null, \"message_signature\": \"d3f107c2ef6bb1b06e1a9975d1f1ff0bdc51432adff39403db2a1f6a9773b99d\"}, {\"source\": \"openstack\", \"counter_name\": \"memory\", \"counter_type\": \"gauge\", \"counter_unit\": \"MB\", \"counter_volume\": 512, \"user_id\": \"3ee72fcd74aa4439bb07fa69f1bc7169\", \"project_id\": \"e56191ef77744c599dbcecae6af176bb\", \"user_name\": \"test_user\", \"project_name\": \"test_project\", \"resource_id\": \"d8bd99b6-6fd8-4c02-a2e3-efbf596df636\", \"timestamp\": \"2020-09-14T16:12:49.939250+00:00\", \"resource_metadata\": {\"host\": \"compute-0.redhat.local\", \"flavor_id\": \"71cd0af1-afd3-4ee4-b918-cec05bf89578\", \"flavor_name\": \"m1.tiny\", \"name\": \"instance-0000001\", \"image_ref\": \"45333e02-643d-4f4f-a817-065060753983\", \"launched_at\": \"2020-09-14T16:12:49.839122\", \"created_at\": \"2020-09-14 16:12:39+00:00\"}, \"message_id\": \"22a591dc-f6a5-11ea-b0f2-525400971e97\", \"monotonic_time\": null, \"message_signature\": \"9dcf78e3cd43e7fcd66cda5cf33511150e79a086a634bd9d087bb567e4985980\"}], \"timestamp\": \"2020-09-14 16:12:49.954128\"}" }, "context": {} }, @@ -17,9 +17,13 @@ "type", "counter", "project", + "project_name", + "user", + "user_name", "unit", "resource", - "vm_instance" + "vm_instance", + "resource_name" ], "LabelVals": [ "size", @@ -27,9 +31,13 @@ "ephemeral", "disk.ephemeral.size", "e56191ef77744c599dbcecae6af176bb", + "test_project", + "3ee72fcd74aa4439bb07fa69f1bc7169", + "test_user", "GB", "d8bd99b6-6fd8-4c02-a2e3-efbf596df636", - "compute-0.redhat.local" + "compute-0.redhat.local", + "new-instance:instance-0000001" ], "Time": 1600099969, "Type": 2, @@ -44,9 +52,13 @@ "type", "counter", "project", + "project_name", + "user", + "user_name", "unit", "resource", - "vm_instance" + "vm_instance", + "resource_name" ], "LabelVals": [ "size", @@ -54,9 +66,13 @@ "root", "disk.root.size", "e56191ef77744c599dbcecae6af176bb", + "test_project", + "3ee72fcd74aa4439bb07fa69f1bc7169", + "test_user", "GB", "d8bd99b6-6fd8-4c02-a2e3-efbf596df636", - "compute-0.redhat.local" + "compute-0.redhat.local", + "new-instance:instance-0000001" ], "Time": 1600099969, "Type": 2, @@ -71,9 +87,11 @@ "type", "counter", "project", + "project_name", "unit", "resource", - "vm_instance" + "vm_instance", + "resource_name" ], "LabelVals": [ "booting", @@ -81,9 +99,11 @@ "instance", "compute.instance.booting.time", "e56191ef77744c599dbcecae6af176bb", + "test_project", "sec", "d8bd99b6-6fd8-4c02-a2e3-efbf596df636", - "compute-0.redhat.local" + "compute-0.redhat.local", + "new-instance:instance-0000001" ], "Time": 1600099969, "Type": 2, @@ -98,9 +118,13 @@ "type", "counter", "project", + "project_name", + "user", + "user_name", "unit", "resource", - "vm_instance" + "vm_instance", + "resource_name" ], "LabelVals": [ "d8bd99b6-6fd8-4c02-a2e3-efbf596df636", @@ -108,9 +132,13 @@ "vcpus", "vcpus", "e56191ef77744c599dbcecae6af176bb", + "test_project", + "3ee72fcd74aa4439bb07fa69f1bc7169", + "test_user", "vcpu", "d8bd99b6-6fd8-4c02-a2e3-efbf596df636", - "compute-0.redhat.local" + "compute-0.redhat.local", + "new-instance" ], "Time": 1600099969, "Type": 2, @@ -125,9 +153,13 @@ "type", "counter", "project", + "project_name", + "user", + "user_name", "unit", "resource", - "vm_instance" + "vm_instance", + "resource_name" ], "LabelVals": [ "d8bd99b6-6fd8-4c02-a2e3-efbf596df636", @@ -135,9 +167,13 @@ "memory", "memory", "e56191ef77744c599dbcecae6af176bb", + "test_project", + "3ee72fcd74aa4439bb07fa69f1bc7169", + "test_user", "MB", "d8bd99b6-6fd8-4c02-a2e3-efbf596df636", - "compute-0.redhat.local" + "compute-0.redhat.local", + "instance-0000001" ], "Time": 1600099969, "Type": 2, diff --git a/plugins/handler/ceilometer-metrics/messages/msgpack-test.msgpack b/plugins/handler/ceilometer-metrics/messages/msgpack-test.msgpack new file mode 100644 index 00000000..a6fe1db4 Binary files /dev/null and b/plugins/handler/ceilometer-metrics/messages/msgpack-test.msgpack differ diff --git a/plugins/handler/ceilometer-metrics/pkg/ceilometer/ceilometer.go b/plugins/handler/ceilometer-metrics/pkg/ceilometer/ceilometer.go index 037981cf..01a1e47d 100644 --- a/plugins/handler/ceilometer-metrics/pkg/ceilometer/ceilometer.go +++ b/plugins/handler/ceilometer-metrics/pkg/ceilometer/ceilometer.go @@ -6,6 +6,7 @@ import ( "strings" jsoniter "github.com/json-iterator/go" + "github.com/vmihailenco/msgpack/v5" ) var ( @@ -16,26 +17,31 @@ var ( // Metedata represents metadataof a metric from ceilometer type Metadata struct { - Host string + Host string `json:"host" msgpack:"host"` + Name string `json:"name" msgpack:"name"` + DisplayName string `json:"display_name" msgpack:"display_name"` + InstanceHost string `json:"instance_host" msgpack:"instance_host"` } // Metric represents a single metric from ceilometer for unmarshalling type Metric struct { - Source string - CounterName string `json:"counter_name"` - CounterType string `json:"counter_type"` - CounterUnit string `json:"counter_unit"` - CounterVolume float64 `json:"counter_volume"` - UserID string `json:"user_id"` - ProjectID string `json:"project_id"` - ResourceID string `json:"resource_id"` - Timestamp string - ResourceMetadata Metadata `json:"resource_metadata"` + Source string `json:"source" msgpack:"source"` + CounterName string `json:"counter_name" msgpack:"counter_name"` + CounterType string `json:"counter_type" msgpack:"counter_type"` + CounterUnit string `json:"counter_unit" msgpack:"counter_unit"` + CounterVolume float64 `json:"counter_volume" msgpack:"counter_volume"` + UserID string `json:"user_id" msgpack:"user_id"` + UserName string `json:"user_name" msgpack:"user_name"` + ProjectID string `json:"project_id" msgpack:"project_id"` + ProjectName string `json:"project_name" msgpack:"project_name"` + ResourceID string `json:"resource_id" msgpack:"resource_id"` + Timestamp string `json:"timestamp" msgpack:"timestamp"` + ResourceMetadata Metadata `json:"resource_metadata" msgpack:"resource_metadata"` } // Message struct represents an incoming ceilometer metrics message type Message struct { - Publisher string `json:"publisher_id"` + Publisher string `json:"publisher_id" msgpack:"publisher_id"` Payload []Metric `json:"payload"` } @@ -73,6 +79,22 @@ func (c *Ceilometer) ParseInputJSON(blob []byte) (*Message, error) { return msg, nil } +// ParseInputMsgPack parse blob into list of metrics +func (c *Ceilometer) ParseInputMsgPack(blob []byte) (*Message, error) { + msg := &Message{} + metric := Metric{} + err := msgpack.Unmarshal(blob, &metric) + if err != nil { + return nil, err + } + err = msgpack.Unmarshal(blob, msg) + if err != nil { + return nil, err + } + msg.Payload = append(msg.Payload, metric) + return msg, nil +} + // sanitize remove extraneous characters func (c *Ceilometer) sanitize() string { sanitized := rexForNestedQuote.ReplaceAllString(c.schema.Request.OsloMessage, `"`) diff --git a/plugins/transport/socket/main.go b/plugins/transport/socket/main.go index 9b07f734..bfac8fa7 100644 --- a/plugins/transport/socket/main.go +++ b/plugins/transport/socket/main.go @@ -4,10 +4,13 @@ import ( "bufio" "bytes" "context" + "encoding/binary" "fmt" + "io" "net" "os" "strings" + "sync" "time" "github.com/infrawatch/apputils/logging" @@ -20,6 +23,8 @@ const ( maxBufferSize = 65535 udp = "udp" unix = "unix" + tcp = "tcp" + msgLengthSize = 8 ) var ( @@ -72,6 +77,7 @@ type Socket struct { logger *logWrapper dumpBuf *bufio.Writer dumpFile *os.File + mutex sync.Mutex } func (s *Socket) initUnixSocket() *net.UnixConn { @@ -115,50 +121,144 @@ func (s *Socket) initUDPSocket() *net.UDPConn { return pc } -// Run implements type Transport -func (s *Socket) Run(ctx context.Context, w transport.WriteFn, done chan bool) { - var pc net.Conn - if s.conf.Type == udp { - pc = s.initUDPSocket() - } else { - pc = s.initUnixSocket() +func (s *Socket) initTCPSocket() *net.TCPListener { + addr, err := net.ResolveTCPAddr(tcp, s.conf.Socketaddr) + if err != nil { + s.logger.Errorf(err, "failed to resolve tcp address: %s", s.conf.Socketaddr) + return nil } - if pc == nil { - s.logger.Errorf(nil, "Failed to initialize socket transport plugin") + pc, err := net.ListenTCP(tcp, addr) + if err != nil { + s.logger.Errorf(err, "failed to bind tcp socket to addr: %s", s.conf.Socketaddr) + return nil } - go func(maxBuffSize int64) { - msgBuffer := make([]byte, maxBuffSize) - for { - n, err := pc.Read(msgBuffer) - if err != nil || n < 1 { - if err != nil { - s.logger.Errorf(err, "reading from socket failed") - } + + s.logger.Infof("socket listening on %s", s.conf.Socketaddr) + + return pc +} + +func (s *Socket) WriteTCPMsg(w transport.WriteFn, msgBuffer []byte, n int) (int64, error) { + var pos int64 + var length int64 + reader := bytes.NewReader(msgBuffer[:n]) + for pos+msgLengthSize < int64(n) { + _, err := reader.Seek(pos, io.SeekStart) + if err != nil { + return pos, err + } + err = binary.Read(reader, binary.LittleEndian, &length) + if err != nil { + return pos, err + } + + if pos+msgLengthSize+length > int64(n) || + pos+msgLengthSize+length < 0 { + break + } + s.mutex.Lock() + w(msgBuffer[pos+msgLengthSize : pos+msgLengthSize+length]) + msgCount++ + s.mutex.Unlock() + pos += msgLengthSize + length + } + return pos, nil +} + +func (s *Socket) ReceiveData(maxBuffSize int64, done chan bool, pc net.Conn, w transport.WriteFn) { + defer pc.Close() + msgBuffer := make([]byte, maxBuffSize) + var remainingMsg []byte + for { + n, err := pc.Read(msgBuffer) + if err != nil || n < 1 { + if err != nil { + s.logger.Errorf(err, "reading from socket failed") + } + if s.conf.Type != tcp { done <- true - return } + return + } + msgBuffer = append(remainingMsg, msgBuffer...) - // whole buffer was used, so we are potentially handling larger message - if n == len(msgBuffer) { - s.logger.Warnf("full read buffer used") - } + // whole buffer was used, so we are potentially handling larger message + if n == len(msgBuffer) { + s.logger.Warnf("full read buffer used") + } - if s.conf.DumpMessages.Enabled { - _, err := s.dumpBuf.Write(msgBuffer[:n]) - if err != nil { - s.logger.Errorf(err, "writing to dump buffer") - } - _, err = s.dumpBuf.WriteString("\n") - if err != nil { - s.logger.Errorf(err, "writing to dump buffer") - } - s.dumpBuf.Flush() + n += len(remainingMsg) + + if s.conf.DumpMessages.Enabled { + _, err := s.dumpBuf.Write(msgBuffer[:n]) + if err != nil { + s.logger.Errorf(err, "writing to dump buffer") + } + _, err = s.dumpBuf.WriteString("\n") + if err != nil { + s.logger.Errorf(err, "writing to dump buffer") } + s.dumpBuf.Flush() + } + if s.conf.Type == tcp { + parsed, err := s.WriteTCPMsg(w, msgBuffer, n) + if err != nil { + s.logger.Errorf(err, "error, while parsing messages") + return + } + remainingMsg = make([]byte, int64(n)-parsed) + copy(remainingMsg, msgBuffer[parsed:n]) + } else { w(msgBuffer[:n]) msgCount++ } - }(maxBufferSize) + } +} + +// Run implements type Transport +func (s *Socket) Run(ctx context.Context, w transport.WriteFn, done chan bool) { + var pc net.Conn + switch s.conf.Type { + case udp: + pc = s.initUDPSocket() + if pc == (*net.UDPConn)(nil) { + s.logger.Errorf(nil, "Failed to initialize socket transport plugin with type: "+s.conf.Type) + return + } + go s.ReceiveData(maxBufferSize, done, pc, w) + + case tcp: + TCPSocket := s.initTCPSocket() + if TCPSocket == nil { + s.logger.Errorf(nil, "Failed to initialize socket transport plugin with type: "+s.conf.Type) + return + } + go func() { + for { + pc, err := TCPSocket.AcceptTCP() + if err != nil { + select { + case <-ctx.Done(): + break + default: + s.logger.Errorf(err, "failed to accept TCP connection") + continue + } + } + go s.ReceiveData(maxBufferSize, done, pc, w) + } + }() + case unix: + fallthrough + default: + pc = s.initUnixSocket() + if pc == (*net.UnixConn)(nil) { + s.logger.Errorf(nil, "Failed to initialize socket transport plugin with type: "+s.conf.Type) + return + } + go s.ReceiveData(maxBufferSize, done, pc, w) + } for { select { @@ -170,7 +270,6 @@ func (s *Socket) Run(ctx context.Context, w transport.WriteFn, done chan bool) { } } Done: - pc.Close() if s.conf.Type == unix { os.Remove(s.conf.Path) } @@ -210,8 +309,8 @@ func (s *Socket) Config(c []byte) error { } s.conf.Type = strings.ToLower(s.conf.Type) - if s.conf.Type != unix && s.conf.Type != udp { - return fmt.Errorf("unable to determine socket type from configuration file. Should be either \"unix\" or \"udp\", received: %s", + if s.conf.Type != unix && s.conf.Type != udp && s.conf.Type != tcp { + return fmt.Errorf("unable to determine socket type from configuration file. Should be one of \"unix\", \"udp\" or \"tcp\", received: %s", s.conf.Type) } @@ -219,8 +318,8 @@ func (s *Socket) Config(c []byte) error { return fmt.Errorf("the path configuration option is required when using unix socket type") } - if s.conf.Type == udp && s.conf.Socketaddr == "" { - return fmt.Errorf("the socketaddr configuration option is required when using udp socket type") + if (s.conf.Type == udp || s.conf.Type == tcp) && s.conf.Socketaddr == "" { + return fmt.Errorf("the socketaddr configuration option is required when using udp or tcp socket type") } return nil diff --git a/plugins/transport/socket/main_test.go b/plugins/transport/socket/main_test.go index 48b58776..d734248c 100644 --- a/plugins/transport/socket/main_test.go +++ b/plugins/transport/socket/main_test.go @@ -1,7 +1,9 @@ package main import ( + "bytes" "context" + "encoding/binary" "net" "os" "path" @@ -15,6 +17,7 @@ import ( ) const regularBuffSize = 16384 +const addition = "wubba lubba dub dub" func TestUnixSocketTransport(t *testing.T) { tmpdir, err := os.MkdirTemp(".", "socket_test_tmp") @@ -41,7 +44,6 @@ func TestUnixSocketTransport(t *testing.T) { t.Run("test large message transport", func(t *testing.T) { msg := make([]byte, regularBuffSize) - addition := "wubba lubba dub dub" for i := 0; i < regularBuffSize; i++ { msg[i] = byte('X') } @@ -102,7 +104,6 @@ func TestUdpSocketTransport(t *testing.T) { t.Run("test large message transport", func(t *testing.T) { msg := make([]byte, regularBuffSize) - addition := "wubba lubba dub dub" for i := 0; i < regularBuffSize; i++ { msg[i] = byte('X') } @@ -133,3 +134,113 @@ func TestUdpSocketTransport(t *testing.T) { wskt.Close() }) } + +func TestTcpSocketTransport(t *testing.T) { + tmpdir, err := os.MkdirTemp(".", "socket_test_tmp") + require.NoError(t, err) + defer os.RemoveAll(tmpdir) + + logpath := path.Join(tmpdir, "test.log") + logger, err := logging.NewLogger(logging.DEBUG, logpath) + require.NoError(t, err) + + trans := Socket{ + conf: configT{ + Socketaddr: "127.0.0.1:8642", + Type: "tcp", + }, + logger: &logWrapper{ + l: logger, + }, + } + + t.Run("test large message transport single connection", func(t *testing.T) { + msg := make([]byte, regularBuffSize) + for i := 0; i < regularBuffSize; i++ { + msg[i] = byte('X') + } + msg[regularBuffSize-1] = byte('$') + msg = append(msg, []byte(addition)...) + msgLength := new(bytes.Buffer) + err := binary.Write(msgLength, binary.LittleEndian, uint64(len(msg))) + require.NoError(t, err) + msg = append(msgLength.Bytes(), msg...) + + // verify transport + ctx, cancel := context.WithCancel(context.Background()) + wg := sync.WaitGroup{} + go trans.Run(ctx, func(mess []byte) { + wg.Add(1) + strmsg := string(mess) + assert.Equal(t, regularBuffSize+len(addition), len(strmsg)) // we received whole message + assert.Equal(t, addition, strmsg[len(strmsg)-len(addition):]) // and the out-of-band part is correct + wg.Done() + }, make(chan bool)) + + // write to socket + wskt, err := net.Dial("tcp", "127.0.0.1:8642") + if err != nil { + // The socket might not be listening yet, wait a little bit and try to connect again + for retries := 0; err != nil && retries < 3; retries++ { + time.Sleep(2 * time.Second) + wskt, err = net.Dial("tcp", "127.0.0.1:8642") + } + } + require.NoError(t, err) + _, err = wskt.Write(msg) + require.NoError(t, err) + + cancel() + wg.Wait() + wskt.Close() + }) + + t.Run("test large message transport multiple connections", func(t *testing.T) { + msg := make([]byte, regularBuffSize) + for i := 0; i < regularBuffSize; i++ { + msg[i] = byte('X') + } + msg[regularBuffSize-1] = byte('$') + msg = append(msg, []byte(addition)...) + msgLength := new(bytes.Buffer) + err := binary.Write(msgLength, binary.LittleEndian, uint64(len(msg))) + require.NoError(t, err) + msg = append(msgLength.Bytes(), msg...) + + // verify transport + ctx, cancel := context.WithCancel(context.Background()) + wg := sync.WaitGroup{} + go trans.Run(ctx, func(mess []byte) { + wg.Add(1) + strmsg := string(mess) + assert.Equal(t, regularBuffSize+len(addition), len(strmsg)) // we received whole message + assert.Equal(t, addition, strmsg[len(strmsg)-len(addition):]) // and the out-of-band part is correct + wg.Done() + }, make(chan bool)) + + // write to socket + wskt1, err := net.Dial("tcp", "127.0.0.1:8642") + if err != nil { + // The socket might not be listening yet, wait a little bit and try to connect again + for retries := 0; err != nil && retries < 3; retries++ { + time.Sleep(2 * time.Second) + wskt1, err = net.Dial("tcp", "127.0.0.1:8642") + } + } + require.NoError(t, err) + + // We shouldn't need to retry the second connection, if this fails, then something is wrong + wskt2, err := net.Dial("tcp", "127.0.0.1:8642") + require.NoError(t, err) + + _, err = wskt1.Write(msg) + require.NoError(t, err) + _, err = wskt2.Write(msg) + require.NoError(t, err) + + cancel() + wg.Wait() + wskt1.Close() + wskt2.Close() + }) +}