Skip to content

Commit 4f03e00

Browse files
committed
Merge branch 'master' into dev-io-thread
2 parents a6151f2 + cfb53ac commit 4f03e00

File tree

4 files changed

+126
-82
lines changed

4 files changed

+126
-82
lines changed
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
name: Build Codox Docs
2+
3+
on:
4+
workflow_dispatch:
5+
6+
jobs:
7+
8+
build:
9+
runs-on: ubuntu-latest
10+
steps:
11+
12+
- name: Set up Java
13+
uses: actions/setup-java@v3
14+
with:
15+
java-version: 8
16+
distribution: 'temurin'
17+
18+
- name: Set up Clojure
19+
uses: DeLaGuardo/[email protected]
20+
with:
21+
cli: 'latest'
22+
23+
- name: Cache clojure dependencies
24+
uses: actions/cache@v3
25+
with:
26+
path: |
27+
~/.m2/repository
28+
~/.gitlibs
29+
key: cljdeps-${{ hashFiles('deps.edn') }}
30+
restore-keys: cljdeps-
31+
32+
- name: Clone the repo
33+
uses: actions/checkout@v4
34+
35+
- name: Install rlwrap
36+
run: sudo apt-get install -y rlwrap
37+
38+
- name: Execute doc build
39+
run: |
40+
clj -X:docs
41+
42+
- name: Commit and push
43+
run: |
44+
git config --global user.name clojure-build
45+
git config --global user.email "[email protected]"
46+
git add -u -v
47+
git commit -m "Action doc commit"
48+
git push origin master

docs/clojure.core.async.flow.html

Lines changed: 23 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@
8181
channel returned from start</pre></div></div><div class="public anchor" id="var-process"><h3>process</h3><div class="usage"><code>(process process-impl-map)</code><code>(process process-impl-map {:keys [workload timeout-ms], :or {timeout-ms 5000}, :as opts})</code></div><div class="doc"><pre class="plaintext">Given a map of functions (described below), returns a launcher that
8282
creates a process compliant with the process protocol (see the
8383
spi/ProcLauncher doc). The possible entries for process-impl-map
84-
are :describe, :init, :transition, :transform and :introduce. This is
84+
are :describe, :init, :transition and :transform. This is
8585
the core facility for defining the logic for processes via ordinary
8686
functions.
8787

@@ -107,9 +107,24 @@
107107

108108
:init - optional, (arg-map) -&gt; initial-state
109109

110-
init will be called once by the process to establish any
111-
initial state. The arg-map will be a map of param-&gt;val, as supplied
112-
in the flow def. init must be provided if 'describe' returns :params.
110+
init will be called once by the process to establish any initial
111+
state. The arg-map will be a map of param-&gt;val, as supplied in the
112+
flow def. init must be provided if 'describe' returns :params.
113+
114+
Optionally, a returned init state may contain the
115+
keys ::flow/in-ports and/or ::flow/out-ports. These should be maps
116+
of cid -&gt; a core.async.channel. The cids must not conflict with the
117+
in/out ids. These channels will become part of the input/output set
118+
of the process, but are not otherwise visible/resolvable within the
119+
flow. Ports are a way to allow data to enter or exit the flow from
120+
outside of it. Use :transition to coordinate the lifecycle of these
121+
external channels.
122+
123+
Optionally, _any_ returned state, whether from :init, :transition
124+
or :transform, may contain the key ::flow/input-filter, a predicate
125+
of cid. Only inputs (including in-ports) satisfying the predicate
126+
will be part of the next channel read set. In the absence of this
127+
predicate all inputs are read.
113128

114129
:transition - optional, (state transition) -&gt; state'
115130

@@ -121,9 +136,7 @@
121136
process will no longer be used following that. See the SPI for
122137
details. state' will be the state supplied to subsequent calls.
123138

124-
Exactly one of either :transform or :introduce are required.
125-
126-
:transform - (state in-name msg) -&gt; [state' output]
139+
:transform - required, (state in-name msg) -&gt; [state' output]
127140
where output is a map of outid-&gt;[msgs*]
128141

129142
The transform fn will be called every time a message arrives at any
@@ -134,21 +147,6 @@
134147
may never be nil (per core.async channels). state' will be the state
135148
supplied to subsequent calls.
136149

137-
:introduce - (state) -&gt; [state' output]
138-
where output is a map of outid-&gt;[msgs*], per :transform
139-
140-
The introduce fn is used for sources - proc-impls that introduce new data
141-
into the flow by doing I/O with something external to the flow and
142-
feeding that data to its outputs. A proc-impl specifying :introduce may not
143-
specify any :ins in its descriptor, as none but the ::flow/control channel
144-
will be read. Instead, introduce will be called every time through the
145-
process loop, and will presumably do blocking or paced I/O to get
146-
new data to return via its outputs. If it does blocking I/O it
147-
should do so with a timeout so it can regularly return to the
148-
process loop which can then look for control messages - it's fine
149-
for introduce to return with no output. Do not spin poll in the introduce
150-
fn.
151-
152150
process accepts an option map with keys:
153151
:workload - one of :mixed, :io or :compute
154152
:compute-timeout-ms - if :workload is :compute, this timeout (default 5000 msec)
@@ -158,14 +156,11 @@
158156
any :workload returned by the :describe fn of the process. If neither
159157
are provded the default is :mixed.
160158

161-
The :compute workload is not allowed for proc impls that
162-
provide :introduce (as I/O is presumed).
163-
164159
In the :workload context of :mixed or :io, this dictates the type of
165160
thread in which the process loop will run, _including its calls to
166-
transform/introduce_.
161+
transform_.
167162

168-
When :io is specified transform/introduce should not do extensive computation.
163+
When :io is specified, transform should not do extensive computation.
169164

170165
When :compute is specified (only allowed for :transform), each call
171166
to transform will be run in a separate thread. The process loop will
@@ -182,7 +177,7 @@
182177

183178
:report-chan - a core.async chan for reading.'ping' reponses
184179
will show up here, as will any explicit ::flow/report outputs
185-
from :transform/:introduce
180+
from :transform
186181

187182
:error-chan - a core.async chan for reading. Any (and only)
188183
exceptions thrown anywhere on any thread inside a flow will appear

src/main/clojure/clojure/core/async/flow.clj

Lines changed: 24 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@
8686
8787
:report-chan - a core.async chan for reading.'ping' reponses
8888
will show up here, as will any explicit ::flow/report outputs
89-
from :transform/:introduce
89+
from :transform
9090
9191
:error-chan - a core.async chan for reading. Any (and only)
9292
exceptions thrown anywhere on any thread inside a flow will appear
@@ -141,7 +141,7 @@
141141
"Given a map of functions (described below), returns a launcher that
142142
creates a process compliant with the process protocol (see the
143143
spi/ProcLauncher doc). The possible entries for process-impl-map
144-
are :describe, :init, :transition, :transform and :introduce. This is
144+
are :describe, :init, :transition and :transform. This is
145145
the core facility for defining the logic for processes via ordinary
146146
functions.
147147
@@ -167,9 +167,24 @@
167167
168168
:init - optional, (arg-map) -> initial-state
169169
170-
init will be called once by the process to establish any
171-
initial state. The arg-map will be a map of param->val, as supplied
172-
in the flow def. init must be provided if 'describe' returns :params.
170+
init will be called once by the process to establish any initial
171+
state. The arg-map will be a map of param->val, as supplied in the
172+
flow def. init must be provided if 'describe' returns :params.
173+
174+
Optionally, a returned init state may contain the
175+
keys ::flow/in-ports and/or ::flow/out-ports. These should be maps
176+
of cid -> a core.async.channel. The cids must not conflict with the
177+
in/out ids. These channels will become part of the input/output set
178+
of the process, but are not otherwise visible/resolvable within the
179+
flow. Ports are a way to allow data to enter or exit the flow from
180+
outside of it. Use :transition to coordinate the lifecycle of these
181+
external channels.
182+
183+
Optionally, _any_ returned state, whether from :init, :transition
184+
or :transform, may contain the key ::flow/input-filter, a predicate
185+
of cid. Only inputs (including in-ports) satisfying the predicate
186+
will be part of the next channel read set. In the absence of this
187+
predicate all inputs are read.
173188
174189
:transition - optional, (state transition) -> state'
175190
@@ -181,9 +196,7 @@
181196
process will no longer be used following that. See the SPI for
182197
details. state' will be the state supplied to subsequent calls.
183198
184-
Exactly one of either :transform or :introduce are required.
185-
186-
:transform - (state in-name msg) -> [state' output]
199+
:transform - required, (state in-name msg) -> [state' output]
187200
where output is a map of outid->[msgs*]
188201
189202
The transform fn will be called every time a message arrives at any
@@ -194,21 +207,6 @@
194207
may never be nil (per core.async channels). state' will be the state
195208
supplied to subsequent calls.
196209
197-
:introduce - (state) -> [state' output]
198-
where output is a map of outid->[msgs*], per :transform
199-
200-
The introduce fn is used for sources - proc-impls that introduce new data
201-
into the flow by doing I/O with something external to the flow and
202-
feeding that data to its outputs. A proc-impl specifying :introduce may not
203-
specify any :ins in its descriptor, as none but the ::flow/control channel
204-
will be read. Instead, introduce will be called every time through the
205-
process loop, and will presumably do blocking or paced I/O to get
206-
new data to return via its outputs. If it does blocking I/O it
207-
should do so with a timeout so it can regularly return to the
208-
process loop which can then look for control messages - it's fine
209-
for introduce to return with no output. Do not spin poll in the introduce
210-
fn.
211-
212210
process accepts an option map with keys:
213211
:workload - one of :mixed, :io or :compute
214212
:compute-timeout-ms - if :workload is :compute, this timeout (default 5000 msec)
@@ -218,14 +216,11 @@
218216
any :workload returned by the :describe fn of the process. If neither
219217
are provded the default is :mixed.
220218
221-
The :compute workload is not allowed for proc impls that
222-
provide :introduce (as I/O is presumed).
223-
224219
In the :workload context of :mixed or :io, this dictates the type of
225220
thread in which the process loop will run, _including its calls to
226-
transform/introduce_.
221+
transform_.
227222
228-
When :io is specified transform/introduce should not do extensive computation.
223+
When :io is specified, transform should not do extensive computation.
229224
230225
When :compute is specified (only allowed for :transform), each call
231226
to transform will be run in a separate thread. The process loop will
@@ -281,8 +276,7 @@
281276
and one output (named :in and :out), and no state."
282277
[f]
283278
(fn
284-
([] {:params {}
285-
:ins {:in (str "the argument to " f)}
279+
([] {:ins {:in (str "the argument to " f)}
286280
:outs {:out (str "the return of " f)}})
287281
([_] nil)
288282
([_ _ msg] [nil {:out (f msg)}])))

src/main/clojure/clojure/core/async/flow/impl.clj

Lines changed: 31 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,10 @@
187187
"when transition, returns maybe new state"
188188
[transition status nstatus state]
189189
(if (and transition (not= status nstatus))
190-
(transition state nstatus)
190+
(transition state (case nstatus
191+
:exit ::flow/stop
192+
:running ::flow/resume
193+
:paused ::flow/pause))
191194
state))
192195

193196
(defn send-outputs [status state outputs outs resolver control handle-command transition]
@@ -214,13 +217,11 @@
214217

215218
(defn proc
216219
"see lib ns for docs"
217-
[{:keys [describe init transition transform introduce] :as impl} {:keys [workload compute-timeout-ms]}]
218-
(assert (= 1 (count (keep identity [transform introduce]))) "must provide exactly one of :transform or :introduce")
220+
[{:keys [describe init transition transform] :as impl} {:keys [workload compute-timeout-ms]}]
221+
(assert transform "must provide :transform")
219222
(let [{:keys [params ins] :as desc} (describe)
220223
workload (or workload (:workload desc) :mixed)]
221-
(assert (not (and ins introduce)) "can't specify :ins when :introduce")
222224
(assert (or (not params) init) "must have :init if :params")
223-
(assert (not (and introduce (= workload :compute))) "can't specify :introduce and :compute")
224225
(reify
225226
clojure.core.protocols/Datafiable
226227
(datafy [_]
@@ -229,17 +230,20 @@
229230
spi/ProcLauncher
230231
(describe [_] desc)
231232
(start [_ {:keys [pid args ins outs resolver]}]
233+
(assert (or (not params) args) "must provide :args if :params")
232234
(let [comp? (= workload :compute)
233235
transform (cond-> transform (= workload :compute)
234236
#(.get (futurize transform {:exec (spi/get-exec resolver :compute)})
235237
compute-timeout-ms TimeUnit/MILLISECONDS))
236238
exs (spi/get-exec resolver (if (= workload :mixed) :mixed :io))
239+
state (when init (init args))
240+
ins (into (or ins {}) (::flow/in-ports state))
241+
outs (into (or outs {}) (::flow/out-ports state))
237242
io-id (zipmap (concat (vals ins) (vals outs)) (concat (keys ins) (keys outs)))
238243
control (::flow/control ins)
239-
;;TODO rotate/randomize after control per normal alts?
240-
read-chans (into [control] (-> ins (dissoc ::flow/control) vals))
244+
read-ins (dissoc ins ::flow/control)
241245
run
242-
#(loop [status :paused, state (when init (init args)), count 0]
246+
#(loop [status :paused, state state, count 0, read-ins read-ins]
243247
(let [pong (fn []
244248
(let [pins (dissoc ins ::flow/control)
245249
pouts (dissoc outs ::flow/error ::flow/report)]
@@ -249,40 +253,43 @@
249253
:ins (zipmap (keys pins) (map chan->data (vals pins)))
250254
:outs (zipmap (keys pouts) (map chan->data (vals pouts)))})))
251255
handle-command (partial handle-command pid pong)
252-
[nstatus nstate count]
256+
[nstatus nstate count read-ins]
253257
(try
254258
(if (= status :paused)
255259
(let [nstatus (handle-command status (async/<!! control))
256260
nstate (handle-transition transition status nstatus state)]
257-
[nstatus nstate count])
261+
[nstatus nstate count read-ins])
258262
;;:running
259-
(let [[msg c] (if transform
260-
(async/alts!! read-chans :priority true)
261-
;;introduce
262-
(when-let [msg (async/poll! control)]
263-
[msg control]))
263+
(let [;;TODO rotate/randomize after control per normal alts?
264+
read-chans (let [ipred (or (::flow/input-filter state) identity)]
265+
(reduce-kv (fn [ret cid chan]
266+
(if (ipred cid)
267+
(conj ret chan)
268+
ret))
269+
[control] read-ins))
270+
[msg c] (async/alts!! read-chans :priority true)
264271
cid (io-id c)]
265272
(if (= c control)
266273
(let [nstatus (handle-command status msg)
267274
nstate (handle-transition transition status nstatus state)]
268-
[nstatus nstate count])
275+
[nstatus nstate count read-ins])
269276
(try
270-
(let [[nstate outputs] (if transform
271-
(transform state cid msg)
272-
(introduce state))
277+
(let [[nstate outputs] (transform state cid msg)
273278
[nstatus nstate]
274279
(send-outputs status nstate outputs outs
275280
resolver control handle-command transition)]
276-
[nstatus nstate (inc count)])
281+
[nstatus nstate (inc count) (if (some? msg)
282+
read-ins
283+
(dissoc read-ins cid))])
277284
(catch Throwable ex
278285
(async/>!! (outs ::flow/error)
279286
#::flow{:pid pid, :status status, :state state,
280287
:count (inc count), :cid cid, :msg msg :op :step, :ex ex})
281-
[status state count])))))
288+
[status state count read-ins])))))
282289
(catch Throwable ex
283290
(async/>!! (outs ::flow/error)
284-
#::flow{:pid pid, :status status, :state state, :count (inc count), :ex ex})
285-
[status state count]))]
291+
#::flow{:pid pid, :status status, :state state, :count (inc count), :ex ex})
292+
[status state count read-ins]))]
286293
(when-not (= nstatus :exit) ;;fall out
287-
(recur nstatus nstate (long count)))))]
294+
(recur nstatus nstate (long count) read-ins))))]
288295
((futurize run {:exec exs})))))))

0 commit comments

Comments
 (0)