Skip to content

Commit f330b9c

Browse files
committed
db: allow ingesting local SSTs with values in blob files
Allow ingesting local ssts and their associated blob files. Note that we do not validate blob value handles within a table. Each blob file is assumed to be valid for and fully referenced by the SST.
1 parent d12c3ac commit f330b9c

File tree

9 files changed

+715
-85
lines changed

9 files changed

+715
-85
lines changed

ingest.go

Lines changed: 245 additions & 65 deletions
Large diffs are not rendered by default.

ingest_test.go

Lines changed: 261 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import (
4242
"github.com/cockroachdb/pebble/sstable"
4343
"github.com/cockroachdb/pebble/sstable/block"
4444
"github.com/cockroachdb/pebble/sstable/colblk"
45+
"github.com/cockroachdb/pebble/valsep"
4546
"github.com/cockroachdb/pebble/vfs"
4647
"github.com/cockroachdb/pebble/vfs/errorfs"
4748
"github.com/kr/pretty"
@@ -206,7 +207,7 @@ func TestIngestLoadRand(t *testing.T) {
206207
TableMetadata: &manifest.TableMetadata{
207208
TableNum: base.TableNum(pending[i]),
208209
},
209-
path: sstPaths[i],
210+
local: LocalSST{Path: sstPaths[i]},
210211
}
211212

212213
func() {
@@ -275,6 +276,147 @@ func TestIngestLoadRand(t *testing.T) {
275276
require.Equal(t, expected, lr.local)
276277
}
277278

279+
// TestIngestLocalWithBlobs tests the ingestion of local sstables with blobs.
280+
// Commands:
281+
// - define: defines the database
282+
// - write-table: writes an external table using valsep.SSTBlobWriter
283+
// - ingest: ingests the tables into the database
284+
func TestIngestLocalWithBlobs(t *testing.T) {
285+
keySchema := colblk.DefaultKeySchema(testkeys.Comparer, 16)
286+
ctx := context.Background()
287+
var db *DB
288+
defer func() {
289+
if db != nil {
290+
require.NoError(t, db.Close())
291+
}
292+
}()
293+
fileCount := 0
294+
fs := vfs.NewMem()
295+
reset := func() {
296+
if db != nil {
297+
require.NoError(t, db.Close())
298+
}
299+
fileCount = 0
300+
fs = vfs.NewMem()
301+
}
302+
datadriven.RunTest(t, "testdata/ingest_with_blobs", func(t *testing.T, td *datadriven.TestData) string {
303+
switch td.Cmd {
304+
case "define":
305+
reset()
306+
var err error
307+
db, err = runDBDefineCmd(td, &Options{
308+
Comparer: testkeys.Comparer,
309+
FS: fs,
310+
FormatMajorVersion: internalFormatNewest,
311+
})
312+
require.NoError(t, err)
313+
return ""
314+
case "write-table":
315+
sstWriterOpts := sstable.WriterOptions{
316+
Comparer: testkeys.Comparer,
317+
KeySchema: &keySchema,
318+
TableFormat: sstable.TableFormatMax,
319+
}
320+
sstFileName := td.CmdArgs[0].Key
321+
if sstFileName == "" {
322+
return "missing file name argument"
323+
}
324+
var valueSeparationMinSize, mvccGarbageValueSeparationMinSize int
325+
td.MaybeScanArgs(t, "value-separation-min-size", &valueSeparationMinSize)
326+
td.MaybeScanArgs(t, "mvcc-value-separation-min-size", &mvccGarbageValueSeparationMinSize)
327+
var blobPaths []string
328+
writerOpts := valsep.SSTBlobWriterOptions{
329+
SSTWriterOpts: sstWriterOpts,
330+
ValueSeparationMinSize: valueSeparationMinSize,
331+
MVCCGarbageValueSeparationMinSize: mvccGarbageValueSeparationMinSize,
332+
}
333+
writerOpts.NewBlobFileFn = func() (objstorage.Writable, error) {
334+
fnum := fileCount
335+
path := fmt.Sprintf("blob%d", fnum)
336+
blobPaths = append(blobPaths, path)
337+
f, err := fs.Create(path, vfs.WriteCategoryUnspecified)
338+
require.NoError(t, err)
339+
w := objstorageprovider.NewFileWritable(f)
340+
fileCount++
341+
return w, err
342+
}
343+
sstFile, err := fs.Create(sstFileName, vfs.WriteCategoryUnspecified)
344+
require.NoError(t, err)
345+
sstHandle := objstorageprovider.NewFileWritable(sstFile)
346+
require.NoError(t, err)
347+
writer := valsep.NewSSTBlobWriter(sstHandle, writerOpts)
348+
writerClosed := false
349+
defer func() {
350+
if !writerClosed {
351+
_ = writer.Close()
352+
}
353+
}()
354+
kvs, err := sstable.ParseTestKVsAndSpans(td.Input, nil)
355+
require.NoError(t, err)
356+
require.NoError(t, valsep.HandleTestKVs(writer, kvs))
357+
writerClosed = true
358+
if err := writer.Close(); err != nil {
359+
return err.Error()
360+
}
361+
return fmt.Sprintf("sst: %s\nblobs: %s", sstFileName, strings.Join(blobPaths, ","))
362+
case "ingest":
363+
if len(td.CmdArgs) == 0 {
364+
return "no sst files provided for ingestion"
365+
366+
}
367+
// Each argument key is an SST file path, and its values are the associated
368+
// blob file paths, if any.
369+
// Ex: ingest sst1=blob1,blob2 sst2=blob3 sst3
370+
var localTables LocalSSTables
371+
for _, arg := range td.CmdArgs {
372+
if arg.Key == "excise-span" {
373+
continue
374+
}
375+
sstPath := arg.Key
376+
var blobPaths []string
377+
// The values are the blob file paths for this SST.
378+
// Each value may contain comma-separated blob file paths.
379+
for _, val := range arg.Vals {
380+
// Split by comma to handle comma-separated blob paths
381+
paths := strings.Split(val, ",")
382+
for _, path := range paths {
383+
path = strings.TrimSpace(path)
384+
if path != "" {
385+
blobPaths = append(blobPaths, path)
386+
}
387+
}
388+
}
389+
localTables = append(localTables, LocalSST{
390+
Path: sstPath,
391+
BlobPaths: blobPaths,
392+
})
393+
}
394+
if len(localTables) == 0 {
395+
return "no sst files provided for ingestion"
396+
}
397+
398+
var exciseSpan KeyRange
399+
var exciseStr string
400+
td.MaybeScanArgs(t, "excise-span", &exciseStr)
401+
if exciseStr != "" {
402+
fields := strings.Split(exciseStr, "-")
403+
if len(fields) != 2 {
404+
return fmt.Sprintf("malformed excise span: %s", exciseStr)
405+
}
406+
exciseSpan.Start = []byte(fields[0])
407+
exciseSpan.End = []byte(fields[1])
408+
}
409+
_, err := db.IngestLocal(ctx, localTables, exciseSpan)
410+
if err != nil {
411+
return err.Error()
412+
}
413+
return describeLSM(db, true /* verbose */)
414+
default:
415+
return "unknown command: " + td.Cmd
416+
}
417+
})
418+
}
419+
278420
func TestIngestLoadInvalid(t *testing.T) {
279421
mem := vfs.NewMem()
280422
f, err := mem.Create("invalid", vfs.WriteCategoryUnspecified)
@@ -293,6 +435,43 @@ func TestIngestLoadInvalid(t *testing.T) {
293435
}
294436
}
295437

438+
func TestIngestLocalErrors(t *testing.T) {
439+
ctx := context.Background()
440+
441+
t.Run("ReadOnlyDB", func(t *testing.T) {
442+
fs := vfs.NewMem()
443+
444+
// First create a database
445+
opts := &Options{FS: fs}
446+
db, err := Open("test_db", opts)
447+
require.NoError(t, err)
448+
require.NoError(t, db.Close())
449+
450+
// Then open it in read-only mode
451+
opts.ReadOnly = true
452+
db, err = Open("test_db", opts)
453+
require.NoError(t, err)
454+
defer func() { require.NoError(t, db.Close()) }()
455+
456+
_, err = db.IngestLocal(ctx, LocalSSTables{LocalSST{Path: "test.sst"}}, KeyRange{})
457+
require.ErrorIs(t, err, ErrReadOnly)
458+
})
459+
460+
t.Run("InvalidExciseSpan", func(t *testing.T) {
461+
fs := vfs.NewMem()
462+
opts := &Options{FS: fs, Comparer: testkeys.Comparer}
463+
db, err := Open("", opts)
464+
require.NoError(t, err)
465+
defer func() { require.NoError(t, db.Close()) }()
466+
467+
localTables := LocalSSTables{LocalSST{Path: "test.sst"}}
468+
exciseSpan := KeyRange{Start: []byte("a@1"), End: []byte("z")}
469+
_, err = db.IngestLocal(ctx, localTables, exciseSpan)
470+
require.Error(t, err)
471+
require.Contains(t, err.Error(), "suffixed start key")
472+
})
473+
}
474+
296475
func TestIngestSortAndVerify(t *testing.T) {
297476
comparers := map[string]Compare{
298477
"default": DefaultComparer.Compare,
@@ -327,7 +506,7 @@ func TestIngestSortAndVerify(t *testing.T) {
327506
m.InitPhysicalBacking()
328507
meta = append(meta, ingestLocalMeta{
329508
TableMetadata: m,
330-
path: strconv.Itoa(i),
509+
local: LocalSST{Path: strconv.Itoa(i)},
331510
})
332511
}
333512
lr := ingestLoadResult{local: meta}
@@ -336,7 +515,7 @@ func TestIngestSortAndVerify(t *testing.T) {
336515
return fmt.Sprintf("%v\n", err)
337516
}
338517
for i := range meta {
339-
fmt.Fprintf(&buf, "%s: %v-%v\n", meta[i].path, meta[i].Smallest(), meta[i].Largest())
518+
fmt.Fprintf(&buf, "%s: %v-%v\n", meta[i].local.Path, meta[i].Smallest(), meta[i].Largest())
340519
}
341520
return buf.String()
342521

@@ -366,11 +545,11 @@ func TestIngestLink(t *testing.T) {
366545
meta := make([]ingestLocalMeta, 10)
367546
contents := make([][]byte, len(meta))
368547
for j := range meta {
369-
meta[j].path = fmt.Sprintf("external%d", j)
548+
meta[j].local.Path = fmt.Sprintf("external%d", j)
370549
meta[j].TableMetadata = &manifest.TableMetadata{}
371550
meta[j].TableNum = base.TableNum(j)
372551
meta[j].InitPhysicalBacking()
373-
f, err := opts.FS.Create(meta[j].path, vfs.WriteCategoryUnspecified)
552+
f, err := opts.FS.Create(meta[j].local.Path, vfs.WriteCategoryUnspecified)
374553
require.NoError(t, err)
375554

376555
contents[j] = []byte(fmt.Sprintf("data%d", j))
@@ -382,7 +561,7 @@ func TestIngestLink(t *testing.T) {
382561
}
383562

384563
if i < count {
385-
opts.FS.Remove(meta[i].path)
564+
opts.FS.Remove(meta[i].local.Path)
386565
}
387566

388567
err = ingestLinkLocal(context.Background(), 0 /* jobID */, opts, objProvider, meta)
@@ -453,7 +632,7 @@ func TestIngestLinkFallback(t *testing.T) {
453632

454633
meta := &manifest.TableMetadata{TableNum: 1}
455634
meta.InitPhysicalBacking()
456-
err = ingestLinkLocal(context.Background(), 0, opts, objProvider, []ingestLocalMeta{{TableMetadata: meta, path: "source"}})
635+
err = ingestLinkLocal(context.Background(), 0, opts, objProvider, []ingestLocalMeta{{TableMetadata: meta, local: LocalSST{Path: "source"}}})
457636
require.NoError(t, err)
458637

459638
dest, err := mem.Open("000001.sst")
@@ -2750,9 +2929,12 @@ func TestIngestCleanup(t *testing.T) {
27502929
fns := []base.TableNum{0, 1, 2}
27512930

27522931
testCases := []struct {
2753-
closeFiles []base.TableNum
2754-
cleanupFiles []base.TableNum
2755-
wantErr string
2932+
closeFiles []base.TableNum
2933+
cleanupFiles []base.TableNum
2934+
closeBlobFiles []base.DiskFileNum
2935+
cleanupBlobFiles []base.DiskFileNum // blob files linked for last table, but table not linked yet
2936+
cleanupMetaBlobFiles map[base.TableNum][]base.DiskFileNum // blob files per table in meta
2937+
wantErr string
27562938
}{
27572939
// Close and remove all files.
27582940
{
@@ -2777,6 +2959,22 @@ func TestIngestCleanup(t *testing.T) {
27772959
cleanupFiles: []base.TableNum{0, 1, 2, 3},
27782960
wantErr: oserror.ErrInvalid.Error(), // The first error encountered is due to the open file.
27792961
},
2962+
// Remove with stray blob files.
2963+
{
2964+
closeFiles: []base.TableNum{},
2965+
cleanupFiles: []base.TableNum{},
2966+
closeBlobFiles: []base.DiskFileNum{10},
2967+
cleanupBlobFiles: []base.DiskFileNum{10},
2968+
},
2969+
// Remove blob files in meta.
2970+
{
2971+
closeFiles: fns,
2972+
cleanupFiles: fns,
2973+
closeBlobFiles: []base.DiskFileNum{10},
2974+
cleanupMetaBlobFiles: map[base.TableNum][]base.DiskFileNum{
2975+
0: {10},
2976+
},
2977+
},
27802978
}
27812979

27822980
for _, tc := range testCases {
@@ -2787,16 +2985,32 @@ func TestIngestCleanup(t *testing.T) {
27872985
require.NoError(t, err)
27882986
defer objProvider.Close()
27892987

2790-
// Create the files in the VFS.
2988+
// Create the table files in the VFS.
27912989
metaMap := make(map[base.TableNum]objstorage.Writable)
27922990
for _, fn := range fns {
27932991
w, _, err := objProvider.Create(t.Context(), base.FileTypeTable, base.PhysicalTableDiskFileNum(fn), objstorage.CreateOptions{})
27942992
require.NoError(t, err)
2795-
27962993
metaMap[fn] = w
27972994
}
27982995

2799-
// Close a select number of files.
2996+
// Create the blob files in the VFS.
2997+
blobMetaMap := make(map[base.DiskFileNum]objstorage.Writable)
2998+
allBlobFiles := make(map[base.DiskFileNum]struct{})
2999+
for _, bfn := range tc.cleanupBlobFiles {
3000+
allBlobFiles[bfn] = struct{}{}
3001+
}
3002+
for _, blobFiles := range tc.cleanupMetaBlobFiles {
3003+
for _, bfn := range blobFiles {
3004+
allBlobFiles[bfn] = struct{}{}
3005+
}
3006+
}
3007+
for bfn := range allBlobFiles {
3008+
w, _, err := objProvider.Create(t.Context(), base.FileTypeBlob, bfn, objstorage.CreateOptions{})
3009+
require.NoError(t, err)
3010+
blobMetaMap[bfn] = w
3011+
}
3012+
3013+
// Close a select number of table files.
28003014
for _, m := range tc.closeFiles {
28013015
w, ok := metaMap[m]
28023016
if !ok {
@@ -2805,15 +3019,47 @@ func TestIngestCleanup(t *testing.T) {
28053019
require.NoError(t, w.Finish())
28063020
}
28073021

3022+
// Close a select number of blob files.
3023+
for _, bfn := range tc.closeBlobFiles {
3024+
w, ok := blobMetaMap[bfn]
3025+
if !ok {
3026+
continue
3027+
}
3028+
require.NoError(t, w.Finish())
3029+
}
3030+
28083031
// Cleanup the set of files in the FS.
28093032
var toRemove []ingestLocalMeta
28103033
for _, fn := range tc.cleanupFiles {
28113034
m := &manifest.TableMetadata{TableNum: fn}
28123035
m.InitPhysicalBacking()
2813-
toRemove = append(toRemove, ingestLocalMeta{TableMetadata: m})
3036+
meta := ingestLocalMeta{TableMetadata: m}
3037+
if blobFiles, ok := tc.cleanupMetaBlobFiles[fn]; ok {
3038+
meta.blobFiles = make([]manifest.BlobFileMetadata, len(blobFiles))
3039+
for i, bfn := range blobFiles {
3040+
meta.blobFiles[i] = manifest.BlobFileMetadata{
3041+
FileID: base.BlobFileID(bfn),
3042+
Physical: &manifest.PhysicalBlobFile{
3043+
FileNum: bfn,
3044+
},
3045+
}
3046+
}
3047+
}
3048+
toRemove = append(toRemove, meta)
3049+
}
3050+
3051+
// Create stray blob files that are provided as the last arg for cleanup.
3052+
var blobFiles []manifest.BlobFileMetadata
3053+
for _, bfn := range tc.cleanupBlobFiles {
3054+
blobFiles = append(blobFiles, manifest.BlobFileMetadata{
3055+
FileID: base.BlobFileID(bfn),
3056+
Physical: &manifest.PhysicalBlobFile{
3057+
FileNum: bfn,
3058+
},
3059+
})
28143060
}
28153061

2816-
err = ingestCleanup(objProvider, toRemove)
3062+
err = ingestCleanup(objProvider, toRemove, blobFiles)
28173063
if tc.wantErr != "" {
28183064
require.Error(t, err, "got no error, expected %s", tc.wantErr)
28193065
require.Contains(t, err.Error(), tc.wantErr)

0 commit comments

Comments
 (0)