Retry on failure

This commit is contained in:
niten 2023-04-25 15:25:02 -07:00
parent ae81b91f0c
commit 6c64dedc17

View File

@ -1,5 +1,5 @@
(ns milquetoast.client (ns milquetoast.client
(:require [clojure.core.async :as async :refer [go go-loop <! >! alts!!]] (:require [clojure.core.async :as async :refer [go go-loop <! >! alts!! <!! timeout]]
[clojure.data.json :as json]) [clojure.data.json :as json])
(:import [org.eclipse.paho.client.mqttv3 MqttClient MqttConnectOptions MqttMessage IMqttMessageListener] (:import [org.eclipse.paho.client.mqttv3 MqttClient MqttConnectOptions MqttMessage IMqttMessageListener]
org.eclipse.paho.client.mqttv3.persist.MemoryPersistence org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
@ -15,6 +15,27 @@
(doto (MqttClient. broker-uri client-id (MemoryPersistence.)) (doto (MqttClient. broker-uri client-id (MemoryPersistence.))
(.connect opts)))) (.connect opts))))
(defn- retry-attempt [verbose f reconnect]
(let [wrapped-attempt (fn []
(try [true (f)]
(catch RuntimeException e
(do (when verbose
(println (format "exception: %s"
(.toString e))))
[false e]))))
max-wait (* 5 60 1000)] ;; wait at most 5 minutes
(loop [[success? result] (wrapped-attempt)
wait-ms 1000]
(if success?
result
(do (when verbose
(println (format "attempt failed, attempting reconnect")))
(reconnect)
(when verbose
(println (format "sleeping %s ms" wait-ms)))
(<!! (timeout wait-ms))
(recur (wrapped-attempt) (min (* wait-ms 1.25) max-wait)))))))
(defn- create-message (defn- create-message
[msg {:keys [qos retain] [msg {:keys [qos retain]
:or {qos 1 :or {qos 1
@ -24,13 +45,11 @@
(.setRetained retain))) (.setRetained retain)))
(defn- parse-message [mqtt-msg] (defn- parse-message [mqtt-msg]
{ {:id (.getId mqtt-msg)
:id (.getId mqtt-msg)
:qos (.getQos mqtt-msg) :qos (.getQos mqtt-msg)
:retained (.isRetained mqtt-msg) :retained (.isRetained mqtt-msg)
:duplicate (.isDuplicate mqtt-msg) :duplicate (.isDuplicate mqtt-msg)
:payload (.toString mqtt-msg) :payload (.toString mqtt-msg)})
})
(defprotocol IMilquetoastClient (defprotocol IMilquetoastClient
(send-message! [_ topic msg opts]) (send-message! [_ topic msg opts])
@ -42,7 +61,9 @@
(defrecord MilquetoastClient [client open-channels verbose] (defrecord MilquetoastClient [client open-channels verbose]
IMilquetoastClient IMilquetoastClient
(send-message! [_ topic msg opts] (send-message! [_ topic msg opts]
(.publish client topic (create-message msg opts))) (retry-attempt verbose
#(.publish client topic (create-message msg opts))
#(.reconnect client)))
(stop! [_] (stop! [_]
(when verbose (when verbose
(println (println
@ -57,22 +78,26 @@
:or {buffer-size 1 qos 0}} opts :or {buffer-size 1 qos 0}} opts
chan (async/chan buffer-size)] chan (async/chan buffer-size)]
(add-channel! self chan) (add-channel! self chan)
(.subscribe client topic qos (retry-attempt verbose
(proxy [IMqttMessageListener] [] #(.subscribe client topic qos
(messageArrived [topic mqtt-message] (proxy [IMqttMessageListener] []
(go (>! chan (assoc (parse-message mqtt-message) (messageArrived [topic mqtt-message]
:topic topic)))))) (go (>! chan (assoc (parse-message mqtt-message)
:topic topic))))))
#(.reconnect client))
chan)) chan))
(get-topic! [_ topic opts] (get-topic! [_ topic opts]
(let [{:keys [qos timeout] :or {qos 0 timeout 5}} opts (let [{:keys [qos timeout] :or {qos 0 timeout 5}} opts
result-chan (async/chan)] result-chan (async/chan)]
(.subscribe client topic qos (retry-attempt verbose
(proxy [IMqttMessageListener] [] #(.subscribe client topic qos
(messageArrived [topic mqtt-message] (proxy [IMqttMessageListener] []
(go (>! result-chan (assoc (parse-message mqtt-message) (messageArrived [topic mqtt-message]
:topic topic)) (go (>! result-chan (assoc (parse-message mqtt-message)
(async/close! result-chan)) :topic topic))
(.unsubscribe client topic)))) (async/close! result-chan))
(.unsubscribe client topic))))
#(.reconnect client))
(first (alts!! [result-chan (first (alts!! [result-chan
(async/timeout (* timeout 1000))]))))) (async/timeout (* timeout 1000))])))))
@ -88,7 +113,8 @@
(defn- json-parse-message [msg] (defn- json-parse-message [msg]
(-> msg (-> msg
(update :payload #(json/read-str % :key-fn keyword)) (update :payload (fn [payload]
(json/read-str payload :key-fn keyword)))
(assoc :timestamp (Instant/now)))) (assoc :timestamp (Instant/now))))
(defrecord MilquetoastJsonClient [client] (defrecord MilquetoastJsonClient [client]
@ -125,14 +151,16 @@
(recur (<! chan)))) (recur (<! chan))))
chan)) chan))
(defn subscribe! [client topic & {:keys [buffer-size qos] (defn subscribe!
:or {buffer-size 1 [client topic & {:keys [buffer-size qos]
qos 1}}] :or {buffer-size 1
qos 1}}]
(subscribe-topic! client topic {:buffer-size buffer-size :qos qos})) (subscribe-topic! client topic {:buffer-size buffer-size :qos qos}))
(defn connect! [& {:keys [host port scheme username password verbose] (defn connect!
:or {verbose false [& {:keys [host port scheme username password verbose]
scheme :tcp}}] :or {verbose false
scheme :tcp}}]
(let [broker-uri (str (name scheme) "://" host ":" port)] (let [broker-uri (str (name scheme) "://" host ":" port)]
(MilquetoastClient. (create-mqtt-client! broker-uri username password) (MilquetoastClient. (create-mqtt-client! broker-uri username password)
(atom []) (atom [])