package net.sansa_stack.hadoop.core;

import com.google.common.primitives.Ints;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.io.SequenceInputStream;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.LongPredicate;
import java.util.stream.Stream;
import net.sansa_stack.hadoop.core.SeekableSourceOverSplit;
import net.sansa_stack.hadoop.core.pattern.CustomMatcher;
import net.sansa_stack.hadoop.core.pattern.CustomPattern;
import net.sansa_stack.hadoop.format.jena.base.RecordReaderConf;
import net.sansa_stack.hadoop.util.InputStreamWithCloseLogging;
import net.sansa_stack.hadoop.util.SeekableByteChannelFromSeekableInputStream;
import net.sansa_stack.io.util.InputStreamWithCloseIgnore;
import net.sansa_stack.io.util.InputStreamWithZeroOffsetRead;
import net.sansa_stack.nio.util.InterruptingSeekableByteChannel;
import org.aksw.commons.io.buffer.array.ArrayOps;
import org.aksw.commons.io.buffer.array.ArrayOpsObject;
import org.aksw.commons.io.buffer.array.BufferOverReadableChannel;
import org.aksw.commons.io.hadoop.SeekableInputStream;
import org.aksw.commons.io.input.CharSequenceDecorator;
import org.aksw.commons.io.input.ReadableChannel;
import org.aksw.commons.io.input.ReadableChannelSwitchable;
import org.aksw.commons.io.input.ReadableChannelWithValue;
import org.aksw.commons.io.input.ReadableChannels;
import org.aksw.commons.io.input.SeekableReadableChannel;
import org.aksw.commons.io.input.SeekableReadableChannels;
import org.aksw.commons.util.stream.CollapseRunsSpec;
import org.aksw.commons.util.stream.StreamOperatorCollapseRuns;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.SplitCompressionInputStream;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.jena.rdf.model.Model;
import org.apache.jena.rdf.model.ModelFactory;
import org.apache.jena.rdf.model.Resource;
import org.apache.jena.riot.RDFDataMgr;
import org.apache.jena.riot.RDFFormat;
import org.apache.jena.sys.JenaSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/sansa_stack/hadoop/core/RecordReaderGenericBase.class */
public abstract class RecordReaderGenericBase<U, G, A, T> extends RecordReader<LongWritable, T> {
    // SLF4J logger; as a blank static final it is assigned in a static initializer of this class.
    private static final Logger logger;
    // Shared empty array used as the default for preambleBytes and postambleBytes; as a blank
    // static final it is assigned in a static initializer of this class.
    public static final byte[] EMPTY_BYTE_ARRAY;
    // Configuration keys taken from RecordReaderConf; initialize() looks up their values.
    protected final String minRecordLengthKey;
    protected final String maxRecordLengthKey;
    protected final String probeRecordCountKey;
    // May be null, in which case initialize() uses Integer.MAX_VALUE for probeElementCount.
    protected final String probeElementCountKey;
    // Pattern used to find candidate record starts (handed to findNextRegion()).
    protected CustomPattern recordStartPattern;
    // Bounds the match region of a record-start search; read in initialize() (default 10485760).
    protected long maxRecordLength;
    // Read in initialize() (default 1).
    protected long minRecordLength;
    // Maximum number of records counted per probe (default 100); also passed to
    // BufferOverReadableChannel.createForObjects() in createRecordFlow().
    protected int probeRecordCount;
    // Maximum number of parsed elements consumed per probe (default Integer.MAX_VALUE).
    protected int probeElementCount;
    // Strategy that groups consecutive parsed elements (U) into records (T).
    protected final Accumulating<U, G, A, T> accumulating;
    // NOTE(review): presumably the value exposed via getCurrentValue() — confirm.
    protected T currentValue;
    // Closes the record stream created by initRecordFlow().
    protected Runnable recordFlowCloseable;
    // Iterator over the record stream created by initRecordFlow().
    protected Iterator<T> datasetFlow;
    // Created by setStreamToInterval() when the input is compressed.
    protected Decompressor decompressor;
    // The split being read and the codec detected for its file (null if uncompressed).
    protected FileSplit split;
    protected CompressionCodec codec;
    // Stream over the split's file, opened in initialize().
    protected FSDataInputStream rawStream;
    // Seekable view over rawStream (decompressing if needed), set by setStreamToInterval().
    protected SeekableInputStream stream;
    // File name of the split and an identifier used in log messages.
    protected String splitName;
    protected String splitId;
    // Head/tail byte source over the split; created in createRecordFlow().
    SeekableSourceOverSplit source;
    // Position predicates built in createRecordFlow(): posValidator accepts positions within the
    // (possibly still growing) head buffer; readPosValidator additionally accepts reads up to
    // 1,000,000 bytes past the head buffer's known size.
    LongPredicate posValidator;
    LongPredicate readPosValidator;
    // Maps a position to a split id (for encoded input via the position's block).
    Function<Long, Long> posToSplitId;
    // NOTE(review): computed in the constructor before initialize() sets isEncoded/splitLength,
    // so it is always -1 there — confirm whether that is intended.
    long knownDecodedDataLength;
    // Number of record starts skipToNthRecordInSplit() is asked to find (2, set in the constructor).
    protected int skipRecordCount;
    // Search end bound passed to skipToNthRecordInSplit() (Long.MAX_VALUE, set in the constructor).
    protected long maxExtraByteCount;
    // Outcome of tail probing in detectTail(); null until then.
    protected ProbeResult tailRecordOffset;
    // Number of bytes past the head region that belong to this split; -1 until detectTail() ran.
    protected long tailBytes;
    // NOTE(review): neither buffer is assigned by the constructor, initialize(), detectTail() or
    // createRecordFlow() — confirm whether they are still used.
    protected BufferOverReadableChannel<U[]> tailEltBuffer;
    protected BufferOverReadableChannel<byte[]> tailByteBuffer;
    // Elements collected by detectTail() after the tail record boundary; streamed after this
    // split's own elements in createRecordFlow(). Empty until detectTail() finds a boundary.
    protected List<U> tailElts;
    // Milliseconds from the start of tail probing until tailElts was collected.
    protected long tailEltsTime;
    // Set in createRecordFlow(): whether the head probe had already left the head stream, and
    // whether it read beyond head size + tailBytes. null until then; reported by getStats().
    protected Boolean regionStartSearchReadOverSplitEnd;
    protected Boolean regionStartSearchReadOverRegionEnd;
    // When true, totalEltCount and totalRecordCount are maintained while streaming.
    protected boolean enableStats = true;
    // NOTE(review): presumably backs getCurrentKey() — confirm.
    protected AtomicLong currentKey = new AtomicLong();
    // Bytes prepended to the parser input by effectiveInputStreamSupp().
    protected byte[] preambleBytes = EMPTY_BYTE_ARRAY;
    // Bytes handed to the SeekableSourceOverSplit factories in createRecordFlow().
    protected byte[] postambleBytes = EMPTY_BYTE_ARRAY;
    // True when the file has a (splittable) compression codec.
    protected boolean isEncoded = false;
    // Byte range of the split: [splitStart, splitEnd) with splitEnd = splitStart + splitLength.
    protected long splitStart = -1;
    protected long splitLength = -1;
    protected long splitEnd = -1;
    // True for the split starting at offset 0, which needs no head-region search.
    protected boolean isFirstSplit = false;
    // Statistics counters, maintained only when enableStats is true.
    protected long totalEltCount = 0;
    protected long totalRecordCount = 0;
    // Outcome of the head-region search in createRecordFlow(); null until then.
    ProbeResult headProbe = null;

    /* loaded from: input_file:net/sansa_stack/hadoop/core/RecordReaderGenericBase$ReadTooFarException.class */
    /**
     * Unchecked signal raised by the matcher created in {@code createMatcherFactory} when a
     * character is requested at a position the read-position predicate rejects.
     */
    public static class ReadTooFarException extends RuntimeException {
        /** Creates an instance without message or cause. */
        public ReadTooFarException() {
            super();
        }
    }

    /**
     * Creates a reader; concrete limits are resolved later in {@code initialize} from the
     * configuration keys supplied by {@code recordReaderConf}.
     *
     * @param recordReaderConf source of the configuration keys and the record-start pattern
     * @param accumulating strategy that groups parsed elements into records
     */
    public RecordReaderGenericBase(RecordReaderConf recordReaderConf, Accumulating<U, G, A, T> accumulating) {
        // Tail-detection and search defaults.
        this.skipRecordCount = 2;
        this.maxExtraByteCount = Long.MAX_VALUE;
        this.tailRecordOffset = null;
        this.tailBytes = -1L;
        this.tailElts = Collections.emptyList();
        this.regionStartSearchReadOverSplitEnd = null;
        this.regionStartSearchReadOverRegionEnd = null;
        // NOTE(review): evaluated before initialize(), i.e. with isEncoded == false and
        // splitLength == -1, so this is always -1 at this point.
        this.knownDecodedDataLength = this.isEncoded ? -1L : this.splitLength;

        // Configuration keys and the record-start pattern.
        this.probeElementCountKey = recordReaderConf.getProbeElementCountKey();
        this.minRecordLengthKey = recordReaderConf.getMinRecordLengthKey();
        this.maxRecordLengthKey = recordReaderConf.getMaxRecordLengthKey();
        this.probeRecordCountKey = recordReaderConf.getProbeRecordCountKey();
        this.recordStartPattern = recordReaderConf.getRecordSearchPattern();

        this.accumulating = accumulating;
    }

    /**
     * Reads the reader's limits from the job configuration, records the split geometry and opens
     * the split's file.
     *
     * <p>The compression codec is resolved and validated before the file is opened, so an
     * unsupported (non-splittable) codec no longer leaves an open {@link FSDataInputStream}
     * behind.
     *
     * @param inputSplit the split to read; expected to be a {@link FileSplit}
     * @param taskAttemptContext supplies the Hadoop {@link Configuration}
     * @throws IOException if the file system cannot be obtained or the file cannot be opened
     * @throws RuntimeException if the file's codec is not a {@link SplittableCompressionCodec}
     */
    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException {
        Configuration configuration = taskAttemptContext.getConfiguration();
        this.probeElementCount = this.probeElementCountKey == null
                ? Integer.MAX_VALUE
                : configuration.getInt(this.probeElementCountKey, Integer.MAX_VALUE);
        // Both fields are longs; getLong also accepts every value getInt accepted.
        this.minRecordLength = configuration.getLong(this.minRecordLengthKey, 1);
        this.maxRecordLength = configuration.getLong(this.maxRecordLengthKey, 10485760);
        this.probeRecordCount = configuration.getInt(this.probeRecordCountKey, 100);

        this.split = (FileSplit) inputSplit;
        Path path = this.split.getPath();

        this.splitStart = this.split.getStart();
        this.splitLength = this.split.getLength();
        this.splitEnd = this.splitStart + this.splitLength;
        this.isFirstSplit = this.splitStart == 0;
        this.splitName = path.getName();
        // "<file>:<start>+<length>" — previously the length was replaced by this.toString().
        this.splitId = this.splitName + ":" + this.splitStart + "+" + this.splitLength;

        this.codec = new CompressionCodecFactory(configuration).getCodec(path);
        if (this.codec != null && !(this.codec instanceof SplittableCompressionCodec)) {
            throw new RuntimeException("Don't know how to handle codec: " + this.codec);
        }
        this.isEncoded = this.codec != null;

        // Opened last so that a rejected codec cannot leak the stream.
        this.rawStream = path.getFileSystem(configuration).open(path);
    }

    /**
     * Creates the record stream and exposes it as an iterator ({@code datasetFlow}) together with
     * a callback that closes it ({@code recordFlowCloseable}).
     *
     * @throws IOException if creating the record stream fails
     */
    public void initRecordFlow() throws IOException {
        final Stream<T> flow = createRecordFlow();
        // The method reference fails fast with a NullPointerException on a null stream.
        this.recordFlowCloseable = flow::close;
        this.datasetFlow = flow.iterator();
    }

    /**
     * Parses elements from the given input stream.
     *
     * @param inputStream the bytes to parse (as built by effectiveInputStreamSupp(), i.e. possibly
     *                    prefixed with preambleBytes)
     * @param z NOTE(review): meaning not visible here; {@code prober} passes {@code true} and
     *          {@code createRecordFlow} passes {@code false} — confirm the contract
     * @return the parsed elements
     */
    protected abstract Stream<U> parse(InputStream inputStream, boolean z);

    /**
     * Points {@code stream} at the byte range starting at {@code start} of {@code rawStream}.
     *
     * <p>For compressed input a new decompressor is created and the range is opened with
     * {@link SplittableCompressionCodec.READ_MODE#BYBLOCK}; the returned entry then holds the
     * codec-adjusted block boundaries. For plain input the raw stream is seeked to {@code start}
     * and {@code end} is handed to {@code InterruptingSeekableByteChannel} (presumably as the
     * position at which reading is cut off — confirm); the arguments are returned unchanged.
     *
     * <p>In both cases the resulting stream ignores {@code close()} calls, and the logging wrapper
     * reports unexpected closes via {@code logUnexpectedClose}.
     *
     * NOTE(review): a decompressor created by an earlier call is not released here.
     *
     * @param start start offset in the (compressed) file
     * @param end end offset in the (compressed) file
     * @return (effective start, effective end)
     * @throws IOException if seeking or opening the decompressing stream fails
     * @throws RuntimeException if the codec is not a {@link SplittableCompressionCodec}
     */
    public Map.Entry<Long, Long> setStreamToInterval(long start, long end) throws IOException {
        if (null == this.codec) {
            this.rawStream.seek(start);
            this.stream = new SeekableInputStream(
                    new InputStreamWithCloseIgnore(
                            Channels.newInputStream((ReadableByteChannel) new InterruptingSeekableByteChannel(
                                    new SeekableByteChannelFromSeekableInputStream(
                                            InputStreamWithCloseLogging.wrap(
                                                    this.rawStream,
                                                    ExceptionUtils::getStackTrace,
                                                    RecordReaderGenericBase::logUnexpectedClose),
                                            this.rawStream),
                                    end))),
                    this.rawStream);
            return new AbstractMap.SimpleEntry<>(start, end);
        }

        // Validate before allocating a decompressor that would otherwise be wasted.
        if (!(this.codec instanceof SplittableCompressionCodec)) {
            throw new RuntimeException("Don't know how to handle codec: " + this.codec);
        }
        // The 5-argument createInputStream exists only on SplittableCompressionCodec.
        SplittableCompressionCodec splittableCodec = (SplittableCompressionCodec) this.codec;
        this.decompressor = splittableCodec.createDecompressor();
        SplitCompressionInputStream decoded = splittableCodec.createInputStream(
                this.rawStream, this.decompressor, start, end, SplittableCompressionCodec.READ_MODE.BYBLOCK);
        long adjustedStart = decoded.getAdjustedStart();
        long adjustedEnd = decoded.getAdjustedEnd();
        this.stream = new SeekableInputStream(
                new InputStreamWithCloseIgnore(
                        InputStreamWithCloseLogging.wrap(
                                new InputStreamWithZeroOffsetRead(decoded),
                                ExceptionUtils::getStackTrace,
                                RecordReaderGenericBase::logUnexpectedClose)),
                decoded);
        return new AbstractMap.SimpleEntry<>(adjustedStart, adjustedEnd);
    }

    /**
     * Currently a no-op; the argument is ignored.
     *
     * @param str message describing the close event (unused)
     */
    public static void logClose(String str) {
    }

    /**
     * Close callback for the wrapped input streams built in {@code setStreamToInterval}: logs the
     * given text at error level and then aborts with an unchecked exception.
     *
     * @param message text to log (presumably the stack trace of the close call — confirm)
     * @throws RuntimeException always, with message {@code "Unexpected close"}
     */
    public static void logUnexpectedClose(String message) {
        final String failure = "Unexpected close";
        logger.error(message);
        throw new RuntimeException(failure);
    }

    /**
     * Tells whether {@code seekable} has reached or passed {@code bound}; overshooting is logged
     * as a warning.
     *
     * @param seekable Hadoop seekable whose position is checked
     * @param bound the position to compare against
     * @return {@code true} if {@code seekable.getPos() >= bound}
     * @throws RuntimeException wrapping an {@link IOException} from {@code getPos()}
     */
    public boolean didHitSplitBound(Seekable seekable, long bound) {
        final long overshoot;
        try {
            overshoot = seekable.getPos() - bound;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        if (overshoot > 0) {
            logger.warn(String.format("Split %s: Exceeded split pos by %d bytes", this.splitId, Long.valueOf(overshoot)));
        }
        return overshoot >= 0;
    }

    /**
     * Groups consecutive elements of {@code splitElts} followed by {@code trailingElts} into
     * records using {@code accumulating}.
     *
     * <p>Unless {@code isFirstSplit} is set, no accumulator is created for the first run and that
     * run is dropped from the result — presumably because it belongs to a record handled by the
     * preceding split (confirm).
     *
     * @param isFirstSplit whether this is the split at offset 0
     * @param splitElts the split's own elements
     * @param trailingElts elements appended after the split's elements
     * @return the aggregated records; a run without accumulator maps to {@code null}
     */
    protected Stream<T> aggregate(boolean isFirstSplit, Stream<U> splitElts, Stream<U> trailingElts) {
        Objects.requireNonNull(this.accumulating);
        // First argument: presumably the 0-based run number (0 identifies the first run).
        Stream<T> records = StreamOperatorCollapseRuns
                .create(CollapseRunsSpec.create(
                        this.accumulating::classify,
                        (runIndex, groupKey) -> isFirstSplit || runIndex != 0
                                ? this.accumulating.createAccumulator(groupKey)
                                : null,
                        this.accumulating::accumulate))
                .transform(Stream.concat(splitElts, trailingElts))
                .map(run -> run.getValue() == null ? null : this.accumulating.accumulatedValue(run.getValue()));
        return isFirstSplit ? records : records.skip(1L);
    }

    /**
     * Groups consecutive elements of {@code stream} into values using {@code accumulating}.
     *
     * <p>The first run never gets an accumulator, so it appears in the result as a {@code null}
     * entry (it is not skipped).
     * NOTE(review): callers that count the result (e.g. {@code prober}) therefore also count that
     * null entry — confirm this is intended.
     *
     * @param stream the elements to group
     * @param accumulating grouping strategy
     * @return one entry per run; {@code null} for the first run
     */
    public static <T, G, A, U> Stream<U> aggregate(Stream<T> stream, Accumulating<T, G, A, U> accumulating) {
        Objects.requireNonNull(accumulating);
        return StreamOperatorCollapseRuns
                .create(CollapseRunsSpec.create(
                        accumulating::classify,
                        (runIndex, groupKey) -> runIndex == 0 ? null : accumulating.createAccumulator(groupKey),
                        accumulating::accumulate))
                .transform(stream)
                .map(run -> run.getValue() == null ? null : accumulating.accumulatedValue(run.getValue()));
    }

    /**
     * Overridable hook to decorate the byte stream before it reaches the parser (see
     * effectiveInputStreamSupp()). The default returns {@code inputStream} unchanged.
     *
     * @param inputStream the stream over the split's bytes
     * @return the stream to parse
     */
    protected InputStream effectiveInputStream(InputStream inputStream) {
        return inputStream;
    }

    /**
     * Builds the parser input for {@code readableChannel}: the channel's bytes passed through
     * {@code effectiveInputStream}, preceded by {@code preambleBytes} when those are non-empty.
     *
     * @param readableChannel source of the bytes to parse
     * @return the stream to hand to the parser
     */
    protected InputStream effectiveInputStreamSupp(ReadableChannel<byte[]> readableChannel) {
        InputStream body = effectiveInputStream(ReadableChannels.newInputStream(readableChannel));
        if (this.preambleBytes.length == 0) {
            return body;
        }
        InputStream preamble = new ByteArrayInputStream(this.preambleBytes);
        return new SequenceInputStream(preamble, body);
    }

    /**
     * Returns the position of an aksw {@code Seekable}, rethrowing any checked or unchecked
     * exception wrapped in a {@link RuntimeException}.
     *
     * @param seekable the seekable to query
     * @return its current position
     */
    public static long getPos(org.aksw.commons.io.seekable.api.Seekable seekable) {
        final long pos;
        try {
            pos = seekable.getPos();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return pos;
    }

    /**
     * Parses the bytes of {@code readableChannel} (with preamble and {@code effectiveInputStream}
     * decoration applied) via {@code parse}.
     *
     * @param readableChannel source of the bytes
     * @param z forwarded unchanged to {@code parse}
     * @return the parsed elements
     */
    protected Stream<U> parseFromSeekable(ReadableChannel<byte[]> readableChannel, boolean z) {
        InputStream parserInput = effectiveInputStreamSupp(readableChannel);
        return parse(parserInput, z);
    }

    /**
     * Probes whether parsing succeeds when starting at the current position of
     * {@code seekableReadableChannel}.
     *
     * <p>A clone of the channel is parsed via parseFromSeekable(..., true). The elements are
     * grouped with the static {@code aggregate}; at most probeElementCount elements are read and
     * at most probeRecordCount entries are counted. The probe succeeds if the count is positive.
     * That count includes the null entry the static aggregate yields for the first run.
     *
     * <p>If {@code bufferOverReadableChannel} is non-null, it is truncated and given a new data
     * supplier: the parsed elements, carrying the switchable byte channel as their value. In that
     * branch the parse stream and channels are not closed on success, presumably so the caller
     * can keep consuming from the buffer's data supplier (createRecordFlow() reads it) — confirm.
     *
     * <p>Any {@link Throwable} counts as a failed probe: resources are closed quietly and the
     * count becomes -1. The exception is an {@link IOException}, which is rethrown wrapped in a
     * {@link RuntimeException}.
     *
     * NOTE(review): catching Throwable also turns Errors (e.g. OutOfMemoryError) into a failed
     * probe; in the catch block, stream.close() may itself throw and mask the original failure.
     * NOTE(review): the empty finally blocks look like decompiled try-with-resources residue.
     * NOTE(review): on success without a buffer, readableChannelSwitchable/cloneObject are only
     * released if stream.close() propagates to them — confirm.
     *
     * @param seekableReadableChannel channel positioned at the candidate offset; it is cloned
     * @param bufferOverReadableChannel optional buffer that receives the parsed elements; may be null
     * @return success flag ({@code count > 0}), start position of the clone, and the position reached
     */
    protected OffsetSeekResult prober(SeekableReadableChannel<byte[]> seekableReadableChannel, BufferOverReadableChannel<U[]> bufferOverReadableChannel) {
        long position;
        long j;
        Stream<T> limit;
        ArrayOpsObject arrayOpsObject = ArrayOps.OBJECT;
        Stream<U> stream = null;
        ReadableChannel readableChannel = null;
        // Parse on a clone; presumably this leaves the caller's channel position untouched.
        SeekableReadableChannel cloneObject = seekableReadableChannel.cloneObject();
        long position2 = getPosition(cloneObject);
        ReadableChannelSwitchable readableChannelSwitchable = new ReadableChannelSwitchable(cloneObject);
        try {
            stream = parseFromSeekable(readableChannelSwitchable, true);
            if (bufferOverReadableChannel != null) {
                // Route the parsed elements through the buffer so they remain available afterwards.
                readableChannel = ReadableChannels.withValue(ReadableChannels.wrap(stream, arrayOpsObject), readableChannelSwitchable);
                bufferOverReadableChannel.truncate();
                bufferOverReadableChannel.setDataSupplier(readableChannel);
                limit = aggregate(ReadableChannels.newStream(bufferOverReadableChannel.newReadableChannel()).limit(this.probeElementCount), this.accumulating).limit(this.probeRecordCount);
                try {
                    j = limit.count();
                    if (limit != null) {
                        limit.close();
                    }
                } finally {
                }
            } else {
                limit = aggregate(stream.limit(this.probeElementCount), this.accumulating).limit(this.probeRecordCount);
                try {
                    j = limit.count();
                    if (limit != null) {
                        limit.close();
                    }
                    stream.close();
                } finally {
                }
            }
            position = cloneObject.position();
        } catch (Throwable th) {
            position = getPosition(cloneObject);
            logger.debug(String.format("Element offset probing result: Start = %d, End = %d, Parse error encountered after reading %d bytes", Long.valueOf(position2), Long.valueOf(position), Long.valueOf(position - position2)));
            if (readableChannel != null) {
                IOUtils.closeQuietly(readableChannel);
            } else if (stream != null) {
                stream.close();
            }
            IOUtils.closeQuietly(readableChannelSwitchable);
            if (th instanceof IOException) {
                throw new RuntimeException(th);
            }
            // Any other failure (parse error, ReadTooFarException, ...) marks the probe as failed.
            j = -1;
        }
        return new OffsetSeekResult(j > 0, position2, position);
    }

    /**
     * Copies a {@link ProbeResult} onto {@code resource} viewed as {@link ProbeStats}.
     *
     * <p>The total duration is stored in seconds. All values are read through the
     * {@code ProbeResult} accessors, as elsewhere in this class, instead of reaching into the
     * {@code totalDuration} field directly.
     *
     * @param resource the RDF resource to populate
     * @param probeResult the probe outcome to copy
     * @return the populated {@link ProbeStats} view of {@code resource}
     */
    public static ProbeStats convert(Resource resource, ProbeResult probeResult) {
        double totalSeconds = probeResult.totalDuration().toNanos() * 1.0E-9d;
        return resource.as(ProbeStats.class)
                .setCandidatePos(Long.valueOf(probeResult.candidatePos()))
                .setProbeCount(Long.valueOf(probeResult.probeCount()))
                .setTotalDuration(Double.valueOf(totalSeconds));
    }

    /**
     * Builds a fresh RDF-backed {@link Stats2} snapshot of this reader's split geometry, the
     * head/tail probe outcomes and the streaming counters.
     *
     * <p>First block: {@code null} if the source has no block-offset map; otherwise the first
     * mapped value, or -1 if the map is empty or that value is null.
     *
     * NOTE(review): dereferences {@code source}, so this presumably requires createRecordFlow() to
     * have run — confirm.
     *
     * @return the statistics resource
     */
    public Stats2 getStats() {
        Model model = ModelFactory.createDefaultModel();
        Stats2 stats = model.createResource().as(Stats2.class);

        NavigableMap<Long, Long> blockOffsets = this.source.getAbsPosToBlockOffset();
        Long firstBlock = null;
        if (blockOffsets != null) {
            Map.Entry<Long, Long> firstEntry = blockOffsets.firstEntry();
            Long firstValue = firstEntry == null ? null : firstEntry.getValue();
            firstBlock = firstValue == null ? Long.valueOf(-1L) : firstValue;
        }

        stats.setSplitStart(Long.valueOf(this.splitStart))
                .setSplitSize(Long.valueOf(this.splitLength))
                .setFirstBlock(firstBlock)
                .setRegionStartProbeResult(this.headProbe == null ? null : convert(model.createResource(), this.headProbe))
                .setRegionEndProbeResult(this.tailRecordOffset == null ? null : convert(model.createResource(), this.tailRecordOffset))
                .setRegionStartSearchReadOverSplitEnd(this.regionStartSearchReadOverSplitEnd)
                .setRegionStartSearchReadOverRegionEnd(this.regionStartSearchReadOverRegionEnd)
                .setTailElementCount(this.tailElts == null ? null : Integer.valueOf(this.tailElts.size()))
                .setTotalElementCount(Long.valueOf(this.totalEltCount))
                .setTotalRecordCount(Long.valueOf(this.totalRecordCount))
                .setTotalBytesRead(Long.valueOf(this.source.getKnownSize()));
        return stats;
    }

    /**
     * Probes the tail region for the record boundary that ends this split, then collects the
     * elements that follow it. The tail region is the bytes of {@code bufferOverReadableChannel};
     * createRecordFlow() passes the source's tail buffer.
     *
     * <p>Effects on this instance:
     * <ul>
     *   <li>tailRecordOffset: the result of skipToNthRecordInSplit(skipRecordCount, ...) over a
     *       channel on the tail buffer.</li>
     *   <li>tailBytes: the candidate position found, or the tail buffer's known data size if none
     *       was found. createRecordFlow() limits the split's channel to head size plus this value.</li>
     *   <li>tailElts / tailEltsTime: only when a candidate was found. tailElts is the first run of
     *       elements, grouped by accumulating::classify, read from the element buffer handed to the
     *       probing. tailEltsTime is the elapsed milliseconds.</li>
     * </ul>
     *
     * <p>Any {@link Throwable} is rethrown as {@code RuntimeException("Should never come here")}.
     *
     * NOTE(review): the candidate position goes through Ints.checkedCast. A tail offset beyond
     * Integer.MAX_VALUE therefore fails, although tailBytes is a long.
     * NOTE(review): the empty finally suggests decompiled try-with-resources. As written,
     * newReadableChannel and the data suppliers are not closed when an exception occurs.
     *
     * @param bufferOverReadableChannel byte buffer over the tail region
     */
    protected void detectTail(BufferOverReadableChannel<byte[]> bufferOverReadableChannel) {
        BufferOverReadableChannel<U[]> createForObjects = BufferOverReadableChannel.createForObjects(1024);
        // Every read position is accepted while the tail buffer's supplier is not exhausted;
        // afterwards, reads up to 1,000,000 bytes past its known size are still accepted.
        long j = 1000000;
        LongPredicate longPredicate = j2 -> {
            boolean z = true;
            if (bufferOverReadableChannel.isDataSupplierConsumed()) {
                z = j2 < bufferOverReadableChannel.getKnownDataSize() + j;
            }
            return z;
        };
        try {
            SeekableReadableChannel<byte[]> newReadableChannel = bufferOverReadableChannel.newReadableChannel();
            try {
                StopWatch createStarted = StopWatch.createStarted();
                // The position validator passed here accepts every position.
                this.tailRecordOffset = skipToNthRecordInSplit(this.skipRecordCount, newReadableChannel, 0L, 0L, this.maxRecordLength, this.maxExtraByteCount, longPredicate, j3 -> {
                    return true;
                }, this.posToSplitId, createForObjects, this::prober);
                this.tailBytes = this.tailRecordOffset.candidatePos() < 0 ? bufferOverReadableChannel.getKnownDataSize() : Ints.checkedCast(this.tailRecordOffset.candidatePos());
                logger.info(String.format("Split %s: Got %d tail bytes within %.3f s", this.splitId, Long.valueOf(this.tailBytes), Double.valueOf(createStarted.getTime(TimeUnit.MILLISECONDS) * 0.001d)));
                if (this.tailRecordOffset.candidatePos() >= 0) {
                    Accumulating<U, G, A, T> accumulating = this.accumulating;
                    Objects.requireNonNull(accumulating);
                    // Collect each run of equally classified elements into a list; only the first run is kept.
                    CollapseRunsSpec create = CollapseRunsSpec.create(accumulating::classify, () -> {
                        return new ArrayList(1024);
                    }, (v0, v1) -> {
                        v0.add(v1);
                    });
                    Stream debufferedEltStream = debufferedEltStream(createForObjects);
                    try {
                        this.tailElts = (List) StreamOperatorCollapseRuns.create(create).transform(debufferedEltStream).map((v0) -> {
                            return v0.getValue();
                        }).findFirst().orElse(Collections.emptyList());
                        if (debufferedEltStream != null) {
                            debufferedEltStream.close();
                        }
                        this.tailEltsTime = createStarted.getTime(TimeUnit.MILLISECONDS);
                        // NOTE(review): the "pos" placeholder is always filled with -1.
                        logger.info(String.format("Split %s: Got %d tail items at pos %d within %d bytes read in %d ms", this.splitId, Integer.valueOf(this.tailElts.size()), -1L, Long.valueOf(this.tailBytes), Long.valueOf(this.tailEltsTime)));
                    } catch (Throwable th) {
                        if (debufferedEltStream != null) {
                            try {
                                debufferedEltStream.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        }
                        throw th;
                    }
                }
                ReadableChannel dataSupplier = createForObjects.getDataSupplier();
                if (dataSupplier != null) {
                    dataSupplier.close();
                }
                bufferOverReadableChannel.getDataSupplier().close();
                if (newReadableChannel != null) {
                    newReadableChannel.close();
                }
            } finally {
            }
        } catch (Throwable th3) {
            throw new RuntimeException("Should never come here", th3);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Builds the lazily evaluated stream of records for this split.
     *
     * <p>Outline:
     * <ol>
     *   <li>Opens {@code stream} at the split start and wraps it in a SeekableSourceOverSplit
     *       (block-encoded or plain, depending on isEncoded).</li>
     *   <li>Unless this is the first split, searches for a record start via
     *       skipToNthRecordInSplit; the elements parsed while probing are kept in a buffer and
     *       replayed.</li>
     *   <li>Arranges for detectTail() to run: immediately if the head probe already left the head
     *       stream, otherwise through a transition action when reading crosses into the tail.</li>
     *   <li>Streams the split's elements followed by tailElts and groups them into records with
     *       aggregate(isFirstSplit, ...).</li>
     * </ol>
     *
     * <p>Closing the returned stream closes the channel in use and {@code source}.
     *
     * <p>Changes from the previous version:
     * <ul>
     *   <li>The early-return lambda captures a dedicated local. The channel variable is
     *       reassigned later, so capturing it directly is not legal Java.</li>
     *   <li>The temporary unlock around closing the probe's data supplier now always re-acquires
     *       the lock. Previously an IOException there made the final unlock throw
     *       IllegalMonitorStateException, masking the original error.</li>
     * </ul>
     *
     * @return the record stream; empty if no record start was found in this split
     * @throws IOException if positioning the underlying stream or channel fails
     */
    public Stream<T> createRecordFlow() throws IOException {
        logger.info("Processing split " + this.splitId);
        setStreamToInterval(this.splitStart, Long.MAX_VALUE);
        this.source = this.isEncoded ? SeekableSourceOverSplit.createForBlockEncodedStream(this.stream, this.splitEnd, this.postambleBytes) : SeekableSourceOverSplit.createForNonEncodedStream(this.stream, this.splitEnd, this.postambleBytes);
        SeekableSourceOverSplit.Channel m6newReadableChannel = this.source.m6newReadableChannel();
        // A position is valid while the head buffer can still grow. Once its supplier is consumed,
        // it must lie below the head buffer's known size.
        this.posValidator = j -> {
            return !this.source.getHeadBuffer().isDataSupplierConsumed() || j < this.source.getHeadBuffer().getKnownDataSize();
        };
        // Pattern matching may read up to this many bytes past the head buffer's known size.
        long j2 = 1000000;
        this.readPosValidator = j3 -> {
            boolean z = true;
            if (!this.posValidator.test(j3)) {
                z = j3 < this.source.getHeadBuffer().getKnownDataSize() + j2;
            }
            return z;
        };
        Function<Long, Long> splitIdFn = TailBufferChannel.splitIdFn(this.splitStart, this.splitLength);
        // For block-encoded input a position is first mapped to its block before deriving the split id.
        this.posToSplitId = !this.isEncoded ? splitIdFn : l -> {
            return Long.valueOf(((Long) splitIdFn.apply(Long.valueOf(this.source.getBlockForPos(l.longValue())))).longValue());
        };
        // Handed to the head-region search, which lets the prober fill it with parsed elements.
        BufferOverReadableChannel<U[]> createForObjects = BufferOverReadableChannel.createForObjects(this.probeRecordCount);
        StopWatch createStarted = StopWatch.createStarted();
        // The first split starts at offset 0; every other split searches for a record start.
        this.headProbe = this.isFirstSplit ? new ProbeResult(0L, 0L, Duration.ZERO) : skipToNthRecordInSplit(this.skipRecordCount, m6newReadableChannel, 0L, 0L, this.maxRecordLength, this.maxExtraByteCount, this.readPosValidator, this.posValidator, this.posToSplitId, createForObjects, this::prober);
        logger.info(String.format("Split %s: Found head region after %d probes at offset at pos %d in %.3f s", this.splitId, Long.valueOf(this.headProbe.probeCount()), Long.valueOf(this.headProbe.candidatePos()), Float.valueOf(((float) createStarted.getTime(TimeUnit.MILLISECONDS)) * 0.001f)));
        ReadableChannelWithValue readableChannelWithValue = null;
        Stream<U> stream = null;
        if (this.headProbe.candidatePos() < 0) {
            // No record start found: this split yields no records.
            // Lambdas may only capture effectively-final locals, and m6newReadableChannel is
            // reassigned further below.
            SeekableSourceOverSplit.Channel headChannel = m6newReadableChannel;
            return (Stream) Stream.empty().onClose(() -> {
                try {
                    headChannel.close();
                    this.source.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });
        }
        if (this.isFirstSplit) {
            stream = parseFromSeekable(m6newReadableChannel, false);
        } else {
            // Continue on the channel the successful probe parsed from. That channel is the
            // decoratee of the switchable channel stored as the data supplier's value.
            ReadableChannelWithValue dataSupplier = createForObjects.getDataSupplier();
            m6newReadableChannel.close();
            m6newReadableChannel = (SeekableSourceOverSplit.Channel) ((ReadableChannelSwitchable) dataSupplier.getValue()).getDecoratee();
            readableChannelWithValue = dataSupplier;
        }
        Lock writeLock = m6newReadableChannel.getReadWriteLock().writeLock();
        writeLock.lock();
        try {
            if (m6newReadableChannel.isHeadStream()) {
                // Still within the head: detect the tail lazily once reading crosses into it.
                this.regionStartSearchReadOverSplitEnd = false;
                this.regionStartSearchReadOverRegionEnd = false;
                m6newReadableChannel.debufferHead();
                SeekableSourceOverSplit.Channel channel = m6newReadableChannel;
                m6newReadableChannel.setTransitionAction(() -> {
                    logger.info(String.format("Transitioned to tail after reading %d bytes", Long.valueOf(channel.getEnclosingInstance().getHeadBuffer().getKnownDataSize())));
                    detectTail(this.source.getTailBuffer());
                    if (this.tailBytes >= 0) {
                        channel.setLimit(this.source.getHeadSize() + this.tailBytes);
                    }
                });
            } else {
                // The head probe already read into the tail: detect the tail right away.
                this.regionStartSearchReadOverSplitEnd = true;
                detectTail(this.source.getTailBuffer());
                long position = m6newReadableChannel.position();
                long knownDataSize = this.source.getHeadBuffer().getKnownDataSize() + this.tailBytes;
                this.regionStartSearchReadOverRegionEnd = Boolean.valueOf(position > knownDataSize);
                if (this.regionStartSearchReadOverRegionEnd.booleanValue()) {
                    // The probe parsed past the end of this split's region: discard its buffered
                    // elements and re-parse from the head candidate with the region limit applied.
                    // The lock is released while closing the supplier (presumably to avoid
                    // deadlocking with the channel being closed — confirm) and always re-acquired
                    // so the final unlock stays balanced.
                    writeLock.unlock();
                    try {
                        if (createForObjects.getDataSupplier() != null) {
                            createForObjects.getDataSupplier().close();
                        }
                    } finally {
                        writeLock.lock();
                    }
                    createForObjects.setDataSupplier((ReadableChannel) null);
                    createForObjects.truncate();
                    m6newReadableChannel.close();
                    m6newReadableChannel = this.source.m6newReadableChannel();
                    m6newReadableChannel.position(this.headProbe.candidatePos());
                    if (this.tailBytes >= 0) {
                        m6newReadableChannel.setLimit(knownDataSize);
                    }
                    stream = parseFromSeekable(m6newReadableChannel, false);
                } else if (this.tailBytes >= 0) {
                    m6newReadableChannel.setLimit(knownDataSize);
                }
            }
            if (stream == null) {
                // Replay the elements buffered during probing, then continue with the rest.
                stream = Stream.concat(ReadableChannels.newStream(createForObjects.getBuffer().newReadableChannel()), ReadableChannels.newStream(readableChannelWithValue));
            }
            ReadableChannelWithValue readableChannelWithValue2 = readableChannelWithValue;
            if (readableChannelWithValue2 != null) {
                stream = (Stream) stream.onClose(() -> {
                    try {
                        readableChannelWithValue2.close();
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                });
            }
            // Deferred via flatMap: this.tailElts is read only when the stream gets here. By then,
            // detectTail() (possibly run from the transition action) has populated it.
            Stream<U> flatMap = Stream.of(1).flatMap(num -> {
                logger.info("Yielding " + this.tailElts.size() + " tail elements");
                return this.tailElts.stream();
            });
            if (this.enableStats) {
                stream = stream.peek(obj -> {
                    this.totalEltCount++;
                });
            }
            SeekableSourceOverSplit.Channel channel2 = m6newReadableChannel;
            Stream<T> stream2 = (Stream) aggregate(this.isFirstSplit, stream, flatMap).onClose(() -> {
                channel2.close();
                try {
                    this.source.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });
            if (this.enableStats) {
                stream2 = stream2.peek(obj2 -> {
                    this.totalRecordCount++;
                });
            }
            return stream2;
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Returns the text lines readable from a clone of {@code seekable}.
     *
     * <p>Bytes are decoded as UTF-8 instead of the JVM's platform default charset, so the result
     * no longer depends on the environment. Closing the returned stream closes the reader and
     * with it the cloned channel.
     *
     * @param seekable the seekable to read; it is cloned, not consumed
     * @return a lazily populated stream of lines
     */
    public static Stream<String> lines(org.aksw.commons.io.seekable.api.Seekable seekable) {
        BufferedReader reader = new BufferedReader(new InputStreamReader(
                Channels.newInputStream((ReadableByteChannel) seekable.cloneObject()),
                StandardCharsets.UTF_8));
        return reader.lines().onClose(() -> IOUtils.closeQuietly(reader));
    }

    /**
     * Streams the elements already held by {@code bufferOverReadableChannel}'s buffer, followed
     * by the elements of its data supplier's underlying source.
     *
     * @param bufferOverReadableChannel the buffered element channel
     * @return concatenation of buffered and remaining elements
     * @throws RuntimeException wrapping any exception raised while setting up the streams
     */
    public static <T> Stream<T> unbufferedStream(BufferOverReadableChannel<T[]> bufferOverReadableChannel) {
        try {
            Stream<? extends T> buffered = ReadableChannels.newStream(bufferOverReadableChannel.getBuffer().newReadableChannel());
            Stream<? extends T> remaining = bufferOverReadableChannel.getDataSupplier().getDecoratee().toStream();
            return Stream.concat(buffered, remaining);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Streams all elements of {@code bufferOverReadableChannel}: first the buffered ones, then the
     * rest straight from the data supplier's source (see unbufferedStream()).
     *
     * <p>Before streaming, BufferOverReadableChannel.debuffer is invoked on the data supplier's
     * value (presumably so no further elements are buffered — confirm). Closing the returned
     * stream quietly closes the data supplier.
     *
     * <p>The former {@code if (0 == 0)} guard always took this path; its unreachable alternative
     * has been removed.
     *
     * @param bufferOverReadableChannel the buffered element channel; its data supplier must be set
     * @return stream over buffered and remaining elements
     */
    private static <T> Stream<T> debufferedEltStream(BufferOverReadableChannel<T[]> bufferOverReadableChannel) {
        ReadableChannel dataSupplier = bufferOverReadableChannel.getDataSupplier();
        BufferOverReadableChannel.debuffer((ReadableChannel) bufferOverReadableChannel.getDataSupplier().getValue());
        return unbufferedStream(bufferOverReadableChannel).onClose(() -> IOUtils.closeQuietly(dataSupplier));
    }

    /**
     * Searches for the next candidate record start, beginning at {@code searchStart}.
     *
     * <p>The given channel is positioned at {@code searchStart - posOffset} and then cloned; the
     * clone is searched by findFirstPositionWithProbeSuccess and is always closed. The pattern's
     * match region spans {@code min(searchStart + maxRecordLength, searchEnd) - searchStart}
     * characters. Character reads are checked with {@code readPosValidator}; candidate positions
     * with {@code posValidator}. A found position is translated back by adding {@code posOffset}.
     *
     * @return the probe result with translated candidate position, or candidate -1 if none was found
     * @throws IOException if positioning the channel fails
     * @throws IllegalArgumentException if the match region length does not fit into an int
     */
    public static <U> ProbeResult findNextRegion(CustomPattern customPattern, SeekableReadableChannel<byte[]> seekableReadableChannel, long posOffset, long searchStart, long maxRecordLength, long searchEnd, LongPredicate readPosValidator, LongPredicate posValidator, BufferOverReadableChannel<U[]> bufferOverReadableChannel, Prober<U> prober) throws IOException {
        int regionLength = Ints.checkedCast(Math.min(searchStart + maxRecordLength, searchEnd) - searchStart);
        seekableReadableChannel.position(searchStart - posOffset);
        try (SeekableReadableChannel<byte[]> searchChannel = seekableReadableChannel.cloneObject()) {
            MatcherFactory matcherFactory = createMatcherFactory(customPattern, regionLength, readPosValidator);
            ProbeResult found = findFirstPositionWithProbeSuccess(searchChannel, posValidator, matcherFactory, true, bufferOverReadableChannel, prober);
            long relativePos = found.candidatePos();
            long translatedPos = relativePos < 0 ? -1L : relativePos + posOffset;
            return new ProbeResult(translatedPos, found.probeCount(), found.totalDuration());
        }
    }

    /**
     * Returns the channel's current position, rethrowing an {@link IOException} wrapped in a
     * {@link RuntimeException}.
     *
     * @param seekableReadableChannel the channel to query
     * @return its current position
     */
    public static long getPosition(SeekableReadableChannel<byte[]> seekableReadableChannel) {
        final long result;
        try {
            result = seekableReadableChannel.position();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return result;
    }

    /**
     * Builds a factory that creates a matcher over a channel, starting at the channel's current
     * position.
     *
     * <p>Every character access is checked against {@code longPredicate} (evaluated on
     * {@code channelPosition + index}). An access the predicate rejects raises
     * {@code ReadTooFarException}. The matcher's region is limited to {@code [0, i)}.
     *
     * @param customPattern the pattern to match
     * @param i end of the matcher region (exclusive)
     * @param longPredicate guard on the channel positions the matcher is allowed to read
     * @return a matcher factory
     */
    protected static MatcherFactory createMatcherFactory(CustomPattern customPattern, int i, LongPredicate longPredicate) {
        return channel -> {
            // Channel position that corresponds to index 0 of the char sequence.
            final long basePos = getPosition(channel);
            CharSequenceDecorator guarded = new CharSequenceDecorator(SeekableReadableChannels.asCharSequence(channel)) {
                @Override
                public char charAt(int index) {
                    if (!longPredicate.test(basePos + index)) {
                        throw new ReadTooFarException();
                    }
                    return super.charAt(index);
                }
            };
            CustomMatcher result = customPattern.matcher(guarded);
            result.region(0, i);
            return result;
        };
    }

    /**
     * Skips forward to the {@code i}-th record start, searching from absolute position {@code j2}.
     *
     * <p>Each candidate is mapped through {@code function}. When a candidate maps to a different
     * value than the previous one after at least one candidate was counted, counting restarts.
     * On restart, the buffer's data supplier is closed and the buffer truncated. Only the search
     * for the final ({@code i}-th) candidate receives {@code bufferOverReadableChannel}; earlier
     * searches get {@code null}.
     *
     * <p>If no candidate is found, a warning is logged and the search stops. Previously the loop
     * retried the identical search indefinitely.
     *
     * <p>NOTE(review): the semantics of {@code function} (e.g. a block/group id of a position) are
     * not visible here; confirm with the callers.
     *
     * @return the last probe result (its candidate position is {@code -1} when the search failed),
     *     or {@code null} if {@code i <= 0}
     * @throws IOException if reading from the channel fails
     */
    ProbeResult skipToNthRecordInSplit(int i, SeekableReadableChannel<byte[]> seekableReadableChannel, long j, long j2, long j3, long j4, LongPredicate longPredicate, LongPredicate longPredicate2, Function<Long, Long> function, BufferOverReadableChannel<U[]> bufferOverReadableChannel, Prober<U> prober) throws IOException {
        ProbeResult probeResult = null;
        long currentGroup = -1;
        long availableRegion = j4 - j2;
        long searchStart = j2;
        int found = 0;
        while (found < i) {
            boolean isLast = found + 1 == i;
            probeResult = findNextRegion(this.recordStartPattern, seekableReadableChannel, j, searchStart, j3, j4, longPredicate, longPredicate2, isLast ? bufferOverReadableChannel : null, prober);
            long candidatePos = probeResult.candidatePos();
            if (candidatePos < 0) {
                if (availableRegion >= j3) {
                    logger.warn(String.format("Split %s: Found no record start in a search region of %d bytes, although up to %d bytes were allowed for reading", this.splitId, j3, availableRegion));
                } else {
                    // NOTE(review): searchStart already looks absolute here; verify whether adding j is intended.
                    logger.warn(String.format("Split %s: No more records found after pos %d", this.splitId, j + searchStart));
                }
                // The next iteration would repeat the identical search, so stop here.
                break;
            }
            long group = function.apply(candidatePos);
            if (group != currentGroup) {
                if (found > 0) {
                    // The candidate belongs to a different group: discard buffered data and restart counting.
                    isLast = false;
                    bufferOverReadableChannel.getDataSupplier().close();
                    bufferOverReadableChannel.truncate();
                    found = 0;
                }
                currentGroup = group;
            }
            if (!isLast) {
                searchStart = candidatePos + this.minRecordLength;
            }
            found++;
        }
        return probeResult;
    }

    /**
     * Scans for record-start candidates with a matcher and returns the first candidate whose probe
     * succeeds.
     *
     * <p>The scan runs on a clone of {@code seekableReadableChannel}. Each candidate is probed on a
     * further clone. If a failed probe covered more than 1,000,000 bytes, the matcher is recreated
     * further ahead, based on the probe's reported end position.
     *
     * @param seekableReadableChannel channel whose current position is where the scan starts
     * @param longPredicate acceptance test for candidate positions; the scan stops at the first
     *     rejected candidate
     * @param matcherFactory creates a matcher over the channel from its current position
     * @param z if {@code true}, a candidate is the match start; otherwise it is derived from the
     *     match end as {@code 1 - end} relative to the matcher base
     * @param bufferOverReadableChannel passed through to the prober
     * @param prober validates a candidate position
     * @return the candidate position (channel coordinates) or {@code -1}, the number of candidates
     *     examined, and the elapsed time
     * @throws IOException if channel I/O fails
     */
    public static <U> ProbeResult findFirstPositionWithProbeSuccess(SeekableReadableChannel<byte[]> seekableReadableChannel, LongPredicate longPredicate, MatcherFactory matcherFactory, boolean z, BufferOverReadableChannel<U[]> bufferOverReadableChannel, Prober<U> prober) throws IOException {
        // A failed probe longer than this triggers a jump ahead instead of advancing match by match.
        final long jumpAheadMinLength = 1_000_000L;
        // How far before the probe's error position the jump lands.
        final long jumpAheadBackoff = 10_000L;

        long candidatePos = -1;
        long probeCount = 0;
        StopWatch stopWatch = StopWatch.createStarted();
        // try-with-resources closes the scan clone also when ReadTooFarException escapes the loop.
        try (SeekableReadableChannel scanChannel = seekableReadableChannel.cloneObject()) {
            long matcherBase = scanChannel.position();
            CustomMatcher matcher = (CustomMatcher) matcherFactory.apply(scanChannel);
            boolean atEnd = false;
            // find() is deliberately evaluated before atEnd (same order as before).
            while (matcher.find() && !atEnd) {
                long start = matcherBase + (z ? matcher.start() : 1 - matcher.end());
                probeCount++;
                if (!longPredicate.test(start)) {
                    break;
                }
                // A candidate position with no readable byte marks the end of the data.
                scanChannel.position(start);
                atEnd = scanChannel.read(new byte[1], 0, 1) <= 0;
                scanChannel.position(start);
                if (atEnd) {
                    continue;
                }

                OffsetSeekResult seekResult;
                try (SeekableReadableChannel probeChannel = scanChannel.cloneObject()) {
                    seekResult = prober.apply(probeChannel, bufferOverReadableChannel);
                }
                if (seekResult.isSuccess()) {
                    candidatePos = start;
                    break;
                }

                long length = seekResult.getLength();
                if (length > jumpAheadMinLength) {
                    long startPos = seekResult.getStartPos();
                    long endPos = seekResult.getEndPos();
                    long nextSeekStart = startPos + 1;
                    long jumpPos = Math.max(startPos, endPos - jumpAheadBackoff);
                    if (jumpPos != nextSeekStart) {
                        logger.info(String.format("Jumped ahead to end position: seekStart=%d, errorPos=%d, distance=%d, nextSeekStart=%d", startPos, endPos, length, jumpPos));
                        matcherBase += jumpPos;
                        scanChannel.position(matcherBase);
                        matcher = (CustomMatcher) matcherFactory.apply(scanChannel);
                    }
                }
            }
        } catch (ReadTooFarException e) {
            logger.info("Matcher raised 'read too far' exception which indicates that the read threshold for bytes past the split end has been reached");
            candidatePos = -1;
        }
        return new ProbeResult(candidatePos, probeCount, Duration.ofMillis(stopWatch.getTime(TimeUnit.MILLISECONDS)));
    }

    /**
     * Advances to the next record, lazily initialising the record flow on the first call.
     *
     * <p>The key counter is incremented for every element taken from the flow.
     *
     * @return {@code true} if a non-null record was obtained
     * @throws RuntimeException wrapping any failure, with the split id as message
     */
    public boolean nextKeyValue() throws IOException {
        if (this.datasetFlow == null) {
            initRecordFlow();
        }
        try {
            if (!this.datasetFlow.hasNext()) {
                return false;
            }
            this.currentValue = this.datasetFlow.next();
            this.currentKey.incrementAndGet();
            return this.currentValue != null;
        } catch (Exception e) {
            throw new RuntimeException(this.splitId, e);
        }
    }

    /**
     * Returns the ordinal of the current record as the key. The decompiler renamed this method
     * from {@code getCurrentKey} (merged with its bridge method).
     *
     * @return the current key, or {@code null} if there is no current value
     */
    public LongWritable m1getCurrentKey() {
        if (this.currentValue == null) {
            return null;
        }
        long ordinal = this.currentKey.get();
        return new LongWritable(ordinal);
    }

    /**
     * Returns the record most recently produced by {@link #nextKeyValue()}.
     *
     * @return the current record, possibly {@code null}
     */
    public T getCurrentValue() {
        T value = this.currentValue;
        return value;
    }

    /**
     * Reports how far the underlying stream has advanced through the split.
     *
     * <p>The result is clamped to {@code [0, 1]}. Before, a stream position lying before
     * {@code splitStart} produced a negative progress, which Hadoop's contract does not allow.
     *
     * @return the fraction of the split consumed, or {@code 0} for an empty split
     * @throws IOException if the stream position cannot be obtained
     */
    public float getProgress() throws IOException {
        if (this.splitStart == this.splitEnd) {
            return 0.0f;
        }
        float consumed = (float) (this.stream.getPos() - this.splitStart);
        float total = (float) (this.splitEnd - this.splitStart);
        return Math.max(0.0f, Math.min(1.0f, consumed / total));
    }

    /**
     * Releases the record flow, the raw input stream and the pooled decompressor, in that order.
     *
     * <p>Each later step runs even if an earlier one throws. The raw stream is closed at most once.
     * The previous decompiled version retried {@code rawStream.close()} after a failed close.
     *
     * @throws IOException if closing the raw stream fails
     */
    public void close() throws IOException {
        try {
            if (this.recordFlowCloseable != null) {
                this.recordFlowCloseable.run();
            }
        } finally {
            try {
                if (this.rawStream != null) {
                    this.rawStream.close();
                    this.rawStream = null;
                }
            } finally {
                // Always hand the decompressor back to the pool, even if closing the stream failed.
                if (this.decompressor != null) {
                    CodecPool.returnDecompressor(this.decompressor);
                    this.decompressor = null;
                }
            }
        }
    }

    /**
     * Decodes {@code inputStream} as UTF-8 and returns at most {@code i} characters, followed by
     * {@code str} when more input remains.
     *
     * @param inputStream source of the bytes; it is not closed
     * @param i maximum number of characters to keep
     * @param str marker appended when the text was truncated
     * @return the abbreviated text
     * @throws IOException if reading fails
     */
    public static String abbreviateAsUTF8(InputStream inputStream, int i, String str) throws IOException {
        return abbreviate(new InputStreamReader(inputStream, StandardCharsets.UTF_8), i, str);
    }

    /**
     * Decodes {@code inputStream} with {@code charset} and abbreviates it.
     * See {@link #abbreviate(InputStreamReader, int, String)}.
     */
    public static String abbreviate(InputStream inputStream, Charset charset, int i, String str) throws IOException {
        InputStreamReader reader = new InputStreamReader(inputStream, charset);
        return abbreviate(reader, i, str);
    }

    /**
     * Reads at most {@code i} characters from {@code inputStreamReader}, appending {@code str} when
     * a further character is available.
     *
     * @param inputStreamReader source of characters; it is not closed
     * @param i maximum number of characters to keep
     * @param str marker appended when the text was truncated
     * @return the abbreviated text
     * @throws IOException if reading one of the kept characters fails
     */
    public static String abbreviate(InputStreamReader inputStreamReader, int i, String str) throws IOException {
        StringBuilder out = new StringBuilder();
        int taken = 0;
        while (taken < i) {
            int c = inputStreamReader.read();
            if (c == -1) {
                break;
            }
            out.append((char) c);
            ++taken;
        }
        boolean truncated;
        try {
            truncated = inputStreamReader.read() != -1;
        } catch (Exception ignored) {
            // Best effort: if the look-ahead fails we cannot rule out further content,
            // so the text is marked as truncated.
            truncated = true;
        }
        if (truncated) {
            out.append(str);
        }
        return out.toString();
    }

    // Class initialisation: set up Jena's subsystems first, then the logger and the shared
    // empty byte array (statement order kept as before).
    static {
        JenaSystem.init();
        logger = LoggerFactory.getLogger(RecordReaderGenericBase.class);
        EMPTY_BYTE_ARRAY = new byte[] {};
    }
}
