From 84903cee73e4cbdab137dfeda5115795cc002d94 Mon Sep 17 00:00:00 2001 From: xiaofanluan Date: Sat, 2 Dec 2023 01:02:00 +0800 Subject: [PATCH] Support fix broken segemnts Signed-off-by: xiaofanluan --- states/etcd/common/segment.go | 33 +++++++++ states/kv/kv_audit.go | 20 ++++-- states/verify_segment.go | 124 +++++++++++++++++++++++++++------- 3 files changed, 146 insertions(+), 31 deletions(-) diff --git a/states/etcd/common/segment.go b/states/etcd/common/segment.go index c89960b..3ffe726 100644 --- a/states/etcd/common/segment.go +++ b/states/etcd/common/segment.go @@ -307,6 +307,39 @@ func RemoveSegmentByID(ctx context.Context, cli kv.MetaKV, basePath string, coll return err } +func RemoveSegmentInsertLogPath(ctx context.Context, cli kv.MetaKV, basePath string, collectionID, partitionID, segmentID, fieldID, logID int64) error { + // delete binlog entries + BinLogPath := path.Join(basePath, "datacoord-meta", "insert_log", fmt.Sprintf("%d/%d/%d/%d/%d", collectionID, partitionID, segmentID, fieldID, logID)) + fmt.Println("remove", BinLogPath) + err := cli.Remove(ctx, BinLogPath) + if err != nil { + fmt.Printf("failed to delete insert binlogs from etcd for segment %d, err: %s\n", segmentID, err.Error()) + } + return err +} + +func RemoveSegmentDeltaLogPath(ctx context.Context, cli kv.MetaKV, basePath string, collectionID, partitionID, segmentID, logID int64) error { + // delete binlog entries + DeltaLogPath := path.Join(basePath, "datacoord-meta", "delta_log", fmt.Sprintf("%d/%d/%d/%d", collectionID, partitionID, segmentID, logID)) + fmt.Println("remove", DeltaLogPath) + err := cli.Remove(ctx, DeltaLogPath) + if err != nil { + fmt.Printf("failed to delete delta binlogs from etcd for segment %d, err: %s\n", segmentID, err.Error()) + } + return err +} + +func RemoveSegmentStatLogPath(ctx context.Context, cli kv.MetaKV, basePath string, collectionID, partitionID, segmentID, fieldID, logID int64) error { + // delete binlog entries + StatLogPath := path.Join(basePath, "datacoord-meta", "stats_log", fmt.Sprintf("%d/%d/%d/%d/%d", collectionID, partitionID, segmentID, fieldID, logID)) + fmt.Println("remove", StatLogPath) + err := cli.Remove(ctx, StatLogPath) + if err != nil { + fmt.Printf("failed to delete stat logs from etcd for segment %d, err: %s\n", segmentID, err.Error()) + } + return err +} + func UpdateSegments(ctx context.Context, cli kv.MetaKV, basePath string, collectionID int64, fn func(segment *datapbv2.SegmentInfo)) error { prefix := path.Join(basePath, fmt.Sprintf("%s/%d", SegmentMetaPrefix, collectionID)) + "/" diff --git a/states/kv/kv_audit.go b/states/kv/kv_audit.go index f909103..2332bd5 100644 --- a/states/kv/kv_audit.go +++ b/states/kv/kv_audit.go @@ -49,25 +49,33 @@ func (c *FileAuditKV) Save(ctx context.Context, key, value string) error { func (c *FileAuditKV) Remove(ctx context.Context, key string) error { fmt.Println("audit delete", key) - kv, err := c.cli.removeWithPrevKV(ctx, key) + val, err := c.cli.Load(ctx, key) + if err != nil { + return err + } + + err = c.cli.Remove(ctx, key) if err != nil { return err } c.writeHeader(models.OpDel, 1) - c.writeLogKV(kv) + c.writeKeyValue(key, val) return nil } func (c *FileAuditKV) RemoveWithPrefix(ctx context.Context, key string) error { fmt.Println("audit delete with prefix", key) - kvs, err := c.cli.removeWithPrefixAndPrevKV(ctx, key) + val, err := c.cli.Load(ctx, key) if err != nil { return err } - c.writeHeader(models.OpDel, int32(len(kvs))) - for _, kv := range kvs { - c.writeLogKV(kv) + + err = c.cli.RemoveWithPrefix(ctx, key) + if err != nil { + return err } + c.writeHeader(models.OpDel, 1) + c.writeKeyValue(key, val) return nil } diff --git a/states/verify_segment.go b/states/verify_segment.go index 055bc5c..2250d1d 100644 --- a/states/verify_segment.go +++ b/states/verify_segment.go @@ -3,6 +3,7 @@ package states import ( "context" "fmt" + "strconv" "strings" "github.com/milvus-io/birdwatcher/models" @@ -23,7 +24,7 @@ func getVerifySegmentCmd(cli kv.MetaKV, basePath string) *cobra.Command { fmt.Println(err.Error()) return } - patch, err := cmd.Flags().GetBool("patch") + fix, err := cmd.Flags().GetBool("fix") if err != nil { fmt.Println(err.Error()) return @@ -61,30 +62,103 @@ func getVerifySegmentCmd(cli kv.MetaKV, basePath string) *cobra.Command { _, err := minioClient.StatObject(context.Background(), bucketName, l.LogPath, minio.StatObjectOptions{}) if err != nil { errResp := minio.ToErrorResponse(err) - if errResp.Code != "NoSuchKey" { - fmt.Println("failed to stat object in minio", err.Error()) - continue - } - if !patch { - fmt.Println("file not exists in minio", l.LogPath) - continue - } - // try to patch 01 => 1 bug - if item.tag == "statslog" && strings.HasSuffix(l.LogPath, "/1") { - currentObjectPath := strings.TrimSuffix(l.LogPath, "/1") + "/01" - _, err = minioClient.StatObject(context.Background(), bucketName, currentObjectPath, minio.StatObjectOptions{}) - if err != nil { - fmt.Println(currentObjectPath, "also not exists") - continue + fmt.Println("failed to check ", l.LogPath, err, errResp.Code) + if errResp.Code == "NoSuchKey" { + if item.tag == "binlog" { + fmt.Println("path", l.LogPath, fix) + splits := strings.Split(l.LogPath, "/") + logID, err := strconv.ParseInt(splits[len(splits)-1], 10, 64) + if err != nil { + fmt.Println("failed to parse logID") + } + + fieldID, err := strconv.ParseInt(splits[len(splits)-2], 10, 64) + if err != nil { + fmt.Println("failed to parse fieldID") + } + + segmentID, err := strconv.ParseInt(splits[len(splits)-3], 10, 64) + if err != nil { + fmt.Println("failed to parse segmentID") + } + + partitionID, err := strconv.ParseInt(splits[len(splits)-4], 10, 64) + if err != nil { + fmt.Println("failed to parse partitionID") + } + + collectionID, err := strconv.ParseInt(splits[len(splits)-5], 10, 64) + if err != nil { + fmt.Println("failed to parse collectionID") + } + + if fix && err == nil { + err := common.RemoveSegmentInsertLogPath(context.Background(), cli, basePath, collectionID, partitionID, segmentID, fieldID, logID) + if err != nil { + fmt.Println("failed to remove segment insert path") + } + } + } else if item.tag == "statslog" { + splits := strings.Split(l.LogPath, "/") + logID, err := strconv.ParseInt(splits[len(splits)-1], 10, 64) + if err != nil { + fmt.Println("failed to parse logID") + } + + fieldID, err := strconv.ParseInt(splits[len(splits)-2], 10, 64) + if err != nil { + fmt.Println("failed to parse fieldID") + } + + segmentID, err := strconv.ParseInt(splits[len(splits)-3], 10, 64) + if err != nil { + fmt.Println("failed to parse segmentID") + } + + partitionID, err := strconv.ParseInt(splits[len(splits)-4], 10, 64) + if err != nil { + fmt.Println("failed to parse parititonID") + } + + collectionID, err := strconv.ParseInt(splits[len(splits)-5], 10, 64) + if err != nil { + fmt.Println("failed to parse col id") + } + + if fix && err == nil { + err := common.RemoveSegmentStatLogPath(context.Background(), cli, basePath, collectionID, partitionID, segmentID, fieldID, logID) + if err != nil { + fmt.Println("failed to remove segment insert path") + } + } + } else if item.tag == "deltalog" { + splits := strings.Split(l.LogPath, "/") + logID, err := strconv.ParseInt(splits[len(splits)-1], 10, 64) + if err != nil { + fmt.Println("failed to parse logID") + } + segmentID, err := strconv.ParseInt(splits[len(splits)-2], 10, 64) + if err != nil { + fmt.Println("failed to parse segmentID") + } + + partitionID, err := strconv.ParseInt(splits[len(splits)-3], 10, 64) + if err != nil { + fmt.Println("failed to parse partitionID") + } + + collectionID, err := strconv.ParseInt(splits[len(splits)-4], 10, 64) + if err != nil { + fmt.Println("failed to parse col id") + } + + if fix && err == nil { + err := common.RemoveSegmentDeltaLogPath(context.Background(), cli, basePath, collectionID, partitionID, segmentID, logID) + if err != nil { + fmt.Println("failed to remove segment insert path") + } + } } - fmt.Printf("current statslog(%s) for (%s) found, try to copy object", currentObjectPath, l.LogPath) - minioClient.CopyObject(context.Background(), minio.CopyDestOptions{ - Bucket: bucketName, - Object: l.LogPath, - }, minio.CopySrcOptions{ - Bucket: bucketName, - Object: currentObjectPath, - }) } } } @@ -98,6 +172,6 @@ func getVerifySegmentCmd(cli kv.MetaKV, basePath string) *cobra.Command { } cmd.Flags().Int64("collection", 0, "collection id") - cmd.Flags().Bool("patch", false, "try to patch with known issue logic") + cmd.Flags().Bool("fix", false, "remove the log path to fix no such key") return cmd }