@@ -3,6 +3,7 @@ use std::{
3
3
time:: Duration ,
4
4
} ;
5
5
6
+ use anyhow:: Context ;
6
7
use chrono:: DateTime ;
7
8
use rocket:: fairing:: AdHoc ;
8
9
use rocket_db_pools:: Database ;
@@ -12,7 +13,6 @@ use sqlx::{Postgres, Transaction};
12
13
use crate :: db:: DB ;
13
14
14
15
async fn fetch_and_store_users (
15
- telegram : & Arc < TelegramSubscriber > ,
16
16
near_client : & NearClient ,
17
17
tx : & mut Transaction < ' static , Postgres > ,
18
18
) -> anyhow:: Result < ( ) > {
@@ -23,61 +23,61 @@ async fn fetch_and_store_users(
23
23
. into_iter ( )
24
24
. map ( |e| e. time_string ( timestamp as u64 ) )
25
25
. collect ( ) ;
26
- let users = near_client. users ( periods) . await ?;
26
+ let users = near_client
27
+ . users ( periods)
28
+ . await
29
+ . context ( "Failed to fetch users" ) ?;
27
30
28
31
for user in users {
29
- let user_id = match DB :: upsert_user ( tx, user. id , & user. name , user. percentage_bonus ) . await {
30
- Ok ( id) => id,
31
- Err ( e) => {
32
- crate :: error (
33
- telegram,
34
- & format ! ( "Failed to upsert user ({}): {:#?}" , user. name, e) ,
35
- ) ;
36
- continue ;
37
- }
38
- } ;
32
+ let user_id = DB :: upsert_user ( tx, user. id , & user. name , user. percentage_bonus )
33
+ . await
34
+ . with_context ( || format ! ( "Failed to upsert user with id: {}" , user. id) ) ?;
39
35
for ( period, data) in user. period_data {
40
- if let Err ( e) = DB :: upsert_user_period_data ( tx, period, & data, user_id) . await {
41
- crate :: error (
42
- telegram,
43
- & format ! (
44
- "Failed to upsert user ({}) period data: {:#?}" ,
45
- user. name, e
46
- ) ,
47
- ) ;
48
- }
36
+ DB :: upsert_user_period_data ( tx, period, & data, user_id)
37
+ . await
38
+ . with_context ( || {
39
+ format ! ( "Failed to upsert period data for user id: {}" , user_id)
40
+ } ) ?;
49
41
}
50
42
for ( streak_id, streak_data) in user. streaks {
51
- if let Err ( e) =
52
- DB :: upsert_streak_user_data ( tx, & streak_data, streak_id as i32 , user_id) . await
53
- {
54
- crate :: error (
55
- telegram,
56
- & format ! (
57
- "Failed to upsert user ({}) streak data: {:#?}" ,
58
- user. name, e
59
- ) ,
60
- ) ;
61
- }
43
+ DB :: upsert_streak_user_data ( tx, & streak_data, streak_id as i32 , user_id)
44
+ . await
45
+ . with_context ( || {
46
+ format ! (
47
+ "Failed to upsert streak data for user id: {} and streak id: {}" ,
48
+ user_id, streak_id
49
+ )
50
+ } ) ?;
62
51
}
63
52
}
64
53
65
54
Ok ( ( ) )
66
55
}
67
56
68
57
async fn fetch_and_store_prs (
69
- telegram : & Arc < TelegramSubscriber > ,
70
58
near_client : & NearClient ,
71
59
tx : & mut Transaction < ' static , Postgres > ,
72
60
) -> anyhow:: Result < ( ) > {
73
- let prs = near_client. prs ( ) . await ?;
61
+ let prs = near_client
62
+ . prs ( )
63
+ . await
64
+ . context ( "Failed to fetch PRs from near_client" ) ?;
65
+
66
+ DB :: clear_prs ( tx)
67
+ . await
68
+ . context ( "Failed to clear existing PRs from the database" ) ?;
74
69
75
- DB :: clear_prs ( tx) . await ?;
76
70
for ( pr, executed) in prs {
77
- let organization_id = DB :: upsert_organization ( tx, & pr. organization ) . await ?;
78
- let repo_id = DB :: upsert_repo ( tx, organization_id, & pr. repo ) . await ?;
79
- let author_id = DB :: get_user_id ( tx, & pr. author ) . await ?;
80
- if let Err ( e) = DB :: upsert_pull_request (
71
+ let organization_id = DB :: upsert_organization ( tx, & pr. organization )
72
+ . await
73
+ . context ( "Failed on upserting organization" ) ?;
74
+ let repo_id = DB :: upsert_repo ( tx, organization_id, & pr. repo )
75
+ . await
76
+ . context ( "Failed on upserting repo" ) ?;
77
+ let author_id = DB :: get_user_id ( tx, & pr. author )
78
+ . await
79
+ . context ( "Failed on getting user id" ) ?;
80
+ DB :: upsert_pull_request (
81
81
tx,
82
82
repo_id,
83
83
pr. number as i32 ,
@@ -93,69 +93,44 @@ async fn fetch_and_store_prs(
93
93
executed,
94
94
)
95
95
. await
96
- {
97
- crate :: error (
98
- telegram,
99
- & format ! (
100
- "Failed to upsert PR ({}/{}/pull/{}): {:#?}" ,
101
- pr. organization, pr. repo, pr. number, e
102
- ) ,
103
- ) ;
104
- }
96
+ . context ( "Failed on upserting PR" ) ?;
105
97
}
106
98
107
99
Ok ( ( ) )
108
100
}
109
101
110
102
async fn fetch_and_store_repos (
111
- telegram : & Arc < TelegramSubscriber > ,
112
103
near_client : & NearClient ,
113
104
tx : & mut Transaction < ' static , Postgres > ,
114
105
) -> anyhow:: Result < ( ) > {
115
106
let organizations = near_client. repos ( ) . await ?;
116
107
for org in organizations {
117
- let organization_id = match DB :: upsert_organization ( tx, & org. organization ) . await {
118
- Ok ( id) => id,
119
- Err ( e) => {
120
- crate :: error (
121
- telegram,
122
- & format ! (
123
- "Failed to upsert organization ({}): {:#?}" ,
124
- org. organization, e
125
- ) ,
126
- ) ;
127
- continue ;
128
- }
129
- } ;
108
+ let organization_id = DB :: upsert_organization ( tx, & org. organization )
109
+ . await
110
+ . context ( "Failed on upserting organization" ) ?;
130
111
for repo in org. repos {
131
- if let Err ( e) = DB :: upsert_repo ( tx, organization_id, & repo) . await {
132
- crate :: error (
133
- telegram,
134
- & format ! (
135
- "Failed to upsert repo ({}/{}): {:#?}" ,
136
- org. organization, repo, e
137
- ) ,
138
- ) ;
139
- }
112
+ DB :: upsert_repo ( tx, organization_id, & repo)
113
+ . await
114
+ . context ( "Failed on upserting repo" ) ?;
140
115
}
141
116
}
142
117
143
118
Ok ( ( ) )
144
119
}
145
120
146
121
// TODO: more efficient way to fetch only updated data
147
- async fn fetch_and_store_all_data (
148
- telegram : & Arc < TelegramSubscriber > ,
149
- near_client : & NearClient ,
150
- db : & DB ,
151
- ) -> anyhow:: Result < ( ) > {
122
+ async fn fetch_and_store_all_data ( near_client : & NearClient , db : & DB ) -> anyhow:: Result < ( ) > {
152
123
let mut tx = db. begin ( ) . await ?;
153
124
154
- fetch_and_store_users ( telegram, near_client, & mut tx) . await ?;
155
-
156
- fetch_and_store_repos ( telegram, near_client, & mut tx) . await ?;
157
- // It matters that we fetch users first, because we need to know their IDs
158
- fetch_and_store_prs ( telegram, near_client, & mut tx) . await ?;
125
+ fetch_and_store_users ( near_client, & mut tx)
126
+ . await
127
+ . context ( "Failed to fetch and store users" ) ?;
128
+ fetch_and_store_repos ( near_client, & mut tx)
129
+ . await
130
+ . context ( "Failed to fetch and store repositories" ) ?;
131
+ fetch_and_store_prs ( near_client, & mut tx)
132
+ . await
133
+ . context ( "Failed to fetch and store pull requests" ) ?;
159
134
160
135
tx. commit ( ) . await ?;
161
136
Ok ( ( ) )
@@ -180,11 +155,8 @@ pub fn stage(client: NearClient, sleep_duration: Duration, atomic_bool: Arc<Atom
180
155
interval. tick ( ) . await ;
181
156
182
157
// Execute a query of some kind
183
- if let Err ( e) = fetch_and_store_all_data ( & telegram, & near_client, & db) . await {
184
- crate :: error (
185
- & telegram,
186
- & format ! ( "Failed to fetch and store data: {:#?}" , e) ,
187
- ) ;
158
+ if let Err ( e) = fetch_and_store_all_data ( & near_client, & db) . await {
159
+ crate :: error ( & telegram, & e. to_string ( ) ) ;
188
160
}
189
161
}
190
162
} ) ;
0 commit comments