Skip to content

Commit

Permalink
Merge branch 'ansible:devel' into podlogs-fix
Browse files Browse the repository at this point in the history
  • Loading branch information
AaronH88 authored Oct 4, 2024
2 parents b933886 + 1be2c55 commit c8855a5
Show file tree
Hide file tree
Showing 10 changed files with 204 additions and 7 deletions.
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ linters:
- gofumpt
- goheader
- goimports
- gomoddirectives
- gomodguard
- gosec
- gosimple
Expand Down Expand Up @@ -101,6 +100,7 @@ linters-settings:
- "k8s.io/client-go/tools/remotecommand"
- "github.com/quic-go/quic-go"
- "github.com/quic-go/quic-go/logging"
- "github.com/AaronH88/quic-go"

issues:
# Dont commit the following line.
Expand Down
21 changes: 21 additions & 0 deletions docs/source/user_guide/configuration_options.rst
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,27 @@ Log level
- Error
- string

Add payload tracing using `RECEPTOR_PAYLOAD_TRACE_LEVEL=int` envorment variable and using log level debug.

.. list-table:: RECEPTOR_PAYLOAD_TRACE_LEVEL options
:header-rows: 1
:widths: auto

* - Tracing level
- Description
* - 0
- No payload tracing log
* - 1
- Log connection type
* - 2
- Log connection type and work unit id
* - 3
- Log connection type, work unit id and payload

**Warning: Payload Tracing May Expose Sensitive Data**

Please be aware that using payload tracing can potentially reveal sensitive information. This includes, but is not limited to, personal data, authentication tokens, and system configurations. Ensure that you only use tracing tools in a secure environment and avoid sharing trace output with unauthorized users. Always follow your organization's data protection policies when handling sensitive information. Proceed with caution!

.. code-block:: yaml
log-level:
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,5 @@ require (
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
)

replace github.com/quic-go/quic-go v0.40.1 => github.com/AaronH88/quic-go v0.0.0-20240925173611-8b838692e0f5 //nolint:gomoddirectives
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ dmitri.shuralyov.com/html/belt v0.0.0-20180602232347-f7d459c86be0/go.mod h1:JLBr
dmitri.shuralyov.com/service/change v0.0.0-20181023043359-a85b471d5412/go.mod h1:a1inKt/atXimZ4Mv927x+r7UpyzRUf4emIoiiSC2TN4=
dmitri.shuralyov.com/state v0.0.0-20180228185332-28bcc343414c/go.mod h1:0PRwlb0D6DFvNNtx+9ybjezNCa8XF0xaYcETyp6rHWU=
git.apache.org/thrift.git v0.0.0-20180902110319-2566ecd5d999/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg=
github.com/AaronH88/quic-go v0.0.0-20240925173611-8b838692e0f5 h1:GegwLC9LhzGpsU7TpPHxAoYfsOh7iwYhCHiVOjPQ7xY=
github.com/AaronH88/quic-go v0.0.0-20240925173611-8b838692e0f5/go.mod h1:PeN7kuVJ4xZbxSv/4OX6S1USOX8MJvydwpTx31vx60c=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
Expand Down Expand Up @@ -176,8 +178,6 @@ github.com/prometheus/common v0.0.0-20180801064454-c7de2306084e/go.mod h1:daVV7q
github.com/prometheus/procfs v0.0.0-20180725123919-05ee40e3a273/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/quic-go/qtls-go1-20 v0.4.1 h1:D33340mCNDAIKBqXuAvexTNMUByrYmFYVfKfDN5nfFs=
github.com/quic-go/qtls-go1-20 v0.4.1/go.mod h1:X9Nh97ZL80Z+bX/gUXMbipO6OxdiDi58b/fMC9mAL+k=
github.com/quic-go/quic-go v0.40.1 h1:X3AGzUNFs0jVuO3esAGnTfvdgvL4fq655WaOi1snv1Q=
github.com/quic-go/quic-go v0.40.1/go.mod h1:PeN7kuVJ4xZbxSv/4OX6S1USOX8MJvydwpTx31vx60c=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
Expand Down
36 changes: 34 additions & 2 deletions pkg/controlsvc/controlsvc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package controlsvc

import (
"bufio"
"context"
"crypto/tls"
"encoding/json"
Expand All @@ -14,6 +15,7 @@ import (
"os"
"reflect"
"runtime"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -122,8 +124,38 @@ func (s *SockControl) ReadFromConn(message string, out io.Writer, io Copier) err
if err := s.WriteMessage(message); err != nil {
return err
}
if _, err := io.Copy(out, s.conn); err != nil {
return err
payloadDebug, _ := strconv.Atoi(os.Getenv("RECEPTOR_PAYLOAD_TRACE_LEVEL"))

if payloadDebug != 0 {
var connectionType string
var payload string
if s.conn.LocalAddr().Network() == "unix" {
connectionType = "unix socket"
} else {
connectionType = "network connection"
}
reader := bufio.NewReader(s.conn)

for {
response, err := reader.ReadString('\n')
if err != nil {
if err.Error() != "EOF" {
MainInstance.nc.GetLogger().Error("Error reading from conn: %v \n", err)
}

break
}
payload += response
}

MainInstance.nc.GetLogger().DebugPayload(payloadDebug, payload, "", connectionType)
if _, err := out.Write([]byte(payload)); err != nil {
return err
}
} else {
if _, err := io.Copy(out, s.conn); err != nil {
return err
}
}

return nil
Expand Down
27 changes: 27 additions & 0 deletions pkg/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,33 @@ func (rl *ReceptorLogger) Debug(format string, v ...interface{}) {
rl.Log(DebugLevel, format, v...)
}

// Debug payload data.
func (rl *ReceptorLogger) DebugPayload(payloadDebug int, payload string, workUnitID string, connectionType string) {
var payloadMessage string
var workunitIDMessage string
var connectionTypeMessage string
switch payloadDebug {
case 3:
payloadMessage = fmt.Sprintf(" with a payload of: %s", payload)

fallthrough
case 2:
if workUnitID != "" {
workunitIDMessage = fmt.Sprintf(" with work unit %s", workUnitID)
} else {
workunitIDMessage = ", work unit not created yet"
}

fallthrough
case 1:
if connectionType != "" {
connectionTypeMessage = fmt.Sprintf("Reading from %s", connectionType)
}
default:
}
rl.Debug(fmt.Sprintf("PACKET TRACING ENABLED: %s%s%s", connectionTypeMessage, workunitIDMessage, payloadMessage)) //nolint:govet
}

// SanitizedDebug contains extra information helpful to developers.
func (rl *ReceptorLogger) SanitizedDebug(format string, v ...interface{}) {
rl.SanitizedLog(DebugLevel, format, v...)
Expand Down
52 changes: 52 additions & 0 deletions pkg/logger/logger_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package logger_test

import (
"bytes"
"fmt"
"os"
"testing"

"github.com/ansible/receptor/pkg/logger"
Expand Down Expand Up @@ -68,3 +70,53 @@ func TestLogLevelToNameWithError(t *testing.T) {
t.Error("should have error")
}
}

func TestDebugPayload(t *testing.T) {
logFilePath := "/tmp/test-output"
logger.SetGlobalLogLevel(4)
receptorLogger := logger.NewReceptorLogger("testDebugPayload")
logFile, err := os.OpenFile(logFilePath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0o600)
if err != nil {
t.Error("error creating test-output file")
}

payload := "Testing debugPayload"
workUnitID := "1234"
connectionType := "unix socket"

debugPayloadTestCases := []struct {
name string
debugPayload int
payload string
workUnitID string
connectionType string
expectedLog string
}{
{name: "debugPayload no log", debugPayload: 0, payload: "", workUnitID: "", connectionType: "", expectedLog: ""},
{name: "debugPayload log level 1", debugPayload: 1, payload: "", workUnitID: "", connectionType: connectionType, expectedLog: fmt.Sprintf("PACKET TRACING ENABLED: Reading from %v", connectionType)},
{name: "debugPayload log level 2 with workUnitID", debugPayload: 2, payload: "", workUnitID: workUnitID, connectionType: connectionType, expectedLog: fmt.Sprintf("PACKET TRACING ENABLED: Reading from %v with work unit %v", connectionType, workUnitID)},
{name: "debugPayload log level 2 without workUnitID", debugPayload: 2, payload: "", workUnitID: "", connectionType: connectionType, expectedLog: fmt.Sprintf("PACKET TRACING ENABLED: Reading from %v", connectionType)},
{name: "debugPayload log level 3 with workUnitID", debugPayload: 3, payload: payload, workUnitID: workUnitID, connectionType: connectionType, expectedLog: fmt.Sprintf("PACKET TRACING ENABLED: Reading from %v with work unit %v with a payload of: %v", connectionType, workUnitID, payload)},
{name: "debugPayload log level 3 without workUnitID", debugPayload: 3, payload: payload, workUnitID: "", connectionType: connectionType, expectedLog: fmt.Sprintf("PACKET TRACING ENABLED: Reading from %v, work unit not created yet with a payload of: %v", connectionType, payload)},
{name: "debugPayload log level 3 without workUnitID and payload is new line", debugPayload: 3, payload: "\n", workUnitID: "", connectionType: connectionType, expectedLog: fmt.Sprintf("PACKET TRACING ENABLED: Reading from %v, work unit not created yet with a payload of: %v", connectionType, "\n")},
{name: "debugPayload log level 3 without workUnitID or payload", debugPayload: 3, payload: "", workUnitID: "", connectionType: connectionType, expectedLog: fmt.Sprintf("PACKET TRACING ENABLED: Reading from %v, work unit not created yet with a payload of: %v", connectionType, "")},
}

for _, testCase := range debugPayloadTestCases {
t.Run(testCase.name, func(t *testing.T) {
receptorLogger.SetOutput(logFile)
receptorLogger.DebugPayload(testCase.debugPayload, testCase.payload, testCase.workUnitID, testCase.connectionType)

testOutput, err := os.ReadFile(logFilePath)
if err != nil {
t.Error("error reading test-output file")
}
if !bytes.Contains(testOutput, []byte(testCase.expectedLog)) {
t.Errorf("failed to log correctly, expected: %v got %v", testCase.expectedLog, string(testOutput))
}
if err := os.Truncate(logFilePath, 0); err != nil {
t.Errorf("failed to truncate: %v", err)
}
})
}
}
8 changes: 7 additions & 1 deletion pkg/netceptor/netceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1582,7 +1582,13 @@ func (s *Netceptor) handleUnreachable(md *MessageData) error {
UnreachableMessage: unrMsg,
ReceivedFromNode: md.FromNode,
}
s.Logger.Warning("Received unreachable message from %s", md.FromNode)
s.Logger.Warning("Received unreachable message from %s (service %s) to %s (service %s): ttl %v, data %s",
md.FromNode,
md.FromService,
md.ToNode,
md.ToService,
md.HopsToLive,
unrMsg.Problem)

return s.unreachableBroker.Publish(unrData)
}
Expand Down
36 changes: 35 additions & 1 deletion pkg/workceptor/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@
package workceptor

import (
"bufio"
"context"
"flag"
"fmt"
"io"
"os"
"os/exec"
"os/signal"
"path"
"strconv"
"strings"
"sync"
"syscall"
Expand Down Expand Up @@ -112,7 +115,38 @@ func commandRunner(command string, params string, unitdir string) error {
if err != nil {
return err
}
cmd.Stdin = stdin
payloadDebug, _ := strconv.Atoi(os.Getenv("RECEPTOR_PAYLOAD_TRACE_LEVEL"))

if payloadDebug != 0 {
splitUnitDir := strings.Split(unitdir, "/")
workUnitID := splitUnitDir[len(splitUnitDir)-1]
stdinStream, err := cmd.StdinPipe()
if err != nil {
return err
}
var payload string
reader := bufio.NewReader(stdin)
if err != nil {
return err
}

for {
response, err := reader.ReadString('\n')
if err != nil {
if err.Error() != "EOF" {
MainInstance.nc.GetLogger().Error("Error reading work unit %v stdin: %v\n", workUnitID, err)
}

break
}
payload += response
}

MainInstance.nc.GetLogger().DebugPayload(payloadDebug, payload, workUnitID, "")
io.WriteString(stdinStream, payload)
} else {
cmd.Stdin = stdin
}
stdout, err := os.OpenFile(path.Join(unitdir, "stdout"), os.O_CREATE+os.O_WRONLY+os.O_SYNC, 0o600)
if err != nil {
return err
Expand Down
23 changes: 23 additions & 0 deletions pkg/workceptor/stdio_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"io"
"os"
"path"
"strconv"
"strings"
"sync"
)

Expand Down Expand Up @@ -111,6 +113,7 @@ func (sw *STDoutWriter) SetWriter(writer FileWriteCloser) {
// STDinReader reads from a stdin file and provides a Done function.
type STDinReader struct {
reader FileReadCloser
workUnit string
lasterr error
doneChan chan struct{}
doneOnce sync.Once
Expand All @@ -120,6 +123,8 @@ var errFileSizeZero = errors.New("file is empty")

// NewStdinReader allocates a new stdinReader, which reads from a stdin file and provides a Done function.
func NewStdinReader(fs FileSystemer, unitdir string) (*STDinReader, error) {
splitUnitDir := strings.Split(unitdir, "/")
workUnitID := splitUnitDir[len(splitUnitDir)-1]
stdinpath := path.Join(unitdir, "stdin")
stat, err := fs.Stat(stdinpath)
if err != nil {
Expand All @@ -135,6 +140,7 @@ func NewStdinReader(fs FileSystemer, unitdir string) (*STDinReader, error) {

return &STDinReader{
reader: reader,
workUnit: workUnitID,
lasterr: nil,
doneChan: make(chan struct{}),
doneOnce: sync.Once{},
Expand All @@ -143,6 +149,23 @@ func NewStdinReader(fs FileSystemer, unitdir string) (*STDinReader, error) {

// Read reads data from the stdout file, implementing io.Reader.
func (sr *STDinReader) Read(p []byte) (n int, err error) {
payloadDebug, _ := strconv.Atoi(os.Getenv("RECEPTOR_PAYLOAD_TRACE_LEVEL"))

if payloadDebug != 0 {
isNotEmpty := func() bool {
for _, v := range p {
if v != 0 {
return true
}
}

return false
}()
if isNotEmpty {
payload := string(p)
MainInstance.nc.GetLogger().DebugPayload(payloadDebug, payload, sr.workUnit, "kube api")
}
}
n, err = sr.reader.Read(p)
if err != nil {
sr.lasterr = err
Expand Down

0 comments on commit c8855a5

Please sign in to comment.