commit 984feb0878846753179a4408abdf0683087db50c Author: niten Date: Thu Mar 7 20:22:00 2024 -0800 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c416e6b --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +.cpcache/ +.nrepl-port diff --git a/deps.edn b/deps.edn new file mode 100644 index 0000000..2071228 --- /dev/null +++ b/deps.edn @@ -0,0 +1,25 @@ +{ + :paths ["src"] + :deps { + org.clojure/clojure { :mvn/version "1.11.1" } + org.clojure/tools.cli { :mvn/version "1.0.214" } + org.clj-commons/digest { :mvn/version "1.4.100" } + clj-time/clj-time { :mvn/version "0.15.2" } + metosin/malli { :mvn/version "0.11.0" } + org.fudo/mebot { + ;; :git/url "https://fudo.dev/public/mebot.git" + ;; :git/sha "67fad7a8875c7fc04dce20cb3cfb5c98ae64454d" + :local/root "/net/projects/niten/matrix-client" + } + org.fudo/milquetoast { + ;; :git/url "https://git.fudo.org/fudo-public/milquetoast.git" + ;; :git/sha "2c5b228f426adc2832d8c295b27970c0948523d5" + :local/root "/net/projects/niten/milquetoast" + } + org.apache.tika/tika-core { :mvn/version "2.9.1" } + camel-snake-kebab/camel-snake-kebab {:mvn/version "0.4.3"} + org.clojure/data.json { :mvn/version "2.5.0" } + net.coobird/thumbnailator { :mvn/version "0.4.20" } + slingshot/slingshot { :mvn/version "0.12.2" } + } + } diff --git a/flake.nix b/flake.nix new file mode 100644 index 0000000..a1634d0 --- /dev/null +++ b/flake.nix @@ -0,0 +1,49 @@ +{ + description = "Mabel Home Bot"; + + inputs = { + nixpkgs.url = "nixpkgs/nixos-23.11"; + utils.url = "github:numtide/flake-utils"; + helpers = { + url = "git+https://git.fudo.org/fudo-public/nix-helpers.git"; + inputs.nixpkgs.follows = "nixpkgs"; + }; + mebot = { + url = "git+https://git.fudo.org/fudo-public/mebot.git"; + inputs.nixpkgs.follows = "nixpkgs"; + }; + }; + + outputs = { self, nixpkgs, utils, helpers, mebot, ... }: + utils.lib.eachDefaultSystem (system: + let + inherit (helpers.packages."${system}") mkClojureBin; + pkgs = nixpkgs.legacyPackages."${system}"; + cljLibs = { "org.fudo/mebot" = "${mebot.packages."${system}".mebot}"; }; + in { + packages = rec { + default = mabel; + mabel = mkClojureBin { + name = "org.fudo/mabel"; + primaryNamespaces = "mebot.cli"; + src = ./.; + inherit cljLibs; + }; + }; + + devShells = rec { + default = updateDeps; + updateDeps = pkgs.mkShell { + buildInputs = with helpers.packages."${system}"; [ updateCljDeps ]; + }; + mabel = pkgs.mkShell { + buildInputs = with self.packages."${system}"; [ mabel ]; + }; + }; + }) // { + nixosModules = rec { + default = mabel; + mabel = import ./module.nix self.packages; + }; + }; +} diff --git a/module.nix b/module.nix new file mode 100644 index 0000000..047e2f5 --- /dev/null +++ b/module.nix @@ -0,0 +1,94 @@ +packages: + +{ config, lib, pkgs, ... }: + +with lib; +let + mabel = packages."${pkgs.system}".mabel; + cfg = config.services.mabel; + +in { + options.services.mabel = with types; { + enable = mkEnableOption "Enable Mabel home monitor server."; + + verbose = mkEnableOption "Generate verbose logs & output."; + + mqtt = { + host = mkOption { + type = str; + description = "Hostname of the MQTT server."; + }; + + port = mkOption { + type = port; + description = "Port on which the MQTT server is listening."; + default = 1883; + }; + + username = mkOption { + type = str; + description = "User as which to connect to the MQTT server."; + }; + + password-file = mkOption { + type = str; + description = + "File (on the local host) containing the password for the MQTT server."; + }; + }; + + matrix = { + domain = mkOption { + type = str; + description = "Domain name of the MQTT server."; + }; + + username = mkOption { + type = str; + description = "User as which to connect to the MQTT server."; + }; + + password-file = mkOption { + type = str; + description = + "File (on the local host) containing the password for the MQTT server."; + }; + + channel-alias = mkOption { + type = str; + description = "Matrix channel to which Mabel should post updates."; + }; + }; + }; + + config = mkIf cfg.enable { + systemd.services.mabel-server = { + path = [ mabel ]; + wantedBy = [ "multi-user.target" ]; + after = [ "network-online.target" ]; + serviceConfig = { + DynamicUser = true; + Restart = "on-failure"; + RestartSec = "120s"; + LoadCredential = [ + "mqtt.passwd:${cfg.mqtt.password-file}" + "matrix.passwd:${cfg.matrix.password-file}" + ]; + ExecStart = pkgs.writeShellScript "mabel-server.sh" + (concatStringsSep " " ([ + "mabel" + "--mqtt-host=${cfg.mqtt.host}" + "--mqtt-port=${cfg.mqtt.post}" + "--mqtt-user=${cfg.mqtt.username}" + "--mqtt-password-file=${cfg.mqtt.password-file}" + "--matrix-domain=${cfg.matrix.domain}" + "--matrix-user=${cfg.matrix.username}" + "--matrix-password-file=${cfg.matrix.password-file}" + "--matrix-room=${cfg.matrix.channel-alias}" + ])); + unitConfig.ConditionPathExists = + [ cfg.mqtt.password-file cfg.matrix.password-file ]; + }; + }; + }; +} diff --git a/src/mabel/cli.clj b/src/mabel/cli.clj new file mode 100644 index 0000000..68288b0 --- /dev/null +++ b/src/mabel/cli.clj @@ -0,0 +1,98 @@ +(ns mabel.cli + (:require [clojure.tools.cli :as cli] + [mebot.client :as mebot] + [clojure.string :as str] + [clojure.core.async :refer [>!! > (concat errors + ["usage: snooper-client [opts]" + "" + "Options:" + summary]) + (str/join \newline)))) + +(defn- parse-opts [args required cli-opts] + (let [{:keys [options] :as result} (cli/parse-opts args cli-opts) + missing (set/difference required (-> options (keys) (set))) + missing-errors (map #(format "missing required parameter: %s" (name %)) + missing)] + (update result :errors concat missing-errors))) + +(defn -main [& args] + (let [required-args #{:mqtt-host + :mqtt-port + :mqtt-user + :mqtt-password-file + :matrix-domain + :matrix-user + :matrix-password-file + :matrix-room} + {:keys [options _ errors summary]} (parse-opts args required-args cli-opts)] + (when (:help options) (msg-quit :message (usage summary))) + (when (seq errors) (msg-quit :status 1 :message (usage summary errors))) + (let [{:keys [mqtt-host + mqtt-port + mqtt-user + mqtt-password-file + matrix-domain + matrix-user + matrix-password-file + matrix-room]} options + shutdown-chan (chan) + milquetoast-quit (chan) + mabel-quit (chan) + milquetoast-client (mqtt/connect-json! :username mqtt-user + :password (-> mqtt-password-file + (slurp) + (str/trim)) + :host mqtt-host + :port mqtt-port) + jwt (mebot/get-jwt-token! :domain matrix-domain + :username matrix-user + :password (-> matrix-password-file + (slurp) + (str/trim))) + mebot (mebot/request-access! (mebot/make-client! matrix-domain) jwt) + room (try+ + (mebot/join-public-room! mebot :alias matrix-room) + (catch [:type :mebot/forbidden] {:keys [room-alias]} + (msg-quit :status 2 :message (str "access to " + room-alias + " forbidden")))) + evt-chan (mabel/frigate-listen! milquetoast-client milquetoast-quit)] + (mabel/notify! room evt-chan mabel-quit) + (.addShutdownHook (Runtime/getRuntime) + (Thread. (fn [] (>!! shutdown-chan true)))) + (!! milquetoast-quit true) + (println "stopping mabel client...") + (>!! mabel-quit true)) + (msg-quit :message "stopping mabel server"))) diff --git a/src/mabel/core.clj b/src/mabel/core.clj new file mode 100644 index 0000000..0f79e5e --- /dev/null +++ b/src/mabel/core.clj @@ -0,0 +1,204 @@ +(ns mabel.core + (:require [mebot.client :as mebot] + [milquetoast.client :as mqtt] + [clojure.java.io :as io] + [clojure.core.async :refer [go-loop ! chan pipeline alt!]] + [clj-time.core :as t] + [clj-commons.digest :as digest] + [clojure.string :as str] + [mabel.core :as mabel] + [slingshot.slingshot :refer [throw+ try+]] + [clojure.pprint :refer [pprint]]) + (:import org.apache.tika.Tika + java.util.UUID)) + +(defn- pthru [o] (clojure.pprint/pprint o) (flush) o) + +(defn create! [{:keys [domain username password]}] + (let [client (mebot/make-client! domain) + jwt-token (mebot/get-jwt-token! domain username password)] + {:client (mebot/request-access! client jwt-token)})) + +(defn- detect-mime-type [is] + (let [tika (Tika.)] + (.detect tika is))) + +(defn- parallelism [] + (-> (Runtime/getRuntime) + (.availableProcessors) + (+ 1))) + +(defn pipe [in xf] + (let [out (chan)] + (pipeline (parallelism) out xf in) + out)) + +(defn frigate-listen! [milquetoast-client quit-chan] + (let [person? (fn [evt] (or (= "person" (-> evt :payload :before :label)) + (= "person" (-> evt :payload :after :label)))) + evt-chan (pipe (mqtt/subscribe! milquetoast-client "frigate/events" + :buffer-size 5) + (filter person?)) + detect-chan (chan 5)] + (go-loop [evt (alt! evt-chan ([e] {:type :event :content e}) + quit-chan ([_] {:type :quit}))] + (if (= (:type evt) :quit) + (println "Detection loop quitting...") + (let [{{{{:keys [label camera] :as evt} :before} :payload} :content} evt] + (>! detect-chan + (assoc evt :snapshot + (:payload-bytes + (mqtt/get-raw! milquetoast-client + (str "frigate/" camera "/" label "/snapshot"))))) + (recur (alt! evt-chan ([e] {:type :event :content e}) + quit-chan ([_] {:type :quit})))))) + detect-chan)) + +(defn- current-timestamp [] + (quot (System/currentTimeMillis) 1000)) + +(defmulti handle-update! (fn [update & _] (:type update))) + +(defn- snapshot-cache [size] + {:snapshots [] :size size}) + +(defn- has-snapshot? [{:keys [snapshots]} snapshot] + (some #{(digest/sha-256 snapshot)} snapshots)) + +(defn- add-snapshot [{:keys [size snapshots] :as cache} snapshot] + (assoc cache :snapshots (take size (conj snapshots (digest/sha-256 snapshot))))) + +(defn- silence-map [pause-time] + {:pause-time pause-time + :cameras {} + :all (t/now)}) + +(defn- add-silence + ([sm camera] (add-silence sm camera (t/plus (t/now) (t/seconds (:pause-time sm))))) + ([sm camera time] (assoc-in sm [:cameras camera] time))) + +(defn- silence-all [sm time] + (assoc sm :all time)) + +(defn- silenced? [sm camera] + (if (t/before? (t/now) (:all sm)) + true + (if-let [silence-end (get-in sm [:cameras camera])] + (t/before? (t/now) silence-end) + false))) + +(defmethod handle-update! :detection + ;; TODO: Per-cam silences? + [{{:keys [label camera snapshot] :as update} :content} mebot-room context] + (println (str "KEYS: " (str/join "," (keys update)))) + (when (not (silenced? (:silence-map @context) camera)) + (when (not (has-snapshot? (:recents @context) snapshot)) + (mebot/room-message! mebot-room (str "There's a " label " at the " camera)) + (let [id (UUID/randomUUID)] + (mebot/room-image! mebot-room snapshot (str id ".jpg"))) + (-> context + (swap! update :silence-map add-silence camera) + (swap! update :recents add-snapshot snapshot))))) + +(defmethod handle-update! :quit + [_ mebot-room _] + (mebot/room-message! mebot-room (str "Quit requested!"))) + +(defmacro ->* [& fns] + (let [v (gensym)] + `(fn [~v] (-> ~v ~@fns)))) + +(defn- remove-silence-from-context [reply! context cameras] + (doseq [camera cameras] + (if (= camera "all") + (do (reply! "Okay, unsilencing all!") + (swap! context update :silence-map (->* (assoc :all (t/now)) + (assoc :cameras {})))) + (if (get-in @context [:silence-map :camera camera]) + (do (reply! (str "Okay, unsilencing " camera)) + (swap! context assoc-in :silence-map [:cameras camera] (t/now))) + (reply! (str "Camera " camera " not found")))))) + +(defn- parse-time-element [el] + (if-some [[_ time duration] (re-matches #"^([0-9]+)([a-z]+)" el)] + (merge (parse-time-element time) (parse-time-element duration)) + (cond (re-matches #"^[0-9]+$" el) {:count (Integer/parseInt el)} + (contains? #{"s" "secs" "second" "seconds"} el) {:duration :second} + (contains? #{"m" "mins" "minute" "minutes"} el) {:duration :minute} + (contains? #{"h" "hrs" "hour" "hours"} el) {:duration :hour} + (contains? #{"d" "day" "days"} el) {:duration :day} + :else (throw (throw+ + {:type ::bad-time + :message (str "bad time element: " el)}))))) + +(defmulti translate-time :duration) + +(defmethod translate-time :second [{:keys [count]}] + (t/seconds count)) + +(defmethod translate-time :minute [{:keys [count]}] + (t/minutes count)) + +(defmethod translate-time :hour [{:keys [count]}] + (t/hours count)) + +(defmethod translate-time :day [{:keys [count]}] + (t/days count)) + +(defmethod translate-time :default [& _] + (t/seconds 0)) + +(defn- parse-time [time-els] + (translate-time (reduce merge (map parse-time-element time-els)))) + +(defn- add-silence-to-context [reply! context msg] + (let [[camera & time-els] msg + time (t/plus (t/now) + (parse-time time-els))] + (if (= camera "all") + (do (reply! "Okay, silencing all!") + (swap! context assoc-in [:silence-map :all] time)) + (if (get-in @context [:silence-map :camera camera]) + (do (reply! (str "Okay, silencing " camera)) + (swap! context assoc-in [:silence-map :cameras camera] time)) + (reply! (str "Camera " camera " not found")))))) + +(defmethod handle-update! :message + [{{:keys [sender] {:keys [body]} :content} :content} room context] + (let [msg (-> body + (str/replace #"^[^:]+: " "") + (str/trim) + (str/lower-case) + (str/split #" ")) + reply! (fn [msg] (mebot/room-message! room (str sender ": " msg)))] + (cond (contains? #{"unmute" + "unsilence"} + (first msg)) (remove-silence-from-context reply! context (rest msg)) + (contains? #{"silence" + "mute" + "quiet"} + (first msg)) (add-silence-to-context reply! context (rest msg)) + :else (println (str sender " sez: " body))))) + +(defmethod handle-update! :default [update _ _] + (println (str "Unexpected update type for update: " update))) + +(defn notify! [mebot-room detect-chan quit-chan & + {:keys [cache-size default-pause] + :or {cache-size 10 + default-pause 15}}] + (let [context (atom {:silence-map (silence-map default-pause) + :recents (snapshot-cache cache-size)}) + mentions (mebot/room-self-mention-channel! mebot-room)] + (go-loop [update (alt! detect-chan ([d] {:type :detection :content d}) + mentions ([m] {:type :message :content m}) + quit-chan ([_] {:type :quit}))] + (when (not (= (:type update) :quit)) + (try+ + (handle-update! update mebot-room context) + (catch Exception e + (mebot/room-message! mebot-room "Encountered error") + (pprint e))) + (recur (alt! detect-chan ([d] {:type :detection :content d}) + mentions ([m] {:type :message :content m}) + quit-chan ([_] {:type :quit})))))))