package org.hobbit.core.rabbit;

import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.IOUtils;
import org.hobbit.core.data.RabbitQueue;
import org.hobbit.utils.TerminatableRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/hobbit/core/rabbit/DataReceiverImpl.class */
/**
 * {@link DataReceiver} implementation that polls a {@link QueueingConsumer}
 * registered on a {@link RabbitQueue} from a dedicated receiver thread and
 * hands the body of every received delivery to a {@link DataHandler} on a
 * fixed-size thread pool.
 *
 * <p>
 * Instances are created through {@link #builder()}. Use
 * {@link #closeWhenFinished()} to stop receiving and wait for the already
 * submitted messages to be processed, or {@link #close()} to stop immediately.
 * </p>
 */
public class DataReceiverImpl implements DataReceiver {
    private static final Logger LOGGER = LoggerFactory.getLogger(DataReceiverImpl.class);
    /** Default size of the worker pool (and value passed to {@code basicQos}). */
    private static final int DEFAULT_MAX_PARALLEL_PROCESSED_MESSAGES = 50;
    /** Maximum time in milliseconds a single poll for the next delivery blocks. */
    private static final long DELIVERY_POLL_TIMEOUT_MS = 3000L;
    /** Queue the messages are consumed from; closed by {@link #close()}. */
    protected RabbitQueue queue;
    /** Number of errors counted so far; guarded by the monitor of {@code this}. */
    private int errorCount = 0;
    /** Handler that receives the body of every delivery. */
    private DataHandler dataHandler;
    /** Pool on which the message processing tasks are executed. */
    private ExecutorService executor;
    /** Task that polls the consumer; executed by {@link #receiverThread}. */
    private TerminatableRunnable receiverTask;
    /** Thread running {@link #receiverTask}. */
    private Thread receiverThread;

    /* loaded from: input_file:org/hobbit/core/rabbit/DataReceiverImpl$Builder.class */
    /**
     * Builder for {@link DataReceiverImpl} instances. A data handler is
     * mandatory, as is either a queue or a queue factory together with a queue
     * name.
     */
    public static class Builder {
        protected static final String QUEUE_INFO_MISSING_ERROR = "There are neither a queue nor a queue name and a queue factory provided for the DataReceiver. Either a queue or a name and a factory to create a new queue are mandatory.";
        protected static final String DATA_HANDLER_MISSING_ERROR = "The necessary data handler has not been provided for the DataReceiver.";
        protected DataHandler dataHandler;
        protected RabbitQueue queue;
        protected String queueName;
        protected int maxParallelProcessedMsgs = DEFAULT_MAX_PARALLEL_PROCESSED_MESSAGES;
        protected RabbitQueueFactory factory;

        /**
         * Sets the handler that will receive the message bodies.
         *
         * @param dataHandler
         *            the handler to use
         * @return this builder
         */
        public Builder dataHandler(DataHandler dataHandler) {
            this.dataHandler = dataHandler;
            return this;
        }

        /**
         * Sets the queue the receiver consumes from. If a queue is set, the
         * factory and name given to {@link #queue(RabbitQueueFactory, String)}
         * are not used by {@link #build()}.
         *
         * @param rabbitQueue
         *            the queue to consume from
         * @return this builder
         */
        public Builder queue(RabbitQueue rabbitQueue) {
            this.queue = rabbitQueue;
            return this;
        }

        /**
         * Sets the factory and the queue name that {@link #build()} uses to
         * create a queue if no queue has been set directly.
         *
         * @param rabbitQueueFactory
         *            factory used to create the queue
         * @param str
         *            name of the queue to create
         * @return this builder
         */
        public Builder queue(RabbitQueueFactory rabbitQueueFactory, String str) {
            this.factory = rabbitQueueFactory;
            this.queueName = str;
            return this;
        }

        /**
         * Sets the number of messages that may be processed in parallel. The
         * value is used as the worker pool size and is passed to
         * {@code basicQos} on the queue's channel.
         *
         * @param i
         *            number of parallel processed messages; must be positive
         * @return this builder
         */
        public Builder maxParallelProcessedMsgs(int i) {
            this.maxParallelProcessedMsgs = i;
            return this;
        }

        /**
         * Creates the receiver. If no queue has been set, it is created with
         * the configured factory and name first. If the construction of the
         * receiver fails, the queue is closed before the exception is
         * rethrown.
         *
         * @return the newly created and already running receiver
         * @throws IllegalStateException
         *             if the data handler is missing or neither a queue nor a
         *             factory and queue name have been provided
         * @throws IllegalArgumentException
         *             if the configured number of parallel processed messages
         *             is not positive
         * @throws IOException
         *             if the queue cannot be created or consumed from
         */
        public DataReceiverImpl build() throws IllegalStateException, IOException {
            if (this.dataHandler == null) {
                throw new IllegalStateException(DATA_HANDLER_MISSING_ERROR);
            }
            if (this.queue == null) {
                if (this.queueName == null || this.factory == null) {
                    throw new IllegalStateException(QUEUE_INFO_MISSING_ERROR);
                }
                this.queue = this.factory.createDefaultRabbitQueue(this.queueName);
            }
            try {
                return new DataReceiverImpl(this.queue, this.dataHandler, this.maxParallelProcessedMsgs);
            } catch (IOException e) {
                IOUtils.closeQuietly(this.queue);
                throw e;
            } catch (RuntimeException e) {
                // Release the queue on unchecked failures (e.g., an invalid
                // pool size) as well, not only on I/O errors.
                IOUtils.closeQuietly(this.queue);
                throw e;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/hobbit/core/rabbit/DataReceiverImpl$MsgProcessingTask.class */
    /**
     * Task that passes the body of a single delivery to the data handler of
     * the enclosing receiver.
     */
    public class MsgProcessingTask implements Runnable {
        private QueueingConsumer.Delivery delivery;

        /**
         * @param delivery
         *            the delivery whose body will be handed to the data handler
         */
        public MsgProcessingTask(QueueingConsumer.Delivery delivery) {
            this.delivery = delivery;
        }

        /**
         * Calls the data handler with the body of the delivery. Every
         * {@link Throwable} raised by the handler is logged and not
         * propagated.
         */
        @Override // java.lang.Runnable
        public void run() {
            try {
                DataReceiverImpl.this.dataHandler.handleData(this.delivery.getBody());
            } catch (Throwable th) {
                DataReceiverImpl.LOGGER.error("Uncatched throwable inside the data handler.", th);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/hobbit/core/rabbit/DataReceiverImpl$MsgReceivingTask.class */
    /**
     * Task that polls the consumer for deliveries and submits a processing
     * task for each of them to the executor of the enclosing receiver.
     */
    public class MsgReceivingTask implements TerminatableRunnable {
        private QueueingConsumer consumer;
        // Written by the thread calling terminate() and read by the receiver
        // thread, hence volatile.
        private volatile boolean runFlag = true;

        /**
         * @param queueingConsumer
         *            the consumer that is polled for deliveries
         */
        public MsgReceivingTask(QueueingConsumer queueingConsumer) {
            this.consumer = queueingConsumer;
        }

        /**
         * Polls the consumer until termination has been requested, the queue
         * reports no remaining messages and the last poll returned no
         * delivery. Exceptions thrown while polling are logged and counted via
         * {@link DataReceiverImpl#increaseErrorCount()}.
         */
        @Override // java.lang.Runnable
        public void run() {
            int receivedCount = 0;
            QueueingConsumer.Delivery delivery = null;
            // The last poll result is part of the condition: a successful poll
            // triggers another one even if the queue reports no messages.
            while (this.runFlag || DataReceiverImpl.this.queue.messageCount() > 0 || delivery != null) {
                // Forget the previous delivery. Otherwise, a failing poll would
                // leave it in place and it would be submitted a second time.
                delivery = null;
                try {
                    delivery = this.consumer.nextDelivery(DELIVERY_POLL_TIMEOUT_MS);
                } catch (Exception e) {
                    DataReceiverImpl.LOGGER.error("Exception while waiting for delivery.", e);
                    DataReceiverImpl.this.increaseErrorCount();
                }
                if (delivery != null) {
                    DataReceiverImpl.this.executor.submit(DataReceiverImpl.this.buildMsgProcessingTask(delivery));
                    receivedCount++;
                }
            }
            DataReceiverImpl.LOGGER.debug("Receiver task terminates after receiving {} messages.", receivedCount);
        }

        /**
         * Requests the termination of the task. The task does not stop
         * immediately but when the loop condition of {@link #run()} is
         * fulfilled.
         */
        @Override // org.hobbit.utils.Terminatable
        public void terminate() {
            this.runFlag = false;
        }

        /**
         * @return {@code true} if {@link #terminate()} has been called. This
         *         does not tell whether {@link #run()} has already returned.
         */
        @Override // org.hobbit.utils.Terminatable
        public boolean isTerminated() {
            return !this.runFlag;
        }
    }

    /**
     * Registers a consumer on the given queue (with the auto-acknowledge flag
     * of {@code basicConsume} set to {@code true}), creates the worker pool
     * and starts the receiver thread.
     *
     * @param rabbitQueue
     *            the queue to consume from
     * @param dataHandler
     *            the handler that receives the message bodies
     * @param i
     *            size of the worker pool, also passed to {@code basicQos};
     *            must be positive
     * @throws IllegalArgumentException
     *             if {@code i} is not positive
     * @throws IOException
     *             if the channel of the queue reports an error
     */
    protected DataReceiverImpl(RabbitQueue rabbitQueue, DataHandler dataHandler, int i) throws IOException {
        // Fail before the consumer is registered. The thread pool would reject
        // the value anyway, but only after messages may already have been
        // delivered to a consumer that nobody polls.
        if (i < 1) {
            throw new IllegalArgumentException(
                    "The maximum number of parallel processed messages has to be positive but is " + i + ".");
        }
        this.queue = rabbitQueue;
        this.dataHandler = dataHandler;
        QueueingConsumer queueingConsumer = new QueueingConsumer(rabbitQueue.channel);
        rabbitQueue.channel.basicConsume(rabbitQueue.name, true, queueingConsumer);
        rabbitQueue.channel.basicQos(i);
        this.executor = Executors.newFixedThreadPool(i);
        this.receiverTask = buildMsgReceivingTask(queueingConsumer);
        this.receiverThread = new Thread(this.receiverTask);
        this.receiverThread.start();
    }

    /**
     * @return the handler that receives the message bodies
     */
    @Override // org.hobbit.core.rabbit.DataReceiver
    public DataHandler getDataHandler() {
        return this.dataHandler;
    }

    /**
     * Increases the error counter by one.
     */
    @Override // org.hobbit.core.rabbit.DataReceiver
    public synchronized void increaseErrorCount() {
        this.errorCount++;
    }

    /**
     * @return the number of errors counted so far
     */
    @Override // org.hobbit.core.rabbit.DataReceiver
    public synchronized int getErrorCount() {
        // Synchronized like increaseErrorCount() so that increments made by the
        // receiver thread are visible to the calling thread.
        return this.errorCount;
    }

    /**
     * @return the queue this receiver consumes from
     */
    @Override // org.hobbit.core.rabbit.DataReceiver
    public RabbitQueue getQueue() {
        return this.queue;
    }

    /**
     * @return the executor on which the processing tasks are run
     */
    protected ExecutorService getExecutor() {
        return this.executor;
    }

    /**
     * Requests the termination of the receiver task, waits for the receiver
     * thread to finish, waits (up to one day) for the submitted processing
     * tasks to complete and closes this receiver. If the calling thread is
     * interrupted while waiting, the interruption is logged and the interrupt
     * status of the thread is restored before this method returns.
     */
    @Override // org.hobbit.core.rabbit.DataReceiver
    public void closeWhenFinished() {
        boolean interrupted = false;
        try {
            this.receiverTask.terminate();
            try {
                this.receiverThread.join();
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    interrupted = true;
                }
                LOGGER.error("Exception while waiting for termination of receiver task. Closing receiver.", e);
            }
            this.executor.shutdown();
            try {
                this.executor.awaitTermination(1L, TimeUnit.DAYS);
            } catch (InterruptedException e) {
                interrupted = true;
                LOGGER.error("Exception while waiting for termination. Closing receiver.", e);
            }
            close();
        } finally {
            // Restore the interrupt status only after the clean-up is done so
            // that it neither shortens the waiting above nor gets lost.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Requests the termination of the receiver task, closes the queue and
     * stops the executor via {@code shutdownNow()} unless it has already been
     * shut down. This method does not wait for the receiver thread.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        // Without this request the receiver thread would keep polling the
        // consumer of the closed queue.
        if (this.receiverTask != null) {
            this.receiverTask.terminate();
        }
        IOUtils.closeQuietly(this.queue);
        if (this.executor != null && !this.executor.isShutdown()) {
            this.executor.shutdownNow();
        }
    }

    /**
     * @return a new {@link Builder} for creating a receiver
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Creates the task that polls the given consumer. It is called from the
     * constructor, i.e., an overriding method must not rely on state that is
     * initialised by a subclass constructor.
     *
     * @param queueingConsumer
     *            the consumer to poll
     * @return the task executed by the receiver thread
     */
    protected TerminatableRunnable buildMsgReceivingTask(QueueingConsumer queueingConsumer) {
        return new MsgReceivingTask(queueingConsumer);
    }

    /**
     * Creates the task that processes a single delivery.
     *
     * @param delivery
     *            the delivery to process
     * @return the task that is submitted to the executor
     */
    protected Runnable buildMsgProcessingTask(QueueingConsumer.Delivery delivery) {
        return new MsgProcessingTask(delivery);
    }
}
