diff --git a/src/milquetoast/client.clj b/src/milquetoast/client.clj index 0dc3261..e87e9f4 100644 --- a/src/milquetoast/client.clj +++ b/src/milquetoast/client.clj @@ -1,8 +1,9 @@ (ns milquetoast.client - (:require [clojure.core.async :as async :refer [go go-loop !]] + (:require [clojure.core.async :as async :refer [go go-loop ! alts!!]] [clojure.data.json :as json]) (:import [org.eclipse.paho.client.mqttv3 MqttClient MqttConnectOptions MqttMessage IMqttMessageListener] - org.eclipse.paho.client.mqttv3.persist.MemoryPersistence)) + org.eclipse.paho.client.mqttv3.persist.MemoryPersistence + java.time.Instant)) (defn- create-mqtt-client! [broker-uri username password] (let [client-id (MqttClient/generateClientId) @@ -35,7 +36,8 @@ (send-message! [_ topic msg opts]) (add-channel! [_ chan]) (stop! [_]) - (subscribe-topic! [_ topic opts])) + (subscribe-topic! [_ topic opts]) + (get-topic! [_ topic opts])) (defrecord MilquetoastClient [client open-channels verbose] IMilquetoastClient @@ -51,15 +53,28 @@ (add-channel! [_ chan] (swap! open-channels conj chan)) (subscribe-topic! [self topic opts] - (let [{:keys [buffer-size]} opts + (let [{:keys [buffer-size qos] + :or {buffer-size 1 qos 0}} opts chan (async/chan buffer-size)] (add-channel! self chan) - (.subscribe client topic + (.subscribe client topic qos (proxy [IMqttMessageListener] [] (messageArrived [topic mqtt-message] (go (>! chan (assoc (parse-message mqtt-message) :topic topic)))))) - chan))) + chan)) + (get-topic! [_ topic opts] + (let [{:keys [qos timeout] :or {qos 0 timeout 5}} opts + result-chan (async/chan)] + (.subscribe client topic qos + (proxy [IMqttMessageListener] [] + (messageArrived [topic mqtt-message] + (go (>! result-chan (assoc (parse-message mqtt-message) + :topic topic)) + (async/close! result-chan)) + (.unsubscribe client topic)))) + (first (alts!! [result-chan + (async/timeout (* timeout 1000))]))))) (defn- parallelism [] (-> (Runtime/getRuntime) @@ -71,6 +86,11 @@ (async/pipeline (parallelism) out xf in) out)) +(defn- json-parse-message [msg] + (-> msg + (update :payload #(json/read-str % :key-fn keyword)) + (assoc :timestamp (Instant/now)))) + (defrecord MilquetoastJsonClient [client] IMilquetoastClient (send-message! [_ topic msg opts] @@ -79,7 +99,11 @@ (add-channel! [_ chan] (add-channel! client chan)) (subscribe-topic! [_ topic opts] (pipe (subscribe-topic! client topic opts) - (map (fn [msg] (update msg :payload json/read-str)))))) + (map json-parse-message))) + (get-topic! [_ topic opts] + (if-let [msg (get-topic! client topic opts)] + (json-parse-message msg) + nil))) (defn send! [client topic msg & {:keys [qos retain] :or {qos 1 retain false}}] @@ -98,9 +122,10 @@ (recur (