Purge + Restore user timeseries data with long-term storage #952

Open · wants to merge 51 commits into base: master

Commits (51)
8a71ce5
feat: Add timeseries purge script
paultranvan Jan 31, 2023
6ea8ac5
Added csv export feature to bin/purge_user_timeseries
TTalex Mar 28, 2023
d20011b
Replaced print() with logging.debug()
Jan 8, 2024
0d0a0ba
Merge remote-tracking branch 'ttalex/purge-timeseries' into purge-res…
Jan 8, 2024
ae6eae6
Storing data as JSON + Restore code added
Jan 8, 2024
78979ff
Current working code for JSON based purge/restore of data
Jan 9, 2024
2c1ef44
Added CSV export as an option
Jan 9, 2024
315510c
Added more tests for purge and restore operations
Jan 11, 2024
479a37f
Updated test file path
Jan 11, 2024
d8ef5f7
Updated default directory path
Jan 11, 2024
28719f4
Added import options
Jan 19, 2024
da584e6
Added conditional checks before purging
Jan 19, 2024
190f4d8
Remove print
Jan 19, 2024
ee1aada
Testing changes to get TestExportModule to work
Jul 9, 2024
3550b1e
Testing adding database argument to export script
Jul 10, 2024
41ba8f0
Trying to run export using export pipeline + Added logging statements
Jul 11, 2024
332ba43
Executing the purge process via the export pipeline now. Added Test f…
Jul 11, 2024
e601491
Trying to make the entire flow work : export -> purge -> restore
Jul 11, 2024
d7823b4
Cleaned up code before pushing commits.
Jul 11, 2024
24548f4
Added TODO comment to Test file
Jul 11, 2024
6a990cb
Fixed datetime module + Cleaned up redundant changes from other PRs
Jul 11, 2024
2d73ef9
Removing changes made to original export PR scripts.
Jul 31, 2024
82ecd73
Removed newline change from .gitignore
Jul 31, 2024
01636bf
Added purge + restore pipeline implementation + Corrected EXPORT_DATA…
Jul 31, 2024
b9fc467
Added purge + restore pipeline implementation + Corrected EXPORT_DATA…
Jul 31, 2024
3bdc8cb
Removed import_timeseries import - no longer using this file.
Jul 31, 2024
2729f2b
Skipping export, purge, restore stages in pipeline test.
Jul 31, 2024
a600682
Removed purge/restore standalone scripts; can reuse existing extract/…
Aug 1, 2024
1b6833d
Added flag to existing extract script to allow purging
Aug 1, 2024
306f4de
Removed continue_on_error parameter to load timeseries function
Aug 1, 2024
9fb7a93
Deleting unused Test file
Aug 1, 2024
2097ff4
Tests added (more tests to be added) + Using continue_on_error flag f…
Aug 2, 2024
103537a
Added duplicate data test + log message + returning inserted entries …
Aug 2, 2024
33eae2a
Removed unused Test file for purge / restore
Aug 2, 2024
b65430d
New logic for handling last processed entry; need to add last_trip va…
Aug 9, 2024
5fb6370
Using data.ts for last_processed_ts; not using last_trip_done
Aug 9, 2024
592636a
Added last_processed_ts logic to restore_data by using tsdb_count
Aug 27, 2024
0e57a76
Added more tests for testing purge restore pipelines + Using last_pro…
Aug 28, 2024
661a222
Draft commit - Testing 1 hour incremental export - some entries missing
Aug 29, 2024
ec162ad
Implemented Full + Incremental export + purge + restore
Aug 29, 2024
852fd09
Revert "Added flag to existing extract script to allow purging"
Aug 30, 2024
63f7985
Added a new import_timeseries that replicates load_multi_timeline_for…
Aug 30, 2024
34ab73d
Reduced number of files by merging data using batch_size
Aug 31, 2024
4ab627b
Draft commit -> Fix full export batch size limit + Trying to fix incr…
Aug 31, 2024
02fb2ce
Removed batch size limit
Aug 31, 2024
c38b82d
Shortened core logic + Added tests to check file contents
Aug 31, 2024
6c82123
Draft commit - added print statements to Test; pending restore multip…
Aug 31, 2024
0a7138b
Revert "Shortened core logic + Added tests to check file contents"
Aug 31, 2024
6dc72b6
Tests for asserting few entries after export
Aug 31, 2024
23734e5
Added more tests for comparing entries from db and export files
Sep 1, 2024
4703f04
Cleaned up duplicate code, log statements + Refactored export.py
Sep 16, 2024
116 changes: 63 additions & 53 deletions bin/debug/load_multi_timeline_for_range.py
@@ -14,7 +14,7 @@
import emission.storage.json_wrappers as esj
import argparse

import common
import bin.debug.common as common
import os

import gzip
@@ -26,19 +26,19 @@

args = None

def register_fake_users(prefix, unique_user_list):
def register_fake_users(prefix, unique_user_list, verbose):
logging.info("Creating user entries for %d users" % len(unique_user_list))

format_string = "{0}-%0{1}d".format(prefix, len(str(len(unique_user_list))))
logging.info("pattern = %s" % format_string)

for i, uuid in enumerate(unique_user_list):
username = (format_string % i)
if args.verbose is not None and i % args.verbose == 0:
if verbose is not None and i % verbose == 0:
logging.info("About to insert mapping %s -> %s" % (username, uuid))
user = ecwu.User.registerWithUUID(username, uuid)

def register_mapped_users(mapfile, unique_user_list):
def register_mapped_users(mapfile, unique_user_list, verbose):
uuid_entries = json.load(open(mapfile), object_hook=esj.wrapped_object_hook)
logging.info("Creating user entries for %d users from map of length %d" % (len(unique_user_list), len(mapfile)))

@@ -50,17 +50,17 @@ def register_mapped_users(mapfile, unique_user_list):
# register this way
# Pro: will do everything that register does, including creating the profile
# Con: will insert only username and uuid - id and update_ts will be different
if args.verbose is not None and i % args.verbose == 0:
if verbose is not None and i % verbose == 0:
logging.info("About to insert mapping %s -> %s" % (username, uuid))
user = ecwu.User.registerWithUUID(username, uuid)

def get_load_ranges(entries):
start_indices = list(range(0, len(entries), args.batch_size))
def get_load_ranges(entries, batch_size):
start_indices = list(range(0, len(entries), batch_size))
ranges = list(zip(start_indices, start_indices[1:]))
ranges.append((start_indices[-1], len(entries)))
return ranges

def load_pipeline_states(file_prefix, all_uuid_list, continue_on_error):
def load_pipeline_states(file_prefix, all_uuid_list, continue_on_error, verbose):
import emission.core.get_database as edb
import pymongo

@@ -70,7 +70,7 @@ def load_pipeline_states(file_prefix, all_uuid_list, continue_on_error):
(curr_uuid, pipeline_filename))
with gzip.open(pipeline_filename) as gfd:
states = json.load(gfd, object_hook = esj.wrapped_object_hook)
if args.verbose:
if verbose:
logging.debug("Loading states of length %s" % len(states))
if len(states) > 0:
try:
@@ -109,6 +109,55 @@ def post_check(unique_user_list, all_rerun_list):
else:
logging.info("timeline contains a mixture of analysis results and raw data - complain to shankari!")

def load_multi_timeline_for_range(file_prefix, info_only=None, verbose=None, continue_on_error=None, mapfile=None, prefix=None, batch_size=10000, raw_timeseries_only=False):
fn = file_prefix
logging.info("Loading file or prefix %s" % fn)
sel_file_list = common.read_files_with_prefix(fn)

all_user_list = []
all_rerun_list = []
(tsdb_count, ucdb_count) = (0,0)

for i, filename in enumerate(sel_file_list):
if "pipelinestate" in filename:
continue
logging.info("=" * 50)
logging.info("Loading data from file %s" % filename)

entries = json.load(gzip.open(filename), object_hook = esj.wrapped_object_hook)

# Obtain uuid and rerun information from entries
curr_uuid_list, needs_rerun = common.analyse_timeline(entries)
if len(curr_uuid_list) > 1:
logging.warning("Found %d users, %s in filename, aborting! " %
(len(curr_uuid_list), curr_uuid_list))
            raise RuntimeError("Found %d users, %s in filename, expecting 1, %s" %
(len(curr_uuid_list), curr_uuid_list, common.split_user_id(filename)))
curr_uuid = curr_uuid_list[0]
all_user_list.append(curr_uuid)
all_rerun_list.append(needs_rerun)

load_ranges = get_load_ranges(entries, batch_size)
if not info_only:
for j, curr_range in enumerate(load_ranges):
if verbose is not None and j % verbose == 0:
logging.info("About to load range %s -> %s" % (curr_range[0], curr_range[1]))
wrapped_entries = [ecwe.Entry(e) for e in entries[curr_range[0]:curr_range[1]]]
(tsdb_count, ucdb_count) = estcs.insert_entries(curr_uuid, wrapped_entries, continue_on_error)
logging.debug("For uuid %s, finished loading %d entries into the usercache and %d entries into the timeseries" % (curr_uuid, ucdb_count, tsdb_count))

unique_user_list = set(all_user_list)
if not info_only:
if not raw_timeseries_only:
load_pipeline_states(file_prefix, unique_user_list, continue_on_error, verbose)
if mapfile is not None:
register_mapped_users(mapfile, unique_user_list, verbose)
elif prefix is not None:
register_fake_users(prefix, unique_user_list, verbose)

post_check(unique_user_list, all_rerun_list)
return (tsdb_count, ucdb_count)

if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument("file_prefix",
@@ -128,6 +177,9 @@ def post_check(unique_user_list, all_rerun_list):

parser.add_argument("-s", "--batch-size", default=10000, type=int,
help="batch size to use for the entries")

parser.add_argument("-t", "--raw-timeseries-only", default=False, action='store_true',
help="load only raw timeseries data; if not set load both raw and analysis timeseries data")

group = parser.add_mutually_exclusive_group(required=False)
group.add_argument("-p", "--prefix", default="user",
@@ -141,47 +193,5 @@
else:
logging.basicConfig(level=logging.INFO)

fn = args.file_prefix
logging.info("Loading file or prefix %s" % fn)
sel_file_list = common.read_files_with_prefix(fn)

all_user_list = []
all_rerun_list = []

for i, filename in enumerate(sel_file_list):
if "pipelinestate" in filename:
continue
logging.info("=" * 50)
logging.info("Loading data from file %s" % filename)

entries = json.load(gzip.open(filename), object_hook = esj.wrapped_object_hook)

# Obtain uuid and rerun information from entries
curr_uuid_list, needs_rerun = common.analyse_timeline(entries)
if len(curr_uuid_list) > 1:
logging.warning("Found %d users, %s in filename, aborting! " %
(len(curr_uuid_list), curr_uuid_list))
        raise RuntimeError("Found %d users, %s in filename, expecting 1, %s" %
(len(curr_uuid_list), curr_uuid_list, common.split_user_id(filename)))
curr_uuid = curr_uuid_list[0]
all_user_list.append(curr_uuid)
all_rerun_list.append(needs_rerun)

load_ranges = get_load_ranges(entries)
if not args.info_only:
for j, curr_range in enumerate(load_ranges):
if args.verbose is not None and j % args.verbose == 0:
logging.info("About to load range %s -> %s" % (curr_range[0], curr_range[1]))
wrapped_entries = [ecwe.Entry(e) for e in entries[curr_range[0]:curr_range[1]]]
(tsdb_count, ucdb_count) = estcs.insert_entries(curr_uuid, wrapped_entries, args.continue_on_error)
print("For uuid %s, finished loading %d entries into the usercache and %d entries into the timeseries" % (curr_uuid, ucdb_count, tsdb_count))

unique_user_list = set(all_user_list)
if not args.info_only:
load_pipeline_states(args.file_prefix, unique_user_list, args.continue_on_error)
if args.mapfile is not None:
register_mapped_users(args.mapfile, unique_user_list)
elif args.prefix is not None:
register_fake_users(args.prefix, unique_user_list)

post_check(unique_user_list, all_rerun_list)
# load_multi_timeline_for_range(args.file_prefix, args.info_only, args.verbose, args.continue_on_error, args.mapfile, args.prefix, args.batch_size)
load_multi_timeline_for_range(args.file_prefix, args.info_only, args.verbose, args.continue_on_error, args.mapfile, args.prefix, args.batch_size, args.raw_timeseries_only)
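
With the script body factored into a callable load_multi_timeline_for_range(), the restore stage can invoke the loader directly instead of shelling out to the CLI. Below is a minimal usage sketch; the file prefix, flag values, and the assumption that the repository root is on PYTHONPATH (mirroring the `import bin.debug.common as common` change above) are illustrative, not part of this PR.

import logging

# Assumption: the repository root is on PYTHONPATH, so bin/debug modules are importable.
import bin.debug.load_multi_timeline_for_range as lmtfr

logging.basicConfig(level=logging.INFO)

# Hypothetical export prefix; a real restore would point this at the archive
# written by the export/purge stage.
file_prefix = "/tmp/export_u1"

(tsdb_count, ucdb_count) = lmtfr.load_multi_timeline_for_range(
    file_prefix,
    info_only=None,            # set to True to inspect the dump without loading
    verbose=1,                 # log every batch and user mapping
    continue_on_error=True,    # keep going past entries that fail to insert (e.g. duplicates)
    mapfile=None,
    prefix=None,               # users are assumed to already exist
    batch_size=10000,
    raw_timeseries_only=True)  # per the code above, skips load_pipeline_states()

# As written, the returned counts reflect the last batch inserted for the last user.
logging.info("loaded %d timeseries and %d usercache entries", tsdb_count, ucdb_count)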
43 changes: 43 additions & 0 deletions conf/log/purge.conf.sample
@@ -0,0 +1,43 @@
{
"handlers": {
"errors": {
"backupCount": 2,
"mode": "a",
"level": "ERROR",
"formatter": "detailed",
"class": "logging.handlers.RotatingFileHandler",
"maxBytes": 1073741824,
"filename": "/var/tmp/purge-errors.log",
"encoding": "UTF-8"
},
"console": {
"class": "logging.StreamHandler",
"level": "WARNING"
},
"file": {
"backupCount": 8,
"filename": "/var/tmp/purge.log",
"maxBytes": 1073741824,
"mode": "a",
"formatter": "detailed",
"class": "logging.handlers.RotatingFileHandler",
"encoding": "UTF-8"
}
},
"version": 1,
"root": {
"handlers": [
"console",
"file",
"errors"
],
"level": "DEBUG"
},
"formatters": {
"detailed": {
"class": "logging.Formatter",
"format": "%(asctime)s:%(levelname)s:%(thread)d:%(message)s"
}
}
}

43 changes: 43 additions & 0 deletions conf/log/restore.conf.sample
@@ -0,0 +1,43 @@
{
"handlers": {
"errors": {
"backupCount": 2,
"mode": "a",
"level": "ERROR",
"formatter": "detailed",
"class": "logging.handlers.RotatingFileHandler",
"maxBytes": 1073741824,
"filename": "/var/tmp/restore-errors.log",
"encoding": "UTF-8"
},
"console": {
"class": "logging.StreamHandler",
"level": "WARNING"
},
"file": {
"backupCount": 8,
"filename": "/var/tmp/restore.log",
"maxBytes": 1073741824,
"mode": "a",
"formatter": "detailed",
"class": "logging.handlers.RotatingFileHandler",
"encoding": "UTF-8"
}
},
"version": 1,
"root": {
"handlers": [
"console",
"file",
"errors"
],
"level": "DEBUG"
},
"formatters": {
"detailed": {
"class": "logging.Formatter",
"format": "%(asctime)s:%(levelname)s:%(thread)d:%(message)s"
}
}
}
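
Both sample files are plain JSON in the stdlib dictConfig schema ("version": 1), with a console handler plus rotating file and error logs under /var/tmp. The sketch below shows how a purge or restore entry point could apply them, assuming (as with the other pipeline stages) that deployments copy the .sample file to a site-specific .conf path; the fallback logic and path names here are illustrative.

import json
import logging
import logging.config
import os

# Hypothetical fallback: use the checked-in .sample file when a deployment has
# not created its own conf/log/purge.conf.
config_file = "conf/log/purge.conf"
if not os.path.exists(config_file):
    config_file = "conf/log/purge.conf.sample"

with open(config_file) as cf:
    # The sample is a dictConfig dictionary: console handler at WARNING plus
    # rotating /var/tmp/purge.log and /var/tmp/purge-errors.log files.
    logging.config.dictConfig(json.load(cf))

logging.getLogger().debug("purge logging configured from %s" % config_file)
# The restore stage would do the same with conf/log/restore.conf(.sample).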

4 changes: 3 additions & 1 deletion emission/core/wrapper/pipelinestate.py
@@ -28,7 +28,9 @@ class PipelineStages(enum.Enum):
USER_MODEL = 7
RECOMMENDATION = 8
OUTPUT_GEN = 9
EXPORT_DATA = 15
EXPORT_DATA = 18
PURGE_TIMESERIES_DATA = 19
RESTORE_TIMESERIES_DATA = 20

class PipelineState(ecwb.WrapperBase):
props = {"pipeline_stage": ecwb.WrapperBase.Access.RW, # the value of the stage from the enum above
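
The new stage values give export, purge, and restore their own pipeline-state bookkeeping. The sketch below shows how a purge runner might consult and advance that state; the espq.get_time_range_for_stage / espq.mark_stage_done helpers and the TimeQuery attributes are assumed by analogy with the existing intake stages and are not part of this diff.

import logging

import emission.core.wrapper.pipelinestate as ecwp
# Assumed helpers: other pipeline stages use emission.storage.pipeline_queries
# to track per-stage progress; exact names/signatures are not shown in this diff.
import emission.storage.pipeline_queries as espq

def run_purge_stage(user_id):
    stage = ecwp.PipelineStages.PURGE_TIMESERIES_DATA
    # Ask the pipeline-state machinery which time range is still unprocessed
    # for this user and stage.
    time_query = espq.get_time_range_for_stage(user_id, stage)
    logging.info("purging %s in range %s -> %s",
                 user_id, time_query.startTs, time_query.endTs)

    # ... export the matching entries to long-term storage, then delete them ...
    last_processed_ts = time_query.endTs  # placeholder; the PR derives this from data.ts

    # Record progress so an interrupted purge resumes from last_processed_ts
    # instead of starting over.
    espq.mark_stage_done(user_id, stage, last_processed_ts)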