@@ -485,13 +485,11 @@ def send_batch(
485
485
)
486
486
return self ._send_batch_sync (queue_name , encode_list_to_psql (messages ), delay )
487
487
488
- def _read_sync (
489
- self , queue_name : str , vt : Optional [int ] = None
490
- ) -> Optional [Message ]:
488
+ def _read_sync (self , queue_name : str , vt : int ) -> Optional [Message ]:
491
489
with self .session_maker () as session :
492
490
row = session .execute (
493
491
text ("select * from pgmq.read(:queue_name,:vt,1);" ),
494
- {"queue_name" : queue_name , "vt" : vt or self . vt },
492
+ {"queue_name" : queue_name , "vt" : vt },
495
493
).fetchone ()
496
494
session .commit ()
497
495
if row is None :
@@ -500,14 +498,12 @@ def _read_sync(
500
498
msg_id = row [0 ], read_ct = row [1 ], enqueued_at = row [2 ], vt = row [3 ], message = row [4 ]
501
499
)
502
500
503
- async def _read_async (
504
- self , queue_name : str , vt : Optional [int ] = None
505
- ) -> Optional [Message ]:
501
+ async def _read_async (self , queue_name : str , vt : int ) -> Optional [Message ]:
506
502
async with self .session_maker () as session :
507
503
row = (
508
504
await session .execute (
509
505
text ("select * from pgmq.read(:queue_name,:vt,1);" ),
510
- {"queue_name" : queue_name , "vt" : vt or self . vt },
506
+ {"queue_name" : queue_name , "vt" : vt },
511
507
)
512
508
).fetchone ()
513
509
await session .commit ()
@@ -584,15 +580,17 @@ def read(self, queue_name: str, vt: Optional[int] = None) -> Optional[Message]:
584
580
def _read_batch_sync (
585
581
self ,
586
582
queue_name : str ,
583
+ vt : int ,
587
584
batch_size : int = 1 ,
588
- vt : Optional [int ] = None ,
589
585
) -> Optional [List [Message ]]:
586
+ if vt is None :
587
+ vt = self .vt
590
588
with self .session_maker () as session :
591
589
rows = session .execute (
592
590
text ("select * from pgmq.read(:queue_name,:vt,:batch_size);" ),
593
591
{
594
592
"queue_name" : queue_name ,
595
- "vt" : vt or self . vt ,
593
+ "vt" : vt ,
596
594
"batch_size" : batch_size ,
597
595
},
598
596
).fetchall ()
@@ -613,16 +611,16 @@ def _read_batch_sync(
613
611
async def _read_batch_async (
614
612
self ,
615
613
queue_name : str ,
614
+ vt : int ,
616
615
batch_size : int = 1 ,
617
- vt : Optional [int ] = None ,
618
616
) -> Optional [List [Message ]]:
619
617
async with self .session_maker () as session :
620
618
rows = (
621
619
await session .execute (
622
620
text ("select * from pgmq.read(:queue_name,:vt,:batch_size);" ),
623
621
{
624
622
"queue_name" : queue_name ,
625
- "vt" : vt or self . vt ,
623
+ "vt" : vt ,
626
624
"batch_size" : batch_size ,
627
625
},
628
626
)
@@ -663,6 +661,8 @@ def read_batch(
663
661
msgs:List[Message] = pgmq_client.read_batch('my_queue', batch_size=10, vt=10)
664
662
665
663
"""
664
+ if vt is None :
665
+ vt = self .vt
666
666
if self .is_async :
667
667
return self .loop .run_until_complete (
668
668
self ._read_batch_async (queue_name , batch_size , vt )
@@ -672,7 +672,7 @@ def read_batch(
672
672
def _read_with_poll_sync (
673
673
self ,
674
674
queue_name : str ,
675
- vt : Optional [ int ] = None ,
675
+ vt : int ,
676
676
qty : int = 1 ,
677
677
max_poll_seconds : int = 5 ,
678
678
poll_interval_ms : int = 100 ,
@@ -685,7 +685,7 @@ def _read_with_poll_sync(
685
685
),
686
686
{
687
687
"queue_name" : queue_name ,
688
- "vt" : vt or self . vt ,
688
+ "vt" : vt ,
689
689
"qty" : qty ,
690
690
"max_poll_seconds" : max_poll_seconds ,
691
691
"poll_interval_ms" : poll_interval_ms ,
@@ -708,7 +708,7 @@ def _read_with_poll_sync(
708
708
async def _read_with_poll_async (
709
709
self ,
710
710
queue_name : str ,
711
- vt : Optional [ int ] = None ,
711
+ vt : int ,
712
712
qty : int = 1 ,
713
713
max_poll_seconds : int = 5 ,
714
714
poll_interval_ms : int = 100 ,
@@ -722,7 +722,7 @@ async def _read_with_poll_async(
722
722
),
723
723
{
724
724
"queue_name" : queue_name ,
725
- "vt" : vt or self . vt ,
725
+ "vt" : vt ,
726
726
"qty" : qty ,
727
727
"max_poll_seconds" : max_poll_seconds ,
728
728
"poll_interval_ms" : poll_interval_ms ,
@@ -799,6 +799,8 @@ def read_with_poll(
799
799
assert len(msgs) == 3 # will read at most 3 messages (qty=3)
800
800
801
801
"""
802
+ if vt is None :
803
+ vt = self .vt
802
804
803
805
if self .is_async :
804
806
return self .loop .run_until_complete (
0 commit comments