Back

broadcast (clj)

(source)

function

(broadcast & ports)
Returns a broadcasting write port which, when written to, writes the value to each of ports. Writes to the broadcasting port will park until the value is written to each of the ports used to create it. For this reason, it is strongly advised that each of the underlying ports support buffered writes.

Examples

clojure/core.async
(ns clojure.core.async.lab-test
  (:require
    [clojure.test :refer [deftest is]]
    [clojure.core.async.lab :refer [broadcast multiplex]]
    [clojure.core.async :as async]))

(deftest broadcast-test
  (is (apply = (let [broadcast-receivers (repeatedly 5 #(async/chan 1))
                     broadcaster (apply broadcast broadcast-receivers)
                     _ (async/>!! broadcaster :foo)
                     expected (repeat 5 :foo)
                     observed (doall (map async/<!! broadcast-receivers))]
                 [expected observed]))
      "Broadcasting to multiple channels returns a channel which will
      write to all the target channels.")
  (is (apply = (let [broadcast-receivers (repeatedly 5 async/chan)
                     broadcaster (apply broadcast broadcast-receivers)
                     read-channels (take 4 broadcast-receivers)
                     _ (future (async/>!! broadcaster :foo)
                               (async/>!! broadcaster :bar))
                     first-reads (doall (map async/<!! read-channels))
                     timeout-channel (async/timeout 500)
                     alt-read (async/alts!! (conj read-channels timeout-channel))
                     expected [(repeat 4 :foo) [nil timeout-channel]]
                     observed [first-reads alt-read]]
                 (async/<!! (last broadcast-receivers))
                 (doseq [channel broadcast-receivers]
                   (async/<!! channel))
                 [expected observed]))
      "Broadcasts block further writes if one of the channels cannot
      complete its write.")
  (is (apply = (let [broadcast-receivers (repeatedly 5 #(async/chan 100))
                     broadcaster (apply broadcast broadcast-receivers)
                     _ (future (dotimes [i 100]
                         (async/>!! broadcaster i)))
                     observed (for [_ (range 100)]
                                (async/<!! (first broadcast-receivers)))
                     expected (range 100)]
                 [expected observed])) "When all channels are sufficiently buffered, reads on one channel are not throttled by reads from other channels."))