package org.hobbit.core.components;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Semaphore;
import org.apache.commons.io.IOUtils;
import org.apache.jena.ext.com.google.common.collect.Lists;
import org.hobbit.core.Constants;
import org.hobbit.core.data.RabbitQueue;
import org.hobbit.core.data.Result;
import org.hobbit.core.data.ResultPair;
import org.hobbit.core.rabbit.DataHandler;
import org.hobbit.core.rabbit.DataReceiver;
import org.hobbit.core.rabbit.DataReceiverImpl;
import org.hobbit.core.rabbit.RabbitMQUtils;
import org.hobbit.utils.EnvVariables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/hobbit/core/components/AbstractEvaluationStorage.class */
/**
 * Base class for evaluation storage components.
 *
 * <p>During {@link #init()} the component starts consuming from three queues:
 * <ul>
 * <li>the task generator queue, whose messages are forwarded to
 * {@code receiveExpectedResponseData(taskId, timestamp, data)},</li>
 * <li>the system queue, whose messages are forwarded to
 * {@code receiveResponseData(taskId, timestamp, data)} and, if an acknowledgement
 * channel has been opened, acknowledged by publishing the task id to the
 * acknowledgement exchange,</li>
 * <li>the evaluation module queue, whose requests are answered with the next
 * {@link ResultPair} of the iterator addressed by the request.</li>
 * </ul>
 *
 * <p>Subclasses provide the actual storage by implementing the two receive
 * methods and {@link #createIterator()}.
 */
public abstract class AbstractEvaluationStorage extends AbstractPlatformConnectorComponent implements ResponseReceivingComponent, ExpectedResponseReceivingComponent {
    /**
     * Key of the boolean setting (read through {@code EnvVariables}, default
     * {@code false}) that decides whether the timestamp of a system result is
     * read from the received message instead of being taken from the local
     * clock at the time the message is handled.
     */
    public static final String RECEIVE_TIMESTAMP_FOR_SYSTEM_RESULTS_KEY = "HOBBIT_RECEIVE_TIMESTAMP_FOR_SYSTEM_RESULTS";
    /** Iterator id used in a request to ask for the creation of a new iterator. */
    public static final byte NEW_ITERATOR_ID = -1;
    /** Number of messages processed in parallel when the no-arg constructor is used. */
    private static final int DEFAULT_MAX_PARALLEL_PROCESSED_MESSAGES = 50;
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractEvaluationStorage.class);
    /** Reply sent when a request cannot be served at all (no usable iterator id). */
    private static final byte[] EMPTY_RESPONSE = new byte[0];

    /** Blocks {@link #run()} until the terminating command has been received. */
    private final Semaphore terminationMutex = new Semaphore(0);
    /** Upper bound of messages each data receiver processes in parallel. */
    private final int maxParallelProcessedMsgs;
    /** Iterators handed out so far; the index in this list is the iterator id. */
    protected List<Iterator<? extends ResultPair>> resultPairIterators;
    /** Receiver for the expected responses sent by the task generators. */
    protected DataReceiver taskResultReceiver;
    /** Receiver for the responses sent by the benchmarked system. */
    protected DataReceiver systemResultReceiver;
    /** Queue on which the evaluation module requests result pairs. */
    protected RabbitQueue evalModule2EvalStoreQueue;
    /** Channel used for acknowledgements; {@code null} if acknowledgements are disabled. */
    protected Channel ackChannel;

    /**
     * Creates a storage that processes up to
     * {@value #DEFAULT_MAX_PARALLEL_PROCESSED_MESSAGES} messages in parallel per receiver.
     */
    public AbstractEvaluationStorage() {
        this(DEFAULT_MAX_PARALLEL_PROCESSED_MESSAGES);
    }

    /**
     * Creates a storage with the given degree of parallelism.
     *
     * @param i maximum number of messages each data receiver processes in parallel
     */
    public AbstractEvaluationStorage(int i) {
        this.resultPairIterators = Lists.newArrayList();
        this.ackChannel = null;
        this.maxParallelProcessedMsgs = i;
        this.defaultContainerType = Constants.CONTAINER_TYPE_DATABASE;
    }

    /**
     * Initializes the parent component, starts the two data receivers, registers
     * the consumer answering evaluation module requests and, if the
     * acknowledgement flag is set, opens the acknowledgement channel.
     *
     * @throws Exception if the parent initialization or the setup of a queue,
     *                   consumer or channel fails
     */
    @Override // org.hobbit.core.components.AbstractCommandReceivingComponent, org.hobbit.core.components.AbstractComponent, org.hobbit.core.components.Component
    public void init() throws Exception {
        super.init();

        String taskGenQueueName = EnvVariables.getString(Constants.TASK_GEN_2_EVAL_STORAGE_QUEUE_NAME_KEY, Constants.TASK_GEN_2_EVAL_STORAGE_DEFAULT_QUEUE_NAME);
        this.taskResultReceiver = DataReceiverImpl.builder()
                .maxParallelProcessedMsgs(this.maxParallelProcessedMsgs)
                .queue(this.incomingDataQueueFactory, generateSessionQueueName(taskGenQueueName))
                .dataHandler(new DataHandler() {
                    @Override // org.hobbit.core.rabbit.DataHandler
                    public void handleData(byte[] bArr) {
                        // Message layout: task id, expected data, timestamp.
                        ByteBuffer buffer = ByteBuffer.wrap(bArr);
                        String taskId = RabbitMQUtils.readString(buffer);
                        LOGGER.trace("Received from task generator {}.", taskId);
                        byte[] expectedData = RabbitMQUtils.readByteArray(buffer);
                        long timestamp = buffer.getLong();
                        AbstractEvaluationStorage.this.receiveExpectedResponseData(taskId, timestamp, expectedData);
                    }
                }).build();

        String systemQueueName = EnvVariables.getString(Constants.SYSTEM_2_EVAL_STORAGE_QUEUE_NAME_KEY, Constants.SYSTEM_2_EVAL_STORAGE_DEFAULT_QUEUE_NAME);
        final boolean useReceivedTimestamp = EnvVariables.getBoolean(RECEIVE_TIMESTAMP_FOR_SYSTEM_RESULTS_KEY, false, LOGGER);
        final String ackExchangeName = generateSessionQueueName(Constants.HOBBIT_ACK_EXCHANGE_NAME);
        this.systemResultReceiver = DataReceiverImpl.builder()
                .maxParallelProcessedMsgs(this.maxParallelProcessedMsgs)
                .queue(this.incomingDataQueueFactory, generateSessionQueueName(systemQueueName))
                .dataHandler(new DataHandler() {
                    @Override // org.hobbit.core.rabbit.DataHandler
                    public void handleData(byte[] bArr) {
                        ByteBuffer buffer = ByteBuffer.wrap(bArr);
                        String taskId = RabbitMQUtils.readString(buffer);
                        LOGGER.trace("Received from system {}.", taskId);
                        // The data has to be consumed BEFORE the optional timestamp so that the
                        // layout (task id, data, timestamp) matches the task generator messages.
                        // NOTE(review): assumes both queues share this layout - confirm with the sender.
                        byte[] responseData = RabbitMQUtils.readByteArray(buffer);
                        long timestamp = useReceivedTimestamp ? buffer.getLong() : System.currentTimeMillis();
                        AbstractEvaluationStorage.this.receiveResponseData(taskId, timestamp, responseData);
                        AbstractEvaluationStorage.this.sendAcknowledgement(ackExchangeName, taskId);
                    }
                }).build();

        String evalModuleQueueName = EnvVariables.getString(Constants.EVAL_MODULE_2_EVAL_STORAGE_QUEUE_NAME_KEY, Constants.EVAL_MODULE_2_EVAL_STORAGE_DEFAULT_QUEUE_NAME);
        this.evalModule2EvalStoreQueue = getFactoryForIncomingDataQueues().createDefaultRabbitQueue(generateSessionQueueName(evalModuleQueueName));
        // Requests are consumed with auto-ack; every request is answered on its reply-to queue.
        this.evalModule2EvalStoreQueue.channel.basicConsume(this.evalModule2EvalStoreQueue.name, true, new DefaultConsumer(this.evalModule2EvalStoreQueue.channel) {
            @Override // com.rabbitmq.client.DefaultConsumer, com.rabbitmq.client.Consumer
            public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
                byte[] response = AbstractEvaluationStorage.this.createIteratorResponse(bArr);
                getChannel().basicPublish("", basicProperties.getReplyTo(), null, response);
            }
        });

        if (EnvVariables.getBoolean(Constants.ACKNOWLEDGEMENT_FLAG_KEY, false, LOGGER)) {
            this.ackChannel = getFactoryForOutgoingCmdQueues().getConnection().createChannel();
            this.ackChannel.exchangeDeclare(ackExchangeName, "fanout", false, true, null);
        }
    }

    /**
     * Publishes the given task id to the acknowledgement exchange. Does nothing
     * if no acknowledgement channel has been opened. A failing publish is logged
     * and otherwise ignored (best effort).
     */
    private void sendAcknowledgement(String ackExchangeName, String taskId) {
        Channel channel = this.ackChannel;
        if (channel == null) {
            return;
        }
        try {
            channel.basicPublish(ackExchangeName, "", null, RabbitMQUtils.writeString(taskId));
            // Only claim success if the publish did not throw.
            LOGGER.trace("Sent ack {}.", taskId);
        } catch (IOException e) {
            LOGGER.error("Error while sending acknowledgement.", e);
        }
    }

    /**
     * Builds the reply for a single evaluation module request.
     *
     * <p>The first byte of the request is the iterator id; {@link #NEW_ITERATOR_ID}
     * asks for a new iterator. The reply is
     * <ul>
     * <li>empty, if the request carries no id, the id is unknown or no further
     * iterator can be created,</li>
     * <li>the iterator id only, if the iterator has no further element,</li>
     * <li>the iterator id followed by expected timestamp, expected data, actual
     * timestamp and actual data otherwise.</li>
     * </ul>
     */
    private byte[] createIteratorResponse(byte[] request) {
        ByteBuffer buffer = ByteBuffer.wrap(request);
        if (buffer.remaining() < 1) {
            LOGGER.error("Got a request without a valid iterator Id. Returning emtpy response.");
            return EMPTY_RESPONSE;
        }

        byte iteratorId = buffer.get();
        Iterator<? extends ResultPair> iterator;
        if (iteratorId == NEW_ITERATOR_ID) {
            int newId = this.resultPairIterators.size();
            if (newId > Byte.MAX_VALUE) {
                // The id travels as a single signed byte; a larger index would wrap to a
                // negative id which every follow-up request would be rejected for.
                LOGGER.error("Cannot create a new iterator because all {} iterator Ids are in use. Returning empty response.", Byte.MAX_VALUE + 1);
                return EMPTY_RESPONSE;
            }
            iteratorId = (byte) newId;
            LOGGER.info("Creating new iterator #{}", iteratorId);
            iterator = createIterator();
            this.resultPairIterators.add(iterator);
        } else if (iteratorId < 0 || iteratorId >= this.resultPairIterators.size()) {
            LOGGER.error("Got a request without a valid iterator Id ({}). Returning emtpy response.", iteratorId);
            // Really return the empty response announced in the log message instead of
            // a reply that looks like "iterator exhausted".
            return EMPTY_RESPONSE;
        } else {
            iterator = this.resultPairIterators.get(iteratorId);
        }

        if (iterator == null || !iterator.hasNext()) {
            // Only the id: tells the requester that this iterator has no more pairs.
            return new byte[] { iteratorId };
        }

        ResultPair pair = iterator.next();
        Result expected = pair.getExpected();
        byte[] expectedTimestamp = timestampOf(expected);
        byte[] expectedData = dataOf(expected);
        Result actual = pair.getActual();
        byte[] actualTimestamp = timestampOf(actual);
        byte[] actualData = dataOf(actual);
        return RabbitMQUtils.writeByteArrays(new byte[] { iteratorId },
                new byte[][] { expectedTimestamp, expectedData, actualTimestamp, actualData }, null);
    }

    /** Returns the data of the given result, or an empty array if the result or its data is missing. */
    private static byte[] dataOf(Result result) {
        byte[] data = (result != null) ? result.getData() : null;
        return (data != null) ? data : new byte[0];
    }

    /** Returns the serialized sent timestamp of the given result, or serialized 0 if the result is missing. */
    private static byte[] timestampOf(Result result) {
        return RabbitMQUtils.writeLong((result != null) ? result.getSentTimestamp() : 0L);
    }

    /**
     * Creates a new iterator over the stored result pairs. Called once for every
     * request carrying {@link #NEW_ITERATOR_ID}.
     *
     * @return the new iterator; a {@code null} result is answered like an
     *         exhausted iterator
     */
    protected abstract Iterator<? extends ResultPair> createIterator();

    /**
     * Sends command 5 to the command queue, waits until command 10 has been
     * received and then lets both data receivers finish their pending work.
     *
     * @throws Exception if sending the command fails or the waiting thread is interrupted
     */
    @Override // org.hobbit.core.components.Component
    public void run() throws Exception {
        // NOTE(review): 5 is presumably the "evaluation storage ready" signal - confirm against Commands.
        sendToCmdQueue((byte) 5);
        this.terminationMutex.acquire();
        this.taskResultReceiver.closeWhenFinished();
        this.systemResultReceiver.closeWhenFinished();
    }

    /**
     * Releases {@link #run()} when command 10 arrives and forwards every command
     * to the parent implementation.
     *
     * @param b    the command byte
     * @param bArr the data attached to the command
     */
    @Override // org.hobbit.core.components.AbstractPlatformConnectorComponent, org.hobbit.core.components.CommandReceivingComponent
    public void receiveCommand(byte b, byte[] bArr) {
        // NOTE(review): 10 is presumably the "evaluation storage terminate" command - confirm against Commands.
        if (b == 10) {
            this.terminationMutex.release();
        }
        super.receiveCommand(b, bArr);
    }

    /**
     * Closes both receivers, the evaluation module queue and the acknowledgement
     * channel (errors of these are logged or ignored) before closing the parent.
     *
     * @throws IOException if closing the parent component fails
     */
    @Override // org.hobbit.core.components.AbstractCommandReceivingComponent, org.hobbit.core.components.AbstractComponent, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        IOUtils.closeQuietly(this.taskResultReceiver);
        IOUtils.closeQuietly(this.systemResultReceiver);
        IOUtils.closeQuietly(this.evalModule2EvalStoreQueue);
        if (this.ackChannel != null) {
            try {
                this.ackChannel.close();
            } catch (Exception e) {
                LOGGER.error("Error while trying to close the acknowledgement channel.", e);
            }
        }
        super.close();
    }
}
