/*
 * Decompiled with CFR 0.152.
 */
package org.aksw.jenax.sparql.datasource.observable;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.processors.BehaviorProcessor;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.aksw.jenax.sparql.datasource.observable.CachingPublisher;
import org.aksw.jenax.sparql.datasource.observable.ObservableSource;

/**
 * {@link ObservableSource} implementation that keeps one {@link CachingPublisher} per key.
 *
 * <p>The value for a key is computed with {@link #computeFn} when the first subscriber
 * subscribes to the key's flowable and is pushed into the publisher's
 * {@link BehaviorProcessor}. Subscriptions are counted per publisher; once the count
 * drops back to zero the publisher is removed from {@link #queryToPublisher}.
 *
 * @param <K> the key type
 * @param <V> the type of the values computed for keys
 */
public class ObservableSourceImpl<K, V>
implements ObservableSource<K, V> {
    /** Publishers that are currently registered, by key. */
    protected Map<K, CachingPublisher<V>> queryToPublisher = new ConcurrentHashMap<>();

    /** Function used to compute (and re-compute) the value for a key. */
    protected Function<? super K, ? extends V> computeFn;

    /**
     * Creates a new observable source.
     *
     * @param computeFn the function that computes the value for a key
     */
    public ObservableSourceImpl(Function<? super K, ? extends V> computeFn) {
        this.computeFn = computeFn;
    }

    /**
     * Returns the flowable of the publisher registered for {@code key}, registering a new
     * publisher if there is none yet.
     *
     * <p>The value is computed on the first subscription to the returned flowable, not by
     * this call. A failure of the computation is delivered through the processor's
     * {@code onError} rather than being thrown.
     *
     * @param key the key to observe
     * @return the flowable of the key's publisher
     */
    @Override
    public Flowable<V> observe(K key) {
        CachingPublisher<V> entry = this.queryToPublisher.computeIfAbsent(key, k -> {
            // Number of currently active subscriptions on this publisher's flowable.
            AtomicInteger counter = new AtomicInteger();
            BehaviorProcessor<V> pp = BehaviorProcessor.create();
            Flowable<V> f = pp
                .doOnSubscribe(s -> {
                    // Only the transition 0 -> 1 triggers the computation.
                    if (counter.getAndIncrement() == 0) {
                        try {
                            V value = this.computeFn.apply(key);
                            pp.onNext(value);
                        } catch (Exception t) {
                            pp.onError(t);
                        }
                    }
                })
                .doFinally(() -> this.queryToPublisher.compute(key, (kk, current) -> {
                    // The decrement happens inside compute() so that it is atomic with
                    // respect to other map operations on the same key.
                    boolean lastSubscriberGone = counter.decrementAndGet() == 0;
                    // Only evict the entry that belongs to this publisher. A flowable that
                    // outlived its map entry must not remove a newer publisher that was
                    // registered under the same key in the meantime.
                    boolean ownsEntry = current != null && current.getPublisher() == pp;
                    return lastSubscriberGone && ownsEntry ? null : current;
                }));
            return new CachingPublisher<>(pp, f);
        });
        return entry.getFlowable();
    }

    /**
     * Re-computes the value of every registered key and publishes it if it differs
     * (per {@link Objects#equals(Object, Object)}) from the publisher's current value.
     *
     * <p>An exception thrown by {@link #computeFn} is not caught here: it propagates to the
     * caller and the remaining keys are not refreshed.
     *
     * @param cancelRunning not consulted by this implementation
     */
    @Override
    public void refreshAll(boolean cancelRunning) {
        for (Map.Entry<K, CachingPublisher<V>> e : this.queryToPublisher.entrySet()) {
            K key = e.getKey();
            BehaviorProcessor<V> publisher = e.getValue().getPublisher();
            V oldValue = publisher.getValue();
            V newValue = this.computeFn.apply(key);
            // Skip unchanged values so that subscribers are not notified needlessly.
            if (!Objects.equals(oldValue, newValue)) {
                publisher.onNext(newValue);
            }
        }
    }
}

