Package net.sansa_stack.hadoop.core
Class RecordReaderGenericBase<U,G,A,T>
- java.lang.Object
  - org.apache.hadoop.mapreduce.RecordReader<org.apache.hadoop.io.LongWritable,T>
    - net.sansa_stack.hadoop.core.RecordReaderGenericBase<U,G,A,T>
- All Implemented Interfaces:
Closeable, AutoCloseable
- Direct Known Subclasses:
RecordReaderCsv, RecordReaderGenericRdfBase, RecordReaderJsonArray
public abstract class RecordReaderGenericBase<U,G,A,T> extends org.apache.hadoop.mapreduce.RecordReader<org.apache.hadoop.io.LongWritable,T>
A generic record reader that uses a callback mechanism to detect a consecutive sequence of records that must start in the current split and which may extend over any number of successor splits. The callback that needs to be implemented is parse(Callable); a sketch of a concrete subclass follows the region list below. This method receives a supplier of input streams and must return an RxJava Flowable over such an input stream. Note that in contrast to Java Streams, RxJava Flowables idiomatically support error propagation, cancellation and resource management. The RecordReaderGenericBase framework will attempt to read a configurable number of records from the flowable in order to decide whether a position on the input stream is a valid start of records. If for a probe position the flowable yields an error or zero records, then probing continues at a higher offset until the end of data (or the MAX_RECORD_SIZE limit) is reached. This implementation can therefore handle data skews where large data blocks occasionally span multiple splits, e.g. a mix of 1 million graphs of 1 triple each and 1 graph of 1 million triples.
Each split is separated into a head, body and tail region:
- The tail region always extends beyond the current split's end up to the starting position of the third record in the successor split. The first record of the successor split may actually be a continuation of a record in this split: if you consider two quads separated by the split boundary, such as ":g :s :p :o |splitboundary| :g :x :y :z", then the first record after the boundary still uses the graph :g and thus belongs to the graph record started in the current split.
- Likewise, the head region always starts at the third record in a split, because, as mentioned, the first record may belong to the prior split. The one exception is the first split (identified by an absolute starting position of 0), whose head region starts at the beginning of the split. The length of the head region depends on the amount of data that was consumed for verifying that the starting position is a valid record start.
- The body region starts immediately after the head region and extends up to the split boundary.
- We first buffer the tail region and then the head region. As a consequence, after buffering the head region the underlying stream is positioned at the start of the body region.
- The effective input stream comprises the buffer of the head region, the 'live' stream of the body region (up to the split boundary), followed by the tail region. This is then passed to the RDF parser, which for valid input data is then able to parse all records in a single pass without errors or interruptions.
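To make the callback contract concrete, here is a minimal, hypothetical sketch of a line-oriented subclass. It is not taken from the SANSA sources: the configuration key names, the record-start pattern and the externally supplied Accumulating implementation are all assumptions.

    import io.reactivex.rxjava3.core.Flowable;

    import java.io.BufferedReader;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.regex.Pattern;

    // Hypothetical subclass that treats every line as one record (U = G = T = String).
    public class RecordReaderLineSketch
            extends RecordReaderGenericBase<String, String, List<String>, String> {

        public RecordReaderLineSketch(Accumulating<String, String, List<String>, String> accumulating) {
            super("sketch.line.minlength",  // assumed configuration key names
                  "sketch.line.maxlength",
                  "sketch.line.probecount",
                  Pattern.compile("\\n"),   // candidate record starts after newlines
                  accumulating);            // grouping strategy supplied by the caller
        }

        @Override
        protected Flowable<String> parse(Callable<InputStream> inputStreamSupplier) {
            // Take at most one stream from the supplier and emit one item per line.
            // A misaligned start position surfaces as an error or an empty flow,
            // which the framework interprets as a failed probe.
            return Flowable.generate(
                    () -> new BufferedReader(new InputStreamReader(inputStreamSupplier.call())),
                    (reader, emitter) -> {
                        String line = reader.readLine();
                        if (line == null) {
                            emitter.onComplete();
                        } else {
                            emitter.onNext(line);
                        }
                    },
                    BufferedReader::close);
        }
    }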
- Author:
  Claus Stadler, Lorenz Buehmann
-
Field Summary
Fields
- protected Accumulating<U,G,A,T> accumulating
- protected org.apache.hadoop.io.compress.CompressionCodec codec
- protected AtomicLong currentKey
- protected T currentValue
- protected Iterator<T> datasetFlow
- protected org.apache.hadoop.io.compress.Decompressor decompressor
- static byte[] EMPTY_BYTE_ARRAY
- protected boolean isEncoded
- protected boolean isFirstSplit
- protected long maxRecordLength
- protected String maxRecordLengthKey
- protected long minRecordLength
- protected String minRecordLengthKey
- protected byte[] postambleBytes
- protected byte[] preambleBytes
  Subclasses may initialize the pre/post-amble bytes in the initialize(InputSplit, TaskAttemptContext) method rather than the constructor! A (possibly empty) sequence of bytes to be prepended to any stream passed to the parser.
- protected int probeRecordCount
- protected String probeRecordCountKey
- protected Throwable raisedThrowable
- protected org.apache.hadoop.fs.FSDataInputStream rawStream
- protected Pattern recordStartPattern
  Regex pattern to search for candidate record starts, used to avoid having to invoke the actual parser (which may start a new thread) on every single character.
- protected org.apache.hadoop.mapreduce.lib.input.FileSplit split
- protected long splitEnd
- protected long splitLength
- protected long splitStart
- protected SeekableInputStream stream
-
Constructor Summary
Constructors
- RecordReaderGenericBase(String minRecordLengthKey, String maxRecordLengthKey, String probeRecordCountKey, Pattern recordStartPattern, Accumulating<U,G,A,T> accumulating)
-
Method Summary
Methods
- protected io.reactivex.rxjava3.core.Flowable<T> aggregate(boolean isFirstSplit, io.reactivex.rxjava3.core.Flowable<U> splitFlow, List<U> tailItems)
  Modify a flow to perform aggregation of items into records according to the specification.
- void close()
- protected io.reactivex.rxjava3.core.Flowable<T> createRecordFlow()
- boolean didHitSplitBound(org.apache.hadoop.fs.Seekable seekable, long splitPos)
- protected InputStream effectiveInputStream(InputStream base)
- protected InputStream effectiveInputStreamSupp(org.aksw.commons.io.seekable.api.Seekable seekable)
- static long findFirstPositionWithProbeSuccess(org.aksw.commons.io.seekable.api.Seekable rawSeekable, Predicate<Long> posValidator, Matcher m, boolean isFwd, Predicate<org.aksw.commons.io.seekable.api.Seekable> prober)
  Uses the matcher to find candidate probing positions and returns the first position where probing succeeds.
- static long findNextRecord(Pattern recordSearchPattern, org.aksw.commons.io.seekable.api.Seekable nav, long splitStart, long absProbeRegionStart, long maxRecordLength, long absDataRegionEnd, Predicate<Long> posValidator, Predicate<org.aksw.commons.io.seekable.api.Seekable> prober)
- org.apache.hadoop.io.LongWritable getCurrentKey()
- T getCurrentValue()
- static long getPos(org.aksw.commons.io.seekable.api.Seekable seekable)
- float getProgress()
- void initialize(org.apache.hadoop.mapreduce.InputSplit inputSplit, org.apache.hadoop.mapreduce.TaskAttemptContext context)
  Read out config parameters (prefixes, length thresholds, ...) and examine the codec in order to set an internal flag indicating whether the stream will be encoded or not.
- void initRecordFlow()
- static Stream<String> lines(org.aksw.commons.io.seekable.api.Seekable seekable)
- static void logClose(String str)
- static void logUnexpectedClose(String str)
- boolean nextKeyValue()
- protected abstract io.reactivex.rxjava3.core.Flowable<U> parse(Callable<InputStream> inputStreamSupplier)
  Create a flowable from the input stream.
- protected io.reactivex.rxjava3.core.Flowable<U> parseFromSeekable(org.aksw.commons.io.seekable.api.Seekable seekable)
- protected boolean prober(org.aksw.commons.io.seekable.api.Seekable seekable)
- Map.Entry<Long,Long> setStreamToInterval(long start, long end)
  Seek to a given offset and prepare to read up to the 'end' position (exclusive).
-
-
-
Field Detail
-
EMPTY_BYTE_ARRAY
public static final byte[] EMPTY_BYTE_ARRAY
-
minRecordLengthKey
protected final String minRecordLengthKey
-
maxRecordLengthKey
protected final String maxRecordLengthKey
-
probeRecordCountKey
protected final String probeRecordCountKey
-
recordStartPattern
protected Pattern recordStartPattern
Regex pattern to search for candidate record starts, used to avoid having to invoke the actual parser (which may start a new thread) on every single character.
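For illustration only (each subclass supplies its own pattern), a line-oriented format might use newlines as candidate record starts; the value below is an assumption, not taken from this library:

    // Assumed example: a candidate record start follows every newline
    Pattern recordStartPattern = Pattern.compile("\\n");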
-
accumulating
protected final Accumulating<U,G,A,T> accumulating
-
maxRecordLength
protected long maxRecordLength
-
minRecordLength
protected long minRecordLength
-
probeRecordCount
protected int probeRecordCount
-
currentKey
protected AtomicLong currentKey
-
currentValue
protected T currentValue
-
decompressor
protected org.apache.hadoop.io.compress.Decompressor decompressor
-
split
protected org.apache.hadoop.mapreduce.lib.input.FileSplit split
-
codec
protected org.apache.hadoop.io.compress.CompressionCodec codec
-
preambleBytes
protected byte[] preambleBytes
Subclasses may initialize the pre/post-amble bytes in the initialize(InputSplit, TaskAttemptContext) method rather than the constructor! A (possibly empty) sequence of bytes to be prepended to any stream passed to the parser. For example, for RDF data this could be a set of prefix declarations.
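As a hedged illustration, a hypothetical RDF subclass could set Turtle prefix declarations as the preamble while initializing; the prefix string is an assumption:

    @Override
    public void initialize(org.apache.hadoop.mapreduce.InputSplit inputSplit,
                           org.apache.hadoop.mapreduce.TaskAttemptContext context) throws IOException {
        super.initialize(inputSplit, context);
        // Assumed example: make the prefix declarations visible to every parse attempt
        preambleBytes = "@prefix ex: <http://example.org/> .\n"
                .getBytes(java.nio.charset.StandardCharsets.UTF_8);
    }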
-
postambleBytes
protected byte[] postambleBytes
-
rawStream
protected org.apache.hadoop.fs.FSDataInputStream rawStream
-
stream
protected SeekableInputStream stream
-
isEncoded
protected boolean isEncoded
-
splitStart
protected long splitStart
-
splitLength
protected long splitLength
-
splitEnd
protected long splitEnd
-
isFirstSplit
protected boolean isFirstSplit
-
raisedThrowable
protected Throwable raisedThrowable
-
-
Method Detail
-
initialize
public void initialize(org.apache.hadoop.mapreduce.InputSplit inputSplit, org.apache.hadoop.mapreduce.TaskAttemptContext context) throws IOException
Read out config parameters (prefixes, length thresholds, ...) and examine the codec in order to set an internal flag indicating whether the stream will be encoded or not.
- Specified by:
initialize
in class org.apache.hadoop.mapreduce.RecordReader<org.apache.hadoop.io.LongWritable,T>
- Throws:
IOException
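As a usage sketch: the actual key strings are defined by each concrete subclass (they are passed to the constructor), so the names below are placeholders:

    org.apache.hadoop.conf.Configuration conf = context.getConfiguration();
    // Placeholder keys; a real subclass documents its own configuration key names
    conf.setLong("sketch.record.maxlength", 10_000_000L); // cap on record size while probing
    conf.setInt("sketch.record.probecount", 10);          // records to read per probe attempt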
-
initRecordFlow
public void initRecordFlow() throws IOException
- Throws:
IOException
-
parse
protected abstract io.reactivex.rxjava3.core.Flowable<U> parse(Callable<InputStream> inputStreamSupplier)
Create a flowable from the input stream. The input stream may be incorrectly positioned, in which case the Flowable is expected to indicate this by raising an error event.
- Parameters:
inputStreamSupplier - A supplier of input streams. It may supply the same underlying stream on each call, hence at most a single stream should be taken from the supplier. Supplied streams are safe to use in try-with-resources blocks (possibly using CloseShieldInputStream). Taken streams should be closed by the client code.
- Returns:
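For instance, resource management can follow RxJava's using() idiom; parseRecords below is a hypothetical helper, not part of this class:

    @Override
    protected Flowable<U> parse(Callable<InputStream> inputStreamSupplier) {
        return Flowable.using(
                inputStreamSupplier::call,  // take at most one stream from the supplier
                in -> parseRecords(in),     // hypothetical: builds a Flowable<U> over the stream
                InputStream::close);        // the taken stream is closed by this client code
    }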
-
setStreamToInterval
public Map.Entry<Long,Long> setStreamToInterval(long start, long end) throws IOException
Seek to a given offset and prepare to read up to the 'end' position (exclusive). For non-encoded streams this just performs a seek on the stream and returns start/end unchanged. Encoded streams will adjust the seek to the data part that follows some header information.
- Parameters:
start -
end -
- Returns:
- Throws:
IOException
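An assumed usage sketch, not from the SANSA sources: position the stream on a split and pick up the possibly adjusted bounds:

    // For encoded streams the returned bounds may differ from the requested ones
    Map.Entry<Long, Long> adjusted = setStreamToInterval(splitStart, splitEnd);
    long effectiveStart = adjusted.getKey();
    long effectiveEnd = adjusted.getValue();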
-
logClose
public static void logClose(String str)
-
logUnexpectedClose
public static void logUnexpectedClose(String str)
-
didHitSplitBound
public boolean didHitSplitBound(org.apache.hadoop.fs.Seekable seekable, long splitPos)
-
aggregate
protected io.reactivex.rxjava3.core.Flowable<T> aggregate(boolean isFirstSplit, io.reactivex.rxjava3.core.Flowable<U> splitFlow, List<U> tailItems)
Modify a flow to perform aggregation of items into records according to the specification. The complex part here is to correctly combine the two flows:
- The first group of the splitAggregateFlow needs to be skipped, as this is handled by the previous split's processor.
- If there are no further groups in splitFlow then no items are emitted at all (because all items belong to a previous split).
- ONLY if the splitFlow owned at least one group: the first group in the tailFlow needs to be emitted.
- Parameters:
isFirstSplit - If true then the first record is included in the output; otherwise it is skipped
splitFlow - The flow of items obtained from the split
tailItems - The first group of items in the next split
- Returns:
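These combination rules can be approximated by the following conceptual sketch; groupRecords is a hypothetical operator that segments the item flow into per-record groups, and unlike this buffering sketch the real operator chain is streaming:

    Flowable<List<U>> groups = groupRecords(splitFlow); // hypothetical segmentation
    Flowable<List<U>> result = groups.toList().flatMapPublisher(gs -> {
        // Unless this is the first split, the first group is skipped because it
        // is handled by the previous split's processor.
        List<List<U>> owned = isFirstSplit ? gs : gs.subList(Math.min(1, gs.size()), gs.size());
        if (owned.isEmpty()) {
            return Flowable.empty(); // this split owns no group, so emit nothing
        }
        // The split owned at least one group, so its last record is completed
        // by the first group of items from the next split.
        return Flowable.fromIterable(owned).concatWith(Flowable.just(tailItems));
    });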
-
effectiveInputStream
protected InputStream effectiveInputStream(InputStream base)
-
effectiveInputStreamSupp
protected InputStream effectiveInputStreamSupp(org.aksw.commons.io.seekable.api.Seekable seekable)
-
getPos
public static long getPos(org.aksw.commons.io.seekable.api.Seekable seekable)
-
parseFromSeekable
protected io.reactivex.rxjava3.core.Flowable<U> parseFromSeekable(org.aksw.commons.io.seekable.api.Seekable seekable)
-
prober
protected boolean prober(org.aksw.commons.io.seekable.api.Seekable seekable)
-
createRecordFlow
protected io.reactivex.rxjava3.core.Flowable<T> createRecordFlow() throws IOException
- Throws:
IOException
-
findNextRecord
public static long findNextRecord(Pattern recordSearchPattern, org.aksw.commons.io.seekable.api.Seekable nav, long splitStart, long absProbeRegionStart, long maxRecordLength, long absDataRegionEnd, Predicate<Long> posValidator, Predicate<org.aksw.commons.io.seekable.api.Seekable> prober) throws IOException
- Parameters:
nav -
splitStart -
absProbeRegionStart -
maxRecordLength -
absDataRegionEnd -
posValidator -
prober -
- Returns:
- Throws:
IOException
-
findFirstPositionWithProbeSuccess
public static long findFirstPositionWithProbeSuccess(org.aksw.commons.io.seekable.api.Seekable rawSeekable, Predicate<Long> posValidator, Matcher m, boolean isFwd, Predicate<org.aksw.commons.io.seekable.api.Seekable> prober) throws IOException
Uses the matcher to find candidate probing positions and returns the first position where probing succeeds. Matching ranges are part of the matcher configuration.
- Parameters:
rawSeekable -
posValidator - Test whether the seekable's absolute position is a valid start point. Used to prevent testing start points past a split boundary with unknown split lengths.
m -
isFwd -
prober -
- Returns:
- Throws:
IOException
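Conceptually the search resembles the sketch below; the window variables and the setPos call are simplifying assumptions rather than the actual Seekable API:

    // Assumed: 'window' is a CharSequence view of the probe region that starts
    // at the absolute stream offset 'windowStart'.
    Matcher m = recordStartPattern.matcher(window);
    long result = -1;
    while (m.find()) {
        long candidatePos = windowStart + m.start();
        if (!posValidator.test(candidatePos)) {
            break; // candidate lies past the permitted probing region
        }
        seekable.setPos(candidatePos); // hypothetical positioning call
        if (prober.test(seekable)) {   // did a probe parse yield enough records?
            result = candidatePos;
            break;
        }
    }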
-
nextKeyValue
public boolean nextKeyValue() throws IOException
- Specified by:
nextKeyValue
in class org.apache.hadoop.mapreduce.RecordReader<org.apache.hadoop.io.LongWritable,T>
- Throws:
IOException
-
getCurrentKey
public org.apache.hadoop.io.LongWritable getCurrentKey()
- Specified by:
getCurrentKey
in class org.apache.hadoop.mapreduce.RecordReader<org.apache.hadoop.io.LongWritable,T>
-
getCurrentValue
public T getCurrentValue()
- Specified by:
getCurrentValue
in class org.apache.hadoop.mapreduce.RecordReader<org.apache.hadoop.io.LongWritable,T>
-
getProgress
public float getProgress() throws IOException
- Specified by:
getProgress
in class org.apache.hadoop.mapreduce.RecordReader<org.apache.hadoop.io.LongWritable,T>
- Throws:
IOException
-
close
public void close() throws IOException
- Specified by:
close
in interface AutoCloseable
- Specified by:
close
in interface Closeable
- Specified by:
close
in class org.apache.hadoop.mapreduce.RecordReader<org.apache.hadoop.io.LongWritable,T>
- Throws:
IOException
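A reader of this class is driven like any Hadoop RecordReader; a typical consumption sketch (inputFormat, split and context are assumed to be in scope):

    RecordReader<LongWritable, T> reader = inputFormat.createRecordReader(split, context);
    reader.initialize(split, context);
    try {
        while (reader.nextKeyValue()) {
            LongWritable key = reader.getCurrentKey(); // the record's key
            T record = reader.getCurrentValue();       // the parsed record
            // process the record ...
        }
    } finally {
        reader.close();
    }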
-
-