Skip to content

Commit

Permalink
pushMedia ok
Browse files Browse the repository at this point in the history
  • Loading branch information
juliuswwj committed Apr 5, 2019
1 parent 2b33293 commit a4e8609
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 73 deletions.
99 changes: 26 additions & 73 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ function MKVBuilder(options, cb){
];
//if(options.v.duration) vtrack.push( ebml(0x23E383, beint(options.v.duration*options.timescale)) ); // Duration
var tracks = [ ebml(0xAE, Buffer.concat(vtrack)) ];
this.vduration = options.v.duration || 40;
this.vduration = options.v.duration || 33;
this.aduration = this.vduration / 2;
if(options.a){
if(options.a.duration) this.aduration = options.a.duration;
Expand Down Expand Up @@ -181,8 +181,10 @@ MKVBuilder.prototype.putFrame = function(data, type){
this.cb(this.data);
}
this.tsbase = this.vts;
this.data = [0, ebml(0xe7, beint(this.tsbase))]; // tmcode
this.csize = this.data[1].length;
this.data = [0,
ebml(0xe7, beint(this.tsbase)), // tmcode
ebml(0xa7, new Buffer(1))]; // Position - 0 for live
this.csize = this.data[1].length + this.data[2].length;
}
if(this.data.length < 2) return false; // wait video I frame

Expand All @@ -198,7 +200,7 @@ MKVBuilder.prototype.putFrame = function(data, type){
ts -= this.tsbase;

var block = Buffer.concat([beint(0xa3), benum(4 + data.length),
benum( ((type>>8)+1) & 0xff), // track number
benum( ((type>>8)+1) & 0xf), // track number
new Buffer([ts>>8, ts&0xff, type&0xff])]); // sint16 timecode diff, uint8 flags
//console.log('block', data.length, block);
this.data.push(block);
Expand Down Expand Up @@ -400,11 +402,14 @@ Kinesis.prototype.getEndPoint = function(cb){

function putMedia(options, url, cb, cb2){
// parse https://s-4010bf70.kinesisvideo.us-west-2.amazonaws.com
//var http = require('http');
//var host = 'localhost';
var host = url.substring( url.indexOf('//')+2 );
var opt = {
method: 'POST',
path: '/putMedia',
host: host,
//port: 8000,
headers: {
'Accept': '*/*',
'user-agent': options.useragent,
Expand Down Expand Up @@ -452,87 +457,35 @@ Kinesis.prototype.start = function(cb){
throw new Error("start: invalid callback");
}

// queue data, then send it in chunked format
var options = this.options;
var req = null;
var dq = [];
var busy = false;
var nl = '\r\n';
var BLOCKSIZE = 0x3ff4;

function sendchunks(data, cb) {
var l = data.length;
if(l == 0)return;

var sz = 0;
var pos = 0;
var st = 0; // 0 header, 1 data, 2 footer, 3 end
for(var i = 0; i < l; i++){
sz += data[i].length;
}
var d = data.shift();
function send(){
var ok = true;
var bsz = 0;
while(st != 3){
if(st == 0){
bsz = sz>BLOCKSIZE ? BLOCKSIZE : sz;
ok = req.write(bsz.toString(16) + nl);
if(!ok) break;
console.log('chunk', bsz);
st = 1;
sz -= bsz;
}
if(st == 1){
var l = d.length;
if(bsz >= l && pos == 0){
ok = req.write(d);
if(!ok)break;
bsz -= l;
d = data.shift();
} else if(bsz >= l - pos) {
ok = req.write(d.slice(pos));
if(!ok)break;
bsz -= l - pos;
pos = 0;
d = data.shift();
} else {
ok = req.write(d.slice(pos, pos + bsz));
if(!ok)break;
pos += bsz;
bsz = 0;
}
if(bsz == 0) st = 2;
}
if(st == 2){
if(sz == 0){
req.write(nl, cb);
st = 3;
} else {
ok = req.write(nl);
if(ok) st = 0;
}
}
}
if(!ok) req.once('drain', send);
}
send();
}

function dosend(){
//console.log('dosend', busy, dq.length, req);
if(busy || dq.length == 0 || !req) return;
options.req = req;
busy = true;
sendchunks(dq.shift(), function(){
busy = false;
dosend();
});

function send(){
while(true){
if(dq.length == 0){
busy = false;
break;
}
var d = dq.shift();
if( !req.write(d) ){
req.once('drain', send);
break;
}
}
}
send();
}

this.mkv = new MKVBuilder(this.options, function(data){
console.log('mkvpush', data.length);
dq.push(data);
//console.log('mkvpush', data.length);
dq = dq.concat(data);
dosend();
});

Expand Down
57 changes: 57 additions & 0 deletions test.js
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,65 @@ test(function(resovle){
});
});



function testserver(){
const net = require('net');
var fw = fs.openSync('/tmp/test.txt', 'w');
var s = net.createServer(function(c){
c.write('HTTP/1.0 200 Ok\r\n\r\n');

var cnt = 0;
var itm = setInterval(function(){
var ob = {
EventType: 'Received',
FragmentTimecode: cnt*1000,
FragmentNumber: cnt,
ErrorId: 0,
};
c.write(JSON.stringify({Acknowledgement:ob}) + '\r\n');
}, 3000);

var hex = false;
c.on('data', function(data){
if(!hex){
var ptr = data.indexOf('\r\n\r\n');
fs.writeSync(fw, '-- HDR\n');
if(ptr > 0){
fs.writeSync(fw, data.slice(0, ptr+4).toString());
hex = true;
} else {
fs.writeSync(fw, data.toString());
}
if(!hex) return;
data = data.slice(ptr+4);
}
fs.writeSync(fw, '-- DATA_OUT\n');
for(var i=0, l=data.length; i < l; i+=16){
var t = i+16<=l ? data.slice(i, i+16) : data.slice(i);
fs.writeSync(fw, t.toString('hex'));
fs.writeSync(fw, '\n');
}
});

c.on('end', function(){
clearInterval(itm);
});
});
s.on('error', function(e){
console.log('testserver', e);
});
s.listen(8000, function(){
console.log('testserver bind on 8000');
});
}



// node test.js region keyid key
if(process.argv.length >= 5) test(function(resolve){
//testserver();

var ex = new MKVExtractor(mkvin);
var itm;
var s;
Expand Down

0 comments on commit a4e8609

Please sign in to comment.