(ns pricebot.core (:require [clojure.core.async :as async :refer [go go-loop >! >!! date<]] [fudo-clojure.result :as result :refer [map-success send-success dispatch-result success? unwrap]] [fudo-clojure.logging :as log] [clojure.string :as str] [clojure.math.numeric-tower :refer [floor]]) (:import java.math.RoundingMode java.time.format.DateTimeFormatter java.time.Duration java.time.Instant java.time.ZoneId) (:gen-class)) (defprotocol IPriceBotProcess (stop! [self]) (start! [self])) (defprotocol IPriceChannel (get-price-channel [self])) (defrecord PriceMeasurement [instant price]) (defn- make-price-channel [hostname currency & {:keys [delay buffer-size] :or {delay 60 buffer-size 10}}] (let [stop-chan (chan) price-chan (chan 10) client (coinbase-pro/connect :hostname hostname) get-price #(client/get-market-price! client currency)] (reify IPriceBotProcess (stop! [self] (go (>! stop-chan true) (>! price-chan :stopped)) self) (start! [self] (go-loop [result (get-price)] (>! price-chan (map-success result (partial ->PriceMeasurement (Instant/now)))) (alt! stop-chan nil (timeout (* delay 1000)) (recur (get-price)))) self) IPriceChannel (get-price-channel [_] price-chan)))) ;; Bots can have a list of price checkers that take current price & price ;; history, and potentially return a message to send to the channel (defn- lpthru [label o] (println (str "****** " label)) (clojure.pprint/pprint o) (println "******") o) (let [formatter (.withZone DateTimeFormatter/ISO_LOCAL_TIME (ZoneId/systemDefault))] (defn- format-instant [inst] (.format formatter inst))) (defn- check-threshold [threshold report! & {:keys [mute-duration] :or {mute-duration (Duration/ofHours 1)}}] (let [prev (atom nil) threshold-dec (bigdec threshold) chop-threshold (fn [n] (int (floor (.divide n threshold-dec RoundingMode/DOWN)))) mute-until (atom (Instant/now)) unmuted? (fn [time] (.after @mute-until time))] (fn [curr] (when @prev (when (unmuted? (:instant curr)) (when (not (= (chop-threshold (:price @prev)) (chop-threshold (:price curr)))) (report! (str "price crossed a " threshold " threshold " "between " (format-instant (:instant @prev)) " and " (format-instant (:instant curr)) ", " "from " (:price @prev) " to " (:price curr))) (report! (str "muting until " (.plus (:instant curr) mute-duration))) (swap! mute-until (fn [_] (.plus (:instant curr) mute-duration)))))) (swap! prev (fn [_] curr))))) (defn- minimize-time-gap [prices threshold target] (let [smaller-duration? (fn [a b] (> 0 (.compareTo a b))) get-gap (fn [m] (Duration/between target (:instant m)))] (when-let [initial-measure (first prices)] (loop [curr initial-measure curr-gap (get-gap initial-measure) remaining (rest prices)] (if (empty? remaining) curr (if (smaller-duration? curr-gap (get-gap (first remaining))) (recur (first remaining) (get-gap (first remaining)) (rest remaining)) curr)))))) (defn- check-movement [percentage duration threshold report! debug!] (let [prices (atom (list)) within-duration-of (fn [curr m] (date< (-> (:instant curr) (.minus duration) (.minus threshold)) (:instant m))) append-to-prices (fn [prices curr] (cons curr (filter (partial within-duration-of curr) prices))) close-enough? (fn [target t] (date< (.minus target threshold) t (.plus target threshold)))] (fn [curr] (let [target-time (.minus (:instant curr) duration) prev (minimize-time-gap @prices threshold target-time)] (if prev (let [diff-pct (.divide (- (:price curr) (:price prev)) (:price curr) RoundingMode/HALF_EVEN)] (if (close-enough? target-time (:instant prev)) (when (> (Math/abs diff-pct) percentage) (report! (str "price has changed by " (* 100 diff-pct) "% " "within " duration ": " (:price prev) " to " (:price curr)))) (debug! (str "unable to find a measure within " threshold " of " target-time))))) (swap! prices append-to-prices curr))))) (defn- create-checks [logger] (let [debug! (partial log/debug! logger)] [(check-threshold 100 (partial log/info! logger)) (check-threshold 1000 (partial log/alert! logger)) (check-movement 0.01 (Duration/ofHours 3) (Duration/ofMinutes 15) (partial log/info! logger) debug!) (check-movement 0.03 (Duration/ofHours 6) (Duration/ofMinutes 30) (partial log/notify! logger) debug!) (check-movement 0.05 (Duration/ofHours 6) (Duration/ofMinutes 30) (partial log/alert! logger) debug!)])) (defprotocol IPriceBot (add-check! [self check]) (remove-check! [self id])) (defn- reify-bot ([hostname currency] (reify-bot hostname currency [])) ([hostname currency initial-checks] (let [price-chan (atom nil) checks (atom (zipmap (repeatedly #(java.util.UUID/randomUUID)) initial-checks))] (reify IPriceBotProcess (start! [_] (swap! price-chan (fn [_] (make-price-channel hostname currency))) (let [chan (get-price-channel @price-chan)] (go-loop [measure ( (getenv-or-fail "PRICEBOT_BEBOT_AUTH_TOKEN_FILE") (slurp)) bebot-channel-id (getenv-or-fail "PRICEBOT_BEBOT_CHANNEL_ID") target-currency (-> (getenv-or-fail "PRICEBOT_TARGET_CURRENCY") (str/lower-case) (keyword)) notify-user (getenv-or-fail "PRICEBOT_NOTIFY_USER")] (let [logger (bebot-log/make-logger bebot-host bebot-auth-token bebot-channel-id notify-user) checks (create-checks logger) bot (reify-bot exchange-hostname target-currency) shutdown (chan)] (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (println "Stopping pricebot...") (stop! bot) (>!! shutdown true)))) (start! bot) (