/*
 * Decompiled with CFR 0.152.
 */
package org.atmosphere.cpr;

import com.vaadin.external.org.slf4j.Logger;
import com.vaadin.external.org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.atmosphere.cache.UUIDBroadcasterCache;
import org.atmosphere.cpr.ApplicationConfig;
import org.atmosphere.cpr.AtmosphereConfig;
import org.atmosphere.cpr.AtmosphereResource;
import org.atmosphere.cpr.AtmosphereResourceEvent;
import org.atmosphere.cpr.AtmosphereResourceEventImpl;
import org.atmosphere.cpr.AtmosphereResourceEventListener;
import org.atmosphere.cpr.AtmosphereResourceFactory;
import org.atmosphere.cpr.AtmosphereResourceImpl;
import org.atmosphere.cpr.BroadcastFilter;
import org.atmosphere.cpr.Broadcaster;
import org.atmosphere.cpr.BroadcasterCache;
import org.atmosphere.cpr.BroadcasterConfig;
import org.atmosphere.cpr.BroadcasterFuture;
import org.atmosphere.cpr.BroadcasterLifeCyclePolicy;
import org.atmosphere.cpr.BroadcasterLifeCyclePolicyListener;
import org.atmosphere.cpr.BroadcasterListener;

public class DefaultBroadcaster
implements Broadcaster {
    public static final String CACHED = DefaultBroadcaster.class.getName() + ".messagesCached";
    public static final String ASYNC_TOKEN = DefaultBroadcaster.class.getName() + ".token";
    private static final Logger logger = LoggerFactory.getLogger(DefaultBroadcaster.class);
    private static final String DESTROYED = "This Broadcaster has been destroyed and cannot be used {} by invoking {}";
    protected final ConcurrentLinkedQueue<AtmosphereResource> resources = new ConcurrentLinkedQueue();
    protected BroadcasterConfig bc;
    protected final BlockingQueue<Entry> messages = new LinkedBlockingQueue<Entry>();
    protected final ConcurrentLinkedQueue<BroadcasterListener> broadcasterListeners = new ConcurrentLinkedQueue();
    protected final AtomicBoolean started = new AtomicBoolean(false);
    protected final AtomicBoolean destroyed = new AtomicBoolean(false);
    protected Broadcaster.SCOPE scope = Broadcaster.SCOPE.APPLICATION;
    protected String name = DefaultBroadcaster.class.getSimpleName();
    protected final ConcurrentLinkedQueue<Entry> delayedBroadcast = new ConcurrentLinkedQueue();
    protected final ConcurrentLinkedQueue<Entry> broadcastOnResume = new ConcurrentLinkedQueue();
    protected final ConcurrentLinkedQueue<BroadcasterLifeCyclePolicyListener> lifeCycleListeners = new ConcurrentLinkedQueue();
    protected final ConcurrentHashMap<String, WriteQueue> writeQueues = new ConcurrentHashMap();
    protected final WriteQueue uniqueWriteQueue = new WriteQueue("-1");
    protected final AtomicInteger dispatchThread = new AtomicInteger();
    protected Future<?>[] notifierFuture;
    protected Future<?>[] asyncWriteFuture;
    private Broadcaster.POLICY policy = Broadcaster.POLICY.FIFO;
    private final AtomicLong maxSuspendResource = new AtomicLong(-1L);
    private final AtomicBoolean requestScoped = new AtomicBoolean(false);
    private final AtomicBoolean recentActivity = new AtomicBoolean(false);
    private BroadcasterLifeCyclePolicy lifeCyclePolicy = new BroadcasterLifeCyclePolicy.Builder().policy(BroadcasterLifeCyclePolicy.ATMOSPHERE_RESOURCE_POLICY.NEVER).build();
    private Future<?> currentLifecycleTask;
    protected URI uri;
    protected AtmosphereConfig config;
    protected BroadcasterCache.STRATEGY cacheStrategy = BroadcasterCache.STRATEGY.AFTER_FILTER;
    private final Object[] awaitBarrier = new Object[0];
    protected final AtomicBoolean outOfOrderBroadcastSupported = new AtomicBoolean(false);
    protected int writeTimeoutInSecond = -1;
    protected final AtmosphereResource noOpsResource;
    protected final boolean uuidCache;
    protected int waitTime = 1000;

    public DefaultBroadcaster(String name, URI uri, AtmosphereConfig config) {
        this.name = name;
        this.uri = uri;
        this.config = config;
        this.bc = this.createBroadcasterConfig(config);
        String s = config.getInitParameter(ApplicationConfig.BROADCASTER_CACHE_STRATEGY);
        if (s != null) {
            if (s.equalsIgnoreCase("afterFilter")) {
                this.cacheStrategy = BroadcasterCache.STRATEGY.AFTER_FILTER;
            } else if (s.equalsIgnoreCase("beforeFilter")) {
                this.cacheStrategy = BroadcasterCache.STRATEGY.BEFORE_FILTER;
            }
        }
        if ((s = config.getInitParameter(ApplicationConfig.OUT_OF_ORDER_BROADCAST)) != null) {
            this.outOfOrderBroadcastSupported.set(Boolean.valueOf(s));
        }
        if ((s = config.getInitParameter(ApplicationConfig.BROADCASTER_WAIT_TIME)) != null) {
            this.waitTime = Integer.valueOf(s);
        }
        if ((s = config.getInitParameter(ApplicationConfig.WRITE_TIMEOUT)) != null) {
            this.writeTimeoutInSecond = Integer.valueOf(s);
        }
        this.noOpsResource = AtmosphereResourceFactory.getDefault().create(config, "-1");
        this.uuidCache = UUIDBroadcasterCache.class.isAssignableFrom(this.bc.getBroadcasterCache().getClass());
        logger.info("{} support Out Of Order Broadcast: {}", (Object)name, (Object)this.outOfOrderBroadcastSupported.get());
    }

    public DefaultBroadcaster(String name, AtmosphereConfig config) {
        this(name, URI.create("http://localhost"), config);
    }

    protected BroadcasterConfig createBroadcasterConfig(AtmosphereConfig config) {
        return new BroadcasterConfig(config.framework().broadcasterFilters, config, this.getID());
    }

    public Broadcaster setBroadcasterCacheStrategy(BroadcasterCache.STRATEGY cacheStrategy) {
        this.cacheStrategy = cacheStrategy;
        return this;
    }

    @Override
    public void destroy() {
        if (this.destroyed.getAndSet(true)) {
            return;
        }
        this.notifyOnPreDestroy();
        this.notifyDestroyListener();
        try {
            logger.trace("Broadcaster {} is being destroyed and cannot be re-used. Policy was {}", (Object)this.getID(), (Object)this.policy);
            logger.trace("Broadcaster {} is being destroyed and cannot be re-used. Resources are {}", (Object)this.getID(), this.resources);
            if (this.config.getBroadcasterFactory() != null) {
                this.config.getBroadcasterFactory().remove(this, this.getID());
            }
            if (this.currentLifecycleTask != null) {
                this.currentLifecycleTask.cancel(true);
            }
            this.started.set(false);
            this.releaseExternalResources();
            this.killReactiveThreads();
            if (this.bc != null) {
                this.bc.destroy();
            }
            this.resources.clear();
            this.broadcastOnResume.clear();
            this.messages.clear();
            this.delayedBroadcast.clear();
            this.broadcasterListeners.clear();
            this.writeQueues.clear();
        }
        catch (Throwable t) {
            logger.error("Unexpected exception during Broadcaster destroy {}", (Object)this.getID(), (Object)t);
        }
    }

    @Override
    public Collection<AtmosphereResource> getAtmosphereResources() {
        return Collections.unmodifiableCollection(this.resources);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void setScope(Broadcaster.SCOPE scope) {
        if (this.destroyed.get()) {
            logger.debug(DESTROYED, (Object)this.getID(), (Object)"setScope");
            return;
        }
        this.scope = scope;
        if (scope != Broadcaster.SCOPE.REQUEST) {
            return;
        }
        logger.debug("Changing broadcaster scope for {}. This broadcaster will be destroyed.", (Object)this.getID());
        ConcurrentLinkedQueue<AtmosphereResource> concurrentLinkedQueue = this.resources;
        synchronized (concurrentLinkedQueue) {
            try {
                for (AtmosphereResource resource : this.resources) {
                    Broadcaster b = this.config.getBroadcasterFactory().get(this.getClass(), this.getClass().getSimpleName() + "/" + UUID.randomUUID());
                    resource.setBroadcaster(b);
                    b.setScope(Broadcaster.SCOPE.REQUEST);
                    if (resource.getAtmosphereResourceEvent().isSuspended()) {
                        b.addAtmosphereResource(resource);
                    }
                    logger.debug("Resource {} not using broadcaster {}", (Object)resource, (Object)b.getID());
                }
                if (this.resources.isEmpty()) {
                    return;
                }
                this.destroy();
            }
            catch (Exception e) {
                logger.error("Failed to set request scope for current resources", (Throwable)e);
            }
        }
    }

    @Override
    public Broadcaster.SCOPE getScope() {
        return this.scope;
    }

    @Override
    public synchronized void setID(String id) {
        if (id == null) {
            id = this.getClass().getSimpleName() + "/" + UUID.randomUUID();
        }
        if (this.config.getBroadcasterFactory() == null) {
            return;
        }
        Broadcaster b = this.config.getBroadcasterFactory().lookup(this.getClass(), id);
        if (b != null && b.getScope() == Broadcaster.SCOPE.REQUEST) {
            throw new IllegalStateException("Broadcaster ID already assigned to SCOPE.REQUEST. Cannot change the id");
        }
        if (b != null) {
            return;
        }
        this.config.getBroadcasterFactory().remove(this, this.name);
        this.name = id;
        this.config.getBroadcasterFactory().add(this, this.name);
        this.bc.broadcasterID(this.name);
    }

    @Override
    public String getID() {
        return this.name;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void resumeAll() {
        ConcurrentLinkedQueue<AtmosphereResource> concurrentLinkedQueue = this.resources;
        synchronized (concurrentLinkedQueue) {
            for (AtmosphereResource r : this.resources) {
                try {
                    r.resume();
                }
                catch (Throwable t) {
                    logger.trace("resumeAll", t);
                }
            }
        }
    }

    @Override
    public void releaseExternalResources() {
    }

    @Override
    public void setBroadcasterLifeCyclePolicy(final BroadcasterLifeCyclePolicy lifeCyclePolicy) {
        this.lifeCyclePolicy = lifeCyclePolicy;
        if (this.currentLifecycleTask != null) {
            this.currentLifecycleTask.cancel(false);
        }
        if (this.bc != null && this.bc.getScheduledExecutorService() == null) {
            logger.error("No Broadcaster's SchedulerExecutorService has been configured on {}. BroadcasterLifeCyclePolicy won't work.", (Object)this.getID());
            return;
        }
        if (lifeCyclePolicy.getLifeCyclePolicy() == BroadcasterLifeCyclePolicy.ATMOSPHERE_RESOURCE_POLICY.IDLE || lifeCyclePolicy.getLifeCyclePolicy() == BroadcasterLifeCyclePolicy.ATMOSPHERE_RESOURCE_POLICY.IDLE_RESUME || lifeCyclePolicy.getLifeCyclePolicy() == BroadcasterLifeCyclePolicy.ATMOSPHERE_RESOURCE_POLICY.IDLE_DESTROY) {
            this.recentActivity.set(false);
            int time = lifeCyclePolicy.getTimeout();
            if (time == -1) {
                throw new IllegalStateException("BroadcasterLifeCyclePolicy time is not set");
            }
            final AtomicReference ref = new AtomicReference();
            this.currentLifecycleTask = this.bc.getScheduledExecutorService().scheduleAtFixedRate(new Runnable(){

                @Override
                public void run() {
                    try {
                        if (DefaultBroadcaster.this.recentActivity.getAndSet(false)) {
                            return;
                        }
                        if (DefaultBroadcaster.this.resources.isEmpty()) {
                            if (lifeCyclePolicy.getLifeCyclePolicy() == BroadcasterLifeCyclePolicy.ATMOSPHERE_RESOURCE_POLICY.IDLE) {
                                DefaultBroadcaster.this.notifyEmptyListener();
                                DefaultBroadcaster.this.notifyIdleListener();
                                DefaultBroadcaster.this.releaseExternalResources();
                                logger.debug("Applying BroadcasterLifeCyclePolicy IDLE policy to Broadcaster {}", (Object)DefaultBroadcaster.this.getID());
                            } else if (lifeCyclePolicy.getLifeCyclePolicy() == BroadcasterLifeCyclePolicy.ATMOSPHERE_RESOURCE_POLICY.IDLE_DESTROY) {
                                DefaultBroadcaster.this.notifyEmptyListener();
                                DefaultBroadcaster.this.notifyIdleListener();
                                this.destroy(false);
                                logger.debug("Applying BroadcasterLifeCyclePolicy IDLE_DESTROY policy to Broadcaster {}", (Object)DefaultBroadcaster.this.getID());
                            }
                        } else if (lifeCyclePolicy.getLifeCyclePolicy() == BroadcasterLifeCyclePolicy.ATMOSPHERE_RESOURCE_POLICY.IDLE_RESUME) {
                            DefaultBroadcaster.this.notifyIdleListener();
                            this.destroy(true);
                            logger.debug("Applying BroadcasterLifeCyclePolicy IDLE_RESUME policy to Broadcaster {}", (Object)DefaultBroadcaster.this.getID());
                        }
                    }
                    catch (Throwable t) {
                        if (DefaultBroadcaster.this.destroyed.get()) {
                            logger.trace("Scheduled BroadcasterLifeCyclePolicy exception", t);
                        }
                        logger.warn("Scheduled BroadcasterLifeCyclePolicy exception", t);
                    }
                }

                void destroy(boolean resume) {
                    if (resume) {
                        logger.info("All AtmosphereResource will now be resumed from Broadcaster {}", (Object)DefaultBroadcaster.this.getID());
                        DefaultBroadcaster.this.resumeAll();
                    }
                    DefaultBroadcaster.this.destroy();
                    if (ref.get() != null) {
                        DefaultBroadcaster.this.currentLifecycleTask.cancel(true);
                    }
                }
            }, time, time, lifeCyclePolicy.getTimeUnit());
            ref.set(this.currentLifecycleTask);
        }
    }

    @Override
    public void addBroadcasterLifeCyclePolicyListener(BroadcasterLifeCyclePolicyListener b) {
        this.lifeCycleListeners.add(b);
    }

    @Override
    public void removeBroadcasterLifeCyclePolicyListener(BroadcasterLifeCyclePolicyListener b) {
        this.lifeCycleListeners.remove(b);
    }

    @Override
    public boolean isDestroyed() {
        return this.destroyed.get();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    @Override
    public <T> Future<T> awaitAndBroadcast(T t, long time, TimeUnit timeUnit) {
        if (!this.resources.isEmpty()) return this.broadcast(t);
        Object[] objectArray = this.awaitBarrier;
        synchronized (this.awaitBarrier) {
            try {
                logger.trace("Awaiting for AtmosphereResource for {} {}", (Object)time, (Object)timeUnit);
                this.awaitBarrier.wait(this.translateTimeUnit(time, timeUnit));
            }
            catch (Throwable e) {
                logger.warn("awaitAndBroadcast", e);
                // ** MonitorExit[var5_4] (shouldn't be in output)
                return null;
            }
            return this.broadcast(t);
        }
    }

    @Override
    public Broadcaster addBroadcasterListener(BroadcasterListener b) {
        if (!this.broadcasterListeners.contains(b)) {
            this.broadcasterListeners.add(b);
        }
        return this;
    }

    @Override
    public Broadcaster removeBroadcasterListener(BroadcasterListener b) {
        this.broadcasterListeners.remove(b);
        return this;
    }

    protected Runnable getBroadcastHandler() {
        return new Runnable(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                while (!DefaultBroadcaster.this.isDestroyed()) {
                    Entry msg = null;
                    try {
                        msg = DefaultBroadcaster.this.messages.poll(DefaultBroadcaster.this.waitTime, TimeUnit.MILLISECONDS);
                        if (msg == null) {
                            DefaultBroadcaster.this.dispatchThread.decrementAndGet();
                            return;
                        }
                    }
                    catch (InterruptedException ex) {
                        logger.trace("{} got interrupted for Broadcaster {}", (Object)Thread.currentThread().getName(), (Object)DefaultBroadcaster.this.getID());
                        logger.trace("", (Throwable)ex);
                        return;
                    }
                    finally {
                        if (DefaultBroadcaster.this.outOfOrderBroadcastSupported.get()) {
                            DefaultBroadcaster.this.bc.getExecutorService().submit(this);
                        }
                    }
                    try {
                        logger.trace("{} is about to broadcast {}", (Object)DefaultBroadcaster.this.getID(), (Object)msg);
                        DefaultBroadcaster.this.push(msg);
                    }
                    catch (Throwable ex) {
                        if (!DefaultBroadcaster.this.started.get() || DefaultBroadcaster.this.destroyed.get()) {
                            logger.trace("Failed to submit broadcast handler runnable on shutdown for Broadcaster {}", (Object)DefaultBroadcaster.this.getID(), (Object)ex);
                            return;
                        }
                        logger.warn("This message {} will be lost", (Object)msg);
                        logger.debug("Failed to submit broadcast handler runnable to for Broadcaster {}", (Object)DefaultBroadcaster.this.getID(), (Object)ex);
                    }
                    finally {
                        if (!DefaultBroadcaster.this.outOfOrderBroadcastSupported.get()) continue;
                        return;
                    }
                }
            }
        };
    }

    protected Runnable getAsyncWriteHandler(final WriteQueue writeQueue) {
        return new Runnable(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             * Enabled aggressive block sorting
             * Enabled unnecessary exception pruning
             * Enabled aggressive exception aggregation
             */
            @Override
            public void run() {
                while (!DefaultBroadcaster.this.isDestroyed()) {
                    Object object;
                    AsyncWriteToken token = null;
                    try {
                        token = writeQueue.queue.poll(DefaultBroadcaster.this.waitTime, TimeUnit.MILLISECONDS);
                        if (token != null) {
                        }
                        if (!DefaultBroadcaster.this.outOfOrderBroadcastSupported.get()) {
                            object = writeQueue;
                            synchronized (object) {
                                if (writeQueue.queue.size() == 0) {
                                    writeQueue.monitored.set(false);
                                    DefaultBroadcaster.this.writeQueues.remove(writeQueue.uuid);
                                    return;
                                }
                            }
                        }
                    }
                    catch (InterruptedException ex) {
                        logger.trace("{} got interrupted for Broadcaster {}", (Object)Thread.currentThread().getName(), (Object)DefaultBroadcaster.this.getID());
                        logger.trace("", (Throwable)ex);
                        return;
                    }
                    finally {
                        if (!DefaultBroadcaster.this.bc.getAsyncWriteService().isShutdown() && DefaultBroadcaster.this.outOfOrderBroadcastSupported.get()) {
                            DefaultBroadcaster.this.bc.getAsyncWriteService().submit(this);
                        }
                    }
                    object = token.resource;
                    synchronized (object) {
                        block26: {
                            try {
                                logger.trace("About to write to {}", (Object)token.resource);
                                DefaultBroadcaster.this.executeAsyncWrite(token);
                                if (DefaultBroadcaster.this.bc.getAsyncWriteService().isShutdown() || !DefaultBroadcaster.this.outOfOrderBroadcastSupported.get()) break block26;
                            }
                            catch (Throwable ex) {
                                try {
                                    if (!DefaultBroadcaster.this.started.get() || DefaultBroadcaster.this.destroyed.get()) {
                                        logger.trace("Failed to execute a write operation. Broadcaster is destroyed or not yet started for Broadcaster {}", (Object)DefaultBroadcaster.this.getID(), (Object)ex);
                                        return;
                                    }
                                    if (token != null) {
                                        logger.warn("This message {} will be lost for AtmosphereResource {}, adding it to the BroadcasterCache", token.originalMessage, (Object)(token.resource != null ? token.resource.uuid() : "null"));
                                        DefaultBroadcaster.this.cacheLostMessage(token.resource, token, true);
                                    }
                                    logger.debug("Failed to execute a write operation for Broadcaster {}", (Object)DefaultBroadcaster.this.getID(), (Object)ex);
                                }
                                catch (Throwable throwable) {
                                    throw throwable;
                                }
                                finally {
                                    if (!DefaultBroadcaster.this.bc.getAsyncWriteService().isShutdown() && DefaultBroadcaster.this.outOfOrderBroadcastSupported.get()) {
                                        return;
                                    }
                                }
                            }
                            return;
                        }
                    }
                }
            }
        };
    }

    protected void start() {
        if (!this.started.getAndSet(true)) {
            this.bc.getBroadcasterCache().start();
            this.setID(this.name);
            if (this.notifierFuture == null && this.asyncWriteFuture == null) {
                this.spawnReactor();
            }
        }
    }

    protected void spawnReactor() {
        this.killReactiveThreads();
        int threads = this.outOfOrderBroadcastSupported.get() ? this.reactiveThreadsCount() : 1;
        this.notifierFuture = new Future[threads];
        if (this.outOfOrderBroadcastSupported.get()) {
            this.asyncWriteFuture = new Future[threads];
            for (int i = 0; i < threads; ++i) {
                this.notifierFuture[i] = this.bc.getExecutorService().submit(this.getBroadcastHandler());
                this.asyncWriteFuture[i] = this.bc.getExecutorService().submit(this.getAsyncWriteHandler(this.uniqueWriteQueue));
            }
        } else {
            this.notifierFuture[0] = this.bc.getExecutorService().submit(this.getBroadcastHandler());
        }
        this.dispatchThread.set(threads);
    }

    protected void killReactiveThreads() {
        if (this.notifierFuture != null) {
            for (Future<?> f : this.notifierFuture) {
                if (f == null) continue;
                f.cancel(false);
            }
        }
        if (this.asyncWriteFuture != null) {
            for (Future<?> f : this.asyncWriteFuture) {
                if (f == null) continue;
                f.cancel(false);
            }
        }
    }

    protected int reactiveThreadsCount() {
        return Runtime.getRuntime().availableProcessors();
    }

    public void finalize() throws Throwable {
        super.finalize();
        try {
            this.killReactiveThreads();
        }
        catch (Throwable t) {
            logger.trace("", t);
        }
    }

    public void push(Entry entry) {
        if (this.destroyed.get()) {
            return;
        }
        this.deliverPush(entry, true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void deliverPush(Entry entry, boolean rec) {
        Object finalMsg;
        Serializable b;
        if (this.destroyed.get()) {
            return;
        }
        this.recentActivity.set(true);
        String prevMessage = entry.message.toString();
        if (rec && !this.delayedBroadcast.isEmpty()) {
            Iterator<Entry> i = this.delayedBroadcast.iterator();
            b = new StringBuilder();
            while (i.hasNext()) {
                Entry e = i.next();
                e.future.cancel(true);
                try {
                    if (e.message instanceof String && entry.message instanceof String) {
                        ((StringBuilder)b).append(e.message);
                        continue;
                    }
                    this.deliverPush(e, false);
                }
                finally {
                    i.remove();
                }
            }
            if (((StringBuilder)b).length() > 0) {
                entry.message = ((StringBuilder)b).append(entry.message).toString();
            }
        }
        if ((finalMsg = this.callable(entry.message)) == null) {
            logger.error("Callable exception. Please catch all exception from you callable. Message {} will be lost and all AtmosphereResource associated with this Broadcaster resumed.", entry.message);
            this.entryDone(entry.future);
            b = this.resources;
            synchronized (b) {
                for (AtmosphereResource r : this.resources) {
                    if (!r.transport().equals((Object)AtmosphereResource.TRANSPORT.JSONP) && !r.transport().equals((Object)AtmosphereResource.TRANSPORT.LONG_POLLING)) continue;
                    try {
                        r.resume();
                    }
                    catch (Throwable t) {
                        logger.trace("resumeAll", t);
                    }
                }
            }
            return;
        }
        Object prevM = entry.originalMessage;
        Object object = entry.originalMessage = entry.originalMessage != entry.message ? this.callable(entry.originalMessage) : finalMsg;
        if (entry.originalMessage == null) {
            logger.trace("Broadcast message was null {}", prevM);
            this.entryDone(entry.future);
            return;
        }
        entry.message = finalMsg;
        if (this.resources.isEmpty()) {
            boolean continueProcessing = true;
            if (!this.bc.getBroadcasterCache().getClass().equals(BroadcasterConfig.DefaultBroadcasterCache.class.getName())) {
                ConcurrentLinkedQueue<AtmosphereResource> r = this.resources;
                synchronized (r) {
                    continueProcessing = this.invokeFiltersAndCache(entry);
                }
            } else {
                continueProcessing = this.invokeFiltersAndCache(entry);
            }
            if (!continueProcessing) {
                return;
            }
        }
        BroadcasterCache broadcasterCache = this.bc.getBroadcasterCache();
        if (this.uuidCache) {
            entry.cache = ((UUIDBroadcasterCache)UUIDBroadcasterCache.class.cast(broadcasterCache)).addCacheCandidate(this.getID(), null, entry.originalMessage);
        }
        try {
            if (logger.isTraceEnabled()) {
                for (AtmosphereResource r : this.resources) {
                    logger.trace("AtmosphereResources available for {}", (Object)r.uuid());
                }
            }
            if (entry.multipleAtmoResources == null) {
                for (AtmosphereResource r : this.resources) {
                    finalMsg = this.perRequestFilter(r, entry, true);
                    if (finalMsg == null) {
                        logger.debug("Skipping broadcast delivery resource {} ", (Object)r);
                        continue;
                    }
                    if (!entry.writeLocally) continue;
                    this.queueWriteIO(r, finalMsg, entry);
                }
            } else if (entry.multipleAtmoResources instanceof AtmosphereResource) {
                finalMsg = this.perRequestFilter((AtmosphereResource)entry.multipleAtmoResources, entry, true);
                if (finalMsg == null) {
                    logger.debug("Skipping broadcast delivery resource {} ", entry.multipleAtmoResources);
                    return;
                }
                if (entry.writeLocally) {
                    this.queueWriteIO((AtmosphereResource)entry.multipleAtmoResources, finalMsg, entry);
                }
            } else if (entry.multipleAtmoResources instanceof Set) {
                Set sub = (Set)entry.multipleAtmoResources;
                if (sub.size() != 0) {
                    for (AtmosphereResource r : sub) {
                        finalMsg = this.perRequestFilter(r, entry, true);
                        if (finalMsg == null) {
                            logger.debug("Skipping broadcast delivery resource {} ", (Object)r);
                            continue;
                        }
                        if (!entry.writeLocally) continue;
                        this.queueWriteIO(r, finalMsg, entry);
                    }
                } else if (this.cacheStrategy == BroadcasterCache.STRATEGY.AFTER_FILTER) {
                    this.trackBroadcastMessage(null, finalMsg);
                }
            }
            entry.message = prevMessage;
        }
        catch (InterruptedException ex) {
            logger.debug(ex.getMessage(), (Throwable)ex);
        }
    }

    protected boolean invokeFiltersAndCache(Entry entry) {
        if (this.resources.isEmpty()) {
            logger.trace("Broadcaster {} doesn't have any associated resource. Message will be cached in the configured BroadcasterCache {}", (Object)this.getID(), entry.message);
            AtmosphereResource r = null;
            if (entry.multipleAtmoResources != null && AtmosphereResource.class.isAssignableFrom(entry.multipleAtmoResources.getClass())) {
                r = (AtmosphereResource)AtmosphereResource.class.cast(entry.multipleAtmoResources);
            }
            if (r == null) {
                r = this.noOpsResource;
            }
            if (this.cacheStrategy == BroadcasterCache.STRATEGY.AFTER_FILTER) {
                if (this.bc.hasPerRequestFilters()) {
                    logger.debug("Invoking BroadcastFilter with dummy AtmosphereResource {}", (Object)r.uuid());
                }
                this.perRequestFilter(r, entry, true, true);
            } else {
                this.trackBroadcastMessage((AtmosphereResource)(r != null ? (r.uuid().equals("-1") ? null : r) : r), entry.originalMessage);
            }
            this.entryDone(entry.future);
            return false;
        }
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void queueWriteIO(AtmosphereResource r, Object finalMsg, Entry entry) throws InterruptedException {
        if (!this.bc.getBroadcasterCache().getClass().equals(BroadcasterConfig.DefaultBroadcasterCache.class.getName()) && (r.isResumed() || r.isCancelled())) {
            logger.trace("AtmosphereResource {} has been resumed or cancelled, unable to Broadcast message {}", (Object)r.uuid(), finalMsg);
            if (!this.uuidCache) {
                this.trackBroadcastMessage(r, this.cacheStrategy == BroadcasterCache.STRATEGY.AFTER_FILTER ? finalMsg : entry.originalMessage);
            }
            return;
        }
        AsyncWriteToken w = new AsyncWriteToken(r, finalMsg, entry.future, entry.originalMessage, entry.cache);
        if (!this.outOfOrderBroadcastSupported.get()) {
            WriteQueue writeQueue = this.writeQueues.get(r.uuid());
            if (writeQueue == null) {
                writeQueue = new WriteQueue(r.uuid());
                this.writeQueues.put(r.uuid(), writeQueue);
            }
            writeQueue.queue.put(w);
            WriteQueue writeQueue2 = writeQueue;
            synchronized (writeQueue2) {
                if (!writeQueue.monitored.getAndSet(true)) {
                    logger.trace("Broadcaster {} is about to queueWriteIO for AtmosphereResource {}", (Object)this.name, (Object)r.uuid());
                    this.bc.getAsyncWriteService().submit(this.getAsyncWriteHandler(writeQueue));
                }
            }
        } else {
            this.uniqueWriteQueue.queue.offer(w);
        }
    }

    protected Object perRequestFilter(AtmosphereResource r, Entry msg, boolean cache) {
        return this.perRequestFilter(r, msg, cache, false);
    }

    protected Object perRequestFilter(AtmosphereResource r, Entry msg, boolean cache, boolean force) {
        boolean after;
        if (r == null) {
            logger.trace("Null AtmosphereResource passed inside a Set");
            return msg.message;
        }
        Object finalMsg = msg.message;
        if (this.isAtmosphereResourceValid(r)) {
            if (this.bc.hasPerRequestFilters()) {
                BroadcastFilter.BroadcastAction a = this.bc.filter(r, msg.message, msg.originalMessage);
                if (a.action() == BroadcastFilter.BroadcastAction.ACTION.ABORT) {
                    return null;
                }
                if (a.message() != msg.originalMessage) {
                    finalMsg = a.message();
                }
            }
        } else {
            logger.warn("Request is not longer valid {}", (Object)r.uuid());
            this.removeAtmosphereResource(r);
            this.config.getBroadcasterFactory().removeAllAtmosphereResource(r);
        }
        cache = force || !this.uuidCache && cache;
        boolean bl = after = this.cacheStrategy == BroadcasterCache.STRATEGY.AFTER_FILTER;
        if (cache && after) {
            this.trackBroadcastMessage((AtmosphereResource)(r != null ? (r.uuid().equals("-1") ? null : r) : r), after ? finalMsg : msg.originalMessage);
        }
        return finalMsg;
    }

    private Object callable(Object msg) {
        if (Callable.class.isAssignableFrom(msg.getClass())) {
            try {
                return ((Callable)Callable.class.cast(msg)).call();
            }
            catch (Throwable e) {
                logger.warn("Callable exception", e);
                return null;
            }
        }
        return msg;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void executeAsyncWrite(AsyncWriteToken token) {
        boolean notifyListeners = true;
        boolean lostCandidate = false;
        AtmosphereResourceEventImpl event = (AtmosphereResourceEventImpl)token.resource.getAtmosphereResourceEvent();
        AtmosphereResourceImpl r = (AtmosphereResourceImpl)AtmosphereResourceImpl.class.cast(token.resource);
        try {
            event.setMessage(token.msg);
            BroadcasterCache broadcasterCache = this.bc.getBroadcasterCache();
            if (this.uuidCache) {
                ((UUIDBroadcasterCache)UUIDBroadcasterCache.class.cast(broadcasterCache)).clearCache(this.getID(), r, token.cache);
            }
            if (!this.isAtmosphereResourceValid(r)) {
                logger.trace("AtmosphereResource {} state is invalid for Broadcaster {}", (Object)r.uuid(), (Object)this.name);
                this.removeAtmosphereResource(r);
                lostCandidate = true;
                return;
            }
            try {
                r.getRequest().setAttribute(this.getID(), token.future);
                r.getRequest().setAttribute("org.atmosphere.cpr.CometSupport.maxInactiveActivity", System.currentTimeMillis());
            }
            catch (Throwable t) {
                logger.warn("Invalid AtmosphereResource state {}. The connection has been remotely closed and message {} will be added to the configured BroadcasterCache for later retrieval", (Object)r.uuid(), event.getMessage());
                logger.trace("If you are using Tomcat 7.0.22 and lower, your most probably hitting http://is.gd/NqicFT");
                logger.trace("", t);
                this.removeAtmosphereResource(r, false);
                this.config.getBroadcasterFactory().removeAllAtmosphereResource(r);
                event.setCancelled(true);
                event.setThrowable(t);
                r.setIsInScope(false);
                lostCandidate = true;
                if (notifyListeners) {
                    r.notifyListeners();
                }
                this.entryDone(token.future);
                if (lostCandidate) {
                    this.cacheLostMessage(r, token, true);
                }
                token.destroy();
                return;
            }
            r.getRequest().setAttribute(ASYNC_TOKEN, token);
            this.prepareInvokeOnStateChange(r, event);
        }
        finally {
            if (notifyListeners) {
                r.notifyListeners();
            }
            this.entryDone(token.future);
            if (lostCandidate) {
                this.cacheLostMessage(r, token, true);
            }
            token.destroy();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected boolean checkCachedAndPush(AtmosphereResource r, AtmosphereResourceEvent e) {
        this.retrieveTrackedBroadcast(r, e);
        if (e.getMessage() instanceof List && !((List)e.getMessage()).isEmpty()) {
            logger.debug("Sending cached message {} to {}", e.getMessage(), (Object)r.uuid());
            List cacheMessages = (List)e.getMessage();
            BroadcasterFuture<Object> f = new BroadcasterFuture<Object>(e.getMessage(), 1, (Broadcaster)this);
            if (this.cacheStrategy.equals((Object)BroadcasterCache.STRATEGY.BEFORE_FILTER)) {
                LinkedList<Object> filteredMessage = new LinkedList<Object>();
                for (Object o : cacheMessages) {
                    Object newMessage = this.filter(o);
                    if (newMessage == null || (newMessage = this.perRequestFilter(r, new Entry(newMessage, (Object)r, f, o), false)) == null) continue;
                    filteredMessage.addLast(newMessage);
                }
                if (filteredMessage.size() == 0) {
                    return false;
                }
                e.setMessage(filteredMessage);
            } else {
                e.setMessage(cacheMessages);
            }
            r.getRequest().setAttribute(CACHED, "true");
            AtmosphereResource atmosphereResource = r;
            synchronized (atmosphereResource) {
                try {
                    this.prepareInvokeOnStateChange(r, e);
                }
                catch (Throwable t) {
                    logger.error("Unable to write cached message {} for {}", e.getMessage(), (Object)r.uuid());
                    logger.error("", t);
                    for (Object o : cacheMessages) {
                        this.bc.getBroadcasterCache().addToCache(this.getID(), r, o);
                    }
                    return true;
                }
                for (AtmosphereResourceEventListener l : ((AtmosphereResourceImpl)AtmosphereResourceImpl.class.cast(r)).atmosphereResourceEventListener()) {
                    l.onBroadcast(e);
                }
                switch (r.transport()) {
                    case JSONP: 
                    case AJAX: 
                    case LONG_POLLING: {
                        return true;
                    }
                    case SSE: {
                        break;
                    }
                    default: {
                        try {
                            r.getResponse().flushBuffer();
                            break;
                        }
                        catch (IOException ioe) {
                            logger.trace("", (Throwable)ioe);
                            ((AtmosphereResourceImpl)AtmosphereResourceImpl.class.cast(r))._destroy();
                        }
                    }
                }
            }
        }
        return false;
    }

    protected boolean retrieveTrackedBroadcast(AtmosphereResource r, AtmosphereResourceEvent e) {
        logger.trace("Checking cached message for {}", (Object)r.uuid());
        List<Object> missedMsg = this.bc.getBroadcasterCache().retrieveFromCache(this.getID(), r);
        if (missedMsg != null && !missedMsg.isEmpty()) {
            e.setMessage(missedMsg);
            return true;
        }
        return false;
    }

    protected void trackBroadcastMessage(AtmosphereResource r, Object msg) {
        if (this.destroyed.get() || this.bc.getBroadcasterCache() == null) {
            return;
        }
        try {
            this.bc.getBroadcasterCache().addToCache(this.getID(), r, msg);
        }
        catch (Throwable t) {
            logger.warn("Unable to track messages {} {}", msg, (Object)t);
        }
    }

    protected void invokeOnStateChange(AtmosphereResource r, AtmosphereResourceEvent e) {
        block2: {
            try {
                logger.trace("{} is broadcasting to {}", (Object)this.name, (Object)r.uuid());
                r.getAtmosphereHandler().onStateChange(e);
            }
            catch (Throwable t) {
                if (InterruptedException.class.isAssignableFrom(t.getClass())) break block2;
                this.onException(t, r);
            }
        }
    }

    protected void prepareInvokeOnStateChange(AtmosphereResource r, AtmosphereResourceEvent e) {
        if (this.writeTimeoutInSecond != -1) {
            logger.trace("Registering Write timeout {} for {}", (Object)this.writeTimeoutInSecond, (Object)r.uuid());
            WriteOperation w = new WriteOperation(r, e, Thread.currentThread());
            this.bc.getScheduledExecutorService().schedule(w, (long)this.writeTimeoutInSecond, TimeUnit.MILLISECONDS);
            try {
                w.call();
            }
            catch (Exception ex) {
                logger.warn("", (Throwable)ex);
            }
        } else {
            this.invokeOnStateChange(r, e);
        }
    }

    public void onException(Throwable t, AtmosphereResource ar) {
        this.onException(t, ar, true);
    }

    public void onException(Throwable t, AtmosphereResource ar, boolean notifyAndCache) {
        final AtmosphereResourceImpl r = (AtmosphereResourceImpl)AtmosphereResourceImpl.class.cast(ar);
        this.removeAtmosphereResource(r);
        logger.trace("Unexpected exception for AtmosphereResource {} and Broadcaster " + this.name, (Object)ar.uuid(), (Object)t);
        if (notifyAndCache) {
            AtmosphereResourceEventImpl event = r.getAtmosphereResourceEvent();
            event.setThrowable(t);
            r.notifyListeners(event);
            r.removeEventListeners();
        }
        if (notifyAndCache) {
            this.cacheLostMessage(r, (AsyncWriteToken)r.getRequest(false).getAttribute(ASYNC_TOKEN), notifyAndCache);
        }
        if (!r.isResumed() && !r.isCancelled()) {
            if (this.bc != null && this.bc.getAsyncWriteService() != null) {
                this.bc.getAsyncWriteService().execute(new Runnable(){

                    @Override
                    public void run() {
                        try {
                            r.resume();
                        }
                        catch (Throwable t) {
                            logger.trace("Was unable to resume a corrupted AtmosphereResource {}", (Object)r);
                            logger.trace("Cause", t);
                        }
                    }
                });
            } else {
                r.resume();
            }
        }
    }

    public void cacheLostMessage(AtmosphereResource r) {
        this.cacheLostMessage(r, (AsyncWriteToken)((AtmosphereResourceImpl)AtmosphereResourceImpl.class.cast(r)).getRequest(false).getAttribute(ASYNC_TOKEN));
    }

    public void cacheLostMessage(AtmosphereResource r, boolean force) {
        this.cacheLostMessage(r, (AsyncWriteToken)((AtmosphereResourceImpl)AtmosphereResourceImpl.class.cast(r)).getRequest(false).getAttribute(ASYNC_TOKEN), force);
    }

    public void cacheLostMessage(AtmosphereResource r, AsyncWriteToken token) {
        this.cacheLostMessage(r, token, false);
    }

    public void cacheLostMessage(AtmosphereResource r, AsyncWriteToken token, boolean force) {
        if (!force && this.uuidCache) {
            return;
        }
        try {
            if (token != null && token.originalMessage != null) {
                Object m = this.cacheStrategy.equals((Object)BroadcasterCache.STRATEGY.BEFORE_FILTER) ? token.originalMessage : token.msg;
                this.bc.getBroadcasterCache().addToCache(this.getID(), r, m);
                logger.trace("Lost message cached {}", m);
            }
        }
        catch (Throwable t2) {
            logger.error("Unable to cache message {} for AtmosphereResource {}", token.originalMessage, (Object)(r != null ? r.uuid() : ""));
            logger.error("Unable to cache message", t2);
        }
    }

    @Override
    public void setSuspendPolicy(long maxSuspendResource, Broadcaster.POLICY policy) {
        this.maxSuspendResource.set(maxSuspendResource);
        this.policy = policy;
    }

    @Override
    public <T> Future<T> broadcast(T msg) {
        if (this.destroyed.get()) {
            logger.debug(DESTROYED, (Object)this.getID(), (Object)"broadcast(T msg)");
            return this.futureDone(msg);
        }
        this.start();
        Object newMsg = this.filter(msg);
        if (newMsg == null) {
            return this.futureDone(msg);
        }
        int callee = this.resources.size() == 0 ? 1 : this.resources.size();
        BroadcasterFuture<Object> f = new BroadcasterFuture<Object>(newMsg, callee, (Broadcaster)this);
        this.dispatchMessages(new Entry(newMsg, null, f, msg));
        return f;
    }

    protected BroadcasterFuture<Object> futureDone(Object msg) {
        this.notifyBroadcastListener();
        return new BroadcasterFuture<Object>(msg, this).done();
    }

    protected void dispatchMessages(Entry e) {
        this.messages.offer(e);
        if (this.dispatchThread.get() == 0) {
            this.dispatchThread.incrementAndGet();
            this.getBroadcasterConfig().getExecutorService().submit(this.getBroadcastHandler());
        }
    }

    protected Object filter(Object msg) {
        BroadcastFilter.BroadcastAction a = this.bc.filter(msg);
        if (a.action() == BroadcastFilter.BroadcastAction.ACTION.ABORT || msg == null) {
            return null;
        }
        return a.message();
    }

    @Override
    public <T> Future<T> broadcast(T msg, AtmosphereResource r) {
        if (this.destroyed.get()) {
            logger.debug(DESTROYED, (Object)this.getID(), (Object)"broadcast(T msg, AtmosphereResource r");
            return this.futureDone(msg);
        }
        this.start();
        Object newMsg = this.filter(msg);
        if (newMsg == null) {
            return this.futureDone(msg);
        }
        BroadcasterFuture<Object> f = new BroadcasterFuture<Object>(newMsg, this.resources.size(), (Broadcaster)this);
        this.dispatchMessages(new Entry(newMsg, (Object)r, f, msg));
        return f;
    }

    @Override
    public <T> Future<T> broadcastOnResume(T msg) {
        if (this.destroyed.get()) {
            logger.debug(DESTROYED, (Object)this.getID(), (Object)"broadcastOnResume(T msg)");
            return new BroadcasterFuture<T>(msg, this).done();
        }
        this.start();
        Object newMsg = this.filter(msg);
        if (newMsg == null) {
            return this.futureDone(msg);
        }
        BroadcasterFuture<Object> f = new BroadcasterFuture<Object>(newMsg, this.resources.size(), (Broadcaster)this);
        this.broadcastOnResume.offer(new Entry(newMsg, null, f, msg));
        return f;
    }

    protected void broadcastOnResume(AtmosphereResource r) {
        for (Entry e : this.broadcastOnResume) {
            e.multipleAtmoResources = r;
            this.push(e);
        }
        if (this.resources.isEmpty()) {
            this.broadcastOnResume.clear();
        }
    }

    @Override
    public <T> Future<T> broadcast(T msg, Set<AtmosphereResource> subset) {
        if (this.destroyed.get()) {
            logger.debug(DESTROYED, (Object)this.getID(), (Object)"broadcast(T msg, Set<AtmosphereResource> subset)");
            return new BroadcasterFuture<T>(msg, this).done();
        }
        this.start();
        Object newMsg = this.filter(msg);
        if (newMsg == null) {
            return new BroadcasterFuture<T>(msg, this).done();
        }
        BroadcasterFuture<Object> f = new BroadcasterFuture<Object>(null, newMsg, subset.size(), this);
        this.dispatchMessages(new Entry(newMsg, subset, f, msg));
        return f;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     * Converted monitor instructions to comments
     * Lifted jumps to return sites
     */
    @Override
    public Broadcaster addAtmosphereResource(AtmosphereResource r) {
        try {
            Object[] objectArray;
            if (this.destroyed.get()) {
                logger.debug(DESTROYED, (Object)this.getID(), (Object)"addAtmosphereResource(AtmosphereResource r");
                DefaultBroadcaster defaultBroadcaster = this;
                return defaultBroadcaster;
            }
            this.start();
            if (this.scope == Broadcaster.SCOPE.REQUEST && this.requestScoped.getAndSet(true)) {
                throw new IllegalStateException("Broadcaster " + this + " cannot be used as its scope is set to REQUEST");
            }
            if (this.maxSuspendResource.get() > 0L && (long)this.resources.size() >= this.maxSuspendResource.get()) {
                if (this.policy == Broadcaster.POLICY.FIFO) {
                    AtmosphereResource resource = this.resources.poll();
                    try {
                        logger.warn("Too many resource. Forcing resume of {} ", (Object)resource.uuid());
                        resource.resume();
                    }
                    catch (Throwable t) {
                        logger.warn("failed to resume resource {} ", (Object)resource, (Object)t);
                    }
                } else if (this.policy == Broadcaster.POLICY.REJECT) {
                    throw new RejectedExecutionException(String.format("Maximum suspended AtmosphereResources %s", this.maxSuspendResource));
                }
            }
            if (this.resources.contains(r)) {
                logger.debug("Duplicate resource {}", (Object)r.uuid());
                objectArray = this;
                return objectArray;
            }
            if (!this.bc.getBroadcasterCache().getClass().equals(BroadcasterConfig.DefaultBroadcasterCache.class.getName())) {
                objectArray = this.resources;
                // MONITORENTER : objectArray
                this.cacheAndSuspend(r);
                // MONITOREXIT : objectArray
                return this;
            }
            this.cacheAndSuspend(r);
            return this;
        }
        finally {
            if (this.resources.size() > 0) {
                Object[] objectArray = this.awaitBarrier;
            }
        }
    }

    protected void cacheAndSuspend(AtmosphereResource r) {
        boolean wasResumed;
        if (this.resources.isEmpty()) {
            this.config.getBroadcasterFactory().add(this, this.name);
        }
        if (!(wasResumed = this.checkCachedAndPush(r, r.getAtmosphereResourceEvent())) && this.isAtmosphereResourceValid(r)) {
            logger.trace("Associating AtmosphereResource {} with Broadcaster {}", (Object)r.uuid(), (Object)this.getID());
            String parentUUID = (String)r.getRequest().getAttribute(ApplicationConfig.SUSPENDED_ATMOSPHERE_RESOURCE_UUID);
            Boolean backwardCompatible = Boolean.parseBoolean(this.config.getInitParameter("org.atmosphere.websocket.backwardCompatible.atmosphereResource"));
            if (!backwardCompatible.booleanValue() && parentUUID != null && !parentUUID.equals(r.uuid())) {
                logger.warn("You are trying to add an AtmosphereResource {} for a WebSocket message. The original AtmosphereResource  {} will be added instead.", (Object)r.uuid(), (Object)parentUUID);
                AtmosphereResource p = AtmosphereResourceFactory.getDefault().find(parentUUID);
                if (p != null && !this.resources.contains(p)) {
                    this.resources.add(p);
                }
            } else {
                this.resources.add(r);
            }
        } else if (!wasResumed) {
            logger.debug("Unable to add AtmosphereResource {}", (Object)r.uuid());
        }
    }

    private boolean isAtmosphereResourceValid(AtmosphereResource r) {
        return !((AtmosphereResourceImpl)AtmosphereResourceImpl.class.cast(r)).isResumed() && !((AtmosphereResourceImpl)AtmosphereResourceImpl.class.cast(r)).isCancelled() && ((AtmosphereResourceImpl)AtmosphereResourceImpl.class.cast(r)).isInScope();
    }

    protected void entryDone(BroadcasterFuture<?> f) {
        this.notifyBroadcastListener();
        if (f != null) {
            f.done();
        }
    }

    void notifyBroadcastListener() {
        for (BroadcasterListener b : this.broadcasterListeners) {
            try {
                b.onComplete(this);
            }
            catch (Exception ex) {
                logger.warn("", (Throwable)ex);
            }
        }
    }

    @Override
    public Broadcaster removeAtmosphereResource(AtmosphereResource r) {
        return this.removeAtmosphereResource(r, true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected Broadcaster removeAtmosphereResource(AtmosphereResource r, boolean executeDone) {
        AtmosphereResourceImpl aImpl;
        BroadcasterFuture f;
        boolean removed;
        if (this.destroyed.get()) {
            logger.debug(DESTROYED, (Object)this.getID(), (Object)"removeAtmosphereResource(AtmosphereResource r)");
            return this;
        }
        ConcurrentLinkedQueue<AtmosphereResource> concurrentLinkedQueue = this.resources;
        synchronized (concurrentLinkedQueue) {
            removed = this.resources.remove(r);
        }
        if (!removed) {
            return this;
        }
        logger.trace("Removing AtmosphereResource {} for Broadcaster {}", (Object)r.uuid(), (Object)this.name);
        this.writeQueues.remove(r.uuid());
        if (executeDone && (f = (BroadcasterFuture)(aImpl = (AtmosphereResourceImpl)AtmosphereResourceImpl.class.cast(r)).getRequest(false).getAttribute(this.getID())) != null && !f.isDone() && !f.isCancelled()) {
            aImpl.getRequest(false).removeAttribute(this.getID());
            this.entryDone(f);
        }
        if (this.resources.isEmpty()) {
            this.notifyEmptyListener();
            if (this.scope != Broadcaster.SCOPE.REQUEST && this.lifeCyclePolicy.getLifeCyclePolicy() == BroadcasterLifeCyclePolicy.ATMOSPHERE_RESOURCE_POLICY.EMPTY) {
                this.releaseExternalResources();
            } else if (this.scope == Broadcaster.SCOPE.REQUEST || this.lifeCyclePolicy.getLifeCyclePolicy() == BroadcasterLifeCyclePolicy.ATMOSPHERE_RESOURCE_POLICY.EMPTY_DESTROY) {
                this.config.getBroadcasterFactory().remove(this, this.name);
                this.destroy();
            }
        }
        return this;
    }

    private void notifyIdleListener() {
        for (BroadcasterLifeCyclePolicyListener b : this.lifeCycleListeners) {
            b.onIdle();
        }
    }

    private void notifyDestroyListener() {
        for (BroadcasterLifeCyclePolicyListener b : this.lifeCycleListeners) {
            b.onDestroy();
        }
    }

    private void notifyEmptyListener() {
        for (BroadcasterLifeCyclePolicyListener b : this.lifeCycleListeners) {
            b.onEmpty();
        }
    }

    @Override
    public void setBroadcasterConfig(BroadcasterConfig bc) {
        this.bc = bc;
    }

    @Override
    public BroadcasterConfig getBroadcasterConfig() {
        return this.bc;
    }

    @Override
    public <T> Future<T> delayBroadcast(T o) {
        return this.delayBroadcast(o, 0L, null);
    }

    @Override
    public <T> Future<T> delayBroadcast(final T o, long delay, TimeUnit t) {
        if (this.destroyed.get()) {
            logger.debug(DESTROYED, (Object)this.getID(), (Object)"delayBroadcast(final T o, long delay, TimeUnit t)");
            return null;
        }
        this.start();
        Object msg = this.filter(o);
        if (msg == null) {
            return null;
        }
        BroadcasterFuture<Object> future = new BroadcasterFuture<Object>(msg, this);
        final Entry e = new Entry(msg, null, future, o);
        if (delay > 0L) {
            ScheduledFuture f = this.bc.getScheduledExecutorService().schedule(new Callable<T>(){

                @Override
                public T call() throws Exception {
                    DefaultBroadcaster.this.delayedBroadcast.remove(e);
                    if (Callable.class.isAssignableFrom(o.getClass())) {
                        try {
                            Object r = ((Callable)Callable.class.cast(o)).call();
                            Object msg = DefaultBroadcaster.this.filter(r);
                            if (msg != null) {
                                Entry entry = new Entry(msg, null, null, r);
                                DefaultBroadcaster.this.push(entry);
                            }
                            return msg;
                        }
                        catch (Exception e1) {
                            logger.error("", (Object)e);
                        }
                    }
                    Object msg = DefaultBroadcaster.this.filter(o);
                    Entry e2 = new Entry(msg, null, null, o);
                    DefaultBroadcaster.this.push(e2);
                    return msg;
                }
            }, delay, t);
            e.future = new BroadcasterFuture<Object>(f, msg, (Broadcaster)this);
        }
        this.delayedBroadcast.offer(e);
        return future;
    }

    public Future<?> scheduleFixedBroadcast(Object o, long period, TimeUnit t) {
        return this.scheduleFixedBroadcast(o, 0L, period, t);
    }

    public Future<?> scheduleFixedBroadcast(final Object o, long waitFor, long period, TimeUnit t) {
        if (this.destroyed.get()) {
            logger.debug(DESTROYED, (Object)this.getID(), (Object)"scheduleFixedBroadcast(final Object o, long waitFor, long period, TimeUnit t)");
            return null;
        }
        this.start();
        if (period == 0L || t == null) {
            return null;
        }
        Object msg = this.filter(o);
        if (msg == null) {
            return null;
        }
        return this.bc.getScheduledExecutorService().scheduleWithFixedDelay(new Runnable(){

            @Override
            public void run() {
                if (Callable.class.isAssignableFrom(o.getClass())) {
                    try {
                        Object r = ((Callable)Callable.class.cast(o)).call();
                        Object msg = DefaultBroadcaster.this.filter(r);
                        if (msg != null) {
                            Entry entry = new Entry(msg, null, null, r);
                            DefaultBroadcaster.this.push(entry);
                        }
                        return;
                    }
                    catch (Exception e) {
                        logger.error("", (Throwable)e);
                    }
                }
                Object msg = DefaultBroadcaster.this.filter(o);
                Entry e = new Entry(msg, null, null, o);
                DefaultBroadcaster.this.push(e);
            }
        }, waitFor, period, t);
    }

    public String toString() {
        return "\nName: " + this.name + "\n" + "\n\tScope: " + (Object)((Object)this.scope) + "\n" + "\n\tBroasdcasterCache " + this.bc.getBroadcasterCache() + "\n" + "\n\tAtmosphereResource: " + this.resources.size() + "\n" + this.getClass().getName() + "@" + this.hashCode();
    }

    private long translateTimeUnit(long period, TimeUnit tu) {
        if (period == -1L) {
            return period;
        }
        switch (tu) {
            case SECONDS: {
                return TimeUnit.MILLISECONDS.convert(period, TimeUnit.SECONDS);
            }
            case MINUTES: {
                return TimeUnit.MILLISECONDS.convert(period, TimeUnit.MINUTES);
            }
            case HOURS: {
                return TimeUnit.MILLISECONDS.convert(period, TimeUnit.HOURS);
            }
            case DAYS: {
                return TimeUnit.MILLISECONDS.convert(period, TimeUnit.DAYS);
            }
            case MILLISECONDS: {
                return period;
            }
            case MICROSECONDS: {
                return TimeUnit.MILLISECONDS.convert(period, TimeUnit.MICROSECONDS);
            }
            case NANOSECONDS: {
                return TimeUnit.MILLISECONDS.convert(period, TimeUnit.NANOSECONDS);
            }
        }
        return period;
    }

    void notifyOnPreDestroy() {
        for (BroadcasterListener l : this.broadcasterListeners) {
            try {
                l.onPreDestroy(this);
            }
            catch (Exception ex) {
                logger.warn("onPreDestroy", (Throwable)ex);
            }
        }
    }

    protected static final class AsyncWriteToken {
        AtmosphereResource resource;
        Object msg;
        BroadcasterFuture future;
        Object originalMessage;
        UUIDBroadcasterCache.CacheMessage cache;

        public AsyncWriteToken(AtmosphereResource resource, Object msg, BroadcasterFuture future, Object originalMessage) {
            this.resource = resource;
            this.msg = msg;
            this.future = future;
            this.originalMessage = originalMessage;
        }

        public AsyncWriteToken(AtmosphereResource resource, Object msg, BroadcasterFuture future, Object originalMessage, UUIDBroadcasterCache.CacheMessage cache) {
            this.resource = resource;
            this.msg = msg;
            this.future = future;
            this.originalMessage = originalMessage;
            this.cache = cache;
        }

        public void destroy() {
            this.resource = null;
            this.msg = null;
            this.future = null;
            this.originalMessage = null;
        }

        public String toString() {
            return "AsyncWriteToken{resource=" + this.resource + ", msg=" + this.msg + ", future=" + this.future + '}';
        }
    }

    final class WriteOperation
    implements Callable<Object> {
        private final AtmosphereResource r;
        private final AtmosphereResourceEvent e;
        private AtomicBoolean completed = new AtomicBoolean();
        private AtomicBoolean executed = new AtomicBoolean();
        private final Thread ioThread;

        private WriteOperation(AtmosphereResource r, AtmosphereResourceEvent e, Thread ioThread) {
            this.r = r;
            this.e = e;
            this.ioThread = ioThread;
        }

        @Override
        public Object call() throws Exception {
            if (!this.completed.getAndSet(true)) {
                DefaultBroadcaster.this.invokeOnStateChange(this.r, this.e);
                logger.trace("Cancelling Write timeout {} for {}", (Object)DefaultBroadcaster.this.writeTimeoutInSecond, (Object)this.r.uuid());
                this.executed.set(true);
            } else if (!this.executed.get()) {
                try {
                    this.ioThread.interrupt();
                }
                catch (Throwable t) {
                    logger.trace("I/O failure, unable to interrupt the thread", t);
                }
                logger.trace("Honoring Write timeout {} for {}", (Object)DefaultBroadcaster.this.writeTimeoutInSecond, (Object)this.r.uuid());
                DefaultBroadcaster.this.onException(new IOException("Unable to write after " + DefaultBroadcaster.this.writeTimeoutInSecond), this.r);
                ((AtmosphereResourceImpl)AtmosphereResourceImpl.class.cast(this.r)).cancel();
            }
            return null;
        }
    }

    private static final class WriteQueue {
        final BlockingQueue<AsyncWriteToken> queue = new LinkedBlockingQueue<AsyncWriteToken>();
        final AtomicBoolean monitored = new AtomicBoolean();
        final String uuid;

        private WriteQueue(String uuid) {
            this.uuid = uuid;
        }
    }

    public static final class Entry {
        public Object message;
        public Object multipleAtmoResources;
        public BroadcasterFuture<?> future;
        public boolean writeLocally;
        public Object originalMessage;
        public UUIDBroadcasterCache.CacheMessage cache;

        public Entry(Object message, Object multipleAtmoResources, BroadcasterFuture<?> future, Object originalMessage) {
            this.message = message;
            this.multipleAtmoResources = multipleAtmoResources;
            this.future = future;
            this.writeLocally = true;
            this.originalMessage = originalMessage;
        }

        public Entry(Object message, Object multipleAtmoResources, BroadcasterFuture<?> future, boolean writeLocally) {
            this.message = message;
            this.multipleAtmoResources = multipleAtmoResources;
            this.future = future;
            this.writeLocally = writeLocally;
            this.originalMessage = message;
        }

        public String toString() {
            return "Entry{message=" + this.message + ", multipleAtmoResources=" + this.multipleAtmoResources + ", future=" + this.future + '}';
        }
    }
}

