commit 0c7c7ce6d9a8ee9a401ee1fd2c7be1660db2f5c9 Author: niten Date: Fri Mar 17 22:09:25 2023 -0700 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1b9f7c3 --- /dev/null +++ b/.gitignore @@ -0,0 +1,9 @@ +.DS_Store +.idea +*.log +tmp/ + +.cpcache/ +.nrepl-port +target/ +result diff --git a/deps.edn b/deps.edn new file mode 100644 index 0000000..ce198e5 --- /dev/null +++ b/deps.edn @@ -0,0 +1,25 @@ +{ + :paths ["src"] + :deps { + org.clojure/clojure { :mvn/version "1.11.1" } + org.clojure/core.async { :mvn/version "1.6.673" } + org.clojure/data.json { :mvn/version "2.4.0" } + + org.eclipse.paho/org.eclipse.paho.client.mqttv3 { :mvn/version "1.2.5" } + } + :aliases { + :test { + :extra-paths ["test"] + :extra-deps { + io.github.cognitect-labs/test-runner + { + :git/url "https://github.com/cognitect-labs/test-runner.git" + :sha "dfb30dd6605cb6c0efc275e1df1736f6e90d4d73" + } + } + :main-opts ["-m" "cognitect.test-runner"] + :exec-fn cognitect.test-runner.api/test + } + :build { :default-ns build } + } + } diff --git a/src/milquetoast/client.clj b/src/milquetoast/client.clj new file mode 100644 index 0000000..fcbd114 --- /dev/null +++ b/src/milquetoast/client.clj @@ -0,0 +1,112 @@ +(ns milquetoast.client + (:require [clojure.core.async :as async :refer [go go-loop !]] + [clojure.data.json :as json]) + (:import [org.eclipse.paho.client.mqttv3 MqttClient MqttConnectOptions MqttMessage IMqttMessageListener] + org.eclipse.paho.client.mqttv3.persist.MemoryPersistence)) + +(defn- create-mqtt-client! [broker-uri username password] + (let [client-id (MqttClient/generateClientId) + opts (doto (MqttConnectOptions.) + (.setCleanSession true) + (.setAutomaticReconnect true) + (.setPassword (char-array password)) + (.setUserName username))] + (doto (MqttClient. broker-uri client-id (MemoryPersistence.)) + (.connect opts)))) + +(defn- create-message + [msg {:keys [qos retain] + :or {qos 1 + retain false}}] + (doto (MqttMessage. (.getBytes msg)) + (.setQos qos) + (.setRetained retain))) + +(defn- parse-message [mqtt-msg] + { + :id (.getId mqtt-msg) + :qos (.getQos mqtt-msg) + :retained (.isRetained mqtt-msg) + :duplicate (.isDuplicate mqtt-msg) + :payload (.toString mqtt-msg) + }) + +(defprotocol IMilquetoastClient + (send-message! [_ topic msg opts]) + (add-channel! [_ chan]) + (stop! [_]) + (subscribe-topic! [_ topic opts])) + +(defrecord MilquetoastClient [client open-channels verbose] + IMilquetoastClient + (send-message! [_ topic msg opts] + (.publish client topic (create-message msg opts))) + (stop! [_] + (when verbose + (println + (str "stopping " (count @open-channels) " channels"))) + (doseq [chan @open-channels] + (go (>! chan :stop) + (async/close! chan))) + true) + (add-channel! [_ chan] + (swap! open-channels conj chan)) + (subscribe-topic! [self topic opts] + (let [{:keys [buffer-size]} opts + chan (async/chan buffer-size)] + (add-channel! self chan) + (.subscribe client topic + (proxy [IMqttMessageListener] [] + (messageArrived [topic mqtt-message] + (go (>! chan (assoc (parse-message mqtt-message) + :topic topic)))))) + chan))) + +(defn- parallelism [] + (-> (Runtime/getRuntime) + (.availableProcessors) + (+ 1))) + +(defn pipe [in xf] + (let [out (async/chan)] + (async/pipeline (parallelism) out xf in))) + +(defrecord MilquetoastJsonClient [client] + IMilquetoastClient + (send-message! [_ topic msg opts] + (.publish client topic (create-message (json/write-str msg) opts))) + (stop! [_] (stop! client)) + (add-channel! [_ chan] (add-channel! client chan)) + (subscribe-topic! [_ topic opts] + (pipe (subscribe-topic! client topic opts) + (fn [msg] (update msg :payload json/read-str))))) + +(defn send! [client topic msg & {:keys [qos retain] + :or {qos 1 retain false}}] + (send-message! client topic msg {:qos qos :retain retain})) + +(defn open-channel! + [client topic & {:keys [buffer-size qos retain] + :or {buffer-size 1 + qos 1 + retain false}}] + (let [chan (async/chan buffer-size)] + (add-channel! client chan) + (go-loop [msg (