/*
 * Decompiled with CFR 0.152.
 */
package fr.inria.eventcloud.overlay;

import com.hp.hpl.jena.query.ResultSet;
import com.hp.hpl.jena.rdf.model.Model;
import fr.inria.eventcloud.api.exceptions.MalformedSparqlQueryException;
import fr.inria.eventcloud.api.responses.SparqlAskResponse;
import fr.inria.eventcloud.api.responses.SparqlConstructResponse;
import fr.inria.eventcloud.api.responses.SparqlSelectResponse;
import fr.inria.eventcloud.api.wrappers.ModelWrapper;
import fr.inria.eventcloud.api.wrappers.ResultSetWrapper;
import fr.inria.eventcloud.datastore.TransactionalTdbDatastore;
import fr.inria.eventcloud.messages.request.can.SparqlAtomicRequest;
import fr.inria.eventcloud.messages.response.can.QuadruplePatternResponse;
import fr.inria.eventcloud.reasoner.SparqlColander;
import fr.inria.eventcloud.reasoner.SparqlReasoner;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.objectweb.proactive.extensions.p2p.structured.configuration.P2PStructuredProperties;
import org.objectweb.proactive.extensions.p2p.structured.messages.request.Request;
import org.objectweb.proactive.extensions.p2p.structured.overlay.StructuredOverlay;
import org.objectweb.proactive.extensions.p2p.structured.overlay.can.CanRequestResponseManager;
import org.objectweb.proactive.extensions.p2p.structured.utils.converters.ObjectToByteConverter;

/**
 * Request/response manager that executes SPARQL queries on top of a
 * {@link CanRequestResponseManager}.
 *
 * <p>A query is parsed into a list of {@link SparqlAtomicRequest}s, each atomic
 * request is dispatched on a worker thread of an internal thread pool, and the
 * collected {@link QuadruplePatternResponse}s are handed to a
 * {@link SparqlColander} together with the original query to compute the final
 * result.
 */
public class SemanticRequestResponseManager
extends CanRequestResponseManager {
    private static final long serialVersionUID = 130L;

    /** Number of worker threads used to dispatch atomic requests concurrently. */
    private static final int DISPATCH_POOL_SIZE = 30;

    private SparqlColander colander;
    private final ConcurrentHashMap<UUID, Future<? extends Object>> pendingResults;
    private ExecutorService threadPool;

    /**
     * Creates a manager whose colander works on the given datastore.
     *
     * @param colanderDatastore datastore handed to the {@link SparqlColander}
     */
    public SemanticRequestResponseManager(TransactionalTdbDatastore colanderDatastore) {
        this.colander = new SparqlColander(colanderDatastore);
        // The concurrency level of the map is taken from MAO_SOFT_LIMIT_PEERS.
        this.pendingResults = new ConcurrentHashMap<UUID, Future<? extends Object>>(16, 0.75f, (Integer)P2PStructuredProperties.MAO_SOFT_LIMIT_PEERS.getValue());
        this.threadPool = Executors.newFixedThreadPool(DISPATCH_POOL_SIZE);
    }

    /**
     * Executes a SPARQL ASK query.
     *
     * @param sparqlAskQuery the query to execute
     * @param overlay the overlay the atomic requests are dispatched from
     * @return the boolean answer together with the aggregated measurements
     * @throws MalformedSparqlQueryException if the query cannot be parsed
     * @throws IllegalStateException if dispatching one of the atomic requests fails
     */
    public SparqlAskResponse executeSparqlAsk(String sparqlAskQuery, StructuredOverlay overlay) throws MalformedSparqlQueryException {
        List<SparqlAtomicRequest> parsingResult = SparqlReasoner.parse(sparqlAskQuery);
        List<QuadruplePatternResponse> responses = this.dispatch(parsingResult, overlay);
        boolean result = this.getColander().filterSparqlAsk(sparqlAskQuery, responses);
        long[] measurements = this.aggregateMeasurements(responses);
        return new SparqlAskResponse(measurements[0], measurements[1], measurements[2], measurements[3], Boolean.valueOf(result));
    }

    /**
     * Executes a SPARQL CONSTRUCT query.
     *
     * @param sparqlConstructQuery the query to execute
     * @param overlay the overlay the atomic requests are dispatched from
     * @return the constructed model together with the aggregated measurements
     * @throws MalformedSparqlQueryException if the query cannot be parsed
     * @throws IllegalStateException if dispatching one of the atomic requests fails
     */
    public SparqlConstructResponse executeSparqlConstruct(String sparqlConstructQuery, StructuredOverlay overlay) throws MalformedSparqlQueryException {
        List<SparqlAtomicRequest> parsingResult = SparqlReasoner.parse(sparqlConstructQuery);
        List<QuadruplePatternResponse> responses = this.dispatch(parsingResult, overlay);
        Model result = this.getColander().filterSparqlConstruct(sparqlConstructQuery, responses);
        long[] measurements = this.aggregateMeasurements(responses);
        return new SparqlConstructResponse(measurements[0], measurements[1], measurements[2], measurements[3], new ModelWrapper(result));
    }

    /**
     * Executes a SPARQL SELECT query.
     *
     * <p>When {@code ENABLE_BENCHMARKS_INFORMATION} is set, the response is
     * additionally populated with the number of results per sub-query, the
     * total number of intermediate results and their cumulated serialized size.
     *
     * @param sparqlSelectQuery the query to execute
     * @param overlay the overlay the atomic requests are dispatched from
     * @return the result set together with the aggregated measurements
     * @throws MalformedSparqlQueryException if the query cannot be parsed
     * @throws IllegalStateException if dispatching one of the atomic requests
     *         fails, or if an intermediate result cannot be converted to bytes
     *         while collecting benchmark information
     */
    public SparqlSelectResponse executeSparqlSelect(String sparqlSelectQuery, StructuredOverlay overlay) throws MalformedSparqlQueryException {
        List<SparqlAtomicRequest> parsingResult = SparqlReasoner.parse(sparqlSelectQuery);
        List<QuadruplePatternResponse> responses = this.dispatch(parsingResult, overlay);
        ResultSet result = this.getColander().filterSparqlSelect(sparqlSelectQuery, responses);
        long[] measurements = this.aggregateMeasurements(responses);
        SparqlSelectResponse sparqlSelectResponse = new SparqlSelectResponse(measurements[0], measurements[1], measurements[2], measurements[3], new ResultSetWrapper(result));
        if (((Boolean)P2PStructuredProperties.ENABLE_BENCHMARKS_INFORMATION.getValue()).booleanValue()) {
            HashMap<String, Integer> mapSubQueryNbResults = new HashMap<String, Integer>();
            long responsesSizeInBytes = 0L;
            int nbIntermediateResults = 0;
            for (QuadruplePatternResponse response : responses) {
                // Fetch the intermediate results once per response instead of on every access.
                List<?> intermediateResults = (List<?>)response.getResult();
                mapSubQueryNbResults.put(response.getInitialRequestForThisResponse(), intermediateResults.size());
                nbIntermediateResults += intermediateResults.size();
                for (Object intermediateResult : intermediateResults) {
                    try {
                        responsesSizeInBytes += (long)ObjectToByteConverter.convert(intermediateResult).length;
                    }
                    catch (IOException e) {
                        throw new IllegalStateException(e);
                    }
                }
            }
            sparqlSelectResponse.setMapSubQueryNbResults(mapSubQueryNbResults);
            sparqlSelectResponse.setNbIntermediateResults(nbIntermediateResults);
            sparqlSelectResponse.setSizeOfIntermediateResultsInBytes(responsesSizeInBytes);
        }
        return sparqlSelectResponse;
    }

    /**
     * Folds the measurements of all responses into a four element array.
     *
     * @param responses the responses to aggregate
     * @return {@code {sum of inbound hop counts, sum of outbound hop counts,
     *         maximum latency, sum of action times}}; all zero for an empty list
     */
    private long[] aggregateMeasurements(List<QuadruplePatternResponse> responses) {
        long inboundHopCount = 0L;
        long outboundHopCount = 0L;
        long latency = 0L;
        long queryDatastoreTime = 0L;
        for (QuadruplePatternResponse response : responses) {
            // Sub-requests are dispatched concurrently, hence the latency is a maximum, not a sum.
            if ((long)response.getLatency() > latency) {
                latency = response.getLatency();
            }
            inboundHopCount += (long)response.getInboundHopCount();
            outboundHopCount += (long)response.getOutboundHopCount();
            queryDatastoreTime += response.getActionTime();
        }
        return new long[]{inboundHopCount, outboundHopCount, latency, queryDatastoreTime};
    }

    /**
     * Dispatches every atomic request on the thread pool and waits for all of
     * them to terminate.
     *
     * <p>Responses are collected in completion order, which is not necessarily
     * the order of {@code requests}. If the calling thread is interrupted while
     * waiting, its interrupt flag is restored and the responses received so far
     * are returned.
     *
     * @param requests the atomic requests to dispatch
     * @param overlay the overlay the requests are dispatched from
     * @return a snapshot of the responses that have been received
     * @throws IllegalStateException if at least one dispatch failed; the first
     *         recorded failure is attached as cause
     */
    private List<QuadruplePatternResponse> dispatch(List<SparqlAtomicRequest> requests, final StructuredOverlay overlay) {
        final List<QuadruplePatternResponse> replies = Collections.synchronizedList(new ArrayList<QuadruplePatternResponse>(requests.size()));
        final List<RuntimeException> failures = Collections.synchronizedList(new ArrayList<RuntimeException>());
        final CountDownLatch doneSignal = new CountDownLatch(requests.size());
        for (final SparqlAtomicRequest request : requests) {
            this.getThreadPool().execute(new Runnable(){

                @Override
                public void run() {
                    try {
                        QuadruplePatternResponse resp = (QuadruplePatternResponse)SemanticRequestResponseManager.this.dispatch((Request)request, overlay);
                        if (((Boolean)P2PStructuredProperties.ENABLE_BENCHMARKS_INFORMATION.getValue()).booleanValue()) {
                            resp.setInitialRequestForThisResponse(request.getQuery());
                        }
                        replies.add(resp);
                    }
                    catch (RuntimeException e) {
                        // Remember the failure so that the waiting caller can report it.
                        failures.add(e);
                    }
                    finally {
                        // Always release the latch, otherwise a failing worker blocks the caller forever.
                        doneSignal.countDown();
                    }
                }
            });
        }
        try {
            doneSignal.await();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        if (!failures.isEmpty()) {
            throw new IllegalStateException("Dispatch of a SPARQL atomic request failed", failures.get(0));
        }
        // Return a private copy: after an interrupt, workers may still append to the shared list.
        return new ArrayList<QuadruplePatternResponse>(replies);
    }

    /**
     * Returns the map of pending results, keyed by UUID.
     *
     * @return the live map held by this manager, not a copy
     */
    public ConcurrentHashMap<UUID, Future<? extends Object>> getPendingResults() {
        return this.pendingResults;
    }

    /**
     * Returns the thread pool used to dispatch atomic requests.
     *
     * @return the executor service held by this manager
     */
    public ExecutorService getThreadPool() {
        return this.threadPool;
    }

    /**
     * Returns the colander used to filter the collected responses.
     *
     * @return the colander held by this manager
     */
    public SparqlColander getColander() {
        return this.colander;
    }

    /**
     * Closes the colander and shuts the thread pool down.
     *
     * @throws IOException if closing the colander fails; the thread pool is
     *         shut down nevertheless
     */
    public void close() throws IOException {
        try {
            this.colander.close();
        }
        finally {
            // Shut the pool down even when the colander fails to close, so its threads are not leaked.
            this.threadPool.shutdown();
        }
    }
}

