package org.aksw.commons.rx.cache.range;

import com.google.common.base.Stopwatch;
import com.google.common.collect.ContiguousSet;
import com.google.common.collect.DiscreteDomain;
import com.google.common.collect.Range;
import com.google.common.collect.RangeSet;
import com.google.common.math.LongMath;
import io.reactivex.rxjava3.disposables.Disposable;
import java.time.Duration;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.function.LongUnaryOperator;
import org.aksw.commons.util.closeable.AutoCloseableWithLeakDetectionBase;
import org.aksw.commons.util.ref.RefFuture;
import org.aksw.commons.util.sink.BulkingSink;
import org.aksw.commons.util.slot.ObservableSlottedValue;
import org.aksw.commons.util.slot.ObservableSlottedValueImpl;
import org.aksw.commons.util.slot.Slot;
import org.aksw.commons.util.slot.SlottedBuilderImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/aksw/commons/rx/cache/range/RangeRequestWorker.class */
/**
 * Worker that pulls items from the cache system's backend, starting at a given
 * offset, and writes them into a {@link PageRange} of the cache's slice.
 *
 * <p>After every batch the slice metadata is updated (minimum known size and,
 * if the backend ran dry before the request limit, the known size) and clients
 * waiting on the metadata's "has data" condition are signalled.
 *
 * <p>Clients register how far they want the worker to read via slots obtained
 * from {@link #getEndpointSlot()}; the effective demand is the maximum of all
 * slot values, or {@code -1} if there are none. When not in {@code waitMode},
 * a worker without demand keeps running until {@code terminationDelay}
 * milliseconds have elapsed on the termination timer.
 *
 * <p>The worker closes itself when {@link #run()} finishes, normally or not.
 *
 * @param <T> the item type delivered by the backend
 */
public class RangeRequestWorker<T> extends AutoCloseableWithLeakDetectionBase implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(RangeRequestWorker.class);

    /** The cache system that owns this worker and provides the backend and the slice. */
    protected SmartRangeCacheImpl<T> cacheSystem;

    /** Handle for cancelling the running backend request; {@code null} until the request is started. */
    protected Disposable disposable;

    /** The slice (data pages plus metadata) this worker writes into. */
    protected SliceWithAutoSync<T> slice;

    /** The pages currently claimed by this worker for writing. */
    protected PageRange<T> pageRange;

    /** Offset at which this worker's backend request starts. */
    protected final long requestOffset;

    /** Maximum number of items this worker is supposed to retrieve. */
    protected final long requestLimit;

    /** Offset of the next item to be written; advanced after each batch in {@link #process(int)}. */
    protected volatile long offset;

    /** Offset at which {@link #checkpoint()} has to be consulted again. */
    protected long nextCheckpointOffset;

    /** Milliseconds the worker keeps running without demand; also the wait timeout in {@code waitMode}. */
    protected long terminationDelay;

    /** Aggregates the offsets demanded by clients into their maximum ({@code -1} if there are none). */
    protected ObservableSlottedValue<Long, Long> endpointDemands = ObservableSlottedValueImpl.wrap(
            SlottedBuilderImpl.create(demands -> demands.stream().reduce(-1L, Math::max)));

    /** Blocking iterator over the backend response; {@code null} until the request is started. */
    protected Iterator<T> iterator = null;

    protected long currentPageId = -1;

    /** Number of items buffered before they are written to the page range in one go. */
    protected int bulkSize = 16;

    /** Maximum number of items handled per {@link #process(int)} call in the main loop. */
    protected int reportingInterval = this.bulkSize;

    /** If set, the worker blocks on {@link #endpointDemands} instead of running a termination timer. */
    protected boolean waitMode = false;

    /** Measures how long there has been no demand; only used when {@link #waitMode} is off. */
    protected transient Stopwatch terminationTimer = Stopwatch.createUnstarted();

    /** Time it took until the backend answered the first {@code hasNext()} call. */
    protected Duration firstItemTime = null;

    /** Total number of items this worker has written so far. */
    protected long numItemsProcessed = 0;

    // NOTE(review): this field is never written within this class, so
    // getThroughput() currently yields NaN or Infinity - confirm whether a
    // subclass/caller is expected to maintain it.
    protected long processingTimeInNanos = 0;

    /** Maps an offset to the number of items that may be re-fetched from there even if already cached. */
    protected LongUnaryOperator offsetToMaxAllowedRefetchCount = ignoredOffset -> 5000L;

    /**
     * Creates a worker for the given request range. The backend request itself
     * is only started once {@link #run()} (or {@link #runCore()}) is invoked.
     *
     * @param cacheSystem the cache system providing the backend and the slice
     * @param requestOffset the offset at which to start reading from the backend
     * @param requestLimit the maximum number of items to read
     * @param terminationDelay milliseconds to keep running without any demand
     */
    public RangeRequestWorker(SmartRangeCacheImpl<T> cacheSystem, long requestOffset, long requestLimit,
            long terminationDelay) {
        this.cacheSystem = cacheSystem;
        this.requestOffset = requestOffset;
        this.offset = requestOffset;
        this.requestLimit = requestLimit;
        this.terminationDelay = terminationDelay;
        this.slice = cacheSystem.getSlice();
        this.pageRange = this.slice.newPageRange();

        // Wake up the worker thread if it is waiting for new demand (see runCore)
        this.endpointDemands.addValueChangeListener(event -> {
            logger.info("Slot event on {}: {}", this, event);
            synchronized (this.endpointDemands) {
                this.endpointDemands.notifyAll();
            }
        });
    }

    /**
     * @param offset the offset to look up
     * @return the maximum number of items that may be (re-)fetched starting at {@code offset}
     */
    public long getMaxAllowedRefetchCount(long offset) {
        return this.offsetToMaxAllowedRefetchCount.applyAsLong(offset);
    }

    /**
     * @return the time until the backend answered the first {@code hasNext()},
     *         or {@code null} if the request has not been started yet
     */
    public Duration getFirstItemTime() {
        return this.firstItemTime;
    }

    /**
     * @return processed items per second of recorded processing time; NaN or
     *         infinite while no processing time has been recorded
     */
    public float getThroughput() {
        return ((float) this.numItemsProcessed) / ((float) (this.processingTimeInNanos / 1.0E9d));
    }

    /**
     * Estimates when the worker reaches the given index.
     *
     * @param index the target offset
     * @return the distance from the current offset multiplied by the throughput
     */
    public float etaAtIndex(long index) {
        // NOTE(review): an ETA in seconds would be distance / throughput rather
        // than distance * throughput - confirm intent before relying on this value.
        return ((float) (index - this.offset)) * getThroughput();
    }

    /** Cancels the backend request (if one was started) and releases all claimed pages. */
    protected void closeActual() {
        try {
            if (this.disposable != null) {
                this.disposable.dispose();
            }
        } finally {
            // Release the pages even if cancelling the backend request fails
            this.pageRange.releaseAll();
        }
    }

    /** Starts the backend request at the current offset unless this worker is already closed. */
    protected synchronized void initBackendRequest() {
        if (this.isClosed) {
            return;
        }
        this.iterator = this.cacheSystem.getBackend()
                .apply(Range.atLeast(this.offset))
                .blockingIterable()
                .iterator();
        // The blocking iterator doubles as the handle for cancelling the request
        this.disposable = (Disposable) this.iterator;
    }

    /**
     * Runs the worker until there is nothing left to do and closes it afterwards.
     *
     * @throws RuntimeException wrapping any exception raised during processing
     */
    @Override // java.lang.Runnable
    public void run() {
        try {
            checkpoint();
            if (this.nextCheckpointOffset != this.offset) {
                runCore();
            }
            logger.debug("RangeRequestWorker normal termination");
        } catch (Exception e) {
            logger.error("RangeRequestWorker exceptional termination", e);
            throw new RuntimeException(e);
        } finally {
            close();
        }
    }

    /**
     * Looks for gaps in the slice metadata within the range this worker may
     * fetch next and, if there are any, moves {@link #nextCheckpointOffset}
     * to just past the end of the last gap. Without gaps the checkpoint
     * offset is left unchanged.
     */
    protected void checkpoint() {
        long start = this.offset;
        long remaining = this.requestLimit - this.numItemsProcessed;
        long span = Math.min(getMaxAllowedRefetchCount(start), remaining);
        Range<Long> lookahead = Range.closedOpen(start, LongMath.saturatedAdd(start, span));

        this.slice.computeFromMetaData(false, sliceMetaData -> {
            RangeSet<Long> gaps = sliceMetaData.getGaps(lookahead);
            if (!gaps.isEmpty()) {
                Range<Long> lastGap = gaps.asDescendingSetOfRanges().iterator().next();
                long lastMissingOffset = ContiguousSet.create(lastGap, DiscreteDomain.longs()).last();
                this.nextCheckpointOffset = LongMath.saturatedAdd(lastMissingOffset, 1L);
            }
            return null;
        });
    }

    /**
     * Main loop: starts the backend request and processes batches until the
     * termination delay expired without demand, a checkpoint yields no further
     * work, the backend is exhausted, the known size of the slice is reached,
     * or (in wait mode) all demand has been withdrawn.
     *
     * @throws RuntimeException wrapping any exception raised while processing a batch
     */
    public void runCore() {
        initBackendRequest();
        if (this.iterator == null) {
            // The worker was closed before the backend request could be started
            return;
        }

        Stopwatch firstItemTimer = Stopwatch.createStarted();
        this.iterator.hasNext(); // blocks until the backend responds
        this.firstItemTime = firstItemTimer.elapsed();

        while (true) {
            if (this.terminationTimer.isRunning()
                    && this.terminationTimer.elapsed(TimeUnit.MILLISECONDS) > this.terminationDelay) {
                return;
            }

            if (this.offset == this.nextCheckpointOffset) {
                checkpoint();
                if (this.offset == this.nextCheckpointOffset) {
                    // The checkpoint did not move: nothing left to fetch
                    return;
                }
            }

            try {
                process(this.reportingInterval);

                if (!this.iterator.hasNext()) {
                    return;
                }

                synchronized (this.endpointDemands) {
                    long maxDemand = this.endpointDemands.build();
                    if (this.offset >= maxDemand) {
                        if (this.waitMode) {
                            try {
                                this.endpointDemands.wait(this.terminationDelay);
                            } catch (InterruptedException ignored) {
                                // Treated like a wake-up; the interrupt flag is deliberately
                                // not restored because the blocking backend iterator may
                                // abort the request when it sees it.
                            }
                        } else if (maxDemand < 0 && !this.terminationTimer.isRunning()) {
                            logger.debug("No more demand for data - starting termination timer");
                            this.terminationTimer.start();
                        }
                    } else if (this.terminationTimer.isRunning()) {
                        logger.debug("New demand for data - stopping/resetting termination timer");
                        this.terminationTimer.reset();
                    }
                }

                try (RefFuture<SliceMetaData> metaDataRef = this.slice.getMetaData()) {
                    long knownSize = metaDataRef.await().getKnownSize();
                    if (knownSize >= 0 && this.offset >= knownSize) {
                        return;
                    }
                }

                if (this.waitMode) {
                    synchronized (this.endpointDemands) {
                        if (this.endpointDemands.build() < 0) {
                            return;
                        }
                    }
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /** @return the offset of the next item this worker will write */
    public long getCurrentOffset() {
        return this.offset;
    }

    /** @return the exclusive end offset of the request, saturated at {@link Long#MAX_VALUE} */
    public long getEndOffset() {
        return LongMath.saturatedAdd(this.requestOffset, this.requestLimit);
    }

    /** @return the range from the current offset (inclusive) to the request's end offset (exclusive) */
    public Range<Long> getWorkingRange() {
        return Range.closedOpen(this.offset, getEndOffset());
    }

    /**
     * Reads one batch of items from the backend iterator into the page range,
     * then updates the slice metadata and signals waiting clients.
     *
     * <p>The batch ends when the backend is exhausted, the batch limit is
     * reached, the worker is closed, or the thread is interrupted.
     *
     * @param n the number of offsets, starting at the current offset, to claim for writing
     * @throws Exception if buffering, flushing, or the metadata access fails
     */
    public void process(int n) throws Exception {
        this.pageRange.claimByOffsetRange(this.offset, this.offset + n);

        // NOTE(review): every flush targets this.offset, which only advances after
        // the loop below - presumably fine while a batch never exceeds bulkSize; verify.
        BulkingSink<T> sink = new BulkingSink<>(this.bulkSize,
                (items, from, to) -> this.pageRange.putAll(this.offset, items, from, to));

        // NOTE(review): the batch limit derives from reportingInterval, not from n
        long limit = Math.min(this.reportingInterval, this.nextCheckpointOffset - this.offset);
        if (this.waitMode) {
            limit = Math.min(limit, this.endpointDemands.build() - this.offset);
        }

        boolean hasNext;
        int numRead = 0;
        this.pageRange.lock();
        try {
            while (true) {
                hasNext = this.iterator.hasNext();
                if (!hasNext || numRead >= limit || this.isClosed || Thread.interrupted()) {
                    break;
                }
                numRead++;
                sink.accept(this.iterator.next());
            }
            // Write out the remaining buffered items while the page range is still locked
            sink.flush();
            sink.close();
        } finally {
            this.pageRange.unlock();
        }

        this.numItemsProcessed += numRead;
        this.offset += numRead;

        try (RefFuture<SliceMetaData> metaDataRef = this.slice.getMetaData()) {
            SliceMetaData metaData = metaDataRef.await();
            Lock writeLock = metaData.getReadWriteLock().writeLock();
            writeLock.lock();
            try {
                metaData.setMinimumKnownSize(this.offset);

                // The backend ran dry before the request limit was reached, so the
                // current offset is the size of the data
                if (!hasNext && this.numItemsProcessed < this.requestLimit && metaData.getKnownSize() < 0) {
                    metaData.setKnownSize(this.offset);
                }

                logger.info("Signalling data condition to clients - {} - {} {}",
                        hasNext, this.numItemsProcessed, this.requestLimit);
                metaData.getHasDataCondition().signalAll();
            } finally {
                writeLock.unlock();
            }
        }
    }

    /**
     * Creates a new slot through which a client can announce the offset up to
     * which it needs data.
     *
     * @return a fresh slot registered with this worker's demand aggregation
     */
    public Slot<Long> getEndpointSlot() {
        synchronized (this.endpointDemands) {
            return this.endpointDemands.newSlot();
        }
    }
}
