Skip to content

Commit

Permalink
fix indexing on coarse split
Browse files Browse the repository at this point in the history
  • Loading branch information
DocSavage committed Nov 28, 2017
1 parent bdf6efa commit 8b67af1
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 11 deletions.
14 changes: 11 additions & 3 deletions datatype/labelarray/mutate.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,9 +514,17 @@ func (d *Data) splitIndices(v dvid.VersionID, delta labels.DeltaSplit, deleteBlk
shard := delta.OldLabel % numLabelHandlers
d.indexCh[shard] <- labelChange{v: v, label: delta.OldLabel, bdm: deletebdm}

splitbdm := make(blockDiffMap, len(delta.Split))
for izyx := range delta.Split {
splitbdm[izyx] = labelDiff{present: true}
var splitbdm blockDiffMap
if delta.Split == nil {
splitbdm = make(blockDiffMap, len(delta.SortedBlocks))
for _, izyx := range delta.SortedBlocks {
splitbdm[izyx] = labelDiff{present: true}
}
} else {
splitbdm = make(blockDiffMap, len(delta.Split))
for izyx := range delta.Split {
splitbdm[izyx] = labelDiff{present: true}
}
}
shard = delta.NewLabel % numLabelHandlers
d.indexCh[shard] <- labelChange{v: v, label: delta.NewLabel, bdm: splitbdm}
Expand Down
95 changes: 87 additions & 8 deletions datatype/labelarray/mutate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,7 @@ func TestSplitCoarseLabel(t *testing.T) {
config.Set("BlockSize", "32,32,32") // Previous test data was on 32^3 blocks
server.CreateTestInstance(t, uuid, "labelarray", "labels", config)

// Post label volume and setup expected volume after split of block coords (2, 1, 1) and (3, 1, 2)
// Post label volume and setup expected volume after split of block coords (2, 1, 1) and (2, 1, 2)
expected := createLabelTestVolume(t, uuid, "labels")
fromLabel := uint64(4)
toLabel := uint64(5)
Expand Down Expand Up @@ -754,6 +754,33 @@ func TestSplitCoarseLabel(t *testing.T) {
if err := retrieved.equals(expected); err != nil {
t.Errorf("Split label volume not equal to expected volume: %v\n", err)
}

// make sure the sparse volume for new label is correct
reqStr = fmt.Sprintf("%snode/%s/labels/sparsevol-coarse/%d", server.WebAPIPath, uuid, toLabel)
encoding = server.TestHTTP(t, "GET", reqStr, nil)
if len(encoding) != len(rleBytes)+12 {
t.Errorf("expected %d bytes from GET /sparsevol-coarse, got %d bytes\n", len(rleBytes)+12, len(encoding))
}
if len(encoding) < 12 {
t.Fatalf("bad sparsevol-coarse encoding. Expected at least 12 bytes, got %d\n", len(encoding))
}
numSpans := binary.LittleEndian.Uint32(encoding[8:12])
if numSpans != 2 {
t.Errorf("Expected 2 blocks, got %d blocks after split coarse.\n", numSpans)
}
coarseRLEs := new(dvid.RLEs)
if err := coarseRLEs.UnmarshalBinary(encoding[12:]); err != nil {
t.Fatalf("error unmarshaling coarse sparsevolume: %v\n", err)
}
if len(*coarseRLEs) != 2 {
t.Fatalf("expected 2 blocks in sparsevol after split, got: %v\n", *coarseRLEs)
}
expectedRLEs := dvid.RLEs{dvid.NewRLE(dvid.Point3d{2, 1, 1}, 1), dvid.NewRLE(dvid.Point3d{2, 1, 2}, 1)}
for i, rle := range *coarseRLEs {
if !rle.StartPt().Equals(expectedRLEs[i].StartPt()) {
t.Errorf("Bad block %d: expected %s, got %s\n", i, expectedRLEs[i], rle)
}
}
}

func TestSplitCoarseGivenLabel(t *testing.T) {
Expand Down Expand Up @@ -851,6 +878,33 @@ func TestSplitCoarseGivenLabel(t *testing.T) {
if err := retrieved.equals(expected); err != nil {
t.Errorf("Split label volume not equal to expected volume: %v\n", err)
}

// make sure the sparse volume for new label is correct
reqStr = fmt.Sprintf("%snode/%s/labels/sparsevol-coarse/%d", server.WebAPIPath, uuid, toLabel)
encoding := server.TestHTTP(t, "GET", reqStr, nil)
if len(encoding) != len(rleBytes)+12 {
t.Errorf("expected %d bytes from GET /sparsevol-coarse, got %d bytes\n", len(rleBytes)+12, len(encoding))
}
if len(encoding) < 12 {
t.Fatalf("bad sparsevol-coarse encoding. Expected at least 12 bytes, got %d\n", len(encoding))
}
numSpans := binary.LittleEndian.Uint32(encoding[8:12])
if numSpans != 2 {
t.Errorf("Expected 2 blocks, got %d blocks after split coarse.\n", numSpans)
}
coarseRLEs := new(dvid.RLEs)
if err := coarseRLEs.UnmarshalBinary(encoding[12:]); err != nil {
t.Fatalf("error unmarshaling coarse sparsevolume: %v\n", err)
}
if len(*coarseRLEs) != 2 {
t.Fatalf("expected 2 blocks in sparsevol after split, got: %v\n", *coarseRLEs)
}
expectedRLEs := dvid.RLEs{dvid.NewRLE(dvid.Point3d{2, 1, 1}, 1), dvid.NewRLE(dvid.Point3d{2, 1, 2}, 1)}
for i, rle := range *coarseRLEs {
if !rle.StartPt().Equals(expectedRLEs[i].StartPt()) {
t.Errorf("Bad block %d: expected %s, got %s\n", i, expectedRLEs[i], rle)
}
}
}

func TestSplitLabel(t *testing.T) {
Expand Down Expand Up @@ -1411,7 +1465,7 @@ func TestConcurrentMutations(t *testing.T) {
offset: dvid.Point3d{9, 68, 68},
size: dvid.Point3d{72, 4, 4},
blockSpans: []dvid.Span{
{1, 1, 0, 2},
{1, 1, 0, 1},
},
voxelSpans: []dvid.Span{
{68, 68, 9, 80},
Expand All @@ -1437,7 +1491,7 @@ func TestConcurrentMutations(t *testing.T) {
offset: dvid.Point3d{9, 68, 72},
size: dvid.Point3d{72, 4, 4},
blockSpans: []dvid.Span{
{1, 1, 0, 2},
{1, 1, 0, 1},
},
voxelSpans: []dvid.Span{
{72, 68, 9, 80},
Expand All @@ -1463,7 +1517,7 @@ func TestConcurrentMutations(t *testing.T) {
offset: dvid.Point3d{9, 68, 76},
size: dvid.Point3d{72, 4, 4},
blockSpans: []dvid.Span{
{1, 1, 0, 2},
{1, 1, 0, 1},
},
voxelSpans: []dvid.Span{
{76, 68, 9, 80},
Expand All @@ -1484,6 +1538,7 @@ func TestConcurrentMutations(t *testing.T) {
{79, 71, 9, 80},
},
}
testbodies := [3]testBody{tbody1, tbody2, tbody3}

tvol := newTestVolume(192, 128, 128)
tvol.addBody(tbody1, 1)
Expand All @@ -1506,19 +1561,19 @@ func TestConcurrentMutations(t *testing.T) {

// Run concurrent split/merge ops on each body where split is random location in X.
wg := new(sync.WaitGroup)
wg.Add(3000)
wg.Add(750)
go func() {
for n := 0; n < 1000; n++ {
for n := 0; n < 250; n++ {
tbody1.splitmerge(t, wg, uuid, "labels")
}
}()
go func() {
for n := 0; n < 1000; n++ {
for n := 0; n < 250; n++ {
tbody2.splitmerge(t, wg, uuid, "labels")
}
}()
go func() {
for n := 0; n < 1000; n++ {
for n := 0; n < 250; n++ {
tbody3.splitmerge(t, wg, uuid, "labels")
}
}()
Expand All @@ -1532,6 +1587,30 @@ func TestConcurrentMutations(t *testing.T) {
if err := retrieved2.equals(tvol); err != nil {
t.Errorf("Concurrent split/merge producing bad result: %v\n", err)
}

reqStr := fmt.Sprintf("%snode/%s/labels/sparsevols-coarse/1/3", server.WebAPIPath, uuid)
encoding := server.TestHTTP(t, "GET", reqStr, nil)
var i int
for label := uint64(1); label <= 3; label++ {
if i+28 > len(encoding) {
t.Fatalf("Expected label %d but only %d bytes remain in encoding\n", label, len(encoding[i:]))
}
gotLabel := binary.LittleEndian.Uint64(encoding[i : i+8])
if gotLabel != label {
t.Errorf("Expected label %d, got label %d in returned coarse sparsevols\n", label, gotLabel)
}
i += 8
var spans dvid.Spans
if err := spans.UnmarshalBinary(encoding[i:]); err != nil {
t.Errorf("Error in decoding coarse sparse volume: %v\n", err)
return
}
i += 4 + len(spans)*16
b := testbodies[label-1]
if !reflect.DeepEqual(spans, b.blockSpans) {
t.Errorf("Expected coarse spans for label %d:\n%s\nGot spans:\n%s\n", b.label, b.blockSpans, spans)
}
}
}

func (b testBody) splitmerge(t *testing.T, wg *sync.WaitGroup, uuid dvid.UUID, name dvid.InstanceName) {
Expand Down

0 comments on commit 8b67af1

Please sign in to comment.