with-open (clj)
(with-open bindings & body)
bindings => [name init ...]
Evaluates body in a try expression with names bound to the values
of the inits, and a finally clause that calls (.close name) on each
name in reverse order.
(ns metabase.async.util-test
[clojure.core.async :as a]
[clojure.test :refer :all]
[metabase.async.util :as async.u]
[metabase.test.util.async :as tu.async]))
;; Exercises `async.u/promise-pipe` between promise channels created inside
;; `tu.async/with-open-channels`.
;; NOTE(review): part of this test looks truncated (see the inline note further down); compare with
;; the upstream file before relying on it.
(deftest promise-pipe-test
;; NOTE(review): the description below says `single-value-pipe`, but the function exercised is
;; `promise-pipe` -- presumably a leftover from a rename; confirm and update the string.
(testing "make sure `single-value-pipe` pipes a value from in-chan to out-chan"
(tu.async/with-open-channels [in-chan (a/promise-chan)
out-chan (a/promise-chan)]
(async.u/promise-pipe in-chan out-chan)
(a/>!! in-chan ::value)
;; Race out-chan against a 1000 ms timeout channel so that a value that never arrives fails
;; the assertion instead of blocking the test.
(is (= ::value
(first (a/alts!! [out-chan (a/timeout 1000)]))))))
(testing "`promise-pipe` should close input-chan if output-chan is closed"
(tu.async/with-open-channels [in-chan (a/promise-chan)
out-chan (a/promise-chan)]
(async.u/promise-pipe in-chan out-chan)
(a/close! out-chan)
;; Asserts that waiting on in-chan yields nil (the 100 is presumably a timeout in ms -- confirm
;; against `tu.async/wait-for-result`).
(is (= nil
(tu.async/wait-for-result in-chan 100)))))
(testing "`promise-pipe` should close output-chan if input-chan is closed"
(tu.async/with-open-channels [in-chan (a/promise-chan)
out-chan (a/promise-chan)]
(async.u/promise-pipe in-chan out-chan)
(a/close! in-chan)
;; Mirror image of the previous case: close the input side, then expect nil from out-chan.
(is (= nil
(tu.async/wait-for-result out-chan 100)))))
(testing "if you are a knucklehead and write directly to out-chan it should close `in-chan`"
(tu.async/with-open-channels [in-chan (a/promise-chan)
out-chan (a/promise-chan)]
(async.u/promise-pipe in-chan out-chan)
(a/>!! out-chan "Oops")
(let [timeout-chan (a/timeout 1000)
[val port] (a/alts!! [in-chan timeout-chan])]
;; NOTE(review): the next two assertions look truncated: the first `(is (= nil` has no second
;; operand or closing parens (presumably `val`), and the `condp` below has no visible default
;; clause or closing parens before the next `testing` form. Restore the missing tokens from
;; the upstream file.
(is (= nil
(is (= :in-chan
;; Translate the port returned by `alts!!` into a keyword for the comparison above.
(condp = port
in-chan :in-chan
out-chan :out-chan
timeout-chan :timeout-chan
(testing "can we combine multiple single value pipes?"
(tu.async/with-open-channels [in-chan (a/promise-chan)
out-chan-1 (a/promise-chan)
out-chan-2 (a/promise-chan)]
;; Chain in-chan -> out-chan-1 -> out-chan-2 and expect the value at the far end.
(async.u/promise-pipe in-chan out-chan-1)
(async.u/promise-pipe out-chan-1 out-chan-2)
(a/>!! in-chan ::value)
(is (= ::value
(first (a/alts!! [out-chan-2 (a/timeout 1000)])))))))
;; Exercises `async.u/cancelable-thread` and `async.u/cancelable-thread-call`, alone and combined
;; with `async.u/promise-pipe`.
;; NOTE(review): several forms in this test look truncated (see the inline notes); compare with the
;; upstream file before relying on it.
(deftest cancelable-thread-test
(testing "Make sure `cancelable-thread` can actually run a function correctly"
;; NOTE(review): the binding vector is never closed and the thread body has no visible result
;; value, although the assertion below expects `::success` -- tokens appear to be missing here.
(tu.async/with-open-channels [result-chan (async.u/cancelable-thread
(Thread/sleep 10)
;; Race result-chan against a 500 ms timeout channel so a missing result fails the assertion.
(is (= ::success
(first (a/alts!! [result-chan (a/timeout 500)]))))))
(testing (str "when you close the result channel of `cancelable-thread`, it should cancel the future that's running "
"it. This will produce an InterruptedException")
(tu.async/with-open-channels [started-chan (a/chan 1)
finished-chan (a/chan 1)]
;; The thread body signals on started-chan, sleeps for 5000 ms, then signals on finished-chan;
;; a caught Throwable is put on finished-chan instead.
;; NOTE(review): a `catch` clause is visible but no enclosing `try` -- presumably lost in extraction.
(let [result-chan (async.u/cancelable-thread
(a/>!! started-chan ::started)
(Thread/sleep 5000)
(a/>!! finished-chan ::finished)
(catch Throwable e
(a/>!! finished-chan e))))]
;; wait for `f` to actually start running before we kill it. Otherwise it may not get started at all
(a/alts!! [started-chan (a/timeout 1000)])
(a/close! result-chan))
;; NOTE(review): `instance?` is missing its class argument; the description above suggests
;; `InterruptedException` -- confirm.
(is (instance?
(first (a/alts!! [finished-chan (a/timeout 1000)])))))))
(testing "We should be able to combine the `promise-pipe` and `cancelable-thread` and get results"
;; NOTE(review): the local fn `f` and the `letfn` binding vector are never closed, and `f` has no
;; visible return value although `::success` is expected below -- tokens appear to be missing.
(letfn [(f []
(Thread/sleep 10)
(tu.async/with-open-channels [result-chan (a/promise-chan)]
;; Pipe the channel returned by `cancelable-thread-call` into result-chan and read from there.
(async.u/promise-pipe (async.u/cancelable-thread-call f) result-chan)
(is (= ::success
(first (a/alts!! [result-chan (a/timeout 500)]))))))))
(ns jackdaw.serdes.avro.integration-test
(:require [clj-uuid :as uuid]
[clojure.core.cache :as cache]
[ :as io]
[clojure.test :refer [deftest is testing]]
[jackdaw.client :as jc]
[jackdaw.client.log :as jcl]
[ :as jd]
[jackdaw.serdes.avro :as avro]
[jackdaw.serdes.avro.schema-registry :as reg]
[jackdaw.test.fixtures :as fix])
(:import [org.apache.kafka.common.serialization Serde Serdes]))
;; For every [topic record] pair, open a producer for that topic, send the record to partition 0
;; keyed by the record's `:a` value, and block until the send has completed. `with-open` closes
;; each producer before the next pair is handled.
(doseq [[topic record] topic+record]
  (with-open [producer (jc/producer +local-kafka+ topic)]
    (deref (jc/send! producer
                     (jd/->ProducerRecord topic 0 (:a record) record)))))
;; Reads records through a subscribed consumer and asserts that each value read (`r'`) equals the
;; expected record (`r`).
;; NOTE(review): this excerpt looks truncated -- the `->` form and the `with-open` binding vector on
;; the first line are never closed, and `(map vector` has only one visible argument although the
;; destructuring expects pairs. Restore the missing tokens from the upstream file.
(with-open [c (-> (jc/subscribed-consumer +local-kafka+ [test-topic-v1])
(doseq [[[_ r] {r' :value}]
(map vector
;; `doall` realizes the log eagerly (the 1000 is presumably an inactivity timeout in ms -- confirm
;; against `jcl/log-until-inactivity`).
(doall (jcl/log-until-inactivity c 1000)))]
(is (= r r') "Record didn't round trip!")))
(:require [clojure.test :refer :all]
[clojure.core.reducers :as r]
[parkour (conf :as conf) (fs :as fs) (wrapper :as w)
(mapreduce :as mr)]
[ (dsink :as dsink) (cascading :as casc)]
[parkour.test-helpers :as th]))
;; Round-trip check for the `casc` dsink/dseq pair: rows written through a sink must come back
;; unchanged, both when the sink is opened explicitly and when `dsink/with-dseq` is used.
(deftest test-roundtrip
  (let [rows   [["foo" 9] ["bar" 8] ["baz" 7] ["quux" 6]]
        target (fs/path "tmp/casc")]
    ;; Presumably removes output left behind by an earlier run -- confirm against `fs/path-delete`.
    (fs/path-delete target)
    ;; Explicit sink: `with-open` closes it before the data is read back.
    (with-open [sink (dsink/sink-for (casc/dsink target))]
      (mr/sink sink (mr/sink-as :vals rows)))
    ;; Each element read back is reduced to its second component before comparing.
    (is (= rows
           (into [] (r/map second (casc/dseq target)))))
    ;; Same comparison, this time letting `dsink/with-dseq` do the writing.
    (is (= rows
           (into []
                 (r/map second
                        (dsink/with-dseq (casc/dsink)
                          (mr/sink-as :vals rows))))))))
(ns com.keminglabs.zmq-async.t-core
(:require [com.keminglabs.zmq-async.core :refer :all]
[clojure.core.async :refer [go close! >!! <!! chan timeout alts!!]]
[midje.sweet :refer :all])
(:import org.zeromq.ZMQ))
;; Set-up for a Midje fact: binds two ZMQ PULL sockets to the inproc endpoints "inproc://A" and
;; "inproc://B", then opens a PAIR socket and connects it to `test-addr`.
;; NOTE(review): the excerpt ends right after `.connect` -- none of the enclosing forms are closed
;; and no assertion is visible. `test-addr` is not defined in the visible code either.
(let [context (create-context)]
(fact "Poller selects correct socket"
;; `with-open` calls `.close` on sock-B and then sock-A (reverse binding order) when the body exits.
(with-open [sock-A (doto (.createSocket (context :zcontext) ZMQ/PULL)
(.bind "inproc://A"))
sock-B (doto (.createSocket (context :zcontext) ZMQ/PULL)
(.bind "inproc://B"))]
(with-open [sock (.createSocket (context :zcontext) ZMQ/PAIR)]
(.connect sock test-addr)
(ns repl.reactions
(:require [clojurians-log.application :as app]
[ :as io]
[clojurians-log.slack-api :as slack]
[clojurians-log.db.queries :as q]
[clojurians-log.db.import :as import]
[ :as data]
[clj-slack.emoji :as slack-emoji]
[ :as io]
[clojurians-log.datomic :as d]
[ :as edn]
[clojure.string :as str]
[clojure.core.async :as async :refer [>!! <! >! go-loop go <!!]]
[ :as json])
(:use [clojurians-log.repl]))
;; REPL scratch: transact two sample emoji entities, then query them back.
;; NOTE(review): both entities use the same `:emoji/url` value "url1" -- presumably placeholder
;; data; confirm that the duplicate is intended.
(d/transact (conn) [{:emoji/shortcode "+1" :emoji/url "url1"}])
(d/transact (conn) [{:emoji/shortcode "joy" :emoji/url "url1"}])
;; Pull every attribute of each entity that has an `:emoji/shortcode`.
(d/q '[:find (pull ?e [*]) :where [?e :emoji/shortcode]] (db))
;; Same pull, restricted to the shortcode "sheepy" (not one of the two transacted above).
(d/q '[:find (pull ?e [*]) :where [?e :emoji/shortcode "sheepy"]] (db))
;; add default emojis
;; One `{:emoji/shortcode <name>}` map per entry under the `:emojis` key of the "emojis.json"
;; resource, with JSON keys read as keywords.
;; NOTE(review): the `map` result is lazy and is returned out of `with-open`. That is only safe if
;; `json/read` has fully consumed the reader before returning -- confirm, or switch to `mapv`.
(def default-emojis
  (with-open [rdr (io/reader (io/resource "emojis.json"))]
    (let [parsed (json/read rdr :key-fn keyword)]
      (map (fn [emoji]
             (hash-map :emoji/shortcode (:name emoji)))
           (:emojis parsed)))))
(def emlist (with-open [r (io/reader (io/resource "emojis.json"))]
