package org.aksw.beast.concurrent;

import com.google.common.collect.AbstractIterator;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.Spliterators;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/* loaded from: input_file:org/aksw/beast/concurrent/ParallelStreams.class */
/**
 * Utility for merging several streams into one by draining each of them on a
 * worker thread and handing the elements to a single consumer.
 *
 * <p>Elements of one sub-stream keep their relative order as they are added to
 * the hand-off queue; elements of different sub-streams are interleaved in
 * whatever order the workers produce them.
 */
public class ParallelStreams {
    /** Sentinel placed on the hand-off queue once no further elements will arrive. */
    private static final Object END_OF_DATA = new Object();

    /**
     * Joins the given streams using a fresh cached thread pool.
     *
     * @param stream the streams to merge
     * @param <T> the element type
     * @return a sequential stream over the elements of all sub-streams
     * @see #join(Stream, ExecutorService)
     */
    public static <T> Stream<T> join(Stream<Stream<T>> stream) {
        return join(stream, Executors.newCachedThreadPool());
    }

    /**
     * Joins the given streams by submitting one draining task per sub-stream to
     * {@code executorService}.
     *
     * <p>All sub-streams are submitted before this method returns, and the
     * executor is then shut down (no further tasks are accepted). Each
     * sub-stream is closed by its worker once it has been drained or has failed.
     * Closing the returned stream calls {@code shutdownNow()} on the executor.
     *
     * <p>{@code null} elements are not supported: the hand-off queue rejects
     * them, which is reported like any other failure of a sub-stream.
     *
     * <p>If a sub-stream fails, the remaining sub-streams are still drained and
     * their elements delivered; the first recorded failure is then thrown once,
     * wrapped in a {@link RuntimeException}, either at the end of the iteration
     * or when the returned stream is closed, whichever comes first.
     *
     * <p>If the consuming thread is interrupted while waiting for an element,
     * the iteration ends, the executor is shut down via {@code shutdownNow()}
     * and the thread's interrupt status is restored.
     *
     * @param stream the streams to merge
     * @param executorService the executor that runs the draining tasks; it is
     *        shut down by this method
     * @param <T> the element type
     * @return a sequential stream over the elements of all sub-streams
     */
    public static <T> Stream<T> join(Stream<Stream<T>> stream, ExecutorService executorService) {
        final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
        // Starts at 1: the extra count is held by the submitting thread so the
        // sentinel cannot be emitted before every sub-stream has been submitted.
        final AtomicInteger pending = new AtomicInteger(1);
        final AtomicReference<Throwable> failure = new AtomicReference<>();
        final AtomicBoolean closed = new AtomicBoolean();

        // Whoever releases the last pending count announces the end of the data.
        final Runnable release = () -> {
            if (pending.decrementAndGet() == 0) {
                queue.add(END_OF_DATA);
            }
        };

        try {
            stream.forEach(subStream -> {
                pending.incrementAndGet();
                executorService.submit(() -> {
                    try {
                        try {
                            subStream.forEach(queue::add);
                        } finally {
                            subStream.close();
                        }
                    } catch (Throwable t) {
                        // Only the first failure is kept; it is reported by the consumer side.
                        failure.compareAndSet(null, t);
                    } finally {
                        release.run();
                    }
                });
            });
        } catch (RuntimeException | Error e) {
            // Submission failed part-way: stop what was already started.
            executorService.shutdownNow();
            throw e;
        }
        release.run();
        executorService.shutdown();

        final Runnable closeHandler = () -> {
            if (!closed.compareAndSet(false, true)) {
                return;
            }
            // Read the failure before cancelling so that errors provoked by the
            // cancellation itself are not reported.
            Throwable cause = failure.getAndSet(null);
            executorService.shutdownNow();
            // Wake up a consumer that may be blocked waiting for an element.
            queue.add(END_OF_DATA);
            if (cause != null) {
                throw new RuntimeException(cause);
            }
        };

        final Iterator<T> iterator = new Iterator<T>() {
            /** Buffered element; {@code null} when nothing is buffered (the queue never holds nulls). */
            private Object buffered;
            private boolean finished;

            @Override
            public boolean hasNext() {
                if (buffered != null) {
                    return true;
                }
                if (finished) {
                    return false;
                }
                if (closed.get()) {
                    finished = true;
                    return false;
                }
                Object item;
                try {
                    item = queue.take();
                } catch (InterruptedException e) {
                    finished = true;
                    executorService.shutdownNow();
                    Thread.currentThread().interrupt();
                    return false;
                }
                if (item == END_OF_DATA) {
                    finished = true;
                    Throwable cause = failure.getAndSet(null);
                    if (cause != null) {
                        throw new RuntimeException(cause);
                    }
                    return false;
                }
                buffered = item;
                return true;
            }

            @Override
            @SuppressWarnings("unchecked") // only elements of type T are ever queued besides the sentinel
            public T next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                T result = (T) buffered;
                buffered = null;
                return result;
            }
        };

        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, 0), false)
                .onClose(closeHandler);
    }
}
