From 8fbab20836b403088cfb06e99505b8f8ed3e77d7 Mon Sep 17 00:00:00 2001 From: Peter Selby Date: Tue, 29 Jun 2021 23:35:55 -0700 Subject: [PATCH] Initial checkin...importing from coinbase, coinbase-pro, bittrex --- src/worther/deps.edn | 6 ++ src/worther/injest/bittrex.clj | 111 ++++++++++++++++++++ src/worther/injest/coinbase.clj | 145 ++++++++++++++++++++++++++ src/worther/injest/coinbase_pro.clj | 143 ++++++++++++++++++++++++++ src/worther/injest/common.clj | 154 ++++++++++++++++++++++++++++ src/worther/injest/deps.edn | 5 + 6 files changed, 564 insertions(+) create mode 100644 src/worther/deps.edn create mode 100644 src/worther/injest/bittrex.clj create mode 100644 src/worther/injest/coinbase.clj create mode 100644 src/worther/injest/coinbase_pro.clj create mode 100644 src/worther/injest/common.clj create mode 100644 src/worther/injest/deps.edn diff --git a/src/worther/deps.edn b/src/worther/deps.edn new file mode 100644 index 0000000..ffe769c --- /dev/null +++ b/src/worther/deps.edn @@ -0,0 +1,6 @@ +{:deps + {juxt/crux-core {:mvn/version "21.05-1.17.0-beta"} + org.clojure/clojure {:mvn/version "1.10.3"} + org.clojure/core.match {:mvn/version "1.0.0"} + org.clojure/spec.alpha {:mvn/version "0.2.194"} + org.clojure/tools.deps.alpha {:mvn/version "0.11.926"}}} diff --git a/src/worther/injest/bittrex.clj b/src/worther/injest/bittrex.clj new file mode 100644 index 0000000..493a875 --- /dev/null +++ b/src/worther/injest/bittrex.clj @@ -0,0 +1,111 @@ +(ns worther.injest.bittrex + (:require [worther.injest.common :as common] + [clojure.core.match :refer [match]] + [clojure.spec.alpha :as s] + [clojure.string :as str])) + +(defn split-buy [txn] + [(-> txn + (assoc ::common/txn-type ::common/buy + ::common/quantity-transacted (::quantity txn) + ::common/currency (::market-currency txn))) + (-> txn + (assoc ::common/txn-type ::common/sell + ::common/quantity-transacted (- (* (::price-per txn) + (::quantity txn))) + ::common/currency (::base-currency txn)))]) + +(defn split-sell [txn] + [(-> txn + (assoc ::common/txn-type ::common/sell + ::common/quantity-transacted (- (::quantity txn)) + ::common/currency (::market-currency txn))) + (-> txn + (assoc ::common/txn-type ::common/buy + ::common/quantity-transacted (* (::price-per txn) + (::quantity txn)) + ::common/currency (::base-currency txn)))]) + +(defmulti type-specific-translation ::txn-type) + +(defmethod type-specific-translation :market-sell [txn] + (split-sell txn)) + +(defmethod type-specific-translation :limit-sell [txn] + (split-sell txn)) + +(defmethod type-specific-translation :limit-buy [txn] + (split-buy txn)) + +(defmethod type-specific-translation :ceiling-market-buy [txn] + (split-buy txn)) + +(defmethod type-specific-translation :fee [txn] + [(assoc txn ::common/txn-type ::common/fee)]) + +(defmethod type-specific-translation nil [txn] + (throw (RuntimeException. (str "::txn-type missing from transaction: " txn)))) + +(defn to-uuid [str] (java.util.UUID/fromString str)) + +(defn split-fee [txn] + [txn + { + ::txn-type :fee + ::common/timestamp (::common/timestamp txn) + ::common/account (::common/account txn) + ::common/usd-value :deferred + ::common/quantity-transacted (::quantity txn) + ::common/currency (::base-currency txn) + }]) + +(defn split-currencies [txn] + (let [currencies (map common/to-keyword (str/split (:exchange txn) #"-"))] + [(-> txn + (assoc ::base-currency (first currencies)) + (assoc ::market-currency (second currencies)))])) +(s/fdef split-market + :args (s/cat :txn (s/keys :req #{:exchange})) + :ret (s/keys :req #{::base-currency ::market-currency})) + +(def translate-bittrex-txn + (common/flatcomp + split-fee + split-currencies + (common/alter-field :uuid to-uuid) + (common/rename-field :timestamp + ::common/timestamp + (common/date-parser "M/d/yyyy HH:mm:ss aa")) + (common/rename-field :ordertype + ::txn-type + common/to-keyword) + (common/rename-field :quantity + ::quantity + bigdec) + (common/rename-field :quantityremaining + ::balance + bigdec) + (common/rename-field :commission + ::fee + bigdec) + (common/rename-field :price + ::total + bigdec) + (common/rename-field :priceperunit + ::price-per + bigdec) + (common/rename-field :closed + ::closed + (common/date-parser "M/d/yyyy HH:mm:ss aa")) + (common/add-field ::common/account + (fn [_] :bittrex)))) + +(defn load-csv [filename] + (let [txn-translator (common/flatcomp + (common/add-txid ::common/txid) + type-specific-translation + translate-bittrex-txn)] + (->> (common/load-csv filename) + (common/pivot-csv) + (common/flatmap txn-translator)))) + diff --git a/src/worther/injest/coinbase.clj b/src/worther/injest/coinbase.clj new file mode 100644 index 0000000..87d58fc --- /dev/null +++ b/src/worther/injest/coinbase.clj @@ -0,0 +1,145 @@ +(ns worther.injest.coinbase + (:require [worther.injest.common :as common] + [clojure.spec.alpha :as s])) + +(defn print-through [o] + (clojure.pprint/pprint o) + flush + o) + +(defn print-entry [o] + (clojure.pprint/pprint o) + [o]) + +(def txn-type-map + { + :fee ::common/fee + :buy ::common/buy + :sell ::common/sell + :send ::common/send + :receive ::common/receive + :paid-for-an-order ::common/sell + }) + +(s/def ::txn-type (set (keys txn-type-map))) + +(defn update-txn-type [txn] + (assoc txn ::common/txn-type + (get txn-type-map (::txn-type txn)))) + +(defmulti type-specific-translation ::txn-type) + +(defn split-fee [txn] + (if (and (::usd-fees txn) + (> (::usd-fees txn) 0)) + [(update-txn-type txn) + { + ::txn-type :fee + ::common/quantity-transacted (::usd-fees txn) + ::common/currency :usd + ::common/account :coinbase + ::common/timestamp (::common/timestamp txn) + ::common/notes (str "Fee for: " (::common/notes txn)) + ::common/usd-value (::usd-fees txn) + }] + [(update-txn-type txn)])) + +(defmethod type-specific-translation :buy [txn] + [(update-txn-type txn) + { + ::common/txn-type ::common/deposit + ::common/quantity-transacted (::common/usd-value txn) + ::common/currency :usd + ::common/account :coinbase + ::common/timestamp (::common/timestamp txn) + ::common/notes (str "Deposit for: " (::common/notes txn)) + ::common/usd-value (::common/usd-value txn) + }]) +(defmethod type-specific-translation :sell [txn] + [(update (update-txn-type txn) ::common/quantity-transacted -) + { + ::common/txn-type ::common/withdrawal + ::common/quantity-transacted (::common/usd-value txn) + ::common/currency :usd + ::common/account :coinbase + ::common/timestamp (::common/timestamp txn) + ::common/notes (str "Withdrawal for: " (::common/notes txn)) + ::common/usd-value (::common/usd-value txn) + }]) + +(defmethod type-specific-translation :paid-for-an-order [txn] + (let [usd-value (* (::common/usd-current-price txn) + (::common/quantity-transacted txn))] + [(-> (update-txn-type txn) + (assoc ::common/usd-value (- usd-value))) + { + ::common/txn-type ::common/send + ::common/quantity-transacted usd-value + ::common/currency :usd + ::common/account :coinbase + ::common/timestamp (::common/timestamp txn) + ::common/notes (str "Send to merchant for: " (::common/notes txn)) + ::common/usd-value usd-value + ::common/recipient :merchant + }])) + +(defmethod type-specific-translation :send [txn] + (let [usd-value (* (::common/usd-current-price txn) + (::common/quantity-transacted txn))] + [(-> (update-txn-type txn) + (assoc ::common/recipient :unknown) + (assoc ::common/usd-value usd-value) + (update ::common/quantity-transacted -))])) + +(defmethod type-specific-translation :receive [txn] + (let [usd-value (* (::common/usd-current-price txn) + (::common/quantity-transacted txn))] + [(-> (update-txn-type txn) + (assoc ::common/usd-value usd-value) + (assoc ::common/sender :unknown))])) + +(defmethod type-specific-translation :fee [txn] + [(update-txn-type txn)]) + +(defmethod type-specific-translation nil [txn] + (throw (RuntimeException. (str "Missing txn-type: " txn)))) + +(def translate-coinbase-txns + (common/flatcomp + (common/add-field ::common/account + (fn [_] :coinbase)) + (common/rename-field :notes + ::common/notes) + (common/rename-field :asset + ::common/currency + common/to-keyword) + (common/rename-field :usd-fees + ::usd-fees + (common/zero-or bigdec)) + (common/rename-field :usd-subtotal + ::common/usd-value + (common/null-or bigdec)) + (common/rename-field :timestamp + ::common/timestamp + (common/date-parser "yyyy-MM-dd'T'HH:mm:ss'Z'")) + (common/rename-field :transaction-type + ::txn-type + common/to-keyword) + (common/rename-field :usd-spot-price-at-transaction + ::common/usd-current-price + (common/null-or bigdec)) + (common/rename-field :quantity-transacted + ::common/quantity-transacted + bigdec))) + +(defn load-csv [filename] + (let [header-line? (fn [cells] (= (first cells) "Timestamp")) + txn-translator (common/flatcomp + (common/add-txid ::common/txid) + type-specific-translation + split-fee + translate-coinbase-txns)] + (->> (common/load-csv filename) + (drop-while (comp not header-line?)) + (common/pivot-csv) + (common/flatmap txn-translator)))) diff --git a/src/worther/injest/coinbase_pro.clj b/src/worther/injest/coinbase_pro.clj new file mode 100644 index 0000000..5aaf4ff --- /dev/null +++ b/src/worther/injest/coinbase_pro.clj @@ -0,0 +1,143 @@ +(ns worther.injest.coinbase-pro + (:require [worther.injest.common :as common] + [clojure.core.match :refer [match]] + [clojure.spec.alpha :as s])) + +;; Coinbase Pro transactions take one of the following forms: +;; - A transfer, i.e. deposit or withdrawal +;; - A trade, with 2 to 3 parts: +;; - A negative :match (or presumably limit) +;; - A positive :match +;; - A fee +;; All three may be in USD or a cryptocurrency. +;; :transfer-id and :order-id are mutually exclusive +;; :transfer-id and :trade-id are mutually exclusive +;; :trade-id and :order-id will either both be null or both be set +;; :order-id is the user-submitted request to sell +;; :trade-id represents one trade which is part of an order +;; There may be more than one :trade-id per :order-id, but there will only ever be one :order-id per :trade-id + +(defmacro *-> [& fns] + (let [o (gensym)] + `(fn [~o] (-> ~o ~@fns)))) + +(defmacro and-> [f0 f1] + (let [o (gensym)] + `(fn [~o] (and (~f0 ~o) (~f1 ~o))))) + +(defn find-first [pred lst] + (first (filter pred lst))) + +(defn parse-int [i] (Integer/parseInt i)) + +(defn parse-uuid [str] (java.util.UUID/fromString str)) + +(defn print-through [o] (clojure.pprint/pprint o) o) + +(defn coinbase-pro-txn->common-txn [txn] + (get { + :withdrawal ::common/withdrawal + :deposit ::common/deposit + } + (::txn-type txn))) + +(defn merge-match-order [spend receive common] + (match (map ::common/currency [spend receive]) + [:usd _] [(merge common + { + ::common/txn-type ::common/buy + ::common/usd-value (::common/quantity-transacted spend) + ::common/quantity-transacted (::common/quantity-transacted receive) + ::common/currency (::common/currency receive) + })] + [_ :usd] [(merge common + { + ::common/txn-type ::common/sell + ::common/usd-value (::common/quantity-transacted receive) + ::common/quantity-transacted (::common/quantity-transacted spend) + ::common/currency (::common/currency spend) + })] + :else [(merge common + { + ::common/txn-type ::common/sell + ::common/usd-value :deferred + ::common/quantity-transacted (::common/quantity-transacted spend) + ::common/currency (::common/currency spend) + }) + (merge common + { + ::common/txn-type ::common/buy + ::common/usd-value :deferred + ::common/quantity-transacted (::common/quantity-transacted receive) + ::common/currency (::common/currency receive) + })])) + +(def process-transfer + (common/flatcomp + (common/add-field ::common/txn-type coinbase-pro-txn->common-txn))) + +(defn process-match-order [[trade-id elems]] + (let [txn0 (first elems) + fee? (*-> ::txn-type (= :fee)) + spend? (and-> (*-> ::common/quantity-transacted neg?) (*-> ::txn-type (= :match))) + receive? (and-> (comp not neg? ::common/quantity-transacted) (*-> ::txn-type (= :match))) + common-fields (select-keys txn0 + [::order-id + ::trade-id + ::common/account + ::common/timestamp + ::transfer-id + ::balance + ::portfolio])] + (concat (merge-match-order (find-first spend? elems) + (find-first receive? elems) + common-fields) + (map (fn [txn] + (-> (select-keys txn [::common/quantity-transacted ::common/currency]) + (merge common-fields) + (assoc ::common/txn-type ::common/fee))) + (filter fee? elems))))) + +(defn group-transactions [txns] + (let [trades (filter (comp not nil? ::trade-id) txns) + transfers (filter (comp not nil? ::transfer-id) txns)] + (concat (common/flatmap process-transfer transfers) + (apply concat (map process-match-order (group-by ::trade-id trades)))))) + +(def preclean-txns + (common/flatcomp + (common/rename-field :amount + ::common/quantity-transacted + bigdec) + (common/rename-field :amount/balance-unit + ::common/currency + common/to-keyword) + (common/rename-field :time + ::common/timestamp + (common/date-parser "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")) + (common/rename-field :trade-id + ::trade-id + (common/null-or parse-int)) + (common/rename-field :transfer-id + ::transfer-id + (common/null-or parse-uuid)) + (common/rename-field :order-id + ::order-id + (common/null-or parse-uuid)) + (common/rename-field :type + ::txn-type + common/to-keyword) + (common/rename-field :portfolio + ::portfolio + common/to-keyword) + (common/rename-field :balance + ::balance + bigdec))) + +(defn load-csv [filename] + (->> (common/load-csv filename) + (common/pivot-csv) + (map #(assoc % ::common/account :coinbase-pro)) + (common/flatmap preclean-txns) + (group-transactions) + (common/flatmap (common/add-txid ::common/txid)))) diff --git a/src/worther/injest/common.clj b/src/worther/injest/common.clj new file mode 100644 index 0000000..dd28318 --- /dev/null +++ b/src/worther/injest/common.clj @@ -0,0 +1,154 @@ +(ns worther.injest.common + (:require [clojure.data.csv :as csv] + [clojure.java.io :as io] + [clojure.string :as str] + [clojure.spec.alpha :as s] + [valuehash.api :as hash])) + +(s/def ::header keyword?) + +(defn headify [str] + (-> str + (str/replace #" \([^)]+\)$" "") + (str/replace #" +" "-") + (str/lower-case) + (keyword))) +(s/fdef headify + :args (s/cat :str string?) + :ret ::header) + +(defn to-keyword [str] + (-> str + (str/replace #" +" "-") + (str/replace #"_" "-") + (str/lower-case) + (keyword))) + +(defn make-entry [headers fields] + (into {} (map vector headers fields))) +(s/fdef make-entry + :args (s/cat :headers (s/coll-of ::header) + :fields (s/coll-of string?))) + +(defn date-parser [date-format] + (let [date-format (java.text.SimpleDateFormat. date-format)] + (fn [date-str] (.parse date-format date-str)))) + +(defn pivot-csv [csv-contents] + (let [headers (map headify (first csv-contents))] + (for [row (rest csv-contents)] + (make-entry headers row)))) + +(defn load-csv [filename] + (csv/read-csv (io/reader filename))) + +(defn alter-field [field f] + (fn [entry] + (if (get entry field) + [(update entry field (fn [value] (f value)))] + (throw (RuntimeException. (str "Field not found: " field)))))) + +(defn add-field [field f] + (fn [entry] + [(assoc entry field (f entry))])) + +(defn generate-id [entry] + ;; Use a vector because I'm not sure the map will be ordered + (let [make-entry-vector (juxt ::timestamp + ::quantity-transacted + ::txn-type + ::account + ::currency)] + (-> entry + (make-entry-vector) + (hash/sha-256-str)))) + +(defn add-txid [field] + (add-field field generate-id)) + +(defn rename-field + ([field new-field] (rename-field field new-field identity)) + ([field new-field f] (fn [entry] + (when (not (contains? entry field)) + (throw (RuntimeException. (str "Field not found: " field)))) + [(assoc (dissoc entry field) + new-field + (f (get entry field)))]))) + +(defn dump-fields [] + (fn [record] + (clojure.pprint/pprint record) + [record])) + +;; (-> (-> a [b]) [a] [b]) +(defn flatmap + ([f coll] (apply concat (map f coll))) + ([f] (fn [coll] (flatmap f coll)))) + +;; (-> *(-> a [b]) (-> a [b])) +(defn flatcomp [& fns] + (apply comp (concat (map #(flatmap %) (butlast fns)) [(last fns)]))) + +(defn null-or [f] + (fn [str] (if (or (nil? str) (empty? str)) nil (f str)))) + +(defn zero-or [f] + (fn [str] (if (or (nil? str) (empty? str)) (bigdec 0) (f str)))) + +(s/def ::txn-type #{::buy ::sell ::send ::receive ::deposit ::withdrawal ::fee ::income}) + +(defn valid-timestamp? [ts] (instance? java.util.Date ts)) + +(s/def ::timestamp valid-timestamp?) + +(defn bigdec? [i] (instance? java.math.BigDecimal i)) + +(s/def ::quantity-transacted bigdec?) + +(s/def ::usd-value (s/or :deferred #(= % :deferred) + :number bigdec?)) + +(s/def ::currency keyword?) + +(s/def ::account keyword?) + +(s/def ::sender keyword?) + +(s/def ::recipient keyword?) + +(defmulti txn ::txn-type) +(s/def ::txn (s/multi-spec txn ::txn-type)) + +(let [txn-common-req [::txid + ::quantity-transacted + ::currency + ::txn-type + ::account + ::timestamp]] + (defmethod txn ::buy [_] + (s/keys :req (concat txn-common-req [::usd-value]))) + (defmethod txn ::sell [_] + (s/keys :req (concat txn-common-req [::usd-value]))) + (defmethod txn ::send [_] + (s/keys :req (concat txn-common-req [::recipient]))) + (defmethod txn ::receive [_] + (s/keys :req (concat txn-common-req [::sender]))) + (defmethod txn ::fee [_] + (s/keys :req (concat txn-common-req))) + (defmethod txn ::income [_] + (s/keys :req (concat txn-common-req [::src]))) + (defmethod txn ::deposit [_] + (s/keys :req (concat txn-common-req [::src]))) + (defmethod txn ::withdrawal [_] + (s/keys :req (concat txn-common-req [::dest])))) + +(defn map-select [m] + (fn [om] + (every? (fn [key] (= (get om key) + (get m key))) + (keys m)))) + +(defn matching-files [rx dir] + (filter (fn [file] + (re-matches rx file)) + (map str (file-seq (clojure.java.io/file dir))))) diff --git a/src/worther/injest/deps.edn b/src/worther/injest/deps.edn new file mode 100644 index 0000000..8c1a243 --- /dev/null +++ b/src/worther/injest/deps.edn @@ -0,0 +1,5 @@ +{:deps + {org.clojure/data.csv {:mvn/version "1.0.0"} + org.clojure/core.match {:mvn/version "1.0.0"} + arachne-framework/valuehash {:git/url "https://git.fudo.org/fudo-public/valuehash.git" + :sha "9d2dbafdb5db886a57f44c5b7fe32c824713e6c7"}}}