Initial commit

This commit is contained in:
niten 2024-03-07 20:22:00 -08:00
commit 984feb0878
6 changed files with 472 additions and 0 deletions

2
.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
.cpcache/
.nrepl-port

25
deps.edn Normal file
View File

@ -0,0 +1,25 @@
{
:paths ["src"]
:deps {
org.clojure/clojure { :mvn/version "1.11.1" }
org.clojure/tools.cli { :mvn/version "1.0.214" }
org.clj-commons/digest { :mvn/version "1.4.100" }
clj-time/clj-time { :mvn/version "0.15.2" }
metosin/malli { :mvn/version "0.11.0" }
org.fudo/mebot {
;; :git/url "https://fudo.dev/public/mebot.git"
;; :git/sha "67fad7a8875c7fc04dce20cb3cfb5c98ae64454d"
:local/root "/net/projects/niten/matrix-client"
}
org.fudo/milquetoast {
;; :git/url "https://git.fudo.org/fudo-public/milquetoast.git"
;; :git/sha "2c5b228f426adc2832d8c295b27970c0948523d5"
:local/root "/net/projects/niten/milquetoast"
}
org.apache.tika/tika-core { :mvn/version "2.9.1" }
camel-snake-kebab/camel-snake-kebab {:mvn/version "0.4.3"}
org.clojure/data.json { :mvn/version "2.5.0" }
net.coobird/thumbnailator { :mvn/version "0.4.20" }
slingshot/slingshot { :mvn/version "0.12.2" }
}
}

49
flake.nix Normal file
View File

@ -0,0 +1,49 @@
{
description = "Mabel Home Bot";
inputs = {
nixpkgs.url = "nixpkgs/nixos-23.11";
utils.url = "github:numtide/flake-utils";
helpers = {
url = "git+https://git.fudo.org/fudo-public/nix-helpers.git";
inputs.nixpkgs.follows = "nixpkgs";
};
mebot = {
url = "git+https://git.fudo.org/fudo-public/mebot.git";
inputs.nixpkgs.follows = "nixpkgs";
};
};
outputs = { self, nixpkgs, utils, helpers, mebot, ... }:
utils.lib.eachDefaultSystem (system:
let
inherit (helpers.packages."${system}") mkClojureBin;
pkgs = nixpkgs.legacyPackages."${system}";
cljLibs = { "org.fudo/mebot" = "${mebot.packages."${system}".mebot}"; };
in {
packages = rec {
default = mabel;
mabel = mkClojureBin {
name = "org.fudo/mabel";
primaryNamespaces = "mebot.cli";
src = ./.;
inherit cljLibs;
};
};
devShells = rec {
default = updateDeps;
updateDeps = pkgs.mkShell {
buildInputs = with helpers.packages."${system}"; [ updateCljDeps ];
};
mabel = pkgs.mkShell {
buildInputs = with self.packages."${system}"; [ mabel ];
};
};
}) // {
nixosModules = rec {
default = mabel;
mabel = import ./module.nix self.packages;
};
};
}

94
module.nix Normal file
View File

@ -0,0 +1,94 @@
packages:
{ config, lib, pkgs, ... }:
with lib;
let
mabel = packages."${pkgs.system}".mabel;
cfg = config.services.mabel;
in {
options.services.mabel = with types; {
enable = mkEnableOption "Enable Mabel home monitor server.";
verbose = mkEnableOption "Generate verbose logs & output.";
mqtt = {
host = mkOption {
type = str;
description = "Hostname of the MQTT server.";
};
port = mkOption {
type = port;
description = "Port on which the MQTT server is listening.";
default = 1883;
};
username = mkOption {
type = str;
description = "User as which to connect to the MQTT server.";
};
password-file = mkOption {
type = str;
description =
"File (on the local host) containing the password for the MQTT server.";
};
};
matrix = {
domain = mkOption {
type = str;
description = "Domain name of the MQTT server.";
};
username = mkOption {
type = str;
description = "User as which to connect to the MQTT server.";
};
password-file = mkOption {
type = str;
description =
"File (on the local host) containing the password for the MQTT server.";
};
channel-alias = mkOption {
type = str;
description = "Matrix channel to which Mabel should post updates.";
};
};
};
config = mkIf cfg.enable {
systemd.services.mabel-server = {
path = [ mabel ];
wantedBy = [ "multi-user.target" ];
after = [ "network-online.target" ];
serviceConfig = {
DynamicUser = true;
Restart = "on-failure";
RestartSec = "120s";
LoadCredential = [
"mqtt.passwd:${cfg.mqtt.password-file}"
"matrix.passwd:${cfg.matrix.password-file}"
];
ExecStart = pkgs.writeShellScript "mabel-server.sh"
(concatStringsSep " " ([
"mabel"
"--mqtt-host=${cfg.mqtt.host}"
"--mqtt-port=${cfg.mqtt.post}"
"--mqtt-user=${cfg.mqtt.username}"
"--mqtt-password-file=${cfg.mqtt.password-file}"
"--matrix-domain=${cfg.matrix.domain}"
"--matrix-user=${cfg.matrix.username}"
"--matrix-password-file=${cfg.matrix.password-file}"
"--matrix-room=${cfg.matrix.channel-alias}"
]));
unitConfig.ConditionPathExists =
[ cfg.mqtt.password-file cfg.matrix.password-file ];
};
};
};
}

98
src/mabel/cli.clj Normal file
View File

@ -0,0 +1,98 @@
(ns mabel.cli
(:require [clojure.tools.cli :as cli]
[mebot.client :as mebot]
[clojure.string :as str]
[clojure.core.async :refer [>!! <!! chan]]
[clojure.set :as set]
[mabel.core :as mabel]
[milquetoast.client :as mqtt]
[slingshot.slingshot :refer [try+]]
[mabel.core :as mabel]))
(def cli-opts
[["-v" "--verbose" "Provide verbose output."]
["-h" "--help" "Print this message."]
[nil "--mqtt-host HOSTNAME" "Hostname of MQTT server on which to listen for events."]
[nil "--mqtt-port PORT" "Port on which to connect to the incoming MQTT server."
:parse-fn #(Integer/parseInt %)]
[nil "--mqtt-user USER" "User as which to connect to MQTT server."]
[nil "--mqtt-password-file PASSWD_FILE" "File containing password for MQTT user."]
[nil "--matrix-domain DOMAIN" "Domain of Matrix server."]
[nil "--matrix-user USER" "User as which to connect to Matrix server."]
[nil "--matrix-password-file PASSWD_FILE" "File containing Matrix user password."]
[nil "--matrix-room ROOM" "Room in which to report events."]])
(defn- msg-quit [{:keys [status message]
:or {status 0}}]
(println message)
(System/exit status))
(defn- usage
([summary] (usage summary []))
([summary errors] (->> (concat errors
["usage: snooper-client [opts]"
""
"Options:"
summary])
(str/join \newline))))
(defn- parse-opts [args required cli-opts]
(let [{:keys [options] :as result} (cli/parse-opts args cli-opts)
missing (set/difference required (-> options (keys) (set)))
missing-errors (map #(format "missing required parameter: %s" (name %))
missing)]
(update result :errors concat missing-errors)))
(defn -main [& args]
(let [required-args #{:mqtt-host
:mqtt-port
:mqtt-user
:mqtt-password-file
:matrix-domain
:matrix-user
:matrix-password-file
:matrix-room}
{:keys [options _ errors summary]} (parse-opts args required-args cli-opts)]
(when (:help options) (msg-quit :message (usage summary)))
(when (seq errors) (msg-quit :status 1 :message (usage summary errors)))
(let [{:keys [mqtt-host
mqtt-port
mqtt-user
mqtt-password-file
matrix-domain
matrix-user
matrix-password-file
matrix-room]} options
shutdown-chan (chan)
milquetoast-quit (chan)
mabel-quit (chan)
milquetoast-client (mqtt/connect-json! :username mqtt-user
:password (-> mqtt-password-file
(slurp)
(str/trim))
:host mqtt-host
:port mqtt-port)
jwt (mebot/get-jwt-token! :domain matrix-domain
:username matrix-user
:password (-> matrix-password-file
(slurp)
(str/trim)))
mebot (mebot/request-access! (mebot/make-client! matrix-domain) jwt)
room (try+
(mebot/join-public-room! mebot :alias matrix-room)
(catch [:type :mebot/forbidden] {:keys [room-alias]}
(msg-quit :status 2 :message (str "access to "
room-alias
" forbidden"))))
evt-chan (mabel/frigate-listen! milquetoast-client milquetoast-quit)]
(mabel/notify! room evt-chan mabel-quit)
(.addShutdownHook (Runtime/getRuntime)
(Thread. (fn [] (>!! shutdown-chan true))))
(<!! shutdown-chan)
(println "stopping milquetoast client...")
(>!! milquetoast-quit true)
(println "stopping mabel client...")
(>!! mabel-quit true))
(msg-quit :message "stopping mabel server")))

204
src/mabel/core.clj Normal file
View File

@ -0,0 +1,204 @@
(ns mabel.core
(:require [mebot.client :as mebot]
[milquetoast.client :as mqtt]
[clojure.java.io :as io]
[clojure.core.async :refer [go-loop <! >! chan pipeline alt!]]
[clj-time.core :as t]
[clj-commons.digest :as digest]
[clojure.string :as str]
[mabel.core :as mabel]
[slingshot.slingshot :refer [throw+ try+]]
[clojure.pprint :refer [pprint]])
(:import org.apache.tika.Tika
java.util.UUID))
(defn- pthru [o] (clojure.pprint/pprint o) (flush) o)
(defn create! [{:keys [domain username password]}]
(let [client (mebot/make-client! domain)
jwt-token (mebot/get-jwt-token! domain username password)]
{:client (mebot/request-access! client jwt-token)}))
(defn- detect-mime-type [is]
(let [tika (Tika.)]
(.detect tika is)))
(defn- parallelism []
(-> (Runtime/getRuntime)
(.availableProcessors)
(+ 1)))
(defn pipe [in xf]
(let [out (chan)]
(pipeline (parallelism) out xf in)
out))
(defn frigate-listen! [milquetoast-client quit-chan]
(let [person? (fn [evt] (or (= "person" (-> evt :payload :before :label))
(= "person" (-> evt :payload :after :label))))
evt-chan (pipe (mqtt/subscribe! milquetoast-client "frigate/events"
:buffer-size 5)
(filter person?))
detect-chan (chan 5)]
(go-loop [evt (alt! evt-chan ([e] {:type :event :content e})
quit-chan ([_] {:type :quit}))]
(if (= (:type evt) :quit)
(println "Detection loop quitting...")
(let [{{{{:keys [label camera] :as evt} :before} :payload} :content} evt]
(>! detect-chan
(assoc evt :snapshot
(:payload-bytes
(mqtt/get-raw! milquetoast-client
(str "frigate/" camera "/" label "/snapshot")))))
(recur (alt! evt-chan ([e] {:type :event :content e})
quit-chan ([_] {:type :quit}))))))
detect-chan))
(defn- current-timestamp []
(quot (System/currentTimeMillis) 1000))
(defmulti handle-update! (fn [update & _] (:type update)))
(defn- snapshot-cache [size]
{:snapshots [] :size size})
(defn- has-snapshot? [{:keys [snapshots]} snapshot]
(some #{(digest/sha-256 snapshot)} snapshots))
(defn- add-snapshot [{:keys [size snapshots] :as cache} snapshot]
(assoc cache :snapshots (take size (conj snapshots (digest/sha-256 snapshot)))))
(defn- silence-map [pause-time]
{:pause-time pause-time
:cameras {}
:all (t/now)})
(defn- add-silence
([sm camera] (add-silence sm camera (t/plus (t/now) (t/seconds (:pause-time sm)))))
([sm camera time] (assoc-in sm [:cameras camera] time)))
(defn- silence-all [sm time]
(assoc sm :all time))
(defn- silenced? [sm camera]
(if (t/before? (t/now) (:all sm))
true
(if-let [silence-end (get-in sm [:cameras camera])]
(t/before? (t/now) silence-end)
false)))
(defmethod handle-update! :detection
;; TODO: Per-cam silences?
[{{:keys [label camera snapshot] :as update} :content} mebot-room context]
(println (str "KEYS: " (str/join "," (keys update))))
(when (not (silenced? (:silence-map @context) camera))
(when (not (has-snapshot? (:recents @context) snapshot))
(mebot/room-message! mebot-room (str "There's a " label " at the " camera))
(let [id (UUID/randomUUID)]
(mebot/room-image! mebot-room snapshot (str id ".jpg")))
(-> context
(swap! update :silence-map add-silence camera)
(swap! update :recents add-snapshot snapshot)))))
(defmethod handle-update! :quit
[_ mebot-room _]
(mebot/room-message! mebot-room (str "Quit requested!")))
(defmacro ->* [& fns]
(let [v (gensym)]
`(fn [~v] (-> ~v ~@fns))))
(defn- remove-silence-from-context [reply! context cameras]
(doseq [camera cameras]
(if (= camera "all")
(do (reply! "Okay, unsilencing all!")
(swap! context update :silence-map (->* (assoc :all (t/now))
(assoc :cameras {}))))
(if (get-in @context [:silence-map :camera camera])
(do (reply! (str "Okay, unsilencing " camera))
(swap! context assoc-in :silence-map [:cameras camera] (t/now)))
(reply! (str "Camera " camera " not found"))))))
(defn- parse-time-element [el]
(if-some [[_ time duration] (re-matches #"^([0-9]+)([a-z]+)" el)]
(merge (parse-time-element time) (parse-time-element duration))
(cond (re-matches #"^[0-9]+$" el) {:count (Integer/parseInt el)}
(contains? #{"s" "secs" "second" "seconds"} el) {:duration :second}
(contains? #{"m" "mins" "minute" "minutes"} el) {:duration :minute}
(contains? #{"h" "hrs" "hour" "hours"} el) {:duration :hour}
(contains? #{"d" "day" "days"} el) {:duration :day}
:else (throw (throw+
{:type ::bad-time
:message (str "bad time element: " el)})))))
(defmulti translate-time :duration)
(defmethod translate-time :second [{:keys [count]}]
(t/seconds count))
(defmethod translate-time :minute [{:keys [count]}]
(t/minutes count))
(defmethod translate-time :hour [{:keys [count]}]
(t/hours count))
(defmethod translate-time :day [{:keys [count]}]
(t/days count))
(defmethod translate-time :default [& _]
(t/seconds 0))
(defn- parse-time [time-els]
(translate-time (reduce merge (map parse-time-element time-els))))
(defn- add-silence-to-context [reply! context msg]
(let [[camera & time-els] msg
time (t/plus (t/now)
(parse-time time-els))]
(if (= camera "all")
(do (reply! "Okay, silencing all!")
(swap! context assoc-in [:silence-map :all] time))
(if (get-in @context [:silence-map :camera camera])
(do (reply! (str "Okay, silencing " camera))
(swap! context assoc-in [:silence-map :cameras camera] time))
(reply! (str "Camera " camera " not found"))))))
(defmethod handle-update! :message
[{{:keys [sender] {:keys [body]} :content} :content} room context]
(let [msg (-> body
(str/replace #"^[^:]+: " "")
(str/trim)
(str/lower-case)
(str/split #" "))
reply! (fn [msg] (mebot/room-message! room (str sender ": " msg)))]
(cond (contains? #{"unmute"
"unsilence"}
(first msg)) (remove-silence-from-context reply! context (rest msg))
(contains? #{"silence"
"mute"
"quiet"}
(first msg)) (add-silence-to-context reply! context (rest msg))
:else (println (str sender " sez: " body)))))
(defmethod handle-update! :default [update _ _]
(println (str "Unexpected update type for update: " update)))
(defn notify! [mebot-room detect-chan quit-chan &
{:keys [cache-size default-pause]
:or {cache-size 10
default-pause 15}}]
(let [context (atom {:silence-map (silence-map default-pause)
:recents (snapshot-cache cache-size)})
mentions (mebot/room-self-mention-channel! mebot-room)]
(go-loop [update (alt! detect-chan ([d] {:type :detection :content d})
mentions ([m] {:type :message :content m})
quit-chan ([_] {:type :quit}))]
(when (not (= (:type update) :quit))
(try+
(handle-update! update mebot-room context)
(catch Exception e
(mebot/room-message! mebot-room "Encountered error")
(pprint e)))
(recur (alt! detect-chan ([d] {:type :detection :content d})
mentions ([m] {:type :message :content m})
quit-chan ([_] {:type :quit})))))))