package org.aksw.jenax.arq.service.vfs;

import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.jena.atlas.lib.Lib;
import org.apache.jena.atlas.logging.Log;
import org.apache.jena.sparql.engine.ExecutionContext;
import org.apache.jena.sparql.engine.QueryIterator;
import org.apache.jena.sparql.engine.binding.Binding;
import org.apache.jena.sparql.engine.iterator.QueryIter1;
import org.apache.jena.sparql.engine.iterator.QueryIterConcat;
import org.apache.jena.sparql.engine.iterator.QueryIterPlainWrapper;

/* loaded from: input_file:org/aksw/jenax/arq/service/vfs/QueryIterRepeatApplyConcurrent.class */
/**
 * Query iterator that applies {@link #nextStage(Binding, ExecutionContext)} to every binding of
 * its input iterator and returns the bindings of the resulting stages one stage after another.
 *
 * <p>Stages are created ahead of consumption: each one is wrapped in a {@code Prefetch} task that
 * is submitted to the supplied {@link ExecutorService} and parked in a bounded FIFO queue. The
 * consumer takes the tasks from the queue in submission order, stops the prefetching, and then
 * reads the items the task already buffered followed by the remainder of the stage iterator.
 *
 * <p>The end of the queue is signalled with the identity-compared {@link #POISON} entry. Methods
 * that add to the queue are {@code synchronized} on this instance; the consumer side only removes
 * entries.
 */
public abstract class QueryIterRepeatApplyConcurrent extends QueryIter1 {
    /** End-of-queue marker; only ever compared by identity, never dereferenced. */
    private static final TaskEntry<Prefetch> POISON = new TaskEntry<>(null, null);

    /** Number of input bindings for which a stage has been created (written only in this class). */
    private int count;

    /** Stage currently being consumed, or {@code null} if the next one has not been fetched yet. */
    private QueryIterator currentStage;

    /** Set by {@link #requestSubCancel()}; volatile because it is read without holding the lock. */
    private volatile boolean cancelRequested;

    private final ExecutorService executorService;

    /** Bounded queue of scheduled stages, terminated by {@link #POISON}. */
    private final BlockingQueue<TaskEntry<Prefetch>> taskQueue;

    /** Tasks removed by {@link #drainAndStopAllTasks()}; {@code null} until that method has run. */
    private List<TaskEntry<Prefetch>> drainedTasks;

    /** Whether {@link #POISON} has been queued; only accessed while holding the instance lock. */
    private boolean isPoisonScheduled;

    /**
     * Creates the iterator.
     *
     * @param queryIterator input iterator whose bindings are fed to {@code nextStage}; a
     *        {@code null} value is only logged as an error here
     * @param executionContext execution context passed to the super class and copied for each stage
     * @param executorService executor on which the prefetch tasks are run
     * @param i capacity of the task queue, i.e. the maximum number of entries queued ahead
     * @throws IllegalArgumentException if {@code i} is not greater than zero (thrown by
     *         {@link LinkedBlockingQueue})
     */
    public QueryIterRepeatApplyConcurrent(QueryIterator queryIterator, ExecutionContext executionContext, ExecutorService executorService, int i) {
        super(queryIterator, executionContext);
        this.count = 0;
        this.cancelRequested = false;
        this.isPoisonScheduled = false;
        this.currentStage = null;
        this.executorService = executorService;
        this.taskQueue = new LinkedBlockingQueue<>(i);
        if (queryIterator == null) {
            Log.error(this, "[QueryIterConcurrentSimple] Repeated application to null input iterator");
        }
    }

    /**
     * Returns the stage that is currently being consumed.
     *
     * @return the current stage, or {@code null} if there is none
     */
    protected QueryIterator getCurrentStage() {
        return this.currentStage;
    }

    /**
     * Creates the sub-iterator for one input binding.
     *
     * @param binding the input binding
     * @param executionContext a fresh execution context built from this iterator's context
     * @return the iterator over the results for {@code binding}
     */
    protected abstract QueryIterator nextStage(Binding binding, ExecutionContext executionContext);

    /**
     * Advances through the scheduled stages until one has a binding available.
     *
     * @return {@code true} if the current stage has another binding, {@code false} once this
     *         iterator is finished or the end-of-queue marker has been reached
     */
    protected boolean hasNextBinding() {
        if (isFinished()) {
            return false;
        }
        while (true) {
            if (this.currentStage == null) {
                this.currentStage = makeNextStage();
            }
            if (this.currentStage == null) {
                return false;
            }
            if (this.cancelRequested) {
                // Forward a pending cancel request to the stage before asking it for more data.
                performRequestCancel(this.currentStage);
            }
            if (this.currentStage.hasNext()) {
                return true;
            }
            // Stage exhausted: release it and move on to the next queued one.
            this.currentStage.close();
            this.currentStage = null;
        }
    }

    /**
     * Returns the next binding of the current stage.
     *
     * @return the next binding
     * @throws NoSuchElementException if no further binding is available
     */
    protected Binding moveToNextBinding() {
        if (hasNextBinding()) {
            return this.currentStage.nextBinding();
        }
        throw new NoSuchElementException(Lib.className(this) + ".next()/finished");
    }

    /** Schedules stages until the queue is full or the end-of-queue marker has been queued. */
    private synchronized void scheduleNextTasks() {
        while (!this.isPoisonScheduled && this.taskQueue.remainingCapacity() > 0) {
            scheduleNextTask();
        }
    }

    /**
     * Removes all queued tasks, stops them and waits for them to terminate. Only the first call
     * does any work; afterwards {@link #drainedTasks} holds the removed tasks.
     *
     * @throws RuntimeException if waiting for a task is interrupted or a task failed
     */
    private synchronized void drainAndStopAllTasks() {
        if (this.drainedTasks != null) {
            return;
        }
        List<TaskEntry<Prefetch>> drained = new ArrayList<>();
        this.taskQueue.drainTo(drained);
        this.drainedTasks = drained.stream().filter(entry -> entry != POISON).toList();

        // drainTo() also removes a POISON that was queued earlier. Going through
        // schedulePoison() would then be a no-op (the flag is already set) and leave the queue
        // without a terminator, so a later take() could block forever. Re-queue it
        // unconditionally, and do so before anything below can throw. The queue was just
        // emptied while holding the lock that all producers need, so add() cannot overflow.
        this.isPoisonScheduled = true;
        this.taskQueue.add(POISON);

        // Signal every task before waiting on any of them, so that a failure while waiting
        // does not leave later tasks without a stop request.
        for (TaskEntry<Prefetch> entry : this.drainedTasks) {
            entry.task().stop();
        }
        for (TaskEntry<Prefetch> entry : this.drainedTasks) {
            awaitTask(entry);
        }
    }

    /**
     * Schedules the stage for the next input binding, or the end-of-queue marker if a cancel was
     * requested or the input is exhausted. Must be called while holding the instance lock.
     */
    private void scheduleNextTask() {
        if (this.cancelRequested) {
            schedulePoison();
            return;
        }
        QueryIterator input = getInput();
        if (!input.hasNext()) {
            schedulePoison();
            input.close();
            return;
        }
        this.count++;
        Binding binding = input.next();
        ExecutionContext execContext = getExecContext();
        // Each stage gets its own ExecutionContext instance built from the shared components.
        ExecutionContext stageContext = new ExecutionContext(execContext.getContext(), execContext.getActiveGraph(), execContext.getDataset(), execContext.getExecutor());
        Prefetch prefetch = new Prefetch(binding, nextStage(binding, stageContext));
        try {
            this.taskQueue.put(new TaskEntry<>(prefetch, this.executorService.submit(prefetch::run)));
        } catch (InterruptedException e) {
            // Preserve the interrupt status for callers further up the stack.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /** Queues the end-of-queue marker once. Must be called while holding the instance lock. */
    private void schedulePoison() {
        if (this.isPoisonScheduled) {
            return;
        }
        this.isPoisonScheduled = true;
        this.taskQueue.add(POISON);
    }

    /**
     * Blocks until the next queue entry is available and turns it into an iterator.
     *
     * @return an iterator over the task's buffered items followed by the rest of its stage
     *         iterator, or {@code null} if the end-of-queue marker was taken
     * @throws RuntimeException if the calling thread is interrupted or the task failed
     */
    private QueryIterator getNextStage() {
        TaskEntry<Prefetch> entry;
        try {
            entry = this.taskQueue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        if (entry == POISON) {
            return null;
        }
        Prefetch task = entry.task();
        // Stop the prefetching and wait for the task to finish before reading what it buffered.
        task.stop();
        awaitTask(entry);
        ExecutionContext execContext = getExecContext();
        QueryIterConcat stage = new QueryIterConcat(execContext);
        stage.add(QueryIterPlainWrapper.create(task.getBufferedItems().iterator(), execContext));
        stage.add(task.getIterator());
        return stage;
    }

    /**
     * Waits for the future of the given entry to complete.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} (with the interrupt
     *         status restored) or the {@link ExecutionException}
     */
    private static void awaitTask(TaskEntry<Prefetch> entry) {
        try {
            entry.future().get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /** Tops up the queue and then fetches the next stage from it. */
    private QueryIterator makeNextStage() {
        scheduleNextTasks();
        return getNextStage();
    }

    /** Closes the current stage and the stage iterators of all tasks still queued. */
    protected void closeSubIterator() {
        if (this.currentStage != null) {
            this.currentStage.close();
        }
        drainAndStopAllTasks();
        this.drainedTasks.forEach(entry -> entry.task().getIterator().close());
    }

    /** Records the cancel request and cancels the current stage and all tasks still queued. */
    protected void requestSubCancel() {
        this.cancelRequested = true;
        if (this.currentStage != null) {
            this.currentStage.cancel();
        }
        drainAndStopAllTasks();
        this.drainedTasks.forEach(entry -> entry.task().getIterator().cancel());
    }
}
