public abstract class RecordReaderGenericBase<U,G,A,T>
extends org.apache.hadoop.mapreduce.RecordReader<org.apache.hadoop.io.LongWritable,T>
The main extension point for subclasses is parse(Callable).
This method receives a supplier of input streams and must return an RxJava Flowable over such an input stream.
Note that in contrast to Java Streams, RxJava Flowables idiomatically support error propagation, cancellation and resource management.
The RecordReaderGenericBase framework will attempt to read a configurable number of records from the flowable in order to decide whether a position on the input stream is a valid start of records.
If for a probe position the flowable yields an error or zero records, then probing continues at a higher offset until the end of data (or the MAX_RECORD_SIZE limit) is reached.
This implementation can therefore handle data skews where large data blocks occasionally span multiple splits, e.g. a mix of 1 million graphs of 1 triple each and 1 graph of 1 million triples.
Each split is separated into a head, body and tail region.
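A minimal sketch of what a parse implementation can look like, using a hypothetical line-oriented toy format (the format and all names are assumptions; real subclasses would parse an actual record syntax). Flowable.using ties the stream's lifetime to the flowable, which provides the resource management and cancellation semantics mentioned above:

```java
import io.reactivex.rxjava3.core.Flowable;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Callable;

// Hypothetical parse implementation: one record per line. Flowable.using
// takes at most one stream from the supplier and closes it when the
// flowable terminates or is cancelled.
@Override
protected Flowable<String> parse(Callable<InputStream> inputStreamSupplier) {
    return Flowable.using(
        inputStreamSupplier::call,
        in -> Flowable.fromIterable(() -> new BufferedReader(
                new InputStreamReader(in, StandardCharsets.UTF_8)).lines().iterator()),
        InputStream::close);
}
```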
Modifier and Type | Field and Description
---|---
protected Accumulating<U,G,A,T> | accumulating
protected String | baseIri
protected String | baseIriKey
protected org.apache.hadoop.io.compress.CompressionCodec | codec
protected AtomicLong | currentKey
protected T | currentValue
protected Iterator<T> | datasetFlow
protected org.apache.hadoop.io.compress.Decompressor | decompressor
protected String | headerBytesKey
protected boolean | isEncoded
protected long | maxRecordLength
protected String | maxRecordLengthKey
protected long | minRecordLength
protected String | minRecordLengthKey
protected byte[] | prefixBytes
protected int | probeRecordCount
protected String | probeRecordCountKey
protected org.apache.hadoop.fs.FSDataInputStream | rawStream
protected Pattern | recordStartPattern: Regex pattern to search for candidate record starts; used to avoid having to invoke the actual parser (which may start a new thread) on each single character.
protected org.apache.hadoop.mapreduce.lib.input.FileSplit | split
protected long | splitEnd
protected long | splitLength
protected long | splitStart
protected SeekableInputStream | stream
Constructor and Description |
---|
RecordReaderGenericBase(String minRecordLengthKey, String maxRecordLengthKey, String probeRecordCountKey, Pattern recordStartPattern, String baseIriKey, String headerBytesKey, Accumulating<U,G,A,T> accumulating) |
Modifier and Type | Method and Description
---|---
protected io.reactivex.rxjava3.core.Flowable<T> | aggregate(boolean isFirstSplit, io.reactivex.rxjava3.core.Flowable<U> splitFlow, List<U> tailItems): Modify a flow to perform aggregation of items into records according to the specification. The complex part here is to correctly combine the two flows: the first group of the splitAggregateFlow needs to be skipped, as it is handled by the previous split's processor; if there are no further groups in splitFlow then no items are emitted at all (because all items belong to the previous split); only if the splitFlow owned at least one group is the first group in the tailFlow emitted.
void | close()
protected io.reactivex.rxjava3.core.Flowable<T> | createRecordFlow()
static long | findFirstPositionWithProbeSuccess(org.aksw.jena_sparql_api.io.binseach.Seekable rawSeekable, Predicate<Long> posValidator, Matcher m, boolean isFwd, Predicate<org.aksw.jena_sparql_api.io.binseach.Seekable> prober): Uses the matcher to find candidate probing positions and returns the first position where probing succeeds.
static long | findNextRecord(Pattern recordSearchPattern, org.aksw.jena_sparql_api.io.binseach.Seekable nav, long splitStart, long absProbeRegionStart, long maxRecordLength, long absDataRegionEnd, Predicate<Long> posValidator, Predicate<org.aksw.jena_sparql_api.io.binseach.Seekable> prober)
org.apache.hadoop.io.LongWritable | getCurrentKey()
T | getCurrentValue()
float | getProgress()
void | initialize(org.apache.hadoop.mapreduce.InputSplit inputSplit, org.apache.hadoop.mapreduce.TaskAttemptContext context): Read out config parameters (prefixes, length thresholds, ...) and examine the codec in order to set an internal flag indicating whether the stream is encoded.
void | initRecordFlow()
static void | logClose(String str)
static void | logUnexpectedClose(String str)
boolean | nextKeyValue()
protected abstract io.reactivex.rxjava3.core.Flowable<U> | parse(Callable<InputStream> inputStreamSupplier): Create a flowable from the input stream.
Map.Entry<Long,Long> | setStreamToInterval(long start, long end): Seek to a given offset and prepare to read up to the 'end' position (exclusive). For non-encoded streams this just performs a seek on the stream and returns start/end unchanged.
protected final String minRecordLengthKey
protected final String maxRecordLengthKey
protected final String probeRecordCountKey
protected final String headerBytesKey
protected final String baseIriKey
protected final Pattern recordStartPattern
protected final Accumulating<U,G,A,T> accumulating
protected String baseIri
protected long maxRecordLength
protected long minRecordLength
protected int probeRecordCount
protected AtomicLong currentKey
protected T currentValue
protected org.apache.hadoop.io.compress.Decompressor decompressor
protected org.apache.hadoop.mapreduce.lib.input.FileSplit split
protected org.apache.hadoop.io.compress.CompressionCodec codec
protected byte[] prefixBytes
protected org.apache.hadoop.fs.FSDataInputStream rawStream
protected SeekableInputStream stream
protected boolean isEncoded
protected long splitStart
protected long splitLength
protected long splitEnd
public void initialize(org.apache.hadoop.mapreduce.InputSplit inputSplit, org.apache.hadoop.mapreduce.TaskAttemptContext context) throws IOException
Specified by:
initialize in class org.apache.hadoop.mapreduce.RecordReader<org.apache.hadoop.io.LongWritable,T>
Throws:
IOException
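A hedged sketch of what initialization does conceptually, per the method's summary above (the default values and the exact shape of the code are assumptions; only the configuration keys passed to the constructor are part of this API):

```java
// Conceptual sketch only; defaults are assumptions, not the actual values.
org.apache.hadoop.conf.Configuration conf = context.getConfiguration();
this.minRecordLength = conf.getLong(minRecordLengthKey, 1);        // assumed default
this.maxRecordLength = conf.getLong(maxRecordLengthKey, 1 << 20);  // assumed default
this.probeRecordCount = conf.getInt(probeRecordCountKey, 10);      // assumed default
this.baseIri = conf.get(baseIriKey);

// If a compression codec matches the file name, the stream is encoded
// (compressed) and cannot be seeked directly.
this.split = (org.apache.hadoop.mapreduce.lib.input.FileSplit) inputSplit;
this.codec = new org.apache.hadoop.io.compress.CompressionCodecFactory(conf)
        .getCodec(split.getPath());
this.isEncoded = codec != null;
```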
public void initRecordFlow() throws IOException
Throws:
IOException
protected abstract io.reactivex.rxjava3.core.Flowable<U> parse(Callable<InputStream> inputStreamSupplier)
Parameters:
inputStreamSupplier - A supplier of input streams. It may supply the same underlying stream on each call; hence, at most a single stream should be taken from the supplier. Supplied streams are safe to use in try-with-resources blocks (possibly using CloseShieldInputStream). Taken streams should be closed by the client code.
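The supplier contract from the consumer's perspective, as a hedged fragment (parseRecordsFrom is a placeholder, not part of this API):

```java
// Take at most one stream per supplier and close it after use; do not call
// the supplier again, as it may hand out the same underlying stream.
try (InputStream in = inputStreamSupplier.call()) {
    parseRecordsFrom(in);   // placeholder for the actual parsing
}
```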
public Map.Entry<Long,Long> setStreamToInterval(long start, long end) throws IOException
Parameters:
start -
end -
Throws:
IOException
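A small usage sketch: the returned entry carries the effective start/end positions, which per the summary above equal the arguments for non-encoded streams:

```java
// Position the stream on this split's byte range.
Map.Entry<Long, Long> adjusted = setStreamToInterval(splitStart, splitEnd);
long effectiveStart = adjusted.getKey();
long effectiveEnd = adjusted.getValue();   // exclusive
```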
public static void logClose(String str)
public static void logUnexpectedClose(String str)
protected io.reactivex.rxjava3.core.Flowable<T> aggregate(boolean isFirstSplit, io.reactivex.rxjava3.core.Flowable<U> splitFlow, List<U> tailItems)
Parameters:
isFirstSplit - If true then the first record is included in the output; otherwise it is skipped.
splitFlow - The flow of items obtained from the split.
tailItems - The first set of group items in the next split.
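A hedged sketch of the combination rules described in the method summary above (groupIntoRecords is a hypothetical stand-in for the grouping actually performed via the Accumulating contract; this is not the real implementation):

```java
import io.reactivex.rxjava3.core.Flowable;

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

static <U> Flowable<List<U>> combineSketch(boolean isFirstSplit,
                                           Flowable<U> splitFlow,
                                           List<U> tailItems) {
    // Group consecutive items into record groups (hypothetical helper).
    Flowable<List<U>> groups = groupIntoRecords(splitFlow);

    // The previous split's reader already emits the first group,
    // unless this is the very first split.
    Flowable<List<U>> owned = isFirstSplit ? groups : groups.skip(1);

    // Append the tail group only if this split owned at least one group;
    // otherwise all items belonged to the previous split and nothing is emitted.
    AtomicBoolean ownedAny = new AtomicBoolean(false);
    return owned
        .doOnNext(g -> ownedAny.set(true))
        .concatWith(Flowable.defer(() -> ownedAny.get()
            ? Flowable.just(tailItems)
            : Flowable.empty()));
}
```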
protected io.reactivex.rxjava3.core.Flowable<T> createRecordFlow() throws IOException
Throws:
IOException
public static long findNextRecord(Pattern recordSearchPattern, org.aksw.jena_sparql_api.io.binseach.Seekable nav, long splitStart, long absProbeRegionStart, long maxRecordLength, long absDataRegionEnd, Predicate<Long> posValidator, Predicate<org.aksw.jena_sparql_api.io.binseach.Seekable> prober) throws IOException
Parameters:
nav -
splitStart -
absProbeRegionStart -
maxRecordLength -
absDataRegionEnd -
posValidator -
prober -
Throws:
IOException
public static long findFirstPositionWithProbeSuccess(org.aksw.jena_sparql_api.io.binseach.Seekable rawSeekable, Predicate<Long> posValidator, Matcher m, boolean isFwd, Predicate<org.aksw.jena_sparql_api.io.binseach.Seekable> prober) throws IOException
Parameters:
rawSeekable -
posValidator - Test whether the seekable's absolute position is a valid start point. Used to prevent testing start points past a split boundary with unknown split lengths.
m -
isFwd -
prober -
Throws:
IOException
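A hedged sketch of what a prober predicate might look like, following the probing description in the class overview (newInputStream is a hypothetical adapter from Seekable to InputStream, not part of this API):

```java
// Try to parse probeRecordCount records from the candidate position;
// an error or an empty result means the probe failed.
Predicate<org.aksw.jena_sparql_api.io.binseach.Seekable> prober = seekable -> {
    try {
        long n = parse(() -> newInputStream(seekable))   // hypothetical adapter
            .take(probeRecordCount)
            .count()
            .blockingGet();
        return n > 0;
    } catch (Exception e) {
        return false;   // parse errors disqualify the candidate position
    }
};
```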
public boolean nextKeyValue() throws IOException
Specified by:
nextKeyValue in class org.apache.hadoop.mapreduce.RecordReader<org.apache.hadoop.io.LongWritable,T>
Throws:
IOException
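For context, the standard Hadoop pattern for driving any RecordReader, this one included:

```java
// Standard consumption loop as executed by the MapReduce framework.
reader.initialize(inputSplit, context);
while (reader.nextKeyValue()) {
    org.apache.hadoop.io.LongWritable key = reader.getCurrentKey();
    T value = reader.getCurrentValue();   // one aggregated record
    // process the record ...
}
reader.close();
```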
public org.apache.hadoop.io.LongWritable getCurrentKey()
Specified by:
getCurrentKey in class org.apache.hadoop.mapreduce.RecordReader<org.apache.hadoop.io.LongWritable,T>
public T getCurrentValue()
Specified by:
getCurrentValue in class org.apache.hadoop.mapreduce.RecordReader<org.apache.hadoop.io.LongWritable,T>
public float getProgress() throws IOException
Specified by:
getProgress in class org.apache.hadoop.mapreduce.RecordReader<org.apache.hadoop.io.LongWritable,T>
Throws:
IOException
public void close() throws IOException
Specified by:
close in interface Closeable
Specified by:
close in interface AutoCloseable
Specified by:
close in class org.apache.hadoop.mapreduce.RecordReader<org.apache.hadoop.io.LongWritable,T>
Throws:
IOException