주요 Spark로 여러 출력에 쓰기-하나의 Spark 작업
단일 작업에서 Spark를 사용하여 키에 따라 여러 출력에 쓸 수있는 방법은 무엇입니까?
관련 : 키 스케일링 하둡, 하나의 MapReduce 작업으로 여러 출력에 쓰기
예
sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
.writeAsMultiple(prefix, compressionCodecOption)
보장 할 것 cat prefix/1
입니다
a
b
그리고 cat prefix/2
것
c
편집 : 최근에 전체 가져 오기, 포주 및 압축 코덱을 포함하는 새로운 답변을 추가했습니다 .https : //stackoverflow.com/a/46118044/1586965 참조 하십시오 . 이전 답변 외에도 도움이 될 수 있습니다.
Spark 1.4 이상을 사용하는 경우 DataFrame API 덕분에 훨씬 더 쉬워 졌습니다 . (DataFrames는 Spark 1.3에서 도입되었지만 partitionBy()
우리가 필요로 하는는 1.4 에서 도입되었습니다 .)
RDD로 시작하는 경우 먼저이를 DataFrame으로 변환해야합니다.
val people_rdd = sc.parallelize(Seq((1, "alice"), (1, "bob"), (2, "charlie")))
val people_df = people_rdd.toDF("number", "name")
Python에서이 동일한 코드는 다음과 같습니다.
people_rdd = sc.parallelize([(1, "alice"), (1, "bob"), (2, "charlie")])
people_df = people_rdd.toDF(["number", "name"])
DataFrame이 있으면 특정 키를 기반으로 여러 출력에 쓰기가 간단합니다. 또한, 이것이 DataFrame API의 장점입니다. 코드는 Python, Scala, Java 및 R에서 거의 동일합니다.
people_df.write.partitionBy("number").text("people")
원하는 경우 다른 출력 형식을 쉽게 사용할 수 있습니다.
people_df.write.partitionBy("number").json("people-json")
people_df.write.partitionBy("number").parquet("people-parquet")
각 예제에서 Spark는 DataFrame을 분할 한 각 키에 대한 하위 디렉터리를 만듭니다.
people/
_SUCCESS
number=1/
part-abcd
part-efgh
number=2/
part-abcd
part-efgh
나는 확장 가능한 이렇게 할 것입니다
import org.apache.hadoop.io.NullWritable
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
override def generateActualKey(key: Any, value: Any): Any =
NullWritable.get()
override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
key.asInstanceOf[String]
}
object Split {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Split" + args(1))
val sc = new SparkContext(conf)
sc.textFile("input/path")
.map(a => (k, v)) // Your own implementation
.partitionBy(new HashPartitioner(num))
.saveAsHadoopFile("output/path", classOf[String], classOf[String],
classOf[RDDMultipleTextOutputFormat])
spark.stop()
}
}
위에서 비슷한 대답을 보았지만 실제로는 사용자 정의 파티션이 필요하지 않습니다. MultipleTextOutputFormat은 각 키에 대한 파일을 생성합니다. 동일한 키를 가진 여러 레코드가 동일한 파티션에 포함 되어도 괜찮습니다.
new HashPartitioner (num), 여기서 num은 원하는 파티션 번호입니다. 다른 키가 많은 경우 숫자를 크게 설정할 수 있습니다. 이 경우 각 파티션은 너무 많은 hdfs 파일 처리기를 열지 않습니다.
주어진 키에 대해 잠재적으로 많은 값을 가지고 있다면 확장 가능한 솔루션은 파티션 당 키당 하나의 파일을 쓰는 것입니다. 안타깝게도 Spark에는 이에 대한 기본 제공 지원이 없지만 우리는 무언가를 채울 수 있습니다.
sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
.mapPartitionsWithIndex { (p, it) =>
val outputs = new MultiWriter(p.toString)
for ((k, v) <- it) {
outputs.write(k.toString, v)
}
outputs.close
Nil.iterator
}
.foreach((x: Nothing) => ()) // To trigger the job.
// This one is Local, but you could write one for HDFS
class MultiWriter(suffix: String) {
private val writers = collection.mutable.Map[String, java.io.PrintWriter]()
def write(key: String, value: Any) = {
if (!writers.contains(key)) {
val f = new java.io.File("output/" + key + "/" + suffix)
f.getParentFile.mkdirs
writers(key) = new java.io.PrintWriter(f)
}
writers(key).println(value)
}
def close = writers.values.foreach(_.close)
}
( PrintWriter
선택한 분산 파일 시스템 작업으로 대체하십시오 .)
이것은 RDD를 통해 단일 패스를 만들고 셔플을 수행하지 않습니다. 키당 하나의 디렉토리를 제공하며 각 디렉토리에는 여러 파일이 있습니다.
여기에는 요청 된 코덱, 필요한 가져 오기 및 요청 된 포주가 포함됩니다.
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
// TODO Need a macro to generate for each Tuple length, or perhaps can use shapeless
implicit class PimpedRDD[T1, T2](rdd: RDD[(T1, T2)]) {
def writeAsMultiple(prefix: String, codec: String,
keyName: String = "key")
(implicit sqlContext: SQLContext): Unit = {
import sqlContext.implicits._
rdd.toDF(keyName, "_2").write.partitionBy(keyName)
.format("text").option("codec", codec).save(prefix)
}
}
val myRdd = sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec")
OP와의 미묘한 차이점 중 하나 <keyName>=
는 디렉토리 이름 앞에 접두사가 있다는 것입니다 . 예
myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec")
줄 것 :
prefix/key=1/part-00000
prefix/key=2/part-00000
여기 prefix/my_number=1/part-00000
에는 라인 a
및 b
, 라인 prefix/my_number=2/part-00000
이 포함됩니다 c
.
과
myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec", "foo")
줄 것 :
prefix/foo=1/part-00000
prefix/foo=2/part-00000
에 대한 편집 방법이 명확해야합니다 parquet
.
마지막으로에 대한 예 Dataset
는 튜플을 사용하는 것보다 더 좋을 것입니다.
implicit class PimpedDataset[T](dataset: Dataset[T]) {
def writeAsMultiple(prefix: String, codec: String, field: String): Unit = {
dataset.write.partitionBy(field)
.format("text").option("codec", codec).save(prefix)
}
}
나는 비슷한 필요가 있고 방법을 찾았습니다. 하지만 한 가지 단점이 있습니다 (제 경우에는 문제가되지 않습니다). 출력 파일 당 하나의 파티션으로 데이터를 다시 분할해야합니다.
이러한 방식으로 파티션을 나누려면 일반적으로 작업이 출력 할 파일 수를 미리 알고 각 키를 각 파티션에 매핑하는 기능을 찾아야합니다.
먼저 MultipleTextOutputFormat 기반 클래스를 만들어 보겠습니다.
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
class KeyBasedOutput[T >: Null, V <: AnyRef] extends MultipleTextOutputFormat[T , V] {
override def generateFileNameForKeyValue(key: T, value: V, leaf: String) = {
key.toString
}
override protected def generateActualKey(key: T, value: V) = {
null
}
}
이 클래스를 사용하면 Spark는 파티션 (첫 번째 / 마지막)에서 키를 가져와이 키로 파일 이름을 지정하므로 동일한 파티션에 여러 키를 혼합하는 것은 좋지 않습니다.
예를 들어, 사용자 지정 파티 셔 너가 필요합니다. 이것은 일을 할 것입니다 :
import org.apache.spark.Partitioner
class IdentityIntPartitioner(maxKey: Int) extends Partitioner {
def numPartitions = maxKey
def getPartition(key: Any): Int = key match {
case i: Int if i < maxKey => i
}
}
이제 모든 것을 합쳐 보겠습니다.
val rdd = sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c"), (7, "d"), (7, "e")))
// You need to know the max number of partitions (files) beforehand
// In this case we want one partition per key and we have 3 keys,
// with the biggest key being 7, so 10 will be large enough
val partitioner = new IdentityIntPartitioner(10)
val prefix = "hdfs://.../prefix"
val partitionedRDD = rdd.partitionBy(partitioner)
partitionedRDD.saveAsHadoopFile(prefix,
classOf[Integer], classOf[String], classOf[KeyBasedOutput[Integer, String]])
이렇게하면 접두사 (1, 2 및 7로 명명 됨) 아래에 3 개의 파일이 생성되어 모든 것을 한 번에 처리합니다.
보시다시피이 솔루션을 사용하려면 키에 대한 지식이 필요합니다.
저에게는 각 키 해시에 대해 하나의 출력 파일이 필요하고 파일 수를 제어 할 수 있었기 때문에 더 쉬웠으므로 스톡 HashPartitioner를 사용하여 트릭을 수행 할 수있었습니다.
Java에서도 같은 것이 필요했습니다. Spark Java API 사용자에게 Zhang Zhan의 Scala 답변 번역 게시 :
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.Arrays;
class RDDMultipleTextOutputFormat<A, B> extends MultipleTextOutputFormat<A, B> {
@Override
protected String generateFileNameForKeyValue(A key, B value, String name) {
return key.toString();
}
}
public class Main {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("Split Job")
.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
String[] strings = {"Abcd", "Azlksd", "whhd", "wasc", "aDxa"};
sc.parallelize(Arrays.asList(strings))
// The first character of the string is the key
.mapToPair(s -> new Tuple2<>(s.substring(0,1).toLowerCase(), s))
.saveAsHadoopFile("output/", String.class, String.class,
RDDMultipleTextOutputFormat.class);
sc.stop();
}
}
saveAsText () 및 saveAsHadoop (...)은 RDD 데이터를 기반으로 특히 실행되는 PairRdd에서 데이터를 가져 오는 PairRDD.saveAsHadoopDataset 메서드에 의해 구현 됩니다. 가능한 두 가지 옵션이 있습니다. 데이터 크기가 상대적으로 작 으면 RDD를 그룹화하고 각 컬렉션에서 새 RDD를 만들고 해당 RDD를 사용하여 데이터를 작성하여 구현 시간을 절약 할 수 있습니다. 이 같은:
val byKey = dataRDD.groupByKey().collect()
val rddByKey = byKey.map{case (k,v) => k->sc.makeRDD(v.toSeq)}
val rddByKey.foreach{ case (k,rdd) => rdd.saveAsText(prefix+k}
반복기의 구체화가 v.toSeq
메모리에 맞지 않을 수 있으므로 큰 데이터 세트에서는 작동 하지 않습니다.
내가 본 다른 옵션은 실제로이 경우에 권장하는 옵션은 다음과 같습니다. hadoop / hdfs api를 직접 호출하여 직접 롤링합니다.
다음은이 질문을 조사하면서 시작한 토론 입니다. 다른 RDD에서 RDD를 만드는 방법은 무엇입니까?
Hadoop HDFS의 입력 파일을 키 (키당 파일 1 개)를 기반으로 여러 파일로 분할하는 유사한 사용 사례가있었습니다. 다음은 스파크에 대한 내 스칼라 코드입니다.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
val hadoopconf = new Configuration();
val fs = FileSystem.get(hadoopconf);
@serializable object processGroup {
def apply(groupName:String, records:Iterable[String]): Unit = {
val outFileStream = fs.create(new Path("/output_dir/"+groupName))
for( line <- records ) {
outFileStream.writeUTF(line+"\n")
}
outFileStream.close()
}
}
val infile = sc.textFile("input_file")
val dateGrouped = infile.groupBy( _.split(",")(0))
dateGrouped.foreach( (x) => processGroup(x._1, x._2))
키를 기반으로 레코드를 그룹화했습니다. 각 키의 값은 별도의 파일에 기록됩니다.
여러 열이 있고 다른 모든 열을 csv 형식으로 분할하지 않고 Nick Chammas의 제안으로 "text"방법을 사용하면 실패하는 경우 Python 사용자에게 좋은 소식입니다.
people_df.write.partitionBy("number").text("people")
오류 메시지는 "AnalysisException : u'Text 데이터 소스가 단일 열만 지원하고 2 개의 열이 있습니다.; '"입니다.
스파크 2.0.0 (내 테스트 환경은 hdp의 스파크 2.0.0) 패키지 "com.databricks.spark.csv"가 이제 통합되었으며 하나의 열로만 분할 된 텍스트 파일을 저장할 수 있습니다.
people_rdd = sc.parallelize([(1,"2016-12-26", "alice"),
(1,"2016-12-25", "alice"),
(1,"2016-12-25", "tom"),
(1, "2016-12-25","bob"),
(2,"2016-12-26" ,"charlie")])
df = people_rdd.toDF(["number", "date","name"])
df.coalesce(1).write.partitionBy("number").mode("overwrite").format('com.databricks.spark.csv').options(header='false').save("people")
[root@namenode people]# tree
.
├── number=1
│?? └── part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv
├── number=2
│?? └── part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv
└── _SUCCESS
[root@namenode people]# cat number\=1/part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv
2016-12-26,alice
2016-12-25,alice
2016-12-25,tom
2016-12-25,bob
[root@namenode people]# cat number\=2/part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv
2016-12-26,charlie
내 spark 1.6.1 환경에서 코드는 오류를 발생시키지 않았지만 생성 된 파일은 하나뿐입니다. 두 개의 폴더로 분할되지 않습니다.
이것이 도움이되기를 바랍니다.
비슷한 사용 사례가있었습니다. 나는 두 개의 사용자 정의 클래스를 작성 위치 : implemeting으로 자바를 해결 MultipleTextOutputFormat
하고 RecordWriter
.
내 입력은 a JavaPairRDD<String, List<String>>
였고 모든 줄이 값에 포함 된 키로 명명 된 파일에 저장하고 싶었습니다.
내 MultipleTextOutputFormat
구현 코드는 다음과 같습니다.
class RDDMultipleTextOutputFormat<K, V> extends MultipleTextOutputFormat<K, V> {
@Override
protected String generateFileNameForKeyValue(K key, V value, String name) {
return key.toString(); //The return will be used as file name
}
/** The following 4 functions are only for visibility purposes
(they are used in the class MyRecordWriter) **/
protected String generateLeafFileName(String name) {
return super.generateLeafFileName(name);
}
protected V generateActualValue(K key, V value) {
return super.generateActualValue(key, value);
}
protected String getInputFileBasedOutputFileName(JobConf job, String name) {
return super.getInputFileBasedOutputFileName(job, name);
}
protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs, JobConf job, String name, Progressable arg3) throws IOException {
return super.getBaseRecordWriter(fs, job, name, arg3);
}
/** Use my custom RecordWriter **/
@Override
RecordWriter<K, V> getRecordWriter(final FileSystem fs, final JobConf job, String name, final Progressable arg3) throws IOException {
final String myName = this.generateLeafFileName(name);
return new MyRecordWriter<K, V>(this, fs, job, arg3, myName);
}
}
내 RecordWriter
구현 코드는 다음과 같습니다 .
class MyRecordWriter<K, V> implements RecordWriter<K, V> {
private RDDMultipleTextOutputFormat<K, V> rddMultipleTextOutputFormat;
private final FileSystem fs;
private final JobConf job;
private final Progressable arg3;
private String myName;
TreeMap<String, RecordWriter<K, V>> recordWriters = new TreeMap();
MyRecordWriter(RDDMultipleTextOutputFormat<K, V> rddMultipleTextOutputFormat, FileSystem fs, JobConf job, Progressable arg3, String myName) {
this.rddMultipleTextOutputFormat = rddMultipleTextOutputFormat;
this.fs = fs;
this.job = job;
this.arg3 = arg3;
this.myName = myName;
}
@Override
void write(K key, V value) throws IOException {
String keyBasedPath = rddMultipleTextOutputFormat.generateFileNameForKeyValue(key, value, myName);
String finalPath = rddMultipleTextOutputFormat.getInputFileBasedOutputFileName(job, keyBasedPath);
Object actualValue = rddMultipleTextOutputFormat.generateActualValue(key, value);
RecordWriter rw = this.recordWriters.get(finalPath);
if(rw == null) {
rw = rddMultipleTextOutputFormat.getBaseRecordWriter(fs, job, finalPath, arg3);
this.recordWriters.put(finalPath, rw);
}
List<String> lines = (List<String>) actualValue;
for (String line : lines) {
rw.write(null, line);
}
}
@Override
void close(Reporter reporter) throws IOException {
Iterator keys = this.recordWriters.keySet().iterator();
while(keys.hasNext()) {
RecordWriter rw = (RecordWriter)this.recordWriters.get(keys.next());
rw.close(reporter);
}
this.recordWriters.clear();
}
}
대부분의 코드는 FileOutputFormat
. 유일한 차이점은 그 몇 줄
List<String> lines = (List<String>) actualValue;
for (String line : lines) {
rw.write(null, line);
}
These lines allowed me to write each line of my input List<String>
on the file. The first argument of the write
function is set to null
in order to avoid writting the key on each line.
To finish, I only need to do this call to write my files
javaPairRDD.saveAsHadoopFile(path, String.class, List.class, RDDMultipleTextOutputFormat.class);
참고URL : https://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job
'Programing' 카테고리의 다른 글
PHP에서 register_globals는 무엇입니까? (0) | 2020.11.30 |
---|---|
Magento 관리자 로그인 페이지의 "오류 404 찾을 수 없음" (0) | 2020.11.30 |
Ubuntu-ssh--경고 : 원격 호스트 ID가 변경되었습니다. (0) | 2020.11.29 |
데이터베이스의 모든 테이블을 하나의 데이터 정렬로 변환하는 방법은 무엇입니까? (0) | 2020.11.29 |
유효한 날짜와 일치하는 정규식 (0) | 2020.11.29 |