package org.aksw.commons.rx.op;

import io.reactivex.rxjava3.core.FlowableOperator;
import io.reactivex.rxjava3.core.FlowableSubscriber;
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
import java.io.PrintStream;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.aksw.commons.rx.util.RxUtils;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:org/aksw/commons/rx/op/OperatorObserveThroughput.class */
public class OperatorObserveThroughput<T> implements FlowableOperator<T, T> {
    protected String name;
    protected long interval;
    protected Consumer<ThroughputEvent> eventHandler;

    /* loaded from: input_file:org/aksw/commons/rx/op/OperatorObserveThroughput$SubscriberImpl.class */
    public class SubscriberImpl implements FlowableSubscriber<T>, Subscription {
        protected Subscriber<? super T> downstream;
        protected Subscription upstream;
        int id;
        long startTimeMillis;
        protected AtomicLong pending = new AtomicLong();
        protected AtomicLong seenItems = new AtomicLong();

        public SubscriberImpl(Subscriber<? super T> subscriber) {
            this.downstream = subscriber;
        }

        public void onSubscribe(Subscription subscription) {
            this.id = RxUtils.nameMap.computeIfAbsent(OperatorObserveThroughput.this.name, str -> {
                return new AtomicInteger();
            }).incrementAndGet();
            this.startTimeMillis = System.currentTimeMillis();
            if (this.upstream != null) {
                subscription.cancel();
            } else {
                this.upstream = subscription;
                this.downstream.onSubscribe(this);
            }
        }

        public void request(long j) {
            if (SubscriptionHelper.validate(j)) {
                this.pending.addAndGet(j);
                this.upstream.request(1L);
            }
        }

        public void onNext(T t) {
            long currentTimeMillis = System.currentTimeMillis() - this.startTimeMillis;
            long andIncrement = this.seenItems.getAndIncrement();
            if (andIncrement % OperatorObserveThroughput.this.interval == 0) {
                OperatorObserveThroughput.this.eventHandler.accept(new ThroughputEvent(OperatorObserveThroughput.this.interval, OperatorObserveThroughput.this.name, this.id, currentTimeMillis * 0.001d, andIncrement));
            }
            this.downstream.onNext(t);
            if (this.pending.decrementAndGet() > 0) {
                this.upstream.request(1L);
            }
        }

        public void onComplete() {
            this.downstream.onComplete();
        }

        public void onError(Throwable th) {
            this.downstream.onError(th);
        }

        public void cancel() {
            this.upstream.cancel();
        }
    }

    /* loaded from: input_file:org/aksw/commons/rx/op/OperatorObserveThroughput$ThroughputEvent.class */
    public static class ThroughputEvent {
        public long eventInterval;
        public String name;
        long instanceId;
        double elapsedSeconds;
        long totalSeenItemCount;

        public ThroughputEvent(long j, String str, long j2, double d, long j3) {
            this.eventInterval = j;
            this.name = str;
            this.instanceId = j2;
            this.elapsedSeconds = d;
            this.totalSeenItemCount = j3;
        }
    }

    public static void defaultThroughputEventHandler(ThroughputEvent throughputEvent) {
        PrintStream printStream = System.err;
        String str = throughputEvent.name;
        long j = throughputEvent.instanceId;
        long j2 = throughputEvent.totalSeenItemCount;
        double d = throughputEvent.totalSeenItemCount / throughputEvent.elapsedSeconds;
        printStream.println("On " + str + "-" + j + " seen item count = " + printStream + " - throughput: " + j2 + " items per second");
    }

    public static <T> OperatorObserveThroughput<T> create(String str, long j) {
        return new OperatorObserveThroughput<>(str, j, OperatorObserveThroughput::defaultThroughputEventHandler);
    }

    public static <T> OperatorObserveThroughput<T> create(String str, long j, Consumer<ThroughputEvent> consumer) {
        return new OperatorObserveThroughput<>(str, j, consumer);
    }

    public OperatorObserveThroughput(String str, long j, Consumer<ThroughputEvent> consumer) {
        this.name = str;
        this.interval = j;
        this.eventHandler = consumer;
    }

    public Subscriber<? super T> apply(Subscriber<? super T> subscriber) throws Exception {
        return new SubscriberImpl(subscriber);
    }
}
