File tree Expand file tree Collapse file tree 1 file changed +9
-6
lines changed Expand file tree Collapse file tree 1 file changed +9
-6
lines changed Original file line number Diff line number Diff line change 1
- import { Notify } from "./notify.ts" ;
2
-
3
1
/**
4
2
* A queue implementation that allows for adding and removing elements, with optional waiting when
5
3
* popping elements from an empty queue.
@@ -18,7 +16,7 @@ import { Notify } from "./notify.ts";
18
16
* ```
19
17
*/
20
18
export class Queue < T extends NonNullable < unknown > | null > {
21
- #notify = new Notify ( ) ;
19
+ #resolves: ( ( ) => void ) [ ] = [ ] ;
22
20
#items: T [ ] = [ ] ;
23
21
24
22
/**
@@ -32,15 +30,15 @@ export class Queue<T extends NonNullable<unknown> | null> {
32
30
* Returns true if the queue is currently locked.
33
31
*/
34
32
get locked ( ) : boolean {
35
- return this . #notify . waiterCount > 0 ;
33
+ return this . #resolves . length > 0 ;
36
34
}
37
35
38
36
/**
39
37
* Adds an item to the end of the queue and notifies any waiting consumers.
40
38
*/
41
39
push ( value : T ) : void {
42
40
this . #items. push ( value ) ;
43
- this . #notify . notify ( ) ;
41
+ this . #resolves . shift ( ) ?. ( ) ;
44
42
}
45
43
46
44
/**
@@ -55,7 +53,12 @@ export class Queue<T extends NonNullable<unknown> | null> {
55
53
if ( value !== undefined ) {
56
54
return value ;
57
55
}
58
- await this . #notify. notified ( { signal } ) ;
56
+ const { promise, resolve, reject } = Promise . withResolvers < void > ( ) ;
57
+ signal ?. addEventListener ( "abort" , ( ) => reject ( signal . reason ) , {
58
+ once : true ,
59
+ } ) ;
60
+ this . #resolves. push ( resolve ) ;
61
+ await promise ;
59
62
}
60
63
}
61
64
}
You can’t perform that action at this time.
0 commit comments