Back
realized? (clj)
(source)function
(realized? x)
Returns true if a value has been produced for a promise, delay, future or lazy sequence.
Examples
clj-commons/manifold
(ns manifold.stream.async
{:no-doc true}
(:require
[manifold.deferred :as d]
[clojure.core.async :as a]
[manifold.stream
[graph :as g]
[core :as s]]
[manifold
[executor :as executor]
[utils :as utils]])
(:import
[java.util.concurrent.atomic
AtomicReference]))
(let [d (d/deferred)
d' (.getAndSet last-take d)
f (fn [_]
(a/take! ch
(fn [msg]
(d/success! d
(if (nil? msg)
(do
(.markDrained this)
default-val)
msg)))))]
(if (d/realized? d')
(f nil)
(d/on-realized d' f f))
d)))
;; if I don't take this out of the goroutine, core.async OOMs on compilation
mark-drained #(.markDrained this)
f (fn [_]
(a/go
(let [result (a/alt!
ch ([x] (if (nil? x)
(do
(mark-drained)
default-val)
x))
(a/timeout timeout) timeout-val
:priority true)]
(d/success! d result))))]
(if (d/realized? d')
(f nil)
(d/on-realized d' f f))
(if blocking?
@d
d))))
:else
(let [d (d/deferred)
d' (.getAndSet last-put d)
f (fn [_]
(a/go
(d/success! d
(boolean
(a/>! ch x)))))]
(if (d/realized? d')
(f nil)
(d/on-realized d' f f))
d))))
(let [d (d/deferred)
d' (.getAndSet last-put d)
f (fn [_]
(a/go
(let [result (a/alt!
[[ch x]] true
(a/timeout timeout) timeout-val
:priority true)]
(d/success! d result))))]
(if (d/realized? d')
(f nil)
(d/on-realized d' f f))
(if blocking?
@d
d))))))
typedclojure/typedclojure
(ns ^:no-doc typed.ann.clojure
"Type annotations for the base Clojure distribution."
#?(:cljs (:require-macros [typed.ann-macros.clojure :as macros]))
(:require [clojure.core :as cc]
[typed.clojure :as t]
#?(:clj [typed.ann-macros.clojure :as macros])
#?(:clj typed.ann.clojure.jvm) ;; jvm annotations
#?(:clj clojure.core.typed))
#?(:clj
(:import (clojure.lang PersistentHashSet PersistentList
APersistentMap #_IPersistentCollection
#_ITransientSet
IRef)
(java.util Comparator Collection))))
cc/realized? [#?(:clj clojure.lang.IPending
:cljs cljs.core/IPending) :-> t/Bool]
bilus/pipes
;; Test namespace for `pipes.job`.
;;
;; Every reference clause inside `ns` must start with a keyword. The `:use`
;; clause was previously spelled with a bare symbol (`use`), which the `ns`
;; spec shipped with Clojure 1.9+ rejects at compile time.
(ns pipes.job-test
  (:use clojure.test
        pipes.job
        pipes.test-helpers)
  (:require [clojure.core.async :refer [chan >!!]]
            [pipes.exceptions :as ex]))
;; Checks `compose-jobs` on nested compositions: that a composed job reports
;; itself realized once its inner jobs have thrown or finished, and which
;; callbacks are reported after the composed job is invoked as a function.
(deftest compose-jobs-test
  (testing "fix nested composed jobs hanging on exception"
    (let [failing (job (throw (RuntimeException. "THIS IS A TEST")))
          inner (compose-jobs [failing])
          outer (compose-jobs [inner])]
      ;; Give the background work a moment before checking the state.
      (Thread/sleep 100)
      (is (realized? outer))))
  (testing "wait for more than one job"
    (let [first-job (job)
          second-job (job)
          inner (compose-jobs [first-job second-job])
          outer (compose-jobs [inner])]
      (Thread/sleep 100)
      (is (realized? outer))))
  (testing "invokes cancel callback and finally callback"
    (let [result-ch (chan 1024)
          long-running (job (Thread/sleep 10000))
          composed (compose-jobs
                     [long-running]
                     :success (fn [] (>!! result-ch :success))
                     :finally (fn [] (>!! result-ch :finally))
                     :cancel (fn [] (>!! result-ch :cancel)))]
      ;; Calling the composed job is what this test uses to trigger cancel.
      (composed)
      (ex/ignore-any-exception
        (deref composed))
      (is (matching? [:cancel :finally] (take-all! result-ch))))))
bilus/pipes
(ns pipes.core-test
(:require
[pipes.core :refer :all]
[pipes.job :refer :all]
[pipes.test-helpers :refer :all]
[clojure.core.async :refer [alts!! timeout chan >!! go-loop go >! <! close!]]
[clojure.test :refer [are is deftest with-test run-tests testing]]
[clojure.java.io :as io]
[pipes.exceptions :as ex])
(:import
[java.lang RuntimeException]
[java.io ByteArrayInputStream
ByteArrayOutputStream FileNotFoundException]))
;; End-to-end tests for the pipeline builders (`->pipe->`, `->pipe`, `pipe->`).
;;
;; Conventions used throughout:
;;   * a "job factory" is a fn of [in out] that returns a job;
;;   * `@pipe` blocks until the pipeline is done (and rethrows on failure,
;;     as asserted in the "exceptions" group);
;;   * `(pipe)` -- invoking the pipe as a function -- is how these tests
;;     trigger cancellation.
;;
;; The "exceptions" group is closed right after "cancels blocked jobs".
;; It previously stayed open until the end of the deftest, so the stream
;; and `realized?` groups were reported under the "exceptions" context
;; even though they do not exercise exception handling.
(deftest non-blocking-pipeline-test
  (testing "success path"
    (testing "pipes input to output"
      (let [in (ByteArrayInputStream. (.getBytes "foo bar"))
            out (ByteArrayOutputStream.)
            identity-job (fn [in out]
                           (job
                             (io/copy in out)))
            _pipe @(->pipe-> in
                             [identity-job
                              identity-job]
                             out)]
        (is (= (.toString out) "foo bar"))))
    (testing "invokes success callback and finally callback"
      (let [result-ch (chan 1024)
            in (ByteArrayInputStream. (.getBytes "foo bar"))
            out (ByteArrayOutputStream.)
            identity-job (fn [in out]
                           (job
                             (io/copy in out)))
            _pipe @(->pipe-> in
                             [identity-job
                              identity-job]
                             out
                             :success #(>!! result-ch :success)
                             :finally #(>!! result-ch :finally)
                             :cancel #(>!! result-ch :cancel))]
        (is (matching? [:success :finally] (take-all! result-ch)))))
    (testing "gives each job a chance to finish"
      (let [in (ByteArrayInputStream. (.getBytes "foo bar"))
            out (ByteArrayOutputStream.)
            ;; Each stage delays before copying.
            identity-job (fn [in out]
                           (job
                             (Thread/sleep 100)
                             (io/copy in out)))
            _pipe @(->pipe-> in
                             [identity-job
                              identity-job]
                             out)]
        (is (= (.toString out) "foo bar")))))
  (testing "deref"
    (testing "waits for all to complete"
      (let [in (ByteArrayInputStream. (.getBytes "foo bar"))
            out (ByteArrayOutputStream.)
            identity-job (fn [in out]
                           (job
                             (Thread/sleep 50)
                             (io/copy in out)))
            _pipe @(->pipe-> in
                             [identity-job
                              identity-job]
                             out)
            ;; Read the output only after the deref above has returned.
            result (.toString out)]
        (is (= result "foo bar")))))
  (testing "cancel"
    (testing "cancels each job"
      (let [cancellations (chan 1024)
            in (ByteArrayInputStream. (.getBytes "foo bar"))
            out (ByteArrayOutputStream.)
            ;; Job backed by a long-sleeping future; its :invoke-fn records
            ;; the job id on `cancellations` and cancels the future.
            slow-job (fn [job-id]
                       (fn [_in _out]
                         (let [slow-future (future (Thread/sleep 10000))]
                           (job-ctl
                             {:invoke-fn #(do (>!! cancellations job-id)
                                              (future-cancel slow-future))
                              :is-realized-fn #(realized? slow-future)
                              :deref-fn #(deref slow-future)}))))
            pipe (->pipe-> in
                           [(slow-job :job1)
                            (slow-job :job2)]
                           out)]
        (pipe)
        (is (matching? [:job1 :job2] (take-all! cancellations)))))
    (testing "invokes cancel callback and finally callback"
      (let [result-ch (chan 1024)
            in (ByteArrayInputStream. (.getBytes "foo bar"))
            out (ByteArrayOutputStream.)
            slow-job (fn [job-id]
                       (fn [_in _out]
                         (let [slow-future (future (Thread/sleep 10000))]
                           (job-ctl
                             {:invoke-fn #(future-cancel slow-future)
                              :is-realized-fn #(realized? slow-future)
                              :deref-fn #(deref slow-future)}))))
            pipe (->pipe-> in
                           [(slow-job :job1)
                            (slow-job :job2)]
                           out
                           :success #(>!! result-ch :success)
                           :finally #(>!! result-ch :finally)
                           :cancel #(>!! result-ch :cancel))]
        (pipe)
        ;; Any exception raised by deref is deliberately ignored here;
        ;; only the reported callbacks are asserted.
        (ex/ignore-any-exception
          @pipe)
        (is (matching? [:cancel :finally] (take-all! result-ch))))))
  (testing "exceptions"
    (testing "invokes error callback and finally callback"
      (let [result-ch (chan 1024)
            in (ByteArrayInputStream. (.getBytes "foo bar"))
            out (ByteArrayOutputStream.)
            identity-job (fn [in out]
                           (job
                             (io/copy in out)))
            failing-job (fn [in out]
                          (job
                            (throw (RuntimeException. "An error"))))
            pipe (->pipe-> in
                           [identity-job
                            failing-job]
                           out
                           :success #(>!! result-ch :success)
                           :finally #(>!! result-ch :finally)
                           :cancel #(>!! result-ch :cancel)
                           :error (fn [_] (>!! result-ch :error)))]
        (ex/ignore-any-exception
          @pipe)
        (is (matching? [:error :finally] (take-all! result-ch)))))
    (testing "invokes error callback with first exception thrown"
      (let [result-ch (chan 1024)
            in (ByteArrayInputStream. (.getBytes "foo bar"))
            out (ByteArrayOutputStream.)
            ;; Job that sleeps `throw-after` ms and then throws `msg`.
            failing-job (fn [throw-after msg]
                          (fn [in out]
                            (job
                              (Thread/sleep throw-after)
                              (throw (RuntimeException. msg)))))
            ;; The second job throws long before the first one, so "first"
            ;; means first in time, not first in pipeline order.
            pipe (->pipe-> in
                           [(failing-job 1000 "Error from first job")
                            (failing-job 1 "Error from second job")]
                           out
                           :error (fn [e] (>!! result-ch (.getMessage e))))]
        (try @pipe (catch RuntimeException _))
        (is (matching? ["Error from second job"] (take-all! result-ch)))))
    (testing "throws first exception on deref"
      (let [in (ByteArrayInputStream. (.getBytes "foo bar"))
            out (ByteArrayOutputStream.)
            failing-job (fn [throw-after msg]
                          (fn [in out]
                            (job
                              (Thread/sleep throw-after)
                              (throw (RuntimeException. msg)))))
            pipe (->pipe-> in
                           [(failing-job 1000 "Error from first job")
                            (failing-job 1 "Error from second job")]
                           out)]
        (is (thrown-with-msg? RuntimeException #"Error from second job" @pipe))))
    (testing "cancels blocked jobs"
      (let [cancellations (chan 1024)
            in (ByteArrayInputStream. (.getBytes "foo bar"))
            out (ByteArrayOutputStream.)
            slow-job (fn [job-id]
                       (fn [_in _out]
                         (let [slow-future (future (Thread/sleep 1000))]
                           (job-ctl
                             {:invoke-fn #(do (>!! cancellations job-id)
                                              (future-cancel slow-future))
                              :is-realized-fn #(realized? slow-future)
                              :deref-fn #(deref slow-future)}))))
            failing-job (fn [in out]
                          (job
                            (throw (RuntimeException. "An error"))))
            pipe (->pipe-> in
                           [(slow-job :job1)
                            (slow-job :job2)
                            failing-job]
                           out)] ; Make test complete faster.
        (try @pipe (catch RuntimeException _))
        (is (matching? [:job1 :job2] (take-all! cancellations))))))
  (testing "no output stream"
    (let [in (ByteArrayInputStream. (.getBytes "foo bar"))
          aux-out (ByteArrayOutputStream.)
          identity-job (fn [in out]
                         (job
                           (io/copy in out)))
          ;; The last stage ignores its `out` and writes to `aux-out`,
          ;; since `->pipe` is given no output stream.
          saving-job (fn [in _out]
                       (job
                         (io/copy in aux-out)))
          _pipe @(->pipe in
                         [identity-job
                          saving-job])]
      (is (= (.toString aux-out) "foo bar"))))
  (testing "no input stream"
    (let [out (ByteArrayOutputStream.)
          ;; The first stage ignores its input and produces the data itself.
          producer-job (fn [_ out]
                         (job
                           (.write out (.getBytes "foo bar"))))
          identity-job (fn [in out]
                         (job
                           (io/copy in out)))
          _pipe @(pipe-> [producer-job
                          identity-job]
                         out)]
      (is (= (.toString out) "foo bar"))))
  (testing "support input file streams but no string paths"
    (with-open [in (io/input-stream "README.md")
                out (ByteArrayOutputStream.)]
      (let [identity-job (fn [in out]
                           (job
                             (io/copy in out)))
            _pipe @(->pipe-> in
                             [identity-job]
                             out)]
        (is (= (.toString out) (slurp "README.md"))))))
  (testing "support output file streams but no string paths"
    (try
      (with-open [in (io/input-stream "README.md")
                  out (io/output-stream "README.md~")]
        ;; The let has no body: the deref in the binding runs the pipeline.
        (let [identity-job (fn [in out]
                             (job
                               (io/copy in out)))
              _pipe @(->pipe-> in
                               [identity-job]
                               out)]))
      ;; Compare only after `with-open` has closed the output file.
      (is (= (slurp "README.md") (slurp "README.md~")))
      (finally
        ;; Always remove the scratch copy, even when the assertion fails.
        (.delete (io/file "README.md~")))))
  (testing "realized?"
    (let [cancellations (chan 1024)
          in (ByteArrayInputStream. (.getBytes "foo bar"))
          out (ByteArrayOutputStream.)
          slow-job (fn [job-id]
                     (fn [_in _out]
                       (let [slow-future (future (Thread/sleep 10000))]
                         (job-ctl
                           {:invoke-fn #(do (>!! cancellations job-id)
                                            (future-cancel slow-future))
                            :is-realized-fn #(realized? slow-future)
                            :deref-fn #(deref slow-future)}))))
          pipe (->pipe-> in
                         [(slow-job :job1)
                          (slow-job :job2)]
                         out)]
      ;; Not realized while the slow jobs are still sleeping ...
      (is (not (realized? pipe)))
      (pipe)
      @pipe
      ;; ... and realized once the pipe has been invoked and dereferenced.
      (is (realized? pipe)))))
finnishtransportagency/harja
(ns ^:integraatio harja.palvelin.integraatiot.tloik.ilmoitukset-test
(:require [clojure.test :refer [deftest is use-fixtures]]
[clojure.data.zip.xml :as z]
[harja.testi :refer :all]
[harja.integraatio :as integraatio]
[com.stuartsierra.component :as component]
[cheshire.core :as cheshire]
[harja.palvelin.integraatiot.jms :as jms]
[harja.palvelin.komponentit.itmf :as itmf]
[harja.tyokalut.xml :as xml]
[harja.domain.tieliikenneilmoitukset :as ti]
[harja.palvelin.integraatiot.tloik.tyokalut :refer :all]
[harja.palvelin.integraatiot.api.ilmoitukset :as api-ilmoitukset]
[harja.palvelin.integraatiot.api.tyokalut :as api-tyokalut]
[harja.palvelin.integraatiot.labyrintti.sms :as labyrintti]
[harja.palvelin.integraatiot.jms.tyokalut :as jms-tk]
[harja.palvelin.integraatiot.vayla-rest.sahkoposti :as sahkoposti-api]
[harja.palvelin.integraatiot.tloik.aineistot.toimenpidepyynnot :as aineisto-toimenpidepyynnot]
[clj-time
[coerce :as tc]
[format :as df]]
[clojure.core.async :as async]
[clj-time.core :as t])
(:import (org.postgis PGgeometry)
(java.util UUID)))
(odota-ehdon-tayttymista #(realized? ilmoitushaku) "Saatiin vastaus ilmoitushakuun." kuittaus-timeout)
(odota-ehdon-tayttymista #(= 1 (count @viestit)) "Kuittaus on vastaanotettu." kuittaus-timeout)
(odota-ehdon-tayttymista #(realized? ilmoitushaku) "Saatiin vastaus ilmoitushakuun." kuittaus-timeout)
(odota-ehdon-tayttymista #(= 1 (count @kuittausviestit-tloikkiin)) "Kuittaus on vastaanotettu." kuittaus-timeout)
(odota-ehdon-tayttymista #(realized? ilmoitushaku) "Saatiin vastaus ilmoitushakuun." kuittaus-timeout)
(odota-ehdon-tayttymista #(= 1 (count @viestit)) "Kuittaus on vastaanotettu." kuittaus-timeout)
(odota-ehdon-tayttymista #(realized? ilmoitushaku) "Saatiin vastaus ilmoitushakuun." kuittaus-timeout)
(odota-ehdon-tayttymista #(= 1 (count @viestit)) "Kuittaus on vastaanotettu." kuittaus-timeout)
;; Ilmoitushausta tehdään future, jotta HTTP long poll on jo käynnissä, kun uusi ilmoitus vastaanotetaan
(with-redefs [harja.kyselyt.yhteyshenkilot/hae-urakan-tamanhetkiset-paivystajat
(fn [db urakka-id] (list {:id 1
:etunimi "Pekka"
:sukunimi "Päivystäjä"
;; Testi olettaa, että labyrinttiä ei ole mockattu eikä käynnistetty, joten puhelinnumerot on jätetty tyhjäksi
:matkapuhelin nil
:tyopuhelin nil
:sahkoposti "email.email@example.com"
:alku (t/now)
:loppu (t/now)
:vastuuhenkilo true
:varahenkilo true}))]
(let [urakka-id (hae-oulun-maanteiden-hoitourakan-2019-2024-id)
ilmoitushaku (future (api-tyokalut/get-kutsu ["/api/urakat/" urakka-id "/ilmoitukset?odotaUusia=true"]
kayttaja portti))
ilmoitus-id (rand-int 99999999)
ilmoitus-data {:viesti-id (str (UUID/randomUUID))
:ilmoitus-id ilmoitus-id
:ilmoittaja-etunimi "Anonyymi"
:ilmoittaja-sukunimi "kontakti"
:ilmoittaja-email "anonyymi.kontakti@example.com"
:ilmoittaja-tyyppi "urakoitsija" ;; Tämän toimivuus testataan tässä
:sijainti-xml aineisto-toimenpidepyynnot/sijainti-oulun-alueella}]
(async/<!! (async/timeout timeout))
(jms/laheta (:itmf jarjestelma) +tloik-ilmoitusviestijono+ (aineisto-toimenpidepyynnot/toimenpidepyynto-ilmoittaja-sanoma ilmoitus-data))
(odota-ehdon-tayttymista #(realized? ilmoitushaku) "Saatiin vastaus ilmoitushakuun." kuittaus-timeout)
(odota-ehdon-tayttymista #(= 1 (count @viestit)) "Kuittaus on vastaanotettu." kuittaus-timeout)