diff --git a/src/main/clojure/clojure/core/async/flow/impl.clj b/src/main/clojure/clojure/core/async/flow/impl.clj index 37ee049e..8486eaaa 100644 --- a/src/main/clojure/clojure/core/async/flow/impl.clj +++ b/src/main/clojure/clojure/core/async/flow/impl.clj @@ -14,7 +14,7 @@ [clojure.core.async.impl.dispatch :as disp] [clojure.walk :as walk] [clojure.datafy :as datafy]) - (:import [java.util.concurrent Future Executors Executor TimeUnit] + (:import [java.util.concurrent Future Executor TimeUnit CompletableFuture] [java.util.concurrent.locks ReentrantLock])) (set! *warn-on-reflection* true) @@ -30,8 +30,10 @@ (fn [& args] (let [^Executor e (if (instance? Executor exec) exec - (disp/executor-for exec))] - (.execute e ^Runnable #(apply f args))))) + (disp/executor-for exec)) + fut (CompletableFuture.)] + (.execute e #(.complete fut (apply f args))) + fut))) (defn prep-proc [ret pid {:keys [proc, args, chan-opts] :or {chan-opts {}}}] (let [{:keys [ins outs signal-select]} (spi/describe proc) diff --git a/src/test/clojure/clojure/core/async/flow_test.clj b/src/test/clojure/clojure/core/async/flow_test.clj new file mode 100644 index 00000000..d215258a --- /dev/null +++ b/src/test/clojure/clojure/core/async/flow_test.clj @@ -0,0 +1,24 @@ +;; Copyright (c) Rich Hickey and contributors. All rights reserved. +;; The use and distribution terms for this software are covered by the +;; Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php) +;; which can be found in the file epl-v10.html at the root of this distribution. +;; By using this software in any fashion, you are agreeing to be bound by +;; the terms of this license. +;; You must not remove this notice, or any other, from this software. + +(ns clojure.core.async.flow-test + (:require [clojure.test :refer :all] + [clojure.core.async.flow :as flow])) + +(deftest test-futurize + (testing "" + (let [in-es? (atom false) + es (reify java.util.concurrent.Executor + (^void execute [_ ^Runnable f] + (reset! in-es? true) + (future-call f)))] + (is (= 16 @((flow/futurize #(* % %) {:exec :mixed}) 4))) + (is (= 16 @((flow/futurize #(* % %)) 4))) + (is (= 16 @((flow/futurize #(* % %) {:exec es}) 4))) + (is @in-es?)))) +