225 lines
9.5 KiB
Clojure
225 lines
9.5 KiB
Clojure
(ns pricebot.core
|
|
(:require [clojure.core.async :as async :refer [go go-loop >! >!! <! <!! chan timeout alt!]]
|
|
[bebot.client :as bebot-client]
|
|
[bebot.api.channel :as bebot]
|
|
[bebot.logger :as bebot-log]
|
|
[coinbase-pro.client :as coinbase-pro]
|
|
[exchange.client :as client]
|
|
[fudo-clojure.common :refer [*-> date<]]
|
|
[fudo-clojure.result :as result :refer [map-success send-success dispatch-result success? unwrap]]
|
|
[fudo-clojure.logging :as log]
|
|
[clojure.string :as str]
|
|
[clojure.math.numeric-tower :refer [floor]]
|
|
[clojure.tools.cli :refer [parse-opts]]
|
|
[camel-snake-kebab.core :refer [->SCREAMING_SNAKE_CASE]])
|
|
(:import java.math.RoundingMode
|
|
java.time.format.DateTimeFormatter
|
|
java.time.Duration
|
|
java.time.Instant
|
|
java.time.ZoneId)
|
|
(:gen-class))
|
|
|
|
(defprotocol IPriceBotProcess
  "Lifecycle operations shared by the long-running pieces of the price bot."
  (stop! [self]
    "Signals the process to stop its background work.")
  (start! [self]
    "Kicks off the process's background work."))
|
|
|
|
(defprotocol IPriceChannel
  "Something that publishes price measurements on a core.async channel."
  (get-price-channel [self]
    "Returns the channel on which measurements are delivered."))
|
|
|
|
;; A single price observation: `instant` is the time attached to the
;; observation and `price` is the value reported by the exchange client.
(defrecord PriceMeasurement [instant price])
|
|
|
|
(defn- make-price-channel
  "Builds a price poller for `currency` against the Coinbase Pro host
  `hostname`.

  The returned object implements:
    IPriceBotProcess
      start! - launches a go-loop that fetches the market price, puts the
               result (with successes wrapped in a PriceMeasurement stamped
               with the current instant) on the price channel, then waits
               for either a stop signal or `delay` seconds before polling
               again. Returns the object itself.
      stop!  - asynchronously signals the loop to stop and then puts
               :stopped on the price channel. Returns the object itself.
    IPriceChannel
      get-price-channel - the channel measurements are published on.

  Keyword options:
    :delay        seconds to wait between polls (default 60)
    :buffer-size  buffer capacity of the price channel (default 10)"
  [hostname currency & {:keys [delay buffer-size]
                        :or   {delay       60
                               buffer-size 10}}]
  (let [stop-chan  (chan)
        ;; Honour :buffer-size; it used to be accepted but silently ignored
        ;; in favour of a hard-coded capacity of 10.
        price-chan (chan buffer-size)
        client     (coinbase-pro/connect :hostname hostname)
        get-price  #(client/get-market-price! client currency)]
    (reify
      IPriceBotProcess
      (stop! [self]
        ;; Done in a go block so the caller never blocks on the unbuffered
        ;; stop channel.
        (go (>! stop-chan true)
            (>! price-chan :stopped))
        self)
      (start! [self]
        (go-loop [result (get-price)]
          (>! price-chan (map-success result (partial ->PriceMeasurement (Instant/now))))
          ;; Either stop (loop ends) or sleep for `delay` seconds and poll again.
          (alt! stop-chan nil
                (timeout (* delay 1000)) (recur (get-price))))
        self)

      IPriceChannel
      (get-price-channel [_] price-chan))))
|
|
|
|
;; Bots can have a list of price checkers that take current price & price
|
|
;; history, and potentially return a message to send to the channel
|
|
|
|
(defn- lpthru
  "Debugging pass-through: prints `label` and a pretty-printed `o` between
  banner lines, then returns `o` untouched."
  [label o]
  (let [banner "******"]
    (println (str banner " " label))
    (clojure.pprint/pprint o)
    (println banner))
  o)
|
|
|
|
;; ISO local-time formatter pinned to the JVM's default time zone; built once
;; when the namespace loads.
(def ^:private local-time-formatter
  (.withZone DateTimeFormatter/ISO_LOCAL_TIME (ZoneId/systemDefault)))

(defn- format-instant
  "Renders `inst` as an ISO local time string in the system default zone."
  [inst]
  (.format local-time-formatter inst))
|
|
|
|
(defn- check-threshold
  "Builds a check that reports when the price moves from one multiple of
  `threshold` into another (e.g. with threshold 100, from 4x99 to 4x00).

  Arguments:
    threshold - bucket size; coerced with `bigdec`
    report!   - one-argument function called with each message string

  Keyword options:
    :mute-duration - java.time.Duration during which further crossings are
                     suppressed after a report (default 1 hour)

  Returns a function of one measurement (a map/record with :instant and
  :price, where :price supports BigDecimal `.divide`). The function remembers
  the measurement for the next call and returns it."
  [threshold report! & {:keys [mute-duration]
                        :or   {mute-duration (Duration/ofHours 1)}}]
  (let [prev           (atom nil)
        threshold-dec  (bigdec threshold)
        ;; Index of the threshold-sized bucket the price falls in.
        chop-threshold (fn [n] (int (floor (.divide n threshold-dec RoundingMode/DOWN))))
        mute-until     (atom (Instant/now))
        ;; java.time.Instant has isBefore/isAfter (not .after); we are unmuted
        ;; once the measurement time has reached the end of the mute window.
        unmuted?       (fn [time] (not (.isBefore time @mute-until)))]
    (fn [curr]
      (when-let [last-seen @prev]
        (when (and (unmuted? (:instant curr))
                   (not= (chop-threshold (:price last-seen))
                         (chop-threshold (:price curr))))
          (let [muted-until (.plus (:instant curr) mute-duration)]
            (report! (str "price crossed a " threshold " threshold "
                          "between " (format-instant (:instant last-seen)) " and " (format-instant (:instant curr)) ", "
                          "from " (:price last-seen) " to " (:price curr)))
            (report! (str "muting until " muted-until))
            (reset! mute-until muted-until))))
      (reset! prev curr))))
|
|
|
|
(defn- minimize-time-gap
  "Returns the measurement in `prices` whose :instant is closest in time to
  `target`, looking at the absolute distance (before or after `target`).

  Arguments:
    prices    - sequence of maps/records with an :instant (java.time.Instant)
    threshold - currently unused; kept so existing callers keep working
    target    - the java.time.Instant to get close to

  Returns nil when `prices` is empty. When several measurements are equally
  close, the one appearing first in `prices` wins."
  [prices threshold target]
  (let [gap      (fn [m] (.abs (Duration/between target (:instant m))))
        ;; Strictly-closer comparison so ties keep the earlier element.
        closer   (fn [best candidate]
                   (if (neg? (.compareTo (gap candidate) (gap best)))
                     candidate
                     best))]
    (when (seq prices)
      (reduce closer prices))))
|
|
|
|
(defn- check-movement
  "Builds a check that reports when the price has moved by more than
  `percentage` over roughly the last `duration`.

  Arguments:
    percentage - fractional change that triggers a report (0.01 means 1%)
    duration   - java.time.Duration to look back
    threshold  - java.time.Duration of slack: a historical measurement only
                 counts if it lies within `threshold` of (now - duration),
                 and history older than duration + threshold is dropped
    report!    - one-argument function called with the report message
    debug!     - one-argument function called when no historical
                 measurement is close enough to the target time

  Returns a function of one measurement (:instant, and a BigDecimal :price).
  Each call compares the measurement against history, then adds it to the
  history and returns the updated history."
  [percentage duration threshold report! debug!]
  (let [prices             (atom (list))
        ;; Explicit scale for the change ratio. BigDecimal.divide without a
        ;; scale reuses the dividend's scale, which would quantise the ratio
        ;; (e.g. to whole percents for prices quoted with two decimals).
        ratio-scale        6
        within-duration-of (fn [curr m]
                             (date< (-> (:instant curr) (.minus duration) (.minus threshold))
                                    (:instant m)))
        append-to-prices   (fn [prices curr]
                             (cons curr (filter (partial within-duration-of curr) prices)))
        close-enough?      (fn [target t]
                             (date< (.minus target threshold) t (.plus target threshold)))]
    (fn [curr]
      (let [target-time (.minus (:instant curr) duration)]
        (when-let [prev (minimize-time-gap @prices threshold target-time)]
          (if (close-enough? target-time (:instant prev))
            (let [diff-pct (.divide (- (:price curr) (:price prev))
                                    (:price curr)
                                    (int ratio-scale)
                                    RoundingMode/HALF_EVEN)]
              ;; BigDecimal's own .abs: Math/abs has no BigDecimal overload.
              (when (> (.abs diff-pct) percentage)
                (report! (str "price has changed by "
                              (.setScale (* 100 diff-pct) 2 RoundingMode/HALF_EVEN) "% "
                              "within " duration ": "
                              (:price prev) " to " (:price curr)))))
            (debug! (str "unable to find a measure within " threshold " of " target-time))))
        (swap! prices append-to-prices curr)))))
|
|
|
|
(defn- create-checks
  "Returns the default vector of price checks, each wired to report through
  `logger`: two threshold-crossing checks (100 and 1000) and three movement
  checks (1% over 3h, 3% over 6h, 5% over 6h)."
  [logger]
  (let [via     (fn [log-fn] (partial log-fn logger))
        info!   (via log/info!)
        notify! (via log/notify!)
        alert!  (via log/alert!)
        debug!  (via log/debug!)
        hours   (fn [n] (Duration/ofHours n))
        minutes (fn [n] (Duration/ofMinutes n))]
    [(check-threshold 100 info!)
     (check-threshold 1000 alert!)
     (check-movement 0.01 (hours 3) (minutes 15) info! debug!)
     (check-movement 0.03 (hours 6) (minutes 30) notify! debug!)
     (check-movement 0.05 (hours 6) (minutes 30) alert! debug!)]))
|
|
|
|
(defprotocol IPriceBot
  "Management of the checks a price bot runs against each measurement."
  (add-check! [self check]
    "Registers `check` with the bot.")
  (remove-check! [self id]
    "Unregisters the check stored under `id`."))
|
|
|
|
(defn- reify-bot
  "Builds a price bot watching `currency` on the exchange at `hostname`.

  `initial-checks` (default: none) is a sequence of one-argument check
  functions; each is stored under a freshly generated random UUID.

  The returned object implements:
    IPriceBotProcess
      start! - creates a new price channel process, starts a go-loop that
               hands every incoming measurement to each registered check
               (via `send-success`) until :stopped arrives or the channel
               closes, then starts the process. Returns :started.
      stop!  - stops the current price process, if any, and forgets it so a
               repeated stop! is a no-op. Returns :stopped.
    IPriceBot
      add-check!    - registers a check under a new random UUID; returns the
                      updated id->check map.
      remove-check! - removes the check with the given id; returns the
                      updated id->check map."
  ([hostname currency] (reify-bot hostname currency []))
  ([hostname currency initial-checks]
   (let [price-chan (atom nil)
         checks     (atom (zipmap (repeatedly #(java.util.UUID/randomUUID)) initial-checks))]
     (reify
       IPriceBotProcess
       (start! [_]
         ;; reset! rather than swap!: building the process has side effects
         ;; and must not be subject to swap!'s retry semantics.
         (reset! price-chan (make-price-channel hostname currency))
         (let [measurements (get-price-channel @price-chan)]
           (go-loop [measure (<! measurements)]
             ;; nil means the channel was closed; :stopped is the explicit
             ;; end-of-stream marker.
             (when (and (some? measure) (not= measure :stopped))
               (doseq [check (vals @checks)]
                 (send-success measure check))
               (recur (<! measurements)))))
         (start! @price-chan)
         :started)
       (stop! [_]
         (when-let [process @price-chan]
           ;; Forget the process first so it is never signalled twice.
           (reset! price-chan nil)
           (stop! process))
         :stopped)

       IPriceBot
       (add-check! [_ f]
         (swap! checks assoc (java.util.UUID/randomUUID) f))
       (remove-check! [_ id]
         (swap! checks dissoc id))))))
|
|
|
|
(defn- make-bot
  "Builds a price bot for `currency` on the exchange at `hostname`, preloaded
  with the default checks reporting through `logger`.

  Takes keyword arguments :hostname, :currency and :logger."
  [& {:keys [hostname currency logger]}]
  ;; reify-bot takes (hostname currency checks); hostname was previously
  ;; passed twice, which produced an arity error.
  (reify-bot hostname currency (create-checks logger)))
|
|
|
|
(def cli-opts
  "Command-line option specifications, in the vector form consumed by
  `clojure.tools.cli/parse-opts`."
  [["-x" "--exchange-host HOST"         "Hostname of the Coinbase Pro target host."]
   ["-b" "--bebot-url URL"              "URL of the target Mattermost server."]
   ["-a" "--bebot-auth-token-file FILE" "Path to file containing the Bebot authentication token."]
   ["-C" "--bebot-channel-id ID"        "ID of channel in which to send price notifications."]
   ["-c" "--currency CURRENCY"          "Cryptocurrency to watch for price fluctuations."]
   ["-u" "--notify-user USER"           "User to notify of significant price fluctuations."]
   ["-h" "--help"]])
|
|
|
|
(defn get-key
  "Looks up option `k` and returns a `[k value]` pair.

  The value comes from `opts` when present there (and truthy); otherwise from
  the environment variable named PRICEBOT_ followed by the key's name in
  SCREAMING_SNAKE_CASE. The value is nil when neither source has it."
  [opts k]
  (let [from-env (fn []
                   (System/getenv (format "PRICEBOT_%s"
                                          (-> k name ->SCREAMING_SNAKE_CASE))))]
    ;; `or` only consults the environment when the option itself is absent.
    [k (or (get opts k) (from-env))]))
|
|
|
|
(defn exit!
  "Prints `msg` and then terminates the JVM with exit code `status`."
  [status msg]
  (do
    (println msg)
    (System/exit status)))
|
|
|
|
(defn pprint-to-string
  "Returns the pretty-printed representation of `o` as a string."
  [o]
  (let [buffer (java.io.StringWriter.)]
    ;; Two-argument pprint writes to the given writer instead of *out*.
    (clojure.pprint/pprint o buffer)
    (str buffer)))
|
|
|
|
(defn get-args
  "Parses `args` against `cli-opts` and resolves every key in `keys` through
  `get-key` (command line first, then environment).

  Exits the process with status 0 after printing the usage summary when
  --help was given, with status 1 on parse errors, and with status 2 when
  any requested key is still unresolved. Otherwise returns a map of key to
  resolved value."
  [keys args]
  (let [{:keys [options errors summary]} (parse-opts args cli-opts)
        resolved (into {} (map #(get-key options %)) keys)
        missing  (remove #(get resolved %) keys)]
    (cond
      (:help options)
      (exit! 0 summary)

      errors
      (exit! 1 (str/join \newline errors))

      (seq missing)
      (exit! 2 (str "missing arguments: " (str/join " " (map name missing))
                    \newline
                    "available: " (pprint-to-string resolved)))

      :else
      resolved)))
|
|
|
|
(defn -main
  "Program entry point.

  Resolves the required settings from the command line / environment, builds
  the chat logger, and starts a price bot with the default checks. A JVM
  shutdown hook stops the bot and releases the main thread, which otherwise
  blocks until shutdown."
  [& args]
  (let [required-keys [:exchange-host
                       :currency
                       :bebot-url
                       :bebot-auth-token-file
                       :bebot-channel-id
                       :notify-user]
        {:keys [exchange-host
                currency
                bebot-url
                bebot-auth-token-file
                bebot-channel-id
                notify-user]} (get-args required-keys args)
        logger   (bebot-log/make-logger bebot-url
                                        (-> bebot-auth-token-file slurp str/trim-newline)
                                        bebot-channel-id
                                        notify-user)
        checks   (create-checks logger)
        ;; Hand the checks to the bot; previously they were built and then
        ;; dropped, leaving the bot with nothing to evaluate.
        bot      (reify-bot exchange-host
                            (-> currency str/lower-case keyword)
                            checks)
        shutdown (chan)]
    (.addShutdownHook (Runtime/getRuntime)
                      (Thread. (fn []
                                 (println "Stopping pricebot...")
                                 (stop! bot)
                                 (>!! shutdown true))))
    (log/notify! logger (str "Hi! I'm going to start watching the price of " (name currency) " for you."))
    (start! bot)
    ;; Park the main thread until the shutdown hook fires.
    (<!! shutdown)
    (System/exit 0)))
|