Back
multiplex (clj)
(source)function
(multiplex & ports)
Returns a multiplexing read port which, when read from, produces a
value from one of ports.
If at read time only one port is available to be read from, the
multiplexing port will return that value. If multiple ports are
available to be read from, the multiplexing port will return one
value from a port chosen non-deterministicly. If no port is
available to be read from, parks execution until a value is
available.
Examples
clojure/core.async
(ns clojure.core.async.lab-test
(:require
[clojure.test :refer [deftest is]]
[clojure.core.async.lab :refer [broadcast multiplex]]
[clojure.core.async :as async]))
(deftest multiplex-test
(is (apply = (let [even-chan (async/chan)
odd-chan (async/chan)
muxer (multiplex even-chan odd-chan)
odds (filter odd? (range 10))
evens (filter even? (range 10))
odd-fn #(doseq [odd odds]
(async/>!! odd-chan odd))
_odd-pusher (doto (Thread. ^Runnable odd-fn) (.start))
even-fn #(doseq [even evens]
(async/>!! even-chan even))
_even-pusher (doto (Thread. ^Runnable even-fn) (.start))
expected (set (range 10))
observed (set (for [_ (range 10)] (async/<!! muxer)))]
[expected observed]))
"Multiplexing multiple channels returns a channel which returns
the values written to each.")
(is (let [short-chan (async/chan)
long-chan (async/chan)
muxer (multiplex short-chan long-chan)
long-fn #(do (dotimes [i 10000]
(async/>!! long-chan i))
(async/close! short-chan))
_long-pusher (doto (Thread. ^Runnable long-fn) (.start))
short-fn #(do (dotimes [i 10]
(async/>!! short-chan i))
(async/close! short-chan))
_short-pusher (doto (Thread. ^Runnable short-fn) (.start))
observed (for [_ (range 10010)] (async/<!! muxer))]
(every? identity observed))
"A closed channel will deliver nil, but the multiplexed channel
will never deliver nil until all channels are closed.")
(is (apply = (let [chans (take 5 (repeatedly #(async/chan)))
muxer (apply multiplex chans)]
(doseq [chan chans]
(async/close! chan))
[nil (async/<!! muxer)]))
"When all of a multiplexer's channels are closed, it behaves
like a closed channel on read."))