/*
 * Decompiled with CFR 0.152.
 */
package org.aksw.jena_sparql_api.io.endpoint;

import com.google.common.base.Stopwatch;
import com.google.common.collect.ObjectArrays;
import com.google.common.io.ByteSource;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SeekableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.aksw.jena_sparql_api.io.endpoint.ConcurrentFileReader;

/**
 * A {@link WritableByteChannel} that forwards writes to a file channel and calls
 * {@link Object#notifyAll()} on itself after every write and on close, so that threads
 * waiting on this object's monitor are woken whenever new data may be available or the
 * channel has been closed.
 *
 * <p>Read channels over the same file are obtained via {@link #newReadChannel()}.
 * NOTE(review): presumably the {@code ConcurrentFileReader} instances returned there are
 * the threads waiting on this monitor - confirm against that class.
 *
 * <p>The outcome of the write side is published through {@link #getIsDone()}: the future
 * completes with the file path on a regular {@link #close()} and completes exceptionally
 * if the endpoint was {@linkplain #abandon() abandoned} or closing the channel failed.
 */
public class ConcurrentFileEndpoint
implements WritableByteChannel {
    protected Path path;
    protected SeekableByteChannel writeChannel;
    // volatile: set by the thread calling abandon() and read via the unsynchronized getter
    protected volatile boolean isAbandoned;
    protected CompletableFuture<Path> isDone = new CompletableFuture<>();

    /**
     * Opens {@code path} for writing and wraps the resulting channel in a new endpoint.
     *
     * @param path the file to write to
     * @param options additional open options; {@link StandardOpenOption#WRITE} is always prepended
     * @return a new endpoint backed by the opened channel
     * @throws IOException if the channel cannot be opened
     */
    public static ConcurrentFileEndpoint create(Path path, OpenOption ... options) throws IOException {
        OpenOption[] effectiveOptions = ObjectArrays.concat(StandardOpenOption.WRITE, options);
        SeekableByteChannel writeChannel = Files.newByteChannel(path, effectiveOptions);
        return new ConcurrentFileEndpoint(path, writeChannel);
    }

    /**
     * Creates an endpoint over an already opened channel.
     *
     * @param path the path reported by {@link #getPath()} and used to complete {@link #getIsDone()}
     * @param writeChannel the channel all writes are delegated to
     */
    public ConcurrentFileEndpoint(Path path, SeekableByteChannel writeChannel) {
        this.path = path;
        this.writeChannel = writeChannel;
    }

    /** @return the path this endpoint was created with */
    public Path getPath() {
        return this.path;
    }

    /** @return {@code true} once {@link #abandon()} has been invoked on an open endpoint */
    public boolean isAbandoned() {
        return this.isAbandoned;
    }

    /**
     * Marks this endpoint as abandoned and closes it. Does nothing if the endpoint is
     * already closed. After this call {@link #getIsDone()} is completed exceptionally.
     *
     * @throws RuntimeException wrapping any exception raised while closing
     */
    public void abandon() {
        if (!this.isOpen()) {
            return;
        }
        // Set the flag before closing so close() publishes the exceptional outcome.
        this.isAbandoned = true;
        try {
            this.close();
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        finally {
            // Safety net; has no effect if close() already completed the future.
            this.isDone.completeExceptionally(new RuntimeException("Stream aborted"));
        }
    }

    /**
     * @return the future that completes with the path on a regular close and completes
     *         exceptionally when the endpoint is abandoned or closing fails
     */
    public CompletableFuture<Path> getIsDone() {
        return this.isDone;
    }

    /** @return whether the underlying write channel is open */
    @Override
    public boolean isOpen() {
        return this.writeChannel.isOpen();
    }

    /**
     * Closes the underlying channel, wakes all threads waiting on this object and
     * completes {@link #getIsDone()}.
     *
     * <p>The future is completed exceptionally if the endpoint was abandoned or if closing
     * the channel threw; otherwise it is completed with the path. Waking the waiters and
     * completing the future happen even when closing the channel fails, so that waiting
     * threads are not left blocked.
     *
     * @throws IOException if closing the underlying channel fails
     */
    @Override
    public void close() throws IOException {
        synchronized (this) {
            Throwable failure = null;
            try {
                this.writeChannel.close();
            }
            catch (IOException | RuntimeException e) {
                failure = e;
                throw e;
            }
            finally {
                this.notifyAll();
                // Only the first completion of a CompletableFuture takes effect, so the
                // outcome has to be decided here rather than by the caller afterwards.
                if (this.isAbandoned) {
                    this.isDone.completeExceptionally(new RuntimeException("Stream aborted"));
                } else if (failure != null) {
                    this.isDone.completeExceptionally(failure);
                } else {
                    this.isDone.complete(this.path);
                }
            }
        }
    }

    /**
     * Writes {@code src} to the underlying channel and then wakes all threads waiting on
     * this object.
     *
     * @param src the buffer to write from
     * @return the number of bytes written as reported by the underlying channel
     * @throws IOException if the underlying write fails
     */
    @Override
    public int write(ByteBuffer src) throws IOException {
        int written = this.writeChannel.write(src);
        synchronized (this) {
            this.notifyAll();
        }
        return written;
    }

    /**
     * Creates a new read channel for this endpoint's file via
     * {@code ConcurrentFileReader.create(path, this, null)}.
     *
     * @return the newly created read channel
     * @throws IOException if the reader cannot be created
     */
    public ReadableByteChannel newReadChannel() throws IOException {
        return ConcurrentFileReader.create(this.path, this, null);
    }

    @Override
    public String toString() {
        return "ConcurrentFileEndpoint [path=" + this.path + ", isAbandoned=" + this.isAbandoned + ", isOpen()=" + this.isOpen() + "]";
    }

    /**
     * Manual demo/benchmark: counts the lines of a fixed file twice, then copies a fixed
     * source file into an endpoint on one thread while other threads concurrently count
     * the lines seen through read channels of that endpoint. Timings are printed to stdout.
     *
     * @param args ignored
     * @throws IOException if scanning the file or opening the endpoint fails
     * @throws InterruptedException if interrupted while awaiting executor termination
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        Path file = Paths.get("/tmp/myfile.nt");
        for (int i = 0; i < 2; ++i) {
            Stopwatch stopwatch = Stopwatch.createStarted();
            long lineCount;
            // Files.lines keeps the file open until the stream is closed
            try (Stream<String> lines = Files.lines(file)) {
                lineCount = lines.count();
            }
            System.out.println("Lines: " + lineCount);
            System.out.println("Time taken [scan on input file]: " + (float)stopwatch.stop().elapsed(TimeUnit.MILLISECONDS) * 0.001f);
        }
        ConcurrentFileEndpoint endpoint = ConcurrentFileEndpoint.create(file, StandardOpenOption.CREATE);
        ByteSource byteSource = com.google.common.io.Files.asByteSource(new File("/home/raven/Projects/Data/LSQ/deleteme.sorted.nt"));
        List<Runnable> tasks = new ArrayList<>();
        // Writer task: closing the stream closes the endpoint and thereby signals the readers
        tasks.add(() -> {
            try (OutputStream out = Channels.newOutputStream(endpoint)) {
                byteSource.copyTo(out);
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
        int numTasks = 1;
        int numWorkers = 4;
        for (int i = 0; i < numTasks; ++i) {
            // Reader task: counts the lines visible through a fresh read channel
            tasks.add(() -> {
                try (BufferedReader br = new BufferedReader(new InputStreamReader(Channels.newInputStream(endpoint.newReadChannel()), StandardCharsets.UTF_8))) {
                    System.out.println("Thread #" + Thread.currentThread().getId() + ": " + br.lines().count());
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });
        }
        Stopwatch stopwatch = Stopwatch.createStarted();
        ExecutorService es = Executors.newFixedThreadPool(Math.max(numWorkers, numTasks + 1));
        List<Future<?>> futures = new ArrayList<>(tasks.size());
        for (Runnable task : tasks) {
            futures.add(es.submit(task));
        }
        es.shutdown();
        es.awaitTermination(3L, TimeUnit.SECONDS);
        for (Future<?> f : futures) {
            try {
                f.get();
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
        System.out.println("Time taken [concurrent read/write]: " + (float)stopwatch.stop().elapsed(TimeUnit.MILLISECONDS) * 0.001f);
    }
}

