package org.aksw.commons.io.cache;

import com.google.common.base.Preconditions;
import com.google.common.collect.ContiguousSet;
import com.google.common.collect.DiscreteDomain;
import com.google.common.collect.Range;
import com.google.common.collect.RangeSet;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.LongSupplier;
import org.aksw.commons.io.buffer.array.ArrayOps;
import org.aksw.commons.io.input.DataStream;
import org.aksw.commons.io.slice.Slice;
import org.aksw.commons.io.slice.SliceAccessor;
import org.aksw.commons.util.closeable.AutoCloseableWithLeakDetectionBase;
import org.aksw.commons.util.lock.LockUtils;
import org.aksw.commons.util.range.RangeUtils;
import org.aksw.commons.util.slot.Slot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/aksw/commons/io/cache/DataStreamOverSliceWithCache.class */
public class DataStreamOverSliceWithCache<A> extends AutoCloseableWithLeakDetectionBase implements DataStream<A> {
    private static final Logger logger = LoggerFactory.getLogger(DataStreamOverSliceWithCache.class);
    protected Slice<A> slice;
    protected AdvancedRangeCacheImpl<A> cache;
    protected SliceAccessor<A> pageRange;
    protected Range<Long> requestRange;
    protected long nextCheckpointOffset;
    protected long currentOffset;
    protected int maxReadAheadItemCount = 100;
    protected Map<RangeRequestWorkerImpl<A>, Slot<Long>> workerToSlot = new IdentityHashMap();
    protected long maxRedundantFetchSize = 1000;

    public DataStreamOverSliceWithCache(AdvancedRangeCacheImpl<A> advancedRangeCacheImpl, Range<Long> range) {
        this.requestRange = range;
        this.cache = advancedRangeCacheImpl;
        this.slice = advancedRangeCacheImpl.getSlice();
        this.pageRange = this.slice.newSliceAccessor();
        this.currentOffset = ((Long) ContiguousSet.create(range, DiscreteDomain.longs()).first()).longValue();
        this.nextCheckpointOffset = this.currentOffset;
    }

    public long getNextCheckpointOffset() {
        return this.nextCheckpointOffset;
    }

    public SliceAccessor<A> getPageRange() {
        return this.pageRange;
    }

    protected void checkpoint(long j) {
        Preconditions.checkArgument(j >= 0, "Argument must not be negative");
        long j2 = this.nextCheckpointOffset;
        long j3 = j2 + j;
        Range closedOpen = Range.closedOpen(Long.valueOf(j2), Long.valueOf(j3));
        LockUtils.runWithLock(this.cache.getExecutorCreationReadLock(), () -> {
            LockUtils.runWithLock(this.slice.getReadWriteLock().readLock(), () -> {
                processGaps(this.slice.getGaps(closedOpen), j2, j3);
            });
            this.nextCheckpointOffset = j3;
        });
        clearPassedSlots();
    }

    public DataStreamOverSliceWithCache(AdvancedRangeCacheImpl<A> advancedRangeCacheImpl, long j, LongSupplier longSupplier) {
        this.cache = advancedRangeCacheImpl;
        this.slice = advancedRangeCacheImpl.getSlice();
        this.pageRange = this.slice.newSliceAccessor();
        this.nextCheckpointOffset = j;
    }

    public void clearPassedSlots() {
        Iterator<Slot<Long>> it = this.workerToSlot.values().iterator();
        while (it.hasNext()) {
            Slot<Long> next = it.next();
            if (((Long) next.getSupplier().get()).longValue() < this.currentOffset) {
                logger.debug("Clearing slot for offset " + next.getSupplier().get() + " because current offset " + this.currentOffset + " is higher");
                next.close();
                it.remove();
            }
        }
    }

    protected void scheduleWorkerToGaps(RangeSet<Long> rangeSet) {
        Long l;
        HashMap hashMap = new HashMap();
        TreeMap treeMap = new TreeMap();
        for (RangeRequestWorkerImpl<A> rangeRequestWorkerImpl : this.cache.getExecutors()) {
            long currentOffset = rangeRequestWorkerImpl.getCurrentOffset();
            long endOffset = rangeRequestWorkerImpl.getEndOffset();
            if (currentOffset != endOffset && ((l = (Long) treeMap.get(Long.valueOf(currentOffset))) == null || l.longValue() < endOffset)) {
                hashMap.put(Long.valueOf(currentOffset), rangeRequestWorkerImpl);
                treeMap.put(Long.valueOf(currentOffset), Long.valueOf(endOffset));
            }
        }
        for (Map.Entry entry : RangeUtils.scheduleRangeSupply(treeMap, rangeSet, this.maxRedundantFetchSize, this.cache.requestLimit).entrySet()) {
            long longValue = ((Long) entry.getKey()).longValue();
            long longValue2 = ((Long) entry.getValue()).longValue();
            long j = longValue2 - longValue;
            Map.Entry floorEntry = treeMap.floorEntry(Long.valueOf(longValue));
            Long l2 = (floorEntry == null || ((Long) floorEntry.getValue()).longValue() < longValue2) ? null : (Long) floorEntry.getKey();
            RangeRequestWorkerImpl<A> rangeRequestWorkerImpl2 = l2 == null ? null : (RangeRequestWorkerImpl) hashMap.get(l2);
            if (rangeRequestWorkerImpl2 == null) {
                Map.Entry<RangeRequestWorkerImpl<A>, Slot<Long>> newExecutor = this.cache.newExecutor(longValue, j);
                this.workerToSlot.put(newExecutor.getKey(), newExecutor.getValue());
            } else {
                Slot<Long> slot = this.workerToSlot.get(rangeRequestWorkerImpl2);
                if (slot == null) {
                    slot = rangeRequestWorkerImpl2.newDemandSlot();
                    this.workerToSlot.put(rangeRequestWorkerImpl2, slot);
                }
                slot.set(Long.valueOf(longValue2));
            }
        }
    }

    protected void processGaps(RangeSet<Long> rangeSet, long j, long j2) {
        scheduleWorkerToGaps(rangeSet);
    }

    @Override // org.aksw.commons.io.input.DataStream
    public boolean isOpen() {
        return !this.isClosed;
    }

    protected void closeActual() {
        logger.debug("Releasing slots: " + this.workerToSlot);
        this.workerToSlot.values().forEach((v0) -> {
            v0.close();
        });
        this.workerToSlot.clear();
        this.pageRange.close();
    }

    /* JADX WARN: Code restructure failed: missing block: B:48:0x0245, code lost:
    
        r0.lock();
        r0.unlock();
     */
    /* JADX WARN: Finally extract failed */
    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.aksw.commons.io.input.DataStream
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public int read(A r9, int r10, int r11) throws java.io.IOException {
        /*
            Method dump skipped, instructions count: 797
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.aksw.commons.io.cache.DataStreamOverSliceWithCache.read(java.lang.Object, int, int):int");
    }

    @Override // org.aksw.commons.io.buffer.array.HasArrayOps
    public ArrayOps<A> getArrayOps() {
        return this.slice.getArrayOps();
    }
}
