Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions apps/addons/management/commands/download_counts_from_file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
from datetime import datetime
from optparse import make_option

from django.core.management.base import BaseCommand, CommandError

import commonware.log

from addons.models import File
# TODO: use DownloadCount when the script is proven to work correctly.
from stats.models import update_inc, DownloadCountTmp as DownloadCount


log = commonware.log.getLogger('adi.downloadcountsfromfile')


class Command(BaseCommand):
    """Update download count metrics from a file in the database.

    Usage:
    ./manage.py download_counts_from_file <filename> --date=YYYY-MM-DD


    We get a row for each "addon download" request, in this format:

    <count> <addon id> <click source>

    There is one DownloadCount entry per addon per day, and each field holds
    the json-ified dict of keys/counters.

    Eg, for the above request:

    addon: <the addon that has this id>
    count: <the number of requests for this addon, for this day>
    date: <the date of the day the queries were made>
    src: {'dp-btn-primary': 1}

    """
    help = __doc__

    option_list = BaseCommand.option_list + (
        make_option('--date', action='store', type='string',
                    dest='date', help='Date in the YYYY-MM-DD format.'),
        make_option('--separator', action='store', type='string', default='\t',
                    dest='separator', help='Field separator in file.'),
    )

    def handle(self, *args, **options):
        start = datetime.now()  # Measure the time it takes to run the script.
        day = options['date']
        if not day:
            raise CommandError('You must specify a --date parameter in the '
                               ' YYYY-MM-DD format.')
        if not args:
            # Fail with a proper usage error instead of an IndexError.
            raise CommandError('You must provide the file to process as '
                               'first argument.')
        sep = options['separator']
        filename = args[0]
        # First, make sure we don't have any existing counts for the same day,
        # or it would just increment again the same data.
        DownloadCount.objects.filter(date=day).delete()

        # Memoize the files to addon relations and the DownloadCounts.
        download_counts = {}
        # Perf: preload all the files once and for all.
        # This builds a dict where each key (the file_id we get from the hive
        # query) has the addon_id as value.
        files_to_addon = dict(File.objects.values_list('id',
                                                       'version__addon_id'))

        index = -1  # Keeps the final count log correct even for an empty file.
        with open(filename) as count_file:
            for index, line in enumerate(count_file):
                if index and (index % 10000) == 0:
                    log.info('Processed %s lines' % index)

                # Strip the trailing newline (if any): slicing with [:-1]
                # would eat the last character of the src field on a final
                # line that has no newline.
                splitted = line.rstrip('\n').split(sep)

                if len(splitted) != 3:
                    log.debug('Badly formatted row: %s' % line)
                    continue

                counter, file_id, src = splitted
                try:
                    file_id, counter = int(file_id), int(counter)
                except ValueError:  # Badly formatted? Drop.
                    continue

                # Does this file exist?
                if file_id in files_to_addon:
                    addon_id = files_to_addon[file_id]
                else:
                    log.info('File with id: %s not found' % file_id)
                    continue

                # Memoize the DownloadCount.
                if addon_id in download_counts:
                    dc = download_counts[addon_id]
                else:
                    dc = DownloadCount(date=day, addon_id=addon_id, count=0)
                    download_counts[addon_id] = dc

                # We can now fill the DownloadCount object.
                dc.count += counter
                dc.sources = update_inc(dc.sources, src, counter)

        # Create in bulk: this is much faster.
        DownloadCount.objects.bulk_create(download_counts.values(), 100)
        total_time = (datetime.now() - start).total_seconds()
        log.info('Processed a total of %s lines' % (index + 1))
        log.debug('Total processing time: %s seconds' % total_time)
191 changes: 191 additions & 0 deletions apps/addons/management/commands/download_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
import os
from datetime import datetime
from optparse import make_option

import pyhs2
from pyhs2 import connections, cursor
from pyhs2.TCLIService.ttypes import TFetchOrientation, TFetchResultsReq

from django.core.management.base import BaseCommand, CommandError

import commonware.log


log = commonware.log.getLogger('adi.export')


fetch_time = 0  # Module-level accumulator (seconds) for hive network fetches.


# This class and the following are needed because the pyhs2 lib doesn't return
# a generator, but a full list! Doing this allows us to return a generator.
# This class and the following are needed because the pyhs2 lib doesn't return
# a generator, but a full list! Doing this allows us to return a generator.
class YieldedCursor(cursor.Cursor):
    """Override the fetch method to return a generator."""

    def fetch(self):
        """Lazily yield one row (a list of column values) at a time.

        Rows are pulled from hive in batches of ``MAX_HIVE_ROWS`` (default
        10000) and yielded individually, so callers never hold the full
        result set in memory.
        """
        batch_size = int(os.getenv('MAX_HIVE_ROWS', 10000))
        fetch_request = TFetchResultsReq(
            operationHandle=self.operationHandle,
            orientation=TFetchOrientation.FETCH_NEXT,
            maxRows=batch_size)

        # We assign to the module-level fetch_time below, hence the global
        # statement (the method runs as a generator, resumed many times).
        global fetch_time
        while True:
            # Measure the time it takes to retrieve from hive.
            batch_start = datetime.now()
            results_response = self.client.FetchResults(fetch_request)
            fetch_time += (datetime.now() - batch_start).total_seconds()
            batch = results_response.results.rows
            if not batch:
                break
            for row in batch:
                yield [pyhs2.cursor.get_value(col) for col in row.colVals]


class ClevererConnection(connections.Connection):
    """Return our own YieldedCursor.

    Yeah, it seems pyhs2 isn't dealing with so much data... so there's just a
    huge list returned.

    """

    def cursor(self):
        # Hand out our streaming cursor instead of the stock pyhs2 one.
        streaming_cursor = YieldedCursor(self.client, self.session)
        return streaming_cursor


class Command(BaseCommand):
    """Execute queries on HIVE, and store the results on disk.

    Query the downloads or updates requests for addons on HIVE. These will then
    be processed by other scripts to store counts in the DownloadCount and
    UploadCount objects.

    Usage:
    ./manage.py download_metrics --date YYYY-MM-DD \
        --with-updates --with-downloads

    Set a ``MAX_HIVE_ROWS`` environment variable to minimize the network
    latency (default is 10000 rows fetched from hive at once), but it will
    increase the memory footprint.

    """
    help = __doc__

    option_list = BaseCommand.option_list + (
        make_option('--output', action='store', type='string',
                    dest='filename', help='Filename to output to.'),
        make_option('--separator', action='store', type='string', default='\t',
                    dest='separator', help='Field separator in file.'),
        make_option('--date', action='store', type='string',
                    dest='date', help='Date in the YYYY-MM-DD format.'),
        make_option('--with-updates', action='store_true', default=False,
                    dest='with_updates', help='Store update requests.'),
        make_option('--with-downloads', action='store_true', default=False,
                    dest='with_downloads', help='Store download requests.'),
        make_option('--limit', action='store', type='int',
                    dest='limit', help='(debug) max number of requests.'),
    )

    def handle(self, *args, **options):
        day = options['date']
        if not day:
            raise CommandError('You must specify a --date parameter in the '
                               ' YYYY-MM-DD format.')
        # The date is interpolated in the hive queries below, so validate it
        # early: nicer error message, and nothing funny can be smuggled into
        # the query string through this parameter.
        try:
            datetime.strptime(day, '%Y-%m-%d')
        except ValueError:
            raise CommandError('The --date parameter must be a valid date '
                               'in the YYYY-MM-DD format.')
        filename = options['filename']
        if filename is None:
            filename = day  # Default the output prefix to the date.
        sep = options['separator']
        limit = options['limit']
        with_updates = options['with_updates']
        with_downloads = options['with_downloads']
        if not with_updates and not with_downloads:
            raise CommandError('Please specify at least one of --with-updates '
                               'or --with-downloads.')

        with ClevererConnection(host='peach-gw.peach.metrics.scl3.mozilla.com',
                                port=10000,
                                user='aphadke',
                                password='',
                                authMechanism='PLAIN') as conn:
            num_reqs = 0
            with conn.cursor() as cur:
                start = datetime.now()  # Measure the time to run the script.
                if with_downloads:
                    num_reqs += self.process_downloads(
                        cur, day, filename, sep=sep, limit=limit)
                if with_updates:
                    num_reqs += self.process_updates(
                        cur, day, filename, sep=sep, limit=limit)

            total_time = (datetime.now() - start).total_seconds()
            log.info('Stored a total of %s requests' % num_reqs)
            log.debug('Total processing time: %s seconds' % total_time)
            log.debug('Time spent fetching data from hive over the network: %s' %
                      fetch_time)

    def process_updates(self, cur, day, filename, sep, limit=None):
        """Query the update requests and store them on disk."""
        limit = ('limit %s' % limit) if limit else ''
        # We use "concat" and http://a.com in the following request to have
        # fully qualified URLs.
        cur.execute("select count(1), "
            " parse_url(concat('http://a.com',request_url), 'QUERY', 'id'), "
            " parse_url(concat('http://a.com',request_url), 'QUERY', 'version'), "
            " parse_url(concat('http://a.com',request_url), 'QUERY', 'status'), "
            " parse_url(concat('http://a.com',request_url), 'QUERY', 'appID'), "
            " parse_url(concat('http://a.com',request_url), 'QUERY', 'appVersion'), "
            " parse_url(concat('http://a.com',request_url), 'QUERY', 'appOS'), "
            " parse_url(concat('http://a.com',request_url), 'QUERY', 'locale'), "
            " parse_url(concat('http://a.com',request_url), 'QUERY', 'updateType') "
            "from v2_raw_logs "
            "where domain='versioncheck.addons.mozilla.org' "
            " and ds='%s' "
            " and request_url like '/update/VersionCheck.php?%%' "
            "group by "
            " parse_url(concat('http://a.com',request_url), 'QUERY', 'id'), "
            " parse_url(concat('http://a.com',request_url), 'QUERY', 'version'), "
            " parse_url(concat('http://a.com',request_url), 'QUERY', 'status'), "
            " parse_url(concat('http://a.com',request_url), 'QUERY', 'appID'), "
            " parse_url(concat('http://a.com',request_url), 'QUERY', 'appVersion'), "
            " parse_url(concat('http://a.com',request_url), 'QUERY', 'appOS'), "
            " parse_url(concat('http://a.com',request_url), 'QUERY', 'locale'), "
            " parse_url(concat('http://a.com',request_url), 'QUERY', 'updateType') "
            "%s" % (day, limit))

        return self.to_file(cur, '%s.updates' % filename, sep)

    def process_downloads(self, cur, day, filename, sep, limit=None):
        """Query the download requests and store them on disk."""
        limit = ('limit %s' % limit) if limit else ''
        # We use "concat" and http://a.com in the following request to have
        # fully qualified URLs.
        cur.execute("select count(1), "
            " split(request_url,'/')[4], "
            " parse_url(concat('http://a.com',request_url), 'QUERY', 'src') "
            "from v2_raw_logs "
            "where domain='addons.mozilla.org' "
            " and ds='%s' "
            " and request_url like '/firefox/downloads/file/%%' "
            " and !(parse_url(concat('http://a.com',request_url), 'QUERY', 'src') LIKE 'sync') "
            "group by "
            " split(request_url,'/')[4], "
            " parse_url(concat('http://a.com',request_url), 'QUERY', 'src') "
            "%s" % (day, limit))
        return self.to_file(cur, '%s.downloads' % filename, sep)

    def to_file(self, cur, filename, sep):
        """Write the rows from ``cur`` to ``filename``, one per line.

        Incomplete rows (containing a None column) are skipped and are NOT
        included in the returned count, so the "stored" total reported by
        handle() matches what actually ended up on disk.
        """
        log.info('Storing hive results in %s' % filename)
        processed = 0
        stored = 0
        with open(filename, 'w') as f:
            for row in cur.fetch():
                processed += 1
                if (processed % 100000) == 0:
                    log.info('Processed %s requests' % processed)
                if None in row:  # Incomplete result: skip.
                    continue
                f.write(sep.join([str(col) for col in row]))
                f.write('\n')
                stored += 1

        return stored
Loading