diff --git a/src/main/clojure/clojure/core/async.clj b/src/main/clojure/clojure/core/async.clj index f09649d..63da1f8 100644 --- a/src/main/clojure/clojure/core/async.clj +++ b/src/main/clojure/clojure/core/async.clj @@ -17,11 +17,11 @@ go block threads - use Thread.setDefaultUncaughtExceptionHandler() to catch and handle. Use the Java system property `clojure.core.async.executor-factory` -to specify a function that will provide ExecutorServices for +to specify a function that will provide an Executor instance for application-wide use by core.async in lieu of its defaults. The property value should name a fully qualified var. The function will be passed a keyword indicating the context of use of the -executor, and should return either an ExecutorService, or nil to +executor, and should return either an Executor, or nil to use the default. Results per keyword will be cached and used for the remainder of the application. Possible context arguments are: @@ -36,38 +36,11 @@ flow/process :core-async-dispatch - used for completion fn handling (e.g. in put! and take!, as well as go block IOC thunk processing) throughout -core.async. If not supplied the ExecutorService for :io will be +core.async. If not supplied, the Executor for :io will be used instead. The set of contexts may grow in the future so the function should -return nil for unexpected contexts. - -Use the Java system property `clojure.core.async.vthreads` to control -how core.async uses JDK 21+ virtual threads. The property can be one of -the following values: - -unset - core.async will opportunistically use vthreads when available -(≥ Java 21) and will otherwise use the old IOC impl. io-thread and :io -thread pool will run on platform threads if vthreads are not available. -If AOT compiling, go blocks will always use IOC so that the resulting -bytecode works on all JVMs (so no change in compiled output) - -\"target\" - means that you are targeting virtual threads. At runtime -from source, go blocks will throw if vthreads are not available. -If AOT compiling, go blocks are always compiled as normal Clojure -code to be run on vthreads and will throw at runtime if vthreads are -not available (Java <21) - -\"avoid\" - means that vthreads will not be used by core.async - you can -use this to minimize impacts if you are not yet ready to utilize vthreads -in your app. If AOT compiling, go blocks will use IOC. At runtime, io-thread -and the :io thread pool use platform threads - -Note: existing IOC compiled go blocks from older core.async versions continue -to work (we retain and load the IOC state machine runtime - this does not -require the analyzer), and you can interact with the same channels from both -IOC and vthread code. -" +return nil for unexpected contexts." (:refer-clojure :exclude [reduce transduce into merge map take partition partition-by bounded-count]) (:require [clojure.core.async.impl.protocols :as impl] @@ -516,22 +489,6 @@ IOC and vthread code. (let [ret (impl/take! port (fn-handler nop false))] (when ret @ret))) -(defn- go* [body env] - (cond (and (not dispatch/virtual-threads-available?) - dispatch/target-vthreads? - (not clojure.core/*compile-files*)) - (dispatch/report-vthreads-not-available-error!) - - (or dispatch/target-vthreads? - (and dispatch/unset-vthreads? - dispatch/virtual-threads-available? - (not clojure.core/*compile-files*))) - `(do (dispatch/ensure-runtime-vthreads!) - (thread-call (^:once fn* [] ~@body) :io)) - - :else - ((requiring-resolve 'clojure.core.async.impl.go/go-impl) env body))) - (defmacro go "Asynchronously executes the body, returning immediately to the calling thread. Additionally, any visible calls to ! and alt!/alts! @@ -548,7 +505,7 @@ IOC and vthread code. Returns a channel which will receive the result of the body when completed" [& body] - (go* body &env)) + ((requiring-resolve 'clojure.core.async.impl.go/go-impl) &env body)) (defonce ^:private thread-macro-executor nil) diff --git a/src/main/clojure/clojure/core/async/flow.clj b/src/main/clojure/clojure/core/async/flow.clj index f591f15..d82c2c9 100644 --- a/src/main/clojure/clojure/core/async/flow.clj +++ b/src/main/clojure/clojure/core/async/flow.clj @@ -98,8 +98,8 @@ async/sliding-buffer of size 100, thus signals not handled in a timely manner will be dropped in favor of later arriving signals. - :mixed-exec/:io-exec/:compute-exec -> ExecutorService - These can be used to specify the ExecutorService to use for the + :mixed-exec/:io-exec/:compute-exec -> Executor + These can be used to specify the Executor to use for the corresonding workload, in lieu of the lib defaults. N.B. The flow is not started. See 'start'" @@ -334,7 +334,7 @@ futurize accepts kwarg options: :exec - one of the workloads :mixed, :io, :compute - or a j.u.c.ExecutorService object, + or a j.u.c.Executor object, default :mixed" [f & {:keys [exec] :or {exec :mixed} :as opts}] diff --git a/src/main/clojure/clojure/core/async/flow/impl.clj b/src/main/clojure/clojure/core/async/flow/impl.clj index 92898dd..37ee049 100644 --- a/src/main/clojure/clojure/core/async/flow/impl.clj +++ b/src/main/clojure/clojure/core/async/flow/impl.clj @@ -14,7 +14,7 @@ [clojure.core.async.impl.dispatch :as disp] [clojure.walk :as walk] [clojure.datafy :as datafy]) - (:import [java.util.concurrent Future Executors ExecutorService TimeUnit] + (:import [java.util.concurrent Future Executors Executor TimeUnit] [java.util.concurrent.locks ReentrantLock])) (set! *warn-on-reflection* true) @@ -22,16 +22,16 @@ (defn datafy [x] (condp instance? x clojure.lang.Fn (-> x str symbol) - ExecutorService (str x) + Executor (str x) clojure.lang.Var (symbol x) (datafy/datafy x))) (defn futurize [f {:keys [exec]}] (fn [& args] - (let [^ExecutorService e (if (instance? ExecutorService exec) - exec - (disp/executor-for exec))] - (.submit e ^Callable #(apply f args))))) + (let [^Executor e (if (instance? Executor exec) + exec + (disp/executor-for exec))] + (.execute e ^Runnable #(apply f args))))) (defn prep-proc [ret pid {:keys [proc, args, chan-opts] :or {chan-opts {}}}] (let [{:keys [ins outs signal-select]} (spi/describe proc) @@ -51,8 +51,8 @@ (let [lock (ReentrantLock.) chans (atom nil) execs {:mixed mixed-exec :io io-exec :compute compute-exec} - _ (assert (every? #(or (nil? %) (instance? ExecutorService %)) (vals execs)) - "mixed-exe, io-exec and compute-exec must be ExecutorServices") + _ (assert (every? #(or (nil? %) (instance? Executor %)) (vals execs)) + "mixed-exe, io-exec and compute-exec must be Executors") pdescs (reduce-kv prep-proc {} procs) allopts (fn [iok] (into {} (mapcat #(map (fn [[k opts]] [[(:pid %) k] opts]) (iok %)) (vals pdescs)))) inopts (allopts :ins) diff --git a/src/main/clojure/clojure/core/async/flow/spi.clj b/src/main/clojure/clojure/core/async/flow/spi.clj index e86a66a..b7635a3 100644 --- a/src/main/clojure/clojure/core/async/flow/spi.clj +++ b/src/main/clojure/clojure/core/async/flow/spi.clj @@ -82,7 +82,7 @@ N.B. outputs may be nil if not connected :resolver - an impl of spi/Resolver, which can be used to find channels given their logical [pid cid] coordinates, as well as to - obtain ExecutorServices corresponding to the + obtain Executors corresponding to the logical :mixed/:io/:compute contexts")) (defprotocol Resolver @@ -91,5 +91,5 @@ write to or nil (in which case the output should be dropped, e.g. nothing is connected).") (get-exec [_ context] - "returns the ExecutorService for the given context, one + "returns the Executor for the given context, one of :mixed, :io, :compute")) diff --git a/src/main/clojure/clojure/core/async/impl/dispatch.clj b/src/main/clojure/clojure/core/async/impl/dispatch.clj index 2117de5..6e64f30 100644 --- a/src/main/clojure/clojure/core/async/impl/dispatch.clj +++ b/src/main/clojure/clojure/core/async/impl/dispatch.clj @@ -8,7 +8,7 @@ (ns ^{:skip-wiki true} clojure.core.async.impl.dispatch - (:import [java.util.concurrent Executors ExecutorService ThreadFactory])) + (:import [java.util.concurrent Executors Executor ThreadFactory])) (set! *warn-on-reflection* true) @@ -84,21 +84,6 @@ (catch ClassNotFoundException _ false))) -(defn- vthreads-directive - "Retrieves the value of the sysprop clojure.core.async.vthreads." - [] - (System/getProperty "clojure.core.async.vthreads")) - -(def target-vthreads? - (= (vthreads-directive) "target")) - -(def unset-vthreads? - (nil? (vthreads-directive))) - -(def vthreads-available-and-allowed? - (and virtual-threads-available? - (not= (vthreads-directive) "avoid"))) - (def ^:private virtual-thread? (if virtual-threads-available? (eval `(fn [^Thread t#] (~'.isVirtual t#))) @@ -108,20 +93,13 @@ (and virtual-threads-available? (virtual-thread? (Thread/currentThread)))) -(defn report-vthreads-not-available-error! [] - (throw (ex-info "Code compiled to target virtual threads, but is running without vthread support." - {:runtime-jvm-version (System/getProperty "java.version") - :vthreads-directive (vthreads-directive)}))) - -(defn ensure-runtime-vthreads! [] - (when (not vthreads-available-and-allowed?) - (report-vthreads-not-available-error!))) - (defn- make-io-executor [] - (if vthreads-available-and-allowed? - (-> (.getDeclaredMethod Executors "newVirtualThreadPerTaskExecutor" (make-array Class 0)) - (.invoke nil (make-array Class 0))) + (if virtual-threads-available? + (let [svt (.getDeclaredMethod Thread "startVirtualThread" (into-array Class [Runnable]))] + (reify Executor + (execute [_ r] + (.invoke svt nil (object-array [r]))))) (make-ctp-named :io))) (defn ^:private create-default-executor @@ -132,23 +110,23 @@ :mixed (make-ctp-named :mixed))) (def executor-for - "Given a workload tag, returns an ExecutorService instance and memoizes the result. By + "Given a workload tag, returns an Executor instance and memoizes the result. By default, core.async will defer to a user factory (if provided via sys prop) or construct - a specialized ExecutorService instance for each tag :io, :compute, and :mixed. When + a specialized Executor instance for each tag :io, :compute, and :mixed. When given the tag :core-async-dispatch it will default to the executor service for :io." (memoize - (fn ^ExecutorService [workload] + (fn ^Executor [workload] (let [sysprop-factory (when-let [esf (System/getProperty "clojure.core.async.executor-factory")] (requiring-resolve (symbol esf))) sp-exec (and sysprop-factory (sysprop-factory workload))] (or sp-exec (if (= workload :core-async-dispatch) - (executor-for :io) + (executor-for :mixed) (create-default-executor workload))))))) (defn exec [^Runnable r workload] - (let [^ExecutorService e (executor-for workload)] + (let [^Executor e (executor-for workload)] (.execute e r))) (defn run