package org.dice_research.squirrel.rabbit;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import org.dice_research.squirrel.utils.Closer;
import org.hobbit.core.data.RabbitQueue;
import org.hobbit.core.rabbit.DataHandler;
import org.hobbit.core.rabbit.DataReceiverImpl;
import org.hobbit.core.rabbit.RabbitQueueFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/dice_research/squirrel/rabbit/RPCServer.class */
public class RPCServer extends DataReceiverImpl implements ResponseHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) DataReceiverImpl.class);
    protected Channel responseChannel;

    /* loaded from: input_file:org/dice_research/squirrel/rabbit/RPCServer$Builder.class */
    public static class Builder extends DataReceiverImpl.Builder {
        private RabbitQueueFactory responseFactory = null;

        @Override // org.hobbit.core.rabbit.DataReceiverImpl.Builder
        public Builder dataHandler(DataHandler dataHandler) {
            if (!(dataHandler instanceof RespondingDataHandler)) {
                throw new IllegalArgumentException("An instance of " + RespondingDataHandler.class.getSimpleName() + " has been expected.");
            }
            this.dataHandler = dataHandler;
            return this;
        }

        public Builder responseQueueFactory(RabbitQueueFactory rabbitQueueFactory) {
            this.responseFactory = rabbitQueueFactory;
            return this;
        }

        @Override // org.hobbit.core.rabbit.DataReceiverImpl.Builder
        public Builder maxParallelProcessedMsgs(int i) {
            this.maxParallelProcessedMsgs = i;
            return this;
        }

        @Override // org.hobbit.core.rabbit.DataReceiverImpl.Builder
        public DataReceiverImpl build() throws IllegalStateException, IOException {
            if (this.dataHandler == null) {
                throw new IllegalStateException("The necessary data handler has not been provided for the DataReceiver.");
            }
            if (this.queue == null) {
                if (this.queueName == null || this.factory == null) {
                    throw new IllegalStateException("There are neither a queue nor a queue name and a queue factory provided for the DataReceiver. Either a queue or a name and a factory to create a new queue are mandatory.");
                }
                this.queue = this.factory.createDefaultRabbitQueue(this.queueName);
            }
            try {
                return new RPCServer(this.queue, (RespondingDataHandler) this.dataHandler, this.maxParallelProcessedMsgs, this.responseFactory == null ? this.factory == null ? this.queue.getChannel() : this.factory.createChannel() : this.responseFactory.createChannel());
            } catch (IOException e) {
                Closer.close(this.queue);
                throw e;
            }
        }
    }

    /* loaded from: input_file:org/dice_research/squirrel/rabbit/RPCServer$MsgProcessingTask.class */
    protected class MsgProcessingTask implements Runnable {
        private QueueingConsumer.Delivery delivery;
        private ResponseHandler responseHandler;

        public MsgProcessingTask(QueueingConsumer.Delivery delivery, ResponseHandler responseHandler) {
            this.delivery = delivery;
            this.responseHandler = responseHandler;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                String replyTo = this.delivery.getProperties().getReplyTo();
                String correlationId = this.delivery.getProperties().getCorrelationId();
                if (replyTo == null || correlationId == null) {
                    RPCServer.this.getDataHandler().handleData(this.delivery.getBody());
                } else {
                    ((RespondingDataHandler) RPCServer.this.getDataHandler()).handleData(this.delivery.getBody(), this.responseHandler, replyTo, correlationId);
                }
            } catch (Throwable th) {
                RPCServer.LOGGER.error("Uncatched throwable when processing an incoming message in the RPCServer.", th);
            }
        }
    }

    protected RPCServer(RabbitQueue rabbitQueue, RespondingDataHandler respondingDataHandler, int i, Channel channel) throws IOException {
        super(rabbitQueue, respondingDataHandler, i);
        this.responseChannel = channel;
    }

    public static Builder builder() {
        return new Builder();
    }

    @Override // org.dice_research.squirrel.rabbit.ResponseHandler
    public void sendResponse(byte[] bArr, String str, String str2) {
        try {
            this.responseChannel.basicPublish("", str, new AMQP.BasicProperties.Builder().correlationId(str2).build(), bArr);
        } catch (Exception e) {
            LOGGER.error("Exception while sending response.", (Throwable) e);
        }
    }

    @Override // org.hobbit.core.rabbit.DataReceiverImpl
    protected Runnable buildMsgProcessingTask(QueueingConsumer.Delivery delivery) {
        return new MsgProcessingTask(delivery, this);
    }
}
