@@ -289,6 +289,7 @@ def __init__(self, node: NodeImpl, name: str, evictions: int, now: float) -> Non
289289 self .publish_futures : dict [int , PublishTracker ] = {}
290290 self .request_futures : dict [int , ResponseStreamImpl ] = {} # tag -> ResponseStreamImpl
291291 self .gossip_task : asyncio .Task [None ] | None = None
292+ self .gossip_task_is_periodic = False
292293 self .gossip_counter = 0
293294
294295 # -- Topic ABC --
@@ -313,10 +314,10 @@ def lage(self, now: float) -> int:
313314
314315 def merge_lage (self , now : float , remote_lage : int ) -> None :
315316 """Shift ts_origin backward if the remote claims an older origin."""
317+ clamped = max (- 1 , min (35 , remote_lage )) # LAGE_MIN=-1, LAGE_MAX=35
316318 my_lage = self .lage (now )
317- if remote_lage > my_lage :
318- # Shift origin backward to match the remote's claim.
319- self .ts_origin = now - (1 << remote_lage )
319+ if clamped > my_lage :
320+ self .ts_origin = now - (1 << clamped )
320321
321322 def animate (self , ts : float ) -> None :
322323 self .ts_animated = ts
@@ -516,7 +517,7 @@ def topic_allocate(self, topic: TopicImpl, new_evictions: int, now: float) -> No
516517 t .release_transport_handles ()
517518 t .evictions = ev
518519 t .ensure_listener ()
519- self .schedule_gossip_urgent (t , now )
520+ self .schedule_gossip_urgent (t )
520521 continue
521522
522523 modulus = self .transport .subject_id_modulus
@@ -532,7 +533,7 @@ def topic_allocate(self, topic: TopicImpl, new_evictions: int, now: float) -> No
532533 t .evictions = ev
533534 self .topics_by_subject_id [new_sid ] = t
534535 t .ensure_listener ()
535- self .schedule_gossip_urgent (t , now )
536+ self .schedule_gossip_urgent (t )
536537 elif left_wins (t .lage (now ), t .hash , collider .lage (now ), collider .hash ):
537538 # Our topic wins: take the slot, evict the collider.
538539 t .release_transport_handles ()
@@ -544,15 +545,16 @@ def topic_allocate(self, topic: TopicImpl, new_evictions: int, now: float) -> No
544545 del self .topics_by_subject_id [new_sid ]
545546 self .topics_by_subject_id [new_sid ] = t
546547 t .ensure_listener ()
547- self .schedule_gossip_urgent (t , now )
548+ self .schedule_gossip_urgent (t )
548549 # Schedule collider for reallocation.
549550 collider .release_transport_handles ()
550551 work .append ((collider , collider .evictions + 1 ))
551552 else :
552553 # Our topic loses: increment evictions and retry.
553554 work .append ((t , ev + 1 ))
554555
555- def couple_topic_root (self , topic : TopicImpl , root : SubscriberRoot ) -> None :
556+ @staticmethod
557+ def couple_topic_root (topic : TopicImpl , root : SubscriberRoot ) -> None :
556558 """Create a coupling between a topic and a subscriber root if not already coupled."""
557559 for c in topic .couplings :
558560 if c .root is root :
@@ -587,25 +589,29 @@ def schedule_gossip(self, topic: TopicImpl) -> None:
587589 """Start periodic gossip for an explicit topic."""
588590 if topic .gossip_task is not None :
589591 return # already scheduled
592+ topic .gossip_task_is_periodic = True
590593 topic .gossip_task = self .loop .create_task (self .gossip_loop (topic ))
591594
592- def schedule_gossip_urgent (self , topic : TopicImpl , now : float ) -> None :
595+ def schedule_gossip_urgent (self , topic : TopicImpl ) -> None :
593596 """Cancel current gossip and reschedule with short delay."""
594597 if topic .gossip_task is not None :
595598 topic .gossip_task .cancel ()
599+ topic .gossip_task_is_periodic = False
596600 topic .gossip_task = self .loop .create_task (self .gossip_urgent (topic ))
597601
598602 async def gossip_urgent (self , topic : TopicImpl ) -> None :
599603 try :
600604 await asyncio .sleep (random .random () * GOSSIP_URGENT_DELAY_MAX )
601605 await self .send_gossip (topic , broadcast = True )
602606 # Resume periodic gossip.
607+ topic .gossip_task_is_periodic = True
603608 topic .gossip_task = self .loop .create_task (self .gossip_loop (topic ))
604609 except asyncio .CancelledError :
605610 pass
606611
607612 async def gossip_loop (self , topic : TopicImpl ) -> None :
608613 try :
614+ topic .gossip_task_is_periodic = True
609615 while not self ._closed and not topic .is_implicit :
610616 dither = GOSSIP_PERIOD / GOSSIP_PERIOD_DITHER_RATIO
611617 delay = GOSSIP_PERIOD + random .uniform (- dither , dither )
@@ -696,15 +702,15 @@ def dispatch_arrival(self, arrival: TransportArrival, *, subject_id: int | None,
696702 payload = msg [HEADER_SIZE :]
697703
698704 if isinstance (hdr , (MsgBeHeader , MsgRelHeader )):
699- self .on_msg (arrival , hdr , payload , unicast )
705+ self .on_msg (arrival , hdr , payload )
700706 elif isinstance (hdr , (MsgAckHeader , MsgNackHeader )):
701707 self .on_msg_ack (arrival , hdr )
702708 elif isinstance (hdr , (RspBeHeader , RspRelHeader )):
703709 self .on_rsp (arrival , hdr , payload )
704710 elif isinstance (hdr , (RspAckHeader , RspNackHeader )):
705711 self .on_rsp_ack (arrival , hdr )
706712 elif isinstance (hdr , GossipHeader ):
707- self .on_gossip (arrival , hdr , payload , unicast , subject_id )
713+ self .on_gossip (hdr , payload , unicast , subject_id )
708714 elif isinstance (hdr , ScoutHeader ):
709715 self .on_scout (arrival , hdr , payload )
710716
@@ -713,7 +719,6 @@ def on_msg(
713719 arrival : TransportArrival ,
714720 hdr : MsgBeHeader | MsgRelHeader ,
715721 payload : bytes ,
716- unicast : bool ,
717722 ) -> None :
718723 topic = self .topics_by_hash .get (hdr .topic_hash )
719724 if topic is None :
@@ -754,8 +759,8 @@ def on_msg(
754759 )
755760 self .deliver_to_subscribers (topic , arrival , breadcrumb , payload , hdr .tag )
756761
762+ @staticmethod
757763 def deliver_to_subscribers (
758- self ,
759764 topic : TopicImpl ,
760765 arrival : TransportArrival ,
761766 breadcrumb : Breadcrumb ,
@@ -873,7 +878,6 @@ async def do_send() -> None:
873878
874879 def on_gossip (
875880 self ,
876- arrival : TransportArrival ,
877881 hdr : GossipHeader ,
878882 payload : bytes ,
879883 unicast : bool ,
@@ -912,19 +916,26 @@ def on_gossip_known(
912916 win = my_lage > lage or (my_lage == lage and topic .evictions > evictions )
913917 topic .merge_lage (now , lage )
914918 if win :
915- self .schedule_gossip_urgent (topic , now )
919+ self .schedule_gossip_urgent (topic )
916920 else :
917921 self .topic_allocate (topic , evictions , now )
918922 if topic .evictions == evictions :
919923 self .schedule_gossip (topic )
920924 else :
921925 topic .merge_lage (now , lage )
922- # Suppress gossip if we can't contribute newer states.
926+ # Suppress gossip if we can't contribute newer states to the consensus process.
927+ # Only suppress if the current gossip task is periodic (not urgent) or the scope is broadcast.
928+ # Unicast gossips are seen by a small subset of nodes so they do not suppress.
923929 is_broadcast = subject_id == self .broadcast_subject_id
924930 is_sharded = not unicast and not is_broadcast
925- suppress = (is_broadcast or is_sharded ) and (topic .lage (now ) == lage )
931+ suppress = (
932+ (is_broadcast or is_sharded )
933+ and (topic .lage (now ) == lage )
934+ and (topic .gossip_task_is_periodic or is_broadcast )
935+ )
926936 if suppress and topic .gossip_task is not None :
927937 topic .gossip_task .cancel ()
938+ topic .gossip_task_is_periodic = True
928939 topic .gossip_task = self .loop .create_task (self .gossip_loop (topic ))
929940 topic .ensure_listener ()
930941
@@ -936,18 +947,23 @@ def on_gossip_unknown(self, topic_hash: int, evictions: int, lage: int, now: flo
936947 return
937948 win = left_wins (mine .lage (now ), mine .hash , lage , topic_hash )
938949 if win :
939- self .schedule_gossip_urgent (mine , now )
950+ self .schedule_gossip_urgent (mine )
940951 else :
941952 self .topic_allocate (mine , mine .evictions + 1 , now )
942953
943954 def topic_subscribe_if_matching (self , name : str , topic_hash : int , evictions : int , lage : int ) -> TopicImpl | None :
944955 """Create an implicit topic if any pattern subscriber matches the name."""
956+ # Validate that the hash matches the name to prevent corrupt gossip from creating inconsistencies.
957+ if rapidhash (name ) != topic_hash :
958+ _logger .debug ("Gossip hash mismatch for '%s': got %016x, expected %016x" , name , topic_hash , rapidhash (name ))
959+ return None
945960 for pattern , root in self .sub_roots_pattern .items ():
946961 subs = match_pattern (pattern , name )
947962 if subs is not None :
948963 now = time .monotonic ()
964+ clamped_lage = max (0 , min (35 , lage ))
949965 topic = TopicImpl (self , name , evictions , now )
950- topic .ts_origin = now - max (1 , (1 << max ( 0 , lage ) ))
966+ topic .ts_origin = now - max (1 , (1 << clamped_lage ))
951967 self .topics_by_name [name ] = topic
952968 self .topics_by_hash [topic_hash ] = topic
953969 self .topic_allocate (topic , evictions , now )
0 commit comments