Skip to content

Commit 0529d79

Browse files
committed
first cut at datafy for flow
1 parent a3220fa commit 0529d79

File tree

1 file changed

+102
-77
lines changed
  • src/main/clojure/clojure/core/async/flow

1 file changed

+102
-77
lines changed

src/main/clojure/clojure/core/async/flow/impl.clj

Lines changed: 102 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@
1010
(:require [clojure.core.async :as async]
1111
[clojure.core.async.flow :as-alias flow]
1212
[clojure.core.async.flow.spi :as spi]
13-
[clojure.core.async.flow.impl.graph :as graph])
13+
[clojure.core.async.flow.impl.graph :as graph]
14+
[clojure.walk :as walk]
15+
[clojure.datafy :as datafy])
1416
(:import [java.util.concurrent Future Executors ExecutorService TimeUnit]
1517
[java.util.concurrent.locks ReentrantLock]))
1618

@@ -21,16 +23,34 @@
2123
(defonce io-exec clojure.lang.Agent/soloExecutor)
2224
(defonce compute-exec clojure.lang.Agent/pooledExecutor)
2325

26+
(defn oid [x]
27+
(symbol (str (-> x class .getSimpleName) "@" (-> x System/identityHashCode Integer/toHexString))))
28+
2429
(defn chan->data
2530
[^clojure.core.async.impl.channels.ManyToManyChannel c]
26-
(let [b (.buf c)]
27-
{:buffer-type (if b
28-
(-> b class .getSimpleName symbol)
29-
:none)
30-
:buffer-count (count b)
31-
:put-count (count (.puts c))
32-
:take-count (count (.takes c))
33-
:closed? (clojure.core.async.impl.protocols/closed? c)}))
31+
(let [b (.buf c)]
32+
{:buffer (if (some? b) (oid b) :none)
33+
:buffer-count (count b)
34+
:put-count (count (.puts c))
35+
:take-count (count (.takes c))
36+
:closed? (clojure.core.async.impl.protocols/closed? c)}))
37+
38+
(defn exec->data [exec]
39+
(let [ess (as-> (str exec) ^String es
40+
(.substring es (inc (.lastIndexOf es "[")) (.lastIndexOf es "]"))
41+
(.split es ","))]
42+
(merge {:id (oid exec)
43+
:status (first ess)} ;;TODO less fragile
44+
(zipmap [:pool-size :active-threads :queued-tasks :completed-tasks]
45+
(map #(-> ^String % (.substring (inc (.lastIndexOf ^String % " "))) Long.) (rest ess))))))
46+
47+
(defn datafy [x]
48+
(condp instance? x
49+
clojure.lang.Fn (-> x str symbol)
50+
ExecutorService (exec->data x)
51+
clojure.lang.Var (symbol x)
52+
clojure.core.async.impl.channels.ManyToManyChannel (chan->data x)
53+
(datafy/datafy x)))
3454

3555
(defn futurize ^Future [f {:keys [exec]}]
3656
(fn [& args]
@@ -92,6 +112,11 @@
92112
ret)))]
93113
(if (= to ::flow/all) ret (-> ret vals first))))]
94114
(reify
115+
clojure.core.protocols/Datafiable
116+
(datafy [_]
117+
(walk/postwalk datafy {:procs procs, :conns conns, :execs execs
118+
:chans (select-keys @chans [:ins :outs :error :report])}))
119+
95120
clojure.core.async.flow.impl.graph.Graph
96121
(start [_]
97122
(.lock lock)
@@ -237,73 +262,73 @@
237262
workload (or workload (:workload desc) :mixed)]
238263
(assert (or (not params) init) "must have :init if :params")
239264
(reify
240-
clojure.core.protocols/Datafiable
241-
(datafy [_]
242-
(let [{:keys [params ins outs]} desc]
243-
{:impl impl :params (-> params keys vec) :ins (-> ins keys vec) :outs (-> outs keys vec)}))
244-
spi/ProcLauncher
245-
(describe [_] desc)
246-
(start [_ {:keys [pid args ins outs resolver]}]
247-
(assert (or (not params) args) "must provide :args if :params")
248-
(let [comp? (= workload :compute)
249-
transform (cond-> transform (= workload :compute)
250-
#(.get (futurize transform {:exec (spi/get-exec resolver :compute)})
251-
compute-timeout-ms TimeUnit/MILLISECONDS))
252-
exs (spi/get-exec resolver (if (= workload :mixed) :mixed :io))
253-
state (when init (init args))
254-
ins (into (or ins {}) (::flow/in-ports state))
255-
outs (into (or outs {}) (::flow/out-ports state))
256-
io-id (zipmap (concat (vals ins) (vals outs)) (concat (keys ins) (keys outs)))
257-
control (::flow/control ins)
258-
read-ins (dissoc ins ::flow/control)
259-
run
260-
#(loop [status :paused, state state, count 0, read-ins read-ins]
261-
(let [pong (fn [c]
262-
(let [pins (dissoc ins ::flow/control)
263-
pouts (dissoc outs ::flow/error ::flow/report)]
264-
(async/>!! c ;;(outs ::flow/report)
265-
#::flow{:report :ping, :pid pid, :status status
266-
:state state, :count count
267-
:ins (zipmap (keys pins) (map chan->data (vals pins)))
268-
:outs (zipmap (keys pouts) (map chan->data (vals pouts)))})))
269-
handle-command (partial handle-command pid pong)
270-
[nstatus nstate count read-ins]
271-
(try
272-
(if (= status :paused)
273-
(let [nstatus (handle-command status (async/<!! control))
265+
clojure.core.protocols/Datafiable
266+
(datafy [_]
267+
(let [{:keys [params ins outs]} desc]
268+
(walk/postwalk datafy {:impl impl :params (-> params keys vec)
269+
:ins (-> ins keys vec) :outs (-> outs keys vec)})))
270+
spi/ProcLauncher
271+
(describe [_] desc)
272+
(start [_ {:keys [pid args ins outs resolver]}]
273+
(assert (or (not params) args) "must provide :args if :params")
274+
(let [comp? (= workload :compute)
275+
transform (cond-> transform (= workload :compute)
276+
#(.get (futurize transform {:exec (spi/get-exec resolver :compute)})
277+
compute-timeout-ms TimeUnit/MILLISECONDS))
278+
exs (spi/get-exec resolver (if (= workload :mixed) :mixed :io))
279+
state (when init (init args))
280+
ins (into (or ins {}) (::flow/in-ports state))
281+
outs (into (or outs {}) (::flow/out-ports state))
282+
io-id (zipmap (concat (vals ins) (vals outs)) (concat (keys ins) (keys outs)))
283+
control (::flow/control ins)
284+
read-ins (dissoc ins ::flow/control)
285+
run
286+
#(loop [status :paused, state state, count 0, read-ins read-ins]
287+
(let [pong (fn [c]
288+
(let [pins (dissoc ins ::flow/control)
289+
pouts (dissoc outs ::flow/error ::flow/report)]
290+
(async/>!! c (walk/postwalk datafy
291+
#::flow{:pid pid, :status status
292+
:state state, :count count
293+
:ins pins :outs pouts}))))
294+
handle-command (partial handle-command pid pong)
295+
[nstatus nstate count read-ins]
296+
(try
297+
(if (= status :paused)
298+
(let [nstatus (handle-command status (async/<!! control))
299+
nstate (handle-transition transition status nstatus state)]
300+
[nstatus nstate count read-ins])
301+
;;:running
302+
(let [ ;;TODO rotate/randomize after control per normal alts?
303+
read-chans (let [ipred (or (::flow/input-filter state) identity)]
304+
(reduce-kv (fn [ret cid chan]
305+
(if (ipred cid)
306+
(conj ret chan)
307+
ret))
308+
[control] read-ins))
309+
[msg c] (async/alts!! read-chans :priority true)
310+
cid (io-id c)]
311+
(if (= c control)
312+
(let [nstatus (handle-command status msg)
274313
nstate (handle-transition transition status nstatus state)]
275314
[nstatus nstate count read-ins])
276-
;;:running
277-
(let [;;TODO rotate/randomize after control per normal alts?
278-
read-chans (let [ipred (or (::flow/input-filter state) identity)]
279-
(reduce-kv (fn [ret cid chan]
280-
(if (ipred cid)
281-
(conj ret chan)
282-
ret))
283-
[control] read-ins))
284-
[msg c] (async/alts!! read-chans :priority true)
285-
cid (io-id c)]
286-
(if (= c control)
287-
(let [nstatus (handle-command status msg)
288-
nstate (handle-transition transition status nstatus state)]
289-
[nstatus nstate count read-ins])
290-
(try
291-
(let [[nstate outputs] (transform state cid msg)
292-
[nstatus nstate]
293-
(send-outputs status nstate outputs outs
294-
resolver control handle-command transition)]
295-
[nstatus nstate (inc count) (if (some? msg)
296-
read-ins
297-
(dissoc read-ins cid))])
298-
(catch Throwable ex
299-
(async/>!! (outs ::flow/error)
300-
#::flow{:pid pid, :status status, :state state,
301-
:count (inc count), :cid cid, :msg msg :op :step, :ex ex})
302-
[status state count read-ins])))))
303-
(catch Throwable ex
304-
(async/>!! (outs ::flow/error)
305-
#::flow{:pid pid, :status status, :state state, :count (inc count), :ex ex})
306-
[status state count read-ins]))]
307-
(when-not (= nstatus :exit) ;;fall out
308-
(recur nstatus nstate (long count) read-ins))))]
309-
((futurize run {:exec exs})))))))
315+
(try
316+
(let [[nstate outputs] (transform state cid msg)
317+
[nstatus nstate]
318+
(send-outputs status nstate outputs outs
319+
resolver control handle-command transition)]
320+
[nstatus nstate (inc count) (if (some? msg)
321+
read-ins
322+
(dissoc read-ins cid))])
323+
(catch Throwable ex
324+
(async/>!! (outs ::flow/error)
325+
#::flow{:pid pid, :status status, :state state,
326+
:count (inc count), :cid cid, :msg msg :op :step, :ex ex})
327+
[status state count read-ins])))))
328+
(catch Throwable ex
329+
(async/>!! (outs ::flow/error)
330+
#::flow{:pid pid, :status status, :state state, :count (inc count), :ex ex})
331+
[status state count read-ins]))]
332+
(when-not (= nstatus :exit) ;;fall out
333+
(recur nstatus nstate (long count) read-ins))))]
334+
((futurize run {:exec exs})))))))

0 commit comments

Comments
 (0)