package org.aksw.commons.rx.cache.range;

import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ContiguousSet;
import com.google.common.collect.DiscreteDomain;
import com.google.common.collect.Range;
import com.google.common.collect.TreeRangeSet;
import com.google.common.math.LongMath;
import com.google.common.primitives.Ints;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import org.aksw.commons.util.range.RangeBuffer;
import org.aksw.commons.util.range.RangeUtils;
import org.aksw.commons.util.ref.RefFuture;
import org.aksw.commons.util.slot.Slot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/aksw/commons/rx/cache/range/RequestIterator.class */
public class RequestIterator<T> extends AbstractIterator<T> {
    private static final Logger logger = LoggerFactory.getLogger(RequestIterator.class);
    protected SmartRangeCacheImpl<T> cache;
    protected Range<Long> requestRange;
    protected int maxReadAheadItemCount = 100;
    protected ConcurrentNavigableMap<Long, RefFuture<RangeBuffer<T>>> claimedPages = new ConcurrentSkipListMap();
    protected Map<RangeRequestExecutor<T>, Slot<Long>> workerToSlot = new HashMap();
    protected Iterator<T> currentPageIt = null;
    protected int currentIndex = -1;
    protected Runnable abortAction = null;
    protected boolean isAborted = false;
    protected long currentOffset;
    protected long claimAheadLength;
    protected long blockLength;
    protected long nextCheckpointOffset;

    public RequestIterator(SmartRangeCacheImpl<T> smartRangeCacheImpl, Range<Long> range) {
        this.nextCheckpointOffset = 0L;
        this.cache = smartRangeCacheImpl;
        this.requestRange = range;
        long longValue = ((Long) ContiguousSet.create(range, DiscreteDomain.longs()).first()).longValue();
        this.nextCheckpointOffset = longValue;
        this.currentOffset = longValue;
    }

    protected Range<Long> getClaimAheadRange() {
        return Range.closedOpen(Long.valueOf(this.currentOffset), Long.valueOf(this.currentOffset + this.claimAheadLength));
    }

    protected void init() {
        synchronized (this) {
            if (!this.isAborted) {
                this.abortAction = this.cache.register(this);
            }
        }
    }

    public void checkpoint(long j) throws Exception {
        ArrayList arrayList = new ArrayList();
        long j2 = this.nextCheckpointOffset;
        long j3 = j2 + j;
        Range closedOpen = Range.closedOpen(Long.valueOf(j2), Long.valueOf(j3));
        long pageSize = this.cache.getPageSize();
        long pageIdForOffset = this.cache.getPageIdForOffset(j2);
        long pageIdForOffset2 = this.cache.getPageIdForOffset(j3);
        ConcurrentNavigableMap<Long, RefFuture<RangeBuffer<T>>> headMap = this.claimedPages.headMap((ConcurrentNavigableMap<Long, RefFuture<RangeBuffer<T>>>) Long.valueOf(pageIdForOffset), false);
        headMap.values().forEach((v0) -> {
            v0.close();
        });
        headMap.clear();
        clearPassedSlots();
        long j4 = pageIdForOffset;
        while (true) {
            long j5 = j4;
            if (j5 > pageIdForOffset2) {
                break;
            }
            this.claimedPages.computeIfAbsent(Long.valueOf(j5), l -> {
                return this.cache.getPageForPageId(l.longValue());
            });
            j4 = j5 + 1;
        }
        NavigableMap navigableMap = (NavigableMap) LongStream.rangeClosed(pageIdForOffset, pageIdForOffset2).boxed().collect(Collectors.toMap(l2 -> {
            return l2;
        }, l3 -> {
            try {
                return (RangeBuffer) ((RefFuture) this.claimedPages.get(l3)).await();
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        }, (rangeBuffer, rangeBuffer2) -> {
            throw new RuntimeException("should not happen");
        }, TreeMap::new));
        try {
            navigableMap.values().forEach(rangeBuffer3 -> {
                rangeBuffer3.getReadWriteLock().readLock().lock();
            });
            TreeRangeSet create = TreeRangeSet.create();
            Stream flatMap = navigableMap.entrySet().stream().flatMap(entry -> {
                return ((RangeBuffer) entry.getValue()).getLoadedRanges().asRanges().stream().map(range -> {
                    return RangeUtils.apply(range, Long.valueOf(((Long) entry.getKey()).longValue() * pageSize), (num, l4) -> {
                        return Long.valueOf(LongMath.saturatedAdd(num.intValue(), l4.longValue()));
                    });
                });
            });
            Objects.requireNonNull(create);
            flatMap.forEach(create::add);
            ArrayDeque arrayDeque = new ArrayDeque(RangeUtils.gaps(closedOpen, create).asRanges());
            this.cache.getExecutorCreationReadLock().lock();
            arrayList.addAll(this.cache.getExecutors());
            if (1 == 0) {
                throw new RuntimeException("not implemented");
            }
            while (!arrayDeque.isEmpty()) {
                Range range = (Range) arrayDeque.pollFirst();
                long j6 = j2;
                for (Map.Entry<RangeRequestExecutor<T>, Slot<Long>> entry2 : this.workerToSlot.entrySet()) {
                    RangeRequestExecutor<T> key = entry2.getKey();
                    Slot<Long> value = entry2.getValue();
                    if (key.getCurrentOffset() <= j6) {
                        key.getEndOffset();
                        long min = Math.min(key.getEndOffset(), j3);
                        Range closedOpen2 = Range.closedOpen(Long.valueOf(min), (Long) range.upperEndpoint());
                        if (!closedOpen2.isEmpty()) {
                            arrayDeque.addFirst(closedOpen2);
                        }
                        if (min > j6) {
                            j6 = min;
                            Long l4 = (Long) value.getSupplier().get();
                            value.set(Long.valueOf(min));
                            logger.debug("Updated slot " + l4 + " -> " + value.getSupplier().get());
                        }
                    }
                }
                if (j6 == j2) {
                    Map.Entry<RangeRequestExecutor<T>, Slot<Long>> newExecutor = this.cache.newExecutor(((Long) range.lowerEndpoint()).longValue(), ((Long) range.upperEndpoint()).longValue() - ((Long) range.lowerEndpoint()).longValue());
                    this.workerToSlot.put(newExecutor.getKey(), newExecutor.getValue());
                    arrayDeque.addFirst(range);
                }
            }
        } finally {
            navigableMap.values().forEach(rangeBuffer4 -> {
                rangeBuffer4.getReadWriteLock().readLock().unlock();
            });
            this.cache.getExecutorCreationReadLock().unlock();
            this.nextCheckpointOffset += j;
        }
    }

    public void clearPassedSlots() {
        Iterator<Slot<Long>> it = this.workerToSlot.values().iterator();
        while (it.hasNext()) {
            Slot<Long> next = it.next();
            if (((Long) next.getSupplier().get()).longValue() < this.currentOffset) {
                next.close();
                it.remove();
            }
        }
    }

    protected T computeNext() {
        Object endOfData;
        if (this.currentOffset == this.nextCheckpointOffset) {
            try {
                checkpoint(Math.min(this.maxReadAheadItemCount, Ints.saturatedCast(LongMath.saturatedAdd(((Long) ContiguousSet.create(this.requestRange, DiscreteDomain.longs()).last()).longValue() - this.currentOffset, 1L))));
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        if (this.requestRange.contains(Long.valueOf(this.currentOffset))) {
            if (this.currentPageIt == null || !this.currentPageIt.hasNext()) {
                try {
                    RangeBuffer rangeBuffer = (RangeBuffer) ((RefFuture) this.claimedPages.get(Long.valueOf(this.cache.getPageIdForOffset(this.currentOffset)))).await();
                    this.currentIndex = this.cache.getIndexInPageForOffset(this.currentOffset);
                    this.currentPageIt = rangeBuffer.get(this.currentIndex);
                } catch (InterruptedException | ExecutionException e2) {
                    throw new RuntimeException(e2);
                }
            }
            if (this.currentPageIt.hasNext()) {
                endOfData = this.currentPageIt.next();
                this.currentIndex++;
                this.currentOffset++;
            } else {
                close();
                endOfData = endOfData();
            }
        } else {
            close();
            endOfData = endOfData();
        }
        return (T) endOfData;
    }

    public void close() {
        if (this.isAborted) {
            return;
        }
        synchronized (this) {
            if (!this.isAborted) {
                this.isAborted = true;
                this.claimedPages.values().forEach((v0) -> {
                    v0.close();
                });
                this.workerToSlot.values().forEach((v0) -> {
                    v0.close();
                });
                this.claimedPages.clear();
                this.workerToSlot.clear();
            }
        }
    }
}
