@@ -1959,42 +1959,48 @@ private enum ThrottleWhileState<Value> {
1959
1959
}
1960
1960
}
1961
1961
1962
- private protocol SignalAggregateStrategy {
1962
+ private protocol SignalAggregateStrategy : class {
1963
1963
/// Update the latest value of the signal at `position` to be `value`.
1964
1964
///
1965
1965
/// - parameters:
1966
1966
/// - value: The latest value emitted by the signal at `position`.
1967
1967
/// - position: The position of the signal.
1968
- ///
1969
- /// - returns: `true` if the aggregating signal should terminate as a result of the
1970
- /// update. `false` otherwise.
1971
- mutating func update( _ value: Any , at position: Int ) -> Bool
1968
+ func update( _ value: Any , at position: Int )
1972
1969
1973
1970
/// Record the completion of the signal at `position`.
1974
1971
///
1975
1972
/// - parameters:
1976
1973
/// - position: The position of the signal.
1977
- ///
1978
- /// - returns: `true` if the aggregating signal should terminate as a result of the
1979
- /// completion. `false` otherwise.
1980
- mutating func complete ( at position : Int ) -> Bool
1974
+ func complete ( at position : Int )
1975
+
1976
+ init ( count : Int , action : @escaping ( AggregateStrategyEvent ) -> Void )
1977
+ }
1981
1978
1982
- init ( count: Int , action: @escaping ( ContiguousArray < Any > ) -> Void )
1979
+ private enum AggregateStrategyEvent {
1980
+ case value( ContiguousArray < Any > )
1981
+ case completed
1983
1982
}
1984
1983
1985
1984
extension Signal {
1986
- private struct CombineLatestStrategy : SignalAggregateStrategy {
1985
+ // Threading of `CombineLatestStrategy` and `ZipStrategy`.
1986
+ //
1987
+ // The threading models of these strategies mirror that of `Signal.Core` to allow
1988
+ // recursive termial event from the upstreams that is triggered by the combined
1989
+ // values.
1990
+ //
1991
+ // The strategies do not unique the delivery of `completed`, since `Signal` already
1992
+ // guarantees that no event would ever be delivered after a terminal event.
1993
+
1994
+ private final class CombineLatestStrategy : SignalAggregateStrategy {
1987
1995
private enum Placeholder {
1988
1996
case none
1989
1997
}
1990
1998
1991
- private var values : ContiguousArray < Any >
1992
- private var completionCount : Int
1993
- private let action : ( ContiguousArray < Any > ) -> Void
1999
+ var values : ContiguousArray < Any >
1994
2000
1995
2001
private var _haveAllSentInitial : Bool
1996
2002
private var haveAllSentInitial : Bool {
1997
- mutating get {
2003
+ get {
1998
2004
if _haveAllSentInitial {
1999
2005
return true
2000
2006
}
@@ -2004,47 +2010,74 @@ extension Signal {
2004
2010
}
2005
2011
}
2006
2012
2007
- mutating func update( _ value: Any , at position: Int ) -> Bool {
2013
+ private let count : Int
2014
+ private let lock : Lock
2015
+
2016
+ private let completion : Atomic < Int >
2017
+ private let action : ( AggregateStrategyEvent ) -> Void
2018
+
2019
+ func update( _ value: Any , at position: Int ) {
2020
+ lock. lock ( )
2008
2021
values [ position] = value
2009
2022
2010
2023
if haveAllSentInitial {
2011
- action ( values)
2024
+ action ( . value ( values) )
2012
2025
}
2013
2026
2014
- return false
2027
+ lock. unlock ( )
2028
+
2029
+ if completion. value == self . count, lock. try ( ) {
2030
+ action ( . completed)
2031
+ lock. unlock ( )
2032
+ }
2015
2033
}
2016
2034
2017
- mutating func complete( at position: Int ) -> Bool {
2018
- completionCount += 1
2019
- return completionCount == values. count
2035
+ func complete( at position: Int ) {
2036
+ let count : Int = completion. modify { count in
2037
+ count += 1
2038
+ return count
2039
+ }
2040
+
2041
+ if count == self . count, lock. try ( ) {
2042
+ action ( . completed)
2043
+ lock. unlock ( )
2044
+ }
2020
2045
}
2021
2046
2022
- init ( count: Int , action: @escaping ( ContiguousArray < Any > ) -> Void ) {
2023
- values = ContiguousArray ( repeating: Placeholder . none, count: count)
2024
- completionCount = 0
2025
- _haveAllSentInitial = false
2047
+ init ( count: Int , action: @escaping ( AggregateStrategyEvent ) -> Void ) {
2048
+ self . count = count
2049
+ self . lock = Lock . make ( )
2050
+ self . values = ContiguousArray ( repeating: Placeholder . none, count: count)
2051
+ self . _haveAllSentInitial = false
2052
+ self . completion = Atomic ( 0 )
2026
2053
self . action = action
2027
2054
}
2028
2055
}
2029
2056
2030
- private struct ZipStrategy : SignalAggregateStrategy {
2057
+ private final class ZipStrategy : SignalAggregateStrategy {
2058
+ private let stateLock : Lock
2059
+ private let sendLock : Lock
2060
+
2031
2061
private var values : ContiguousArray < [ Any ] >
2062
+ private var canEmit : Bool {
2063
+ return values. reduce ( true ) { $0 && !$1. isEmpty }
2064
+ }
2065
+
2066
+ private var hasConcurrentlyCompleted : Bool
2032
2067
private var isCompleted : ContiguousArray < Bool >
2033
- private let action : ( ContiguousArray < Any > ) -> Void
2034
2068
2035
2069
private var hasCompletedAndEmptiedSignal : Bool {
2036
2070
return Swift . zip ( values, isCompleted) . contains ( where: { $0. 0 . isEmpty && $0. 1 } )
2037
2071
}
2038
2072
2039
- private var canEmit : Bool {
2040
- return values. reduce ( true ) { $0 && !$1. isEmpty }
2041
- }
2042
-
2043
2073
private var areAllCompleted : Bool {
2044
2074
return isCompleted. reduce ( true ) { $0 && $1 }
2045
2075
}
2046
2076
2047
- mutating func update( _ value: Any , at position: Int ) -> Bool {
2077
+ private let action : ( AggregateStrategyEvent ) -> Void
2078
+
2079
+ func update( _ value: Any , at position: Int ) {
2080
+ stateLock. lock ( )
2048
2081
values [ position] . append ( value)
2049
2082
2050
2083
if canEmit {
@@ -2055,33 +2088,61 @@ extension Signal {
2055
2088
buffer. append ( values [ index] . removeFirst ( ) )
2056
2089
}
2057
2090
2058
- action ( buffer)
2091
+ let shouldComplete = areAllCompleted || hasCompletedAndEmptiedSignal
2092
+ sendLock. lock ( )
2093
+ stateLock. unlock ( )
2094
+
2095
+ action ( . value( buffer) )
2059
2096
2060
- if hasCompletedAndEmptiedSignal {
2061
- return true
2097
+ if shouldComplete {
2098
+ action ( . completed)
2099
+ }
2100
+
2101
+ sendLock. unlock ( )
2102
+
2103
+ stateLock. lock ( )
2104
+
2105
+ if hasConcurrentlyCompleted {
2106
+ sendLock. lock ( )
2107
+ action ( . completed)
2108
+ sendLock. unlock ( )
2062
2109
}
2063
2110
}
2064
2111
2065
- return false
2112
+ stateLock . unlock ( )
2066
2113
}
2067
2114
2068
- mutating func complete( at position: Int ) -> Bool {
2115
+ func complete( at position: Int ) {
2116
+ stateLock. lock ( )
2069
2117
isCompleted [ position] = true
2070
2118
2071
- // `zip` completes when all signals has completed, or any of the signals
2072
- // has completed without any buffered value.
2073
- return hasCompletedAndEmptiedSignal || areAllCompleted
2119
+ if hasConcurrentlyCompleted || areAllCompleted || hasCompletedAndEmptiedSignal {
2120
+ if sendLock. try ( ) {
2121
+ stateLock. unlock ( )
2122
+
2123
+ action ( . completed)
2124
+ sendLock. unlock ( )
2125
+ return
2126
+ }
2127
+
2128
+ hasConcurrentlyCompleted = true
2129
+ }
2130
+
2131
+ stateLock. unlock ( )
2074
2132
}
2075
2133
2076
- init ( count: Int , action: @escaping ( ContiguousArray < Any > ) -> Void ) {
2077
- values = ContiguousArray ( repeating: [ ] , count: count)
2078
- isCompleted = ContiguousArray ( repeating: false , count: count)
2134
+ init ( count: Int , action: @escaping ( AggregateStrategyEvent ) -> Void ) {
2135
+ self . values = ContiguousArray ( repeating: [ ] , count: count)
2136
+ self . hasConcurrentlyCompleted = false
2137
+ self . isCompleted = ContiguousArray ( repeating: false , count: count)
2079
2138
self . action = action
2139
+ self . sendLock = Lock . make ( )
2140
+ self . stateLock = Lock . make ( )
2080
2141
}
2081
2142
}
2082
2143
2083
2144
private final class AggregateBuilder < Strategy: SignalAggregateStrategy > {
2084
- fileprivate var startHandlers : [ ( _ index: Int , _ strategy: Atomic < Strategy > , _ action: @escaping ( Signal < Never , Error > . Event ) -> Void ) -> Disposable ? ]
2145
+ fileprivate var startHandlers : [ ( _ index: Int , _ strategy: Strategy , _ action: @escaping ( Signal < Never , Error > . Event ) -> Void ) -> Disposable ? ]
2085
2146
2086
2147
init ( ) {
2087
2148
self . startHandlers = [ ]
@@ -2093,22 +2154,10 @@ extension Signal {
2093
2154
return signal. observe { event in
2094
2155
switch event {
2095
2156
case let . value( value) :
2096
- let shouldComplete = strategy. modify {
2097
- return $0. update ( value, at: index)
2098
- }
2099
-
2100
- if shouldComplete {
2101
- action ( . completed)
2102
- }
2157
+ strategy. update ( value, at: index)
2103
2158
2104
2159
case . completed:
2105
- let shouldComplete = strategy. modify {
2106
- return $0. complete ( at: index)
2107
- }
2108
-
2109
- if shouldComplete {
2110
- action ( . completed)
2111
- }
2160
+ strategy. complete ( at: index)
2112
2161
2113
2162
case . interrupted:
2114
2163
action ( . interrupted)
@@ -2126,17 +2175,20 @@ extension Signal {
2126
2175
private convenience init < Strategy: SignalAggregateStrategy > ( _ builder: AggregateBuilder < Strategy > , _ transform: @escaping ( ContiguousArray < Any > ) -> Value ) {
2127
2176
self . init { observer in
2128
2177
let disposables = CompositeDisposable ( )
2129
- let strategy = Atomic ( Strategy ( count: builder. startHandlers. count) { observer. send ( value: transform ( $0) ) } )
2178
+ let strategy = Strategy ( count: builder. startHandlers. count) { event in
2179
+ switch event {
2180
+ case let . value( value) :
2181
+ observer. send ( value: transform ( value) )
2182
+ case . completed:
2183
+ observer. sendCompleted ( )
2184
+ }
2185
+ }
2130
2186
2131
2187
for (index, action) in builder. startHandlers. enumerated ( ) where !disposables. isDisposed {
2132
2188
disposables += action ( index, strategy) { observer. action ( $0. map { _ in fatalError ( ) } ) }
2133
2189
}
2134
2190
2135
- return AnyDisposable {
2136
- strategy. modify { _ in
2137
- disposables. dispose ( )
2138
- }
2139
- }
2191
+ return disposables
2140
2192
}
2141
2193
}
2142
2194
0 commit comments