【Spark源码】spark-submit和Spark-class
⾸先从启动脚本开始看:
bin/spark-submit \
--class org.amples.SparkPi \
--master yarn \
--deploy-mode cluster \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10
启动脚本调⽤的是spark-submit,所以直接看bin/spark-submit脚本,跟spark-shell⼀样,先检查是否设置了${SPARK_HOME},然后启动spark-class,并传递了org.apache.spark.deploy.SparkSubmit作为第⼀个参数,然后把前⾯Spark-shell的参数都传给spark-class
# -z:判断变量的值是否为空
if[ -z "${SPARK_HOME}"];then
# $0:表⽰当前脚本⽂件名
# dirname:⽤于取指定路径所在的⽬录,如dirname /usr/local/bin 结果为/usr/local
# $(命令) 返回该命令的结果
# 所以结合以上分析,结果为【切换到脚本所在的⽬录】
# 该命令也可以写为 `dirname $0`
source"$(dirname"$0")"/find-spark-home  #
fi
# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0
# $@是传递给脚本的所有参数
exec"${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
脚本⾥⾯调⽤的是/bin/spark-class脚本
if[ -z "${SPARK_HOME}"];then
source"$(dirname"$0")"/find-spark-home
fi
."${SPARK_HOME}"/bin/load-spark-env.sh
# Find the java binary
if[ -n "${JAVA_HOME}"];then
RUNNER="${JAVA_HOME}/bin/java"
else
if["$(command -v java)"];then
RUNNER="java"
else
echo"JAVA_HOME is not set">&2
exit 1
fi
fi
# Find Spark jars.
if[ -d "${SPARK_HOME}/jars"];then
SPARK_JARS_DIR="${SPARK_HOME}/jars"
else
SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"
fi
if[! -d "$SPARK_JARS_DIR"]&&[ -z "$SPARK_TESTING$SPARK_SQL_TESTING"];then
echo"Failed to find Spark jars directory ($SPARK_JARS_DIR)." 1>&2
echo"You need to build Spark with the target \"package\" before running this program." 1>&2
exit 1
else
LAUNCH_CLASSPATH="$SPARK_JARS_DIR/*"
fi
# Add the launcher build dir to the classpath if requested.
if[ -n "$SPARK_PREPEND_CLASSES"];then
LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
fi
# For tests
if[[ -n "$SPARK_TESTING"]];then
unset YARN_CONF_DIR
unset HADOOP_CONF_DIR
fi
# The launcher library will print arguments separated by a NULL character, to allow arguments with
# characters that would be otherwise interpreted by the shell. Read that in a while loop, populating
# an array that will be used to exec the final command.
#
# The exit code of the launcher is appended to the output, so the parent shell removes it from the
# command array and checks the value to see if the launcher succeeded.
build_command(){
"$RUNNER" -Xmx128m $SPARK_LAUNCHER_OPTS -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
printf"%d\0"$?
}
# Turn off posix mode since it does not allow process substitution
set +o posix
CMD=()
DELIM=$'\n'
CMD_START_FLAG="false"
while IFS=read -d "$DELIM" -r ARG;do
if["$CMD_START_FLAG"=="true"];then
CMD+=("$ARG")
else
if["$ARG"== $'\0'];then
# After NULL character is consumed, change the delimiter and consume command string.
DELIM=''
CMD_START_FLAG="true"
elif["$ARG"!=""];then
echo"$ARG"
fi
fi
done<<(build_command "$@")
COUNT=${#CMD[@]}
LAST=$((COUNT -1))
LAUNCHER_EXIT_CODE=${CMD[$LAST]}
# Certain JVM failures result in errors being printed to stdout (instead of stderr), which causes
# the code that parses the output of the launcher to get confused. In those cases, check if the
# exit code is an integer, and if it's not, handle it as a special error case.
if![[$LAUNCHER_EXIT_CODE=~ ^[0-9]+$ ]];then
echo"${CMD[@]}"|head -n-1 1>&2
exit 1
fi
if[$LAUNCHER_EXIT_CODE!= 0 ];then
exit$LAUNCHER_EXIT_CODE
fi
CMD=("${CMD[@]:0:$LAST}")
exec"${CMD[@]}"
脚本中会调⽤org.apache.spark.launcher.Main类⽣成shell 执⾏脚本,这个类是真正的执⾏者,我们好好看看这个真正的⼊⼝在哪⾥?⾸先,依然是设置项⽬主⽬录:
# 如果没有设置SPARK_HOME,shell会将当前脚本的上⼀级⽬录做为spark_home
# -z表⽰当串长度为0时,条件为真。⽽$()和`` 都表⽰在shell中执⾏命令同时将结果返回
if[ -z "${SPARK_HOME}"];then
source"$(dirname"$0")"/find-spark-home
fi
这⾥使⽤find-spark-home脚本来进⾏设置,脚本内容如下
# $(cd "$(dirname "$0")"; pwd) : 输出当前脚本所在⽬录如果脚本⽂件路径为/a/,则此结果返回/a/b
FIND_SPARK_HOME_PYTHON_SCRIPT="$(cd"$(dirname "$0")"; pwd)/find_spark_home.py"
# 当${SPARK_HOME}参数已经配置时,退出脚本
if [ ! -z "${SPARK_HOME}" ]; then
exit 0
# 当FIND_SPARK_HOME_PYTHON_SCRIPT所表⽰的⽂件find_spark_home.py不存在时,进⾏spark_home配置
elif [ ! -f "$FIND_SPARK_HOME_PYTHON_SCRIPT" ]; then
# 设置spark_home为当前脚本所在⽬录的上⼀级⽬录,如脚本⽂件为/opt/spark-3.0.0/bin/find-spark-home,这⾥就返回/opt/spark-3.0.0作为SPARK_HOME  export SPARK_HOME="$(cd"$(dirname "$0")"/..; pwd)"
....
然后,配置⼀些环境变量:
."${SPARK_HOME}"/bin/load-spark-env.sh
在spark-env中设置了assembly相关的信息。
# 如果没有设置SPARK_HOME,shell会将当前脚本所在⽬录的上⼀级⽬录做为spark_home
if[ -z "${SPARK_HOME}"];then
source"$(dirname"$0")"/find-spark-home
fi
SPARK_ENV_SH="spark-env.sh"
if[ -z "$SPARK_ENV_LOADED"];then
export SPARK_ENV_LOADED=1
export SPARK_CONF_DIR="${SPARK_CONF_DIR:-"${SPARK_HOME}"/conf}"
SPARK_ENV_SH="${SPARK_CONF_DIR}/${SPARK_ENV_SH}"scala不是内部或外部命令
if[[ -f "${SPARK_ENV_SH}"]];then
# 将所有变量声明提升为环境(export)变量
# set -a中的SPARK_HOME能够在另外⼀个bash中访问。其实这就是set -a意义所在,它将当前变量导出,使得其他的bash中运⾏的脚本也能够访问改变量,但是与export不同的是只能访问,不能修改。
# 另外如果不⽤set -a,其实可以通过⼦shell也可以访问,⽽不修改,但是这样做使得所有⽗shell中的变量都能够被⼦shell访问到,不能做到有范围的控制。set -a
.${SPARK_ENV_SH}
set +a
fi
fi
# Setting SPARK_SCALA_VERSION if not already set.
# TODO: revisit for Scala 2.13 support
export SPARK_SCALA_VERSION=2.12
然后寻java,并赋值给RUNNER变量
# Find the java binary
if[ -n "${JAVA_HOME}"];then
RUNNER="${JAVA_HOME}/bin/java"
else
# command -v 可以判断⼀个命令是否⽀持,这⾥表⽰如果java命令⽀持则RUNNER等于java,否则提⽰java_home未设置
if["$(command -v java)"];then
RUNNER="java"
else
echo"JAVA_HOME is not set">&2
exit 1
fi
fi
中间⼤部分代码是跟assembly相关的内容
最关键的就是下⾯这句了:
build_command(){
# java -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
"$RUNNER" -Xmx128m $SPARK_LAUNCHER_OPTS -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
# $? : 上个命令的退出状态,或函数的返回值。
# printf '输出类型输出格式' 输出内容
printf"%d\0"$?
}
CMD=()
DELIM=$'\n'
CMD_START_FLAG="false"
# 将 build_command "$@" 命令输出的结果逐⾏添加到CMD参数中
while IFS=read -d "$DELIM" -r ARG;do
if["$CMD_START_FLAG"=="true"];then
CMD+=("$ARG")
else
if["$ARG"== $'\0'];then
# After NULL character is consumed, change the delimiter and consume command string.
DELIM=''
CMD_START_FLAG="true"
elif["$ARG"!=""];then
echo"$ARG"
fi
fi
done<<(build_command "$@")# $@是传递给脚本的所有参数
# #CMD[@]获取CMD数组中的元素个数
COUNT=${#CMD[@]}
LAST=$((COUNT -1))
LAUNCHER_EXIT_CODE=${CMD[$LAST]}
# ${str:a:b} 表⽰提取字符串str从a开始的b个字符
CMD=("${CMD[@]:0:$LAST}")
exec"${CMD[@]}"
⾸先循环读取ARG参数,加⼊到CMD中。然后执⾏了"$RUNNER" -Xmx128m $SPARK_LAUNCHER_OPTS -cp "$LAUNCH_CLASSPATH"
org.apache.spark.launcher.Main "$@" 这个是真正执⾏的第⼀个spark的类。
不管是启动spark-shell,或者通过spark-submit提交jar,还是其他其他master或者worker的脚本,最后都会进⼊spark-class,并调
⽤launch.main⽅法构建执⾏命令。
java -Xmx128m -cp ...jars org.apache.spark.launcher.Main "$@"
也就是说org.apache.spark.launcher.Main是被spark-class调⽤,从spark-class接收参数。这个类是提供spark内部脚本调⽤的⼯具类,并不是真正的执⾏⼊⼝。它负责调⽤其他类,对参数进⾏解析,并⽣成执⾏命令,最后将命令返回给spark-class的 exec “${CMD[@]}”执⾏。
它主要是根据提交的类型spark-submit和spark-class(master、worker、hostoryserver等等),构建对应的命令解析对象SparkSubmitCommandBuilder和SparkClassCommandBuilder,再通过buildCommand⽅法构造执⾏命令。
⼤概看⼀下这时sparksubmit的参数,Master和Worker后续解析:
⽅式参数
spark-shell org.apache.spark.deploy.SparkSubmit –class org.pl.Main –name “Spark shell”
spark-submit
org.apache.spark.deploy.SparkSubmit
–class com.idmapping.scala.WordCount
–master yarn
–deploy-mode client
–driver-memory 4G
–executor-memory 3G
–executor-cores 2
–conf spark.serializer=org.apache.spark.serializer.KryoSerializer –conf spark.default.parallelism=24
/user/jars/idmapping-job-1.0-SNAPSHOT.jar file:///user/ file:///user/data/wordcount/
该类在launcher模块下,简单的浏览下代码:
package org.apache.spark.launcher;
/**
* Command line interface for the Spark launcher. Used internally by Spark scripts.
* 这是提供spark内部脚本使⽤⼯具类
*/
class Main {
/**
* Usage: Main [class] [class args]
* 分为spark-submit和spark-class两种模式,但提交的是class类的话,会包含其他如:master/worker/history等等  * This CLI works in two different modes:
*  "spark-submit": if class is "org.apache.spark.deploy.SparkSubmit", the
*  {@link SparkLauncher} class is used to launch a Spark application.
*
*  "spark-class": 如果提供了另⼀个类,则运⾏内部Spark类
*
* 类unix系统的输出的参数是集合,⽽windows参数是空格分隔
* This class works in tandem with the "bin/spark-class" script on Unix-like systems, and
* "d" batch script on Windows to execute the final command.
* <p>
* On Unix-like systems, the output is a list of command arguments, separated by the NULL
* character. On Windows, the output is a command line suitable for direct execution from the
* script.
*/
/**
* main这个类主要是解析参数,把需要的参数放到执⾏对象中
* 如果是直接启动spark-shell调⽤spark-class传⼊的参数:
* org.apache.spark.deploy.SparkSubmit --class org.pl.Main --name "Spark shell"
* --master spark://host:7077
*/
public static void main(String[] argsArray) throws Exception {
// 判断参数列表
checkArgument(argsArray.length >0,"Not enough arguments: missing class name.");
/**
* 将参数列表放⼊args集合中
* 移出第⼀个参数赋值给classname,即执⾏程序。剩余参数为:
* --class org.pl.Main --name "Spark shell" --master spark://host:7077
*/
List<String> args =new ArrayList<>(Arrays.asList(argsArray));
String className = ve(0);// 获取org.apache.spark.deploy.SparkSubmit
// 判断是否打印执⾏信息

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。