5 minute read

Motivation

Apache Spark는 대규모 데이터를 효율적으로 처리하기 위한 분산 처리 프레임워크로, 데이터 엔지니어링과 분석 환경에서 널리 사용되고 있습니다. 다양한 클러스터 매니저와 호환되며, 그중에서도 YARN은 자주 활용되는 클러스터 매니저입니다. 그렇다면 Spark 애플리케이션을 YARN 클러스터에 제출하면, 내부에서는 어떤 일이 진행될까요?

이번 글에서는 spark-submit 명령어 이후 Spark와 YARN이 상호작용하며 ApplicationMasterDriver를 생성하고 Executor 컨테이너를 할당하는 과정을 자세히 살펴보고자 합니다. 이를 위해 Spark 애플리케이션이 실행되는 과정을 주요 클래스와 메서드 중심으로 분석해 보았습니다.

특히, 현업에서 자주 활용되는 YARN 클러스터 모드에 초점을 맞춰 Spark와 YARN이 어떻게 협력하는지, 그리고 그 과정에서의 핵심 흐름을 이해하기 쉽게 설명해 보려 합니다. 소스코드 분석에는 Spark version 3.5.3을 사용하였습니다.

Contents

Spark Submit

[class] SparkSubmit

SparkSubmit 객체는 Spark 애플리케이션을 제출하고 실행하는 진입점입니다. 각 클러스터 매니저에 맞춰 submit할 클래스들을 정의후, 배포 모드(Client, Cluster)와 기타 설정에 맞춰 애플리케이션 실행을 관리합니다.

[method] runMain

spark-submit 명령어가 제출되면, 인자가 파싱된 후 runMain 메서드가 호출됩니다. runMain에서는 prepareSubmitEnvironment 메서드를 통해 Spark 설정, 메인 클래스와 같은 애플리케이션 환경이 설정됩니다. YARN의 클러스터 배포 모드로 제출되는 경우, childMainClassorg.apache.spark.deploy.yarn.YarnClusterApplication으로 설정됩니다. 이후 해당 클래스의 인스턴스를 생성후 app.start를 통해 메인 메서드를 호출합니다.

def prepareSubmitEnvironment(args: SparkSubmitArguments, conf: Option[HadoopConfiguration] = None) : (Seq[String], Seq[String], SparkConf, String) = {
  if (isYarnCluster) {
    childMainClass = YARN_CLUSTER_SUBMIT_CLASS
  }
  (childArgs.toSeq, childClasspath.toSeq, sparkConf, childMainClass)
}

val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
mainClass = Utils.classForName(childMainClass)

val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
  mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
}

app.start(childArgs.toArray, sparkConf)

[class] YarnClusterApplication

YarnClusterApplication 클래스에서는 YARN과 소통하는 Client 객체를 생성함으로써 에플리케이션 제출 및 실행을 관리합니다. Client 클래스는 ApplicationMasterResourceManager에 제출후 실행될 수 있도록 리소스 검증, 컨테이너 생성등의 기능을 수행합니다.

[method] submitApplication

YARN 클러스터와의 통신을 위해 yarnClient를 생성한후 상태를 모니터링할 수 있도록 launcherBackend와의 연결도 설정합니다. 이후 yarnClient을 통해 새로운 애플리케이션을 요청하여 AM 컨테이너 실행 준비를 합니다.

yarnClient.init(hadoopConf)
yarnClient.start()
launcherBackend.connect()
val newApp = yarnClient.createApplication()

createContainerLaunchContext 메소드를 통해 AM 컨테이너를 실행하기 위한 환경(클래스 경로, 실행 명령어 등)을 설정합니다. YARN에 클러스터 모드로 제출된 경우 amClassorg.apache.spark.deploy.yarn.ApplicationMaster가 설정됩니다. 이후 yarnClient.submitApplication를 통해 애플리케이션을 YARN에 제출합니다.

def createContainerLaunchContext(): ContainerLaunchContext = {
  val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
  val amClass =
    if (isClusterMode) {
      Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
    } else {
      Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
    }
  amContainer.setCommands(amClass)
  amContainer
}

val containerContext = createContainerLaunchContext()
val appContext = createApplicationSubmissionContext(newApp, containerContext)

yarnClient.submitApplication(appContext)

ApplicationMaster

[class] ApplicationMaster

Client class에서 ResourceManager에 제출후 AM container에서 ApplicationMaster가 실행됩니다. ApplicationMaster class는 유저 애플리케이션 실행, 모니터링을 하며 executor 자원할당을 관리합니다. UserApplication을 별도의 스레드에서 실행후 SparkContext가 성공적으로 생성되면 이후 YARN에 AM을 등록후 executor container 자원을 요청 및 모니터링 합니다.

[method] run

ClusterMode인 경우 runDriver 메서드를 통해 YARN 클러스터 내에서 드라이버를 실행하여 애플리케이션을 제어합니다. runDriver와 다르게 Client 모드에서는 runExecutorLauncher가 실행되며 드라이버가 클러스터 외부에서 실행됩니다. ClienMode일 경우 runExecutorLauncher는 YARN에서 executor만 실행합니다.

if (isClusterMode) {
  runDriver()
} else {
  runExecutorLauncher()
}

[method] runDriver

userApplication을 별도의 스레드에서 실행합니다. 여기서 애플리케이션의 유저 메인 클래스가 시작되며, 이 클래스는 클러스터에서 실행되는 드라이버 역할을 수행합니다.

userClassThread = startUserApplication()

userApplication에서 SparkContext가 초기화될 때까지 대기합니다. SparkContext가 정상적으로 설정되었는지 확인후, ApplicationMasterregisterAM을 통해 YARN의 ResourceManager에 등록하여, 자원 할당 및 상태를 관리할 수 있게 합니다. 이후 리소스를 할당하는 YarnAllocator를 생성하여 Executor container 자원 할당 및 실행을 시작합니다.

val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
  Duration(totalWaitTime, TimeUnit.MILLISECONDS))
if (sc != null) {
  val rpcEnv = sc.env.rpcEnv

  registerAM(host, port, userConf, sc.ui.map(_.webUrl), appAttemptId)

  createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf())
}

[class] YarnAllocator

YarnAllocatorResourceManagerexecutor container 자원을 요청하고 Locality(호스트, 랙, ANYHOST)순으로 선택한 다음 executorRunnable를 통해 executor를 실행시킵니다.

[method] allocateResource

allocateResource에서는 YARN ResourceManager에 자원 할당 요청을 보냅니다. executor를 실행할 컨테이너들을 allocateResponse에 받은후 handleAllocatedContainers 메서드를 통해 할당된 컨테이너를 순회하며, 각 컨테이너에 executor를 배포합니다.

val allocateResponse = amClient.allocate(progressIndicator)
val allocatedContainers = allocateResponse.getAllocatedContainers()
handleAllocatedContainers(allocatedContainers.asScala.toSeq)

[method] handleAllocatedContainers

containersToUse 리스트로 실제 사용할 컨테이너를 저장하는 곳을 생성한후, 컨테이너와 호스트 매칭을 우선 시도합니다. matchContainerToRequest 메서드를 통해 컨테이너의 호스트와 애플리케이션이 요청한 호스트와 일치하는지 확인합니다. 일치하는 컨테이너는 위에서 선언된 containersToUse에 추가되고, 일치하지 않는 컨테이너는 이후 랙이 매칭되는지 확인합니다.

val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)

val remainingAfterHostMatches = new ArrayBuffer[Container]
for (allocatedContainer <- allocatedContainers) {
  matchContainerToRequest(allocatedContainer, allocatedContainer.getNodeId.getHost,
    containersToUse, remainingAfterHostMatches)
}

컨테이너에 대해 랙 매칭을 다음 단계로 시도합니다. matchContainerToRequest로 랙과 일치하는지 확인하고, 일치하는 컨테이너는 containersToUse에 추가됩니다. 일치하지 않는 경우 ANY_HOST를 사용하여 매칭을 시도하고, 이후 locality 기반으로 매칭되지 않은 불필요한 컨테이너들은 YARN ResourceManager에 반환됩니다.

호스트, 랙, 오프랙 순 매칭을 통해 containerToUse에 저장된 컨테이너들에 runAllocatedContainers 메서드를 통해 executor를 배치 및 실행합니다.

val remainingAfterRackMatches = new ArrayBuffer[Container]
if (remainingAfterHostMatches.nonEmpty) {
  for (allocatedContainer <- remainingAfterHostMatches) {
    val rack = resolver.resolve(allocatedContainer.getNodeId.getHost)
    matchContainerToRequest(allocatedContainer, rack, containersToUse,
      remainingAfterRackMatches)
  }
}

runAllocatedContainers(containersToUse)

[method] runAllocatedContainers

executor 실행을 위해 ExecutorRunnable을 사용합니다. ExecutorRunnable은 컨테이너 내에서 실행될 executor를 위한 자원(메모리, CPU)을 설정한후 NodeManager를 통해 executor를 실행하는 역할을 합니다. launcherPool 스레드풀을 이용해서 비동기적으로 여러 컨테이너에서 동시에 executor를 시작할 수 있게 해줍니다.

for (container <- containersToUse) {
  launcherPool.execute(() => {
    new ExecutorRunnable(
      Some(container),
      sparkConf,
      driverUrl,
    ).run()
    updateInternalState(rpId, executorId, container)
  })
}

[class] ExecutorRunnable

NodeManager Client를 생성하여 NodeManager와 통신할 수 있도록 준비합니다. 이후 컨테이너의 실행 환경 객체를 생성후 ClusterManager에 맞게 실행 class를 생성후 NodeManager에 전달하여 executor를 실행합니다.

[method] run

NMClient를 생성하여 NodeManager와 통신할 수 있도록 준비합니다. startContainer 메서드를 통해 컨테이너의 환경, 리소스를 설정후 실제로 executor를 실행합니다

nmClient = NMClient.createNMClient()
nmClient.init(conf)
nmClient.start()
startContainer()

[method] startContainer

ContainerLaunchContext는 컨테이너가 실행될 때 필요한 정보(자원, 환경 변수, 명령어 등)를 담고 있는 객체입니다. 이 객체는 YARN NodeManager에게 전달되어 컨테이너의 실행 환경을 설정합니다.

val ctx = Records.newRecord(classOf[ContainerLaunchContext])
  .asInstanceOf[ContainerLaunchContext]

이후 prepareCommand 메서드를 통해 executor를 실행하기 위한 명령어(JAR 파일, 라이브러리)를 준비합니다. YARN에서 실행시 container 클래스로 org.apache.spark.executor.YarnCoarseGrainedExecutorBackend가 설정됩니다. ctx.setCommands를 통해 ContainerLaunchContext에 설정됩니다.

val commands = prepareCommand()
ctx.setLocalResources(localResources.asJava)
ctx.setCommands(commands.asJava)

설정된 ContainerLaunchContext를 YARN NodeManager에 전달하여, 할당된 컨테이너에서 실제로 executor를 실행하는 작업을 수행합니다.

nmClient.startContainer(container.get, ctx)

Executor

[class] CoarceGrainedExecutorBackend

YarnCoarseGrainedExecutorBackend 에서는 CoarseGrainedExecutorBackend 객체를 생성함으로써 driver와의 통신과 작업 처리를 담당합니다. CoarseGrainedExecutorBackend 클래스는 executordriver 간의 작업 전송과 상태 관리 기능을 수행하며, executor에서 발생하는 작업을 처리하고 그 결과를 driver에 전송하는 역할도 맡습니다.

[method] run

setupEndpointRefByURI 메서드를 사용하여 driver와의 RPC 연결을 위한 EndpointRef를 생성합니다.

val fetcher = RpcEnv.create("driverPropsFetcher", ... )
driver = fetcher.setupEndpointRefByURI(arguments.driverUrl)

backendCreateFn을 통해 CoarceGrainedExecutorBackend 객체를 executor 백엔드로 생성하고, 이를 RPC 엔드포인트로 등록합니다. 이로 인해 드라이버와의 통신이 활성화되며, 작업을 수신할 수 있게 됩니다.

val env = SparkEnv.createExecutorEnv(driverConf, ... )
val backend = backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile)
env.rpcEnv.setupEndpoint("Executor", backend)

[method] onStart

CoarceGrainedExecutorBackend 객체가 RPC 엔드포인트로 등록된후 executor가 준비되었음을 드라이버에게 알립니다. asyncSetupEndpointRefByURI 메서드로 RPC 엔드포인트를 통해 드라이버와의 연결을 설정합니다. 이후 RegisterExecutor 메시지를 드라이버에게 보내서, executor가 드라이버에 등록될 수 있도록 요청합니다. 이후 드라이버가 executor를 등록하면 RegisteredExecutor 메시지를 스스로에게 보내 executor가 정상적으로 등록되었음을 알립니다.

rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
  driver = Some(ref)
  ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls,
    extractAttributes, _resources, resourceProfile.id))
}(ThreadUtils.sameThread).onComplete {
  case Success(_) =>
    self.send(RegisteredExecutor)
  case Failure(e) =>
    exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
}(ThreadUtils.sameThread)

[method] receive

executor에서 RPC 요청을 받았을때 호출되는 메서드 입니다. RegisteredExecutor 메시지는 드라이버에 executor가 성공적으로 등록되었을 때 수신됩니다. executor가 등록되면 task를 수행하는 Executor 객체를 생성하고, 작업을 처리할 준비가 되었음을 알리는 LaunchedExecutor 메시지를 드라이버에게 전송합니다.

case RegisteredExecutor =>
  logInfo("Successfully registered with driver")
  try {
    executor = new Executor(executorId, hostname, env, getUserClassPath, isLocal = false,
      resources = _resources)
    driver.get.send(LaunchedExecutor(executorId))
  } catch {
    case NonFatal(e) =>
      exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
  }

Conclusion

이번 분석에서는 spark-submit 명령어 실행부터 시작해, ApplicationMaster 컨테이너가 실행되고, YARN ResourceManagerNodeManager를 통한 요청으로 Executor 컨테이너가 할당되고 ExecutorBackend가 시작되는 전체 과정을 살펴보았습니다.

특히, YarnAllocator가 컨테이너를 할당할 때 호스트, 랙, ANYHOST 등의 locality 우선순위를 기반으로 Executor 서버를 선택하는 과정이 인상적이었습니다. 이 과정은 이후 DriverTaskSchedulerTaskExecutor에 분배할 때 locality를 고려해 데이터 셔플을 최소화하기 위한 사전 작업임을 확인할 수 있었습니다.

Reference


https://github.com/apache/spark/tree/master