/*
 * Decompiled with CFR 0.152.
 */
package fr.inria.eventcloud.pubsub;

import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.hash.HashCode;
import com.hp.hpl.jena.datatypes.RDFDatatype;
import com.hp.hpl.jena.datatypes.xsd.XSDDatatype;
import com.hp.hpl.jena.graph.Node;
import com.hp.hpl.jena.query.Dataset;
import com.hp.hpl.jena.query.Query;
import com.hp.hpl.jena.query.QueryExecution;
import com.hp.hpl.jena.query.QueryExecutionFactory;
import com.hp.hpl.jena.query.QueryFactory;
import com.hp.hpl.jena.query.ResultSet;
import com.hp.hpl.jena.sparql.algebra.Algebra;
import com.hp.hpl.jena.sparql.algebra.Op;
import com.hp.hpl.jena.sparql.algebra.OpAsQuery;
import com.hp.hpl.jena.sparql.algebra.Transform;
import com.hp.hpl.jena.sparql.algebra.TransformBase;
import com.hp.hpl.jena.sparql.algebra.Transformer;
import com.hp.hpl.jena.sparql.algebra.op.OpProject;
import com.hp.hpl.jena.sparql.core.Var;
import com.hp.hpl.jena.sparql.engine.binding.Binding;
import com.hp.hpl.jena.sparql.syntax.Element;
import com.hp.hpl.jena.sparql.syntax.ElementNamedGraph;
import com.hp.hpl.jena.sparql.syntax.ElementVisitor;
import com.hp.hpl.jena.sparql.syntax.ElementVisitorBase;
import com.hp.hpl.jena.sparql.syntax.ElementWalker;
import fr.inria.eventcloud.api.PublishSubscribeConstants;
import fr.inria.eventcloud.api.Quadruple;
import fr.inria.eventcloud.api.SubscriptionId;
import fr.inria.eventcloud.api.listeners.NotificationListenerType;
import fr.inria.eventcloud.api.wrappers.BindingWrapper;
import fr.inria.eventcloud.configuration.EventCloudProperties;
import fr.inria.eventcloud.datastore.AccessMode;
import fr.inria.eventcloud.datastore.TransactionalDatasetGraph;
import fr.inria.eventcloud.datastore.TransactionalTdbDatastore;
import fr.inria.eventcloud.datastore.Vars;
import fr.inria.eventcloud.messages.request.can.IndexEphemeralSubscriptionRequest;
import fr.inria.eventcloud.messages.request.can.IndexSubscriptionRequest;
import fr.inria.eventcloud.messages.request.can.UnsubscribeRequest;
import fr.inria.eventcloud.operations.can.RetrieveSubSolutionOperation;
import fr.inria.eventcloud.overlay.SemanticCanOverlay;
import fr.inria.eventcloud.overlay.SemanticPeer;
import fr.inria.eventcloud.proxies.SubscribeProxy;
import fr.inria.eventcloud.pubsub.SubscriberConnectionFailure;
import fr.inria.eventcloud.pubsub.Subscription;
import fr.inria.eventcloud.pubsub.SubscriptionRewriter;
import fr.inria.eventcloud.pubsub.Subsubscription;
import fr.inria.eventcloud.pubsub.notifications.BindingNotification;
import fr.inria.eventcloud.pubsub.notifications.PollingSignalNotification;
import fr.inria.eventcloud.pubsub.notifications.QuadruplesNotification;
import fr.inria.eventcloud.pubsub.notifications.SignalNotification;
import fr.inria.eventcloud.reasoner.AtomicQuery;
import fr.inria.eventcloud.utils.SparqlResultSerializer;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Serializable;
import java.io.Writer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.commons.lang.mutable.MutableObject;
import org.objectweb.proactive.api.PAActiveObject;
import org.objectweb.proactive.api.PAFuture;
import org.objectweb.proactive.extensions.p2p.structured.configuration.P2PStructuredProperties;
import org.objectweb.proactive.extensions.p2p.structured.messages.request.Request;
import org.objectweb.proactive.extensions.p2p.structured.utils.Pair;
import org.openjena.riot.out.NodeFmtLib;
import org.openjena.riot.out.OutputLangUtils;
import org.openjena.riot.tokens.Tokenizer;
import org.openjena.riot.tokens.TokenizerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class PublishSubscribeUtils {
    private static final Logger log = LoggerFactory.getLogger(PublishSubscribeUtils.class);

    // This class only exposes static helpers: the private constructor
    // prevents it from being instantiated from outside.
    private PublishSubscribeUtils() {
    }

    /**
     * Creates the meta quadruple recording that {@code quadrupleMatching}
     * matches a sub-subscription.
     * <p>
     * The object of the returned quadruple is a literal made of the serialized
     * {@code subSubscriptionId} node, a single space, and the serialized matching
     * quadruple (its meta graph node followed by its subject, predicate and
     * object). The other components are, in constructor order, the hash URI of
     * the matching quadruple, {@code subscriptionIdUri} and
     * {@link PublishSubscribeConstants#QUADRUPLE_MATCHES_SUBSCRIPTION_NODE}.
     * <p>
     * The literal is encoded and decoded explicitly as UTF-8 so that the result
     * does not depend on the platform default charset.
     *
     * @param quadrupleMatching the quadruple that matches the sub-subscription.
     * @param subscriptionIdUri the URI node identifying the subscription.
     * @param subSubscriptionId the node identifying the sub-subscription.
     *
     * @return the meta quadruple describing the match.
     *
     * @throws IllegalStateException if the in-memory serialization fails.
     */
    public static final Quadruple createMetaQuadruple(Quadruple quadrupleMatching, Node subscriptionIdUri, Node subSubscriptionId) {
        Charset utf8 = Charset.forName("UTF-8");
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        OutputStreamWriter osw = new OutputStreamWriter(baos, utf8);
        try {
            OutputLangUtils.output((Writer)osw, (Node)subSubscriptionId, null);
            // separator between the sub-subscription id and the quadruple
            osw.write(' ');
            OutputLangUtils.output((Writer)osw, (Node)quadrupleMatching.createMetaGraphNode(), (Node)quadrupleMatching.getSubject(), (Node)quadrupleMatching.getPredicate(), (Node)quadrupleMatching.getObject(), null, null);
            // the writer buffers internally: flush before reading the bytes
            osw.flush();
        }
        catch (IOException e) {
            // Writing to memory is not expected to fail; if it does, returning
            // a truncated literal would silently corrupt the meta information.
            throw new IllegalStateException("Unable to serialize the meta information of " + quadrupleMatching, e);
        }
        String metaInformation = new String(baos.toByteArray(), utf8);
        return new Quadruple(PublishSubscribeUtils.createQuadrupleHashUri(quadrupleMatching), subscriptionIdUri, PublishSubscribeConstants.QUADRUPLE_MATCHES_SUBSCRIPTION_NODE, Node.createLiteral(metaInformation));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Finds the identifiers of the subscriptions stored in the specified
     * datastore whose original subscription id is
     * {@code originalSubscriptionId}.
     * <p>
     * The lookup is a SPARQL SELECT evaluated, inside a read-only transaction,
     * against the graph named by
     * {@link PublishSubscribeConstants#SUBSCRIPTION_NS_NODE}.
     *
     * @param datastore the datastore to query.
     * @param originalSubscriptionId the original subscription id to look for.
     *
     * @return the matching subscription identifiers, possibly empty.
     */
    public static final List<SubscriptionId> findSubscriptionIds(TransactionalTdbDatastore datastore, SubscriptionId originalSubscriptionId) {
        String subscriptionIdVar = Vars.SUBSCRIPTION_ID.toString();
        String sparql = new StringBuilder()
                .append("SELECT ")
                .append(subscriptionIdVar)
                .append(" WHERE {\n    GRAPH ")
                .append(NodeFmtLib.str(PublishSubscribeConstants.SUBSCRIPTION_NS_NODE))
                .append(" {\n        ")
                .append("?sIdUri ")
                .append(NodeFmtLib.str(PublishSubscribeConstants.SUBSCRIPTION_ORIGINAL_ID_NODE))
                .append(' ')
                .append(NodeFmtLib.str(originalSubscriptionId.toJenaNode()))
                .append(" .\n        ?sIdUri ")
                .append(NodeFmtLib.str(PublishSubscribeConstants.SUBSCRIPTION_ID_NODE))
                .append(' ')
                .append(subscriptionIdVar)
                .append(" .\n    }\n}")
                .toString();

        List<SubscriptionId> foundIds = new ArrayList<SubscriptionId>();
        TransactionalDatasetGraph txnGraph = datastore.begin(AccessMode.READ_ONLY);
        QueryExecution execution = null;
        try {
            execution = QueryExecutionFactory.create(sparql, (Dataset)txnGraph.getUnderlyingDataset());
            ResultSet solutions = execution.execSelect();
            while (solutions.hasNext()) {
                // each solution binds the subscription id variable to a literal
                String lexicalForm = solutions.nextBinding().get(Vars.SUBSCRIPTION_ID).getLiteralLexicalForm();
                foundIds.add(SubscriptionId.parseSubscriptionId(lexicalForm));
            }
        }
        finally {
            // release the query execution before ending the transaction
            if (execution != null) {
                execution.close();
            }
            txnGraph.end();
        }
        return foundIds;
    }

    /**
     * Parses the object literal of a meta quadruple and returns the quadruple
     * and the sub-subscription id it encodes.
     * <p>
     * The literal is expected to contain five tokens: the sub-subscription id
     * node followed by the four nodes of the quadruple, which are passed to the
     * {@link Quadruple} constructor in reading order.
     * <p>
     * The literal is converted to bytes explicitly as UTF-8 because the
     * tokenizer decodes its input as UTF-8; relying on the platform default
     * charset would garble non-ASCII characters on platforms whose default is
     * not UTF-8.
     *
     * @param metaQuad the meta quadruple whose object value must be parsed.
     *
     * @return a pair containing the decoded quadruple and the sub-subscription
     *         id.
     */
    public static final Pair<Quadruple, SubscriptionId> extractMetaInformation(Quadruple metaQuad) {
        String objectValue = metaQuad.getObject().getLiteralLexicalForm();
        ByteArrayInputStream bais = new ByteArrayInputStream(objectValue.getBytes(Charset.forName("UTF-8")));
        Tokenizer tokenizer = TokenizerFactory.makeTokenizerUTF8((InputStream)bais);

        // tokens are consumed strictly in the order in which they were written
        Node subSubscriptionId = tokenizer.next().asNode();
        Node first = tokenizer.next().asNode();
        Node second = tokenizer.next().asNode();
        Node third = tokenizer.next().asNode();
        Node fourth = tokenizer.next().asNode();

        return Pair.create(new Quadruple(first, second, third, fourth), SubscriptionId.parseSubscriptionId(subSubscriptionId.getLiteralLexicalForm()));
    }

    /**
     * Creates the hash URI node associated with the specified quadruple, based
     * on the value returned by its {@code hashValue()} method.
     *
     * @param quad the quadruple to identify.
     *
     * @return the URI node built from the quadruple hash value.
     */
    public static final Node createQuadrupleHashUri(Quadruple quad) {
        HashCode quadHash = quad.hashValue();
        return PublishSubscribeUtils.createQuadrupleHashUri(quadHash);
    }

    /**
     * Creates a URI node made of the {@code urn:ec:quad:} prefix followed by
     * the string form of the specified hash code.
     *
     * @param quadHash the hash value of a quadruple.
     *
     * @return the URI node built from the hash value.
     */
    public static final Node createQuadrupleHashUri(HashCode quadHash) {
        String uri = "urn:ec:quad:".concat(quadHash.toString());
        return Node.createURI(uri);
    }

    /**
     * Creates the URI node identifying the specified subscription, using the
     * string form of the identifier.
     *
     * @param id the subscription identifier.
     *
     * @return the URI node for the subscription.
     */
    public static final Node createSubscriptionIdUri(SubscriptionId id) {
        String subscriptionId = id.toString();
        return PublishSubscribeUtils.createSubscriptionIdUri(subscriptionId);
    }

    /**
     * Creates a URI node made of the {@code urn:ec:ss:} prefix followed by the
     * specified sub-subscription identifier.
     *
     * @param subSubscriptionId the sub-subscription identifier as a string.
     *
     * @return the URI node for the sub-subscription.
     */
    public static final Node createSubSubscriptionIdUri(String subSubscriptionId) {
        String uri = "urn:ec:ss:" + subSubscriptionId;
        return Node.createURI(uri);
    }

    /**
     * Creates the URI node identifying the specified sub-subscription, using
     * the string form of the identifier.
     *
     * @param subSubscriptionId the sub-subscription identifier.
     *
     * @return the URI node for the sub-subscription.
     */
    public static final Node createSubSubscriptionIdUri(SubscriptionId subSubscriptionId) {
        String id = subSubscriptionId.toString();
        return PublishSubscribeUtils.createSubSubscriptionIdUri(id);
    }

    /**
     * Creates a URI node made of the {@code urn:ec:s:} prefix followed by the
     * specified subscription identifier.
     *
     * @param subscriptionId the subscription identifier as a string.
     *
     * @return the URI node for the subscription.
     */
    public static final Node createSubscriptionIdUri(String subscriptionId) {
        String uri = "urn:ec:s:" + subscriptionId;
        return Node.createURI(uri);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Deletes from the specified datastore the information stored for a
     * subscription and for its sub-subscriptions.
     * <p>
     * The operation runs in two transactions: a read-only one that collects
     * the quadruples linking the subscription to its sub-subscriptions
     * ({@link PublishSubscribeConstants#SUBSCRIPTION_HAS_SUBSUBSCRIPTION_NODE})
     * and a write one that deletes, for each sub-subscription URI and then for
     * the subscription URI, every quadruple matching the pattern
     * {@code (ANY, uri, ANY, ANY)}.
     * <p>
     * The deletion is best effort: failures are logged and not propagated. If
     * the lookup fails, nothing is deleted.
     *
     * @param datastore the datastore to update.
     * @param subscriptionId the identifier of the subscription to delete.
     */
    public static final void deleteSubscription(TransactionalTdbDatastore datastore, SubscriptionId subscriptionId) {
        Node subscriptionIdUri = PublishSubscribeUtils.createSubscriptionIdUri(subscriptionId);

        List<Quadruple> subscriptionQuadruples = null;
        TransactionalDatasetGraph txnGraph = datastore.begin(AccessMode.READ_ONLY);
        try {
            Iterator<Quadruple> matches = txnGraph.find(PublishSubscribeConstants.SUBSCRIPTION_NS_NODE, subscriptionIdUri, PublishSubscribeConstants.SUBSCRIPTION_HAS_SUBSUBSCRIPTION_NODE, Node.ANY);
            // materialize the results: the iterator must not outlive the transaction
            subscriptionQuadruples = Lists.newArrayList(matches);
        }
        catch (Exception e) {
            log.error("Error while looking up the sub-subscriptions of subscription " + subscriptionId, e);
        }
        finally {
            txnGraph.end();
        }

        if (subscriptionQuadruples == null) {
            // the lookup failed (already logged): nothing can be deleted safely
            return;
        }

        txnGraph = datastore.begin(AccessMode.WRITE);
        try {
            for (Quadruple quad : subscriptionQuadruples) {
                // the object of each quadruple holds a sub-subscription id
                txnGraph.delete(Node.ANY, PublishSubscribeUtils.createSubSubscriptionIdUri(quad.getObject().getLiteralLexicalForm()), Node.ANY, Node.ANY);
            }
            txnGraph.delete(Node.ANY, subscriptionIdUri, Node.ANY, Node.ANY);
            txnGraph.commit();
        }
        catch (Exception e) {
            log.error("Error while deleting subscription " + subscriptionId, e);
        }
        finally {
            txnGraph.end();
        }
    }

    /**
     * Extracts the subscription identifier from a subscription URI node, i.e.
     * a URI node whose value starts with {@code urn:ec:s:}. The identifier is
     * the part of the URI that follows the last {@code ':'} character.
     *
     * @param subscriptionIdUri the subscription URI node to parse.
     *
     * @return the subscription identifier parsed from the URI.
     *
     * @throws IllegalArgumentException if the node is not a URI or if its URI
     *         does not start with the expected prefix.
     */
    public static final SubscriptionId extractSubscriptionId(Node subscriptionIdUri) {
        boolean valid = subscriptionIdUri.isURI() && subscriptionIdUri.getURI().startsWith("urn:ec:s:");
        if (!valid) {
            throw new IllegalArgumentException("The specified subscription id URI is not valid: " + subscriptionIdUri);
        }
        String uri = subscriptionIdUri.getURI();
        String rawId = uri.substring(uri.lastIndexOf(':') + 1);
        return SubscriptionId.parseSubscriptionId(rawId);
    }

    /**
     * Extracts the identifier from a subscription URI given as a string: the
     * identifier is everything that follows the last {@code ':'} character.
     *
     * @param subscriptionIdUri the subscription URI to parse.
     *
     * @return the substring following the last {@code ':'}, possibly empty.
     *
     * @throws IllegalArgumentException if the value contains no {@code ':'}.
     */
    public static final String extractSubscriptionId(String subscriptionIdUri) {
        int separator = subscriptionIdUri.lastIndexOf(':');
        if (separator >= 0) {
            return subscriptionIdUri.substring(separator + 1);
        }
        throw new IllegalArgumentException("The specified subscription id URI is not valid: " + subscriptionIdUri);
    }

    /**
     * Builds a binding from a quadruple that matches an atomic query.
     * <p>
     * Only the variables that appear both in {@code resultVars} and in the
     * atomic query are bound. Each of them is bound to the node found at the
     * same position in the quadruple, except for position 0 where the meta
     * graph node of the quadruple is used.
     *
     * @param quad the quadruple providing the values.
     * @param resultVars the variables expected in the result.
     * @param atomicQuery the atomic query matched by the quadruple.
     *
     * @return the binding restricted to the requested variables.
     */
    public static final Binding filter(Quadruple quad, Set<Var> resultVars, AtomicQuery atomicQuery) {
        Set<?> queryVars = FluentIterable.from(atomicQuery.getVars()).toImmutableSet();
        Set<Var> requestedVars = Sets.intersection(resultVars, queryVars);

        BindingMap solution = new BindingMap();
        Node[] quadNodes = quad.toArray();
        Node[] queryNodes = atomicQuery.toArray();

        for (int position = 0; position < queryNodes.length; position++) {
            Node queryNode = queryNodes[position];
            if (!queryNode.isVariable()) {
                continue;
            }
            Var var = Var.alloc(queryNode.getName());
            if (!requestedVars.contains(var)) {
                continue;
            }
            Node value;
            if (position == 0) {
                // the graph position is answered with the meta graph node
                value = quad.createMetaGraphNode();
            } else {
                value = quadNodes[position];
            }
            solution.add(var, value);
        }
        return solution;
    }

    /**
     * Rewrites a SPARQL subscription so that every projection it contains only
     * keeps the graph variable.
     * <p>
     * The graph variable is the name node of the named graph element(s) found
     * in the query pattern; when several are visited, the last one wins.
     *
     * @param subscription the SPARQL query to rewrite.
     *
     * @return the string form of the rewritten query.
     *
     * @throws IllegalArgumentException if a named graph element uses a graph
     *         name that is not a variable.
     */
    public static final String removeResultVarsExceptGraphVar(String subscription) {
        Query parsedQuery = QueryFactory.create(subscription);

        // holder written by the visitor below and read by the transform
        final MutableObject graphVarHolder = new MutableObject();

        ElementVisitor graphVarCollector = new ElementVisitorBase(){

            public void visit(ElementNamedGraph el) {
                Node graphName = el.getGraphNameNode();
                if (graphName.isVariable()) {
                    graphVarHolder.setValue(graphName);
                    return;
                }
                throw new IllegalArgumentException("The specified subscription does not have a graph variable: " + graphName);
            }
        };
        ElementWalker.walk(parsedQuery.getQueryPattern(), graphVarCollector);

        Transform keepGraphVarOnly = new TransformBase(){

            public Op transform(OpProject opProject, Op subOp) {
                List<Var> projection = new ArrayList<Var>();
                projection.add((Var)graphVarHolder.getValue());
                return new OpProject(subOp, projection);
            }
        };
        Op rewritten = Transformer.transform(keepGraphVarOnly, Algebra.compile(parsedQuery));

        return OpAsQuery.asQuery(rewritten).toString();
    }

    /**
     * Handles a quadruple that matches the specified subscription: when the
     * subscription has exactly one sub-subscription the subscriber is notified,
     * otherwise the subscription is rewritten and indexed.
     *
     * @param semanticOverlay the overlay on which the match was detected.
     * @param subscription the subscription that is matched.
     * @param quadruple the quadruple matching the subscription.
     */
    public static void rewriteSubscriptionOrNotifySender(SemanticCanOverlay semanticOverlay, Subscription subscription, Quadruple quadruple) {
        boolean rewritable = subscription.getSubSubscriptions().length != 1;
        if (rewritable) {
            PublishSubscribeUtils.rewriteAndIndexSubscription(semanticOverlay, subscription, quadruple);
            return;
        }

        log.debug("{} matches a subscription which cannot be rewritten, a notification will be delivered", (Object)quadruple);
        PublishSubscribeUtils.notifySubscriberAboutSolution(semanticOverlay, subscription, quadruple);
    }

    /**
     * Delivers to the subscriber a notification for a quadruple that matches
     * the specified subscription. The kind of notification depends on the
     * subscription type.
     * <p>
     * When the overlay has a social filter, nothing is delivered if the
     * relationship strength between the publication source and the
     * subscription destination is below
     * {@link EventCloudProperties#SOCIAL_FILTER_THRESHOLD}.
     *
     * @param semanticCanOverlay the overlay on which the match was detected.
     * @param subscription the subscription that is matched.
     * @param quadruple the quadruple matching the subscription.
     *
     * @throws IllegalStateException if the subscription type is UNKNOWN.
     */
    private static void notifySubscriberAboutSolution(SemanticCanOverlay semanticCanOverlay, Subscription subscription, Quadruple quadruple) {
        if (semanticCanOverlay.hasSocialFilter()) {
            double relationshipStrength = semanticCanOverlay.getSocialFilter().getRelationshipStrength(quadruple.getPublicationSource(), subscription.getSubscriptionDestination()).getStrength();
            PublishSubscribeUtils.logSocialFilterAnswer(subscription, quadruple, relationshipStrength);
            // relationship too weak: the notification is silently dropped
            if (relationshipStrength < (Double)EventCloudProperties.SOCIAL_FILTER_THRESHOLD.getValue()) {
                return;
            }
        }
        try {
            SubscribeProxy subscriber = subscription.getSubscriberProxy();
            // URL of this peer, passed to every notification as its source
            String source = PAActiveObject.getUrl((Object)semanticCanOverlay.getStub());
            switch (subscription.getType()) {
                case BINDING: {
                    // deliver the binding computed from the matching quadruple
                    BindingNotification notification = new BindingNotification(subscription.getOriginalId(), quadruple.createMetaGraphNode(), source, (Binding)new BindingWrapper(PublishSubscribeUtils.createBindingSolution(subscription, quadruple)));
                    subscriber.receive(notification);
                    // then send a RetrieveSubSolutionOperation, tagged with
                    // the notification id, to each peer referenced by a stub
                    for (Subscription.Stub stub : subscription.getStubs()) {
                        SemanticPeer peerStub = semanticCanOverlay.findPeerStub(stub.peerUrl);
                        if (peerStub != null) {
                            peerStub.receive(new RetrieveSubSolutionOperation(notification.getId(), stub.quadrupleHash));
                            continue;
                        }
                        log.error("Error while retrieving peer stub for url: {}", (Object)stub.peerUrl);
                    }
                    break;
                }
                case COMPOUND_EVENT: {
                    // SBCE1: only a polling signal is sent to the subscriber
                    if (EventCloudProperties.isSbce1PubSubAlgorithmUsed()) {
                        subscriber.receive(new PollingSignalNotification(subscription.getOriginalId(), quadruple.createMetaGraphNode(), source));
                        break;
                    }
                    // neither SBCE1 nor SBCE2: nothing is delivered
                    if (!EventCloudProperties.isSbce2PubSubAlgorithmUsed()) break;
                    // SBCE2: the matching quadruple itself is delivered, but
                    // only when markAsSent accepts it
                    QuadruplesNotification quadruplesNotification = new QuadruplesNotification(subscription.getOriginalId(), quadruple.createMetaGraphNode(), source, (List<Quadruple>)ImmutableList.of((Object)quadruple));
                    if (semanticCanOverlay.markAsSent(quadruplesNotification.getId(), quadruple)) {
                        subscriber.receive(quadruplesNotification);
                    }
                    // an ephemeral subscription request is sent in both cases
                    semanticCanOverlay.getStub().sendv((Request)new IndexEphemeralSubscriptionRequest(quadruple.createMetaGraphNode(), subscription.getOriginalId(), subscription.getSubscriberUrl()));
                    break;
                }
                case SIGNAL: {
                    subscriber.receive(new SignalNotification(subscription.getOriginalId(), quadruple.createMetaGraphNode(), source));
                    break;
                }
                case UNKNOWN: {
                    throw new IllegalStateException();
                }
            }
        }
        catch (ExecutionException e) {
            // NOTE(review): presumably thrown by getSubscriberProxy() when the
            // proxy lookup fails -- confirm against Subscription
            log.warn("Notification cannot be sent because no SubscribeProxy found under URL: " + subscription.getSubscriberUrl());
            PublishSubscribeUtils.handleSubscriberConnectionFailure(semanticCanOverlay, subscription);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Records a failed attempt to reach the subscriber of the specified
     * subscription and, once the number of attempts reaches
     * {@link EventCloudProperties#PROXY_MAX_LOOKUP_ATTEMPTS}, unsubscribes
     * every sub-subscription of the subscription.
     * <p>
     * The failure entry is removed and the removal is logged once per
     * subscription, after all the unsubscribe requests have completed, rather
     * than once per sub-subscription. The entry is removed even when an
     * unsubscribe request fails so that the attempts counter restarts from
     * scratch instead of staying at the threshold, where the equality check
     * would never trigger again.
     *
     * @param semanticCanOverlay the overlay that keeps track of the failures.
     * @param subscription the subscription whose subscriber is unreachable.
     */
    private static void handleSubscriberConnectionFailure(SemanticCanOverlay semanticCanOverlay, Subscription subscription) {
        SubscriberConnectionFailure newFailure = new SubscriberConnectionFailure();
        SubscriberConnectionFailure existingFailure = semanticCanOverlay.getSubscriberConnectionFailures().putIfAbsent(subscription.getOriginalId(), newFailure);
        // reuse the entry that is already registered, if any
        final SubscriberConnectionFailure failure = existingFailure != null ? existingFailure : newFailure;

        synchronized (failure) {
            failure.incNbAttempts();
            if (failure.getNbAttempts() != ((Integer)EventCloudProperties.PROXY_MAX_LOOKUP_ATTEMPTS.getValue()).intValue()) {
                return;
            }
            try {
                for (Subsubscription subSubscription : subscription.getSubSubscriptions()) {
                    // wait for each unsubscribe request before sending the next
                    PAFuture.waitFor((Object)semanticCanOverlay.getStub().send((Request)new UnsubscribeRequest(subscription.getOriginalId(), subSubscription.getAtomicQuery(), subscription.getType() == NotificationListenerType.BINDING)));
                }
            }
            finally {
                semanticCanOverlay.getSubscriberConnectionFailures().remove(subscription.getOriginalId());
            }
            log.info("Removed subscription {} due to subscriber which is not reachable under URL {}", (Object)subscription.getId(), (Object)subscription.getSubscriberUrl());
        }
    }

    /**
     * Logs, at debug level, the answer of the social filter for a publication
     * source and a subscription destination, along with the configured
     * threshold and the components of the quadruple.
     *
     * @param subscription the subscription providing the destination.
     * @param quadruple the quadruple providing the publication source.
     * @param relationshipStrength the strength returned by the social filter.
     */
    private static void logSocialFilterAnswer(Subscription subscription, Quadruple quadruple, double relationshipStrength) {
        if (!log.isDebugEnabled()) {
            // skip building the arguments when debug logging is disabled
            return;
        }
        Object[] arguments = new Object[]{
            quadruple.getPublicationSource(),
            subscription.getSubscriptionDestination(),
            EventCloudProperties.SOCIAL_FILTER_THRESHOLD.getValue(),
            relationshipStrength,
            quadruple.getGraph(),
            quadruple.getSubject(),
            quadruple.getPredicate(),
            quadruple.getObject()
        };
        log.debug("SocialFilterAnswer[source={}, destination={}, threshold={}, relationship_strengh={}, quadruple={}|{}|{}|{}]", arguments);
    }

    /**
     * Creates the binding delivered to the subscriber for a quadruple that
     * matches the first sub-subscription of the specified subscription.
     *
     * @param subscription the subscription that is matched.
     * @param quadruple the quadruple matching the subscription.
     *
     * @return the binding restricted to the result variables of the
     *         subscription.
     */
    private static Binding createBindingSolution(Subscription subscription, Quadruple quadruple) {
        Set<Var> resultVars = subscription.getResultVars();
        AtomicQuery firstAtomicQuery = subscription.getSubSubscriptions()[0].getAtomicQuery();
        return PublishSubscribeUtils.filter(quadruple, resultVars, firstAtomicQuery);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Rewrites the specified subscription according to the quadruple that
     * matches it and dispatches the rewritten subscription so that it gets
     * indexed.
     * <p>
     * For subscriptions of type {@link NotificationListenerType#BINDING}, a
     * meta quadruple describing the match is first stored in the subscriptions
     * datastore. Storing it is best effort: a failure is logged and the
     * subscription is rewritten anyway.
     *
     * @param overlay the overlay on which the match was detected.
     * @param subscription the subscription to rewrite.
     * @param quadrupleMatching the quadruple matching the subscription.
     */
    private static void rewriteAndIndexSubscription(SemanticCanOverlay overlay, Subscription subscription, Quadruple quadrupleMatching) {
        if (subscription.getType() == NotificationListenerType.BINDING) {
            Quadruple metaQuad = PublishSubscribeUtils.createMetaQuadruple(quadrupleMatching, PublishSubscribeUtils.createSubscriptionIdUri(subscription.getId()), Node.createLiteral((String)subscription.getId().toString(), (RDFDatatype)XSDDatatype.XSDlong));
            TransactionalDatasetGraph txnGraph = overlay.getSubscriptionsDatastore().begin(AccessMode.WRITE);
            try {
                txnGraph.add(metaQuad);
                txnGraph.commit();
            }
            catch (Exception e) {
                log.error("Error while storing the meta quadruple for subscription " + subscription.getId(), e);
            }
            finally {
                txnGraph.end();
            }
        }

        Subscription rewrittenSubscription = SubscriptionRewriter.rewrite(subscription, quadrupleMatching);
        // remember on which peer, and for which quadruple, the match occurred
        rewrittenSubscription.addStub(new Subscription.Stub(PAActiveObject.getUrl((Object)overlay.getStub()), quadrupleMatching.hashValue()));

        if (((Boolean)P2PStructuredProperties.ENABLE_BENCHMARKS_INFORMATION.getValue()).booleanValue()) {
            log.info("Peer " + overlay + " is about to dispatch a rewritten subscription, creation time = " + rewrittenSubscription.getCreationTime() + " , subscription: " + rewrittenSubscription.getSparqlQuery());
        }
        overlay.dispatchv((Request)new IndexSubscriptionRequest(rewrittenSubscription));
    }

    /**
     * Serializable binding backed by a {@link HashMap} that associates
     * variables to nodes.
     * <p>
     * The map itself is transient: its content is written and read back through
     * {@link SparqlResultSerializer}, with the compression flag taken from
     * {@link EventCloudProperties#COMPRESSION}.
     */
    public static final class BindingMap
    implements com.hp.hpl.jena.sparql.engine.binding.BindingMap,
    Serializable {
        private static final long serialVersionUID = 130L;

        // not serialized by default, see writeObject and readObject
        private transient Map<Var, Node> content = new HashMap<Var, Node>();

        /** Returns an iterator over the variables that are bound. */
        public Iterator<Var> vars() {
            return this.content.keySet().iterator();
        }

        /** Indicates whether the specified variable is bound. */
        public boolean contains(Var var) {
            return this.content.containsKey(var);
        }

        /** Returns the node bound to the specified variable, or null if none. */
        public Node get(Var var) {
            return this.content.get(var);
        }

        /** Returns the number of variables that are bound. */
        public int size() {
            return this.content.size();
        }

        /** Indicates whether no variable is bound. */
        public boolean isEmpty() {
            return this.content.isEmpty();
        }

        /** Binds the specified variable to the given node, replacing any previous value. */
        public void add(Var var, Node node) {
            this.content.put(var, node);
        }

        /** Copies all the associations of the specified binding into this one. */
        public void addAll(Binding binding) {
            for (Iterator<Var> vars = binding.vars(); vars.hasNext();) {
                Var var = vars.next();
                this.content.put(var, binding.get(var));
            }
        }

        /** Returns the associations as {@code (var=node, var=node)}. */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("(");
            String separator = "";
            for (Map.Entry<Var, Node> association : this.content.entrySet()) {
                text.append(separator);
                text.append(association.getKey());
                text.append('=');
                text.append(association.getValue());
                separator = ", ";
            }
            return text.append(')').toString();
        }

        /** Writes the default fields, then the associations through SparqlResultSerializer. */
        private void writeObject(ObjectOutputStream out) throws IOException {
            out.defaultWriteObject();
            boolean compress = (Boolean)EventCloudProperties.COMPRESSION.getValue();
            SparqlResultSerializer.serialize((OutputStream)out, (Binding)this, compress);
        }

        /** Reads the default fields, then rebuilds the transient map from the serialized binding. */
        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            boolean compressed = (Boolean)EventCloudProperties.COMPRESSION.getValue();
            Binding deserialized = SparqlResultSerializer.deserializeBinding((InputStream)in, compressed);
            // field initializers do not run during deserialization
            this.content = new HashMap<Var, Node>();
            this.addAll(deserialized);
        }
    }
}

