package org.aksw.commons.rx.cache.range;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.pool.KryoFactory;
import com.esotericsoftware.kryo.pool.KryoPool;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Scheduler;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import com.google.common.collect.TreeRangeMap;
import com.google.common.collect.TreeRangeSet;
import com.google.common.util.concurrent.MoreExecutors;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Single;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.file.Path;
import java.time.Duration;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.aksw.commons.rx.range.KeyObjectStore;
import org.aksw.commons.rx.range.KeyObjectStoreImpl;
import org.aksw.commons.rx.range.ObjectFileStoreKyro;
import org.aksw.commons.rx.range.RangedSupplier;
import org.aksw.commons.util.range.RangeBuffer;
import org.aksw.commons.util.range.RangeBufferImpl;
import org.aksw.commons.util.ref.RefFuture;
import org.aksw.commons.util.slot.Slot;
import org.aksw.jena_sparql_api.lookup.ListPaginator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/aksw/commons/rx/cache/range/SmartRangeCacheImpl.class */
public class SmartRangeCacheImpl<T> implements ListPaginator<T> {
    private static final Logger logger = LoggerFactory.getLogger(SmartRangeCacheImpl.class);
    protected ListPaginator<T> backend;
    protected int pageSize;
    protected AsyncClaimingCache<Long, RangeBuffer<T>> pageCache;
    protected AsyncClaimingCache<String, Range<Long>> countCache;
    protected long requestLimit;
    protected long terminationDelayInMs;
    protected ExecutorService executorService = MoreExecutors.getExitingExecutorService((ThreadPoolExecutor) Executors.newCachedThreadPool());
    protected Set<RangeRequestExecutor<T>> executors = Collections.synchronizedSet(Sets.newIdentityHashSet());
    protected Set<RequestIterator<T>> activeRequests = Collections.synchronizedSet(Sets.newIdentityHashSet());
    protected ReentrantReadWriteLock executorCreationLock = new ReentrantReadWriteLock(true);
    protected volatile long knownSize = -1;
    protected NavigableMap<Long, Executor> offsetToExecutor = new TreeMap();

    public SmartRangeCacheImpl(ListPaginator<T> listPaginator, KeyObjectStore keyObjectStore, int i, long j, Duration duration, long j2, long j3) {
        this.backend = listPaginator;
        this.pageSize = i;
        this.requestLimit = j2;
        this.terminationDelayInMs = j3;
        this.pageCache = LocalOrderAsyncTest.syncedRangeBuffer(j, duration, keyObjectStore, () -> {
            return new RangeBufferImpl(i);
        });
        this.countCache = AsyncClaimingCache.create(duration, AsyncRefCache.create(Caffeine.newBuilder().scheduler(Scheduler.systemScheduler()).maximumSize(1L), str -> {
            Range range;
            try {
                range = (Range) keyObjectStore.get(Arrays.asList(str));
            } catch (Exception e) {
                range = (Range) listPaginator.fetchCount(null, null).blockingGet();
            }
            return range;
        }, (str2, range, removalCause) -> {
        }), (str3, range2, removalCause2) -> {
            List asList = Arrays.asList(str3);
            try {
                keyObjectStore.put(asList, range2);
                logger.info("Synced " + asList);
                System.out.println("Synced" + asList);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
    }

    public RangedSupplier<Long, T> getBackend() {
        return this.backend;
    }

    public void setKnownSize(long j) {
        this.knownSize = j;
    }

    public int getPageSize() {
        return this.pageSize;
    }

    protected void onPageLoad(Long l, RangeBufferImpl<T> rangeBufferImpl) {
    }

    public Set<RangeRequestExecutor<T>> getExecutors() {
        return this.executors;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Lock getExecutorCreationReadLock() {
        return this.executorCreationLock.readLock();
    }

    public long getPageIdForOffset(long j) {
        return j / this.pageSize;
    }

    public RefFuture<RangeBuffer<T>> getPageForOffset(long j) {
        return getPageForPageId(getPageIdForOffset(j));
    }

    public RefFuture<RangeBuffer<T>> getPageForPageId(long j) {
        try {
            return this.pageCache.claim(Long.valueOf(j));
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    public int getIndexInPageForOffset(long j) {
        return (int) (j % this.pageSize);
    }

    public Runnable register(RequestIterator<T> requestIterator) {
        this.activeRequests.add(requestIterator);
        return () -> {
            this.activeRequests.remove(requestIterator);
        };
    }

    public Map.Entry<RangeRequestExecutor<T>, Slot<Long>> newExecutor(long j, long j2) {
        RangeRequestExecutor<T> rangeRequestExecutor = new RangeRequestExecutor<>(this, j, this.requestLimit, this.terminationDelayInMs);
        Slot<Long> endpointSlot = rangeRequestExecutor.getEndpointSlot();
        endpointSlot.set(Long.valueOf(j + j2));
        this.executors.add(rangeRequestExecutor);
        PrintStream printStream = System.out;
        printStream.println("NEW WORKER: " + j + ":" + printStream);
        this.executorService.submit(rangeRequestExecutor);
        return new AbstractMap.SimpleEntry(rangeRequestExecutor, endpointSlot);
    }

    public RequestIterator<T> request(Range<Long> range) {
        return new RequestIterator<>(this, range);
    }

    @Override // java.util.function.Function
    public Flowable<T> apply(Range<Long> range) {
        return Flowable.generate(() -> {
            return request(range);
        }, (requestIterator, emitter) -> {
            if (requestIterator.hasNext()) {
                emitter.onNext(requestIterator.next());
            } else {
                emitter.onComplete();
            }
        }, (v0) -> {
            v0.close();
        });
    }

    public static <V> ListPaginator<V> wrap(ListPaginator<V> listPaginator, KeyObjectStore keyObjectStore, int i, long j, Duration duration, long j2, long j3) {
        return new SmartRangeCacheImpl(listPaginator, keyObjectStore, i, j, duration, j2, j3);
    }

    @Override // org.aksw.jena_sparql_api.lookup.ListPaginator
    public Single<Range<Long>> fetchCount(Long l, Long l2) {
        return Flowable.generate(() -> {
            return new AbstractMap.SimpleEntry(this.countCache.claim("count"), false);
        }, (simpleEntry, emitter) -> {
            try {
                if (((Boolean) simpleEntry.getValue()).booleanValue()) {
                    emitter.onComplete();
                } else {
                    emitter.onNext((Range) ((RefFuture) simpleEntry.getKey()).await());
                    simpleEntry.setValue(true);
                }
            } catch (Exception e) {
                emitter.onError(e);
            }
        }, simpleEntry2 -> {
            ((RefFuture) simpleEntry2.getKey()).close();
        }).singleOrError();
    }

    public static KeyObjectStore createKeyObjectStore(Path path) {
        return KeyObjectStoreImpl.create(path, new ObjectFileStoreKyro(new KryoPool.Builder(new KryoFactory() { // from class: org.aksw.commons.rx.cache.range.SmartRangeCacheImpl.1
            public Kryo create() {
                Kryo kryo = new Kryo();
                JavaSerializer javaSerializer = new JavaSerializer();
                RangeSetSerializer rangeSetSerializer = new RangeSetSerializer();
                RangeMapSerializer rangeMapSerializer = new RangeMapSerializer();
                kryo.register(TreeRangeSet.class, rangeSetSerializer);
                kryo.register(TreeRangeMap.class, rangeMapSerializer);
                kryo.register(Range.class, javaSerializer);
                return kryo;
            }
        }).softReferences().build()));
    }
}
