diff --git a/nixos/doc/manual/from_md/release-notes/rl-2111.section.xml b/nixos/doc/manual/from_md/release-notes/rl-2111.section.xml
index 75513ed7e733..96f46373b272 100644
--- a/nixos/doc/manual/from_md/release-notes/rl-2111.section.xml
+++ b/nixos/doc/manual/from_md/release-notes/rl-2111.section.xml
@@ -37,6 +37,13 @@
PostgreSQL now defaults to major version 13.
+
+
+ spark now defaults to spark 3, updated from 2. A
+ migration
+ guide is available.
+
+
Activation scripts can now opt int to be run when running
@@ -250,6 +257,12 @@
entry.
+
+
+ spark, a
+ unified analytics engine for large-scale data processing.
+
+
diff --git a/nixos/doc/manual/release-notes/rl-2111.section.md b/nixos/doc/manual/release-notes/rl-2111.section.md
index a55fbbe2afdb..4239df38a805 100644
--- a/nixos/doc/manual/release-notes/rl-2111.section.md
+++ b/nixos/doc/manual/release-notes/rl-2111.section.md
@@ -14,6 +14,8 @@ In addition to numerous new and upgraded packages, this release has the followin
- PostgreSQL now defaults to major version 13.
+- spark now defaults to spark 3, updated from 2. A [migration guide](https://spark.apache.org/docs/latest/core-migration-guide.html#upgrading-from-core-24-to-30) is available.
+
- Activation scripts can now opt int to be run when running `nixos-rebuild dry-activate` and detect the dry activation by reading `$NIXOS_ACTION`.
This allows activation scripts to output what they would change if the activation was really run.
The users/modules activation script supports this and outputs some of is actions.
@@ -78,6 +80,8 @@ subsonic-compatible api. Available as [navidrome](#opt-services.navidrome.enable
or sends them to a downstream service for further analysis.
Documented in [its manual entry](#module-services-parsedmarc).
+- [spark](https://spark.apache.org/), a unified analytics engine for large-scale data processing.
+
## Backward Incompatibilities {#sec-release-21.11-incompatibilities}
diff --git a/nixos/modules/module-list.nix b/nixos/modules/module-list.nix
index c01c55b26700..48c86a00f879 100644
--- a/nixos/modules/module-list.nix
+++ b/nixos/modules/module-list.nix
@@ -297,6 +297,7 @@
./services/cluster/kubernetes/pki.nix
./services/cluster/kubernetes/proxy.nix
./services/cluster/kubernetes/scheduler.nix
+ ./services/cluster/spark/default.nix
./services/computing/boinc/client.nix
./services/computing/foldingathome/client.nix
./services/computing/slurm/slurm.nix
diff --git a/nixos/modules/services/cluster/spark/default.nix b/nixos/modules/services/cluster/spark/default.nix
new file mode 100644
index 000000000000..bbfe0489f115
--- /dev/null
+++ b/nixos/modules/services/cluster/spark/default.nix
@@ -0,0 +1,162 @@
+{config, pkgs, lib, ...}:
+let
+ cfg = config.services.spark;
+in
+with lib;
+{
+ options = {
+ services.spark = {
+ master = {
+ enable = mkEnableOption "Spark master service";
+ bind = mkOption {
+ type = types.str;
+ description = "Address the spark master binds to.";
+ default = "127.0.0.1";
+ example = "0.0.0.0";
+ };
+ restartIfChanged = mkOption {
+ type = types.bool;
+ description = ''
+ Automatically restart master service on config change.
+ This can be set to false to defer restarts on clusters running critical applications.
+ Please consider the security implications of inadvertently running an older version,
+ and the possibility of unexpected behavior caused by inconsistent versions across a cluster when disabling this option.
+ '';
+ default = true;
+ };
+ extraEnvironment = mkOption {
+ type = types.attrsOf types.str;
+ description = "Extra environment variables to pass to spark master. See spark-standalone documentation.";
+ default = {};
+ example = {
+ SPARK_MASTER_WEBUI_PORT = 8181;
+ SPARK_MASTER_OPTS = "-Dspark.deploy.defaultCores=5";
+ };
+ };
+ };
+ worker = {
+ enable = mkEnableOption "Spark worker service";
+ workDir = mkOption {
+ type = types.path;
+ description = "Spark worker work dir.";
+ default = "/var/lib/spark";
+ };
+ master = mkOption {
+ type = types.str;
+ description = "Address of the spark master.";
+ default = "127.0.0.1:7077";
+ };
+ restartIfChanged = mkOption {
+ type = types.bool;
+ description = ''
+ Automatically restart worker service on config change.
+ This can be set to false to defer restarts on clusters running critical applications.
+ Please consider the security implications of inadvertently running an older version,
+ and the possibility of unexpected behavior caused by inconsistent versions across a cluster when disabling this option.
+ '';
+ default = true;
+ };
+ extraEnvironment = mkOption {
+ type = types.attrsOf types.str;
+ description = "Extra environment variables to pass to spark worker.";
+ default = {};
+ example = {
+ SPARK_WORKER_CORES = 5;
+ SPARK_WORKER_MEMORY = "2g";
+ };
+ };
+ };
+ confDir = mkOption {
+ type = types.path;
+ description = "Spark configuration directory. Spark will use the configuration files (spark-defaults.conf, spark-env.sh, log4j.properties, etc) from this directory.";
+ default = "${cfg.package}/lib/${cfg.package.untarDir}/conf";
+ defaultText = literalExample "\${cfg.package}/lib/\${cfg.package.untarDir}/conf";
+ };
+ logDir = mkOption {
+ type = types.path;
+ description = "Spark log directory.";
+ default = "/var/log/spark";
+ };
+ package = mkOption {
+ type = types.package;
+ description = "Spark package.";
+ default = pkgs.spark;
+ defaultText = "pkgs.spark";
+ example = literalExample ''pkgs.spark.overrideAttrs (super: rec {
+ pname = "spark";
+ version = "2.4.4";
+
+ src = pkgs.fetchzip {
+ url = "mirror://apache/spark/"''${pname}-''${version}/''${pname}-''${version}-bin-without-hadoop.tgz";
+ sha256 = "1a9w5k0207fysgpxx6db3a00fs5hdc2ncx99x4ccy2s0v5ndc66g";
+ };
+ })'';
+ };
+ };
+ };
+ config = lib.mkIf (cfg.worker.enable || cfg.master.enable) {
+ environment.systemPackages = [ cfg.package ];
+ systemd = {
+ services = {
+ spark-master = lib.mkIf cfg.master.enable {
+ path = with pkgs; [ procps openssh nettools ];
+ description = "spark master service.";
+ after = [ "network.target" ];
+ wantedBy = [ "multi-user.target" ];
+ restartIfChanged = cfg.master.restartIfChanged;
+ environment = cfg.master.extraEnvironment // {
+ SPARK_MASTER_HOST = cfg.master.bind;
+ SPARK_CONF_DIR = cfg.confDir;
+ SPARK_LOG_DIR = cfg.logDir;
+ };
+ serviceConfig = {
+ Type = "forking";
+ User = "spark";
+ Group = "spark";
+ WorkingDirectory = "${cfg.package}/lib/${cfg.package.untarDir}";
+ ExecStart = "${cfg.package}/lib/${cfg.package.untarDir}/sbin/start-master.sh";
+ ExecStop = "${cfg.package}/lib/${cfg.package.untarDir}/sbin/stop-master.sh";
+ TimeoutSec = 300;
+ StartLimitBurst=10;
+ Restart = "always";
+ };
+ };
+ spark-worker = lib.mkIf cfg.worker.enable {
+ path = with pkgs; [ procps openssh nettools rsync ];
+ description = "spark master service.";
+ after = [ "network.target" ];
+ wantedBy = [ "multi-user.target" ];
+ restartIfChanged = cfg.worker.restartIfChanged;
+ environment = cfg.worker.extraEnvironment // {
+ SPARK_MASTER = cfg.worker.master;
+ SPARK_CONF_DIR = cfg.confDir;
+ SPARK_LOG_DIR = cfg.logDir;
+ SPARK_WORKER_DIR = cfg.worker.workDir;
+ };
+ serviceConfig = {
+ Type = "forking";
+ User = "spark";
+ WorkingDirectory = "${cfg.package}/lib/${cfg.package.untarDir}";
+ ExecStart = "${cfg.package}/lib/${cfg.package.untarDir}/sbin/start-worker.sh spark://${cfg.worker.master}";
+ ExecStop = "${cfg.package}/lib/${cfg.package.untarDir}/sbin/stop-worker.sh";
+ TimeoutSec = 300;
+ StartLimitBurst=10;
+ Restart = "always";
+ };
+ };
+ };
+ tmpfiles.rules = [
+ "d '${cfg.worker.workDir}' - spark spark - -"
+ "d '${cfg.logDir}' - spark spark - -"
+ ];
+ };
+ users = {
+ users.spark = {
+ description = "spark user.";
+ group = "spark";
+ isSystemUser = true;
+ };
+ groups.spark = { };
+ };
+ };
+}
diff --git a/nixos/tests/spark/default.nix b/nixos/tests/spark/default.nix
new file mode 100644
index 000000000000..254cdec6e6b0
--- /dev/null
+++ b/nixos/tests/spark/default.nix
@@ -0,0 +1,28 @@
+import ../make-test-python.nix ({...}: {
+ name = "spark";
+
+ nodes = {
+ worker = { nodes, pkgs, ... }: {
+ virtualisation.memorySize = 1024;
+ services.spark.worker = {
+ enable = true;
+ master = "master:7077";
+ };
+ };
+ master = { config, pkgs, ... }: {
+ services.spark.master = {
+ enable = true;
+ bind = "0.0.0.0";
+ };
+ networking.firewall.allowedTCPPorts = [ 22 7077 8080 ];
+ };
+ };
+
+ testScript = ''
+ master.wait_for_unit("spark-master.service")
+ worker.wait_for_unit("spark-worker.service")
+ worker.copy_from_host( "${./spark_sample.py}", "/spark_sample.py" )
+ assert "Spark Master at spark://" in worker.succeed("curl -sSfkL http://master:8080/")
+ worker.succeed("spark-submit --master spark://master:7077 --executor-memory 512m --executor-cores 1 /spark_sample.py")
+ '';
+})
diff --git a/nixos/tests/spark/spark_sample.py b/nixos/tests/spark/spark_sample.py
new file mode 100644
index 000000000000..c4939451eae0
--- /dev/null
+++ b/nixos/tests/spark/spark_sample.py
@@ -0,0 +1,40 @@
+from pyspark.sql import Row, SparkSession
+from pyspark.sql import functions as F
+from pyspark.sql.functions import udf
+from pyspark.sql.types import *
+from pyspark.sql.functions import explode
+
+def explode_col(weight):
+ return int(weight//10) * [10.0] + ([] if weight%10==0 else [weight%10])
+
+spark = SparkSession.builder.getOrCreate()
+
+dataSchema = [
+ StructField("feature_1", FloatType()),
+ StructField("feature_2", FloatType()),
+ StructField("bias_weight", FloatType())
+]
+
+data = [
+ Row(0.1, 0.2, 10.32),
+ Row(0.32, 1.43, 12.8),
+ Row(1.28, 1.12, 0.23)
+]
+
+df = spark.createDataFrame(spark.sparkContext.parallelize(data), StructType(dataSchema))
+
+normalizing_constant = 100
+sum_bias_weight = df.select(F.sum('bias_weight')).collect()[0][0]
+normalizing_factor = normalizing_constant / sum_bias_weight
+df = df.withColumn('normalized_bias_weight', df.bias_weight * normalizing_factor)
+df = df.drop('bias_weight')
+df = df.withColumnRenamed('normalized_bias_weight', 'bias_weight')
+
+my_udf = udf(lambda x: explode_col(x), ArrayType(FloatType()))
+df1 = df.withColumn('explode_val', my_udf(df.bias_weight))
+df1 = df1.withColumn("explode_val_1", explode(df1.explode_val)).drop("explode_val")
+df1 = df1.drop('bias_weight').withColumnRenamed('explode_val_1', 'bias_weight')
+
+df1.show()
+
+assert(df1.count() == 12)
diff --git a/pkgs/applications/networking/cluster/spark/default.nix b/pkgs/applications/networking/cluster/spark/default.nix
index 76230b8e1003..af194afafa9a 100644
--- a/pkgs/applications/networking/cluster/spark/default.nix
+++ b/pkgs/applications/networking/cluster/spark/default.nix
@@ -1,56 +1,75 @@
-{ lib, stdenv, fetchzip, makeWrapper, jre, pythonPackages, coreutils, hadoop
+{ lib, stdenv, fetchzip, makeWrapper, jdk8, python3Packages, extraPythonPackages ? [], coreutils, hadoop
, RSupport? true, R
}:
with lib;
-stdenv.mkDerivation rec {
+let
+ spark = { pname, version, src }:
+ stdenv.mkDerivation rec {
+ inherit pname version src;
+ nativeBuildInputs = [ makeWrapper ];
+ buildInputs = [ jdk8 python3Packages.python ]
+ ++ extraPythonPackages
+ ++ optional RSupport R;
- pname = "spark";
- version = "2.4.4";
+ untarDir = "${pname}-${version}";
+ installPhase = ''
+ mkdir -p $out/{lib/${untarDir}/conf,bin,/share/java}
+ mv * $out/lib/${untarDir}
- src = fetchzip {
- url = "mirror://apache/spark/${pname}-${version}/${pname}-${version}-bin-without-hadoop.tgz";
- sha256 = "1a9w5k0207fysgpxx6db3a00fs5hdc2ncx99x4ccy2s0v5ndc66g";
+ cp $out/lib/${untarDir}/conf/log4j.properties{.template,}
+
+ cat > $out/lib/${untarDir}/conf/spark-env.sh <<- EOF
+ export JAVA_HOME="${jdk8}"
+ export SPARK_HOME="$out/lib/${untarDir}"
+ export SPARK_DIST_CLASSPATH=$(${hadoop}/bin/hadoop classpath)
+ export PYSPARK_PYTHON="${python3Packages.python}/bin/${python3Packages.python.executable}"
+ export PYTHONPATH="\$PYTHONPATH:$PYTHONPATH"
+ ${optionalString RSupport ''
+ export SPARKR_R_SHELL="${R}/bin/R"
+ export PATH="\$PATH:${R}/bin"''}
+ EOF
+
+ for n in $(find $out/lib/${untarDir}/bin -type f ! -name "*.*"); do
+ makeWrapper "$n" "$out/bin/$(basename $n)"
+ substituteInPlace "$n" --replace dirname ${coreutils.out}/bin/dirname
+ done
+ for n in $(find $out/lib/${untarDir}/sbin -type f); do
+ # Spark deprecated scripts with "slave" in the name.
+ # This line adds forward compatibility with the nixos spark module for
+ # older versions of spark that don't have the new "worker" scripts.
+ ln -s "$n" $(echo "$n" | sed -r 's/slave(s?).sh$/worker\1.sh/g') || true
+ done
+ ln -s $out/lib/${untarDir}/lib/spark-assembly-*.jar $out/share/java
+ '';
+
+ meta = {
+ description = "Apache Spark is a fast and general engine for large-scale data processing";
+ homepage = "http://spark.apache.org";
+ license = lib.licenses.asl20;
+ platforms = lib.platforms.all;
+ maintainers = with maintainers; [ thoughtpolice offline kamilchm illustris ];
+ repositories.git = "git://git.apache.org/spark.git";
+ };
+ };
+in {
+ spark3 = spark rec {
+ pname = "spark";
+ version = "3.1.2";
+
+ src = fetchzip {
+ url = "mirror://apache/spark/${pname}-${version}/${pname}-${version}-bin-without-hadoop.tgz";
+ sha256 = "1bgh2y6jm7wqy6yc40rx68xkki31i3jiri2yixb1bm0i9pvsj9yf";
+ };
};
+ spark2 = spark rec {
+ pname = "spark";
+ version = "2.4.8";
- nativeBuildInputs = [ makeWrapper ];
- buildInputs = [ jre pythonPackages.python pythonPackages.numpy ]
- ++ optional RSupport R;
-
- untarDir = "${pname}-${version}-bin-without-hadoop";
- installPhase = ''
- mkdir -p $out/{lib/${untarDir}/conf,bin,/share/java}
- mv * $out/lib/${untarDir}
-
- sed -e 's/INFO, console/WARN, console/' < \
- $out/lib/${untarDir}/conf/log4j.properties.template > \
- $out/lib/${untarDir}/conf/log4j.properties
-
- cat > $out/lib/${untarDir}/conf/spark-env.sh <<- EOF
- export JAVA_HOME="${jre}"
- export SPARK_HOME="$out/lib/${untarDir}"
- export SPARK_DIST_CLASSPATH=$(${hadoop}/bin/hadoop classpath)
- export PYSPARK_PYTHON="${pythonPackages.python}/bin/${pythonPackages.python.executable}"
- export PYTHONPATH="\$PYTHONPATH:$PYTHONPATH"
- ${optionalString RSupport
- ''export SPARKR_R_SHELL="${R}/bin/R"
- export PATH=$PATH:"${R}/bin/R"''}
- EOF
-
- for n in $(find $out/lib/${untarDir}/bin -type f ! -name "*.*"); do
- makeWrapper "$n" "$out/bin/$(basename $n)"
- substituteInPlace "$n" --replace dirname ${coreutils.out}/bin/dirname
- done
- ln -s $out/lib/${untarDir}/lib/spark-assembly-*.jar $out/share/java
- '';
-
- meta = {
- description = "Apache Spark is a fast and general engine for large-scale data processing";
- homepage = "http://spark.apache.org";
- license = lib.licenses.asl20;
- platforms = lib.platforms.all;
- maintainers = with maintainers; [ thoughtpolice offline kamilchm ];
- repositories.git = "git://git.apache.org/spark.git";
+ src = fetchzip {
+ url = "mirror://apache/spark/${pname}-${version}/${pname}-${version}-bin-without-hadoop.tgz";
+ sha256 = "1mkyq0gz9fiav25vr0dba5ivp0wh0mh7kswwnx8pvsmb6wbwyfxv";
+ };
};
}
diff --git a/pkgs/top-level/all-packages.nix b/pkgs/top-level/all-packages.nix
index 67c75abd5945..dbf23a650454 100644
--- a/pkgs/top-level/all-packages.nix
+++ b/pkgs/top-level/all-packages.nix
@@ -13146,7 +13146,10 @@ with pkgs;
self = pkgsi686Linux.callPackage ../development/interpreters/self { };
- spark = callPackage ../applications/networking/cluster/spark { };
+ inherit (callPackages ../applications/networking/cluster/spark { hadoop = hadoop_3_1; })
+ spark3
+ spark2;
+ spark = spark3;
sparkleshare = callPackage ../applications/version-management/sparkleshare { };