Back
future-cancel (clj)
(source)function
(future-cancel f)
Cancels the future, if possible.
Examples
typedclojure/typedclojure
(ns ^:no-doc typed.ann.clojure
"Type annotations for the base Clojure distribution."
#?(:cljs (:require-macros [typed.ann-macros.clojure :as macros]))
(:require [clojure.core :as cc]
[typed.clojure :as t]
#?(:clj [typed.ann-macros.clojure :as macros])
#?(:clj typed.ann.clojure.jvm) ;; jvm annotations
#?(:clj clojure.core.typed))
#?(:clj
(:import (clojure.lang PersistentHashSet PersistentList
APersistentMap #_IPersistentCollection
#_ITransientSet
IRef)
(java.util Comparator Collection))))
#?@(:cljs [] :default [
cc/future-cancelled? [java.util.concurrent.Future :-> t/Bool]
cc/future-cancel [java.util.concurrent.Future :-> t/Any]
cc/future? (t/Pred java.util.concurrent.Future)
cc/future-done? [java.util.concurrent.Future :-> t/Bool]
])
ReactiveX/RxClojure
(ns rx.lang.clojure.future-test
(:require [rx.lang.clojure.core :as rx]
[rx.lang.clojure.blocking :as b]
[rx.lang.clojure.future :as f])
(:require [clojure.test :refer [deftest testing is]]))
(deftest test-future-cancel
(let [exited? (atom nil)
o (f/future* future-call
(fn [] (Thread/sleep 1000)
(reset! exited? true)
"WAT"))
result (->> o
(rx/take 0)
(b/into []))]
(Thread/sleep 2000)
(is (= [nil []]
[@exited? result]))))
bilus/pipes
(ns pipes.core-test
(:require
[pipes.core :refer :all]
[pipes.job :refer :all]
[pipes.test-helpers :refer :all]
[clojure.core.async :refer [alts!! timeout chan >!! go-loop go >! <! close!]]
[clojure.test :refer [are is deftest with-test run-tests testing]]
[clojure.java.io :as io]
[pipes.exceptions :as ex])
(:import
[java.lang RuntimeException]
[java.io ByteArrayInputStream
ByteArrayOutputStream FileNotFoundException]))
(deftest non-blocking-pipeline-test
(testing "success path"
(testing "pipes input to output"
(let [in (ByteArrayInputStream. (.getBytes "foo bar"))
out (ByteArrayOutputStream.)
identity-job (fn [in out]
(job
(io/copy in out)))
_pipe @(->pipe-> in
[identity-job
identity-job]
out)]
(is (= (.toString out) "foo bar"))))
(testing "invokes success callback and finally callback"
(let [result-ch (chan 1024)
in (ByteArrayInputStream. (.getBytes "foo bar"))
out (ByteArrayOutputStream.)
identity-job (fn [in out]
(job
(io/copy in out)))
_pipe @(->pipe-> in
[identity-job
identity-job]
out
:success #(>!! result-ch :success)
:finally #(>!! result-ch :finally)
:cancel #(>!! result-ch :cancel))]
(is (matching? [:success :finally] (take-all! result-ch)))))
(testing "gives each job a chance to finish"
(let [in (ByteArrayInputStream. (.getBytes "foo bar"))
out (ByteArrayOutputStream.)
identity-job (fn [in out]
(job
(Thread/sleep 100)
(io/copy in out)))
_pipe @(->pipe-> in
[identity-job
identity-job]
out)]
(is (= (.toString out) "foo bar")))))
(testing "deref"
(testing "waits for all to complete"
(let [in (ByteArrayInputStream. (.getBytes "foo bar"))
out (ByteArrayOutputStream.)
identity-job (fn [in out]
(job
(Thread/sleep 50)
(io/copy in out)))
_pipe @(->pipe-> in
[identity-job
identity-job]
out)
result (.toString out)]
(is (= result "foo bar")))))
(testing "cancel"
(testing "cancels each job"
(let [cancellations (chan 1024)
in (ByteArrayInputStream. (.getBytes "foo bar"))
out (ByteArrayOutputStream.)
slow-job (fn [job-id]
(fn [_in _out]
(let [slow-future (future (Thread/sleep 10000))]
(job-ctl
{:invoke-fn #(do (>!! cancellations job-id)
(future-cancel slow-future))
:is-realized-fn #(realized? slow-future)
:deref-fn #(deref slow-future)}))))
pipe (->pipe-> in
[(slow-job :job1)
(slow-job :job2)]
out)]
(pipe)
(is (matching? [:job1 :job2] (take-all! cancellations)))))
(testing "invokes cancel callback and finally callback"
(let [result-ch (chan 1024)
in (ByteArrayInputStream. (.getBytes "foo bar"))
out (ByteArrayOutputStream.)
slow-job (fn [job-id]
(fn [_in _out]
(let [slow-future (future (Thread/sleep 10000))]
(job-ctl
{:invoke-fn #(future-cancel slow-future)
:is-realized-fn #(realized? slow-future)
:deref-fn #(deref slow-future)}))))
pipe (->pipe-> in
[(slow-job :job1)
(slow-job :job2)]
out
:success #(>!! result-ch :success)
:finally #(>!! result-ch :finally)
:cancel #(>!! result-ch :cancel))]
(pipe)
(ex/ignore-any-exception
@pipe)
(is (matching? [:cancel :finally] (take-all! result-ch))))))
(testing "exceptions"
(testing "invokes error callback and finally callback"
(let [result-ch (chan 1024)
in (ByteArrayInputStream. (.getBytes "foo bar"))
out (ByteArrayOutputStream.)
identity-job (fn [in out]
(job
(io/copy in out)))
failing-job (fn [in out]
(job
(throw (RuntimeException. "An error"))))
pipe (->pipe-> in
[identity-job
failing-job]
out
:success #(>!! result-ch :success)
:finally #(>!! result-ch :finally)
:cancel #(>!! result-ch :cancel)
:error (fn [_] (>!! result-ch :error)))]
(ex/ignore-any-exception
@pipe)
(is (matching? [:error :finally] (take-all! result-ch)))))
(testing "invokes error callback with first exception thrown"
(let [result-ch (chan 1024)
in (ByteArrayInputStream. (.getBytes "foo bar"))
out (ByteArrayOutputStream.)
failing-job (fn [throw-after msg]
(fn [in out]
(job
(Thread/sleep throw-after)
(throw (RuntimeException. msg)))))
pipe (->pipe-> in
[(failing-job 1000 "Error from first job")
(failing-job 1 "Error from second job")]
out
:error (fn [e] (>!! result-ch (.getMessage e))))]
(try @pipe (catch RuntimeException _))
(is (matching? ["Error from second job"] (take-all! result-ch)))))
(testing "throws first exception on deref"
(let [in (ByteArrayInputStream. (.getBytes "foo bar"))
out (ByteArrayOutputStream.)
failing-job (fn [throw-after msg]
(fn [in out]
(job
(Thread/sleep throw-after)
(throw (RuntimeException. msg)))))
pipe (->pipe-> in
[(failing-job 1000 "Error from first job")
(failing-job 1 "Error from second job")]
out)]
(is (thrown-with-msg? RuntimeException #"Error from second job" @pipe))))
(testing "cancels blocked jobs"
(let [cancellations (chan 1024)
in (ByteArrayInputStream. (.getBytes "foo bar"))
out (ByteArrayOutputStream.)
slow-job (fn [job-id]
(fn [_in _out]
(let [slow-future (future (Thread/sleep 1000))]
(job-ctl
{:invoke-fn #(do (>!! cancellations job-id)
(future-cancel slow-future))
:is-realized-fn #(realized? slow-future)
:deref-fn #(deref slow-future)}))))
failing-job (fn [in out]
(job
(throw (RuntimeException. "An error"))))
pipe (->pipe-> in
[(slow-job :job1)
(slow-job :job2)
failing-job]
out)] ; Make test complete faster.
(try @pipe (catch RuntimeException _))
(is (matching? [:job1 :job2] (take-all! cancellations)))))
(testing "no output stream"
(let [in (ByteArrayInputStream. (.getBytes "foo bar"))
aux-out (ByteArrayOutputStream.)
identity-job (fn [in out]
(job
(io/copy in out)))
saving-job (fn [in _out]
(job
(io/copy in aux-out)))
_pipe @(->pipe in
[identity-job
saving-job])]
(is (= (.toString aux-out) "foo bar"))))
(testing "no input stream"
(let [out (ByteArrayOutputStream.)
producer-job (fn [_ out]
(job
(.write out (.getBytes "foo bar"))))
identity-job (fn [in out]
(job
(io/copy in out)))
_pipe @(pipe-> [producer-job
identity-job]
out)]
(is (= (.toString out) "foo bar"))))
(testing "support input file streams but no string paths"
(with-open [in (io/input-stream "README.md")
out (ByteArrayOutputStream.)]
(let [identity-job (fn [in out]
(job
(io/copy in out)))
_pipe @(->pipe-> in
[identity-job]
out)]
(is (= (.toString out) (slurp "README.md"))))))
(testing "support output file streams but no string paths"
(try
(with-open [in (io/input-stream "README.md")
out (io/output-stream "README.md~")]
(let [identity-job (fn [in out]
(job
(io/copy in out)))
_pipe @(->pipe-> in
[identity-job]
out)]))
(is (= (slurp "README.md") (slurp "README.md~")))
(finally
(.delete (io/file "README.md~")))))
(testing "realized?"
(let [cancellations (chan 1024)
in (ByteArrayInputStream. (.getBytes "foo bar"))
out (ByteArrayOutputStream.)
slow-job (fn [job-id]
(fn [_in _out]
(let [slow-future (future (Thread/sleep 10000))]
(job-ctl
{:invoke-fn #(do (>!! cancellations job-id)
(future-cancel slow-future))
:is-realized-fn #(realized? slow-future)
:deref-fn #(deref slow-future)}))))
pipe (->pipe-> in
[(slow-job :job1)
(slow-job :job2)]
out)]
(is (not (realized? pipe)))
(pipe)
@pipe
(is (realized? pipe))))))