Class RecordReaderGenericBase<U,​G,​A,​T>

  • All Implemented Interfaces:
    Closeable, AutoCloseable
    Direct Known Subclasses:
    RecordReaderCsv, RecordReaderGenericRdfBase, RecordReaderJsonArray

    public abstract class RecordReaderGenericBase<U,​G,​A,​T>
    extends org.apache.hadoop.mapreduce.RecordReader<org.apache.hadoop.io.LongWritable,​T>
    A generic record reader that uses a callback mechanism to detect a consecutive sequence of records that must start in the current split and which may extend over any number of successor splits. The callback that needs to be implemented is parse(Callable). This method receives a supplier of input streams and must return an RxJava Flowable over such an input stream. Note that in contrast to Java Streams, RxJava Flowables idiomatically support error propagation, cancellation and resource management.

    The RecordReaderGenericBase framework will attempt to read a configurable number of records from the flowable in order to decide whether a position in the input stream is a valid start of records. If for a probe position the flowable yields an error or zero records, then probing continues at a higher offset until the end of data (or the MAX_RECORD_SIZE limit) is reached. This implementation can therefore handle data skews where large data blocks occasionally span multiple splits, e.g. a mix of 1 million graphs of 1 triple each and 1 graph of 1 million triples.
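    The probing decision can be pictured with the following minimal sketch (illustrative only, assuming RxJava 3 on the classpath; probeSucceeds is a hypothetical helper, not a member of this class):

        import io.reactivex.rxjava3.core.Flowable;

        public class ProbeSketch {
            // A probe position is treated as a valid record start iff the flowable
            // yields at least one record without raising an error.
            public static boolean probeSucceeds(Flowable<?> records, int probeRecordCount) {
                try {
                    long n = records.take(probeRecordCount).count().blockingGet();
                    return n > 0;
                } catch (RuntimeException e) {
                    // An error event from the flowable marks the position as invalid
                    return false;
                }
            }
        }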

    Each split is separated into a head, body and tail region.

    • The tail region always extends beyond the current split's end, up to the starting position of the third record in the successor split. The first record of the successor split may actually be a continuation of a record in this split: if you consider two quads separated by the split boundary, such as ":g :s :p :o |splitboundary| :g :x :y :z", then the first record after the boundary still uses the graph :g and thus belongs to the graph record started in the current split.
    • Likewise, the head region starts - with one exception - at the third record in a split, because, as mentioned, the first record may belong to the prior split. The exception is the first split (identified by an absolute starting position of 0), whose head region starts at the beginning of the split. The length of the head region depends on the amount of data that was considered for verifying that the starting position is a valid record start.
    • The body region starts immediately after the head region and extends up to the split boundary.
    • We first buffer the tail region and then the head region. As a consequence, after buffering the head region the underlying stream is positioned at the start of the body region.
    • The effective input stream comprises the buffered head region, the 'live' stream of the body region (up to the split boundary), followed by the tail region; see the sketch after this list. This stream is then passed to the parser, which for valid input data can parse all records in a single pass without errors or interruptions.
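    The composition of the effective input stream can be sketched as follows. This is a minimal illustration using java.io.SequenceInputStream; names are assumptions and the actual implementation may differ:

        import java.io.ByteArrayInputStream;
        import java.io.InputStream;
        import java.io.SequenceInputStream;
        import java.util.Vector;

        public class EffectiveStreamSketch {
            // The parser sees: preamble + buffered head + live body + buffered tail + postamble.
            public static InputStream effectiveStream(byte[] preamble, byte[] head,
                    InputStream body, byte[] tail, byte[] postamble) {
                Vector<InputStream> parts = new Vector<>();
                parts.add(new ByteArrayInputStream(preamble));
                parts.add(new ByteArrayInputStream(head));
                parts.add(body); // positioned at the body start, ends at the split boundary
                parts.add(new ByteArrayInputStream(tail));
                parts.add(new ByteArrayInputStream(postamble));
                return new SequenceInputStream(parts.elements());
            }
        }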
    Author:
    Claus Stadler, Lorenz Buehmann
    • Field Detail

      • EMPTY_BYTE_ARRAY

        public static final byte[] EMPTY_BYTE_ARRAY
      • minRecordLengthKey

        protected final String minRecordLengthKey
      • maxRecordLengthKey

        protected final String maxRecordLengthKey
      • probeRecordCountKey

        protected final String probeRecordCountKey
      • recordStartPattern

        protected Pattern recordStartPattern
        Regex pattern used to search for candidate record starts; it avoids having to invoke the actual parser (which may start a new thread) on every single character.
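        For illustration, a hypothetical pattern for a format whose records start at the beginning of a line could be (the actual pattern is format specific and supplied by the subclass):

            import java.util.regex.Pattern;

            // Hypothetical: every line start is a candidate record start.
            Pattern recordStartPattern = Pattern.compile("^", Pattern.MULTILINE);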
      • maxRecordLength

        protected long maxRecordLength
      • minRecordLength

        protected long minRecordLength
      • probeRecordCount

        protected int probeRecordCount
      • currentValue

        protected T currentValue
      • datasetFlow

        protected Iterator<T> datasetFlow
      • decompressor

        protected org.apache.hadoop.io.compress.Decompressor decompressor
      • split

        protected org.apache.hadoop.mapreduce.lib.input.FileSplit split
      • codec

        protected org.apache.hadoop.io.compress.CompressionCodec codec
      • preambleBytes

        protected byte[] preambleBytes
        A (possibly empty) sequence of bytes to be prepended to any stream passed to the parser. For example, for RDF data this could be a set of prefix declarations. Subclasses may initialize the pre-/postamble bytes in the initialize(InputSplit, TaskAttemptContext) method rather than in the constructor!
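        A hedged sketch of such an initialization in a subclass (the prefix declaration is an assumed example):

            // Inside a hypothetical subclass's initialize(InputSplit, TaskAttemptContext):
            super.initialize(inputSplit, context);
            // Prepend prefix declarations so that records taken from the middle of a
            // file can be parsed without the file's header:
            preambleBytes = "@prefix ex: <http://example.org/> .\n"
                    .getBytes(java.nio.charset.StandardCharsets.UTF_8);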
      • postambleBytes

        protected byte[] postambleBytes
      • rawStream

        protected org.apache.hadoop.fs.FSDataInputStream rawStream
      • isEncoded

        protected boolean isEncoded
      • splitStart

        protected long splitStart
      • splitLength

        protected long splitLength
      • splitEnd

        protected long splitEnd
      • isFirstSplit

        protected boolean isFirstSplit
      • raisedThrowable

        protected Throwable raisedThrowable
    • Constructor Detail

      • RecordReaderGenericBase

        public RecordReaderGenericBase​(String minRecordLengthKey,
                                       String maxRecordLengthKey,
                                       String probeRecordCountKey,
                                       Pattern recordStartPattern,
                                       Accumulating<U,​G,​A,​T> accumulating)
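        A hypothetical subclass constructor might wire up its configuration keys and record-start pattern like this (all key names and the pattern are assumptions):

            // Hypothetical subclass of RecordReaderGenericBase; key names are illustrative.
            public RecordReaderMyFormat(Accumulating<U, G, A, T> accumulating) {
                super("mapreduce.input.myformat.record.minlength",
                      "mapreduce.input.myformat.record.maxlength",
                      "mapreduce.input.myformat.record.probecount",
                      java.util.regex.Pattern.compile("^", java.util.regex.Pattern.MULTILINE),
                      accumulating);
            }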
    • Method Detail

      • initialize

        public void initialize​(org.apache.hadoop.mapreduce.InputSplit inputSplit,
                               org.apache.hadoop.mapreduce.TaskAttemptContext context)
                        throws IOException
        Read out config parameters (prefixes, length thresholds, ...) and examine the codec in order to set an internal flag indicating whether the stream will be encoded or not.
        Specified by:
        initialize in class org.apache.hadoop.mapreduce.RecordReader<org.apache.hadoop.io.LongWritable,​T>
        Throws:
        IOException
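        For illustration, the thresholds read here could be set on the job configuration as follows (the key names depend on the concrete subclass and are assumed here):

            import org.apache.hadoop.conf.Configuration;

            Configuration conf = new Configuration();
            // Hypothetical keys matching the constructor example above:
            conf.setLong("mapreduce.input.myformat.record.minlength", 1L);
            conf.setLong("mapreduce.input.myformat.record.maxlength", 10_000_000L);
            conf.setInt("mapreduce.input.myformat.record.probecount", 10);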
      • parse

        protected abstract io.reactivex.rxjava3.core.Flowable<U> parse​(Callable<InputStream> inputStreamSupplier)
        Create a flowable from the input stream. The input stream may be incorrectly positioned in which case the Flowable is expected to indicate this by raising an error event.
        Parameters:
        inputStreamSupplier - A supplier of input streams. It may supply the same underlying stream on each call; hence, at most a single stream should be taken from the supplier. Supplied streams are safe to use in try-with-resources blocks (possibly using CloseShieldInputStream). Taken streams should be closed by the client code.
        Returns:
        The flowable of items parsed from the supplied input stream
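        A minimal sketch of an implementation that emits one item per line (illustrative only; a real subclass delegates to an actual parser, which signals a mis-positioned stream by raising an error event):

            import io.reactivex.rxjava3.core.Flowable;
            import java.io.BufferedReader;
            import java.io.InputStream;
            import java.io.InputStreamReader;
            import java.nio.charset.StandardCharsets;
            import java.util.concurrent.Callable;

            @Override
            protected Flowable<String> parse(Callable<InputStream> inputStreamSupplier) {
                return Flowable.using(
                        // Take at most one stream from the supplier
                        () -> new BufferedReader(new InputStreamReader(
                                inputStreamSupplier.call(), StandardCharsets.UTF_8)),
                        reader -> Flowable.fromStream(reader.lines()),
                        BufferedReader::close);
            }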
      • setStreamToInterval

        public Map.Entry<Long,​Long> setStreamToInterval​(long start,
                                                              long end)
                                                       throws IOException
        Seek to a given offset and prepare to read up to the 'end' position (exclusive). For non-encoded streams this just performs a seek on the stream and returns start/end unchanged. Encoded streams will adjust the seek to the data part that follows some header information.
        Parameters:
        start -
        end -
        Returns:
        The effective start and end positions, adjusted for the codec if necessary
        Throws:
        IOException
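        A hedged usage sketch (variable names are illustrative):

            // Position the stream on [regionStart, regionEnd) before buffering a region;
            // encoded streams may shift these bounds.
            Map.Entry<Long, Long> adjusted = setStreamToInterval(regionStart, regionEnd);
            long adjustedStart = adjusted.getKey();
            long adjustedEnd = adjusted.getValue();
            // ... read up to (adjustedEnd - adjustedStart) bytes from the stream ...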
      • logClose

        public static void logClose​(String str)
      • logUnexpectedClose

        public static void logUnexpectedClose​(String str)
      • didHitSplitBound

        public boolean didHitSplitBound​(org.apache.hadoop.fs.Seekable seekable,
                                        long splitPos)
      • aggregate

        protected io.reactivex.rxjava3.core.Flowable<T> aggregate​(boolean isFirstSplit,
                                                                  io.reactivex.rxjava3.core.Flowable<U> splitFlow,
                                                                  List<U> tailItems)
        Modify a flow to perform aggregation of items into records according to the specification. The complex part here is to correctly combine the two flows:
        • The first group of the splitAggregateFlow needs to be skipped, as it is handled by the previous split's processor.
        • If there are no further groups in splitFlow then no items are emitted at all (because all items belong to a previous split).
        • ONLY if the splitFlow owned at least one group: the first group in the tailFlow needs to be emitted.
        Parameters:
        isFirstSplit - If true then the first record is included in the output; otherwise it is skipped
        splitFlow - The flow of items obtained from the split
        tailItems - The first group of items in the next split
        Returns:
        The flow of records aggregated from the items
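        The skip/emit rules can be illustrated over plain lists (a simplified sketch; the actual method operates on Flowables and uses the Accumulating contract to delimit groups):

            import java.util.ArrayList;
            import java.util.List;

            public class AggregateRulesSketch {
                // 'groups' are the record groups found in this split, 'tailGroup' is the
                // first group of the successor split. Names and types are illustrative.
                public static <T> List<List<T>> emittedGroups(boolean isFirstSplit,
                        List<List<T>> groups, List<T> tailGroup) {
                    // Non-first splits skip the first group; the prior split handles it.
                    int startIdx = isFirstSplit ? 0 : 1;
                    List<List<T>> out = new ArrayList<>();
                    for (int i = startIdx; i < groups.size(); i++) {
                        out.add(groups.get(i));
                    }
                    // Only a split that owned at least one group emits the tail group.
                    if (!out.isEmpty()) {
                        out.add(tailGroup);
                    }
                    return out;
                }
            }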
      • effectiveInputStreamSupp

        protected InputStream effectiveInputStreamSupp​(org.aksw.commons.io.seekable.api.Seekable seekable)
      • getPos

        public static long getPos​(org.aksw.commons.io.seekable.api.Seekable seekable)
      • parseFromSeekable

        protected io.reactivex.rxjava3.core.Flowable<U> parseFromSeekable​(org.aksw.commons.io.seekable.api.Seekable seekable)
      • prober

        protected boolean prober​(org.aksw.commons.io.seekable.api.Seekable seekable)
      • createRecordFlow

        protected io.reactivex.rxjava3.core.Flowable<T> createRecordFlow()
                                                                  throws IOException
        Throws:
        IOException
      • lines

        public static Stream<String> lines​(org.aksw.commons.io.seekable.api.Seekable seekable)
      • findNextRecord

        public static long findNextRecord​(Pattern recordSearchPattern,
                                          org.aksw.commons.io.seekable.api.Seekable nav,
                                          long splitStart,
                                          long absProbeRegionStart,
                                          long maxRecordLength,
                                          long absDataRegionEnd,
                                          Predicate<Long> posValidator,
                                          Predicate<org.aksw.commons.io.seekable.api.Seekable> prober)
                                   throws IOException
        Parameters:
        nav -
        splitStart -
        absProbeRegionStart -
        maxRecordLength -
        absDataRegionEnd -
        posValidator -
        prober -
        Returns:
        Throws:
        IOException
      • findFirstPositionWithProbeSuccess

        public static long findFirstPositionWithProbeSuccess​(org.aksw.commons.io.seekable.api.Seekable rawSeekable,
                                                             Predicate<Long> posValidator,
                                                             Matcher m,
                                                             boolean isFwd,
                                                             Predicate<org.aksw.commons.io.seekable.api.Seekable> prober)
                                                      throws IOException
        Uses the matcher to find candidate probing positions and returns the first position where probing succeeds. Matching ranges are part of the matcher configuration.
        Parameters:
        rawSeekable -
        posValidator - Test whether the seekable's absolute position is a valid start point. Used to prevent testing start points past a split boundary with unknown split lengths.
        m -
        isFwd -
        prober -
        Returns:
        Throws:
        IOException
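        Conceptually, the search proceeds as in the following simplified, forward-only sketch (the actual method also supports backward search and operates on a Seekable):

            import java.util.function.Predicate;
            import java.util.regex.Matcher;

            public class ProbeSearchSketch {
                // The matcher proposes candidate offsets, posValidator rejects positions
                // past the split boundary, and the prober runs the parser-based check.
                // The failure value -1 is illustrative.
                public static long findFirstMatch(Matcher m, long base,
                        Predicate<Long> posValidator, Predicate<Long> prober) {
                    while (m.find()) {
                        long candidate = base + m.start();
                        if (!posValidator.test(candidate)) {
                            break; // candidates are ordered; later ones are invalid too
                        }
                        if (prober.test(candidate)) {
                            return candidate;
                        }
                    }
                    return -1;
                }
            }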
      • nextKeyValue

        public boolean nextKeyValue()
                             throws IOException
        Specified by:
        nextKeyValue in class org.apache.hadoop.mapreduce.RecordReader<org.apache.hadoop.io.LongWritable,​T>
        Throws:
        IOException
      • getCurrentKey

        public org.apache.hadoop.io.LongWritable getCurrentKey()
        Specified by:
        getCurrentKey in class org.apache.hadoop.mapreduce.RecordReader<org.apache.hadoop.io.LongWritable,​T>
      • getCurrentValue

        public T getCurrentValue()
        Specified by:
        getCurrentValue in class org.apache.hadoop.mapreduce.RecordReader<org.apache.hadoop.io.LongWritable,​T>
      • getProgress

        public float getProgress()
                          throws IOException
        Specified by:
        getProgress in class org.apache.hadoop.mapreduce.RecordReader<org.apache.hadoop.io.LongWritable,​T>
        Throws:
        IOException