forked from gundambox/PttCrawler
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathquery.py
164 lines (134 loc) · 6.09 KB
/
query.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
import argparse
import csv
import os
from collections import OrderedDict
from datetime import datetime
from enum import Enum
from typing import Dict, List
from pyexcel_ods import save_data
from sqlalchemy import case, func
from models import (Article, ArticleHistory, Board, IpAsn, PttDatabase, Push,
User, UserLastRecord)
from utils import load_config, log, valid_date_type
def parse_argument():
    """Parse the command-line options of the query script.

    Options shared across scripts (``--verbose``, ``--config-path``) live on
    a help-less parent parser; the query-specific options are added to the
    main parser that inherits from it.

    Returns:
        dict: Parsed options keyed by argparse destination name
        (``verbose``, ``config_path``, ``board_name``, ``date_range``,
        ``format``, ``output_folder``, ``output_prefix``).
    """
    # Parent parser: add_help=False so "-h" is only registered once, on the
    # child parser below.
    shared = argparse.ArgumentParser(add_help=False)
    shared.add_argument('--verbose',
                        action='store_true',
                        help='Show more debug messages.')
    shared.add_argument('--config-path',
                        type=str,
                        default='',
                        help='Config ini file path.')

    cli = argparse.ArgumentParser(parents=[shared])
    cli.add_argument('--board-name', type=str, required=True)
    # Two positional values after the flag, each converted by valid_date_type.
    cli.add_argument('--date-range',
                     metavar=('START_DATE', 'END_DATE'),
                     nargs=2,
                     type=valid_date_type,
                     help='date in format "YYYY-MM-DD"',
                     required=True)
    cli.add_argument('--format',
                     type=str,
                     default='console',
                     choices=['ods', 'csv', 'console'])
    # Both output options are plain strings defaulting to the empty string.
    for flag in ('--output-folder', '--output-prefix'):
        cli.add_argument(flag, type=str, default='')

    return vars(cli.parse_args())
"""
Input: board name / start date / end date
Output: board name / start date / end date / number of domestic (TW) IPs / number of foreign IPs
"""
class QueryHelper(object):
    """Report how many articles and pushes of a board came from TW / non-TW IPs.

    The helper loads the database settings from an ini file, runs two
    queries (articles, then the pushes attached to those articles) and
    emits a small table either to the console, to an ``.ods`` file or to a
    ``.csv`` file.
    """

    def __init__(self, arguments: Dict[str, object]):
        """Store the query options and open a database session.

        Args:
            arguments: Mapping as returned by ``parse_argument()``. The keys
                read here are ``config_path``, ``date_range``,
                ``board_name``, ``format``, ``output_folder`` and
                ``output_prefix``.
        """
        # An empty --config-path falls back to "config.ini".
        config_path = arguments['config_path'] or 'config.ini'

        self.start_date, self.end_date = arguments['date_range']
        self.board_name = arguments['board_name']
        self.file_format = arguments['format']
        self.config = load_config(config_path)
        self.output_folder = arguments['output_folder']
        self.output_prefix = arguments['output_prefix']

        self.db = PttDatabase(dbtype=self.config['Database']['Type'],
                              dbname=self.config['Database']['Name'])
        self.db_session = self.db.get_session()

    @staticmethod
    def _count_tw_flags(flags):
        """Return ``(tw_count, not_tw_count)`` for an iterable of TW flags.

        Flags are compared by truthiness so that both booleans and the 1/0
        integers some database drivers return are counted; ``None`` values
        are ignored, as they matched neither branch before.
        """
        tw_count = 0
        not_tw_count = 0
        for flag in flags:
            if flag is None:
                continue
            if flag:
                tw_count += 1
            else:
                not_tw_count += 1
        return tw_count, not_tw_count

    @log()
    def _get_export_rows(self):
        """Build the result table: a header row, an Article row and a Push row.

        Returns:
            list[list]: Rows of ``[type, board, start date, end date,
            TW count, non-TW count]``; the counts are always integers.
        """
        rows = [['Type', 'Board', 'Start date',
                 'End date', 'TW Ip', 'Not TW Ip']]
        start_text = str(self.start_date or '')
        end_text = str(self.end_date or '')

        # Labelled column that is True when the ASN country code is 'TW'.
        tw_ip_label = case(value=IpAsn.asn_country_code,
                           whens={'TW': True},
                           else_=False).label("TW_IP")

        # NOTE(review): start_date / end_date are only echoed into the output
        # rows; neither query below filters on them. Confirm which model
        # columns should be compared against the range and add the filter.
        article_res = self.db_session.query(Article, ArticleHistory, tw_ip_label) \
            .join(ArticleHistory, ArticleHistory.article_id == Article.id) \
            .join(Board, Board.id == Article.board_id) \
            .order_by(ArticleHistory.id) \
            .group_by(Article.id) \
            .join(IpAsn, IpAsn.ip == Article.post_ip) \
            .filter(Board.name == self.board_name).all()

        article_tw_ip, article_not_tw_ip = self._count_tw_flags(
            tw_ip for _, _, tw_ip in article_res)
        rows.append(['Article', self.board_name,
                     start_text, end_text, article_tw_ip, article_not_tw_ip])

        article_history_id_list = [history.id for _, history, _ in article_res]

        # Skip the push query when no article matched: an empty IN () list
        # cannot match anything and some SQLAlchemy versions warn about it.
        if article_history_id_list:
            push_res = self.db_session.query(Push, tw_ip_label) \
                .join(IpAsn, IpAsn.ip == Push.push_ip) \
                .filter(Push.article_history_id.in_(article_history_id_list)).all()
        else:
            push_res = []

        push_tw_ip, push_not_tw_ip = self._count_tw_flags(
            tw_ip for _, tw_ip in push_res)
        rows.append(['Push', self.board_name,
                     start_text, end_text, push_tw_ip, push_not_tw_ip])

        return rows

    def _build_output_path(self, extension):
        """Return the export file path for ``extension`` (without the dot).

        The file name is ``<output_prefix>Ptt_query_<YYYY-MM-DD>.<extension>``
        inside ``output_folder``; the folder is created when it is set and
        does not exist yet.
        """
        output_filename = '{prefix}Ptt_query_{export_datetime}'.format(
            prefix=self.output_prefix,
            export_datetime=datetime.now().strftime('%Y-%m-%d'))
        if self.output_folder:
            os.makedirs(self.output_folder, exist_ok=True)
        return os.path.join(
            self.output_folder,
            '{filename}.{extension}'.format(filename=output_filename,
                                            extension=extension))

    def _print_rows(self):
        """Print the result table to stdout with a separator after the header."""
        data = self._get_export_rows()
        for idx, row in enumerate(data):
            print('{:8} | {:16} | {:20} | {:20} | {:5} | {:8}'.format(
                *map(str, row)))
            if idx == 0:
                print(
                    '---------+------------------+----------------------+----------------------+-------+----------')

    def _export_ods(self):
        """Write the result table to an ``.ods`` file on a sheet named "Query"."""
        data = {'Query': self._get_export_rows()}
        save_data(self._build_output_path('ods'), data)

    def _export_csv(self):
        """Write the result table to a comma-separated ``.csv`` file."""
        data = self._get_export_rows()
        csv_path = self._build_output_path('csv')
        # newline='' lets the csv module control line endings itself;
        # without it, extra blank rows appear on Windows.
        with open(csv_path, 'w', newline='') as csvfile:
            csvwriter = csv.writer(csvfile, delimiter=',')
            csvwriter.writerows(data)

    def go(self):
        """Run the query and emit the result in the configured format.

        Formats other than ``console``, ``ods`` and ``csv`` are ignored.
        """
        if self.file_format == 'console':
            self._print_rows()
        elif self.file_format == 'ods':
            self._export_ods()
        elif self.file_format == 'csv':
            self._export_csv()
def main():
    """Script entry point: parse the CLI options and run the query helper."""
    QueryHelper(parse_argument()).go()


# Only run when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()