Skip to content

Commit

Permalink
feat: added monthly statistics. Added user statistics (#169)
Browse files Browse the repository at this point in the history
  • Loading branch information
akorchyn authored Aug 27, 2024
1 parent 3d13d62 commit 3f45ac5
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 62 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ http-body-util = "0.1"
num-format = "0.4"
resvg = "0.42.0"
tiny-skia = "0.11"
itertools = "0.12.0"

[profile.release]
codegen-units = 1
Expand Down
1 change: 1 addition & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,6 @@ http-body-util.workspace = true
rocket_cors.workspace = true
num-format.workspace = true
tracing.workspace = true
itertools.workspace = true

shared = { workspace = true, features = ["client"] }
96 changes: 72 additions & 24 deletions server/src/weekly_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,44 @@ use std::{
time::Duration,
};

use rocket::{fairing::AdHoc, futures::future::join_all};
use chrono::NaiveDate;
use itertools::Itertools;
use rocket::fairing::AdHoc;
use rocket_db_pools::Database;
use shared::telegram::TelegramSubscriber;
use shared::{telegram::TelegramSubscriber, GithubHandle};
use tracing::Level;

use crate::{db::DB, github_pull::GithubClient};

async fn calculate_weekly_pr_stats(
async fn calculate_pr_stats(
db: &DB,
github: &GithubClient,
telegram: &TelegramSubscriber,
period_string: &str,
start_period: NaiveDate,
) -> anyhow::Result<()> {
let projects = db.get_projects().await?;
let mut project_stats = Vec::with_capacity(projects.len());
let mut user_stats = std::collections::HashMap::<GithubHandle, (u32, u32)>::new();
for (org, repo) in projects {
let period = (chrono::Utc::now() - chrono::Duration::days(7)).date_naive();
let prs = github.pull_requests_for_period(&org, &repo, period).await?;
let result = join_all(
prs.iter()
.map(|pr| db.is_pr_available(&org, &repo, pr.number as i32)),
)
.await
.into_iter()
.filter(|r| matches!(r, Ok(true)))
.count();
let prs = github
.pull_requests_for_period(&org, &repo, start_period)
.await?;
let mut sloths_prs = 0;
let total_prs = prs.len();

for pr in prs {
let user = pr.user.map(|u| u.login).unwrap_or_default();
let (total_prs, user_sloths_pr) = user_stats.entry(user).or_default();
*total_prs += 1;

if let Ok(true) = db.is_pr_available(&org, &repo, pr.number as i32).await {
sloths_prs += 1;
*user_sloths_pr += 1;
}
}

project_stats.push((org, repo, prs.len(), result));
project_stats.push((org, repo, total_prs, sloths_prs));
}

project_stats.sort_by(|(_, _, prs1, with_sloth1), (_, _, prs2, with_sloth2)| {
Expand All @@ -38,21 +49,43 @@ async fn calculate_weekly_pr_stats(
diff2.cmp(&diff1)
});

let mut message = String::from("Weekly PR stats:\n");
for (org, repo, prs, prs_with_sloth) in project_stats.into_iter().take(10) {
message.push_str(&format!(
"- [{org}/{repo}](https://github.com/{org}/{repo}) - {prs} PRs, {prs_with_sloth} PRs with sloth, {} Difference\n",
prs - prs_with_sloth
));
}
let mut user_stats: Vec<(String, (u32, u32))> = user_stats.into_iter().collect();
user_stats.sort_by(|(_, (prs1, with_sloth1)), (_, (prs2, with_sloth2))| {
let diff1 = *prs1 - *with_sloth1;
let diff2 = *prs2 - *with_sloth2;
diff2.cmp(&diff1)
});

let message = [format!("{period_string} PR stats:")].into_iter().chain(project_stats.into_iter().take(10).map(|(org, repo, prs, prs_with_sloth)| format!(
"- [{org}/{repo}](https://github.com/{org}/{repo}) - {prs} PRs, {prs_with_sloth} PRs with sloth, {} Difference\n",
prs - prs_with_sloth
))).join("\n");

let user_message = [String::from("User;Total PRs;Sloth PRs;Difference")]
.into_iter()
.chain(
user_stats
.into_iter()
.map(|(user, (prs, prs_with_sloths))| {
format!(
"{};{};{};{}",
user,
prs,
prs_with_sloths,
prs - prs_with_sloths
)
}),
)
.join("\n");

telegram.send_to_telegram(&message, &Level::INFO);
telegram.send_csv_file_to_telegram(user_message.into_bytes(), "user-stats.csv".to_string());

Ok(())
}

pub fn stage(sleep_duration: Duration, atomic_bool: Arc<AtomicBool>) -> AdHoc {
AdHoc::on_ignite("Weekly stats", move |rocket| async move {
AdHoc::on_ignite("Weekly/Monthly stats", move |rocket| async move {
rocket.attach(AdHoc::on_liftoff(
"Analyzes weekly statistics",
move |rocket| {
Expand All @@ -72,10 +105,24 @@ pub fn stage(sleep_duration: Duration, atomic_bool: Arc<AtomicBool>) -> AdHoc {
rocket::tokio::spawn(async move {
let mut interval: rocket::tokio::time::Interval =
rocket::tokio::time::interval(sleep_duration);
let mut count = 0;
while atomic_bool.load(std::sync::atomic::Ordering::Relaxed) {
interval.tick().await;
if let Err(e) =
calculate_weekly_pr_stats(&db, &github_client, &telegram).await

let (period, start_period) = if count % 4 == 0 {
("Monthly", (chrono::Utc::now() - chrono::Duration::days(30)))
} else {
("Weekly", (chrono::Utc::now() - chrono::Duration::days(7)))
};

if let Err(e) = calculate_pr_stats(
&db,
&github_client,
&telegram,
period,
start_period.date_naive(),
)
.await
{
telegram.send_to_telegram(
&format!("Failed to calculate weekly stats: {:#?}", e),
Expand All @@ -84,6 +131,7 @@ pub fn stage(sleep_duration: Duration, atomic_bool: Arc<AtomicBool>) -> AdHoc {

tracing::error!("Failed to calculate weekly stats: {:#?}", e);
}
count += 1;
}
});
})
Expand Down
2 changes: 1 addition & 1 deletion shared/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ near-workspaces = { workspace = true, optional = true }
anyhow = { workspace = true, optional = true }
serde_json = { workspace = true, optional = true }
tracing = { workspace = true, optional = true }
reqwest = { workspace = true, optional = true }
reqwest = { workspace = true, optional = true, features = ["multipart"] }
tokio = { workspace = true, optional = true, features = ["sync"] }
tracing-subscriber = { workspace = true, optional = true }

Expand Down
128 changes: 91 additions & 37 deletions shared/src/telegram.rs
Original file line number Diff line number Diff line change
@@ -1,53 +1,101 @@
use reqwest::Client;
use reqwest::{multipart, Client, Response};
use std::fmt;
use tokio::sync::mpsc;
use tracing::{Event, Level, Subscriber};

pub enum MessageType {
CsvFile((String, Vec<u8>)),
Message((String, Level)),
}

#[derive(Clone)]
pub struct TelegramSubscriber {
sender: mpsc::UnboundedSender<(String, Level)>,
sender: mpsc::UnboundedSender<MessageType>,
}

async fn send_message(
client: &Client,
bot_token: &str,
chat_id: &str,
message: String,
level: Level,
) -> anyhow::Result<Response> {
let url = format!("https://api.telegram.org/bot{}/sendMessage", bot_token);

let message = if level == Level::INFO {
message.replace('-', "\\-")
} else {
let message = message
.replace('_', "\\_")
.replace('*', "\\*")
.replace('[', "\\[")
.replace(']', "\\]")
.replace('(', "\\(")
.replace(')', "\\)")
.replace('~', "\\~")
.replace('`', "\\`")
.replace('>', "\\>")
.replace('#', "\\#")
.replace('+', "\\+")
.replace('-', "\\-")
.replace('=', "\\=")
.replace('|', "\\|")
.replace('{', "\\{")
.replace('}', "\\}")
.replace('.', "\\.")
.replace('!', "\\!");

format!("*{}*: `{}`", level.as_str(), message)
};
let params = [
("chat_id", chat_id),
("text", &message),
("parse_mode", "MarkdownV2"),
];

Ok(client.post(&url).form(&params).send().await?)
}

async fn send_csv_to_telegram(
client: &Client,
bot_token: &str,
chat_id: &str,
csv_content: Vec<u8>,
filename: String,
) -> anyhow::Result<Response> {
let url = format!("https://api.telegram.org/bot{}/sendDocument", bot_token);

let form = multipart::Form::new()
.text("chat_id", chat_id.to_string())
.part(
"document",
multipart::Part::bytes(csv_content)
.file_name(filename)
.mime_str("text/csv")?,
);

let response = client.post(&url).multipart(form).send().await?;

Ok(response)
}

async fn sender_task(
mut reader: mpsc::UnboundedReceiver<(String, Level)>,
mut reader: mpsc::UnboundedReceiver<MessageType>,
client: Client,
bot_token: String,
chat_id: String,
) {
while let Some((message, level)) = reader.recv().await {
let url = format!("https://api.telegram.org/bot{}/sendMessage", bot_token);

let message = if level == Level::INFO {
message.replace('-', "\\-")
} else {
let message = message
.replace('_', "\\_")
.replace('*', "\\*")
.replace('[', "\\[")
.replace(']', "\\]")
.replace('(', "\\(")
.replace(')', "\\)")
.replace('~', "\\~")
.replace('`', "\\`")
.replace('>', "\\>")
.replace('#', "\\#")
.replace('+', "\\+")
.replace('-', "\\-")
.replace('=', "\\=")
.replace('|', "\\|")
.replace('{', "\\{")
.replace('}', "\\}")
.replace('.', "\\.")
.replace('!', "\\!");

format!("*{}*: `{}`", level.as_str(), message)
while let Some(msg) = reader.recv().await {
let result = match msg {
MessageType::Message((message, level)) => {
send_message(&client, &bot_token, &chat_id, message, level).await
}
MessageType::CsvFile((file, csv)) => {
send_csv_to_telegram(&client, &bot_token, &chat_id, csv, file).await
}
};
let params = [
("chat_id", chat_id.as_str()),
("text", &message),
("parse_mode", "MarkdownV2"),
];
match client.post(&url).form(&params).send().await {

match result {
Ok(response) if response.status().is_success() => {}
// We use eprintln! here because it doesn't make sense to send back a message to the chat
Ok(response) => eprintln!(
Expand All @@ -72,7 +120,13 @@ impl TelegramSubscriber {
}

pub fn send_to_telegram(&self, message: &str, level: &Level) {
let _ = self.sender.send((message.to_string(), *level));
let _ = self
.sender
.send(MessageType::Message((message.to_string(), *level)));
}

pub fn send_csv_file_to_telegram(&self, bytes: Vec<u8>, filename: String) {
let _ = self.sender.send(MessageType::CsvFile((filename, bytes)));
}
}

Expand Down

0 comments on commit 3f45ac5

Please sign in to comment.