Skip to content

Commit d5eba35

Browse files
committed
removed :introduce, use ports instead for sources/sinks, added ::flow/[in-out-]ports support for external I/O, fix transitions
1 parent cfb9573 commit d5eba35

File tree

2 files changed

+31
-44
lines changed

2 files changed

+31
-44
lines changed

src/main/clojure/clojure/core/async/flow.clj

Lines changed: 18 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@
8686
8787
:report-chan - a core.async chan for reading.'ping' reponses
8888
will show up here, as will any explicit ::flow/report outputs
89-
from :transform/:introduce
89+
from :transform
9090
9191
:error-chan - a core.async chan for reading. Any (and only)
9292
exceptions thrown anywhere on any thread inside a flow will appear
@@ -141,7 +141,7 @@
141141
"Given a map of functions (described below), returns a launcher that
142142
creates a process compliant with the process protocol (see the
143143
spi/ProcLauncher doc). The possible entries for process-impl-map
144-
are :describe, :init, :transition, :transform and :introduce. This is
144+
are :describe, :init, :transition and :transform. This is
145145
the core facility for defining the logic for processes via ordinary
146146
functions.
147147
@@ -167,9 +167,18 @@
167167
168168
:init - optional, (arg-map) -> initial-state
169169
170-
init will be called once by the process to establish any
171-
initial state. The arg-map will be a map of param->val, as supplied
172-
in the flow def. init must be provided if 'describe' returns :params.
170+
init will be called once by the process to establish any initial
171+
state. The arg-map will be a map of param->val, as supplied in the
172+
flow def. init must be provided if 'describe' returns :params.
173+
174+
Optionally, a returned init state may contain the
175+
keys ::flow/in-ports and/or ::flow/out-ports. These should be maps
176+
of cid -> a core.async.channel. The cids must not conflict with the
177+
in/out ids. These channels will become part of the read/write set of
178+
the process, but are not otherwise visible/resolvable within the
179+
flow. Ports are a way to have data enter or exit the flow from
180+
outside. Use :transition to coordinate the lifecycle of these
181+
external channels.
173182
174183
:transition - optional, (state transition) -> state'
175184
@@ -181,9 +190,7 @@
181190
process will no longer be used following that. See the SPI for
182191
details. state' will be the state supplied to subsequent calls.
183192
184-
Exactly one of either :transform or :introduce are required.
185-
186-
:transform - (state in-name msg) -> [state' output]
193+
:transform - required, (state in-name msg) -> [state' output]
187194
where output is a map of outid->[msgs*]
188195
189196
The transform fn will be called every time a message arrives at any
@@ -194,21 +201,6 @@
194201
may never be nil (per core.async channels). state' will be the state
195202
supplied to subsequent calls.
196203
197-
:introduce - (state) -> [state' output]
198-
where output is a map of outid->[msgs*], per :transform
199-
200-
The introduce fn is used for sources - proc-impls that introduce new data
201-
into the flow by doing I/O with something external to the flow and
202-
feeding that data to its outputs. A proc-impl specifying :introduce may not
203-
specify any :ins in its descriptor, as none but the ::flow/control channel
204-
will be read. Instead, introduce will be called every time through the
205-
process loop, and will presumably do blocking or paced I/O to get
206-
new data to return via its outputs. If it does blocking I/O it
207-
should do so with a timeout so it can regularly return to the
208-
process loop which can then look for control messages - it's fine
209-
for introduce to return with no output. Do not spin poll in the introduce
210-
fn.
211-
212204
process accepts an option map with keys:
213205
:workload - one of :mixed, :io or :compute
214206
:compute-timeout-ms - if :workload is :compute, this timeout (default 5000 msec)
@@ -218,14 +210,11 @@
218210
any :workload returned by the :describe fn of the process. If neither
219211
are provded the default is :mixed.
220212
221-
The :compute workload is not allowed for proc impls that
222-
provide :introduce (as I/O is presumed).
223-
224213
In the :workload context of :mixed or :io, this dictates the type of
225214
thread in which the process loop will run, _including its calls to
226-
transform/introduce_.
215+
transform_.
227216
228-
When :io is specified transform/introduce should not do extensive computation.
217+
When :io is specified, transform should not do extensive computation.
229218
230219
When :compute is specified (only allowed for :transform), each call
231220
to transform will be run in a separate thread. The process loop will
@@ -281,8 +270,7 @@
281270
and one output (named :in and :out), and no state."
282271
[f]
283272
(fn
284-
([] {:params {}
285-
:ins {:in (str "the argument to " f)}
273+
([] {:ins {:in (str "the argument to " f)}
286274
:outs {:out (str "the return of " f)}})
287275
([_] nil)
288276
([_ _ msg] [nil {:out (f msg)}])))

src/main/clojure/clojure/core/async/flow/impl.clj

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,10 @@
187187
"when transition, returns maybe new state"
188188
[transition status nstatus state]
189189
(if (and transition (not= status nstatus))
190-
(transition state nstatus)
190+
(transition state (case nstatus
191+
:exit ::flow/stop
192+
:running ::flow/resume
193+
:paused ::flow/pause))
191194
state))
192195

193196
(defn send-outputs [status state outputs outs resolver control handle-command transition]
@@ -214,13 +217,11 @@
214217

215218
(defn proc
216219
"see lib ns for docs"
217-
[{:keys [describe init transition transform introduce] :as impl} {:keys [workload compute-timeout-ms]}]
218-
(assert (= 1 (count (keep identity [transform introduce]))) "must provide exactly one of :transform or :introduce")
220+
[{:keys [describe init transition transform] :as impl} {:keys [workload compute-timeout-ms]}]
221+
(assert transform "must provide :transform")
219222
(let [{:keys [params ins] :as desc} (describe)
220223
workload (or workload (:workload desc) :mixed)]
221-
(assert (not (and ins introduce)) "can't specify :ins when :introduce")
222224
(assert (or (not params) init) "must have :init if :params")
223-
(assert (not (and introduce (= workload :compute))) "can't specify :introduce and :compute")
224225
(reify
225226
clojure.core.protocols/Datafiable
226227
(datafy [_]
@@ -229,17 +230,21 @@
229230
spi/ProcLauncher
230231
(describe [_] desc)
231232
(start [_ {:keys [pid args ins outs resolver]}]
233+
(assert (or (not params) args) "must provide :args if :params")
232234
(let [comp? (= workload :compute)
233235
transform (cond-> transform (= workload :compute)
234236
#(.get (futurize transform {:exec (spi/get-exec resolver :compute)})
235237
compute-timeout-ms TimeUnit/MILLISECONDS))
236238
exs (spi/get-exec resolver (if (= workload :mixed) :mixed :io))
239+
state (when init (init args))
240+
ins (into (or ins {}) (::flow/in-ports state))
241+
outs (into (or outs {}) (::flow/out-ports state))
237242
io-id (zipmap (concat (vals ins) (vals outs)) (concat (keys ins) (keys outs)))
238243
control (::flow/control ins)
239244
;;TODO rotate/randomize after control per normal alts?
240245
read-chans (into [control] (-> ins (dissoc ::flow/control) vals))
241246
run
242-
#(loop [status :paused, state (when init (init args)), count 0]
247+
#(loop [status :paused, state state, count 0]
243248
(let [pong (fn []
244249
(let [pins (dissoc ins ::flow/control)
245250
pouts (dissoc outs ::flow/error ::flow/report)]
@@ -256,20 +261,14 @@
256261
nstate (handle-transition transition status nstatus state)]
257262
[nstatus nstate count])
258263
;;:running
259-
(let [[msg c] (if transform
260-
(async/alts!! read-chans :priority true)
261-
;;introduce
262-
(when-let [msg (async/poll! control)]
263-
[msg control]))
264+
(let [[msg c] (async/alts!! read-chans :priority true)
264265
cid (io-id c)]
265266
(if (= c control)
266267
(let [nstatus (handle-command status msg)
267268
nstate (handle-transition transition status nstatus state)]
268269
[nstatus nstate count])
269270
(try
270-
(let [[nstate outputs] (if transform
271-
(transform state cid msg)
272-
(introduce state))
271+
(let [[nstate outputs] (transform state cid msg)
273272
[nstatus nstate]
274273
(send-outputs status nstate outputs outs
275274
resolver control handle-command transition)]

0 commit comments

Comments
 (0)