Back

multiplex (clj)

(source)

function

(multiplex & ports)
Returns a multiplexing read port which, when read from, produces a value from one of the ports. If at read time only one port is available to be read from, the multiplexing port will return that value. If multiple ports are available to be read from, the multiplexing port will return one value from a port chosen non-deterministically. If no port is available to be read from, parks execution until a value is available.

Examples

clojure/core.async
(ns clojure.core.async.lab-test
  (:require
    [clojure.test :refer [deftest is]]
    [clojure.core.async.lab :refer [broadcast multiplex]]
    [clojure.core.async :as async]))

;; Exercises `multiplex` in three scenarios:
;;   1. values written to two source channels are all observed on the muxer;
;;   2. one source closing early does not make the muxer yield nil while the
;;      other source is still producing;
;;   3. once every source is closed, reading the muxer yields nil.
(deftest multiplex-test
  ;; Scenario 1: two producer threads push the odd and even numbers below 10
  ;; onto separate channels; ten reads from the muxer must yield exactly the
  ;; set 0..9 (order is irrelevant, hence the set comparison).
  (is (apply = (let [even-chan (async/chan)
                     odd-chan (async/chan)
                     muxer (multiplex even-chan odd-chan)
                     odds (filter odd? (range 10))
                     evens (filter even? (range 10))
                     odd-fn #(doseq [odd odds]
                               (async/>!! odd-chan odd))
                     ;; Bound only for the side effect of starting the thread.
                     _odd-pusher (doto (Thread. ^Runnable odd-fn) (.start))
                     even-fn #(doseq [even evens]
                                (async/>!! even-chan even))
                     _even-pusher (doto (Thread. ^Runnable even-fn) (.start))
                     expected (set (range 10))
                     observed (set (for [_ (range 10)] (async/<!! muxer)))]
                 [expected observed]))
      "Multiplexing multiple channels returns a channel which returns
      the values written to each.")
  ;; Scenario 2: short-chan carries 10 values and long-chan carries 10000.
  ;; Each producer closes its OWN channel once it has finished writing, so
  ;; short-chan is closed while long-chan is still live.
  (is (let [short-chan (async/chan)
            long-chan (async/chan)
            muxer (multiplex short-chan long-chan)
            long-fn #(do (dotimes [i 10000]
                           (async/>!! long-chan i))
                         (async/close! long-chan))
            _long-pusher (doto (Thread. ^Runnable long-fn) (.start))
            short-fn #(do (dotimes [i 10]
                            (async/>!! short-chan i))
                          (async/close! short-chan))
            _short-pusher (doto (Thread. ^Runnable short-fn) (.start))
            ;; 10010 = 10 values from short-chan + 10000 from long-chan.
            observed (for [_ (range 10010)] (async/<!! muxer))]
        ;; Every value read must be truthy, i.e. none of the reads may be nil.
        (every? identity observed))
      "A closed channel will deliver nil, but the multiplexed channel
      will never deliver nil until all channels are closed.")
  ;; Scenario 3: close all five sources before reading; the read must be nil.
  (is (apply = (let [chans (take 5 (repeatedly #(async/chan)))
                     muxer (apply multiplex chans)]
                 (doseq [chan chans]
                   (async/close! chan))
                 [nil (async/<!! muxer)]))
      "When all of a multiplexer's channels are closed, it behaves
      like a closed channel on read."))