package org.aksw.commons.rx.cache.range;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.pool.KryoFactory;
import com.esotericsoftware.kryo.pool.KryoPool;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import com.google.common.collect.TreeRangeMap;
import com.google.common.collect.TreeRangeSet;
import com.google.common.math.LongMath;
import com.google.common.util.concurrent.MoreExecutors;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Single;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.AbstractMap;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.locks.Lock;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.aksw.commons.kyro.guava.EntrySerializer;
import org.aksw.commons.kyro.guava.RangeMapSerializer;
import org.aksw.commons.kyro.guava.RangeSetSerializer;
import org.aksw.commons.rx.lookup.ListPaginator;
import org.aksw.commons.rx.range.RangedSupplier;
import org.aksw.commons.store.object.key.api.KeyObjectStore;
import org.aksw.commons.store.object.key.impl.KeyObjectStoreImpl;
import org.aksw.commons.store.object.path.impl.ObjectFileStoreKyro;
import org.aksw.commons.util.range.BufferWithGeneration;
import org.aksw.commons.util.range.CountInfo;
import org.aksw.commons.util.range.RangeBuffer;
import org.aksw.commons.util.range.RangeUtils;
import org.aksw.commons.util.slot.Slot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/aksw/commons/rx/cache/range/SmartRangeCacheImpl.class */
public class SmartRangeCacheImpl<T> implements ListPaginator<T> {
    private static final Logger logger = LoggerFactory.getLogger(SmartRangeCacheImpl.class);
    protected ListPaginator<T> backend;
    protected SliceWithPages<T> slice;
    protected Single<Range<Long>> countSingle;
    protected long requestLimit;
    protected long terminationDelayInMs;
    protected ExecutorService executorService = MoreExecutors.getExitingExecutorService((ThreadPoolExecutor) Executors.newCachedThreadPool());
    protected Set<RangeRequestWorker<T>> executors = Collections.synchronizedSet(Sets.newIdentityHashSet());
    protected Set<RangeRequestIterator<T>> activeRequests = Collections.synchronizedSet(Sets.newIdentityHashSet());

    public SmartRangeCacheImpl(ListPaginator<T> listPaginator, KeyObjectStore keyObjectStore, int i, long j, Duration duration, long j2, long j3) {
        this.backend = listPaginator;
        this.slice = new SliceWithPagesImpl(keyObjectStore, i, j, duration);
        this.requestLimit = j2;
        this.terminationDelayInMs = j3;
        this.countSingle = listPaginator.fetchCount(null, null).map(range -> {
            CountInfo countInfo = RangeUtils.toCountInfo(range);
            if (!countInfo.isHasMoreItems()) {
                long count = countInfo.getCount();
                this.slice.mutateMetaData(sliceMetaData -> {
                    sliceMetaData.setKnownSize(count);
                });
            }
            return range;
        }).cache();
    }

    public RangedSupplier<Long, T> getBackend() {
        return this.backend;
    }

    public SliceWithPages<T> getSlice() {
        return this.slice;
    }

    public Set<RangeRequestWorker<T>> getExecutors() {
        return this.executors;
    }

    Lock getExecutorCreationReadLock() {
        return this.slice.getWorkerCreationLock();
    }

    public Runnable register(RangeRequestIterator<T> rangeRequestIterator) {
        this.activeRequests.add(rangeRequestIterator);
        return () -> {
            this.activeRequests.remove(rangeRequestIterator);
        };
    }

    public Map.Entry<RangeRequestWorker<T>, Slot<Long>> newExecutor(long j, long j2) {
        RangeRequestWorker<T> rangeRequestWorker = new RangeRequestWorker<>(this, j, this.requestLimit, this.terminationDelayInMs);
        Slot<Long> endpointSlot = rangeRequestWorker.getEndpointSlot();
        endpointSlot.set(Long.valueOf(j + j2));
        this.executors.add(rangeRequestWorker);
        Logger logger2 = logger;
        logger2.debug("NEW WORKER: " + j + ":" + logger2);
        this.executorService.submit(rangeRequestWorker);
        return new AbstractMap.SimpleEntry(rangeRequestWorker, endpointSlot);
    }

    public RangeRequestIterator<T> request(Range<Long> range) {
        return new RangeRequestIterator<>(this, range);
    }

    @Override // java.util.function.Function
    public Flowable<T> apply(Range<Long> range) {
        return Flowable.generate(() -> {
            return request(range);
        }, (rangeRequestIterator, emitter) -> {
            if (rangeRequestIterator.hasNext()) {
                emitter.onNext(rangeRequestIterator.next());
            } else {
                emitter.onComplete();
            }
        }, (v0) -> {
            v0.close();
        });
    }

    public static <V> SmartRangeCacheImpl<V> wrap(ListPaginator<V> listPaginator, KeyObjectStore keyObjectStore, int i, long j, Duration duration, long j2, long j3) {
        return new SmartRangeCacheImpl<>(listPaginator, keyObjectStore, i, j, duration, j2, j3);
    }

    @Override // org.aksw.commons.rx.lookup.ListPaginator
    public Single<Range<Long>> fetchCount(Long l, Long l2) {
        long longValue = ((Long) this.slice.computeFromMetaData(false, (v0) -> {
            return v0.getKnownSize();
        })).longValue();
        return longValue >= 0 ? Single.just(Range.singleton(Long.valueOf(longValue))) : this.countSingle;
    }

    public static KryoPool createKyroPool(final Consumer<Kryo> consumer) {
        return new KryoPool.Builder(new KryoFactory() { // from class: org.aksw.commons.rx.cache.range.SmartRangeCacheImpl.1
            public Kryo create() {
                Kryo kryo = new Kryo();
                JavaSerializer javaSerializer = new JavaSerializer();
                RangeSetSerializer rangeSetSerializer = new RangeSetSerializer();
                RangeMapSerializer rangeMapSerializer = new RangeMapSerializer();
                EntrySerializer entrySerializer = new EntrySerializer();
                kryo.register(TreeRangeSet.class, rangeSetSerializer);
                kryo.register(TreeRangeMap.class, rangeMapSerializer);
                kryo.register(Range.class, javaSerializer);
                kryo.register(AbstractMap.SimpleEntry.class, entrySerializer);
                if (consumer != null) {
                    consumer.accept(kryo);
                }
                return kryo;
            }
        }).softReferences().build();
    }

    public static KeyObjectStore createKeyObjectStore(Path path, KryoPool kryoPool) {
        return KeyObjectStoreImpl.create(path, new ObjectFileStoreKyro(kryoPool));
    }

    public static <T> Deque<Range<Long>> computeGaps(Range<Long> range, long j, NavigableMap<Long, RangeBuffer<T>> navigableMap) {
        TreeRangeSet create = TreeRangeSet.create();
        Stream<R> flatMap = navigableMap.entrySet().stream().flatMap(entry -> {
            return ((RangeBuffer) entry.getValue()).getLoadedRanges().asRanges().stream().map(range2 -> {
                return RangeUtils.apply(range2, Long.valueOf(((Long) entry.getKey()).longValue() * j), (l, l2) -> {
                    return Long.valueOf(LongMath.saturatedAdd(l.longValue(), l2.longValue()));
                });
            });
        });
        Objects.requireNonNull(create);
        flatMap.forEach(create::add);
        return new ArrayDeque(RangeUtils.gaps(range, create).asRanges());
    }

    public static <V> AsyncClaimingCache<Long, Map.Entry<BufferWithGeneration<V>, Long>> syncedBuffer(long j, Duration duration, KeyObjectStore keyObjectStore, Supplier<BufferWithGeneration<V>> supplier, Runnable runnable) {
        return AsyncClaimingCacheImpl.create(duration, Caffeine.newBuilder().maximumSize(j), l -> {
            List asList = Arrays.asList(Long.toString(l.longValue()));
            try {
                Objects.requireNonNull(supplier);
                BufferWithGeneration bufferWithGeneration = (BufferWithGeneration) keyObjectStore.computeIfAbsent(asList, supplier::get);
                return new AbstractMap.SimpleEntry(bufferWithGeneration, Long.valueOf(bufferWithGeneration.getGeneration()));
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }, (l2, entry, removalCause) -> {
        }, (l3, entry2, removalCause2) -> {
            BufferWithGeneration bufferWithGeneration = (BufferWithGeneration) entry2.getKey();
            Long l3 = (Long) entry2.getValue();
            Iterable<String> asList = Arrays.asList(Long.toString(l3.longValue()));
            Lock readLock = bufferWithGeneration.getReadWriteLock().readLock();
            readLock.lock();
            if (l3 == null) {
                logger.error("Missing version for [" + l3 + "]");
            }
            long generation = bufferWithGeneration.getGeneration();
            try {
                try {
                    if (generation != l3.longValue()) {
                        logger.info("Syncing metadata");
                        runnable.run();
                        logger.info("Syncing dirty buffer " + asList);
                        keyObjectStore.put(asList, bufferWithGeneration);
                        entry2.setValue(Long.valueOf(generation));
                    } else {
                        logger.info("Syncing not needed because of clean buffer " + asList);
                    }
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            } finally {
                readLock.unlock();
            }
        });
    }
}
