/*
 * Decompiled with CFR 0.152.
 */
package fr.inria.eventcloud.proxies;

import com.google.common.hash.HashCode;
import com.hp.hpl.jena.graph.Node;
import com.hp.hpl.jena.sparql.engine.binding.Binding;
import fr.inria.eventcloud.api.CompoundEvent;
import fr.inria.eventcloud.api.PublishSubscribeConstants;
import fr.inria.eventcloud.api.Quadruple;
import fr.inria.eventcloud.api.QuadruplePattern;
import fr.inria.eventcloud.api.Subscription;
import fr.inria.eventcloud.api.SubscriptionId;
import fr.inria.eventcloud.api.listeners.BindingNotificationListener;
import fr.inria.eventcloud.api.listeners.CompoundEventNotificationListener;
import fr.inria.eventcloud.api.listeners.NotificationListener;
import fr.inria.eventcloud.api.listeners.NotificationListenerType;
import fr.inria.eventcloud.api.listeners.SignalNotificationListener;
import fr.inria.eventcloud.api.properties.AlterableElaProperty;
import fr.inria.eventcloud.configuration.EventCloudProperties;
import fr.inria.eventcloud.formatters.QuadruplesFormatter;
import fr.inria.eventcloud.messages.request.can.ReconstructCompoundEventRequest;
import fr.inria.eventcloud.messages.request.can.RemoveEphemeralSubscriptionRequest;
import fr.inria.eventcloud.messages.request.can.UnsubscribeRequest;
import fr.inria.eventcloud.messages.response.can.QuadruplePatternResponse;
import fr.inria.eventcloud.proxies.AbstractProxy;
import fr.inria.eventcloud.proxies.EventCloudCache;
import fr.inria.eventcloud.proxies.SubscribeProxy;
import fr.inria.eventcloud.proxies.SubscribeProxyAttributeController;
import fr.inria.eventcloud.pubsub.PublishSubscribeUtils;
import fr.inria.eventcloud.pubsub.Subsubscription;
import fr.inria.eventcloud.pubsub.notifications.BindingNotification;
import fr.inria.eventcloud.pubsub.notifications.Notification;
import fr.inria.eventcloud.pubsub.notifications.NotificationId;
import fr.inria.eventcloud.pubsub.notifications.PollingSignalNotification;
import fr.inria.eventcloud.pubsub.notifications.QuadruplesNotification;
import fr.inria.eventcloud.pubsub.notifications.SignalNotification;
import fr.inria.eventcloud.pubsub.solutions.BindingSolution;
import fr.inria.eventcloud.pubsub.solutions.QuadruplesSolution;
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.mapdb.DB;
import org.mapdb.DBMaker;
import org.mapdb.Serializer;
import org.objectweb.proactive.Body;
import org.objectweb.proactive.annotation.multiactivity.DefineGroups;
import org.objectweb.proactive.annotation.multiactivity.Group;
import org.objectweb.proactive.annotation.multiactivity.MemberOf;
import org.objectweb.proactive.api.PAActiveObject;
import org.objectweb.proactive.api.PAFuture;
import org.objectweb.proactive.core.component.body.ComponentEndActive;
import org.objectweb.proactive.extensions.p2p.structured.messages.request.Request;
import org.objectweb.proactive.extensions.p2p.structured.proxies.Proxies;
import org.objectweb.proactive.multiactivity.MultiActiveService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@DefineGroups(value={@Group(name="parallel", selfCompatible=true)})
public class SubscribeProxyImpl
extends AbstractProxy
implements ComponentEndActive,
SubscribeProxy,
SubscribeProxyAttributeController {
    private static final long serialVersionUID = 130L;
    private static final String NOTIFICATIONS_DELIVERED_MAP_NAME = "notificationsDelivered";
    public static final String SUBSCRIBE_PROXY_ADL = "fr.inria.eventcloud.proxies.SubscribeProxy";
    public static final String SUBSCRIBE_SERVICES_ITF = "subscribe-services";
    private static final Logger log = LoggerFactory.getLogger(SubscribeProxyImpl.class);
    private ConcurrentMap<SubscriptionId, SubscriptionEntry<?>> subscriptions;
    private ConcurrentMap<NotificationId, BindingSolution> bindingSolutions;
    private ConcurrentMap<NotificationId, QuadruplesSolution> quadruplesSolutions;
    private DB notificationsDeliveredDB;
    private String componentUri;

    @Override
    public void initComponentActivity(Body body) {
        super.initComponentActivity(body);
        body.setImmediateService("setImmediateServices", false);
        body.setImmediateService("setAttributes", false);
        this.createAndRegisterNotificationsDeliveredDB(body);
    }

    public void runComponentActivity(Body body) {
        new MultiActiveService(body).multiActiveServing(((Integer)EventCloudProperties.MAO_SOFT_LIMIT_SUBSCRIBE_PROXIES.getValue()).intValue(), false, false);
    }

    public void endComponentActivity(Body body) {
        this.closeNotificationsDeliveredDb();
    }

    private void createAndRegisterNotificationsDeliveredDB(Body body) {
        String dbPath = EventCloudProperties.getDefaultTemporaryPath() + "jdbm" + File.separatorChar;
        new File(dbPath).mkdirs();
        String dbFilename = dbPath + body.getID();
        this.notificationsDeliveredDB = DBMaker.newFileDB((File)new File(dbFilename)).cacheSoftRefEnable().closeOnJvmShutdown().deleteFilesAfterClose().transactionDisable().make();
        this.notificationsDeliveredDB.createHashMap(NOTIFICATIONS_DELIVERED_MAP_NAME, (Serializer)new NotificationId.Serializer(), (Serializer)new SubscriptionId.Serializer());
    }

    @Override
    public void setAttributes(EventCloudCache proxy, String componentUri, AlterableElaProperty[] properties) {
        if (this.eventCloudCache == null) {
            this.eventCloudCache = proxy;
            this.proxy = Proxies.newProxy(this.eventCloudCache.getTrackers());
            this.componentUri = componentUri;
            this.subscriptions = new ConcurrentHashMap(100, 0.9f, 2);
            this.bindingSolutions = new ConcurrentHashMap<NotificationId, BindingSolution>((Integer)EventCloudProperties.MAO_SOFT_LIMIT_SUBSCRIBE_PROXIES.getValue() * (Integer)EventCloudProperties.AVERAGE_NB_QUADRUPLES_PER_COMPOUND_EVENT.getValue(), 0.75f, (Integer)EventCloudProperties.MAO_SOFT_LIMIT_SUBSCRIBE_PROXIES.getValue());
            this.quadruplesSolutions = new ConcurrentHashMap<NotificationId, QuadruplesSolution>((Integer)EventCloudProperties.MAO_SOFT_LIMIT_SUBSCRIBE_PROXIES.getValue() * (Integer)EventCloudProperties.AVERAGE_NB_QUADRUPLES_PER_COMPOUND_EVENT.getValue(), 0.75f, (Integer)EventCloudProperties.MAO_SOFT_LIMIT_SUBSCRIBE_PROXIES.getValue());
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable(){

                @Override
                public void run() {
                    SubscribeProxyImpl.this.closeNotificationsDeliveredDb();
                }
            }));
        }
    }

    private synchronized void closeNotificationsDeliveredDb() {
        if (this.notificationsDeliveredDB != null) {
            this.notificationsDeliveredDB.close();
        }
    }

    @MemberOf(value="parallel")
    public void subscribe(Subscription subscription, BindingNotificationListener listener) {
        this.indexSubscription(subscription, (NotificationListener<?>)listener);
    }

    @MemberOf(value="parallel")
    public void subscribe(Subscription subscription, CompoundEventNotificationListener listener) {
        String sparqlQuery = EventCloudProperties.isSbce1PubSubAlgorithmUsed() || EventCloudProperties.isSbce2PubSubAlgorithmUsed() ? PublishSubscribeUtils.removeResultVarsExceptGraphVar(subscription.getSparqlQuery()) : subscription.getSparqlQuery();
        this.indexSubscription(subscription, sparqlQuery, (NotificationListener<?>)listener);
    }

    @MemberOf(value="parallel")
    public void subscribe(Subscription subscription, SignalNotificationListener listener) {
        this.indexSubscription(subscription, (NotificationListener<?>)listener);
    }

    private void indexSubscription(Subscription subscription, NotificationListener<?> listener) {
        this.indexSubscription(subscription, subscription.getSparqlQuery(), listener);
    }

    private void indexSubscription(Subscription subscription, String sparqlQuery, NotificationListener<?> listener) {
        fr.inria.eventcloud.pubsub.Subscription internalSubscription = SubscribeProxyImpl.createInternalSubscription(subscription, this.componentUri, sparqlQuery, listener.getType());
        if (this.subscriptions.putIfAbsent(subscription.getId(), new SubscriptionEntry(internalSubscription, listener)) != null) {
            throw new IllegalArgumentException("Subscription already registered for subscription id: " + internalSubscription.getId());
        }
        super.selectPeer().indexSubscription(internalSubscription);
        log.info("New subscription has been registered from {} with id {}", (Object)PAActiveObject.getBodyOnThis().getUrl(), (Object)internalSubscription.getId());
    }

    private static fr.inria.eventcloud.pubsub.Subscription createInternalSubscription(Subscription subscription, String componentUri, String sparqlSubscription, NotificationListenerType listenerType) {
        return new fr.inria.eventcloud.pubsub.Subscription(subscription.getId(), null, subscription.getId(), subscription.getCreationTime(), sparqlSubscription, componentUri, subscription.getSubscriptionDestination(), listenerType);
    }

    @MemberOf(value="parallel")
    public void unsubscribe(SubscriptionId id) {
        SubscriptionEntry entry = (SubscriptionEntry)this.subscriptions.remove(id);
        if (entry == null) {
            throw new IllegalArgumentException("No subscription registered with the specified subscription id: " + id);
        }
        fr.inria.eventcloud.pubsub.Subscription subscription = entry.subscription;
        for (Subsubscription subSubscription : subscription.getSubSubscriptions()) {
            super.selectPeer().send((Request)new UnsubscribeRequest(subscription.getOriginalId(), subSubscription.getAtomicQuery(), subscription.getType() == NotificationListenerType.BINDING));
        }
        this.notificationsDeliveredDB.getHashMap(NOTIFICATIONS_DELIVERED_MAP_NAME).values().remove(id);
    }

    @Override
    @MemberOf(value="parallel")
    public void receive(BindingNotification notification) {
        SubscriptionId subscriptionId = notification.getSubscriptionId();
        SubscriptionEntry subscriptionEntry = (SubscriptionEntry)this.subscriptions.get(subscriptionId);
        if (subscriptionEntry == null) {
            return;
        }
        this.logNotificationReception(notification);
        BindingSolution solution = (BindingSolution)this.bindingSolutions.get(notification.getId());
        if (solution == null) {
            solution = new BindingSolution(subscriptionEntry.subscription.getSubSubscriptions().length, (Binding)notification.getContent());
            BindingSolution tmpSolution = null;
            tmpSolution = this.bindingSolutions.putIfAbsent(notification.getId(), solution);
            if (tmpSolution != null) {
                solution = tmpSolution;
                solution.merge((Binding)notification.getContent());
            }
        } else {
            solution.merge((Binding)notification.getContent());
        }
        if (solution.isReady()) {
            this.deliver(subscriptionEntry, solution);
            this.bindingSolutions.remove(notification.getId());
        }
    }

    private void deliver(SubscriptionEntry<BindingNotificationListener> entry, BindingSolution solution) {
        ((BindingNotificationListener)((SubscriptionEntry)entry).listener).onNotification(((SubscriptionEntry)entry).subscription.getId(), (Object)solution.getChunks());
    }

    @Override
    @MemberOf(value="parallel")
    public void receive(QuadruplesNotification notification) {
        SubscriptionEntry subscriptionEntry;
        Node eventId = ((Quadruple)((List)notification.getContent()).get(0)).getGraph();
        SubscriptionId subscriptionId = notification.getSubscriptionId();
        if (this.notificationsDeliveredDB.getHashMap(NOTIFICATIONS_DELIVERED_MAP_NAME).containsKey(notification.getId())) {
            log.warn("Received some quadruple duplicates for a CE that has already been delivered:\n{}", (Object)notification);
            super.selectPeer().sendv((Request)new RemoveEphemeralSubscriptionRequest(eventId, notification.getSubscriptionId()));
        }
        if (log.isDebugEnabled()) {
            log.debug("Received quadruples notification subscriptionId=" + subscriptionId + ", contentSize=" + ((List)notification.getContent()).size() + ", from=" + notification.getSource() + "\n" + QuadruplesFormatter.toString((Collection)notification.getContent(), true));
        }
        if ((subscriptionEntry = (SubscriptionEntry)this.subscriptions.get(subscriptionId)) == null) {
            return;
        }
        this.logNotificationReception(notification);
        QuadruplesSolution solution = (QuadruplesSolution)this.quadruplesSolutions.get(notification.getId());
        if (solution == null) {
            solution = new QuadruplesSolution((List)notification.getContent());
            QuadruplesSolution tmpSolution = null;
            tmpSolution = this.quadruplesSolutions.putIfAbsent(notification.getId(), solution);
            if (tmpSolution != null) {
                solution = tmpSolution;
                solution.merge((Collection)notification.getContent());
            }
        } else {
            solution.merge((Collection)notification.getContent());
        }
        if (solution.isReady()) {
            if (this.markAsDelivered(notification.getId(), subscriptionId) != null) {
                log.warn("Received some quadruple duplicates for a CE that has already been delivered:\n{}", (Object)notification);
                this.quadruplesSolutions.remove(notification.getId());
                super.selectPeer().sendv((Request)new RemoveEphemeralSubscriptionRequest(eventId, subscriptionId));
            }
            CompoundEvent compoundEvent = new CompoundEvent((Collection)solution.getChunks());
            this.deliver(subscriptionEntry, compoundEvent.getGraph().toString(), compoundEvent);
            this.quadruplesSolutions.remove(notification.getId());
            super.selectPeer().sendv((Request)new RemoveEphemeralSubscriptionRequest(compoundEvent.getGraph(), subscriptionId));
        }
    }

    @Override
    @MemberOf(value="parallel")
    public void receive(SignalNotification notification) {
        SubscriptionId subscriptionId = notification.getSubscriptionId();
        SubscriptionEntry subscriptionEntry = (SubscriptionEntry)this.subscriptions.get(subscriptionId);
        if (subscriptionEntry == null) {
            return;
        }
        this.logNotificationReception(notification);
        this.deliver(subscriptionEntry);
    }

    private void deliver(SubscriptionEntry<SignalNotificationListener> entry) {
        ((SignalNotificationListener)((SubscriptionEntry)entry).listener).onNotification(((SubscriptionEntry)entry).subscription.getId());
    }

    @Override
    @MemberOf(value="parallel")
    public void receive(PollingSignalNotification notification) {
        SubscriptionEntry entry;
        SubscriptionId subscriptionId = notification.getSubscriptionId();
        CompoundEvent compoundEvent = this.reconstructCompoundEvent(notification.getId(), subscriptionId, Node.createURI((String)notification.getMetaEventId()));
        this.logNotificationReception(notification);
        if (compoundEvent != null && (entry = (SubscriptionEntry)this.subscriptions.get(subscriptionId)) != null) {
            this.deliver(entry, notification.getMetaEventId(), compoundEvent);
        }
    }

    private void deliver(SubscriptionEntry<CompoundEventNotificationListener> entry, String graph, CompoundEvent compoundEvent) {
        SubscriptionId subscriptionId = ((SubscriptionEntry)entry).subscription.getId();
        CompoundEventNotificationListener listener = (CompoundEventNotificationListener)((SubscriptionEntry)entry).listener;
        listener.onNotification(subscriptionId, (Object)compoundEvent);
        this.sendInputOutputMonitoringReport(Quadruple.getPublicationSource((String)graph), listener.getSubscriberUrl(), Quadruple.getPublicationTime((String)graph));
        this.logIntegrationInformation(graph);
    }

    @Override
    @MemberOf(value="parallel")
    public final CompoundEvent reconstructCompoundEvent(NotificationId notificationId, SubscriptionId subscriptionId, Node eventId) {
        if (this.markAsDelivered(notificationId, subscriptionId) != null) {
            return null;
        }
        int expectedNbQuadruples = -1;
        ArrayList<Quadruple> quadsReceived = new ArrayList<Quadruple>();
        HashSet<HashCode> quadHashesReceived = new HashSet<HashCode>();
        QuadruplePattern reconstructPattern = new QuadruplePattern(eventId, Node.ANY, Node.ANY, Node.ANY);
        while (quadsReceived.size() != expectedNbQuadruples) {
            if (log.isInfoEnabled()) {
                log.info("Reconstructing compound event for subscription {} and graph value {} ({}/{})", new Object[]{subscriptionId, eventId, quadsReceived.size(), expectedNbQuadruples});
            }
            List quads = (List)((QuadruplePatternResponse)((Object)PAFuture.getFutureValue((Object)super.selectPeer().send((Request)new ReconstructCompoundEventRequest(reconstructPattern, quadHashesReceived))))).getResult();
            for (Quadruple q : quads) {
                if (q.getPredicate().equals((Object)PublishSubscribeConstants.EVENT_NB_QUADRUPLES_NODE)) {
                    expectedNbQuadruples = (Integer)q.getObject().getLiteralValue();
                } else {
                    quadsReceived.add(q);
                }
                quadHashesReceived.add(q.hashValue());
            }
            try {
                Thread.sleep(((Integer)EventCloudProperties.RECONSTRUCTION_RETRY_THRESHOLD.getValue()).intValue());
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return new CompoundEvent(quadsReceived);
    }

    private void sendInputOutputMonitoringReport(String source, String destination, long eventPublicationTimestamp) {
        if (source == null) {
            source = "http://0.0.0.0";
        }
        if (destination == null) {
            destination = this.componentUri;
        }
        if (this.monitoringManager != null) {
            this.monitoringManager.sendInputOutputMonitoringReport(source, destination, eventPublicationTimestamp);
        }
    }

    private void logIntegrationInformation(String graph) {
        if (((Boolean)EventCloudProperties.INTEGRATION_LOG.getValue()).booleanValue()) {
            String msg = "EventCloud Exit";
            if (graph != null) {
                msg = msg + " ";
                msg = msg + Quadruple.removeMetaInformation((Node)Node.createURI((String)graph));
            }
            msg = msg + " ";
            msg = msg + this.eventCloudCache.getId().getStreamUrl();
            log.info(msg);
        }
    }

    private void logNotificationReception(Notification<?> notification) {
        if (log.isDebugEnabled()) {
            log.debug("New notification received {} on {} for subscription id {}", new Object[]{notification.getId(), this.componentUri, notification.getSubscriptionId()});
        }
    }

    @Override
    @MemberOf(value="parallel")
    public fr.inria.eventcloud.pubsub.Subscription find(SubscriptionId id) {
        return ((SubscriptionEntry)this.subscriptions.get(id)).subscription;
    }

    @MemberOf(value="parallel")
    public String getComponentUri() {
        return this.componentUri;
    }

    private SubscriptionId markAsDelivered(NotificationId notificationId, SubscriptionId subscriptionId) {
        return this.notificationsDeliveredDB.getHashMap(NOTIFICATIONS_DELIVERED_MAP_NAME).putIfAbsent(notificationId, subscriptionId);
    }

    private static final class SubscriptionEntry<T extends NotificationListener<?>> {
        private final fr.inria.eventcloud.pubsub.Subscription subscription;
        private final T listener;

        public SubscriptionEntry(fr.inria.eventcloud.pubsub.Subscription subscription, T listener) {
            this.subscription = subscription;
            this.listener = listener;
        }
    }
}

