package org.aksw.jena_sparql_api.rx.io.resultset;

import io.reactivex.rxjava3.core.Flowable;
import java.io.OutputStream;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.aksw.commons.rx.util.RxUtils;
import org.aksw.jenax.sparql.query.rx.ResultSetRxImpl;
import org.apache.jena.atlas.io.IO;
import org.apache.jena.query.QueryExecution;
import org.apache.jena.riot.Lang;
import org.apache.jena.riot.ResultSetMgr;
import org.apache.jena.sparql.core.Var;
import org.apache.jena.sparql.engine.binding.Binding;
import org.apache.jena.sparql.engine.binding.BindingFactory;

/* loaded from: input_file:org/aksw/jena_sparql_api/rx/io/resultset/SinkStreamingBinding.class */
public class SinkStreamingBinding extends SinkStreamingBase<Binding> {
    protected OutputStream out;
    protected List<Var> resultVars;
    protected Lang lang;
    protected BlockingQueue<Binding> blockingQueue = new ArrayBlockingQueue(Flowable.bufferSize());
    protected volatile Thread thread = null;
    protected Throwable threadException = null;
    protected boolean closed = false;
    public static final Binding POISON = BindingFactory.binding();

    public SinkStreamingBinding(OutputStream outputStream, List<Var> list, Lang lang) {
        this.out = outputStream;
        this.resultVars = list;
        this.lang = lang;
    }

    public void flush() {
        IO.flush(this.out);
    }

    protected void checkThread() {
        if (this.threadException != null) {
            throw new IllegalStateException("Consumer thread terminated exceptionally", this.threadException);
        }
        if (!this.thread.isAlive()) {
            throw new IllegalStateException("Consumer thread already terminated (without exception)");
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        if (this.thread != null) {
            this.thread.interrupt();
        }
    }

    @Override // org.aksw.jena_sparql_api.rx.io.resultset.SinkStreamingBase
    protected void startActual() {
        Flowable fromBlockingQueue = RxUtils.fromBlockingQueue(this.blockingQueue, binding -> {
            return binding == POISON;
        });
        this.thread = new Thread(() -> {
            try {
                QueryExecution asQueryExecution = ResultSetRxImpl.create(this.resultVars, fromBlockingQueue).asQueryExecution();
                try {
                    ResultSetMgr.write(this.out, asQueryExecution.execSelect(), this.lang);
                    if (asQueryExecution != null) {
                        asQueryExecution.close();
                    }
                } finally {
                }
            } catch (Exception e) {
                this.threadException = e;
            }
        });
        this.thread.start();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.aksw.jena_sparql_api.rx.io.resultset.SinkStreamingBase
    public void sendActual(Binding binding) {
        Binding copy = BindingFactory.copy(binding);
        checkThread();
        try {
            this.blockingQueue.put(copy);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @Override // org.aksw.jena_sparql_api.rx.io.resultset.SinkStreamingBase
    public void finishActual() {
        checkThread();
        try {
            this.blockingQueue.put(POISON);
            try {
                this.thread.join();
            } catch (Exception e) {
                if (this.threadException != null) {
                    throw new RuntimeException("Consumer thread terminated exceptionally", this.threadException);
                }
            }
        } catch (InterruptedException e2) {
            throw new RuntimeException(e2);
        }
    }
}
