Skip to content

Commit 2029bad

Browse files
committed
Merge branch 'master' into dev-io-thread
2 parents d06f2c5 + 56ad8dd commit 2029bad

File tree

3 files changed

+31
-17
lines changed

3 files changed

+31
-17
lines changed

src/main/clojure/clojure/core/async/flow.clj

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -109,9 +109,10 @@
109109
[g] (g/resume g))
110110

111111
(defn ping
112-
"pings all processes, which will put their status and state on the
113-
report channel returned from start"
114-
[g] (g/ping g))
112+
"pings all processes, returning a map of pid -> proc status and
113+
state, for those procs that reply within timeout-ms (default 1000)"
114+
[g & {:keys [timeout-ms] :or {timeout-ms 1000}}]
115+
(g/ping g timeout-ms))
115116

116117
(defn pause-proc
117118
"pauses a process"
@@ -122,9 +123,9 @@
122123
[g pid] (g/resume-proc g pid))
123124

124125
(defn ping-proc
125-
"pings the process, which will put its status and state on the report
126-
channel returned from start"
127-
[g pid] (g/ping-proc g pid))
126+
"like ping, but just pings the specified process"
127+
[g pid & {:keys [timeout-ms] :or {timeout-ms 1000}}]
128+
(g/ping-proc g pid timeout-ms))
128129

129130
(defn command-proc
130131
"synchronously sends a process-specific command with the given id and

src/main/clojure/clojure/core/async/flow/impl.clj

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,22 @@
7575
(throw (ex-info "invalid connection" {:conn conn}))))
7676
{} conns)
7777
running-chans #(or (deref chans) (throw (Exception. "flow not running")))
78-
send-command (fn [command to]
79-
(let [{:keys [control]} (running-chans)]
80-
(async/>!! control #::flow{:command command :to to})))]
78+
send-command (fn sc
79+
([cmap]
80+
(let [{:keys [control]} (running-chans)]
81+
(async/>!! control cmap)))
82+
([command to] (sc #::flow{:command command :to to})))
83+
handle-ping (fn [to timeout-ms]
84+
(let [reply-chan (async/chan (count procs))
85+
ret-chan (async/take (if (= to ::flow/all) (count procs) 1) reply-chan)
86+
timeout (async/timeout timeout-ms)
87+
_ (send-command #::flow{:command ::flow/ping, :to to, :reply-chan reply-chan})
88+
ret (loop [ret nil]
89+
(let [[{::flow/keys [pid] :as m} c] (async/alts!! [ret-chan timeout])]
90+
(if (some? m)
91+
(recur (assoc ret pid m))
92+
ret)))]
93+
(if (= to ::flow/all) ret (-> ret vals first))))]
8194
(reify
8295
clojure.core.async.flow.impl.graph.Graph
8396
(start [_]
@@ -156,11 +169,11 @@
156169
(finally (.unlock lock))))
157170
(pause [_] (send-command ::flow/pause ::flow/all))
158171
(resume [_] (send-command ::flow/resume ::flow/all))
159-
(ping [_] (send-command ::flow/ping ::flow/all))
172+
(ping [_ timeout-ms] (handle-ping ::flow/all timeout-ms))
160173

161174
(pause-proc [_ pid] (send-command ::flow/pause pid))
162175
(resume-proc [_ pid] (send-command ::flow/resume pid))
163-
(ping-proc [_ pid] (send-command ::flow/ping pid))
176+
(ping-proc [_ pid timeout-ms] (handle-ping pid timeout-ms))
164177
(command-proc [_ pid command kvs]
165178
(assert (and (namespace command) (not= (namespace ::flow/command) (namespace command)))
166179
"extension commands must be in your own namespace")
@@ -177,10 +190,10 @@
177190
(defn handle-command
178191
[pid pong status cmd]
179192
(let [transition #::flow{:stop :exit, :resume :running, :pause :paused}
180-
{::flow/keys [to command]} cmd]
193+
{::flow/keys [to command reply-chan]} cmd]
181194
(if (#{::flow/all pid} to)
182195
(do
183-
(when (= command ::flow/ping) (pong))
196+
(when (= command ::flow/ping) (pong reply-chan))
184197
(or (transition command) status))
185198
status)))
186199

@@ -245,10 +258,10 @@
245258
read-ins (dissoc ins ::flow/control)
246259
run
247260
#(loop [status :paused, state state, count 0, read-ins read-ins]
248-
(let [pong (fn []
261+
(let [pong (fn [c]
249262
(let [pins (dissoc ins ::flow/control)
250263
pouts (dissoc outs ::flow/error ::flow/report)]
251-
(async/>!! (outs ::flow/report)
264+
(async/>!! c ;;(outs ::flow/report)
252265
#::flow{:report :ping, :pid pid, :status status
253266
:state state, :count count
254267
:ins (zipmap (keys pins) (map chan->data (vals pins)))

src/main/clojure/clojure/core/async/flow/impl/graph.clj

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@
1616
(stop [g] "shuts down the graph, stopping all procs, can be started again")
1717
(pause [g] "pauses a running graph")
1818
(resume [g] "resumes a paused graph")
19-
(ping [g] "pings all processes, which will put their status and state on the report channel")
19+
(ping [g timeout-ms] "pings all processes, which will put their status and state on the report channel")
2020

2121
(pause-proc [g pid] "pauses a process")
2222
(resume-proc [g pid] "resumes a process")
23-
(ping-proc [g pid] "pings the process, which will put its status and state on the report channel")
23+
(ping-proc [g pid timeout-ms] "pings the process, which will put its status and state on the report channel")
2424
(command-proc [g pid cmd-id more-kvs] "synchronously sends a process-specific command with the given id
2525
and additional kvs to the process")
2626

0 commit comments

Comments
 (0)