From 6c64dedc17f506f3190207bbe08eb1eb1edc7e01 Mon Sep 17 00:00:00 2001 From: niten Date: Tue, 25 Apr 2023 15:25:02 -0700 Subject: [PATCH] Retry on failure --- src/milquetoast/client.clj | 78 ++++++++++++++++++++++++++------------ 1 file changed, 53 insertions(+), 25 deletions(-) diff --git a/src/milquetoast/client.clj b/src/milquetoast/client.clj index 68e030c..c380416 100644 --- a/src/milquetoast/client.clj +++ b/src/milquetoast/client.clj @@ -1,5 +1,5 @@ (ns milquetoast.client - (:require [clojure.core.async :as async :refer [go go-loop ! alts!!]] + (:require [clojure.core.async :as async :refer [go go-loop ! alts!! ! chan (assoc (parse-message mqtt-message) - :topic topic)))))) + (retry-attempt verbose + #(.subscribe client topic qos + (proxy [IMqttMessageListener] [] + (messageArrived [topic mqtt-message] + (go (>! chan (assoc (parse-message mqtt-message) + :topic topic)))))) + #(.reconnect client)) chan)) (get-topic! [_ topic opts] (let [{:keys [qos timeout] :or {qos 0 timeout 5}} opts result-chan (async/chan)] - (.subscribe client topic qos - (proxy [IMqttMessageListener] [] - (messageArrived [topic mqtt-message] - (go (>! result-chan (assoc (parse-message mqtt-message) - :topic topic)) - (async/close! result-chan)) - (.unsubscribe client topic)))) + (retry-attempt verbose + #(.subscribe client topic qos + (proxy [IMqttMessageListener] [] + (messageArrived [topic mqtt-message] + (go (>! result-chan (assoc (parse-message mqtt-message) + :topic topic)) + (async/close! result-chan)) + (.unsubscribe client topic)))) + #(.reconnect client)) (first (alts!! [result-chan (async/timeout (* timeout 1000))]))))) @@ -88,7 +113,8 @@ (defn- json-parse-message [msg] (-> msg - (update :payload #(json/read-str % :key-fn keyword)) + (update :payload (fn [payload] + (json/read-str payload :key-fn keyword))) (assoc :timestamp (Instant/now)))) (defrecord MilquetoastJsonClient [client] @@ -125,14 +151,16 @@ (recur (