package org.aksw.jenax.sparql.datasource.observable;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.processors.BehaviorProcessor;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/* loaded from: input_file:org/aksw/jenax/sparql/datasource/observable/ObservableSourceImpl.class */
public class ObservableSourceImpl<K, V> implements ObservableSource<K, V> {
    protected Map<K, CachingPublisher<V>> queryToPublisher = new ConcurrentHashMap();
    protected Function<? super K, ? extends V> computeFn;

    public ObservableSourceImpl(Function<? super K, ? extends V> function) {
        this.computeFn = function;
    }

    @Override // org.aksw.jenax.sparql.datasource.observable.ObservableSource
    public Flowable<V> observe(K k) {
        return this.queryToPublisher.computeIfAbsent(k, obj -> {
            AtomicInteger atomicInteger = new AtomicInteger();
            BehaviorProcessor create = BehaviorProcessor.create();
            return new CachingPublisher(create, create.doOnSubscribe(subscription -> {
                if (atomicInteger.getAndIncrement() == 0) {
                    try {
                        create.onNext(this.computeFn.apply(k));
                    } catch (Exception e) {
                        create.onError(e);
                    }
                }
            }).doOnCancel(() -> {
            }).doFinally(() -> {
                this.queryToPublisher.compute(k, (obj, cachingPublisher) -> {
                    CachingPublisher cachingPublisher = atomicInteger.decrementAndGet() == 0 ? null : cachingPublisher;
                    if (cachingPublisher == null) {
                    }
                    return cachingPublisher;
                });
            }));
        }).getFlowable();
    }

    @Override // org.aksw.jenax.sparql.datasource.observable.ObservableSource
    public void refreshAll(boolean z) {
        for (Map.Entry<K, CachingPublisher<V>> entry : this.queryToPublisher.entrySet()) {
            K key = entry.getKey();
            BehaviorProcessor<V> publisher = entry.getValue().getPublisher();
            Object value = publisher.getValue();
            V apply = this.computeFn.apply(key);
            if (!Objects.equals(value, apply)) {
                publisher.onNext(apply);
            }
        }
    }
}
