# Nix source file (code-viewer header of the original listing: 155 lines, 4.0 KiB, Nix).
# NixOS module that declares the `services.apache-kafka` options and, when the
# service is enabled, creates a dedicated user, the configured log directories
# and a systemd unit that starts the broker with generated configuration files.
{ config, lib, pkgs, ... }:

with lib;

let
  cfg = config.services.apache-kafka;

  # Text of server.properties. A user-supplied `serverProperties` value is used
  # verbatim; otherwise the file is assembled from the individual options.
  serverProperties =
    if cfg.serverProperties != null then
      cfg.serverProperties
    else
      ''
        # Generated by nixos
        broker.id=${toString cfg.brokerId}
        port=${toString cfg.port}
        host.name=${cfg.hostname}
        log.dirs=${concatStringsSep "," cfg.logDirs}
        zookeeper.connect=${cfg.zookeeper}
        ${optionalString (cfg.extraProperties != null) cfg.extraProperties}
      '';

  # Both configuration files are written to the Nix store; their store paths
  # are passed on the java command line in ExecStart.
  serverConfig = pkgs.writeText "server.properties" serverProperties;
  logConfig = pkgs.writeText "log4j.properties" cfg.log4jProperties;

in {

  options.services.apache-kafka = {
    # Expands to a boolean option, default false, described as
    # "Whether to enable Apache Kafka.".
    enable = mkEnableOption "Apache Kafka";

    brokerId = mkOption {
      description = "Broker ID.";
      default = -1;
      type = types.int;
    };

    port = mkOption {
      description = "Port number the broker should listen on.";
      default = 9092;
      # types.port restricts the value to 0-65535, so an impossible port is
      # rejected at evaluation time instead of being written to the config.
      type = types.port;
    };

    hostname = mkOption {
      description = "Hostname the broker should bind to.";
      default = "localhost";
      type = types.str;
    };

    # The first entry doubles as the home directory of the service user, so
    # the list must not be empty (enforced by an assertion in `config`).
    logDirs = mkOption {
      description = "Log file directories";
      default = [ "/tmp/kafka-logs" ];
      type = types.listOf types.path;
    };

    zookeeper = mkOption {
      description = "Zookeeper connection string";
      default = "localhost:2181";
      type = types.str;
    };

    # Appended to the generated server.properties; ignored when
    # `serverProperties` is set.
    extraProperties = mkOption {
      description = "Extra properties for server.properties.";
      type = types.nullOr types.lines;
      default = null;
    };

    serverProperties = mkOption {
      description = ''
        Complete server.properties content. Other server.properties config
        options will be ignored if this option is used.
      '';
      type = types.nullOr types.lines;
      default = null;
    };

    log4jProperties = mkOption {
      description = "Kafka log4j property configuration.";
      default = ''
        log4j.rootLogger=INFO, stdout

        log4j.appender.stdout=org.apache.log4j.ConsoleAppender
        log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
        log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
      '';
      type = types.lines;
    };

    # Joined with single spaces and placed on the java command line.
    jvmOptions = mkOption {
      description = "Extra command line options for the JVM running Kafka.";
      default = [
        "-server"
        "-Xmx1G"
        "-Xms1G"
        "-XX:+UseCompressedOops"
        "-XX:+UseParNewGC"
        "-XX:+UseConcMarkSweepGC"
        "-XX:+CMSClassUnloadingEnabled"
        "-XX:+CMSScavengeBeforeRemark"
        "-XX:+DisableExplicitGC"
        "-Djava.awt.headless=true"
        "-Djava.net.preferIPv4Stack=true"
      ];
      type = types.listOf types.str;
      example = [
        "-Djava.net.preferIPv4Stack=true"
        "-Dcom.sun.management.jmxremote"
        "-Dcom.sun.management.jmxremote.local.only=true"
      ];
    };

    package = mkOption {
      description = "The kafka package to use";
      default = pkgs.apacheKafka;
      defaultText = "pkgs.apacheKafka";
      type = types.package;
    };
  };

  config = mkIf cfg.enable {

    # `head cfg.logDirs` below would otherwise abort evaluation with an opaque
    # error when the list is empty; fail early with a readable message.
    assertions = [
      {
        assertion = cfg.logDirs != [ ];
        message = "services.apache-kafka.logDirs must contain at least one directory.";
      }
    ];

    environment.systemPackages = [ cfg.package ];

    users.users.apache-kafka = {
      uid = config.ids.uids.apache-kafka;
      description = "Apache Kafka daemon user";
      home = head cfg.logDirs;
    };

    # One tmpfiles "d" rule per log directory: mode 0700, owned by the
    # service user.
    systemd.tmpfiles.rules =
      map (logDir: "d '${logDir}' 0700 apache-kafka - - -") cfg.logDirs;

    systemd.services.apache-kafka = {
      description = "Apache Kafka Daemon";
      wantedBy = [ "multi-user.target" ];
      after = [ "network.target" ];
      serviceConfig = {
        # Runs kafka.Kafka from the package's jars with the generated log4j
        # and server configuration files.
        ExecStart = ''
          ${pkgs.jre}/bin/java \
            -cp "${cfg.package}/libs/*" \
            -Dlog4j.configuration=file:${logConfig} \
            ${toString cfg.jvmOptions} \
            kafka.Kafka \
            ${serverConfig}
        '';
        User = "apache-kafka";
        # NOTE(review): 143 is presumably the JVM's exit status after SIGTERM
        # (128 + 15), treated as a clean stop - confirm.
        SuccessExitStatus = "0 143";
      };
    };
  };
}