Skip to content

Commit 9e76160

Browse files
committed
support input read subsets with ::flow/input-filter pred in state
1 parent 5afc8f8 commit 9e76160

File tree

2 files changed

+28
-16
lines changed

2 files changed

+28
-16
lines changed

src/main/clojure/clojure/core/async/flow.clj

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -174,12 +174,18 @@
174174
Optionally, a returned init state may contain the
175175
keys ::flow/in-ports and/or ::flow/out-ports. These should be maps
176176
of cid -> a core.async.channel. The cids must not conflict with the
177-
in/out ids. These channels will become part of the read/write set of
178-
the process, but are not otherwise visible/resolvable within the
179-
flow. Ports are a way to have data enter or exit the flow from
180-
outside. Use :transition to coordinate the lifecycle of these
177+
in/out ids. These channels will become part of the input/output set
178+
of the process, but are not otherwise visible/resolvable within the
179+
flow. Ports are a way to allow data to enter or exit the flow from
180+
outside of it. Use :transition to coordinate the lifecycle of these
181181
external channels.
182182
183+
Optionally, _any_ returned state, whether from :init, :transition
184+
or :transform, may contain the key ::flow/input-filter, a predicate
185+
of cid. Only inputs (including in-ports) satisfying the predicate
186+
will be part of the next channel read set. In the absence of this
187+
predicate all inputs are read.
188+
183189
:transition - optional, (state transition) -> state'
184190
185191
transition will be called when the process makes a state transition,

src/main/clojure/clojure/core/async/flow/impl.clj

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -241,10 +241,9 @@
241241
outs (into (or outs {}) (::flow/out-ports state))
242242
io-id (zipmap (concat (vals ins) (vals outs)) (concat (keys ins) (keys outs)))
243243
control (::flow/control ins)
244-
;;TODO rotate/randomize after control per normal alts?
245-
read-chans (vec (-> ins (dissoc ::flow/control) vals))
244+
read-ins (dissoc ins ::flow/control)
246245
run
247-
#(loop [status :paused, state state, count 0, read-chans read-chans]
246+
#(loop [status :paused, state state, count 0, read-ins read-ins]
248247
(let [pong (fn []
249248
(let [pins (dissoc ins ::flow/control)
250249
pouts (dissoc outs ::flow/error ::flow/report)]
@@ -254,36 +253,43 @@
254253
:ins (zipmap (keys pins) (map chan->data (vals pins)))
255254
:outs (zipmap (keys pouts) (map chan->data (vals pouts)))})))
256255
handle-command (partial handle-command pid pong)
257-
[nstatus nstate count read-chans]
256+
[nstatus nstate count read-ins]
258257
(try
259258
(if (= status :paused)
260259
(let [nstatus (handle-command status (async/<!! control))
261260
nstate (handle-transition transition status nstatus state)]
262-
[nstatus nstate count read-chans])
261+
[nstatus nstate count read-ins])
263262
;;:running
264-
(let [[msg c] (async/alts!! (into [control] read-chans) :priority true)
263+
(let [;;TODO rotate/randomize after control per normal alts?
264+
read-chans (let [ipred (or (::flow/input-filter state) identity)]
265+
(reduce-kv (fn [ret cid chan]
266+
(if (ipred cid)
267+
(conj ret chan)
268+
ret))
269+
[control] read-ins))
270+
[msg c] (async/alts!! read-chans :priority true)
265271
cid (io-id c)]
266272
(if (= c control)
267273
(let [nstatus (handle-command status msg)
268274
nstate (handle-transition transition status nstatus state)]
269-
[nstatus nstate count read-chans])
275+
[nstatus nstate count read-ins])
270276
(try
271277
(let [[nstate outputs] (transform state cid msg)
272278
[nstatus nstate]
273279
(send-outputs status nstate outputs outs
274280
resolver control handle-command transition)]
275281
[nstatus nstate (inc count) (if (some? msg)
276-
read-chans
277-
(vec (remove #{c} read-chans)))])
282+
read-ins
283+
(dissoc read-ins cid))])
278284
(catch Throwable ex
279285
(async/>!! (outs ::flow/error)
280286
#::flow{:pid pid, :status status, :state state,
281287
:count (inc count), :cid cid, :msg msg :op :step, :ex ex})
282-
[status state count read-chans])))))
288+
[status state count read-ins])))))
283289
(catch Throwable ex
284290
(async/>!! (outs ::flow/error)
285291
#::flow{:pid pid, :status status, :state state, :count (inc count), :ex ex})
286-
[status state count read-chans]))]
292+
[status state count read-ins]))]
287293
(when-not (= nstatus :exit) ;;fall out
288-
(recur nstatus nstate (long count) read-chans))))]
294+
(recur nstatus nstate (long count) read-ins))))]
289295
((futurize run {:exec exs})))))))

0 commit comments

Comments
 (0)