/*
 * Decompiled with CFR 0.152.
 */
package org.aksw.jena_sparql_api.rx.io.resultset;

import io.reactivex.rxjava3.core.Flowable;
import java.io.OutputStream;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.aksw.commons.rx.util.RxUtils;
import org.aksw.jena_sparql_api.rx.io.resultset.SinkStreamingBase;
import org.aksw.jenax.sparql.query.rx.ResultSetRxImpl;
import org.apache.jena.atlas.io.IO;
import org.apache.jena.query.QueryExecution;
import org.apache.jena.query.ResultSet;
import org.apache.jena.riot.Lang;
import org.apache.jena.riot.ResultSetMgr;
import org.apache.jena.sparql.core.Var;
import org.apache.jena.sparql.engine.binding.Binding;
import org.apache.jena.sparql.engine.binding.BindingFactory;

/**
 * Streaming sink that serializes {@link Binding}s to an {@link OutputStream} as a
 * result set in the configured {@link Lang}.
 *
 * <p>Items passed to {@link #sendActual(Binding)} are copied and placed on a bounded
 * queue. A dedicated consumer thread, created in {@link #startActual()}, drains the
 * queue through a {@link Flowable} and hands the resulting {@link ResultSet} to
 * {@link ResultSetMgr#write(OutputStream, ResultSet, Lang)}. The {@link #POISON}
 * instance is used as the end-of-stream marker and is recognized by reference
 * identity.
 *
 * <p>Any exception raised on the consumer thread is recorded in
 * {@link #threadException} and surfaced to the producer by {@link #checkThread()}
 * and {@link #finishActual()}.
 */
public class SinkStreamingBinding
extends SinkStreamingBase<Binding> {
    /** Interval at which a blocked producer re-checks that the consumer thread is still healthy. */
    private static final long OFFER_TIMEOUT_MILLIS = 100L;

    /** Destination the serialized result set is written to. */
    protected OutputStream out;
    /** Variables used to build the result set that is written. */
    protected List<Var> resultVars;
    /** Serialization language passed to the result set writer. */
    protected Lang lang;
    /** Bounded hand-over buffer between the producer and the consumer thread. */
    protected BlockingQueue<Binding> blockingQueue = new ArrayBlockingQueue<Binding>(Flowable.bufferSize());
    /** Consumer thread; {@code null} until {@link #startActual()} has run. */
    protected volatile Thread thread = null;
    /** Failure recorded by the consumer thread; volatile because the producer thread reads it. */
    protected volatile Throwable threadException = null;
    /** Not read or written by this class. */
    protected boolean closed = false;
    /** End-of-stream marker; compared by reference identity, never by equality. */
    public static final Binding POISON = BindingFactory.binding();

    /**
     * Creates a sink writing to the given stream.
     *
     * @param out the stream the result set is written to
     * @param resultVars the variables of the result set
     * @param lang the result set serialization language
     */
    public SinkStreamingBinding(OutputStream out, List<Var> resultVars, Lang lang) {
        this.out = out;
        this.resultVars = resultVars;
        this.lang = lang;
    }

    /** Flushes the underlying output stream. */
    public void flush() {
        IO.flush(this.out);
    }

    /**
     * Verifies that the consumer thread has been started, is still running and has
     * not recorded a failure.
     *
     * @throws IllegalStateException if the sink was not started, or the consumer
     *         thread has terminated (with or without an exception)
     */
    protected void checkThread() {
        Throwable failure = this.threadException;
        if (failure != null) {
            throw new IllegalStateException("Consumer thread terminated exceptionally", failure);
        }
        // Read the volatile field once so the null check and isAlive() see the same reference.
        Thread consumer = this.thread;
        if (consumer == null) {
            throw new IllegalStateException("Consumer thread has not been started");
        }
        if (!consumer.isAlive()) {
            throw new IllegalStateException("Consumer thread already terminated (without exception)");
        }
    }

    /** Interrupts the consumer thread if one has been started. */
    @Override
    public void close() {
        Thread consumer = this.thread;
        if (consumer != null) {
            consumer.interrupt();
        }
    }

    /**
     * Starts the consumer thread which drains {@link #blockingQueue} until
     * {@link #POISON} is seen and writes the bindings to {@link #out}.
     */
    @Override
    protected void startActual() {
        Flowable<Binding> flowable = RxUtils.fromBlockingQueue(this.blockingQueue, item -> item == POISON);
        this.thread = new Thread(() -> {
            try (QueryExecution qe = ResultSetRxImpl.create(this.resultVars, flowable).asQueryExecution()) {
                ResultSet resultSet = qe.execSelect();
                ResultSetMgr.write(this.out, resultSet, this.lang);
            } catch (Exception e) {
                // Recorded here and re-raised on the producer side by checkThread()/finishActual().
                this.threadException = e;
            }
        });
        this.thread.start();
    }

    /**
     * Queues a copy of the given binding for the consumer thread.
     *
     * @param item the binding to write
     * @throws IllegalStateException if the consumer thread is not running
     * @throws RuntimeException if the calling thread is interrupted while waiting
     *         for queue capacity (the interrupt flag is restored)
     */
    @Override
    protected void sendActual(Binding item) {
        Binding copy = BindingFactory.copy(item);
        this.checkThread();
        this.enqueue(copy);
    }

    /**
     * Signals end of input, waits for the consumer thread to finish and reports any
     * failure it recorded.
     *
     * @throws IllegalStateException if the consumer thread is not running
     * @throws RuntimeException if the consumer thread terminated exceptionally, or
     *         the calling thread is interrupted while queueing the end marker
     */
    @Override
    public void finishActual() {
        this.checkThread();
        this.enqueue(POISON);
        try {
            this.thread.join();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        }
        // Must be checked after every join(): a normal return from join() says nothing
        // about whether the consumer thread succeeded.
        Throwable failure = this.threadException;
        if (failure != null) {
            throw new RuntimeException("Consumer thread terminated exceptionally", failure);
        }
    }

    /**
     * Places an element on the queue, waiting for capacity while periodically
     * re-checking the consumer thread so that a dead consumer cannot block the
     * producer forever on a full queue.
     */
    private void enqueue(Binding element) {
        try {
            while (!this.blockingQueue.offer(element, OFFER_TIMEOUT_MILLIS, java.util.concurrent.TimeUnit.MILLISECONDS)) {
                this.checkThread();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}

