Back
ReadPort (clj)
(source)protocol
Examples
logseq/logseq
(ns frontend.pubsub
"All mults and pubs are collected to this ns.
vars with suffix '-mult' is a/Mult, use a/tap and a/untap on them. used by event subscribers
vars with suffix '-pub' is a/Pub, use a/sub and a/unsub on them. used by event subscribers
vars with suffix '-ch' is chan used by event publishers."
{:clj-kondo/config {:linters {:unresolved-symbol {:level :off}}}}
#?(:cljs (:require-macros [frontend.pubsub :refer [def-mult-or-pub chan-of]]))
(:require [clojure.core.async :as a :refer [chan mult pub]]
[clojure.core.async.impl.protocols :as ap]
[malli.core :as m]
[malli.dev.pretty :as mdp]
[clojure.pprint :as pp]))
;;; helper macro
(defmacro chan-of [malli-schema malli-schema-validator & chan-args]
`(let [ch# (chan ~@chan-args)]
(reify
ap/ReadPort
(~'take! [~'_ fn1-handler#]
(ap/take! ch# fn1-handler#))
ap/WritePort
(~'put! [~'_ val# fn1-handler#]
(if (~malli-schema-validator val#)
(ap/put! ch# val# fn1-handler#)
(do (mdp/explain ~malli-schema val#)
(throw (ex-info "validate chan value failed" {:val val#}))))))))
juxt/yada
(ns yada.resources.sse
(:require
[clojure.core.async :refer [chan mult tap]]
[manifold.stream :refer [->source transform]]
[yada.charset :as charset]
[yada.resource :refer [resource ResourceCoercion]]
clojure.core.async.impl.channels
clojure.core.async.impl.protocols
manifold.stream.async)
(:import [clojure.core.async.impl.protocols ReadPort]))
(extend-protocol ResourceCoercion
ReadPort
(as-resource [ch]
(let [mlt (mult ch)]
(resource
{:produces [{:media-type "text/event-stream"
:charset charset/platform-charsets}]
:methods {:get {:response (fn [ctx]
(let [ch (chan)]
(tap mlt ch)
(transform (map (partial format "data: %s\n\n")) (->source ch))))}}}))))
typedclojure/typedclojure
End users should use typed.lib.clojure.core.async, which all types here
are qualified under."}
typed.ann.clojure.core.async
(:require [typed.clojure :refer [ann ann-datatype defalias ann-protocol] :as t]
[typed.lib.clojure.core.async :as-alias ta]
[clojure.core.async.impl.protocols :as-alias impl])
(:import (java.util.concurrent.locks Lock)))
(ann-protocol [[r :variance :covariant]]
clojure.core.async.impl.protocols/ReadPort
take! [(impl/ReadPort r) Lock
-> (t/U nil (t/Deref (t/U nil r)))])
(ann-datatype [[w :variance :contravariant]
[r :variance :covariant]]
clojure.core.async.impl.channels.ManyToManyChannel
[]
:unchecked-ancestors [impl/Channel
(impl/ReadPort r)
(impl/WritePort w)])
(defalias
^{:forms '[(Port2 t t)]}
ta/Port2
"A port that can write type w and read type r"
(t/TFn [[w :variance :contravariant]
[r :variance :covariant]]
(t/I (impl/WritePort w)
(impl/ReadPort r))))
lambda-toolshed/papillon
(ns lambda-toolshed.papillon.async
#?(:cljs
(:require [clojure.core.async :as core.async]
[clojure.core.async.impl.protocols :refer [ReadPort]]
[cljs.core.async.interop :as core.async.interop :refer [p->c]])))
#?(:cljs
(do
(extend-type js/Promise
ReadPort
(take! [this handler]
(->
this
p->c
(#(core.async/take 1 %))
(clojure.core.async.impl.protocols/take! handler))))))