/*
 * Decompiled with CFR 0.152.
 */
package org.aksw.jenax.sparql.datasource.observable;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.disposables.Disposable;
import org.aksw.jenax.arq.util.binding.ResultSetUtils;
import org.aksw.jenax.arq.util.binding.ResultTable;
import org.aksw.jenax.dataaccess.sparql.datasource.RDFDataSource;
import org.aksw.jenax.dataaccess.sparql.datasource.RDFDataSourceWrapperBase;
import org.aksw.jenax.dataaccess.sparql.engine.RDFEngines;
import org.aksw.jenax.dataaccess.sparql.factory.datasource.RDFDataSources;
import org.aksw.jenax.sparql.datasource.observable.ObservableSourceImpl;
import org.aksw.jenax.sparql.datasource.observable.RdfDataSourceObservable;
import org.apache.jena.query.Dataset;
import org.apache.jena.query.DatasetFactory;
import org.apache.jena.query.Query;
import org.apache.jena.query.QueryFactory;
import org.apache.jena.query.ResultSet;
import org.apache.jena.rdf.model.Model;
import org.apache.jena.rdf.model.RDFNode;
import org.apache.jena.rdf.model.Resource;
import org.apache.jena.sparql.algebra.Table;
import org.apache.jena.sparql.core.DatasetGraph;
import org.apache.jena.vocabulary.RDF;
import org.apache.jena.vocabulary.RDFS;

public class RdfDataSourceObservableImpl
extends RDFDataSourceWrapperBase
implements RdfDataSourceObservable {
    protected ObservableSourceImpl<Query, ResultTable> mapRx = new ObservableSourceImpl<Query, ResultTable>(q -> RdfDataSourceObservableImpl.execSelect(delegate, q));

    public RdfDataSourceObservableImpl(RDFDataSource delegate) {
        super(delegate);
    }

    @Override
    public Flowable<ResultTable> observeSelect(Query query) {
        return this.mapRx.observe(query);
    }

    public static ResultTable execSelect(RDFDataSource dataSource, Query query) {
        ResultTable result = (ResultTable)RDFDataSources.exec((RDFDataSource)dataSource, (Query)query, qe -> RdfDataSourceObservableImpl.createResultTable(qe.execSelect()));
        return result;
    }

    public static ResultTable createResultTable(ResultSet rs) {
        Table table = ResultSetUtils.resultSetToTable((ResultSet)rs);
        Model model = rs.getResourceModel();
        ResultTable result = new ResultTable(table, model);
        return result;
    }

    @Override
    public void refreshAll(boolean cancelRunning) {
        this.mapRx.refreshAll(true);
    }

    public static void main(String[] args) {
        Dataset dataset = DatasetFactory.create();
        dataset.getDefaultModel().add((Resource)RDF.type, RDF.type, (RDFNode)RDF.Property);
        RdfDataSourceObservableImpl ds = new RdfDataSourceObservableImpl(RDFEngines.of((DatasetGraph)dataset.asDatasetGraph(), (boolean)true).getLinkSource().asDataSource());
        Flowable<ResultTable> flow = ds.observeSelect(QueryFactory.create((String)"SELECT * { ?s ?p ?o }"));
        Disposable disposable1 = flow.subscribe(t -> System.out.println("1: Got table of size: " + String.valueOf(t.getTable())));
        Disposable disposable2 = flow.subscribe(t -> System.out.println("2: Got table of size: " + String.valueOf(t.getTable())));
        System.out.println("Latest: " + String.valueOf(flow.blockingLatest().iterator().next()));
        disposable1.dispose();
        ds.refreshAll(true);
        dataset.getDefaultModel().add((Resource)RDF.type, RDFS.seeAlso, (RDFNode)RDF.Property);
        ds.refreshAll(true);
        disposable2.dispose();
    }
}

