Skip to content

Commit

Permalink
Merge pull request #63 from linkedconnections/development
Browse files Browse the repository at this point in the history
v2.0.1
  • Loading branch information
julianrojas87 authored Jul 5, 2022
2 parents 1646a58 + f655deb commit 0d46b34
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 46 deletions.
74 changes: 33 additions & 41 deletions lib/GtfsIndex.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ const { URL } = require('url');
const { request } = require('undici');
const util = require('util');
const fs = require('fs');
const unzip = require('unzipper');
const csv = require('fast-csv');
const del = require('del');
const childProcess = require('child_process');
const { Level } = require('level');
const Utils = require('./Utils');

const exec = util.promisify(childProcess.exec);

Expand All @@ -17,60 +17,48 @@ class GtfsIndex {
this._headers = options.headers || {};
}

getIndexes(options) {
return new Promise(async (resolve, reject) => {
try {
await this.cleanUp();
// Download or load from disk static GTFS feed
if (this.path.startsWith('http') || this.path.startsWith('https')) {
const downloadURL = new URL(this.path);
if (!downloadURL.protocol) {
reject('Please provide a valid URL or a path to a GTFS feed');
return;
} else {
await this.download(this.path, this.headers);
}
async getIndexes(options) {
try {
await this.cleanUp();
// Download or load from disk static GTFS feed
if (this.path.startsWith('http') || this.path.startsWith('https')) {
const downloadURL = new URL(this.path);
if (!downloadURL.protocol) {
throw new Error('Please provide a valid URL or a path to a GTFS feed');
} else {
if (!fs.existsSync(this.path)) {
reject('Please provide a valid url or a path to a GTFS feed');
return;
} else {
await this.download(this.path, this.headers);
}
} else {
if (!fs.existsSync(this.path)) {
throw new Error('Please provide a valid url or a path to a GTFS feed');
} else {
if (this.path.endsWith('.zip')) {
await this.unzip(fs.createReadStream(this.path));
} else {
this.auxPath = this.path;
}
}

resolve(this.createIndexes(options.store, options.trips, options.deduce));
} catch (err) {
await this.cleanUp();
reject(err);
}
});

return this.createIndexes(options.store, options.trips, options.deduce);
} catch (err) {
await this.cleanUp();
throw err;
}
}

async download(url, headers) {
const res = await request(url, { method: 'GET', headers });

if(res.statusCode === 200) {
if (res.statusCode === 200) {
await this.unzip(await res.body);
} else {
throw new Error(`Error on HTTP request: ${url}, Message: ${await res.body.text()}`);
}
}

unzip(res) {
return new Promise((resolve, reject) => {
if (!fs.existsSync(this.auxPath)) {
fs.mkdirSync(this.auxPath);
}
res.pipe(unzip.Extract({ path: this.auxPath }))
.on('error', async err => {
await this.cleanUp();
reject(err);
})
.on('close', () => {
resolve();
});
});
unzip(stream) {
return Utils.unzipStream(stream, this.auxPath);
}

async createIndexes(store, uTrips, deduce) {
Expand All @@ -88,7 +76,7 @@ class GtfsIndex {
let cdp = null;


if(deduce) {
if (deduce) {
tripsByRoute = new Map();
firstStops = new Map();
calendar = new Map();
Expand Down Expand Up @@ -147,7 +135,7 @@ class GtfsIndex {
let sp = this.createIndex(this.auxPath + '/stops.txt', stops_index, 'stop_id');
let rp = this.createIndex(this.auxPath + '/routes.txt', routes_index, 'route_id');

if(deduce) {
if (deduce) {
cp = this.createIndex(this.auxPath + '/calendar.txt', calendar, 'service_id');
cdp = this.processCalendarDates(this.auxPath + '/calendar_dates.txt', calendarDates);
}
Expand Down Expand Up @@ -317,6 +305,10 @@ class GtfsIndex {
return this._auxPath;
}

set auxPath(path) {
this._auxPath = path;
}

get headers() {
return this._headers;
}
Expand Down
22 changes: 22 additions & 0 deletions lib/Utils.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
const fs = require('fs');
const unzip = require('unzipper');

function unzipStream(stream, outPath) {
return new Promise((resolve, reject) => {
if (!fs.existsSync(outPath)) {
fs.mkdirSync(outPath);
}
stream.pipe(unzip.Extract({ path: outPath }))
.on('error', async err => {
await this.cleanUp();
reject(err);
})
.on('close', () => {
resolve();
});
});
}

module.exports = {
unzipStream
}
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "gtfsrt2lc",
"version": "2.0.0",
"version": "2.0.1",
"description": "Converts the GTFS-RT to Linked Connections",
"main": "./Gtfsrt2LC.js",
"bin": {
Expand Down
22 changes: 20 additions & 2 deletions test/gtfsrt2lc.test.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
const fs = require('fs');
const GtfsIndex = require('../lib/GtfsIndex');
const Gtfsrt2lc = require('../lib/Gtfsrt2LC');
const Utils = require('../lib/Utils');

const static_path = './test/data/static_rawdata.zip';
const rt_path = './test/data/realtime_rawdata';
Expand All @@ -24,7 +26,7 @@ var grepConnections = [];
var levelConnections = [];


// Make sure travis-ci does not crash due to timeouts
// Make sure test process does not crash due to timeouts
jest.setTimeout(180000);

test('Obtain the list of trips to be updated from GTFS-RT data', async () => {
Expand Down Expand Up @@ -77,7 +79,7 @@ test('Extract all indexes from sample static GTFS data (test/data/static_rawdata
});

test('Extract all indexes from sample static GTFS data (test/data/static_rawdata.zip) using LevelStore', async () => {
let gti = new GtfsIndex({ path: static_path });
const gti = new GtfsIndex({ path: static_path });
expect.assertions(4);
levelIndexes = await gti.getIndexes({ store: 'LevelStore' });

Expand All @@ -92,6 +94,22 @@ test('Extract all indexes from sample static GTFS data (test/data/static_rawdata
expect(levelStop_times).toBeDefined();
});

test('Extract all indexes when source is given as decompressed folder', async () => {
// First decompress GTFS zip file
const fileStream = fs.createReadStream(static_path);
const sourcePath = './test/data/decompressed';
await Utils.unzipStream(fileStream, sourcePath);
// Extract indexes
expect.assertions(4);
const gti = new GtfsIndex({ path: sourcePath });
const indexes = await gti.getIndexes({ store: 'MemStore' });

expect(indexes.routes).toBeDefined();
expect(indexes.trips).toBeDefined();
expect(indexes.stops).toBeDefined();
expect(indexes.stop_times).toBeDefined();
});

test('Check all parsed connections are consistent regarding departure and arrival times', async () => {
grt.setIndexes(memIndexes);
let connStream = await grt.parse({ format: 'json' });
Expand Down

0 comments on commit 0d46b34

Please sign in to comment.