@@ -485,13 +485,11 @@ def send_batch(
485485 )
486486 return self ._send_batch_sync (queue_name , encode_list_to_psql (messages ), delay )
487487
488- def _read_sync (
489- self , queue_name : str , vt : Optional [int ] = None
490- ) -> Optional [Message ]:
488+ def _read_sync (self , queue_name : str , vt : int ) -> Optional [Message ]:
491489 with self .session_maker () as session :
492490 row = session .execute (
493491 text ("select * from pgmq.read(:queue_name,:vt,1);" ),
494- {"queue_name" : queue_name , "vt" : vt or self . vt },
492+ {"queue_name" : queue_name , "vt" : vt },
495493 ).fetchone ()
496494 session .commit ()
497495 if row is None :
@@ -500,14 +498,12 @@ def _read_sync(
500498 msg_id = row [0 ], read_ct = row [1 ], enqueued_at = row [2 ], vt = row [3 ], message = row [4 ]
501499 )
502500
503- async def _read_async (
504- self , queue_name : str , vt : Optional [int ] = None
505- ) -> Optional [Message ]:
501+ async def _read_async (self , queue_name : str , vt : int ) -> Optional [Message ]:
506502 async with self .session_maker () as session :
507503 row = (
508504 await session .execute (
509505 text ("select * from pgmq.read(:queue_name,:vt,1);" ),
510- {"queue_name" : queue_name , "vt" : vt or self . vt },
506+ {"queue_name" : queue_name , "vt" : vt },
511507 )
512508 ).fetchone ()
513509 await session .commit ()
@@ -584,15 +580,17 @@ def read(self, queue_name: str, vt: Optional[int] = None) -> Optional[Message]:
584580 def _read_batch_sync (
585581 self ,
586582 queue_name : str ,
583+ vt : int ,
587584 batch_size : int = 1 ,
588- vt : Optional [int ] = None ,
589585 ) -> Optional [List [Message ]]:
586+ if vt is None :
587+ vt = self .vt
590588 with self .session_maker () as session :
591589 rows = session .execute (
592590 text ("select * from pgmq.read(:queue_name,:vt,:batch_size);" ),
593591 {
594592 "queue_name" : queue_name ,
595- "vt" : vt or self . vt ,
593+ "vt" : vt ,
596594 "batch_size" : batch_size ,
597595 },
598596 ).fetchall ()
@@ -613,16 +611,16 @@ def _read_batch_sync(
613611 async def _read_batch_async (
614612 self ,
615613 queue_name : str ,
614+ vt : int ,
616615 batch_size : int = 1 ,
617- vt : Optional [int ] = None ,
618616 ) -> Optional [List [Message ]]:
619617 async with self .session_maker () as session :
620618 rows = (
621619 await session .execute (
622620 text ("select * from pgmq.read(:queue_name,:vt,:batch_size);" ),
623621 {
624622 "queue_name" : queue_name ,
625- "vt" : vt or self . vt ,
623+ "vt" : vt ,
626624 "batch_size" : batch_size ,
627625 },
628626 )
@@ -663,6 +661,8 @@ def read_batch(
663661 msgs:List[Message] = pgmq_client.read_batch('my_queue', batch_size=10, vt=10)
664662
665663 """
664+ if vt is None :
665+ vt = self .vt
666666 if self .is_async :
667667 return self .loop .run_until_complete (
668668 self ._read_batch_async (queue_name , batch_size , vt )
@@ -672,7 +672,7 @@ def read_batch(
672672 def _read_with_poll_sync (
673673 self ,
674674 queue_name : str ,
675- vt : Optional [ int ] = None ,
675+ vt : int ,
676676 qty : int = 1 ,
677677 max_poll_seconds : int = 5 ,
678678 poll_interval_ms : int = 100 ,
@@ -685,7 +685,7 @@ def _read_with_poll_sync(
685685 ),
686686 {
687687 "queue_name" : queue_name ,
688- "vt" : vt or self . vt ,
688+ "vt" : vt ,
689689 "qty" : qty ,
690690 "max_poll_seconds" : max_poll_seconds ,
691691 "poll_interval_ms" : poll_interval_ms ,
@@ -708,7 +708,7 @@ def _read_with_poll_sync(
708708 async def _read_with_poll_async (
709709 self ,
710710 queue_name : str ,
711- vt : Optional [ int ] = None ,
711+ vt : int ,
712712 qty : int = 1 ,
713713 max_poll_seconds : int = 5 ,
714714 poll_interval_ms : int = 100 ,
@@ -722,7 +722,7 @@ async def _read_with_poll_async(
722722 ),
723723 {
724724 "queue_name" : queue_name ,
725- "vt" : vt or self . vt ,
725+ "vt" : vt ,
726726 "qty" : qty ,
727727 "max_poll_seconds" : max_poll_seconds ,
728728 "poll_interval_ms" : poll_interval_ms ,
@@ -799,6 +799,8 @@ def read_with_poll(
799799 assert len(msgs) == 3 # will read at most 3 messages (qty=3)
800800
801801 """
802+ if vt is None :
803+ vt = self .vt
802804
803805 if self .is_async :
804806 return self .loop .run_until_complete (
0 commit comments