diff --git a/index.js b/index.js index 867f95f..2e54ef1 100644 --- a/index.js +++ b/index.js @@ -1,89 +1,54 @@ -const { Peer } = require('peerjs') // => STUN/TURN + RTCPeerConnection +const { RTCPeerConnection, RTCIceCandidate } = require('werift') const Protomux = require('protomux') const c = require('compact-encoding') const safetyCatch = require('safety-catch') const ReadyResource = require('ready-resource') -const sodium = require('sodium-universal') -const b4a = require('b4a') const WebStream = require('./lib/web-stream.js') +const iceServers = [ + { urls: 'stun:stun.l.google.com:19302' } +] + // TODO: This could be a Duplex but don't know about that, for now emitting an event is good enough module.exports = class WebPeer extends ReadyResource { constructor (stream) { super() - const id = b4a.toString(randomBytes(8), 'hex') - - // this.peer = new Peer(id) - this.peer = new Peer() + this.peer = new RTCPeerConnection({ iceServers }) this.stream = stream - // this.mux = new Protomux(stream) + this.mux = null // new Protomux(stream) this.channel = null - - this.handshake = null - this.token = b4a.toString(randomBytes(8), 'hex') // TODO: Think another solution for validating connections this.remote = null + this.peer.onicecandidate = onicecandidate.bind(this) + this.ready().catch(safetyCatch) } async _open () { - // TODO: Investigate about reusing the relayed handshake to create new SecretStream instances - /* const onhandshake = () => this.handshake = getHandshake(this.stream) - if (this.stream.handshakeHash) onhandshake() // Or this.stream._encrypt - else this.stream.once('handshake', onhandshake) */ - - this.peer.on('connection', rawStream => { - console.log('peerjs incoming', rawStream) - - // TODO: Check metadata.token before accepting the connection - - rawStream.on('open', () => { - console.log('rawStream open') - - // if (!this.handshake) throw new Error('No handshake') - - const duplex = new WebStream(rawStream) - - this.remote = duplex // new SecretStream(false, duplex) - // this.remote.on('data', (data) => console.log(data)) - // this.remote.on('close', () => console.log('remote closed')) - this.remote.on('error', console.error) - - // const done = () => this.mux.destroy() - // waitForRemote(this.remote).then(done, done) - - this.emit('continue', this.remote) - }) - - rawStream.on('error', function (err) { - console.log('rawStream error', err) - }) - - rawStream.on('close', function () { - console.log('rawStream close') - }) - }) - - try { - await waitForPeer(this.peer) - } catch (err) { - console.error(err) - this.stream.destroy() - throw err - } - this.mux = new Protomux(this.stream) - this._attachChannel() + this.channel = this.mux.createChannel({ + protocol: 'hyper-webrtc/signaling', + id: null, + handshake: c.json, // TODO: Make strict messages + onopen: this._onmuxopen.bind(this), + onerror: this._onmuxerror.bind(this), + onclose: this._onmuxclose.bind(this), + messages: [ + { encoding: c.json, onmessage: this._onwireice.bind(this) }, + { encoding: c.json, onmessage: this._onwireoffer.bind(this) }, + { encoding: c.json, onmessage: this._onwireanswer.bind(this) } + ] + }) - console.log('peer.id', this.peer.id) + this.channel.userData = this this.channel.open({ // isInitiator: this.mux.stream.isInitiator, - id: this.peer.id, - token: this.token }) + + // TODO: Maximize speed at connecting, e.g. don't wait until open } _close () { @@ -92,72 +57,54 @@ module.exports = class WebPeer extends ReadyResource { this.stream.destroy() } - _attachChannel () { - const channel = this.mux.createChannel({ - protocol: 'hyperconnection', - id: null, - handshake: c.json, // TODO: Make strict messages - onopen: this._onmuxopen.bind(this), - onerror: this._onmuxerror.bind(this), - onclose: this._onmuxclose.bind(this), - messages: [ - // { encoding: c.json, onmessage: this._onmuxmessage } - ] - }) - - if (channel === null) return - - this.channel = channel - } - - _onmuxopen (handshake) { + async _onmuxopen (handshake) { console.log('_onmuxopen', handshake) - if (this.mux.stream.isInitiator) { - console.log('Connecting to', handshake.id) - - // TODO: Investigate if metadata is kept truly private between both peers (E2E encrypted, not publicly stored in the middle server, etc) - const rawStream = this.peer.connect(handshake.id, { - reliable: true, - /* metadata: { - token: this.token - } */ - }) - - rawStream.on('open', () => { - console.log('rawStream open') - - // if (!this.handshake) throw new Error('No handshake') + // const rawStream = this.peer.createDataChannel('wire', { negotiated: true, id: 0 }) - const duplex = new WebStream(rawStream) + const done = (rawStream) => { + this.remote = new WebStream(this.mux.stream.isInitiator, rawStream, { + handshakeHash: this.mux.stream.handshakeHash + }) // new SecretStream(false, rawStream) - this.remote = duplex // new SecretStream(true, duplex) + this.remote.once('connect', () => { + this.emit('continue', this.remote) // TODO: It should be a Duplex to avoid this event + }) + } - /* this.remote.on('connect', () => { - console.log('remote connected') - }) + if (this.mux.stream.isInitiator) { + const rawStream = this.peer.createDataChannel('wire') + done(rawStream) + + const offer = await this.peer.createOffer() + await this.peer.setLocalDescription(offer) + + this.channel.messages[1].send({ offer: this.peer.localDescription }) + } else { + this.peer.ondatachannel = (e) => { + const rawStream = e.channel + done(rawStream) + } + } + } - this.remote.on('open', () => { - console.log('remote opened') - }) */ + _onwireice ({ ice }) { + this.peer.addIceCandidate(new RTCIceCandidate(ice)) + } - this.remote.on('error', console.error) + async _onwireoffer ({ offer }) { + // TODO: Only once - // TODO: Can destroy it right away? - // const done = () => this.mux.destroy() - // waitForRemote(this.remote).then(done, done) + this.peer.setRemoteDescription(offer) - this.emit('continue', this.remote) - }) + const answer = await this.peer.createAnswer() + await this.peer.setLocalDescription(answer) - rawStream.on('error', function (err) { - console.log('rawStream error', err) - }) + this.channel.messages[2].send({ answer: this.peer.localDescription }) + } - rawStream.on('close', function () { - console.log('rawStream close') - }) - } + _onwireanswer ({ answer }) { + this.peer.setRemoteDescription(answer) } _onmuxerror (err) { @@ -172,71 +119,6 @@ module.exports = class WebPeer extends ReadyResource { } } -/* function getHandshake (stream) { - return { - publicKey: stream.publicKey, - remotePublicKey: stream.remotePublicKey, - hash: stream.handshakeHash, - tx: stream.tx || stream._encrypt?.key || null, - rx: stream.rx || stream._decrypt?.key || null - } -} */ - -function waitForRemote (remote) { - return new Promise(resolve => { - remote.on('open', done) - remote.on('error', done) - remote.on('close', done) - - function done () { - remote.off('open', done) - remote.off('error', done) - remote.off('close', done) - - resolve() - } - }) -} - -// TODO: Simplify a bit -function waitForPeer (peer) { - return new Promise((resolve, reject) => { - if (peer.disconnected === true) { - reject(new Error('Peer is disconnected')) - return - } - - if (peer.destroyed) { - reject(new Error('Peer is destroyed')) - return - } - - peer.on('open', onopen) - peer.on('error', onclose) - peer.on('close', onclose) - - function onopen (id) { - cleanup() - resolve() - } - - function onclose (err) { - cleanup() - - if (err) reject(err) - else reject(new Error('Could not create peer')) - } - - function cleanup () { - peer.off('open', onopen) - peer.off('error', onclose) - peer.off('close', onclose) - } - }) -} - -function randomBytes (n) { - const buf = b4a.allocUnsafe(n) - sodium.randombytes_buf(buf) - return buf +function onicecandidate (e) { + if (e.candidate) this.channel.messages[0].send({ ice: e.candidate }) } diff --git a/lib/web-stream.js b/lib/web-stream.js index c68123b..93ef226 100644 --- a/lib/web-stream.js +++ b/lib/web-stream.js @@ -1,60 +1,112 @@ -const b4a = require('b4a') const { Duplex } = require('streamx') +const b4a = require('b4a') module.exports = class WebStream extends Duplex { - constructor (rtc) { + constructor (isInitiator, dc, opts = {}) { super({ mapWritable: toBuffer }) - this.rtc = rtc + this.dc = dc + this.noiseStream = this this.rawStream = this + this.isInitiator = isInitiator + this.handshakeHash = opts.handshakeHash + + this._opening = null this._openedDone = null - this.opened = new Promise(resolve => this._openedDone = resolve) + this.opened = new Promise(resolve => { this._openedDone = resolve }) + this.userData = null - this._setup() + this._onopen = onopen.bind(this) + this._onmessage = onmessage.bind(this) + this._onerror = onerror.bind(this) + this._onclose = onclose.bind(this) + + this.dc.addEventListener('open', this._onopen) + this.dc.addEventListener('message', this._onmessage) + this.dc.addEventListener('error', this._onerror) + this.dc.addEventListener('close', this._onclose) this.resume().pause() // Open immediately } - _setup () { - this.rtc.on('data', data => { - this.push(new Uint8Array(data)) - }) + _open (cb) { + if (this.dc.readyState === 'closed' || this.dc.readyState === 'closing') { + cb(new Error('Stream is closed')) + return + } - this.rtc.on('close', () => { - this.push(null) - this.emit('close') - }) + if (this.dc.readyState === 'connecting') { + this._opening = cb + return + } - this.rtc.on('error', (err) => { - this.emit('error', err) - }) + this._resolveOpened(true) + cb(null) + } + + _continueOpen (err) { + if (err) this.destroy(err) + + if (this._opening === null) return - this._openedDone(true) // TODO + const cb = this._opening + this._opening = null + this._open(cb) } - _open (cb) { - cb(null) + _resolveOpened (opened) { + const cb = this._openedDone + + if (cb) { + this._openedDone = null + cb(opened) + + if (opened) this.emit('connect') + } } - _read (cb) { + _write (data, cb) { + this.dc.send(data) cb(null) } - _write (chunk, cb) { - this.rtc.send(chunk) - cb(null) + _predestroy () { + this._continueOpen(new Error('Stream was destroyed')) } - _destroy () { - this.rtc.close() + _destroy (cb) { + this.dc.close() + this._resolveOpened(false) + cb(null) } setKeepAlive () {} // TODO } +function onopen () { + this._continueOpen() +} + +function onmessage (event) { + this.push(event.data) // new Uint8Array(data) // b4a.from(event.data) +} + +function onerror (err) { + this.destroy(err) +} + +function onclose () { + this.dc.removeEventListener('open', this._onopen) + this.dc.removeEventListener('message', this._onmessage) + this.dc.removeEventListener('error', this._onerror) + this.dc.removeEventListener('close', this._onclose) + + this.destroy() +} + function toBuffer (data) { return typeof data === 'string' ? b4a.from(data) : data } diff --git a/package.json b/package.json index 1acb0b7..ec8770e 100644 --- a/package.json +++ b/package.json @@ -4,7 +4,7 @@ "description": "WebRTC tools for the hypercore-protocol stack", "main": "index.js", "scripts": { - "test": "standard" + "test": "standard && brittle test.js" }, "repository": { "type": "git", @@ -24,13 +24,15 @@ "@hyperswarm/dht-relay": "^0.4.3", "b4a": "^1.6.4", "compact-encoding": "^2.13.0", - "hypercore": "^10.32.5", + "hypercore": "^10.32.6", "hypercore-crypto": "^3.4.0", + "hyperdht": "^6.11.5", "hyperswarm": "^4.7.13", - "peerjs": "^1.5.2", "protomux": "^3.5.1", "random-access-memory": "^6.2.0", + "ready-resource": "^1.0.0", "sodium-universal": "^4.0.0", + "werift": "^0.19.0", "ws": "^8.16.0" } } diff --git a/test.js b/test.js new file mode 100644 index 0000000..481d20f --- /dev/null +++ b/test.js @@ -0,0 +1,139 @@ +const test = require('brittle') + +const { WebSocket, WebSocketServer } = require('ws') +const DHTRelay = require('@hyperswarm/dht-relay') +const { relay } = require('@hyperswarm/dht-relay') +const Stream = require('@hyperswarm/dht-relay/ws') + +const DHT = require('hyperdht') +const createTestnet = require('hyperdht/testnet') + +const Hyperswarm = require('hyperswarm') +const Hypercore = require('hypercore') +const RAM = require('random-access-memory') +const crypto = require('hypercore-crypto') +const HypercoreId = require('hypercore-id-encoding') +const HyperWebRTC = require('./index.js') + +test('basic', async function (t) { + // t.plan(1) + + const bootstrap = await createBootstrap(t) + const relayAddress = createRelayServer(t, { bootstrap }) + + const key = await writer(t, { relayAddress }) + + await reader(t, key, { relayAddress }) + + // await new Promise(resolve => setTimeout(resolve, 5000)) +}) + +async function writer (t, { relayAddress }) { + const dht = createRelayClient(t, relayAddress) + const swarm = new Hyperswarm({ dht }) + t.teardown(() => swarm.destroy()) + + const core = new Hypercore(RAM) + await core.append(['a', 'b', 'c']) + t.teardown(() => core.close()) + + const done = core.findingPeers() + swarm.on('connection', function (signal) { + const peer = new HyperWebRTC(signal) + + peer.on('continue', function (stream) { + console.log('core replicate') + core.replicate(stream) + }) + }) + const discoveryWeb = crypto.discoveryKey(core.discoveryKey) + const discovery = swarm.join(discoveryWeb) + swarm.flush().then(done, done) + + await discovery.flushed() + console.log('Fully announced') + + console.log('Writer ID', core.id) + + return core.id +} + +async function reader (t, key, { relayAddress }) { + const dht = createRelayClient(t, relayAddress) + const swarm = new Hyperswarm({ dht }) + t.teardown(() => swarm.destroy()) + + const clone = new Hypercore(RAM, HypercoreId.decode(key)) + await clone.ready() + t.teardown(() => clone.close()) + + const done = clone.findingPeers() + swarm.on('connection', function (signal) { + const peer = new HyperWebRTC(signal) + + peer.on('continue', function (stream) { + console.log('clone replicate') + clone.replicate(stream) + }) + }) + const discoveryWeb = crypto.discoveryKey(clone.discoveryKey) + swarm.join(discoveryWeb) + swarm.flush().then(done, done) + + console.log(await clone.get(0)) + console.log(await clone.get(1)) + console.log(await clone.get(2)) +} + +function createRelayClient (t, relayAddress) { + const ws = new WebSocket(relayAddress) + const dht = new DHTRelay(new Stream(true, ws)) + + t.teardown(() => dht.destroy(), { order: Infinity }) + + return dht +} + +function createRelayServer (t, { bootstrap }) { + // const dht = new DHT({ bootstrap, quickFirewall: false, ephemeral: true }) + + const dht = new DHT({ bootstrap }) + const server = new WebSocketServer({ port: 0 }) + + server.on('connection', function (socket) { + relay(dht, new Stream(false, socket)) + }) + + // await waitForServer(server) + + t.teardown(async function () { + console.log('teardown') + await new Promise(resolve => server.close(resolve)) + await dht.destroy() + }) + + return 'ws://127.0.0.1:' + server.address().port +} + +async function createBootstrap (t) { + const testnet = await createTestnet(4, { teardown: t.teardown }) + + t.teardown(() => testnet.destroy(), { order: Infinity }) + + return testnet.bootstrap +} + +/* function waitForServer (server) { + return new Promise((resolve, reject) => { + server.on('listening', done) + server.on('error', done) + + function done (err) { + server.off('listening', done) + server.off('error', done) + + if (err) reject(err) + else resolve() + } + }) +} */