This repository has been archived by the owner on May 1, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
app.py
111 lines (81 loc) · 3.45 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import sys
import os
import logging
from logging.handlers import RotatingFileHandler
from config import Config
def config_log(log_instance="app"):
    """Attach a handler to the root logger according to Config.

    With Config.LOG_TO_STDOUT truthy, a plain stream handler is used.
    Otherwise a size-rotating file handler writing to
    '<Config.LOG_DIR>/<log_instance>.log' (10 KB per file, 3 backups,
    UTF-8) is installed with a timestamp/location formatter.
    The root logger level is forced to INFO in both modes.
    """
    if not Config.LOG_TO_STDOUT:
        # File mode: the log directory may not exist yet.
        os.makedirs(Config.LOG_DIR, exist_ok=True)
        handler = RotatingFileHandler(
            filename=os.path.join(Config.LOG_DIR, '%s.log' % log_instance),
            maxBytes=1024*10,
            backupCount=3,
            encoding='utf-8'
        )
        handler.setFormatter(logging.Formatter(
            '%(asctime)s %(levelname)s:[in %(pathname)s:%(lineno)d] %(message)s '))
    else:
        # Stdout mode keeps the default (unformatted) handler output.
        handler = logging.StreamHandler()
    logging.root.addHandler(handler)
    logging.root.setLevel(logging.INFO)
    logging.info('[Config] Logging : DONE ')
def main(argv):
    """Run the ingestion pipeline: fetch the newest GTFS feed into the DB.

    Steps: configure logging, create the app tables at Config.DB_URI,
    query the TransitFeed API for the latest feed version, abort (via
    the except branch) if that version is already stored, otherwise
    download the feed zip and extract its GTFS data into the database.

    Parameters:
        argv: command-line arguments; currently unused (parsing is
              commented out below).

    Any exception — including the deliberate "already newest" signal —
    is caught at this boundary and logged as critical.
    """
    from transit_feed import TransitFeed
    from dal import table_def_init, save_new_feed_version, extract_gtfs_init
    # process command line
    # args = process_command_line(argv)
    try:
        logging.info("Exec from Main")
        config_log()
        logging.info("Creating app tables on %s ..." % Config.DB_URI)
        DBSessionMaker = table_def_init(Config.DB_URI)
        session = DBSessionMaker()
        logging.info("Fetching TransitFeedAPI ...")
        tf = TransitFeed()
        feed_version = tf.getLastFeedVersion()
        is_new = save_new_feed_version(feed_version, session)
        if is_new is False:
            # Not an error per se: used to short-circuit the pipeline when
            # the stored feed is already the latest one.
            raise Exception("Feed #%s from %s is the newest" %
                            (feed_version['id'], feed_version['start_date']))
        logging.info("Downloading TransitFeed Version : %s ..." %
                     feed_version['id'])
        # Fix: derive the archive name from the feed id instead of the
        # previously hard-coded debug filename "527_20190223.zip".
        gtfs_zip_filename = "%s.zip" % feed_version['id']
        tf.downloadLastVersion(file_name=gtfs_zip_filename)
        logging.info("Extracting GTFS data into Octotrails DB ...")
        gtfs_zip_path = os.path.join(Config.GTFS_DIR, gtfs_zip_filename)
        extract_gtfs_init(gtfs_zip_path, DBSessionMaker)
    except Exception as ex:
        logging.critical(ex)
def process_command_line(argv):
    """Parse command-line options for the app.

    Parameters:
        argv: list of argument strings excluding the program name
              (e.g. sys.argv[1:]); None makes argparse read
              sys.argv[1:] itself.

    Returns:
        argparse.Namespace with one attribute, ``run``, constrained to
        'pipeline', 'web' or 'all' (default: 'all').
    """
    import argparse
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument(
        "-r", "--run",
        choices=['pipeline', 'web', 'all'],
        const='all', default='all', nargs='?',
        help="Which part of the app to run : pipeline, web, all (by default: %(default)s)")
    # Bug fix: argparse's parse_args() returns a single Namespace, not an
    # (options, args) pair — the old optparse-style unpacking raised a
    # TypeError on every call. Also forward `argv`, which was previously
    # accepted but silently ignored.
    args = arg_parser.parse_args(argv)
    return args
def remove_gtfs_files(src_zip='./data/versions/gtfs.zip',
                      dst_zip='./data/versions/gtfs-light.zip',
                      keep_tables=('stops', 'translations', 'routes',
                                   'trips', 'stop_times')):
    """Write a slimmed-down copy of a GTFS zip keeping only selected tables.

    Despite its name, this does not delete anything: it reads `src_zip`
    and writes `dst_zip` containing only the members whose table name is
    listed in `keep_tables`.

    Parameters:
        src_zip:     path of the full GTFS archive to read.
        dst_zip:     path of the light archive to create (overwritten).
        keep_tables: iterable of GTFS table names (without extension)
                     to retain.

    Defaults preserve the original hard-coded behavior, so existing
    no-argument callers are unaffected.
    """
    from zipfile import ZipFile
    wanted = set(keep_tables)

    def _keep(info):
        # Member names look like 'stops.txt'; strip the 4-character
        # '.txt' extension before matching against the whitelist.
        return info.filename[:-4] in wanted

    with ZipFile(src_zip, 'r') as gtfs:
        with ZipFile(dst_zip, 'w') as gtfs_light:
            for member in filter(_keep, gtfs.infolist()):
                gtfs_light.writestr(member, gtfs.read(member.filename))
if __name__ == "__main__":
    # Script entry point: forwards sys.argv to main() (main currently
    # ignores its argv argument).
    main(sys.argv)