Skip to content

Commit 103393b

Browse files
committed
:exec option -> :workload, proc-fn :describe can return :workflow, process opt overridesflow definition -> config, no use of Clojure 1.12 features so 1.11 compatible
1 parent 17b0538 commit 103393b

File tree

3 files changed

+128
-116
lines changed

3 files changed

+128
-116
lines changed

src/main/clojure/clojure/core/async/flow.clj

Lines changed: 48 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@
88

99
(ns ^{:author "Rich Hickey"}
1010
clojure.core.async.flow
11-
"A library for building concurrent, event driven data processing
11+
"
12+
Note - Alpha, work-in-progress, names and other details are in flux
13+
14+
A library for building concurrent, event driven data processing
1215
flows out of communication-free functions, while centralizing
1316
control, reporting, execution and error handling. Built on core.async.
1417
@@ -18,21 +21,21 @@
1821
a set of channels for centralized control, reporting, error-handling,
1922
and execution of the processes
2023
21-
A flow is constructed from flow definition data which defines a
24+
A flow is constructed from flow configuration data which defines a
2225
directed graph of processes and the connections between
2326
them. Processes describe their I/O requirements and the
2427
flow (library) itself creates channels and passes them to the
2528
processes that requested them. See 'create-flow' for the
26-
details. The flow definition provides a centralized place for policy
27-
decisions regarding configuration, threading, buffering etc.
29+
details. The flow configuration provides a centralized place for
30+
policy decisions regarding process settings, threading, buffering etc.
2831
29-
It is expected that applications will rarely define processes but
30-
instead use the API functions here, 'process' and 'step-process',
31-
that implement the process protocol in terms of calls to ordinary
32-
functions that include no communication or core.async code. In this
33-
way the library helps you achieve a strict separation of your
34-
application logic from its execution, communication, lifecycle and
35-
monitoring.
32+
It is expected that applications will rarely define instances of the
33+
process protocol but instead use the API functions here, 'process'
34+
and 'step-process', that implement the process protocol in terms of
35+
calls to ordinary functions that might include no communication or
36+
core.async code. In this way the library helps you achieve a strict
37+
separation of your application logic from its execution,
38+
communication, lifecycle, error handling and monitoring.
3639
3740
Note that at several points the library calls upon the user to
3841
supply ids for processes, inputs, outputs etc. These should be
@@ -52,7 +55,7 @@
5255
(set! *warn-on-reflection* true)
5356

5457
(defn create-flow
55-
"Creates a flow from the supplied definition: a map containing the
58+
"Creates a flow from the supplied configuration: a map containing the
5659
keys :procs and :conns, and optionally :mixed-exec/:io-exec/:compute-exec
5760
5861
:procs - a map of pid->proc-def
@@ -66,26 +69,26 @@
6669
6770
:conns - a collection of [[from-pid outid] [to-pid inid]] tuples.
6871
69-
Inputs and outputs support mutliple connections. When an output is
72+
Inputs and outputs support multiple connections. When an output is
7073
connected multiple times every connection will get every message,
7174
as per a core.async/mult.
7275
7376
:mixed-exec/:io-exec/:compute-exec -> ExecutorService
7477
These can be used to specify the ExecutorService to use for the
75-
corresonding context, in lieu of the lib defaults
78+
corresonding workflow, in lieu of the lib defaults.
7679
7780
N.B. The flow is not started. See 'start'"
78-
[def] (impl/create-flow def))
81+
[config] (impl/create-flow config))
7982

8083
(defn start
8184
"starts the entire flow from init values. The processes start paused.
8285
Call 'resume' or 'resume-proc' to start flow. returns a map with keys:
8386
84-
::flow/report-chan - a core.async chan for reading.'ping' reponses
87+
:report-chan - a core.async chan for reading.'ping' reponses
8588
will show up here, as will any explicit ::flow/report outputs
8689
from :transform/:introduce
8790
88-
::flow/error-chan - a core.async chan for reading. Any (and only)
91+
:error-chan - a core.async chan for reading. Any (and only)
8992
exceptions thrown anywhere on any thread inside a flow will appear
9093
in maps sent here. There will at least be a ::flow/ex entry with the
9194
exception, and may be additional keys for pid, state, status etc
@@ -144,16 +147,23 @@
144147
145148
:describe - required, () -> desc
146149
where desc is a map with keys :params :ins and :outs, each of which
147-
in turn is a map of keyword to doc string
150+
in turn is a map of keyword to doc string, and :workload with
151+
possible values of :mixed :io :compute. All entries in the describe
152+
return map are optional.
148153
149154
:params describes the initial arguments to setup the state for the function.
150155
:ins enumerates the input[s], for which the flow will create channels
151156
:outs enumerates the output[s], for which the flow may create channels.
157+
:workload - describes the nature of the workload, one of :mixed :io or :compute
158+
an :io workload should not do extended computation
159+
a :compute workload should never block
152160
153-
No key may be present in both :ins and :outs The ins/outs/params of f
154-
will be the ins/outs/params of the process. describe may be called
155-
by users to understand how to use the proc. It will also be called
156-
by the impl in order to discover what channels are needed.
161+
No key may be present in both :ins and :outs, allowing for a uniform
162+
channel coordinate system of [:process-id :channel-id]. The
163+
ins/outs/params returned will be the ins/outs/params of the
164+
process. describe may be called by users to understand how to use
165+
the proc. It will also be called by the impl in order to discover
166+
what channels are needed.
157167
158168
:init - optional, (arg-map) -> initial-state
159169
@@ -199,15 +209,19 @@
199209
for introduce to return with no output. Do not spin poll in the introduce
200210
fn.
201211
202-
proc accepts an option map with keys:
203-
:exec - one of :mixed, :io or :compute, default :mixed
204-
:compute-timeout-ms - if :exec is :compute, this timeout (default 5000 msec)
212+
process accepts an option map with keys:
213+
:workflow - one of :mixed, :io or :compute
214+
:compute-timeout-ms - if :workflow is :compute, this timeout (default 5000 msec)
205215
will be used when getting the return from the future - see below
206216
207-
The :compute context is not allowed for proc impls that
217+
A :workflow supplied as an option to process will override
218+
any :workflow returned by the :describe fn of the process. If neither
219+
are provded the default is :mixed.
220+
221+
The :compute workload is not allowed for proc impls that
208222
provide :introduce (as I/O is presumed).
209223
210-
In the :exec context of :mixed or :io, this dictates the type of
224+
In the :workflow context of :mixed or :io, this dictates the type of
211225
thread in which the process loop will run, _including its calls to
212226
transform/introduce_.
213227
@@ -223,8 +237,8 @@
223237
224238
When :compute is specified transform must not block!"
225239
([process-impl-map] (process process-impl-map nil))
226-
([process-impl-map {:keys [exec timeout-ms]
227-
:or {exec :mixed, timeout-ms 5000} :as opts}]
240+
([process-impl-map {:keys [workflow timeout-ms]
241+
:or {timeout-ms 5000} :as opts}]
228242
(impl/proc process-impl-map opts)))
229243

230244
(defn step-process
@@ -248,13 +262,13 @@
248262
(process {:describe f, :init f, :transform f} opts)))
249263

250264
(defn futurize
251-
"Takes a fn f and returns a fn that takes the same arguments as f and
252-
immediately returns a future, having starting a thread of the
253-
indicated type, or via the supplied executor, that invokes f with
254-
those args and completes that future with its return.
265+
"Takes a fn f and returns a fn that takes the same arguments as f
266+
and immediately returns a future, having started a thread for the
267+
indicated workload, or via the supplied executor, that invokes f
268+
with those args and completes that future with its return.
255269
256270
futurize accepts kwarg options:
257-
:exec - one of :mixed, :io, :compute
271+
:exec - one of the workloads :mixed, :io, :compute
258272
or a j.u.c.ExecutorService object,
259273
default :mixed"
260274
[f & {:keys [exec]

src/main/clojure/clojure/core/async/flow/impl.clj

Lines changed: 79 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,12 @@
3434

3535
(defn futurize ^Future [f {:keys [exec]}]
3636
(fn [& args]
37-
(^[Callable] ExecutorService/.submit
38-
(case exec
39-
:compute compute-exec
40-
:io io-exec
41-
:mixed mixed-exec
42-
exec)
43-
#(apply f args))))
37+
(let [^ExecutorService e (case exec
38+
:compute compute-exec
39+
:io io-exec
40+
:mixed mixed-exec
41+
exec)]
42+
(.submit e ^Callable #(apply f args)))))
4443

4544
(defn prep-proc [ret pid {:keys [proc, args, chan-opts] :or {chan-opts {}}}]
4645
(let [{:keys [ins outs]} (spi/describe proc)
@@ -57,8 +56,7 @@
5756
(defn create-flow
5857
"see lib ns for docs"
5958
[{:keys [procs conns mixed-exec io-exec compute-exec]
60-
:or {mixed-exec mixed-exec, io-exec io-exec, compute-exec compute-exec}
61-
:as desc}]
59+
:or {mixed-exec mixed-exec, io-exec io-exec, compute-exec compute-exec}}]
6260
(let [lock (ReentrantLock.)
6361
chans (atom nil)
6462
execs {:mixed mixed-exec :io io-exec :compute compute-exec}
@@ -216,75 +214,75 @@
216214

217215
(defn proc
218216
"see lib ns for docs"
219-
[{:keys [describe init transition transform introduce] :as impl} {:keys [exec compute-timeout-ms]}]
220-
;;validate the preconditions
221-
(assert (= 1 (count (keep identity [transform introduce]))) "must provide exactly one of :transform or :introduce")
222-
(assert (not (and introduce (= exec :compute))) "can't specify :introduce and :compute")
223-
(reify
224-
clojure.core.protocols/Datafiable
225-
(datafy [_]
226-
(let [{:keys [params ins outs]} (describe)]
227-
{:impl impl :params (-> params keys vec) :ins (-> ins keys vec) :outs (-> outs keys vec)}))
228-
spi/ProcLauncher
229-
(describe [_]
230-
(let [{:keys [params ins] :as desc} (describe)]
231-
(assert (not (and ins introduce)) "can't specify :ins when :introduce")
232-
(assert (or (not params) init) "must have :init if :params")
233-
desc))
234-
(start [_ {:keys [pid args ins outs resolver]}]
235-
(let [comp? (= exec :compute)
236-
transform (cond-> transform (= exec :compute)
237-
#(.get (futurize transform {:exec (spi/get-exec resolver :compute)})
238-
compute-timeout-ms TimeUnit/MILLISECONDS))
239-
exs (spi/get-exec resolver (if (= exec :mixed) :mixed :io))
240-
io-id (zipmap (concat (vals ins) (vals outs)) (concat (keys ins) (keys outs)))
241-
control (::flow/control ins)
242-
;;TODO rotate/randomize after control per normal alts?
243-
read-chans (into [control] (-> ins (dissoc ::flow/control) vals))
244-
run
245-
#(loop [status :paused, state (when init (init args)), count 0]
246-
(let [pong (fn []
247-
(let [pins (dissoc ins ::flow/control)
248-
pouts (dissoc outs ::flow/error ::flow/report)]
249-
(async/>!! (outs ::flow/report)
250-
#::flow{:report :ping, :pid pid, :status status
251-
:state state, :count count
252-
:ins (zipmap (keys pins) (map chan->data (vals pins)))
253-
:outs (zipmap (keys pouts) (map chan->data (vals pouts)))})))
254-
handle-command (partial handle-command pid pong)
255-
[nstatus nstate count]
256-
(try
257-
(if (= status :paused)
258-
(let [nstatus (handle-command status (async/<!! control))
259-
nstate (handle-transition transition status nstatus state)]
260-
[nstatus nstate count])
261-
;;:running
262-
(let [[msg c] (if transform
263-
(async/alts!! read-chans :priority true)
264-
;;introduce
265-
(when-let [msg (async/poll! control)]
266-
[msg control]))
267-
cid (io-id c)]
268-
(if (= c control)
269-
(let [nstatus (handle-command status msg)
270-
nstate (handle-transition transition status nstatus state)]
271-
[nstatus nstate count])
272-
(try
273-
(let [[nstate outputs] (if transform
274-
(transform state cid msg)
275-
(introduce state))
276-
[nstatus nstate]
277-
(send-outputs status nstate outputs outs resolver control handle-command transition)]
278-
[nstatus nstate (inc count)])
279-
(catch Throwable ex
280-
(async/>!! (outs ::flow/error)
281-
#::flow{:pid pid, :status status, :state state,
282-
:count (inc count), :cid cid, :msg msg :op :step, :ex ex})
283-
[status state count])))))
284-
(catch Throwable ex
285-
(async/>!! (outs ::flow/error)
286-
#::flow{:pid pid, :status status, :state state, :count (inc count), :ex ex})
287-
[status state count]))]
288-
(when-not (= nstatus :exit) ;;fall out
289-
(recur nstatus nstate (long count)))))]
290-
((futurize run {:exec exs}))))))
217+
[{:keys [describe init transition transform introduce] :as impl} {:keys [workflow compute-timeout-ms]}]
218+
(assert (= 1 (count (keep identity [transform introduce]))) "must provide exactly one of :transform or :introduce")
219+
(let [{:keys [params ins] :as desc} (describe)
220+
workflow (or workflow (:workflow desc) :mixed)]
221+
(assert (not (and ins introduce)) "can't specify :ins when :introduce")
222+
(assert (or (not params) init) "must have :init if :params")
223+
(assert (not (and introduce (= workflow :compute))) "can't specify :introduce and :compute")
224+
(reify
225+
clojure.core.protocols/Datafiable
226+
(datafy [_]
227+
(let [{:keys [params ins outs]} desc]
228+
{:impl impl :params (-> params keys vec) :ins (-> ins keys vec) :outs (-> outs keys vec)}))
229+
spi/ProcLauncher
230+
(describe [_] desc)
231+
(start [_ {:keys [pid args ins outs resolver]}]
232+
(let [comp? (= workflow :compute)
233+
transform (cond-> transform (= workflow :compute)
234+
#(.get (futurize transform {:exec (spi/get-exec resolver :compute)})
235+
compute-timeout-ms TimeUnit/MILLISECONDS))
236+
exs (spi/get-exec resolver (if (= workflow :mixed) :mixed :io))
237+
io-id (zipmap (concat (vals ins) (vals outs)) (concat (keys ins) (keys outs)))
238+
control (::flow/control ins)
239+
;;TODO rotate/randomize after control per normal alts?
240+
read-chans (into [control] (-> ins (dissoc ::flow/control) vals))
241+
run
242+
#(loop [status :paused, state (when init (init args)), count 0]
243+
(let [pong (fn []
244+
(let [pins (dissoc ins ::flow/control)
245+
pouts (dissoc outs ::flow/error ::flow/report)]
246+
(async/>!! (outs ::flow/report)
247+
#::flow{:report :ping, :pid pid, :status status
248+
:state state, :count count
249+
:ins (zipmap (keys pins) (map chan->data (vals pins)))
250+
:outs (zipmap (keys pouts) (map chan->data (vals pouts)))})))
251+
handle-command (partial handle-command pid pong)
252+
[nstatus nstate count]
253+
(try
254+
(if (= status :paused)
255+
(let [nstatus (handle-command status (async/<!! control))
256+
nstate (handle-transition transition status nstatus state)]
257+
[nstatus nstate count])
258+
;;:running
259+
(let [[msg c] (if transform
260+
(async/alts!! read-chans :priority true)
261+
;;introduce
262+
(when-let [msg (async/poll! control)]
263+
[msg control]))
264+
cid (io-id c)]
265+
(if (= c control)
266+
(let [nstatus (handle-command status msg)
267+
nstate (handle-transition transition status nstatus state)]
268+
[nstatus nstate count])
269+
(try
270+
(let [[nstate outputs] (if transform
271+
(transform state cid msg)
272+
(introduce state))
273+
[nstatus nstate]
274+
(send-outputs status nstate outputs outs
275+
resolver control handle-command transition)]
276+
[nstatus nstate (inc count)])
277+
(catch Throwable ex
278+
(async/>!! (outs ::flow/error)
279+
#::flow{:pid pid, :status status, :state state,
280+
:count (inc count), :cid cid, :msg msg :op :step, :ex ex})
281+
[status state count])))))
282+
(catch Throwable ex
283+
(async/>!! (outs ::flow/error)
284+
#::flow{:pid pid, :status status, :state state, :count (inc count), :ex ex})
285+
[status state count]))]
286+
(when-not (= nstatus :exit) ;;fall out
287+
(recur nstatus nstate (long count)))))]
288+
((futurize run {:exec exs})))))))

src/main/clojure/clojure/core/async/flow/spi.clj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
(ns clojure.core.async.flow.spi)
1010

1111
(defprotocol ProcLauncher
12-
"Note - definine a ProcLauncher is an advanced feature and should not
12+
"Note - defining a ProcLauncher is an advanced feature and should not
1313
be needed for ordinary use of the library. This protocol is for
1414
creating new types of Processes that are not possible to create
1515
with ::flow/process.

0 commit comments

Comments
 (0)