Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

parse job events to extract failure messages #310

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/crossplane/crossplane-runtime v0.19.2
github.com/crossplane/crossplane-tools v0.0.0-20220310165030-1f43fc12793e
github.com/google/go-cmp v0.5.9
github.com/google/uuid v1.3.0
github.com/spf13/afero v1.9.5
gopkg.in/alecthomas/kingpin.v2 v2.2.6
gopkg.in/yaml.v2 v2.4.0
Expand Down Expand Up @@ -40,7 +41,6 @@ require (
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/gnostic v0.5.7-v3refs // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/imdario/mergo v0.3.12 // indirect
github.com/inconshreveable/mousetrap v1.0.1 // indirect
github.com/josharian/intern v1.0.0 // indirect
Expand Down
126 changes: 115 additions & 11 deletions internal/ansible/ansible.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"os/user"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/apenella/go-ansible/pkg/stdoutcallback/results"
Expand All @@ -36,7 +37,9 @@ import (
"github.com/crossplane-contrib/provider-ansible/pkg/runnerutil"
"github.com/crossplane/crossplane-runtime/pkg/meta"
"github.com/crossplane/crossplane-runtime/pkg/resource"
"github.com/google/uuid"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/log"
)

const (
Expand All @@ -53,6 +56,9 @@ const (
errMkdir = "cannot make directory"
)

// using a variable for uuid generator allows for stubbing in tests
var generateUUID = uuid.New

const (
// AnnotationKeyPolicyRun is the name of an annotation which instructs
// the provider how to run the corresponding Ansible contents
Expand Down Expand Up @@ -133,10 +139,10 @@ func withBehaviorVars(behaviorVars map[string]string) runnerOption {
}
}

// withAnsibleEnvDir set the runner env/extravars dir.
func withAnsibleEnvDir(dir string) runnerOption {
// withWorkDir set the runner working dir.
func withWorkDir(dir string) runnerOption {
return func(r *Runner) {
r.AnsibleEnvDir = dir
r.workDir = dir
}
}

Expand Down Expand Up @@ -312,22 +318,24 @@ func (p Parameters) Init(ctx context.Context, cr *v1alpha1.AnsibleRun, behaviorV
return nil, err
}

return new(withPath(path),
r := new(withPath(path),
withCmdFunc(cmdFunc),
withBehaviorVars(behaviorVars),
withAnsibleRunPolicy(rPolicy),
// TODO should be moved to connect() func
withAnsibleEnvDir(ansibleEnvDir),
withWorkDir(p.WorkingDirPath),
withArtifactsHistoryLimit(p.ArtifactsHistoryLimit),
), nil
)

return r, nil
}

// Runner struct holds the configuration to run the cmdFunc
type Runner struct {
Path string // absolute path on disk to a playbook or role depending on what cmdFunc expects
behaviorVars map[string]string
cmdFunc cmdFuncType // returns a Cmd that runs ansible-runner
AnsibleEnvDir string
workDir string
checkMode bool
AnsibleRunPolicy *RunPolicy
artifactsHistoryLimit int
Expand All @@ -350,15 +358,23 @@ func (r *Runner) GetAnsibleRunPolicy() *RunPolicy {
return r.AnsibleRunPolicy
}

func (r *Runner) ansibleEnvDir() string {
return filepath.Clean(filepath.Join(r.workDir, "env"))
}

// Run execute the appropriate cmdFunc
func (r *Runner) Run() (*exec.Cmd, io.Reader, error) {
func (r *Runner) Run(ctx context.Context) (io.Reader, error) {
var (
stdoutBuf bytes.Buffer
stdoutWriter, stderrWriter io.Writer
)

dc := r.cmdFunc(r.behaviorVars, r.checkMode)
dc.Args = append(dc.Args, "--rotate-artifacts", strconv.Itoa(r.artifactsHistoryLimit))

id := generateUUID().String()
dc.Args = append(dc.Args, "--ident", id)

if !r.checkMode {
// for disabled checkMode dc.Stdout and dc.Stderr are respectfully
// written to os.Stdout and os.Stdout for debugging purpose
Expand All @@ -383,10 +399,98 @@ func (r *Runner) Run() (*exec.Cmd, io.Reader, error) {

err := dc.Start()
if err != nil {
return nil, nil, err
return nil, err
}

if err := dc.Wait(); err != nil {
jobEventsDir := filepath.Clean(filepath.Join(r.workDir, "artifacts", id, "job_events"))
failureReason, reasonErr := extractFailureReason(ctx, jobEventsDir)
if reasonErr != nil {
log.FromContext(ctx).V(1).Info("extracting ansible failure message", "err", reasonErr)
return nil, err
}

return nil, fmt.Errorf("%w: %s", err, failureReason)
}

return &stdoutBuf, nil
}

func extractFailureReason(ctx context.Context, eventsDir string) (string, error) {
evts, err := parseEvents(ctx, eventsDir)
if err != nil {
return "", fmt.Errorf("parsing job events: %w", err)
}

var msgs []string
for _, evt := range evts {
switch evt.Event {
case eventTypeRunnerFailed:
m, err := runnerEventMessage(evt, "Failed")
if err != nil {
return "", err
}
msgs = append(msgs, m)
case eventTypeRunnerUnreachable:
m, err := runnerEventMessage(evt, "Unreachable")
if err != nil {
return "", err
}
msgs = append(msgs, m)
default:
}
}

return strings.Join(msgs, "; "), nil
}

func parseEvents(ctx context.Context, dir string) ([]jobEvent, error) {
files, err := os.ReadDir(dir)
if err != nil {
return nil, fmt.Errorf("reading job events directory %q: %w", dir, err)
}

evts := make([]jobEvent, 0)
for _, file := range files {
evtBytes, err := os.ReadFile(filepath.Clean(filepath.Join(dir, file.Name())))
if err != nil {
log.FromContext(ctx).V(1).Info("reading job event file", "filename", file.Name(), "err", err)
continue
}

var evt jobEvent
if err := json.Unmarshal(evtBytes, &evt); err != nil {
log.FromContext(ctx).V(1).Info("unmarshaling job event from file", "filename", file.Name(), "err", err)
continue
}
evts = append(evts, evt)
}

return evts, nil
}

func reunmarshal(data map[string]any, result any) error {
b, err := json.Marshal(data)
if err != nil {
return fmt.Errorf("marshaling: %w", err)
}

return dc, &stdoutBuf, nil
return json.Unmarshal(b, result)
}

func runnerEventMessage(evt jobEvent, reason string) (string, error) {
var evtData runnerEventData
if err := reunmarshal(evt.EventData, &evtData); err != nil {
return "", fmt.Errorf("unmarshaling job event %s as runner event: %w", evt.UUID, err)
}

return fmt.Sprintf("%s on play %q, task %q, host %q: %s",
reason,
evtData.Play,
evtData.Task,
evtData.Host,
evtData.Result.Msg), nil

}

// selectRolePath will determines the role path
Expand Down Expand Up @@ -435,7 +539,7 @@ func addFile(path string, content []byte) error {
// WriteExtraVar write extra var to env/extravars under working directory
// it creates a non-existent env/extravars file
func (r *Runner) WriteExtraVar(extraVar map[string]interface{}) error {
extraVarsPath := filepath.Join(r.AnsibleEnvDir, "extravars")
extraVarsPath := filepath.Join(r.ansibleEnvDir(), "extravars")
contentVars := make(map[string]interface{})
data, err := os.ReadFile(filepath.Clean(extraVarsPath))
if err != nil {
Expand Down
103 changes: 89 additions & 14 deletions internal/ansible/ansible_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package ansible

import (
"context"
"fmt"
"io"
"os"
"os/exec"
Expand All @@ -28,6 +29,7 @@ import (
"github.com/crossplane-contrib/provider-ansible/apis/v1alpha1"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/google/uuid"
"gotest.tools/v3/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -155,7 +157,7 @@ func TestInit(t *testing.T) {
expectedRunner := &Runner{
Path: dir,
cmdFunc: params.playbookCmdFunc(context.Background(), "playbook.yml", dir),
AnsibleEnvDir: filepath.Join(dir, "env"),
workDir: dir,
AnsibleRunPolicy: &RunPolicy{"ObserveAndDelete"},
artifactsHistoryLimit: 3,
}
Expand All @@ -175,6 +177,9 @@ func TestInit(t *testing.T) {
if runner.checkMode != expectedRunner.checkMode {
t.Errorf("Unexpected Runner.checkMode %v expected %v", runner.checkMode, expectedRunner.checkMode)
}
if runner.workDir != expectedRunner.workDir {
t.Errorf("Unexpected Runner.workDir %v expected %v", runner.workDir, expectedRunner.workDir)
}

expectedCmd := expectedRunner.cmdFunc(nil, false)
cmd := runner.cmdFunc(nil, false)
Expand All @@ -189,16 +194,18 @@ func TestRun(t *testing.T) {
runner := &Runner{
Path: dir,
cmdFunc: func(_ map[string]string, _ bool) *exec.Cmd {
// echo works well for testing cause it will just print all the args and flags it doesn't recognize and return success
// echo works well for testing cause it will just print all the args and flags it doesn't recognize and return success,
// therefore checking its output also checks the args passed to it are correct
return exec.CommandContext(context.Background(), "echo")
},
AnsibleEnvDir: filepath.Join(dir, "env"),
AnsibleRunPolicy: &RunPolicy{"ObserveAndDelete"},
artifactsHistoryLimit: 3,
}

expectedArgs := []string{"--rotate-artifacts", "3"}
expectedCmd := exec.Command(runner.cmdFunc(nil, false).Path, expectedArgs...)
expectedID := "217b3830-68fa-461b-90d1-1fb87c685010"
expectedArgs := []string{"--rotate-artifacts", "3", "--ident", expectedID}

generateUUID = func() uuid.UUID { return uuid.MustParse(expectedID) }

testCases := map[string]struct {
checkMode bool
Expand All @@ -216,19 +223,11 @@ func TestRun(t *testing.T) {
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
runner.checkMode = tc.checkMode
cmd, outBuf, err := runner.Run()
outBuf, err := runner.Run(context.Background())
if err != nil {
t.Fatalf("Unexpected Run() error: %v", err)
}

if cmd.String() != expectedCmd.String() {
t.Errorf("Unexpected command %q expected %q", expectedCmd.String(), cmd.String())
}

if err := cmd.Wait(); err != nil {
t.Fatalf("Unexpected cmd.Wait() error: %v", err)
}

out, err := io.ReadAll(outBuf)
if err != nil {
t.Fatalf("Unexpected error reading command buffer: %v", err)
Expand All @@ -240,3 +239,79 @@ func TestRun(t *testing.T) {
})
}
}

func TestExtractFailureReason(t *testing.T) {
playbookStartEvt := `
{
"uuid": "63a52ed5-a403-4512-a430-c95f62fa3424",
"event": "playbook_on_start",
"event_data": {
"playbook": "playbook.yml"
}
}
`

runnerFailedEvt := `
{
"uuid": "7097758b-1109-4fd9-af59-f545633794dd",
"event": "runner_on_failed",
"event_data": {
"play": "test",
"task": "file",
"host": "testhost",
"res": {"msg": "fake error"}
}
}
`

runnerUnreachableEvt := `
{
"uuid": "ded6289b-e557-48c1-88e1-88eb630aec21",
"event": "runner_on_unreachable",
"event_data": {
"play": "test",
"task": "Gathering Facts",
"host": "testhost",
"res": {"msg": "Failed to connect to the host via ssh"}
}
}
`

cases := map[string]struct {
events []string
expectedReason string
}{
"NoEvents": {},
"NoFailedEvents": {
events: []string{playbookStartEvt},
},
"FailedEvent": {
events: []string{playbookStartEvt, runnerFailedEvt},
expectedReason: `Failed on play "test", task "file", host "testhost": fake error`,
},
"UnreachableEvent": {
events: []string{playbookStartEvt, runnerUnreachableEvt},
expectedReason: `Unreachable on play "test", task "Gathering Facts", host "testhost": Failed to connect to the host via ssh`,
},
}

for name, tc := range cases {
t.Run(name, func(t *testing.T) {
dir := t.TempDir()
for i, evt := range tc.events {
if err := os.WriteFile(filepath.Join(dir, fmt.Sprintf("%d.json", i)), []byte(evt), 0600); err != nil {
t.Fatalf("Writing test event to file: %v", err)
}
}

reason, err := extractFailureReason(context.Background(), dir)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

if reason != tc.expectedReason {
t.Errorf("Unexpected reason %v, expected %v", reason, tc.expectedReason)
}
})
}
}
Loading
Loading