Split incoming & outgoing MQTT servers

This commit is contained in:
niten 2023-05-08 22:46:42 -07:00
parent 6b5c41b23f
commit d886bd9586
3 changed files with 90 additions and 42 deletions

View File

@ -9,7 +9,7 @@ let
in {
options.services.snooper = with types; {
enable = mkEnableOption "Enable Snooper notifiaction server.";
enable = mkEnableOption "Enable Snooper notification server.";
verbose = mkEnableOption "Generate verbose logs and output.";
@ -24,26 +24,51 @@ in {
};
mqtt = {
host = mkOption {
type = str;
description = "Hostname of the MQTT server.";
};
incoming = {
host = mkOption {
type = str;
description = "Hostname of the MQTT server.";
};
port = mkOption {
type = port;
description = "Port on which the MQTT server is listening.";
default = 1883;
};
port = mkOption {
type = port;
description = "Port on which the MQTT server is listening.";
default = 1883;
};
username = mkOption {
type = str;
description = "User as which to connect to the MQTT server.";
};
username = mkOption {
type = str;
description = "User as which to connect to the MQTT server.";
};
password-file = mkOption {
type = str;
description =
"File (on the local host) containing the password for the MQTT server.";
password-file = mkOption {
type = str;
description =
"File (on the local host) containing the password for the MQTT server.";
};
};
outgoing = {
host = mkOption {
type = str;
description = "Hostname of the MQTT server.";
};
port = mkOption {
type = port;
description = "Port on which the MQTT server is listening.";
default = 1883;
};
username = mkOption {
type = str;
description = "User as which to connect to the MQTT server.";
};
password-file = mkOption {
type = str;
description =
"File (on the local host) containing the password for the MQTT server.";
};
};
};
};
@ -54,14 +79,24 @@ in {
wantedBy = [ "multi-user.target" ];
serviceConfig = {
DynamicUser = true;
LoadCredential = [ "mqtt.passwd:${cfg.mqtt.password-file}" ];
LoadCredential = [
"mqtt-incoming.passwd:${cfg.mqtt.incoming.password-file}"
"mqtt-outgoing.passwd:${cfg.mqtt.outgoing.password-file}"
];
ExecStart = pkgs.writeShellScript "snooper-server.sh"
(concatStringsSep " " ([
"snooper-server"
"--mqtt-host=${cfg.mqtt.host}"
"--mqtt-port=${toString cfg.mqtt.port}"
"--mqtt-user=${cfg.mqtt.username}"
"--mqtt-password-file=$CREDENTIALS_DIRECTORY/mqtt.passwd"
"--incoming-mqtt-host=${cfg.mqtt.incoming.host}"
"--incoming-mqtt-port=${toString cfg.mqtt.incoming.port}"
"--incoming-mqtt-user=${cfg.mqtt.incoming.username}"
"--incoming-mqtt-password-file=$CREDENTIALS_DIRECTORY/mqtt-incoming.passwd"
"--outgoing-mqtt-host=${cfg.mqtt.outgoing.host}"
"--outgoing-mqtt-port=${toString cfg.mqtt.outgoing.port}"
"--outgoing-mqtt-user=${cfg.mqtt.outgoing.username}"
"--outgoing-mqtt-password-file=$CREDENTIALS_DIRECTORY/mqtt-outgoing.passwd"
"--notification-topic=${cfg.notification-topic}"
] ++ (map (topic: "--event-topic=${topic}") cfg.event-topics)
++ (optional cfg.verbose "--verbose")));

View File

@ -48,33 +48,45 @@
{:keys [options _ errors summary]} (parse-opts args required-args cli-opts)]
(when (:help options) (msg-quit 0 (usage summary)))
(when (seq errors) (msg-quit 1 (usage summary errors)))
(let [{:keys [mqtt-host
mqtt-port
mqtt-user
mqtt-password-file
(let [{:keys [incoming-mqtt-host
incoming-mqtt-port
incoming-mqtt-user
incoming-mqtt-password-file
outgoing-mqtt-host
outgoing-mqtt-port
outgoing-mqtt-user
outgoing-mqtt-password-file
notification-topic
event-topic
verbose]} options
catch-shutdown (async/chan)
mqtt-client (mqtt/connect-json! :host mqtt-host
:port mqtt-port
:username mqtt-user
:password (-> mqtt-password-file
(slurp)
(str/trim)))
incoming-client (mqtt/connect-json! :host incoming-mqtt-host
:port incoming-mqtt-port
:username incoming-mqtt-user
:password (-> incoming-mqtt-password-file
(slurp)
(str/trim)))
outgoing-client (mqtt/connect-json! :host outgoing-mqtt-host
:port outgoing-mqtt-port
:username outgoing-mqtt-user
:password (-> outgoing-mqtt-password-file
(slurp)
(str/trim)))
logger (log/print-logger)]
(when verbose
(println (format "launching snooper server to listen on %s and report events on %s"
event-topic notification-topic)))
(snooper/listen! :mqtt-client mqtt-client
:notification-topic notification-topic
:event-topics event-topic
:logger logger
:verbose verbose)
(snooper/listen! :incoming-mqtt-client incoming-client
:outgoing-mqtt-client outgoing-client
:notification-topic notification-topic
:event-topics event-topic
:logger logger
:verbose verbose)
(.addShutdownHook (Runtime/getRuntime)
(Thread. (fn [] (>!! catch-shutdown true))))
(<!! catch-shutdown)
(println "stopping snooper server")
;; Stopping the MQTT will stop tattler
(mqtt/stop! mqtt-client)
(mqtt/stop! incoming-client)
(mqtt/stop! outgoing-client)
(System/exit 0))))

View File

@ -84,12 +84,13 @@
:urgency criticality}))
(defn listen!
[& {mqtt-client :mqtt-client
[& {incoming-client :incoming-mqtt-client
outgoing-client :outgoing-mqtt-client
notification-topic :notification-topic
event-topics :event-topics
logger :logger
verbose :verbose}]
(let [incoming (map (partial mqtt/subscribe! mqtt-client) event-topics)
(let [incoming (map (partial mqtt/subscribe! incoming-client) event-topics)
valid-evt? (t/validator MotionEvent)]
(go-loop [evts (alts! incoming)]
(let [evt (first evts)]
@ -98,7 +99,7 @@
(valid-evt? evt) (do (log/info! logger (format "received motion event id %s from %s"
(:id evt)
(:topic evt)))
(mqtt/send! mqtt-client notification-topic
(mqtt/send! outgoing-client notification-topic
(verbose-pthru verbose (translate-event (:payload evt))))
(recur (alts! incoming)))
:else (do (log/error! logger (format "invalid motion event: %s" evt))