Skip to content

Commit

Permalink
Fix Price report
Browse files Browse the repository at this point in the history
  • Loading branch information
macpie committed Jun 6, 2024
1 parent 7cf11b7 commit 192a535
Show file tree
Hide file tree
Showing 8 changed files with 98 additions and 44 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file not shown.
Binary file not shown.
49 changes: 20 additions & 29 deletions docker/mobile/localstack/init-s3.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,26 @@ awslocal s3 mb s3://mobile-packet-verifier
awslocal s3 mb s3://mobile-price
awslocal s3 mb s3://mobile-verifier-data-sets

# Description:
# This Bash script uploads files from subdirectories in /tmp/data to an S3-compatible service using awslocal.
# It treats each subdirectory as a separate bucket and uploads files with modified timestamps in their filenames.
# This shell script automates the process of uploading files from local directories
# to S3 buckets using `awslocal` (typically used with LocalStack for local AWS service emulation).
#
# Script Overview:
# 1. Iterate Through Directories: The script iterates over each subdirectory in /tmp/data.
# 2. Extract Bucket Name: The script extracts the bucket name from each subdirectory's basename.
# 3. Process Files in Each Directory: For each file in the subdirectory, the script:
# - Prints the original file name and the bucket name for debugging purposes.
# - Generates a new file name by replacing any existing 13-digit timestamp with the current timestamp in milliseconds.
# - Prints the new file name and the S3 path for debugging purposes.
# - Uploads the file to the specified S3 bucket using awslocal s3 cp.
# 1. Define Source Directories: The script begins by setting the `dirs` variable to include
# all directories under `/tmp/data/`.
#
# Key Points:
# - Directories and Buckets: Each subdirectory in /tmp/data is treated as a bucket.
# - File Processing: Files within these directories are uploaded to their respective buckets with a new timestamp in their filenames.

# 2. Directory Iteration: It loops through each directory found in `/tmp/data/*`,
# processing one directory at a time.
#
# 3. Extract Bucket Name: For each directory, the script extracts the directory name
# (using `basename`) and assigns it as the S3 bucket name.
#
# 4. File Iteration: Within each directory, the script iterates over all files,
# checking if each item is a file (excluding subdirectories).
#
# 5. Upload to S3: For each file, the script uploads it to the corresponding S3 bucket
# using the `awslocal s3 cp` command. The file is placed in the S3 bucket with its original filename.
#
# 6. Debug Output: After each upload, the script prints the executed command for verification
# and debugging purposes, followed by a separator line for readability.
dirs=/tmp/data/*
for dir in $dirs; do
echo "Looking @ $dir"
Expand All @@ -31,25 +34,13 @@ for dir in $dirs; do
for file in "$dir"/*; do
if [[ -f "$file" ]]; then
echo "Uploading $file to bucket $bucket"
now=$(date +%s000)
file_name=$(basename "$file")

# Debugging output to check the file name and bucket
echo "Original file name: $file_name"
echo "Current timestamp: $now"

# Replace timestamp in file name
new_file=$(echo "$file_name" | sed -E 's|[0-9]{13}|'"${now}"'|g')

# Debugging output to check the new file name and bucket path
echo "New file name: $new_file"
echo "s3 path: s3://$bucket/$new_file"

# Perform the upload
awslocal s3 cp "$file" "s3://$bucket/$new_file"
awslocal s3 cp "$file" "s3://$bucket/$file_name"

# Debugging output to confirm upload command
echo "Executed: awslocal s3 cp \"$file\" \"s3://$bucket/$new_file\""
echo "Executed: awslocal s3 cp \"$file\" \"s3://$bucket/$file_name\""
echo "################################################################"
fi
done
Expand Down
2 changes: 2 additions & 0 deletions test_mobile/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ license.workspace = true
anyhow = { workspace = true }
async-compression = { version = "0", features = ["tokio", "gzip"] }
clap = { workspace = true }
custom-tracing = { path = "../custom_tracing" }
file-store = { path = "../file_store" }
h3o = { workspace = true, features = ["geo"] }
helium-proto = { workspace = true }
hextree = { workspace = true }
tokio = { workspace = true, features = ["io-util", "fs"] }
triggered = { workspace = true }

[dev-dependencies]
backon = "0"
Expand Down
2 changes: 1 addition & 1 deletion test_mobile/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
**NOTE:** Data is auto-generated. If you do not wish to change it, skip these steps. The commands are here to show how the data is generated.

- Run `test-mobile assignment` and move the generated files[^files] to `docker/mobile/localstack/data/mobile-verifier-data-sets/`
- Run `test-mobile price` and move the generated file to `docker/mobile/localstack/data/mobile-price/`
- Run `AWS_ACCESS_KEY_ID=X AWS_SECRET_ACCESS_KEY=X AWS_SESSION_TOKEN=X test-mobile price` and move the generated file to `docker/mobile/localstack/data/mobile-price/`. This can also be ran when localstack is up and will uopload files.

### 2. Build Docker images

Expand Down
81 changes: 67 additions & 14 deletions test_mobile/src/cli/price.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::time::{SystemTime, UNIX_EPOCH};

use anyhow::Result;
use file_store::FileType;
use helium_proto::{BlockchainTokenTypeV1, Message, PriceReportV1};
use tokio::{fs::File, io::AsyncWriteExt};
use file_store::{file_sink, file_upload, FileType};
use helium_proto::{BlockchainTokenTypeV1, PriceReportV1};
use std::{
path,
time::{Duration, SystemTime, UNIX_EPOCH},
};

/// Generate Mobile price report
#[derive(Debug, clap::Args)]
Expand All @@ -14,24 +15,76 @@ pub struct Cmd {

impl Cmd {
pub async fn run(self) -> Result<()> {
let settings = file_store::Settings {
bucket: "mobile-price".to_string(),
endpoint: Some("http://localhost:4566".to_string()),
region: "us-east-1".to_string(),
access_key_id: None,
secret_access_key: None,
};

let (shutdown_trigger1, shutdown_listener1) = triggered::trigger();

// Initialize uploader
let (file_upload, file_upload_server) =
file_upload::FileUpload::from_settings_tm(&settings).await?;

let file_upload_thread = tokio::spawn(async move {
file_upload_server
.run(shutdown_listener1)
.await
.expect("failed to complete file_upload_server");
});

let store_base_path = path::Path::new(".");

let (price_sink, price_sink_server) = file_sink::FileSinkBuilder::new(
FileType::PriceReport,
store_base_path,
file_upload.clone(),
concat!(env!("CARGO_PKG_NAME"), "_report_submission"),
)
.auto_commit(false)
.roll_time(Duration::from_millis(100))
.create()
.await?;

let (shutdown_trigger2, shutdown_listener2) = triggered::trigger();
let price_sink_thread = tokio::spawn(async move {
price_sink_server
.run(shutdown_listener2)
.await
.expect("failed to complete price_sink_server");
});

let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis();

let file_name = format!("{}.{now}.gz", FileType::PriceReport.to_str());
let file = File::create(file_name).await?;

let mobile_price_report = PriceReportV1 {
price: self.price,
let price_report = PriceReportV1 {
price: 1000000,
timestamp: now as u64,
token_type: BlockchainTokenTypeV1::Mobile.into(),
};
let encoded_mpr = PriceReportV1::encode_to_vec(&mobile_price_report);

let mut writer = async_compression::tokio::write::GzipEncoder::new(file);
writer.write_all(&encoded_mpr).await?;
writer.shutdown().await?;
price_sink.write(price_report, []).await?;

let price_sink_rcv = price_sink.commit().await.expect("commit failed");
let _ = price_sink_rcv
.await
.expect("commit didn't complete completed");

let _ = tokio::time::sleep(Duration::from_secs(1)).await;

shutdown_trigger1.trigger();
shutdown_trigger2.trigger();
file_upload_thread
.await
.expect("file_upload_thread did not complete");
price_sink_thread
.await
.expect("price_sink_thread did not complete");

Ok(())
}
Expand Down
7 changes: 7 additions & 0 deletions test_mobile/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@ pub struct Cli {

impl Cli {
pub async fn run(self) -> Result<()> {
custom_tracing::init(
"info".to_string(),
custom_tracing::Settings {
tracing_cfg_file: "".to_string(),
},
)
.await?;
self.cmd.run().await
}
}
Expand Down

0 comments on commit 192a535

Please sign in to comment.