Skip to content

Commit 148ebe2

Browse files
committed
ASYNC-252 Split core.async go runtime from ioc-macros namespace
1 parent 4e0733d commit 148ebe2

File tree

3 files changed

+50
-126
lines changed

3 files changed

+50
-126
lines changed

src/main/clojure/clojure/core/async.clj

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ to catch and handle."
2626
[clojure.core.async.impl.buffers :as buffers]
2727
[clojure.core.async.impl.timers :as timers]
2828
[clojure.core.async.impl.dispatch :as dispatch]
29-
[clojure.core.async.impl.ioc-macros :as ioc]
29+
[clojure.core.async.impl.ioc-macros :as ioc-macros] ;; only for go analyzer
30+
[clojure.core.async.impl.runtime :as ioc]
3031
[clojure.core.async.impl.mutex :as mutex]
3132
[clojure.core.async.impl.concurrent :as conc]
3233
)
@@ -417,7 +418,7 @@ to catch and handle."
417418

418419
(defn ioc-alts! [state cont-block ports & {:as opts}]
419420
(ioc/aset-all! state ioc/STATE-IDX cont-block)
420-
(when-let [cb (clojure.core.async/do-alts
421+
(when-let [cb (do-alts
421422
(fn [val]
422423
(ioc/aset-all! state ioc/VALUE-IDX val)
423424
(ioc/run-state-machine-wrapped state))
@@ -456,18 +457,7 @@ to catch and handle."
456457
Returns a channel which will receive the result of the body when
457458
completed"
458459
[& body]
459-
(let [crossing-env (zipmap (keys &env) (repeatedly gensym))]
460-
`(let [c# (chan 1)
461-
captured-bindings# (Var/getThreadBindingFrame)]
462-
(dispatch/run
463-
(^:once fn* []
464-
(let [~@(mapcat (fn [[l sym]] [sym `(^:once fn* [] ~(vary-meta l dissoc :tag))]) crossing-env)
465-
f# ~(ioc/state-machine `(do ~@body) 1 [crossing-env &env] ioc/async-custom-terminators)
466-
state# (-> (f#)
467-
(ioc/aset-all! ioc/USER-START-IDX c#
468-
ioc/BINDINGS-IDX captured-bindings#))]
469-
(ioc/run-state-machine-wrapped state#))))
470-
c#)))
460+
(#'clojure.core.async.impl.ioc-macros/go-impl &env body))
471461

472462
(defonce ^:private ^Executor thread-macro-executor
473463
(Executors/newCachedThreadPool (conc/counted-thread-factory "async-thread-macro-%d" true)))

src/main/clojure/clojure/core/async/impl/ioc_macros.clj

Lines changed: 43 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -20,39 +20,16 @@
2020
[clojure.tools.analyzer.passes.jvm.warn-on-reflection :refer [warn-on-reflection]]
2121
[clojure.tools.analyzer.jvm :as an-jvm]
2222
[clojure.core.async.impl.protocols :as impl]
23+
[clojure.core.async.impl.dispatch :as dispatch]
24+
[clojure.core.async.impl.runtime :as rt]
2325
[clojure.set :as set])
24-
(:import [java.util.concurrent.locks Lock]
25-
[java.util.concurrent.atomic AtomicReferenceArray]))
26+
(:import [java.util.concurrent.atomic AtomicReferenceArray]
27+
[clojure.lang Var]))
2628

2729
(defn debug [x]
2830
(pprint x)
2931
x)
3032

31-
(def ^{:const true :tag 'long} FN-IDX 0)
32-
(def ^{:const true :tag 'long} STATE-IDX 1)
33-
(def ^{:const true :tag 'long} VALUE-IDX 2)
34-
(def ^{:const true :tag 'long} BINDINGS-IDX 3)
35-
(def ^{:const true :tag 'long} EXCEPTION-FRAMES 4)
36-
(def ^{:const true :tag 'long} USER-START-IDX 5)
37-
38-
(defn aset-object [^AtomicReferenceArray arr ^long idx o]
39-
(.set arr idx o))
40-
41-
(defn aget-object [^AtomicReferenceArray arr ^long idx]
42-
(.get arr idx))
43-
44-
(defmacro aset-all!
45-
[arr & more]
46-
(assert (even? (count more)) "Must give an even number of args to aset-all!")
47-
(let [bindings (partition 2 more)
48-
arr-sym (gensym "statearr-")]
49-
`(let [~arr-sym ~arr]
50-
~@(map
51-
(fn [[idx val]]
52-
`(aset-object ~arr-sym ~idx ~val))
53-
bindings)
54-
~arr-sym)))
55-
5633
;; State monad stuff, used only in SSA construction
5734

5835
(defmacro gen-plan
@@ -217,7 +194,7 @@
217194
IEmittableInstruction
218195
(emit-instruction [this state-sym]
219196
(if (= value ::value)
220-
`[~(:id this) (aget-object ~state-sym ~VALUE-IDX)]
197+
`[~(:id this) (rt/aget-object ~state-sym ~rt/VALUE-IDX)]
221198
`[~(:id this) ~value])))
222199

223200
(defrecord RawCode [ast locals]
@@ -317,10 +294,10 @@
317294
(terminate-block [_this state-sym _]
318295
`(do (case ~val-id
319296
~@(concat (mapcat (fn [test blk]
320-
`[~test (aset-all! ~state-sym ~STATE-IDX ~blk)])
297+
`[~test (rt/aset-all! ~state-sym ~rt/STATE-IDX ~blk)])
321298
test-vals jmp-blocks)
322299
(when default-block
323-
`[(do (aset-all! ~state-sym ~STATE-IDX ~default-block)
300+
`[(do (rt/aset-all! ~state-sym ~rt/STATE-IDX ~default-block)
324301
:recur)])))
325302
:recur)))
326303

@@ -351,7 +328,7 @@
351328
(block-references [_this] [block])
352329
ITerminator
353330
(terminate-block [_this state-sym _]
354-
`(do (aset-all! ~state-sym ~VALUE-IDX ~value ~STATE-IDX ~block)
331+
`(do (rt/aset-all! ~state-sym ~rt/VALUE-IDX ~value ~rt/STATE-IDX ~block)
355332
:recur)))
356333

357334
(defrecord Return [value]
@@ -364,7 +341,7 @@
364341
(terminate-block [this state-sym custom-terminators]
365342
(if-let [f (get custom-terminators (terminator-code this))]
366343
`(~f ~state-sym ~value)
367-
`(do (aset-all! ~state-sym ~VALUE-IDX ~value ~STATE-IDX ::finished)
344+
`(do (rt/aset-all! ~state-sym ~rt/VALUE-IDX ~value ~rt/STATE-IDX ::finished)
368345
nil))))
369346

370347
(defrecord CondBr [test then-block else-block]
@@ -375,8 +352,8 @@
375352
ITerminator
376353
(terminate-block [_this state-sym _]
377354
`(do (if ~test
378-
(aset-all! ~state-sym ~STATE-IDX ~then-block)
379-
(aset-all! ~state-sym ~STATE-IDX ~else-block))
355+
(rt/aset-all! ~state-sym ~rt/STATE-IDX ~then-block)
356+
(rt/aset-all! ~state-sym ~rt/STATE-IDX ~else-block))
380357
:recur)))
381358

382359
(defrecord PushTry [catch-block]
@@ -386,7 +363,7 @@
386363
(block-references [_this] [catch-block])
387364
IEmittableInstruction
388365
(emit-instruction [_this state-sym]
389-
`[~'_ (aset-all! ~state-sym ~EXCEPTION-FRAMES (cons ~catch-block (aget-object ~state-sym ~EXCEPTION-FRAMES)))]))
366+
`[~'_ (rt/aset-all! ~state-sym ~rt/EXCEPTION-FRAMES (cons ~catch-block (rt/aget-object ~state-sym ~rt/EXCEPTION-FRAMES)))]))
390367

391368
(defrecord PopTry []
392369
IInstruction
@@ -395,7 +372,7 @@
395372
(block-references [_this] [])
396373
IEmittableInstruction
397374
(emit-instruction [_this state-sym]
398-
`[~'_ (aset-all! ~state-sym ~EXCEPTION-FRAMES (rest (aget-object ~state-sym ~EXCEPTION-FRAMES)))]))
375+
`[~'_ (rt/aset-all! ~state-sym ~rt/EXCEPTION-FRAMES (rest (rt/aget-object ~state-sym ~rt/EXCEPTION-FRAMES)))]))
399376

400377
(defrecord CatchHandler [catches]
401378
IInstruction
@@ -405,10 +382,10 @@
405382
ITerminator
406383
(terminate-block [_this state-sym _]
407384
(let [ex (gensym 'ex)]
408-
`(let [~ex (aget-object ~state-sym ~VALUE-IDX)]
385+
`(let [~ex (rt/aget-object ~state-sym ~rt/VALUE-IDX)]
409386
(cond
410387
~@(for [[handler-idx type] catches
411-
i [`(instance? ~type ~ex) `(aset-all! ~state-sym ~STATE-IDX ~handler-idx)]]
388+
i [`(instance? ~type ~ex) `(rt/aset-all! ~state-sym ~rt/STATE-IDX ~handler-idx)]]
412389
i)
413390
:else (throw ~ex))
414391
:recur))))
@@ -888,7 +865,7 @@
888865
(if (empty? args)
889866
[]
890867
(mapcat (fn [sym]
891-
`[~sym (aget-object ~state-sym ~(id-for-inst local-map sym))])
868+
`[~sym (rt/aget-object ~state-sym ~(id-for-inst local-map sym))])
892869
args))))
893870

894871
(defn- build-block-body [state-sym blk]
@@ -905,27 +882,27 @@
905882
blk)
906883
results (interleave (map (partial id-for-inst local-map) results) results)]
907884
(if-not (empty? results)
908-
[state-sym `(aset-all! ~state-sym ~@results)]
885+
[state-sym `(rt/aset-all! ~state-sym ~@results)]
909886
[])))
910887

911888
(defn- emit-state-machine [machine num-user-params custom-terminators]
912889
(let [index (index-state-machine machine)
913890
state-sym (with-meta (gensym "state_")
914891
{:tag 'objects})
915-
local-start-idx (+ num-user-params USER-START-IDX)
892+
local-start-idx (+ num-user-params rt/USER-START-IDX)
916893
state-arr-size (+ local-start-idx (count-persistent-values index))
917894
local-map (atom {::next-idx local-start-idx})
918895
block-catches (:block-catches machine)]
919896
`(fn state-machine#
920-
([] (aset-all! (AtomicReferenceArray. ~state-arr-size)
921-
~FN-IDX state-machine#
922-
~STATE-IDX ~(:start-block machine)))
897+
([] (rt/aset-all! (AtomicReferenceArray. ~state-arr-size)
898+
~rt/FN-IDX state-machine#
899+
~rt/STATE-IDX ~(:start-block machine)))
923900
([~state-sym]
924901
(let [old-frame# (clojure.lang.Var/getThreadBindingFrame)
925902
ret-value# (try
926-
(clojure.lang.Var/resetThreadBindingFrame (aget-object ~state-sym ~BINDINGS-IDX))
903+
(clojure.lang.Var/resetThreadBindingFrame (rt/aget-object ~state-sym ~rt/BINDINGS-IDX))
927904
(loop []
928-
(let [result# (case (int (aget-object ~state-sym ~STATE-IDX))
905+
(let [result# (case (int (rt/aget-object ~state-sym ~rt/STATE-IDX))
929906
~@(mapcat
930907
(fn [[id blk]]
931908
[id `(let [~@(concat (build-block-preamble local-map index state-sym blk)
@@ -937,77 +914,18 @@
937914
(recur)
938915
result#)))
939916
(catch Throwable ex#
940-
(aset-all! ~state-sym ~VALUE-IDX ex#)
941-
(if (seq (aget-object ~state-sym ~EXCEPTION-FRAMES))
942-
(aset-all! ~state-sym ~STATE-IDX (first (aget-object ~state-sym ~EXCEPTION-FRAMES)))
917+
(rt/aset-all! ~state-sym ~rt/VALUE-IDX ex#)
918+
(if (seq (rt/aget-object ~state-sym ~rt/EXCEPTION-FRAMES))
919+
(rt/aset-all! ~state-sym ~rt/STATE-IDX (first (rt/aget-object ~state-sym ~rt/EXCEPTION-FRAMES)))
943920
(throw ex#))
944921
:recur)
945922
(finally
946-
(aset-object ~state-sym ~BINDINGS-IDX (clojure.lang.Var/getThreadBindingFrame))
923+
(rt/aset-object ~state-sym ~rt/BINDINGS-IDX (clojure.lang.Var/getThreadBindingFrame))
947924
(clojure.lang.Var/resetThreadBindingFrame old-frame#)))]
948925
(if (identical? ret-value# :recur)
949926
(recur ~state-sym)
950927
ret-value#))))))
951928

952-
(defn finished?
953-
"Returns true if the machine is in a finished state"
954-
[state-array]
955-
(identical? (aget-object state-array STATE-IDX) ::finished))
956-
957-
(defn- fn-handler
958-
[f]
959-
(reify
960-
Lock
961-
(lock [_])
962-
(unlock [_])
963-
964-
impl/Handler
965-
(active? [_] true)
966-
(blockable? [_] true)
967-
(lock-id [_] 0)
968-
(commit [_] f)))
969-
970-
971-
(defn run-state-machine [state]
972-
((aget-object state FN-IDX) state))
973-
974-
(defn run-state-machine-wrapped [state]
975-
(try
976-
(run-state-machine state)
977-
(catch Throwable ex
978-
(impl/close! (aget-object state USER-START-IDX))
979-
(throw ex))))
980-
981-
(defn take! [state blk c]
982-
(if-let [cb (impl/take! c (fn-handler
983-
(fn [x]
984-
(aset-all! state VALUE-IDX x STATE-IDX blk)
985-
(run-state-machine-wrapped state))))]
986-
(do (aset-all! state VALUE-IDX @cb STATE-IDX blk)
987-
:recur)
988-
nil))
989-
990-
(defn put! [state blk c val]
991-
(if-let [cb (impl/put! c val (fn-handler (fn [ret-val]
992-
(aset-all! state VALUE-IDX ret-val STATE-IDX blk)
993-
(run-state-machine-wrapped state))))]
994-
(do (aset-all! state VALUE-IDX @cb STATE-IDX blk)
995-
:recur)
996-
nil))
997-
998-
(defn return-chan [state value]
999-
(let [c (aget-object state USER-START-IDX)]
1000-
(when-not (nil? value)
1001-
(impl/put! c value (fn-handler (fn [_] nil))))
1002-
(impl/close! c)
1003-
c))
1004-
1005-
(def async-custom-terminators
1006-
{'clojure.core.async/<! `take!
1007-
'clojure.core.async/>! `put!
1008-
'clojure.core.async/alts! 'clojure.core.async/ioc-alts!
1009-
:Return `return-chan})
1010-
1011929
(defn mark-transitions
1012930
{:pass-info {:walk :post :depends #{} :after an-jvm/default-passes}}
1013931
[{:keys [op fn] :as ast}]
@@ -1110,3 +1028,19 @@
11101028
(parse-to-state-machine user-transitions)
11111029
second
11121030
(emit-state-machine num-user-params user-transitions))))
1031+
1032+
(defn go-impl
1033+
[env body]
1034+
(let [crossing-env (zipmap (keys env) (repeatedly gensym))]
1035+
`(let [c# (clojure.core.async/chan 1)
1036+
captured-bindings# (Var/getThreadBindingFrame)]
1037+
(dispatch/run
1038+
(^:once fn* []
1039+
(let [~@(mapcat (fn [[l sym]] [sym `(^:once fn* [] ~(vary-meta l dissoc :tag))]) crossing-env)
1040+
f# ~(state-machine
1041+
`(do ~@body) 1 [crossing-env env] rt/async-custom-terminators)
1042+
state# (-> (f#)
1043+
(rt/aset-all! rt/USER-START-IDX c#
1044+
rt/BINDINGS-IDX captured-bindings#))]
1045+
(rt/run-state-machine-wrapped state#))))
1046+
c#)))

src/test/clojure/clojure/core/async/ioc_macros_test.clj

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
(ns clojure.core.async.ioc-macros-test
22
(:refer-clojure :exclude [map into reduce transduce merge take partition
33
partition-by])
4-
(:require [clojure.core.async.impl.ioc-macros :as ioc]
4+
(:require [clojure.core.async.impl.runtime :as ioc]
55
[clojure.core.async :refer :all :as async]
66
[clojure.set :as set]
77
[clojure.test :refer :all])
@@ -24,7 +24,7 @@
2424
crossing-env (zipmap (keys &env) (repeatedly gensym))]
2525
`(let [captured-bindings# (clojure.lang.Var/getThreadBindingFrame)
2626
~@(mapcat (fn [[l sym]] [sym `(^:once fn* [] ~l)]) crossing-env)
27-
state# (~(ioc/state-machine `(do ~@body) 0 [crossing-env &env] terminators))]
27+
state# (~(clojure.core.async.impl.ioc-macros/state-machine `(do ~@body) 0 [crossing-env &env] terminators))]
2828
(ioc/aset-all! state# ~ioc/BINDINGS-IDX captured-bindings#)
2929
(ioc/run-state-machine state#)
3030
(ioc/aget-object state# ioc/VALUE-IDX))))
@@ -553,7 +553,7 @@
553553
crossing-env (zipmap (keys &env) (repeatedly gensym))]
554554
`(let [captured-bindings# (clojure.lang.Var/getThreadBindingFrame)
555555
~@(mapcat (fn [[l sym]] [sym `(^:once fn* [] ~(vary-meta l dissoc :tag))]) crossing-env)
556-
state# (~(ioc/state-machine
556+
state# (~(clojure.core.async.impl.ioc-macros/state-machine
557557
`(do ~@body)
558558
0
559559
[crossing-env &env]

0 commit comments

Comments
 (0)