Back
<! (clj)
(source)function
(<! port)
takes a val from port. Must be called inside a (go ...) block. Will
return nil if closed. Will park if nothing is available.
Examples
core.async
;; The clojure.core.async namespace contains the public API.
(require '[clojure.core.async :as async :refer :all])
;; In ordinary threads, we use `>!!` (blocking put) and `<!!`
;; (blocking take) to communicate via channels.
;; Buffered channel: the put fits in the buffer, so a single thread can
;; perform both the put and the take.
(let [ch (chan 10)]
  (>!! ch "hello")
  (assert (= "hello" (<!! ch)))
  (close! ch))

;; Unbuffered channel: the put runs on a separate thread so that the
;; blocking take on this thread can meet it.
(let [ch (chan)]
  (thread (>!! ch "hello"))
  (assert (= "hello" (<!! ch)))
  (close! ch))
;; The `go` macro asynchronously executes its body in a special pool
;; of threads. Channel operations that would block will pause
;; execution instead, blocking no threads. This mechanism encapsulates
;; the inversion of control that is external in event/callback
;; systems. Inside `go` blocks, we use `>!` (put) and `<!` (take).
;; Here we convert our prior channel example to use go blocks:
;; The put and the take each run in their own go block; the outer `<!!`
;; waits on the channel returned by the taking go block.
(let [ch (chan)]
  (go (>! ch "hello"))
  (assert (= "hello" (<!! (go (<! ch)))))
  (close! ch))

;; Report how long a blocking take on a 100 ms timeout channel took.
(let [timer (timeout 100)
      started-at (System/currentTimeMillis)]
  (<!! timer)
  (println "Waited" (- (System/currentTimeMillis) started-at)))
core.async
(require '[clojure.core.async :as async :refer [<! >! <!! >!! timeout chan alt! alts!! go]])
(defn fan-out
  "Distributes values taken from channel `in` across several output channels.

  `cs-or-n` is either a collection of channels to use as outputs, or a
  number, in which case that many fresh unbuffered channels are created.
  A background future repeatedly takes one value from `in` and offers it
  to all outputs through `alts!!`, so each value is delivered to one
  output. The loop stops once `in` is closed. Returns the output channels."
  [in cs-or-n]
  (let [cs (if (number? cs-or-n)
             (repeatedly cs-or-n chan)
             cs-or-n)]
    (future
      (loop []
        ;; `<!!` yields nil only when `in` is closed. Stop there instead of
        ;; attempting to put nil, which channels reject with an exception.
        (when-some [x (<!! in)]
          ;; `alts!!` is documented to take a vector of operations.
          (alts!! (mapv #(vector % x) cs))
          (recur))))
    cs))
;; Push 0..9 through fan-out and fan-in, printing each value as it returns.
(let [source (chan)
      merged (fan-in (fan-out source (repeatedly 3 chan)))]
  (dotimes [i 10]
    (>!! source i)
    (prn (<!! merged))))
core.async
(require '[clojure.core.async :as async :refer [<!! >!! timeout chan alt!!]])
(defn fake-search
  "Returns a function of a channel and a query. When called, that function
  starts a future which waits on a timeout of up to 100 ms (chosen with
  `rand-int`) and then puts `[kind query]` onto the channel."
  [kind]
  (fn [out q]
    (future
      (let [pause (timeout (rand-int 100))]
        (<!! pause)
        (>!! out [kind q])))))
(defn google
  "Runs the web, image and video searches for `query` concurrently and
  collects three results, sharing a single 80 ms timeout.

  Returns a vector of three entries. Each entry is whatever was taken from
  either the result channel or the timeout channel, so entries are nil for
  takes that were satisfied by the closed timeout channel."
  [query]
  (let [;; One buffer slot per producer: a producer that finishes after the
        ;; timeout can still complete its put and let its thread exit,
        ;; instead of blocking forever on a channel nobody reads any more.
        c (chan 3)
        t (timeout 80)]
    (future (>!! c (<!! (fastest query web1 web2))))
    (future (>!! c (<!! (fastest query image1 image2))))
    (future (>!! c (<!! (fastest query video1 video2))))
    (loop [i 0 ret []]
      (if (= i 3)
        ret
        (recur (inc i) (conj ret (alt!! [c t] ([v] v))))))))
core.async
(require '[clojure.core.async :as async :refer [<! >! timeout chan alt! alts! go]])
(defn fan-out
  "Distributes values taken from channel `in` across several output channels.

  `cs-or-n` is either a collection of channels to use as outputs, or a
  number, in which case that many fresh unbuffered channels are created.
  A go block repeatedly takes one value from `in` and offers it to all
  outputs through `alts!`, so each value is delivered to one output. The
  loop stops once `in` is closed. Returns the output channels."
  [in cs-or-n]
  (let [cs (if (number? cs-or-n)
             (repeatedly cs-or-n chan)
             cs-or-n)]
    (go
      (loop []
        ;; `<!` yields nil only when `in` is closed. Stop there instead of
        ;; attempting to put nil, which channels reject with an exception.
        (when-some [x (<! in)]
          ;; `alts!` is documented to take a vector of operations.
          (alts! (mapv #(vector % x) cs))
          (recur))))
    cs))
;; Push 0..9 through fan-out and fan-in from inside a go block, printing each
;; value as it returns. The form itself evaluates to nil.
(let [source (chan)
      merged (fan-in (fan-out source (repeatedly 3 chan)))]
  (go
    (dotimes [i 10]
      (>! source i)
      (prn (<! merged))))
  nil)
core.async
(require '[clojure.core.async :as async :refer [<! >! <!! timeout chan alt! go]])
(defn fake-search
  "Returns a function of a channel and a query. When called, that function
  starts a go block which parks on a timeout of up to 100 ms (chosen with
  `rand-int`) and then puts `[kind query]` onto the channel."
  [kind]
  (fn [out q]
    (go
      (let [pause (timeout (rand-int 100))]
        (<! pause)
        (>! out [kind q])))))
(defn google
  "Starts the web, image and video searches for `query` in separate go
  blocks and gathers three results under one shared 80 ms timeout.

  Returns the channel of the collecting go block, which yields a vector of
  three entries; each entry is whatever was taken from either the result
  channel or the timeout channel."
  [query]
  (let [c (chan)
        t (timeout 80)]
    (go (>! c (<! (fastest query web1 web2))))
    (go (>! c (<! (fastest query image1 image2))))
    (go (>! c (<! (fastest query video1 video2))))
    (go
      ;; Count down the three takes instead of counting up to three.
      (loop [remaining 3
             results []]
        (if (zero? remaining)
          results
          (recur (dec remaining)
                 (conj results (alt! [c t] ([v] v)))))))))
;; Blocking take on the channel returned by `google`, i.e. the channel of the
;; go block that accumulates the three search results.
(<!! (google "clojure"))
clojure/core.async
;; The clojure.core.async namespace contains the public API.
(require '[clojure.core.async :as async :refer :all])
;; In ordinary threads, we use `>!!` (blocking put) and `<!!`
;; (blocking take) to communicate via channels.
;; With a buffer of 10 the put is accepted right away, so the same thread
;; can go on to take the value back out.
(let [buffered (chan 10)]
  (>!! buffered "hello")
  (assert (= "hello" (<!! buffered)))
  (close! buffered))

;; Without a buffer the put is issued from another thread, and the blocking
;; take on this thread receives it.
(let [unbuffered (chan)]
  (thread (>!! unbuffered "hello"))
  (assert (= "hello" (<!! unbuffered)))
  (close! unbuffered))
;; The `go` macro asynchronously executes its body in a special pool
;; of threads. Channel operations that would block will pause
;; execution instead, blocking no threads. This mechanism encapsulates
;; the inversion of control that is external in event/callback
;; systems. Inside `go` blocks, we use `>!` (put) and `<!` (take).
;; Here we convert our prior channel example to use go blocks:
;; One go block puts, another takes; the blocking `<!!` reads the value from
;; the channel returned by the taking go block.
(let [unbuffered (chan)]
  (go (>! unbuffered "hello"))
  (assert (= "hello" (<!! (go (<! unbuffered)))))
  (close! unbuffered))

;; Print the elapsed wall-clock time of a blocking take on a 100 ms timeout.
(let [deadline (timeout 100)
      t0 (System/currentTimeMillis)]
  (<!! deadline)
  (println "Waited" (- (System/currentTimeMillis) t0)))
cognitect-labs/aws-api
(ns s3-examples
(:require [clojure.core.async :as a]
[clojure.spec.alpha :as s]
[clojure.spec.gen.alpha :as gen]
[clojure.java.io :as io]
[clojure.repl :as repl]
[cognitect.aws.client.api :as aws]))
;; Blocking take from `c` (presumably a channel obtained earlier in the
;; original file; its definition is not part of this excerpt).
(a/<!! c)

;; supply your own channel
(let [reply-ch (a/chan)]
  (aws/invoke-async s3 {:op :ListBuckets
                        :ch reply-ch})
  (a/<!! reply-ch))
epiccastle/spire
(ns spire.output.events
(:require [spire.output.core :as output]
[puget.printer :as puget]
[clojure.core.async :refer [<!! put! go chan thread]]))
;; :events implementation of output/print-thread: starts a thread that
;; forever takes values from `prn-chan` and pretty-prints each with puget.
(defmethod output/print-thread :events [_]
  (thread
    (while true
      (puget/cprint (<!! prn-chan)))))
unclebob/more-speech
(ns more-speech.websocket-relay-spec
(:require [speclj.core :refer :all]
[clojure.core.async :as async]
[more-speech
[relay :as relay]
[websocket-relay :as ws-relay]]))
;; Round trip against a live relay: send a message, then wait up to one
;; second for the reply delivered through the receive callback. The spec is
;; marked pending.
(it "can send and receive"
  (pending "be nice to relay.damus.io")
  (let [inbox (async/chan)
        on-message (fn [source msg] (async/>!! inbox [source msg]))
        client (ws-relay/make "wss://relay.damus.io" on-message)
        connection (relay/open client)
        _ (relay/send connection ["test"])
        reply-future (future (async/<!! inbox))
        [reply-source reply] (deref reply-future 1000 :timeout)
        _ (relay/close connection)]
    (should= client reply-source)
    (should= ["NOTICE" "could not parse command"] reply)))
)
unclebob/more-speech
(ns more-speech.nostr.zaps-spec
(:require
[clj-http.client :as client]
[clojure.core.async :as async]
[clojure.data.json :as json]
[clojure.string :as string]
[more-speech.bech32 :as bech32]
[more-speech.config :as config]
[more-speech.db.gateway :as gateway]
[more-speech.mem :refer :all]
[more-speech.nostr.elliptic-signature :as es]
[more-speech.nostr.event-composers :as composers]
[more-speech.nostr.util :as util]
[more-speech.nostr.zaps :as zaps]
[more-speech.relay :as relay]
[more-speech.spec-util :refer :all]
[more-speech.util.fortune-messages :as fortune]
[more-speech.websocket-relay :as ws-relay]
[speclj.core :refer :all])
(:import (ecdhJava SECP256K1)))
(context "wallet-connect"
;; Checks the relay calls made by zaps/zap-by-wallet-connect, with relay
;; construction, open/send/close, decryption, request-event creation and the
;; blocking channel read all replaced by stubs.
(it "executes wallet-connect payment"
;; event-index starts at -1 so the first stubbed read returns (get events 0).
(let [event-index (atom -1)
events [{:content "pay_invoice"} {:kind 23195
:tags [[:p "e6c47ae6962c5ea1559f48b437c193a1bcb1d72d08d75d743ba3cbfb8e7afbeb"]]
:content "{\"result_type\": \"pay_invoice\"}"}]]
;; NOTE(review): the "wc-relay-url" and "beef" values asserted below also
;; appear in this URI; presumably they are parsed from it -- confirm in zaps.
(set-mem [:keys :wallet-connect] "nostrwalletconnect://beef?relay=wc-relay-url&secret=dead")
(with-redefs [ws-relay/make (stub :relay-make {:return "some-relay"})
relay/open (stub :relay-open {:return "open-relay"})
relay/send (stub :relay-send)
relay/close (stub :relay-close)
;; The decrypt stub returns its third argument unchanged.
zaps/decrypt-content (stub :calc-key {:invoke (fn [_ _ c] c)})
zaps/get-wc-request-event (stub :request-event {:return "request-event"})
;; Each stubbed blocking take returns the next element of `events`
;; (nil once the index runs past the end of the vector).
async/<!! (stub :read-chan {:invoke (fn [_x] (get events (swap! event-index inc)))})]
(zaps/zap-by-wallet-connect "event-to-zap")
(should-have-invoked :relay-make {:with ["wc-relay-url" :*]})
(should-have-invoked :relay-open {:with ["some-relay"]})
(should-have-invoked :relay-send {:with ["open-relay" ["REQ" "ms-info" {"kinds" [13194], "authors" ["beef"]}]]})
(should-have-invoked :request-event {:with ["event-to-zap" :*]})
(should-have-invoked :relay-send {:with ["open-relay" "request-event"]})
(should-have-invoked :relay-close {:with ["open-relay"]}))))