概述:
1.sparkContext初始化很重要,因为他是Driver应用程序提交执行的前提,只有sparkContext初始化后才可以Driver提交用户应用程序,也就是说spark driver的初始化围绕着sparkContext初始化展开的,SparkContext可以算是spark应用程序的发动机引擎 2.源码研究以local模式为主 3.Spark中的组件很多,就其功能而言涉及网络通信、分布式、消息、存储、计算、缓存、测量、清理、文件服务、Web UI的方方面面。 4.SparkContext初始化的步骤如下: 0)SparkConf:是SparkContext的配置参数,相当于控制版面 1)创建Spark执行环境SparkEnv; 2)创建RDD清理器metadataCleaner; 3)创建并初始化Spark UI; 4)Hadoop相关配置及Executor环境变量的设置; 5)创建任务调度TaskScheduler; 6)创建和启动DAGScheduler; 7)TaskScheduler的启动; 8)初始化块管理器BlockManager(BlockManager是存储体系的主要组件之一,将在第4章介绍); 9)启动测量系统MetricsSystem; 10)创建和启动Executor分配管理器ExecutorAllocationManager; 11)ContextCleaner的创建与启动; 12)Spark环境更新; 13)创建DAGSchedulerSource和BlockManagerSource; 14)将SparkContext标记为激活。
正文
一、控制版面SparkConf
1.代码结构
class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
import SparkConf._ def this() = this(true) private val settings = new ConcurrentHashMap[String, String]() if (loadDefaults) { // 加载任何以spark.开头的系统属性 for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) { set(key, value) } } //其余代码省略
Utils.getSystemProperties获取的系统属性,显然SparkConf是加载系统属性里面以“spark.”为开头的属性。
SparkContext的主构造器参数为SparkConf,其实现如下
class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {
//获取当前SparkContext的当前调用栈。包含了最靠近栈顶的用户类及最靠近栈底的Scala或者Spark核心类信息 private val creationSite: CallSite = Utils.getCallSite() private val allowMultipleContexts: Boolean = config.getBoolean("spark.driver.allowMultipleContexts", false) SparkContext.markPartiallyConstructed(this, allowMultipleContexts)
Utils.getCallSite()
//skipClass: String => Boolean = sparkInternalExclusionFunction 采用了柯里化 def getCallSite(skipClass: String => Boolean = sparkInternalExclusionFunction): CallSite = { var lastSparkMethod = "<unknown>" var firstUserFile = "<unknown>" var firstUserLine = 0 var insideSpark = true val callStack = new ArrayBuffer[String]() :+ "<unknown>" Thread.currentThread.getStackTrace().foreach { ste: StackTraceElement => if (ste != null && ste.getMethodName != null && !ste.getMethodName.contains("getStackTrace")) { if (insideSpark) { if (skipClass(ste.getClassName)) { lastSparkMethod = if (ste.getMethodName == "<init>") { // Spark method is a constructor; get its class name ste.getClassName.substring(ste.getClassName.lastIndexOf('.') + 1) } else { ste.getMethodName } callStack(0) = ste.toString // Put last Spark method on top of the stack trace. } else { if (ste.getFileName != null) { //方法没有参数可以省略() 如ste.getFileName ste.getLineNumber firstUserFile = ste.getFileName if (ste.getLineNumber >= 0) { firstUserLine = ste.getLineNumber } } callStack += ste.toString insideSpark = false } } else { callStack += ste.toString } } } val callStackDepth = System.getProperty("spark.callstack.depth", "20").toInt val shortForm = if (firstUserFile == "HiveSessionImpl.java") { "Spark JDBC Server Query" } else { s"$lastSparkMethod at $firstUserFile:$firstUserLine" } //take取集合的前n个素 mkString表示将集合素合并成一个字符串 val longForm = callStack.take(callStackDepth).mkString("\n") CallSite(shortForm, longForm) }
sparkInternalExclusionFunction
private def sparkInternalExclusionFunction(className: String): Boolean = { // A regular expression to match classes of the internal Spark API's // that we want to skip when finding the call site of a method. val SPARK_CORE_CLASS_REGEX = """^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?(\.broadcast)?\.[A-Z]""".r val SPARK_SQL_CLASS_REGEX = """^org\.apache\.spark\.sql.*""".r val SCALA_CORE_CLASS_PREFIX = "scala" val isSparkClass = SPARK_CORE_CLASS_REGEX.findFirstIn(className).isDefined || SPARK_SQL_CLASS_REGEX.findFirstIn(className).isDefined val isScalaClass = className.startsWith(SCALA_CORE_CLASS_PREFIX) // If the class is a Spark internal class or a Scala class, then exclude. isSparkClass || isScalaClass }
CallSite类
private[spark] case class CallSite(shortForm: String, longForm: String) private[spark] object CallSite {
val SHORT_FORM = "callSite.short" val LONG_FORM = "callSite.long" }
getCallSite
功能描述:获取当前SparkContext的当前调用堆栈,将栈里最靠近栈底的属于spark或者Scala核心的类压入callStack的栈顶,并将此类的方法存入lastSparkMethod;将栈里最靠近栈顶的用户类放入callStack,将此类的行号存入firstUserLine,类名存入firstUserFile,最终返回的样例类CallSite存储了最短栈和长度默认为20的最长栈的样例类。
但是通过看代码不符合逻辑(有可能是自己的能力有限)
分析SparkContext.markPartiallyConstructed(this, allowMultipleContexts)
作用:SparkContext标记为正在构建中
代码:
private[spark] def markPartiallyConstructed( sc: SparkContext, allowMultipleContexts: Boolean): Unit = { SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized { //标记前进行验证 是否有其他context在运行 assertNoOtherContextIsRunning(sc, allowMultipleContexts) //由于contextBeingConstructed 是option[]类型的 需要子类some()或者none赋值 //contextBeingConstructed 被赋值标志:将当前SparkContext标记为正在构建中 contextBeingConstructed = Some(sc) } }
private def assertNoOtherContextIsRunning( sc: SparkContext, allowMultipleContexts: Boolean): Unit = { SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized { //判读是否有其他sparkcontext被标记为构建中,如果有提出warning contextBeingConstructed.foreach { otherContext => if (otherContext ne sc) { // checks for reference equality // Since otherContext might point to a partially-constructed context, guard against // its creationSite field being null: val otherContextCreationSite = Option(otherContext.creationSite).map(_.longForm).getOrElse("unknown location") val warnMsg = "Another SparkContext is being constructed (or threw an exception in its" + " constructor). This may indicate an error, since only one SparkContext may be" + " running in this JVM (see SPARK-2243)." + s" The other SparkContext was created at:\n$otherContextCreationSite" logWarning(warnMsg) } //判断是否有sparkcontext正在生效,如果有生效的,再判断allowMultipleContexts是否允许,如果不允许直接抛出异常 if (activeContext.get() != null) { val ctx = activeContext.get() val errMsg = "Only one SparkContext may be running in this JVM (see SPARK-2243)." + " To ignore this error, set spark.driver.allowMultipleContexts = true. " + s"The currently running SparkContext was created at:\n${ctx.creationSite.longForm}" val exception = new SparkException(errMsg) if (allowMultipleContexts) { logWarning("Multiple running SparkContexts detected in the same JVM!", exception) } else { throw exception } } } } }
接下来会对SparkConf进行复制,然后对各种配置信息进行校验,代码如下。
private[spark] val conf = config.clone() //检查非法或不赞成的配置设置 conf.validateSettings() if (!conf.contains("spark.master")) { throw new SparkException("A master URL must be set in your configuration") } if (!conf.contains("spark.app.name")) { throw new SparkException("An application name must be set in your configuration")
从上面校验的代码看到必须指定属性spark.master和spark.app.name,否则会抛出异常,结束初始化过程。spark.master用于设置部署模式,spark.app.name用于指定应用程序名称。
conf.validateSettings()代码如下
private[spark] def validateSettings() { if (contains("spark.local.dir")) { val msg = "In Spark 1.0 and later spark.local.dir will be overridden by the value set by " + "the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN)." logWarning(msg) } val executorOptsKey = "spark.executor.extraJavaOptions" val executorClasspathKey = "spark.executor.extraClassPath" val driverOptsKey = "spark.driver.extraJavaOptions" val driverClassPathKey = "spark.driver.extraClassPath" val driverLibraryPathKey = "spark.driver.extraLibraryPath" val sparkExecutorInstances = "spark.executor.instances" // Used by Yarn in 1.1 and before sys.props.get("spark.driver.libraryPath").foreach { value => val warning = s""" |spark.driver.libraryPath was detected (set to '$value'). |This is deprecated in Spark 1.2+. | |Please instead use: $driverLibraryPathKey """.stripMargin logWarning(warning) } // Validate spark.executor.extraJavaOptions getOption(executorOptsKey).map { javaOpts => if (javaOpts.contains("-Dspark")) { val msg = s"$executorOptsKey is not allowed to set Spark options (was '$javaOpts'). " + "Set them directly on a SparkConf or in a properties file when using ./bin/spark-submit." throw new Exception(msg) } if (javaOpts.contains("-Xmx") || javaOpts.contains("-Xms")) { val msg = s"$executorOptsKey is not allowed to alter memory settings (was '$javaOpts'). " + "Use spark.executor.memory instead." throw new Exception(msg) } } // Validate memory fractions val memoryKeys = Seq( "spark.storage.memoryFraction", "spark.shuffle.memoryFraction", "spark.shuffle.safetyFraction", "spark.storage.unrollFraction", "spark.storage.safetyFraction") for (key <- memoryKeys) { val value = getDouble(key, 0.5) if (value > 1 || value < 0) { throw new IllegalArgumentException("$key should be between 0 and 1 (was '$value').") } } // Check for legacy configs sys.env.get("SPARK_JAVA_OPTS").foreach { value => val warning = s""" |SPARK_JAVA_OPTS was detected (set to '$value'). |This is deprecated in Spark 1.0+. | |Please instead use: | - ./spark-submit with conf/spark-defaults.conf to set defaults for an application | - ./spark-submit with --driver-java-options to set -X options for a driver | - spark.executor.extraJavaOptions to set -X options for executors | - SPARK_DAEMON_JAVA_OPTS to set java options for standalone daemons (master or worker) """.stripMargin logWarning(warning) for (key <- Seq(executorOptsKey, driverOptsKey)) { if (getOption(key).isDefined) { throw new SparkException(s"Found both $key and SPARK_JAVA_OPTS. Use only the former.") } else { logWarning(s"Setting '$key' to '$value' as a work-around.") set(key, value) } } } sys.env.get("SPARK_CLASSPATH").foreach { value => val warning = s""" |SPARK_CLASSPATH was detected (set to '$value'). |This is deprecated in Spark 1.0+. | |Please instead use: | - ./spark-submit with --driver-class-path to augment the driver classpath | - spark.executor.extraClassPath to augment the executor classpath """.stripMargin logWarning(warning) for (key <- Seq(executorClasspathKey, driverClassPathKey)) { if (getOption(key).isDefined) { throw new SparkException(s"Found both $key and SPARK_CLASSPATH. Use only the former.") } else { logWarning(s"Setting '$key' to '$value' as a work-around.") set(key, value) } } } if (!contains(sparkExecutorInstances)) { sys.env.get("SPARK_WORKER_INSTANCES").foreach { value => val warning = s""" |SPARK_WORKER_INSTANCES was detected (set to '$value'). |This is deprecated in Spark 1.0+. | |Please instead use: | - ./spark-submit with --num-executors to specify the number of executors | - Or set SPARK_EXECUTOR_INSTANCES | - spark.executor.instances to configure the number of instances in the spark config. """.stripMargin logWarning(warning) set("spark.executor.instances", value) } } }
今天的文章
spark源码一:sparkContext初始化之SparkConf分享到此就结束了,感谢您的阅读。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/bian-cheng-ji-chu/82841.html