diff --git a/src/milquetoast/client.clj b/src/milquetoast/client.clj index e476edd..9905f2e 100644 --- a/src/milquetoast/client.clj +++ b/src/milquetoast/client.clj @@ -7,7 +7,9 @@ org.eclipse.paho.client.mqttv3.persist.MemoryPersistence java.time.Instant)) -(defn- create-mqtt-client! [& {:keys [broker-uri username password]}] +(defn- create-mqtt-client! + "Creates and connects an MQTT client with the provided broker URI, username, and password." + [& {:keys [broker-uri username password]}] (let [client-id (MqttClient/generateClientId) opts (doto (MqttConnectOptions.) (.setCleanSession true) @@ -19,7 +21,9 @@ (doto (MqttClient. broker-uri client-id (MemoryPersistence.)) (.connect opts)))) -(defn- retry-attempt [verbose f reconnect] +(defn- retry-attempt + "Attempts to execute function `f`. If `f` throws a RuntimeException, logs the exception and attempts to reconnect before retrying `f`." + [verbose f reconnect] (let [wrapped-attempt (fn [] (try [true (f)] (catch RuntimeException e @@ -40,6 +44,7 @@ (recur (wrapped-attempt) (min (* wait-ms 1.25) max-wait))))))) (defn- create-message + "Creates an MQTT message with the provided message string and options." [msg {:keys [qos retain] :or {qos 1 retain false}}] @@ -47,7 +52,9 @@ (.setQos qos) (.setRetained retain))) -(defn- parse-message [mqtt-msg] +(defn- parse-message + "Parses an MQTT message into a map with keys :id, :qos, :retained, :duplicate, :payload, and :payload-bytes." + [mqtt-msg] {:id (.getId mqtt-msg) :qos (.getQos mqtt-msg) :retained (.isRetained mqtt-msg) @@ -56,6 +63,7 @@ :payload-bytes (.getPayload mqtt-msg)}) (defprotocol IMilquetoastClient + "Protocol defining the core operations of a Milquetoast MQTT client." (send-message! [_ topic msg opts]) (add-channel! [_ chan]) (stop! [_]) @@ -63,7 +71,9 @@ (get-topic! [_ topic opts]) (get-topic-raw! [_ topic opts])) -(defrecord MilquetoastClient [client open-channels verbose] +(defrecord MilquetoastClient + "Record implementing the IMilquetoastClient protocol for a basic MQTT client." + [client open-channels verbose] IMilquetoastClient (send-message! [_ topic msg opts] (retry-attempt verbose @@ -123,7 +133,9 @@ (json/read-str payload :key-fn keyword))) (assoc :timestamp (Instant/now)))) -(defrecord MilquetoastJsonClient [client] +(defrecord MilquetoastJsonClient + "Record implementing the IMilquetoastClient protocol for an MQTT client that sends and receives JSON messages." + [client] IMilquetoastClient (send-message! [_ topic msg opts] (send-message! client topic (json/write-str msg) opts)) @@ -141,17 +153,24 @@ msg nil))) -(defn send! [client topic msg & {:keys [qos retain] +(defn send! + "Sends a message to a topic on the provided client with the specified QoS and retain options." + [client topic msg & {:keys [qos retain] :or {qos 1 retain false}}] (send-message! client topic msg {:qos qos :retain retain})) -(defn get! [client topic & options] +(defn get! + "Gets a message from a topic on the provided client with the specified options." + [client topic & options] (get-topic! client topic options)) -(defn get-raw! [client topic & options] +(defn get-raw! + "Gets a raw message from a topic on the provided client with the specified options." + [client topic & options] (get-topic-raw! client topic options)) (defn open-channel! + "Opens a channel for sending messages to a topic on the provided client with the specified buffer size, QoS, and retain options." [client topic & {:keys [buffer-size qos retain] :or {buffer-size 1 qos 1 @@ -165,12 +184,14 @@ chan)) (defn subscribe! + "Subscribes to a topic on the provided client with the specified buffer size and QoS options." [client topic & {:keys [buffer-size qos] :or {buffer-size 1 qos 1}}] (subscribe-topic! client topic {:buffer-size buffer-size :qos qos})) (defn connect! + "Connects to an MQTT broker at the provided host and port with the specified scheme and verbosity options." [& {:keys [host port scheme verbose] :or {verbose false scheme :tcp} @@ -180,5 +201,7 @@ (atom []) verbose))) -(defn connect-json! [& args] +(defn connect-json! + "Connects to an MQTT broker with the provided arguments and configures the client to send and receive JSON messages." + [& args] (MilquetoastJsonClient. (apply connect! args)))