From 16dd4b144079a3afca9b1b85e3348c59e5db5665 Mon Sep 17 00:00:00 2001 From: niten Date: Fri, 17 Mar 2023 22:52:16 -0700 Subject: [PATCH] Close channels properly. --- src/milquetoast/client.clj | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/milquetoast/client.clj b/src/milquetoast/client.clj index fcbd114..f785de1 100644 --- a/src/milquetoast/client.clj +++ b/src/milquetoast/client.clj @@ -46,8 +46,7 @@ (println (str "stopping " (count @open-channels) " channels"))) (doseq [chan @open-channels] - (go (>! chan :stop) - (async/close! chan))) + (async/close! chan)) true) (add-channel! [_ chan] (swap! open-channels conj chan)) @@ -69,7 +68,8 @@ (defn pipe [in xf] (let [out (async/chan)] - (async/pipeline (parallelism) out xf in))) + (async/pipeline (parallelism) out xf in) + out)) (defrecord MilquetoastJsonClient [client] IMilquetoastClient @@ -79,7 +79,7 @@ (add-channel! [_ chan] (add-channel! client chan)) (subscribe-topic! [_ topic opts] (pipe (subscribe-topic! client topic opts) - (fn [msg] (update msg :payload json/read-str))))) + (map (fn [msg] (update msg :payload json/read-str)))))) (defn send! [client topic msg & {:keys [qos retain] :or {qos 1 retain false}}] @@ -93,7 +93,7 @@ (let [chan (async/chan buffer-size)] (add-channel! client chan) (go-loop [msg (