11use diesel:: r2d2:: Builder ;
22use diesel:: { connection:: SimpleConnection , pg:: PgConnection } ;
33use diesel:: {
4- r2d2:: { self , event as e , ConnectionManager , HandleEvent , Pool , PooledConnection } ,
4+ r2d2:: { ConnectionManager , Pool , PooledConnection } ,
55 Connection ,
66} ;
77use diesel:: { sql_query, RunQueryDsl } ;
@@ -14,15 +14,14 @@ use graph::internal_error;
1414use graph:: prelude:: tokio:: time:: Instant ;
1515use graph:: prelude:: {
1616 anyhow:: anyhow, crit, debug, error, info, o, tokio:: sync:: Semaphore , CancelGuard , CancelHandle ,
17- CancelToken as _, CancelableError , Counter , Gauge , Logger , MovingStats , PoolWaitStats ,
18- StoreError , ENV_VARS ,
17+ CancelToken as _, CancelableError , Gauge , Logger , MovingStats , PoolWaitStats , StoreError ,
18+ ENV_VARS ,
1919} ;
2020use graph:: prelude:: { tokio, MetricsRegistry } ;
2121use graph:: slog:: warn;
2222use graph:: util:: timed_rw_lock:: TimedMutex ;
2323
2424use std:: fmt:: { self } ;
25- use std:: sync:: atomic:: { AtomicBool , Ordering } ;
2625use std:: sync:: Arc ;
2726use std:: time:: Duration ;
2827use std:: { collections:: HashMap , sync:: RwLock } ;
@@ -33,9 +32,11 @@ use crate::{Shard, PRIMARY_SHARD};
3332
3433mod coordinator;
3534mod foreign_server;
35+ mod state_tracker;
3636
3737pub use coordinator:: PoolCoordinator ;
3838pub use foreign_server:: ForeignServer ;
39+ use state_tracker:: { ErrorHandler , EventHandler , StateTracker } ;
3940
4041/// The namespace under which the `PRIMARY_TABLES` are mapped into each
4142/// shard
@@ -204,7 +205,7 @@ impl PoolState {
204205pub struct ConnectionPool {
205206 inner : PoolState ,
206207 pub shard : Shard ,
207- state_tracker : PoolStateTracker ,
208+ state_tracker : StateTracker ,
208209}
209210
210211impl fmt:: Debug for ConnectionPool {
@@ -240,47 +241,6 @@ impl PoolRole {
240241 }
241242}
242243
243- #[ derive( Clone ) ]
244- struct PoolStateTracker {
245- available : Arc < AtomicBool > ,
246- ignore_timeout : Arc < AtomicBool > ,
247- }
248-
249- impl PoolStateTracker {
250- fn new ( ) -> Self {
251- Self {
252- available : Arc :: new ( AtomicBool :: new ( true ) ) ,
253- ignore_timeout : Arc :: new ( AtomicBool :: new ( false ) ) ,
254- }
255- }
256-
257- fn mark_available ( & self ) {
258- self . available . store ( true , Ordering :: Relaxed ) ;
259- }
260-
261- fn mark_unavailable ( & self ) {
262- self . available . store ( false , Ordering :: Relaxed ) ;
263- }
264-
265- fn is_available ( & self ) -> bool {
266- self . available . load ( Ordering :: Relaxed )
267- }
268-
269- fn timeout_is_ignored ( & self ) -> bool {
270- self . ignore_timeout . load ( Ordering :: Relaxed )
271- }
272-
273- fn ignore_timeout < F , R > ( & self , f : F ) -> R
274- where
275- F : FnOnce ( ) -> R ,
276- {
277- self . ignore_timeout . store ( true , Ordering :: Relaxed ) ;
278- let res = f ( ) ;
279- self . ignore_timeout . store ( false , Ordering :: Relaxed ) ;
280- res
281- }
282- }
283-
284244impl ConnectionPool {
285245 fn create (
286246 shard_name : & str ,
@@ -292,7 +252,7 @@ impl ConnectionPool {
292252 registry : Arc < MetricsRegistry > ,
293253 coord : Arc < PoolCoordinator > ,
294254 ) -> ConnectionPool {
295- let state_tracker = PoolStateTracker :: new ( ) ;
255+ let state_tracker = StateTracker :: new ( ) ;
296256 let shard =
297257 Shard :: new ( shard_name. to_string ( ) ) . expect ( "shard_name is a valid name for a shard" ) ;
298258 let inner = {
@@ -461,169 +421,6 @@ impl ConnectionPool {
461421 }
462422}
463423
464- fn brief_error_msg ( error : & dyn std:: error:: Error ) -> String {
465- // For 'Connection refused' errors, Postgres includes the IP and
466- // port number in the error message. We want to suppress that and
467- // only use the first line from the error message. For more detailed
468- // analysis, 'Connection refused' manifests as a
469- // `ConnectionError(BadConnection("could not connect to server:
470- // Connection refused.."))`
471- error
472- . to_string ( )
473- . split ( '\n' )
474- . next ( )
475- . unwrap_or ( "no error details provided" )
476- . to_string ( )
477- }
478-
479- #[ derive( Clone ) ]
480- struct ErrorHandler {
481- logger : Logger ,
482- counter : Counter ,
483- state_tracker : PoolStateTracker ,
484- }
485-
486- impl ErrorHandler {
487- fn new ( logger : Logger , counter : Counter , state_tracker : PoolStateTracker ) -> Self {
488- Self {
489- logger,
490- counter,
491- state_tracker,
492- }
493- }
494- }
495- impl std:: fmt:: Debug for ErrorHandler {
496- fn fmt ( & self , _f : & mut fmt:: Formatter ) -> fmt:: Result {
497- fmt:: Result :: Ok ( ( ) )
498- }
499- }
500-
501- impl r2d2:: HandleError < r2d2:: Error > for ErrorHandler {
502- fn handle_error ( & self , error : r2d2:: Error ) {
503- let msg = brief_error_msg ( & error) ;
504-
505- // Don't count canceling statements for timeouts etc. as a
506- // connection error. Unfortunately, we only have the textual error
507- // and need to infer whether the error indicates that the database
508- // is down or if something else happened. When querying a replica,
509- // these messages indicate that a query was canceled because it
510- // conflicted with replication, but does not indicate that there is
511- // a problem with the database itself.
512- //
513- // This check will break if users run Postgres (or even graph-node)
514- // in a locale other than English. In that case, their database will
515- // be marked as unavailable even though it is perfectly fine.
516- if msg. contains ( "canceling statement" )
517- || msg. contains ( "terminating connection due to conflict with recovery" )
518- {
519- return ;
520- }
521-
522- self . counter . inc ( ) ;
523- if self . state_tracker . is_available ( ) {
524- error ! ( self . logger, "Postgres connection error" ; "error" => msg) ;
525- }
526- self . state_tracker . mark_unavailable ( ) ;
527- }
528- }
529-
530- #[ derive( Clone ) ]
531- struct EventHandler {
532- logger : Logger ,
533- count_gauge : Gauge ,
534- wait_gauge : Gauge ,
535- size_gauge : Gauge ,
536- wait_stats : PoolWaitStats ,
537- state_tracker : PoolStateTracker ,
538- }
539-
540- impl EventHandler {
541- fn new (
542- logger : Logger ,
543- registry : Arc < MetricsRegistry > ,
544- wait_stats : PoolWaitStats ,
545- const_labels : HashMap < String , String > ,
546- state_tracker : PoolStateTracker ,
547- ) -> Self {
548- let count_gauge = registry
549- . global_gauge (
550- "store_connection_checkout_count" ,
551- "The number of Postgres connections currently checked out" ,
552- const_labels. clone ( ) ,
553- )
554- . expect ( "failed to create `store_connection_checkout_count` counter" ) ;
555- let wait_gauge = registry
556- . global_gauge (
557- "store_connection_wait_time_ms" ,
558- "Average connection wait time" ,
559- const_labels. clone ( ) ,
560- )
561- . expect ( "failed to create `store_connection_wait_time_ms` counter" ) ;
562- let size_gauge = registry
563- . global_gauge (
564- "store_connection_pool_size_count" ,
565- "Overall size of the connection pool" ,
566- const_labels,
567- )
568- . expect ( "failed to create `store_connection_pool_size_count` counter" ) ;
569- EventHandler {
570- logger,
571- count_gauge,
572- wait_gauge,
573- wait_stats,
574- size_gauge,
575- state_tracker,
576- }
577- }
578-
579- fn add_conn_wait_time ( & self , duration : Duration ) {
580- self . wait_stats
581- . write ( )
582- . unwrap ( )
583- . add_and_register ( duration, & self . wait_gauge ) ;
584- }
585- }
586-
587- impl std:: fmt:: Debug for EventHandler {
588- fn fmt ( & self , _f : & mut fmt:: Formatter ) -> fmt:: Result {
589- fmt:: Result :: Ok ( ( ) )
590- }
591- }
592-
593- impl HandleEvent for EventHandler {
594- fn handle_acquire ( & self , _: e:: AcquireEvent ) {
595- self . size_gauge . inc ( ) ;
596- self . state_tracker . mark_available ( ) ;
597- }
598-
599- fn handle_release ( & self , _: e:: ReleaseEvent ) {
600- self . size_gauge . dec ( ) ;
601- }
602-
603- fn handle_checkout ( & self , event : e:: CheckoutEvent ) {
604- self . count_gauge . inc ( ) ;
605- self . add_conn_wait_time ( event. duration ( ) ) ;
606- self . state_tracker . mark_available ( ) ;
607- }
608-
609- fn handle_timeout ( & self , event : e:: TimeoutEvent ) {
610- if self . state_tracker . timeout_is_ignored ( ) {
611- return ;
612- }
613- self . add_conn_wait_time ( event. timeout ( ) ) ;
614- if self . state_tracker . is_available ( ) {
615- error ! ( self . logger, "Connection checkout timed out" ;
616- "wait_ms" => event. timeout( ) . as_millis( )
617- )
618- }
619- self . state_tracker . mark_unavailable ( ) ;
620- }
621-
622- fn handle_checkin ( & self , _: e:: CheckinEvent ) {
623- self . count_gauge . dec ( ) ;
624- }
625- }
626-
627424#[ derive( Clone ) ]
628425pub struct PoolInner {
629426 logger : Logger ,
@@ -662,7 +459,7 @@ impl PoolInner {
662459 fdw_pool_size : Option < u32 > ,
663460 logger : & Logger ,
664461 registry : Arc < MetricsRegistry > ,
665- state_tracker : PoolStateTracker ,
462+ state_tracker : StateTracker ,
666463 ) -> PoolInner {
667464 check_mirrored_tables ( ) ;
668465
0 commit comments