package org.aksw.commons.io.slice;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Range;
import com.google.common.collect.RangeMap;
import com.google.common.collect.RangeSet;
import com.google.common.collect.TreeRangeMap;
import com.google.common.collect.TreeRangeSet;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.IOException;
import java.text.DecimalFormat;
import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.LongFunction;
import org.aksw.commons.cache.async.AsyncClaimingCache;
import org.aksw.commons.cache.async.AsyncClaimingCacheImpl;
import org.aksw.commons.collection.rangeset.RangeSetDelegateMutable;
import org.aksw.commons.collection.rangeset.RangeSetDelegateMutableImpl;
import org.aksw.commons.collection.rangeset.RangeSetOps;
import org.aksw.commons.collection.rangeset.RangeSetUnion;
import org.aksw.commons.io.buffer.array.ArrayBuffer;
import org.aksw.commons.io.buffer.array.ArrayOps;
import org.aksw.commons.io.buffer.plain.Buffer;
import org.aksw.commons.io.buffer.plain.BufferDelegate;
import org.aksw.commons.io.buffer.plain.PagedBuffer;
import org.aksw.commons.io.buffer.range.RangeBuffer;
import org.aksw.commons.io.buffer.range.RangeBufferDelegateMutable;
import org.aksw.commons.io.buffer.range.RangeBufferDelegateMutableImpl;
import org.aksw.commons.io.buffer.range.RangeBufferImpl;
import org.aksw.commons.io.buffer.range.RangeBufferUnion;
import org.aksw.commons.io.util.Sync;
import org.aksw.commons.path.core.Path;
import org.aksw.commons.store.object.key.api.ObjectResource;
import org.aksw.commons.store.object.key.api.ObjectStore;
import org.aksw.commons.store.object.key.api.ObjectStoreConnection;
import org.aksw.commons.store.object.key.impl.KryoUtils;
import org.aksw.commons.store.object.key.impl.ObjectStoreImpl;
import org.aksw.commons.store.object.path.impl.ObjectSerializerKryo;
import org.aksw.commons.txn.api.TxnApi;
import org.aksw.commons.txn.impl.PathDiffState;
import org.aksw.commons.txn.impl.PathState;
import org.aksw.commons.util.lock.LockUtils;
import org.aksw.commons.util.page.PageUtils;
import org.aksw.commons.util.ref.RefFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/aksw/commons/io/slice/SliceBufferNew.class */
public class SliceBufferNew<A> implements SliceWithPages<A>, Sync {
    protected ObjectStore objectStore;
    protected Path<String> objectStoreBasePath;
    protected ArrayOps<A> arrayOps;
    protected AsyncClaimingCache<Long, BufferView<A>> pageCache;
    protected int pageSize;
    protected RangeSetDelegateMutable<Long> baseRanges;
    protected SliceMetaDataWithPages baseMetaData;
    protected PathDiffState baseMetaDataStatus;
    protected SliceMetaDataWithPages liveMetaData;
    protected SliceMetaDataWithPages syncMetaData;
    protected PathState metaDataIdentity;
    protected Duration syncDelay;
    protected LongFunction<String> pageIdToFileName;
    protected Logger logger = LoggerFactory.getLogger(SliceBufferNew.class);
    protected ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    protected Condition hasDataCondition = this.readWriteLock.writeLock().newCondition();
    protected volatile ScheduledFuture<?> syncFuture = null;
    protected int liveGeneration = 0;
    protected RangeBufferDelegateMutable<A> liveChanges = new RangeBufferDelegateMutableImpl();
    protected ScheduledExecutorService syncScheduler = MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
    protected RangeBufferDelegateMutable<A> syncChanges = new RangeBufferDelegateMutableImpl();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/aksw/commons/io/slice/SliceBufferNew$BufferWithAutoReloadOnAccess.class */
    public class BufferWithAutoReloadOnAccess implements BufferDelegate<A> {
        protected String fileName;
        protected int generationHere = -1;
        protected CompletableFuture<Buffer<A>> future;

        public void setFuture(CompletableFuture<Buffer<A>> completableFuture) {
            this.future = completableFuture;
        }

        public BufferWithAutoReloadOnAccess(String str) {
            this.fileName = str;
        }

        public int getGenerationHere() {
            return this.generationHere;
        }

        public CompletableFuture<Buffer<A>> reloadIfNeeded() {
            int i = SliceBufferNew.this.liveGeneration;
            if (this.generationHere != i || this.future == null) {
                synchronized (this) {
                    if (this.generationHere != i || this.future == null) {
                        ObjectStoreConnection connection = SliceBufferNew.this.objectStore.getConnection();
                        connection.begin(false);
                        this.generationHere = SliceBufferNew.this.lockAndSyncMetaData(connection, SliceBufferNew.this.pageSize);
                        ObjectResource access = connection.access(SliceBufferNew.this.objectStoreBasePath.resolve(this.fileName));
                        this.future = CompletableFuture.supplyAsync(() -> {
                            Object loadNewInstance = access.loadNewInstance();
                            if (loadNewInstance == null) {
                                loadNewInstance = SliceBufferNew.this.arrayOps.create(SliceBufferNew.this.pageSize);
                            }
                            return ArrayBuffer.create(SliceBufferNew.this.arrayOps, loadNewInstance);
                        }).whenComplete((buffer, th) -> {
                            try {
                                connection.commit();
                                try {
                                    connection.close();
                                } catch (Exception e) {
                                    e.printStackTrace();
                                    throw new RuntimeException(e);
                                }
                            } catch (Throwable th) {
                                try {
                                    connection.close();
                                    throw th;
                                } catch (Exception e2) {
                                    e2.printStackTrace();
                                    throw new RuntimeException(e2);
                                }
                            }
                        });
                        return this.future;
                    }
                }
            }
            return this.future;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void updateIfNeeded(int i, ObjectStoreConnection objectStoreConnection) {
            if (this.generationHere != i || this.future == null) {
                Object loadNewInstance = objectStoreConnection.access(SliceBufferNew.this.objectStoreBasePath.resolve(this.fileName)).loadNewInstance();
                if (loadNewInstance == null) {
                    loadNewInstance = SliceBufferNew.this.arrayOps.create(SliceBufferNew.this.pageSize);
                }
                this.future = CompletableFuture.completedFuture(ArrayBuffer.create(SliceBufferNew.this.arrayOps, loadNewInstance));
                this.generationHere = i;
            }
        }

        @Override // org.aksw.commons.io.buffer.array.BufferLikeDelegate
        public Buffer<A> getDelegate() {
            try {
                return this.future.get();
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/aksw/commons/io/slice/SliceBufferNew$InternalBufferView.class */
    public class InternalBufferView implements BufferView<A> {
        protected SliceBufferNew<A>.BufferWithAutoReloadOnAccess baseBuffer;
        protected RangeBuffer<A> rangeBufferView;

        public InternalBufferView(SliceBufferNew<A>.BufferWithAutoReloadOnAccess bufferWithAutoReloadOnAccess, RangeBuffer<A> rangeBuffer) {
            this.baseBuffer = bufferWithAutoReloadOnAccess;
            this.rangeBufferView = rangeBuffer;
        }

        public SliceBufferNew<A>.BufferWithAutoReloadOnAccess getBaseBuffer() {
            return this.baseBuffer;
        }

        @Override // org.aksw.commons.io.slice.BufferView
        public RangeBuffer<A> getRangeBuffer() {
            return this.rangeBufferView;
        }

        @Override // org.aksw.commons.io.slice.BufferView
        public long getGeneration() {
            return this.baseBuffer.getGenerationHere();
        }

        public String toString() {
            return this.rangeBufferView.toString();
        }

        @Override // org.aksw.commons.io.slice.BufferView
        public long getCapacity() {
            return SliceBufferNew.this.pageSize;
        }

        @Override // org.aksw.commons.io.slice.BufferView
        public ReadWriteLock getReadWriteLock() {
            return SliceBufferNew.this.readWriteLock;
        }
    }

    protected void scheduleSync() {
        if (this.syncFuture == null || this.syncFuture.isDone()) {
            this.logger.info("Scheduled sync of slice in " + this.syncDelay);
            this.syncFuture = this.syncScheduler.schedule(() -> {
                sync();
                return null;
            }, this.syncDelay.toMillis(), TimeUnit.MILLISECONDS);
        }
    }

    @Override // org.aksw.commons.io.slice.SliceWithAutoSync
    public ReadWriteLock getReadWriteLock() {
        return this.readWriteLock;
    }

    @Override // org.aksw.commons.io.slice.SliceMetaDataBasic
    public void setMinimumKnownSize(long j) {
        this.liveMetaData.setMinimumKnownSize(j);
        scheduleSync();
    }

    @Override // org.aksw.commons.io.slice.SliceMetaDataBasic
    public void setMaximumKnownSize(long j) {
        this.liveMetaData.setMaximumKnownSize(j);
        scheduleSync();
    }

    public SliceBufferNew(ArrayOps<A> arrayOps, ObjectStore objectStore, Path<String> path, int i, Duration duration) {
        this.arrayOps = arrayOps;
        this.objectStore = objectStore;
        this.objectStoreBasePath = path;
        this.syncDelay = duration;
        loadMetaData(i);
        this.syncChanges.setDelegate(newChangeBuffer());
    }

    public void loadMetaData(int i) {
        this.baseRanges = new RangeSetDelegateMutableImpl();
        try {
            ObjectStoreConnection connection = this.objectStore.getConnection();
            try {
                TxnApi.execRead(connection, () -> {
                    lockAndSyncMetaData(connection, i);
                });
                if (connection != null) {
                    connection.close();
                }
                this.pageSize = this.baseMetaData.getPageSize();
                this.liveChanges.setDelegate(newChangeBuffer());
                this.liveMetaData = copyWithNewRanges(this.baseMetaData, RangeSetOps.union(this.liveChanges.getRanges(), this.baseRanges));
                DecimalFormat decimalFormat = new DecimalFormat();
                decimalFormat.setMinimumIntegerDigits(8);
                decimalFormat.setGroupingUsed(false);
                this.pageIdToFileName = j -> {
                    return "segment-" + decimalFormat.format(j) + ".ser";
                };
                this.pageCache = AsyncClaimingCacheImpl.newBuilder(Caffeine.newBuilder()).setCacheLoader((v1) -> {
                    return loadPage(v1);
                }).build();
            } finally {
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static <A> SliceBufferNew<A> create(ArrayOps<A> arrayOps, ObjectStore objectStore, Path<String> path, int i, Duration duration) {
        return new SliceBufferNew<>(arrayOps, objectStore, path, i, duration);
    }

    public static <A> SliceBufferNew<A> create(ArrayOps<A> arrayOps, java.nio.file.Path path, Path<String> path2, int i, Duration duration) {
        return new SliceBufferNew<>(arrayOps, ObjectStoreImpl.create(path, ObjectSerializerKryo.create(KryoUtils.createKryoPool((Consumer) null))), path2, i, duration);
    }

    protected RangeBuffer<A> newChangeBuffer() {
        return RangeBufferImpl.create(PagedBuffer.create(this.arrayOps, this.pageSize));
    }

    @Override // org.aksw.commons.io.slice.SliceWithAutoSync
    public SliceAccessor<A> newSliceAccessor() {
        return new SliceAccessorImpl(this);
    }

    @Override // org.aksw.commons.io.slice.SliceWithPages
    public RefFuture<BufferView<A>> getPageForPageId(long j) {
        return this.pageCache.claim(Long.valueOf(j));
    }

    @Override // org.aksw.commons.io.slice.SliceMetaDataBasic
    public RangeSet<Long> getGaps(Range<Long> range) {
        return this.liveMetaData.getGaps(range);
    }

    public boolean hasMetaDataChanged() {
        PathDiffState fetchRecencyStatus = this.objectStore.fetchRecencyStatus(this.objectStoreBasePath.resolve("metadata.ser"));
        boolean z = !fetchRecencyStatus.equals(this.baseMetaDataStatus);
        this.logger.debug("Metadata changed: " + z + "; status now: " + fetchRecencyStatus + " - before: " + this.baseMetaDataStatus);
        return z;
    }

    public int lockAndSyncMetaData(ObjectStoreConnection objectStoreConnection, int i) {
        int intValue;
        ObjectResource access = objectStoreConnection.access(this.objectStoreBasePath.resolve("metadata.ser"));
        if (hasMetaDataChanged()) {
            this.logger.info("Metadata was externally modified on disk... attempting to acquire lock for reload...");
            intValue = ((Integer) LockUtils.runWithLock(this.readWriteLock.writeLock(), () -> {
                PathDiffState fetchRecencyStatus = access.fetchRecencyStatus();
                SliceMetaDataWithPages sliceMetaDataWithPages = (SliceMetaDataWithPages) access.loadNewInstance();
                this.logger.info("Lock for reload acquired");
                if (sliceMetaDataWithPages != null) {
                    this.logger.info("Loaded metadata: " + sliceMetaDataWithPages);
                    this.baseMetaData = sliceMetaDataWithPages;
                } else {
                    this.logger.info("Created fresh slice metadata");
                    this.baseMetaData = new SliceMetaDataWithPagesImpl(i);
                }
                this.baseMetaDataStatus = fetchRecencyStatus;
                this.baseRanges.setDelegate(this.baseMetaData.getLoadedRanges());
                this.liveGeneration++;
                return Integer.valueOf(this.liveGeneration);
            })).intValue();
        } else {
            intValue = ((Integer) LockUtils.runWithLock(this.readWriteLock.readLock(), () -> {
                return Integer.valueOf(this.liveGeneration);
            })).intValue();
        }
        return intValue;
    }

    public BufferView<A> loadPage(long j) {
        BufferWithAutoReloadOnAccess bufferWithAutoReloadOnAccess = new BufferWithAutoReloadOnAccess(this.pageIdToFileName.apply(j));
        bufferWithAutoReloadOnAccess.reloadIfNeeded().whenComplete((buffer, th) -> {
            if (th != null) {
                this.logger.error("Reloading buffer failed", th);
            }
        });
        long pageOffsetForId = PageUtils.getPageOffsetForId(j, this.pageSize);
        return new InternalBufferView(bufferWithAutoReloadOnAccess, RangeBufferUnion.create(this.liveChanges.slice(pageOffsetForId, this.pageSize), RangeBufferUnion.create(RangeBufferImpl.create(this.baseRanges, pageOffsetForId, bufferWithAutoReloadOnAccess), this.syncChanges.slice(pageOffsetForId, this.pageSize))));
    }

    @Override // org.aksw.commons.io.slice.SliceMetaDataBasic
    public SliceWithAutoSync<A> updateMinimumKnownSize(long j) {
        this.liveMetaData.updateMinimumKnownSize(j);
        return this;
    }

    @Override // org.aksw.commons.io.slice.SliceMetaDataBasic
    public SliceWithAutoSync<A> updateMaximumKnownSize(long j) {
        this.liveMetaData.updateMaximumKnownSize(j);
        return this;
    }

    public BufferView<A> loadPages(boolean z, Set<Long> set) {
        throw new UnsupportedOperationException();
    }

    @Override // org.aksw.commons.io.slice.SliceWithAutoSync
    public ArrayOps<A> getArrayOps() {
        return this.arrayOps;
    }

    public static SliceMetaDataWithPages copyWithNewRanges(SliceMetaDataWithPages sliceMetaDataWithPages, RangeSet<Long> rangeSet) {
        return new SliceMetaDataWithPagesImpl(sliceMetaDataWithPages.getPageSize(), rangeSet, sliceMetaDataWithPages.getFailedRanges(), sliceMetaDataWithPages.getMinimumKnownSize(), sliceMetaDataWithPages.getMaximumKnownSize());
    }

    @Override // org.aksw.commons.io.slice.SliceWithAutoSync
    public synchronized void sync() throws IOException {
        Collections.emptySet();
        Stopwatch createStarted = Stopwatch.createStarted();
        LockUtils.runWithLock(this.readWriteLock.writeLock(), () -> {
            this.syncChanges.setDelegate(this.liveChanges.getDelegate());
            this.liveChanges.setDelegate(newChangeBuffer());
            RangeSetUnion union = RangeSetOps.union(this.liveChanges.getRanges(), RangeSetOps.union(this.syncChanges.getRanges(), this.baseRanges));
            this.syncMetaData = this.liveMetaData;
            this.liveMetaData = new SliceMetaDataWithPagesImpl(this.syncMetaData.getPageSize(), union, TreeRangeMap.create(), this.syncMetaData.getMinimumKnownSize(), this.syncMetaData.getMaximumKnownSize());
        });
        int i = this.liveGeneration;
        try {
            ObjectStoreConnection connection = this.objectStore.getConnection();
            try {
                connection.begin(true);
                ObjectResource access = connection.access(this.objectStoreBasePath.resolve("metadata.ser"));
                int lockAndSyncMetaData = lockAndSyncMetaData(connection, this.pageSize);
                TreeRangeSet create = TreeRangeSet.create();
                create.addAll(this.baseRanges);
                create.addAll(this.syncChanges.getRanges());
                SliceMetaDataWithPages copyWithNewRanges = copyWithNewRanges(this.syncMetaData, create);
                if (!copyWithNewRanges.equals(this.baseMetaData)) {
                    access.save(copyWithNewRanges);
                }
                NavigableSet navigableSet = PageUtils.touchedPageIndices(this.syncChanges.getRanges().asRanges(), this.pageSize);
                this.logger.info("Synchronizing " + navigableSet.size() + " dirty pages");
                Iterator it = navigableSet.iterator();
                while (it.hasNext()) {
                    long longValue = ((Long) it.next()).longValue();
                    String apply = this.pageIdToFileName.apply(longValue);
                    RefFuture claim = this.pageCache.claim(Long.valueOf(longValue));
                    try {
                        long pageOffsetForId = PageUtils.getPageOffsetForId(longValue, this.pageSize);
                        SliceBufferNew<A>.BufferWithAutoReloadOnAccess baseBuffer = ((InternalBufferView) claim.await()).getBaseBuffer();
                        LockUtils.runWithLock(getReadWriteLock().writeLock(), () -> {
                            baseBuffer.updateIfNeeded(lockAndSyncMetaData, connection);
                        });
                        RangeBufferUnion create2 = RangeBufferUnion.create(this.syncChanges.slice(pageOffsetForId, this.pageSize), RangeBufferImpl.create(this.baseRanges, pageOffsetForId, baseBuffer));
                        A create3 = this.arrayOps.create(this.pageSize);
                        ArrayBuffer create4 = ArrayBuffer.create(this.arrayOps, create3);
                        RangeBufferImpl create5 = RangeBufferImpl.create(create4);
                        LockUtils.runWithLock(getReadWriteLock().readLock(), () -> {
                            create2.transferTo(0L, create5, 0L, this.pageSize);
                        });
                        connection.access(this.objectStoreBasePath.resolve(apply)).save(create3);
                        LockUtils.runWithLock(getReadWriteLock().writeLock(), () -> {
                            baseBuffer.setFuture(CompletableFuture.completedFuture(create4));
                        });
                        if (claim != null) {
                            claim.close();
                        }
                    } catch (Throwable th) {
                        if (claim != null) {
                            try {
                                claim.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        }
                        throw th;
                    }
                }
                connection.commit();
                LockUtils.runWithLock(this.readWriteLock.writeLock(), () -> {
                    this.baseRanges.setDelegate(create);
                    this.syncChanges.setDelegate(newChangeBuffer());
                    this.baseMetaData = copyWithNewRanges;
                    this.liveMetaData = copyWithNewRanges(this.liveMetaData, RangeSetOps.union(this.liveChanges.getRanges(), this.baseRanges));
                });
                if (connection != null) {
                    connection.close();
                }
                this.logger.info("Synchronization of " + navigableSet.size() + " dirty pages completed in " + (((float) createStarted.elapsed(TimeUnit.MILLISECONDS)) / 1000.0f) + " seconds");
            } finally {
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override // org.aksw.commons.io.slice.SliceMetaDataBasic
    public RangeSet<Long> getLoadedRanges() {
        return this.liveMetaData.getLoadedRanges();
    }

    @Override // org.aksw.commons.io.slice.SliceMetaDataBasic
    public long getMinimumKnownSize() {
        return this.liveMetaData.getMinimumKnownSize();
    }

    @Override // org.aksw.commons.io.slice.SliceMetaDataBasic
    public long getMaximumKnownSize() {
        return this.liveMetaData.getMaximumKnownSize();
    }

    @Override // org.aksw.commons.io.slice.SliceWithAutoSync
    public Condition getHasDataCondition() {
        return this.hasDataCondition;
    }

    @Override // org.aksw.commons.io.slice.SliceWithPages
    public long getPageSize() {
        return this.pageSize;
    }

    @Override // org.aksw.commons.io.slice.SliceMetaDataBasic
    public RangeMap<Long, List<Throwable>> getFailedRanges() {
        return this.liveMetaData.getFailedRanges();
    }
}
