Skip to content

Commit 5b086bc

Browse files
committed
added channel status to ping replies
1 parent 03b97e0 commit 5b086bc

File tree

2 files changed

+22
-4
lines changed

2 files changed

+22
-4
lines changed

src/main/clojure/clojure/core/async/flow.clj

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@
4949
[clojure.core.async.flow.impl :as impl]
5050
[clojure.core.async.flow.impl.graph :as g]))
5151

52+
(set! *warn-on-reflection* true)
53+
5254
(defn create-flow
5355
"Creates a flow from the supplied definition: a map containing the
5456
keys :procs and :conns, and optionally :mixed-exec/:io-exec/:compute-exec

src/main/clojure/clojure/core/async/flow/impl.clj

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,17 @@
2121
(defonce io-exec clojure.lang.Agent/soloExecutor)
2222
(defonce compute-exec clojure.lang.Agent/pooledExecutor)
2323

24+
(defn chan->data
25+
[^clojure.core.async.impl.channels.ManyToManyChannel c]
26+
(let [b (.buf c)]
27+
{:buffer-type (if b
28+
(-> b class .getSimpleName symbol)
29+
:none)
30+
:buffer-count (count b)
31+
:put-count (count (.puts c))
32+
:take-count (count (.takes c))
33+
:closed? (clojure.core.async.impl.protocols/closed? c)}))
34+
2435
(defn futurize ^Future [f {:keys [exec]}]
2536
(fn [& args]
2637
(^[Callable] ExecutorService/.submit
@@ -231,9 +242,14 @@
231242
read-chans (into [control] (-> ins (dissoc ::flow/control) vals))
232243
run
233244
#(loop [status :paused, state (when init (init args)), count 0]
234-
(let [pong (fn [] (async/>!! (outs ::flow/report)
235-
#::flow{:report :ping, :pid pid, :status status
236-
:state state, :count count}))
245+
(let [pong (fn []
246+
(let [pins (dissoc ins ::flow/control)
247+
pouts (dissoc outs ::flow/error ::flow/report)]
248+
(async/>!! (outs ::flow/report)
249+
#::flow{:report :ping, :pid pid, :status status
250+
:state state, :count count
251+
:ins (zipmap (keys pins) (map chan->data (vals pins)))
252+
:outs (zipmap (keys pouts) (map chan->data (vals pouts)))})))
237253
handle-command (partial handle-command pid pong)
238254
[nstatus nstate count]
239255
(try
@@ -262,7 +278,7 @@
262278
(catch Throwable ex
263279
(async/>!! (outs ::flow/error)
264280
#::flow{:pid pid, :status status, :state state,
265-
:count (inc count), :cid cid, :msg msg :op :step, :ex ex})
281+
:count (inc count), :cid cid, :msg msg :op :step, :ex ex})
266282
[status state count])))))
267283
(catch Throwable ex
268284
(async/>!! (outs ::flow/error)

0 commit comments

Comments
 (0)