Back

>!! (clj)

(source)

(>!! port val)
Puts a val into port. nil values are not allowed. Blocks the calling thread if no buffer space is available. Returns true unless port is already closed, in which case it returns false. Not intended for use in direct or transitive calls from (go ...) blocks. Use the clojure.core.async.go-checking flag to detect invalid use (see namespace docs).

Examples

core.async
;; The clojure.core.async namespace contains the public API.
(require '[clojure.core.async :as async :refer :all])

;; In ordinary threads, we use `>!!` (blocking put) and `<!!`
;; (blocking take) to communicate via channels.

;; Buffered channel: there is room for the value, so the put returns at once
;; and the same thread can take it back out.
(let [ch (chan 10)]
  (>!! ch "hello")
  (let [received (<!! ch)]
    (assert (= "hello" received)))
  (close! ch))

;; Unbuffered channel: the put runs on its own thread and blocks there until
;; the take below receives the value.
(let [ch (chan)
      _ (thread (>!! ch "hello"))
      received (<!! ch)]
  (assert (= "hello" received))
  (close! ch))

;; `alts!!` takes from whichever of the two channels is ready first.
;; The channels are closed once both puts have been delivered so that the
;; reader thread can exit instead of staying blocked in `alts!!` forever.
(let [c1 (chan)
      c2 (chan)]
  (thread (loop []
            (let [[v ch] (alts!! [c1 c2])]
              ;; A nil value means `ch` has been closed: stop reading.
              (when (some? v)
                (println "Read" v "from" ch)
                (recur)))))
  ;; Both channels are unbuffered, so each put returns only after the reader
  ;; has taken the value; closing afterwards cannot drop a message.
  (>!! c1 "hi")
  (>!! c2 "there")
  (close! c1)
  (close! c2))
core.async
(require '[clojure.core.async :as async :refer [<! >! <!! >!! timeout chan alt! alts!! go]])

(defn fan-in
  "Merges the channels in `ins` into a single new channel and returns it.

  A background thread (started with `future`) repeatedly takes from whichever
  input is ready and forwards the value with a blocking put. An input that
  closes is dropped from the set; once every input has closed, the returned
  channel is closed as well. Forwarding also stops if the returned channel is
  closed by its consumer."
  [ins]
  (let [out (chan)]
    (future
      (loop [open (vec ins)]
        (if (empty? open)
          ;; No inputs left (or none given): signal completion downstream.
          (async/close! out)
          (let [[x src] (alts!! open)]
            (cond
              ;; nil means `src` closed; putting nil would throw, so drop
              ;; that input and keep serving the others.
              (nil? x) (recur (filterv #(not= src %) open))
              ;; `>!!` returns true while `out` is still open.
              (>!! out x) (recur open)
              ;; `out` was closed by the consumer: nothing more to do.
              :else nil)))))
    out))

;; Send ten numbers into `source`, which is routed through `fan-out` (defined
;; elsewhere) over three fresh channels and merged back by `fan-in`; print
;; each value as it comes out of the merged channel.
(let [source (chan)
      merged (fan-in (fan-out source (repeatedly 3 chan)))]
  (doseq [n (range 10)]
    (>!! source n)
    (prn (<!! merged))))
core.async
(require '[clojure.core.async :as async :refer [<!! >!! timeout chan alt!!]])

(defn fake-search
  "Returns a function of `[c query]` that simulates a search backend of the
  given `kind`: on a `future` thread it waits a random 0-99 ms and then does a
  blocking put of `[kind query]` onto channel `c`."
  [kind]
  (fn [c query]
    (future
      ;; Simulated latency: block until a randomly sized timeout channel closes.
      (let [pause (timeout (rand-int 100))]
        (<!! pause))
      (>!! c [kind query]))))

(defn google
  "Runs the web, image and video searches for `query` concurrently and returns
  a vector of three entries in arrival order. All three takes share a single
  80 ms timeout channel; a slot whose result did not arrive before the timeout
  holds nil."
  [query]
  (let [;; One buffer slot per producer: a search that finishes after the
        ;; deadline can still complete its put instead of blocking its
        ;; future thread forever on a channel nobody reads any more.
        c (chan 3)
        t (timeout 80)]
    (future (>!! c (<!! (fastest query web1 web2))))
    (future (>!! c (<!! (fastest query image1 image2))))
    (future (>!! c (<!! (fastest query video1 video2))))
    ;; Collect exactly three values, each taken from whichever of `c` or the
    ;; shared timeout `t` is ready first.
    (loop [i 0 ret []]
      (if (= i 3)
        ret
        (recur (inc i) (conj ret (alt!! [c t] ([v] v))))))))
clojure/core.async
;; The clojure.core.async namespace contains the public API.
(require '[clojure.core.async :as async :refer :all])

;; In ordinary threads, we use `>!!` (blocking put) and `<!!`
;; (blocking take) to communicate via channels.

;; Buffered channel: there is room for the value, so the put returns at once
;; and the same thread can take it back out.
(let [ch (chan 10)]
  (>!! ch "hello")
  (let [received (<!! ch)]
    (assert (= "hello" received)))
  (close! ch))

;; Unbuffered channel: the put runs on its own thread and blocks there until
;; the take below receives the value.
(let [ch (chan)
      _ (thread (>!! ch "hello"))
      received (<!! ch)]
  (assert (= "hello" received))
  (close! ch))

;; `alts!!` takes from whichever of the two channels is ready first.
;; The channels are closed once both puts have been delivered so that the
;; reader thread can exit instead of staying blocked in `alts!!` forever.
(let [c1 (chan)
      c2 (chan)]
  (thread (loop []
            (let [[v ch] (alts!! [c1 c2])]
              ;; A nil value means `ch` has been closed: stop reading.
              (when (some? v)
                (println "Read" v "from" ch)
                (recur)))))
  ;; Both channels are unbuffered, so each put returns only after the reader
  ;; has taken the value; closing afterwards cannot drop a message.
  (>!! c1 "hi")
  (>!! c2 "there")
  (close! c1)
  (close! c2))
clj-commons/manifold
(ns manifold.stream.async
  {:no-doc true}
  (:require
    [manifold.deferred :as d]
    [clojure.core.async :as a]
    [manifold.stream
     [graph :as g]
     [core :as s]]
    [manifold
     [executor :as executor]
     [utils :as utils]])
  (:import
    [java.util.concurrent.atomic
     AtomicReference]))

        blocking?
        (try
          (a/>!! ch x)
          true)
unclebob/more-speech
(ns more-speech.websocket-relay-spec
  (:require [speclj.core :refer :all]
            [clojure.core.async :as async]
            [more-speech
             [relay :as relay]
             [websocket-relay :as ws-relay]]))

  ;; Round-trip check against a live relay. Marked pending via the `pending`
  ;; call at the top of the spec body.
  (it "can send and receive"
    (pending "be nice to relay.damus.io")
    (let [chan (async/chan)
          ;; Receive callback: hand each [relay msg] pair to the waiting test
          ;; thread with a blocking put.
          recv-f (fn [relay msg] (async/>!! chan [relay msg]))
          relay (ws-relay/make "wss://relay.damus.io" recv-f)
          relay-open (relay/open relay)
          _ (relay/send relay-open ["test"])
          f-reply (future (async/<!! chan))
          ;; The timeout default must itself be destructurable: a bare keyword
          ;; here makes the vector destructuring throw, which would skip the
          ;; close below and hide the real failure. With a pair, a timeout
          ;; shows up as a plain assertion failure.
          [relay-r reply] (deref f-reply 1000 [nil :timeout])
          _ (relay/close relay-open)]
      (should= relay relay-r)
      (should= ["NOTICE" "could not parse command"] reply)))
  )
juxt/jig
(ns jig.mqtt
  (:require
   jig
   [clojurewerkz.machine-head.client :as mh]
   [clojure.core.async :refer (chan >!! close!)]
   [clojure.tools.logging :refer :all])
  (:import (jig Lifecycle)))

;; Jig Lifecycle component that forwards MQTT messages onto a core.async
;; channel stored in the system map under [:jig/channels (:channel config)].
(deftype MqttSubscriber [config]
  Lifecycle
  ;; init: create the channel (buffer size from :channel-size, default 100)
  ;; and register it in the system map.
  (init [_ system]
    (let [size (or (:channel-size config) 100)
          path [:jig/channels (:channel config)]]
      (assoc-in system path (chan size))))
  ;; start: subscribe to the configured topics; every incoming message is
  ;; logged and pushed onto the channel with a blocking put.
  (start [_ system]
    (let [client (::machine-head-client system)
          topics (:topics config)
          out (get-in system [:jig/channels (:channel config)])]
      (infof "MQTT, client is %s, topics are %s" client topics)
      (mh/subscribe
       client
       topics
       (fn [topic meta payload]
         (infof "Received message on topic %s: %s" topic (String. payload))
         (>!! out {:topic topic :meta meta :payload payload}))))
    system)
  ;; stop: unsubscribe, close the channel and remove it from the system map.
  (stop [_ system]
    (let [channel-key (:channel config)]
      (mh/unsubscribe (::machine-head-client system) (:topics config))
      (close! (get-in system [:jig/channels channel-key]))
      (update-in system [:jig/channels] dissoc channel-key))))
duanebester/clunk
(require '[com.clunk.core :as clunk]
         '[clojure.core.async :as async]
         '[clojure.core.match :as m])

;; Blocking puts onto the client's :out channel: first the message built by
;; `clunk/get-startup`, then the one built by `clunk/get-query`.
;; `client`, `username` and `database` are expected to be defined elsewhere.
(let [out (:out client)]
  (async/>!! out (clunk/get-startup username database))
  ;; (async/>!! out (clunk/get-query "select 1 as x, 2 as y;"))
  (async/>!! out (clunk/get-query "select * from country limit 10;")))