Skip to content

Commit 22252a8

Browse files
pre filter for ivf (#23117)
pre filter for ivf Approved by: @heni02, @ouyuanning, @daviszhen, @zhangxu19830126, @gouhongshen, @LeftHandCold, @fengttt, @aunjgr, @XuPeng-SH
1 parent 153a447 commit 22252a8

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+13199
-10228
lines changed

pkg/common/bloomfilter/bloomfilter.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,13 @@
1515
package bloomfilter
1616

1717
import (
18+
"bytes"
19+
20+
"github.com/matrixorigin/matrixone/pkg/common/bitmap"
1821
"github.com/matrixorigin/matrixone/pkg/common/hashmap"
22+
"github.com/matrixorigin/matrixone/pkg/common/moerr"
1923
"github.com/matrixorigin/matrixone/pkg/container/hashtable"
24+
"github.com/matrixorigin/matrixone/pkg/container/types"
2025
"github.com/matrixorigin/matrixone/pkg/container/vector"
2126
)
2227

@@ -114,6 +119,81 @@ func (bf *BloomFilter) TestAndAdd(v *vector.Vector, callBack func(bool, int)) {
114119

115120
}
116121

122+
// Marshal encodes BloomFilter into byte sequence for transmission via runtime filter message within the same CN.
123+
// Encoding format:
124+
//
125+
// [seedCount:uint32][seeds...:uint64][bitmapLen:uint32][bitmapBytes...]
126+
func (bf *BloomFilter) Marshal() ([]byte, error) {
127+
var buf bytes.Buffer
128+
129+
seedCount := uint32(len(bf.hashSeed))
130+
buf.Write(types.EncodeUint32(&seedCount))
131+
for i := 0; i < int(seedCount); i++ {
132+
buf.Write(types.EncodeUint64(&bf.hashSeed[i]))
133+
}
134+
135+
bmBytes := bf.bitmap.Marshal()
136+
bmLen := uint32(len(bmBytes))
137+
buf.Write(types.EncodeUint32(&bmLen))
138+
buf.Write(bmBytes)
139+
140+
return buf.Bytes(), nil
141+
}
142+
143+
// Unmarshal restores BloomFilter from byte sequence.
144+
// Initializes internal structures (keys / states / vals / addVals) based on encoded seedCount and bitmap.
145+
func (bf *BloomFilter) Unmarshal(data []byte) error {
146+
if len(data) < 4 {
147+
return moerr.NewInternalErrorNoCtx("invalid bloomfilter data")
148+
}
149+
150+
seedCount := int(types.DecodeUint32(data[:4]))
151+
data = data[4:]
152+
153+
if seedCount <= 0 {
154+
return moerr.NewInternalErrorNoCtx("invalid bloomfilter seed count")
155+
}
156+
157+
hashSeed := make([]uint64, seedCount)
158+
for i := 0; i < seedCount; i++ {
159+
if len(data) < 8 {
160+
return moerr.NewInternalErrorNoCtx("invalid bloomfilter data (seed truncated)")
161+
}
162+
hashSeed[i] = types.DecodeUint64(data[:8])
163+
data = data[8:]
164+
}
165+
166+
if len(data) < 4 {
167+
return moerr.NewInternalErrorNoCtx("invalid bloomfilter data (no bitmap length)")
168+
}
169+
bmLen := int(types.DecodeUint32(data[:4]))
170+
data = data[4:]
171+
if bmLen < 0 || len(data) < bmLen {
172+
return moerr.NewInternalErrorNoCtx("invalid bloomfilter data (bitmap truncated)")
173+
}
174+
175+
var bm bitmap.Bitmap
176+
bm.Unmarshal(data[:bmLen])
177+
178+
// Reinitialize internal auxiliary structures for subsequent Test/TestAndAdd
179+
vals := make([][]uint64, hashmap.UnitLimit)
180+
keys := make([][]byte, hashmap.UnitLimit)
181+
states := make([][3]uint64, hashmap.UnitLimit)
182+
for j := 0; j < hashmap.UnitLimit; j++ {
183+
vals[j] = make([]uint64, seedCount*3)
184+
}
185+
186+
bf.bitmap = bm
187+
bf.hashSeed = hashSeed
188+
bf.keys = keys
189+
bf.states = states
190+
bf.vals = vals
191+
bf.addVals = make([]uint64, hashmap.UnitLimit*3*seedCount)
192+
bf.valLength = len(hashSeed) * 3
193+
194+
return nil
195+
}
196+
117197
// for an incoming vector, compute the hash value of each of its elements, and manipulate it with func tf.fn
118198
func (bf *BloomFilter) handle(v *vector.Vector, callBack func(int, int)) {
119199
length := v.Length()

pkg/common/bloomfilter/bloomfilter_test.go

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,3 +111,157 @@ func BenchmarkBloomFiltrerTestAndAdd(b *testing.B) {
111111
}
112112
}
113113
}
114+
115+
func TestMarshal(t *testing.T) {
116+
mp := mpool.MustNewZero()
117+
vec := testutil.NewVector(testCount, types.New(types.T_int64, 0, 0), mp, false, nil)
118+
defer vec.Free(mp)
119+
120+
bf := New(testCount, testRate)
121+
bf.Add(vec)
122+
123+
data, err := bf.Marshal()
124+
require.NoError(t, err)
125+
require.NotNil(t, data)
126+
require.Greater(t, len(data), 0)
127+
128+
// Verify basic structure: should have at least seedCount (4 bytes) + some seeds + bitmapLen (4 bytes) + bitmap data
129+
require.GreaterOrEqual(t, len(data), 8, "marshaled data should have at least seedCount and bitmapLen")
130+
}
131+
132+
func TestUnmarshal(t *testing.T) {
133+
mp := mpool.MustNewZero()
134+
vec := testutil.NewVector(testCount, types.New(types.T_int64, 0, 0), mp, false, nil)
135+
defer vec.Free(mp)
136+
137+
// Test normal case: marshal and unmarshal
138+
bf1 := New(testCount, testRate)
139+
bf1.Add(vec)
140+
141+
data, err := bf1.Marshal()
142+
require.NoError(t, err)
143+
144+
bf2 := BloomFilter{}
145+
err = bf2.Unmarshal(data)
146+
require.NoError(t, err)
147+
148+
// Verify that unmarshaled filter has same functionality
149+
allFound := true
150+
bf2.Test(vec, func(exists bool, _ int) {
151+
allFound = allFound && exists
152+
})
153+
require.True(t, allFound, "unmarshaled filter should find all added elements")
154+
155+
// Test error cases
156+
tests := []struct {
157+
name string
158+
data []byte
159+
wantErr bool
160+
}{
161+
{
162+
name: "empty data",
163+
data: []byte{},
164+
wantErr: true,
165+
},
166+
{
167+
name: "data too short (< 4 bytes)",
168+
data: []byte{1, 2, 3},
169+
wantErr: true,
170+
},
171+
{
172+
name: "invalid seed count (0)",
173+
data: []byte{0, 0, 0, 0}, // seedCount = 0
174+
wantErr: true,
175+
},
176+
{
177+
name: "seed data truncated",
178+
data: []byte{1, 0, 0, 0, 1, 2, 3, 4, 5, 6, 7}, // seedCount=1, but only 7 bytes for seed (need 8)
179+
wantErr: true,
180+
},
181+
{
182+
name: "no bitmap length",
183+
data: []byte{1, 0, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8}, // seedCount=1, one seed, but no bitmapLen
184+
wantErr: true,
185+
},
186+
{
187+
name: "bitmap data truncated",
188+
data: []byte{1, 0, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 0, 0, 0, 1, 2, 3}, // seedCount=1, one seed, bitmapLen=10, but only 3 bytes
189+
wantErr: true,
190+
},
191+
}
192+
193+
for _, tt := range tests {
194+
t.Run(tt.name, func(t *testing.T) {
195+
bf := BloomFilter{}
196+
err := bf.Unmarshal(tt.data)
197+
if tt.wantErr {
198+
require.Error(t, err)
199+
} else {
200+
require.NoError(t, err)
201+
}
202+
})
203+
}
204+
}
205+
206+
func TestMarshalUnmarshal(t *testing.T) {
207+
mp := mpool.MustNewZero()
208+
vecs := make([]*vector.Vector, vecCount)
209+
for i := 0; i < vecCount; i++ {
210+
vecs[i] = testutil.NewVector(testCount/vecCount, types.New(types.T_int64, 0, 0), mp, false, nil)
211+
}
212+
defer func() {
213+
for i := 0; i < vecCount; i++ {
214+
vecs[i].Free(mp)
215+
}
216+
}()
217+
218+
// Create and populate original filter
219+
bf1 := New(testCount, testRate)
220+
for j := 0; j < vecCount; j++ {
221+
bf1.Add(vecs[j])
222+
}
223+
224+
// Marshal
225+
data, err := bf1.Marshal()
226+
require.NoError(t, err)
227+
require.NotNil(t, data)
228+
229+
// Unmarshal into new filter
230+
bf2 := BloomFilter{}
231+
err = bf2.Unmarshal(data)
232+
require.NoError(t, err)
233+
234+
// Verify both filters behave the same
235+
testVec := testutil.NewVector(testCount/vecCount, types.New(types.T_int64, 0, 0), mp, false, nil)
236+
defer testVec.Free(mp)
237+
238+
// Test original filter
239+
allFound1 := true
240+
bf1.Test(testVec, func(exists bool, _ int) {
241+
allFound1 = allFound1 && exists
242+
})
243+
244+
// Test unmarshaled filter
245+
allFound2 := true
246+
bf2.Test(testVec, func(exists bool, _ int) {
247+
allFound2 = allFound2 && exists
248+
})
249+
250+
require.Equal(t, allFound1, allFound2, "original and unmarshaled filters should behave the same")
251+
252+
// Test with new data that wasn't added
253+
newVec := testutil.NewVector(testCount*2, types.New(types.T_int64, 0, 0), mp, false, nil)
254+
defer newVec.Free(mp)
255+
256+
allFoundNew1 := true
257+
bf1.Test(newVec, func(exists bool, _ int) {
258+
allFoundNew1 = allFoundNew1 && exists
259+
})
260+
261+
allFoundNew2 := true
262+
bf2.Test(newVec, func(exists bool, _ int) {
263+
allFoundNew2 = allFoundNew2 && exists
264+
})
265+
266+
require.Equal(t, allFoundNew1, allFoundNew2, "original and unmarshaled filters should behave the same for new data")
267+
}

pkg/defines/type.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,10 @@ type VarScopeKey struct{}
241241
// Determine if it is a stored procedure
242242
type InSp struct{}
243243

244+
// IvfBloomFilter carries BloomFilter bytes for ivf entries scan in internal SQL executor.
245+
// This key is set on context when invoking internal SQL from ivf_search.
246+
type IvfBloomFilter struct{}
247+
244248
// PkCheckByTN whether TN does primary key uniqueness check against transaction's workspace or not.
245249
type PkCheckByTN struct{}
246250

pkg/objectio/types.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ import (
2020
"github.com/matrixorigin/matrixone/pkg/common/mpool"
2121
"github.com/matrixorigin/matrixone/pkg/container/batch"
2222
"github.com/matrixorigin/matrixone/pkg/container/types"
23-
"github.com/matrixorigin/matrixone/pkg/container/vector"
2423
"github.com/matrixorigin/matrixone/pkg/fileservice"
2524
"github.com/matrixorigin/matrixone/pkg/vectorindex/metric"
25+
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers"
2626
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/index"
2727
)
2828

@@ -42,13 +42,14 @@ type ColumnMetaFetcher interface {
4242
MustGetColumn(seqnum uint16) ColumnMeta
4343
}
4444

45-
type ReadFilterSearchFuncType func(*vector.Vector) []int64
45+
type ReadFilterSearchFuncType func(containers.Vectors) []int64
4646

4747
type BlockReadFilter struct {
4848
HasFakePK bool
4949
Valid bool
5050
SortedSearchFunc ReadFilterSearchFuncType
5151
UnSortedSearchFunc ReadFilterSearchFuncType
52+
Cleanup func() // Cleanup function to release resources (e.g., reusableTempVec)
5253
}
5354

5455
func (f BlockReadFilter) DecideSearchFunc(isSortedBlk bool) ReadFilterSearchFuncType {

0 commit comments

Comments
 (0)