博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark 之SparkContext 源码精读1
阅读量:6554 次
发布时间:2019-06-24

本文共 6778 字,大约阅读时间需要 22 分钟。

hot3.png

由可知,SparkContext是Spark的核心中的核心。

SparkContext 是Spark功能的入口,表示与Spark 集群的连接。用于创建 RDD、广播变量、累加器变量;

 

接下来,咱们一起从源码开始,强烈建议读者把源码下载下来,和我一起分析源码。

引用老师的话,"源码能说明一切问题"

创建核心组件:TaskScheduler和SchedulerBackend

// SparkContext.scala line 521// Create and start the schedulerval (sched, ts) = SparkContext.createTaskScheduler(this, master)
// SparkContext.scala line 2592  /**   * Create a task scheduler based on a given master URL.   * Return a 2-tuple of the scheduler backend and the task scheduler.   */  private def createTaskScheduler(      sc: SparkContext,      master: String): (SchedulerBackend, TaskScheduler) = {    import SparkMasterRegex._    // When running locally, don't try to re-execute tasks on failure.    val MAX_LOCAL_TASK_FAILURES = 1    master match {      case "local" =>        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)        val backend = new LocalBackend(sc.getConf, scheduler, 1)        scheduler.initialize(backend)        (backend, scheduler)        // ... 若干模式匹配// line 2629      case SPARK_REGEX(sparkUrl) =>        val scheduler = new TaskSchedulerImpl(sc)        val masterUrls = sparkUrl.split(",").map("spark://" + _)        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)        scheduler.initialize(backend)        (backend, scheduler)        // ... 若干模式匹配      case zkUrl if zkUrl.startsWith("zk://") =>        logWarning("Master URL for a multi-master Mesos cluster managed by ZooKeeper should be " +          "in the form mesos://zk://host:port. Current Master URL will stop working in Spark 2.0.")        createTaskScheduler(sc, "mesos://" + zkUrl)      case _ =>        throw new SparkException("Could not parse Master URL: '" + master + "'")    }  }}

这里着重看 SPARK_REGEX模式 。line 2630。首先创建 TaskSchedulerImpl 

 

// TaskSchedulerImpl.scala line 110// default scheduler is FIFOprivate val schedulingModeConf = conf.get("spark.scheduler.mode", "FIFO")val schedulingMode: SchedulingMode = try {  SchedulingMode.withName(schedulingModeConf.toUpperCase)} catch {  case e: java.util.NoSuchElementException =>    throw new SparkException(s"Unrecognized spark.scheduler.mode: $schedulingModeConf")}

在创建TaskScheduler时,指定了调度模型,默认是FIFO:先入先出。

其他的变量都是初始化,暂先不细究。

其他的方法都是需要从实例的对象中去调用的,也暂不细究。

 

此时TaskSchedulerImpl创建成功,然后将创建的TaskScheduler实例作为构造参数,创建SchedulerBackend。见SparkContext.scala line 2632。

此时创建的SchedulerBackend实例实际上是 SparkDeploySchedulerBackend 类。

// SparkDeploySchedulerBackend.scala line 30private[spark] class SparkDeploySchedulerBackend(    scheduler: TaskSchedulerImpl,    sc: SparkContext,    masters: Array[String])  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)  with AppClientListener  with Logging

看SparkDeploySchedulerBackend 的定义可知,是继承自父类CoarseGrainedSchedulerBackend。让我们看看CoarseGrainedSchedulerBackend的定义。

// CoarseGrainedSchedulerBackend.scala line 31/** * A scheduler backend that waits for coarse grained executors to connect to it through Akka. * This backend holds onto each executor for the duration of the Spark job rather than relinquishing * executors whenever a task is done and asking the scheduler to launch a new executor for * each new task. Executors may be launched in a variety of ways, such as Mesos tasks for the * coarse-grained Mesos mode or standalone processes for Spark's standalone deploy mode * (spark.deploy.*). */private[spark]class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)  extends ExecutorAllocationClient with SchedulerBackend with Logging

从上面的注释可知,这个类是粗粒度【coarse grained】的实现。扩展下,fine-grained【细粒度】的实现是MesosSchedulerBackend

大家应该知道,在创建一个对象实例是,会先执行父类的构造函数。这里,创建SparkDeploySchedulerBackend会先调用CoarseGrainedSchedulerBackend的构造。

至此SchedulerBackend创建成功。

值得注意的是:SchedulerBackend实例中有一个TaskScheduler类型的成员变量。后续有一些TaskScheduler关于调度的方法,会在SchedulerBackend中被调用。

紧接着,看上面代码,调用了

// SparkContext.scala line 2633scheduler.initialize(backend)

上述代码清楚的说明了,将SchedulerBackend作为参数传进来。

至此,咱们可以简单的这么认为,SchedulerBackend中Backend 实际上就是指TaskScheduler。SchedulerBackend是TaskScheduler的后端。

再来看下TaskSchedulerImpl的initialize方法

// TaskSchedulerImpl.scala line 125def initialize(backend: SchedulerBackend) {  this.backend = backend  // temporarily set rootPool name to empty  rootPool = new Pool("", schedulingMode, 0, 0)  schedulableBuilder = {    schedulingMode match {      case SchedulingMode.FIFO =>        new FIFOSchedulableBuilder(rootPool)      case SchedulingMode.FAIR =>        new FairSchedulableBuilder(rootPool, conf)    }  }  schedulableBuilder.buildPools()}

很显然,TaskScheduler将SchedulerBackend实例作为成员变量保存了。后续源码分析中可见,此成员变量被频繁的使用。

同时,初始化了一个调度池,然后根据上面提到的调度模式初始化了调度建造器,之后就创建了调度池。调度模式后面会单独提到。尽请期待。

 

createTaskScheduler调用之后,会返回一个Tuple,具体是SchedulerBackend[SparkDeploySchedulerBackend] 和 TaskScheduler[TaskSchedulerImpl]

之后,将SparkContext作为参数,创建DAGScheduler。

DAG是Directed Acyclic Graph的缩写,是指有向无环图。

// SparkContext.scala line 521// Create and start the scheduler ,实际上,方法调用完,只是创建了实例,并没有start,start是在 line 530val (sched, ts) = SparkContext.createTaskScheduler(this, master)_schedulerBackend = sched_taskScheduler = ts_dagScheduler = new DAGScheduler(this) // line 525 _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's// constructor_taskScheduler.start()  // line 530

大家可能会问,明明上面创建了TaskScheduler和SchedulerBackend,却有没有马上start,是因为什么呢?

请看 line 530 上面的注释,已经很清楚的说明了原因。那么让我们看看DAGScheduler的构造中做了什么?

// DAGScheduler.scala line 131def this(sc: SparkContext) = this(sc, sc.taskScheduler)
// DAGScheduler.scala line 121def this(sc: SparkContext, taskScheduler: TaskScheduler) = {  this(    sc,    taskScheduler,    sc.listenerBus,    sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],    sc.env.blockManager.master,    sc.env)}

    最后的目的,就是为了将TaskScheduler传进去

// DAGScheduler.scala line 110private[spark]class DAGScheduler(    private[scheduler] val sc: SparkContext,    private[scheduler] val taskScheduler: TaskScheduler,    listenerBus: LiveListenerBus,    mapOutputTracker: MapOutputTrackerMaster,    blockManagerMaster: BlockManagerMaster,    env: SparkEnv,    clock: Clock = new SystemClock())  extends Logging

并在后续构造中,将DAGScheduler赋值给TaskScheduler的成员变量

// DAGScheduler.scala line 185taskScheduler.setDAGScheduler(this)

由此可见,TaskScheduler和DAGScheduler相互之间是有引用的。

那这两个Scheduler之间有什么区别呢?

DAGScheduler可以认为是总监,他是负责阶段性进展的管控。

TaskScheduler可以认为是监工,负责具体的一个任务的进度。

比如一个项目有一期二期。一期下面有A、B、C,二期下面有D、E、F。

DAGScheduler是管控项目每一期的进展的。也就是一期、二期项目顺利完成。

而TaskScheduler是管控每一期中的子任务的进展的。也就是一期中的A、B、C。二期中的D、E、F 任务顺利完成。

当然顺利完成也代表着,如果遇到任务中断了,需要调整,重新开始。即失败重试机制。

而这两者的共同点是,他们都是负责不同粒度的任务顺利完成的。换言之,他们是不管完成任务所需资源从哪来的。

 

当DAGScheduler也创建完成后,SparkContext的3大核心对象已创建完成;分别是TaskScheduler,SchedulerBackend,DAGScheduler。

 

2016-04-04 ,时间不早了,今天就更新到此。

 

下篇介绍

 

转载于:https://my.oschina.net/corleone/blog/652361

你可能感兴趣的文章
TCP 和 UDP 协议发送数据包的大小 (转载)
查看>>
用Alamofire进行网络请求的一段代码解析(一)
查看>>
elasticsearch的percolator操作
查看>>
windows 定时任务:schtasks,定时关闭网易云音乐
查看>>
C# Note17: 使用Ionic.Zip.dll实现解压缩文件
查看>>
Mina Basics 06-传输
查看>>
c 编译异常 switch 之a label can only be part of a statement and a declaration is not a statement...
查看>>
nullnullDataTable 排序
查看>>
Codeforces Ilya and Queries
查看>>
NEWS - InstallShield 2013发布
查看>>
Viewport
查看>>
〖Linux〗Debian 7.1.0 Wheezy使用ltib报错的解决办法
查看>>
〖Android〗(how-to) fix k860/k860i buletooth.
查看>>
static与线程安全 -摘自网络
查看>>
jsf标签,jsp标签与jstl标签
查看>>
使用PHP CURL的POST数据
查看>>
struts2:表单标签
查看>>
mysql字符串截取
查看>>
ASP.NET MVC3 通过Url传多个参数方法
查看>>
遭遇sql server 2005 启动包未能正确加载需要重新安装错误,重装.NET FRAMEWORK经历分析...
查看>>