Skip to content
This repository has been archived by the owner on Mar 28, 2023. It is now read-only.

[WIP] (#1851) Send offline messages through web relay if configured #1824

Open
wants to merge 22 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,11 @@ func (x *Start) Execute(args []string) error {
log.Error("scan data sharing config:", err)
return err
}
webRelays, err := schema.GetWebRelays(configFile)
if err != nil {
log.Error("scan web relays config:", err)
return err
}
dropboxToken, err := schema.GetDropboxApiToken(configFile)
if err != nil {
log.Error("scan dropbox api token:", err)
Expand Down Expand Up @@ -583,6 +588,8 @@ func (x *Start) Execute(args []string) error {
rootHash = string(cachedIPNSRecord.Value)
}

wm := obnet.NewWebRelayManager(webRelays, identity.PeerID)

// OpenBazaar node setup
core.Node = &core.OpenBazaarNode{
AcceptStoreRequests: dataSharing.AcceptStoreRequests,
Expand All @@ -595,6 +602,7 @@ func (x *Start) Execute(args []string) error {
OfflineMessageFailoverTimeout: 30 * time.Second,
Pubsub: ps,
PushNodes: pushNodes,
WebRelayManager: wm,
RegressionTestEnable: x.Regtest,
RepoPath: repoPath,
RootHash: rootHash,
Expand Down Expand Up @@ -676,6 +684,7 @@ func (x *Start) Execute(args []string) error {
}()
}
}

core.Node.Service = service.New(core.Node, sqliteDB)
core.Node.Service.WaitForReady()
log.Info("OpenBazaar Service Ready")
Expand All @@ -684,6 +693,8 @@ func (x *Start) Execute(args []string) error {
core.Node.StartPointerRepublisher()
core.Node.StartRecordAgingNotifier()

core.Node.WebRelayManager.ConnectToRelays(core.Node.Service)

core.Node.PublishLock.Unlock()
err = core.Node.UpdateFollow()
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ type OpenBazaarNode struct {
// Manage blocked peers
BanManager *net.BanManager

// Web Relay nodes
WebRelayManager *net.WebRelayManager

// Allow other nodes to push data to this node for storage
AcceptStoreRequests bool

Expand Down
81 changes: 73 additions & 8 deletions core/net.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
package core

import (
"encoding/base64"
"errors"
"fmt"
"gx/ipfs/QmUadX5EcvrBmxAV9sE7wUWtWSqxns5K84qKJBixmcT1w9/go-datastore"
"gx/ipfs/QmYxUdYY9S6yg5tSPVin5GFTvtfsLauVcr7reHDD3dM8xf/go-libp2p-routing"
"sync"
"time"

"github.com/OpenBazaar/openbazaar-go/net"

libp2p "gx/ipfs/QmTW4SdgBWq9GjsBsHeUx8WuGxzhgzAf88UMH2w62PC8yK/go-libp2p-crypto"
"gx/ipfs/QmTbxNB1NwDesLmKTscr4udL2tVP7MaxvXnD1D9yX7g3PN/go-cid"
peer "gx/ipfs/QmYVXrKrKHDC9FobgmcmshCDyWwdrfwfanNQN4oxJ9Fk3h/go-libp2p-peer"
"gx/ipfs/QmYVXrKrKHDC9FobgmcmshCDyWwdrfwfanNQN4oxJ9Fk3h/go-libp2p-peer"
"gx/ipfs/QmerPMzPk1mJVowm8KgmoknWa4yCYvvugMPsgWmDNUvDLW/go-multihash"

"github.com/OpenBazaar/openbazaar-go/ipfs"

"github.com/OpenBazaar/openbazaar-go/pb"
"github.com/OpenBazaar/openbazaar-go/repo"
"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -39,6 +45,7 @@ func (n *OpenBazaarNode) sendMessage(peerID string, k *libp2p.PubKey, message pb
return err
}
ctx, cancel := context.WithTimeout(context.Background(), n.OfflineMessageFailoverTimeout)
n.SendRelayedMessage(p, k, &message) // send relayed message immediately
defer cancel()
err = n.Service.SendMessage(ctx, p, &message)
if err != nil {
Expand All @@ -51,30 +58,82 @@ func (n *OpenBazaarNode) sendMessage(peerID string, k *libp2p.PubKey, message pb
return nil
}

// SendOfflineMessage Supply of a public key is optional, if nil is instead provided n.EncryptMessage does a lookup
func (n *OpenBazaarNode) SendOfflineMessage(p peer.ID, k *libp2p.PubKey, m *pb.Message) error {
pubKeyBytes, err := n.IpfsNode.PrivateKey.GetPublic().Bytes()
// SendRelayedMessage - send message through web relay manager to recipient
func (n *OpenBazaarNode) SendRelayedMessage(p peer.ID, k *libp2p.PubKey, m *pb.Message) error {
messageBytes, err := n.getMessageBytes(m)
if err != nil {
return err
}

ctx, cancel := context.WithTimeout(context.Background(), n.OfflineMessageFailoverTimeout)
defer cancel()
if k == nil {
var pubKey libp2p.PubKey
keyval, err := n.IpfsNode.Repo.Datastore().Get(datastore.NewKey("/pubkey/" + p.Pretty()))
if err != nil {
pubKey, err = routing.GetPublicKey(n.IpfsNode.Routing, ctx, p)
if err != nil {
log.Errorf("Failed to find public key for %s", p.Pretty())
return err
}
} else {
pubKey, err = libp2p.UnmarshalPublicKey(keyval)
if err != nil {
log.Errorf("Failed to find public key for %s", p.Pretty())
return err
}
}
k = &pubKey
}

relayciphertext, err := net.Encrypt(*k, messageBytes)
if err != nil {
return fmt.Errorf("Error: %s", err.Error())
}

// Base64 encode
encodedCipherText := base64.StdEncoding.EncodeToString(relayciphertext)

n.WebRelayManager.SendRelayMessage(encodedCipherText, p.Pretty())

return nil
}

func (n *OpenBazaarNode) getMessageBytes(m *pb.Message) ([]byte, error) {
pubKeyBytes, err := n.IpfsNode.PrivateKey.GetPublic().Bytes()
if err != nil {
return nil, err
}
ser, err := proto.Marshal(m)
if err != nil {
return err
return nil, err
}
sig, err := n.IpfsNode.PrivateKey.Sign(ser)
if err != nil {
return err
return nil, err
}

env := pb.Envelope{Message: m, Pubkey: pubKeyBytes, Signature: sig}
messageBytes, merr := proto.Marshal(&env)
if merr != nil {
return merr
return nil, merr
}
return messageBytes, nil
}

// SendOfflineMessage Supply of a public key is optional, if nil is instead provided n.EncryptMessage does a lookup
func (n *OpenBazaarNode) SendOfflineMessage(p peer.ID, k *libp2p.PubKey, m *pb.Message) error {
messageBytes, err := n.getMessageBytes(m)
if err != nil {
return err
}

// TODO: this function blocks if the recipient's public key is not on the local machine
ciphertext, cerr := n.EncryptMessage(p, k, messageBytes)
if cerr != nil {
return cerr
}

addr, aerr := n.MessageStorage.Store(p, ciphertext)
if aerr != nil {
return aerr
Expand All @@ -84,7 +143,7 @@ func (n *OpenBazaarNode) SendOfflineMessage(p peer.ID, k *libp2p.PubKey, m *pb.M
return mherr
}
/* TODO: We are just using a default prefix length for now. Eventually we will want to customize this,
but we will need some way to get the recipient's desired prefix length. Likely will be in profile. */
but we will need some way to get the recipient's desired prefix length. Likely will be in profile. */
pointer, err := ipfs.NewPointer(mh, DefaultPointerPrefixLength, addr, ciphertext)
if err != nil {
return err
Expand Down Expand Up @@ -280,6 +339,8 @@ func (n *OpenBazaarNode) ResendCachedOrderMessage(orderID string, msgType pb.Mes
return fmt.Errorf("unable to decode invalid peer ID for order (%s) and message type (%s)", orderID, msgType.String())
}

n.SendRelayedMessage(p, nil, &msg.Msg) // send relayed message immediately

ctx, cancel := context.WithTimeout(context.Background(), n.OfflineMessageFailoverTimeout)
defer cancel()

Expand Down Expand Up @@ -575,6 +636,7 @@ func (n *OpenBazaarNode) SendChat(peerID string, chatMessage *pb.Chat) error {
return err
}
ctx, cancel := context.WithTimeout(context.Background(), n.OfflineMessageFailoverTimeout)
n.SendRelayedMessage(p, nil, &m) // send relayed message immediately
defer cancel()
err = n.Service.SendMessage(ctx, p, &m)
if err != nil && chatMessage.Flag != pb.Chat_TYPING {
Expand Down Expand Up @@ -795,6 +857,9 @@ func (n *OpenBazaarNode) SendOrderPayment(peerID string, paymentMessage *pb.Orde
if err != nil {
return err
}

n.SendRelayedMessage(p, nil, &m) // send relayed message immediately

ctx, cancel := context.WithTimeout(context.Background(), n.OfflineMessageFailoverTimeout)
err = n.Service.SendMessage(ctx, p, &m)
cancel()
Expand Down
13 changes: 12 additions & 1 deletion mobile/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
ipfslogging "gx/ipfs/QmbkT7eMTyXfpeyB3ZMxxcxg7XH8t6uXp49jqzz4HB7BGF/go-log/writer"
"gx/ipfs/Qmc85NSvmSG4Frn9Vb2cBc1rMyULH6D3TNVEfCzSKoUpip/go-multiaddr-net"

_ "net/http/pprof"

"github.com/OpenBazaar/openbazaar-go/api"
"github.com/OpenBazaar/openbazaar-go/core"
"github.com/OpenBazaar/openbazaar-go/ipfs"
Expand All @@ -51,7 +53,6 @@ import (
"github.com/natefinch/lumberjack"
"github.com/op/go-logging"
"github.com/tyler-smith/go-bip39"
_ "net/http/pprof"
)

var log = logging.MustGetLogger("mobile")
Expand Down Expand Up @@ -151,6 +152,11 @@ func NewNodeWithConfig(config *NodeConfig, password string, mnemonic string) (*N
return nil, err
}

webRelays, err := apiSchema.GetWebRelays(configFile)
if err != nil {
return nil, err
}

walletsConfig, err := apiSchema.GetWalletsConfig(configFile)
if err != nil {
return nil, err
Expand Down Expand Up @@ -275,6 +281,8 @@ func NewNodeWithConfig(config *NodeConfig, password string, mnemonic string) (*N
pushNodes = append(pushNodes, p)
}

wm := obnet.NewWebRelayManager(webRelays, identity.PeerID)

// OpenBazaar node setup
node := &core.OpenBazaarNode{
BanManager: bm,
Expand All @@ -283,6 +291,7 @@ func NewNodeWithConfig(config *NodeConfig, password string, mnemonic string) (*N
Multiwallet: mw,
OfflineMessageFailoverTimeout: 5 * time.Second,
PushNodes: pushNodes,
WebRelayManager: wm,
RepoPath: config.RepoPath,
UserAgent: core.USERAGENT,
IPNSQuorumSize: uint(ipnsExtraConfig.DHTQuorumSize),
Expand Down Expand Up @@ -474,6 +483,8 @@ func (n *Node) start() error {
n.OpenBazaarNode.PointerRepublisher = PR
MR.Wait()

n.OpenBazaarNode.WebRelayManager.ConnectToRelays(n.OpenBazaarNode.Service)

n.OpenBazaarNode.PublishLock.Unlock()
publishUnlocked = true
n.OpenBazaarNode.UpdateFollow()
Expand Down
Loading