/*
 * Decompiled with CFR 0.152.
 */
package net.sansa_stack.hadoop.core;

import com.google.common.primitives.Ints;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.Reader;
import java.io.SequenceInputStream;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.function.Function;
import java.util.function.LongPredicate;
import java.util.stream.Stream;
import net.sansa_stack.hadoop.core.Accumulating;
import net.sansa_stack.hadoop.core.MatcherFactory;
import net.sansa_stack.hadoop.core.OffsetSeekResult;
import net.sansa_stack.hadoop.core.ProbeResult;
import net.sansa_stack.hadoop.core.ProbeStats;
import net.sansa_stack.hadoop.core.Prober;
import net.sansa_stack.hadoop.core.SeekableSourceOverSplit;
import net.sansa_stack.hadoop.core.Stats2;
import net.sansa_stack.hadoop.core.TailBufferChannel;
import net.sansa_stack.hadoop.core.pattern.CustomMatcher;
import net.sansa_stack.hadoop.core.pattern.CustomPattern;
import net.sansa_stack.hadoop.format.jena.base.RecordReaderConf;
import net.sansa_stack.hadoop.util.InputStreamWithCloseLogging;
import net.sansa_stack.hadoop.util.SeekableByteChannelFromSeekableInputStream;
import net.sansa_stack.io.util.InputStreamWithCloseIgnore;
import net.sansa_stack.io.util.InputStreamWithZeroOffsetRead;
import net.sansa_stack.nio.util.InterruptingSeekableByteChannel;
import org.aksw.commons.io.buffer.array.ArrayOps;
import org.aksw.commons.io.buffer.array.ArrayOpsObject;
import org.aksw.commons.io.buffer.array.BufferOverReadableChannel;
import org.aksw.commons.io.hadoop.SeekableInputStream;
import org.aksw.commons.io.input.CharSequenceDecorator;
import org.aksw.commons.io.input.ReadableChannel;
import org.aksw.commons.io.input.ReadableChannelOverIterator;
import org.aksw.commons.io.input.ReadableChannelSwitchable;
import org.aksw.commons.io.input.ReadableChannelWithValue;
import org.aksw.commons.io.input.ReadableChannels;
import org.aksw.commons.io.input.SeekableReadableChannel;
import org.aksw.commons.io.input.SeekableReadableChannels;
import org.aksw.commons.io.seekable.api.Seekable;
import org.aksw.commons.util.stream.CollapseRunsSpec;
import org.aksw.commons.util.stream.StreamOperatorCollapseRuns;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.SplitCompressionInputStream;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.jena.rdf.model.Model;
import org.apache.jena.rdf.model.ModelFactory;
import org.apache.jena.rdf.model.Resource;
import org.apache.jena.riot.RDFDataMgr;
import org.apache.jena.riot.RDFFormat;
import org.apache.jena.sys.JenaSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class RecordReaderGenericBase<U, G, A, T>
extends RecordReader<LongWritable, T> {
    /** Class logger; has no initializer here, so it is presumably assigned in a static initializer outside this view. */
    private static final Logger logger;
    /** Shared empty array used as the default preamble/postamble; also initialized outside this view. */
    public static final byte[] EMPTY_BYTE_ARRAY;

    // --- Hadoop configuration keys (taken from RecordReaderConf in the constructor, read in initialize) ---
    protected final String minRecordLengthKey;
    protected final String maxRecordLengthKey;
    protected final String probeRecordCountKey;
    /** May be null; initialize() then falls back to Integer.MAX_VALUE for probeElementCount. */
    protected final String probeElementCountKey;

    /** Pattern used to locate candidate record starts (passed to findNextRegion). */
    protected CustomPattern recordStartPattern;
    /** Upper bound of the search window per probe; initialize() defaults it to 0xA00000 (10 MiB). */
    protected long maxRecordLength;
    /** initialize() defaults this to 1. */
    protected long minRecordLength;
    /** Maximum number of records counted by a single probe; initialize() defaults it to 100. */
    protected int probeRecordCount;
    /** Maximum number of elements consumed by a single probe. */
    protected int probeElementCount;
    /** When true, createRecordFlow() attaches peek() counters for totalEltCount/totalRecordCount. */
    protected boolean enableStats = true;
    /** Strategy that classifies elements into groups and accumulates them into records. */
    protected final Accumulating<U, G, A, T> accumulating;

    // --- Record iteration state (currentKey/currentValue are not used in the visible part of the file) ---
    protected AtomicLong currentKey = new AtomicLong();
    protected T currentValue;
    /** Closes the stream created by createRecordFlow(); set in initRecordFlow(). */
    protected Runnable recordFlowCloseable;
    /** Iterator over the stream created by createRecordFlow(); set in initRecordFlow(). */
    protected Iterator<T> datasetFlow;

    // --- Input handling ---
    /** Created per call in setStreamToInterval() when a codec is present. */
    protected Decompressor decompressor;
    protected FileSplit split;
    /** Codec detected from the file name in initialize(); null for uncompressed input. */
    protected CompressionCodec codec;
    /** Bytes prepended to every parsed stream (see effectiveInputStreamSupp). */
    protected byte[] preambleBytes = EMPTY_BYTE_ARRAY;
    /** Bytes handed to SeekableSourceOverSplit on creation (see createRecordFlow). */
    protected byte[] postambleBytes = EMPTY_BYTE_ARRAY;
    /** The stream opened on the split's file in initialize(). */
    protected FSDataInputStream rawStream;
    /** The (possibly decoded) stream configured by setStreamToInterval(). */
    protected SeekableInputStream stream;
    /** True iff a splittable compression codec was detected in initialize(). */
    protected boolean isEncoded = false;

    // --- Split geometry; -1 means "not yet initialized" ---
    protected long splitStart = -1L;
    protected long splitLength = -1L;
    protected long splitEnd = -1L;
    /** True iff splitStart == 0. */
    protected boolean isFirstSplit = false;
    protected String splitName;
    /** Human-readable id of the form "name:start+length", used in log messages. */
    protected String splitId;

    // --- Statistics counters (only updated when enableStats is true) ---
    protected long totalEltCount = 0L;
    protected long totalRecordCount = 0L;

    // --- State created in createRecordFlow() ---
    SeekableSourceOverSplit source;
    LongPredicate posValidator;
    LongPredicate readPosValidator;
    Function<Long, Long> posToSplitId;
    /** Result of the search for the first record start in this split. */
    ProbeResult headProbe = null;
    // NOTE(review): this initializer runs at construction time, when isEncoded is still false and
    // splitLength is still -1, so the field always starts out as -1. If it is meant to reflect the
    // real split it has to be reassigned after initialize() - verify against the rest of the file.
    long knownDecodedDataLength = this.isEncoded ? -1L : this.splitLength;
    /** Passed as "n" to skipToNthRecordInSplit(). */
    protected int skipRecordCount = 2;
    /** Passed as the absolute data region end to skipToNthRecordInSplit(). */
    protected long maxExtraByteCount = Long.MAX_VALUE;

    // --- Tail detection results (set in detectTail()) ---
    protected ProbeResult tailRecordOffset = null;
    /** Number of bytes of the tail buffer that belong to this split's last record; -1 until detected. */
    protected long tailBytes = -1L;
    // NOTE(review): the next two fields are never assigned in the visible code; detectTail() uses a
    // local variable and a parameter with the same names instead.
    protected BufferOverReadableChannel<U[]> tailEltBuffer;
    protected BufferOverReadableChannel<byte[]> tailByteBuffer;
    /** Elements of the first group found in the tail region; appended after the split's own elements. */
    protected List<U> tailElts = Collections.emptyList();
    /** Milliseconds spent in detectTail() up to the point the tail elements were collected. */
    protected long tailEltsTime;
    /** Null until createRecordFlow() decided whether head probing left the head stream. */
    protected Boolean regionStartSearchReadOverSplitEnd = null;
    /** Null until createRecordFlow() decided whether head probing read past the detected tail position. */
    protected Boolean regionStartSearchReadOverRegionEnd = null;

    /**
     * Creates a reader that takes its configuration key names and its record search
     * pattern from {@code conf}.
     *
     * @param conf         supplies the configuration keys and the record start pattern
     * @param accumulating strategy used to group parsed elements into records
     */
    public RecordReaderGenericBase(RecordReaderConf conf, Accumulating<U, G, A, T> accumulating) {
        // Configuration key names; the values are looked up later in initialize().
        this.minRecordLengthKey = conf.getMinRecordLengthKey();
        this.maxRecordLengthKey = conf.getMaxRecordLengthKey();
        this.probeRecordCountKey = conf.getProbeRecordCountKey();
        this.probeElementCountKey = conf.getProbeElementCountKey();

        this.recordStartPattern = conf.getRecordSearchPattern();
        this.accumulating = accumulating;
    }

    /**
     * Reads the probing parameters from the job configuration, opens the split's file and
     * detects whether the file is compressed with a splittable codec.
     *
     * @param inputSplit the split to read; must be a {@link FileSplit}
     * @param context    the task context providing the job configuration
     * @throws IOException      if the file system or the file cannot be opened
     * @throws RuntimeException if the file uses a codec that is not splittable
     */
    public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException {
        Configuration job = context.getConfiguration();

        // The element count key is optional; without it probing is not capped by element count.
        if (this.probeElementCountKey == null) {
            this.probeElementCount = Integer.MAX_VALUE;
        } else {
            this.probeElementCount = job.getInt(this.probeElementCountKey, Integer.MAX_VALUE);
        }
        this.minRecordLength = job.getInt(this.minRecordLengthKey, 1);
        this.maxRecordLength = job.getInt(this.maxRecordLengthKey, 0xA00000);
        this.probeRecordCount = job.getInt(this.probeRecordCountKey, 100);

        FileSplit fileSplit = (FileSplit)inputSplit;
        this.split = fileSplit;
        Path path = fileSplit.getPath();
        this.rawStream = path.getFileSystem(context.getConfiguration()).open(path);

        // Split geometry and a readable id for log messages.
        this.isEncoded = false;
        this.splitStart = fileSplit.getStart();
        this.splitLength = fileSplit.getLength();
        this.splitEnd = this.splitStart + this.splitLength;
        this.isFirstSplit = this.splitStart == 0L;
        this.splitName = path.getName();
        this.splitId = this.splitName + ":" + this.splitStart + "+" + this.splitLength;

        // Codec detection is based on the file name; only splittable codecs are supported.
        this.codec = new CompressionCodecFactory(job).getCodec(path);
        if (this.codec == null) {
            return;
        }
        if (!(this.codec instanceof SplittableCompressionCodec)) {
            throw new RuntimeException("Don't know how to handle codec: " + String.valueOf(this.codec));
        }
        this.isEncoded = true;
    }

    /**
     * Builds the record stream for this split and stores both an iterator over it and a
     * handle for closing it.
     *
     * @throws IOException if the record flow cannot be created
     */
    public void initRecordFlow() throws IOException {
        Stream<T> recordFlow = this.createRecordFlow();
        // Keep the close handle separately because only the iterator is retained below.
        this.recordFlowCloseable = recordFlow::close;
        this.datasetFlow = recordFlow.iterator();
    }

    /**
     * Parses elements from the given input stream.
     *
     * <p>The parameter names were lost in decompilation. Judging from the only visible call
     * site ({@code parseFromSeekable}), {@code var1} is the input stream to parse (already
     * prefixed with the preamble bytes, if any) and {@code var2} is the "isProbe" flag that is
     * {@code true} while a candidate record start is merely being tested.
     *
     * @param var1 the stream to parse
     * @param var2 whether the call is part of probing
     * @return a stream of parsed elements
     */
    protected abstract Stream<U> parse(InputStream var1, boolean var2);

    /**
     * Points {@link #stream} at the byte interval {@code [start, end)} of the raw file stream.
     *
     * <p>Without a codec the raw stream is seeked to {@code start} and wrapped so that reads
     * are bounded by {@code end}. With a splittable codec a fresh decompressor and a
     * block-mode decoding stream are created; the codec may adjust the interval to block
     * boundaries. In both cases closing the resulting stream is ignored, and an attempt to
     * close the underlying stream is reported via {@link #logUnexpectedClose(String)}.
     *
     * @param start requested start offset in the raw file
     * @param end   requested end offset in the raw file
     * @return the effective (start, end) pair; equal to the arguments for uncompressed input,
     *         otherwise the codec-adjusted bounds
     * @throws IOException      if seeking or creating the decoding stream fails
     * @throws RuntimeException if the codec is not a {@link SplittableCompressionCodec}
     */
    public Map.Entry<Long, Long> setStreamToInterval(long start, long end) throws IOException {
        if (this.codec == null) {
            // Uncompressed input: read the raw stream directly, bounded by 'end'.
            this.rawStream.seek(start);
            SeekableByteChannelFromSeekableInputStream rawChannel = new SeekableByteChannelFromSeekableInputStream(InputStreamWithCloseLogging.wrap((InputStream)this.rawStream, ExceptionUtils::getStackTrace, RecordReaderGenericBase::logUnexpectedClose), (org.apache.hadoop.fs.Seekable)this.rawStream);
            ReadableByteChannel boundedChannel = (ReadableByteChannel)((Object)new InterruptingSeekableByteChannel(rawChannel, end));
            InputStream boundedIn = Channels.newInputStream(boundedChannel);
            this.stream = new SeekableInputStream((InputStream)new InputStreamWithCloseIgnore(boundedIn), (org.apache.hadoop.fs.Seekable)this.rawStream);
            return new AbstractMap.SimpleEntry<Long, Long>(start, end);
        }

        // Compressed input: a new decompressor is created on every call.
        this.decompressor = this.codec.createDecompressor();
        if (!(this.codec instanceof SplittableCompressionCodec)) {
            throw new RuntimeException("Don't know how to handle codec: " + String.valueOf(this.codec));
        }
        SplittableCompressionCodec splittableCodec = (SplittableCompressionCodec)this.codec;
        SplitCompressionInputStream decoded = splittableCodec.createInputStream((InputStream)this.rawStream, this.decompressor, start, end, SplittableCompressionCodec.READ_MODE.BYBLOCK);
        long adjustedStart = decoded.getAdjustedStart();
        long adjustedEnd = decoded.getAdjustedEnd();
        InputStream zeroOffsetSafe = (InputStream)new InputStreamWithZeroOffsetRead((InputStream)decoded);
        this.stream = new SeekableInputStream((InputStream)new InputStreamWithCloseIgnore(InputStreamWithCloseLogging.wrap(zeroOffsetSafe, ExceptionUtils::getStackTrace, RecordReaderGenericBase::logUnexpectedClose)), (org.apache.hadoop.fs.Seekable)decoded);
        return new AbstractMap.SimpleEntry<Long, Long>(adjustedStart, adjustedEnd);
    }

    /**
     * Close callback that intentionally does nothing.
     *
     * @param str ignored
     */
    public static void logClose(String str) {
    }

    /**
     * Close callback for streams that must not be closed by consumers: logs the given text
     * at error level and then fails.
     *
     * <p>In {@code setStreamToInterval} this is registered together with
     * {@code ExceptionUtils::getStackTrace}, so {@code str} is presumably the stack trace of
     * the offending close call.
     *
     * @param str the text to log
     * @throws RuntimeException always
     */
    public static void logUnexpectedClose(String str) {
        logger.error(str);
        throw new RuntimeException("Unexpected close");
    }

    /**
     * Tests whether the given seekable has reached or passed {@code splitPos}.
     *
     * <p>A warning is logged if the position lies strictly beyond {@code splitPos}.
     *
     * @param seekable the stream whose current position is inspected
     * @param splitPos the split boundary to compare against
     * @return {@code true} iff the current position is greater than or equal to {@code splitPos}
     * @throws RuntimeException if the position cannot be obtained
     */
    public boolean didHitSplitBound(org.apache.hadoop.fs.Seekable seekable, long splitPos) {
        final long currentPos;
        try {
            currentPos = seekable.getPos();
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }

        long excess = currentPos - splitPos;
        if (excess < 0L) {
            return false;
        }
        // Landing exactly on the boundary is expected; anything beyond it is worth a warning.
        if (excess > 0L) {
            logger.warn(String.format("Split %s: Exceeded split pos by %d bytes", this.splitId, excess));
        }
        return true;
    }

    /**
     * Groups consecutive elements of the split (followed by the tail elements) into records.
     *
     * <p>For any split other than the first one, no accumulator is created for the first
     * group and the corresponding first result item is skipped.
     *
     * @param isFirstSplit whether this reader handles the first split of the file
     * @param splitFlow    the elements parsed from the split itself
     * @param tailElts     the elements taken from the region after the split
     * @return the stream of accumulated records
     */
    protected Stream<T> aggregate(boolean isFirstSplit, Stream<U> splitFlow, Stream<U> tailElts) {
        CollapseRunsSpec spec = CollapseRunsSpec.create(
                this.accumulating::classify,
                (accNum, groupKey) -> (isFirstSplit || accNum != 0L) ? this.accumulating.createAccumulator(groupKey) : null,
                this.accumulating::accumulate);
        Stream<Object> records = StreamOperatorCollapseRuns.create((CollapseRunsSpec)spec)
                .transform(Stream.concat(splitFlow, tailElts))
                .map(e -> e.getValue() != null ? this.accumulating.accumulatedValue(e.getValue()) : null);
        // The first group of a non-first split has no accumulator and is dropped here.
        return isFirstSplit ? records : records.skip(1L);
    }

    /**
     * Groups consecutive elements into records without ever accumulating the first group.
     *
     * <p>The first group gets no accumulator, so its result item is {@code null}; unlike the
     * instance variant of this method, that item is not skipped.
     *
     * @param eltStream    the elements to group
     * @param accumulating strategy used to classify and accumulate elements
     * @return the stream of accumulated records (with a {@code null} item for the first group)
     */
    public static <T, G, A, U> Stream<U> aggregate(Stream<T> eltStream, Accumulating<T, G, A, U> accumulating) {
        CollapseRunsSpec spec = CollapseRunsSpec.create(
                accumulating::classify,
                (accNum, groupKey) -> accNum != 0L ? accumulating.createAccumulator(groupKey) : null,
                accumulating::accumulate);
        Stream<Object> records = StreamOperatorCollapseRuns.create((CollapseRunsSpec)spec)
                .transform(eltStream)
                .map(e -> e.getValue() != null ? accumulating.accumulatedValue(e.getValue()) : null);
        return records;
    }

    /**
     * Hook for decorating the input stream before it is parsed; this base implementation
     * returns the stream unchanged.
     *
     * @param base the stream about to be parsed
     * @return the stream that is actually handed on for parsing
     */
    protected InputStream effectiveInputStream(InputStream base) {
        return base;
    }

    /**
     * Turns a byte channel into the input stream that is handed to the parser: the channel
     * is exposed as a stream, passed through {@link #effectiveInputStream(InputStream)} and,
     * if preamble bytes are configured, prefixed with them.
     *
     * @param seekable the channel to read from
     * @return the stream to parse
     */
    protected InputStream effectiveInputStreamSupp(ReadableChannel<byte[]> seekable) {
        InputStream payload = this.effectiveInputStream(ReadableChannels.newInputStream(seekable));
        if (this.preambleBytes.length == 0) {
            return payload;
        }
        return new SequenceInputStream(new ByteArrayInputStream(this.preambleBytes), payload);
    }

    /**
     * Returns the current position of the given seekable, rethrowing any failure as an
     * unchecked exception.
     *
     * @param seekable the seekable to query
     * @return the current position
     * @throws RuntimeException wrapping any exception raised while querying the position
     */
    public static long getPos(Seekable seekable) {
        try {
            return seekable.getPos();
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Parses elements from the given byte channel after applying the preamble and the
     * {@link #effectiveInputStream(InputStream)} hook.
     *
     * @param seekable the channel to read from
     * @param isProbe  passed through to {@code parse}
     * @return the stream of parsed elements
     */
    protected Stream<U> parseFromSeekable(ReadableChannel<byte[]> seekable, boolean isProbe) {
        return this.parse(this.effectiveInputStreamSupp(seekable), isProbe);
    }

    /**
     * Tests whether parsing from the current position of {@code seekable} yields at least
     * one record.
     *
     * <p>The channel is cloned, so the position of {@code seekable} itself is not changed.
     * At most {@link #probeElementCount} elements and {@link #probeRecordCount} records are
     * consumed. Two modes exist and are mutually exclusive:
     * <ul>
     *   <li><b>Buffered</b> ({@code resultBuffer != null}): the parsed elements are routed
     *       through {@code resultBuffer}, which is truncated and gets the element channel as
     *       its data supplier. On success the element stream is deliberately left open,
     *       because callers continue to read from {@code resultBuffer.getDataSupplier()}
     *       (e.g. {@code createRecordFlow}) and close it themselves.</li>
     *   <li><b>Unbuffered</b> ({@code resultBuffer == null}): the element stream is consumed
     *       directly and closed before returning.</li>
     * </ul>
     * Previously the unbuffered code also ran after the buffered branch, which operated a
     * second time on a stream already handed to the buffer and then closed it.
     *
     * @param seekable     channel positioned at the candidate record start
     * @param resultBuffer optional buffer that receives the probed elements; may be null
     * @return the probe outcome together with the start offset and the offset reached
     * @throws RuntimeException if an {@link IOException} occurs while probing
     */
    protected OffsetSeekResult prober(SeekableReadableChannel<byte[]> seekable, BufferOverReadableChannel<U[]> resultBuffer) {
        long endOffset;
        long recordCount;
        ArrayOpsObject arrayOps = ArrayOps.OBJECT;
        Stream<U> eltStream = null;
        ReadableChannelWithValue channel = null;
        SeekableReadableChannel clonedSeekable = seekable.cloneObject();
        long startOffset = RecordReaderGenericBase.getPosition((SeekableReadableChannel<byte[]>)clonedSeekable);
        // The switchable wrapper travels with the element channel as its "value", so that a
        // caller can later get back at the underlying byte channel.
        ReadableChannelSwitchable byteChannel = new ReadableChannelSwitchable((ReadableChannel)clonedSeekable);
        try {
            eltStream = this.parseFromSeekable((ReadableChannel<byte[]>)byteChannel, true);
            if (resultBuffer != null) {
                channel = ReadableChannels.withValue((ReadableChannel)ReadableChannels.wrap(eltStream, (ArrayOps)arrayOps), (Object)byteChannel);
                resultBuffer.truncate();
                resultBuffer.setDataSupplier((ReadableChannel)channel);
                Stream bufferedEltStream = ReadableChannels.newStream((ReadableChannel)resultBuffer.newReadableChannel());
                Stream cappedEltStream = bufferedEltStream.limit(this.probeElementCount);
                Stream<T> recordStream = RecordReaderGenericBase.aggregate(cappedEltStream, this.accumulating);
                try (Stream<T> cappedRecordStream = recordStream.limit(this.probeRecordCount);){
                    recordCount = cappedRecordStream.count();
                }
                // Do not close eltStream here: it now backs resultBuffer's data supplier.
            } else {
                Stream<U> cappedEltStream = eltStream.limit(this.probeElementCount);
                Stream<T> recordStream = RecordReaderGenericBase.aggregate(cappedEltStream, this.accumulating);
                try (Stream<T> cappedRecordStream = recordStream.limit(this.probeRecordCount);){
                    recordCount = cappedRecordStream.count();
                }
                eltStream.close();
            }
            endOffset = clonedSeekable.position();
        }
        catch (Throwable e) {
            // A parse failure is the normal outcome for a wrong candidate position.
            endOffset = RecordReaderGenericBase.getPosition((SeekableReadableChannel<byte[]>)clonedSeekable);
            logger.debug(String.format("Element offset probing result: Start = %d, End = %d, Parse error encountered after reading %d bytes", startOffset, endOffset, endOffset - startOffset));
            if (channel != null) {
                IOUtils.closeQuietly((Closeable)channel);
            } else if (eltStream != null) {
                eltStream.close();
            }
            IOUtils.closeQuietly((Closeable)byteChannel);
            // I/O problems are real errors rather than "no record here" and must surface.
            if (e instanceof IOException) {
                throw new RuntimeException(e);
            }
            recordCount = -1L;
        }
        boolean foundValidRecordOffset = recordCount > 0L;
        return new OffsetSeekResult(foundValidRecordOffset, startOffset, endOffset);
    }

    /**
     * Copies a probe result into an RDF-backed {@link ProbeStats} view of {@code tgt}.
     *
     * @param tgt the resource that receives the values
     * @param src the probe result to copy from
     * @return the {@link ProbeStats} view with candidate position, probe count and total
     *         duration (converted from nanoseconds to seconds) set
     */
    public static ProbeStats convert(Resource tgt, ProbeResult src) {
        ProbeStats view = (ProbeStats)tgt.as(ProbeStats.class);
        return view
                .setCandidatePos(src.candidatePos())
                .setProbeCount(src.probeCount())
                .setTotalDuration((double)src.totalDuration.toNanos() * 1.0E-9);
    }

    /**
     * Collects the statistics gathered while processing the split into a fresh RDF model.
     *
     * <p>Requires {@link #source} to be set, i.e. {@code createRecordFlow()} must have run.
     *
     * @return a {@link Stats2} resource describing split bounds, probe results and counters
     */
    public Stats2 getStats() {
        Model model = ModelFactory.createDefaultModel();
        Stats2 stats = (Stats2)model.createResource().as(Stats2.class);

        // First block offset: null without a block map, -1 if the map has no usable first entry.
        NavigableMap<Long, Long> blockMap = this.source.getAbsPosToBlockOffset();
        Long firstBlock = null;
        if (blockMap != null) {
            firstBlock = Optional.ofNullable(blockMap.firstEntry()).map(Map.Entry::getValue).orElse(-1L);
        }

        stats.setSplitStart(this.splitStart)
                .setSplitSize(this.splitLength)
                .setFirstBlock(firstBlock)
                .setRegionStartProbeResult(this.headProbe == null ? null : RecordReaderGenericBase.convert(model.createResource(), this.headProbe))
                .setRegionEndProbeResult(this.tailRecordOffset == null ? null : RecordReaderGenericBase.convert(model.createResource(), this.tailRecordOffset))
                .setRegionStartSearchReadOverSplitEnd(this.regionStartSearchReadOverSplitEnd)
                .setRegionStartSearchReadOverRegionEnd(this.regionStartSearchReadOverRegionEnd)
                .setTailElementCount(this.tailElts == null ? null : Integer.valueOf(this.tailElts.size()))
                .setTotalElementCount(this.totalEltCount)
                .setTotalRecordCount(this.totalRecordCount)
                .setTotalBytesRead(this.source.getKnownSize());
        return stats;
    }

    /**
     * Searches the region after the split for the next record start and collects the
     * elements that still belong to this split.
     *
     * <p>Results are stored in {@link #tailRecordOffset}, {@link #tailBytes},
     * {@link #tailElts} and {@link #tailEltsTime}. The data suppliers of both the local
     * element buffer and {@code tailByteBuffer} are closed before returning.
     *
     * @param tailByteBuffer buffer over the bytes that follow the split
     * @throws RuntimeException wrapping anything that goes wrong during detection
     */
    protected void detectTail(BufferOverReadableChannel<byte[]> tailByteBuffer) {
        // NOTE: this local (and the parameter) shadow the fields of the same name.
        BufferOverReadableChannel tailEltBuffer = BufferOverReadableChannel.createForObjects((int)1024);
        long maxExtraBytes = 1000000L;
        // Once the tail buffer's supplier is exhausted, the matcher may look at most
        // maxExtraBytes positions beyond the known data size; before that, any position is fine.
        LongPredicate tailCharPosValidator = pos -> {
            boolean r = true;
            if (tailByteBuffer.isDataSupplierConsumed()) {
                long maxPos = tailByteBuffer.getKnownDataSize() + maxExtraBytes;
                r = pos < maxPos;
            }
            return r;
        };
        try (SeekableReadableChannel tailByteChannel = tailByteBuffer.newReadableChannel();){
            ReadableChannel tmp;
            StopWatch tailSw = StopWatch.createStarted();
            // Positions are relative to the tail buffer (splitStart = 0, probe start = 0).
            this.tailRecordOffset = this.skipToNthRecordInSplit(this.skipRecordCount, (SeekableReadableChannel<byte[]>)tailByteChannel, 0L, 0L, this.maxRecordLength, this.maxExtraByteCount, tailCharPosValidator, pos -> true, this.posToSplitId, (BufferOverReadableChannel<U[]>)tailEltBuffer, this::prober);
            // Without a record start in the tail, all known tail bytes belong to this split.
            // The checked cast makes candidate positions beyond the int range fail loudly.
            this.tailBytes = this.tailRecordOffset.candidatePos() < 0L ? tailByteBuffer.getKnownDataSize() : (long)Ints.checkedCast((long)this.tailRecordOffset.candidatePos());
            logger.info(String.format("Split %s: Got %d tail bytes within %.3f s", this.splitId, this.tailBytes, (double)tailSw.getTime(TimeUnit.MILLISECONDS) * 0.001));
            if (this.tailRecordOffset.candidatePos() >= 0L) {
                // Collect only the first run of equally classified elements from the probed tail.
                CollapseRunsSpec spec = CollapseRunsSpec.create(this.accumulating::classify, () -> new ArrayList(1024), Collection::add);
                try (Stream<T> tailEltStream = RecordReaderGenericBase.debufferedEltStream((BufferOverReadableChannel<T[]>)tailEltBuffer);){
                    this.tailElts = StreamOperatorCollapseRuns.create((CollapseRunsSpec)spec).transform(tailEltStream).map(Map.Entry::getValue).findFirst().orElse(Collections.emptyList());
                }
                // Placeholder: the value is never computed and is always logged as -1.
                long adjustedSplitEnd = -1L;
                this.tailEltsTime = tailSw.getTime(TimeUnit.MILLISECONDS);
                logger.info(String.format("Split %s: Got %d tail items at pos %d within %d bytes read in %d ms", this.splitId, this.tailElts.size(), adjustedSplitEnd, this.tailBytes, this.tailEltsTime));
            }
            // Release the element supplier left behind by probing (if any) ...
            if ((tmp = tailEltBuffer.getDataSupplier()) != null) {
                tmp.close();
            }
            // ... and the byte supplier of the tail buffer (assumed to be non-null here).
            tailByteBuffer.getDataSupplier().close();
        }
        catch (Throwable e) {
            throw new RuntimeException("Should never come here", e);
        }
    }

    /**
     * Builds the stream of records for this split.
     *
     * <p>Outline: open the input from the split start, locate the first record start in the
     * split (skipped for the first split), arrange for the tail region after the split to
     * be analysed via {@link #detectTail}, and finally aggregate the head elements followed
     * by the tail elements into records. Closing the returned stream closes the head byte
     * channel and {@link #source}.
     *
     * <p>Decompiler note (CFR): "WARNING - Removed try catching itself - possible behaviour
     * change."
     *
     * @return the record stream; empty if no record start was found in the split
     * @throws IOException if the input cannot be opened or positioned
     */
    protected Stream<T> createRecordFlow() throws IOException {
        ReadableChannelWithValue finalHeadEltChannel;
        logger.info("Processing split " + this.splitId);
        // Open from the split start without an upper bound; the returned bounds are not used.
        Map.Entry<Long, Long> adjustedTailSplitBounds = this.setStreamToInterval(this.splitStart, Long.MAX_VALUE);
        this.source = this.isEncoded ? SeekableSourceOverSplit.createForBlockEncodedStream(this.stream, this.splitEnd, this.postambleBytes) : SeekableSourceOverSplit.createForNonEncodedStream(this.stream, this.splitEnd, this.postambleBytes);
        SeekableSourceOverSplit.Channel headByteChannel = this.source.newReadableChannel();
        // A position is valid while the head buffer is still being filled, or if it lies
        // within the known head data.
        this.posValidator = pos -> {
            boolean r = !this.source.getHeadBuffer().isDataSupplierConsumed() || pos < this.source.getHeadBuffer().getKnownDataSize();
            return r;
        };
        long maxExtraBytes = 1000000L;
        // The matcher may read up to maxExtraBytes beyond the head data.
        this.readPosValidator = pos -> {
            boolean r = true;
            boolean inHead = this.posValidator.test(pos);
            if (!inHead) {
                long size = this.source.getHeadBuffer().getKnownDataSize();
                long maxPos = size + maxExtraBytes;
                r = pos < maxPos;
            }
            return r;
        };
        Function<Long, Long> idfn = TailBufferChannel.splitIdFn(this.splitStart, this.splitLength);
        // For encoded input a position is first mapped to its block offset.
        this.posToSplitId = !this.isEncoded ? idfn : pos -> {
            long blockOffset = this.source.getBlockForPos((long)pos);
            long sid = (Long)idfn.apply(blockOffset);
            return sid;
        };
        BufferOverReadableChannel headEltBuffer = BufferOverReadableChannel.createForObjects((int)this.probeRecordCount);
        StopWatch headSw = StopWatch.createStarted();
        // The first split starts at a record boundary by definition; all others must search.
        this.headProbe = this.isFirstSplit ? new ProbeResult(0L, 0L, Duration.ZERO) : this.skipToNthRecordInSplit(this.skipRecordCount, (SeekableReadableChannel<byte[]>)headByteChannel, 0L, 0L, this.maxRecordLength, this.maxExtraByteCount, this.readPosValidator, this.posValidator, this.posToSplitId, (BufferOverReadableChannel<U[]>)headEltBuffer, this::prober);
        long headRecordTime = headSw.getTime(TimeUnit.MILLISECONDS);
        logger.info(String.format("Split %s: Found head region after %d probes at offset at pos %d in %.3f s", this.splitId, this.headProbe.probeCount(), this.headProbe.candidatePos(), Float.valueOf((float)headSw.getTime(TimeUnit.MILLISECONDS) * 0.001f)));
        ReadableChannelWithValue headEltChannel = null;
        Stream<Object> headItems = null;
        // No record start in this split: yield nothing, but still release resources on close.
        if (this.headProbe.candidatePos() < 0L) {
            SeekableSourceOverSplit.Channel finalHeadByteChannel2 = headByteChannel;
            return (Stream)Stream.empty().onClose(() -> {
                try {
                    finalHeadByteChannel2.close();
                    this.source.close();
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });
        }
        if (!this.isFirstSplit) {
            // Continue with the element channel the successful probe left in headEltBuffer;
            // the byte channel underneath it replaces the one used for searching.
            ReadableChannelWithValue typedHeadEltChannel = (ReadableChannelWithValue)headEltBuffer.getDataSupplier();
            headByteChannel.close();
            headByteChannel = (SeekableSourceOverSplit.Channel)((ReadableChannelSwitchable)typedHeadEltChannel.getValue()).getDelegate();
            headEltChannel = typedHeadEltChannel;
        } else {
            headItems = this.parseFromSeekable((ReadableChannel<byte[]>)headByteChannel, false);
        }
        Lock lock = headByteChannel.getReadWriteLock().writeLock();
        lock.lock();
        try {
            if (headByteChannel.isHeadStream()) {
                // Still inside the split: detect the tail lazily when the channel transitions.
                this.regionStartSearchReadOverSplitEnd = false;
                this.regionStartSearchReadOverRegionEnd = false;
                headByteChannel.debufferHead();
                SeekableSourceOverSplit.Channel finalHeadByteChannel = headByteChannel;
                headByteChannel.setTransitionAction(() -> {
                    logger.info(String.format("Transitioned to tail after reading %d bytes", finalHeadByteChannel.getEnclosingInstance().getHeadBuffer().getKnownDataSize()));
                    this.detectTail(this.source.getTailBuffer());
                    if (this.tailBytes >= 0L) {
                        long absTailPos = this.source.getHeadSize() + this.tailBytes;
                        finalHeadByteChannel.setLimit(absTailPos);
                    }
                });
            } else {
                // The head search already left the head stream: detect the tail right away.
                this.regionStartSearchReadOverSplitEnd = true;
                this.detectTail(this.source.getTailBuffer());
                long pos2 = headByteChannel.position();
                long absTailPos = this.source.getHeadBuffer().getKnownDataSize() + this.tailBytes;
                this.regionStartSearchReadOverRegionEnd = pos2 > absTailPos;
                if (!this.regionStartSearchReadOverRegionEnd.booleanValue()) {
                    if (this.tailBytes >= 0L) {
                        headByteChannel.setLimit(absTailPos);
                    }
                } else {
                    // Probing read beyond the end of this split's region: discard the probed
                    // elements and re-parse from the head position with the limit in place.
                    // NOTE(review): the lock is released around the close call, presumably
                    // because closing needs the lock itself - confirm.
                    lock.unlock();
                    if (headEltBuffer.getDataSupplier() != null) {
                        headEltBuffer.getDataSupplier().close();
                    }
                    lock.lock();
                    headEltBuffer.setDataSupplier(null);
                    headEltBuffer.truncate();
                    headByteChannel.close();
                    headByteChannel = this.source.newReadableChannel();
                    headByteChannel.position(this.headProbe.candidatePos());
                    if (this.tailBytes >= 0L) {
                        headByteChannel.setLimit(absTailPos);
                    }
                    headItems = this.parseFromSeekable((ReadableChannel<byte[]>)headByteChannel, false);
                }
            }
        }
        finally {
            lock.unlock();
        }
        if (headItems == null) {
            // Elements already buffered by probing come first, then the rest of the channel.
            headItems = ReadableChannels.newStream((ReadableChannel)headEltChannel);
            headItems = Stream.concat(ReadableChannels.newStream((ReadableChannel)headEltBuffer.getBuffer().newReadableChannel()), headItems);
        }
        if ((finalHeadEltChannel = headEltChannel) != null) {
            // The referenced synthetic lambda is a decompiler artifact whose body is not
            // visible here; presumably it closes the given channel.
            headItems = (Stream<Object>)headItems.onClose(() -> RecordReaderGenericBase.lambda$createRecordFlow$12((ReadableChannel)finalHeadEltChannel));
        }
        // flatMap defers reading this.tailElts until the head elements have been consumed,
        // i.e. until after the tail has been detected.
        Stream lazyTailElts = Stream.of(Integer.valueOf(1)).flatMap(x -> {
            logger.info("Yielding " + this.tailElts.size() + " tail elements");
            return this.tailElts.stream();
        });
        if (this.enableStats) {
            headItems = headItems.peek(x -> ++this.totalEltCount);
        }
        SeekableSourceOverSplit.Channel finalHeadByteChannel1 = headByteChannel;
        Stream<Object> result = (Stream<Object>)this.aggregate(this.isFirstSplit, headItems, lazyTailElts).onClose(() -> {
            finalHeadByteChannel1.close();
            try {
                this.source.close();
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
        if (this.enableStats) {
            result = result.peek(x -> ++this.totalRecordCount);
        }
        return result;
    }

    /**
     * Returns the remaining content of a clone of the given seekable as a stream of lines.
     *
     * <p>The bytes are decoded as UTF-8. Previously the platform default charset was used,
     * which made the result depend on the JVM's locale settings.
     * NOTE(review): UTF-8 is assumed to be the encoding of the data - confirm for callers
     * that read other encodings.
     *
     * @param seekable the source; it is cloned, so its own position is not changed
     * @return a stream of lines; closing it closes the underlying reader
     */
    public static Stream<String> lines(Seekable seekable) {
        BufferedReader br = new BufferedReader(new InputStreamReader(Channels.newInputStream((ReadableByteChannel)seekable.cloneObject()), StandardCharsets.UTF_8));
        return (Stream)br.lines().onClose(() -> IOUtils.closeQuietly((Reader)br));
    }

    /**
     * Returns the elements already held in the buffer followed by those still pending in
     * the buffer's data supplier, bypassing the buffer for the latter.
     *
     * <p>The data supplier must be a {@link ReadableChannelWithValue} that delegates to a
     * {@link ReadableChannelOverIterator}.
     *
     * @param borc the buffer whose content and supplier are read
     * @return the concatenated stream
     * @throws RuntimeException wrapping any exception raised while setting up the stream
     */
    public static <T> Stream<T> unbufferedStream(BufferOverReadableChannel<T[]> borc) {
        try {
            Stream alreadyBuffered = ReadableChannels.newStream((ReadableChannel)borc.getBuffer().newReadableChannel());
            ReadableChannelWithValue supplier = (ReadableChannelWithValue)borc.getDataSupplier();
            ReadableChannelOverIterator pendingSource = (ReadableChannelOverIterator)supplier.getDelegate();
            Stream pending = pendingSource.toStream();
            return Stream.concat(alreadyBuffered, pending);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns the elements of the given element buffer as a stream after switching the
     * underlying byte source to unbuffered mode.
     *
     * <p>The byte source is obtained from the value attached to the buffer's data supplier.
     * Closing the returned stream quietly closes that data supplier.
     *
     * @param eltBorc element buffer whose data supplier is a {@link ReadableChannelWithValue}
     * @return the buffered elements followed by the pending ones
     */
    private static <T> Stream<T> debufferedEltStream(BufferOverReadableChannel<T[]> eltBorc) {
        // Captured up front so that the close handler refers to the supplier present now.
        ReadableChannel supplierToClose = eltBorc.getDataSupplier();

        ReadableChannelWithValue byteSource = (ReadableChannelWithValue)eltBorc.getDataSupplier();
        BufferOverReadableChannel.debuffer((ReadableChannel)((ReadableChannel)byteSource.getValue()));

        Stream<T> elts = RecordReaderGenericBase.unbufferedStream(eltBorc);
        return (Stream)elts.onClose(() -> IOUtils.closeQuietly((Closeable)supplierToClose));
    }

    /**
     * Searches for the next record start at or after {@code absProbeRegionStart}.
     *
     * <p>The pattern search window spans at most {@code maxRecordLength} bytes and never
     * extends beyond {@code absDataRegionEnd}. {@code nav} is positioned at the probe start
     * (relative to {@code splitStart}); the search itself runs on a clone that is closed
     * before returning.
     *
     * @param recordSearchPattern        pattern that matches candidate record starts
     * @param nav                        channel to search in; its position is changed
     * @param splitStart                 offset subtracted from absolute positions to obtain channel positions
     * @param absProbeRegionStart        absolute position at which the search starts
     * @param maxRecordLength            maximum size of the search window
     * @param absDataRegionEnd           absolute position the search window must not exceed
     * @param matcherAbsReadPosValidator limits how far the matcher may read
     * @param posValidator               passed on to the probing step
     * @param outBuffer                  optional buffer receiving the probed elements
     * @param prober                     tests whether a candidate position yields records
     * @return the probe result with the candidate position shifted by {@code splitStart},
     *         or with position -1 if nothing was found
     * @throws IOException if positioning or closing a channel fails
     */
    public static <U> ProbeResult findNextRegion(CustomPattern recordSearchPattern, SeekableReadableChannel<byte[]> nav, long splitStart, long absProbeRegionStart, long maxRecordLength, long absDataRegionEnd, LongPredicate matcherAbsReadPosValidator, LongPredicate posValidator, BufferOverReadableChannel<U[]> outBuffer, Prober<U> prober) throws IOException {
        // Size of the search window; must fit into an int because it becomes a matcher region.
        long windowEnd = Math.min(absProbeRegionStart + maxRecordLength, absDataRegionEnd);
        int relProbeRegionEnd = Ints.checkedCast((long)(windowEnd - absProbeRegionStart));

        nav.position(absProbeRegionStart - splitStart);
        try (SeekableReadableChannel seekable = nav.cloneObject();){
            MatcherFactory matcherFactory = RecordReaderGenericBase.createMatcherFactory(recordSearchPattern, relProbeRegionEnd, matcherAbsReadPosValidator);
            ProbeResult relative = RecordReaderGenericBase.findFirstPositionWithProbeSuccess((SeekableReadableChannel<byte[]>)seekable, posValidator, matcherFactory, true, outBuffer, prober);

            // Translate a hit back into the caller's coordinate system; keep -1 for "not found".
            long relativePos = relative.candidatePos();
            long adjustedPos = -1L;
            if (relativePos >= 0L) {
                adjustedPos = relativePos + splitStart;
            }
            return new ProbeResult(adjustedPos, relative.probeCount(), relative.totalDuration());
        }
    }

    /**
     * Returns the current position of the given channel, converting an {@link IOException}
     * into an unchecked exception.
     *
     * @param channel the channel to query
     * @return the channel's current position
     * @throws RuntimeException wrapping the {@link IOException} if the position cannot be read
     */
    public static long getPosition(SeekableReadableChannel<byte[]> channel) {
        final long pos;
        try {
            pos = channel.position();
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        return pos;
    }

    /**
     * Creates a factory for matchers that search a channel for record starts.
     *
     * <p>Each matcher operates on a character view of the channel, starting at the
     * channel's position at creation time and restricted to the region
     * {@code [0, relProbeRegionEnd)}. Every character access is checked against
     * {@code matcherAbsReadPosValidator}; an access at a rejected position raises a
     * {@code ReadTooFarException}.
     *
     * @param recordSearchPattern        pattern that matches candidate record starts
     * @param relProbeRegionEnd          end of the matcher region, relative to the start position
     * @param matcherAbsReadPosValidator accepts the positions the matcher is allowed to read
     * @return the matcher factory
     */
    protected static MatcherFactory createMatcherFactory(CustomPattern recordSearchPattern, int relProbeRegionEnd, final LongPredicate matcherAbsReadPosValidator) {
        return seekable -> {
            final long startPos = RecordReaderGenericBase.getPosition((SeekableReadableChannel<byte[]>)seekable);
            CharSequence rawChars = SeekableReadableChannels.asCharSequence((SeekableReadableChannel)seekable);
            // Guard every character access so the matcher cannot read arbitrarily far.
            CharSequence guardedChars = new CharSequenceDecorator(rawChars){

                @Override
                public char charAt(int index) {
                    if (matcherAbsReadPosValidator.test(startPos + (long)index)) {
                        return super.charAt(index);
                    }
                    throw new ReadTooFarException();
                }
            };
            CustomMatcher matcher = recordSearchPattern.matcher(guardedChars);
            matcher.region(0, relProbeRegionEnd);
            return matcher;
        };
    }

    /**
     * Searches for up to {@code n} consecutive record starts by repeatedly
     * calling {@code findNextRegion} with {@code this.recordStartPattern},
     * and returns the {@link ProbeResult} of the last search performed.
     *
     * <p>The first search starts at {@code absProbeRegionStart}; each
     * following search starts {@code this.minRecordLength} past the previous
     * candidate. {@code outBuffer} is only handed to the search whose
     * iteration index satisfies {@code i + 1 == n}; all other searches
     * receive {@code null}.
     *
     * <p>If the split id computed by {@code posToSplitId} for a candidate
     * differs from the one of the previous candidate and this is not the
     * first iteration, the buffer's data supplier is closed, the buffer is
     * truncated and the iteration counter is reset.
     *
     * @param n number of record starts to find
     * @param nav channel forwarded to {@code findNextRegion}
     * @param splitStart forwarded to {@code findNextRegion}; also used in a log message
     * @param absProbeRegionStart position at which the first search starts
     * @param maxRecordLength forwarded to {@code findNextRegion}; also used to pick the warning logged on failure
     * @param absDataRegionEnd forwarded to {@code findNextRegion}
     * @param matcherReadPosValidator forwarded to {@code findNextRegion}
     * @param posValidator forwarded to {@code findNextRegion}
     * @param posToSplitId maps a candidate position to a split id
     * @param outBuffer buffer passed to the search of the last iteration
     * @param prober forwarded to {@code findNextRegion}
     * @return the result of the last search; {@code null} if {@code n <= 0}.
     *         Its candidate position is negative when a search found nothing.
     * @throws IOException declared by {@code findNextRegion} and the buffer operations
     */
    ProbeResult skipToNthRecordInSplit(int n, SeekableReadableChannel<byte[]> nav, long splitStart, long absProbeRegionStart, long maxRecordLength, long absDataRegionEnd, LongPredicate matcherReadPosValidator, LongPredicate posValidator, Function<Long, Long> posToSplitId, BufferOverReadableChannel<U[]> outBuffer, Prober<U> prober) throws IOException {
        ProbeResult result = null;
        long previousSplitId = -1L;
        // Computed once from the initial start; it is not updated as nextProbePos advances.
        long availableDataRegion = absDataRegionEnd - absProbeRegionStart;
        long nextProbePos = absProbeRegionStart;
        for (int i = 0; i < n; ++i) {
            boolean isLastIteration = i + 1 == n;
            // Only the final search gets the real buffer.
            Object effOutBuffer = isLastIteration ? outBuffer : null;
            result = RecordReaderGenericBase.findNextRegion(this.recordStartPattern, nav, splitStart, nextProbePos, maxRecordLength, absDataRegionEnd, matcherReadPosValidator, posValidator, effOutBuffer, prober);
            long candidatePos = result.candidatePos();
            if (candidatePos < 0L) {
                // No candidate: warn and stop. Which message is logged depends on whether
                // at least maxRecordLength bytes were available from the initial start.
                if (availableDataRegion >= maxRecordLength) {
                    logger.warn(String.format("Split %s: Found no record start in a search region of %d bytes, although up to %d bytes were allowed for reading", this.splitId, maxRecordLength, availableDataRegion));
                    break;
                }
                // NOTE(review): nextProbePos starts out as absProbeRegionStart, which by its name
                // looks absolute already; adding splitStart here may double-count - confirm.
                logger.warn(String.format("Split %s: No more records found after pos " + (splitStart + nextProbePos), this.splitId));
                break;
            }
            // Local splitId (a long) shadows the field this.splitId used in the log messages above.
            long splitId = posToSplitId.apply(candidatePos);
            if (splitId != previousSplitId) {
                // On the very first iteration (i == 0) only previousSplitId is recorded.
                if (i > 0) {
                    // The candidate lies in a different split than the previous one: discard the
                    // buffer and restart counting. After the loop's ++i the counter is 1, i.e.
                    // the current candidate counts as the first record found.
                    isLastIteration = false;
                    outBuffer.getDataSupplier().close();
                    outBuffer.truncate();
                    i = 0;
                }
                previousSplitId = splitId;
            }
            if (isLastIteration) continue;
            // Continue searching past the candidate that was just found.
            nextProbePos = candidatePos + this.minRecordLength;
        }
        return result;
    }

    /**
     * Iterates over the matches of a matcher created by {@code matcherFactory}
     * and returns the first match position at which {@code prober} reports
     * success.
     *
     * <p>The method works on a clone of {@code rawSeekable}; the clone is
     * closed before returning. For every match the candidate position is
     * {@code matcherStart + m.start()} when {@code isFwd} is {@code true} and
     * {@code matcherStart + (-m.end() + 1)} otherwise. The search stops when
     * <ul>
     *   <li>the matcher finds no further match,</li>
     *   <li>{@code posValidator} rejects a candidate position,</li>
     *   <li>no byte can be read at a candidate position (end of data), or</li>
     *   <li>the prober succeeds.</li>
     * </ul>
     * Once end of data is detected the loop exits immediately; no further
     * pattern search is attempted because its outcome could never be used.
     *
     * <p>If a failed probe reports a length greater than an internal
     * threshold, the matcher is re-created further ahead (the probe's end
     * position minus a fixed backtrack distance) instead of continuing with
     * the next match.
     *
     * @param rawSeekable channel positioned at the start of the search; it is cloned, not modified
     * @param posValidator predicate a candidate position must satisfy to be probed
     * @param matcherFactory creates the matcher over the cloned channel
     * @param isFwd selects how the candidate position is derived from a match (see above)
     * @param outBuffer buffer handed to the prober
     * @param prober checks whether a record can be read at a candidate position
     * @param <U> element type of the prober / buffer
     * @return a {@link ProbeResult} with the successful position (or {@code -1}
     *         if none was found or a {@link ReadTooFarException} was raised),
     *         the number of matches examined and the total elapsed time
     * @throws IOException if channel operations fail
     */
    public static <U> ProbeResult findFirstPositionWithProbeSuccess(SeekableReadableChannel<byte[]> rawSeekable, LongPredicate posValidator, MatcherFactory matcherFactory, boolean isFwd, BufferOverReadableChannel<U[]> outBuffer, Prober<U> prober) throws IOException {
        // Developer toggles and tuning constants.
        boolean showExcerpt = false;
        boolean enableJump = true;
        long recordDelimThreshold = 1000000L;
        int backtrackDistance = 10000;

        long result = -1L;
        long probeCount = 0L;
        StopWatch swTotal = StopWatch.createStarted();
        try (SeekableReadableChannel seekable = rawSeekable.cloneObject();){
            // Match offsets reported by the matcher are relative to this position.
            long absMatcherStartPos = seekable.position();
            CustomMatcher m = (CustomMatcher)matcherFactory.apply(seekable);
            if (showExcerpt) {
                try (SeekableReadableChannel tmp = seekable.cloneObject();){
                    System.out.println("Probing at pos " + absMatcherStartPos + ":\n" + RecordReaderGenericBase.abbreviateAsUTF8(ReadableChannels.newInputStream((ReadableChannel)tmp), 1024, "..."));
                }
            }
            while (m.find()) {
                int start = m.start();
                int end = m.end();
                int matchPos = isFwd ? start : -end + 1;
                long absPos = absMatcherStartPos + (long)matchPos;
                // Counts every examined match, including one rejected by posValidator.
                ++probeCount;
                if (!posValidator.test(absPos)) {
                    break;
                }
                // Check whether at least one byte is readable at the candidate, then rewind.
                seekable.position(absPos);
                boolean isEndReached = seekable.read((Object)new byte[1], 0, 1) <= 0;
                seekable.position(absPos);
                if (isEndReached) {
                    // Nothing left to probe; leave without asking the matcher for another match.
                    break;
                }
                if (showExcerpt) {
                    try (SeekableReadableChannel tmp = seekable.cloneObject();){
                        System.out.println("Searching for candidate at pos " + tmp.position() + ":\n" + RecordReaderGenericBase.abbreviateAsUTF8(ReadableChannels.newInputStream((ReadableChannel)tmp), 1024, "..."));
                    }
                }
                // Probe on a clone so that the position of 'seekable' is left untouched.
                OffsetSeekResult probeResult;
                try (SeekableReadableChannel probeSeek = seekable.cloneObject();){
                    probeResult = (OffsetSeekResult)prober.apply(probeSeek, outBuffer);
                }
                if (probeResult.isSuccess()) {
                    result = absPos;
                    break;
                }
                long distance = probeResult.getLength();
                if (enableJump && distance > recordDelimThreshold) {
                    // NOTE(review): the probe's start/end are applied as offsets to the matcher
                    // start position - confirm this is the reference point the prober uses.
                    long relStart = probeResult.getStartPos();
                    long relEnd = probeResult.getEndPos();
                    long minRelStart = relStart + 1L;
                    long nextRelStart = Math.max(relStart, relEnd - (long)backtrackDistance);
                    if (nextRelStart != minRelStart) {
                        logger.info(String.format("Jumped ahead to end position: seekStart=%d, errorPos=%d, distance=%d, nextSeekStart=%d", relStart, relEnd, distance, nextRelStart));
                        // Restart matching from the new position with a fresh matcher.
                        seekable.position(absMatcherStartPos += nextRelStart);
                        m = (CustomMatcher)matcherFactory.apply(seekable);
                    }
                }
            }
        }
        catch (ReadTooFarException e) {
            logger.info("Matcher raised 'read too far' exception which indicates that the read threshold for bytes past the split end has been reached");
            result = -1L;
        }
        return new ProbeResult(result, probeCount, Duration.ofMillis(swTotal.getTime(TimeUnit.MILLISECONDS)));
    }

    /**
     * Advances to the next record.
     *
     * <p>The record flow is initialized on first use. If the flow has another
     * element it becomes the current value and the key counter is incremented.
     *
     * @return {@code true} if a non-null value was read; {@code false} if the
     *         flow is exhausted or produced {@code null}
     * @throws IOException declared for flow initialization
     * @throws RuntimeException wrapping any exception raised while reading
     *         from the flow, with the split id as message
     */
    public boolean nextKeyValue() throws IOException {
        if (this.datasetFlow == null) {
            this.initRecordFlow();
        }
        try {
            if (this.datasetFlow.hasNext()) {
                this.currentValue = this.datasetFlow.next();
                this.currentKey.incrementAndGet();
                return this.currentValue != null;
            }
            // Exhausted: the previous current value is left as it is.
            return false;
        } catch (Exception failure) {
            throw new RuntimeException(this.splitId, failure);
        }
    }

    /**
     * Returns the key of the current record.
     *
     * @return a new {@code LongWritable} holding the current key counter, or
     *         {@code null} when there is no current value
     */
    public LongWritable getCurrentKey() {
        if (this.currentValue == null) {
            return null;
        }
        return new LongWritable(this.currentKey.get());
    }

    /**
     * Returns the value most recently stored by {@code nextKeyValue()}.
     *
     * @return the current value; may be {@code null}
     */
    public T getCurrentValue() {
        return currentValue;
    }

    /**
     * Reports how far the stream position has advanced through the split.
     *
     * @return {@code 0} for a split whose start equals its end; otherwise
     *         {@code (stream position - split start) / (split end - split start)},
     *         capped at {@code 1.0}
     * @throws IOException if the stream position cannot be read
     */
    public float getProgress() throws IOException {
        if (this.splitStart == this.splitEnd) {
            // Avoids a division by zero for empty splits.
            return 0.0f;
        }
        float consumed = (float)(this.stream.getPos() - this.splitStart);
        float total = (float)(this.splitEnd - this.splitStart);
        return Math.min(1.0f, consumed / total);
    }

    /**
     * Releases the resources held by this reader.
     *
     * <p>Cleanup runs in three stages; each later stage sits in a
     * {@code finally} block and therefore runs even if an earlier one throws:
     * <ol>
     *   <li>run {@code recordFlowCloseable} if it is set,</li>
     *   <li>close {@code rawStream} and clear the field,</li>
     *   <li>return {@code decompressor} to the {@code CodecPool} and clear the field.</li>
     * </ol>
     * Because the stream and decompressor fields are nulled, a repeated call
     * skips those two stages.
     *
     * <p>Decompiler note (CFR): "WARNING - Removed try catching itself -
     * possible behaviour change."
     *
     * @throws IOException if closing the raw stream fails
     */
    public void close() throws IOException {
        // Debug toggle: when enabled, the collected statistics model is written to stderr as Turtle.
        boolean logSummaryStats = false;
        if (logSummaryStats) {
            RDFDataMgr.write((OutputStream)System.err, (Model)this.getStats().getModel(), (RDFFormat)RDFFormat.TURTLE_PRETTY);
        }
        try {
            if (this.recordFlowCloseable != null) {
                this.recordFlowCloseable.run();
            }
        }
        finally {
            try {
                if (this.rawStream != null) {
                    this.rawStream.close();
                    this.rawStream = null;
                }
            }
            finally {
                // Runs last and unconditionally, so the decompressor is handed back
                // even when closing the flow or the stream failed.
                if (this.decompressor != null) {
                    CodecPool.returnDecompressor((Decompressor)this.decompressor);
                    this.decompressor = null;
                }
            }
        }
    }

    /**
     * Reads an excerpt of at most {@code maxWidth} characters from a stream,
     * decoding its bytes as UTF-8.
     *
     * @param in the stream to read from
     * @param maxWidth maximum number of characters to include
     * @param abbrevMarker text appended when the stream holds more data
     * @return the excerpt, with the marker appended if it was cut short
     * @throws IOException if reading fails
     */
    public static String abbreviateAsUTF8(InputStream in, int maxWidth, String abbrevMarker) throws IOException {
        Charset utf8 = StandardCharsets.UTF_8;
        return RecordReaderGenericBase.abbreviate(in, utf8, maxWidth, abbrevMarker);
    }

    /**
     * Reads an excerpt of at most {@code maxWidth} characters from a stream,
     * decoding its bytes with the given charset.
     *
     * @param in the stream to read from
     * @param charset charset used to decode the bytes
     * @param maxWidth maximum number of characters to include
     * @param abbrevMarker text appended when the stream holds more data
     * @return the excerpt, with the marker appended if it was cut short
     * @throws IOException if reading fails
     */
    public static String abbreviate(InputStream in, Charset charset, int maxWidth, String abbrevMarker) throws IOException {
        InputStreamReader decodingReader = new InputStreamReader(in, charset);
        return RecordReaderGenericBase.abbreviate(decodingReader, maxWidth, abbrevMarker);
    }

    /**
     * Reads an excerpt of at most {@code maxWidth} characters from a reader.
     *
     * <p>After the excerpt has been collected, one more character is read to
     * find out whether the input continues. If it does, or if that extra read
     * fails, {@code abbrevMarker} is appended.
     *
     * @param reader the reader to consume; it is not closed
     * @param maxWidth maximum number of characters to include; values
     *        {@code <= 0} yield an empty excerpt
     * @param abbrevMarker text appended when the input was cut short
     * @return the excerpt, with the marker appended if it was cut short
     * @throws IOException if reading the excerpt itself fails
     */
    public static String abbreviate(InputStreamReader reader, int maxWidth, String abbrevMarker) throws IOException {
        StringBuilder excerpt = new StringBuilder();
        int remaining = maxWidth;
        while (remaining > 0) {
            int codeUnit = reader.read();
            if (codeUnit == -1) {
                break;
            }
            excerpt.append((char)codeUnit);
            --remaining;
        }
        boolean hasMore;
        try {
            hasMore = reader.read() != -1;
        } catch (Exception ignored) {
            // Best effort: if the look-ahead fails, assume the input continues.
            hasMore = true;
        }
        if (hasMore) {
            excerpt.append(abbrevMarker);
        }
        return excerpt.toString();
    }

    /**
     * Compiler-generated lambda body: closes the given channel and rethrows
     * a checked {@link IOException} as a {@link RuntimeException}.
     */
    private static /* synthetic */ void lambda$createRecordFlow$12(ReadableChannel finalHeadEltChannel) {
        try {
            finalHeadEltChannel.close();
        } catch (IOException closeFailure) {
            throw new RuntimeException(closeFailure);
        }
    }

    // Class initialization: runs Jena's system initialization first, then sets up
    // the class logger and a shared zero-length byte array.
    static {
        JenaSystem.init();
        logger = LoggerFactory.getLogger(RecordReaderGenericBase.class);
        EMPTY_BYTE_ARRAY = new byte[0];
    }

    /**
     * Unchecked signal that a matcher tried to read at a position rejected by
     * its read-position validator.
     *
     * <p>It is thrown from the guarding character sequence built in
     * {@code createMatcherFactory} and caught in
     * {@code findFirstPositionWithProbeSuccess}, where it is logged and
     * treated as "no position found".
     */
    public static class ReadTooFarException
    extends RuntimeException {
    }
}

