Skip to content

Commit

Permalink
Add fix option for Verify segment and add more logs (milvus-io#224)
Browse files Browse the repository at this point in the history
1. add more logs for showing errors and 
2. add a fix option to delete etcd meta
  • Loading branch information
xiaofan-luan authored Dec 1, 2023
2 parents 9c5511c + 84903ce commit fc46ccc
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 31 deletions.
33 changes: 33 additions & 0 deletions states/etcd/common/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,39 @@ func RemoveSegmentByID(ctx context.Context, cli kv.MetaKV, basePath string, coll
return err
}

func RemoveSegmentInsertLogPath(ctx context.Context, cli kv.MetaKV, basePath string, collectionID, partitionID, segmentID, fieldID, logID int64) error {
// delete binlog entries
BinLogPath := path.Join(basePath, "datacoord-meta", "insert_log", fmt.Sprintf("%d/%d/%d/%d/%d", collectionID, partitionID, segmentID, fieldID, logID))
fmt.Println("remove", BinLogPath)
err := cli.Remove(ctx, BinLogPath)
if err != nil {
fmt.Printf("failed to delete insert binlogs from etcd for segment %d, err: %s\n", segmentID, err.Error())
}
return err
}

func RemoveSegmentDeltaLogPath(ctx context.Context, cli kv.MetaKV, basePath string, collectionID, partitionID, segmentID, logID int64) error {
// delete binlog entries
DeltaLogPath := path.Join(basePath, "datacoord-meta", "delta_log", fmt.Sprintf("%d/%d/%d/%d", collectionID, partitionID, segmentID, logID))
fmt.Println("remove", DeltaLogPath)
err := cli.Remove(ctx, DeltaLogPath)
if err != nil {
fmt.Printf("failed to delete delta binlogs from etcd for segment %d, err: %s\n", segmentID, err.Error())
}
return err
}

func RemoveSegmentStatLogPath(ctx context.Context, cli kv.MetaKV, basePath string, collectionID, partitionID, segmentID, fieldID, logID int64) error {
// delete binlog entries
StatLogPath := path.Join(basePath, "datacoord-meta", "stats_log", fmt.Sprintf("%d/%d/%d/%d/%d", collectionID, partitionID, segmentID, fieldID, logID))
fmt.Println("remove", StatLogPath)
err := cli.Remove(ctx, StatLogPath)
if err != nil {
fmt.Printf("failed to delete stat logs from etcd for segment %d, err: %s\n", segmentID, err.Error())
}
return err
}

func UpdateSegments(ctx context.Context, cli kv.MetaKV, basePath string, collectionID int64, fn func(segment *datapbv2.SegmentInfo)) error {

prefix := path.Join(basePath, fmt.Sprintf("%s/%d", SegmentMetaPrefix, collectionID)) + "/"
Expand Down
20 changes: 14 additions & 6 deletions states/kv/kv_audit.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,25 +49,33 @@ func (c *FileAuditKV) Save(ctx context.Context, key, value string) error {

func (c *FileAuditKV) Remove(ctx context.Context, key string) error {
fmt.Println("audit delete", key)
kv, err := c.cli.removeWithPrevKV(ctx, key)
val, err := c.cli.Load(ctx, key)
if err != nil {
return err
}

err = c.cli.Remove(ctx, key)
if err != nil {
return err
}
c.writeHeader(models.OpDel, 1)
c.writeLogKV(kv)
c.writeKeyValue(key, val)
return nil
}

func (c *FileAuditKV) RemoveWithPrefix(ctx context.Context, key string) error {
fmt.Println("audit delete with prefix", key)
kvs, err := c.cli.removeWithPrefixAndPrevKV(ctx, key)
val, err := c.cli.Load(ctx, key)
if err != nil {
return err
}
c.writeHeader(models.OpDel, int32(len(kvs)))
for _, kv := range kvs {
c.writeLogKV(kv)

err = c.cli.RemoveWithPrefix(ctx, key)
if err != nil {
return err
}
c.writeHeader(models.OpDel, 1)
c.writeKeyValue(key, val)
return nil
}

Expand Down
124 changes: 99 additions & 25 deletions states/verify_segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package states
import (
"context"
"fmt"
"strconv"
"strings"

"github.com/milvus-io/birdwatcher/models"
Expand All @@ -23,7 +24,7 @@ func getVerifySegmentCmd(cli kv.MetaKV, basePath string) *cobra.Command {
fmt.Println(err.Error())
return
}
patch, err := cmd.Flags().GetBool("patch")
fix, err := cmd.Flags().GetBool("fix")
if err != nil {
fmt.Println(err.Error())
return
Expand Down Expand Up @@ -61,30 +62,103 @@ func getVerifySegmentCmd(cli kv.MetaKV, basePath string) *cobra.Command {
_, err := minioClient.StatObject(context.Background(), bucketName, l.LogPath, minio.StatObjectOptions{})
if err != nil {
errResp := minio.ToErrorResponse(err)
if errResp.Code != "NoSuchKey" {
fmt.Println("failed to stat object in minio", err.Error())
continue
}
if !patch {
fmt.Println("file not exists in minio", l.LogPath)
continue
}
// try to patch 01 => 1 bug
if item.tag == "statslog" && strings.HasSuffix(l.LogPath, "/1") {
currentObjectPath := strings.TrimSuffix(l.LogPath, "/1") + "/01"
_, err = minioClient.StatObject(context.Background(), bucketName, currentObjectPath, minio.StatObjectOptions{})
if err != nil {
fmt.Println(currentObjectPath, "also not exists")
continue
fmt.Println("failed to check ", l.LogPath, err, errResp.Code)
if errResp.Code == "NoSuchKey" {
if item.tag == "binlog" {
fmt.Println("path", l.LogPath, fix)
splits := strings.Split(l.LogPath, "/")
logID, err := strconv.ParseInt(splits[len(splits)-1], 10, 64)
if err != nil {
fmt.Println("failed to parse logID")
}

fieldID, err := strconv.ParseInt(splits[len(splits)-2], 10, 64)
if err != nil {
fmt.Println("failed to parse fieldID")
}

segmentID, err := strconv.ParseInt(splits[len(splits)-3], 10, 64)
if err != nil {
fmt.Println("failed to parse segmentID")
}

partitionID, err := strconv.ParseInt(splits[len(splits)-4], 10, 64)
if err != nil {
fmt.Println("failed to parse partitionID")
}

collectionID, err := strconv.ParseInt(splits[len(splits)-5], 10, 64)
if err != nil {
fmt.Println("failed to parse collectionID")
}

if fix && err == nil {
err := common.RemoveSegmentInsertLogPath(context.Background(), cli, basePath, collectionID, partitionID, segmentID, fieldID, logID)
if err != nil {
fmt.Println("failed to remove segment insert path")
}
}
} else if item.tag == "statslog" {
splits := strings.Split(l.LogPath, "/")
logID, err := strconv.ParseInt(splits[len(splits)-1], 10, 64)
if err != nil {
fmt.Println("failed to parse logID")
}

fieldID, err := strconv.ParseInt(splits[len(splits)-2], 10, 64)
if err != nil {
fmt.Println("failed to parse fieldID")
}

segmentID, err := strconv.ParseInt(splits[len(splits)-3], 10, 64)
if err != nil {
fmt.Println("failed to parse segmentID")
}

partitionID, err := strconv.ParseInt(splits[len(splits)-4], 10, 64)
if err != nil {
fmt.Println("failed to parse parititonID")
}

collectionID, err := strconv.ParseInt(splits[len(splits)-5], 10, 64)
if err != nil {
fmt.Println("failed to parse col id")
}

if fix && err == nil {
err := common.RemoveSegmentStatLogPath(context.Background(), cli, basePath, collectionID, partitionID, segmentID, fieldID, logID)
if err != nil {
fmt.Println("failed to remove segment insert path")
}
}
} else if item.tag == "deltalog" {
splits := strings.Split(l.LogPath, "/")
logID, err := strconv.ParseInt(splits[len(splits)-1], 10, 64)
if err != nil {
fmt.Println("failed to parse logID")
}
segmentID, err := strconv.ParseInt(splits[len(splits)-2], 10, 64)
if err != nil {
fmt.Println("failed to parse segmentID")
}

partitionID, err := strconv.ParseInt(splits[len(splits)-3], 10, 64)
if err != nil {
fmt.Println("failed to parse partitionID")
}

collectionID, err := strconv.ParseInt(splits[len(splits)-4], 10, 64)
if err != nil {
fmt.Println("failed to parse col id")
}

if fix && err == nil {
err := common.RemoveSegmentDeltaLogPath(context.Background(), cli, basePath, collectionID, partitionID, segmentID, logID)
if err != nil {
fmt.Println("failed to remove segment insert path")
}
}
}
fmt.Printf("current statslog(%s) for (%s) found, try to copy object", currentObjectPath, l.LogPath)
minioClient.CopyObject(context.Background(), minio.CopyDestOptions{
Bucket: bucketName,
Object: l.LogPath,
}, minio.CopySrcOptions{
Bucket: bucketName,
Object: currentObjectPath,
})
}
}
}
Expand All @@ -98,6 +172,6 @@ func getVerifySegmentCmd(cli kv.MetaKV, basePath string) *cobra.Command {
}

cmd.Flags().Int64("collection", 0, "collection id")
cmd.Flags().Bool("patch", false, "try to patch with known issue logic")
cmd.Flags().Bool("fix", false, "remove the log path to fix no such key")
return cmd
}

0 comments on commit fc46ccc

Please sign in to comment.