Programing

Spark에서 단계는 어떻게 작업으로 분할됩니까?

crosscheck 2020. 7. 18. 09:17
반응형

Spark에서 단계는 어떻게 작업으로 분할됩니까?


다음에 대해 특정 시점마다 하나의 Spark 작업 만 실행한다고 가정 해 봅시다.

내가 지금까지 얻는 것

다음은 Spark에서 발생하는 것을 이해하는 것입니다.

  1. a SparkContext가 작성되면 각 작업자 노드가 실행기를 시작합니다. 실행기는 별도의 프로세스 (JVM)이며 드라이버 프로그램에 다시 연결됩니다. 각 실행 프로그램에는 드라이버 프로그램의 jar이 있습니다. 드라이버를 종료하면 실행 프로그램이 종료됩니다. 각 실행자는 일부 파티션을 보유 할 수 있습니다.
  2. 작업이 실행될 때 계보 그래프에 따라 실행 계획이 작성됩니다.
  3. 실행 작업은 여러 개의 (계보 그래프에서) 변환 및 작업을 포함하지만 셔플은없는 단계로 분할됩니다. 따라서 스테이지는 셔플로 분리됩니다.

이미지 1

나는 이해

  • 작업은 Function 객체를 직렬화하여 드라이버에서 실행기로 전송되는 명령입니다.
  • 실행 프로그램은 드라이버 jar를 사용하여 명령 (작업)을 deserialize하고 파티션에서 실행합니다.

그러나

질문

스테이지를 해당 작업으로 어떻게 분할합니까?

구체적으로 특별히:

  1. 작업이 변환 및 작업에 의해 결정됩니까 아니면 작업에 여러 변환 / 작업이있을 수 있습니까?
  2. 작업이 파티션에 의해 결정됩니까 (예 : 파티션 당 단계 당 하나의 작업).
  3. 작업이 노드에 의해 결정됩니까 (예 : 노드 당 단계 당 하나의 작업)?

내가 생각하는 것 (올바른 경우에도 부분적인 대답 만)

에서 https://0x0fff.com/spark-architecture-shuffle , 셔플은 이미지와 설명

여기에 이미지 설명을 입력하십시오

나는 규칙이

각 단계는 노드 수에 관계없이 # number-of-partitions 작업으로 나뉩니다.

첫 번째 이미지의 경우 3 개의 맵 작업과 3 개의 축소 작업이 있다고합니다.

0x0fff의 이미지의 경우 8 개의 맵 작업과 3 개의 축소 작업이 있습니다 (오렌지 3 개와 짙은 녹색 파일 3 개만 있다고 가정).

어떤 경우에도 공개 질문

그 맞습니까? 그러나 그것이 정확하더라도, 여러 작업 (예 : 여러 맵)이 하나의 작업 내에 있는지 또는 작업 당 하나의 작업으로 구분되는지 여부에 따라 여전히 열려 있기 때문에 위의 질문에 모두 대답하지는 않았습니다.

다른 사람들의 말

Spark의 작업은 무엇입니까? Spark 작업자는 jar 파일을 어떻게 실행합니까? 그리고 어떻게 아파치 스파크 스케줄러는 작업으로 파일을 분할합니까? 비슷하지만 내 질문에 명확하게 대답했다고 생각하지 않았습니다.


여기 꽤 좋은 개요가 있습니다. 질문에 대답하기 위해

  • 각 데이터 파티션마다 별도의 시작 task 필요합니다 stage. 각 파티션은 별도의 물리적 위치 (예 : HDFS의 블록 또는 로컬 파일 시스템의 디렉토리 / 볼륨)에있을 것입니다.

의 제출 Stage은에 의해 진행됩니다 DAG Scheduler. 이는 상호 의존적이지 않은 단계가 병렬로 실행되도록 클러스터에 제출 될 수 있음을 의미합니다. 이는 클러스터의 병렬화 기능을 최대화합니다. 따라서 데이터 흐름의 작업이 동시에 발생할 수 있으면 여러 단계가 시작될 것으로 예상됩니다.

다음 장난감 예제에서 실제로 다음 유형의 작업을 수행하는 것을 볼 수 있습니다.

  • 두 개의 데이터 소스를로드
  • 두 데이터 소스 모두에서 개별적으로 일부 맵 작업을 수행
  • 그들과 합류
  • 결과에 대한 일부 맵 및 필터 작업 수행
  • 결과를 저장

그렇다면 몇 단계를 거쳐야할까요?

  • 두 개의 데이터 소스를 병렬로로드하기위한 각각 1 단계 = 2 단계
  • 을 나타내는 제 3 단 join이다 따라 다른 두 단계에
  • 참고 : 결합 된 데이터에 대한 모든 후속 작업은 순차적으로 수행 되어야하므로 동일한 단계 에서 수행 될 수 있습니다 . 추가 작업을 시작하면 이전 작업이 완료 될 때까지 작업을 시작할 수 없으므로 추가 단계를 시작하면 이점이 없습니다.

그 장난감 프로그램입니다

val sfi  = sc.textFile("/data/blah/input").map{ x => val xi = x.toInt; (xi,xi*xi) }
val sp = sc.parallelize{ (0 until 1000).map{ x => (x,x * x+1) }}
val spj = sfi.join(sp)
val sm = spj.mapPartitions{ iter => iter.map{ case (k,(v1,v2)) => (k, v1+v2) }}
val sf = sm.filter{ case (k,v) => v % 10 == 0 }
sf.saveAsTextFile("/data/blah/out")

그리고 여기에 결과의 DAG가 있습니다

여기에 이미지 설명을 입력하십시오

이제 얼마나 많은 작업이 있습니까? 작업 수는 다음과 같아야합니다

( Stage* #Partitions in the stage)의


이것은 다른 부분을 더 잘 이해하는 데 도움이 될 수 있습니다.

  • Stage: is a collection of tasks. Same process running against different subsets of data (partitions).
  • Task: represents a unit of work on a partition of a distributed dataset. So in each stage, number-of-tasks = number-of-partitions, or as you said "one task per stage per partition”.
  • Each executer runs on one yarn container, and each container resides on one node.
  • Each stage utilizes multiple executers, each executer is allocated multiple vcores.
  • Each vcore can execute exactly one task at a time
  • So at any stage, multiple tasks could be executed in parallel. number-of-tasks running = number-of-vcores being used.

If I understand correctly there are 2 ( related ) things that confuse you:

1) What determines the content of a task?

2) What determines the number of tasks to be executed?

Spark's engine "glues" together simple operations on consecutive rdds, for example:

rdd1 = sc.textFile( ... )
rdd2 = rdd1.filter( ... )
rdd3 = rdd2.map( ... )
rdd3RowCount = rdd3.count

so when rdd3 is (lazily) computed, spark will generate a task per partition of rdd1 and each task will execute both the filter and the map per line to result in rdd3.

The number of tasks is determined by the number of partitions. Every RDD has a defined number of partitions. For a source RDD that is read from HDFS ( using sc.textFile( ... ) for example ) the number of partitions is the number of splits generated by the input format. Some operations on RDD(s) can result in an RDD with a different number of partitions:

rdd2 = rdd1.repartition( 1000 ) will result in rdd2 having 1000 partitions ( regardless of how many partitions rdd1 had ).

Another example is joins:

rdd3 = rdd1.join( rdd2  , numPartitions = 1000 ) will result in rdd3 having 1000 partitions ( regardless of partitions number of rdd1 and rdd2 ).

( Most ) operations that change the number of partitions involve a shuffle, When we do for example:

rdd2 = rdd1.repartition( 1000 ) 

what actually happens is the task on each partition of rdd1 needs to produce an end-output that can be read by the following stage so to make rdd2 have exactly 1000 partitions ( How they do it? Hash or Sort ). Tasks on this side are sometimes referred to as "Map ( side ) tasks". A task that will later run on rdd2 will act on one partition ( of rdd2! ) and would have to figure out how to read/combine the map-side outputs relevant to that partition. Tasks on this side are sometimes referred to as "Reduce ( side ) tasks".

두 가지 질문이 관련되어 있습니다. 단계의 작업 수는 파티션 수 (연속 rdd가 "접착 됨"에 공통 임)이며 rdd의 파티션 수는 단계 수 (파티션 수를 일부로 지정하여) 사이에서 변경 될 수 있습니다. 셔플 (예 :)).

스테이지 실행이 시작되면 해당 작업이 작업 슬롯을 차지할 수 있습니다. 동시 작업 슬롯 수는 numExecutors * ExecutorCores입니다. 일반적으로, 서로 다른 비 종속적 단계의 작업이이를 점유 할 수 있습니다.

참고 URL : https://stackoverflow.com/questions/37528047/how-are-stages-split-into-tasks-in-spark

반응형