package org.hobbit.core.rabbit;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.ConfirmListener;
import java.io.IOException;
import java.util.Collections;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.Semaphore;
import org.apache.commons.io.IOUtils;
import org.hobbit.core.data.RabbitQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/hobbit/core/rabbit/DataSenderImpl.class */
/**
 * {@link DataSender} implementation that publishes byte arrays to a single
 * RabbitMQ queue via the default exchange.
 *
 * <p>If the sender is created with a positive message buffer size, publisher
 * confirms are enabled on the queue's channel. In that mode at most that many
 * messages may be waiting for a confirmation at any time; further calls to
 * {@link #sendData(byte[])} block until the broker confirms earlier messages.
 * Negatively acknowledged messages are published again. If confirms can not be
 * enabled (or the buffer size is not positive) messages are published without
 * any delivery tracking.
 *
 * <p>Instances are created through {@link #builder()}.
 */
public class DataSenderImpl implements DataSender {
    private static final Logger LOGGER = LoggerFactory.getLogger(DataSenderImpl.class);
    /** Default number of messages that may be waiting for a confirmation. */
    private static final int DEFAULT_MESSAGE_BUFFER_SIZE = 1000;
    /** Default AMQP delivery mode (2 marks a message as persistent in AMQP 0-9-1). */
    private static final int DEFAULT_DELIVERY_MODE = 2;
    /** Number of consecutive "queue is empty" observations needed before closing. */
    private static final int REQUIRED_EMPTY_QUEUE_CHECKS = 5;
    /** Pause between two polls while waiting for confirmations or an empty queue. */
    private static final long POLL_INTERVAL_MILLIS = 200L;

    private final RabbitQueue queue;
    private final int deliveryMode;
    /** Confirm handler, or {@code null} if publisher confirms are not used. */
    private final DataSenderConfirmHandler confirmHandler;

    /**
     * Fluent builder for {@link DataSenderImpl}. Either an existing queue or a
     * queue factory together with a queue name has to be provided.
     */
    public static class Builder {
        protected static final String QUEUE_INFO_MISSING_ERROR = "There are neither a queue nor a queue name and a queue factory provided for the DataSender. Either a queue or a name and a factory to create a new queue are mandatory.";
        protected RabbitQueue queue;
        protected String queueName;
        protected RabbitQueueFactory factory;
        protected int messageConfirmBuffer = DataSenderImpl.DEFAULT_MESSAGE_BUFFER_SIZE;
        protected int deliveryMode = DataSenderImpl.DEFAULT_DELIVERY_MODE;

        /**
         * Uses the given, already existing queue.
         *
         * @param queue the queue the sender will publish to
         * @return this builder
         */
        public Builder queue(RabbitQueue queue) {
            this.queue = queue;
            return this;
        }

        /**
         * Lets {@link #build()} create the queue with the given factory. Only
         * used if no queue has been set directly.
         *
         * @param factory   factory used to create the queue
         * @param queueName name of the queue to create
         * @return this builder
         */
        public Builder queue(RabbitQueueFactory factory, String queueName) {
            this.factory = factory;
            this.queueName = queueName;
            return this;
        }

        /**
         * Sets the maximum number of messages that may wait for a confirmation.
         * A value {@code <= 0} disables publisher confirms.
         *
         * @param messageConfirmBuffer size of the confirmation buffer
         * @return this builder
         */
        public Builder messageBuffer(int messageConfirmBuffer) {
            this.messageConfirmBuffer = messageConfirmBuffer;
            return this;
        }

        /**
         * Sets the AMQP delivery mode applied to every published message.
         *
         * @param deliveryMode the delivery mode
         * @return this builder
         */
        public Builder deliveryMode(int deliveryMode) {
            this.deliveryMode = deliveryMode;
            return this;
        }

        /**
         * Creates the sender, creating the queue first if only a factory and a
         * name have been provided.
         *
         * @return the new sender
         * @throws IllegalStateException if neither a queue nor both a factory
         *                               and a queue name have been provided
         * @throws IOException           if the queue can not be created
         */
        public DataSenderImpl build() throws IllegalStateException, IOException {
            if (this.queue == null) {
                if (this.queueName == null || this.factory == null) {
                    throw new IllegalStateException(QUEUE_INFO_MISSING_ERROR);
                }
                this.queue = this.factory.createDefaultRabbitQueue(this.queueName);
            }
            return new DataSenderImpl(this.queue, this.deliveryMode, this.messageConfirmBuffer);
        }
    }

    /**
     * Tracks published but not yet confirmed messages and limits their number
     * with a semaphore. Every message in {@link #unconfirmedMsgs} holds exactly
     * one semaphore permit.
     *
     * <p>All accesses to the message map and the submit counter happen while
     * holding the monitor of {@link #unconfirmedMsgs}.
     */
    public class DataSenderConfirmHandler implements ConfirmListener {
        /** One permit per message that may still be waiting for a confirmation. */
        private final Semaphore maxBufferedMessageCount;
        /** Unconfirmed messages, keyed by their publish sequence number. */
        private final SortedMap<Long, Message> unconfirmedMsgs = Collections
                .synchronizedSortedMap(new TreeMap<Long, Message>());
        private int successfullySubmitted = 0;

        /**
         * @param messageConfirmBuffer maximum number of unconfirmed messages
         */
        public DataSenderConfirmHandler(int messageConfirmBuffer) {
            this.maxBufferedMessageCount = new Semaphore(messageConfirmBuffer);
        }

        /**
         * Publishes the message and remembers it until it is confirmed. Blocks
         * while the maximum number of unconfirmed messages is reached.
         *
         * @param properties AMQP properties of the message
         * @param data       message body
         * @throws IOException if publishing fails or the thread is interrupted
         *                     while waiting for a free buffer slot (the
         *                     interrupt status is restored in that case)
         */
        public synchronized void sendDataWithConfirmation(AMQP.BasicProperties properties, byte[] data)
                throws IOException {
            DataSenderImpl.LOGGER.trace("{}\tavailable\t{}", DataSenderImpl.this,
                    this.maxBufferedMessageCount.availablePermits());
            try {
                this.maxBufferedMessageCount.acquire();
            } catch (InterruptedException e) {
                // Keep the interruption visible to the caller's thread.
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted while waiting for free buffer to store the message before sending.", e);
            }
            synchronized (this.unconfirmedMsgs) {
                sendData_unsecured(new Message(properties, data));
            }
        }

        /**
         * Registers the message under the channel's next publish sequence
         * number and publishes it. The caller must already own a semaphore
         * permit for the message. If publishing fails for any reason, the
         * message is unregistered and its permit is released before the
         * failure is propagated.
         */
        private void sendData_unsecured(Message message) throws IOException {
            // The sequence number has to be read and used atomically with
            // respect to other publishers on the same channel.
            synchronized (DataSenderImpl.this.queue.channel) {
                long sequenceNumber = DataSenderImpl.this.queue.channel.getNextPublishSeqNo();
                DataSenderImpl.LOGGER.trace("{}\tsending\t{}", DataSenderImpl.this, sequenceNumber);
                this.unconfirmedMsgs.put(sequenceNumber, message);
                boolean published = false;
                try {
                    DataSenderImpl.this.sendData(message.properties, message.data);
                    published = true;
                } finally {
                    if (!published) {
                        // Covers unchecked failures as well (e.g. an already
                        // closed channel); otherwise the stale entry would
                        // never be confirmed and waitForConfirms() would hang.
                        this.unconfirmedMsgs.remove(sequenceNumber);
                        this.maxBufferedMessageCount.release();
                    }
                }
            }
        }

        /**
         * Removes the acknowledged message(s) and frees their buffer slots.
         *
         * @param deliveryTag sequence number of the acknowledged message
         * @param multiple    if {@code true}, all messages up to and including
         *                    {@code deliveryTag} are acknowledged
         */
        @Override
        public void handleAck(long deliveryTag, boolean multiple) throws IOException {
            synchronized (this.unconfirmedMsgs) {
                if (multiple) {
                    SortedMap<Long, Message> acknowledged = this.unconfirmedMsgs.headMap(deliveryTag + 1);
                    int acknowledgedCount = acknowledged.size();
                    acknowledged.clear();
                    this.maxBufferedMessageCount.release(acknowledgedCount);
                    this.successfullySubmitted += acknowledgedCount;
                    DataSenderImpl.LOGGER.trace("{}\tack\t{}+\t{}", new Object[]{DataSenderImpl.this, deliveryTag,
                            this.maxBufferedMessageCount.availablePermits()});
                } else if (this.unconfirmedMsgs.remove(deliveryTag) != null) {
                    this.successfullySubmitted++;
                    this.maxBufferedMessageCount.release();
                    DataSenderImpl.LOGGER.trace("{}\tack\t{}\t{}", new Object[]{DataSenderImpl.this, deliveryTag,
                            this.maxBufferedMessageCount.availablePermits()});
                } else {
                    // No entry means no permit is held for this tag; releasing
                    // one would grow the semaphore beyond its configured size.
                    DataSenderImpl.LOGGER.debug("Got an acknowledgement (ack) for the unknown message {}. It will be ignored.",
                            deliveryTag);
                }
            }
        }

        /**
         * Publishes the negatively acknowledged message(s) again.
         *
         * @param deliveryTag sequence number of the rejected message
         * @param multiple    if {@code true}, all messages up to and including
         *                    {@code deliveryTag} are rejected
         * @throws IOException if a message can not be published again
         */
        @Override
        public void handleNack(long deliveryTag, boolean multiple) throws IOException {
            synchronized (this.unconfirmedMsgs) {
                DataSenderImpl.LOGGER.trace("nack\t{}{}", deliveryTag, multiple ? "+" : "");
                if (multiple) {
                    SortedMap<Long, Message> rejected = this.unconfirmedMsgs.headMap(deliveryTag + 1);
                    // Copy first: resending inserts new entries into the map.
                    Message[] toResend = rejected.values().toArray(new Message[rejected.size()]);
                    rejected.clear();
                    resendAll(toResend);
                } else {
                    Message rejectedMessage = this.unconfirmedMsgs.remove(deliveryTag);
                    if (rejectedMessage != null) {
                        sendData_unsecured(rejectedMessage);
                    } else {
                        DataSenderImpl.LOGGER.warn("Got a negative acknowledgement (nack) for an unknown message. It will be ignored.");
                    }
                }
            }
        }

        /**
         * Publishes the given messages again, in order. If one of them fails,
         * the messages behind it are dropped and their permits are released so
         * that the buffer capacity is not lost; the failure is propagated.
         */
        private void resendAll(Message[] toResend) throws IOException {
            int resent = 0;
            try {
                for (Message message : toResend) {
                    sendData_unsecured(message);
                    ++resent;
                }
            } finally {
                // The permit of the message that failed has already been
                // released by sendData_unsecured, hence the "- 1".
                int dropped = toResend.length - resent - 1;
                if (dropped > 0) {
                    this.maxBufferedMessageCount.release(dropped);
                    DataSenderImpl.LOGGER.warn("Resending messages failed. {} messages will not be sent again.", dropped);
                }
            }
        }

        /**
         * Blocks until no message is waiting for a confirmation anymore,
         * polling the set of unconfirmed messages periodically.
         *
         * @throws InterruptedException if the thread is interrupted while waiting
         */
        public void waitForConfirms() throws InterruptedException {
            while (true) {
                synchronized (this.unconfirmedMsgs) {
                    if (this.unconfirmedMsgs.isEmpty()) {
                        DataSenderImpl.LOGGER.trace("sent {} messages.", this.successfullySubmitted);
                        return;
                    }
                }
                Thread.sleep(POLL_INTERVAL_MILLIS);
            }
        }
    }

    /**
     * Simple holder of a message body together with its AMQP properties.
     */
    public static class Message {
        public AMQP.BasicProperties properties;
        public byte[] data;

        public Message(AMQP.BasicProperties properties, byte[] data) {
            this.properties = properties;
            this.data = data;
        }
    }

    /**
     * Creates the sender and, if requested, enables publisher confirms on the
     * queue's channel. If enabling confirms fails, the error is logged and the
     * sender falls back to publishing without confirmations.
     *
     * @param queue                the queue to publish to
     * @param deliveryMode         AMQP delivery mode of the published messages
     * @param messageConfirmBuffer maximum number of unconfirmed messages;
     *                             {@code <= 0} disables publisher confirms
     */
    protected DataSenderImpl(RabbitQueue queue, int deliveryMode, int messageConfirmBuffer) {
        this.queue = queue;
        this.deliveryMode = deliveryMode;
        // Build the handler in a local so that the final field is assigned
        // exactly once, regardless of which step fails.
        DataSenderConfirmHandler handler = null;
        if (messageConfirmBuffer > 0) {
            try {
                this.queue.channel.confirmSelect();
                handler = new DataSenderConfirmHandler(messageConfirmBuffer);
                this.queue.channel.addConfirmListener(handler);
            } catch (Exception e) {
                handler = null;
                LOGGER.error("Exception whily trying to enable confirms. The sender might work, but it won't guarantee that messages are received.", e);
            }
        }
        this.confirmHandler = handler;
    }

    /**
     * Publishes the given data with default message properties (plus the
     * configured delivery mode).
     *
     * @param data message body
     * @throws IOException if publishing fails
     */
    @Override // org.hobbit.core.rabbit.DataSender
    public void sendData(byte[] data) throws IOException {
        sendData(data, new AMQP.BasicProperties.Builder());
    }

    /**
     * Publishes the given data using the properties of the given builder after
     * setting the configured delivery mode on it. Uses the confirm handler if
     * publisher confirms are enabled.
     *
     * @param data         message body
     * @param propsBuilder builder of the message properties
     * @throws IOException if publishing fails
     */
    protected void sendData(byte[] data, AMQP.BasicProperties.Builder propsBuilder) throws IOException {
        propsBuilder.deliveryMode(Integer.valueOf(this.deliveryMode));
        AMQP.BasicProperties properties = propsBuilder.build();
        if (this.confirmHandler != null) {
            this.confirmHandler.sendDataWithConfirmation(properties, data);
        } else {
            sendData(properties, data);
        }
    }

    /**
     * Publishes the message to the queue via the default exchange, without
     * any confirmation tracking.
     *
     * @param properties AMQP properties of the message
     * @param data       message body
     * @throws IOException if publishing fails
     */
    protected void sendData(AMQP.BasicProperties properties, byte[] data) throws IOException {
        this.queue.channel.basicPublish("", this.queue.name, properties, data);
    }

    /**
     * Waits until all messages have been confirmed (if confirms are enabled)
     * and the queue has been observed empty several times in a row, then
     * closes the queue. If the thread is interrupted, waiting stops, the queue
     * is closed and the interrupt status is restored.
     */
    @Override // org.hobbit.core.rabbit.DataSender
    public void closeWhenFinished() {
        boolean interrupted = false;
        if (this.confirmHandler != null) {
            try {
                this.confirmHandler.waitForConfirms();
            } catch (InterruptedException e) {
                interrupted = true;
                LOGGER.warn("Exception while waiting for confirmations. It can not be guaranteed that all messages have been consumed.", e);
            }
        }
        if (!interrupted) {
            interrupted = waitUntilQueueIsEmpty();
        }
        close();
        if (interrupted) {
            // Restored after close() so that closing is not disturbed by it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Polls the queue until it has been empty for
     * {@link #REQUIRED_EMPTY_QUEUE_CHECKS} consecutive checks. Any exception
     * ends the wait; previously an exception inside the loop left the counter
     * untouched and the loop never terminated.
     *
     * @return {@code true} if the wait was ended by an interruption
     */
    private boolean waitUntilQueueIsEmpty() {
        int consecutiveEmptyChecks = 0;
        try {
            while (consecutiveEmptyChecks < REQUIRED_EMPTY_QUEUE_CHECKS) {
                consecutiveEmptyChecks = this.queue.messageCount() > 0 ? 0 : consecutiveEmptyChecks + 1;
                Thread.sleep(POLL_INTERVAL_MILLIS);
            }
        } catch (InterruptedException e) {
            LOGGER.warn("Interrupted while waiting for the queue to be emptied. It can not be guaranteed that all messages have been consumed.", e);
            return true;
        } catch (AlreadyClosedException e) {
            LOGGER.info("The queue is already closed. Assuming that all messages have been consumed.");
        } catch (Exception e) {
            LOGGER.warn("Exception while trying to check whether all messages have been consumed. It will be ignored.", e);
        }
        return false;
    }

    /**
     * Closes the queue immediately, ignoring any error raised while closing.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        IOUtils.closeQuietly(this.queue);
    }

    /**
     * @return a new builder for a {@link DataSenderImpl}
     */
    public static Builder builder() {
        return new Builder();
    }
}
