Skip to content
This repository has been archived by the owner on May 18, 2021. It is now read-only.

Streaming json support #4

Merged
merged 12 commits into from
Dec 1, 2017
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ docker run -p3000:3000 -e API_URL=http://$DOCKER_LOCALHOST:8080 -e COMPLETER_B

You need nodejs (tested with 8.5.0)

this assumes that the completer is running on 127.0.0.1:8081 and the corresponding functions server is running on 8081
this assumes that the completer is running on 127.0.0.1:8081 and the corresponding functions server is running on 8080

```bash
npm install
Expand Down
47 changes: 20 additions & 27 deletions dumpevents.js
Original file line number Diff line number Diff line change
@@ -1,45 +1,38 @@
#!/usr/bin/env node

var WebSocket = require('ws');
var fs = require('fs');
var oboe = require('oboe');

var graphId = process.argv[2];
var flowId = process.argv[2];

if ( !graphId){
console.log("Specify a graph iD");
if ( !flowId){
console.log("Specify a flow iD");
return 1;
}
console.log("Getting graphId", graphId);
console.log("Getting flowId", flowId);

const ws = new WebSocket('ws://localhost:8081/wss');
var events = {};
var creates = [];

var eventCount =0;

ws.on('message', function (msgString) {
//console.log('got data ',msgString);

var msg = JSON.parse(msgString);
if (msg.sub !== '_all') {

oboe(`http://localhost:8081/v1/flows/${flowId}/stream`)
.done((data) => {
console.log("Received event from stream", data.result);
eventCount++;
if(!events[msg.sub]){
events[msg.sub] = [msg];
if(!events[data.result.flow_id]){
events[data.result.flow_id] = [data.result];
} else {
events[data.result.flow_id].push(data.result);
}
events[msg.sub].push(msg);
if(msg.type === "model.GraphCreatedEvent"){
const newMsg = JSON.parse(msgString);
newMsg.sub = "_all";
creates.push(newMsg);

if (data.result.graph_created) {
creates.push(data.result);
}
}
});

ws.on('open', function () {
console.log("Connection open subscribing to ",graphId);
console.log("Hit Ctrl+C to save data when you are done");
ws.send(JSON.stringify({command: 'subscribe', graph_id: graphId}));
});
})
.fail((error) => {
events.error("Failed to subscribe to stream", error);
});


process.on('SIGINT', () => {
Expand Down
13 changes: 13 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

82 changes: 81 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"bootstrap3": "^3.3.5",
"express": "^4.15.4",
"http-proxy-middleware": "^0.17.4",
"oboe": "^2.1.4",
"react": "^15.6.1",
"react-bootstrap": "^0.31.3",
"react-dom": "^15.6.1",
Expand All @@ -34,5 +35,84 @@
"react-vis-force": "^0.3.1",
"ws": "^3.1.0"
},
"description": "FnFlow completer UI demo"
"description": "FnFlow completer UI demo",
"resolutions": {
"array-flatten": "2.1.1",
"debug": "3.1.0",
"path-to-regexp": "1.7.0",
"qs": "6.5.1",
"setprototypeof": "1.1.0",
"statuses": "1.4.0",
"mime-db": "1.32.0",
"inherits": "2.0.3",
"mime": "1.6.0",
"is-glob": "4.0.0",
"is-extglob": "2.1.1",
"kind-of": "4.0.0",
"is-number": "3.0.0",
"isarray": "1.0.0",
"glob-parent": "3.1.0",
"object-assign": "4.1.1",
"core-js": "2.5.1",
"process": "0.11.10",
"source-map": "0.6.1",
"chalk": "2.3.0",
"ansi-styles": "3.2.0",
"strip-ansi": "4.0.0",
"supports-color": "4.5.0",
"ansi-regex": "3.0.0",
"jsesc": "1.3.0",
"minimist": "1.2.0",
"balanced-match": "1.0.0",
"loader-utils": "1.1.0",
"pify": "3.0.0",
"find-up": "2.1.0",
"path-exists": "3.0.0",
"regexpu-core": "2.0.0",
"glob": "7.1.2",
"postcss": "6.0.14",
"clone": "1.0.3",
"has-flag": "2.0.0",
"commander": "2.12.2",
"uglify-js": "3.2.0",
"utila": "0.4.0",
"domutils": "1.5.1",
"domelementtype": "1.3.0",
"readable-stream": "2.3.3",
"string_decoder": "1.0.3",
"file-type": "4.4.0",
"replace-ext": "1.0.0",
"tempfile": "2.0.0",
"uuid": "3.1.0",
"vinyl": "1.2.0",
"through2": "2.0.3",
"duplexer2": "0.1.4",
"strip-bom": "3.0.0",
"tunnel-agent": "0.6.0",
"time-stamp": "2.0.0",
"co": "3.1.0",
"read-pkg-up": "2.0.0",
"camelcase": "3.0.0",
"semver": "5.4.1",
"read-pkg": "2.0.0",
"load-json-file": "2.0.0",
"path-type": "2.0.0",
"ajv": "5.5.0",
"acorn": "5.2.1",
"async": "2.6.0",
"yargs": "8.0.2",
"punycode": "1.3.2",
"hash-base": "3.0.4",
"cliui": "3.2.0",
"string-width": "2.1.1",
"is-fullwidth-code-point": "2.0.0",
"assert-plus": "0.2.0",
"extsprintf": "1.4.0",
"os-locale": "2.1.0",
"which-module": "2.0.0",
"yargs-parser": "7.0.0",
"faye-websocket": "0.11.1",
"url-parse": "1.2.0",
"querystringify": "1.0.0"
}
}
31 changes: 14 additions & 17 deletions src/client/app/GraphTimeline.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class GraphTimeline extends React.Component {
lastEvent: null,
selectedNode: null,
maxTimeStamp: Date.now(),
cursorTs: props.graph.created,
cursorTs: props.graph.toBrowserTime(props.graph.created),
intervalTimer: -1,
autoScroll: props.graph.isLive(),
autoVScroll: true,
Expand Down Expand Up @@ -86,18 +86,18 @@ class GraphTimeline extends React.Component {

if (newGraph) {
console.log("New graph selected");
update.cursorTs = graph.created;
update.cursorTs = graph.toBrowserTime(graph.created);
this.startWatch();
}


const maxTs = graph.isLive() ? Date.now() : graph.finished;
let curDurationTs = (maxTs - graph.created);
const maxTs = graph.isLive() ? Date.now() : graph.toBrowserTime(graph.finished);
let curDurationTs = (maxTs - graph.toBrowserTime(graph.created));


if (this.state.autoScroll){
if(curDurationTs > (this.state.viewPortWidth / this.state.pxPerMs)) {
update.cursorTs = graph.created + (curDurationTs - (this.state.viewPortWidth / this.state.pxPerMs));
update.cursorTs = graph.toBrowserTime(graph.created) + (curDurationTs - (this.state.viewPortWidth / this.state.pxPerMs));
}
if(this.state.autoVScroll){
update.verticalScrollRatio = 1.0;
Expand All @@ -106,7 +106,7 @@ class GraphTimeline extends React.Component {
}
}else{
update.cursorTs = Math.min(this.state.cursorTs,maxTs - this.state.viewPortWidth / this.state.pxPerMs);
update.cursorTs = Math.max(update.cursorTs,graph.created);
update.cursorTs = Math.max(update.cursorTs,graph.toBrowserTime(graph.created));

}

Expand All @@ -115,14 +115,14 @@ class GraphTimeline extends React.Component {
if (graph.isLive()) {
update.maxTimeStamp = Date.now();
} else {
update.maxTimeStamp = graph.finished;
update.maxTimeStamp = graph.toBrowserTime(graph.finished);
}

let lastGraphEvent = graph.all_events.length > 0 ? graph.all_events[graph.all_events.length - 1] : null;
let lastGraphEvent = graph.getLastEvent() ;

let timeline;
if (this.state.lastEvent !== lastGraphEvent) {
console.log("Updating graph");
console.debug("Updating graph");
timeline = update.timeline = graph.createTimeline(this.isNodeShownByDefault);
update.lastEvent = lastGraphEvent;
} else {
Expand Down Expand Up @@ -277,9 +277,7 @@ class GraphTimeline extends React.Component {
}

render() {
let nodes = this.state.graph.getNodes();
//nodes.shift();
let startTs = this.state.graph.created;
let startTs = this.state.graph.toBrowserTime(this.state.graph.created);
let self = this;

if (!this.state.timeline) {
Expand All @@ -288,8 +286,9 @@ class GraphTimeline extends React.Component {

// converts a timestamp to a relative X in the display viewport
let relativeX = function (timeStamp) {
return ((timeStamp - startTs) * self.state.pxPerMs);
};
let relTs = this.state.graph.toBrowserTime(timeStamp);
return ((relTs - startTs) * self.state.pxPerMs);
}.bind(this);

let pendingElems = [(<div key='pending-title'
className={styles.pendingTitle}
Expand All @@ -303,7 +302,7 @@ class GraphTimeline extends React.Component {
let createTs = relativeX(node.created);

if (!this.state.timeline.rankMap.has(node.id())) {
console.log("no rank for", node.id());
console.debug("no rank for", node.id());
return;
}
let rank = this.state.timeline.rankMap.get(node.id());
Expand Down Expand Up @@ -362,7 +361,6 @@ class GraphTimeline extends React.Component {
}

let waitingTime = startPx - createTs;
let waitElem;
if (waitingTime > 10) {
//waitElem = this.createWaitingElem(idx, this.state.nodeHeight, createTs, waitingTime);
}
Expand All @@ -383,7 +381,6 @@ class GraphTimeline extends React.Component {

if (displayNode) {
nodeElements.push(<div key={node.id() + "_1"}>
{waitElem}
<div className={styles.node + ' ' + styleExtra.join(' ')}
style={runboxStyle}
onClick={(e) => this.selectNode(node)}
Expand Down
12 changes: 7 additions & 5 deletions src/client/app/ZoomLine.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@ class ZoomLine extends React.Component {
}

relativeX(timeStamp) {
var minTs = this.state.graph.created;
const maxTs = this.state.graph.finished ? this.state.graph.finished : Date.now();
let curDurationTs = (maxTs - this.state.graph.created);
var minTs = this.state.graph.toBrowserTime(this.state.graph.created);
const maxTs = this.state.graph.finished ? this.state.graph.toBrowserTime(this.state.graph.finished): Date.now();
let curDurationTs = (maxTs - minTs);
timeStamp = this.state.graph.toBrowserTime(timeStamp);

if (curDurationTs < this.state.windowDurationMs) {
return (timeStamp - minTs) * this.state.width / (this.state.windowDurationMs);
} else {
Expand All @@ -56,8 +58,8 @@ class ZoomLine extends React.Component {
let deltaX = wmme.screenX - this.state.dragStartX;
let inverted;

let minCursorTs = this.state.graph.created;
const maxTs = this.state.graph.finished ? this.state.graph.finished : Date.now();
let minCursorTs = this.state.graph.toBrowserTime(this.state.graph.created);
const maxTs = this.state.graph.finished? this.state.graph.toBrowserTime(this.state.graph.finished) : Date.now();
const maxCursorTs = maxTs - this.state.windowDurationMs;
let curDurationTs = (maxTs - minCursorTs);
if (curDurationTs < this.state.windowDurationMs) {
Expand Down
Loading