A Kappa-style peer-to-peer content database, on top of hyperdrives.
Note: I'm not using this anymore. Try out kappa-record-db instead.
const hypercontent = require('hyper-content-db')
const db = hypercontent('./data/db1')
// Let's put a basic schema first.
db.putSchema('event', {
properties: {
title: {
type: 'string',
index: true
},
date: {
type: 'date',
index: true
}
}
})
// Now add some records.
db.batch([
{ schema: 'event', value: { title: 'Workshop', date: new Date(2019, 10, 10) } },
{ schema: 'event', value: { title: 'Reading', date: new Date(2019, 8, 2) } }
])
// When all indexing is done, query and log results.
db.on('indexed-all', query)
db.ready(() => {
// Create a second database, passing the first database's key as its primary key.
// This will make db2 a "fork" or "extension" of the first.
const db2 = hypercontent('./data/db2', db.key)
db2.ready(() => {
// Add the second database as a source for the first.
db.addSource(db2.localKey)
// Connect the two databases.
replicate(db, db2)
// Add content to the second database.
db2.batch([
{ schema: 'event', value: { title: 'Dinner', date: new Date(2019, 9, 22) } }
])
})
})
function query () {
const eventsSortedByDate = db.api.indexes.query({ schema: 'event', prop: 'date' }).pipe(db.createGetStream())
eventsSortedByDate.on('data', row => console.log(row.value.date, row.value.title))
}
function replicate (a, b) {
const stream = a.replicate()
const stream2 = b.replicate()
stream.pipe(stream2).pipe(stream)
}
const hypercontent = require('hyper-content-db')
storage
is either a file system path (as a string) or a random-access-storage instance.
key
is a Buffer
containing a key to the primary drive. If omitted, it will be loaded from storage. If no key exists a new keypair will be generated.
cb
is called after the database is fully initialized.
Create a hypercore-protocol replication stream. See hyperdrive for details.
Add an additional source hyperdrive. key
is the key of a hypercontent hyperdrive. Barcobase treats all sources equally.
TODO: Document how records from different sources relate to each other.
Put a record into the database.
record
is a plain js object:
{
  id: 'string',
  schema: 'string',
  value: someObject
}
- schema is required. All records have a schema name. Schemas are identified by strings and are either local or well-defined. Local schemas are identifiers that should be unique in the context of the database; their names may not contain slashes (/). Well-defined schemas are identified by a domain followed by an identifier (e.g. arso.xyz/event) and have to include exactly one slash. By this convention, schema names are compatible with the unwalled.garden spec. Usually you will want to put the schema's declaration into the database (see below), but this is not required.
- id identifies a record uniquely within the database. When creating new records, leave id undefined. When updating existing records, id is required.
- value is the record's value. It has to be a JavaScript object that is serializable to JSON. If the record's schema has its definition stored in the database, the value has to conform to the schema. TODO: Validating records against their schemas is not yet implemented.
- cb is a callback that will be called with (err, id).
The records will be saved as files within the database according to a fixed naming scheme: /.data/schema-domain/schema-name/id.json. For local schemas, the schema-domain is the key of the database.
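The naming rules above can be sketched as a small function. This is only an illustration: recordPath is a hypothetical helper, not part of the hyper-content-db API, and the database key is assumed to be available as a hex string.

```javascript
// Hypothetical helper (not part of hyper-content-db): derive the file path
// of a record from the database key, the schema name and the record id.
function recordPath (dbKey, schema, id) {
  const parts = schema.split('/')
  // A schema name has at most one slash and no empty segments.
  if (parts.length > 2 || parts.some(part => part === '')) {
    throw new Error('Invalid schema name: ' + schema)
  }
  // Local schemas (no slash) use the database key as their domain.
  const [domain, name] = parts.length === 2 ? parts : [dbKey, parts[0]]
  return `/.data/${domain}/${name}/${id}.json`
}

// 'abc123' stands in for a real database key.
console.log(recordPath('abc123', 'event', 'r1'))
console.log(recordPath('abc123', 'arso.xyz/event', 'r1'))
```

A local schema such as event ends up under the database's own key, while a well-defined schema such as arso.xyz/event ends up under its domain.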
Get a record from the database. req
should look like this:
{
  id: 'string', // required
  schema: 'string', // required
  source: 'string', // optional
  seq: 1 // optional, an integer
}
id
and schema
are required. If source
is set to a source key (hex string), only that source is searched. If source is omitted, all sources are checked.
cb
is a callback and will be called with (err, record)
if source is set and with (err, records)
if source is omitted.
If opts.reduce
is true, conflicting records will be reduced by modification timestamp, and the callback will be called with (err, record)
even if source
is not set. Set opts.reduce
to a reduce function to change the reduce logic. The reduce function will be called with (recordA, recordB)
and should return the preferred record.
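As a rough sketch of what a custom reduce function could look like: the function below prefers the record with the later modification time. The mtime field name and the sample records are assumptions made for illustration; the actual shape of the records returned by the database is not documented here.

```javascript
// Assumption: each record carries a numeric `mtime` (modification time).
// The real field name in hyper-content-db may differ.
function preferNewest (recordA, recordB) {
  return recordB.mtime > recordA.mtime ? recordB : recordA
}

// Made-up conflicting versions of the same record from three sources.
const conflicting = [
  { id: 'r1', schema: 'event', source: 'a', mtime: 100, value: { title: 'Workshop' } },
  { id: 'r1', schema: 'event', source: 'b', mtime: 250, value: { title: 'Workshop (moved)' } },
  { id: 'r1', schema: 'event', source: 'c', mtime: 180, value: { title: 'Workshp' } }
]

// Folding the list pairwise leaves the preferred record.
const winner = conflicting.reduce(preferNewest)
console.log(winner.source, winner.value.title)
```

A function with this (recordA, recordB) signature is the kind of value that would be passed as opts.reduce.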
Save a schema into the database. The schema declaration follows JSON Schema, with some additional properties.
const schema = {
properties: {
title: {
type: 'string',
index: true
},
date: {
type: 'string',
index: true
}
}
}
db.putSchema('event', schema, (err) => {
if (!err) console.log('schema saved.')
})
Supported properties in addition to the JSON Schema spec are:
index
: Set on a top-level simple field to index values of that field in the database.
The top-level JSON schema declaration can be omitted and will be filled in automatically.
TODO: Also support putting full JSON schemas (including the outer section)
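To illustrate the index property, here is a minimal sketch of how the indexed fields of a schema declaration could be collected. indexedFields is a hypothetical helper written for this example; it is not how the library necessarily does it internally.

```javascript
// Hypothetical helper: list the top-level properties marked with index: true.
function indexedFields (schema) {
  return Object.entries(schema.properties || {})
    .filter(([, prop]) => prop.index === true)
    .map(([name]) => name)
}

const eventSchema = {
  properties: {
    title: { type: 'string', index: true },
    date: { type: 'string', index: true },
    notes: { type: 'string' }
  }
}

console.log(indexedFields(eventSchema))
```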
Load a schema declaration from the database.
Execute multiple operations. ops
looks like this:
const ops = [
{
op: 'put' | 'del',
schema,
id,
value
}
]
If op
is omitted, it is set to put
.
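The defaulting rule for op can be pictured as follows. normalizeOps is a hypothetical helper used only to illustrate the behaviour described above; db.batch() applies the default itself.

```javascript
// Hypothetical helper: fill in the default op for entries that omit it.
function normalizeOps (ops) {
  return ops.map(entry => ({ ...entry, op: entry.op || 'put' }))
}

const normalized = normalizeOps([
  { schema: 'event', value: { title: 'Dinner' } },
  { op: 'del', schema: 'event', id: 'some-id' }
])

console.log(normalized.map(entry => entry.op))
```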
Returns a duplex stream. The writable side expects to be written to with op
objects as in db.batch()
. The readable side emits arrays of IDs of the records that were put, or errors if any occur.
Returns a transform stream that transforms get requests into records.
const getStream = db.createGetStream()
getStream.write({ id, schema })
getStream.on('data', (record) => {
console.log(record.value)
})
Get a list of all IDs for a schema.
TODO: This should be a stream instead that can be piped into
createGetStream()
.
Register a new database view. Views are functions that will be called whenever records are being put or deleted. The database maintains the state of each view so that they catch up on updates automatically. See kappa-core for a good introduction on how to work with kappa views.
name
is the name of the view. It has to be unique per database.
makeView
is a constructor function. It will be called with (level, db, opts):
- level: a LevelUp-compatible LevelDB instance for this view
- db: the hypercontent db
- opts: optional opts passed into useRecordView
The constructor function should return a view object with the following keys:
- map: function (records, next) {}. This function will be called with a batch of records. Process the entries (e.g. by inserting rows into the leveldb). Call next() when done.
- api: An object of query functions that this view exposes to the outside world. They should be safe to call (they must not modify data) as they may be called from the client side.
- TODO: Document more props.
- TODO: Add support for filter() and reduce()
const through = require('through2')
const hypercontent = require('hyper-content-db')
const db = hypercontent('/tmp/testdb')
function dateView (lvl, db) {
return {
map (records, next) {
let ops = []
for (let record of records) {
if (!record.value.date) continue // skip records without a date; next() must still be called
ops.push({
type: 'put',
key: `${record.value.date}!${record.id}!${record.schema}!${record.source}`,
value: record.seq
})
}
lvl.batch(ops, next)
},
api: {
range (from, to) {
from = from.toJSON()
to = to.toJSON()
return lvl.createReadStream({
gte: from,
lte: to
}).pipe(through.obj(function (row, enc, next) {
// Keys have the form date!id!schema!source
const [, id, schema, source] = row.key.split('!')
this.push({ id, schema, source })
next()
}))
}
}
}
}
db.useRecordView('dates', dateView)
const records = [
{ title: 'Party', date: new Date(2019, 11, 2) },
{ title: 'Demonstration', date: new Date(2020, 1, 10) },
{ title: 'Reading circle', date: new Date(2019, 8, 7) },
{ title: 'Workshop', date: new Date(2019, 12, 5) }
]
const ops = records.map(value => ({ op: 'put', schema: 'event', value }))
db.batch(ops, (err, ids) => {
if (err) return console.error(err)
const queryStream = db.api.dates.range(new Date(2019, 9), new Date(2019, 12, 31))
const resultStream = queryStream.pipe(db.createGetStream())
resultStream.on('data', record => console.log(record))
})
This is where query functions from views are exposed.
Emitted whenever a view finishes an indexing batch. cb
is called with (viewName, sourceKey, batch)
where batch is an array of the processed records.
Emitted whenever all views are finished with processing.
Emitted when a new indexing round is started after indexed-all
has been emitted.