HashPartitioner는 어떻게 작동합니까?
의 문서를 읽었습니다 HashPartitioner. 불행히도 API 호출을 제외하고는 많은 설명이 없습니다. 나는 HashPartitioner키의 해시를 기반으로 분산 세트 를 분할 한다는 가정하에 있습니다. 예를 들어 내 데이터가
(1,1), (1,2), (1,3), (2,1), (2,2), (2,3)
따라서 파티 셔 너는 동일한 키가 동일한 파티션에있는 다른 파티션에 이것을 넣습니다. 그러나 생성자 인수의 중요성을 이해하지 못합니다.
new HashPartitoner(numPartitions) //What does numPartitions do?
위의 데이터 세트의 경우 결과가 어떻게 다를까요?
new HashPartitoner(1)
new HashPartitoner(2)
new HashPartitoner(10)
그렇다면 HashPartitioner실제로 어떻게 작동합니까?
음, 데이터 세트를 약간 더 흥미롭게 만들 수 있습니다.
val rdd = sc.parallelize(for {
x <- 1 to 3
y <- 1 to 2
} yield (x, None), 8)
6 가지 요소가 있습니다.
rdd.count
Long = 6
파티 셔너 없음 :
rdd.partitioner
Option[org.apache.spark.Partitioner] = None
8 개의 파티션 :
rdd.partitions.length
Int = 8
이제 파티션 당 요소 수를 세는 작은 도우미를 정의 해 보겠습니다.
import org.apache.spark.rdd.RDD
def countByPartition(rdd: RDD[(Int, None.type)]) = {
rdd.mapPartitions(iter => Iterator(iter.length))
}
파티 셔 너가 없기 때문에 데이터 세트가 파티션간에 균일하게 배포됩니다 ( Spark의 기본 파티셔닝 체계 ).
countByPartition(rdd).collect()
Array[Int] = Array(0, 1, 1, 1, 0, 1, 1, 1)
이제 데이터 세트를 다시 분할하겠습니다.
import org.apache.spark.HashPartitioner
val rddOneP = rdd.partitionBy(new HashPartitioner(1))
에 전달 된 매개 변수 HashPartitioner는 파티션 수 를 정의 하므로 하나의 파티션이 필요합니다.
rddOneP.partitions.length
Int = 1
파티션이 하나뿐이므로 모든 요소가 포함됩니다.
countByPartition(rddOneP).collect
Array[Int] = Array(6)
셔플 후 값의 순서는 결정적이지 않습니다.
우리가 사용한다면 같은 방법 HashPartitioner(2)
val rddTwoP = rdd.partitionBy(new HashPartitioner(2))
우리는 2 개의 파티션을 얻을 것입니다 :
rddTwoP.partitions.length
Int = 2
rdd키 데이터에 의해 분할 되므로 더 이상 균일하게 배포되지 않습니다.
countByPartition(rddTwoP).collect()
Array[Int] = Array(2, 4)
세 개의 키와 두 개의 다른 hashCodemod 값만 있기 때문에 numPartitions여기서 예상치 못한 것은 없습니다.
(1 to 3).map((k: Int) => (k, k.hashCode, k.hashCode % 2))
scala.collection.immutable.IndexedSeq[(Int, Int, Int)] = Vector((1,1,1), (2,2,0), (3,3,1))
위 사항을 확인하기 위해 :
rddTwoP.mapPartitions(iter => Iterator(iter.map(_._1).toSet)).collect()
Array[scala.collection.immutable.Set[Int]] = Array(Set(2), Set(1, 3))
마지막으로 HashPartitioner(7)우리는 7 개의 파티션을 얻습니다. 각각 2 개의 요소가있는 비어 있지 않은 3 개의 파티션 :
val rddSevenP = rdd.partitionBy(new HashPartitioner(7))
rddSevenP.partitions.length
Int = 7
countByPartition(rddTenP).collect()
Array[Int] = Array(0, 2, 2, 2, 0, 0, 0)
요약 및 참고 사항
HashPartitionertakes a single argument which defines number of partitionsvalues are assigned to partitions using
hashof keys.hashfunction may differ depending on the language (Scala RDD may usehashCode,DataSetsuse MurmurHash 3, PySpark,portable_hash).In simple case like this, where key is a small integer, you can assume that
hashis an identity (i = hash(i)).Scala API uses
nonNegativeModto determine partition based on computed hash,if distribution of keys is not uniform you can end up in situations when part of your cluster is idle
keys have to be hashable. You can check my answer for A list as a key for PySpark's reduceByKey to read about PySpark specific issues. Another possible problem is highlighted by HashPartitioner documentation:
Java arrays have hashCodes that are based on the arrays' identities rather than their contents, so attempting to partition an RDD[Array[]] or RDD[(Array[], _)] using a HashPartitioner will produce an unexpected or incorrect result.
In Python 3 you have to make sure that hashing is consistent. See What does Exception: Randomness of hash of string should be disabled via PYTHONHASHSEED mean in pyspark?
Hash partitioner is neither injective nor surjective. Multiple keys can be assigned to a single partition and some partitions can remain empty.
Please note that currently hash based methods don't work in Scala when combined with REPL defined case classes (Case class equality in Apache Spark).
HashPartitioner(or any otherPartitioner) shuffles the data. Unless partitioning is reused between multiple operations it doesn't reduce amount of data to be shuffled.
RDD is distributed this means it is split on some number of parts. Each of this partitions is potentially on different machine. Hash partitioner with argument numPartitions chooses on what partition to place pair (key, value) in following way:
- Creates exactly
numPartitionspartitions. - Places
(key, value)in partition with numberHash(key) % numPartitions
이 HashPartitioner.getPartition메서드는 키 를 인수로 사용하고 키가 속한 파티션 의 인덱스 를 반환합니다 . 파티 셔 너는 유효한 인덱스가 무엇인지 알아야하므로 올바른 범위의 숫자를 반환합니다. 파티션 수는 numPartitions생성자 인수를 통해 지정됩니다 .
구현은 대략 key.hashCode() % numPartitions. 자세한 내용은 Partitioner.scala 를 참조하십시오.
참고 URL : https://stackoverflow.com/questions/31424396/how-does-hashpartitioner-work
'Programing' 카테고리의 다른 글
| Python : 메서드 호출에서 발생할 수있는 예외를 어떻게 알 수 있습니까? (0) | 2020.10.20 |
|---|---|
| HTML5의 Keygen 태그 (0) | 2020.10.20 |
| deltree에 무슨 일이 일어 났고 그 대체물은 무엇입니까? (0) | 2020.10.19 |
| 키로 정렬 된 파이썬 dict ()를 읽기 쉽게 출력 (0) | 2020.10.19 |
| equals (null) 대신 NullPointerException이 발생하면 나쁜 생각입니까? (0) | 2020.10.19 |



