diff --git a/cmd/start.go b/cmd/start.go index 320a3d267e..e4fdf3758c 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -237,6 +237,11 @@ func (x *Start) Execute(args []string) error { log.Error("scan data sharing config:", err) return err } + webRelays, err := schema.GetWebRelays(configFile) + if err != nil { + log.Error("scan web relays config:", err) + return err + } dropboxToken, err := schema.GetDropboxApiToken(configFile) if err != nil { log.Error("scan dropbox api token:", err) @@ -583,6 +588,8 @@ func (x *Start) Execute(args []string) error { rootHash = string(cachedIPNSRecord.Value) } + wm := obnet.NewWebRelayManager(webRelays, identity.PeerID) + // OpenBazaar node setup core.Node = &core.OpenBazaarNode{ AcceptStoreRequests: dataSharing.AcceptStoreRequests, @@ -595,6 +602,7 @@ func (x *Start) Execute(args []string) error { OfflineMessageFailoverTimeout: 30 * time.Second, Pubsub: ps, PushNodes: pushNodes, + WebRelayManager: wm, RegressionTestEnable: x.Regtest, RepoPath: repoPath, RootHash: rootHash, @@ -676,6 +684,7 @@ func (x *Start) Execute(args []string) error { }() } } + core.Node.Service = service.New(core.Node, sqliteDB) core.Node.Service.WaitForReady() log.Info("OpenBazaar Service Ready") @@ -684,6 +693,8 @@ func (x *Start) Execute(args []string) error { core.Node.StartPointerRepublisher() core.Node.StartRecordAgingNotifier() + core.Node.WebRelayManager.ConnectToRelays(core.Node.Service) + core.Node.PublishLock.Unlock() err = core.Node.UpdateFollow() if err != nil { diff --git a/core/core.go b/core/core.go index 9133e75fd6..800805d3b4 100644 --- a/core/core.go +++ b/core/core.go @@ -159,6 +159,9 @@ type OpenBazaarNode struct { // Manage blocked peers BanManager *net.BanManager + // Web Relay nodes + WebRelayManager *net.WebRelayManager + // Allow other nodes to push data to this node for storage AcceptStoreRequests bool diff --git a/core/net.go b/core/net.go index 6926d8923f..dce9743fe7 100644 --- a/core/net.go +++ b/core/net.go @@ -1,17 +1,23 @@ package core import ( + "encoding/base64" "errors" "fmt" + "gx/ipfs/QmUadX5EcvrBmxAV9sE7wUWtWSqxns5K84qKJBixmcT1w9/go-datastore" + "gx/ipfs/QmYxUdYY9S6yg5tSPVin5GFTvtfsLauVcr7reHDD3dM8xf/go-libp2p-routing" "sync" "time" + "github.com/OpenBazaar/openbazaar-go/net" + libp2p "gx/ipfs/QmTW4SdgBWq9GjsBsHeUx8WuGxzhgzAf88UMH2w62PC8yK/go-libp2p-crypto" "gx/ipfs/QmTbxNB1NwDesLmKTscr4udL2tVP7MaxvXnD1D9yX7g3PN/go-cid" - peer "gx/ipfs/QmYVXrKrKHDC9FobgmcmshCDyWwdrfwfanNQN4oxJ9Fk3h/go-libp2p-peer" + "gx/ipfs/QmYVXrKrKHDC9FobgmcmshCDyWwdrfwfanNQN4oxJ9Fk3h/go-libp2p-peer" "gx/ipfs/QmerPMzPk1mJVowm8KgmoknWa4yCYvvugMPsgWmDNUvDLW/go-multihash" "github.com/OpenBazaar/openbazaar-go/ipfs" + "github.com/OpenBazaar/openbazaar-go/pb" "github.com/OpenBazaar/openbazaar-go/repo" "github.com/golang/protobuf/proto" @@ -39,6 +45,7 @@ func (n *OpenBazaarNode) sendMessage(peerID string, k *libp2p.PubKey, message pb return err } ctx, cancel := context.WithTimeout(context.Background(), n.OfflineMessageFailoverTimeout) + n.SendRelayedMessage(p, k, &message) // send relayed message immediately defer cancel() err = n.Service.SendMessage(ctx, p, &message) if err != nil { @@ -51,30 +58,82 @@ func (n *OpenBazaarNode) sendMessage(peerID string, k *libp2p.PubKey, message pb return nil } -// SendOfflineMessage Supply of a public key is optional, if nil is instead provided n.EncryptMessage does a lookup -func (n *OpenBazaarNode) SendOfflineMessage(p peer.ID, k *libp2p.PubKey, m *pb.Message) error { - pubKeyBytes, err := n.IpfsNode.PrivateKey.GetPublic().Bytes() +// SendRelayedMessage - send message through web relay manager to recipient +func (n *OpenBazaarNode) SendRelayedMessage(p peer.ID, k *libp2p.PubKey, m *pb.Message) error { + messageBytes, err := n.getMessageBytes(m) if err != nil { return err } + + ctx, cancel := context.WithTimeout(context.Background(), n.OfflineMessageFailoverTimeout) + defer cancel() + if k == nil { + var pubKey libp2p.PubKey + keyval, err := n.IpfsNode.Repo.Datastore().Get(datastore.NewKey("/pubkey/" + p.Pretty())) + if err != nil { + pubKey, err = routing.GetPublicKey(n.IpfsNode.Routing, ctx, p) + if err != nil { + log.Errorf("Failed to find public key for %s", p.Pretty()) + return err + } + } else { + pubKey, err = libp2p.UnmarshalPublicKey(keyval) + if err != nil { + log.Errorf("Failed to find public key for %s", p.Pretty()) + return err + } + } + k = &pubKey + } + + relayciphertext, err := net.Encrypt(*k, messageBytes) + if err != nil { + return fmt.Errorf("Error: %s", err.Error()) + } + + // Base64 encode + encodedCipherText := base64.StdEncoding.EncodeToString(relayciphertext) + + n.WebRelayManager.SendRelayMessage(encodedCipherText, p.Pretty()) + + return nil +} + +func (n *OpenBazaarNode) getMessageBytes(m *pb.Message) ([]byte, error) { + pubKeyBytes, err := n.IpfsNode.PrivateKey.GetPublic().Bytes() + if err != nil { + return nil, err + } ser, err := proto.Marshal(m) if err != nil { - return err + return nil, err } sig, err := n.IpfsNode.PrivateKey.Sign(ser) if err != nil { - return err + return nil, err } + env := pb.Envelope{Message: m, Pubkey: pubKeyBytes, Signature: sig} messageBytes, merr := proto.Marshal(&env) if merr != nil { - return merr + return nil, merr } + return messageBytes, nil +} + +// SendOfflineMessage Supply of a public key is optional, if nil is instead provided n.EncryptMessage does a lookup +func (n *OpenBazaarNode) SendOfflineMessage(p peer.ID, k *libp2p.PubKey, m *pb.Message) error { + messageBytes, err := n.getMessageBytes(m) + if err != nil { + return err + } + // TODO: this function blocks if the recipient's public key is not on the local machine ciphertext, cerr := n.EncryptMessage(p, k, messageBytes) if cerr != nil { return cerr } + addr, aerr := n.MessageStorage.Store(p, ciphertext) if aerr != nil { return aerr @@ -84,7 +143,7 @@ func (n *OpenBazaarNode) SendOfflineMessage(p peer.ID, k *libp2p.PubKey, m *pb.M return mherr } /* TODO: We are just using a default prefix length for now. Eventually we will want to customize this, - but we will need some way to get the recipient's desired prefix length. Likely will be in profile. */ + but we will need some way to get the recipient's desired prefix length. Likely will be in profile. */ pointer, err := ipfs.NewPointer(mh, DefaultPointerPrefixLength, addr, ciphertext) if err != nil { return err @@ -280,6 +339,8 @@ func (n *OpenBazaarNode) ResendCachedOrderMessage(orderID string, msgType pb.Mes return fmt.Errorf("unable to decode invalid peer ID for order (%s) and message type (%s)", orderID, msgType.String()) } + n.SendRelayedMessage(p, nil, &msg.Msg) // send relayed message immediately + ctx, cancel := context.WithTimeout(context.Background(), n.OfflineMessageFailoverTimeout) defer cancel() @@ -575,6 +636,7 @@ func (n *OpenBazaarNode) SendChat(peerID string, chatMessage *pb.Chat) error { return err } ctx, cancel := context.WithTimeout(context.Background(), n.OfflineMessageFailoverTimeout) + n.SendRelayedMessage(p, nil, &m) // send relayed message immediately defer cancel() err = n.Service.SendMessage(ctx, p, &m) if err != nil && chatMessage.Flag != pb.Chat_TYPING { @@ -795,6 +857,9 @@ func (n *OpenBazaarNode) SendOrderPayment(peerID string, paymentMessage *pb.Orde if err != nil { return err } + + n.SendRelayedMessage(p, nil, &m) // send relayed message immediately + ctx, cancel := context.WithTimeout(context.Background(), n.OfflineMessageFailoverTimeout) err = n.Service.SendMessage(ctx, p, &m) cancel() diff --git a/mobile/node.go b/mobile/node.go index 87b98cbd17..e7bbc099d6 100644 --- a/mobile/node.go +++ b/mobile/node.go @@ -26,6 +26,8 @@ import ( ipfslogging "gx/ipfs/QmbkT7eMTyXfpeyB3ZMxxcxg7XH8t6uXp49jqzz4HB7BGF/go-log/writer" "gx/ipfs/Qmc85NSvmSG4Frn9Vb2cBc1rMyULH6D3TNVEfCzSKoUpip/go-multiaddr-net" + _ "net/http/pprof" + "github.com/OpenBazaar/openbazaar-go/api" "github.com/OpenBazaar/openbazaar-go/core" "github.com/OpenBazaar/openbazaar-go/ipfs" @@ -51,7 +53,6 @@ import ( "github.com/natefinch/lumberjack" "github.com/op/go-logging" "github.com/tyler-smith/go-bip39" - _ "net/http/pprof" ) var log = logging.MustGetLogger("mobile") @@ -151,6 +152,11 @@ func NewNodeWithConfig(config *NodeConfig, password string, mnemonic string) (*N return nil, err } + webRelays, err := apiSchema.GetWebRelays(configFile) + if err != nil { + return nil, err + } + walletsConfig, err := apiSchema.GetWalletsConfig(configFile) if err != nil { return nil, err @@ -275,6 +281,8 @@ func NewNodeWithConfig(config *NodeConfig, password string, mnemonic string) (*N pushNodes = append(pushNodes, p) } + wm := obnet.NewWebRelayManager(webRelays, identity.PeerID) + // OpenBazaar node setup node := &core.OpenBazaarNode{ BanManager: bm, @@ -283,6 +291,7 @@ func NewNodeWithConfig(config *NodeConfig, password string, mnemonic string) (*N Multiwallet: mw, OfflineMessageFailoverTimeout: 5 * time.Second, PushNodes: pushNodes, + WebRelayManager: wm, RepoPath: config.RepoPath, UserAgent: core.USERAGENT, IPNSQuorumSize: uint(ipnsExtraConfig.DHTQuorumSize), @@ -474,6 +483,8 @@ func (n *Node) start() error { n.OpenBazaarNode.PointerRepublisher = PR MR.Wait() + n.OpenBazaarNode.WebRelayManager.ConnectToRelays(n.OpenBazaarNode.Service) + n.OpenBazaarNode.PublishLock.Unlock() publishUnlocked = true n.OpenBazaarNode.UpdateFollow() diff --git a/net/web_relay_manager.go b/net/web_relay_manager.go new file mode 100644 index 0000000000..25edbf445e --- /dev/null +++ b/net/web_relay_manager.go @@ -0,0 +1,216 @@ +package net + +import ( + "crypto/sha256" + "encoding/binary" + "encoding/json" + peer "gx/ipfs/QmYVXrKrKHDC9FobgmcmshCDyWwdrfwfanNQN4oxJ9Fk3h/go-libp2p-peer" + "time" + + "github.com/OpenBazaar/openbazaar-go/pb" + "github.com/golang/protobuf/ptypes/any" + + "gx/ipfs/QmerPMzPk1mJVowm8KgmoknWa4yCYvvugMPsgWmDNUvDLW/go-multihash" + + "github.com/btcsuite/btcutil/base58" + + "github.com/gorilla/websocket" +) + +// WebRelayManager - manages connections to web relay servers +type WebRelayManager struct { + webrelays []string + peerID string + connections []*websocket.Conn + obService NetworkService +} + +// EncryptedMessage - message envelope for relay messages +type EncryptedMessage struct { + Message string `json:"encryptedMessage"` + Recipient string `json:"recipient"` +} + +// TypedMessage - generic typed message for transport +type TypedMessage struct { + Type string + Data json.RawMessage +} + +// SubscribeMessage - authentication message for web relay server +type SubscribeMessage struct { + UserID string `json:"userID"` + SubscriptionKey string `json:"subscriptionKey"` +} + +// SubscribeResponse - for marshaling authN response from web relay server +type SubscribeResponse struct { + Subscribe string `json:"subscribe"` +} + +// NewWebRelayManager - creates a web relay manager to maintain connections +func NewWebRelayManager(webrelays []string, sender string) *WebRelayManager { + return &WebRelayManager{webrelays, sender, nil, nil} +} + +// ConnectToRelays - initiate websocket connections to the relay servers configured +func (wrm *WebRelayManager) ConnectToRelays(service NetworkService) { + // Set WRM service + wrm.obService = service + + // Establish connections + var conns []*websocket.Conn + for _, relay := range wrm.webrelays { + + // Connect and subscribe to websocket server + conn, err := wrm.connectToServer(relay, wrm.peerID) + if err != nil { + log.Error("Could not connect to: %s", relay) + } + + wrm.connections = append(conns, conn) + } +} + +func (wrm *WebRelayManager) connectToServer(relay string, sender string) (*websocket.Conn, error) { + // Generate subscription key for web relay + peerIDMultihash, _ := multihash.FromB58String(sender) + decoded, _ := multihash.Decode(peerIDMultihash) + digest := decoded.Digest + prefix := digest[:8] + + prefix64 := binary.BigEndian.Uint64(prefix) + + // Then shifting + shiftedPrefix64 := prefix64 >> uint(48) + + // Then converting back to a byte array + shiftedBytes := make([]byte, 8) + binary.BigEndian.PutUint64(shiftedBytes, shiftedPrefix64) + + hashedShiftedPrefix := sha256.Sum256(shiftedBytes) + + subscriptionKey, _ := multihash.Encode(hashedShiftedPrefix[:], multihash.SHA2_256) + + // Generate subscribe message + subscribeMessage := SubscribeMessage{ + UserID: sender, + SubscriptionKey: base58.Encode(subscriptionKey), + } + + data, _ := json.Marshal(subscribeMessage) + typedmessage := TypedMessage{ + Type: "SubscribeMessage", + Data: data, + } + + socketmessage, _ := json.Marshal(typedmessage) + + // Connect to websocket server + log.Debugf("Connecting to relay server: %s\n", relay) + + c, _, err := websocket.DefaultDialer.Dial(relay, nil) + if err != nil { + log.Error("dial:", err) + return nil, err + } + + err = c.WriteMessage(websocket.TextMessage, socketmessage) + if err != nil { + log.Debugf("write:", err) + return nil, err + } + + log.Debugf("Successfully connected to %s and subscribed to: %s\n", relay, base58.Encode(subscriptionKey)) + + go func() { + for { + // read in a message + _, p, err := c.ReadMessage() + if err != nil { + log.Debugf("Connection to relay has an error: %s", err) + log.Debugf("Attempting to reconnect to the relay...") + wrm.reconnectToRelay(relay) + break + } + + if string(p) == "{\"subscribe\": true}" { + log.Debugf("Received subscribe success message") + } else { + // turn encrypted message into OFFLINE_RELAY and process normally + m := new(pb.Message) + m.MessageType = pb.Message_OFFLINE_RELAY + m.Payload = &any.Any{Value: p} + + handler := wrm.obService.HandlerForMsgType(m.MessageType) + + peerID, _ := peer.IDB58Decode(sender) + + if peerID != "" { + m, err = handler(peerID, m, nil) + if err != nil { + if m != nil { + log.Debugf("%s handle message error: %s", m.MessageType.String(), err.Error()) + } else { + log.Errorf("Error: %s", err.Error()) + } + } + log.Debugf("Received OFFLINE_RELAY2 message from %s", peerID.Pretty()) + } + } + + } + }() + + return c, nil +} + +func (wrm *WebRelayManager) reconnectToRelay(relay string) { + conn, err := wrm.connectToServer(relay, wrm.peerID) + if err != nil { + log.Error("Could not connect to: %s", relay) + time.Sleep(10 * time.Second) + wrm.reconnectToRelay(relay) + } else { + wrm.connections = append(wrm.connections, conn) + } +} + +// SendRelayMessage - Wrap relay message in encrypted envelope and broadcast +func (wrm *WebRelayManager) SendRelayMessage(ciphertext string, recipient string) { + encryptedmessage := EncryptedMessage{ + Message: ciphertext, + Recipient: recipient, + } + + data, _ := json.Marshal(encryptedmessage) + + typedmessage := TypedMessage{ + Type: "EncryptedMessage", + Data: data, + } + + outgoing, _ := json.Marshal(typedmessage) + log.Debugf("Sending encrypted relay message: %s", string(outgoing)) + + // Transmit the encrypted message to each web relay socket connection + wrm.broadcastMessage(outgoing) +} + +func (wrm *WebRelayManager) broadcastMessage(msg []byte) { + for _, conn := range wrm.connections { + if conn != nil { + err := conn.WriteMessage(websocket.TextMessage, msg) + if err != nil { + log.Debugf("write:", err) + } else { + log.Debugf("Successfully sent message to relay: %s\n", conn.RemoteAddr()) + } + } + } + + if len(wrm.connections) == 0 { + log.Debugf("There are no websocket connections to send relay message to") + } + +} diff --git a/repo/init.go b/repo/init.go index e7d2ce868b..0b742e9901 100644 --- a/repo/init.go +++ b/repo/init.go @@ -179,6 +179,8 @@ func addConfigExtensions(repoRoot string) error { APIRouter: schema.IPFSCachingRouterDefaultURI, } + wr = schema.WebRelayServers + t = schema.TorConfig{} ) if err := r.SetConfigKey("Wallets", schema.DefaultWalletsConfig()); err != nil { @@ -187,6 +189,9 @@ func addConfigExtensions(repoRoot string) error { if err := r.SetConfigKey("DataSharing", ds); err != nil { return err } + if err := r.SetConfigKey("WebRelays", wr); err != nil { + return err + } if err := r.SetConfigKey("Bootstrap-testnet", schema.BootstrapAddressesTestnet); err != nil { return err } diff --git a/schema/configuration.go b/schema/configuration.go index 89ef896ebd..beb53a775a 100644 --- a/schema/configuration.go +++ b/schema/configuration.go @@ -415,6 +415,40 @@ func GetRepublishInterval(cfgBytes []byte) (time.Duration, error) { return d, nil } +// GetWebRelays - retrieves web relay server addresses from config file +func GetWebRelays(cfgBytes []byte) ([]string, error) { + var cfgIface interface{} + err := json.Unmarshal(cfgBytes, &cfgIface) + if err != nil { + return nil, MalformedConfigError + } + + var webRelays []string + + cfg, ok := cfgIface.(map[string]interface{}) + if !ok { + return webRelays, MalformedConfigError + } + + wrcfg, ok := cfg["WebRelays"] + if !ok { + return webRelays, MalformedConfigError + } + wr, ok := wrcfg.([]interface{}) + if !ok { + return webRelays, MalformedConfigError + } + + for _, nd := range wr { + ndStr, ok := nd.(string) + if !ok { + return webRelays, MalformedConfigError + } + webRelays = append(webRelays, ndStr) + } + return webRelays, nil +} + func GetDataSharing(cfgBytes []byte) (*DataSharing, error) { var cfgIface interface{} err := json.Unmarshal(cfgBytes, &cfgIface) diff --git a/schema/constants.go b/schema/constants.go index ff48b727f6..72f26c05ed 100644 --- a/schema/constants.go +++ b/schema/constants.go @@ -51,6 +51,8 @@ const ( DataPushNodeTwo = "QmPPg2qeF3n2KvTRXRZLaTwHCw8JxzF4uZK93RfMoDvf2o" DataPushNodeThree = "QmY8puEnVx66uEet64gAf4VZRo7oUyMCwG6KdB9KM92EGQ" + WebRelayOne = "wss://webchat.ob1.io:8080" + BootstrapNodeTestnet_BrooklynFlea = "/ip4/165.227.117.91/tcp/4001/ipfs/Qmaa6De5QYNqShzPb9SGSo8vLmoUte8mnWgzn4GYwzuUYA" BootstrapNodeTestnet_Shipshewana = "/ip4/46.101.221.165/tcp/4001/ipfs/QmVAQYg7ygAWTWegs8HSV2kdW1MqW8WMrmpqKG1PQtkgTC" BootstrapNodeDefault_LeMarcheSerpette = "/ip4/107.170.133.32/tcp/4001/ipfs/QmUZRGLhcKXF1JyuaHgKm23LvqcoMYwtb9jmh8CkP4og3K" @@ -79,6 +81,8 @@ var ( BootstrapNodeTestnet_BrooklynFlea, BootstrapNodeTestnet_Shipshewana, } + + WebRelayServers = []string{WebRelayOne} ) func EthereumDefaultOptions() map[string]interface{} {