/*
 * Decompiled with CFR 0.152.
 */
package fr.inria.eventcloud.proxies;

import fr.inria.eventcloud.api.CompoundEvent;
import fr.inria.eventcloud.api.Quadruple;
import fr.inria.eventcloud.configuration.EventCloudProperties;
import fr.inria.eventcloud.parsers.RdfParser;
import fr.inria.eventcloud.proxies.AbstractProxy;
import fr.inria.eventcloud.proxies.EventCloudCache;
import fr.inria.eventcloud.proxies.PublishProxy;
import fr.inria.eventcloud.proxies.PublishProxyAttributeController;
import fr.inria.eventcloud.utils.Callback;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.Collection;
import org.objectweb.proactive.Body;
import org.objectweb.proactive.annotation.multiactivity.DefineGroups;
import org.objectweb.proactive.annotation.multiactivity.Group;
import org.objectweb.proactive.annotation.multiactivity.MemberOf;
import org.objectweb.proactive.extensions.p2p.structured.configuration.P2PStructuredProperties;
import org.objectweb.proactive.extensions.p2p.structured.proxies.Proxies;
import org.objectweb.proactive.multiactivity.MultiActiveService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@DefineGroups(value={@Group(name="parallel", selfCompatible=true)})
public class PublishProxyImpl
extends AbstractProxy
implements PublishProxy,
PublishProxyAttributeController {
    private static final Logger log = LoggerFactory.getLogger(PublishProxyImpl.class);
    public static final String PUBLISH_PROXY_ADL = "fr.inria.eventcloud.proxies.PublishProxy";
    public static final String PUBLISH_SERVICES_ITF = "publish-services";

    @Override
    public void setAttributes(EventCloudCache proxy) {
        if (this.eventCloudCache == null) {
            this.eventCloudCache = proxy;
            this.proxy = Proxies.newProxy(this.eventCloudCache.getTrackers());
        }
    }

    @MemberOf(value="parallel")
    public void publish(Quadruple quad) {
        if (((Boolean)P2PStructuredProperties.ENABLE_BENCHMARKS_INFORMATION.getValue()).booleanValue()) {
            log.info("About to publish quad : " + quad.getSubject() + " " + quad.getPredicate() + " " + quad.getObject());
        }
        super.selectPeer().publish(quad);
    }

    @MemberOf(value="parallel")
    public void publish(CompoundEvent event) {
        if (((Boolean)EventCloudProperties.INTEGRATION_LOG.getValue()).booleanValue()) {
            log.info("EventCloud Entry {} {}", (Object)event.getGraph(), (Object)this.eventCloudCache.getId().getStreamUrl());
        }
        super.selectPeer().publish(event);
    }

    @MemberOf(value="parallel")
    public void publish(Collection<CompoundEvent> events) {
        for (CompoundEvent event : events) {
            this.publish(event);
        }
    }

    @MemberOf(value="parallel")
    public void publish(URL url, Quadruple.SerializationFormat format) {
        try {
            InputStream in = url.openConnection().getInputStream();
            RdfParser.parse(in, format, new Callback<Quadruple>(){

                public void execute(Quadruple quad) {
                    PublishProxyImpl.this.publish(quad);
                }
            });
            in.close();
        }
        catch (IOException ioe) {
            log.error("An error occurred when reading from the given URL", (Throwable)ioe);
        }
    }

    public void runComponentActivity(Body body) {
        new MultiActiveService(body).multiActiveServing(((Integer)EventCloudProperties.MAO_SOFT_LIMIT_PUBLISH_PROXIES.getValue()).intValue(), false, false);
    }
}

