From a4e860915e5b42065f573227434817ce6fce547a Mon Sep 17 00:00:00 2001 From: wenjun wang Date: Fri, 5 Apr 2019 12:43:17 -0700 Subject: [PATCH] pushMedia ok --- index.js | 99 +++++++++++++++----------------------------------------- test.js | 57 ++++++++++++++++++++++++++++++++ 2 files changed, 83 insertions(+), 73 deletions(-) diff --git a/index.js b/index.js index 1aa5ad6..7f39b3b 100644 --- a/index.js +++ b/index.js @@ -136,7 +136,7 @@ function MKVBuilder(options, cb){ ]; //if(options.v.duration) vtrack.push( ebml(0x23E383, beint(options.v.duration*options.timescale)) ); // Duration var tracks = [ ebml(0xAE, Buffer.concat(vtrack)) ]; - this.vduration = options.v.duration || 40; + this.vduration = options.v.duration || 33; this.aduration = this.vduration / 2; if(options.a){ if(options.a.duration) this.aduration = options.a.duration; @@ -181,8 +181,10 @@ MKVBuilder.prototype.putFrame = function(data, type){ this.cb(this.data); } this.tsbase = this.vts; - this.data = [0, ebml(0xe7, beint(this.tsbase))]; // tmcode - this.csize = this.data[1].length; + this.data = [0, + ebml(0xe7, beint(this.tsbase)), // tmcode + ebml(0xa7, new Buffer(1))]; // Position - 0 for live + this.csize = this.data[1].length + this.data[2].length; } if(this.data.length < 2) return false; // wait video I frame @@ -198,7 +200,7 @@ MKVBuilder.prototype.putFrame = function(data, type){ ts -= this.tsbase; var block = Buffer.concat([beint(0xa3), benum(4 + data.length), - benum( ((type>>8)+1) & 0xff), // track number + benum( ((type>>8)+1) & 0xf), // track number new Buffer([ts>>8, ts&0xff, type&0xff])]); // sint16 timecode diff, uint8 flags //console.log('block', data.length, block); this.data.push(block); @@ -400,11 +402,14 @@ Kinesis.prototype.getEndPoint = function(cb){ function putMedia(options, url, cb, cb2){ // parse https://s-4010bf70.kinesisvideo.us-west-2.amazonaws.com + //var http = require('http'); + //var host = 'localhost'; var host = url.substring( url.indexOf('//')+2 ); var opt = { method: 'POST', path: '/putMedia', host: host, + //port: 8000, headers: { 'Accept': '*/*', 'user-agent': options.useragent, @@ -452,87 +457,35 @@ Kinesis.prototype.start = function(cb){ throw new Error("start: invalid callback"); } - // queue data, then send it in chunked format var options = this.options; var req = null; var dq = []; var busy = false; - var nl = '\r\n'; - var BLOCKSIZE = 0x3ff4; - - function sendchunks(data, cb) { - var l = data.length; - if(l == 0)return; - - var sz = 0; - var pos = 0; - var st = 0; // 0 header, 1 data, 2 footer, 3 end - for(var i = 0; i < l; i++){ - sz += data[i].length; - } - var d = data.shift(); - function send(){ - var ok = true; - var bsz = 0; - while(st != 3){ - if(st == 0){ - bsz = sz>BLOCKSIZE ? BLOCKSIZE : sz; - ok = req.write(bsz.toString(16) + nl); - if(!ok) break; - console.log('chunk', bsz); - st = 1; - sz -= bsz; - } - if(st == 1){ - var l = d.length; - if(bsz >= l && pos == 0){ - ok = req.write(d); - if(!ok)break; - bsz -= l; - d = data.shift(); - } else if(bsz >= l - pos) { - ok = req.write(d.slice(pos)); - if(!ok)break; - bsz -= l - pos; - pos = 0; - d = data.shift(); - } else { - ok = req.write(d.slice(pos, pos + bsz)); - if(!ok)break; - pos += bsz; - bsz = 0; - } - if(bsz == 0) st = 2; - } - if(st == 2){ - if(sz == 0){ - req.write(nl, cb); - st = 3; - } else { - ok = req.write(nl); - if(ok) st = 0; - } - } - } - if(!ok) req.once('drain', send); - } - send(); - } function dosend(){ - //console.log('dosend', busy, dq.length, req); if(busy || dq.length == 0 || !req) return; options.req = req; busy = true; - sendchunks(dq.shift(), function(){ - busy = false; - dosend(); - }); + + function send(){ + while(true){ + if(dq.length == 0){ + busy = false; + break; + } + var d = dq.shift(); + if( !req.write(d) ){ + req.once('drain', send); + break; + } + } + } + send(); } this.mkv = new MKVBuilder(this.options, function(data){ - console.log('mkvpush', data.length); - dq.push(data); + //console.log('mkvpush', data.length); + dq = dq.concat(data); dosend(); }); diff --git a/test.js b/test.js index 0ea5d87..74d641e 100644 --- a/test.js +++ b/test.js @@ -267,8 +267,65 @@ test(function(resovle){ }); }); + + +function testserver(){ + const net = require('net'); + var fw = fs.openSync('/tmp/test.txt', 'w'); + var s = net.createServer(function(c){ + c.write('HTTP/1.0 200 Ok\r\n\r\n'); + + var cnt = 0; + var itm = setInterval(function(){ + var ob = { + EventType: 'Received', + FragmentTimecode: cnt*1000, + FragmentNumber: cnt, + ErrorId: 0, + }; + c.write(JSON.stringify({Acknowledgement:ob}) + '\r\n'); + }, 3000); + + var hex = false; + c.on('data', function(data){ + if(!hex){ + var ptr = data.indexOf('\r\n\r\n'); + fs.writeSync(fw, '-- HDR\n'); + if(ptr > 0){ + fs.writeSync(fw, data.slice(0, ptr+4).toString()); + hex = true; + } else { + fs.writeSync(fw, data.toString()); + } + if(!hex) return; + data = data.slice(ptr+4); + } + fs.writeSync(fw, '-- DATA_OUT\n'); + for(var i=0, l=data.length; i < l; i+=16){ + var t = i+16<=l ? data.slice(i, i+16) : data.slice(i); + fs.writeSync(fw, t.toString('hex')); + fs.writeSync(fw, '\n'); + } + }); + + c.on('end', function(){ + clearInterval(itm); + }); + }); + s.on('error', function(e){ + console.log('testserver', e); + }); + s.listen(8000, function(){ + console.log('testserver bind on 8000'); + }); +} + + + // node test.js region keyid key if(process.argv.length >= 5) test(function(resolve){ + //testserver(); + var ex = new MKVExtractor(mkvin); var itm; var s;