package org.hobbit.core.components;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.QueueingConsumer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.commons.io.IOUtils;
import org.apache.jena.rdf.model.Model;
import org.apache.jena.rdf.model.ModelFactory;
import org.apache.jena.vocabulary.RDF;
import org.hobbit.core.Constants;
import org.hobbit.core.data.RabbitQueue;
import org.hobbit.core.rabbit.RabbitMQUtils;
import org.hobbit.vocab.HOBBIT;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/hobbit/core/components/AbstractEvaluationModule.class */
public abstract class AbstractEvaluationModule extends AbstractPlatformConnectorComponent {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) AbstractEvaluationModule.class);
    protected QueueingConsumer consumer;
    protected RabbitQueue evalModule2EvalStoreQueue;
    protected RabbitQueue evalStore2EvalModuleQueue;
    protected String experimentUri;

    public AbstractEvaluationModule() {
        this.defaultContainerType = Constants.CONTAINER_TYPE_BENCHMARK;
    }

    @Override // org.hobbit.core.components.AbstractCommandReceivingComponent, org.hobbit.core.components.AbstractComponent, org.hobbit.core.components.Component
    public void init() throws Exception {
        super.init();
        Map<String, String> map = System.getenv();
        if (!map.containsKey(Constants.HOBBIT_EXPERIMENT_URI_KEY)) {
            LOGGER.error("Couldn't get the experiment URI from the variable HOBBIT_EXPERIMENT_URI. Aborting.");
            throw new Exception("Couldn't get the experiment URI from the variable HOBBIT_EXPERIMENT_URI. Aborting.");
        }
        this.experimentUri = map.get(Constants.HOBBIT_EXPERIMENT_URI_KEY);
        this.evalModule2EvalStoreQueue = getFactoryForOutgoingDataQueues().createDefaultRabbitQueue(generateSessionQueueName(Constants.EVAL_MODULE_2_EVAL_STORAGE_DEFAULT_QUEUE_NAME));
        this.evalStore2EvalModuleQueue = getFactoryForIncomingDataQueues().createDefaultRabbitQueue(generateSessionQueueName(Constants.EVAL_STORAGE_2_EVAL_MODULE_DEFAULT_QUEUE_NAME));
        this.consumer = new QueueingConsumer(this.evalStore2EvalModuleQueue.channel);
        this.evalStore2EvalModuleQueue.channel.basicConsume(this.evalStore2EvalModuleQueue.name, this.consumer);
    }

    @Override // org.hobbit.core.components.Component
    public void run() throws Exception {
        sendToCmdQueue((byte) 6);
        collectResponses();
        Model summarizeEvaluation = summarizeEvaluation();
        LOGGER.info("The result model has " + summarizeEvaluation.size() + " triples.");
        sendResultModel(summarizeEvaluation);
    }

    protected void collectResponses() throws Exception {
        byte[] bArr = {-1};
        while (true) {
            this.evalModule2EvalStoreQueue.channel.basicPublish("", this.evalModule2EvalStoreQueue.name, new AMQP.BasicProperties.Builder().deliveryMode(2).replyTo(this.evalStore2EvalModuleQueue.name).build(), bArr);
            ByteBuffer wrap = ByteBuffer.wrap(this.consumer.nextDelivery().getBody());
            if (wrap.remaining() == 0) {
                LOGGER.error("Got a completely empty response from the evaluation storage.");
                return;
            }
            bArr[0] = wrap.get();
            if (wrap.remaining() == 0) {
                return;
            }
            byte[] readByteArray = RabbitMQUtils.readByteArray(wrap);
            long readLong = readByteArray.length > 0 ? RabbitMQUtils.readLong(readByteArray) : 0L;
            byte[] readByteArray2 = RabbitMQUtils.readByteArray(wrap);
            byte[] readByteArray3 = RabbitMQUtils.readByteArray(wrap);
            evaluateResponse(readByteArray2, RabbitMQUtils.readByteArray(wrap), readLong, readByteArray3.length > 0 ? RabbitMQUtils.readLong(readByteArray3) : 0L);
        }
    }

    protected abstract void evaluateResponse(byte[] bArr, byte[] bArr2, long j, long j2) throws Exception;

    protected abstract Model summarizeEvaluation() throws Exception;

    private void sendResultModel(Model model) throws IOException {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        model.write(byteArrayOutputStream, "JSONLD");
        sendToCmdQueue((byte) 9, byteArrayOutputStream.toByteArray());
    }

    @Override // org.hobbit.core.components.AbstractPlatformConnectorComponent, org.hobbit.core.components.CommandReceivingComponent
    public void receiveCommand(byte b, byte[] bArr) {
        super.receiveCommand(b, bArr);
    }

    @Override // org.hobbit.core.components.AbstractCommandReceivingComponent, org.hobbit.core.components.AbstractComponent, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        IOUtils.closeQuietly(this.evalModule2EvalStoreQueue);
        IOUtils.closeQuietly(this.evalStore2EvalModuleQueue);
        super.close();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Model createDefaultModel() {
        Model createDefaultModel = ModelFactory.createDefaultModel();
        createDefaultModel.add(createDefaultModel.createResource(this.experimentUri), RDF.type, HOBBIT.Experiment);
        return createDefaultModel;
    }
}
