Back
send (clj)
(source)function
(send a f & args)
Dispatch an action to an agent. Returns the agent immediately.
Subsequently, in a thread from a thread pool, the state of the agent
will be set to the value of:
(apply action-fn state-of-agent args)
Examples
typedclojure/typedclojure
(ns ^:no-doc typed.ann.clojure
"Type annotations for the base Clojure distribution."
#?(:cljs (:require-macros [typed.ann-macros.clojure :as macros]))
(:require [clojure.core :as cc]
[typed.clojure :as t]
#?(:clj [typed.ann-macros.clojure :as macros])
#?(:clj typed.ann.clojure.jvm) ;; jvm annotations
#?(:clj clojure.core.typed))
#?(:clj
(:import (clojure.lang PersistentHashSet PersistentList
APersistentMap #_IPersistentCollection
#_ITransientSet
IRef)
(java.util Comparator Collection))))
#?@(:cljs [] :default [
cc/get-thread-bindings [:-> (t/Map t/AnyVar t/Any)]
cc/bound-fn*
(t/All [r b :..]
[[b :.. b :-> r] :-> [b :.. b :-> r]])
cc/find-var
[t/Sym :-> (t/Nilable t/AnyVar)]
cc/agent
(t/All [x] [x & :optional {:validator (t/Nilable [x :-> t/Any]) :meta t/Any
:error-handler (t/Nilable [(t/Agent x) Throwable :-> t/Any])
:error-mode (t/U ':continue ':fail)}
:-> (t/Agent x)])
cc/set-agent-send-executor! [java.util.concurrent.ExecutorService :-> t/Any]
cc/set-agent-send-off-executor! [java.util.concurrent.ExecutorService :-> t/Any]
cc/send-via (t/All [x b :..] [(t/Agent x) [x b :.. b :-> x] b :.. b :-> (t/Agent x)])
cc/send (t/All [x b :..] [(t/Agent x) [x b :.. b :-> x] b :.. b :-> (t/Agent x)])
cc/send-off (t/All [x b :..] [(t/Agent x) [x b :.. b :-> x] b :.. b :-> (t/Agent x)])
cc/await [t/AnyAgent :* :-> nil]
cc/await-for [t/AnyInteger t/AnyAgent :* :-> t/Bool]
cc/await1 (t/All [[a :< t/AnyAgent]] [a :-> (t/Nilable a)])
cc/release-pending-sends [:-> t/AnyInteger]
])
unclebob/more-speech
(ns more-speech.nostr.zaps-spec
(:require
[clj-http.client :as client]
[clojure.core.async :as async]
[clojure.data.json :as json]
[clojure.string :as string]
[more-speech.bech32 :as bech32]
[more-speech.config :as config]
[more-speech.db.gateway :as gateway]
[more-speech.mem :refer :all]
[more-speech.nostr.elliptic-signature :as es]
[more-speech.nostr.event-composers :as composers]
[more-speech.nostr.util :as util]
[more-speech.nostr.zaps :as zaps]
[more-speech.relay :as relay]
[more-speech.spec-util :refer :all]
[more-speech.util.fortune-messages :as fortune]
[more-speech.websocket-relay :as ws-relay]
[speclj.core :refer :all])
(:import (ecdhJava SECP256K1)))
(context "auto-thanks"
(it "sends thanks for a zap when auto-thanks is :on"
(with-redefs [composers/compose-and-send-text-event (stub :send)
config/auto-thanks :on
config/auto-thanks-fortune :off]
(let [zapper-id (rand-int 1000000)]
(gateway/add-profile @db zapper-id {:name "zapper"})
(zaps/auto-thanks zapper-id)
(should-have-invoked :send {:with [nil "Auto Thanks" "@zapper Thank you!\n"]}))))
(it "dms thanks for a zap when auto-thanks is :dm"
(with-redefs [composers/compose-and-send-text-event (stub :send)
config/auto-thanks :dm
config/auto-thanks-fortune :off]
(let [zapper-id (rand-int 1000000)]
(gateway/add-profile @db zapper-id {:name "zapper"})
(zaps/auto-thanks zapper-id)
(should-have-invoked :send {:with [nil "Auto Thanks" "D @zapper Thank you!\n"]}))))
(it "sends thanks for a zap with a fortune"
(with-redefs [composers/compose-and-send-text-event (stub :send)
fortune/get-message (stub :get-message {:return "hi"})
config/auto-thanks :on
config/auto-thanks-fortune :normal]
(let [zapper-id (rand-int 1000000)]
(gateway/add-profile @db zapper-id {:name "zapper"})
(zaps/auto-thanks zapper-id)
(should-have-invoked :send {:with [nil "Auto Thanks" "@zapper Thank you!\nhi"]}))))
)
(context "wallet-connect"
(it "executes wallet-connect payment"
(let [event-index (atom -1)
events [{:content "pay_invoice"} {:kind 23195
:tags [[:p "e6c47ae6962c5ea1559f48b437c193a1bcb1d72d08d75d743ba3cbfb8e7afbeb"]]
:content "{\"result_type\": \"pay_invoice\"}"}]]
(set-mem [:keys :wallet-connect] "nostrwalletconnect://beef?relay=wc-relay-url&secret=dead")
(with-redefs [ws-relay/make (stub :relay-make {:return "some-relay"})
relay/open (stub :relay-open {:return "open-relay"})
relay/send (stub :relay-send)
relay/close (stub :relay-close)
zaps/decrypt-content (stub :calc-key {:invoke (fn [_ _ c] c)})
zaps/get-wc-request-event (stub :request-event {:return "request-event"})
async/<!! (stub :read-chan {:invoke (fn [_x] (get events (swap! event-index inc)))})]
(zaps/zap-by-wallet-connect "event-to-zap")
(should-have-invoked :relay-make {:with ["wc-relay-url" :*]})
(should-have-invoked :relay-open {:with ["some-relay"]})
(should-have-invoked :relay-send {:with ["open-relay" ["REQ" "ms-info" {"kinds" [13194], "authors" ["beef"]}]]})
(should-have-invoked :request-event {:with ["event-to-zap" :*]})
(should-have-invoked :relay-send {:with ["open-relay" "request-event"]})
(should-have-invoked :relay-close {:with ["open-relay"]}))))
(it "composes a wc request event"
(with-redefs [config/proof-of-work-default 0]
(let [sender-private-key-bytes (util/make-private-key)
sender-private-key (util/bytes->num sender-private-key-bytes)
sender-private-key-hex (util/hexify sender-private-key)
sender-public-key-bytes (es/get-pub-key sender-private-key-bytes)
sender-public-key (util/bytes->num sender-public-key-bytes)
recipient-private-key-bytes (util/make-private-key)
recipient-private-key (util/bytes->num recipient-private-key-bytes)
recipient-public-key-bytes (es/get-pub-key recipient-private-key-bytes)
recipient-public-key (util/bytes->num recipient-public-key-bytes)
recipient-public-key-hex (util/hexify recipient-public-key)
inbound-shared-secret (SECP256K1/calculateKeyAgreement
recipient-private-key
sender-public-key)
request "request"
_ (set-mem :pubkey 1)
[_ event] (zaps/compose-wc-request-event recipient-public-key-hex sender-private-key-hex request)]
(should= 23194 (:kind event))
(should= [[:p recipient-public-key-hex]] (filter #(= :p (first %)) (:tags event)))
(should= request (SECP256K1/decrypt inbound-shared-secret (:content event)))))))
)
lynaghk/zmq-async
(ns com.keminglabs.zmq-async.t-core
(:require [com.keminglabs.zmq-async.core :refer :all]
[clojure.core.async :refer [go close! >!! <!! chan timeout alts!!]]
[midje.sweet :refer :all])
(:import org.zeromq.ZMQ))
(doto (.createSocket (context :zcontext) ZMQ/PUSH)
(.connect "inproc://A")
(.send "A message"))
(try
?form
(finally
(send! zcontrol "shutdown")
(.join zmq-thread 100)
(assert (not (.isAlive zmq-thread)))
;;passes along received messages
(let [test-msg "hihi"]
(.send sock test-msg)
;;including multipart messages
(let [test-msg ["yo" "what's" "up?"]]
(.send sock "yo" ZMQ/SNDMORE)
(.send sock "what's" ZMQ/SNDMORE)
(.send sock "up?")
;;sends messages when asked to
(let [test-msg "heyo"]
(command-zmq-thread! zcontrol queue
[test-id test-msg])
(Thread/sleep 50)
(.recvStr sock ZMQ/NOBLOCK) => test-msg))))))
(.send (doto (.createSocket (context :zcontext) ZMQ/PAIR)
(.connect addr))
test-msg)
(String. (<!! out)) => test-msg))
(.send (doto (.createSocket (context :zcontext) ZMQ/PAIR)
(.connect addr))
test-msg)
(String. (<!! out)) => test-msg))
clojure-lsp/lsp4clj
(ns lsp4clj.io-server-test
(:require
[clojure.core.async :as async]
[clojure.test :refer [deftest is]]
[lsp4clj.io-chan :as io-chan]
[lsp4clj.io-server :as io-server]
[lsp4clj.lsp.requests :as lsp.requests]
[lsp4clj.server :as server]
[lsp4clj.test-helper :as h])
(:import
[java.io PipedInputStream PipedOutputStream]
[java.net InetAddress ServerSocket Socket]))
(deftest should-communicate-over-io-streams
(let [client-input-stream (PipedInputStream.)
client-output-stream (PipedOutputStream.)
server-input-stream (PipedInputStream. client-output-stream)
server-output-stream (PipedOutputStream. client-input-stream)
client-input-ch (io-chan/input-stream->input-chan client-input-stream)
client-output-ch (io-chan/output-stream->output-chan client-output-stream)
server (io-server/server {:in server-input-stream :out server-output-stream})
join (server/start server nil)]
;; client initiates request
(async/put! client-output-ch (lsp.requests/request 1 "foo" {}))
;; server responds
(is (= {:jsonrpc "2.0",
:id 1,
:error {:code -32601,
:message "Method not found",
:data {:method "foo"}}}
(h/assert-take client-input-ch)))
;; server initiates notification
(server/send-notification server "bar" {:key "value"})
;; client receives
(is (= {:jsonrpc "2.0",
:method "bar",
:params {:key "value"}}
(h/assert-take client-input-ch)))
(server/shutdown server)
(is (= :done @join))))
(deftest should-communicate-through-socket
(let [addr (InetAddress/getLoopbackAddress)
;; ephermeral port, translated to real port via .getLocalPort
port 0]
(with-open [socket-server (ServerSocket. port 0 addr)
socket-for-server (Socket. addr (.getLocalPort socket-server))
socket-for-client (.accept socket-server)]
(let [client-input-ch (io-chan/input-stream->input-chan socket-for-client)
client-output-ch (io-chan/output-stream->output-chan socket-for-client)
server (io-server/server {:in socket-for-server
:out socket-for-server})
join (server/start server nil)]
(async/put! client-output-ch (lsp.requests/request 1 "foo" {}))
(is (= {:jsonrpc "2.0",
:id 1,
:error {:code -32601,
:message "Method not found",
:data {:method "foo"}}}
(h/assert-take client-input-ch)))
(server/send-notification server "bar" {:key "value"})
(is (= {:jsonrpc "2.0",
:method "bar",
:params {:key "value"}}
(h/assert-take client-input-ch)))
(server/shutdown server)
(is (= :done @join))))))
PacktPublishing/Clojure-Programming-Cookbook
(ns chapter04.kafka
(:require
[clj-kafka.producer :as p]
[clj-kafka.core :as core]
[clj-kafka.consumer.zk :as zk]
[clj-kafka.new.producer :as new]
[clj-kafka.consumer.simple :as simple]
[clj-kafka.offset :as offset]
[clj-kafka.admin :as admin]
[clojure.core.async :as async :refer :all]
)
(:import (kafka.consumer Consumer ConsumerConfig KafkaStream)
(kafka.producer KeyedMessage ProducerConfig)
(kafka.javaapi.producer Producer)
(java.util Properties)
(java.util.concurrent Executors))
)
;;=> nil
(p/send-message (p/producer producer-config)
(p/message "test" (.getBytes "Hi hello !")))
;;=> nil
(p/send-message (p/producer producer-config)
(p/message "test" (.getBytes "Hi hello !")))
;;=> nil