9
9
AsyncGenerator ,
10
10
AsyncIterator ,
11
11
Awaitable ,
12
+ Collection ,
12
13
NamedTuple ,
13
14
Sequence ,
14
15
Union ,
@@ -328,7 +329,7 @@ def __ne__(self, other: object) -> bool:
328
329
class InitialResult (NamedTuple ):
329
330
"""The state of the initial result"""
330
331
331
- children : set [IncrementalDataRecord ]
332
+ children : dict [IncrementalDataRecord , None ]
332
333
is_completed : bool
333
334
334
335
@@ -356,16 +357,19 @@ class IncrementalPublisher:
356
357
357
358
Each Incremental Data record also contains similar metadata, i.e. these records also
358
359
contain similar ``is_completed`` and ``children`` properties.
360
+
361
+ Note: Instead of sets we use dicts (with values set to None) which preserve order
362
+ and thereby achieve more deterministic results.
359
363
"""
360
364
361
365
_initial_result : InitialResult
362
- _released : set [IncrementalDataRecord ]
363
- _pending : set [IncrementalDataRecord ]
366
+ _released : dict [IncrementalDataRecord , None ]
367
+ _pending : dict [IncrementalDataRecord , None ]
364
368
365
369
def __init__ (self ) -> None :
366
- self ._initial_result = InitialResult (set () , False )
367
- self ._released = set ()
368
- self ._pending = set ()
370
+ self ._initial_result = InitialResult ({} , False )
371
+ self ._released = {}
372
+ self ._pending = {}
369
373
self ._resolve = Event ()
370
374
self ._tasks : set [Awaitable ] = set ()
371
375
@@ -382,13 +386,10 @@ async def subscribe(
382
386
while not is_done :
383
387
released = self ._released
384
388
for item in released :
385
- pending .discard (item )
386
- self ._released = (
387
- set ()
388
- ) # TODO, solve differently? use clear() and local variable?
389
+ del pending [item ]
390
+ self ._released = {}
389
391
390
392
result = self ._get_incremental_result (released )
391
- released .clear ()
392
393
393
394
if not self .has_next ():
394
395
is_done = True
@@ -420,7 +421,8 @@ def prepare_new_deferred_fragment_record(
420
421
) -> DeferredFragmentRecord :
421
422
deferred_fragment_record = DeferredFragmentRecord (label , path , parent_context )
422
423
423
- (parent_context or self ._initial_result ).children .add (deferred_fragment_record )
424
+ context = parent_context or self ._initial_result
425
+ context .children [deferred_fragment_record ] = None
424
426
return deferred_fragment_record
425
427
426
428
def prepare_new_stream_items_record (
@@ -434,7 +436,8 @@ def prepare_new_stream_items_record(
434
436
label , path , parent_context , async_iterator
435
437
)
436
438
437
- (parent_context or self ._initial_result ).children .add (stream_items_record )
439
+ context = parent_context or self ._initial_result
440
+ context .children [stream_items_record ] = None
438
441
return stream_items_record
439
442
440
443
def complete_deferred_fragment_record (
@@ -484,7 +487,7 @@ def filter(
484
487
485
488
self ._delete (child )
486
489
parent = child .parent_context or self ._initial_result
487
- parent .children . discard ( child )
490
+ del parent .children [ child ]
488
491
489
492
if isinstance (child , StreamItemsRecord ):
490
493
async_iterator = child .async_iterator
@@ -504,26 +507,26 @@ def _reset(self) -> None:
504
507
self ._resolve .clear ()
505
508
506
509
def _introduce (self , item : IncrementalDataRecord ) -> None :
507
- self ._pending . add ( item )
510
+ self ._pending [ item ] = None
508
511
509
512
def _release (self , item : IncrementalDataRecord ) -> None :
510
513
if item in self ._pending :
511
- self ._pending . remove ( item )
512
- self ._released . add ( item )
514
+ del self ._pending [ item ]
515
+ self ._released [ item ] = None
513
516
self ._trigger ()
514
517
515
518
def _push (self , item : IncrementalDataRecord ) -> None :
516
- self ._released . add ( item )
517
- self ._pending . add ( item )
519
+ self ._released [ item ] = None
520
+ self ._pending [ item ] = None
518
521
self ._trigger ()
519
522
520
523
def _delete (self , item : IncrementalDataRecord ) -> None :
521
- self ._released . discard ( item )
522
- self ._pending . discard ( item )
524
+ del self ._released [ item ]
525
+ del self ._pending [ item ]
523
526
self ._trigger ()
524
527
525
528
def _get_incremental_result (
526
- self , completed_records : set [IncrementalDataRecord ]
529
+ self , completed_records : Collection [IncrementalDataRecord ]
527
530
) -> SubsequentIncrementalExecutionResult | None :
528
531
incremental_results : list [IncrementalResult ] = []
529
532
encountered_completed_async_iterator = False
@@ -574,13 +577,13 @@ def _publish(self, incremental_data_record: IncrementalDataRecord) -> None:
574
577
575
578
def _get_descendants (
576
579
self ,
577
- children : set [IncrementalDataRecord ],
578
- descendants : set [IncrementalDataRecord ] | None = None ,
579
- ) -> set [IncrementalDataRecord ]:
580
+ children : dict [IncrementalDataRecord , None ],
581
+ descendants : dict [IncrementalDataRecord , None ] | None = None ,
582
+ ) -> dict [IncrementalDataRecord , None ]:
580
583
if descendants is None :
581
- descendants = set ()
584
+ descendants = {}
582
585
for child in children :
583
- descendants . add ( child )
586
+ descendants [ child ] = None
584
587
self ._get_descendants (child .children , descendants )
585
588
return descendants
586
589
@@ -604,7 +607,7 @@ class DeferredFragmentRecord:
604
607
path : list [str | int ]
605
608
data : dict [str , Any ] | None
606
609
parent_context : IncrementalDataRecord | None
607
- children : set [IncrementalDataRecord ]
610
+ children : dict [IncrementalDataRecord , None ]
608
611
is_completed : bool
609
612
610
613
def __init__ (
@@ -617,7 +620,7 @@ def __init__(
617
620
self .path = path .as_list () if path else []
618
621
self .parent_context = parent_context
619
622
self .errors = []
620
- self .children = set ()
623
+ self .children = {}
621
624
self .is_completed = False
622
625
self .data = None
623
626
@@ -641,7 +644,7 @@ class StreamItemsRecord:
641
644
path : list [str | int ]
642
645
items : list [str ] | None
643
646
parent_context : IncrementalDataRecord | None
644
- children : set [IncrementalDataRecord ]
647
+ children : dict [IncrementalDataRecord , None ]
645
648
async_iterator : AsyncIterator [Any ] | None
646
649
is_completed_async_iterator : bool
647
650
is_completed : bool
@@ -658,7 +661,7 @@ def __init__(
658
661
self .parent_context = parent_context
659
662
self .async_iterator = async_iterator
660
663
self .errors = []
661
- self .children = set ()
664
+ self .children = {}
662
665
self .is_completed = False
663
666
self .items = None
664
667
0 commit comments