diff --git a/src/milquetoast/client.clj b/src/milquetoast/client.clj index 858a2f7..3e46683 100644 --- a/src/milquetoast/client.clj +++ b/src/milquetoast/client.clj @@ -58,7 +58,8 @@ (add-channel! [_ chan]) (stop! [_]) (subscribe-topic! [_ topic opts]) - (get-topic! [_ topic opts])) + (get-topic! [_ topic opts]) + (get-topic-raw! [_ topic opts])) (defrecord MilquetoastClient [client open-channels verbose] IMilquetoastClient @@ -101,7 +102,8 @@ (.unsubscribe client topic)))) #(.reconnect client)) (first (alts!! [result-chan - (async/timeout (* timeout 1000))]))))) + (async/timeout (* timeout 1000))])))) + (get-topic-raw! [c topic opts] (get-topic! c topic opts))) (defn- parallelism [] (-> (Runtime/getRuntime) @@ -131,6 +133,10 @@ (get-topic! [_ topic opts] (if-let [msg (get-topic! client topic opts)] (json-parse-message msg) + nil)) + (get-topic-raw! [_ topic opts] + (if-let [msg (get-topic! client topic opts)] + msg nil))) (defn send! [client topic msg & {:keys [qos retain] @@ -140,6 +146,9 @@ (defn get! [client topic & options] (get-topic! client topic options)) +(defn get-raw! [client topic & options] + (get-topic-raw! client topic options)) + (defn open-channel! [client topic & {:keys [buffer-size qos retain] :or {buffer-size 1