Purge + Restore user timeseries data with long-term storage #952

Open: wants to merge 51 commits into base: master

Conversation

MukuFlash03 (Contributor)

Building on top of PR #904 with the objective of integrating this into the current codebase.
Will be analyzing the file storage options (CSV, JSON) as well as including the option for mongodump/restore.
Additionally long-term storage to AWS will be looked at as well.

paultranvan and others added 7 commits January 31, 2023 19:03
When operating a server, the `Stage_timeseries` database can become
quite big.
In the case where only the `Stage_analysis_timeseries` is actually
useful after the pipeline execution, the user's timeseries can be
deleted to speed up the pipeline and gain some disk space.
print() statements weren't being logged in AWS CloudWatch logs.

logging.debug() statements are meant for this purpose.

These statements may or may not show up in normal execution output depending on the configured logger level.
Choosing JSON instead of CSV since:
1. CSV does not retain the nested dict-like document structure of MongoDB documents.
2. CSV stores redundant empty NaN columns as well.
CSV export kept on hold for now as restoring from CSV is complicated due to loss of data structure.

This commit includes working code for export as JSON file and import from JSON file.
Default option for now is JSON which is easier for data restore.

Provided export flags as a boolean dictionary; the specific export function is called for each flag that is set.
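For illustration, the flag-based dispatch described above might look roughly like this; the flag names and export functions below are placeholders, not the actual ones in this commit:

```python
import csv
import json

def export_as_json(entries, file_prefix):
    # Default export format: preserves the nested document structure
    with open(file_prefix + ".json", "w") as fp:
        json.dump(entries, fp, default=str)

def export_as_csv(entries, file_prefix):
    # Optional export format: nested structure is flattened, so restore is harder
    if not entries:
        return
    fieldnames = sorted({k for e in entries for k in e})
    with open(file_prefix + ".csv", "w", newline="") as fp:
        writer = csv.DictWriter(fp, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(entries)

# Boolean dictionary of export flags; each enabled flag calls its export function
export_flags = {"json": True, "csv": False}
export_functions = {"json": export_as_json, "csv": export_as_csv}

def run_exports(entries, file_prefix):
    for fmt, enabled in export_flags.items():
        if enabled:
            export_functions[fmt](entries, file_prefix)
```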
@MukuFlash03 (Contributor, Author) commented Jan 9, 2024

Thinking of options to be made available for the type of export file - CSV, JSON, Mongodump

  • CSV -> Complicates restoring due to NaN field columns and loss of data structure
  • JSON -> Works well for both export and import
  • Mongodump -> Should be the same as JSON

I'm proposing that the default option be JSON, which will occur irrespective of the other export types (CSV for now). Pros of this:

  • Currently, JSON export/import has been implemented successfully.
  • Avoids the readability complexities of mongodump from the user's perspective: a JSON file can be viewed in a text editor and the data can be interpreted more easily than a mongodump.

Pros of having Mongodump as default option:

  • Purge + Restore should be much simpler since restoring would just be the built-in functionality of loading the mongodump.

Currently, CSV export is integrated as an optional export. Once Mongodump is implemented and that data can be imported as well, Mongodump also becomes an option for export.
This will mean a JSON file will always be generated, so we will get either just JSON or a combination of JSON and other file types (JSON + CSV or JSON + CSV + Mongodump).

I have grouped all export data file types in a boolean dictionary which can then call the specific export function if that flag is set.

Next working on adding Mongodump as an option.

Mahadik, Mukul Chandrakant added 3 commits January 10, 2024 19:00
Built on and added tests for normal data operations of purge() and restore().

Added edge cases tests:
1. Loading duplicate or already existing data by calling restore function again.
2. Loading from empty JSON file containing no data.

Will add additional tests if needed.
Changed the file path of the empty JSON file used for testing to the generic /var/tmp instead of a local path.
Changed from "/tmp" to the operating system's default temporary directory.
This makes the functionality work on a cross-platform basis.
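As a sketch, the cross-platform temporary path for the empty test file can be built with tempfile.gettempdir(); the file name below is illustrative:

```python
import os
import tempfile

# Use the OS default temporary directory instead of a hard-coded "/tmp" or "/var/tmp"
empty_json_path = os.path.join(tempfile.gettempdir(), "empty_file.json")

with open(empty_json_path, "w") as fp:
    fp.write("[]")  # an empty JSON list, used to test restoring from a file with no data
```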
@MukuFlash03 (Contributor, Author) commented Jan 19, 2024

The latest fixes work with JSON files used as the export file type, and this same file works for importing data correctly back into the database as well.

However, with CSV files, it's a bit complicated for two reasons:

  1. The nested data structure is lost.
  2. Data is stored as strings, which converts types like arrays into strings too.

Also, with Mongodump, I tried using the Python subprocess module to run the mongodump command as a terminal command, but there were issues with missing dependencies (such as mongodump itself). I then had to figure out how to run and/or save the file locally in the Docker container and extract it from there if needed for analysis on one's local system.

With these in mind, JSON currently seems a good option for basic export/import.
I will check out the already existing code for JSON export too.
I found this export function mentioned here

@shankari (Contributor)

Yes, that is the export PR. We can also potentially use the associated pipeline state for incremental exports, since this is planned to be part of a running system. I would suggest using command-line flags to indicate whether we want an incremental dump or a full dump.
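As a sketch, such a switch could be exposed roughly like this (the flag name is illustrative, not from any existing script):

```python
import argparse

parser = argparse.ArgumentParser(description="Export (and optionally purge) raw timeseries data")
parser.add_argument("--export-type", choices=["full", "incremental"], default="full",
                    help="full: dump everything; incremental: only data since last_processed_ts")
args = parser.parse_args()

if args.export_type == "incremental":
    print("Exporting only entries newer than the stored pipeline state")
else:
    print("Exporting the entire raw timeseries")
```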

Mahadik, Mukul Chandrakant added 3 commits January 19, 2024 11:35
Import file type added as command line argument with appropriate functions.
Checking for valid cstate, query results count before initiating purging.
@shankari (Contributor)

As discussed earlier, this needs to use the existing export method.

@shankari (Contributor) commented Jul 9, 2024

The scalability considerations here, if you follow the PR chain, are that we store the raw data in the timeseries, we generate the analysis timeseries from the raw timeseries, and then we use the analysis timeseries (aka with the analysis results) in the rest of our work. So the argument is that you don't need to store the raw timeseries on an ongoing basis. It is only required when we need to reset the pipeline and/or change analysis algorithms. So it can be moved to long-term storage, but with the ability to restore before we reset the pipeline.

@MukuFlash03 (Contributor, Author)

I started off by first understanding the differences in terms of code changes from the newer Purge PR and the older Export PR.


A) Purge vs Export

  1. Purge PR does purge or delete entries once export is completed.
  2. Export PR only exports timeseries entries to JSON dump without any deletion from the database. I did see from this comment that bin/debug has a script for purging as well. But this script purges all the entries for a specific user from all databases, which is not what we want.

B) Do we want to allow the user to specify the time range for export + purge ? Or should it be fetched programmatically using pipeline stages like it's being done in the Purge PR and in the emission.pipeline.export_stage?

  1. The Purge PR script does not need the time range for extraction to be specified via command-line inputs; it is fetched programmatically using last_run_ts. The correct time variable to use, however, is last_processed_ts (refer to this comment).

  2. Export PR uses the extract_timeline_for_day_range_and_user.py script to use the export functionality via command-line but it needs start and end dates to be passed as arguments.
    I do also see another way to execute the export functionality would be to run the export pipeline similar to how we run the intake pipeline.
    The export pipeline handles fetching the time range based on the pipeline state using:

start_ts = curr_state.last_processed_ts
end_ts = time.time() - END_FUZZ_AVOID_LTE

C) Output files containing exported data differ in the format of the ObjectID and UUID

  1. Purge PR script output format matches the entries in the database - ObjectId and UUID
        ...
        '_id': ObjectId('657b98c36078c82e567e462a'),
        'user_id': UUID('2a4acad8-ee98-45ff-b4b5-322345700646'),
        ...
  2. Export PR script output format is changed to $oid and $uuid.
        ...
        "_id": {
            "$oid": "657b98c36078c82e567e462a"
        },
        "user_id": {
            "$uuid": "2a4acad8ee9845ffb4b5322345700646"
        }, ...

This is managed via the emission.storage.json_wrappers module, which uses the bson.json_util module.
Handling this will be required when restoring the data, but the conversion should be fairly simple or might even be handled automatically when inserting into MongoDB.
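For reference, a minimal sketch of the round trip through bson.json_util (assuming pymongo/bson is installed); UUID fields are handled similarly, subject to the configured uuid_representation:

```python
from bson import json_util
from bson.objectid import ObjectId

doc = {"_id": ObjectId("657b98c36078c82e567e462a"), "data": {"ts": 1437585293.881}}

serialized = json_util.dumps(doc)       # "_id" becomes {"$oid": "657b98c3..."}
restored = json_util.loads(serialized)  # back to a native ObjectId, ready for insertion

assert restored["_id"] == doc["_id"]
```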


@MukuFlash03 (Contributor, Author)

Looking at the files involved in the export PR:

  • bin/debug/extract_timeline_for_day_range_and_user.py: Command-line script to execute export()
  • emission/export/export.py: Main nested export file that exports to json dump.
    - emission/core/wrapper/pipelinestate.py: Declares a new Pipeline Stage.
  • emission/exportdata/export_data.py: Fetches time range for export and calls emission.export.export()
  • emission/pipeline/export_stage.py: Defines a new export Pipeline that calls emission.exportdata.export_data()
  • emission/tests/exportTests/TestExportModule.py: Unit tests for export()

Export Data Flow:
emission/pipeline/export_stage.py -> emission/exportdata/export_data.py -> emission/export/export.py


  1. TestExportModule.py
    I started with trying to execute this test to get a sense of the flow of data from setting up a sample user to exporting the data.

But I got some errors:

A. FileNotFoundError

FileNotFoundError: [Errno 2] No such file or directory: 'emission/archived/archive_28712d00-ca16-4fae-9fc2-032a2b4bd5ea_1437607737.997_1720561247.677196.gz'

Erroneous Code:
        file_name = os.environ.get('DATA_DIR', 'emission/archived') + "/archive_%s_%s_%s" % (self.testUUID, time_query.startTs, time_query.endTs)

To solve this issue, I had to manually create the archive directory under emission, and then execute the test file.
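One way to avoid the manual step would be to create the directory up front in the test setup; a sketch:

```python
import os

def ensure_archive_dir():
    # Create the archive directory if it does not exist, so the test does not
    # fail with FileNotFoundError when writing the archive file
    data_dir = os.environ.get('DATA_DIR', 'emission/archived')
    os.makedirs(data_dir, exist_ok=True)
    return data_dir
```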


  2. Assertion Error
    Fails this assertion in the code:
        #Testing the matching total number of confirmed trips, testing no confirmed trips in raw.
        #Testing also for the first three trips that the object ids and user ids are consistent.
        self.assertEqual(len(confirmed_trips_exported), len(confirmed_trips_db))

Error stack trace

======================================================================
FAIL: testExportModule (__main__.TestExportModule)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/Users/mmahadik/Documents/Work/OpenPATH/Code/GitHub/e-mission-server/emission/tests/exportTests/TestExportModule.py", line 51, in testExportModule
    self.assertEqual(len(confirmed_trips_exported), len(confirmed_trips_db))
AssertionError: 0 != 6

I'm yet to ascertain the cause of this.


@MukuFlash03 (Contributor, Author) commented Jul 10, 2024

Main query / concern for now

One of the main things I am concerned about is that the existing export() method takes in data from three sources: timeseries_db, analysis_timeseries_db, and the usercache.

But the requirement in this PR is to export the raw data entries stored in timeseries_db only, and then go on to purge the exported data entries and restore from the exported data when required.


This snippet is from emission/export/export.py:

def get_from_all_three_sources_with_retry(user_id, in_query):
    import emission.storage.timeseries.builtin_timeseries as estb

    ts = estb.BuiltinTimeSeries(user_id)
    uc = enua.UserCache.getUserCache(user_id)

    sort_key = ts._get_sort_key(in_query)
    base_ts_call = lambda tq: ts._get_entries_for_timeseries(ts.timeseries_db, None, tq,
        geo_query=None, extra_query_list=None, sort_key = sort_key)
    analysis_ts_call = lambda tq: ts._get_entries_for_timeseries(ts.analysis_timeseries_db, None, tq,
        geo_query=None, extra_query_list=None, sort_key = sort_key)
    uc_ts_call = lambda tq: (uc.getMessageCount(None, tq), uc.getMessage(None, tq))

Related PRs / Issues for Fetching data from three sources

Reasoning behind these changes to read from the three database sources is mentioned in this issue

This PR added these changes to fix this issue.

This commit has the specific changes that were added.

This commit talks about restoring dumped data.

This comment has logs about restoring user data.


I am going to scan these related issues again to see if there's a workaround or we'll need a separate export script altogether.


@MukuFlash03 (Contributor, Author) commented Jul 10, 2024

I'm somewhat unclear about the usage of last_run_ts, last_processed_ts, curr_run_ts, start_ts, and end_ts in the pipeline code.
I have found these references in the export PR; my understanding is better, but I still need to grasp the flow entirely:

Detailed discussion here in the export PR:

Will need to read up on the pipeline code, pipeline states again.

@shankari (Contributor) commented Jul 10, 2024

B) Do we want to allow the user to specify the time range for export + purge ? Or should it be fetched programmatically using pipeline stages like it's being done in the Purge PR and in the emission.pipeline.export_stage?

Programmatically. I don't see any arguments against it. In general, the "purge" PR is what we should base our work on, since it is by far the most polished option.

I am going to scan these related issues again to see if there's a workaround or we'll need a separate export script altogether.

I don't see why you need a separate export script. It should be trivial to add a database argument to the existing script. We can also consider exporting using mongodump instead of to csv or json to be consistent with our internal processes.

Mahadik, Mukul Chandrakant added 5 commits July 10, 2024 13:37
…ile as well. Some pointers: I had to comment out logging.config.dictConfig in export_stage as it was giving a "logging has no module named config" error. Also, running the test with and without running the intake pipeline gives different results; I believe it has to do with how the start_ts is being set as part of the pipeline process. Next, will move on to purging and restoring from the .gz dump file.
@MukuFlash03 (Contributor, Author)

Here’s an overview of the pipeline stages along with the approaches used for updating the last_processed_ts.

| S. No. | Pipeline Stage | Function | Filename | Variable used | Approach |
|---|---|---|---|---|---|
| 1 | USERCACHE | uh.moveToLongTerm() | emission/net/usercache/builtin_usercache_handler.py | last_processed_ts | metadata.write_ts used for each trip iteratively |
| 2 | USER_INPUT_MATCH_INCOMING | eaum.match_incoming_user_inputs() | emission/analysis/userinput/matcher.py | last_processed_ts | metadata.write_ts used for each trip iteratively |
| 3 | ACCURACY_FILTERING | eaicf.filter_accuracy() | emission/analysis/intake/cleaning/filter_accuracy.py | last_processed_ts | metadata.write_ts used by indexing the last element |
| 4 | TRIP_SEGMENTATION | eaist.segment_current_trips() | emission/analysis/intake/segmentation/trip_segmentation.py | last_processed_ts | filter_methods used |
| 5 | SECTION_SEGMENTATION | eaiss.segment_current_sections() | emission/analysis/intake/segmentation/section_segmentation.py | last_trip_done | last trip fetched directly by indexing and data.end_ts used |
| 6 | JUMP_SMOOTHING | eaicl.filter_current_sections() | emission/analysis/intake/cleaning/location_smoothing.py | last_section_done | last section fetched directly by indexing and data.end_ts used |
| 7 | CLEAN_RESAMPLING | eaicr.clean_and_resample() | emission/analysis/intake/cleaning/clean_and_resample.py | last_section_done | last_place trip fetched using functions in emission/storage/decorations/timeline.py and data.enter_ts used |
| 8 | MODE_INFERENCE | eacimr.predict_mode() | emission/analysis/classification/inference/mode/rule_engine.py | last_section_done | last predicted section fetched directly by indexing and data.end_ts used |
| 9 | LABEL_INFERENCE | eacilp.infer_labels() | emission/analysis/classification/inference/labels/pipeline.py | last_trip_done | iteratively comparing last_trip_done's data.end_ts with cleaned_trip's data.end_ts and updating if a newer trip is found |
| 10 | EXPECTATION_POPULATION | eaue.populate_expectations() | emission/analysis/userinput/expectations.py | last_trip_done | iteratively comparing last_trip_done's data.end_ts with expected_trip's data.end_ts and updating if a newer trip is found |
| 11 | CREATE_CONFIRMED_OBJECTS | eaum.create_confirmed_objects() | emission/analysis/userinput/matcher.py | last_processed_ts | last_place trip fetched using functions in emission/storage/decorations/timeline.py and data.enter_ts used |
| 12 | CREATE_COMPOSITE_OBJECTS | eapcc.create_composite_objects() | emission/analysis/plotting/composite_trip_creation.py | last_processed_ts | iteratively creating composite trips and using the data.end_ts value |

@MukuFlash03 (Contributor, Author) commented Aug 9, 2024

Having seen all these, I opted to go with the approach that fetches all trip entries first before performing the operation.

trips_to_export = self.get_exported_timeseries_entries(user_id, ts, time_query.startTs, time_query.endTs, export_queries)
...
...

# Inside get_exported_timeseries_entries()
(ts_db_count, ts_db_result) = ts._get_entries_for_timeseries(ts.timeseries_db, None, tq, geo_query=None, extra_query_list=None, sort_key = sort_key)

Then, once the specific stage's primary operations are completed, the last trip is directly indexed. I'm using the value present in data.ts to keep track of the last processed entry for the Purge stage.

if len(trips_to_export) == 0:
    # Didn't process anything new so start at the same point next time
    self._last_processed_ts = None
else:  
    self._last_processed_ts = trips_to_export[-1]['data']['ts']

@MukuFlash03 (Contributor, Author) commented Aug 9, 2024

I thought of using data.enter_ts or data.end_ts like the other stages do, by making use of the Entry() wrapper.
Also, these stages were using ts.find_entries(), which combines entries from both timeseries_db and analysis_timeseries_db.

Took reference from emission/analysis/userinput/matcher.py

toMatchInputs = [ecwe.Entry(e) for e in ts.find_entries(input_key_list, time_query=timerange)]

So, I decided to use _get_entries_for_timeseries() to query just the timeseries DB entries and retrieve raw trip data.
But this was giving empty entries when trying to convert the result cursor obtained above.

Hence, I decided to go ahead with data.ts.
I did observe that for some entries, this value is the same as metadata.write_ts; for others, data.ts != metadata.write_ts.

I will take a second look, possibly using a similar Entry()-based approach, and see if end_ts can be used. But it seems these values are more geared towards the analysis timeseries and not raw data?

@MukuFlash03 (Contributor, Author)

PENDING / TODO:

  1. Add additional test cases for the logic to handle last_processed_ts.
  • Checking whether the retrieved last_processed_ts is the max last_processed_ts in the raw exported data / restored data.
  • After running export / restore, re-run it to check that incremental export occurs and the entire data isn't exported or restored every time.
  2. Add last_processed_ts logic for the Restore function as well.

Mahadik, Mukul Chandrakant added 14 commits August 27, 2024 15:10
…cesssed_ts for restore_data

Simply using last index value like how it was done in purge_data
When the validate truncate function signature still has 3 arguments, it gives an error but exports all entries.
However, when I modify it to include only the location query, it passes but does not export all entries.
The last curr_end_ts timestamp doesn't go all the way to the last one.

Committing changes before switching to another branch.
Exporting and Purging entries in 1 hour time chunks into separate files with a defined start_ts and end_ts.

Start_ts is the last_processed_ts for the user_id's Purge pipeline state.
- If the pipeline has already been run for the user, then this would be a non-None value.
- If pipeline hasn't been run, then it will be None; in this case, the earliest timestamp entry is chosen as the start ts. This helps avoid ValueErrors when adding 1 hour (3600 seconds) to start_ts for incremental export.

End_ts differs for Full export and Incremental export:
- Full export: current time at which pipeline is run will be the end ts; value returned by pipeline_queries on initiating pipeline stage is used.
- Incremental export: the first end_ts value would be 1 hour ahead of start_ts; this value continues to be incremented by 1 hour as long as data exists for the user. If the value after adding 1 hour exceeds the current time, then end_ts is set to the current time itself. The export + delete process continues as long as there is raw timeseries data for the user.
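A simplified sketch of the windowing described above; the function and variable names are illustrative and do not match the PR code exactly:

```python
ONE_HOUR = 3600  # seconds

def compute_time_ranges(start_ts, init_end_ts, incremental):
    # Full export: a single window from start_ts up to the pipeline's end time.
    # Incremental export: walk forward in 1-hour slots, clamping the last slot
    # to init_end_ts (the current time set by the pipeline queries).
    if not incremental:
        return [(start_ts, init_end_ts)]
    ranges = []
    curr_start = start_ts
    while curr_start < init_end_ts:
        curr_end = min(curr_start + ONE_HOUR, init_end_ts)
        ranges.append((curr_start, curr_end))
        curr_start = curr_end
    return ranges

# Example: three hours of data split into 1-hour chunks
print(compute_time_ranges(1437578093.0, 1437578093.0 + 3 * ONE_HOUR, incremental=True))
```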

-------

But what does 1 hour’s worth of data mean?
- In any case, purge pipeline runs upto the current time or until no more raw timeseries entries present in db for the user.
- If Purge pipeline running for the first time for a user, then it will export and purge all the timeseries data for a user from its first entry (which can be really old data and first purge run might take a lot of time)
- If Purge pipeline has already been run before for a user, then it will set start_ts to last_processed_ts and export data from that point.
    - If purge pipeline run hourly, then it would eventually just have a small subset of entries.

-------

Some points to consider:

A. Numerous files; Less data quantity per file

One observation is that current logic is creating multiple files in 1 hour chunks, which is okay.
But these files don’t really have a lot of entries.
What could be more efficient is to perhaps store more entries until a threshold, say 5000 or 10000 (like batch_size in load_multi_timeline_for_range), is reached.
If this default threshold batch size isn't reached, keep adding to the same file, updating the end_ts while start_ts remains the same.

Will attempt this next step.

------

B. Right approach for Export + Purge?

Approach A
1. Export data in chunks to File
2. Delete exported data from DB.
3. Repeat until all data purged.

Flow looks like: Export -> Delete -> Export -> Delete -> Export -> Delete

——

Approach B
1. Export data in chunks to file.
2. Repeat until all data exported.
3. Delete all exported data from DB.

Flow looks like: Export -> Export -> Export ... -> Delete

---------------

C. Do we need all 3 types of entries: locations, trips, places?

For now, commented out code from export_timeseries.py.
If we only need location entries, can simplify code further to just work for these entries.

If these are sort of like subsets of each other: location -> trip -> place,
then I can safely just take location.
But this is valid only if all entries contain location and hence ts values.
If only trip entries or only place entries are present, then directly choosing the latest ts is incorrect, since trips use data.start_ts while places use data.enter_ts.

Searching the codebase for references and reading through Shankari's thesis for start_ts and enter_ts.

I'm getting hints that start_ts and enter_ts belong to analysis_timeseries entries?
In that case, these can be ignored since purge + restore is concerned with raw timeseries data only.

Trip entries created in emission/analysis/intake/segmentation/trip_segmentation.py

——

Hint 1: Timeseries_Sample.ipynb

- ct_df fetches analysis/cleaned_trip entries -> analysis DB

------

Hint 2:  bin/historical/migrations/populate_local_dt.py

- Looks like old code, some changes were last made 8 years ago.
- The collection parameter refers to some non-time series databases as seen from the function calls.
- The entry['start_ts'] or entry['enter_ts'] values are then used in the find query by setting data.ts to this value.

---------

D. Is pipeline_states export needed?

Remove pipeline_states export if not needed.
Currently being used in existing export + load scripts.

---------
…_range

Main reason for this change was to avoid dealing with the pipeline_states file.

-----

Problem:

Observed in the exported pipeline state tar file that last_processed_ts for PURGE stage was null.

But from logs it looks like updated pipeline state inserted in the DB
```
INFO:root:For stage PipelineStages.PURGE_TIMESERIES_DATA, last_ts_processed = 2015-07-23T06:40:40.069000

DEBUG:root:About to save object PipelineState({'_id': ObjectId('66c79deddbf93d53d0f61184'), 'user_id': UUID('222a6ab7-94f0-4fec-a48f-0471297a9644'), 'pipeline_stage': 19, 'curr_run_ts': None, 'last_processed_ts': 1437633640.069, 'last_ts_run': 1724358120.242778})

DEBUG:root:After saving state PipelineState({'_id': ObjectId('66c79deddbf93d53d0f61184'), 'user_id': UUID('222a6ab7-94f0-4fec-a48f-0471297a9644'), 'pipeline_stage': 19, 'curr_run_ts': None, 'last_processed_ts': 1437633640.069, 'last_ts_run': 1724358120.242778}), list is [{'_id': ObjectId('66c79deddbf93d53d0f61184'), 'user_id': UUID('222a6ab7-94f0-4fec-a48f-0471297a9644'), 'pipeline_stage': 19, 'curr_run_ts': None, 'last_processed_ts': 1437633640.069, 'last_ts_run': 1724358120.242778}]
```
---

Checked the DB, last_processed_ts is updated in the pipeline state db.

```
Command:
pipeline_states_list = list(edb.get_pipeline_state_db().find({"user_id": UUID(uuid_test)}))
for state in pipeline_states_list:
    print(state)

Output:
{'_id': ObjectId('66c79eaad2f85784cd19fd79'), 'user_id': UUID('b9f26d62-ef7b-489a-b814-154ea8e08dae'), 'pipeline_stage': 19, 'curr_run_ts': None, 'last_processed_ts': 1437633640.069, 'last_ts_run': 1724358309.836113}
```

-----

Why was it added in the original script?

Commit that added it in the export PR
e-mission@dd9ec1c

These were added in Jan 2018
Commit that added it in extract_timeline_for_day_range_and_user.py
e-mission@b38366b

Commit that added it in load_multi_timeline_for_range.py
e-mission@06a0a4e

Why do we need it?
Shankari
“””
The raw data and the analysis results do not constitute the entire state of a
pipeline. In particular, if we store only the raw + analysis results, and then
we try to run the pipeline again, we will end up with two copies of the
analysis results.
“””

-----

Do we need it for purge PR?
- The commit message states that pipelinestates were also exported / loaded so that we don't have duplicate analysis entries.
- In the purge PR, we are strictly dealing with timeseries_db data.
- Hence can remove it from the purge_restore related code.

------

Something wrong with the export_pipeline_states() in purge_data then?
- No, I was calling export_timeseries to export the pipeline states inside the run_purge_pipeline function in purge_data.
- This was running for every export file, but at that point last_processed_ts isn't updated yet.
- It is only updated once the function exits and goes back to the parent function, where the stage is marked as done. The final update to the pipeline state occurs on returning to the parent function purge_data():

```
        if pdp.last_processed_ts is None:
            logging.debug("After run, last_processed_ts == None, must be early return")
        espq.mark_purge_data_done(user_id, pdp.last_processed_ts)
```
- Hence in all the pipeline state files, last_processed_ts has the value NULL.
- Also, we get multiple pipeline state files since the function call to export the pipeline states is also within the run_purge_data_pipeline function that exports the timeseries data.

Now, one approach to resolve this that I thought would work:
- Move the export pipeline states call to the parent function after the stage has been marked as done.
- Also if we move the export pipeline states to the parent function, only one pipelinestates file would be created; I tried using the earliest start ts and the latest end ts to name the file.
- But this had a few problems.
- First, it doesn’t seem right that after a stage is marked as done we are still performing some steps. I checked the existing intake pipeline stages and none of them do this.
- Second, while it would work for full export, it would be difficult for incremental export where each export filename has a different start and end ts corresponding to multiple time ranges.
- This is because, load_multi_timeline_for_range calls load_pipeline_states which uses the same file_prefix name as the exported timeseries filename to load the pipeline states; it just adds "pipelinestates" to the file_prefix.
- But then this file wasn't found by the load_multi_timeline function: it would load the data from the first time range file, say start_ts_1 to end_ts_1, and so on, but when it tries to load the pipeline_states file with the same prefix, it doesn't exist, as we now only have one file spanning earliest_start_ts to latest_end_ts.
- Hence, it gives a FileNotFoundError.

------
One observation is that current logic is creating multiple files, which is okay.
But these files don’t really have a lot of entries.

What could be more efficient is to perhaps store more entries until a threshold, say 5000 or 10000 (like batch_size in load_multi_timeline_for_range), is reached.
If this default threshold batch size isn't reached, keep adding to the same file,
updating the end_ts while start_ts remains the same.

----

Found an edge case
Incremental export is fine.

Let’s say we have chosen full export.

In the sample data we have 1906 entries.
In batch testing I’m setting batch_size_limit to 500.

Now, when the code executes:
- current_end_ts will be set to initEndTs, which is the current time minus the FUZZ time, as set by the pipeline queries.
- new_entries will have all 1906 entries, which is more than the batch_size_limit.
- BOTH the batch_size_limit check and the current_end_ts check will be TRUE.
- It will export the oversized batch of more than the limit and also delete those entries.
- While it seems fine, it will cause issues when we attempt to restore data whose size exceeds the batch size.

Hence, need a way to handle this by perhaps:
- Setting the current_end_ts to the ts value of the entry at the batch_size_limit - 1 index.
- Fetching entries up to this point only.
- Then fetching the next batch of entries.

Essentially, in this scenario, unlike the incremental scenario where we increment current_end_ts by 3600 seconds,
here we need to increment current_end_ts to the ts value of the entry at the next batch_size_limit - 1 index.
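A sketch of that capping idea, assuming the entries in the current window are sorted ascending by data.ts:

```python
def cap_batch(entries, current_end_ts, batch_size_limit):
    # If the window returned more entries than the batch limit, pull
    # current_end_ts back to the data.ts of the entry at index
    # batch_size_limit - 1, so only that batch is exported and deleted;
    # the next iteration then starts from this capped end_ts.
    if len(entries) > batch_size_limit:
        capped = entries[:batch_size_limit]
        return capped, capped[-1]['data']['ts']
    return entries, current_end_ts
```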

--------

Working on this, but tests for it are still pending.
Also, the batch size is still being exceeded.
…emental

But 1st batch has 501 entries.
2nd has 500, 3rd has 500.
4th has 405 entries.

Understood the issue.

Right now we are segregating based on time ranges as well as batch sizes.
For incremental export, both are in play and right now, logic is getting messed up.

For full export, mainly batch size is in play as end_ts would initially be set to current time.
But if batch size exceeds limit, then we are setting end_ts to current batch size’s last entry.

Now, while the run_purge_data_pipeline() is able to stop at batch size, the existing export() script is unable to do so.
The export script just checks for the timestamps and exports everything in that range.
Similarly, the delete function also doesn’t care about the batch size and just deletes all matching entries within the time range.

A simple fix could be to try and limit the entries exported and deleted.

For export, just returning 500 entries for now in export script. This works.

For delete, there is no limit flag.
Can try deleting only the matching IDs.
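A sketch of deleting only the matching IDs with pymongo (the collection handle is assumed to be the raw timeseries collection):

```python
def delete_exported_entries(ts_db, exported_entries):
    # Delete only the documents whose _id values were actually exported in
    # this batch, instead of everything in the time range
    exported_ids = [e['_id'] for e in exported_entries]
    result = ts_db.delete_many({"_id": {"$in": exported_ids}})
    return result.deleted_count
```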

-------

Trying to solve for incremental export.

But realized that we might not need the batch size at all.
The batch_size default in load_multi_timeline_for_range isn't a fixed cutoff that it'll only process the limited data. It just separates the data into batches in the script itself.

No need to handle in the purge export script.

----------

Also, can simplify delete function in purge.

-------

New test added for batch size

------

Just committing code here for reference.
Realized that we might not need the batch size at all.
The batch_size default in load_multi_timeline_for_range isn't a fixed cutoff that it'll only process the limited data. It just separates the data into batches in the script itself.

------

Will clean up code in next commit.
Will clean up and add more tests.

Looks good for now.
Need to update PR with queries now.
This seems fine but can read in from DB before deletion occurs.
Then compare db entries with those read in from export file.

Will work on that next.
Also pending, temp directory for tests to store generated export files.
Added tests that assert first and last few entries from db and export files.
Comparing object IDs only for now.

Also added temp directory for tests so that local directory isn't filled with export files in emission/archived
@MukuFlash03 (Contributor, Author) commented Sep 1, 2024

Most of these latest code changes are focused on the export part, mainly to implement both full and incremental export.
I did not see any implementation of incremental export, nor was last_processed_ts being updated correctly in the original export PR.
Hence, I implemented these taking inputs from discussions in the export PR.

Some highlights from Export PR

Comment: Full vs. Incremental export discussion by kafitz
Comment: Detailed discussion by kafitz on full vs. incremental export + REST API
Comment: Shankari explained the difference between full export and incremental export with the usage of last_processed_ts for 1 hour's worth of data
Comment: Queries by jruzekowicz around start_ts, end_ts, last_processed_ts, file storage, queue
Comment: File storage, FIFO streams discussed
Comment: Next steps for the intake pipeline to export one hour's worth of data (I don't see this implemented; more details below on how I tested whether it works)


Summary of changes made since last time around (details in below comments)

  1. Added export_type parameter for "full" or "incremental" export.
  • Implemented both full and incremental export by keeping track of time ranges in 1-hour slots until the current time (when the pipeline / code is run) is reached.

Discussed in detail here: 1.A, 1.B, 1.C, 1.D


  2. Added separate export_timeseries and import_timeseries scripts. Main differences from the existing scripts:
  • export_timeseries: Removed extra queries (place, trip, first_place) to ensure we deal only with timeseries database entries.
  • import_timeseries: Removed loading of pipeline states.

Discussed in detail here: 2.A, 2.B


  3. Improved tests for both export types (will improve further):
    1. Checking count of data before and after purging and restoring.
    2. Checking number of export files generated.
    3. Comparing object IDs of a few entries from DB and from export files
    4. Checking multiple restore doesn’t add duplicate entries.

@MukuFlash03 (Contributor, Author) commented Sep 1, 2024

Main questions I had, which also give some reasoning around the changes I've made.
If my assumptions are not correct, I will need to change the approach for purging, mainly with regard to which "ts" values to base the logic around.

Commit with questions in more detail.


Questions

  1. Right approach for Export + Delete? (See below for details)
  2. Do we need all 3 types of entries: locations, trips, places? (See below comment)
  3. Is pipeline_states export necessary? (See below comment)

Q. 1. Right approach for Export + Delete?

Approach A

  1. Export data in chunks to File
  2. Delete exported data from DB.
  3. Repeat until all data purged.
Flow looks like: Export -> Delete -> Export -> Delete -> Export -> Delete

Approach B

  1. Export data in chunks to file.
  2. Repeat until all data exported.
  3. Delete all exported data from DB.
Flow looks like: Export -> Export -> Export ... -> Delete

@MukuFlash03 (Contributor, Author) commented Sep 1, 2024

Q. 2. Do we need all 3 types of entries: locations, trips, places?

From export.py

    loc_time_query = estt.TimeQuery("data.ts", start_ts, end_ts)
    loc_entry_list = get_from_all_three_sources_with_retry(user_id, loc_time_query, databases)
    trip_time_query = estt.TimeQuery("data.start_ts", start_ts, end_ts)
    trip_entry_list = get_from_all_three_sources_with_retry(user_id, trip_time_query, databases)
    place_time_query = estt.TimeQuery("data.enter_ts", start_ts, end_ts)
    place_entry_list = get_from_all_three_sources_with_retry(user_id, place_time_query, databases)

For now, commented out code from export_timeseries.py that gets all three types of entries.
If we only need location entries, can simplify code further to just work for these entries.

I went through Shankari’s thesis and understood that entries are broken down as per the travel diary structure: place -> trip -> location.

In my current logic, I'm using data.ts as the main time value.
If only trip entries or only place entries are present, then directly choosing the latest data.ts is incorrect, since trips use data.start_ts while places use data.enter_ts.

Searching the codebase for references and reading through Shankari's thesis for start_ts and enter_ts.
I've gotten some hints that start_ts and enter_ts belong to analysis_timeseries entries?

In that case, can we ignore these, since the purge + restore functionality is concerned primarily with raw timeseries data?


Hint 1: Check timeseries and analysis database entries for distinct metadata.key values

Saw that trips, places related keys were only found in analysis_timeseries_db.

>>> edb.get_timeseries_db().distinct("metadata.key")
['background/battery', 'background/filtered_location', 'background/location', 'background/motion_activity', 'config/app_ui_config', 'config/consent', 'config/sensor_config', 'config/sync_config', 'manual/demographic_survey', 'manual/mode_confirm', 'manual/place_addition_input', 'manual/purpose_confirm', 'manual/replaced_mode', 'manual/trip_addition_input', 'manual/trip_user_input', 'statemachine/transition', 'stats/client_error', 'stats/client_nav_event', 'stats/client_time', 'stats/pipeline_error', 'stats/pipeline_time', 'stats/server_api_time']
>>> 
>>> 
>>> edb.get_analysis_timeseries_db().distinct("metadata.key")
['analysis/cleaned_place', 'analysis/cleaned_section', 'analysis/cleaned_stop', 'analysis/cleaned_trip', 'analysis/cleaned_untracked', 'analysis/composite_trip', 'analysis/confirmed_place', 'analysis/confirmed_trip', 'analysis/confirmed_untracked', 'analysis/expected_trip', 'analysis/inferred_labels', 'analysis/inferred_section', 'analysis/inferred_trip', 'analysis/recreated_location', 'analysis/smoothing', 'inference/labels', 'inference/prediction', 'segmentation/raw_place', 'segmentation/raw_section', 'segmentation/raw_stop', 'segmentation/raw_trip', 'segmentation/raw_untracked']

Hint 2: Timeseries_Sample.ipynb

  • ct_df fetches analysis/cleaned_trip entries -> analysis DB
# ——

trip_start_end_fuzz = 10 # seconds
ct_df = ts.get_data_df("analysis/cleaned_trip", time_query=None)
tl = esdl.get_cleaned_timeline(test_user_id, ct_df.iloc[0].start_ts - trip_start_end_fuzz, ct_df.iloc[-1].end_ts + trip_start_end_fuzz)

# ——

for e in tl:
    if 'enter_ts' in e.data:
        # Must be place-like
        print(e.metadata.key, e.data.enter_fmt_time, "->", e.data.exit_fmt_time)
    else:
        print(e.metadata.key, e.data.start_fmt_time, "->", e.data.end_fmt_time)

# ——

Hint 3: bin/historical/migrations/populate_local_dt.py

  • Looks like old code, some changes were last made 8 years ago.
  • The collection parameter refers to some non-time series databases as seen from the function calls.
  • The entry['start_ts'] or entry['enter_ts'] values are then used in the find query by setting data.ts to this value.
# ——

def fix_trips_or_sections(collection):
    tsdb = edb.get_timeseries_db()
    for entry in collection.find():
        start_loc_entry = tsdb.find_one({'user_id': entry['user_id'],
            'metadata.key': 'background/location', 'data.ts': entry['start_ts']})
        end_loc_entry = tsdb.find_one({'user_id': entry['user_id'],
            'metadata.key': 'background/location', 'data.ts': entry['end_ts']})


# ——

def fix_stops_or_places(collection):
    tsdb = edb.get_timeseries_db()
    for entry in collection.find():
        if 'enter_ts' in entry:
            enter_loc_entry = tsdb.find_one({'user_id': entry['user_id'],
                'metadata.key': 'background/location', 'data.ts': entry['enter_ts']})

# ——

    elif args.key == "trips":
        fix_trips_or_sections(edb.get_trip_new_db())
    elif args.key == "sections":
        fix_trips_or_sections(edb.get_section_new_db())
    elif args.key == "places":
        fix_stops_or_places(edb.get_place_db())

@MukuFlash03 (Contributor, Author)

Continued discussion of changes made:

  1. Added export_type parameter for “full” or “incremental” export.

Discussion from Export PR

Comment: Detailed discussion by kafitz on full vs incremental export + REST API
Comment: Shankari explained the difference between full export and incremental export with the usage of last_processed_ts for 1 hour's worth of data
Comment: Queries by jruzekowicz around start_ts, end_ts, last_processed_ts, file storage, queue
Comment: Next steps for the intake pipeline to export one hour's worth of data (I don't see this implemented)


1. A. Last_processed_ts explained by Shankari

Our pipeline consists of multiple stages and for every stage we store the timestamp of how far along in the timeline the data has been processed.

So if you think of a timeline as a long linear pathway into the past and the future, the last processed timestamp is a marker along it which indicates how much of the timeline you have processed.


There is also last_run_ts, which is when the pipeline was last run, but last run is not the same as last processed, because it could be that I went on a trip and then didn't have connectivity at the end of the trip.
So the data is still on my phone. But on the server, the pipeline runs every hour, and so the pipeline has run, but it hasn't even seen my data yet.

If the export code was written correctly, then if you export once and then export again, it should only export the new data that came in after the previous export.

That is the whole point of using a pipeline as opposed to a script.
I mean, you know, using the pipeline instead of a naive script.



For a one-time export, you can run the export script manually, and then it's fine to write a script that takes a snapshot of the data and sends it over.
And we do that with the Mongo dumps all the time.

But if you want to run this as part of an ongoing system and you want to archive the raw data that comes in on an ongoing basis, you don't want to run a script that pulls all of the data every time, because that's inefficient.

You just pulled it yesterday.
Why do you need to pull all of the data again today, right?

@MukuFlash03 (Contributor, Author) commented Sep 1, 2024

1. B. What does 1 hour’s worth of data mean?

This is my understanding of what it possibly means.

“1 hour’s worth of data” -> different connotations:

  • Export PR had analysis db entries as well as timeseries db entries.
  • Purge PR, dealing with timeseries db entries.

For the export PR

  • 1 hour is important since the analysis pipeline runs every hour and the export data sources include both the timeseries and analysis_timeseries DBs.
  • Also, 1 hour would help reduce volume of data being processed.

For the purge PR,

  • 1 hour with regards to analysis pipeline may not be relevant since we are dealing only with raw timeseries data.
  • But at the same time, we can still have the purge pipeline run hourly to move new data to long-term storage once the analysis pipeline has finished using the new data.
  • There’s also the 2nd use case to help with large data volumes.

Time range of exported data in Purge pipeline.

  • In any case, purge pipeline runs upto the current time or until no more raw timeseries entries present in db for the user.
  • If Purge pipeline running for the first time for a user:
    • Then it will export and purge all the timeseries data for a user from its first entry with the earliest data.ts value.
    • This data can be really old, and the first purge run might take a lot of time.
  • If Purge pipeline has already been run before for a user:
    • Then it will set start_ts to last_processed_ts and export data from that point onwards.
    • Once our new purge pipeline system is in place, it would run hourly, and each run would eventually just have a small subset of entries. This would be the latest data that was just pulled into the timeseries DB and is ready to be purged once it has been used in the analysis pipeline.

@MukuFlash03 (Contributor, Author) commented Sep 1, 2024

1. C. Does restore script also need to handle 1 hour data?

I've based the import_timeseries.py script on the existing load_multi_timeline_for_range.py script.

We shouldn’t need to handle 1 hour logic in restore_data.py since:

  • Internally, load_multi_timeline_for_range already reads in batches with a default batch_size of 10000 entries.
  • We'll be handling the 1 hour logic in purge_data.py while exporting entries, so there is no need to handle it again in restore_data, as we can be assured that our exported files will have only 1 hour's data.
    • There is, however, the case where there is a lot of data even within 1 hour's worth of data.
    • Again, we don't need to worry, as the import_timeseries script processes data in batches (see the sketch below).
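For reference, the batching behaviour described above amounts to slicing the restored entry list into chunks; a minimal sketch with the default batch size mentioned above:

```python
def split_into_batches(entries, batch_size=10000):
    # The loader does not cap the total amount of data; it just inserts
    # the entries in chunks of batch_size
    return [entries[i:i + batch_size] for i in range(0, len(entries), batch_size)]
```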

@MukuFlash03 (Contributor, Author) commented Sep 1, 2024

1. D. Exit conditions for purge pipeline

The break conditions to exit the while loop are:

  1. start_ts of any time range exceeds init_end_ts, which is the present time when the pipeline is run (default value set by the pipeline):
if current_start_ts >= initEndTs:
    break
  2. No more entries within the current time range AND no more entries in the timeseries DB:
elif export_queries is None and count_entries_1 == 0:
        # Didn't process anything new so start at the same point next time
        # self._last_processed_ts = None
        logging.debug("No new data to export, breaking out of while loop")
        print("No new data to export, breaking out of while loop")
        break

While the first condition is fairly obvious, I had to add the second condition since I came across an edge case where not all entries were exported.
This happened when the difference in the ts values of entries was more than 1 hour.
In my previous logic, I just checked that no entries were retrieved in a time range and exited the loop.
But there might still be entries in the timeseries DB beyond the 1 hour window.


Example from sample data:

Entry No: 1387 - 55affbab7d65cb39ee976734 1437596198.387 background/location
Entry No: 1388 - 55b08d347d65cb39ee976a8f 1437582760.583 statemachine/transition

The first time range gets entries from 1906 to 1498.
The second time range gets entries from 1497 to 1388.

For the third time range, start_ts is set as previous iteration’s end_ts = 1437585293.881.
And end_ts will be set to start_ts + 1 hour (or +3600 seconds) = 1437588893.881

Now, the next entry 1387 has a ts value that is more than 1 hour ahead of the current end_ts.
So, in the time range for the 3rd iteration start_ts (1437585293.881) to end_ts (1437588893.881), no entries are found.
Thus the loop was breaking despite there being more data available to export.


Hence, I added the condition to check the count of timeseries DB entries in each iteration.
The count of these raw entries keeps shrinking since we are also deleting entries after exporting them.
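A sketch of that extra check, assuming direct access to the raw timeseries collection via pymongo:

```python
def remaining_raw_entry_count(ts_db, user_id):
    # The purge loop keeps going while the user still has raw timeseries
    # entries, even if a particular 1-hour window happened to be empty;
    # the count shrinks because exported entries are deleted.
    return ts_db.count_documents({"user_id": user_id})
```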

@MukuFlash03 (Contributor, Author) commented Sep 1, 2024

Continued discussion of changes made:

  2. Added separate export_timeseries and import_timeseries scripts.

This is a lot of duplicated code; eventually we can either refactor and pull out common code for both the export and purge scripts,
or refactor the existing export / load scripts themselves to handle the purge use cases as well.

Main differences from the existing scripts:
a. export_timeseries based off extract_timeline_for_day_range_and_user

  • Removed extra queries (place, trip, first_place) to ensure we deal only with timeseries database entries.

b. import_timeseries based off load_multi_timeline_for_range

  • Removed loading pipeline states

2. A. Switched to using data.ts instead of metadata.write_ts

Usercache is the first stage and there are no trips loaded yet, hence we work with metadata.write_ts.
For most stages, we are using trip data -> data.end_ts.
But these stages are part of the analysis intake pipeline.

Since we're dealing with raw timeseries data for purging / restoring, we mainly have data.ts in the raw timeseries entries.
Sometimes the data.ts value doesn't match metadata.write_ts, which I'm guessing is because of some latency: the values are sensed at data.ts but written to device storage at metadata.write_ts.

So I went ahead with using data.ts for the purposes of this PR.

@MukuFlash03 (Contributor, Author) commented Sep 1, 2024

2. B. Removed pipeline_states_export file

Check commit message for more details


To summarize:

Code referenced from purge_data

Problem faced

  • The last_processed_ts field in the exported pipeline state files is NULL because it's updated only after the function exits and the stage is marked as done.
  • During run_purge_pipeline, export_pipeline_states is called to export pipeline states. I got the idea to do this from extract_timeline_for_day_range_and_user.
    But last_processed_ts is only set in the DB after the function completes.

What I tried to resolve it

  • Moving the export_pipeline_states call to the parent function after the stage is marked as done would result in only one pipeline states file.
  • For incremental exports, to handle multiple time ranges, I thought of using the earliest start_ts and the latest end_ts (current time) to name the pipeline states file. So instead of having multiple pipeline_states files, only one file would exist for the entire time range of the incremental export.
  • But load_multi_timeline_for_range attempts to load pipeline states based on a file prefix. It would load the data from the first time range file, say start_ts_1 to end_ts_1, then start_ts_2 to end_ts_2, and so on. The current code assumes that the file_prefix for the actual raw data matches the pipeline_states file.
  • Since only one pipeline_states file would now exist (earliest_start_ts to latest_end_ts), the per-range file is not found, and it gives a FileNotFoundError.

Pipeline states were added in Jan 2018
Commit that added it in extract_timeline_for_day_range_and_user.py
b38366b

Commit that added it in load_multi_timeline_for_range.py
06a0a4e

Why do we need it as explained in commits above

The raw data and the analysis results do not constitute the entire state of a
pipeline. In particular, if we store only the raw + analysis results, and then
we try to run the pipeline again, we will end up with two copies of the
analysis results.

Do we need it for purge PR?

  • The commit message states that pipelinestates were also exported / loaded so that we don't have duplicate analysis entries.
  • In the purge PR, we are dealing with timeseries_db data.

Thus, can we remove it from the purge_restore related code?

@shankari (Contributor) commented Sep 7, 2024

@MukuFlash03 is this currently "Ready for review" or waiting for "Questions from Shankari"? Can you put it into the correct column?

@shankari (Contributor)

The pipeline state is part of the analysis pipeline, so is related to analysed data.
While archiving the raw data, we should not store the pipeline state.
And we certainly should not restore the pipeline state while restoring onto a running, production system.

Changes

1. Fetching only loc-like entries from the existing export data logic as the raw timeseries entries.
- Found a lot of references that trip and place entries are a part of analysis timeseries database.

Almost every place I’ve found uses data.start_ts for “analysis/*” metadata key entries

In bin/debug/export_participants_trips_csv.py
```
    ts = esta.TimeSeries.get_time_series(user_id)
    trip_time_query = estt.TimeQuery("data.start_ts", start_day_ts, end_day_ts)
    ct_df = ts.get_data_df("analysis/confirmed_trip", trip_time_query)

```

---------

In bin/debug/label_stats.py
```
for t in list(edb.get_analysis_timeseries_db().find({"metadata.key": "analysis/inferred_trip", "user_id": sel_uuid})):
    if t["data"]["inferred_labels"] != []:
        confirmed_trip = edb.get_analysis_timeseries_db().find_one({"user_id": t["user_id"],
                "metadata.key": "analysis/confirmed_trip",
                "data.start_ts": t["data"]["start_ts"]})
```

Similarly for data.enter_ts.

-----------------

On the other hand, for data.ts, timeseries_db was used, since "background/*" metadata key entries were involved:

In emission/analysis/intake/segmentation/section_segmentation.py
```
    get_loc_for_ts = lambda time: ecwl.Location(ts.get_entry_at_ts("background/filtered_location", "data.ts", time)["data"])
    trip_start_loc = get_loc_for_ts(trip_entry.data.start_ts)
    trip_end_loc = get_loc_for_ts(trip_entry.data.end_ts)
```

----------------

In emission/analysis/intake/segmentation/trip_segmentation.py
```
            untracked_start_loc = ecwe.Entry(ts.get_entry_at_ts("background/filtered_location",
                                                     "data.ts", last_place_entry.data.enter_ts)).data

```

--------------------------------------

2. Refactored emission/export/export.py

- Added a separate function that returns exported entries so that this function can be reused in the purge pipeline code.
- This helped to remove repeated code for re-fetching exported entries.

- Also using a databases parameter for exporting data from a specific DB. For the purge use case, `databases` should only have 'timeseries_db' (see the sketch below).
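As an illustrative sketch only (not the PR's actual code), the databases parameter can gate which of the three source callables from the earlier get_from_all_three_sources_with_retry snippet are built; the 'analysis_db' and 'usercache_db' keys here are assumed names:

```python
def get_export_sources(ts, uc, databases):
    # Build only the per-database fetch callables selected by `databases`;
    # the purge use case passes ['timeseries_db'] so the analysis timeseries
    # and the usercache are left untouched
    sources = []
    if 'timeseries_db' in databases:
        sources.append(lambda tq: ts._get_entries_for_timeseries(
            ts.timeseries_db, None, tq, geo_query=None,
            extra_query_list=None, sort_key=ts._get_sort_key(tq)))
    if 'analysis_db' in databases:
        sources.append(lambda tq: ts._get_entries_for_timeseries(
            ts.analysis_timeseries_db, None, tq, geo_query=None,
            extra_query_list=None, sort_key=ts._get_sort_key(tq)))
    if 'usercache_db' in databases:
        sources.append(lambda tq: (uc.getMessageCount(None, tq), uc.getMessage(None, tq)))
    return sources
```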

--------------------------------------

3. Added raw_timeseries_only parameter to load_multi_timeline_for_range.py
- If this argument is set, then pipeline_states will not be loaded, since we don't want pipeline states to be restored while restoring raw timeseries data (see the sketch below).
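A sketch of how such a guard might look in the loading script; only the flag name comes from the description above, the rest is a placeholder:

```python
import argparse

def load_pipeline_states(file_prefix):
    # placeholder for the existing pipeline-state loading helper
    print("loading pipeline states for prefix %s" % file_prefix)

parser = argparse.ArgumentParser(prog="load_multi_timeline_for_range")
parser.add_argument("file_prefix", help="prefix of the exported timeline files")
parser.add_argument("--raw_timeseries_only", action="store_true",
                    help="restore only raw timeseries entries; skip pipeline states")
args = parser.parse_args()

# ... load the raw timeseries entries for args.file_prefix here ...

if not args.raw_timeseries_only:
    # pipeline states are restored only when we are not doing a raw-only restore
    load_pipeline_states(args.file_prefix)
```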

--------------------------------------

4. Cleaned up tests
- Reduced repetitive code by moving assertion tests to functions that can be reused for both full and incremental export testing.

--------------------------------------

5. Removed export_timeseries.py and import_timeseries.py
- No need to have duplicate code since now using existing scripts present in load_multi_timeline_for_range.py and export.py

--------------------------------------