File Sink Client Typing (#849)
* make file sink parts declare their type

Instead of dealing with bundles of Vec<u8>, each file sink and client
will be paired over any type that can be turned into bytes.

* ingest: type all FileSinkClients

* boost_manager: type all FileSinkClients

* iot_packet_verifier: type all FileSinkClients

* mobile_packet_verifier: type all FileSinkClients

* price: type all FileSinkClients

* poc_entropy: type all FileSinkClients

* iot_verifier: type all FileSinkClients

* mobile_verifier: type all FileSinkClients

* Rename trait FileStoreAsBytes -> MsgBytes to follow convention for prost traits

This avoids dealing with the automatic prost::Message implementations
for Vec<u8> and String, which would disallow using a file sink with
anything but a prost::Message.

We're going to follow the other Msg* trait conventions where we
implement them only for the types we are using.

This allows someone to make a file sink for whatever type they may have,
and keeps the convenience for the proto types we write reports about.
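
A minimal sketch of this per-type approach, assuming a `MsgBytes` trait with a single `as_bytes` method that returns `Vec<u8>` (the real signature may differ). `ProtoLike`, `PriceReport`, `CsvLine`, and the `impl_msg_bytes!` macro are hypothetical names, used only so the example compiles without external crates:

```rust
/// Assumed shape of the trait: turn a message into the bytes a sink writes.
pub trait MsgBytes {
    fn as_bytes(&self) -> Vec<u8>;
}

/// Hypothetical stand-in for prost::Message, so no external crate is needed.
pub trait ProtoLike {
    fn encode_to_vec(&self) -> Vec<u8>;
}

/// Hypothetical proto report type.
pub struct PriceReport {
    pub price: u64,
}

impl ProtoLike for PriceReport {
    fn encode_to_vec(&self) -> Vec<u8> {
        self.price.to_be_bytes().to_vec()
    }
}

/// Opt specific proto-like types in, one by one, rather than writing a
/// blanket implementation over every message type.
macro_rules! impl_msg_bytes {
    ($($ty:ty),* $(,)?) => {
        $(impl MsgBytes for $ty {
            fn as_bytes(&self) -> Vec<u8> {
                self.encode_to_vec()
            }
        })*
    };
}

impl_msg_bytes!(PriceReport);

/// A non-proto type can still implement the trait directly.
pub struct CsvLine(pub String);

impl MsgBytes for CsvLine {
    fn as_bytes(&self) -> Vec<u8> {
        self.0.clone().into_bytes()
    }
}
```

Listing types explicitly costs one line per report type, but it leaves room for types that are not proto messages at all.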

* create file sinks directly from the message

* rename file sink trait to writer

The abstraction comes in two parts.

First, we can get a file sink from a type if it implements the
FileSinkWrite trait.

Second, a type can be written to a file sink if it can be turned into
bytes.

This separates the convenience from the actual functionality.
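
The two parts can be sketched roughly as follows. This is an illustration under simplifying assumptions, not the code from this commit: it uses `std::sync::mpsc` instead of the async channel, `as_bytes` returns `Vec<u8>`, `file_sink` takes no arguments, and `Heartbeat`, `drain`, and `demo` are hypothetical names:

```rust
use std::sync::mpsc;

/// Part two: anything that can become bytes can be written to a sink.
pub trait MsgBytes {
    fn as_bytes(&self) -> Vec<u8>;
}

/// Typed client: it only accepts values of type `T`.
pub struct FileSinkClient<T> {
    sender: mpsc::Sender<T>,
}

impl<T> FileSinkClient<T> {
    pub fn write(&self, item: T) -> Result<(), mpsc::SendError<T>> {
        self.sender.send(item)
    }
}

/// The sink end; conversion to bytes happens here, not in the client.
pub struct FileSink<T> {
    receiver: mpsc::Receiver<T>,
}

impl<T: MsgBytes> FileSink<T> {
    /// Encode every message currently queued (hypothetical helper).
    pub fn drain(&self) -> Vec<Vec<u8>> {
        self.receiver.try_iter().map(|item| item.as_bytes()).collect()
    }
}

/// Part one: a convenience constructor hung off the message type itself.
pub trait FileSinkWriteExt: MsgBytes + Sized {
    fn file_sink() -> (FileSinkClient<Self>, FileSink<Self>) {
        let (sender, receiver) = mpsc::channel();
        (FileSinkClient { sender }, FileSink { receiver })
    }
}

/// Hypothetical message type.
#[derive(Debug)]
pub struct Heartbeat(pub u8);

impl MsgBytes for Heartbeat {
    fn as_bytes(&self) -> Vec<u8> {
        vec![self.0]
    }
}

impl FileSinkWriteExt for Heartbeat {}

/// Write two messages through the typed client and read back their bytes.
pub fn demo() -> Vec<Vec<u8>> {
    let (client, sink) = Heartbeat::file_sink();
    client.write(Heartbeat(7)).expect("sink is alive");
    client.write(Heartbeat(9)).expect("sink is alive");
    sink.drain()
}
```

Only `FileSink<T>` needs the `MsgBytes` bound; the extension trait adds nothing beyond a shorter way to build the pair.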

* file sink metric from static str to String

This allows us to attach the metric name to the file type
implementation, and only have to pass in the calling crate for the
prefix.

FileSinks are really only created once, and there are not so many of
them that owning a string introduces a crazy amount of overhead.
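
As a rough illustration of why an owned `String` helps, the sketch below joins a crate prefix with a per-type suffix at runtime. The `metric_name` helper and the cut-down `FileSinkClient` are hypothetical; only the `impl Into<String>` parameter mirrors the diff:

```rust
/// Hypothetical helper: join the calling crate's name with a per-type suffix.
/// A `&'static str` metric could not hold this runtime-built value.
pub fn metric_name(crate_prefix: &str, type_suffix: &str) -> String {
    format!("{crate_prefix}_{type_suffix}")
}

/// Cut-down client that owns its metric name.
pub struct FileSinkClient {
    pub metric: String,
}

impl FileSinkClient {
    /// Accepts both `&str` literals and owned `String`s.
    pub fn new(metric: impl Into<String>) -> Self {
        Self {
            metric: metric.into(),
        }
    }
}
```

With this shape, a call such as `FileSinkClient::new(metric_name(env!("CARGO_PKG_NAME"), "boosted_hex_update"))` would replace the earlier `concat!` at each call site.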

* ingest iot/mobile servers to use convenience file_sink method

* iot_verifier to use file_sink convenience method

* boost manager to use file_sink convenience method

* mobile packet verifier use file_sink convenience methods

* mobile_verifier to use file_sink convenience methods

* poc_entropy to use file_sink convenience method

* price to use file_sink convenience method

* move MsgBytes trait back out of FileSinkWrite

The FileSinkWrite trait is a convenience method for creating file sinks,
but it should not be tied to being able to use a file sink. The only
requirement for that is being able to turn your message into bytes.

* autocommit=false is already the default

* msg is no longer bytes in test

* stick with single style of reference

* combine auto_commit and roll_time into 1 argument

This is a specialized helper function; every crate using it was only
using these two options. We can provide the base functionality with a
single option.

Ideally the Builder would be updated to take some sort of enum for the
commit type as it doesn't make a lot of sense to auto_commit on every
write _and_ roll the file at a certain interval.
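
One possible shape for such an enum is sketched below. It is not part of this commit: `CommitStrategy`, `SinkSettings`, and the mapping between them are assumptions made for illustration, chosen so that auto-committing every write and rolling on an interval cannot both be selected:

```rust
use std::time::Duration;

/// Hypothetical commit strategy: the two modes are mutually exclusive.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CommitStrategy {
    /// Commit after every write; no roll interval applies.
    EveryWrite,
    /// Leave commits to the caller and roll the file at this interval.
    RollEvery(Duration),
}

/// Hypothetical flattened settings, as a builder might store them.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SinkSettings {
    pub auto_commit: bool,
    pub roll_time: Option<Duration>,
}

impl From<CommitStrategy> for SinkSettings {
    fn from(strategy: CommitStrategy) -> Self {
        match strategy {
            CommitStrategy::EveryWrite => SinkSettings {
                auto_commit: true,
                roll_time: None,
            },
            CommitStrategy::RollEvery(interval) => SinkSettings {
                auto_commit: false,
                roll_time: Some(interval),
            },
        }
    }
}
```

Because the enum has no variant that sets both options, the combination described above cannot be constructed.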

* fixup after rebase

* fixup after rebase

- include new verified mapper code
- update deps
- fix botched rebase conflicts

* uncouple FileSinkWriteExt from FileType enum

Taking a static string allows FileSinkWriteExt to be implemented by
users outside the oracles codebase, and starts the journey of moving
away from the FileType enum.
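
A sketch of what taking a static string makes possible, under stated assumptions: the associated constant name `FILE_PREFIX`, the one-variant `FileType`, the prefix value `"boosted_hex_update"`, and the `PartnerReport` type are all hypothetical. The `const fn to_str` mirrors the change to `file_info.rs` in this commit:

```rust
/// Cut-down stand-in for the FileType enum.
pub enum FileType {
    BoostedHexUpdate,
}

impl FileType {
    /// `const fn` so the result can initialize an associated constant.
    pub const fn to_str(&self) -> &'static str {
        match self {
            Self::BoostedHexUpdate => "boosted_hex_update",
        }
    }
}

/// The trait asks only for a static string, not a FileType value.
pub trait FileSinkWriteExt {
    const FILE_PREFIX: &'static str;
}

/// A type inside the codebase can keep deriving its prefix from the enum.
pub struct BoostedHexUpdateV1;

impl FileSinkWriteExt for BoostedHexUpdateV1 {
    const FILE_PREFIX: &'static str = FileType::BoostedHexUpdate.to_str();
}

/// A hypothetical external type supplies a literal and never touches FileType.
pub struct PartnerReport;

impl FileSinkWriteExt for PartnerReport {
    const FILE_PREFIX: &'static str = "partner_report";
}

/// Look up the prefix for any implementing type.
pub fn prefix_of<T: FileSinkWriteExt>() -> &'static str {
    T::FILE_PREFIX
}
```
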
michaeldjeffrey authored Aug 26, 2024
1 parent 0cced21 commit ca12bb6
Showing 54 changed files with 820 additions and 586 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

13 changes: 6 additions & 7 deletions boost_manager/src/main.rs
@@ -5,9 +5,10 @@ use boost_manager::{
 };
 use clap::Parser;
 use file_store::{
-file_info_poller::LookbackBehavior, file_sink, file_source, file_upload,
-reward_manifest::RewardManifest, FileStore, FileType,
+file_info_poller::LookbackBehavior, file_source, file_upload, reward_manifest::RewardManifest,
+traits::FileSinkWriteExt, FileStore, FileType,
 };
+use helium_proto::BoostedHexUpdateV1;
 use mobile_config::client::hex_boosting_client::HexBoostingClient;
 use solana::start_boost::SolanaRpc;
 use std::{
@@ -99,14 +100,12 @@ impl Server {
 .await?;

 // setup the writer for our updated hexes
-let (updated_hexes_sink, updated_hexes_sink_server) = file_sink::FileSinkBuilder::new(
-FileType::BoostedHexUpdate,
+let (updated_hexes_sink, updated_hexes_sink_server) = BoostedHexUpdateV1::file_sink(
 store_base_path,
 file_upload.clone(),
-concat!(env!("CARGO_PKG_NAME"), "_boosted_hex_update"),
+Some(Duration::from_secs(5 * 60)),
+env!("CARGO_PKG_NAME"),
 )
-.roll_time(Duration::from_secs(5 * 60))
-.create()
 .await?;

 // The server to monitor rewards and activate any newly seen boosted hexes
4 changes: 2 additions & 2 deletions boost_manager/src/watcher.rs
@@ -19,7 +19,7 @@ const LAST_PROCESSED_TIMESTAMP_KEY: &str = "last_processed_hex_boosting_info";
 pub struct Watcher<A> {
 pub pool: Pool<Postgres>,
 pub hex_boosting_client: A,
-pub file_sink: FileSinkClient,
+pub file_sink: FileSinkClient<BoostedHexUpdateProto>,
 }

 impl<A> ManagedTask for Watcher<A>
@@ -45,7 +45,7 @@ where
 {
 pub async fn new(
 pool: Pool<Postgres>,
-file_sink: FileSinkClient,
+file_sink: FileSinkClient<BoostedHexUpdateProto>,
 hex_boosting_client: A,
 ) -> Result<Self> {
 Ok(Self {
8 changes: 4 additions & 4 deletions boost_manager/tests/integrations/common/mod.rs
@@ -40,15 +40,15 @@ impl HexBoostingInfoResolver for MockHexBoostingClient {
 }

 pub struct MockFileSinkReceiver {
-pub receiver: tokio::sync::mpsc::Receiver<SinkMessage>,
+pub receiver: tokio::sync::mpsc::Receiver<SinkMessage<BoostedHexUpdateProto>>,
 }

 impl MockFileSinkReceiver {
 pub async fn receive(&mut self) -> Option<Vec<u8>> {
 match timeout(seconds(2), self.receiver.recv()).await {
 Ok(Some(SinkMessage::Data(on_write_tx, msg))) => {
 let _ = on_write_tx.send(Ok(()));
-Some(msg)
+Some(msg.encode_to_vec())
 }
 Ok(None) => None,
 Err(e) => panic!("timeout while waiting for message1 {:?}", e),
@@ -81,12 +81,12 @@ impl MockFileSinkReceiver {
 }
 }

-pub fn create_file_sink() -> (FileSinkClient, MockFileSinkReceiver) {
+pub fn create_file_sink() -> (FileSinkClient<BoostedHexUpdateProto>, MockFileSinkReceiver) {
 let (tx, rx) = tokio::sync::mpsc::channel(20);
 (
 FileSinkClient {
 sender: tx,
-metric: "metric",
+metric: "metric".into(),
 },
 MockFileSinkReceiver { receiver: rx },
 )
2 changes: 1 addition & 1 deletion file_store/src/file_info.rs
@@ -297,7 +297,7 @@ impl fmt::Display for FileType {
 }

 impl FileType {
-pub fn to_str(&self) -> &'static str {
+pub const fn to_str(&self) -> &'static str {
 match self {
 Self::InvalidatedRadioThresholdReq => INVALIDATED_RADIO_THRESHOLD_REQ,
 Self::InvalidatedRadioThresholdIngestReport => {
63 changes: 34 additions & 29 deletions file_store/src/file_sink.rs
@@ -1,4 +1,4 @@
-use crate::{file_upload::FileUpload, Error, Result};
+use crate::{file_upload::FileUpload, traits::MsgBytes, Error, Result};
 use async_compression::tokio::write::GzipEncoder;
 use bytes::Bytes;
 use chrono::{DateTime, Utc};
@@ -45,16 +45,16 @@ fn transport_sink(transport: &mut Transport) -> &mut Sink {
 }

 #[derive(Debug)]
-pub enum Message {
-Data(oneshot::Sender<Result>, Vec<u8>),
+pub enum Message<T> {
+Data(oneshot::Sender<Result>, T),
 Commit(oneshot::Sender<Result<FileManifest>>),
 Rollback(oneshot::Sender<Result<FileManifest>>),
 }

-pub type MessageSender = mpsc::Sender<Message>;
-pub type MessageReceiver = mpsc::Receiver<Message>;
+pub type MessageSender<T> = mpsc::Sender<Message<T>>;
+pub type MessageReceiver<T> = mpsc::Receiver<Message<T>>;

-fn message_channel(size: usize) -> (MessageSender, MessageReceiver) {
+fn message_channel<T>(size: usize) -> (MessageSender<T>, MessageReceiver<T>) {
 mpsc::channel(size)
 }

@@ -66,15 +66,15 @@ pub struct FileSinkBuilder {
 roll_time: Duration,
 file_upload: FileUpload,
 auto_commit: bool,
-metric: &'static str,
+metric: String,
 }

 impl FileSinkBuilder {
 pub fn new(
 prefix: impl ToString,
 target_path: &Path,
 file_upload: FileUpload,
-metric: &'static str,
+metric: impl Into<String>,
 ) -> Self {
 Self {
 prefix: prefix.to_string(),
@@ -84,7 +84,7 @@ impl FileSinkBuilder {
 roll_time: Duration::from_secs(DEFAULT_SINK_ROLL_SECS),
 file_upload,
 auto_commit: true,
-metric,
+metric: metric.into(),
 }
 }

@@ -120,15 +120,18 @@ impl FileSinkBuilder {
 }
 }

-pub async fn create(self) -> Result<(FileSinkClient, FileSink)> {
+pub async fn create<T>(self) -> Result<(FileSinkClient<T>, FileSink<T>)>
+where
+T: MsgBytes,
+{
 let (tx, rx) = message_channel(50);

 let client = FileSinkClient {
 sender: tx,
 metric: self.metric,
 };

-metrics::counter!(client.metric, vec![OK_LABEL]);
+metrics::counter!(client.metric.clone(), vec![OK_LABEL]);

 let mut sink = FileSink {
 target_path: self.target_path,
@@ -148,33 +151,35 @@ impl FileSinkBuilder {
 }

 #[derive(Debug, Clone)]
-pub struct FileSinkClient {
-pub sender: MessageSender,
-pub metric: &'static str,
+pub struct FileSinkClient<T> {
+pub sender: MessageSender<T>,
+pub metric: String,
 }

 const OK_LABEL: Label = Label::from_static_parts("status", "ok");
 const ERROR_LABEL: Label = Label::from_static_parts("status", "error");
 const SEND_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);

-impl FileSinkClient {
-pub fn new(sender: MessageSender, metric: &'static str) -> Self {
-Self { sender, metric }
+impl<T> FileSinkClient<T> {
+pub fn new(sender: MessageSender<T>, metric: impl Into<String>) -> Self {
+Self {
+sender,
+metric: metric.into(),
+}
 }

-pub async fn write<T: prost::Message>(
+pub async fn write(
 &self,
 item: T,
 labels: impl IntoIterator<Item = &(&'static str, &'static str)>,
 ) -> Result<oneshot::Receiver<Result>> {
 let (on_write_tx, on_write_rx) = oneshot::channel();
-let bytes = item.encode_to_vec();
 let labels = labels.into_iter().map(Label::from);
 tokio::select! {
-result = self.sender.send_timeout(Message::Data(on_write_tx, bytes), SEND_TIMEOUT) => match result {
+result = self.sender.send_timeout(Message::Data(on_write_tx, item), SEND_TIMEOUT) => match result {
 Ok(_) => {
 metrics::counter!(
-self.metric,
+self.metric.clone(),
 labels
 .chain(std::iter::once(OK_LABEL))
 .collect::<Vec<Label>>()
@@ -184,7 +189,7 @@ impl FileSinkClient {
 }
 Err(SendTimeoutError::Closed(_)) => {
 metrics::counter!(
-self.metric,
+self.metric.clone(),
 labels
 .chain(std::iter::once(ERROR_LABEL))
 .collect::<Vec<Label>>()
@@ -203,7 +208,7 @@ impl FileSinkClient {
 /// Writes all messages to the file sink, return the last oneshot
 pub async fn write_all(
 &self,
-items: impl IntoIterator<Item = impl prost::Message>,
+items: impl IntoIterator<Item = T>,
 ) -> Result<Option<oneshot::Receiver<Result>>> {
 let mut last_oneshot = None;
 for item in items {
@@ -244,14 +249,14 @@ impl FileSinkClient {
 }

 #[derive(Debug)]
-pub struct FileSink {
+pub struct FileSink<T> {
 target_path: PathBuf,
 tmp_path: PathBuf,
 prefix: String,
 max_size: usize,
 roll_time: Duration,

-messages: MessageReceiver,
+messages: MessageReceiver<T>,
 file_upload: FileUpload,
 staged_files: Vec<PathBuf>,
 auto_commit: bool,
@@ -273,7 +278,7 @@ impl ActiveSink {
 }
 }

-impl ManagedTask for FileSink {
+impl<T: MsgBytes + Send + Sync + 'static> ManagedTask for FileSink<T> {
 fn start_task(
 self: Box<Self>,
 shutdown: triggered::Listener,
@@ -288,7 +293,7 @@ impl ManagedTask for FileSink {
 }
 }

-impl FileSink {
+impl<T: MsgBytes> FileSink<T> {
 async fn init(&mut self) -> Result {
 fs::create_dir_all(&self.target_path).await?;
 fs::create_dir_all(&self.tmp_path).await?;
@@ -350,8 +355,8 @@ impl FileSink {
 _ = shutdown.clone() => break,
 _ = rollover_timer.tick() => self.maybe_roll().await?,
 msg = self.messages.recv() => match msg {
-Some(Message::Data(on_write_tx, bytes)) => {
-let res = match self.write(Bytes::from(bytes)).await {
+Some(Message::Data(on_write_tx, item)) => {
+let res = match self.write(item.as_bytes()).await {
 Ok(_) => Ok(()),
 Err(err) => {
 tracing::error!("failed to store {}: {err:?}", &self.prefix);