Skip to content

Commit

Permalink
Flag locally created logs as direct (#374)
Browse files Browse the repository at this point in the history
feat: own vs managed log concepts and tooling

Directly "managed" logs are those logs created or directly added by the local peer. Otherwise, unmanaged logs are those from other peers in a given thread. Related, "owned" logs are those logs for which the peer has the private key. There can only be one "owned" log, versus potentially many "managed" logs.
  • Loading branch information
carsonfarmer authored Jun 12, 2020
1 parent 4638cb8 commit a8477ff
Show file tree
Hide file tree
Showing 12 changed files with 395 additions and 56 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ api/pb/dart/lib

# vscode config folder
.vscode/
.idea/

**/node_modules
tags

# Misc
**.DS_Store
14 changes: 13 additions & 1 deletion core/logstore/logstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ var ErrThreadNotFound = fmt.Errorf("thread not found")
// ErrLogNotFound indicates a requested log was not found.
var ErrLogNotFound = fmt.Errorf("log not found")

// ErrLogExists indicates a requested log already exists.
var ErrLogExists = fmt.Errorf("log already exists")

// Logstore stores log keys, addresses, heads and thread meta data.
type Logstore interface {
Close() error
Expand Down Expand Up @@ -49,6 +52,9 @@ type Logstore interface {
// GetLog returns info about a log.
GetLog(thread.ID, peer.ID) (thread.LogInfo, error)

// GetManagedLogs returns info about locally managed logs.
GetManagedLogs(thread.ID) ([]thread.LogInfo, error)

// DeleteLog deletes a log.
DeleteLog(thread.ID, peer.ID) error
}
Expand All @@ -61,12 +67,18 @@ type ThreadMetadata interface {
// PutInt64 stores an int value under key.
PutInt64(t thread.ID, key string, val int64) error

// GetString retrieves an int value under key.
// GetString retrieves a string value under key.
GetString(t thread.ID, key string) (*string, error)

// PutString stores a string value under key.
PutString(t thread.ID, key string, val string) error

// GetBool retrieves a boolean value under key.
GetBool(t thread.ID, key string) (*bool, error)

// PutBool stores a boolean value under key.
PutBool(t thread.ID, key string, val bool) error

// GetBytes retrieves a byte value under key.
GetBytes(t thread.ID, key string) (*[]byte, error)

Expand Down
21 changes: 15 additions & 6 deletions core/thread/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func Decode(v string) (ID, error) {
return Cast(data)
}

// Extract the encoding from an ID. If Decode on the same string did
// ExtractEncoding from an ID. If Decode on the same string did
// not return an error neither will this function.
func ExtractEncoding(v string) (mbase.Encoding, error) {
if len(v) < 2 {
Expand Down Expand Up @@ -171,6 +171,7 @@ func uvError(read int) error {
}
}

// Validate the ID.
func (i ID) Validate() error {
data := i.Bytes()
return validateIDData(data)
Expand Down Expand Up @@ -269,7 +270,7 @@ func (i ID) String() string {
}
}

// String returns the string representation of an ID
// StringOfBase returns the string representation of an ID
// encoded is selected base.
func (i ID) StringOfBase(base mbase.Encoding) (string, error) {
if err := i.Validate(); err != nil {
Expand Down Expand Up @@ -353,6 +354,7 @@ type Info struct {
}

// GetOwnLog returns the first log found with a private key.
// This is a strict owership check, vs returning all directly 'managed' logs.
func (i Info) GetOwnLog() *LogInfo {
for _, lg := range i.Logs {
if lg.PrivKey != nil {
Expand All @@ -364,9 +366,16 @@ func (i Info) GetOwnLog() *LogInfo {

// LogInfo holds log keys, addresses, and heads.
type LogInfo struct {
ID peer.ID
PubKey crypto.PubKey
// ID is the log's identifier.
ID peer.ID
// PubKey is the log's public key.
PubKey crypto.PubKey
// PrivKey is the log's private key.
PrivKey crypto.PrivKey
Addrs []ma.Multiaddr
Head cid.Cid
// Addrs are the addresses associated with the given log.
Addrs []ma.Multiaddr
// Head is the log's current head.
Head cid.Cid
// Managed logs are any logs directly added/created by the host, and/or logs for which we have the private key
Managed bool
}
2 changes: 1 addition & 1 deletion db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func NewDB(ctx context.Context, network app.Net, id thread.ID, opts ...NewOption
opt(args)
}

if _, err := network.CreateThread(ctx, id, net.WithNewThreadToken(args.Token)); err != nil {
if _, err := network.CreateThread(ctx, id, net.WithNewThreadToken(args.Token), net.WithThreadKey(args.ThreadKey)); err != nil {
if !errors.Is(err, lstore.ErrThreadExists) {
return nil, err
}
Expand Down
10 changes: 8 additions & 2 deletions db/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ func TestWithNewName(t *testing.T) {
t.Fatalf("expected name %s, got %s", name, d.name)
}

_, key, err := d.GetDBInfo()
checkErr(t, err)

// Re-do again to re-use key. If something wasn't closed correctly, would fail
checkErr(t, n.Close())
checkErr(t, d.Close())
Expand All @@ -117,7 +120,7 @@ func TestWithNewName(t *testing.T) {
checkErr(t, err)
defer n.Close()
defer d.Close()
d, err = NewDB(context.Background(), n, id, WithNewRepoPath(tmpDir))
d, err = NewDB(context.Background(), n, id, WithNewRepoPath(tmpDir), WithNewThreadKey(key))
checkErr(t, err)
if d.name != name {
t.Fatalf("expected name %s, got %s", name, d.name)
Expand Down Expand Up @@ -150,6 +153,9 @@ func TestWithNewEventCodec(t *testing.T) {
t.Fatalf("custom event codec wasn't called")
}

_, key, err := d.GetDBInfo()
checkErr(t, err)

// Re-do again to re-use key. If something wasn't closed correctly, would fail
checkErr(t, n.Close())
checkErr(t, d.Close())
Expand All @@ -158,7 +164,7 @@ func TestWithNewEventCodec(t *testing.T) {
n, err = common.DefaultNetwork(tmpDir, common.WithNetDebug(true), common.WithNetHostAddr(util.FreeLocalAddr()))
checkErr(t, err)
defer n.Close()
d, err = NewDB(context.Background(), n, id, WithNewRepoPath(tmpDir), WithNewEventCodec(ec))
d, err = NewDB(context.Background(), n, id, WithNewRepoPath(tmpDir), WithNewEventCodec(ec), WithNewThreadKey(key))
checkErr(t, err)
checkErr(t, d.Close())
}
Expand Down
8 changes: 8 additions & 0 deletions db/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type NewOptions struct {
EventCodec core.EventCodec
LowMem bool
Debug bool
ThreadKey thread.Key
}

// NewOption specifies a new db option.
Expand All @@ -68,6 +69,13 @@ func WithNewToken(t thread.Token) NewOption {
}
}

// WithNewThreadKey provides control over thread keys to use with a db.
func WithNewThreadKey(key thread.Key) NewOption {
return func(o *NewOptions) {
o.ThreadKey = key
}
}

// WithNewCollections is used to specify collections that
// will be created.
func WithNewCollections(cs ...CollectionConfig) NewOption {
Expand Down
75 changes: 68 additions & 7 deletions logstore/logstore.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package logstore

import (
"bytes"
"fmt"
"io"
"sync"
Expand All @@ -11,6 +12,8 @@ import (
"github.com/textileio/go-threads/core/thread"
)

var managedSuffix = "/managed"

// logstore is a collection of books for storing thread logs.
type logstore struct {
sync.RWMutex
Expand Down Expand Up @@ -89,13 +92,35 @@ func (ls *logstore) AddThread(info thread.Info) error {
if info.Key.Service() == nil {
return fmt.Errorf("a service-key is required to add a thread")
}
if err := ls.AddServiceKey(info.ID, info.Key.Service()); err != nil {
sk, err := ls.ServiceKey(info.ID)
if err != nil {
return err
}
if sk == nil {
if err := ls.AddServiceKey(info.ID, info.Key.Service()); err != nil {
return err
}
} else {
// Ensure keys are the same
if !bytes.Equal(info.Key.Service().Bytes(), sk.Bytes()) {
return fmt.Errorf("service-key mismatch")
}
}
if info.Key.CanRead() {
if err := ls.AddReadKey(info.ID, info.Key.Read()); err != nil {
rk, err := ls.ReadKey(info.ID)
if err != nil {
return err
}
if rk == nil {
if err := ls.AddReadKey(info.ID, info.Key.Read()); err != nil {
return err
}
} else {
// Ensure keys are the same
if !bytes.Equal(info.Key.Read().Bytes(), rk.Bytes()) {
return fmt.Errorf("read-key mismatch")
}
}
}
return nil
}
Expand Down Expand Up @@ -186,15 +211,18 @@ func (ls *logstore) AddLog(id thread.ID, lg thread.LogInfo) error {
ls.Lock()
defer ls.Unlock()

err := ls.AddPubKey(id, lg.ID, lg.PubKey)
if err != nil {
return err
}
if lg.PrivKey != nil {
if err = ls.AddPrivKey(id, lg.ID, lg.PrivKey); err != nil {
if pk, _ := ls.PrivKey(id, lg.ID); pk != nil {
return core.ErrLogExists
}
if err := ls.AddPrivKey(id, lg.ID, lg.PrivKey); err != nil {
return err
}
}
err := ls.AddPubKey(id, lg.ID, lg.PubKey)
if err != nil {
return err
}
if err = ls.AddAddrs(id, lg.ID, lg.Addrs, pstore.PermanentAddrTTL); err != nil {
return err
}
Expand All @@ -203,6 +231,12 @@ func (ls *logstore) AddLog(id thread.ID, lg thread.LogInfo) error {
return err
}
}
// By definition 'owned' logs are also 'managed' logs.
if lg.Managed || lg.PrivKey != nil {
if err = ls.PutBool(id, lg.ID.Pretty()+managedSuffix, true); err != nil {
return err
}
}
return nil
}

Expand Down Expand Up @@ -234,6 +268,13 @@ func (ls *logstore) getLog(id thread.ID, lid peer.ID) (info thread.LogInfo, err
if err != nil {
return
}
managed, err := ls.GetBool(id, lid.Pretty()+managedSuffix)
if err != nil {
return
}
if managed != nil {
info.Managed = *managed
}
info.ID = lid
info.PubKey = pk
info.PrivKey = sk
Expand All @@ -244,6 +285,26 @@ func (ls *logstore) getLog(id thread.ID, lid peer.ID) (info thread.LogInfo, err
return
}

// GetManagedLogs returns the logs the host is 'managing' under the given thread.
func (ls *logstore) GetManagedLogs(id thread.ID) ([]thread.LogInfo, error) {
logs, err := ls.LogsWithKeys(id)
if err != nil {
return nil, err
}
var managed []thread.LogInfo
for _, lid := range logs {
lg, err := ls.GetLog(id, lid)
if err != nil {
return nil, err
}
if lg.Managed || lg.PrivKey != nil {
managed = append(managed, lg)
continue
}
}
return managed, nil
}

// DeleteLog deletes a log.
func (ls *logstore) DeleteLog(id thread.ID, lid peer.ID) (err error) {
ls.Lock()
Expand Down
16 changes: 16 additions & 0 deletions logstore/lstoreds/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,22 @@ func (m *dsThreadMetadata) PutString(t thread.ID, key string, val string) error
return m.setValue(t, key, val)
}

func (m *dsThreadMetadata) GetBool(t thread.ID, key string) (*bool, error) {
var val bool
err := m.getValue(t, key, &val)
if err == ds.ErrNotFound {
return nil, nil
}
if err != nil {
return nil, err
}
return &val, nil
}

func (m *dsThreadMetadata) PutBool(t thread.ID, key string, val bool) error {
return m.setValue(t, key, val)
}

func (m *dsThreadMetadata) GetBytes(t thread.ID, key string) (*[]byte, error) {
var val []byte
err := m.getValue(t, key, &val)
Expand Down
13 changes: 13 additions & 0 deletions logstore/lstoremem/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,19 @@ func (m *memoryThreadMetadata) GetString(t thread.ID, key string) (*string, erro
return &val, nil
}

func (m *memoryThreadMetadata) PutBool(t thread.ID, key string, val bool) error {
m.putValue(t, key, val)
return nil
}

func (m *memoryThreadMetadata) GetBool(t thread.ID, key string) (*bool, error) {
val, ok := m.getValue(t, key).(bool)
if !ok {
return nil, nil
}
return &val, nil
}

func (m *memoryThreadMetadata) PutBytes(t thread.ID, key string, val []byte) error {
b := make([]byte, len(val))
copy(b, val)
Expand Down
Loading

0 comments on commit a8477ff

Please sign in to comment.