package org.aksw.commons.rx.cache.range;

import com.google.common.base.Stopwatch;
import com.google.common.collect.Range;
import com.google.common.math.LongMath;
import com.google.common.primitives.Ints;
import io.reactivex.rxjava3.disposables.Disposable;
import java.time.Duration;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.aksw.commons.rx.range.RangedSupplier;
import org.aksw.commons.util.ref.Ref;
import org.aksw.commons.util.sink.BulkingSink;

/* loaded from: input_file:org/aksw/commons/rx/cache/range/RangeRequestExecutor.class */
public class RangeRequestExecutor<T> {
    protected SmartRangeCacheImpl<T> manager;
    protected Iterator<T> iterator;
    protected Disposable disposable;
    protected Ref<RangeBuffer<T>> currentPageRef;
    protected long requestOffset;
    protected Map<Object, Long> contextToEndpoint;
    protected long effectiveEndpoint;
    protected long requestLimit;
    protected RangedSupplier<Long, T> backend;
    protected long numItemsRead;
    protected long offset;
    protected long currentLimit;
    protected boolean terminateIfNoObserver;
    protected long terminationDelay;
    protected boolean isAborted = false;
    protected ReentrantReadWriteLock pauseLock = new ReentrantReadWriteLock(true);
    protected volatile boolean isPaused = false;
    protected int reportingInterval = 10;
    protected Duration firstItemTime = null;
    protected long numItemsProcessed = 0;
    protected long processingTimeInNanos = 0;
    protected ReentrantReadWriteLock executorCreationLock = new ReentrantReadWriteLock();

    public Duration getFirstItemTime() {
        return this.firstItemTime;
    }

    public float getThroughput() {
        return ((float) this.numItemsProcessed) / ((float) (this.processingTimeInNanos / 1.0E9d));
    }

    public CompletableFuture<Runnable> pause() throws InterruptedException {
        ReentrantReadWriteLock.ReadLock readLock = this.pauseLock.readLock();
        readLock.lock();
        return CompletableFuture.completedFuture(() -> {
            readLock.unlock();
        });
    }

    public float etaAtIndex(long j) {
        return ((float) (j - this.offset)) * getThroughput();
    }

    public void abort() {
        synchronized (this) {
            if (!this.isAborted && this.disposable != null) {
                this.disposable.dispose();
                this.isAborted = true;
            }
        }
    }

    public void close() {
        synchronized (this) {
            if (this.currentPageRef != null) {
                this.currentPageRef.close();
            }
        }
    }

    protected void init() {
        synchronized (this) {
            if (this.isAborted) {
                return;
            }
            this.iterator = this.backend.apply(Range.atLeast(Long.valueOf(this.offset))).blockingIterable().iterator();
            this.disposable = this.iterator;
        }
    }

    public void run() {
        init();
        Stopwatch createStarted = Stopwatch.createStarted();
        this.iterator.hasNext();
        createStarted.elapsed();
        ReentrantReadWriteLock.WriteLock writeLock = this.pauseLock.writeLock();
        writeLock.lock();
        while (true) {
            try {
                if (!this.pauseLock.hasQueuedThreads()) {
                    this.isPaused = false;
                    process(this.reportingInterval);
                    if (!this.iterator.hasNext()) {
                        break;
                    }
                    try {
                        Thread.sleep(this.terminationDelay);
                    } catch (InterruptedException e) {
                    }
                    if (this.currentLimit < this.numItemsRead) {
                        break;
                    }
                } else {
                    this.isPaused = true;
                    writeLock.unlock();
                    writeLock.lock();
                }
            } finally {
                writeLock.unlock();
            }
        }
    }

    public long getCurrentOffset() {
        return this.offset;
    }

    public long getEndOffset() {
        return LongMath.saturatedAdd(this.requestOffset, this.requestLimit);
    }

    public Range<Long> getWorkingRange() {
        return Range.closedOpen(Long.valueOf(this.offset), Long.valueOf(getEndOffset()));
    }

    public void process(int i) {
        boolean hasNext;
        synchronized (this) {
            if (this.currentPageRef != null) {
                this.currentPageRef.close();
            }
            this.currentPageRef = this.manager.getPageForOffset(this.offset);
        }
        int indexInPageForOffset = this.manager.getIndexInPageForOffset(this.offset);
        RangeBuffer rangeBuffer = (RangeBuffer) this.currentPageRef.get();
        BulkingSink bulkingSink = new BulkingSink(16, (obj, i2, i3) -> {
            rangeBuffer.put(indexInPageForOffset, obj, i2, i3);
        });
        int min = Math.min(Math.min(Ints.saturatedCast(this.currentLimit), rangeBuffer.getCapacity()), this.reportingInterval);
        int i4 = 0;
        while (true) {
            hasNext = this.iterator.hasNext();
            if (!hasNext || i4 >= min || this.isAborted || Thread.interrupted()) {
                break;
            }
            i4++;
            bulkingSink.accept(this.iterator.next());
        }
        bulkingSink.flush();
        bulkingSink.close();
        this.numItemsRead += i4;
        this.offset += this.numItemsRead;
        if (hasNext || this.numItemsRead < this.requestLimit) {
        }
    }

    protected void updateEffectiveEndpoint() {
        this.effectiveEndpoint = this.contextToEndpoint.values().stream().mapToLong(l -> {
            return l.longValue();
        }).reduce(-1L, Math::max);
    }

    public Runnable requestEndpoint(Object obj, long j) {
        synchronized (this) {
            Range<Long> workingRange = getWorkingRange();
            if (!workingRange.contains(Long.valueOf(j))) {
                throw new IllegalArgumentException(String.format("Request for endpoint %d is outside of working range %s", Long.valueOf(j), workingRange));
            }
            this.contextToEndpoint.put(obj, Long.valueOf(j));
            updateEffectiveEndpoint();
        }
        return () -> {
            synchronized (this) {
                this.contextToEndpoint.remove(obj);
                updateEffectiveEndpoint();
            }
        };
    }
}
