@@ -75,7 +75,7 @@ static void on_database_write_ready(h2o_socket_t *sock, const char *err);
75
75
static void on_process_queries (void * arg );
76
76
static void poll_database_connection (h2o_socket_t * sock , const char * err );
77
77
static void prepare_statements (db_conn_t * conn );
78
- static void process_queries (db_conn_t * conn , bool removed );
78
+ static void process_queries (db_conn_pool_t * pool );
79
79
static void remove_connection (db_conn_t * conn );
80
80
static void start_database_connect (db_conn_pool_t * pool , db_conn_t * conn );
81
81
@@ -237,7 +237,10 @@ static void on_database_connect_read_ready(h2o_socket_t *sock, const char *err)
237
237
h2o_timer_unlink (& conn -> timer );
238
238
h2o_socket_read_stop (conn -> sock );
239
239
h2o_socket_read_start (conn -> sock , on_database_read_ready );
240
- process_queries (conn , true);
240
+ * conn -> pool -> conn .tail = & conn -> l ;
241
+ conn -> pool -> conn .tail = & conn -> l .next ;
242
+ conn -> l .next = NULL ;
243
+ process_queries (conn -> pool );
241
244
return ;
242
245
default :
243
246
LIBRARY_ERROR ("PQresultStatus" , PQresultErrorMessage (result ));
@@ -370,7 +373,13 @@ static void on_database_read_ready(h2o_socket_t *sock, const char *err)
370
373
for (PGnotify * notify = PQnotifies (conn -> conn ); notify ; notify = PQnotifies (conn -> conn ))
371
374
PQfreemem (notify );
372
375
373
- process_queries (conn , removed );
376
+ if (removed && conn -> query_num ) {
377
+ * conn -> pool -> conn .tail = & conn -> l ;
378
+ conn -> pool -> conn .tail = & conn -> l .next ;
379
+ conn -> l .next = NULL ;
380
+ }
381
+
382
+ process_queries (conn -> pool );
374
383
}
375
384
376
385
static void on_database_timeout (h2o_timer_t * timer )
@@ -405,20 +414,83 @@ static void on_database_write_ready(h2o_socket_t *sock, const char *err)
405
414
406
415
static void on_process_queries (void * arg )
407
416
{
417
+ list_t * iter = NULL ;
408
418
db_conn_pool_t * const pool = arg ;
419
+ size_t query_num = 0 ;
409
420
410
- while (pool -> queries .head && pool -> conn ) {
411
- db_conn_t * const conn = H2O_STRUCT_FROM_MEMBER (db_conn_t , l , pool -> conn );
421
+ while (pool -> queries .head && pool -> conn .head ) {
422
+ db_conn_t * const conn = H2O_STRUCT_FROM_MEMBER (db_conn_t , l , pool -> conn .head );
423
+ db_query_param_t * const param = H2O_STRUCT_FROM_MEMBER (db_query_param_t ,
424
+ l ,
425
+ conn -> pool -> queries .head );
412
426
413
- pool -> conn = conn -> l .next ;
414
427
assert (conn -> query_num );
415
- process_queries (conn , true);
428
+ assert (pool -> query_num < pool -> config -> max_query_num );
429
+ pool -> conn .head = conn -> l .next ;
430
+ pool -> queries .head = param -> l .next ;
431
+
432
+ if (!pool -> conn .head ) {
433
+ assert (pool -> conn .tail == & conn -> l .next );
434
+ pool -> conn .tail = & pool -> conn .head ;
435
+ }
436
+
437
+ if (++ pool -> query_num == pool -> config -> max_query_num ) {
438
+ assert (!pool -> queries .head );
439
+ assert (pool -> queries .tail == & param -> l .next );
440
+ pool -> queries .tail = & pool -> queries .head ;
441
+ }
442
+
443
+ if (do_execute_query (conn , param )) {
444
+ param -> on_error (param , DB_ERROR );
445
+ on_database_error (conn , DB_ERROR );
446
+ }
447
+ else {
448
+ query_num ++ ;
449
+
450
+ if (conn -> query_num ) {
451
+ * pool -> conn .tail = & conn -> l ;
452
+ pool -> conn .tail = & conn -> l .next ;
453
+ conn -> l .next = NULL ;
454
+ }
455
+ else {
456
+ conn -> l .next = iter ;
457
+ iter = & conn -> l ;
458
+ }
459
+ }
416
460
}
417
461
418
- if (pool -> queries .head && pool -> conn_num )
419
- start_database_connect (pool , NULL );
462
+ if (iter )
463
+ do {
464
+ db_conn_t * const conn = H2O_STRUCT_FROM_MEMBER (db_conn_t , l , iter );
465
+
466
+ iter = conn -> l .next ;
467
+
468
+ if (flush_connection (on_database_write_ready , conn ))
469
+ on_database_error (conn , DB_ERROR );
470
+ } while (iter );
420
471
472
+ pool -> conn .tail = & pool -> conn .head ;
421
473
pool -> process_queries = false;
474
+ query_num += pool -> config -> max_query_num - pool -> query_num ;
475
+
476
+ for (iter = pool -> conn .head ; iter ;) {
477
+ db_conn_t * const conn = H2O_STRUCT_FROM_MEMBER (db_conn_t , l , iter );
478
+
479
+ iter = conn -> l .next ;
480
+
481
+ if (flush_connection (on_database_write_ready , conn )) {
482
+ * pool -> conn .tail = iter ;
483
+ on_database_error (conn , DB_ERROR );
484
+ }
485
+ else
486
+ pool -> conn .tail = & conn -> l .next ;
487
+ }
488
+
489
+ const size_t conn_num = pool -> config -> max_db_conn_num - pool -> conn_num ;
490
+
491
+ if (query_num > conn_num )
492
+ for (query_num -= conn_num ; pool -> conn_num && query_num ; query_num -- )
493
+ start_database_connect (pool , NULL );
422
494
}
423
495
424
496
static void poll_database_connection (h2o_socket_t * sock , const char * err )
@@ -536,54 +608,44 @@ static void prepare_statements(db_conn_t *conn)
536
608
}
537
609
else {
538
610
h2o_socket_read_start (conn -> sock , on_database_read_ready );
539
- process_queries (conn , true);
611
+ * conn -> pool -> conn .tail = & conn -> l ;
612
+ conn -> pool -> conn .tail = & conn -> l .next ;
613
+ conn -> l .next = NULL ;
614
+ process_queries (conn -> pool );
540
615
}
541
616
}
542
617
543
- static void process_queries (db_conn_t * conn , bool removed )
618
+ static void process_queries (db_conn_pool_t * pool )
544
619
{
545
- const bool flush = conn -> query_num && conn -> pool -> queries .head ;
546
-
547
- while (conn -> query_num && conn -> pool -> queries .head ) {
548
- db_query_param_t * const param = H2O_STRUCT_FROM_MEMBER (db_query_param_t ,
549
- l ,
550
- conn -> pool -> queries .head );
551
-
552
- if (++ conn -> pool -> query_num == conn -> pool -> config -> max_query_num ) {
553
- assert (conn -> pool -> queries .tail == & param -> l .next );
554
- conn -> pool -> queries .tail = & conn -> pool -> queries .head ;
555
- }
556
-
557
- conn -> pool -> queries .head = param -> l .next ;
558
-
559
- if (do_execute_query (conn , param )) {
560
- param -> on_error (param , DB_ERROR );
561
- on_database_error (conn , DB_ERROR );
562
- return ;
563
- }
564
- }
565
-
566
- if (flush && flush_connection (on_database_write_ready , conn ))
567
- on_database_error (conn , DB_ERROR );
568
- else if (conn -> query_num && removed ) {
569
- conn -> l .next = conn -> pool -> conn ;
570
- conn -> pool -> conn = & conn -> l ;
620
+ if (!pool -> process_queries && pool -> queries .head ) {
621
+ task_message_t * const msg = h2o_mem_alloc (sizeof (* msg ));
622
+
623
+ assert (pool -> query_num < pool -> config -> max_query_num );
624
+ memset (msg , 0 , sizeof (* msg ));
625
+ msg -> arg = pool ;
626
+ msg -> super .type = TASK ;
627
+ msg -> task = on_process_queries ;
628
+ pool -> process_queries = true;
629
+ send_local_message (& msg -> super , pool -> local_messages );
571
630
}
572
- else if (!conn -> query_num && !removed )
573
- // This call should not be problematic, assuming a relatively low number of connections.
574
- remove_connection (conn );
575
631
}
576
632
577
633
static void remove_connection (db_conn_t * conn )
578
634
{
579
- list_t * iter = conn -> pool -> conn ;
580
- list_t * * prev = & conn -> pool -> conn ;
635
+ list_t * iter = conn -> pool -> conn . head ;
636
+ list_t * * prev = & conn -> pool -> conn . head ;
581
637
582
638
for (; iter && iter != & conn -> l ; iter = iter -> next )
583
639
prev = & iter -> next ;
584
640
585
- if (iter )
641
+ if (iter ) {
586
642
* prev = iter -> next ;
643
+
644
+ if (!conn -> pool -> conn .head ) {
645
+ assert (conn -> pool -> conn .tail == & iter -> next );
646
+ conn -> pool -> conn .tail = & conn -> pool -> conn .head ;
647
+ }
648
+ }
587
649
}
588
650
589
651
static void start_database_connect (db_conn_pool_t * pool , db_conn_t * conn )
@@ -661,37 +723,15 @@ int execute_database_query(db_conn_pool_t *pool, db_query_param_t *param)
661
723
int ret = 1 ;
662
724
663
725
if (pool -> query_num ) {
664
- if (pool -> conn ) {
665
- // Delay sending the database queries to the server, so that if there is a rapid
666
- // succession of calls to this function, all resultant queries would be inserted
667
- // into a command pipeline with a smaller number of system calls.
668
- if (!pool -> process_queries ) {
669
- task_message_t * const msg = h2o_mem_alloc (sizeof (* msg ));
670
-
671
- memset (msg , 0 , sizeof (* msg ));
672
- msg -> arg = pool ;
673
- msg -> super .type = TASK ;
674
- msg -> task = on_process_queries ;
675
- send_local_message (& msg -> super , pool -> local_messages );
676
- pool -> process_queries = true;
677
- }
678
-
679
- ret = 0 ;
680
- }
681
- else {
682
- if (pool -> conn_num )
683
- start_database_connect (pool , NULL );
684
-
685
- if (pool -> conn_num < pool -> config -> max_db_conn_num && pool -> query_num )
686
- ret = 0 ;
687
- }
688
-
689
- if (!ret ) {
690
- param -> l .next = NULL ;
691
- * pool -> queries .tail = & param -> l ;
692
- pool -> queries .tail = & param -> l .next ;
693
- pool -> query_num -- ;
694
- }
726
+ // Delay sending the database queries to the server, so that if there is a rapid
727
+ // succession of calls to this function, all resultant queries would be inserted
728
+ // into a command pipeline with a smaller number of system calls.
729
+ param -> l .next = NULL ;
730
+ * pool -> queries .tail = & param -> l ;
731
+ pool -> queries .tail = & param -> l .next ;
732
+ pool -> query_num -- ;
733
+ process_queries (pool );
734
+ ret = 0 ;
695
735
}
696
736
697
737
return ret ;
@@ -704,9 +744,9 @@ void free_database_connection_pool(db_conn_pool_t *pool)
704
744
705
745
size_t num = 0 ;
706
746
707
- if (pool -> conn )
747
+ if (pool -> conn . head )
708
748
do {
709
- db_conn_t * const conn = H2O_STRUCT_FROM_MEMBER (db_conn_t , l , pool -> conn );
749
+ db_conn_t * const conn = H2O_STRUCT_FROM_MEMBER (db_conn_t , l , pool -> conn . head );
710
750
711
751
assert (!conn -> queries .head );
712
752
assert (conn -> query_num == pool -> config -> max_pipeline_query_num );
@@ -715,10 +755,10 @@ void free_database_connection_pool(db_conn_pool_t *pool)
715
755
h2o_socket_read_stop (conn -> sock );
716
756
h2o_socket_close (conn -> sock );
717
757
PQfinish (conn -> conn );
718
- pool -> conn = pool -> conn -> next ;
719
- free (conn );
758
+ pool -> conn .head = conn -> l .next ;
720
759
num ++ ;
721
- } while (pool -> conn );
760
+ free (conn );
761
+ } while (pool -> conn .head );
722
762
723
763
assert (num + pool -> conn_num == pool -> config -> max_db_conn_num );
724
764
}
@@ -732,6 +772,7 @@ void initialize_database_connection_pool(const char *conninfo,
732
772
{
733
773
memset (pool , 0 , sizeof (* pool ));
734
774
pool -> config = config ;
775
+ pool -> conn .tail = & pool -> conn .head ;
735
776
pool -> conninfo = conninfo ? conninfo : "" ;
736
777
pool -> local_messages = local_messages ;
737
778
pool -> loop = loop ;
0 commit comments