Public Vars

Back

realized? (clj)

(source)

function

(realized? x)
Returns true if a value has been produced for a promise, delay, future or lazy sequence.

Examples

clj-commons/manifold
(ns manifold.stream.async
  {:no-doc true}
  (:require
    [manifold.deferred :as d]
    [clojure.core.async :as a]
    [manifold.stream
     [graph :as g]
     [core :as s]]
    [manifold
     [executor :as executor]
     [utils :as utils]])
  (:import
    [java.util.concurrent.atomic
     AtomicReference]))

      (let [d  (d/deferred)
            d' (.getAndSet last-take d)
            f  (fn [_]
                 (a/take! ch
                          (fn [msg]
                            (d/success! d
                                        (if (nil? msg)
                                          (do
                                            (.markDrained this)
                                            default-val)
                                          msg)))))]
        (if (d/realized? d')
          (f nil)
          (d/on-realized d' f f))
        d)))

          ;; if I don't take this out of the goroutine, core.async OOMs on compilation
          mark-drained #(.markDrained this)
          f            (fn [_]
                         (a/go
                           (let [result (a/alt!
                                          ch ([x] (if (nil? x)
                                                    (do
                                                      (mark-drained)
                                                      default-val)
                                                    x))
                                          (a/timeout timeout) timeout-val
                                          :priority true)]
                             (d/success! d result))))]
      (if (d/realized? d')
        (f nil)
        (d/on-realized d' f f))
      (if blocking?
        @d
        d))))

        :else
        (let [d  (d/deferred)
              d' (.getAndSet last-put d)
              f  (fn [_]
                   (a/go
                     (d/success! d
                                 (boolean
                                   (a/>! ch x)))))]
          (if (d/realized? d')
            (f nil)
            (d/on-realized d' f f))
          d))))

        (let [d  (d/deferred)
              d' (.getAndSet last-put d)
              f  (fn [_]
                   (a/go
                     (let [result (a/alt!
                                    [[ch x]] true
                                    (a/timeout timeout) timeout-val
                                    :priority true)]
                       (d/success! d result))))]
          (if (d/realized? d')
            (f nil)
            (d/on-realized d' f f))
          (if blocking?
            @d
            d))))))
typedclojure/typedclojure
(ns ^:no-doc typed.ann.clojure
  "Type annotations for the base Clojure distribution."
  #?(:cljs (:require-macros [typed.ann-macros.clojure :as macros]))
  (:require [clojure.core :as cc]
            [typed.clojure :as t]
            #?(:clj [typed.ann-macros.clojure :as macros])
            #?(:clj typed.ann.clojure.jvm) ;; jvm annotations
            #?(:clj clojure.core.typed))
  #?(:clj
     (:import (clojure.lang PersistentHashSet PersistentList
                            APersistentMap #_IPersistentCollection
                            #_ITransientSet
                            IRef)
              (java.util Comparator Collection))))

cc/realized? [#?(:clj clojure.lang.IPending
                 :cljs cljs.core/IPending) :-> t/Bool]
bilus/pipes
(ns pipes.job-test
  ;; Namespace clauses must be keyword-headed (`:use`, `:require`, ...).
  ;; A bare `(use ...)` was tolerated by old Clojure versions but is rejected
  ;; by the `ns` spec from Clojure 1.9 onwards, so the keyword form is used.
  (:use clojure.test
        pipes.job
        pipes.test-helpers)
  (:require [clojure.core.async :refer [chan >!!]]
            [pipes.exceptions :as ex]))

(deftest compose-jobs-test
  ;; Checks `realized?` on nested composed jobs and the callbacks fired when
  ;; a composed job is invoked while its inner job is still sleeping.
  (testing "fix nested composed jobs hanging on exception"
    (let [throwing (job (throw (RuntimeException. "THIS IS A TEST")))
          inner    (compose-jobs [throwing])
          outer    (compose-jobs [inner])]
      ;; Give the jobs a moment to run before checking.
      (Thread/sleep 100)
      (is (realized? outer))))

  (testing "wait for more than one job"
    (let [first-job  (job)
          second-job (job)
          inner      (compose-jobs [first-job second-job])
          outer      (compose-jobs [inner])]
      (Thread/sleep 100)
      (is (realized? outer))))

  (testing "invokes cancel callback and finally callback"
    (let [events   (chan 1024)
          ;; Builds a zero-argument callback that records `event` on the channel.
          report   (fn [event]
                     (fn [] (>!! events event)))
          sleeper  (job (Thread/sleep 10000))
          composed (compose-jobs [sleeper]
                                 :success (report :success)
                                 :finally (report :finally)
                                 :cancel (report :cancel))]
      ;; Invoke the composed job, then deref it while discarding any exception.
      (composed)
      (ex/ignore-any-exception
        (deref composed))
      (is (matching? [:cancel :finally] (take-all! events))))))
bilus/pipes
(ns pipes.core-test
  (:require
    [pipes.core :refer :all]
    [pipes.job :refer :all]
    [pipes.test-helpers :refer :all]
    [clojure.core.async :refer [alts!! timeout chan >!! go-loop go >! <! close!]]
    [clojure.test :refer [are is deftest with-test run-tests testing]]
    [clojure.java.io :as io]
    [pipes.exceptions :as ex])
  (:import
    [java.lang RuntimeException]
    [java.io ByteArrayInputStream
             ByteArrayOutputStream FileNotFoundException]))

(deftest non-blocking-pipeline-test
  ;; Exercises pipelines built with `->pipe->`, `->pipe` and `pipe->`:
  ;; the success path, deref, invoking the pipe (treated as cancellation by
  ;; these tests), exception handling, stream variants and `realized?`.
  (testing "success path"
    (testing "pipes input to output"
      ;; Two copy jobs chained together; deref'ing the pipe before the
      ;; assertion means the output is checked after the pipe has finished.
      (let [in (ByteArrayInputStream. (.getBytes "foo bar"))
            out (ByteArrayOutputStream.)
            identity-job (fn [in out]
                           (job
                             (io/copy in out)))
            _pipe @(->pipe-> in
                             [identity-job
                              identity-job]
                             out)]
        (is (= (.toString out) "foo bar"))))
    (testing "invokes success callback and finally callback"
      ;; Each callback records its own keyword on result-ch so the test can
      ;; assert which callbacks fired.
      (let [result-ch (chan 1024)
            in (ByteArrayInputStream. (.getBytes "foo bar"))
            out (ByteArrayOutputStream.)
            identity-job (fn [in out]
                           (job
                             (io/copy in out)))
            _pipe @(->pipe-> in
                             [identity-job
                              identity-job]
                             out
                             :success #(>!! result-ch :success)
                             :finally #(>!! result-ch :finally)
                             :cancel #(>!! result-ch :cancel))]
        (is (matching? [:success :finally] (take-all! result-ch)))))
    (testing "gives each job a chance to finish"
      ;; Same as the first case, but every job sleeps 100 ms before copying.
      (let [in (ByteArrayInputStream. (.getBytes "foo bar"))
            out (ByteArrayOutputStream.)
            identity-job (fn [in out]
                           (job
                             (Thread/sleep 100)
                             (io/copy in out)))
            _pipe @(->pipe-> in
                             [identity-job
                              identity-job]
                             out)]
        (is (= (.toString out) "foo bar")))))
  (testing "deref"
    (testing "waits for all to complete"
      ;; `result` is bound after the deref in the same `let`, so the output
      ;; is read only once the deref has returned.
      (let [in (ByteArrayInputStream. (.getBytes "foo bar"))
            out (ByteArrayOutputStream.)
            identity-job (fn [in out]
                           (job
                             (Thread/sleep 50)
                             (io/copy in out)))
            _pipe @(->pipe-> in
                             [identity-job
                              identity-job]
                             out)
            result (.toString out)]
        (is (= result "foo bar")))))
  (testing "cancel"
    (testing "cancels each job"
      ;; Each slow job wraps a 10 s future in `job-ctl`; its :invoke-fn
      ;; records the job id on `cancellations` and cancels the future.
      (let [cancellations (chan 1024)
            in (ByteArrayInputStream. (.getBytes "foo bar"))
            out (ByteArrayOutputStream.)
            slow-job (fn [job-id]
                       (fn [_in _out]
                         (let [slow-future (future (Thread/sleep 10000))]
                           (job-ctl
                             {:invoke-fn      #(do (>!! cancellations job-id)
                                                   (future-cancel slow-future))
                              :is-realized-fn #(realized? slow-future)
                              :deref-fn       #(deref slow-future)}))))
            pipe (->pipe-> in
                           [(slow-job :job1)
                            (slow-job :job2)]
                           out)]
        ;; Invoking the pipe is expected to run every job's :invoke-fn.
        (pipe)
        (is (matching? [:job1 :job2] (take-all! cancellations)))))
    (testing "invokes cancel callback and finally callback"
      (let [result-ch (chan 1024)
            in (ByteArrayInputStream. (.getBytes "foo bar"))
            out (ByteArrayOutputStream.)
            slow-job (fn [job-id]
                       (fn [_in _out]
                         (let [slow-future (future (Thread/sleep 10000))]
                           (job-ctl
                             {:invoke-fn      #(future-cancel slow-future)
                              :is-realized-fn #(realized? slow-future)
                              :deref-fn       #(deref slow-future)}))))
            pipe (->pipe-> in
                           [(slow-job :job1)
                            (slow-job :job2)]
                           out
                           :success #(>!! result-ch :success)
                           :finally #(>!! result-ch :finally)
                           :cancel #(>!! result-ch :cancel))]
        (pipe)
        ;; Deref after invoking the pipe; any exception it throws is ignored.
        (ex/ignore-any-exception
          @pipe)
        (is (matching? [:cancel :finally] (take-all! result-ch))))))
  (testing "exceptions"
    (testing "invokes error callback and finally callback"
      ;; The second job throws; only :error and :finally are expected.
      (let [result-ch (chan 1024)
            in (ByteArrayInputStream. (.getBytes "foo bar"))
            out (ByteArrayOutputStream.)
            identity-job (fn [in out]
                           (job
                             (io/copy in out)))
            failing-job (fn [in out]
                          (job
                            (throw (RuntimeException. "An error"))))
            pipe (->pipe-> in
                           [identity-job
                            failing-job]
                           out
                           :success #(>!! result-ch :success)
                           :finally #(>!! result-ch :finally)
                           :cancel #(>!! result-ch :cancel)
                           :error (fn [_] (>!! result-ch :error)))]
        (ex/ignore-any-exception
          @pipe)
        (is (matching? [:error :finally] (take-all! result-ch)))))
    (testing "invokes error callback with first exception thrown"
      ;; "First" means first in time: the second job sleeps 1 ms before
      ;; throwing, the first job 1000 ms, so the second job's message is
      ;; the one expected.
      (let [result-ch (chan 1024)
            in (ByteArrayInputStream. (.getBytes "foo bar"))
            out (ByteArrayOutputStream.)
            failing-job (fn [throw-after msg]
                          (fn [in out]
                            (job
                              (Thread/sleep throw-after)
                              (throw (RuntimeException. msg)))))
            pipe (->pipe-> in
                           [(failing-job 1000 "Error from first job")
                            (failing-job 1 "Error from second job")]
                           out
                           :error (fn [e] (>!! result-ch (.getMessage e))))]
        (try @pipe (catch RuntimeException _))
        (is (matching? ["Error from second job"] (take-all! result-ch)))))
    (testing "throws first exception on deref"
      ;; Same timing setup as above, but asserting on the exception thrown
      ;; by the deref itself.
      (let [in (ByteArrayInputStream. (.getBytes "foo bar"))
            out (ByteArrayOutputStream.)
            failing-job (fn [throw-after msg]
                          (fn [in out]
                            (job
                              (Thread/sleep throw-after)
                              (throw (RuntimeException. msg)))))
            pipe (->pipe-> in
                           [(failing-job 1000 "Error from first job")
                            (failing-job 1 "Error from second job")]
                           out)]
        (is (thrown-with-msg? RuntimeException #"Error from second job" @pipe))))
    (testing "cancels blocked jobs"
      ;; A failing job alongside two slow jobs: both slow jobs are expected
      ;; to have their :invoke-fn run (recorded on `cancellations`).
      (let [cancellations (chan 1024)
            in (ByteArrayInputStream. (.getBytes "foo bar"))
            out (ByteArrayOutputStream.)
            slow-job (fn [job-id]
                       (fn [_in _out]
                         (let [slow-future (future (Thread/sleep 1000))]
                           (job-ctl
                             {:invoke-fn      #(do (>!! cancellations job-id)
                                                   (future-cancel slow-future))
                              :is-realized-fn #(realized? slow-future)
                              :deref-fn       #(deref slow-future)}))))
            failing-job (fn [in out]
                          (job
                            (throw (RuntimeException. "An error"))))
            pipe (->pipe-> in
                           [(slow-job :job1)
                            (slow-job :job2)
                            failing-job]
                           out)]               ; NOTE(review): original comment read "Make test complete faster." - presumably refers to the shorter 1000 ms sleep in slow-job above; confirm.
        (try @pipe (catch RuntimeException _))
        (is (matching? [:job1 :job2] (take-all! cancellations)))))
    ;; NOTE(review): the remaining cases are nested under "exceptions" although
    ;; they cover stream variants and realized? - confirm the grouping is intended.
    (testing "no output stream"
      ;; `->pipe` takes no output stream; the last job writes to aux-out
      ;; itself and ignores its `_out` argument.
      (let [in (ByteArrayInputStream. (.getBytes "foo bar"))
            aux-out (ByteArrayOutputStream.)
            identity-job (fn [in out]
                           (job
                             (io/copy in out)))
            saving-job (fn [in _out]
                         (job
                           (io/copy in aux-out)))
            _pipe @(->pipe in
                           [identity-job
                            saving-job])]
        (is (= (.toString aux-out) "foo bar"))))
    (testing "no input stream"
      ;; `pipe->` takes no input stream; the first job ignores its input
      ;; argument and writes the bytes itself.
      (let [out (ByteArrayOutputStream.)
            producer-job (fn [_ out]
                           (job
                             (.write out (.getBytes "foo bar"))))
            identity-job (fn [in out]
                           (job
                             (io/copy in out)))
            _pipe @(pipe-> [producer-job
                            identity-job]
                           out)]
        (is (= (.toString out) "foo bar"))))
    (testing "support input file streams but no string paths"
      ;; Reads README.md by relative path, so it depends on the working directory.
      (with-open [in (io/input-stream "README.md")
                  out (ByteArrayOutputStream.)]
        (let [identity-job (fn [in out]
                             (job
                               (io/copy in out)))
              _pipe @(->pipe-> in
                               [identity-job]
                               out)]
          (is (= (.toString out) (slurp "README.md"))))))
    (testing "support output file streams but no string paths"
      ;; Copies README.md to README.md~ and compares the two files after the
      ;; streams are closed; the copy is deleted in `finally` either way.
      (try
        (with-open [in (io/input-stream "README.md")
                    out (io/output-stream "README.md~")]
          (let [identity-job (fn [in out]
                               (job
                                 (io/copy in out)))
                _pipe @(->pipe-> in
                                 [identity-job]
                                 out)]))
        (is (= (slurp "README.md") (slurp "README.md~")))
        (finally
          (.delete (io/file "README.md~")))))
    (testing "realized?"
      ;; The pipe must not be realized while its jobs are still sleeping,
      ;; and must be realized after it has been invoked and deref'ed.
      (let [cancellations (chan 1024)
            in (ByteArrayInputStream. (.getBytes "foo bar"))
            out (ByteArrayOutputStream.)
            slow-job (fn [job-id]
                       (fn [_in _out]
                         (let [slow-future (future (Thread/sleep 10000))]
                           (job-ctl
                             {:invoke-fn      #(do (>!! cancellations job-id)
                                                   (future-cancel slow-future))
                              :is-realized-fn #(realized? slow-future)
                              :deref-fn       #(deref slow-future)}))))
            pipe (->pipe-> in
                           [(slow-job :job1)
                            (slow-job :job2)]
                           out)]
        (is (not (realized? pipe)))
        (pipe)
        @pipe
        (is (realized? pipe))))))
finnishtransportagency/harja
(ns ^:integraatio harja.palvelin.integraatiot.tloik.ilmoitukset-test
  (:require [clojure.test :refer [deftest is use-fixtures]]
            [clojure.data.zip.xml :as z]
            [harja.testi :refer :all]
            [harja.integraatio :as integraatio]
            [com.stuartsierra.component :as component]
            [cheshire.core :as cheshire]
            [harja.palvelin.integraatiot.jms :as jms]
            [harja.palvelin.komponentit.itmf :as itmf]
            [harja.tyokalut.xml :as xml]
            [harja.domain.tieliikenneilmoitukset :as ti]
            [harja.palvelin.integraatiot.tloik.tyokalut :refer :all]
            [harja.palvelin.integraatiot.api.ilmoitukset :as api-ilmoitukset]
            [harja.palvelin.integraatiot.api.tyokalut :as api-tyokalut]
            [harja.palvelin.integraatiot.labyrintti.sms :as labyrintti]
            [harja.palvelin.integraatiot.jms.tyokalut :as jms-tk]
            [harja.palvelin.integraatiot.vayla-rest.sahkoposti :as sahkoposti-api]
            [harja.palvelin.integraatiot.tloik.aineistot.toimenpidepyynnot :as aineisto-toimenpidepyynnot]
            [clj-time
             [coerce :as tc]
             [format :as df]]
            [clojure.core.async :as async]
            [clj-time.core :as t])
  (:import (org.postgis PGgeometry)
           (java.util UUID)))

      (odota-ehdon-tayttymista #(realized? ilmoitushaku) "Saatiin vastaus ilmoitushakuun." kuittaus-timeout)
      (odota-ehdon-tayttymista #(= 1 (count @viestit)) "Kuittaus on vastaanotettu." kuittaus-timeout)

       (odota-ehdon-tayttymista #(realized? ilmoitushaku) "Saatiin vastaus ilmoitushakuun." kuittaus-timeout)
       (odota-ehdon-tayttymista #(= 1 (count @kuittausviestit-tloikkiin)) "Kuittaus on vastaanotettu." kuittaus-timeout)

        (odota-ehdon-tayttymista #(realized? ilmoitushaku) "Saatiin vastaus ilmoitushakuun." kuittaus-timeout)
        (odota-ehdon-tayttymista #(= 1 (count @viestit)) "Kuittaus on vastaanotettu." kuittaus-timeout)

        (odota-ehdon-tayttymista #(realized? ilmoitushaku) "Saatiin vastaus ilmoitushakuun." kuittaus-timeout)
        (odota-ehdon-tayttymista #(= 1 (count @viestit)) "Kuittaus on vastaanotettu." kuittaus-timeout)

    ;; Ilmoitushausta tehdään future, jotta HTTP long poll on jo käynnissä, kun uusi ilmoitus vastaanotetaan
    (with-redefs [harja.kyselyt.yhteyshenkilot/hae-urakan-tamanhetkiset-paivystajat
                  (fn [db urakka-id] (list {:id 1
                                            :etunimi "Pekka"
                                            :sukunimi "Päivystäjä"
                                            ;; Testi olettaa, että labyrinttiä ei ole mockattu eikä käynnistetty, joten puhelinnumerot on jätetty tyhjäksi
                                            :matkapuhelin nil
                                            :tyopuhelin nil
                                            :sahkoposti "email.email@example.com"
                                            :alku (t/now)
                                            :loppu (t/now)
                                            :vastuuhenkilo true
                                            :varahenkilo true}))]
      (let [urakka-id (hae-oulun-maanteiden-hoitourakan-2019-2024-id)
            ilmoitushaku (future (api-tyokalut/get-kutsu ["/api/urakat/" urakka-id "/ilmoitukset?odotaUusia=true"]
                                   kayttaja portti))
            ilmoitus-id (rand-int 99999999)
            ilmoitus-data {:viesti-id (str (UUID/randomUUID))
                           :ilmoitus-id ilmoitus-id
                           :ilmoittaja-etunimi "Anonyymi"
                           :ilmoittaja-sukunimi "kontakti"
                           :ilmoittaja-email "anonyymi.kontakti@example.com"
                           :ilmoittaja-tyyppi "urakoitsija" ;; Tämän toimivuus testataan tässä
                           :sijainti-xml aineisto-toimenpidepyynnot/sijainti-oulun-alueella}]
        (async/<!! (async/timeout timeout))
        (jms/laheta (:itmf jarjestelma) +tloik-ilmoitusviestijono+ (aineisto-toimenpidepyynnot/toimenpidepyynto-ilmoittaja-sanoma ilmoitus-data))
        (odota-ehdon-tayttymista #(realized? ilmoitushaku) "Saatiin vastaus ilmoitushakuun." kuittaus-timeout)
        (odota-ehdon-tayttymista #(= 1 (count @viestit)) "Kuittaus on vastaanotettu." kuittaus-timeout)