Skip to content

Commit

Permalink
realtime sync with 40 tests passing
Browse files Browse the repository at this point in the history
  • Loading branch information
amark committed May 26, 2016
1 parent 3ff91ec commit 08418ee
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 31 deletions.
105 changes: 87 additions & 18 deletions gun.js
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,6 @@
at.ack(cat.err, cat.ok, cat);
}
Gun.on('put', function(at, ev){
//console.log("???????????????????????", at, ev);
if(is_graph(at.graph)){ return }
at.cb({err: "Invalid graph!"});
ev.stun();
Expand Down Expand Up @@ -952,7 +951,7 @@
function stream(err, node){
Gun.on('chain', this, link, this);
}
function link(cat, ev){ var at = this, u;
function link(cat, ev){ var at = this, u; // TODO: What if this was a Gun.on('chain')?
var err = cat.err, node = cat.node, cex = cat.lex, lex = at.lex, field = lex.field, rel, val;
if(lex !== cex && lex.field && cex.field){
if(obj_has(node, cex.field) && (rel = is_rel(val = node[cex.field]))){
Expand Down Expand Up @@ -1065,7 +1064,7 @@
;(function(){
Gun.chain.val = function(cb, opt, t){
var gun = this, at = gun._;
if(at.val){ // TODO: Not null!
if(at.val || null === at.val){
cb.call(gun, at.val, at.lex.field);
return gun;
}
Expand All @@ -1090,17 +1089,21 @@
return Gun.get(arg[3].gun, {soul: rel}, function(err, data){
arg[0] = err; arg[1] = data;
if(opt && opt.at){ opt.at.val = arg[1] }
if(arg[3] && arg[3].gun){ arg[3].val = arg[1] }
if(t && t.gun){ t.val = arg[1] }
if(arg[4] && 'ok' == arg[4].tag){
if(f && obj_empty(arg[1], _meta)){ return }
arg[4].off(); // TODO: BUG! for plurals
if(opt && opt.off){ arg[4].off(); } // TODO: BUG! for plurals
}
cb.apply(t, f? arg.slice(1) : arg);
});
}
if(opt && opt.at){ opt.at.val = arg[1] }
if(arg[3] && arg[3].gun){ arg[3].val = arg[1] }
if(t && t.gun){ t.val = arg[1] }
if(arg[4] && 'ok' == arg[4].tag){
if(f && obj_empty(arg[1], _meta)){ return }
arg[4].off(); // TODO: BUG! For plurals.
if(opt && opt.off){ arg[4].off(); } // TODO: BUG! For plurals.
}
cb.apply(t, f? arg.slice(1) : arg);
}
Expand Down Expand Up @@ -1164,12 +1167,12 @@
Tab.on(msg['#'], function(err, ok){ // TODO: ONE? PERF! Clear out listeners, maybe with setTimeout?
at.cb(err, ok);
});
console.log("PUT SEND", msg);
Tab.peers(opt.peers).send(msg);
Tab.peers(opt.peers).send(msg, {headers: {'gun-sid': Tab.server.sid}});
});

Gun.on('get', function(at){
var opt = at.opt, lex = at.lex;
var gun = at.gun, opt = at.opt, lex = at.lex;
opt.peers = opt.peers || gun.__.opt.peers; // TODO: CLEAN UP!
if(Gun.obj.empty(opt.peers)){
if(!Gun.log.count('no-wire-get')){ Gun.log("Warning! You have no peers to get from!") }
return;
Expand All @@ -1181,7 +1184,71 @@
Tab.on(msg['#'], function(err, data){ // TODO: ONE? PERF! Clear out listeners, maybe with setTimeout?
at.cb(err, data);
});
Tab.peers(opt.peers).send(msg);
Tab.peers(opt.peers).send(msg, {headers: {'gun-sid': Tab.server.sid}});
});

Gun.on('opt', function(at){ // TODO: BUG! Does not respect separate instances!!!
if(Tab.server){ return }
var gun = at.gun, server = Tab.server = {};
server.sid = Gun.text.random();
Tab.request.createServer(function(req, res){
if(!req || !res || !req.body || !req.headers){ return }
var msg = req.body;
// AUTH for non-replies.
if(server.msg(msg['#'])){ return }
//server.on('network', Gun.obj.copy(req)); // Unless we have WebRTC, not needed.
if(msg['@']){ // no need to process.
if(Tab.ons[msg['@'] || msg['#']]){
Tab.on(msg['@'] || msg['#'], [msg['!'], msg['$']]);
}
return
}
if(Gun.is.lex(msg['$'])){ return server.get(req, res) }
else { return server.put(req, res) }
});
server.get = function(req, cb){
var body = req.body, lex = body['$'], opt;
if(!(node = gun.__.graph[lex[Gun._.soul]])){ return } // Don't reply to data we don't have it in memory. TODO: Add localStorage?
cb({body: {
'#': server.msg(),
'@': body['#'],
'$': node
}});
}
server.put = function(req, cb){
var body = req.body, graph = body['$'];
if(!(graph = Gun.obj.map(graph, function(node, soul, map){ // filter out what we don't have in memory.
if(!gun.__.graph[soul]){ return }
map(soul, node);
}))){ return }
Gun.put(gun, graph, function(err, ok){
return cb({body: {
'#': server.msg(),
'@': body['#'],
'$': ok,
'!': err
}});
}, {websocket: false});
}
server.msg = function(id){
if(!id){
return server.msg.debounce[id = Gun.text.random(9)] = Gun.time.is(), id;
}
clearTimeout(server.msg.clear);
server.msg.clear = setTimeout(function(){
var now = Gun.time.is();
Gun.obj.map(server.msg.debounce, function(t,id){
if((now - t) < (1000 * 60 * 5)){ return }
Gun.obj.del(server.msg.debounce, id);
});
},500);
if(server.msg.debounce[id]){
return server.msg.debounce[id] = Gun.time.is(), id;
}
server.msg.debounce[id] = Gun.time.is();
return;
};
server.msg.debounce = server.msg.debounce || {};
});

(function(exports){
Expand All @@ -1192,14 +1259,17 @@
P.is = function(p){ return (p instanceof P) }
P.chain = P.prototype;
function map(peer, url){
var msg = this;
console.log("PEER SEND", peer, msg);
var msg = this.msg;
var opt = this.opt || {};
opt.out = true;
Tab.request(url, msg, null, opt);
return;
Tab.request(url, msg, function(err, reply){ var body = (reply||{}).body||{};
Tab.on(body['@'] || msg['#'], err || body['!'], body['$']);
});
Tab.on(body['@'] || msg['#'], [err || body['!'], body['$']]);
}, this.opt);
}
P.chain.send = function(msg){
Gun.obj.map(this.peers, map, msg);
P.chain.send = function(msg, opt){
Gun.obj.map(this.peers, map, {msg: msg, opt: opt});
}
exports.peers = P;
}(Tab));
Expand Down Expand Up @@ -1246,7 +1316,7 @@
if(opt.body){ req.body = opt.body }
if(opt.url){ req.url = opt.url }
req.headers = req.headers || {};
if(!ws.cbs[req.headers['ws-rid']]){
if(!opt.out && !ws.cbs[req.headers['ws-rid']]){
ws.cbs[req.headers['ws-rid'] = 'WS' + (+ new Date()) + '.' + Math.floor((Math.random()*65535)+1)] = function(err,res){
if(!res || res.body || res.end){ delete ws.cbs[req.headers['ws-rid']] }
cb(err,res);
Expand Down Expand Up @@ -1282,8 +1352,7 @@
if(!res){ return }
res.headers = res.headers || {};
if(res.headers['ws-rid']){ return (ws.cbs[res.headers['ws-rid']]||function(){})(null, res) }
//Gun.log("We have a pushed message!", res);
if(res.body){ r.createServer.ing(res, function(res){ r(opt.base, null, null, res)}) } // emit extra events.
if(res.body){ r.createServer.ing(res, function(res){ res.out = true; r(opt.base, null, null, res)}) } // emit extra events.
};
ws.onerror = function(e){ (ws||{}).err = e };
return true;
Expand Down
1 change: 0 additions & 1 deletion lib/ws.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ module.exports = function(wss, server, opt){
req.headers = ws.upgradeReq.headers || {};
//Gun.log("wsReq", req);
ws.on('message', function(msg){
console.log("FOOOBAR", msg);
msg = Gun.obj.ify(msg);
msg.url = msg.url || {};
msg.url.pathname = (req.url.pathname||'') + (msg.url.pathname||'');
Expand Down
25 changes: 13 additions & 12 deletions lib/wsp.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,20 +81,20 @@
var stream, cb = res = require('./jsonp')(req, res);
if(req.headers && (stream = req.headers['gun-sid'])){
stream = (gun.wsp.peers = gun.wsp.peers || {})[stream] = gun.wsp.peers[stream] || {sid: stream};
stream.sub = stream.sub || gun.wsp.on('network', function(req, ev){
if(!stream){ return ev.off() } // self cleans up after itself!
if(!req || (req.headers && req.headers['gun-sid'] === stream.sid)){ return }
(stream.queue = stream.queue || []).push(req);
stream.drain(stream.reply);
});
cb = function(r){ (r.headers||{}).poll = gun.wsp.poll; res(r) }
stream.drain = stream.drain || function(res){
if(!res || !stream || !stream.queue || !stream.queue.length){ return }
res({headers: {'gun-sid': stream.sid}, body: stream.queue });
stream.off = setTimeout(function(){ stream = null }, gun.wsp.pull);
stream.reply = stream.queue = null;
return true;
}
stream.sub = stream.sub || gun.wsp.on('network', function(req, ev){
if(!stream){ return ev.off() } // self cleans up after itself!
if(!req || (req.headers && req.headers['gun-sid'] === stream.sid)){ return }
(stream.queue = stream.queue || []).push(req);
stream.drain(stream.reply);
});
cb = function(r){ (r.headers||{}).poll = gun.wsp.poll; res(r) }
clearTimeout(stream.off);
if(req.headers.pull){
if(stream.drain(cb)){ return }
Expand Down Expand Up @@ -122,6 +122,8 @@
if(id = gun.wsp.msg.debounce[id]){
return gun.wsp.msg.debounce[id] = Gun.time.is(), id;
}
gun.wsp.msg.debounce[id] = Gun.time.is();
return;
};
gun.wsp.msg.debounce = gun.wsp.msg.debounce || {};
gun.wsp.wire = gun.wsp.wire || (function(){
Expand All @@ -142,12 +144,11 @@
// TODO: BUG! server put should push.
}
tran.get = function(req, cb){
console.log("EGET", req);
var body = req.body, lex = body['$'], reply = {headers: {'Content-Type': tran.json}}, opt;
Gun.get(gun, lex, function(err, node){
console.log("tran.get", lex, err, node);
//console.log("tran.get", lex, err, node);
return cb({headers: reply.headers, body: {
'#': Gun.text.random(9),
'#': gun.wsp.msg(),
'@': body['#'],
'$': node,
'!': err
Expand Down Expand Up @@ -180,13 +181,13 @@
}, opt);
}
tran.put = function(req, cb){
console.log("tran.put", req);
//console.log("tran.put", req);
// NOTE: It is highly recommended you do your own PUT/POSTs through your own API that then saves to gun manually.
// This will give you much more fine-grain control over security, transactions, and what not.
var body = req.body, graph = body['$'], reply = {headers: {'Content-Type': tran.json}}, opt;
Gun.put(gun, graph, function(err, ok){
return cb({headers: reply.headers, body: {
'#': Gun.text.random(9),
'#': gun.wsp.msg(),
'@': body['#'],
'$': ok,
'!': err
Expand Down

0 comments on commit 08418ee

Please sign in to comment.