/*
 * Decompiled with CFR 0.152.
 */
package org.aksw.commons.rx.util;

import com.google.common.collect.Streams;
import io.reactivex.rxjava3.core.BackpressureStrategy;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.FlowableEmitter;
import io.reactivex.rxjava3.core.FlowableOnSubscribe;
import io.reactivex.rxjava3.core.FlowableSubscriber;
import io.reactivex.rxjava3.core.FlowableTransformer;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.operators.SimpleQueue;
import io.reactivex.rxjava3.operators.SpscArrayQueue;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.aksw.commons.rx.util.FlowBase;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RxUtils {
    private static final Logger logger = LoggerFactory.getLogger(RxUtils.class);
    public static final Object POISON = new Object();
    public static Map<String, AtomicInteger> nameMap = new ConcurrentHashMap<String, AtomicInteger>();

    public static <T> Stream<T> stream(Flowable<T> flowable) {
        Iterator it = flowable.blockingIterable().iterator();
        Disposable disposable = (Disposable)it;
        Stream result = (Stream)Streams.stream(it).onClose(() -> ((Disposable)disposable).dispose());
        return result;
    }

    public static <T> Maybe<T> safeMaybe(Callable<T> action) {
        Maybe result;
        try {
            T value = action.call();
            result = Maybe.just(value);
        }
        catch (Exception e) {
            logger.warn("An exception occurred; trying to continue", (Throwable)e);
            result = Maybe.empty();
        }
        return result;
    }

    public static <T> T poison() {
        return (T)POISON;
    }

    public static <T> FlowableTransformer<T, T> counter(final String name, final long interval) {
        return RxUtils.createTransformer(emitter -> new FlowBase<T>((FlowableEmitter)emitter){
            int id;
            long[] i;
            long startTimeMillis;
            {
                super(emitter);
                this.i = new long[]{0L};
            }

            @Override
            public void onSubscribe(Subscription s) {
                AtomicInteger n = nameMap.computeIfAbsent(name, k -> new AtomicInteger());
                this.id = n.incrementAndGet();
                this.startTimeMillis = System.currentTimeMillis();
                super.onSubscribe(s);
            }

            public void onNext(T item) {
                long elapsed = System.currentTimeMillis() - this.startTimeMillis;
                if (this.i[0] % interval == 0L) {
                    System.err.println("On " + name + "-" + this.id + " seen item count = " + this.i[0] + " - throughput: " + (float)this.i[0] / ((float)elapsed * 0.001f));
                }
                this.i[0] = this.i[0] + 1L;
                this.emitter.onNext(item);
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static <T> void put(SimpleQueue<T> queue, T item) throws InterruptedException {
        if (!queue.offer(item)) {
            SimpleQueue<T> simpleQueue = queue;
            synchronized (simpleQueue) {
                while (!queue.offer(item)) {
                    queue.notifyAll();
                    queue.wait();
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static <T> T take(SimpleQueue<T> queue) throws Throwable {
        Object result = queue.poll();
        if (result == null) {
            SimpleQueue<T> simpleQueue = queue;
            synchronized (simpleQueue) {
                queue.notifyAll();
                result = queue.poll();
                if (result == null) {
                    queue.wait(100L);
                    queue.notifyAll();
                }
            }
        }
        return (T)result;
    }

    public static <T> FlowableTransformer<T, T> queuedObserveOn(final Scheduler scheduler, final int capacity) {
        return upstream -> Flowable.create((FlowableOnSubscribe)new FlowableOnSubscribe<T>(){

            public void subscribe(FlowableEmitter<T> downstream) throws Exception {
                SpscArrayQueue queue = new SpscArrayQueue(capacity);
                Disposable[] disposable = new Disposable[]{null};
                Scheduler.Worker worker = scheduler.createWorker();
                Runnable action = () -> 2.lambda$subscribe$0((SimpleQueue)queue, downstream, worker);
                worker.schedulePeriodically(action, 0L, 0L, TimeUnit.MILLISECONDS);
                disposable[0] = upstream.subscribe(arg_0 -> 2.lambda$subscribe$1((SimpleQueue)queue, arg_0), e -> downstream.onError(e), () -> 2.lambda$subscribe$3((SimpleQueue)queue));
                downstream.setDisposable(disposable[0]);
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            private static /* synthetic */ void lambda$subscribe$3(SimpleQueue queue) throws Throwable {
                RxUtils.put(queue, POISON);
                SimpleQueue simpleQueue = queue;
                synchronized (simpleQueue) {
                    queue.notifyAll();
                }
            }

            private static /* synthetic */ void lambda$subscribe$1(SimpleQueue queue, Object x) throws Throwable {
                RxUtils.put(queue, x);
            }

            private static /* synthetic */ void lambda$subscribe$0(SimpleQueue queue, FlowableEmitter downstream, Scheduler.Worker worker) {
                while (!Thread.interrupted()) {
                    Object item;
                    try {
                        item = RxUtils.take(queue);
                    }
                    catch (Throwable e1) {
                        throw new RuntimeException(e1);
                    }
                    if (item == POISON) {
                        downstream.onComplete();
                        worker.dispose();
                        break;
                    }
                    if (item == null) break;
                    downstream.onNext(item);
                }
            }
        }, (BackpressureStrategy)BackpressureStrategy.ERROR);
    }

    public static <T> FlowableTransformer<T, BlockingQueue<T>> queueProducer(int capacity) {
        ArrayBlockingQueue queue = new ArrayBlockingQueue(capacity);
        return upstream -> upstream.map(item -> {
            System.err.println("Putting to queue " + System.identityHashCode(queue) + " state: " + queue.size());
            queue.put(item);
            System.err.println("Returned (and possibly woke up) from put");
            return queue;
        }).doOnComplete(() -> queue.put(POISON));
    }

    public static <T> Flowable<T> fromBlockingQueue(BlockingQueue<T> queue, Predicate<? super T> isPoison) {
        return Flowable.generate(() -> queue, (q, e) -> {
            Object item = q.take();
            if (isPoison.test((Object)item)) {
                e.onComplete();
            } else {
                e.onNext(item);
            }
        }, q -> {});
    }

    public static <T> FlowableTransformer<BlockingQueue<T>, T> queueConsumer() {
        return upstream -> Flowable.create((FlowableOnSubscribe)new FlowableOnSubscribe<T>(){

            public void subscribe(final FlowableEmitter<T> child) throws Exception {
                upstream.subscribe(new FlowableSubscriber<BlockingQueue<T>>(){
                    final /* synthetic */ 3 this$0;
                    {
                        this.this$0 = this$0;
                    }

                    public void onSubscribe(Subscription s) {
                        child.setCancellable(() -> ((Subscription)s).cancel());
                        s.request(Long.MAX_VALUE);
                    }

                    public void drain(BlockingQueue<T> queue) throws InterruptedException {
                        Object item;
                        while ((item = queue.take()) != null && !child.isCancelled()) {
                            if (item == POISON) {
                                System.err.println("POISON seen");
                                continue;
                            }
                            System.out.println("Passed on an item - QueueState " + System.identityHashCode(queue) + ": " + queue.size());
                            child.onNext(item);
                        }
                    }

                    public void onNext(BlockingQueue<T> queue) {
                        try {
                            this.drain(queue);
                        }
                        catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                    }

                    public void onError(Throwable t) {
                        child.onError(t);
                    }

                    public void onComplete() {
                        System.err.println("On complete called");
                        child.onComplete();
                    }
                });
            }
        }, (BackpressureStrategy)BackpressureStrategy.ERROR);
    }

    public static <I, O> FlowableTransformer<I, O> createTransformer(Function<? super FlowableEmitter<O>, ? extends FlowableSubscriber<I>> fsSupp) {
        return RxUtils.createTransformer(fsSupp, BackpressureStrategy.ERROR);
    }

    public static <I, O> FlowableTransformer<I, O> createTransformer(final Function<? super FlowableEmitter<O>, ? extends FlowableSubscriber<I>> fsSupp, BackpressureStrategy backpressureStrategy) {
        return upstream -> {
            Flowable result = Flowable.create((FlowableOnSubscribe)new FlowableOnSubscribe<O>(){

                public void subscribe(FlowableEmitter<O> emitter) throws Exception {
                    FlowableSubscriber subscriber = (FlowableSubscriber)fsSupp.apply(emitter);
                    upstream.subscribe(subscriber);
                }
            }, (BackpressureStrategy)backpressureStrategy);
            return result;
        };
    }

    public static void consume(Flowable<?> flowable) {
        Flowable tmp = flowable.concatMapMaybe(x -> Maybe.empty()).onErrorReturn(t -> t);
        Throwable e = (Throwable)tmp.singleElement().blockingGet();
        if (e != null) {
            throw new RuntimeException(e);
        }
    }
}

