Skip to content

Commit

Permalink
Merge pull request #1 from jeffijoe/misc
Browse files Browse the repository at this point in the history
More CLI options, update packages
  • Loading branch information
jeffijoe authored May 19, 2023
2 parents c50728d + 449d59d commit c4e7199
Show file tree
Hide file tree
Showing 8 changed files with 149 additions and 80 deletions.
22 changes: 11 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "deltio"
version = "0.1.0"
version = "0.2.0"
edition = "2021"
authors = ["Jeff Hansen"]
description = "A Google Cloud Pub/Sub emulator alternative for local testing and CI"
Expand Down
41 changes: 19 additions & 22 deletions src/api/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::topics::{
PublishMessagesError,
};
use crate::topics::{TopicMessage, TopicName};
use crate::tracing::ActivitySpan;
use bytes::Bytes;
use std::collections::HashMap;
use std::sync::Arc;
Expand Down Expand Up @@ -39,7 +40,7 @@ impl PublisherService {
#[async_trait::async_trait]
impl Publisher for PublisherService {
async fn create_topic(&self, request: Request<Topic>) -> Result<Response<Topic>, Status> {
let start = std::time::Instant::now();
let start = ActivitySpan::start();
let request = request.get_ref();
let topic_name = parser::parse_topic_name(&request.name)?;
let topic_name_str = topic_name.to_string();
Expand All @@ -61,11 +62,7 @@ impl Publisher for PublisherService {
message_storage_policy: None,
};

log::info!(
"{}: creating topic took {:?}",
topic_name_str,
start.elapsed()
);
log::debug!("{}: creating topic {}", topic_name_str, start);
Ok(Response::new(response))
}

Expand All @@ -82,7 +79,7 @@ impl Publisher for PublisherService {
&self,
request: Request<PublishRequest>,
) -> Result<Response<PublishResponse>, Status> {
let start = std::time::Instant::now();
let start = ActivitySpan::start();
let request = request.get_ref();
let topic_name = parser::parse_topic_name(&request.topic)?;

Expand All @@ -108,11 +105,11 @@ impl Publisher for PublisherService {
message_ids: result.message_ids.iter().map(|m| m.to_string()).collect(),
});

log::info!(
"{}: publishing {} messages took {:?}",
log::debug!(
"{}: publishing {} messages {}",
&topic_name,
request.messages.len(),
start.elapsed()
start
);

Ok(response)
Expand All @@ -122,13 +119,13 @@ impl Publisher for PublisherService {
&self,
request: Request<GetTopicRequest>,
) -> Result<Response<Topic>, Status> {
let start = std::time::Instant::now();
let start = ActivitySpan::start();
let request = request.get_ref();
let topic_name = parser::parse_topic_name(&request.topic)?;

let topic = self.get_topic_internal(&topic_name).await?;

log::info!("{}: getting topic took {:?}", &topic_name, start.elapsed());
log::debug!("{}: getting topic {}", &topic_name, start);
Ok(Response::new(Topic {
name: topic.name.to_string(),
labels: Default::default(),
Expand All @@ -144,7 +141,7 @@ impl Publisher for PublisherService {
&self,
request: Request<ListTopicsRequest>,
) -> Result<Response<ListTopicsResponse>, Status> {
let start = std::time::Instant::now();
let start = ActivitySpan::start();
let request = request.get_ref();
let paging = parser::parse_paging(request.page_size, &request.page_token)?;
let project_id = parser::parse_project_id(&request.project)?;
Expand Down Expand Up @@ -176,11 +173,11 @@ impl Publisher for PublisherService {
next_page_token: page_token.unwrap_or(String::default()),
};

log::info!(
"{}: listing {} topics took {:?}",
log::debug!(
"{}: listing {} topics {}",
&request.project,
response.topics.len(),
start.elapsed()
start
);
Ok(Response::new(response))
}
Expand All @@ -189,7 +186,7 @@ impl Publisher for PublisherService {
&self,
request: Request<ListTopicSubscriptionsRequest>,
) -> Result<Response<ListTopicSubscriptionsResponse>, Status> {
let start = std::time::Instant::now();
let start = ActivitySpan::start();
let request = request.get_ref();
let topic_name = parser::parse_topic_name(&request.topic)?;

Expand All @@ -204,11 +201,11 @@ impl Publisher for PublisherService {
ListSubscriptionsError::Closed => conflict(),
})?;

log::info!(
"{}: listing {} subscriptions took {:?}",
log::debug!(
"{}: listing {} subscriptions {}",
&topic_name,
page.subscriptions.len(),
start.elapsed()
start
);
Ok(Response::new(ListTopicSubscriptionsResponse {
subscriptions: page
Expand Down Expand Up @@ -236,7 +233,7 @@ impl Publisher for PublisherService {
&self,
request: Request<DeleteTopicRequest>,
) -> Result<Response<()>, Status> {
let start = std::time::Instant::now();
let start = ActivitySpan::start();
let request = request.get_ref();

let topic_name = parser::parse_topic_name(&request.topic)?;
Expand All @@ -246,7 +243,7 @@ impl Publisher for PublisherService {
DeleteError::Closed => conflict(),
})?;

log::info!("{}: deleting topic took {:?}", &topic_name, start.elapsed());
log::debug!("{}: deleting topic {}", &topic_name, start);
Ok(Response::new(()))
}

Expand Down
Loading

0 comments on commit c4e7199

Please sign in to comment.