From 682bbeb2ca4e43d5528f1e25ef61d88422d8ee3e Mon Sep 17 00:00:00 2001 From: illustris Date: Fri, 17 Sep 2021 20:49:21 +0530 Subject: [PATCH 1/4] spark: 2.4.4 -> 2.4.8, init 3.1.2 The "spark" package now points to 3.1.2, and "spark2" points to spark 2.4.8 --- .../networking/cluster/spark/default.nix | 109 ++++++++++-------- pkgs/top-level/all-packages.nix | 5 +- 2 files changed, 68 insertions(+), 46 deletions(-) diff --git a/pkgs/applications/networking/cluster/spark/default.nix b/pkgs/applications/networking/cluster/spark/default.nix index 76230b8e1003..af194afafa9a 100644 --- a/pkgs/applications/networking/cluster/spark/default.nix +++ b/pkgs/applications/networking/cluster/spark/default.nix @@ -1,56 +1,75 @@ -{ lib, stdenv, fetchzip, makeWrapper, jre, pythonPackages, coreutils, hadoop +{ lib, stdenv, fetchzip, makeWrapper, jdk8, python3Packages, extraPythonPackages ? [], coreutils, hadoop , RSupport? true, R }: with lib; -stdenv.mkDerivation rec { +let + spark = { pname, version, src }: + stdenv.mkDerivation rec { + inherit pname version src; + nativeBuildInputs = [ makeWrapper ]; + buildInputs = [ jdk8 python3Packages.python ] + ++ extraPythonPackages + ++ optional RSupport R; - pname = "spark"; - version = "2.4.4"; + untarDir = "${pname}-${version}"; + installPhase = '' + mkdir -p $out/{lib/${untarDir}/conf,bin,/share/java} + mv * $out/lib/${untarDir} - src = fetchzip { - url = "mirror://apache/spark/${pname}-${version}/${pname}-${version}-bin-without-hadoop.tgz"; - sha256 = "1a9w5k0207fysgpxx6db3a00fs5hdc2ncx99x4ccy2s0v5ndc66g"; + cp $out/lib/${untarDir}/conf/log4j.properties{.template,} + + cat > $out/lib/${untarDir}/conf/spark-env.sh <<- EOF + export JAVA_HOME="${jdk8}" + export SPARK_HOME="$out/lib/${untarDir}" + export SPARK_DIST_CLASSPATH=$(${hadoop}/bin/hadoop classpath) + export PYSPARK_PYTHON="${python3Packages.python}/bin/${python3Packages.python.executable}" + export PYTHONPATH="\$PYTHONPATH:$PYTHONPATH" + ${optionalString RSupport '' + export SPARKR_R_SHELL="${R}/bin/R" + export PATH="\$PATH:${R}/bin"''} + EOF + + for n in $(find $out/lib/${untarDir}/bin -type f ! -name "*.*"); do + makeWrapper "$n" "$out/bin/$(basename $n)" + substituteInPlace "$n" --replace dirname ${coreutils.out}/bin/dirname + done + for n in $(find $out/lib/${untarDir}/sbin -type f); do + # Spark deprecated scripts with "slave" in the name. + # This line adds forward compatibility with the nixos spark module for + # older versions of spark that don't have the new "worker" scripts. 
+ ln -s "$n" $(echo "$n" | sed -r 's/slave(s?).sh$/worker\1.sh/g') || true + done + ln -s $out/lib/${untarDir}/lib/spark-assembly-*.jar $out/share/java + ''; + + meta = { + description = "Apache Spark is a fast and general engine for large-scale data processing"; + homepage = "http://spark.apache.org"; + license = lib.licenses.asl20; + platforms = lib.platforms.all; + maintainers = with maintainers; [ thoughtpolice offline kamilchm illustris ]; + repositories.git = "git://git.apache.org/spark.git"; + }; + }; +in { + spark3 = spark rec { + pname = "spark"; + version = "3.1.2"; + + src = fetchzip { + url = "mirror://apache/spark/${pname}-${version}/${pname}-${version}-bin-without-hadoop.tgz"; + sha256 = "1bgh2y6jm7wqy6yc40rx68xkki31i3jiri2yixb1bm0i9pvsj9yf"; + }; }; + spark2 = spark rec { + pname = "spark"; + version = "2.4.8"; - nativeBuildInputs = [ makeWrapper ]; - buildInputs = [ jre pythonPackages.python pythonPackages.numpy ] - ++ optional RSupport R; - - untarDir = "${pname}-${version}-bin-without-hadoop"; - installPhase = '' - mkdir -p $out/{lib/${untarDir}/conf,bin,/share/java} - mv * $out/lib/${untarDir} - - sed -e 's/INFO, console/WARN, console/' < \ - $out/lib/${untarDir}/conf/log4j.properties.template > \ - $out/lib/${untarDir}/conf/log4j.properties - - cat > $out/lib/${untarDir}/conf/spark-env.sh <<- EOF - export JAVA_HOME="${jre}" - export SPARK_HOME="$out/lib/${untarDir}" - export SPARK_DIST_CLASSPATH=$(${hadoop}/bin/hadoop classpath) - export PYSPARK_PYTHON="${pythonPackages.python}/bin/${pythonPackages.python.executable}" - export PYTHONPATH="\$PYTHONPATH:$PYTHONPATH" - ${optionalString RSupport - ''export SPARKR_R_SHELL="${R}/bin/R" - export PATH=$PATH:"${R}/bin/R"''} - EOF - - for n in $(find $out/lib/${untarDir}/bin -type f ! -name "*.*"); do - makeWrapper "$n" "$out/bin/$(basename $n)" - substituteInPlace "$n" --replace dirname ${coreutils.out}/bin/dirname - done - ln -s $out/lib/${untarDir}/lib/spark-assembly-*.jar $out/share/java - ''; - - meta = { - description = "Apache Spark is a fast and general engine for large-scale data processing"; - homepage = "http://spark.apache.org"; - license = lib.licenses.asl20; - platforms = lib.platforms.all; - maintainers = with maintainers; [ thoughtpolice offline kamilchm ]; - repositories.git = "git://git.apache.org/spark.git"; + src = fetchzip { + url = "mirror://apache/spark/${pname}-${version}/${pname}-${version}-bin-without-hadoop.tgz"; + sha256 = "1mkyq0gz9fiav25vr0dba5ivp0wh0mh7kswwnx8pvsmb6wbwyfxv"; + }; }; } diff --git a/pkgs/top-level/all-packages.nix b/pkgs/top-level/all-packages.nix index e7fbd3653863..7e38b1aaa406 100644 --- a/pkgs/top-level/all-packages.nix +++ b/pkgs/top-level/all-packages.nix @@ -13140,7 +13140,10 @@ with pkgs; self = pkgsi686Linux.callPackage ../development/interpreters/self { }; - spark = callPackage ../applications/networking/cluster/spark { }; + inherit (callPackages ../applications/networking/cluster/spark { hadoop = hadoop_3_1; }) + spark3 + spark2; + spark = spark3; sparkleshare = callPackage ../applications/version-management/sparkleshare { }; From 71d15cf81660375447fea459ecd8752afc446852 Mon Sep 17 00:00:00 2001 From: illustris Date: Fri, 17 Sep 2021 20:50:11 +0530 Subject: [PATCH 2/4] nixos/spark: init module --- nixos/modules/module-list.nix | 1 + .../services/cluster/spark/default.nix | 162 ++++++++++++++++++ 2 files changed, 163 insertions(+) create mode 100644 nixos/modules/services/cluster/spark/default.nix diff --git a/nixos/modules/module-list.nix b/nixos/modules/module-list.nix 
index c01c55b26700..48c86a00f879 100644
--- a/nixos/modules/module-list.nix
+++ b/nixos/modules/module-list.nix
@@ -297,6 +297,7 @@
   ./services/cluster/kubernetes/pki.nix
   ./services/cluster/kubernetes/proxy.nix
   ./services/cluster/kubernetes/scheduler.nix
+  ./services/cluster/spark/default.nix
   ./services/computing/boinc/client.nix
   ./services/computing/foldingathome/client.nix
   ./services/computing/slurm/slurm.nix
diff --git a/nixos/modules/services/cluster/spark/default.nix b/nixos/modules/services/cluster/spark/default.nix
new file mode 100644
index 000000000000..bbfe0489f115
--- /dev/null
+++ b/nixos/modules/services/cluster/spark/default.nix
@@ -0,0 +1,162 @@
+{config, pkgs, lib, ...}:
+let
+  cfg = config.services.spark;
+in
+with lib;
+{
+  options = {
+    services.spark = {
+      master = {
+        enable = mkEnableOption "Spark master service";
+        bind = mkOption {
+          type = types.str;
+          description = "Address the spark master binds to.";
+          default = "127.0.0.1";
+          example = "0.0.0.0";
+        };
+        restartIfChanged = mkOption {
+          type = types.bool;
+          description = ''
+            Automatically restart the master service on config change.
+            This can be set to false to defer restarts on clusters running critical applications.
+            Please consider the security implications of inadvertently running an older version,
+            and the possibility of unexpected behavior caused by inconsistent versions across a cluster when disabling this option.
+          '';
+          default = true;
+        };
+        extraEnvironment = mkOption {
+          type = types.attrsOf types.str;
+          description = "Extra environment variables to pass to the spark master. See spark-standalone documentation.";
+          default = {};
+          example = {
+            SPARK_MASTER_WEBUI_PORT = "8181";
+            SPARK_MASTER_OPTS = "-Dspark.deploy.defaultCores=5";
+          };
+        };
+      };
+      worker = {
+        enable = mkEnableOption "Spark worker service";
+        workDir = mkOption {
+          type = types.path;
+          description = "Spark worker work directory.";
+          default = "/var/lib/spark";
+        };
+        master = mkOption {
+          type = types.str;
+          description = "Address of the spark master.";
+          default = "127.0.0.1:7077";
+        };
+        restartIfChanged = mkOption {
+          type = types.bool;
+          description = ''
+            Automatically restart the worker service on config change.
+            This can be set to false to defer restarts on clusters running critical applications.
+            Please consider the security implications of inadvertently running an older version,
+            and the possibility of unexpected behavior caused by inconsistent versions across a cluster when disabling this option.
+          '';
+          default = true;
+        };
+        extraEnvironment = mkOption {
+          type = types.attrsOf types.str;
+          description = "Extra environment variables to pass to the spark worker.";
+          default = {};
+          example = {
+            SPARK_WORKER_CORES = "5";
+            SPARK_WORKER_MEMORY = "2g";
+          };
+        };
+      };
+      confDir = mkOption {
+        type = types.path;
+        description = "Spark configuration directory.
 Spark will use the configuration files (spark-defaults.conf, spark-env.sh, log4j.properties, etc.) from this directory.";
+        default = "${cfg.package}/lib/${cfg.package.untarDir}/conf";
+        defaultText = literalExample "\${cfg.package}/lib/\${cfg.package.untarDir}/conf";
+      };
+      logDir = mkOption {
+        type = types.path;
+        description = "Spark log directory.";
+        default = "/var/log/spark";
+      };
+      package = mkOption {
+        type = types.package;
+        description = "Spark package.";
+        default = pkgs.spark;
+        defaultText = "pkgs.spark";
+        example = literalExample ''pkgs.spark.overrideAttrs (super: rec {
+          pname = "spark";
+          version = "2.4.4";
+
+          src = pkgs.fetchzip {
+            url = "mirror://apache/spark/''${pname}-''${version}/''${pname}-''${version}-bin-without-hadoop.tgz";
+            sha256 = "1a9w5k0207fysgpxx6db3a00fs5hdc2ncx99x4ccy2s0v5ndc66g";
+          };
+        })'';
+      };
+    };
+  };
+  config = lib.mkIf (cfg.worker.enable || cfg.master.enable) {
+    environment.systemPackages = [ cfg.package ];
+    systemd = {
+      services = {
+        spark-master = lib.mkIf cfg.master.enable {
+          path = with pkgs; [ procps openssh nettools ];
+          description = "spark master service.";
+          after = [ "network.target" ];
+          wantedBy = [ "multi-user.target" ];
+          restartIfChanged = cfg.master.restartIfChanged;
+          environment = cfg.master.extraEnvironment // {
+            SPARK_MASTER_HOST = cfg.master.bind;
+            SPARK_CONF_DIR = cfg.confDir;
+            SPARK_LOG_DIR = cfg.logDir;
+          };
+          serviceConfig = {
+            Type = "forking";
+            User = "spark";
+            Group = "spark";
+            WorkingDirectory = "${cfg.package}/lib/${cfg.package.untarDir}";
+            ExecStart = "${cfg.package}/lib/${cfg.package.untarDir}/sbin/start-master.sh";
+            ExecStop = "${cfg.package}/lib/${cfg.package.untarDir}/sbin/stop-master.sh";
+            TimeoutSec = 300;
+            StartLimitBurst = 10;
+            Restart = "always";
+          };
+        };
+        spark-worker = lib.mkIf cfg.worker.enable {
+          path = with pkgs; [ procps openssh nettools rsync ];
+          description = "spark worker service.";
+          after = [ "network.target" ];
+          wantedBy = [ "multi-user.target" ];
+          restartIfChanged = cfg.worker.restartIfChanged;
+          environment = cfg.worker.extraEnvironment // {
+            SPARK_MASTER = cfg.worker.master;
+            SPARK_CONF_DIR = cfg.confDir;
+            SPARK_LOG_DIR = cfg.logDir;
+            SPARK_WORKER_DIR = cfg.worker.workDir;
+          };
+          serviceConfig = {
+            Type = "forking";
+            User = "spark";
+            WorkingDirectory = "${cfg.package}/lib/${cfg.package.untarDir}";
+            ExecStart = "${cfg.package}/lib/${cfg.package.untarDir}/sbin/start-worker.sh spark://${cfg.worker.master}";
+            ExecStop = "${cfg.package}/lib/${cfg.package.untarDir}/sbin/stop-worker.sh";
+            TimeoutSec = 300;
+            StartLimitBurst = 10;
+            Restart = "always";
+          };
+        };
+      };
+      tmpfiles.rules = [
+        "d '${cfg.worker.workDir}' - spark spark - -"
+        "d '${cfg.logDir}' - spark spark - -"
+      ];
+    };
+    users = {
+      users.spark = {
+        description = "spark user.";
+        group = "spark";
+        isSystemUser = true;
+      };
+      groups.spark = { };
+    };
+  };
+}

From dd987c2dbed988f573734f51f4f28c4c56f58b6b Mon Sep 17 00:00:00 2001
From: illustris
Date: Fri, 17 Sep 2021 20:53:40 +0530
Subject: [PATCH 3/4] nixos/spark: release notes

---
 .../from_md/release-notes/rl-2111.section.xml     | 13 +++++++++++++
 nixos/doc/manual/release-notes/rl-2111.section.md |  4 ++++
 2 files changed, 17 insertions(+)

diff --git a/nixos/doc/manual/from_md/release-notes/rl-2111.section.xml b/nixos/doc/manual/from_md/release-notes/rl-2111.section.xml
index 3c60226b224d..d389fc780699 100644
--- a/nixos/doc/manual/from_md/release-notes/rl-2111.section.xml
+++ b/nixos/doc/manual/from_md/release-notes/rl-2111.section.xml
@@ -37,6 +37,13 
@@
           PostgreSQL now defaults to major version 13.
         </para>
       </listitem>
+      <listitem>
+        <para>
+          spark now defaults to spark 3, updated from 2. A
+          <link xlink:href="https://spark.apache.org/docs/latest/core-migration-guide.html#upgrading-from-core-24-to-30">migration
+          guide</link> is available.
+        </para>
+      </listitem>
       <listitem>
         <para>
           Activation scripts can now opt int to be run when running
@@ -250,6 +257,12 @@
           entry</link>.
         </para>
       </listitem>
+      <listitem>
+        <para>
+          <link xlink:href="https://spark.apache.org/">spark</link>, a
+          unified analytics engine for large-scale data processing.
+        </para>
+      </listitem>
     </itemizedlist>
   </section>
   <section xml:id="sec-release-21.11-incompatibilities">
diff --git a/nixos/doc/manual/release-notes/rl-2111.section.md b/nixos/doc/manual/release-notes/rl-2111.section.md index 256c15fb4988..fd25714a0283 100644 --- a/nixos/doc/manual/release-notes/rl-2111.section.md +++ b/nixos/doc/manual/release-notes/rl-2111.section.md @@ -14,6 +14,8 @@ In addition to numerous new and upgraded packages, this release has the followin - PostgreSQL now defaults to major version 13. +- spark now defaults to spark 3, updated from 2. A [migration guide](https://spark.apache.org/docs/latest/core-migration-guide.html#upgrading-from-core-24-to-30) is available. + - Activation scripts can now opt int to be run when running `nixos-rebuild dry-activate` and detect the dry activation by reading `$NIXOS_ACTION`. This allows activation scripts to output what they would change if the activation was really run. The users/modules activation script supports this and outputs some of is actions. @@ -78,6 +80,8 @@ subsonic-compatible api. Available as [navidrome](#opt-services.navidrome.enable or sends them to a downstream service for further analysis. Documented in [its manual entry](#module-services-parsedmarc). +- [spark](https://spark.apache.org/), a unified analytics engine for large-scale data processing. + ## Backward Incompatibilities {#sec-release-21.11-incompatibilities} From 13839b0022fee66a1291792c47f6bc2b71b91895 Mon Sep 17 00:00:00 2001 From: illustris Date: Fri, 17 Sep 2021 22:31:01 +0530 Subject: [PATCH 4/4] nixos/spark: add test --- nixos/tests/spark/default.nix | 28 ++++++++++++++++++++++ nixos/tests/spark/spark_sample.py | 40 +++++++++++++++++++++++++++++++ 2 files changed, 68 insertions(+) create mode 100644 nixos/tests/spark/default.nix create mode 100644 nixos/tests/spark/spark_sample.py diff --git a/nixos/tests/spark/default.nix b/nixos/tests/spark/default.nix new file mode 100644 index 000000000000..254cdec6e6b0 --- /dev/null +++ b/nixos/tests/spark/default.nix @@ -0,0 +1,28 @@ +import ../make-test-python.nix ({...}: { + name = "spark"; + + nodes = { + worker = { nodes, pkgs, ... }: { + virtualisation.memorySize = 1024; + services.spark.worker = { + enable = true; + master = "master:7077"; + }; + }; + master = { config, pkgs, ... 
}: { + services.spark.master = { + enable = true; + bind = "0.0.0.0"; + }; + networking.firewall.allowedTCPPorts = [ 22 7077 8080 ]; + }; + }; + + testScript = '' + master.wait_for_unit("spark-master.service") + worker.wait_for_unit("spark-worker.service") + worker.copy_from_host( "${./spark_sample.py}", "/spark_sample.py" ) + assert "Spark Master at spark://" in worker.succeed("curl -sSfkL http://master:8080/") + worker.succeed("spark-submit --master spark://master:7077 --executor-memory 512m --executor-cores 1 /spark_sample.py") + ''; +}) diff --git a/nixos/tests/spark/spark_sample.py b/nixos/tests/spark/spark_sample.py new file mode 100644 index 000000000000..c4939451eae0 --- /dev/null +++ b/nixos/tests/spark/spark_sample.py @@ -0,0 +1,40 @@ +from pyspark.sql import Row, SparkSession +from pyspark.sql import functions as F +from pyspark.sql.functions import udf +from pyspark.sql.types import * +from pyspark.sql.functions import explode + +def explode_col(weight): + return int(weight//10) * [10.0] + ([] if weight%10==0 else [weight%10]) + +spark = SparkSession.builder.getOrCreate() + +dataSchema = [ + StructField("feature_1", FloatType()), + StructField("feature_2", FloatType()), + StructField("bias_weight", FloatType()) +] + +data = [ + Row(0.1, 0.2, 10.32), + Row(0.32, 1.43, 12.8), + Row(1.28, 1.12, 0.23) +] + +df = spark.createDataFrame(spark.sparkContext.parallelize(data), StructType(dataSchema)) + +normalizing_constant = 100 +sum_bias_weight = df.select(F.sum('bias_weight')).collect()[0][0] +normalizing_factor = normalizing_constant / sum_bias_weight +df = df.withColumn('normalized_bias_weight', df.bias_weight * normalizing_factor) +df = df.drop('bias_weight') +df = df.withColumnRenamed('normalized_bias_weight', 'bias_weight') + +my_udf = udf(lambda x: explode_col(x), ArrayType(FloatType())) +df1 = df.withColumn('explode_val', my_udf(df.bias_weight)) +df1 = df1.withColumn("explode_val_1", explode(df1.explode_val)).drop("explode_val") +df1 = df1.drop('bias_weight').withColumnRenamed('explode_val_1', 'bias_weight') + +df1.show() + +assert(df1.count() == 12)
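
A minimal sketch of a single-node deployment built on the module from PATCH 2/4, relying on the option defaults it declares (master bound to 127.0.0.1, worker pointed at 127.0.0.1:7077); the values shown are illustrative only and are not taken from the patches themselves:

  { ... }:
  {
    services.spark = {
      # Standalone master and a single worker on the same machine.
      master = {
        enable = true;
        bind = "127.0.0.1";        # module default: listen on loopback only
      };
      worker = {
        enable = true;
        master = "127.0.0.1:7077"; # module default: connect to the local master
      };
    };
    # The module adds the spark package to environment.systemPackages, so
    # spark-submit and spark-shell end up on PATH for submitting jobs.
  }

For a multi-node setup, bind would instead be set to a routable address (as the VM test above does with "0.0.0.0"), the relevant ports opened in the firewall (7077 for the master RPC endpoint, 8080 for the web UI), and each worker's master option pointed at the master host.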