-
-
Notifications
You must be signed in to change notification settings - Fork 1
/
index.js
145 lines (121 loc) · 3.49 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
'use strict'
const { ReadableStream, CountQueuingStrategy, WritableStream } = require('./streams')
// Symbol keys under which the classes below store their per-instance state,
// instead of using ordinary string-named properties.
const kCanceled = Symbol('canceled') // LevelSource: true once cancel() has been called
const kIterator = Symbol('iterator') // LevelSource: the iterator being read from
const kOperations = Symbol('operations') // LevelSink: operations buffered for the next batch
const kBatchSize = Symbol('batchSize') // LevelSink: buffered-operation count that triggers a flush
const kBatchType = Symbol('batchType') // LevelSink: type given to operations that lack one
const kBatchOptions = Symbol('batchOptions') // LevelSink: options forwarded to db.batch()
const kDb = Symbol('db') // LevelReadableStream / LevelSink: the database reference
/**
 * Underlying source for a ReadableStream that reads from an iterator exposing
 * `nextv(size)` and `close()`.
 */
class LevelSource {
  /**
   * @param {object} iterator Iterator with `nextv(size)` and `close()` methods.
   */
  constructor (iterator) {
    this[kCanceled] = false
    this[kIterator] = iterator
  }

  /**
   * Reads up to `controller.desiredSize` items from the iterator and enqueues
   * them. An empty result is treated as the end of the iterator: the iterator
   * and then the stream are closed. If reading fails, the iterator is closed
   * and the error is rethrown.
   *
   * @param {ReadableStreamDefaultController} controller
   */
  async pull (controller) {
    const iterator = this[kIterator]
    let batch

    try {
      batch = await iterator.nextv(controller.desiredSize)
    } catch (err) {
      // Release the iterator before reporting the failure
      await iterator.close()
      throw err
    }

    // cancel() may have run while nextv() was pending; if so, discard the batch
    if (this[kCanceled]) return

    if (batch.length !== 0) {
      for (const entry of batch) {
        controller.enqueue(entry)
      }
      return
    }

    // An empty batch marks the natural end of the iterator
    await iterator.close()
    controller.close()
  }

  /**
   * Marks the source as canceled and closes the iterator.
   *
   * @returns {*} Whatever the iterator's `close()` returns.
   */
  cancel () {
    const iterator = this[kIterator]
    this[kCanceled] = true
    return iterator.close()
  }
}
/**
 * ReadableStream backed by the iterator returned from `db[method](options)`.
 */
class LevelReadableStream extends ReadableStream {
  /**
   * @param {object} db Database exposing the iterator factory named by `method`.
   * @param {string} method Name of the method on `db` that creates the iterator.
   * @param {object} [options] Options for the iterator factory. `highWaterMark`
   *   is taken out and used for the stream's queue instead; a missing or falsy
   *   value falls back to 1000.
   */
  constructor (db, method, options) {
    const { highWaterMark, ...iteratorOptions } = options || {}

    super(
      new LevelSource(db[method](iteratorOptions)),
      new CountQueuingStrategy({ highWaterMark: highWaterMark || 1000 })
    )

    // Hold a reference to db so it is not garbage collected while streaming
    this[kDb] = db
  }
}
/**
 * Readable stream over the iterator returned by `db.iterator()`, always
 * requested with both `keys` and `values` enabled.
 */
class EntryStream extends LevelReadableStream {
  /**
   * @param {object} db Database exposing an `iterator()` method.
   * @param {object} [options] Stream and iterator options; `keys` and `values`
   *   are overridden to `true`.
   */
  constructor (db, options) {
    const entryOptions = { ...options, keys: true, values: true }
    super(db, 'iterator', entryOptions)
  }
}
/**
 * Readable stream over the iterator returned by `db.keys()`.
 */
class KeyStream extends LevelReadableStream {
  /**
   * @param {object} db Database exposing a `keys()` method.
   * @param {object} [options] Stream and iterator options, passed through as-is.
   */
  constructor (db, options) {
    const method = 'keys'
    super(db, method, options)
  }
}
/**
 * Readable stream over the iterator returned by `db.values()`.
 */
class ValueStream extends LevelReadableStream {
  /**
   * @param {object} db Database exposing a `values()` method.
   * @param {object} [options] Stream and iterator options, passed through as-is.
   */
  constructor (db, options) {
    const method = 'values'
    super(db, method, options)
  }
}
/**
 * Underlying sink for a WritableStream that buffers operations and submits
 * them to `db.batch()` in groups.
 */
class LevelSink {
  /**
   * @param {object} db Database exposing `batch(operations, options)`.
   * @param {number} batchSize Number of buffered operations that triggers a flush.
   * @param {string} batchType Type assigned to operations that do not carry one.
   * @param {object} batchOptions Options forwarded to every `db.batch()` call.
   */
  constructor (db, batchSize, batchType, batchOptions) {
    this[kDb] = db
    this[kBatchSize] = batchSize
    this[kBatchType] = batchType
    this[kBatchOptions] = batchOptions
    this[kOperations] = []
  }

  /**
   * Buffers one operation and flushes the buffer once it holds a full batch.
   *
   * @param {Array|object} operation Either an array whose first two elements
   *   are the key and value, or an operation object. Objects whose `type` is
   *   null or undefined are copied with the default type filled in; the input
   *   is not modified.
   * @returns {*} The result of `db.batch()` when a flush happened (presumably
   *   a promise), otherwise undefined.
   */
  write (operation) {
    const defaultType = this[kBatchType]
    let normalized = operation

    if (Array.isArray(operation)) {
      normalized = { key: operation[0], value: operation[1], type: defaultType }
    } else if (operation.type == null) {
      normalized = Object.assign({}, operation, { type: defaultType })
    }

    const pending = this[kOperations]
    const count = pending.push(normalized)

    // A full batch has accumulated: empty the buffer and hand it to the db
    if (count >= this[kBatchSize]) {
      const full = pending.splice(0, count)
      return this[kDb].batch(full, this[kBatchOptions])
    }
  }

  /**
   * Flushes whatever is still buffered.
   *
   * @returns {*} The result of `db.batch()` if operations remained (presumably
   *   a promise), otherwise undefined.
   */
  close () {
    const remaining = this[kOperations]
    return remaining.length > 0
      ? this[kDb].batch(remaining, this[kBatchOptions])
      : undefined
  }
}
/**
 * WritableStream that collects written operations and submits them to the
 * database in batches through a LevelSink.
 */
class BatchStream extends WritableStream {
  /**
   * @param {object} db Database handed to the sink.
   * @param {object} [options] `highWaterMark` (missing or falsy falls back to
   *   500) sets both the stream's queue size and the sink's batch size; `type`
   *   (missing or falsy falls back to 'put') is the default operation type;
   *   all remaining properties are passed to the sink as batch options.
   */
  constructor (db, options) {
    const { highWaterMark, type, ...batchOptions } = options || {}
    const batchSize = highWaterMark || 500
    const batchType = type || 'put'

    // Two buffers exist (the stream's queue and the sink's own list of
    // operations) because Web Streams have no _writev() equivalent
    super(
      new LevelSink(db, batchSize, batchType, batchOptions),
      new CountQueuingStrategy({ highWaterMark: batchSize })
    )
  }
}
exports.EntryStream = EntryStream
exports.KeyStream = KeyStream
exports.ValueStream = ValueStream
exports.BatchStream = BatchStream