/*
 * Decompiled with CFR 0.152.
 */
package org.aksw.commons.rx.op;

import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.FlowableOperator;
import io.reactivex.rxjava3.core.FlowableSubscriber;
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
import io.reactivex.rxjava3.internal.util.BackpressureHelper;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.aksw.commons.rx.op.LocalOrderBase;
import org.aksw.commons.rx.op.LocalOrderSpec;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class OperatorLocalOrder<T, S>
extends LocalOrderBase<T, S>
implements FlowableOperator<T, T> {
    private static final Logger logger = LoggerFactory.getLogger(OperatorLocalOrder.class);
    protected S initialExpectedSeqId;

    public OperatorLocalOrder(S initialExpectedSeqId, Function<? super S, ? extends S> incrementSeqId, BiFunction<? super S, ? super S, ? extends Number> distanceFn, Function<? super T, ? extends S> extractSeqId) {
        super(incrementSeqId, distanceFn, extractSeqId);
        this.initialExpectedSeqId = initialExpectedSeqId;
    }

    public OperatorLocalOrder(S initialExpectedSeqId, LocalOrderSpec<T, S> localOrderSpec) {
        super(localOrderSpec);
        this.initialExpectedSeqId = initialExpectedSeqId;
    }

    public static <T> OperatorLocalOrder<T, Long> forLong(long initiallyExpectedId, Function<? super T, ? extends Long> extractSeqId) {
        return new OperatorLocalOrder<T, Long>(Long.valueOf(initiallyExpectedId), id -> id + 1L, (a, b) -> a - b, extractSeqId);
    }

    public static <T, S extends Comparable<S>> OperatorLocalOrder<T, S> wrap(S initiallyExpectedId, Function<? super S, ? extends S> incrementSeqId, BiFunction<? super S, ? super S, ? extends Number> distanceFn, Function<? super T, ? extends S> extractSeqId) {
        return new OperatorLocalOrder<T, S>(initiallyExpectedId, incrementSeqId, distanceFn, extractSeqId);
    }

    public static <T, S extends Comparable<S>> FlowableOperator<T, T> create(S initialExpectedSeqId, LocalOrderSpec<T, S> orderSpec) {
        return new OperatorLocalOrder<T, S>(initialExpectedSeqId, orderSpec);
    }

    public static <T, S extends Comparable<S>> FlowableOperator<T, T> create(S initialExpectedSeqId, Function<? super S, ? extends S> incrementSeqId, BiFunction<? super S, ? super S, ? extends Number> distanceFn, Function<? super T, ? extends S> extractSeqId) {
        return new OperatorLocalOrder<T, S>(initialExpectedSeqId, incrementSeqId, distanceFn, extractSeqId);
    }

    @NonNull
    public @NonNull Subscriber<? super @NonNull T> apply(@NonNull @NonNull Subscriber<? super @NonNull T> downstream) throws Throwable {
        return new SubscriberImpl(downstream);
    }

    public class SubscriberImpl
    implements FlowableSubscriber<T>,
    Subscription {
        protected Subscriber<? super T> downstream;
        protected Subscription upstream;
        protected volatile boolean isUpstreamComplete = false;
        protected S expectedSeqId;
        protected ConcurrentNavigableMap<S, T> seqIdToValue;
        protected AtomicLong pending;

        public SubscriberImpl(Subscriber<? super T> downstream) {
            this.expectedSeqId = OperatorLocalOrder.this.initialExpectedSeqId;
            this.seqIdToValue = new ConcurrentSkipListMap((a, b) -> ((Number)OperatorLocalOrder.this.distanceFn.apply(a, b)).intValue());
            this.pending = new AtomicLong();
            this.downstream = downstream;
        }

        public void onSubscribe(Subscription s) {
            if (this.upstream != null) {
                s.cancel();
            } else {
                this.upstream = s;
                this.downstream.onSubscribe((Subscription)this);
            }
        }

        public void onNext(T value) {
            Object seqId = OperatorLocalOrder.this.extractSeqId.apply(value);
            boolean checkForExistingKeys = true;
            if (checkForExistingKeys && this.seqIdToValue.containsKey(seqId)) {
                this.downstream.onError((Throwable)new RuntimeException("Already seen an item with id " + String.valueOf(seqId)));
            }
            this.drain(true, () -> this.seqIdToValue.put(seqId, value));
        }

        protected void drain() {
            this.drain(false, null);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        protected void drain(boolean isCalledFromOnNext, Runnable action) {
            ArrayList buffer = new ArrayList();
            ConcurrentNavigableMap concurrentNavigableMap = this.seqIdToValue;
            synchronized (concurrentNavigableMap) {
                if (action != null) {
                    action.run();
                }
                this.drainTo(buffer);
            }
            for (Object item : buffer) {
                this.downstream.onNext(item);
            }
            this.postDrainRequests(isCalledFromOnNext);
        }

        public void postDrainRequests(boolean isCalledFromOnNext) {
            if (this.pending.get() > 0L) {
                if (this.isUpstreamComplete) {
                    this.downstream.onComplete();
                    if (!this.seqIdToValue.isEmpty()) {
                        int size = this.seqIdToValue.size();
                        String msg = "Upstream completed but " + size + " out of order items still queued";
                        logger.warn(msg);
                        throw new RuntimeException(msg);
                    }
                } else if (isCalledFromOnNext) {
                    this.upstream.request(1L);
                }
            }
        }

        protected void drainTo(Collection<T> buffer) {
            Iterator it = this.seqIdToValue.entrySet().iterator();
            while (it.hasNext() && this.pending.get() > 0L) {
                Map.Entry e = it.next();
                Object s = e.getKey();
                Object v = e.getValue();
                int d = ((Number)OperatorLocalOrder.this.distanceFn.apply(s, this.expectedSeqId)).intValue();
                if (d == 0) {
                    it.remove();
                    this.pending.decrementAndGet();
                    buffer.add(v);
                    this.expectedSeqId = OperatorLocalOrder.this.incrementSeqId.apply(this.expectedSeqId);
                    continue;
                }
                if (d < 0) {
                    logger.warn("Should not happen: received id " + String.valueOf(s) + " which is lower than the expected id " + String.valueOf(this.expectedSeqId));
                    it.remove();
                    continue;
                }
                logger.trace("Next id in queue is " + String.valueOf(s) + " but first need to wait for expected id " + String.valueOf(this.expectedSeqId));
                break;
            }
        }

        public void onError(Throwable t) {
            this.downstream.onError(t);
        }

        public void onComplete() {
            this.isUpstreamComplete = true;
            this.drain();
        }

        public void request(long n) {
            long before;
            if (SubscriptionHelper.validate((long)n) && (before = BackpressureHelper.add((AtomicLong)this.pending, (long)n)) == 0L) {
                this.upstream.request(1L);
            }
            this.drain();
        }

        public void cancel() {
            this.upstream.cancel();
        }
    }
}

