-
Notifications
You must be signed in to change notification settings - Fork 0
/
cache.go
134 lines (107 loc) · 3.32 KB
/
cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
// Copyright 2019 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package doltswarm
import (
"sync"
"github.com/dolthub/dolt/go/libraries/doltcore/remotestorage"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/nbs"
)
// mapChunkCache is a ChunkCache implementation that stores everything in an in-memory map.
// Every method visible in this file acquires mu before touching the maps or the capacity monitor.
type mapChunkCache struct {
// mu is locked for the full duration of each cache method.
mu *sync.Mutex
// hashToChunk holds every cached chunk keyed by its hash. Put may store an
// empty chunk here; such an entry can later be overwritten by Put or PutChunk.
hashToChunk map[hash.Hash]nbs.CompressedChunk
// toFlush holds the chunks added since the last call to
// GetAndClearChunksToFlush, which swaps in a fresh map.
toFlush map[hash.Hash]nbs.CompressedChunk
// cm is asked, with the chunk's compressed length, whether capacity is
// exceeded before a chunk is inserted; insertion is abandoned when it is.
cm remotestorage.CapacityMonitor
}
// newMapChunkCache returns an empty mapChunkCache with its own mutex, empty
// chunk and flush maps, and an uncapped capacity monitor.
func newMapChunkCache() *mapChunkCache {
	return &mapChunkCache{
		mu:          new(sync.Mutex),
		hashToChunk: make(map[hash.Hash]nbs.CompressedChunk),
		toFlush:     make(map[hash.Hash]nbs.CompressedChunk),
		cm:          remotestorage.NewUncappedCapacityMonitor(),
	}
}
// Put adds each chunk in chnks to the cache, in order. A chunk whose hash is
// already cached with a non-empty value is skipped. Non-empty chunks that are
// stored are also queued for the next GetAndClearChunksToFlush.
//
// It returns true as soon as the capacity monitor reports that capacity is
// exceeded; the offending chunk and all chunks after it are not added. It
// returns false when every chunk was processed.
func (mcc *mapChunkCache) Put(chnks []nbs.CompressedChunk) bool {
	mcc.mu.Lock()
	defer mcc.mu.Unlock()

	for _, chunk := range chnks {
		key := chunk.Hash()

		// Keep an existing non-empty entry; only missing or empty entries are replaced.
		if cached, found := mcc.hashToChunk[key]; found && !cached.IsEmpty() {
			continue
		}

		if mcc.cm.CapacityExceeded(len(chunk.FullCompressedChunk)) {
			return true
		}

		mcc.hashToChunk[key] = chunk

		// Empty chunks are recorded in the cache but never queued for flushing.
		if chunk.IsEmpty() {
			continue
		}
		mcc.toFlush[key] = chunk
	}

	return false
}
// Get returns a map with one entry per requested hash. A hash that is in the
// cache maps to its cached chunk; a hash that is not in the cache maps to
// nbs.EmptyCompressedChunk in its place.
func (mcc *mapChunkCache) Get(hashes hash.HashSet) map[hash.Hash]nbs.CompressedChunk {
	result := make(map[hash.Hash]nbs.CompressedChunk)

	mcc.mu.Lock()
	defer mcc.mu.Unlock()

	for h := range hashes {
		chunk, found := mcc.hashToChunk[h]
		if !found {
			chunk = nbs.EmptyCompressedChunk
		}
		result[h] = chunk
	}

	return result
}
// Has reports which of the given hashes are missing from the cache: the
// returned set contains exactly those hashes with no entry in the cache map.
// Only key presence is checked, so an entry holding an empty chunk counts as
// present.
func (mcc *mapChunkCache) Has(hashes hash.HashSet) (absent hash.HashSet) {
	missing := make(hash.HashSet)

	mcc.mu.Lock()
	defer mcc.mu.Unlock()

	for h := range hashes {
		if _, present := mcc.hashToChunk[h]; present {
			continue
		}
		missing[h] = struct{}{}
	}

	return missing
}
// PutChunk adds a single chunk to the cache and queues it for the next
// GetAndClearChunksToFlush, unless a non-empty chunk with the same hash is
// already cached, in which case nothing changes.
//
// It returns true when the capacity monitor reports that capacity is exceeded
// (the chunk is then not stored), and false otherwise.
//
// NOTE(review): unlike Put, an empty chunk passed here is also queued for
// flushing — confirm that this difference is intended.
func (mcc *mapChunkCache) PutChunk(ch nbs.CompressedChunk) bool {
	mcc.mu.Lock()
	defer mcc.mu.Unlock()

	key := ch.Hash()

	// An existing non-empty entry wins; only missing or empty entries are replaced.
	if cached, found := mcc.hashToChunk[key]; found && !cached.IsEmpty() {
		return false
	}

	if mcc.cm.CapacityExceeded(len(ch.FullCompressedChunk)) {
		return true
	}

	mcc.hashToChunk[key] = ch
	mcc.toFlush[key] = ch
	return false
}
// GetAndClearChunksToFlush hands back the map of chunks queued for flushing
// since the previous call and replaces it with a fresh empty map, so each
// queued chunk is returned by exactly one call.
func (mcc *mapChunkCache) GetAndClearChunksToFlush() map[hash.Hash]nbs.CompressedChunk {
	// Allocate the replacement map before taking the lock.
	pending := make(map[hash.Hash]nbs.CompressedChunk)

	mcc.mu.Lock()
	defer mcc.mu.Unlock()

	// Swap: the cache keeps the fresh map, the caller receives the old one.
	pending, mcc.toFlush = mcc.toFlush, pending
	return pending
}