From 192a5354d50d5cc0242b5e075608180dabfee687 Mon Sep 17 00:00:00 2001
From: Macpie
Date: Thu, 6 Jun 2024 10:52:43 -0700
Subject: [PATCH] Fix Price report

---
 Cargo.lock                                        |  1 +
 .../price_report.1717008060431.gz                 | Bin 33 -> 0 bytes
 .../price_report.1717632204453.gz                 | Bin 0 -> 44 bytes
 docker/mobile/localstack/init-s3.sh               | 49 +++++------
 test_mobile/Cargo.toml                            |  2 +
 test_mobile/README.md                             |  2 +-
 test_mobile/src/cli/price.rs                      | 81 +++++++++++++++---
 test_mobile/src/main.rs                           |  7 ++
 8 files changed, 98 insertions(+), 44 deletions(-)
 delete mode 100644 docker/mobile/localstack/data/mobile-price/price_report.1717008060431.gz
 create mode 100644 docker/mobile/localstack/data/mobile-price/price_report.1717632204453.gz

diff --git a/Cargo.lock b/Cargo.lock
index 1ac48da71..1d2e6a608 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7872,6 +7872,7 @@ dependencies = [
  "tonic",
  "tracing",
  "tracing-subscriber",
+ "triggered",
  "uuid",
 ]

diff --git a/docker/mobile/localstack/data/mobile-price/price_report.1717008060431.gz b/docker/mobile/localstack/data/mobile-price/price_report.1717008060431.gz
deleted file mode 100644
index b73a29510bfcc87b699090ea9951e8a80176f94e..0000000000000000000000000000000000000000
GIT binary patch
literal 0
HcmV?d00001

literal 33
kcmb2|=3oGW|Bo#$BxzsRpYZRNeVdREL%oXZeO{mp0JjASzyJUM

diff --git a/docker/mobile/localstack/data/mobile-price/price_report.1717632204453.gz b/docker/mobile/localstack/data/mobile-price/price_report.1717632204453.gz
new file mode 100644
index 0000000000000000000000000000000000000000..747ae7ae45e65c597932e69700fa5eef2f300e3c
GIT binary patch
literal 44
ucmb2|=3oGW|49i651v|FNYdW$<^ApK|2(QQK;r-ZGc)8a(0nEcWCH+aW)J-U

literal 0
HcmV?d00001

diff --git a/docker/mobile/localstack/init-s3.sh b/docker/mobile/localstack/init-s3.sh
index 98bd5e0ff..029d80864 100755
--- a/docker/mobile/localstack/init-s3.sh
+++ b/docker/mobile/localstack/init-s3.sh
@@ -6,23 +6,26 @@ awslocal s3 mb s3://mobile-packet-verifier
 awslocal s3 mb s3://mobile-price
 awslocal s3 mb s3://mobile-verifier-data-sets

-# Description:
-# This Bash script uploads files from subdirectories in /tmp/data to an S3-compatible service using awslocal.
-# It treats each subdirectory as a separate bucket and uploads files with modified timestamps in their filenames.
+# This shell script automates the process of uploading files from local directories
+# to S3 buckets using `awslocal` (typically used with LocalStack for local AWS service emulation).
 #
-# Script Overview:
-# 1. Iterate Through Directories: The script iterates over each subdirectory in /tmp/data.
-# 2. Extract Bucket Name: The script extracts the bucket name from each subdirectory's basename.
-# 3. Process Files in Each Directory: For each file in the subdirectory, the script:
-#    - Prints the original file name and the bucket name for debugging purposes.
-#    - Generates a new file name by replacing any existing 13-digit timestamp with the current timestamp in milliseconds.
-#    - Prints the new file name and the S3 path for debugging purposes.
-#    - Uploads the file to the specified S3 bucket using awslocal s3 cp.
+# 1. Define Source Directories: The script begins by setting the `dirs` variable to include
+#    all directories under `/tmp/data/`.
 #
-# Key Points:
-# - Directories and Buckets: Each subdirectory in /tmp/data is treated as a bucket.
-# - File Processing: Files within these directories are uploaded to their respective buckets with a new timestamp in their filenames.
-
+# 2. Directory Iteration: It loops through each directory found in `/tmp/data/*`,
+#    processing one directory at a time.
+#
+# 3. Extract Bucket Name: For each directory, the script extracts the directory name
+#    (using `basename`) and uses it as the S3 bucket name.
+#
+# 4. File Iteration: Within each directory, the script iterates over all files,
+#    checking that each item is a regular file (skipping subdirectories).
+#
+# 5. Upload to S3: For each file, the script uploads it to the corresponding S3 bucket
+#    using the `awslocal s3 cp` command. The file keeps its original filename in the bucket.
+#
+# 6. Debug Output: After each upload, the script prints the executed command for verification
+#    and debugging purposes, followed by a separator line for readability.
 dirs=/tmp/data/*
 for dir in $dirs; do
     echo "Looking @ $dir"
@@ -31,25 +34,13 @@ for dir in $dirs; do
     for file in "$dir"/*; do
         if [[ -f "$file" ]]; then
             echo "Uploading $file to bucket $bucket"
-            now=$(date +%s000)
             file_name=$(basename "$file")

-            # Debugging output to check the file name and bucket
-            echo "Original file name: $file_name"
-            echo "Current timestamp: $now"
-
-            # Replace timestamp in file name
-            new_file=$(echo "$file_name" | sed -E 's|[0-9]{13}|'"${now}"'|g')
-
-            # Debugging output to check the new file name and bucket path
-            echo "New file name: $new_file"
-            echo "s3 path: s3://$bucket/$new_file"
-
             # Perform the upload
-            awslocal s3 cp "$file" "s3://$bucket/$new_file"
+            awslocal s3 cp "$file" "s3://$bucket/$file_name"

             # Debugging output to confirm upload command
-            echo "Executed: awslocal s3 cp \"$file\" \"s3://$bucket/$new_file\""
+            echo "Executed: awslocal s3 cp \"$file\" \"s3://$bucket/$file_name\""
             echo "################################################################"
         fi
     done
diff --git a/test_mobile/Cargo.toml b/test_mobile/Cargo.toml
index 931524a09..cbc34e5a3 100644
--- a/test_mobile/Cargo.toml
+++ b/test_mobile/Cargo.toml
@@ -10,11 +10,13 @@ license.workspace = true
 anyhow = { workspace = true }
 async-compression = { version = "0", features = ["tokio", "gzip"] }
 clap = { workspace = true }
+custom-tracing = { path = "../custom_tracing" }
 file-store = { path = "../file_store" }
 h3o = { workspace = true, features = ["geo"] }
 helium-proto = { workspace = true }
 hextree = { workspace = true }
 tokio = { workspace = true, features = ["io-util", "fs"] }
+triggered = { workspace = true }

 [dev-dependencies]
 backon = "0"
diff --git a/test_mobile/README.md b/test_mobile/README.md
index 37982011d..9d2939f47 100644
--- a/test_mobile/README.md
+++ b/test_mobile/README.md
@@ -7,7 +7,7 @@
 **NOTE:** Data is auto-generated. If you do not wish to change it, skip these steps. The commands are here to show how the data is generated.

 - Run `test-mobile assignment` and move the generated files[^files] to `docker/mobile/localstack/data/mobile-verifier-data-sets/`
-- Run `test-mobile price` and move the generated file to `docker/mobile/localstack/data/mobile-price/`
+- Run `AWS_ACCESS_KEY_ID=X AWS_SECRET_ACCESS_KEY=X AWS_SESSION_TOKEN=X test-mobile price` and move the generated file to `docker/mobile/localstack/data/mobile-price/`. This can also be run while localstack is up, in which case the file is uploaded directly.

 ### 2. Build Docker images

diff --git a/test_mobile/src/cli/price.rs b/test_mobile/src/cli/price.rs
index fa99d90b4..b871d3f81 100644
--- a/test_mobile/src/cli/price.rs
+++ b/test_mobile/src/cli/price.rs
@@ -1,9 +1,10 @@
-use std::time::{SystemTime, UNIX_EPOCH};
-
 use anyhow::Result;
-use file_store::FileType;
-use helium_proto::{BlockchainTokenTypeV1, Message, PriceReportV1};
-use tokio::{fs::File, io::AsyncWriteExt};
+use file_store::{file_sink, file_upload, FileType};
+use helium_proto::{BlockchainTokenTypeV1, PriceReportV1};
+use std::{
+    path,
+    time::{Duration, SystemTime, UNIX_EPOCH},
+};

 /// Generate Mobile price report
 #[derive(Debug, clap::Args)]
@@ -14,24 +15,76 @@ pub struct Cmd {

 impl Cmd {
     pub async fn run(self) -> Result<()> {
+        let settings = file_store::Settings {
+            bucket: "mobile-price".to_string(),
+            endpoint: Some("http://localhost:4566".to_string()),
+            region: "us-east-1".to_string(),
+            access_key_id: None,
+            secret_access_key: None,
+        };
+
+        let (shutdown_trigger1, shutdown_listener1) = triggered::trigger();
+
+        // Initialize uploader
+        let (file_upload, file_upload_server) =
+            file_upload::FileUpload::from_settings_tm(&settings).await?;
+
+        let file_upload_thread = tokio::spawn(async move {
+            file_upload_server
+                .run(shutdown_listener1)
+                .await
+                .expect("failed to complete file_upload_server");
+        });
+
+        let store_base_path = path::Path::new(".");
+
+        let (price_sink, price_sink_server) = file_sink::FileSinkBuilder::new(
+            FileType::PriceReport,
+            store_base_path,
+            file_upload.clone(),
+            concat!(env!("CARGO_PKG_NAME"), "_report_submission"),
+        )
+        .auto_commit(false)
+        .roll_time(Duration::from_millis(100))
+        .create()
+        .await?;
+
+        let (shutdown_trigger2, shutdown_listener2) = triggered::trigger();
+        let price_sink_thread = tokio::spawn(async move {
+            price_sink_server
+                .run(shutdown_listener2)
+                .await
+                .expect("failed to complete price_sink_server");
+        });
+
         let now = SystemTime::now()
             .duration_since(UNIX_EPOCH)
             .unwrap()
             .as_millis();

-        let file_name = format!("{}.{now}.gz", FileType::PriceReport.to_str());
-        let file = File::create(file_name).await?;
-
-        let mobile_price_report = PriceReportV1 {
-            price: self.price,
+        let price_report = PriceReportV1 {
+            price: 1000000,
             timestamp: now as u64,
             token_type: BlockchainTokenTypeV1::Mobile.into(),
         };

-        let encoded_mpr = PriceReportV1::encode_to_vec(&mobile_price_report);
-        let mut writer = async_compression::tokio::write::GzipEncoder::new(file);
-        writer.write_all(&encoded_mpr).await?;
-        writer.shutdown().await?;
+        price_sink.write(price_report, []).await?;
+
+        let price_sink_rcv = price_sink.commit().await.expect("commit failed");
+        let _ = price_sink_rcv
+            .await
+            .expect("commit did not complete");
+
+        tokio::time::sleep(Duration::from_secs(1)).await;
+
+        shutdown_trigger1.trigger();
+        shutdown_trigger2.trigger();
+        file_upload_thread
+            .await
+            .expect("file_upload_thread did not complete");
+        price_sink_thread
+            .await
+            .expect("price_sink_thread did not complete");

         Ok(())
     }
diff --git a/test_mobile/src/main.rs b/test_mobile/src/main.rs
index 4178e88b2..9f61e83e1 100644
--- a/test_mobile/src/main.rs
+++ b/test_mobile/src/main.rs
@@ -10,6 +10,13 @@ pub struct Cli {

 impl Cli {
     pub async fn run(self) -> Result<()> {
+        custom_tracing::init(
+            "info".to_string(),
+            custom_tracing::Settings {
+                tracing_cfg_file: "".to_string(),
+            },
+        )
+        .await?;
         self.cmd.run().await
     }
 }
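
A quick way to exercise the patched flow end to end, sketched under the assumption that
localstack is already running and exposing S3 on http://localhost:4566 (the endpoint
hard-coded in `file_store::Settings` above) and that the `awslocal` CLI is available on
the host:

    # Generate a price report and upload it through the file sink
    # (the dummy credentials mirror the README instructions)
    AWS_ACCESS_KEY_ID=X AWS_SECRET_ACCESS_KEY=X AWS_SESSION_TOKEN=X test-mobile price

    # Confirm the gzipped report landed in the bucket created by init-s3.sh
    awslocal s3 ls s3://mobile-price/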