Skip to content

Commit 03b97e0

Browse files
committed
added core.async.flow
1 parent e0bdece commit 03b97e0

File tree

4 files changed

+666
-0
lines changed

4 files changed

+666
-0
lines changed
Lines changed: 270 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,270 @@
1+
;; Copyright (c) Rich Hickey and contributors. All rights reserved.
2+
;; The use and distribution terms for this software are covered by the
3+
;; Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
4+
;; which can be found in the file epl-v10.html at the root of this distribution.
5+
;; By using this software in any fashion, you are agreeing to be bound by
6+
;; the terms of this license.
7+
;; You must not remove this notice, or any other, from this software.
8+
9+
(ns ^{:author "Rich Hickey"}
10+
clojure.core.async.flow
11+
"A library for building concurrent, event driven data processing
12+
flows out of communication-free functions, while centralizing
13+
control, reporting, execution and error handling. Built on core.async.
14+
15+
The top-level construct is the flow, comprising:
16+
a set of processes (generally, threads) - concurrent activities
17+
a set of channels flowing data into and out of the processes
18+
a set of channels for centralized control, reporting, error-handling,
19+
and execution of the processes
20+
21+
A flow is constructed from flow definition data which defines a
22+
directed graph of processes and the connections between
23+
them. Processes describe their I/O requirements and the
24+
flow (library) itself creates channels and passes them to the
25+
processes that requested them. See 'create-flow' for the
26+
details. The flow definition provides a centralized place for policy
27+
decisions regarding configuration, threading, buffering etc.
28+
29+
It is expected that applications will rarely define processes but
30+
instead use the API functions here, 'process' and 'step-process',
31+
that implement the process protocol in terms of calls to ordinary
32+
functions that include no communication or core.async code. In this
33+
way the library helps you achieve a strict separation of your
34+
application logic from its execution, communication, lifecycle and
35+
monitoring.
36+
37+
Note that at several points the library calls upon the user to
38+
supply ids for processes, inputs, outputs etc. These should be
39+
keywords. When a namespaced keyword is required it is explicitly
40+
stated. This documentation refers to various keywords utilized by
41+
the library itself as ::flow/xyz, weere ::flow is an alias for
42+
clojure.core.async.flow
43+
44+
A process is represented in the flow definition by an implementation
45+
of spi/ProcLauncher that starts it. See the spi docs for
46+
details."
47+
48+
(:require
49+
[clojure.core.async.flow.impl :as impl]
50+
[clojure.core.async.flow.impl.graph :as g]))
51+
52+
(defn create-flow
53+
"Creates a flow from the supplied definition: a map containing the
54+
keys :procs and :conns, and optionally :mixed-exec/:io-exec/:compute-exec
55+
56+
:procs - a map of pid->proc-def
57+
where proc-def is a map with keys :proc, :args, :chan-opts
58+
59+
:proc - a function that starts a process
60+
:args - a map of param->val which will be passed to the process ctor
61+
:chan-opts - a map of in-or-out-id->{:keys [buf-or-n xform]}, where buf-or-n
62+
and xform have their meanings per core.async/chan
63+
the default is {:buf-or-n 10}
64+
65+
:conns - a collection of [[from-pid outid] [to-pid inid]] tuples.
66+
67+
Inputs and outputs support mutliple connections. When an output is
68+
connected multiple times every connection will get every message,
69+
as per a core.async/mult.
70+
71+
:mixed-exec/:io-exec/:compute-exec -> ExecutorService
72+
These can be used to specify the ExecutorService to use for the
73+
corresonding context, in lieu of the lib defaults
74+
75+
N.B. The flow is not started. See 'start'"
76+
[def] (impl/create-flow def))
77+
78+
(defn start
79+
"starts the entire flow from init values. The processes start paused.
80+
Call resume or resume-proc to start flow.
81+
returns {::flow/report-chan - a core.async chan for reading
82+
::flow/error-chan - a core.async chan for reading}"
83+
[g] (g/start g))
84+
85+
(defn stop
86+
"shuts down the flow, stopping all procsesses and closing the error
87+
and report channels. The flow can be started again"
88+
[g] (g/stop g))
89+
90+
(defn pause
91+
"pauses a running flow"
92+
[g] (g/pause g))
93+
94+
(defn resume
95+
"resumes a paused flow"
96+
[g] (g/resume g))
97+
98+
(defn ping
99+
"pings all processes, which will put their status and state on the
100+
report channel"
101+
[g] (g/ping g))
102+
103+
(defn pause-proc
104+
"pauses a process"
105+
[g pid] (g/pause-proc g pid))
106+
107+
(defn resume-proc
108+
"resumes a process"
109+
[g pid] (g/resume-proc g pid))
110+
111+
(defn ping-proc
112+
"pings the process, which will put its status and state on the report
113+
channel"
114+
[g pid] (g/ping-proc g pid))
115+
116+
(defn command-proc
117+
"synchronously sends a process-specific command with the given id and
118+
additional kvs to the process. The cmd-id must be ns-qualified with
119+
a ns you own."
120+
[g pid cmd-id more-kvs] (g/command-proc g pid cmd-id more-kvs))
121+
122+
(defn inject
123+
"synchronously puts the messages on the channel corresponding to the
124+
input or output of the process"
125+
[g [pid io-id :as coord] msgs] (g/inject g coord msgs))
126+
127+
(defn process
128+
"Given a map of functions (described below), returns a launcher that
129+
creates a process compliant with the process protocol (see the
130+
spi/ProcLauncher doc). The possible entries for process-impl-map
131+
are :describe, :init, :transition, :transform and :inject. This is
132+
the core facility for defining the logic for processes via ordinary
133+
functions.
134+
135+
:describe - required, () -> desc
136+
where desc is a map with keys :params :ins and :outs, each of which
137+
in turn is a map of keyword to doc string
138+
139+
:params describes the initial arguments to setup the state for the function.
140+
:ins enumerates the input[s], for which the flow will create channels
141+
:outs enumerates the output[s], for which the flow may create channels.
142+
143+
No key may be present in both :ins and :outs The ins/outs/params of f
144+
will be the ins/outs/params of the process. describe may be called
145+
by users to understand how to use the proc. It will also be called
146+
by the impl in order to discover what channels are needed.
147+
148+
:init - optional, (arg-map) -> initial-state
149+
150+
init will be called once by the process to establish any
151+
initial state. The arg-map will be a map of param->val, as supplied
152+
in the flow def. init must be provided if 'describe' returns :params.
153+
154+
:transition - optional, (state transition) -> state'
155+
156+
transition will be called when the process makes a state transition,
157+
transition being one of ::flow/resume, ::flow/pause or ::flow/stop
158+
159+
With this fn a process impl can track changes and coordinate
160+
resources, especially cleaning up any resources on :stop, since the
161+
process will no longer be used following that. See the SPI for
162+
details. state' will be the state supplied to subsequent calls.
163+
164+
Exactly one of either :transform or :inject are required.
165+
166+
:transform - (state in-name msg) -> [state' output]
167+
where output is a map of outid->[msgs*]
168+
169+
The transform fn will be called every time a message arrives at any
170+
of the inputs. Output can be sent to none, any or all of the :outs
171+
enumerated, and/or an input named by a [pid inid] tuple (e.g. for
172+
reply-to), and/or to the ::flow/report output. A step need not
173+
output at all (output or msgs can be empyt/nil), however an output _message_
174+
may never be nil (per core.async channels). state' will be the state
175+
supplied to subsequent calls.
176+
177+
:inject - (state) -> [state' output]
178+
where output is a map of outid->[msgs*], per :transform
179+
180+
The inject fn is used for sources - proc-impls that inject new data
181+
into the flow by doing I/O with something external to the flow and
182+
feeding that data to its outputs. A proc-impl specifying :inject may not
183+
specify any :ins in its descriptor, as none but the ::flow/control channel
184+
will be read. Instead, inject will be called every time through the
185+
process loop, and will presumably do blocking or paced I/O to get
186+
new data to return via its outputs. If it does blocking I/O it
187+
should do so with a timeout so it can regularly return to the
188+
process loop which can then look for control messages - it's fine
189+
for inject to return with no output. Do not spin poll in the inject
190+
fn.
191+
192+
proc accepts an option map with keys:
193+
:exec - one of :mixed, :io or :compute, default :mixed
194+
:compute-timeout-ms - if :exec is :compute, this timeout (default 5000 msec)
195+
will be used when getting the return from the future - see below
196+
197+
The :compute context is not allowed for proc impls that
198+
provide :inject (as I/O is presumed).
199+
200+
In the :exec context of :mixed or :io, this dictates the type of
201+
thread in which the process loop will run, _including its calls to
202+
transform/inject_.
203+
204+
When :io is specified transform/inject should not do extensive computation.
205+
206+
When :compute is specified (only allowed for :transform), each call
207+
to transform will be run in a separate thread. The process loop will
208+
run in an :io context (since it no longer directly calls transform,
209+
all it does is I/O) and it will submit transform to the :compute
210+
executor then await (blocking, for compute-timeout-ms) the
211+
completion of the future returned by the executor. If the future
212+
times out it will be reported on ::flow/error.
213+
214+
When :compute is specified transform must not block!"
215+
([process-impl-map] (process process-impl-map nil))
216+
([process-impl-map {:keys [exec timeout-ms]
217+
:or {exec :mixed, timeout-ms 5000} :as opts}]
218+
(impl/proc process-impl-map opts)))
219+
220+
(defn step-process
221+
"Given a (e.g. communication-free) step function f of three
222+
arities (described below), and the same opts as 'process', returns a
223+
launcher that creates a process compliant with the process
224+
protocol (see 'process').
225+
226+
The arities of f are:
227+
228+
()->desc
229+
a function matching the semantics of process' :describe
230+
231+
(arg-map)->initial-state
232+
a function matching the semantics of process' :init
233+
234+
(state in-name msg)->[state' output]
235+
a function matching the semantics of process' :transform"
236+
([f] (step-process f nil))
237+
([f opts]
238+
(process {:describe f, :init f, :transform f} opts)))
239+
240+
(defn futurize
241+
"Takes a fn f and returns a fn that takes the same arguments as f and
242+
immediately returns a future, having starting a thread of the
243+
indicated type, or via the supplied executor, that invokes f with
244+
those args and completes that future with its return.
245+
246+
futurize accepts kwarg options:
247+
:exec - one of :mixed, :io, :compute
248+
or a j.u.c.ExecutorService object,
249+
default :mixed"
250+
[f & {:keys [exec]
251+
:or {exec :mixed} :as opts}]
252+
(impl/futurize f opts))
253+
254+
(defn lift*->step
255+
"given a fn f taking one arg and returning a collection of non-nil
256+
values, create a 'step' fn as needed by step-process, with one input
257+
and one output (named :in and :out), and no state."
258+
[f]
259+
(fn
260+
([] {:params {}
261+
:ins {:in (str "the argument to " f)}
262+
:outs {:out (str "the return of " f)}})
263+
([_] nil)
264+
([_ _ msg] [nil {:out (f msg)}])))
265+
266+
(defn lift1->step
267+
"like lift*->step except taking a fn returning one value, which, when
268+
nil, will yield no output."
269+
[f]
270+
(lift*->step #(when-some [m (f %)] (vector m))))

0 commit comments

Comments
 (0)