/*
 * Decompiled with CFR 0.152.
 */
package org.aksw.jena_sparql_api.io.filter.sys;

import com.google.common.base.Stopwatch;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.core.SingleObserver;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.observers.DisposableSingleObserver;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.channels.Channel;
import java.nio.channels.Channels;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.aksw.commons.io.process.util.SimpleProcessExecutor;
import org.aksw.jena_sparql_api.io.endpoint.ConcurrentFileReader;
import org.aksw.jena_sparql_api.io.endpoint.HotFile;

public class HotFileFromProcess
implements HotFile {
    protected Path path;
    protected Channel writeChannel;
    protected Disposable disposable;
    protected CompletableFuture<Path> resultFuture;

    public HotFileFromProcess(Path path, Single<Integer> processSingle) {
        this.init(path, processSingle);
    }

    public static HotFileFromProcess createStarted(Path hotFile, Single<Integer> processSingle) {
        HotFileFromProcess result = new HotFileFromProcess(hotFile, processSingle);
        return result;
    }

    public void abort() {
        this.disposable.dispose();
    }

    public void init(final Path path, Single<Integer> processSingle) {
        this.path = path;
        this.resultFuture = new CompletableFuture();
        final boolean[] isOpenFlag = new boolean[]{true};
        final Disposable disposable = (Disposable)processSingle.subscribeOn(Schedulers.io()).doFinally(() -> {
            isOpenFlag[0] = false;
        }).subscribeWith((SingleObserver)new DisposableSingleObserver<Integer>(){

            public void onSuccess(Integer t) {
                HotFileFromProcess.this.resultFuture.complete(path);
            }

            public void onError(Throwable e) {
                HotFileFromProcess.this.resultFuture.completeExceptionally(e);
            }
        });
        this.writeChannel = new Channel(){

            @Override
            public boolean isOpen() {
                boolean result = isOpenFlag[0];
                return result;
            }

            @Override
            public void close() throws IOException {
                disposable.dispose();
            }
        };
    }

    public CompletableFuture<Path> future() {
        return this.resultFuture;
    }

    @Override
    public InputStream newInputStream() throws IOException {
        ConcurrentFileReader tmp = ConcurrentFileReader.create(this.path, this.writeChannel, 100);
        InputStream result = Channels.newInputStream(tmp);
        return result;
    }

    public static void main(String[] args) throws InterruptedException, IOException {
        Path src = Paths.get("/home/raven/Projects/Data/LSQ/deleteme.sorted.nt", new String[0]);
        Path tgt = Paths.get("/tmp/data.nt", new String[0]);
        String[] cmd = new String[]{"/bin/cp", src.toAbsolutePath().toString(), tgt.toAbsolutePath().toString()};
        Single single = SimpleProcessExecutor.wrap((ProcessBuilder)new ProcessBuilder(cmd)).executeFuture();
        HotFileFromProcess file = HotFileFromProcess.createStarted(tgt, (Single<Integer>)single);
        Thread.sleep(1000L);
        System.out.println("Here");
        file.future().whenComplete((path, t) -> System.out.println("File is ready!"));
        int numTasks = 1;
        int numWorkers = 4;
        ArrayList<Runnable> tasks = new ArrayList<Runnable>();
        for (int i = 0; i < numTasks; ++i) {
            tasks.add(() -> {
                System.out.println("Thread #" + Thread.currentThread().getId() + ": Started reader ");
                try {
                    BufferedReader br = new BufferedReader(new InputStreamReader(file.newInputStream()));
                    System.out.println("Thread #" + Thread.currentThread().getId() + ": " + br.lines().count());
                    System.out.println("Thread #" + Thread.currentThread().getId() + ": Reader done");
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });
        }
        Stopwatch stopwatch = Stopwatch.createStarted();
        ExecutorService es = Executors.newFixedThreadPool(Math.max(numWorkers, numTasks + 1));
        List futures = tasks.stream().map(es::submit).collect(Collectors.toList());
        es.shutdown();
        es.awaitTermination(3L, TimeUnit.SECONDS);
        for (Future f : futures) {
            try {
                f.get();
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
        System.out.println("Time taken [concurrent read/write]: " + (float)stopwatch.stop().elapsed(TimeUnit.MILLISECONDS) * 0.001f);
    }

    public String toString() {
        return "HotFileFromProcess [path=" + String.valueOf(this.path) + "]";
    }
}

