Skip to content

Commit

Permalink
fix agi log preprocess-special tab handler
Browse files Browse the repository at this point in the history
  • Loading branch information
Robert Glonek committed Sep 13, 2024
1 parent c453d52 commit fbae7af
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 31 deletions.
1 change: 1 addition & 0 deletions CHANGELOG/7.6.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ _Release Date: Month Day, Year_
* FIX: Ubuntu in AWS/GCP: Disable unattended upgrades as they interfere with all automated processes.
* FIX: Multiple small WebUI fixes.
* FIX: AGI: handle files smaller than 4KiB.
* FIX: AGI: PreProcess tab-separated files - handle error cases where some lines are not aerospike.
* FIX: Docker: Implement rate-limiting to allow for large container counts.
* FIX: Docker: Error handling in inventory listing.
* FIX: Inventory: sorting node numbers numerically.
Expand Down
10 changes: 9 additions & 1 deletion src/ingest/preprocess-special.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ func (i *Ingest) preProcessSpecial(fn string, mimeType *mimetype.MIME) (fnlist [
logger.Detail("PreProcess-Special: %s is tab-4 format but found line without 4 tabs: %s", fn, line)
continue
}
if line[1] == "pod_name" && strings.TrimSuffix(line[3], "\n") == "text_payload" { // timestamp pod_name text_payload
logger.Detail("PreProcess-Special: %s tab-4 header found, ignoring", fn)
continue
}
ident := strings.Trim(line[2], "\r\n\t ") + "-" + strings.Trim(line[1], "\r\t\n ")
logline := strings.Trim(line[3], "\r\t\n ") + "\n"
if _, ok := tracker[fn+"_special-split_"+ident]; !ok {
Expand Down Expand Up @@ -79,7 +83,11 @@ func (i *Ingest) preProcessSpecial(fn string, mimeType *mimetype.MIME) (fnlist [
}
line := strings.Split(s.Text(), "\t")
if len(line) < 3 {
logger.Detail("PreProcess-Special: %s is tab-3 format but found line without 4 tabs: %s", fn, line)
logger.Detail("PreProcess-Special: %s is tab-3 format but found line without 3 tabs: %s", fn, line)
continue
}
if line[1] == "pod_name" && strings.TrimSuffix(line[2], "\n") == "text_payload" { // timestamp pod_name text_payload
logger.Detail("PreProcess-Special: %s tab-3 header found, ignoring", fn)
continue
}
ident := strings.Trim(line[1], "\r\n\t ")
Expand Down
71 changes: 41 additions & 30 deletions src/ingest/preprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,40 +146,51 @@ func (i *Ingest) preProcessTextFile(fn string, files map[string]*EnumFile) error
fnlist = []string{fn}
}
outpaths := []string{}
var errors error
for _, fna := range fnlist {
clusterName, nodeId, err := i.preProcessGetClusterNode(fna)
if err != nil {
return err
}
var prefix, suffix int
i.progress.Lock()
if _, ok := i.progress.PreProcessor.NodeToPrefix[clusterName+"_"+nodeId]; !ok {
i.progress.PreProcessor.LastUsedPrefix++
prefix = i.progress.PreProcessor.LastUsedPrefix
i.progress.PreProcessor.NodeToPrefix[clusterName+"_"+nodeId] = prefix
suffix = 1
i.progress.PreProcessor.LastUsedSuffixForPrefix[prefix] = suffix
} else {
prefix = i.progress.PreProcessor.NodeToPrefix[clusterName+"_"+nodeId]
i.progress.PreProcessor.LastUsedSuffixForPrefix[prefix]++
suffix = i.progress.PreProcessor.LastUsedSuffixForPrefix[prefix]
}
outpath := path.Join(i.config.Directories.Logs, clusterName, strconv.Itoa(prefix)+"_"+nodeId+"_"+strconv.Itoa(suffix))
outpaths = append(outpaths, outpath)
files[fn].PreProcessOutPaths = outpaths
i.progress.PreProcessor.Files[fn] = files[fn]
i.progress.PreProcessor.changed = true
i.progress.Unlock()
err = os.MkdirAll(path.Join(i.config.Directories.Logs, clusterName), 0755)
if err != nil {
return fmt.Errorf("failed to create %s: %s", path.Join(i.config.Directories.Logs, clusterName), err)
}
err = os.Rename(fna, outpath)
err = func(fna string) error {
clusterName, nodeId, err := i.preProcessGetClusterNode(fna)
if err != nil {
return err
}
var prefix, suffix int
i.progress.Lock()
if _, ok := i.progress.PreProcessor.NodeToPrefix[clusterName+"_"+nodeId]; !ok {
i.progress.PreProcessor.LastUsedPrefix++
prefix = i.progress.PreProcessor.LastUsedPrefix
i.progress.PreProcessor.NodeToPrefix[clusterName+"_"+nodeId] = prefix
suffix = 1
i.progress.PreProcessor.LastUsedSuffixForPrefix[prefix] = suffix
} else {
prefix = i.progress.PreProcessor.NodeToPrefix[clusterName+"_"+nodeId]
i.progress.PreProcessor.LastUsedSuffixForPrefix[prefix]++
suffix = i.progress.PreProcessor.LastUsedSuffixForPrefix[prefix]
}
outpath := path.Join(i.config.Directories.Logs, clusterName, strconv.Itoa(prefix)+"_"+nodeId+"_"+strconv.Itoa(suffix))
outpaths = append(outpaths, outpath)
files[fn].PreProcessOutPaths = outpaths
i.progress.PreProcessor.Files[fn] = files[fn]
i.progress.PreProcessor.changed = true
i.progress.Unlock()
err = os.MkdirAll(path.Join(i.config.Directories.Logs, clusterName), 0755)
if err != nil {
return fmt.Errorf("failed to create %s: %s", path.Join(i.config.Directories.Logs, clusterName), err)
}
err = os.Rename(fna, outpath)
if err != nil {
return err
}
return nil
}(fna)
if err != nil {
return err
if errors == nil {
errors = err
} else {
errors = fmt.Errorf("%s; %s", errors, err)
}
}
}
return nil
return errors
}

var errPreProcessNotSpecial = errors.New("STANDARD-LOG")
Expand Down

0 comments on commit fbae7af

Please sign in to comment.