Skip to content

Commit 26b3c4d

Browse files
committed
Merge branch 'master' into dev-vthreads-exec
2 parents 76d2253 + 0a49c7a commit 26b3c4d

21 files changed

+919
-29
lines changed

README.md

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,17 @@
22

33
A Clojure library providing facilities for async programming and communication.
44

5+
## Documentation
6+
7+
* [Rationale](https://clojure.github.io/core.async/rationale.html)
8+
* [API docs](https://clojure.github.io/core.async/)
9+
* [Code walkthrough](https://github.com/clojure/core.async/blob/master/examples/walkthrough.clj)
10+
11+
## Presentations
12+
13+
* [Rich Hickey on core.async](https://www.youtube.com/watch?v=yJxFPoxqzWE)
14+
* [Tim Baldridge on core.async](https://www.youtube.com/watch?v=enwIIGzhahw) from Clojure/conj 2013 ([code](https://github.com/halgari/clojure-conj-2013-core.async-examples)).
15+
* Tim Baldridge on go macro internals - [part 1](https://www.youtube.com/watch?v=R3PZMIwXN_g) [part 2](https://www.youtube.com/watch?v=SI7qtuuahhU)
516

617
## Releases and Dependency Information
718

@@ -33,18 +44,6 @@ Latest release: 1.8.741
3344
</dependency>
3445
```
3546

36-
## Documentation
37-
38-
* [Rationale](https://clojure.org/news/2013/06/28/clojure-clore-async-channels)
39-
* [API docs](https://clojure.github.io/core.async/)
40-
* [Code walkthrough](https://github.com/clojure/core.async/blob/master/examples/walkthrough.clj)
41-
42-
## Presentations
43-
44-
* [Rich Hickey on core.async](https://www.youtube.com/watch?v=yJxFPoxqzWE)
45-
* [Tim Baldridge on core.async](https://www.youtube.com/watch?v=enwIIGzhahw) from Clojure/conj 2013 ([code](https://github.com/halgari/clojure-conj-2013-core.async-examples)).
46-
* Tim Baldridge on go macro internals - [part 1](https://www.youtube.com/watch?v=R3PZMIwXN_g) [part 2](https://www.youtube.com/watch?v=SI7qtuuahhU)
47-
4847
## Contributing
4948

5049
[Contributing to Clojure projects](https://clojure.org/community/contributing) requires a signed Contributor Agreement. Pull requests and GitHub issues are not accepted; please use the [core.async JIRA project](https://clojure.atlassian.net/browse/ASYNC) to report problems or enhancements.
@@ -65,6 +64,9 @@ Copyright © Rich Hickey and contributors
6564

6665
## Changelog
6766

67+
* Release 1.9.808-alpha1 on 2025-04-28
68+
* First alpha release of core.async.flow - all APIs subject to change
69+
* Added datafy support for channels and buffers
6870
* Release 1.8.741 on 2025-04-07
6971
* [ASYNC-234](https://clojure.atlassian.net/browse/ASYNC-234) (CLJ) Inactive alt handlers hold strong references to (potentially large) caller state after the alt completes
7072
* Release 1.8.735 on 2025.04.02

VERSION_TEMPLATE

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.8.GENERATED_VERSION
1+
1.9.GENERATED_VERSION-alpha2

deps.edn

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
:exec-fn codox.main/generate-docs
3030
:exec-args {:source-paths ["src/main/clojure"]
3131
:namespaces [clojure.core.async clojure.core.async.flow clojure.core.async.flow.spi]
32-
:doc-files ["doc/flow.md"]
32+
:doc-files ["doc/rationale.md" "doc/reference.md" "doc/walkthrough.md" "doc/flow.md" "doc/flow-guide.md"]
3333
:output-path "docs"
3434
:html {:namespace-list :flat}}}
3535

doc/flow-guide.md

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
# Flow Guide
2+
3+
## Getting started
4+
5+
The [flow](https://clojure.github.io/core.async/flow.html) library enables a strict separation application logic from the deployment concerns of topology, execution, communication, lifecycle, monitoring and error handling.
6+
7+
## Step fns and process launchers
8+
9+
You provide logic to flow in the form of _step-fns_, which are wrapped into running processes, executing in a loop. Flow manages the life cycle of the process and handles incoming and outgoing messages by putting or taking them on channels. Step-fns do not access channels directly or hold state, making them easy to test in isolation and reuse.
10+
11+
Step functions have four arities:
12+
13+
<a href="https://github.com/clojure/core.async/blob/master/doc/img/step-fn-arities.png?raw=true"><img src="https://github.com/clojure/core.async/blob/master/doc/img/step-fn-arities.png?raw=true" alt="step-fn arities" width="700"/></a>
14+
15+
### describe: `(step-fn) -> descriptor`
16+
17+
The describe arity must return a static description of the step-fn's :params, :ins, and :outs. Each of these is a map of name (a keyword) to docstring.
18+
19+
For example, the describe arity might return this description for a simple step-fn:
20+
21+
```clojure
22+
{:params {:size "Max size"} ;; step-fn params
23+
:ins {:in "Input channel"} ;; input channels
24+
:outs {:out "Output channel"}} ;; output channels
25+
```
26+
27+
The names used for input and output channels should be distinct (no overlap).
28+
29+
### init: `(step-fn arg-map) -> init-state`
30+
31+
The init arity is called once by the process to takes a set of args from the flow def (corresponding to the params returned from the describe arity) and returns the init state of the process.
32+
33+
### transition: `(step-fn state transition) -> state'`
34+
35+
The transition arity is called any time the flow or process undergoes a lifecycle transition (::flow/start, ::flow/stop, ::flow/pause, ::flow/resume). The description arity takes the current state and returns an updated state to be used for subsequent calls.
36+
37+
The step-fn should use the transition arity to coordinate the creation, pausing, and shutdown of external resources in a process.
38+
39+
### transform: `(step-fn state input msg) -> [state' {out-id [msgs]}]`
40+
41+
The transform arity is called in a loop by the process for every message received on an input channel and returns a new state and a map of output cids to messages to return. The process will take care of sending these messages to the output channels. Output can be sent to none, any or all of the :outsenumerated, and/or an input named by a [pid inid] tuple (e.g. for reply-to), and/or to the ::flow/report output. A step need not output at all (output or msgs can be empyt/nil), however an output _message_ may never be nil (per core.async channels).
42+
43+
The step-fn may throw excepitons from any arity and they will be handled by flow. Exceptions thrown from the transition or transform arities, the exception will be logged on the flow's :error-chan.
44+
45+
### Process state
46+
47+
The process state is a map. It can contain any keys needed by the step-fn transition and transform arities. In addition, there are some flow-specific keys, described here.
48+
49+
`::flow/pid` is added to the state by the process based on the name supplied in the flow def.
50+
51+
`::flow/in-ports` and `::flow/out-ports` are maps of cid to external channel, optionally returned in the initial state from the init arity. The in-ports and out-ports are used to connect source and sink processes to external channels. These channels must be provided by the step-fn and returned in the init arity map, either by creating the channel or using a channel passed in via the flow def init args for the process. The flow does not manage the lifecycle of these channels.
52+
53+
`::flow/input-filter`, a predicate of cid, can be returned in the state from any arity to indicate a filter on the process input channel read set. For example, a step-fn that is waiting for a response from multiple inputs might remove the channels that have already responded from the read-set until responses have been received from all.
54+
55+
### step-fn helpers
56+
57+
Some additional helpers exist to create step-fns from other forms:
58+
59+
* `lift*->step` - given a fn f taking one arg and returning a collection of non-nil values, creates a step-fn as needed by a process launcher, with one input and one output (named :in and :out), and no state
60+
* `lift1->step` - like `lift*->step` but for functions that return a single value (when `nil`, yield no output)
61+
* `map->step` - given a map with keys `:describe`, `:init`, `:transition`, `:transform` corresponding to the arities above, create a step-fn.
62+
63+
### Creating a process launcher
64+
65+
Process launchers can be created using the [process](https://clojure.github.io/core.async/clojure.core.async.flow.html#var-process) function, which takes a step-fn, and an option map with keys:
66+
67+
* `::workload` - one of `:mixed`, `:io` or `:compute`
68+
* `:compute-timeout-ms` - if :workload is :compute, this timeout (default 5000 msec) will be used when getting the return from the future - see below
69+
70+
A :workload supplied as an option to `process` will override any :workload returned by the :describe fn of the process launcher. If neither are provded the default is :mixed.
71+
72+
In the :workload context of :mixed or :io, this dictates the type of thread in which the process loop will run, _including its calls to transform_.
73+
74+
When :io is specified, transform should not do extensive computation.
75+
76+
When :compute is specified, each call to transform will be run in a separate thread. The process loop will run in an :io context (since it no longer directly calls transform, all it does is I/O) and it will submit transform to the :compute executor then await (blocking, for compute-timeout-ms) the completion of the future returned by the executor. If the future times out it will be reported on ::flow/error.
77+
78+
When :compute is specified transform must not block!
79+
80+
Note that process launchers are defined by the [ProcLauncher](https://clojure.github.io/core.async/clojure.core.async.flow.spi.html#var-ProcLauncher) protocol. While you will typically use `process` to create a process launcher, advanced uses may also implement the protocol directly.
81+
82+
### Reloading
83+
84+
Because the step-fn is called in a loop, it is a good practice to define the step-fn in a var and use the var (`#'the-fn`) instead of the function value itself (`the-fn`). This practice supports interactive development by allowing the var to be rebound from the repl while the flow is running.
85+
86+
## Flow def
87+
88+
The step-fns are how you supply code for each process in the flow. The other thing you must supply is the flow configuration that ties together the proc launchers and the connections between them.
89+
90+
This flow definition is supplied to the [create-flow](https://clojure.github.io/core.async/clojure.core.async.flow.html#var-create-flow) function and consists of a map with `:procs`, `:conns`, and optionally some workflow executors.
91+
92+
The `:procs` is a map of pid -> proc-def. The proc-def is a map with `:proc` (the process launcher), the `:args` (passed to the init arity of the step-fn), and the `:chan-opts` which can be used to specify channel properties.
93+
94+
The `:conns` is a collection of `[[from-pid outid] [to-pid inid]]` tuples. Inputs and outputs support multiple connections. When an output is connected multiple times, every connection will get every message, per `core.async/mult`.
95+
96+
An example flow definition might look like this for a flow with two procs where the in-chan and out-chan are being passed through the source and sink args:
97+
98+
```clojure
99+
{:procs {:source-proc {:proc (process #'source-fn)
100+
:args {:source-chan in-chan}}
101+
:sink-proc {:proc (process #'sink-fn)
102+
:args {:sink-chan out-chan}}}
103+
:conns [ [[:source-proc :out] [:sink-proc :in]] ]}
104+
````
105+
106+
The flow is created by passing the flow definition to [create-flow](https://clojure.github.io/core.async/clojure.core.async.flow.html#var-create-flow).
107+
108+
The returned flow object can be passed to the lifecycle methods (see next). In addition the flow can be used with [datafy](https://clojure.github.io/clojure/clojure.datafy-api.html#clojure.datafy/datafy) to get a datafied description of the flow. This is a static view - see `ping` described later for a dynamic view.
109+
110+
## Flow lifecycle
111+
112+
When a flow is created, it starts in the resumed state. The following flow functions can be used to change the flow lifecycle state:
113+
114+
* [start](https://clojure.github.io/core.async/clojure.core.async.flow.html#var-start) - Starts all procs in the flow, return a map of with `:report-chan` and `:error-chan`
115+
* [stop](https://clojure.github.io/core.async/clojure.core.async.flow.html#var-stop) - Stops all procs in the flow
116+
* [pause](https://clojure.github.io/core.async/clojure.core.async.flow.html#var-pause) - Pauses all procs in the flow
117+
* [resume](https://clojure.github.io/core.async/clojure.core.async.flow.html#var-resume) - Resumes all procs in the flow
118+
* [pause-proc](https://clojure.github.io/core.async/clojure.core.async.flow.html#var-pause-proc) - Pauses a single proc
119+
* [resume-proc](https://clojure.github.io/core.async/clojure.core.async.flow.html#var-resume-proc) - Resumes a single proc
120+
121+
You can also use these functions to ping the running processes and return their current state and status:
122+
123+
* [ping](https://clojure.github.io/core.async/clojure.core.async.flow.html#var-ping) - Pings all procs and returns a map of their status
124+
* [ping-proc](https://clojure.github.io/core.async/clojure.core.async.flow.html#var-ping-proc) - Pings a single proce by pid and returns a map of status
125+
126+
This function can be used to inject a message to an arbitrary `[pid cid]` channel:
127+
128+
* [inject](https://clojure.github.io/core.async/clojure.core.async.flow.html#var-inject) - Inject messages to any coord in the flow
129+
130+
The map returned from `start` has the flow's report and error channels. Procs can output messages to the `:report-chan` for unified logging across the flow. Exceptions thrown by a step-fn or procs in the flow are all logged to the `:error-chan`.
131+
132+
## Flow monitor
133+
134+
See [core.async.flow-monitor](https://github.com/clojure/core.async.flow-monitor/) for how to use the flow-monitor tool.

doc/flow.md

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1-
# core.async.flow
1+
# Flow
2+
23
## Rationale
34

4-
The [rationale](https://clojure.org/news/2013/06/28/clojure-clore-async-channels) for **core.async** says "There comes a time in all good programs when components or subsystems must stop communicating directly with one another." And core.async provides fundamental tools (channels) for doing that.
5+
The [rationale](https://clojure.github.io/core.async/rationale.html) for **core.async** says "There comes a time in all good programs when components or subsystems must stop communicating directly with one another." And core.async provides fundamental tools (channels) for doing that.
56

67
But using core.async well, e.g. keeping your I/O out of your computational logic, requires discipline and architectural savvy, and to do so consistently throughout an application or organization, conventions. Given channels, many architectural decisions remain regarding thread execution, backpressure, error handling etc. And often the topology of your network of communicating processes *emerges* out of the flow of control of your program as various pieces of code create threads and wire channels together, interleaved with computation, making it difficult to see the topology or administer it in one place.
78

@@ -11,12 +12,14 @@ The fundamental objective of __core.async.flow__ is to enable a strict separatio
1112

1213
__core.async.flow__ provides *concrete* implementations of two more abstractions - the '__process__' - a thread of activity, and the '__flow__' - a directed graph of processes communicating via channels. A single data structure describes your flow topology, and has all of the settings for threads, channels etc. A process accepts data from and provides data to channels. The process implementation in c.a.flow handles all channel I/O, thread lifecycle and coordination with the flow graph.
1314

14-
All you need to do in you application is:
15+
All you need to do in your application is:
1516

16-
1. Define ordinary, often pure, data->data functions that the processes will run in their inner loop to do the *computational* part of processing messages. These functions do not handle channels or threads or lifecycle, and do not even know they are running in a flow. They can be tested in isolation, and hot-reloaded. If they encounter a problem they can, and should, just throw an exception. The process will take care of it from there.
17+
1. Define ordinary, often pure, data->data functions that the processes will run in their inner loop to do the *computational* part of processing messages (aka 'step' functions). These functions do not handle channels or threads or lifecycle, and do not even know they are running in a flow. They can be tested in isolation, and hot-reloaded. If they encounter a problem they can, and should, just throw an exception. The process will take care of it from there.
1718

1819
2. Define a flow by creating a data structure that enumerates the processes and the connections between their inputs and outputs, as well as various configuration settings for both.
1920

21+
<a href="https://github.com/clojure/core.async/blob/master/doc/img/flow-concerns.png?raw=true"><img src="https://github.com/clojure/core.async/blob/master/doc/img/flow-concerns.png?raw=true" alt="core.async.flow concerns" width="700"/></a>
22+
2023
With these application inputs, c.a.flow does the rest. It inquires of the processes what channels they require, creates those channels, then instantiates the processes making all of the channel connections between them. The processes in turn start threads (in fully user-configurable thread pools), await inputs, monitor the admin control channel, and when inputs arrive make data->data calls to your application logic, taking the return from that and sending it to the designated output channels. The processes follow a protocol used by the flow to do lifecycle management and error handling.
2124

2225
Once you've created a flow, the API provides functions to start/stop(shutdown) the flow, and to pause/resume both the flow and individual processes, to ping processes to get their state and that of their connected channels, to inject data into any point in the graph etc. The flow provides channels containing the ordinary monitoring/reporting stream and, separately, the error stream.

doc/img/flow-concerns.png

212 KB
Loading

doc/img/step-fn-arities.png

82 KB
Loading

doc/intro.md

Lines changed: 0 additions & 2 deletions
This file was deleted.

0 commit comments

Comments
 (0)