diff --git a/bin/debug/load_multi_timeline_for_range.py b/bin/debug/load_multi_timeline_for_range.py index 1a79a76f2..152736289 100644 --- a/bin/debug/load_multi_timeline_for_range.py +++ b/bin/debug/load_multi_timeline_for_range.py @@ -14,7 +14,7 @@ import emission.storage.json_wrappers as esj import argparse -import common +import bin.debug.common as common import os import gzip @@ -26,7 +26,7 @@ args = None -def register_fake_users(prefix, unique_user_list): +def register_fake_users(prefix, unique_user_list, verbose): logging.info("Creating user entries for %d users" % len(unique_user_list)) format_string = "{0}-%0{1}d".format(prefix, len(str(len(unique_user_list)))) @@ -34,11 +34,11 @@ def register_fake_users(prefix, unique_user_list): for i, uuid in enumerate(unique_user_list): username = (format_string % i) - if args.verbose is not None and i % args.verbose == 0: + if verbose is not None and i % verbose == 0: logging.info("About to insert mapping %s -> %s" % (username, uuid)) user = ecwu.User.registerWithUUID(username, uuid) -def register_mapped_users(mapfile, unique_user_list): +def register_mapped_users(mapfile, unique_user_list, verbose): uuid_entries = json.load(open(mapfile), object_hook=esj.wrapped_object_hook) logging.info("Creating user entries for %d users from map of length %d" % (len(unique_user_list), len(mapfile))) @@ -50,17 +50,17 @@ def register_mapped_users(mapfile, unique_user_list): # register this way # Pro: will do everything that register does, including creating the profile # Con: will insert only username and uuid - id and update_ts will be different - if args.verbose is not None and i % args.verbose == 0: + if verbose is not None and i % verbose == 0: logging.info("About to insert mapping %s -> %s" % (username, uuid)) user = ecwu.User.registerWithUUID(username, uuid) -def get_load_ranges(entries): - start_indices = list(range(0, len(entries), args.batch_size)) +def get_load_ranges(entries, batch_size): + start_indices = list(range(0, len(entries), batch_size)) ranges = list(zip(start_indices, start_indices[1:])) ranges.append((start_indices[-1], len(entries))) return ranges -def load_pipeline_states(file_prefix, all_uuid_list, continue_on_error): +def load_pipeline_states(file_prefix, all_uuid_list, continue_on_error, verbose): import emission.core.get_database as edb import pymongo @@ -70,7 +70,7 @@ def load_pipeline_states(file_prefix, all_uuid_list, continue_on_error): (curr_uuid, pipeline_filename)) with gzip.open(pipeline_filename) as gfd: states = json.load(gfd, object_hook = esj.wrapped_object_hook) - if args.verbose: + if verbose: logging.debug("Loading states of length %s" % len(states)) if len(states) > 0: try: @@ -109,6 +109,55 @@ def post_check(unique_user_list, all_rerun_list): else: logging.info("timeline contains a mixture of analysis results and raw data - complain to shankari!") +def load_multi_timeline_for_range(file_prefix, info_only=None, verbose=None, continue_on_error=None, mapfile=None, prefix=None, batch_size=10000, raw_timeseries_only=False): + fn = file_prefix + logging.info("Loading file or prefix %s" % fn) + sel_file_list = common.read_files_with_prefix(fn) + + all_user_list = [] + all_rerun_list = [] + (tsdb_count, ucdb_count) = (0,0) + + for i, filename in enumerate(sel_file_list): + if "pipelinestate" in filename: + continue + logging.info("=" * 50) + logging.info("Loading data from file %s" % filename) + + entries = json.load(gzip.open(filename), object_hook = esj.wrapped_object_hook) + + # Obtain uuid and rerun information from 
entries + curr_uuid_list, needs_rerun = common.analyse_timeline(entries) + if len(curr_uuid_list) > 1: + logging.warning("Found %d users, %s in filename, aborting! " % + (len(curr_uuid_list), curr_uuid_list)) + raise RuntimeException("Found %d users, %s in filename, expecting 1, %s" % + (len(curr_uuid_list), curr_uuid_list, common.split_user_id(filename))) + curr_uuid = curr_uuid_list[0] + all_user_list.append(curr_uuid) + all_rerun_list.append(needs_rerun) + + load_ranges = get_load_ranges(entries, batch_size) + if not info_only: + for j, curr_range in enumerate(load_ranges): + if verbose is not None and j % verbose == 0: + logging.info("About to load range %s -> %s" % (curr_range[0], curr_range[1])) + wrapped_entries = [ecwe.Entry(e) for e in entries[curr_range[0]:curr_range[1]]] + (tsdb_count, ucdb_count) = estcs.insert_entries(curr_uuid, wrapped_entries, continue_on_error) + logging.debug("For uuid %s, finished loading %d entries into the usercache and %d entries into the timeseries" % (curr_uuid, ucdb_count, tsdb_count)) + + unique_user_list = set(all_user_list) + if not info_only: + if not raw_timeseries_only: + load_pipeline_states(file_prefix, unique_user_list, continue_on_error, verbose) + if mapfile is not None: + register_mapped_users(mapfile, unique_user_list, verbose) + elif prefix is not None: + register_fake_users(prefix, unique_user_list, verbose) + + post_check(unique_user_list, all_rerun_list) + return (tsdb_count, ucdb_count) + if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument("file_prefix", @@ -128,6 +177,9 @@ def post_check(unique_user_list, all_rerun_list): parser.add_argument("-s", "--batch-size", default=10000, type=int, help="batch size to use for the entries") + + parser.add_argument("-t", "--raw-timeseries-only", default=False, action='store_true', + help="load only raw timeseries data; if not set load both raw and analysis timeseries data") group = parser.add_mutually_exclusive_group(required=False) group.add_argument("-p", "--prefix", default="user", @@ -141,47 +193,5 @@ def post_check(unique_user_list, all_rerun_list): else: logging.basicConfig(level=logging.INFO) - fn = args.file_prefix - logging.info("Loading file or prefix %s" % fn) - sel_file_list = common.read_files_with_prefix(fn) - - all_user_list = [] - all_rerun_list = [] - - for i, filename in enumerate(sel_file_list): - if "pipelinestate" in filename: - continue - logging.info("=" * 50) - logging.info("Loading data from file %s" % filename) - - entries = json.load(gzip.open(filename), object_hook = esj.wrapped_object_hook) - - # Obtain uuid and rerun information from entries - curr_uuid_list, needs_rerun = common.analyse_timeline(entries) - if len(curr_uuid_list) > 1: - logging.warning("Found %d users, %s in filename, aborting! 
" % - (len(curr_uuid_list), curr_uuid_list)) - raise RuntimeException("Found %d users, %s in filename, expecting 1, %s" % - (len(curr_uuid_list), curr_uuid_list, common.split_user_id(filename))) - curr_uuid = curr_uuid_list[0] - all_user_list.append(curr_uuid) - all_rerun_list.append(needs_rerun) - - load_ranges = get_load_ranges(entries) - if not args.info_only: - for j, curr_range in enumerate(load_ranges): - if args.verbose is not None and j % args.verbose == 0: - logging.info("About to load range %s -> %s" % (curr_range[0], curr_range[1])) - wrapped_entries = [ecwe.Entry(e) for e in entries[curr_range[0]:curr_range[1]]] - (tsdb_count, ucdb_count) = estcs.insert_entries(curr_uuid, wrapped_entries, args.continue_on_error) - print("For uuid %s, finished loading %d entries into the usercache and %d entries into the timeseries" % (curr_uuid, ucdb_count, tsdb_count)) - - unique_user_list = set(all_user_list) - if not args.info_only: - load_pipeline_states(args.file_prefix, unique_user_list, args.continue_on_error) - if args.mapfile is not None: - register_mapped_users(args.mapfile, unique_user_list) - elif args.prefix is not None: - register_fake_users(args.prefix, unique_user_list) - - post_check(unique_user_list, all_rerun_list) + # load_multi_timeline_for_range(args.file_prefix, args.info_only, args.verbose, args.continue_on_error, args.mapfile, args.prefix, args.batch_size) + load_multi_timeline_for_range(args.file_prefix, args.info_only, args.verbose, args.continue_on_error, args.mapfile, args.prefix, args.batch_size, args.raw_timeseries_only) diff --git a/conf/log/purge.conf.sample b/conf/log/purge.conf.sample new file mode 100644 index 000000000..a1b1cc5d7 --- /dev/null +++ b/conf/log/purge.conf.sample @@ -0,0 +1,43 @@ +{ + "handlers": { + "errors": { + "backupCount": 2, + "mode": "a", + "level": "ERROR", + "formatter": "detailed", + "class": "logging.handlers.RotatingFileHandler", + "maxBytes": 1073741824, + "filename": "/var/tmp/purge-errors.log", + "encoding": "UTF-8" + }, + "console": { + "class": "logging.StreamHandler", + "level": "WARNING" + }, + "file": { + "backupCount": 8, + "filename": "/var/tmp/purge.log", + "maxBytes": 1073741824, + "mode": "a", + "formatter": "detailed", + "class": "logging.handlers.RotatingFileHandler", + "encoding": "UTF-8" + } + }, + "version": 1, + "root": { + "handlers": [ + "console", + "file", + "errors" + ], + "level": "DEBUG" + }, + "formatters": { + "detailed": { + "class": "logging.Formatter", + "format": "%(asctime)s:%(levelname)s:%(thread)d:%(message)s" + } + } +} + diff --git a/conf/log/restore.conf.sample b/conf/log/restore.conf.sample new file mode 100644 index 000000000..eca961cc4 --- /dev/null +++ b/conf/log/restore.conf.sample @@ -0,0 +1,43 @@ +{ + "handlers": { + "errors": { + "backupCount": 2, + "mode": "a", + "level": "ERROR", + "formatter": "detailed", + "class": "logging.handlers.RotatingFileHandler", + "maxBytes": 1073741824, + "filename": "/var/tmp/restore-errors.log", + "encoding": "UTF-8" + }, + "console": { + "class": "logging.StreamHandler", + "level": "WARNING" + }, + "file": { + "backupCount": 8, + "filename": "/var/tmp/restore.log", + "maxBytes": 1073741824, + "mode": "a", + "formatter": "detailed", + "class": "logging.handlers.RotatingFileHandler", + "encoding": "UTF-8" + } + }, + "version": 1, + "root": { + "handlers": [ + "console", + "file", + "errors" + ], + "level": "DEBUG" + }, + "formatters": { + "detailed": { + "class": "logging.Formatter", + "format": "%(asctime)s:%(levelname)s:%(thread)d:%(message)s" + } + } +} 
+ diff --git a/emission/core/wrapper/pipelinestate.py b/emission/core/wrapper/pipelinestate.py index 9b6c15321..ecaf3b937 100644 --- a/emission/core/wrapper/pipelinestate.py +++ b/emission/core/wrapper/pipelinestate.py @@ -28,7 +28,9 @@ class PipelineStages(enum.Enum): USER_MODEL = 7 RECOMMENDATION = 8 OUTPUT_GEN = 9 - EXPORT_DATA = 15 + EXPORT_DATA = 18 + PURGE_TIMESERIES_DATA = 19 + RESTORE_TIMESERIES_DATA = 20 class PipelineState(ecwb.WrapperBase): props = {"pipeline_stage": ecwb.WrapperBase.Access.RW, # the value of the stage from the enum above diff --git a/emission/export/export.py b/emission/export/export.py index 614c3327e..a8af447f3 100644 --- a/emission/export/export.py +++ b/emission/export/export.py @@ -39,13 +39,40 @@ def get_with_retry(retrieve_call, in_query): query.startTs = curr_batch[-1][timeTypeSplit[0]][timeTypeSplit[1]] return list_so_far -def get_from_all_three_sources_with_retry(user_id, in_query): +def get_from_all_three_sources_with_retry(user_id, in_query, databases=None): + logging.info("In get_from_all_three_sources_with_retry: Databases = %s" % databases) + import emission.storage.timeseries.builtin_timeseries as estb ts = estb.BuiltinTimeSeries(user_id) uc = enua.UserCache.getUserCache(user_id) sort_key = ts._get_sort_key(in_query) + + source_db_calls = [] + + if databases is None or 'timeseries_db' in databases: + logging.info("Fetching from timeseries_db") + base_ts_call = lambda tq: ts._get_entries_for_timeseries(ts.timeseries_db, None, tq, + geo_query=None, extra_query_list=None, sort_key = sort_key) + source_db_calls.append(base_ts_call) + if databases is None or 'analysis_db' in databases: + logging.info("Fetching from analysis_timeseries_db") + analysis_ts_call = lambda tq: ts._get_entries_for_timeseries(ts.analysis_timeseries_db, None, tq, + geo_query=None, extra_query_list=None, sort_key = sort_key) + source_db_calls.append(analysis_ts_call) + if databases is None or 'usercache_db' in databases: + logging.info("Fetching from usercache_db") + uc_ts_call = lambda tq: (uc.getMessageCount(None, tq), uc.getMessage(None, tq)) + source_db_calls.append(uc_ts_call) + + retry_lists = [] + for source_db_call in source_db_calls: + retry_lists = retry_lists + get_with_retry(source_db_call, in_query) + + return retry_lists + + ''' base_ts_call = lambda tq: ts._get_entries_for_timeseries(ts.timeseries_db, None, tq, geo_query=None, extra_query_list=None, sort_key = sort_key) analysis_ts_call = lambda tq: ts._get_entries_for_timeseries(ts.analysis_timeseries_db, None, tq, @@ -55,48 +82,73 @@ def get_from_all_three_sources_with_retry(user_id, in_query): return get_with_retry(base_ts_call, in_query) + \ get_with_retry(analysis_ts_call, in_query) + \ get_with_retry(uc_ts_call, in_query) + ''' + +def get_exported_timeseries_entries(user_id, ts, start_ts, end_ts, databases=None): + combined_list = [] + entries_lists = { + "loc_entry_list": None, + "trip_entry_list": None, + "place_entry_list": None + } -def export(user_id, ts, start_ts, end_ts, file_name, ma_bool): + if databases == ['timeseries_db']: + loc_time_query = estt.TimeQuery("data.ts", start_ts, end_ts) + loc_entry_list = get_from_all_three_sources_with_retry(user_id, loc_time_query, databases) + combined_list = loc_entry_list + logging.info("Found %d loc-like entries = %d total entries" % + (len(loc_entry_list), len(combined_list))) + entries_lists["loc_entry_list"] = loc_entry_list + else: + loc_time_query = estt.TimeQuery("data.ts", start_ts, end_ts) + loc_entry_list = 
get_from_all_three_sources_with_retry(user_id, loc_time_query) + # Changing to estcs so that we will read the manual entries, which have data.start_ts and data.enter_ts + # from the usercache as well + trip_time_query = estt.TimeQuery("data.start_ts", start_ts, end_ts) + trip_entry_list = get_from_all_three_sources_with_retry(user_id, trip_time_query) + place_time_query = estt.TimeQuery("data.enter_ts", start_ts, end_ts) + place_entry_list = get_from_all_three_sources_with_retry(user_id, place_time_query) + # Handle the case of the first place, which has no enter_ts and won't be + # matched by the default query + first_place_extra_query = {'$and': [{'data.enter_ts': {'$exists': False}}, + {'data.exit_ts': {'$exists': True}}]} + first_place_entry_list = list(ts.find_entries(key_list=None, time_query=None, extra_query_list=[first_place_extra_query])) + logging.info("First place entry list = %s" % first_place_entry_list) + combined_list = loc_entry_list + trip_entry_list + place_entry_list + first_place_entry_list + logging.info("Found %d loc-like entries, %d trip-like entries, %d place-like entries = %d total entries" % + (len(loc_entry_list), len(trip_entry_list), len(place_entry_list), len(combined_list))) + entries_lists["loc_entry_list"] = loc_entry_list + entries_lists["trip_entry_list"] = trip_entry_list + entries_lists["place_entry_list"] = place_entry_list + + return entries_lists, combined_list + +def export(user_id, ts, start_ts, end_ts, file_name, ma_bool, databases=None): + logging.info("In export: Databases = %s" % databases) logging.info("Extracting timeline for user %s day %s -> %s and saving to file %s" % (user_id, start_ts, end_ts, file_name)) + + entries_lists, combined_list = get_exported_timeseries_entries(user_id, ts, start_ts, end_ts, databases) - loc_time_query = estt.TimeQuery("data.ts", start_ts, end_ts) - loc_entry_list = get_from_all_three_sources_with_retry(user_id, loc_time_query) - # Changing to estcs so that we will read the manual entries, which have data.start_ts and data.enter_ts - # from the usercache as well - trip_time_query = estt.TimeQuery("data.start_ts", start_ts, end_ts) - trip_entry_list = get_from_all_three_sources_with_retry(user_id, trip_time_query) - place_time_query = estt.TimeQuery("data.enter_ts", start_ts, end_ts) - place_entry_list = get_from_all_three_sources_with_retry(user_id, place_time_query) - # Handle the case of the first place, which has no enter_ts and won't be - # matched by the default query - first_place_extra_query = {'$and': [{'data.enter_ts': {'$exists': False}}, - {'data.exit_ts': {'$exists': True}}]} - first_place_entry_list = list(ts.find_entries(key_list=None, time_query=None, extra_query_list=[first_place_extra_query])) - logging.info("First place entry list = %s" % first_place_entry_list) - - combined_list = loc_entry_list + trip_entry_list + place_entry_list + first_place_entry_list - logging.info("Found %d loc-like entries, %d trip-like entries, %d place-like entries = %d total entries" % - (len(loc_entry_list), len(trip_entry_list), len(place_entry_list), len(combined_list))) - - validate_truncation(loc_entry_list, trip_entry_list, place_entry_list) + validate_truncation(entries_lists["loc_entry_list"], entries_lists["trip_entry_list"], entries_lists["place_entry_list"]) unique_key_list = set([e["metadata"]["key"] for e in combined_list]) logging.info("timeline has unique keys = %s" % unique_key_list) if len(combined_list) == 0 or unique_key_list == set(['stats/pipeline_time']): logging.info("No entries found in 
range for user %s, skipping save" % user_id) + return None else: combined_filename = "%s.gz" % (file_name) with gzip.open(combined_filename, "wt") as gcfd: json.dump(combined_list, gcfd, default=esj.wrapped_default, allow_nan=False, indent=4) + return combined_list - -def validate_truncation(loc_entry_list, trip_entry_list, place_entry_list): +def validate_truncation(loc_entry_list=None, trip_entry_list=None, place_entry_list=None): MAX_LIMIT = 25 * 10000 - if len(loc_entry_list) == MAX_LIMIT: + if loc_entry_list is not None and len(loc_entry_list) == MAX_LIMIT: logging.warning("loc_entry_list length = %d, probably truncated" % len(loc_entry_list)) - if len(trip_entry_list) == MAX_LIMIT: + if trip_entry_list is not None and len(trip_entry_list) == MAX_LIMIT: logging.warning("trip_entry_list length = %d, probably truncated" % len(trip_entry_list)) - if len(place_entry_list) == MAX_LIMIT: + if place_entry_list is not None and len(place_entry_list) == MAX_LIMIT: logging.warning("place_entry_list length = %d, probably truncated" % len(place_entry_list)) diff --git a/emission/pipeline/purge_stage.py b/emission/pipeline/purge_stage.py new file mode 100644 index 000000000..1e842d4bb --- /dev/null +++ b/emission/pipeline/purge_stage.py @@ -0,0 +1,60 @@ +from __future__ import print_function +from __future__ import unicode_literals +from __future__ import division +from __future__ import absolute_import +from future import standard_library +standard_library.install_aliases() +from builtins import str +from builtins import * +import json +import logging +import numpy as np +import arrow +from uuid import UUID +import time + +import emission.core.timer as ect + +import emission.core.wrapper.pipelinestate as ecwp + +import emission.storage.decorations.stats_queries as esds +import emission.purge_restore.purge_data as eprpd + +def run_purge_pipeline(process_number, uuid, archive_dir=None): + try: + with open("conf/log/purge.conf", "r") as cf: + purge_log_config = json.load(cf) + except: + with open("conf/log/purge.conf.sample", "r") as cf: + purge_log_config = json.load(cf) + + purge_log_config["handlers"]["file"]["filename"] = \ + purge_log_config["handlers"]["file"]["filename"].replace("purge", "purge_%s" % process_number) + purge_log_config["handlers"]["errors"]["filename"] = \ + purge_log_config["handlers"]["errors"]["filename"].replace("purge", "purge_%s" % process_number) + + # logging.config.dictConfig(purge_log_config) + np.random.seed(61297777) + + logging.info("processing UUID list = %s" % uuid) + + try: + file_names = run_purge_pipeline_for_user(uuid, archive_dir) + except Exception as e: + esds.store_pipeline_error(uuid, "WHOLE_PIPELINE", time.time(), None) + logging.exception("Found error %s while processing pipeline " + "for user %s, skipping" % (e, uuid)) + + return file_names + + +def run_purge_pipeline_for_user(uuid, archive_dir, export_type): + with ect.Timer() as edt: + logging.info("*" * 10 + "UUID %s: purging timeseries data" % uuid + "*" * 10) + print(str(arrow.now()) + "*" * 10 + "UUID %s: purging timeseries data" % uuid + "*" * 10) + file_names = eprpd.purge_data(uuid, archive_dir, export_type) + + esds.store_pipeline_time(uuid, ecwp.PipelineStages.PURGE_TIMESERIES_DATA.name, + time.time(), edt.elapsed) + + return file_names diff --git a/emission/pipeline/restore_stage.py b/emission/pipeline/restore_stage.py new file mode 100644 index 000000000..1bd320761 --- /dev/null +++ b/emission/pipeline/restore_stage.py @@ -0,0 +1,60 @@ +from __future__ import print_function +from 
__future__ import unicode_literals +from __future__ import division +from __future__ import absolute_import +from future import standard_library +standard_library.install_aliases() +from builtins import str +from builtins import * +import json +import logging +import numpy as np +import arrow +from uuid import UUID +import time + +import emission.core.timer as ect + +import emission.core.wrapper.pipelinestate as ecwp + +import emission.storage.decorations.stats_queries as esds +import emission.purge_restore.restore_data as eprrd + +def run_restore_pipeline(process_number, uuid_list, file_names): + try: + with open("conf/log/restore.conf", "r") as cf: + restore_log_config = json.load(cf) + except: + with open("conf/log/restore.conf.sample", "r") as cf: + restore_log_config = json.load(cf) + + restore_log_config["handlers"]["file"]["filename"] = \ + restore_log_config["handlers"]["file"]["filename"].replace("restore", "restore_%s" % process_number) + restore_log_config["handlers"]["errors"]["filename"] = \ + restore_log_config["handlers"]["errors"]["filename"].replace("restore", "restore_%s" % process_number) + + # logging.config.dictConfig(restore_log_config) + np.random.seed(61297777) + + logging.info("processing UUID list = %s" % uuid_list) + + for uuid in uuid_list: + if uuid is None: + continue + + try: + run_restore_pipeline_for_user(uuid, file_names) + except Exception as e: + esds.store_pipeline_error(uuid, "WHOLE_PIPELINE", time.time(), None) + logging.exception("Found error %s while processing pipeline " + "for user %s, skipping" % (e, uuid)) + + +def run_restore_pipeline_for_user(uuid, file_names): + with ect.Timer() as edt: + logging.info("*" * 10 + "UUID %s: restoring timeseries data" % uuid + "*" * 10) + print(str(arrow.now()) + "*" * 10 + "UUID %s: restoring timeseries data" % uuid + "*" * 10) + eprrd.restore_data(uuid, file_names) + + esds.store_pipeline_time(uuid, ecwp.PipelineStages.RESTORE_TIMESERIES_DATA.name, + time.time(), edt.elapsed) diff --git a/emission/purge_restore/purge_data.py b/emission/purge_restore/purge_data.py new file mode 100644 index 000000000..cd6de383f --- /dev/null +++ b/emission/purge_restore/purge_data.py @@ -0,0 +1,161 @@ +# Standard imports +import logging + +# Our imports +import emission.storage.pipeline_queries as espq +import emission.storage.timeseries.abstract_timeseries as esta +import emission.storage.decorations.analysis_timeseries_queries as esda +import gzip +import json +import os +import emission.core.get_database as edb +import emission.storage.json_wrappers as esj +import emission.core.wrapper.entry as ecwe +import emission.storage.timeseries.timequery as estt +import emission.export.export as eee + +def purge_data(user_id, archive_dir, export_type): + file_names = None + try: + pdp = PurgeDataPipeline() + pdp.user_id = user_id + file_names = pdp.run_purge_data_pipeline(user_id, archive_dir, export_type) + if pdp.last_processed_ts is None: + logging.debug("After run, last_processed_ts == None, must be early return") + espq.mark_purge_data_done(user_id, pdp.last_processed_ts) + except: + logging.exception("Error while purging timeseries data, timestamp unchanged") + espq.mark_purge_data_failed(user_id) + return file_names + +class PurgeDataPipeline: + def __init__(self): + self._last_processed_ts = None + + @property + def last_processed_ts(self): + return self._last_processed_ts + + def run_purge_data_pipeline(self, user_id, archive_dir, export_type): + ts = esta.TimeSeries.get_time_series(user_id) + time_query = 
espq.get_time_range_for_purge_data(user_id) + + if archive_dir is None: + archive_dir = os.environ.get('DATA_DIR', "emission/archived") + + if os.path.isdir(archive_dir) == False: + os.mkdir(archive_dir) + + initStartTs = time_query.startTs + initEndTs = time_query.endTs + logging.debug("Initial pipeline purge query range = start_time: %s , end_time: %s" % (initStartTs, initEndTs)) + + file_names = [] + total_entries_to_export = eee.get_exported_timeseries_entries(user_id, ts, time_query.startTs, time_query.endTs, ['timeseries_db'])[1] + + # If running the pipeline PURGE stage for first time, choose the first timestamp from the timeseries as the starting point + # Otherwise cannot add 1 hour (3600 seconds) to a NoneType value if incremental option is selected + current_start_ts = initStartTs if initStartTs is not None else total_entries_to_export[0]['data']['ts'] + + while True: + logging.debug("Inside while loop: current_start_ts = %s" % current_start_ts) + current_end_ts = min(current_start_ts + 3600, initEndTs) if export_type == 'incremental' else initEndTs + + if export_type == 'incremental': + current_end_ts = min(current_start_ts + 3600, initEndTs) + logging.debug("Performing incremental export, increasing current_end_ts: %s" % current_end_ts) + elif export_type == 'full': + current_end_ts = initEndTs + logging.debug("Performing full export, setting current_end_ts to current time: %s" % current_end_ts) + else: + raise ValueError("Unknown export_type %s" % export_type) + + logging.debug(f"Processing current batch from {current_start_ts} to {current_end_ts}") + + file_name = archive_dir + "/archive_%s_%s_%s" % (user_id, current_start_ts, current_end_ts) + current_batch_exported_entries = eee.export(user_id, ts, current_start_ts, current_end_ts, file_name, False, ['timeseries_db']) + + # Recompute total entries from pipeline initial start time to end time since we are deleting entries iteratively + # This is used to keep a track of remaining entries to export + remaining_timeseries_entries = eee.get_exported_timeseries_entries(user_id, ts, time_query.startTs, time_query.endTs, ['timeseries_db'])[1] + + if current_batch_exported_entries is None and len(remaining_timeseries_entries) > 0: + logging.debug("No entries found in current time range from %s to %s" % (current_start_ts, current_end_ts)) + logging.debug("Incrementing time range by 1 hour to process remaining timeseries entries") + current_start_ts = current_end_ts + continue + elif current_batch_exported_entries is None and len(remaining_timeseries_entries) == 0: + logging.debug("No new data to export, breaking out of while loop") + break + + logging.debug("Exported to file: %s" % file_name) + file_names.append(file_name) + logging.debug("List of exported file names: %s" % file_names) + + self.delete_timeseries_entries(user_id, ts, current_start_ts, current_end_ts) + + logging.debug("Total entries to export: %s" % len(total_entries_to_export)) + logging.debug("Entries exported in timerange %s to %s: %s" % (current_start_ts, current_end_ts, len(current_batch_exported_entries))) + logging.debug("Remaining entries to export: %s" % len(remaining_timeseries_entries)) + + self._last_processed_ts = current_batch_exported_entries[-1]['data']['ts'] + logging.debug("Updated last_processed_ts to last entry in current export batch = %s" % self._last_processed_ts) + + current_start_ts = current_end_ts + if current_start_ts >= initEndTs: + break + + logging.debug("Exported data to %s files: %s" % (len(file_names), file_names)) + return 
file_names + + + def delete_timeseries_entries(self, user_id, ts, start_ts_datetime, end_ts_datetime): + export_timequery = estt.TimeQuery("data.ts", start_ts_datetime, end_ts_datetime) + ts_query = ts._get_query(time_query=export_timequery) + delete_query = {"user_id": user_id, **ts_query} + + count_entries_to_delete = ts.timeseries_db.count_documents(delete_query) + logging.debug(f"Number of matching entries for deletion = {count_entries_to_delete}") + + logging.debug("Deleting entries from database...") + result = ts.timeseries_db.delete_many(delete_query) + assert(result.deleted_count == count_entries_to_delete) + logging.debug("{} deleted entries from {} to {}".format(result.deleted_count, start_ts_datetime, end_ts_datetime)) + + ''' + def delete_timeseries_entries(self, user_id, ts, start_ts_datetime, end_ts_datetime): + export_queries = self.get_export_queries(start_ts_datetime, end_ts_datetime) + for key, value in export_queries.items(): + ts_query = ts._get_query(time_query=value) + logging.debug(ts_query) + delete_query = {"user_id": user_id, **ts_query} + + count = ts.timeseries_db.count_documents(delete_query) + logging.debug(f"Number of documents matching for {ts.timeseries_db} with {key} query: {count}") + + logging.debug("Deleting entries from database...") + result = ts.timeseries_db.delete_many(delete_query) + logging.debug(f"Key query: {key}") + logging.debug("{} deleted entries from {} to {}".format(result.deleted_count, start_ts_datetime, end_ts_datetime)) + + def get_export_timeseries_entries(self, user_id, ts, start_ts_datetime, end_ts_datetime): + entries_to_export = [] + export_queries = self.get_export_queries(start_ts_datetime, end_ts_datetime) + for key, value in export_queries.items(): + tq = value + sort_key = ts._get_sort_key(tq) + (ts_db_count, ts_db_result) = ts._get_entries_for_timeseries(ts.timeseries_db, None, tq, geo_query=None, extra_query_list=None, sort_key = sort_key) + entries_to_export.extend(list(ts_db_result)) + logging.debug(f"Key query: {key}") + logging.debug("{} fetched entries from {} to {}".format(ts_db_count, start_ts_datetime, end_ts_datetime)) + return entries_to_export + + + def get_export_queries(self, start_ts, end_ts): + export_queries = { + # 'trip_time_query': estt.TimeQuery("data.start_ts", initStartTs, initEndTs), + # 'place_time_query': estt.TimeQuery("data.enter_ts", initStartTs, initEndTs), + 'loc_time_query': estt.TimeQuery("data.ts", start_ts, end_ts) + } + return export_queries + ''' diff --git a/emission/purge_restore/restore_data.py b/emission/purge_restore/restore_data.py new file mode 100644 index 000000000..71c402418 --- /dev/null +++ b/emission/purge_restore/restore_data.py @@ -0,0 +1,47 @@ +# Standard imports +import logging + +# Our imports +import emission.storage.pipeline_queries as espq +import emission.storage.timeseries.abstract_timeseries as esta +import emission.storage.decorations.analysis_timeseries_queries as esda +import gzip +import json +import os +import bin.debug.load_multi_timeline_for_range as lmtfr +import emission.storage.json_wrappers as esj +import emission.core.get_database as edb +import emission.core.wrapper.pipelinestate as ecwp + +def restore_data(user_id, file_names): + try: + rdp = RestoreDataPipeline() + rdp.user_id = user_id + rdp.run_restore_data_pipeline(user_id, file_names) + if rdp.last_processed_ts is None: + logging.debug("After run, last_processed_ts == None, must be early return") + espq.mark_restore_data_done(user_id, rdp.last_processed_ts) + except: + logging.exception("Error 
while restoring timeseries data, timestamp unchanged") + espq.mark_restore_data_failed(user_id) + +class RestoreDataPipeline: + def __init__(self): + self._last_processed_ts = None + + @property + def last_processed_ts(self): + return self._last_processed_ts + + def run_restore_data_pipeline(self, user_id, file_names): + time_query = espq.get_time_range_for_restore_data(user_id) + for file_name in file_names: + entries_to_import = json.load(gzip.open(file_name + ".gz"), object_hook = esj.wrapped_object_hook) + (tsdb_count, ucdb_count) = lmtfr.load_multi_timeline_for_range(file_prefix=file_name, continue_on_error=True, raw_timeseries_only=True) + logging.debug("After load, tsdb_count = %s, ucdb_count = %s" % (tsdb_count, ucdb_count)) + if tsdb_count == 0: + # Didn't process anything new so start at the same point next time + self._last_processed_ts = None + else: + self._last_processed_ts = entries_to_import[-1]['data']['ts'] + logging.debug("After load, last_processed_ts = %s" % (self._last_processed_ts)) diff --git a/emission/storage/pipeline_queries.py b/emission/storage/pipeline_queries.py index 63bb8acb0..6460c9a41 100644 --- a/emission/storage/pipeline_queries.py +++ b/emission/storage/pipeline_queries.py @@ -232,6 +232,36 @@ def mark_export_data_done(user_id, last_trip_done): def mark_export_data_failed(user_id): mark_stage_failed(user_id, ps.PipelineStages.EXPORT_DATA) +def get_time_range_for_purge_data(user_id): + tq = get_time_range_for_stage(user_id, ps.PipelineStages.PURGE_TIMESERIES_DATA) + tq.timeType = "data.end_ts" + return tq + +def mark_purge_data_done(user_id, last_processed_ts): + if last_processed_ts is None: + mark_stage_done(user_id, ps.PipelineStages.PURGE_TIMESERIES_DATA, None) + else: + mark_stage_done(user_id, ps.PipelineStages.PURGE_TIMESERIES_DATA, + last_processed_ts + END_FUZZ_AVOID_LTE) + +def mark_purge_data_failed(user_id): + mark_stage_failed(user_id, ps.PipelineStages.PURGE_TIMESERIES_DATA) + +def get_time_range_for_restore_data(user_id): + tq = get_time_range_for_stage(user_id, ps.PipelineStages.RESTORE_TIMESERIES_DATA) + tq.timeType = "data.end_ts" + return tq + +def mark_restore_data_done(user_id, last_processed_ts): + if last_processed_ts is None: + mark_stage_done(user_id, ps.PipelineStages.RESTORE_TIMESERIES_DATA, None) + else: + mark_stage_done(user_id, ps.PipelineStages.RESTORE_TIMESERIES_DATA, + last_processed_ts + END_FUZZ_AVOID_LTE) + +def mark_restore_data_failed(user_id): + mark_stage_failed(user_id, ps.PipelineStages.RESTORE_TIMESERIES_DATA) + def get_time_range_for_label_inference(user_id): tq = get_time_range_for_stage(user_id, ps.PipelineStages.LABEL_INFERENCE) tq.timeType = "data.end_ts" diff --git a/emission/storage/timeseries/cache_series.py b/emission/storage/timeseries/cache_series.py index 3d3cc0830..0417ac56e 100644 --- a/emission/storage/timeseries/cache_series.py +++ b/emission/storage/timeseries/cache_series.py @@ -13,6 +13,7 @@ from future import standard_library standard_library.install_aliases() from builtins import * +import logging import emission.core.get_database as edb import emission.net.usercache.abstract_usercache as enua @@ -65,5 +66,10 @@ def insert_entries(uuid, entry_it, continue_on_error): except pymongo.errors.DuplicateKeyError as e: if not continue_on_error: raise(e) + else: + if "write_fmt_time" in entry["metadata"]: + logging.info("ignoring duplicate key error while restoring timeseries entries") + else: + logging.info("ignoring duplicate key error while restoring usercache entries") return (tsdb_count, 
ucdb_count) diff --git a/emission/tests/analysisTests/intakeTests/TestPipelineCornerCases.py b/emission/tests/analysisTests/intakeTests/TestPipelineCornerCases.py index 56075b83b..4163949a7 100644 --- a/emission/tests/analysisTests/intakeTests/TestPipelineCornerCases.py +++ b/emission/tests/analysisTests/intakeTests/TestPipelineCornerCases.py @@ -49,7 +49,11 @@ def testNoData(self): ewps.PipelineStages.ALTERNATIVES, ewps.PipelineStages.USER_MODEL, ewps.PipelineStages.RECOMMENDATION, - ewps.PipelineStages.OUTPUT_GEN] + ewps.PipelineStages.OUTPUT_GEN, + ewps.PipelineStages.EXPORT_DATA, + ewps.PipelineStages.PURGE_TIMESERIES_DATA, + ewps.PipelineStages.RESTORE_TIMESERIES_DATA + ] for pse in ewps.PipelineStages.__iter__(): if pse in stages_skipped_in_testing: @@ -80,7 +84,11 @@ def testSkipPipelineNoNewEntries(self): ewps.PipelineStages.ALTERNATIVES, ewps.PipelineStages.USER_MODEL, ewps.PipelineStages.RECOMMENDATION, - ewps.PipelineStages.OUTPUT_GEN] + ewps.PipelineStages.OUTPUT_GEN, + ewps.PipelineStages.EXPORT_DATA, + ewps.PipelineStages.PURGE_TIMESERIES_DATA, + ewps.PipelineStages.RESTORE_TIMESERIES_DATA + ] test_run_states = list([pse.value for pse in filter(lambda pse: pse not in stages_skipped_in_testing, ewps.PipelineStages.__iter__())]) diff --git a/emission/tests/exportTests/TestPurgeRestoreModule.py b/emission/tests/exportTests/TestPurgeRestoreModule.py new file mode 100644 index 000000000..bc51eae3e --- /dev/null +++ b/emission/tests/exportTests/TestPurgeRestoreModule.py @@ -0,0 +1,154 @@ +from future import standard_library +standard_library.install_aliases() +from builtins import * +import os +import unittest +import json +import pathlib as pl +import gzip +import logging +import tempfile +import time +from bson.objectid import ObjectId + +import emission.core.get_database as edb +import emission.tests.common as etc +import emission.storage.timeseries.abstract_timeseries as esta +import emission.pipeline.purge_stage as epp +import emission.pipeline.restore_stage as epr +import emission.purge_restore.purge_data as eprpd +import emission.storage.json_wrappers as esj +import emission.storage.timeseries.timequery as estt + +class TestPurgeRestoreModule(unittest.TestCase): + def setUp(self): + self.testEmail = "testPurgeRestoreUser123" + etc.setupRealExample(self, "emission/tests/data/real_examples/shankari_2015-07-22") + logging.debug("Test UUID for Purge: %s" % self.testUUID) + etc.runIntakePipeline(self.testUUID) + + def tearDown(self): + self.clearAllDb() + + def clearAllDb(self): + edb.get_timeseries_db().delete_many({}) + edb.get_analysis_timeseries_db().delete_many({}) + edb.get_usercache_db().delete_many({}) + edb.get_pipeline_state_db().delete_many({}) + edb.get_uuid_db().delete_one({}) + + def getEntriesToExport(self, tmpdirname): + self.assertTrue(os.path.isdir(tmpdirname)) + + #Set the envrionment variable + os.environ['DATA_DIR'] = tmpdirname + self.assertEqual(os.environ['DATA_DIR'], tmpdirname) + + # Fetch entries from timeseries db before purging to use in tests + ts = esta.TimeSeries.get_time_series(self.testUUID) + tq = estt.TimeQuery("data.ts", None, time.time() - 5) + sort_key = ts._get_sort_key(tq) + (ts_db_count, ts_db_result) = ts._get_entries_for_timeseries(ts.timeseries_db, None, tq, geo_query=None, extra_query_list=None, sort_key = sort_key) + entries_to_export = list(ts_db_result) + return entries_to_export + + def prePipelineTests(self): + ''' + Test 1 - Verify that purging timeseries data works with sample real data + ''' + # Check how much data there was 
before + res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID}) + logging.debug(f"About to purge {res} entries") + self.assertEqual(res, 1906) + + def postPipelineTests(self, entries_to_export, file_names): + logging.debug("Exported file names: %s" % file_names) + ''' + Test 2 - Assert the file exists after the export process + ''' + exported_data = [] + for file_name in file_names: + self.assertTrue(pl.Path(file_name + ".gz").is_file()) + with gzip.open(file_name + ".gz", 'r') as ef: + exported_data.extend(json.loads(ef.read().decode('utf-8'))) + # exported_data = json.loads(ef.read().decode('utf-8')) + self.assertEqual(len(exported_data), 1906) + + ''' + Test 3 - Compare the first and last few entries in the exported file with the entries in the timeseries db + ''' + entries_from_db = entries_to_export + logging.debug("Entries from db size: %s" % len(entries_from_db)) + entries_from_db = entries_from_db[:5] + entries_from_db[-5:] + entries_from_file = exported_data[:5] + exported_data[-5:] + objectIds_from_db = [entry["_id"] for entry in entries_from_db] + objectIds_from_file = [ObjectId(entry["_id"]["$oid"]) for entry in entries_from_file] + logging.debug("Object ids from db: %s" % objectIds_from_db) + logging.debug("Object ids from file: %s" % objectIds_from_file) + self.assertEqual(objectIds_from_db, objectIds_from_file) + + ''' + Test 4 - Verify that purging timeseries data works with sample real data + ''' + # Check how much data there is after + entries = edb.get_timeseries_db().find({"user_id" : self.testUUID}) + res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID}) + logging.debug(f"Purging complete: {res} entries remaining") + + # A single entry with key 'stats/pipeline_time' should be present as this test involves running the pipeline + stat_pipeline_key = entries[0].get('metadata').get('key') + logging.debug(f"stat_pipeline_key = {stat_pipeline_key}") + self.assertEqual(stat_pipeline_key,'stats/pipeline_time') + self.assertEqual(res, 1) + + # Run the restore pipeline + logging.debug(f"About to restore entries") + logging.debug("File names: %s" % file_names) + epr.run_restore_pipeline_for_user(self.testUUID, file_names) + + ''' + Test 5 - Verify that restoring timeseries data works with sample real data + ''' + # Check how much data there is after + res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID}) + res_stats_count = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID, "metadata.key" : 'stats/pipeline_time'}) + logging.debug(f"Restoring complete: {res-2} entries restored") + + # Two additional entries with key 'stats/pipeline_time' should be present - one from the purge pipeline, other from the restore pipeline + self.assertEqual(res_stats_count, 2) + self.assertEqual(res, 1908) + + ''' + Test 6 - Verify that restoring timeseries data is skipped if data already exists + Duplicate key error is ignored in import_timeseries.py + Hence no entries should be inserted + ''' + logging.debug("Attempting to load duplicate data...") + epr.run_restore_pipeline_for_user(self.testUUID, file_names) + # Check how much data there is after + res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID}) + res_stats_count = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID, "metadata.key" : 'stats/pipeline_time'}) + logging.debug(f"Restoring complete: {res-2} entries restored") + + # A third entry with key 'stats/pipeline_time' should be present after running the restore pipeline again + 
self.assertEqual(res_stats_count, 3) + self.assertEqual(res, 1909) + + + def testPurgeRestorePipelineFull(self): + with tempfile.TemporaryDirectory(dir='/tmp') as tmpdirname: + entries_to_export = self.getEntriesToExport(tmpdirname) + self.prePipelineTests() + file_names = epp.run_purge_pipeline_for_user(self.testUUID, os.environ.get('DATA_DIR', 'emission/archived'), "full") + self.postPipelineTests(entries_to_export, file_names) + + def testPurgeRestorePipelineIncremental(self): + with tempfile.TemporaryDirectory(dir='/tmp') as tmpdirname: + entries_to_export = self.getEntriesToExport(tmpdirname) + self.prePipelineTests() + file_names = epp.run_purge_pipeline_for_user(self.testUUID, os.environ.get('DATA_DIR', 'emission/archived'), "incremental") + self.postPipelineTests(entries_to_export, file_names) + +if __name__ == '__main__': + etc.configLogging() + unittest.main()
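Taken together, the flow exercised by TestPurgeRestoreModule is: run the purge stage for a user, which exports raw timeseries entries to gzipped archive files and deletes them from timeseries_db, then hand the returned file names to the restore stage, which reloads them through load_multi_timeline_for_range. A minimal usage sketch under those assumptions follows; the UUID and archive directory are placeholders, and "full" can be swapped for "incremental" to export in 1-hour batches:

    import emission.pipeline.purge_stage as epp
    import emission.pipeline.restore_stage as epr

    def purge_then_restore(uuid, archive_dir="emission/archived"):
        # Export the user's raw timeseries data to archive files under archive_dir
        # and delete the exported entries from timeseries_db
        file_names = epp.run_purge_pipeline_for_user(uuid, archive_dir, "full")
        # Later, reload the archived entries; duplicate-key errors are ignored on re-import
        epr.run_restore_pipeline_for_user(uuid, file_names)
        return file_names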