package org.aksw.commons.rx.util;

import io.reactivex.rxjava3.core.BackpressureStrategy;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.FlowableEmitter;
import io.reactivex.rxjava3.core.FlowableOnSubscribe;
import io.reactivex.rxjava3.core.FlowableSubscriber;
import io.reactivex.rxjava3.core.FlowableTransformer;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.fuseable.SimpleQueue;
import io.reactivex.rxjava3.internal.queue.SpscArrayQueue;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/aksw/commons/rx/util/RxUtils.class */
/**
 * Static helpers for building and consuming RxJava 3 {@link Flowable} pipelines, mostly around
 * handing items from one thread to another through bounded queues.
 *
 * <p>Several helpers signal "end of data" in-band by placing the shared {@link #POISON} sentinel
 * into a queue; consumers compare by identity against that sentinel.
 */
public class RxUtils {
    private static final Logger logger = LoggerFactory.getLogger(RxUtils.class);

    /** How long blocking reads wait before re-checking their exit conditions, in milliseconds. */
    private static final long POLL_TIMEOUT_MILLIS = 100L;

    /** Sentinel placed into queues to mark the end of the data; always compared by identity. */
    public static final Object POISON = new Object();

    /** Per-name counters used by {@link #counter(String, long)} to number subscriptions. */
    public static Map<String, AtomicInteger> nameMap = new ConcurrentHashMap<>();

    /**
     * Invokes {@code callable} immediately and wraps its outcome in a {@link Maybe}.
     *
     * @param callable the computation to run eagerly (not deferred until subscription)
     * @param <T> the result type
     * @return a {@code Maybe} holding the result, or an empty {@code Maybe} if the callable
     *     returned {@code null} or threw an exception (the exception is logged at WARN level)
     */
    public static <T> Maybe<T> safeMaybe(Callable<T> callable) {
        T value;
        try {
            value = callable.call();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Do not swallow the interruption request; let the caller's thread observe it.
                Thread.currentThread().interrupt();
            }
            logger.warn("An exception occurred; trying to continue", e);
            return Maybe.empty();
        }
        if (value == null) {
            // Maybe.just rejects null; an absent value is simply "empty", not a failure.
            return Maybe.empty();
        }
        return Maybe.just(value);
    }

    /**
     * Returns the {@link #POISON} sentinel typed as {@code T} so it can be inserted into a
     * generically typed queue.
     *
     * @param <T> the element type the caller's queue is declared with
     * @return the shared sentinel instance
     */
    @SuppressWarnings("unchecked")
    public static <T> T poison() {
        // Safe only as long as consumers compare by identity and never use the value as a T.
        return (T) POISON;
    }

    /**
     * Creates a pass-through transformer that prints a progress line to {@code System.err} for
     * every {@code j}-th item, including the number of items seen and the throughput in items
     * per second since subscription.
     *
     * @param str label used in the progress line; each subscription under the same label gets
     *     its own running id taken from {@link #nameMap}
     * @param j reporting interval in items; must not be zero
     * @param <T> the item type
     * @return the reporting transformer
     * @throws IllegalArgumentException if {@code j} is zero
     */
    public static <T> FlowableTransformer<T, T> counter(String str, long j) {
        if (j == 0) {
            // A zero interval would otherwise fail with ArithmeticException inside onNext.
            throw new IllegalArgumentException("Reporting interval must not be zero");
        }
        return RxUtils.<T, T>createTransformer(emitter -> new FlowBase<T>(emitter) {
            private int id;
            private long seenCount;
            private long startTimeMillis;

            @Override
            public void onSubscribe(Subscription subscription) {
                this.id = RxUtils.nameMap.computeIfAbsent(str, key -> new AtomicInteger()).incrementAndGet();
                this.startTimeMillis = System.currentTimeMillis();
                super.onSubscribe(subscription);
            }

            @Override
            public void onNext(T t) {
                long elapsedMillis = System.currentTimeMillis() - this.startTimeMillis;
                if (this.seenCount % j == 0) {
                    float throughput = ((float) this.seenCount) / (((float) elapsedMillis) * 0.001f);
                    System.err.println("On " + str + "-" + this.id + " seen item count = " + this.seenCount + " - throughput: " + throughput);
                }
                this.seenCount++;
                this.emitter.onNext(t);
            }
        });
    }

    /**
     * Inserts {@code t} into {@code simpleQueue}, blocking while the queue rejects the offer.
     *
     * <p>While blocked, the calling thread waits on the queue's monitor; it is woken by the
     * {@code notifyAll} calls issued from {@link #take(SimpleQueue)}.
     *
     * @param simpleQueue the target queue; also used as the monitor for waiting
     * @param t the item to insert
     * @param <T> the element type
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public static <T> void put(SimpleQueue<T> simpleQueue, T t) throws InterruptedException {
        // Fast path: no locking when there is room.
        if (simpleQueue.offer(t)) {
            return;
        }
        synchronized (simpleQueue) {
            while (!simpleQueue.offer(t)) {
                simpleQueue.notifyAll();
                simpleQueue.wait();
            }
        }
    }

    /**
     * Retrieves the next item from {@code simpleQueue}, waiting up to 100 ms for one to arrive.
     *
     * @param simpleQueue the source queue; also used as the monitor for waiting
     * @param <T> the element type
     * @return the next item, or {@code null} if none became available within the wait
     * @throws Throwable whatever {@code SimpleQueue.poll()} throws, or
     *     {@link InterruptedException} if the thread is interrupted while waiting
     */
    public static <T> T take(SimpleQueue<T> simpleQueue) throws Throwable {
        // Fast path: no locking when an item is already there.
        T item = simpleQueue.poll();
        if (item != null) {
            return item;
        }
        synchronized (simpleQueue) {
            // The queue looked empty, so wake any producer blocked in put().
            simpleQueue.notifyAll();
            item = simpleQueue.poll();
            if (item == null) {
                simpleQueue.wait(POLL_TIMEOUT_MILLIS);
                // Re-check so an item that arrived during the wait is not reported as absent.
                item = simpleQueue.poll();
                simpleQueue.notifyAll();
            }
        }
        return item;
    }

    /**
     * Creates a transformer that moves emission onto {@code scheduler} through a bounded
     * single-producer/single-consumer queue: the upstream thread blocks in
     * {@link #put(SimpleQueue, Object)} when the queue is full, and a worker of
     * {@code scheduler} drains the queue into the downstream.
     *
     * <p>Upstream errors are forwarded immediately and are not ordered after queued items.
     * Completion travels through the queue as {@link #POISON}.
     *
     * @param scheduler scheduler whose worker emits the items downstream
     * @param i requested queue capacity
     * @param <T> the item type
     * @return the queueing transformer
     */
    public static <T> FlowableTransformer<T, T> queuedObserveOn(Scheduler scheduler, int i) {
        return flowable -> Flowable.<T>create(rawEmitter -> {
            // Upstream onError and the worker's onNext/onComplete come from different threads.
            FlowableEmitter<T> emitter = rawEmitter.serialize();
            SpscArrayQueue<Object> queue = new SpscArrayQueue<>(i);
            Scheduler.Worker worker = scheduler.createWorker();

            worker.schedulePeriodically(
                    () -> drainToEmitter(queue, emitter, worker), 0L, 0L, TimeUnit.MILLISECONDS);

            Disposable upstream = flowable.subscribe(
                    item -> {
                        // Once the emitter is terminated nobody drains the queue any more,
                        // so a put could block forever.
                        if (!emitter.isCancelled()) {
                            RxUtils.put(queue, item);
                        }
                    },
                    emitter::onError,
                    () -> {
                        if (!emitter.isCancelled()) {
                            RxUtils.put(queue, RxUtils.POISON);
                            synchronized (queue) {
                                queue.notifyAll();
                            }
                        }
                    });
            emitter.setDisposable(upstream);
        }, BackpressureStrategy.ERROR);
    }

    /**
     * One run of the periodic consumer task of {@link #queuedObserveOn(Scheduler, int)}: forwards
     * queued items to {@code emitter} until the queue stays empty for one {@link #take} call,
     * the end marker is reached, or the emitter is cancelled/terminated. In the latter two cases
     * the worker is disposed so the periodic task stops.
     */
    @SuppressWarnings("unchecked")
    private static <T> void drainToEmitter(SimpleQueue<Object> queue, FlowableEmitter<T> emitter, Disposable worker) {
        while (!Thread.interrupted()) {
            if (emitter.isCancelled()) {
                releaseProducer(queue);
                worker.dispose();
                return;
            }
            try {
                Object item = RxUtils.take(queue);
                if (item == null) {
                    // Nothing available right now; the periodic schedule runs this task again.
                    return;
                }
                if (item == RxUtils.POISON) {
                    emitter.onComplete();
                    worker.dispose();
                    return;
                }
                emitter.onNext((T) item);
            } catch (Throwable th) {
                if (th instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                // Deliver the failure downstream instead of throwing into the scheduler.
                if (!emitter.tryOnError(th)) {
                    logger.debug("Dropping error raised after termination", th);
                }
                releaseProducer(queue);
                worker.dispose();
                return;
            }
        }
    }

    /** Empties {@code queue} and wakes a producer that may be blocked in {@link #put}. */
    private static void releaseProducer(SimpleQueue<Object> queue) {
        queue.clear();
        synchronized (queue) {
            queue.notifyAll();
        }
    }

    /**
     * Creates a transformer that puts every upstream item into one bounded blocking queue and
     * emits that queue downstream once per item; on completion {@link #POISON} is put into the
     * queue before the completion signal is passed on.
     *
     * <p>The queue is created once per call of this method, so all subscriptions made through
     * the returned transformer share the same queue.
     *
     * @param i capacity of the queue
     * @param <T> the item type
     * @return the producing transformer
     */
    public static <T> FlowableTransformer<T, BlockingQueue<T>> queueProducer(int i) {
        BlockingQueue<T> queue = new ArrayBlockingQueue<>(i);
        return flowable -> flowable
                .<BlockingQueue<T>>map(item -> {
                    logger.trace("Putting to queue {} state: {}", System.identityHashCode(queue), queue.size());
                    queue.put(item);
                    logger.trace("Returned (and possibly woke up) from put");
                    return queue;
                })
                .doOnComplete(() -> queue.put(RxUtils.<T>poison()));
    }

    /**
     * Creates a flowable that emits the items taken from {@code blockingQueue} until an item
     * matches {@code predicate}; the matching item is not emitted and completes the flowable.
     *
     * @param blockingQueue the queue to take items from (blocking while it is empty)
     * @param predicate recognizes the end marker
     * @param <T> the item type
     * @return the queue-backed flowable
     */
    public static <T> Flowable<T> fromBlockingQueue(BlockingQueue<T> blockingQueue, Predicate<? super T> predicate) {
        return Flowable.<T>generate(emitter -> {
            T item = blockingQueue.take();
            if (predicate.test(item)) {
                emitter.onComplete();
            } else {
                emitter.onNext(item);
            }
        });
    }

    /**
     * Creates a transformer that, for each queue received from upstream, forwards the queue's
     * items downstream until {@link #POISON} is taken from it or the downstream cancels.
     *
     * <p>The upstream is requested unboundedly. Further signals carrying the queue that was most
     * recently drained up to its end marker are ignored.
     *
     * @param <T> the item type
     * @return the consuming transformer
     */
    public static <T> FlowableTransformer<BlockingQueue<T>, T> queueConsumer() {
        return flowable -> Flowable.<T>create(emitter -> flowable.subscribe(new FlowableSubscriber<BlockingQueue<T>>() {
            /** The queue whose end marker was consumed last; it holds nothing further to read. */
            private BlockingQueue<T> finishedQueue;

            @Override
            public void onSubscribe(Subscription subscription) {
                emitter.setCancellable(subscription::cancel);
                subscription.request(Long.MAX_VALUE);
            }

            private void drain(BlockingQueue<T> blockingQueue) throws InterruptedException {
                // Timed polls (rather than take()) let a cancellation be noticed while idle.
                while (!emitter.isCancelled()) {
                    T item = blockingQueue.poll(POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
                    if (item == null) {
                        continue;
                    }
                    if (item == RxUtils.POISON) {
                        logger.debug("POISON seen");
                        this.finishedQueue = blockingQueue;
                        // Return so that the upstream's completion signal can be delivered.
                        return;
                    }
                    logger.trace("Passed on an item - QueueState {}: {}", System.identityHashCode(blockingQueue), blockingQueue.size());
                    emitter.onNext(item);
                }
            }

            @Override
            public void onNext(BlockingQueue<T> blockingQueue) {
                if (blockingQueue == this.finishedQueue) {
                    return;
                }
                try {
                    drain(blockingQueue);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    // onNext must not throw; report downstream (this also cancels the upstream).
                    if (!emitter.tryOnError(e)) {
                        logger.debug("Interrupted after termination", e);
                    }
                }
            }

            @Override
            public void onError(Throwable th) {
                emitter.onError(th);
            }

            @Override
            public void onComplete() {
                logger.debug("On complete called");
                emitter.onComplete();
            }
        }), BackpressureStrategy.ERROR);
    }

    /**
     * Same as {@link #createTransformer(Function, BackpressureStrategy)} with
     * {@link BackpressureStrategy#ERROR}.
     *
     * @param function creates, per subscription, the subscriber that receives the upstream items
     *     and writes to the given emitter
     * @param <I> upstream item type
     * @param <O> downstream item type
     * @return the transformer
     */
    public static <I, O> FlowableTransformer<I, O> createTransformer(Function<? super FlowableEmitter<O>, ? extends FlowableSubscriber<I>> function) {
        return createTransformer(function, BackpressureStrategy.ERROR);
    }

    /**
     * Builds a transformer from a subscriber factory: on every downstream subscription a new
     * emitter is created, {@code function} turns it into a subscriber, and that subscriber is
     * subscribed to the upstream.
     *
     * @param function creates, per subscription, the subscriber that receives the upstream items
     *     and writes to the given emitter
     * @param backpressureStrategy strategy passed to {@code Flowable.create}
     * @param <I> upstream item type
     * @param <O> downstream item type
     * @return the transformer
     */
    public static <I, O> FlowableTransformer<I, O> createTransformer(Function<? super FlowableEmitter<O>, ? extends FlowableSubscriber<I>> function, BackpressureStrategy backpressureStrategy) {
        return flowable -> Flowable.<O>create(emitter -> {
            FlowableSubscriber<I> subscriber = function.apply(emitter);
            flowable.subscribe(subscriber);
        }, backpressureStrategy);
    }

    /**
     * Subscribes to {@code flowable}, discards all of its items and blocks until it terminates.
     *
     * @param flowable the flowable to run to completion
     * @throws RuntimeException wrapping the error if the flowable terminates with one
     */
    public static void consume(Flowable<?> flowable) {
        // Items are mapped to "nothing"; an error is turned into the single element of the
        // sequence so it can be fetched with blockingGet and rethrown on the calling thread.
        Throwable failure = flowable
                .<Throwable>concatMapMaybe(item -> Maybe.<Throwable>empty())
                .onErrorReturn(error -> error)
                .singleElement()
                .blockingGet();
        if (failure != null) {
            throw new RuntimeException(failure);
        }
    }
}
