Back

ReadPort (clj)

(source)

protocol

Examples

logseq/logseq
(ns frontend.pubsub
  "All mults and pubs are collected to this ns.
  vars with suffix '-mult' is a/Mult, use a/tap and a/untap on them. used by event subscribers
  vars with suffix '-pub' is a/Pub, use a/sub and a/unsub on them. used by event subscribers
  vars with suffix '-ch' is chan used by event publishers."
  {:clj-kondo/config {:linters {:unresolved-symbol {:level :off}}}}
  #?(:cljs (:require-macros [frontend.pubsub :refer [def-mult-or-pub chan-of]]))
  (:require [clojure.core.async :as a :refer [chan mult pub]]
            [clojure.core.async.impl.protocols :as ap]
            [malli.core :as m]
            [malli.dev.pretty :as mdp]
            [clojure.pprint :as pp]))

;;; helper macro
(defmacro chan-of [malli-schema malli-schema-validator & chan-args]
  `(let [ch# (chan ~@chan-args)]
     (reify
       ap/ReadPort
       (~'take! [~'_ fn1-handler#]
        (ap/take! ch# fn1-handler#))
       ap/WritePort
       (~'put! [~'_ val# fn1-handler#]
        (if (~malli-schema-validator val#)
          (ap/put! ch# val# fn1-handler#)
          (do (mdp/explain ~malli-schema val#)
              (throw (ex-info "validate chan value failed" {:val val#}))))))))
juxt/yada
(ns yada.resources.sse
  (:require
   [clojure.core.async :refer [chan mult tap]]
   [manifold.stream :refer [->source transform]]
   [yada.charset :as charset]
   [yada.resource :refer [resource ResourceCoercion]]
   clojure.core.async.impl.channels
   clojure.core.async.impl.protocols
   manifold.stream.async)
  (:import [clojure.core.async.impl.protocols ReadPort]))

(extend-protocol ResourceCoercion
  ReadPort
  (as-resource [ch]
    (let [mlt (mult ch)]
      (resource
       {:produces [{:media-type "text/event-stream"
                    :charset charset/platform-charsets}]
        :methods {:get {:response (fn [ctx]
                                    (let [ch (chan)]
                                      (tap mlt ch)
                                      (transform (map (partial format "data: %s\n\n")) (->source ch))))}}}))))
typedclojure/typedclojure
    End users should use typed.lib.clojure.core.async, which all types here
    are qualified under."}
  typed.ann.clojure.core.async
  (:require [typed.clojure :refer [ann ann-datatype defalias ann-protocol] :as t]
            [typed.lib.clojure.core.async :as-alias ta]
            [clojure.core.async.impl.protocols :as-alias impl])
  (:import (java.util.concurrent.locks Lock)))

(ann-protocol [[r :variance :covariant]]
              clojure.core.async.impl.protocols/ReadPort
              take! [(impl/ReadPort r) Lock 
                     -> (t/U nil (t/Deref (t/U nil r)))])

(ann-datatype [[w :variance :contravariant]
               [r :variance :covariant]]
              clojure.core.async.impl.channels.ManyToManyChannel 
              []
              :unchecked-ancestors [impl/Channel
                                    (impl/ReadPort r)
                                    (impl/WritePort w)])

(defalias 
  ^{:forms '[(Port2 t t)]}
  ta/Port2
  "A port that can write type w and read type r"
  (t/TFn [[w :variance :contravariant]
          [r :variance :covariant]]
         (t/I (impl/WritePort w)
              (impl/ReadPort r))))
lambda-toolshed/papillon
(ns lambda-toolshed.papillon.async
  #?(:cljs
     (:require [clojure.core.async :as core.async]
               [clojure.core.async.impl.protocols :refer [ReadPort]]
               [cljs.core.async.interop :as core.async.interop :refer [p->c]])))

#?(:cljs
   (do
     (extend-type js/Promise
       ReadPort
       (take! [this handler]
         (->
          this
          p->c
          (#(core.async/take 1 %))
          (clojure.core.async.impl.protocols/take! handler))))))