package net.sansa_stack.hadoop.core;

import com.google.common.primitives.Ints;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import net.sansa_stack.hadoop.util.DeferredSeekablePushbackInputStream;
import org.aksw.commons.io.buffer.array.ArrayOps;
import org.aksw.commons.io.buffer.array.BufferOverReadableChannel;
import org.aksw.commons.io.hadoop.SeekableInputStream;
import org.aksw.commons.io.hadoop.SeekableInputStreams;
import org.aksw.commons.io.input.ReadableChannel;
import org.aksw.commons.io.input.ReadableChannelWithConditionalBound;
import org.aksw.commons.io.input.ReadableChannels;
import org.aksw.commons.io.input.SeekableReadableChannel;
import org.aksw.commons.io.input.SeekableReadableChannelBase;
import org.aksw.commons.io.input.SeekableReadableChannelSource;
import org.aksw.commons.io.input.SeekableReadableChannelWithLimit;
import org.aksw.commons.io.input.SeekableReadableChannels;
import org.aksw.commons.util.lock.LockUtils;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/sansa_stack/hadoop/core/SeekableSourceOverSplit.class */
public class SeekableSourceOverSplit implements SeekableReadableChannelSource<byte[]>, Closeable {
    /** Class logger; used for debug output when a source over a block-encoded stream is created. */
    private static final Logger logger = LoggerFactory.getLogger(SeekableSourceOverSplit.class);
    /** The channel passed as first constructor argument; closed last by {@link #close()}. */
    protected ReadableChannel<byte[]> baseStream;
    /** Buffer with index 0 (see {@link #getBufferByIndexUnsafe(int)}). */
    protected BufferOverReadableChannel<byte[]> headBuffer;
    /** Buffer with index 1 (see {@link #getBufferByIndexUnsafe(int)}). */
    protected BufferOverReadableChannel<byte[]> tailBuffer;
    /** Buffer with index 2 (see {@link #getBufferByIndexUnsafe(int)}). */
    protected BufferOverReadableChannel<byte[]> postambleBuffer;
    /** Non-null once {@code Channel.debufferHead()} has replaced buffered access to the head; null before that. */
    protected SeekableReadableChannel<byte[]> debufferedHead;
    /** Maps the base offset of each known region to its buffer index (0 = head, 1 = tail, 2 = postamble); the constructor registers 0 -> 0. */
    protected NavigableMap<Long, Integer> posToIndex = new TreeMap();
    /** Lookup table consulted by {@link #getBlockForPos(long)}; null when created via {@code createForNonEncodedStream}. */
    protected NavigableMap<Long, Long> absPosToBlockOffset;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:net/sansa_stack/hadoop/core/SeekableSourceOverSplit$Channel.class */
    public class Channel extends SeekableReadableChannelBase<byte[]> {
        /** The channel reads are currently delegated to; swapped by applyPosition() and debufferHead(). */
        protected SeekableReadableChannel<byte[]> currentStream;
        /** Base offset of {@link #currentStream}; used as a key into the enclosing source's posToIndex map. */
        protected long currentStreamOffset;
        /** Position requested via position(long) that has not been applied yet; negative when nothing is pending. */
        protected long requestedPos;
        /** Callback invoked by transition(); may be null, in which case transition() does nothing. */
        protected Runnable transitionAction;
        /** Lock of this channel: read() takes the read lock, closeActual() runs under the write lock. */
        protected ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

        /**
         * Creates a channel over the enclosing source.
         *
         * @param seekableReadableChannel the stream to start reading from
         * @param j base offset of that stream (stored as currentStreamOffset)
         * @param j2 initially requested position; a negative value means no seek is pending
         * @param runnable action run by transition(); may be null
         */
        public Channel(SeekableReadableChannel<byte[]> seekableReadableChannel, long j, long j2, Runnable runnable) {
            this.currentStream = seekableReadableChannel;
            this.currentStreamOffset = j;
            this.requestedPos = j2;
            this.transitionAction = runnable;
        }

        /**
         * Returns this channel's read/write lock. debufferHead() checks that the
         * write lock of this lock is held before it proceeds.
         */
        public ReadWriteLock getReadWriteLock() {
            return this.rwl;
        }

        /**
         * Tells whether this channel is still confined to the head region.
         *
         * @return true if the current stream's base offset maps to buffer index 0
         *         and no further region has been registered in posToIndex yet
         */
        public boolean isHeadStream() {
            NavigableMap<Long, Integer> regions = SeekableSourceOverSplit.this.posToIndex;
            int currentIndex = regions.get(Long.valueOf(this.currentStreamOffset)).intValue();
            if (currentIndex != 0) {
                return false;
            }
            return regions.size() == 1;
        }

        /** Returns true once the enclosing source holds a debuffered head channel (debufferedHead is non-null). */
        protected boolean isDebuffered() {
            return SeekableSourceOverSplit.this.debufferedHead != null;
        }

        /**
         * Replaces buffered access to the head with a forward-seekable channel that
         * first replays the already-buffered data from the current position (if any)
         * and then continues with the head buffer's original data supplier.
         * Does nothing beyond the precondition checks if this channel is not a head stream.
         *
         * @throws IllegalStateException if the write lock of this channel is not locked
         * @throws RuntimeException if the head has already been debuffered, or if creating
         *         the channel over the buffered data fails
         */
        public void debufferHead() {
            SeekableReadableChannel newReadableChannel;
            // NOTE(review): isWriteLocked() reports whether ANY thread holds the write lock,
            // not whether the calling thread does - confirm that this is the intended check.
            if (!this.rwl.isWriteLocked()) {
                throw new IllegalStateException("Debuffering requires the channel's write lock to be locked");
            }
            if (isDebuffered()) {
                throw new RuntimeException("Already debuffered");
            }
            if (isHeadStream()) {
                long position = position();
                long knownDataSize = SeekableSourceOverSplit.this.headBuffer.getKnownDataSize();
                // Detach the supplier from the head buffer; from here on it is consumed via debufferedHead.
                ReadableChannel dataSupplier = SeekableSourceOverSplit.this.headBuffer.getDataSupplier();
                SeekableSourceOverSplit.this.headBuffer.setDataSupplier((ReadableChannel) null);
                if (position < knownDataSize) {
                    // Data at and after the current position is already buffered and has to be replayed first.
                    try {
                        newReadableChannel = SeekableSourceOverSplit.this.headBuffer.getBuffer().newReadableChannel(position);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                } else {
                    newReadableChannel = null;
                }
                SeekableReadableChannel seekableReadableChannel = newReadableChannel;
                // Either the bare supplier, or buffered remainder followed by the supplier.
                SeekableSourceOverSplit.this.debufferedHead = SeekableReadableChannels.wrapForwardSeekable(seekableReadableChannel == null ? dataSupplier : ReadableChannels.concat(Arrays.asList(seekableReadableChannel, dataSupplier)), position);
                IOUtils.closeQuietly(this.currentStream);
                this.currentStream = SeekableSourceOverSplit.this.debufferedHead;
            }
        }

        /**
         * Creates an independent channel over the same enclosing source.
         * The clone gets a clone of the current stream, the same base offset, the
         * current logical position as its pending position, and the same transition action.
         *
         * @return the cloned channel
         * @throws RuntimeException wrapping any exception raised while cloning
         */
        public SeekableReadableChannel<byte[]> cloneObject() {
            Channel copy;
            try {
                SeekableReadableChannel<byte[]> streamCopy = this.currentStream.cloneObject();
                long baseOffset = this.currentStreamOffset;
                long logicalPos = position();
                copy = new Channel(streamCopy, baseOffset, logicalPos, this.transitionAction);
            } catch (Exception cause) {
                throw new RuntimeException(cause);
            }
            return copy;
        }

        /**
         * Returns the logical position of this channel: the pending requested
         * position if there is one (non-negative), otherwise the position derived
         * from the current stream.
         */
        public long position() {
            if (this.requestedPos < 0) {
                return getInternalPosition();
            }
            return this.requestedPos;
        }

        /**
         * Records the requested position without touching the underlying stream;
         * read() applies it via applyPosition() when the stored value is non-negative.
         *
         * @param j the position to seek to
         */
        public void position(long j) {
            this.requestedPos = j;
        }

        /**
         * Applies the pending {@code requestedPos}: looks up the region (by floor key in
         * posToIndex) that contains the requested position, switches {@code currentStream}
         * to that region's buffer if necessary, positions the stream and finally resets
         * {@code requestedPos} to -1.
         *
         * @throws IOException if closing, loading or positioning a stream fails
         */
        protected void applyPosition() throws IOException {
            long j;
            // Position before any switch; used below to detect a forward seek.
            long internalPosition = getInternalPosition();
            while (true) {
                // Base offset and buffer index of the region containing the requested position.
                long longValue = SeekableSourceOverSplit.this.posToIndex.floorKey(Long.valueOf(this.requestedPos)).longValue();
                Integer num = (Integer) SeekableSourceOverSplit.this.posToIndex.get(Long.valueOf(longValue));
                long j2 = this.requestedPos - internalPosition;
                // Requested position relative to the region's base offset.
                j = this.requestedPos - longValue;
                if (num.intValue() == 0 && isDebuffered()) {
                    // Head region in debuffered state: there is no buffer to consult, position directly.
                    this.currentStream.position(j);
                    break;
                }
                BufferOverReadableChannel<byte[]> bufferByBaseOffset = SeekableSourceOverSplit.this.getBufferByBaseOffset(longValue);
                if (longValue != this.currentStreamOffset) {
                    // The target lies in a different region: replace the current stream.
                    this.currentStream.close();
                    this.currentStream = bufferByBaseOffset.newReadableChannel();
                    this.currentStreamOffset = longValue;
                }
                if (j2 > 0) {
                    // Forward seek; presumably makes the buffer load data up to the relative position - TODO confirm loadFully semantics.
                    bufferByBaseOffset.loadFully(Ints.checkedCast(j), true);
                }
                long knownDataSize = bufferByBaseOffset.getKnownDataSize();
                // Done if the target is inside the known data, or exactly at its end while the supplier is not consumed yet.
                if (j < knownDataSize || (j == knownDataSize && !bufferByBaseOffset.isDataSupplierConsumed())) {
                    break;
                }
                // Head buffer's supplier is consumed: register the next region so the lookup can advance.
                if (((Integer) SeekableSourceOverSplit.this.posToIndex.get(Long.valueOf(this.currentStreamOffset))).intValue() == 0 && bufferByBaseOffset.isDataSupplierConsumed()) {
                    SeekableSourceOverSplit.this.setupTailBuffer();
                }
                // No new region appeared for the requested position: stop looping.
                if (longValue == SeekableSourceOverSplit.this.posToIndex.floorKey(Long.valueOf(this.requestedPos)).longValue()) {
                    this.currentStream.position(knownDataSize);
                    break;
                }
            }
            // NOTE(review): this also runs after the break directly above and then overrides
            // position(knownDataSize) with j (which is >= knownDataSize on that path) - confirm
            // this is intended and not a decompilation artifact.
            this.currentStream.position(j);
            this.requestedPos = -1L;
        }

        /**
         * Computes the position implied by the current stream: the stream's base
         * offset plus the stream's own position.
         */
        protected long getInternalPosition() {
            long baseOffset = this.currentStreamOffset;
            long offsetInStream = SeekableReadableChannels.position(this.currentStream);
            return baseOffset + offsetInStream;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Registers position {@code j} as the base offset of the region with buffer index 2.
         * Only permitted while the highest buffer index registered in posToIndex is exactly 1.
         *
         * @param j base offset at which the region with index 2 starts
         * @throws IllegalStateException if the highest registered buffer index is not 1
         */
        public void setLimit(long j) {
            NavigableMap<Long, Integer> regions = SeekableSourceOverSplit.this.posToIndex;
            int highestIndex = -1;
            for (Integer bufferIndex : regions.values()) {
                highestIndex = Math.max(highestIndex, bufferIndex.intValue());
            }
            if (highestIndex != 1) {
                throw new IllegalStateException("Limit can only be set once and only if data has been read from the tail region");
            }
            regions.put(Long.valueOf(j), 2);
        }

        /**
         * Reads up to {@code i2} bytes into {@code bArr} starting at index {@code i}.
         *
         * <p>A pending position is applied first. The requested length is clipped by
         * {@link #adjustLength(int)} so that a single read never crosses into the next
         * region. When the current stream reports end-of-data the channel records the end
         * position, registers the next region if the head was being read, and retries on
         * the follow-up stream; if no other stream becomes current, -1 is returned.
         *
         * <p>The read lock is held for the whole call and released exactly once. (The
         * previous version released it at the end of every loop iteration although it was
         * acquired only once, so any call needing a second iteration failed with
         * {@link IllegalMonitorStateException}.)
         *
         * @param bArr destination array
         * @param i offset into {@code bArr}
         * @param i2 maximum number of bytes to read
         * @return the number of bytes read, 0 if {@code i2} is 0, or -1 at end of data
         * @throws IOException if the underlying streams fail
         */
        public int read(byte[] bArr, int i, int i2) throws IOException {
            if (i2 == 0) {
                return 0;
            }
            ReentrantReadWriteLock.ReadLock readLock = this.rwl.readLock();
            readLock.lock();
            try {
                while (true) {
                    if (this.requestedPos >= 0) {
                        applyPosition();
                    }
                    int adjustLength = adjustLength(i2);
                    if (adjustLength <= 0) {
                        // Positioned at a region boundary: re-request the current position so that
                        // the next iteration lets applyPosition() move on to the following region.
                        position(position());
                        continue;
                    }
                    int read = this.currentStream.read(bArr, i, adjustLength);
                    if (read != -1) {
                        return read;
                    }
                    // The current stream is exhausted: determine the absolute end position.
                    boolean isHeadStream = isHeadStream();
                    SeekableReadableChannel<byte[]> exhaustedStream = this.currentStream;
                    long endPos = this.currentStreamOffset + ((isHeadStream && isDebuffered())
                            ? this.currentStream.position() - this.currentStreamOffset
                            : SeekableSourceOverSplit.this.getBufferByBaseOffset(this.currentStreamOffset).getKnownDataSize());
                    position(endPos);
                    if (isHeadStream) {
                        // The head ended here; the region with index 1 starts at this position.
                        SeekableSourceOverSplit.this.posToIndex.put(Long.valueOf(endPos), 1);
                    }
                    applyPosition();
                    if (this.currentStream == exhaustedStream) {
                        // No follow-up stream became current: genuine end of data.
                        return read;
                    }
                    if (isHeadStream) {
                        transition();
                    }
                }
            } finally {
                readLock.unlock();
            }
        }

        /**
         * Clips a requested read length so that it does not extend past the base offset
         * of the next registered region.
         *
         * @param i the requested length
         * @return {@code i} if no region starts after the current stream's base offset,
         *         otherwise the smaller of {@code i} and the (int-saturated) distance from
         *         the current position to the next region's base offset
         */
        public int adjustLength(int i) {
            Long nextRegionStart = SeekableSourceOverSplit.this.posToIndex.higherKey(Long.valueOf(this.currentStreamOffset));
            if (nextRegionStart == null) {
                return i;
            }
            long distance = nextRegionStart.longValue() - position();
            return Math.min(i, Ints.saturatedCast(distance));
        }

        /** Returns the array operations for byte arrays ({@code ArrayOps.BYTE}). */
        public ArrayOps<byte[]> getArrayOps() {
            return ArrayOps.BYTE;
        }

        /**
         * Closes the current stream and then delegates to the superclass, with both steps
         * executed via {@code LockUtils.runWithLock} on this channel's write lock.
         *
         * @throws Exception if closing fails
         */
        protected void closeActual() throws Exception {
            LockUtils.runWithLock(this.rwl.writeLock(), () -> {
                this.currentStream.close();
                super.closeActual();
            });
        }

        /**
         * Sets the action that transition() runs.
         *
         * @param runnable the action; null disables it
         */
        public void setTransitionAction(Runnable runnable) {
            this.transitionAction = runnable;
        }

        /**
         * Runs the configured transition action, if any. read() calls this after a read
         * on the head stream hit end-of-data and a different stream became current.
         */
        protected void transition() {
            if (this.transitionAction == null) {
                return;
            }
            this.transitionAction.run();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /** Returns the {@code SeekableSourceOverSplit} instance this channel belongs to. */
        public SeekableSourceOverSplit getEnclosingInstance() {
            return SeekableSourceOverSplit.this;
        }
    }

    /**
     * Closes the data suppliers of the head, tail and postamble buffers (each only if
     * present and still open), the debuffered head channel (if any) and finally the
     * base stream.
     *
     * <p>Every resource is attempted even if closing an earlier one fails, so that a
     * single failure does not leak the remaining channels. The first
     * {@link IOException} is rethrown after all attempts; later ones are attached to
     * it as suppressed exceptions.
     *
     * @throws IOException if closing any of the resources failed
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        IOException failure = null;
        failure = closeCollecting(this.headBuffer.getDataSupplier(), true, failure);
        failure = closeCollecting(this.tailBuffer.getDataSupplier(), true, failure);
        failure = closeCollecting(this.postambleBuffer.getDataSupplier(), true, failure);
        failure = closeCollecting(this.debufferedHead, false, failure);
        failure = closeCollecting(this.baseStream, false, failure);
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Closes {@code channel} and folds a resulting {@link IOException} into {@code pending}.
     *
     * @param channel the channel to close; null is skipped
     * @param onlyIfOpen if true, the channel is closed only when {@code isOpen()} reports true
     * @param pending the first failure seen so far, or null
     * @return the first failure seen so far (possibly the one raised by this call), or null
     */
    private static IOException closeCollecting(ReadableChannel<?> channel, boolean onlyIfOpen, IOException pending) {
        if (channel == null || (onlyIfOpen && !channel.isOpen())) {
            return pending;
        }
        try {
            channel.close();
        } catch (IOException e) {
            if (pending == null) {
                return e;
            }
            pending.addSuppressed(e);
        }
        return pending;
    }

    /**
     * Looks up the value associated with the greatest key in absPosToBlockOffset that
     * is less than or equal to {@code j}.
     *
     * @param j the position to look up
     * @return the value stored for the floor entry of {@code j}
     */
    public long getBlockForPos(long j) {
        Map.Entry<Long, Long> floor = this.absPosToBlockOffset.floorEntry(Long.valueOf(j));
        Long blockOffset = floor.getValue();
        return blockOffset.longValue();
    }

    /**
     * Returns the total size known so far: the base offset of the last registered
     * region plus the known data size of that region's buffer.
     */
    public long getKnownSize() {
        Map.Entry<Long, Integer> lastRegion = this.posToIndex.lastEntry();
        long baseOffset = lastRegion.getKey().longValue();
        int bufferIndex = lastRegion.getValue().intValue();
        BufferOverReadableChannel<byte[]> buffer = getBufferByIndexUnsafe(bufferIndex);
        return baseOffset + buffer.getKnownDataSize();
    }

    /**
     * Creates a source from its parts and registers the head region at offset 0.
     *
     * @param readableChannel the base stream; closed by {@link #close()}
     * @param bufferOverReadableChannel the head buffer (buffer index 0)
     * @param bufferOverReadableChannel2 the tail buffer (buffer index 1)
     * @param bufferOverReadableChannel3 the postamble buffer (buffer index 2)
     * @param navigableMap lookup table used by {@link #getBlockForPos(long)}; may be null
     */
    public SeekableSourceOverSplit(ReadableChannel<byte[]> readableChannel, BufferOverReadableChannel<byte[]> bufferOverReadableChannel, BufferOverReadableChannel<byte[]> bufferOverReadableChannel2, BufferOverReadableChannel<byte[]> bufferOverReadableChannel3, NavigableMap<Long, Long> navigableMap) {
        this.baseStream = readableChannel;

        this.headBuffer = bufferOverReadableChannel;
        this.tailBuffer = bufferOverReadableChannel2;
        this.postambleBuffer = bufferOverReadableChannel3;

        this.absPosToBlockOffset = navigableMap;

        // The head region always starts at offset 0 and uses buffer index 0.
        this.posToIndex.put(0L, 0);
    }

    /** Returns the map passed to the constructor that backs {@link #getBlockForPos(long)}; may be null. */
    public NavigableMap<Long, Long> getAbsPosToBlockOffset() {
        return this.absPosToBlockOffset;
    }

    /**
     * Resolves the buffer of the region whose base offset is exactly {@code j}.
     *
     * @param j a base offset that is a key of posToIndex
     * @return the buffer registered for that region
     * @throws IllegalStateException if the region is the head and the head has been debuffered
     */
    protected BufferOverReadableChannel<byte[]> getBufferByBaseOffset(long j) {
        Integer bufferIndex = this.posToIndex.get(Long.valueOf(j));
        return getBufferByIndex(bufferIndex.intValue());
    }

    /**
     * Returns the buffer for the given index, refusing access to the head buffer
     * (index 0) once the head has been debuffered.
     *
     * @param i buffer index
     * @return the buffer, or null for an unknown index
     * @throws IllegalStateException if {@code i} is 0 and a debuffered head exists
     */
    protected BufferOverReadableChannel<byte[]> getBufferByIndex(int i) {
        boolean headRequestedAfterDebuffering = i == 0 && this.debufferedHead != null;
        if (headRequestedAfterDebuffering) {
            throw new IllegalStateException("Should never be called if in debuffered state");
        }
        return getBufferByIndexUnsafe(i);
    }

    /**
     * Maps a buffer index to its buffer without checking the debuffered state.
     *
     * @param i buffer index: 0 = head, 1 = tail, 2 = postamble
     * @return the corresponding buffer, or null for any other index
     */
    protected BufferOverReadableChannel<byte[]> getBufferByIndexUnsafe(int i) {
        if (i == 0) {
            return this.headBuffer;
        }
        if (i == 1) {
            return this.tailBuffer;
        }
        if (i == 2) {
            return this.postambleBuffer;
        }
        return null;
    }

    /**
     * Registers the region that follows the head: its base offset is the head's base
     * offset plus the head buffer's known data size. Nothing is registered if no
     * buffer exists for the follow-up index.
     *
     * @throws IllegalStateException if the highest registered region is not the head,
     *         or if the head buffer's data supplier has not been consumed yet
     */
    protected void setupTailBuffer() {
        Map.Entry<Long, Integer> highestRegion = this.posToIndex.descendingMap().entrySet().iterator().next();
        long baseOffset = highestRegion.getKey().longValue();
        int currentIndex = highestRegion.getValue().intValue();
        if (currentIndex != 0) {
            throw new IllegalStateException("Method may only be called during reads from the head buffer");
        }
        int nextIndex = currentIndex + 1;
        if (getBufferByIndex(nextIndex) == null) {
            return;
        }
        BufferOverReadableChannel<byte[]> currentBuffer = getBufferByIndex(currentIndex);
        if (!currentBuffer.isDataSupplierConsumed()) {
            throw new IllegalStateException("Attempt to set up the next buffer although the current one has not been exhausted.");
        }
        long nextBaseOffset = baseOffset + currentBuffer.getKnownDataSize();
        this.posToIndex.put(Long.valueOf(nextBaseOffset), Integer.valueOf(nextIndex));
    }

    /** Returns the head buffer (buffer index 0). */
    public BufferOverReadableChannel<byte[]> getHeadBuffer() {
        return this.headBuffer;
    }

    /** Returns the tail buffer (buffer index 1). */
    public BufferOverReadableChannel<byte[]> getTailBuffer() {
        return this.tailBuffer;
    }

    /**
     * Creates a source over a stream without block encoding. The head is a
     * close-shielded view of the wrapped stream restricted by limit {@code j}; the
     * wrapped stream itself serves as base stream. No block offset map is supplied.
     *
     * @param seekableInputStream the input stream to wrap
     * @param j the limit passed to the head channel
     * @param bArr bytes that back the postamble buffer
     * @return the new source
     */
    public static SeekableSourceOverSplit createForNonEncodedStream(SeekableInputStream seekableInputStream, long j, byte[] bArr) {
        SeekableReadableChannel fullChannel = SeekableInputStreams.wrap(seekableInputStream);
        SeekableReadableChannelWithLimit limitedHead = new SeekableReadableChannelWithLimit(SeekableReadableChannels.closeShield(fullChannel), j);
        return create(fullChannel, limitedHead, bArr, null);
    }

    /**
     * Creates a source over a block-encoded stream.
     *
     * <p>While data is read, every change of the underlying stream's position is
     * recorded in a map from the number of bytes read so far to the new position; that
     * map (seeded with 0 -> initial position) becomes the source's block offset map.
     * The head channel ends as soon as the wrapped channel's position reaches or
     * exceeds {@code j}.
     *
     * @param seekableInputStream the block-encoded input stream
     * @param j the position bound that ends the head region
     * @param bArr bytes that back the postamble buffer
     * @return the new source
     */
    public static SeekableSourceOverSplit createForBlockEncodedStream(final SeekableInputStream seekableInputStream, long j, byte[] bArr) {
        final TreeMap treeMap = new TreeMap();
        treeMap.put(0L, Long.valueOf(seekableInputStream.position()));
        if (logger.isDebugEnabled()) {
            logger.debug("Detected first block in encoded stream at offset: " + treeMap);
        }
        DeferredSeekablePushbackInputStream deferredSeekablePushbackInputStream = new DeferredSeekablePushbackInputStream(seekableInputStream) {
            /** Number of bytes returned by readInternal so far. */
            protected long readCount = 0;

            @Override
            public int readInternal(byte[] bArr2, int i, int i2) throws IOException {
                long before = seekableInputStream.position();
                int readInternal = super.readInternal(bArr2, i, i2);
                long after = seekableInputStream.position();
                if (after != before) {
                    // The underlying position moved: remember it against the current read count.
                    treeMap.put(Long.valueOf(this.readCount), Long.valueOf(after));
                }
                if (readInternal > 0) {
                    this.readCount += readInternal;
                }
                return readInternal;
            }
        };
        SeekableReadableChannel wrap = SeekableInputStreams.wrap(SeekableInputStreams.create(deferredSeekablePushbackInputStream, deferredSeekablePushbackInputStream));
        return create(wrap, ReadableChannels.closeShield(new ReadableChannelWithConditionalBound(wrap, readableChannelWithConditionalBound -> {
            long currentPos = SeekableReadableChannels.position(wrap);
            boolean z = currentPos >= j;
            if (z && logger.isDebugEnabled()) {
                // Report the position that triggered the bound (the previous version printed the logger object itself).
                logger.debug("Found first block after split " + j + " at " + currentPos);
            }
            return z;
        })), bArr, treeMap);
    }

    /**
     * Assembles a source: the head buffer reads from {@code readableChannel2}, the
     * tail buffer reads from {@code readableChannel} (which also becomes the base
     * stream), and the postamble buffer reads from the bytes in {@code bArr}.
     * Each buffer is created via {@code createForBytes} with 8192 as second argument.
     *
     * @param readableChannel base stream and tail data supplier
     * @param readableChannel2 head data supplier
     * @param bArr postamble bytes
     * @param navigableMap block offset map; may be null
     * @return the new source
     */
    protected static SeekableSourceOverSplit create(ReadableChannel<byte[]> readableChannel, ReadableChannel<byte[]> readableChannel2, byte[] bArr, NavigableMap<Long, Long> navigableMap) {
        BufferOverReadableChannel<byte[]> head = BufferOverReadableChannel.createForBytes(readableChannel2, 8192);
        BufferOverReadableChannel<byte[]> tail = BufferOverReadableChannel.createForBytes(readableChannel, 8192);
        BufferOverReadableChannel<byte[]> postamble = BufferOverReadableChannel.createForBytes(ReadableChannels.wrap(new ByteArrayInputStream(bArr)), 8192);
        return new SeekableSourceOverSplit(readableChannel, head, tail, postamble, navigableMap);
    }

    /**
     * Returns the size of the head region, i.e. the base offset of the first region
     * registered with buffer index 1.
     *
     * @return the head size
     * @throws IllegalStateException if no region with index 1 has been registered yet
     */
    public long getHeadSize() {
        for (Map.Entry<Long, Integer> region : this.posToIndex.entrySet()) {
            if (region.getValue().intValue() == 1) {
                return region.getKey().longValue();
            }
        }
        throw new IllegalStateException("Head size not yet detected");
    }

    /* renamed from: newReadableChannel, reason: merged with bridge method [inline-methods] and merged with bridge method [inline-methods] */
    /**
     * Opens a new channel positioned on the head buffer with no pending seek and no
     * transition action.
     *
     * @return the new channel
     * @throws RuntimeException if the head has already been debuffered
     * @throws IOException declared by the signature
     */
    public Channel m6newReadableChannel() throws IOException {
        if (this.debufferedHead == null) {
            return new Channel(this.headBuffer.newReadableChannel(), 0L, -1L, null);
        }
        throw new RuntimeException("Already debuffered");
    }

    /**
     * Returns the sum of the known data sizes of the head and the tail buffer.
     * The postamble buffer is not included.
     *
     * @throws IOException declared by the signature
     */
    public long size() throws IOException {
        long headSize = this.headBuffer.getKnownDataSize();
        long tailSize = this.tailBuffer.getKnownDataSize();
        return headSize + tailSize;
    }

    /** Returns the array operations for byte arrays ({@code ArrayOps.BYTE}). */
    public ArrayOps<byte[]> getArrayOps() {
        return ArrayOps.BYTE;
    }
}
