/*
 * Decompiled with CFR 0.152.
 */
package net.sansa_stack.hadoop.core;

import com.google.common.primitives.Ints;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import net.sansa_stack.hadoop.util.DeferredSeekablePushbackInputStream;
import org.aksw.commons.io.buffer.array.ArrayOps;
import org.aksw.commons.io.buffer.array.BufferOverReadableChannel;
import org.aksw.commons.io.hadoop.SeekableInputStream;
import org.aksw.commons.io.hadoop.SeekableInputStreams;
import org.aksw.commons.io.input.ReadableChannel;
import org.aksw.commons.io.input.ReadableChannelWithConditionalBound;
import org.aksw.commons.io.input.ReadableChannels;
import org.aksw.commons.io.input.SeekableReadableChannel;
import org.aksw.commons.io.input.SeekableReadableChannelBase;
import org.aksw.commons.io.input.SeekableReadableChannelSource;
import org.aksw.commons.io.input.SeekableReadableChannelWithLimit;
import org.aksw.commons.io.input.SeekableReadableChannels;
import org.aksw.commons.util.lock.LockUtils;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.Seekable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SeekableSourceOverSplit
implements SeekableReadableChannelSource<byte[]>,
Closeable {
    private static final Logger logger = LoggerFactory.getLogger(SeekableSourceOverSplit.class);
    protected ReadableChannel<byte[]> baseStream;
    protected BufferOverReadableChannel<byte[]> headBuffer;
    protected BufferOverReadableChannel<byte[]> tailBuffer;
    protected BufferOverReadableChannel<byte[]> postambleBuffer;
    protected SeekableReadableChannel<byte[]> debufferedHead;
    protected NavigableMap<Long, Integer> posToIndex = new TreeMap<Long, Integer>();
    protected NavigableMap<Long, Long> absPosToBlockOffset = null;

    @Override
    public void close() throws IOException {
        if (this.headBuffer.getDataSupplier() != null && this.headBuffer.getDataSupplier().isOpen()) {
            this.headBuffer.getDataSupplier().close();
        }
        if (this.tailBuffer.getDataSupplier() != null && this.tailBuffer.getDataSupplier().isOpen()) {
            this.tailBuffer.getDataSupplier().close();
        }
        if (this.postambleBuffer.getDataSupplier() != null && this.postambleBuffer.getDataSupplier().isOpen()) {
            this.postambleBuffer.getDataSupplier().close();
        }
        if (this.debufferedHead != null) {
            this.debufferedHead.close();
        }
        this.baseStream.close();
    }

    public long getBlockForPos(long pos) {
        Map.Entry<Long, Long> e = this.absPosToBlockOffset.floorEntry(pos);
        return e.getValue();
    }

    public long getKnownSize() {
        Map.Entry<Long, Integer> offsetAndBufferId = this.posToIndex.lastEntry();
        long bufferSize = this.getBufferByIndexUnsafe(offsetAndBufferId.getValue()).getKnownDataSize();
        long result = offsetAndBufferId.getKey() + bufferSize;
        return result;
    }

    public SeekableSourceOverSplit(ReadableChannel<byte[]> baseStream, BufferOverReadableChannel<byte[]> headBuffer, BufferOverReadableChannel<byte[]> tailBuffer, BufferOverReadableChannel<byte[]> postambleBuffer, NavigableMap<Long, Long> absPosToBlockOffset) {
        this.baseStream = baseStream;
        this.headBuffer = headBuffer;
        this.tailBuffer = tailBuffer;
        this.postambleBuffer = postambleBuffer;
        this.absPosToBlockOffset = absPosToBlockOffset;
        this.posToIndex.put(0L, 0);
    }

    public NavigableMap<Long, Long> getAbsPosToBlockOffset() {
        return this.absPosToBlockOffset;
    }

    protected BufferOverReadableChannel<byte[]> getBufferByBaseOffset(long baseOffset) {
        Integer index = (Integer)this.posToIndex.get(baseOffset);
        return this.getBufferByIndex(index);
    }

    protected BufferOverReadableChannel<byte[]> getBufferByIndex(int index) {
        if (index == 0 && this.debufferedHead != null) {
            throw new IllegalStateException("Should never be called if in debuffered state");
        }
        return this.getBufferByIndexUnsafe(index);
    }

    protected BufferOverReadableChannel<byte[]> getBufferByIndexUnsafe(int index) {
        return switch (index) {
            case 0 -> this.headBuffer;
            case 1 -> this.tailBuffer;
            case 2 -> this.postambleBuffer;
            default -> null;
        };
    }

    protected void setupTailBuffer() {
        Map.Entry e = this.posToIndex.descendingMap().entrySet().iterator().next();
        long currentOffset = (Long)e.getKey();
        int currentIndex = (Integer)e.getValue();
        if (currentIndex != 0) {
            throw new IllegalStateException("Method may only be called during reads from the head buffer");
        }
        int nextIndex = currentIndex + 1;
        BufferOverReadableChannel<byte[]> nextBuffer = this.getBufferByIndex(nextIndex);
        if (nextBuffer != null) {
            BufferOverReadableChannel<byte[]> currentBuffer = this.getBufferByIndex(currentIndex);
            boolean doSanityCheck = true;
            if (doSanityCheck && !currentBuffer.isDataSupplierConsumed()) {
                throw new IllegalStateException("Attempt to set up the next buffer although the current one has not been exhausted.");
            }
            long currentSize = currentBuffer.getKnownDataSize();
            long nextOffset = currentOffset + currentSize;
            this.posToIndex.put(nextOffset, nextIndex);
        }
    }

    public BufferOverReadableChannel<byte[]> getHeadBuffer() {
        return this.headBuffer;
    }

    public BufferOverReadableChannel<byte[]> getTailBuffer() {
        return this.tailBuffer;
    }

    public static SeekableSourceOverSplit createForNonEncodedStream(SeekableInputStream in, long splitPoint, byte[] postambleBytes) {
        SeekableReadableChannel baseStream = SeekableInputStreams.wrap((SeekableInputStream)in);
        SeekableReadableChannelWithLimit headStream = new SeekableReadableChannelWithLimit(SeekableReadableChannels.closeShield((SeekableReadableChannel)baseStream), splitPoint);
        return SeekableSourceOverSplit.create((ReadableChannel<byte[]>)baseStream, (ReadableChannel<byte[]>)headStream, postambleBytes, null);
    }

    public static SeekableSourceOverSplit createForBlockEncodedStream(final SeekableInputStream inn, long splitPoint, byte[] postambleBytes) {
        final TreeMap<Long, Long> absPosToBlockOffset = new TreeMap<Long, Long>();
        absPosToBlockOffset.put(0L, inn.position());
        if (logger.isDebugEnabled()) {
            logger.debug("Detected first block in encoded stream at offset: " + String.valueOf(absPosToBlockOffset));
        }
        DeferredSeekablePushbackInputStream in1 = new DeferredSeekablePushbackInputStream((InputStream)inn){
            protected long readCount;
            {
                super(in);
                this.readCount = 0L;
            }

            @Override
            protected int readInternal(byte[] b, int off, int len) throws IOException {
                long before = inn.position();
                int result = super.readInternal(b, off, len);
                long after = inn.position();
                if (after != before) {
                    absPosToBlockOffset.put(this.readCount, after);
                }
                if (result > 0) {
                    this.readCount += (long)result;
                }
                return result;
            }
        };
        SeekableReadableChannel baseStream = SeekableInputStreams.wrap((SeekableInputStream)SeekableInputStreams.create((InputStream)in1, (Seekable)in1));
        ReadableChannelWithConditionalBound headStream = new ReadableChannelWithConditionalBound((ReadableChannel)baseStream, self -> {
            boolean isEof;
            long pos = SeekableReadableChannels.position((SeekableReadableChannel)baseStream);
            boolean bl = isEof = pos >= splitPoint;
            if (isEof && logger.isDebugEnabled()) {
                logger.debug("Found first block after split " + splitPoint + " at " + pos);
            }
            return isEof;
        });
        return SeekableSourceOverSplit.create((ReadableChannel<byte[]>)baseStream, (ReadableChannel<byte[]>)ReadableChannels.closeShield((ReadableChannel)headStream), postambleBytes, absPosToBlockOffset);
    }

    protected static SeekableSourceOverSplit create(ReadableChannel<byte[]> baseStream, ReadableChannel<byte[]> headStream, byte[] postambleBytes, NavigableMap<Long, Long> blockOffsetToAbsPos) {
        BufferOverReadableChannel headBuffer = BufferOverReadableChannel.createForBytes(headStream, (int)8192);
        BufferOverReadableChannel tailBuffer = BufferOverReadableChannel.createForBytes(baseStream, (int)8192);
        BufferOverReadableChannel postambleBuffer = BufferOverReadableChannel.createForBytes((ReadableChannel)ReadableChannels.wrap((InputStream)new ByteArrayInputStream(postambleBytes)), (int)8192);
        return new SeekableSourceOverSplit(baseStream, (BufferOverReadableChannel<byte[]>)headBuffer, (BufferOverReadableChannel<byte[]>)tailBuffer, (BufferOverReadableChannel<byte[]>)postambleBuffer, blockOffsetToAbsPos);
    }

    public long getHeadSize() {
        long index = this.posToIndex.entrySet().stream().filter(e -> (Integer)e.getValue() == 1).map(Map.Entry::getKey).findFirst().orElseThrow(() -> new IllegalStateException("Head size not yet detected"));
        return index;
    }

    public Channel newReadableChannel() throws IOException {
        if (this.debufferedHead != null) {
            throw new RuntimeException("Already debuffered");
        }
        SeekableReadableChannel baseChannel = this.headBuffer.newReadableChannel();
        return new Channel((SeekableReadableChannel<byte[]>)baseChannel, 0L, -1L, null);
    }

    public long size() throws IOException {
        return this.headBuffer.getKnownDataSize() + this.tailBuffer.getKnownDataSize();
    }

    public ArrayOps<byte[]> getArrayOps() {
        return ArrayOps.BYTE;
    }

    class Channel
    extends SeekableReadableChannelBase<byte[]> {
        protected SeekableReadableChannel<byte[]> currentStream;
        protected long currentStreamOffset;
        protected long requestedPos;
        protected Runnable transitionAction;
        protected ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

        public Channel(SeekableReadableChannel<byte[]> currentStream, long currentStreamOffset, long requestedPos, Runnable transitionAction) {
            this.currentStream = currentStream;
            this.currentStreamOffset = currentStreamOffset;
            this.requestedPos = requestedPos;
            this.transitionAction = transitionAction;
        }

        public ReadWriteLock getReadWriteLock() {
            return this.rwl;
        }

        public boolean isHeadStream() {
            int streamId = (Integer)SeekableSourceOverSplit.this.posToIndex.get(this.currentStreamOffset);
            boolean result = streamId == 0 && SeekableSourceOverSplit.this.posToIndex.size() == 1;
            return result;
        }

        protected boolean isDebuffered() {
            return SeekableSourceOverSplit.this.debufferedHead != null;
        }

        public void debufferHead() {
            if (!this.rwl.isWriteLocked()) {
                throw new IllegalStateException("Debuffering requires the channel's write lock to be locked");
            }
            if (this.isDebuffered()) {
                throw new RuntimeException("Already debuffered");
            }
            if (this.isHeadStream()) {
                SeekableReadableChannel bufferChannel;
                long pos = this.position();
                long bufferSize = SeekableSourceOverSplit.this.headBuffer.getKnownDataSize();
                ReadableChannel headDataSupplier = SeekableSourceOverSplit.this.headBuffer.getDataSupplier();
                SeekableSourceOverSplit.this.headBuffer.setDataSupplier(null);
                try {
                    bufferChannel = pos < bufferSize ? SeekableSourceOverSplit.this.headBuffer.getBuffer().newReadableChannel(pos) : null;
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }
                ReadableChannel debuffered = bufferChannel == null ? headDataSupplier : ReadableChannels.concat(Arrays.asList(bufferChannel, headDataSupplier));
                SeekableSourceOverSplit.this.debufferedHead = SeekableReadableChannels.wrapForwardSeekable((ReadableChannel)debuffered, (long)pos);
                IOUtils.closeQuietly(this.currentStream);
                this.currentStream = SeekableSourceOverSplit.this.debufferedHead;
            }
        }

        public SeekableReadableChannel<byte[]> cloneObject() {
            try {
                long pos = this.position();
                return new Channel((SeekableReadableChannel<byte[]>)this.currentStream.cloneObject(), this.currentStreamOffset, pos, this.transitionAction);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        public long position() {
            long result = this.requestedPos >= 0L ? this.requestedPos : this.getInternalPosition();
            return result;
        }

        public void position(long pos) {
            this.requestedPos = pos;
        }

        protected void applyPosition() throws IOException {
            block5: {
                long knownDataSize;
                long nextRequestedBaseOffset;
                long requestedBaseOffset;
                long currentAbsPos = this.getInternalPosition();
                do {
                    requestedBaseOffset = SeekableSourceOverSplit.this.posToIndex.floorKey(this.requestedPos);
                    Integer requestedIndex = (Integer)SeekableSourceOverSplit.this.posToIndex.get(requestedBaseOffset);
                    long requiredAdditionalBytes = this.requestedPos - currentAbsPos;
                    long currentRelPos = this.requestedPos - requestedBaseOffset;
                    if (requestedIndex == 0 && this.isDebuffered()) {
                        this.currentStream.position(currentRelPos);
                        break block5;
                    }
                    BufferOverReadableChannel<byte[]> requestedBuffer = SeekableSourceOverSplit.this.getBufferByBaseOffset(requestedBaseOffset);
                    if (requestedBaseOffset != this.currentStreamOffset) {
                        this.currentStream.close();
                        this.currentStream = requestedBuffer.newReadableChannel();
                        this.currentStreamOffset = requestedBaseOffset;
                    }
                    if (requiredAdditionalBytes > 0L) {
                        requestedBuffer.loadFully(Ints.checkedCast((long)currentRelPos), true);
                    }
                    if (currentRelPos < (knownDataSize = requestedBuffer.getKnownDataSize()) || currentRelPos == knownDataSize && !requestedBuffer.isDataSupplierConsumed()) {
                        this.currentStream.position(currentRelPos);
                        break block5;
                    }
                    int currentStreamIdx = (Integer)SeekableSourceOverSplit.this.posToIndex.get(this.currentStreamOffset);
                    if (currentStreamIdx != 0 || !requestedBuffer.isDataSupplierConsumed()) continue;
                    SeekableSourceOverSplit.this.setupTailBuffer();
                } while (requestedBaseOffset != (nextRequestedBaseOffset = SeekableSourceOverSplit.this.posToIndex.floorKey(this.requestedPos).longValue()));
                this.currentStream.position(knownDataSize);
            }
            this.requestedPos = -1L;
        }

        protected long getInternalPosition() {
            long relativePos = SeekableReadableChannels.position(this.currentStream);
            long result = this.currentStreamOffset + relativePos;
            return result;
        }

        void setLimit(long newLimitPos) {
            int max = SeekableSourceOverSplit.this.posToIndex.values().stream().mapToInt(x -> x).max().orElse(-1);
            if (max != 1) {
                throw new IllegalStateException("Limit can only be set once and only if data has been read from the tail region");
            }
            SeekableSourceOverSplit.this.posToIndex.put(newLimitPos, 2);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public int read(byte[] array, int position, int length) throws IOException {
            int result;
            if (length == 0) {
                result = 0;
            } else {
                ReentrantReadWriteLock.ReadLock readLock = this.rwl.readLock();
                readLock.lock();
                try {
                    while (true) {
                        int l;
                        if (this.requestedPos >= 0L) {
                            this.applyPosition();
                        }
                        if ((l = this.adjustLength(length)) <= 0) {
                            long p = this.position();
                            this.position(p);
                            continue;
                        }
                        result = this.currentStream.read((Object)array, position, l);
                        if (result != -1) break;
                        boolean ihs = this.isHeadStream();
                        SeekableReadableChannel<byte[]> cs = this.currentStream;
                        long currentSize = ihs && this.isDebuffered() ? this.currentStream.position() - this.currentStreamOffset : SeekableSourceOverSplit.this.getBufferByBaseOffset(this.currentStreamOffset).getKnownDataSize();
                        boolean exhaustedHeadStream = ihs;
                        long newPos = this.currentStreamOffset + currentSize;
                        this.position(newPos);
                        if (exhaustedHeadStream) {
                            SeekableSourceOverSplit.this.posToIndex.put(newPos, 1);
                        }
                        this.applyPosition();
                        if (this.currentStream == cs) {
                            break;
                        }
                        if (!exhaustedHeadStream) continue;
                        this.transition();
                    }
                }
                finally {
                    readLock.unlock();
                }
            }
            if (result == -1) {
                // empty if block
            }
            return result;
        }

        public int adjustLength(int length) {
            int l;
            Long nextStreamOffset = SeekableSourceOverSplit.this.posToIndex.higherKey(this.currentStreamOffset);
            if (nextStreamOffset == null) {
                l = length;
            } else {
                long p = this.position();
                long delta = nextStreamOffset - p;
                l = Math.min(length, Ints.saturatedCast((long)delta));
            }
            return l;
        }

        public ArrayOps<byte[]> getArrayOps() {
            return ArrayOps.BYTE;
        }

        protected void closeActual() throws Exception {
            LockUtils.runWithLock((Lock)this.rwl.writeLock(), () -> {
                this.currentStream.close();
                super.closeActual();
            });
        }

        public void setTransitionAction(Runnable transitionAction) {
            this.transitionAction = transitionAction;
        }

        protected void transition() {
            if (this.transitionAction != null) {
                this.transitionAction.run();
            }
        }

        SeekableSourceOverSplit getEnclosingInstance() {
            return SeekableSourceOverSplit.this;
        }
    }
}

