3 minute read

Motivation

Spark는 대규모 데이터를 분산 처리하기 위한 프레임워크로, 빠른 데이터 처리와 효율적인 작업 스케줄링을 제공하고 있습니다. Spark 애플리케이션을 작성하며 RDD에 액션을 적용할 때, 내부적으로 어떤 일이 일어나는지 궁금했던 적이 있으신가요?

이번 글에서는 RDD 액션 호출 이후 Spark 내부에서 DAG(Directed Acyclic Graph)가 생성되고, 이를 기반으로 Task가 만들어져 Executor에 분배되기까지의 과정을 살펴보려 합니다. 이 과정에서 Spark의 주요 컴포넌트(SparkContext, DAGScheduler, TaskSchedulerImpl, CoarseGrainedSchedulerBackend)가 각각 어떤 역할을 수행하는지도 확인해보고자 합니다.

특히, RDD의 Dependency를 기반으로 DAG를 생성하고 Task를 Scheduling하는 Spark의 내부 과정에 초점을 맞추었습니다. 이 글을 통해 Spark의 내부 구조를 파악하여 더 깊이 있는 활용에 도움이 되면 좋겠습니다. 소스코드 분석에는 spark version 3.5.3을 사용하였습니다.

Contents

1. RDD Action 호출: Stage와 Task가 만들어지는 과정

[class] SparkContext

Spark 애플리케이션의 시작점으로, 사용자 코드에서 호출되는 Action 메서드의 요청을 처리합니다. SparkContext는 애플리케이션 실행에 필요한 class 환경을 설정하고, RDD에 대한 Action 요청을 DAGScheduler로 전달합니다.

[method] submitJob

사용자 코드에서 Action이 호출될 때 SparkContext에서 처음 호출되는 method 입니다. RDD, 파티션 정보와 같은 세부정보를 담아 DAGScheduler.submitJob에게 전달합니다.

def submitJob[T, U, R](
    rdd: RDD[T],
    processPartition: Iterator[T] => U,
    partitions: Seq[Int],
    resultFunc: => R): SimpleFutureAction[R] =
{
    assertNotStopped()
    val cleanF = clean(processPartition)
    val callSite = getCallSite()
    val waiter = dagScheduler.submitJob(
        rdd,
        (context: TaskContext, iter: Iterator[T]) => cleanF(iter),
        partitions,
        callSite,
        localProperties.get)
    new SimpleFutureAction(waiter, resultFunc)
}

[class] DAGScheduler

RDD의 의존성을 기반으로 DAG(Directed Acyclic Graph)를 생성하고, 이를 Stage로 분리한 후, 각 Stage를 Task 단위로 나눕니다. TaskSchedulerImplTask를 전달해 실제 Executor에서 작업이 실행되도록 처리합니다.

[method] submitJob

SparkContext에서 받은 요청을 처리하여 이벤트 큐(eventProcessLoop)에 JobSubmitted 이벤트를 추가하여 DAG 생성 및 Task 분배를 준비합니다.

def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    properties: Properties): JobWaiter[U] =
{
    val jobId = nextJobId.getAndIncrement()
    val waiter = new JobWaiter[U](this, jobId, partitions.size)
    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      JobArtifactSet.getActiveOrDefault(sc),
      Utils.cloneProperties(properties)))
    waiter
}

[method] createResultStage

최종적으로 실행될 ResultStage를 생성하여 stage submit을 준비하는 method입니다. getOrCreateParentStages 메서드를 통해 parent 관계에 있는 ShuffleMapStage들을 정의합니다. parent stage들과 ResultStage를 통해 DAG를 구성합니다.

private def createResultStage(
    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = 
{
    val (shuffleDeps, _) = getShuffleDependenciesAndResourceProfiles(rdd)
    val parents = getOrCreateParentStages(shuffleDeps, jobId)
    val id = nextStageId.getAndIncrement()
    val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
    stage
}

[method] getOrCreateParentStages

위의 ResultStage에서 DAG구성에 필요한 parent stage들을 생성하기 위해 호출되는 method 입니다. Action이 호출된 RDDShuffleDependency를 기반으로 Parent ShuffleMapStage들을 생성합니다.

private def getOrCreateParentStages(
    shuffleDeps: HashSet[ShuffleDependency[_, _, _]],
    firstJobId: Int): List[Stage] = 
{
    shuffleDeps.map { shuffleDep =>
    getOrCreateShuffleMapStage(shuffleDep, firstJobId)
    }.toList
}

[method] submitStage

위 단계에서 생성된 ResultStage를 시작으로, parent Stage들의 실행을 재귀적으로 처리합니다. parent Stage가 완료될 때까지 대기하여 DAG의 의존성을 만족한 후에 각 stage는 submitMissingTasks에 전달되어 Task로 분리됩니다.

private def submitStage(stage: Stage): Unit = {
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        val missing = getMissingParentStages(stage).sortBy(_.id)
        if (missing.isEmpty) {
          submitMissingTasks(stage, jobId.get)
        } else {
          for (parent <- missing) {
            submitStage(parent)
          }
          waitingStages += stage
        }
    }
    }
}

[method] submitMissingTasks

실행가능한 StageTask 단위로 나누는 작업을 처리합니다. Stage 유형에 따라 TaskShuffleMapTask 또는 ResultTask로 생성한후, TaskExecutor에 스케쥴링 하는 TaskSchedulerImpl에 전달합니다.

private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
    val tasks: Seq[Task[_]] = try {
      stage match {
        case stage: ShuffleMapStage =>
          partitionsToCompute.map { id =>
          new ShuffleMapTask(stage.id, stage.latestInfo,
            stage.numPartitions, taskIdToLocations(id), stage.rdd.isBarrier())
          }

        case stage: ResultStage =>
          partitionsToCompute.map { id =>
          val part = partitions(id)
          new ResultTask(stage.id, stage.latestInfo,
            stage.numPartitions, taskIdToLocations(id), id, stage.rdd.isBarrier())
          }
      }
    }
    taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, ...))
}

2. Task Scheduling: Executor로 작업이 전달되는 과정

[class] TaskSchedulerImpl

DAGScheduler에서 전달받은TaskSet을 스케줄링하여 Executor에 작업을 할당합니다. Locality 수준(Process Local, Node Local, Rack Local, ANY)에 따라 가장 적합한 Executor를 선택합니다.

[method] resourceOffers

Executor로부터 제공받은 리소스를 기반으로 Task들을 스케쥴링하는 method 입니다. Taskset을 순회하며 데이터 locality 에 가장 적합하게 Executor를 배정합니다. locality의 우선순위는 Process LocalNode LocalRack LocalANY순 입니다.

def resourceOffers(
    offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  for (taskSet <- sortedTaskSets) {
    for (currentMaxLocality <- taskSet.myLocalityLevels) {
        var launchedTaskAtCurrentMaxLocality = false
        do {
        val (noDelayScheduleReject, minLocality) = resourceOfferSingleTaskSet(
            taskSet, currentMaxLocality, availableCpus, availableResources)
        launchedTaskAtCurrentMaxLocality = minLocality.isDefined
        launchedAnyTask |= launchedTaskAtCurrentMaxLocality
        } while (launchedTaskAtCurrentMaxLocality)
    }
  }
  tasks.map(_.toSeq)
}

[class] CoarseGrainedSchedulerBackend

ExecutorDriver 간의 통신을 관리하며, TaskExecutor로 전송하는 역할을 합니다.

[method] launchTasks

위 단계에서 생성된 TaskDescription을 기반으로 각 Executor에게 Task를 전달합니다. TaskDescription정보를 직렬화 하여 RPC 호출(LaunchTask)을 수행합니다. 이후 ExecutorBackend에서 RPC 호출(LaunchTask)을 수신하여 Task를 실행합니다.

private def launchTasks(tasks: Seq[Seq[TaskDescription]]): Unit = {
  for (task <- tasks.flatten) {
    val serializedTask = TaskDescription.encode(task)
    val executorData = executorDataMap(task.executorId)
          
    executorData.freeCores -= task.cpus
    task.resources.foreach { case (rName, addressAmounts) =>
    executorData.resourcesInfo(rName).acquire(addressAmounts)
    }
    executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
  }
}

Conclusion

이 글에서는 Spark 애플리케이션이 실행될 때 RDD의 Action 호출로부터 시작해 DAG를 생성하고, Task로 분해해 Executor에 전달되는 전 과정을 살펴보았습니다. Spark의 주요 컴포넌트들이 어떻게 협력하는지와 함께 RDD → DAG 생성 → Task Scheduling으로 이어지는 내부 동작을 소스코드를 통해 확인하며, Spark의 효율적인 설계 원리를 이해할 수 있었습니다.

특히, Task Scheduling 과정에서 데이터 Locality를 기반으로 최적화하는 부분은, 클러스터 자원을 효과적으로 활용하기 위한 spark의 설계를 확인할수 있어서 신기하였습니다.

Reference


https://github.com/apache/spark