package net.sansa_stack.hadoop.generic;

import io.reactivex.rxjava3.core.Flowable;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.SequenceInputStream;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.CopyOption;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import net.sansa_stack.hadoop.util.InputStreamWithCloseLogging;
import net.sansa_stack.hadoop.util.SeekableByteChannelFromSeekableInputStream;
import net.sansa_stack.hadoop.util.SeekableInputStream;
import net.sansa_stack.io.util.InputStreamWithCloseIgnore;
import net.sansa_stack.io.util.InputStreamWithZeroOffsetRead;
import net.sansa_stack.nio.util.InterruptingSeekableByteChannel;
import net.sansa_stack.nio.util.ReadableByteChannelFromInputStream;
import net.sansa_stack.nio.util.ReadableByteChannelWithConditionalBound;
import org.aksw.commons.rx.op.FlowableOperatorSequentialGroupBy;
import org.aksw.jena_sparql_api.io.binseach.BufferFromInputStream;
import org.aksw.jena_sparql_api.io.binseach.CharSequenceFromSeekable;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.input.BoundedInputStream;
import org.apache.commons.lang3.SystemUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.SplitCompressionInputStream;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.jena.ext.com.google.common.primitives.Ints;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/sansa_stack/hadoop/generic/RecordReaderGenericBase.class */
public abstract class RecordReaderGenericBase<U, G, A, T> extends RecordReader<LongWritable, T> {
    private static final Logger logger = LoggerFactory.getLogger(RecordReaderGenericBase.class);
    /** Shared empty array; default for the preamble/postamble and used for empty placeholder streams. */
    public static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
    /** Configuration key from which {@code initialize} reads {@link #minRecordLength}. */
    protected final String minRecordLengthKey;
    /** Configuration key from which {@code initialize} reads {@link #maxRecordLength}. */
    protected final String maxRecordLengthKey;
    /** Configuration key from which {@code initialize} reads {@link #probeRecordCount}. */
    protected final String probeRecordCountKey;
    /** Pattern handed to {@code findNextRecord} to locate candidate record starts. */
    protected Pattern recordStartPattern;
    /** Strategy used to classify items into groups and to accumulate each group into a value. */
    protected final Accumulating<U, G, A, T> accumulating;
    /** Size of the region searched for a record start; also scales the amount of extra bytes buffered. */
    protected long maxRecordLength;
    /** Added to a found record position to obtain the start of the next search. */
    protected long minRecordLength;
    /** Upper bound on the number of items the prober parses at a candidate position. */
    protected int probeRecordCount;
    /** Value most recently fetched by {@code nextKeyValue}. */
    protected T currentValue;
    /** Blocking iterator over the record flow; created lazily on the first {@code nextKeyValue} call. */
    protected Iterator<T> datasetFlow;
    /** Decompressor borrowed from {@code CodecPool}; returned in {@code close} or when re-borrowing. */
    protected Decompressor decompressor;
    protected FileSplit split;
    /** Codec resolved from the split's path; {@code null} when no codec matches. */
    protected CompressionCodec codec;
    /** Stream opened on the split's file in {@code initialize}; closed in {@code close}. */
    protected FSDataInputStream rawStream;
    /** Stream over the currently selected interval; assigned by {@code setStreamToInterval}. */
    protected SeekableInputStream stream;
    /** Counter incremented once per fetched record; exposed as the current key. */
    protected AtomicLong currentKey = new AtomicLong();
    /** Bytes prepended to every stream handed to the parser. */
    protected byte[] preambleBytes = EMPTY_BYTE_ARRAY;
    /** Bytes appended after the tail segment when a tail record start was found. */
    protected byte[] postambleBytes = EMPTY_BYTE_ARRAY;
    /** Set to {@code true} by {@code initialize} when a (splittable) codec was resolved. */
    protected boolean isEncoded = false;
    protected long splitStart = -1;
    protected long splitLength = -1;
    /** {@code splitStart + splitLength}. */
    protected long splitEnd = -1;
    /** {@code true} when the split starts at offset 0. */
    protected boolean isFirstSplit = false;
    /** Error captured from the record flow; rethrown by {@code nextKeyValue} once the iterator is drained. */
    protected Throwable raisedThrowable = null;

    /**
     * Stores the configuration keys, the record start pattern and the accumulation strategy.
     * No I/O happens here; the split is opened in {@code initialize}.
     *
     * @param str configuration key for the minimum record length
     * @param str2 configuration key for the maximum record length
     * @param str3 configuration key for the probe record count
     * @param pattern pattern used to search for candidate record starts
     * @param accumulating strategy used to group and accumulate parsed items
     */
    public RecordReaderGenericBase(String str, String str2, String str3, Pattern pattern, Accumulating<U, G, A, T> accumulating) {
        this.minRecordLengthKey = str;
        this.maxRecordLengthKey = str2;
        this.probeRecordCountKey = str3;
        this.recordStartPattern = pattern;
        this.accumulating = accumulating;
    }

    /**
     * Reads the tuning parameters from the job configuration, opens the split's file and
     * records the split boundaries. Defaults: minimum record length 1, maximum record
     * length 10485760, probe record count 100.
     *
     * @param inputSplit the split to read; must be a {@link FileSplit}
     * @param taskAttemptContext supplies the configuration
     * @throws IOException if the file cannot be opened
     * @throws RuntimeException if a codec matches the path but is not splittable
     */
    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException {
        final Configuration conf = taskAttemptContext.getConfiguration();
        this.minRecordLength = conf.getInt(this.minRecordLengthKey, 1);
        this.maxRecordLength = conf.getInt(this.maxRecordLengthKey, 10485760);
        this.probeRecordCount = conf.getInt(this.probeRecordCountKey, 100);

        this.split = (FileSplit) inputSplit;
        final Path splitPath = this.split.getPath();
        this.rawStream = splitPath.getFileSystem(taskAttemptContext.getConfiguration()).open(splitPath);

        this.isEncoded = false;
        this.splitStart = this.split.getStart();
        this.splitLength = this.split.getLength();
        this.splitEnd = this.splitStart + this.splitLength;
        this.isFirstSplit = (this.splitStart == 0);

        this.codec = new CompressionCodecFactory(conf).getCodec(this.split.getPath());
        if (this.codec == null) {
            return;
        }
        if (this.codec instanceof SplittableCompressionCodec) {
            this.isEncoded = true;
            return;
        }
        throw new RuntimeException("Don't know how to handle codec: " + this.codec);
    }

    /**
     * Builds the record flow and exposes it as a blocking iterator in {@code datasetFlow}.
     * An error raised by the flow is remembered in {@code raisedThrowable} and the flow is
     * completed instead, so that {@code nextKeyValue} can rethrow it after draining.
     *
     * @throws IOException if setting up the record flow fails
     */
    public void initRecordFlow() throws IOException {
        final Flowable<T> guardedFlow = createRecordFlow()
                .doOnError(error -> this.raisedThrowable = error)
                .onErrorComplete();
        this.datasetFlow = guardedFlow.blockingIterable().iterator();
    }

    /**
     * Parses the stream supplied by {@code callable} into a flow of items.
     *
     * @param callable supplier of the input stream to parse
     * @param j position passed along by the callers in this class: the seekable's current
     *          position (probing / tail parsing) or the head record offset (main flow)
     * @return the flow of parsed items
     */
    protected abstract Flowable<U> parse(Callable<InputStream> callable, long j);

    /**
     * Points {@code stream} at the byte interval {@code [j, j2)} of the split's file.
     *
     * <p>Without a codec the raw stream is seeked to {@code j} and wrapped so that reads are
     * bounded by {@code j2}. With a splittable codec a block-mode decoding stream is created
     * and the interval adjusted by the codec is reported back.
     *
     * @param j requested start offset
     * @param j2 requested end offset
     * @return the effective (start, end) offsets: the requested ones for plain files, the
     *         codec-adjusted ones for encoded files
     * @throws IOException if seeking or creating the decoding stream fails
     * @throws RuntimeException if the codec is not a {@link SplittableCompressionCodec}
     */
    public Map.Entry<Long, Long> setStreamToInterval(long j, long j2) throws IOException {
        if (this.codec == null) {
            this.rawStream.seek(j);
            this.stream = new SeekableInputStream(new InputStreamWithCloseIgnore(Channels.newInputStream(new InterruptingSeekableByteChannel(new SeekableByteChannelFromSeekableInputStream(InputStreamWithCloseLogging.wrap(this.rawStream, ExceptionUtils::getStackTrace, RecordReaderGenericBase::logUnexpectedClose), this.rawStream), j2))), this.rawStream);
            return new AbstractMap.SimpleEntry<>(Long.valueOf(j), Long.valueOf(j2));
        }

        // Reject unsupported codecs before touching the decompressor pool.
        if (!(this.codec instanceof SplittableCompressionCodec)) {
            throw new RuntimeException("Don't know how to handle codec: " + this.codec);
        }
        // The interval-aware createInputStream overload only exists on the splittable interface.
        SplittableCompressionCodec splittableCodec = (SplittableCompressionCodec) this.codec;

        // Hand back a previously borrowed decompressor before borrowing a fresh one.
        if (this.decompressor != null) {
            CodecPool.returnDecompressor(this.decompressor);
        }
        this.decompressor = CodecPool.getDecompressor(this.codec);

        SplitCompressionInputStream decodedStream = splittableCodec.createInputStream(this.rawStream, this.decompressor, j, j2, SplittableCompressionCodec.READ_MODE.BYBLOCK);
        long adjustedStart = decodedStream.getAdjustedStart();
        long adjustedEnd = decodedStream.getAdjustedEnd();
        this.stream = new SeekableInputStream(new InputStreamWithCloseIgnore(InputStreamWithCloseLogging.wrap(new InputStreamWithZeroOffsetRead(decodedStream), ExceptionUtils::getStackTrace, RecordReaderGenericBase::logUnexpectedClose)), decodedStream);
        return new AbstractMap.SimpleEntry<>(Long.valueOf(adjustedStart), Long.valueOf(adjustedEnd));
    }

    /**
     * Close callback that intentionally does nothing; used for streams whose closing is expected.
     *
     * @param str message supplied by the close-logging wrapper (ignored)
     */
    public static void logClose(String str) {
    }

    /**
     * Close callback for streams that must not be closed: logs the supplied message at
     * error level and then fails.
     *
     * @param str message supplied by the close-logging wrapper
     * @throws RuntimeException always
     */
    public static void logUnexpectedClose(String str) {
        logger.error(str);
        throw new RuntimeException("Unexpected close");
    }

    /**
     * Tells whether {@code seekable} has reached or passed the position {@code j}.
     * A warning is logged when the position was overshot by more than the tolerated
     * amount (1 byte for encoded input, 0 otherwise).
     *
     * @param seekable source of the current position
     * @param j the bound to compare against
     * @return {@code true} if the current position is at or beyond {@code j}
     * @throws RuntimeException wrapping any {@link IOException} from reading the position
     */
    public boolean didHitSplitBound(Seekable seekable, long j) {
        final long overshoot;
        try {
            overshoot = seekable.getPos() - j;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        if (overshoot < 0) {
            return false;
        }

        final long tolerated = this.isEncoded ? 1 : 0;
        if (overshoot > tolerated) {
            logger.warn("Exceeded split pos by " + overshoot + " bytes");
        }
        return true;
    }

    /**
     * Groups consecutive items via the accumulation strategy and maps every group to its
     * accumulated value.
     *
     * @param z whether this is the first split; if {@code false}, the first group is dropped
     * @param flowable the items parsed from this split
     * @param list items appended after {@code flowable} (the tail items collected beyond the split end)
     * @return flow of accumulated values
     */
    protected Flowable<T> aggregate(boolean z, Flowable<U> flowable, List<U> list) {
        // Items of the split followed by the tail items.
        Flowable concatWith = flowable.concatWith(Flowable.fromIterable(list));
        Accumulating<U, G, A, T> accumulating = this.accumulating;
        Objects.requireNonNull(accumulating);
        Function function = accumulating::classify;
        // No accumulator (null) is created when z is false and l == 0.
        // NOTE(review): l is presumably the sequential group index supplied by the operator - confirm.
        BiFunction biFunction = (l, obj) -> {
            if (z || l.longValue() != 0) {
                return this.accumulating.createAccumulator(obj);
            }
            return null;
        };
        Accumulating<U, G, A, T> accumulating2 = this.accumulating;
        Objects.requireNonNull(accumulating2);
        return concatWith.lift(FlowableOperatorSequentialGroupBy.create(function, biFunction, accumulating2::accumulate)).compose(flowable2 -> {
            // For non-first splits the first emitted group is skipped.
            return z ? flowable2 : flowable2.skip(1L);
        }).map(entry -> {
            return this.accumulating.accumulatedValue(entry.getValue());
        });
    }

    /**
     * Hook for subclasses to transform an input stream before it is parsed.
     * This default implementation returns the stream unchanged.
     *
     * @param inputStream the stream about to be parsed
     * @return the stream to actually parse
     */
    protected InputStream effectiveInputStream(InputStream inputStream) {
        return inputStream;
    }

    /**
     * Creates a fresh input stream over a clone of {@code seekable}, passed through
     * {@link #effectiveInputStream(InputStream)} and prefixed with the preamble bytes.
     *
     * @param seekable the positioned source; it is cloned, not consumed
     * @return preamble followed by the (effective) data of the clone
     */
    protected InputStream effectiveInputStreamSupp(org.aksw.commons.io.seekable.api.Seekable seekable) {
        final InputStream preamble = new ByteArrayInputStream(this.preambleBytes);
        final ReadableByteChannel clonedChannel = (ReadableByteChannel) seekable.cloneObject();
        final InputStream payload = effectiveInputStream(Channels.newInputStream(clonedChannel));
        return new SequenceInputStream(preamble, payload);
    }

    /**
     * Reads the current position of {@code seekable}, converting any failure into an
     * unchecked exception.
     */
    private static long getPos(org.aksw.commons.io.seekable.api.Seekable seekable) {
        final long position;
        try {
            position = seekable.getPos();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return position;
    }

    /**
     * Parses the data of {@code seekable} starting at its current position. The stream is
     * created lazily (and from a clone) each time the parser invokes the supplier.
     *
     * @param seekable positioned data source
     * @return flow of parsed items
     */
    protected Flowable<U> parseFromSeekable(org.aksw.commons.io.seekable.api.Seekable seekable) {
        final long startPos = getPos(seekable);
        final Callable<InputStream> streamFactory = () -> effectiveInputStreamSupp(seekable);
        return parse(streamFactory, startPos);
    }

    /**
     * Tests whether parsing from the current position of {@code seekable} yields at least
     * one item. At most {@code probeRecordCount} items are parsed; a parse error counts as
     * "no item".
     *
     * @param seekable candidate position to probe
     * @return {@code true} if at least one item could be parsed
     * @throws RuntimeException wrapping any exception raised while probing
     */
    protected boolean prober(org.aksw.commons.io.seekable.api.Seekable seekable) {
        try {
            // Return value is discarded; a failure here is wrapped by the catch below.
            seekable.getPos();
            final Long parsedCount = parseFromSeekable(seekable)
                    .take((long) this.probeRecordCount)
                    .count()
                    .onErrorReturnItem(-1L)
                    .blockingGet();
            return parsedCount.longValue() > 0;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Builds the flow of records for the current split.
     *
     * <ol>
     *   <li><b>Tail:</b> the stream is positioned at the split end and up to
     *       {@code (2 + probeRecordCount) * maxRecordLength} extra bytes are buffered.
     *       {@code skipToNthRecord} determines how many of those bytes belong to this split;
     *       the first group of items parsed at that offset is collected as tail items.</li>
     *   <li><b>Head:</b> the stream is repositioned at the split start. For any split but the
     *       first, {@code skipToNthRecord} locates the offset of the first record to read.</li>
     *   <li><b>Assembly:</b> preamble, head, body (bounded by the adjusted split end), tail and
     *       postamble are concatenated into one stream, which is parsed and aggregated.</li>
     * </ol>
     *
     * @return the record flow; empty if no record start was found in the head region
     * @throws IOException if positioning or reading the underlying stream fails
     * @throws RuntimeException if, for non-encoded input, the stream position after buffering
     *         the head does not match the expected body offset
     */
    protected Flowable<T> createRecordFlow() throws IOException {
        String splitName = this.split.getPath().getName();
        String splitId = splitName + ":" + this.splitStart + "+" + this.splitLength;

        // Upper bound for the number of bytes buffered beyond a split boundary.
        long desiredExtraBytes = (2 + this.probeRecordCount) * this.maxRecordLength;

        // ---- Tail: data beyond the split end that still belongs to this split ----
        long adjustedSplitEnd = setStreamToInterval(this.splitEnd, this.splitEnd + desiredExtraBytes).getKey().longValue();
        BufferFromInputStream tailBuffer = BufferFromInputStream.create(new BoundedInputStream(this.stream, desiredExtraBytes), 1048576, new int[0]);
        org.aksw.commons.io.seekable.api.Seekable tailNav = tailBuffer.newChannel();
        StopWatch tailStopWatch = StopWatch.createStarted();
        long tailRecordOffset = skipToNthRecord(2, tailNav, 0L, 0L, this.maxRecordLength, desiredExtraBytes, pos -> true, this::prober);
        // Without a record start in the tail region, all buffered tail bytes are used.
        long tailBytes = tailRecordOffset < 0 ? tailBuffer.getKnownDataSize() : Ints.checkedCast(tailRecordOffset);
        tailNav.setPos(tailBytes);
        Flowable<U> tailFlow = parseFromSeekable(tailNav);
        Accumulating<U, G, A, T> accumulating = this.accumulating;
        Objects.requireNonNull(accumulating);
        // Only the first group of items found at the tail offset is kept.
        List<U> tailItems = (List) tailFlow.lift(FlowableOperatorSequentialGroupBy.create(accumulating::classify, () -> {
            return new ArrayList();
        }, (v0, v1) -> {
            v0.add(v1);
        })).map((v0) -> {
            return v0.getValue();
        }).first(Collections.emptyList()).blockingGet();
        logger.info(String.format("In split %s got %d tail items at pos %d within %d bytes read in %d ms", splitId, tailItems.size(), adjustedSplitEnd, tailBytes, tailStopWatch.getTime(TimeUnit.MILLISECONDS)));

        // ---- Head: locate the first record of this split ----
        // For encoded input the decoded length is unknown (-1) until the split end is hit.
        long[] knownDecodedDataLength = new long[1];
        knownDecodedDataLength[0] = this.isEncoded ? -1L : this.splitLength;
        long adjustedSplitStart = setStreamToInterval(this.splitStart, adjustedSplitEnd).getKey().longValue();
        Predicate<Long> posValidator = pos -> {
            return knownDecodedDataLength[0] < 0 || pos.longValue() < knownDecodedDataLength[0];
        };
        BufferFromInputStream headBuffer = BufferFromInputStream.create(new BoundedInputStream(Channels.newInputStream(new ReadableByteChannelWithConditionalBound(new ReadableByteChannelFromInputStream(this.stream), channel -> {
            // Never ends the stream; only records the decoded length once the split end is hit.
            if (knownDecodedDataLength[0] >= 0 || !didHitSplitBound(this.stream, adjustedSplitEnd)) {
                return false;
            }
            knownDecodedDataLength[0] = channel.getBytesRead();
            logger.info("Head stream encountered split end; decoded data length = " + knownDecodedDataLength[0]);
            return false;
        })), desiredExtraBytes), 1048576, new int[0]);
        org.aksw.commons.io.seekable.api.Seekable headNav = headBuffer.newChannel();
        StopWatch headStopWatch = StopWatch.createStarted();
        int headRecordOffset = this.isFirstSplit ? 0 : Ints.checkedCast(skipToNthRecord(2, headNav, 0L, 0L, this.maxRecordLength, desiredExtraBytes, posValidator, this::prober));
        logger.info(String.format("In split %s found head record at pos %d with %d bytes read in %d ms", splitId, headRecordOffset, headBuffer.getKnownDataSize(), headStopWatch.getTime(TimeUnit.MILLISECONDS)));
        logger.debug(String.format("Adjusted [%d, %d) to [%d, %d) | delta: (%d, %d)", this.splitStart, this.splitEnd, adjustedSplitStart, adjustedSplitEnd, adjustedSplitStart - this.splitStart, adjustedSplitEnd - this.splitEnd));

        // ---- Assembly of the five segments ----
        InputStream preambleStream = new ByteArrayInputStream(this.preambleBytes);
        InputStream headStream;
        InputStream bodyStream = Channels.newInputStream(new ReadableByteChannelWithConditionalBound(new ReadableByteChannelFromInputStream(this.stream), channel -> {
            return didHitSplitBound(this.stream, adjustedSplitEnd);
        }));
        InputStream tailStream;
        InputStream postambleStream;
        if (headRecordOffset < 0) {
            logger.warn(String.format("No data found in split starting at %d. Possible reasons: (1) A large record spanning this split, (2) Syntax error(s) in the data,(3) Some bug in the implementation", this.splitStart));
            headStream = new ByteArrayInputStream(EMPTY_BYTE_ARRAY);
            bodyStream = new ByteArrayInputStream(EMPTY_BYTE_ARRAY);
            tailStream = new ByteArrayInputStream(EMPTY_BYTE_ARRAY);
            postambleStream = new ByteArrayInputStream(EMPTY_BYTE_ARRAY);
        } else {
            long headBytes = knownDecodedDataLength[0] < 0 ? headBuffer.getKnownDataSize() : knownDecodedDataLength[0];
            org.aksw.commons.io.seekable.api.Seekable headChannel = headBuffer.newChannel();
            if (headRecordOffset != 0) {
                headChannel.nextPos(headRecordOffset);
            }
            headStream = new BoundedInputStream(Channels.newInputStream((ReadableByteChannel) headChannel), headBytes - headRecordOffset);
            if (!this.isFirstSplit) {
                headStream = effectiveInputStream(headStream);
            }

            // For plain files the body must continue exactly where the head buffer stopped reading.
            if (!this.isEncoded) {
                long headBufferSize = headBuffer.getKnownDataSize();
                long expectedBodyOffset = adjustedSplitStart + headBufferSize;
                long actualBodyOffset = this.stream.getPos();
                if (expectedBodyOffset != actualBodyOffset) {
                    throw new RuntimeException("Expected body offset does not match actual one: adjustedSplitStart = " + adjustedSplitStart + " known head buffer size = " + headBufferSize + ", expected body offset = " + expectedBodyOffset + ", actual body offset = " + actualBodyOffset);
                }
            }

            // For encoded input the first tail byte is skipped (cf. the 1 byte tolerance in didHitSplitBound).
            int tailSkip = this.isEncoded ? 1 : 0;
            org.aksw.commons.io.seekable.api.Seekable tailChannel = tailBuffer.newChannel();
            tailChannel.nextPos(tailSkip);
            tailStream = new BoundedInputStream(Channels.newInputStream((ReadableByteChannel) tailChannel), tailBytes - tailSkip);
            postambleStream = tailRecordOffset < 0 ? new ByteArrayInputStream(EMPTY_BYTE_ARRAY) : new ByteArrayInputStream(this.postambleBytes);
        }

        // Debugging aid (disabled): dump every segment to the temp directory and re-read it from there.
        final boolean writeOutSegments = false;
        if (writeOutSegments) {
            java.nio.file.Path basePath = SystemUtils.getJavaIoTmpDir().toPath().toAbsolutePath();
            logger.info("Writing segment " + splitName + " " + this.splitStart + " to " + basePath);
            java.nio.file.Path preambleFile = basePath.resolve(splitName + "_" + this.splitStart + ".preamble.dat");
            java.nio.file.Path headFile = basePath.resolve(splitName + "_" + this.splitStart + ".head.dat");
            java.nio.file.Path bodyFile = basePath.resolve(splitName + "_" + this.splitStart + ".body.dat");
            java.nio.file.Path tailFile = basePath.resolve(splitName + "_" + this.splitStart + ".tail.dat");
            java.nio.file.Path postambleFile = basePath.resolve(splitName + "_" + this.splitStart + ".postamble.dat");
            Files.copy(preambleStream, preambleFile, new CopyOption[0]);
            Files.copy(headStream, headFile, new CopyOption[0]);
            Files.copy(bodyStream, bodyFile, new CopyOption[0]);
            Files.copy(tailStream, tailFile, new CopyOption[0]);
            Files.copy(postambleStream, postambleFile, new CopyOption[0]);
            preambleStream = Files.newInputStream(preambleFile, StandardOpenOption.READ);
            headStream = Files.newInputStream(headFile, StandardOpenOption.READ);
            bodyStream = Files.newInputStream(bodyFile, StandardOpenOption.READ);
            tailStream = Files.newInputStream(tailFile, StandardOpenOption.READ);
            postambleStream = Files.newInputStream(postambleFile, StandardOpenOption.READ);
        }

        InputStream fullStream = InputStreamWithCloseLogging.wrap(new SequenceInputStream(Collections.enumeration(Arrays.asList(preambleStream, headStream, bodyStream, tailStream, postambleStream))), ExceptionUtils::getStackTrace, RecordReaderGenericBase::logClose);
        return headRecordOffset >= 0 ? aggregate(this.isFirstSplit, parse(() -> {
            return fullStream;
        }, headRecordOffset), tailItems) : Flowable.empty();
    }

    /**
     * Streams the lines readable from a clone of {@code seekable}. Closing the returned
     * stream quietly closes the underlying reader.
     *
     * @param seekable positioned data source; it is cloned, not consumed
     * @return stream of lines
     */
    public static Stream<String> lines(org.aksw.commons.io.seekable.api.Seekable seekable) {
        final ReadableByteChannel clonedChannel = (ReadableByteChannel) seekable.cloneObject();
        final BufferedReader reader = new BufferedReader(new InputStreamReader(Channels.newInputStream(clonedChannel)));
        final Stream<String> lineStream = reader.lines();
        return lineStream.onClose(() -> IOUtils.closeQuietly(reader));
    }

    /**
     * Searches for the next position at which {@code pattern} matches and the prober succeeds.
     *
     * @param pattern record start pattern
     * @param seekable data source; a clone is positioned at {@code j2 - j}
     * @param j offset subtracted from {@code j2} for positioning and added to a found position
     * @param j2 start of the search region
     * @param j3 maximum length of the search region
     * @param j4 end of the available data region; caps the search region
     * @param predicate validator for candidate positions
     * @param predicate2 prober invoked on candidate positions
     * @return the found position plus {@code j}, or {@code -1} if none was found
     * @throws IOException if the seekable cannot be positioned or read
     */
    public static long findNextRecord(Pattern pattern, org.aksw.commons.io.seekable.api.Seekable seekable, long j, long j2, long j3, long j4, Predicate<Long> predicate, Predicate<org.aksw.commons.io.seekable.api.Seekable> predicate2) throws IOException {
        final long searchRegionEnd = Math.min(j2 + j3, j4);
        final int searchRegionLength = Ints.checkedCast(searchRegionEnd - j2);

        final org.aksw.commons.io.seekable.api.Seekable searchNav = seekable.cloneObject();
        searchNav.setPos(j2 - j);

        final Matcher recordStartMatcher = pattern.matcher(new CharSequenceFromSeekable(searchNav));
        recordStartMatcher.region(0, searchRegionLength);

        final long relativePos = findFirstPositionWithProbeSuccess(searchNav, predicate, recordStartMatcher, true, predicate2);
        if (relativePos < 0) {
            return -1L;
        }
        return relativePos + j;
    }

    /**
     * Finds the position of the {@code i}-th record start, searching forward from {@code j2}.
     * After each intermediate hit the next search starts {@code minRecordLength} bytes
     * behind the hit.
     *
     * <p>The search stops with a warning as soon as one lookup finds nothing; repeating the
     * same lookup could not yield a different answer.
     *
     * @param i number of record starts to locate
     * @param seekable data source
     * @param j base offset forwarded to {@code findNextRecord}
     * @param j2 start of the probe region
     * @param j3 maximum record length, i.e. the size of each search region
     * @param j4 end of the available data region
     * @param predicate validator for candidate positions
     * @param predicate2 prober invoked on candidate positions
     * @return position of the {@code i}-th record start, or {@code -1} if fewer than
     *         {@code i} record starts were found
     * @throws IOException if reading from the seekable fails
     */
    long skipToNthRecord(int i, org.aksw.commons.io.seekable.api.Seekable seekable, long j, long j2, long j3, long j4, Predicate<Long> predicate, Predicate<org.aksw.commons.io.seekable.api.Seekable> predicate2) throws IOException {
        long result = -1;
        long availableRegionLength = j4 - j2;
        long nextProbePos = j2;
        for (int found = 0; found < i; found++) {
            long candidatePos = findNextRecord(this.recordStartPattern, seekable, j, nextProbePos, j3, j4, predicate, predicate2);
            if (candidatePos < 0) {
                if (availableRegionLength >= j3) {
                    // Enough data was available, so a record start should have been found.
                    logger.warn("Found no record start in a record search region of " + j3 + " bytes, although up to " + availableRegionLength + " bytes were allowed for reading");
                } else {
                    logger.warn("No more records found after pos " + (j + nextProbePos));
                }
                break;
            }
            if (found + 1 == i) {
                result = candidatePos;
            } else {
                nextProbePos = candidatePos + this.minRecordLength;
            }
        }
        return result;
    }

    /**
     * Walks through the matches of {@code matcher} and returns the first candidate position
     * that is accepted by the validator and for which the prober succeeds.
     *
     * <p>The search ends without a result when the matcher is exhausted, when the validator
     * rejects a candidate, or after a candidate turned out to lie past the end of the data.
     *
     * @param seekable data source; its current position is the base for match offsets
     * @param predicate validator for candidate positions; a rejection ends the search
     * @param matcher matcher over the data, already restricted to the search region
     * @param z if {@code true}, candidates are {@code base + matcher.start()}; otherwise
     *          {@code base - matcher.end() + 1}
     * @param predicate2 prober; receives a clone positioned at the candidate
     * @return the first successful candidate position, or {@code -1}
     * @throws IOException if positioning the seekable fails
     * @throws IllegalArgumentException if a candidate position does not fit into an {@code int}
     */
    public static long findFirstPositionWithProbeSuccess(org.aksw.commons.io.seekable.api.Seekable seekable, Predicate<Long> predicate, Matcher matcher, boolean z, Predicate<org.aksw.commons.io.seekable.api.Seekable> predicate2) throws IOException {
        org.aksw.commons.io.seekable.api.Seekable probeNav = seekable.cloneObject();
        long basePos = probeNav.getPos();
        boolean reachedEnd = false;
        while (matcher.find() && !reachedEnd) {
            int candidatePos = Ints.checkedCast(basePos + (z ? matcher.start() : (-matcher.end()) + 1));
            if (!predicate.test(Long.valueOf(candidatePos))) {
                break;
            }
            probeNav.setPos(candidatePos);
            reachedEnd = probeNav.isPosAfterEnd();
            // The prober gets its own clone so that it cannot disturb the search position.
            if (!reachedEnd && predicate2.test(probeNav.cloneObject())) {
                return candidatePos;
            }
        }
        return -1;
    }

    /**
     * Advances to the next record, building the record flow on first use.
     *
     * @return {@code true} if a non-null record was fetched; {@code false} when the flow is exhausted
     * @throws IOException if setting up the record flow fails
     * @throws RuntimeException wrapping an error raised by the flow, once all records that
     *         preceded the error have been consumed
     */
    public boolean nextKeyValue() throws IOException {
        if (this.datasetFlow == null) {
            initRecordFlow();
        }

        if (!this.datasetFlow.hasNext()) {
            if (this.raisedThrowable != null) {
                throw new RuntimeException(this.raisedThrowable);
            }
            return false;
        }

        this.currentValue = this.datasetFlow.next();
        this.currentKey.incrementAndGet();
        return this.currentValue != null;
    }

    /**
     * Returns the key of the current record: the running record counter.
     * Implements {@code RecordReader#getCurrentKey()}.
     *
     * @return the current counter value, or {@code null} if there is no current value
     */
    public LongWritable getCurrentKey() {
        return this.currentValue == null ? null : new LongWritable(this.currentKey.get());
    }

    /**
     * Alias under the name the decompiler assigned to {@link #getCurrentKey()}.
     *
     * @return same as {@link #getCurrentKey()}
     * @deprecated use {@link #getCurrentKey()}
     */
    @Deprecated
    public LongWritable m1getCurrentKey() {
        return getCurrentKey();
    }

    /**
     * Returns the value most recently fetched by {@code nextKeyValue}.
     *
     * @return the current value; {@code null} before the first record was fetched
     */
    public T getCurrentValue() {
        return this.currentValue;
    }

    /**
     * Estimates the progress from the stream position relative to the split boundaries.
     *
     * @return {@code 0} for an empty split or while no stream has been set up yet (the
     *         stream is only created once the record flow is built); otherwise the fraction
     *         of the split passed by the stream position, capped at {@code 1}
     * @throws IOException if the stream position cannot be read
     */
    public float getProgress() throws IOException {
        if (this.stream == null || this.splitStart == this.splitEnd) {
            return 0.0f;
        }
        float consumed = (float) (this.stream.getPos() - this.splitStart);
        float total = (float) (this.splitEnd - this.splitStart);
        return Math.min(1.0f, consumed / total);
    }

    /**
     * Closes the raw stream and hands the decompressor back to the codec pool. The
     * decompressor is returned even if closing the stream fails.
     *
     * @throws IOException if closing the raw stream fails
     */
    public void close() throws IOException {
        try {
            final FSDataInputStream openStream = this.rawStream;
            if (openStream != null) {
                openStream.close();
                this.rawStream = null;
            }
        } finally {
            final Decompressor borrowed = this.decompressor;
            if (borrowed != null) {
                CodecPool.returnDecompressor(borrowed);
                this.decompressor = null;
            }
        }
    }
}
