Programing

Hadoop 프로세스 레코드는 블록 경계에서 어떻게 분할됩니까?

crosscheck 2020. 7. 20. 07:37
반응형

Hadoop 프로세스 레코드는 블록 경계에서 어떻게 분할됩니까?


에 따르면 Hadoop - The Definitive Guide

FileInputFormats가 정의한 논리 레코드는 일반적으로 HDFS 블록에 깔끔하게 맞지 않습니다. 예를 들어, TextInputFormat의 논리 레코드는 행이며 HDFS 경계를 넘어 자주 교차합니다. 이것은 프로그램의 기능에 영향을 미치지 않습니다. 예를 들어 라인이 빠지거나 끊어지지는 않습니다. 그러나 데이터 로컬 맵 (즉, 해당 호스트와 동일한 호스트에서 실행되는 맵)을 의미하므로 알만한 가치가 있습니다. 입력 데이터)는 일부 원격 읽기를 수행합니다. 이로 인한 약간의 오버 헤드는 일반적으로 중요하지 않습니다.

레코드 라인이 두 블록 (b1 및 b2)으로 분할되었다고 가정하십시오. 첫 번째 블록 (b1)을 처리하는 매퍼는 마지막 행에 EOL 구분 기호가없고 다음 데이터 블록 (b2)에서 나머지 행을 가져옵니다.

두 번째 블록 (b2)을 처리하는 맵퍼는 첫 번째 레코드가 불완전하고 블록 (b2)의 두 번째 레코드부터 처리해야한다고 어떻게 판단합니까?


재미있는 질문, 나는 세부 사항에 대한 코드를 보면서 시간을 보냈고 여기 내 생각이 있습니다. 분할은 클라이언트에 의해 처리 InputFormat.getSplits되므로 FileInputFormat을 보면 다음 정보가 제공됩니다.

  • 각 입력 파일에는 파일의 길이, 블록 크기를 얻을하고 분할 사이즈 계산 max(minSize, min(maxSize, blockSize))곳에 maxSize대응하는 mapred.max.split.sizeminSize이다 mapred.min.split.size.
  • FileSplit위에서 계산 된 분할 크기에 따라 파일을 다른 s 나눕니다. 여기서 중요한 것은 각각 FileSplitstart입력 파일의 오프셋에 해당 하는 매개 변수 로 초기화 된다는 입니다 . 그 시점에서는 여전히 라인을 처리하지 않습니다. 코드의 관련 부분은 다음과 같습니다.

    while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
      int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
      splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 
                               blkLocations[blkIndex].getHosts()));
      bytesRemaining -= splitSize;
    }
    

그 후,에 LineRecordReader의해 정의 된 것을 TextInputFormat보면 그 곳에서 선이 처리됩니다.

  • 초기화 할 때 라인을 읽을 수있는 추상화 인 LineRecordReadera를 인스턴스화하려고 시도합니다 . 2 가지 경우가 있습니다 :LineReaderFSDataInputStream
  • 이 경우 CompressionCodec정의,이 코덱은 경계를 처리 할 책임이있다. 아마도 귀하의 질문과 관련이 없습니다.
  • 가지 흥미있는 곳이다 그러나 어떤 코덱이없는 경우 : 경우에 start당신이의 InputSplit0이 아닌 다른이, 당신은 1 개 문자를 역 추적 한 후하여 식별 발생하는 첫 번째 줄을 건너 \ 또는 \ R \ N (윈도우) N을 ! 선 경계가 분할 경계와 동일한 경우 유효한 선을 건너 뛰지 않기 때문에 역 추적이 중요합니다. 관련 코드는 다음과 같습니다.

    if (codec != null) {
       in = new LineReader(codec.createInputStream(fileIn), job);
       end = Long.MAX_VALUE;
    } else {
       if (start != 0) {
         skipFirstLine = true;
         --start;
         fileIn.seek(start);
       }
       in = new LineReader(fileIn, job);
    }
    if (skipFirstLine) {  // skip first line and re-establish "start".
      start += in.readLine(new Text(), 0,
                        (int)Math.min((long)Integer.MAX_VALUE, end - start));
    }
    this.pos = start;
    

따라서 분할은 클라이언트에서 계산되므로 매퍼가 순차적으로 실행될 필요가 없으므로 모든 매퍼는 첫 번째 줄을 버릴 것인지 여부를 이미 알고 있습니다.

기본적으로 동일한 파일에 각 100Mb의 두 줄이 있고 분할 크기가 64Mb라고 가정합니다. 그런 다음 입력 분할을 계산할 때 다음 시나리오가 나타납니다.

  • 이 블록의 경로와 호스트를 포함하는 분할 1 시작 200-200 = 0Mb, 길이 64Mb에서 초기화되었습니다.
  • 시작 200-200 + 64 = 64Mb, 길이 64Mb에서 분할 2가 초기화되었습니다.
  • 시작 200-200 + 128 = 128Mb, 길이 64Mb에서 분할 3이 초기화되었습니다.
  • 시작 200-200 + 192 = 192Mb, 길이 8Mb에서 분할 4가 초기화되었습니다.
  • 매퍼 A는 분할 1을 처리하고 시작은 0이므로 첫 번째 줄을 건너 뛰지 말고 64Mb 제한을 초과하는 전체 줄을 읽으면 원격 읽기가 필요합니다.
  • 매퍼 B는 스플릿 2를 처리하고 시작은! = 0이므로 64Mb-1 바이트 이후의 첫 번째 라인을 건너 뜁니다. 64Mb-1 바이트는 100Mb에서 라인 1의 끝 부분에 해당하며 여전히 스플릿 2에 있습니다. 원격으로 나머지 72Mb를 읽습니다.
  • 매퍼 C는 split 3을 처리하고 start는! = 0이므로 128Mb-1byte 이후의 첫 번째 줄을 건너 뜁니다. 128Mb-1byte는 200Mb의 2 줄 끝에 해당합니다. 이는 파일 끝이므로 아무 것도하지 마십시오.
  • Mapper D is the same as mapper C except it looks for a newline after 192Mb-1byte.

Map Reduce algorithm does not work on physical blocks of the file. It works on logical input splits. Input split depends on where the record was written. A record may span two Mappers.

The way HDFS has been set up, it breaks down very large files into large blocks (for example, measuring 128MB), and stores three copies of these blocks on different nodes in the cluster.

HDFS has no awareness of the content of these files. A record may have been started in Block-a but end of that record may be present in Block-b.

To solve this problem, Hadoop uses a logical representation of the data stored in file blocks, known as input splits. When a MapReduce job client calculates the input splits, it figures out where the first whole record in a block begins and where the last record in the block ends.

The key point :

In cases where the last record in a block is incomplete, the input split includes location information for the next block and the byte offset of the data needed to complete the record.

Have a look at below diagram.

enter image description here

Have a look at this article and related SE question : About Hadoop/HDFS file splitting

More details can be read from documentation

The Map-Reduce framework relies on the InputFormat of the job to:

  1. Validate the input-specification of the job.
  2. Split-up the input file(s) into logical InputSplits, each of which is then assigned to an individual Mapper.
  3. Each InputSplit is then assigned to an individual Mapper for processing. Split could be tuple. InputSplit[] getSplits(JobConf job,int numSplits) is the API to take care of these things.

FileInputFormat, which extends InputFormat implemented getSplits() method. Have a look at internals of this method at grepcode


I see it as following: InputFormat is responsible to split data into logical splits taking into account the nature of the data.
Nothing prevents it to do so, although it can add significant latency to the job - all the logic and reading around the desired split size boundaries will happen in the jobtracker.
Simplest record aware input format is TextInputFormat. It is working as following (as far as I understood from code) - input format create splits by size, regardless of the lines, but LineRecordReader always :
a) Skip first line in the split (or part of it), if it is not the first split
b) Read one line after the boundary of the split in the end (if data it is available, so it is not the last split).


From what I've understood, when the FileSplit is initialized for the first block, the default constructor is called. Therefore the values for start and length are zero initially. By the end of processing of the fist block, the if the last line is incomplete, then the value of length will be greater than the length of the split and it'll read the first line of next block as well. Due to this the value of start for the first block will be greater than zero and under this condition, the LineRecordReader will skip the fist line of the second block. (See source)

In case the last line of the first block is complete, then the value of length will be equal to the length of the first block and the value of the start for the second block will be zero. In that case the LineRecordReader will not skip the first line and read the second block form the beginning.

Makes sense?


From hadoop source code of LineRecordReader.java the constructor: I find some comments :

// If this is not the first split, we always throw away first record
// because we always (except the last split) read one extra line in
// next() method.
if (start != 0) {
  start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
this.pos = start;

from this I believe hadoop will read one extra line for each split(at the end of current split, read next line in next split), and if not first split, the first line will be throw away. so that no line record will be lost and incomplete


The mappers do not have to communicate. The file blocks are in HDFS and can the current mapper(RecordReader) can read the block that has the remaining part of the line. This happens behind the scenes.

참고URL : https://stackoverflow.com/questions/14291170/how-does-hadoop-process-records-split-across-block-boundaries

반응형