Skip to content

Commit

Permalink
Hybrid logstore (#463)
Browse files Browse the repository at this point in the history
* Refactoring headbook test suite

Signed-off-by: Anton Dort-Golts <dortgolts@gmail.com>

* Impl headbook export test

Signed-off-by: Anton Dort-Golts <dortgolts@gmail.com>

* Impl exportable for ds-backed headbook

Signed-off-by: Anton Dort-Golts <dortgolts@gmail.com>

* Impl exportable for in-memory headbook

Signed-off-by: Anton Dort-Golts <dortgolts@gmail.com>

* Cleanup go modules

Signed-off-by: Anton Dort-Golts <dortgolts@gmail.com>

* Specialize dump/restore signatures in components

Signed-off-by: Anton Dort-Golts <dortgolts@gmail.com>

* Impl export for address books (ds + mem)

Signed-off-by: Anton Dort-Golts <dortgolts@gmail.com>

* Protect against empty dumps

Signed-off-by: Anton Dort-Golts <dortgolts@gmail.com>

* Cover address book export with tests

Signed-off-by: Anton Dort-Golts <dortgolts@gmail.com>

* Move log/thread ID parsing into dedicated functions

Signed-off-by: Anton Dort-Golts <dortgolts@gmail.com>

* Use named error for empty dump restore

Signed-off-by: Anton Dort-Golts <dortgolts@gmail.com>

* Impl export for key books (ds + mem)

Signed-off-by: Anton Dort-Golts <dortgolts@gmail.com>

* Fix key suffix comparison

Signed-off-by: Anton Dort-Golts <dortgolts@gmail.com>

* Cover key book export with tests

Signed-off-by: Anton Dort-Golts <dortgolts@gmail.com>

* Fix resource leak on early return from key iteration

Signed-off-by: Anton Dort-Golts <dortgolts@gmail.com>

* Impl export for metadata books (ds + mem)

Signed-off-by: Anton Dort-Golts <dortgolts@gmail.com>

* Fix value decoding on dumps

Signed-off-by: Anton Dort-Golts <dortgolts@gmail.com>

* Cover metadata book export with tests

Signed-off-by: Anton Dort-Golts <dortgolts@gmail.com>

* Control restore behaviour with flag

Signed-off-by: Anton Dort-Golts <dortgolts@gmail.com>

* Impl hybrid logstore

Signed-off-by: Anton Dort-Golts <dortgolts@gmail.com>

* Add logstore suite to datastore tests

Signed-off-by: Anton Dort-Golts <dortgolts@gmail.com>

* Run standard test suites on a hybrid store

Signed-off-by: Anton Dort-Golts <dortgolts@gmail.com>

* Impl resource finalizer

Signed-off-by: Anton Dort-Golts <dortgolts@gmail.com>

* Logstore kind option + refactoring

Signed-off-by: Anton Dort-Golts <dortgolts@gmail.com>

* Rename: LogstoreKind -> LogstoreType

Signed-off-by: Anton Dort-Golts <dortgolts@gmail.com>
  • Loading branch information
dgtony authored Nov 6, 2020
1 parent de1407d commit c8aace9
Show file tree
Hide file tree
Showing 22 changed files with 1,971 additions and 265 deletions.
185 changes: 103 additions & 82 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,25 @@ package common

import (
"context"
"fmt"
"os"
"path/filepath"
"time"

ipfslite "github.com/hsanjuan/ipfs-lite"
datastore "github.com/ipfs/go-datastore"
"github.com/libp2p/go-libp2p"
connmgr "github.com/libp2p/go-libp2p-connmgr"
cconnmgr "github.com/libp2p/go-libp2p-core/connmgr"
host "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
peerstore "github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-kad-dht/dual"
"github.com/libp2p/go-libp2p-peerstore/pstoreds"
ma "github.com/multiformats/go-multiaddr"
"github.com/textileio/go-threads/core/app"
core "github.com/textileio/go-threads/core/logstore"
"github.com/textileio/go-threads/logstore/lstoreds"
"github.com/textileio/go-threads/logstore/lstorehybrid"
"github.com/textileio/go-threads/logstore/lstoremem"
"github.com/textileio/go-threads/net"
util "github.com/textileio/go-threads/util"
"github.com/textileio/go-threads/util"
"google.golang.org/grpc"
)

Expand All @@ -37,41 +37,41 @@ type NetBoostrapper interface {
}

func DefaultNetwork(repoPath string, opts ...NetOption) (NetBoostrapper, error) {
config := &NetConfig{}
for _, opt := range opts {
if err := opt(config); err != nil {
return nil, err
}
}
var (
config NetConfig
fin = util.NewFinalizer()
)

if config.HostAddr == nil {
addr, err := ma.NewMultiaddr("/ip4/0.0.0.0/tcp/0")
if err != nil {
for _, opt := range opts {
if err := opt(&config); err != nil {
return nil, err
}
config.HostAddr = addr
}

if config.ConnManager == nil {
config.ConnManager = connmgr.NewConnManager(100, 400, time.Second*20)
if err := setDefaults(&config); err != nil {
return nil, err
}

ipfsLitePath := filepath.Join(repoPath, defaultIpfsLitePath)
if err := os.MkdirAll(ipfsLitePath, os.ModePerm); err != nil {
return nil, err
}

litestore, err := ipfslite.BadgerDatastore(ipfsLitePath)
if err != nil {
return nil, err
}
fin.Add(litestore)

ctx, cancel := context.WithCancel(context.Background())
fin.Add(util.NewContextCloser(cancel))

pstore, err := pstoreds.NewPeerstore(ctx, litestore, pstoreds.DefaultOpts())
if err != nil {
litestore.Close()
cancel()
return nil, err
return nil, fin.Cleanup(err)
}
fin.Add(pstore)

priv := util.LoadKey(filepath.Join(ipfsLitePath, "key"))
h, d, err := ipfslite.SetupLibp2p(
ctx,
Expand All @@ -84,37 +84,17 @@ func DefaultNetwork(repoPath string, opts ...NetOption) (NetBoostrapper, error)
libp2p.DisableRelay(),
)
if err != nil {
cancel()
litestore.Close()
return nil, err
return nil, fin.Cleanup(err)
}

lite, err := ipfslite.New(ctx, litestore, h, d, nil)
if err != nil {
cancel()
litestore.Close()
return nil, err
return nil, fin.Cleanup(err)
}

// Build a logstore
logstorePath := filepath.Join(repoPath, defaultLogstorePath)
if err := os.MkdirAll(logstorePath, os.ModePerm); err != nil {
cancel()
return nil, err
}
logstore, err := ipfslite.BadgerDatastore(logstorePath)
tstore, err := buildLogstore(ctx, config.LSType, repoPath, fin)
if err != nil {
cancel()
litestore.Close()
return nil, err
}
tstore, err := lstoreds.NewLogstore(ctx, logstore, lstoreds.DefaultOpts())
if err != nil {
cancel()
if err := logstore.Close(); err != nil {
return nil, err
}
litestore.Close()
return nil, err
return nil, fin.Cleanup(err)
}

// Build a network
Expand All @@ -123,33 +103,89 @@ func DefaultNetwork(repoPath string, opts ...NetOption) (NetBoostrapper, error)
PubSub: config.PubSub,
}, config.GRPCServerOptions, config.GRPCDialOptions)
if err != nil {
cancel()
if err := logstore.Close(); err != nil {
return nil, err
}
litestore.Close()
return nil, err
return nil, fin.Cleanup(err)
}
fin.Add(h, d, api)

return &netBoostrapper{
cancel: cancel,
Net: api,
litepeer: lite,
pstore: pstore,
logstore: logstore,
litestore: litestore,
host: h,
dht: d,
finalizer: fin,
}, nil
}

func buildLogstore(ctx context.Context, lstype LogstoreType, repoPath string, fin *util.Finalizer) (core.Logstore, error) {
switch lstype {
case LogstoreInMemory:
return lstoremem.NewLogstore(), nil

case LogstoreHybrid:
pls, err := persistentLogstore(ctx, repoPath, fin)
if err != nil {
return nil, err
}
mls := lstoremem.NewLogstore()
return lstorehybrid.NewLogstore(pls, mls)

case LogstorePersistent:
return persistentLogstore(ctx, repoPath, fin)

default:
return nil, fmt.Errorf("unsupported logstore type: %s", lstype)
}
}

func persistentLogstore(ctx context.Context, repoPath string, fin *util.Finalizer) (core.Logstore, error) {
logstorePath := filepath.Join(repoPath, defaultLogstorePath)
if err := os.MkdirAll(logstorePath, os.ModePerm); err != nil {
return nil, err
}

dstore, err := ipfslite.BadgerDatastore(logstorePath)
if err != nil {
return nil, err
}
fin.Add(dstore)

return lstoreds.NewLogstore(ctx, dstore, lstoreds.DefaultOpts())
}

func setDefaults(config *NetConfig) error {
if config.HostAddr == nil {
addr, err := ma.NewMultiaddr("/ip4/0.0.0.0/tcp/0")
if err != nil {
return err
}
config.HostAddr = addr
}

if config.ConnManager == nil {
config.ConnManager = connmgr.NewConnManager(100, 400, time.Second*20)
}

if len(config.LSType) == 0 {
config.LSType = LogstorePersistent
}

return nil
}

type LogstoreType string

const (
LogstoreInMemory LogstoreType = "in-memory"
LogstorePersistent LogstoreType = "persistent"
LogstoreHybrid LogstoreType = "hybrid"
)

type NetConfig struct {
HostAddr ma.Multiaddr
ConnManager cconnmgr.ConnManager
Debug bool
GRPCServerOptions []grpc.ServerOption
GRPCDialOptions []grpc.DialOption
LSType LogstoreType
PubSub bool
Debug bool
}

type NetOption func(c *NetConfig) error
Expand Down Expand Up @@ -196,15 +232,17 @@ func WithNetPubSub(enabled bool) NetOption {
}
}

func WithNetLogstore(lt LogstoreType) NetOption {
return func(c *NetConfig) error {
c.LSType = lt
return nil
}
}

type netBoostrapper struct {
cancel context.CancelFunc
app.Net
litepeer *ipfslite.Peer
pstore peerstore.Peerstore
logstore datastore.Datastore
litestore datastore.Datastore
host host.Host
dht *dual.DHT
finalizer *util.Finalizer
}

var _ NetBoostrapper = (*netBoostrapper)(nil)
Expand All @@ -218,22 +256,5 @@ func (tsb *netBoostrapper) GetIpfsLite() *ipfslite.Peer {
}

func (tsb *netBoostrapper) Close() error {
if err := tsb.Net.Close(); err != nil {
return err
}
tsb.cancel()
if err := tsb.dht.Close(); err != nil {
return err
}
if err := tsb.host.Close(); err != nil {
return err
}
if err := tsb.pstore.Close(); err != nil {
return err
}
if err := tsb.litestore.Close(); err != nil {
return err
}
return tsb.logstore.Close()
// Logstore closed by network
return tsb.finalizer.Cleanup(nil)
}
66 changes: 66 additions & 0 deletions core/logstore/logstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package logstore

import (
"context"
"errors"
"fmt"
"time"

Expand All @@ -25,6 +26,9 @@ var ErrLogNotFound = fmt.Errorf("log not found")
// ErrLogExists indicates a requested log already exists.
var ErrLogExists = fmt.Errorf("log already exists")

// ErrEmptyDump indicates an attempt to restore from empty dump.
var ErrEmptyDump = errors.New("empty dump")

// Logstore stores log keys, addresses, heads and thread meta data.
type Logstore interface {
Close() error
Expand Down Expand Up @@ -87,6 +91,12 @@ type ThreadMetadata interface {

// ClearMetadata clears all metadata under a thread.
ClearMetadata(t thread.ID) error

// DumpMeta packs all the stored metadata.
DumpMeta() (DumpMetadata, error)

// RestoreMeta restores metadata from the dump.
RestoreMeta(book DumpMetadata) error
}

// KeyBook stores log keys.
Expand Down Expand Up @@ -126,6 +136,12 @@ type KeyBook interface {

// ThreadsFromKeys returns a list of threads referenced in the book.
ThreadsFromKeys() (thread.IDSlice, error)

// DumpKeys packs all stored keys.
DumpKeys() (DumpKeyBook, error)

// RestoreKeys restores keys from the dump.
RestoreKeys(book DumpKeyBook) error
}

// AddrBook stores log addresses.
Expand Down Expand Up @@ -159,6 +175,12 @@ type AddrBook interface {

// ThreadsFromAddrs returns a list of threads referenced in the book.
ThreadsFromAddrs() (thread.IDSlice, error)

// DumpHeads packs all stored addresses.
DumpAddrs() (DumpAddrBook, error)

// RestoreHeads restores addresses from the dump.
RestoreAddrs(book DumpAddrBook) error
}

// HeadBook stores log heads.
Expand All @@ -180,4 +202,48 @@ type HeadBook interface {

// ClearHeads deletes the head entry for a log.
ClearHeads(thread.ID, peer.ID) error

// DumpHeads packs entire headbook into the tree.
DumpHeads() (DumpHeadBook, error)

// RestoreHeads restores headbook from the dump.
RestoreHeads(DumpHeadBook) error
}

type (
DumpHeadBook struct {
Data map[thread.ID]map[peer.ID][]cid.Cid
}

ExpiredAddress struct {
Addr ma.Multiaddr
Expires time.Time
}

DumpAddrBook struct {
Data map[thread.ID]map[peer.ID][]ExpiredAddress
}

DumpKeyBook struct {
Data struct {
Public map[thread.ID]map[peer.ID]crypto.PubKey
Private map[thread.ID]map[peer.ID]crypto.PrivKey
Read map[thread.ID][]byte
Service map[thread.ID][]byte
}
}

MetadataKey struct {
T thread.ID
K string
}

DumpMetadata struct {
Data struct {
Int64 map[MetadataKey]int64
Bool map[MetadataKey]bool
String map[MetadataKey]string
Bytes map[MetadataKey][]byte
}
}
)
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@ require (
github.com/libp2p/go-libp2p v0.10.3
github.com/libp2p/go-libp2p-connmgr v0.2.4
github.com/libp2p/go-libp2p-core v0.6.1
github.com/libp2p/go-libp2p-crypto v0.1.0
github.com/libp2p/go-libp2p-gostream v0.2.0
github.com/libp2p/go-libp2p-kad-dht v0.8.3
github.com/libp2p/go-libp2p-peer v0.2.0
github.com/libp2p/go-libp2p-peerstore v0.2.6
github.com/libp2p/go-libp2p-pubsub v0.2.4
github.com/libp2p/go-libp2p-swarm v0.2.8
Expand Down
Loading

0 comments on commit c8aace9

Please sign in to comment.