Skip to content

Commit a583e84

Browse files
committed
No longer exposing new arity in thread-call
1 parent 326dbb5 commit a583e84

File tree

3 files changed

+38
-32
lines changed

3 files changed

+38
-32
lines changed

src/main/clojure/clojure/core/async.clj

Lines changed: 25 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ to catch and handle."
3030
clojure.core.async.impl.go ;; TODO: make conditional
3131
[clojure.core.async.impl.mutex :as mutex]
3232
[clojure.core.async.impl.concurrent :as conc]
33-
)
33+
[clojure.core.async.impl.exec.threadpool :as threadp])
3434
(:import [java.util.concurrent.atomic AtomicLong]
3535
[java.util.concurrent.locks Lock]
3636
[java.util.concurrent Executors Executor ThreadLocalRandom ExecutorService]
@@ -462,45 +462,41 @@ to catch and handle."
462462
[& body]
463463
(#'clojure.core.async.impl.go/go-impl &env body))
464464

465-
(defonce ^ExecutorService mixed-executor
466-
(Executors/newCachedThreadPool (conc/counted-thread-factory "async-mixed-%d" true)))
467-
468-
(defonce ^ExecutorService io-executor
469-
(Executors/newCachedThreadPool (conc/counted-thread-factory "async-io-%d" true)))
470-
471-
(defonce ^ExecutorService compute-executor
472-
(Executors/newCachedThreadPool (conc/counted-thread-factory "async-compute-%d" true)))
465+
(defn- best-fit-thread-call
466+
[f exec]
467+
(let [c (chan 1)
468+
^ExecutorService e (case exec
469+
:compute threadp/compute-executor
470+
:io threadp/io-executor
471+
threadp/mixed-executor)]
472+
(let [binds (Var/getThreadBindingFrame)]
473+
(.execute e
474+
(fn []
475+
(Var/resetThreadBindingFrame binds)
476+
(try
477+
(let [ret (f)]
478+
(when-not (nil? ret)
479+
(>!! c ret)))
480+
(finally
481+
(close! c))))))
482+
c))
473483

474484
(defn thread-call
475485
"Executes f in another thread, returning immediately to the calling
476486
thread. Returns a channel which will receive the result of calling
477-
f when completed, then close."
478-
([f] (thread-call f :mixed))
479-
([f exec]
480-
(let [c (chan 1)
481-
^ExecutorService e (case exec
482-
:compute compute-executor
483-
:io io-executor
484-
mixed-executor)]
485-
(let [binds (Var/getThreadBindingFrame)]
486-
(.execute e
487-
(fn []
488-
(Var/resetThreadBindingFrame binds)
489-
(try
490-
(let [ret (f)]
491-
(when-not (nil? ret)
492-
(>!! c ret)))
493-
(finally
494-
(close! c))))))
495-
c)))
487+
f when completed, then close. exec is a keyword that describes the
488+
nature of f's workload, one of :mixed (default) :io or :compute
489+
whereby core.async may be able to choose a best fit thread type."
490+
[f]
491+
(best-fit-thread-call f :mixed))
496492

497493
(defmacro io-thread
498494
"Executes the body in a thread intended for blocking I/O workloads,
499495
returning immediately to the calling thread. The body must not do
500496
extended computation (if so, use 'thread' instead). Returns a channel
501497
which will receive the result of the body when completed, then close."
502498
[& body]
503-
`(thread-call (^:once fn* [] ~@body) :io))
499+
`(#'best-fit-thread-call (^:once fn* [] ~@body) :io))
504500

505501
(defmacro thread
506502
"Executes the body in another thread, returning immediately to the

src/main/clojure/clojure/core/async/impl/concurrent.clj

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,9 @@
77
;; You must not remove this notice, or any other, from this software.
88

99
(ns ^{:skip-wiki true}
10-
clojure.core.async.impl.concurrent
11-
(:import [java.util.concurrent ThreadFactory]))
10+
clojure.core.async.impl.concurrent
11+
(:import [java.util.concurrent ThreadFactory Executors ExecutorService]
12+
[clojure.lang Var]))
1213

1314
(set! *warn-on-reflection* true)
1415

src/main/clojure/clojure/core/async/impl/exec/threadpool.clj

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
(ns clojure.core.async.impl.exec.threadpool
1010
(:require [clojure.core.async.impl.protocols :as impl]
1111
[clojure.core.async.impl.concurrent :as conc])
12-
(:import [java.util.concurrent Executors]))
12+
(:import [java.util.concurrent Executors ExecutorService]))
1313

1414
(set! *warn-on-reflection* true)
1515

@@ -30,3 +30,12 @@
3030
(reify impl/Executor
3131
(impl/exec [_ r]
3232
(.execute executor-svc ^Runnable r))))))
33+
34+
(defonce ^ExecutorService mixed-executor
35+
(Executors/newCachedThreadPool (conc/counted-thread-factory "async-mixed-%d" true)))
36+
37+
(defonce ^ExecutorService io-executor
38+
(Executors/newCachedThreadPool (conc/counted-thread-factory "async-io-%d" true)))
39+
40+
(defonce ^ExecutorService compute-executor
41+
(Executors/newCachedThreadPool (conc/counted-thread-factory "async-compute-%d" true)))

0 commit comments

Comments
 (0)