Close channels properly.
This commit is contained in:
parent
0c7c7ce6d9
commit
16dd4b1440
|
@ -46,8 +46,7 @@
|
||||||
(println
|
(println
|
||||||
(str "stopping " (count @open-channels) " channels")))
|
(str "stopping " (count @open-channels) " channels")))
|
||||||
(doseq [chan @open-channels]
|
(doseq [chan @open-channels]
|
||||||
(go (>! chan :stop)
|
(async/close! chan))
|
||||||
(async/close! chan)))
|
|
||||||
true)
|
true)
|
||||||
(add-channel! [_ chan]
|
(add-channel! [_ chan]
|
||||||
(swap! open-channels conj chan))
|
(swap! open-channels conj chan))
|
||||||
|
@ -69,7 +68,8 @@
|
||||||
|
|
||||||
(defn pipe [in xf]
|
(defn pipe [in xf]
|
||||||
(let [out (async/chan)]
|
(let [out (async/chan)]
|
||||||
(async/pipeline (parallelism) out xf in)))
|
(async/pipeline (parallelism) out xf in)
|
||||||
|
out))
|
||||||
|
|
||||||
(defrecord MilquetoastJsonClient [client]
|
(defrecord MilquetoastJsonClient [client]
|
||||||
IMilquetoastClient
|
IMilquetoastClient
|
||||||
|
@ -79,7 +79,7 @@
|
||||||
(add-channel! [_ chan] (add-channel! client chan))
|
(add-channel! [_ chan] (add-channel! client chan))
|
||||||
(subscribe-topic! [_ topic opts]
|
(subscribe-topic! [_ topic opts]
|
||||||
(pipe (subscribe-topic! client topic opts)
|
(pipe (subscribe-topic! client topic opts)
|
||||||
(fn [msg] (update msg :payload json/read-str)))))
|
(map (fn [msg] (update msg :payload json/read-str))))))
|
||||||
|
|
||||||
(defn send! [client topic msg & {:keys [qos retain]
|
(defn send! [client topic msg & {:keys [qos retain]
|
||||||
:or {qos 1 retain false}}]
|
:or {qos 1 retain false}}]
|
||||||
|
@ -93,7 +93,7 @@
|
||||||
(let [chan (async/chan buffer-size)]
|
(let [chan (async/chan buffer-size)]
|
||||||
(add-channel! client chan)
|
(add-channel! client chan)
|
||||||
(go-loop [msg (<! chan)]
|
(go-loop [msg (<! chan)]
|
||||||
(when (not= :stop msg)
|
(when msg
|
||||||
(send-message! client topic msg {:qos qos :retain retain})
|
(send-message! client topic msg {:qos qos :retain retain})
|
||||||
(recur (<! chan))))
|
(recur (<! chan))))
|
||||||
chan))
|
chan))
|
||||||
|
|
Loading…
Reference in New Issue